2 from __future__ import print_function, unicode_literals
4 from six.moves.configparser import SafeConfigParser
10 from optparse import OptionParser, OptionGroup
12 from pprint import pprint, pformat
14 from six.moves import StringIO
19 from zipfile import ZipFile
21 if not 'DJANGO_SETTINGS_MODULE' in os.environ:
22 os.environ['DJANGO_SETTINGS_MODULE'] = 'htsworkflow.settings'
24 from htsworkflow.util import api
25 from htsworkflow.util.rdfns import \
28 from htsworkflow.submission.daf import \
30 MetadataLookupException, \
32 from htsworkflow.submission.results import ResultMap
33 from htsworkflow.submission.condorfastq import CondorFastqExtract
35 logger = logging.getLogger('ucsc_gather')
38 LFTP = '/usr/bin/lftp'
40 def main(cmdline=None):
41 parser = make_parser()
42 opts, args = parser.parse_args(cmdline)
51 logging.basicConfig(level = logging.DEBUG )
53 logging.basicConfig(level = logging.INFO )
55 logging.basicConfig(level = logging.WARNING )
57 apidata = api.make_auth_from_opts(opts, parser)
59 model = get_model(opts.model, opts.db_path)
62 mapper = UCSCSubmission(opts.name, opts.daf, model)
63 if opts.library_url is not None:
64 mapper.library_url = opts.library_url
65 submission_uri = get_submission_uri(opts.name)
68 if opts.load_rdf is not None:
69 if submission_uri is None:
70 parser.error("Please specify the submission name")
71 load_into_model(model, 'turtle', opts.load_rdf, submission_uri)
73 if opts.make_ddf and opts.daf is None:
74 parser.error("Please specify your daf when making ddf files")
78 results.add_results_from_file(a)
80 if opts.make_tree_from is not None:
81 results.make_tree_from(opts.make_tree_from)
85 parser.error("Specify a submission model")
86 if mapper.daf is None:
87 parser.error("Please load a daf first")
88 mapper.link_daf(results)
91 flowcells = os.path.join(opts.sequence, 'flowcells')
92 extractor = CondorFastqExtract(opts.host, flowcells,
94 extractor.create_scripts(results)
96 if opts.scan_submission:
97 mapper.scan_submission_dirs(results)
100 if not os.path.exists(TAR):
101 parser.error("%s does not exist, please specify --tar" % (TAR,))
102 if not os.path.exists(LFTP):
103 parser.error("%s does not exist, please specify --lftp" % (LFTP,))
104 make_all_ddfs(mapper, results, opts.daf, force=opts.force)
107 zip_ddfs(mapper, results, opts.daf)
110 sparql_query(model, opts.sparql)
113 writer = get_serializer()
114 print(writer.serialize_model_to_string(model))
118 parser = OptionParser()
120 model = OptionGroup(parser, 'model')
121 model.add_option('--name', help="Set submission name")
122 model.add_option('--db-path', default=None,
123 help="set rdf database path")
124 model.add_option('--model', default=None,
125 help="Load model database")
126 model.add_option('--load-rdf', default=None,
127 help="load rdf statements into model")
128 model.add_option('--sparql', default=None, help="execute sparql query")
129 model.add_option('--print-rdf', action="store_true", default=False,
130 help="print ending model state")
131 model.add_option('--tar', default=TAR,
132 help="override path to tar command")
133 model.add_option('--lftp', default=LFTP,
134 help="override path to lftp command")
135 parser.add_option_group(model)
137 commands = OptionGroup(parser, 'commands')
138 commands.add_option('--make-tree-from',
139 help="create directories & link data files",
141 commands.add_option('--fastq', default=False, action="store_true",
142 help="generate scripts for making fastq files")
143 commands.add_option('--scan-submission', default=False, action="store_true",
144 help="Import metadata for submission into our model")
145 commands.add_option('--link-daf', default=False, action="store_true",
146 help="link daf into submission directories")
147 commands.add_option('--make-ddf', help='make the ddfs', default=False,
149 commands.add_option('--zip-ddf', default=False, action='store_true',
150 help='zip up just the metadata')
152 parser.add_option_group(commands)
154 parser.add_option('--force', default=False, action="store_true",
155 help="Force regenerating fastqs")
156 parser.add_option('--daf', default=None, help='specify daf name')
157 parser.add_option('--library-url', default=None,
158 help="specify an alternate source for library information")
160 parser.add_option('--verbose', default=False, action="store_true",
161 help='verbose logging')
162 parser.add_option('--debug', default=False, action="store_true",
163 help='debug logging')
165 api.add_auth_options(parser)
170 def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
172 for lib_id, result_dir in library_result_map.items():
173 submissionNode = view_map.get_submission_node(result_dir)
175 make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)
178 if make_condor and len(dag_fragment) > 0:
179 dag_filename = 'submission.dagman'
180 if not force and os.path.exists(dag_filename):
181 logger.warn("%s exists, please delete" % (dag_filename,))
183 f = open(dag_filename,'w')
184 f.write( os.linesep.join(dag_fragment))
185 f.write( os.linesep )
189 def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
191 Make ddf files, and bonus condor file
193 query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
194 PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
195 PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>
197 select ?submitView ?files ?md5sum ?view ?cell ?antibody ?sex ?control ?strain ?controlId ?labExpId ?labVersion ?treatment ?protocol ?readType ?insertLength ?replicate ?mapAlgorithm
199 ?file ucscDaf:filename ?files ;
200 ucscDaf:md5sum ?md5sum .
201 ?submitView ucscDaf:has_file ?file ;
202 ucscDaf:view ?dafView ;
203 ucscDaf:submission <%(submission)s> .
204 ?dafView ucscDaf:name ?view .
205 <%(submission)s> submissionOntology:library ?library ;
207 OPTIONAL { ?library libraryOntology:antibody ?antibody }
208 OPTIONAL { ?library libraryOntology:cell_line ?cell }
209 OPTIONAL { <%(submission)s> ucscDaf:control ?control }
210 OPTIONAL { <%(submission)s> ucscDaf:controlId ?controlId }
211 OPTIONAL { ?library ucscDaf:sex ?sex }
212 OPTIONAL { ?library libraryOntology:library_id ?labExpId }
213 OPTIONAL { ?library libraryOntology:library_id ?labVersion }
214 OPTIONAL { ?library libraryOntology:replicate ?replicate }
215 OPTIONAL { ?library libraryOntology:condition_term ?treatment }
216 OPTIONAL { ?library ucscDaf:protocol ?protocol }
217 OPTIONAL { ?library ucscDaf:readType ?readType }
218 OPTIONAL { ?library ucscDaf:strain ?strain }
219 OPTIONAL { ?library libraryOntology:insert_size ?insertLength }
220 OPTIONAL { ?library ucscDaf:mapAlgorithm ?mapAlgorithm }
222 ORDER BY ?submitView"""
225 names = list(view_map.model.objects(submissionNode, submissionOntology['name']))
227 logger.error("Need name for %s" % (str(submissionNode)))
230 ddf_name = make_ddf_name(names[0].toPython())
231 if outdir is not None:
232 outfile = os.path.join(outdir, ddf_name)
233 output = open(outfile,'w')
238 formatted_query = query_template % {'submission': str(submissionNode.uri)}
240 results = view_map.model.query(formatted_query)
242 # filename goes first
243 variables = view_map.get_daf_variables()
245 output.write('\t'.join(variables))
246 output.write(os.linesep)
251 viewname = fromTypedNode(row['view'])
252 current = all_views.setdefault(viewname, {})
253 for variable_name in variables:
254 value = str(fromTypedNode(row[variable_name]))
255 if value is None or value == 'None':
256 logger.warn("{0}: {1} was None".format(outfile, variable_name))
257 if variable_name in ('files', 'md5sum'):
258 current.setdefault(variable_name,[]).append(value)
260 current[variable_name] = value
262 for view in all_views.keys():
264 for variable_name in variables:
265 if variable_name in ('files', 'md5sum'):
266 line.append(','.join(all_views[view][variable_name]))
268 line.append(all_views[view][variable_name])
269 output.write("\t".join(line))
270 output.write(os.linesep)
271 all_files.extend(all_views[view]['files'])
274 "Examined {0}, found files: {1}".format(
275 str(submissionNode), ", ".join(all_files)))
277 all_files.append(daf_name)
278 all_files.append(ddf_name)
281 archive_condor = make_condor_archive_script(name, all_files, outdir)
282 upload_condor = make_condor_upload_script(name, outdir)
284 dag_fragments.extend(
285 make_dag_fragment(name, archive_condor, upload_condor)
291 def zip_ddfs(view_map, library_result_map, daf_name):
292 """zip up just the ddf & daf files
294 rootdir = os.getcwd()
295 for lib_id, result_dir in library_result_map:
296 submissionNode = view_map.get_submission_node(result_dir)
297 nameNodes = list(view_map.model.objects(submissionNode,
298 submissionOntology['name']))
299 if len(nameNodes) == 0:
300 logger.error("Need name for %s" % (str(submissionNode)))
302 name = nameNodes[0].toPython()
304 zip_name = '../{0}.zip'.format(lib_id)
305 os.chdir(os.path.join(rootdir, result_dir))
306 with ZipFile(zip_name, 'w') as stream:
307 stream.write(make_ddf_name(name))
308 stream.write(daf_name)
312 def make_condor_archive_script(name, files, outdir=None):
313 script = """Universe = vanilla
316 arguments = czvhf ../%(archivename)s %(filelist)s
318 Error = compress.out.$(Process).log
319 Output = compress.out.$(Process).log
320 Log = /tmp/submission-compress-%(user)s.log
321 initialdir = %(initialdir)s
322 environment="GZIP=-3"
330 pathname = os.path.join(outdir, f)
331 if not os.path.exists(pathname):
332 raise RuntimeError("Missing %s from %s" % (f,outdir))
334 context = {'archivename': make_submission_name(name),
335 'filelist': " ".join(files),
336 'initialdir': os.path.abspath(outdir),
337 'user': os.getlogin(),
340 condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
341 condor_stream = open(condor_script,'w')
342 condor_stream.write(script % context)
343 condor_stream.close()
347 def make_condor_upload_script(name, lftp, outdir=None):
348 script = """Universe = vanilla
350 Executable = %(lftp)s
351 arguments = -c put %(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s
353 Error = upload.out.$(Process).log
354 Output = upload.out.$(Process).log
355 Log = /tmp/submission-upload-%(user)s.log
356 initialdir = %(initialdir)s
363 auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))
365 encodeftp = 'encodeftp.cse.ucsc.edu'
366 ftpuser = auth.hosts[encodeftp][0]
367 ftppassword = auth.hosts[encodeftp][2]
368 context = {'archivename': make_submission_name(name),
369 'initialdir': os.path.abspath(outdir),
370 'user': os.getlogin(),
372 'ftppassword': ftppassword,
373 'ftphost': encodeftp,
376 condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
377 condor_stream = open(condor_script,'w')
378 condor_stream.write(script % context)
379 condor_stream.close()
380 os.chmod(condor_script, stat.S_IREAD|stat.S_IWRITE)
385 def make_dag_fragment(ininame, archive_condor, upload_condor):
387 Make the couple of fragments compress and then upload the data.
389 cur_dir = os.getcwd()
390 archive_condor = os.path.join(cur_dir, archive_condor)
391 upload_condor = os.path.join(cur_dir, upload_condor)
392 job_basename = make_base_name(ininame)
395 fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
396 fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
397 fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))
402 def make_base_name(pathname):
403 base = os.path.basename(pathname)
404 name, ext = os.path.splitext(base)
408 def make_submission_name(ininame):
409 name = make_base_name(ininame)
413 def make_ddf_name(pathname):
414 name = make_base_name(pathname)
418 def make_condor_name(pathname, run_type=None):
419 name = make_base_name(pathname)
421 if run_type is not None:
422 elements.append(run_type)
423 elements.append("condor")
424 return ".".join(elements)
def parse_filelist(file_string):
    """Split a comma-delimited string of filenames into a list.

    No whitespace trimming or empty-entry filtering is performed; the
    result is exactly what ``str.split`` produces for the ``,`` separator.
    """
    separator = ","
    return file_string.split(separator)
431 def validate_filelist(files):
433 Die if a file doesn't exist in a file list
436 if not os.path.exists(f):
437 raise RuntimeError("%s does not exist" % (f,))
439 if __name__ == "__main__":