use six.moves to work around urllib / urllib2 / urlparse to urllib 2to3 cleanup
[htsworkflow.git] / encode_submission / ucsc_gather.py
#!/usr/bin/env python
from __future__ import print_function, unicode_literals

from six.moves.configparser import SafeConfigParser
import fnmatch
from glob import glob
import json
import logging
import netrc
from optparse import OptionParser, OptionGroup
import os
from pprint import pprint, pformat
import shlex
from six.moves import StringIO
import stat
import sys
import time
import types
from zipfile import ZipFile

import RDF

if 'DJANGO_SETTINGS_MODULE' not in os.environ:
    os.environ['DJANGO_SETTINGS_MODULE'] = 'htsworkflow.settings'

from htsworkflow.util import api
from htsworkflow.util.rdfhelp import (
    dafTermOntology,
    fromTypedNode,
    get_model,
    get_serializer,
    load_into_model,
    sparql_query,
    submissionOntology,
)
from htsworkflow.submission.daf import (
    UCSCSubmission,
    MetadataLookupException,
    get_submission_uri,
)
from htsworkflow.submission.results import ResultMap
from htsworkflow.submission.condorfastq import CondorFastqExtract

logger = logging.getLogger('ucsc_gather')

TAR = '/bin/tar'
LFTP = '/usr/bin/lftp'

def main(cmdline=None):
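    """Gather ENCODE submission files and metadata for upload to UCSC.

    Example invocation (illustrative; the names are hypothetical):

        ucsc_gather.py --name mysub --daf mysub.daf --make-ddf results.txt
    """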
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)
    submission_uri = None

    global TAR
    global LFTP
    TAR = opts.tar
    LFTP = opts.lftp

    if opts.debug:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.verbose:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig(level=logging.WARNING)

    apidata = api.make_auth_from_opts(opts, parser)

    model = get_model(opts.model, opts.db_path)
    mapper = None
    if opts.name:
        mapper = UCSCSubmission(opts.name, opts.daf, model)
        if opts.library_url is not None:
            mapper.library_url = opts.library_url
        submission_uri = get_submission_uri(opts.name)

    if opts.load_rdf is not None:
        if submission_uri is None:
            parser.error("Please specify the submission name")
        load_into_model(model, 'turtle', opts.load_rdf, submission_uri)

    if opts.make_ddf and opts.daf is None:
        parser.error("Please specify your daf when making ddf files")

    results = ResultMap()
    for a in args:
        results.add_results_from_file(a)

    if opts.make_tree_from is not None:
        results.make_tree_from(opts.make_tree_from)

    if opts.link_daf:
        if mapper is None:
            parser.error("Specify a submission model")
        if mapper.daf is None:
            parser.error("Please load a daf first")
        mapper.link_daf(results)

    if opts.fastq:
        flowcells = os.path.join(opts.sequence, 'flowcells')
        extractor = CondorFastqExtract(opts.host, flowcells,
                                       force=opts.force)
        extractor.create_scripts(results)

    if opts.scan_submission:
        if mapper is None:
            parser.error("Specify a submission model")
        mapper.scan_submission_dirs(results)

    if opts.make_ddf:
        if not os.path.exists(TAR):
            parser.error("%s does not exist, please specify --tar" % (TAR,))
        if not os.path.exists(LFTP):
            parser.error("%s does not exist, please specify --lftp" % (LFTP,))
        make_all_ddfs(mapper, results, opts.daf, force=opts.force)

    if opts.zip_ddf:
        zip_ddfs(mapper, results, opts.daf)

    if opts.sparql:
        sparql_query(model, opts.sparql)

    if opts.print_rdf:
        writer = get_serializer()
        print(writer.serialize_model_to_string(model))


def make_parser():
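    """Build the OptionParser for the command line interface.

    Options are grouped into model-related settings and commands;
    authentication options are appended by api.add_auth_options.
    """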
    parser = OptionParser()

    model = OptionGroup(parser, 'model')
    model.add_option('--name', help="Set submission name")
    model.add_option('--db-path', default=None,
                     help="set rdf database path")
    model.add_option('--model', default=None,
                     help="Load model database")
    model.add_option('--load-rdf', default=None,
                     help="load rdf statements into model")
    model.add_option('--sparql', default=None, help="execute sparql query")
    model.add_option('--print-rdf', action="store_true", default=False,
                     help="print ending model state")
    model.add_option('--tar', default=TAR,
                     help="override path to tar command")
    model.add_option('--lftp', default=LFTP,
                     help="override path to lftp command")
    parser.add_option_group(model)
    # commands
    commands = OptionGroup(parser, 'commands')
    commands.add_option('--make-tree-from',
                        help="create directories & link data files",
                        default=None)
    commands.add_option('--fastq', default=False, action="store_true",
                        help="generate scripts for making fastq files")
    commands.add_option('--scan-submission', default=False, action="store_true",
                        help="Import metadata for submission into our model")
    commands.add_option('--link-daf', default=False, action="store_true",
                        help="link daf into submission directories")
    commands.add_option('--make-ddf', default=False, action="store_true",
                        help='make the ddfs')
    commands.add_option('--zip-ddf', default=False, action='store_true',
                        help='zip up just the metadata')

    parser.add_option_group(commands)

    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")
    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--library-url', default=None,
                      help="specify an alternate source for library information")
    # debugging
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    api.add_auth_options(parser)

    return parser


def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
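    """Write a ddf for each submission dir, plus a DAGMan file tying them together.

    An existing submission.dagman is left alone unless force is set.
    """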
    dag_fragment = []
    for lib_id, result_dir in library_result_map.items():
        submissionNode = view_map.get_submission_node(result_dir)
        dag_fragment.extend(
            make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)
        )

    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        if not force and os.path.exists(dag_filename):
            logger.warning("%s exists, please delete" % (dag_filename,))
        else:
            with open(dag_filename, 'w') as f:
                f.write(os.linesep.join(dag_fragment))
                f.write(os.linesep)


def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
    """
    Make a ddf file for one submission, plus its bonus condor scripts.
    """
    query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>

SELECT ?submitView ?files ?md5sum ?view ?cell ?antibody ?sex ?control ?strain ?controlId ?labExpId ?labVersion ?treatment ?protocol ?readType ?insertLength ?replicate ?mapAlgorithm
WHERE {
  ?file ucscDaf:filename ?files ;
        ucscDaf:md5sum ?md5sum .
  ?submitView ucscDaf:has_file ?file ;
              ucscDaf:view ?dafView ;
              ucscDaf:submission <%(submission)s> .
  ?dafView ucscDaf:name ?view .
  <%(submission)s> submissionOntology:library ?library .

  OPTIONAL { ?library libraryOntology:antibody ?antibody }
  OPTIONAL { ?library libraryOntology:cell_line ?cell }
  OPTIONAL { <%(submission)s> ucscDaf:control ?control }
  OPTIONAL { <%(submission)s> ucscDaf:controlId ?controlId }
  OPTIONAL { ?library ucscDaf:sex ?sex }
  OPTIONAL { ?library libraryOntology:library_id ?labExpId }
  OPTIONAL { ?library libraryOntology:library_id ?labVersion }
  OPTIONAL { ?library libraryOntology:replicate ?replicate }
  OPTIONAL { ?library libraryOntology:condition_term ?treatment }
  OPTIONAL { ?library ucscDaf:protocol ?protocol }
  OPTIONAL { ?library ucscDaf:readType ?readType }
  OPTIONAL { ?library ucscDaf:strain ?strain }
  OPTIONAL { ?library libraryOntology:insert_size ?insertLength }
  OPTIONAL { ?library ucscDaf:mapAlgorithm ?mapAlgorithm }
}
ORDER BY ?submitView"""
    dag_fragments = []

    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
    if name is None:
        logger.error("Need name for %s" % (str(submissionNode),))
        return []

    ddf_name = make_ddf_name(name)
    if outdir is not None:
        outfile = os.path.join(outdir, ddf_name)
        output = open(outfile, 'w')
    else:
        outfile = 'stdout:'
        output = sys.stdout

    formatted_query = query_template % {'submission': str(submissionNode.uri)}

    query = RDF.SPARQLQuery(formatted_query)
    results = query.execute(view_map.model)

    # filename goes first
    variables = view_map.get_daf_variables()
    output.write('\t'.join(variables))
    output.write(os.linesep)

    all_views = {}
    all_files = []
    for row in results:
        viewname = fromTypedNode(row['view'])
        current = all_views.setdefault(viewname, {})
        for variable_name in variables:
            value = fromTypedNode(row[variable_name])
            if value is None:
                logger.warning("{0}: {1} was None".format(outfile, variable_name))
            value = str(value)
            if variable_name in ('files', 'md5sum'):
                current.setdefault(variable_name, []).append(value)
            else:
                current[variable_name] = value

    for view in all_views.keys():
        line = []
        for variable_name in variables:
            if variable_name in ('files', 'md5sum'):
                line.append(','.join(all_views[view][variable_name]))
            else:
                line.append(all_views[view][variable_name])
        output.write("\t".join(line))
        output.write(os.linesep)
        all_files.extend(all_views[view]['files'])

    if outdir is not None:
        output.close()

    logger.info(
        "Examined {0}, found files: {1}".format(
            str(submissionNode), ", ".join(all_files)))

    all_files.append(daf_name)
    all_files.append(ddf_name)

    if make_condor:
        archive_condor = make_condor_archive_script(name, all_files, outdir)
        upload_condor = make_condor_upload_script(name, outdir)

        dag_fragments.extend(
            make_dag_fragment(name, archive_condor, upload_condor)
        )

    return dag_fragments


def zip_ddfs(view_map, library_result_map, daf_name):
    """zip up just the ddf & daf files for each submission
    """
    rootdir = os.getcwd()
    for lib_id, result_dir in library_result_map.items():
        submissionNode = view_map.get_submission_node(result_dir)
        nameNode = view_map.model.get_target(submissionNode,
                                             submissionOntology['name'])
        name = fromTypedNode(nameNode)
        if name is None:
            logger.error("Need name for %s" % (str(submissionNode),))
            continue

        zip_name = '../{0}.zip'.format(lib_id)
        os.chdir(os.path.join(rootdir, result_dir))
        with ZipFile(zip_name, 'w') as stream:
            stream.write(make_ddf_name(name))
            stream.write(daf_name)
        os.chdir(rootdir)


def make_condor_archive_script(name, files, outdir=None):
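    """Write a condor submit file that tars up the submission files.

    Raises RuntimeError if any of the listed files is missing from outdir.
    """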
    script = """Universe = vanilla

Executable = %(tar)s
arguments = czvhf ../%(archivename)s %(filelist)s

Error = compress.out.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"
request_memory = 20

queue
"""
    if outdir is None:
        outdir = os.getcwd()
    for f in files:
        pathname = os.path.join(outdir, f)
        if not os.path.exists(pathname):
            raise RuntimeError("Missing %s from %s" % (f, outdir))

    context = {'archivename': make_submission_name(name),
               'filelist': " ".join(files),
               'initialdir': os.path.abspath(outdir),
               'user': os.getlogin(),
               'tar': TAR}

    condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
    with open(condor_script, 'w') as condor_stream:
        condor_stream.write(script % context)
    return condor_script


def make_condor_upload_script(name, outdir=None):
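    """Write a condor submit file that lftps the archive to encodeftp.

    FTP credentials come from ~diane/.netrc; since the generated script
    embeds the password, it is chmodded to owner read/write only.
    """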
    script = """Universe = vanilla

Executable = %(lftp)s
arguments = -c put %(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s

Error = upload.out.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s

queue
"""
    if outdir is None:
        outdir = os.getcwd()

    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))

    encodeftp = 'encodeftp.cse.ucsc.edu'
    ftpuser = auth.hosts[encodeftp][0]
    ftppassword = auth.hosts[encodeftp][2]
    context = {'archivename': make_submission_name(name),
               'initialdir': os.path.abspath(outdir),
               'user': os.getlogin(),
               'ftpuser': ftpuser,
               'ftppassword': ftppassword,
               'ftphost': encodeftp,
               'lftp': LFTP}

    condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
    with open(condor_script, 'w') as condor_stream:
        condor_stream.write(script % context)
    os.chmod(condor_script, stat.S_IREAD | stat.S_IWRITE)

    return condor_script


def make_dag_fragment(ininame, archive_condor, upload_condor):
    """
    Make the DAGMan fragment that compresses and then uploads the data.
    """
    cur_dir = os.getcwd()
    archive_condor = os.path.join(cur_dir, archive_condor)
    upload_condor = os.path.join(cur_dir, upload_condor)
    job_basename = make_base_name(ininame)

    fragments = []
    fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
    fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
    fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))

    return fragments


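# Name helpers: everything below derives submission, ddf, and condor
# script names from a common base name.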
def make_base_name(pathname):
    base = os.path.basename(pathname)
    name, ext = os.path.splitext(base)
    return name


def make_submission_name(ininame):
    name = make_base_name(ininame)
    return name + ".tgz"


def make_ddf_name(pathname):
    name = make_base_name(pathname)
    return name + ".ddf"


def make_condor_name(pathname, run_type=None):
    name = make_base_name(pathname)
    elements = [name]
    if run_type is not None:
        elements.append(run_type)
    elements.append("condor")
    return ".".join(elements)


def parse_filelist(file_string):
    return file_string.split(",")


def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list
    """
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("%s does not exist" % (f,))


if __name__ == "__main__":
    main()