2 from ConfigParser import SafeConfigParser
8 from optparse import OptionParser, OptionGroup
10 from pprint import pprint, pformat
12 from StringIO import StringIO
23 from htsworkflow.util import api
24 from htsworkflow.util.rdfhelp import \
32 from htsworkflow.submission.daf import \
34 MetadataLookupException, \
36 from htsworkflow.submission.condorfastq import CondorFastqExtract
# Module-level logger for this script.
38 logger = logging.getLogger('ucsc_gather')
# Entry point: parse command-line options, configure logging, build the
# RDF model / DAF mapper, and dispatch to the selected sub-commands.
# NOTE(review): this is a numbered listing with gaps -- the if/else
# guards around several of these calls are not visible; comments below
# hedge accordingly.
40 def main(cmdline=None):
41 parser = make_parser()
42 opts, args = parser.parse_args(cmdline)
# Log level selection (presumably keyed off --debug/--verbose; the
# guarding conditionals at lines 43-51 are not shown -- TODO confirm).
46 logging.basicConfig(level = logging.DEBUG )
48 logging.basicConfig(level = logging.INFO )
50 logging.basicConfig(level = logging.WARNING )
# Build API auth credentials from the options added by
# api.add_auth_options() in make_parser().
52 apidata = api.make_auth_from_opts(opts, parser)
54 model = get_model(opts.load_model)
56 mapper = DAFMapper(opts.name, opts.daf, model)
57 if opts.library_url is not None:
58 mapper.library_url = opts.library_url
59 submission_uri = get_submission_uri(opts.name)
# --load-rdf needs a submission name so statements land in the right
# graph context.
62 if opts.load_rdf is not None:
63 if submission_uri is None:
64 parser.error("Please specify the submission name")
65 load_into_model(model, 'turtle', opts.load_rdf, submission_uri)
67 if opts.make_ddf and opts.daf is None:
68 parser.error("Please specify your daf when making ddf files")
# Collect (library_id, result_dir) pairs; the loop over positional
# args (line 71) is not shown in this listing.
70 library_result_map = []
72 library_result_map.extend(read_library_result_map(a))
74 if opts.make_tree_from is not None:
75 make_tree_from(opts.make_tree_from, library_result_map)
# Link the daf into each result directory (guard at line 77 not shown;
# presumably opts.link_daf -- verify against make_parser).
78 link_daf(opts.daf, library_result_map)
# Fastq extraction via condor (guard at line 80 not shown; presumably
# opts.fastq). Call continues past line 81 (line 82 not shown).
81 extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
83 extractor.build_fastqs(library_result_map)
85 if opts.scan_submission:
86 scan_submission_dirs(mapper, library_result_map)
# make_all_ddfs guard (line 88, presumably opts.make_ddf) not shown.
89 make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)
# --sparql query execution (guard at line 91 not shown).
92 sparql_query(model, opts.sparql)
# Dump the final model state when --print-rdf was given (guard at line
# 94 not shown). Python 2 print statement.
95 writer = get_serializer()
96 print writer.serialize_model_to_string(model)
# Body of make_parser(): builds the OptionParser with a "model" option
# group (RDF model related flags) and a "commands" group (which actions
# to run), plus standalone flags and the shared API auth options.
# NOTE(review): the enclosing def line (original line ~99) and the
# return statement are not visible in this listing.
100 parser = OptionParser()
102 model = OptionGroup(parser, 'model')
103 model.add_option('--name', help="Set submission name")
104 model.add_option('--load-model', default=None,
105 help="Load model database")
106 model.add_option('--load-rdf', default=None,
107 help="load rdf statements into model")
108 model.add_option('--sparql', default=None, help="execute sparql query")
109 model.add_option('--print-rdf', action="store_true", default=False,
110 help="print ending model state")
111 parser.add_option_group(model)
113 commands = OptionGroup(parser, 'commands')
# --make-tree-from continues past line 115 (line 116 not shown).
114 commands.add_option('--make-tree-from',
115 help="create directories & link data files",
117 commands.add_option('--fastq', default=False, action="store_true",
118 help="generate scripts for making fastq files")
119 commands.add_option('--scan-submission', default=False, action="store_true",
120 help="Import metadata for submission into our model")
121 commands.add_option('--link-daf', default=False, action="store_true",
122 help="link daf into submission directories")
# --make-ddf continues past line 123 (line 124 not shown).
123 commands.add_option('--make-ddf', help='make the ddfs', default=False,
125 parser.add_option_group(commands)
127 parser.add_option('--force', default=False, action="store_true",
128 help="Force regenerating fastqs")
129 parser.add_option('--daf', default=None, help='specify daf name')
130 parser.add_option('--library-url', default=None,
131 help="specify an alternate source for library information")
133 parser.add_option('--verbose', default=False, action="store_true",
134 help='verbose logging')
135 parser.add_option('--debug', default=False, action="store_true",
136 help='debug logging')
# Adds host/credential options consumed by api.make_auth_from_opts()
# in main().
138 api.add_auth_options(parser)
# Mirror data files from source_path into each library result directory:
# create the directory if absent, then symlink every file found in the
# matching directory under source_path.
# NOTE(review): listing has gaps (lines 144, 148, 151, 159, 161-162);
# the mkdir call and one logging call's opening line are not visible.
142 def make_tree_from(source_path, library_result_map):
143 """Create a tree using data files from source path.
145 for lib_id, lib_path in library_result_map:
146 if not os.path.exists(lib_path):
147 logging.info("Making dir {0}".format(lib_path))
# The corresponding directory-creation call (line 148) is not shown.
149 source_lib_dir = os.path.abspath(os.path.join(source_path, lib_path))
150 if os.path.exists(source_lib_dir):
152 for filename in os.listdir(source_lib_dir):
153 source_pathname = os.path.join(source_lib_dir, filename)
154 target_pathname = os.path.join(lib_path, filename)
# Fail loudly if the source file vanished; skip files already linked.
155 if not os.path.exists(source_pathname):
156 raise IOError("{0} does not exist".format(source_pathname))
157 if not os.path.exists(target_pathname):
158 os.symlink(source_pathname, target_pathname)
# Continuation of a logging call whose opening line (159) is not shown.
160 'LINK {0} to {1}'.format(source_pathname, target_pathname))
def link_daf(daf_path, library_result_map):
    """Hard-link the daf file into every library result directory.

    Directories that already contain the daf are left untouched.
    Raises RuntimeError when the daf or a result directory is missing.
    """
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    base_daf = os.path.basename(daf_path)

    for lib_id, result_dir in library_result_map:
        if not os.path.exists(result_dir):
            raise RuntimeError("Couldn't find target directory %s" %(result_dir,))
        target_daf = os.path.join(result_dir, base_daf)
        # Already linked for this library: nothing to do.
        if os.path.exists(target_daf):
            continue
        # Re-check the source right before linking; it may have vanished
        # since the check at the top of the function.
        if not os.path.exists(daf_path):
            raise RuntimeError("Couldn't find daf: %s" %(daf_path,))
        os.link(daf_path, target_daf)
# Walk each library result directory and import its metadata into the
# RDF model via view_map (a DAFMapper). Libraries whose metadata cannot
# be resolved are logged and skipped rather than aborting the run.
179 def scan_submission_dirs(view_map, library_result_map):
180 """Look through our submission directories and collect needed information
182 for lib_id, result_dir in library_result_map:
183 logging.info("Importing %s from %s" % (lib_id, result_dir))
# The opening try: (line 184) is not visible in this listing.
185 view_map.import_submission_dir(result_dir, lib_id)
# Python 2 except syntax; MetadataLookupException is imported from
# htsworkflow.submission.daf at the top of the file.
186 except MetadataLookupException, e:
187 logging.error("Skipping %s: %s" % (lib_id, str(e)))
# Generate a ddf file for every library and, when make_condor is set,
# write a 'submission.dagman' DAG file tying the per-library condor
# archive/upload jobs together.
# NOTE(review): gaps at lines 190, 193, 195-196, 201, 205-207; the
# initialization and accumulation of dag_fragment and the file close
# are not visible in this listing.
189 def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
191 for lib_id, result_dir in library_result_map:
192 submissionNode = view_map.get_submission_node(result_dir)
# make_ddf's return (dag fragments, per its body) is presumably
# collected at line 193 (not shown) -- TODO confirm.
194 make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)
# Refuse to clobber an existing DAG file unless force is set.
197 if make_condor and len(dag_fragment) > 0:
198 dag_filename = 'submission.dagman'
199 if not force and os.path.exists(dag_filename):
200 logging.warn("%s exists, please delete" % (dag_filename,))
202 f = open(dag_filename,'w')
203 f.write( os.linesep.join(dag_fragment))
204 f.write( os.linesep )
# Build one ddf (data definition file) for a submission node: run a
# SPARQL query over the model, group result rows by view, write a
# tab-separated table, and optionally emit condor archive/upload
# submit scripts plus DAG fragments for them.
# NOTE(review): numbered listing with gaps; initialization of
# all_views/all_files/dag_fragments, the row-iteration header, and the
# return are not visible here.
208 def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
210 Make ddf files, and bonus condor file
# One row per (submitView, file) carrying the library/daf metadata for
# the ddf columns; OPTIONAL clauses keep rows even when a given
# attribute is absent from the model.
212 query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
213 PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
214 PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>
216 select ?submitView ?files ?md5sum ?view ?cell ?antibody ?sex ?control ?controlId ?labExpId ?labVersion ?treatment ?protocol ?readType ?insertLength ?replicate
218 ?file ucscDaf:filename ?files ;
219 ucscDaf:md5sum ?md5sum .
220 ?submitView ucscDaf:has_file ?file ;
221 ucscDaf:view ?dafView ;
222 ucscDaf:submission <%(submission)s> .
223 ?dafView ucscDaf:name ?view .
224 <%(submission)s> submissionOntology:library ?library ;
226 OPTIONAL { ?library libraryOntology:antibody ?antibody }
227 OPTIONAL { ?library libraryOntology:cell_line ?cell }
228 OPTIONAL { <%(submission)s> ucscDaf:control ?control }
229 OPTIONAL { <%(submission)s> ucscDaf:controlId ?controlId }
230 OPTIONAL { ?library ucscDaf:sex ?sex }
231 OPTIONAL { ?library libraryOntology:library_id ?labExpId }
232 OPTIONAL { ?library libraryOntology:library_id ?labVersion }
233 OPTIONAL { ?library libraryOntology:replicate ?replicate }
234 OPTIONAL { ?library libraryOntology:condition ?treatment }
235 OPTIONAL { ?library ucscDaf:protocol ?protocol }
236 OPTIONAL { ?library ucscDaf:readType ?readType }
237 OPTIONAL { ?library libraryOntology:insert_size ?insertLength }
239 ORDER BY ?submitView"""
# The submission name drives the ddf filename; the error branch below
# logs when it is missing (the guarding conditional is not shown).
242 name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
244 logging.error("Need name for %s" % (str(submissionNode)))
247 ddf_name = name + '.ddf'
248 if outdir is not None:
249 outfile = os.path.join(outdir, ddf_name)
250 output = open(outfile,'w')
254 formatted_query = query_template % {'submission': str(submissionNode.uri)}
256 query = RDF.SPARQLQuery(formatted_query)
257 results = query.execute(view_map.model)
# Column order: filename first, then the daf-declared variables, then
# labExpId and md5sum.
259 variables = ['files']
260 # filename goes first
261 variables.extend(view_map.get_daf_variables())
263 variables += [ 'labExpId', 'md5sum']
264 output.write('\t'.join(variables))
265 output.write(os.linesep)
# Group rows by view: 'files' and 'md5sum' accumulate into lists,
# every other variable keeps the last value seen for that view.
270 viewname = fromTypedNode(row['view'])
271 current = all_views.setdefault(viewname, {})
272 for variable_name in variables:
273 value = str(fromTypedNode(row[variable_name]))
274 if variable_name in ('files', 'md5sum'):
275 current.setdefault(variable_name,[]).append(value)
277 current[variable_name] = value
# Emit one ddf line per view; the multi-valued columns are comma
# joined.
279 for view in all_views.keys():
281 for variable_name in variables:
282 if variable_name in ('files', 'md5sum'):
283 line.append(','.join(all_views[view][variable_name]))
285 line.append(all_views[view][variable_name])
286 output.write("\t".join(line))
287 output.write(os.linesep)
288 all_files.extend(all_views[view]['files'])
# Continuation of a logging call (opening line not shown).
291 "Examined {0}, found files: {1}".format(
292 str(submissionNode), ", ".join(all_files)))
# The daf and the ddf itself are included in the archive file list.
294 all_files.append(daf_name)
295 all_files.append(ddf_name)
298 archive_condor = make_condor_archive_script(name, all_files, outdir)
299 upload_condor = make_condor_upload_script(name, outdir)
301 dag_fragments.extend(
302 make_dag_fragment(name, archive_condor, upload_condor)
# Parse a whitespace-separated "library_id result_dir" mapping file,
# skipping '#' comment lines and empty lines.
# NOTE(review): the results-list initialization, the line-reading loop
# header, and the return are not visible in this listing.
308 def read_library_result_map(filename):
310 Read a file that maps library id to result directory.
311 Does not support spaces in filenames.
316 stream = open(filename,'r')
321 if not line.startswith('#') and len(line) > 0 :
# line.split() with no argument is why spaces in paths are unsupported.
322 library_id, result_dir = line.split()
323 results.append((library_id, result_dir))
# Write a condor submit file that tars the submission's files into
# ../<archive name>, after validating that every listed file exists in
# outdir.
# NOTE(review): the tail of the script template, the validation loop
# header (~line 344), and the return of the script path are not
# visible in this listing.
327 def make_condor_archive_script(name, files, outdir=None):
328 script = """Universe = vanilla
330 Executable = /bin/tar
331 arguments = czvhf ../%(archivename)s %(filelist)s
333 Error = compress.out.$(Process).log
334 Output = compress.out.$(Process).log
335 Log = /tmp/submission-compress-%(user)s.log
336 initialdir = %(initialdir)s
337 environment="GZIP=-3"
# Fail before submitting the job if any file to archive is missing.
345 pathname = os.path.join(outdir, f)
346 if not os.path.exists(pathname):
347 raise RuntimeError("Missing %s from %s" % (f,outdir))
349 context = {'archivename': make_submission_name(name),
350 'filelist': " ".join(files),
351 'initialdir': os.path.abspath(outdir),
352 'user': os.getlogin()}
354 condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
355 condor_stream = open(condor_script,'w')
356 condor_stream.write(script % context)
357 condor_stream.close()
# Write a condor submit file that uploads the archive to the UCSC
# encodeftp server via lftp; credentials are read from a hard-coded
# netrc file (~diane/.netrc).
# NOTE(review): the 'ftpuser' entry of the context dict (line 385) and
# part of the script template are not visible in this listing.
361 def make_condor_upload_script(name, outdir=None):
362 script = """Universe = vanilla
364 Executable = /usr/bin/lftp
365 arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s
367 Error = upload.out.$(Process).log
368 Output = upload.out.$(Process).log
369 Log = /tmp/submission-upload-%(user)s.log
370 initialdir = %(initialdir)s
# Hard-coded credential source -- netrc entry for the encode ftp host.
377 auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))
379 encodeftp = 'encodeftp.cse.ucsc.edu'
380 ftpuser = auth.hosts[encodeftp][0]
381 ftppassword = auth.hosts[encodeftp][2]
382 context = {'archivename': make_submission_name(name),
383 'initialdir': os.path.abspath(outdir),
384 'user': os.getlogin(),
386 'ftppassword': ftppassword,
387 'ftphost': encodeftp}
389 condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
390 condor_stream = open(condor_script,'w')
391 condor_stream.write(script % context)
392 condor_stream.close()
# Restrict permissions: the written script embeds the ftp password.
393 os.chmod(condor_script, stat.S_IREAD|stat.S_IWRITE)
# Build condor DAG lines declaring the archive and upload jobs for one
# submission, with upload depending on archive.
# NOTE(review): the fragments-list initialization (lines 406-407) and
# the return are not visible in this listing.
398 def make_dag_fragment(ininame, archive_condor, upload_condor):
400 Make the couple of fragments compress and then upload the data.
# DAG files need absolute paths to the submit files.
402 cur_dir = os.getcwd()
403 archive_condor = os.path.join(cur_dir, archive_condor)
404 upload_condor = os.path.join(cur_dir, upload_condor)
405 job_basename = make_base_name(ininame)
408 fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
409 fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
410 fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))
# Fetch library metadata from the htsworkflow API for one library id.
# NOTE(review): the return statement is not visible in this listing.
415 def get_library_info(host, apidata, library_id):
416 url = api.library_url(host, library_id)
417 contents = api.retrieve_info(url, apidata)
# Format one [lineN] section of a submission ini file from a file list
# and an attribute dict, as a list of "key=value" strings.
# NOTE(review): the return of the built list is not visible here.
421 def make_submission_section(line_counter, files, attributes):
423 Create a section in the submission ini file
425 inifile = [ "[line%s]" % (line_counter,) ]
426 inifile += ["files=%s" % (",".join(files))]
428 for k,v in attributes.items():
429 inifile += ["%s=%s" % (k,v)]
# Strip directory and extension from a pathname, leaving the bare name.
# NOTE(review): the return statement (presumably "return name") is not
# visible in this listing.
433 def make_base_name(pathname):
434 base = os.path.basename(pathname)
435 name, ext = os.path.splitext(base)
# Derive the archive name for a submission from its ini name.
# NOTE(review): the return (presumably name plus an archive suffix) is
# not visible in this listing.
439 def make_submission_name(ininame):
440 name = make_base_name(ininame)
# Derive the ddf filename from a pathname.
# NOTE(review): the return (presumably name + '.ddf') is not visible
# in this listing.
444 def make_ddf_name(pathname):
445 name = make_base_name(pathname)
# Build a condor submit-file name like "<base>[.<run_type>].condor".
# NOTE(review): the initialization of elements (line 451, presumably
# [name]) is not visible in this listing.
449 def make_condor_name(pathname, run_type=None):
450 name = make_base_name(pathname)
452 if run_type is not None:
453 elements.append(run_type)
454 elements.append("condor")
455 return ".".join(elements)
def parse_filelist(file_string):
    """Split a comma separated string of filenames into a list."""
    pieces = file_string.split(",")
    return pieces
# Raise RuntimeError for any file in the list that does not exist.
# NOTE(review): the for-loop header over files (line 466) is not
# visible in this listing.
462 def validate_filelist(files):
464 Die if a file doesn't exist in a file list
467 if not os.path.exists(f):
468 raise RuntimeError("%s does not exist" % (f,))
# Script entry point guard; the call to main() (presumably with
# sys.argv) is not visible in this listing.
470 if __name__ == "__main__":