#!/usr/bin/env python
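"""Gather ENCODE submission result files and build the DDF metadata,
condor archive/upload scripts, and a DAGMan file for UCSC uploads.

A typical invocation might look like (hypothetical names):

    ucsc_gather.py --name my_submission --daf my.daf \\
        --scan-submission --make-ddf results.map
"""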
from __future__ import print_function, unicode_literals

import logging
import netrc
from optparse import OptionParser, OptionGroup
import os
import stat
import sys
from zipfile import ZipFile

import RDF

if 'DJANGO_SETTINGS_MODULE' not in os.environ:
    os.environ['DJANGO_SETTINGS_MODULE'] = 'htsworkflow.settings'

from htsworkflow.util import api
from htsworkflow.util.rdfhelp import (
    fromTypedNode,
    get_model,
    get_serializer,
    load_into_model,
    sparql_query,
    submissionOntology,
)
from htsworkflow.submission.daf import UCSCSubmission, get_submission_uri
from htsworkflow.submission.results import ResultMap
from htsworkflow.submission.condorfastq import CondorFastqExtract

logger = logging.getLogger('ucsc_gather')

TAR = '/bin/tar'
LFTP = '/usr/bin/lftp'


def main(cmdline=None):
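    """Parse command line arguments and run the requested gather steps."""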
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)
    submission_uri = None

    global TAR
    global LFTP
    TAR = opts.tar
    LFTP = opts.lftp

    if opts.debug:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.verbose:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig(level=logging.WARNING)

    apidata = api.make_auth_from_opts(opts, parser)

    model = get_model(opts.model, opts.db_path)
    mapper = None
    if opts.name:
        mapper = UCSCSubmission(opts.name, opts.daf, model)
        if opts.library_url is not None:
            mapper.library_url = opts.library_url
        submission_uri = get_submission_uri(opts.name)

    if opts.load_rdf is not None:
        if submission_uri is None:
            parser.error("Please specify the submission name")
        load_into_model(model, 'turtle', opts.load_rdf, submission_uri)

    if opts.make_ddf and opts.daf is None:
        parser.error("Please specify your daf when making ddf files")

    results = ResultMap()
    for a in args:
        results.add_results_from_file(a)

    if opts.make_tree_from is not None:
        results.make_tree_from(opts.make_tree_from)

    if opts.link_daf:
        if mapper is None:
            parser.error("Specify a submission model")
        if mapper.daf is None:
            parser.error("Please load a daf first")
        mapper.link_daf(results)

    if opts.fastq:
        flowcells = os.path.join(opts.sequence, 'flowcells')
        extractor = CondorFastqExtract(opts.host, flowcells,
                                       force=opts.force)
        extractor.create_scripts(results)

    if opts.scan_submission:
        if mapper is None:
            parser.error("Specify a submission model")
        mapper.scan_submission_dirs(results)

    if opts.make_ddf:
        if not os.path.exists(TAR):
            parser.error("%s does not exist, please specify --tar" % (TAR,))
        if not os.path.exists(LFTP):
            parser.error("%s does not exist, please specify --lftp" % (LFTP,))
        make_all_ddfs(mapper, results, opts.daf, force=opts.force)

    if opts.zip_ddf:
        zip_ddfs(mapper, results, opts.daf)

    if opts.sparql:
        sparql_query(model, opts.sparql)

    if opts.print_rdf:
        writer = get_serializer()
        print(writer.serialize_model_to_string(model))


def make_parser():
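    """Build the OptionParser: model options, commands, and debug flags."""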
    parser = OptionParser()

    model = OptionGroup(parser, 'model')
    model.add_option('--name', help="Set submission name")
    model.add_option('--db-path', default=None,
                     help="set rdf database path")
    model.add_option('--model', default=None,
                     help="Load model database")
    model.add_option('--load-rdf', default=None,
                     help="load rdf statements into model")
    model.add_option('--sparql', default=None, help="execute sparql query")
    model.add_option('--print-rdf', action="store_true", default=False,
                     help="print ending model state")
    model.add_option('--tar', default=TAR,
                     help="override path to tar command")
    model.add_option('--lftp', default=LFTP,
                     help="override path to lftp command")
    parser.add_option_group(model)
    # commands
    commands = OptionGroup(parser, 'commands')
    commands.add_option('--make-tree-from',
                        help="create directories & link data files",
                        default=None)
    commands.add_option('--fastq', default=False, action="store_true",
                        help="generate scripts for making fastq files")
    commands.add_option('--scan-submission', default=False, action="store_true",
                        help="Import metadata for submission into our model")
    commands.add_option('--link-daf', default=False, action="store_true",
                        help="link daf into submission directories")
    commands.add_option('--make-ddf', help='make the ddfs', default=False,
                        action="store_true")
    commands.add_option('--zip-ddf', default=False, action='store_true',
                        help='zip up just the metadata')
    parser.add_option_group(commands)

    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")
    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--library-url', default=None,
                      help="specify an alternate source for library information")
    # debugging
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    api.add_auth_options(parser)

    return parser


def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
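    """Write a ddf for every library in the result map.

    With make_condor=True, also collect the per-library condor DAG
    fragments and write them to 'submission.dagman'.
    """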
    dag_fragment = []
    for lib_id, result_dir in library_result_map.items():
        submissionNode = view_map.get_submission_node(result_dir)
        dag_fragment.extend(
            make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)
        )

    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        if not force and os.path.exists(dag_filename):
            logger.warning("%s exists, please delete" % (dag_filename,))
        else:
            with open(dag_filename, 'w') as f:
                f.write(os.linesep.join(dag_fragment))
                f.write(os.linesep)


def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
    """
    Make a ddf file, plus the condor archive/upload scripts if requested.
    """
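    # Collect every file registered to this submission together with the
    # DDF metadata fields; most fields are OPTIONAL since not every
    # library defines them.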
    query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>

select ?submitView ?files ?md5sum ?view ?cell ?antibody ?sex ?control ?strain ?controlId ?labExpId ?labVersion ?treatment ?protocol ?readType ?insertLength ?replicate ?mapAlgorithm
WHERE {
  ?file ucscDaf:filename ?files ;
        ucscDaf:md5sum ?md5sum .
  ?submitView ucscDaf:has_file ?file ;
              ucscDaf:view ?dafView ;
              ucscDaf:submission <%(submission)s> .
  ?dafView ucscDaf:name ?view .
  <%(submission)s> submissionOntology:library ?library .

  OPTIONAL { ?library libraryOntology:antibody ?antibody }
  OPTIONAL { ?library libraryOntology:cell_line ?cell }
  OPTIONAL { <%(submission)s> ucscDaf:control ?control }
  OPTIONAL { <%(submission)s> ucscDaf:controlId ?controlId }
  OPTIONAL { ?library ucscDaf:sex ?sex }
  OPTIONAL { ?library libraryOntology:library_id ?labExpId }
  OPTIONAL { ?library libraryOntology:library_id ?labVersion }
  OPTIONAL { ?library libraryOntology:replicate ?replicate }
  OPTIONAL { ?library libraryOntology:condition_term ?treatment }
  OPTIONAL { ?library ucscDaf:protocol ?protocol }
  OPTIONAL { ?library ucscDaf:readType ?readType }
  OPTIONAL { ?library ucscDaf:strain ?strain }
  OPTIONAL { ?library libraryOntology:insert_size ?insertLength }
  OPTIONAL { ?library ucscDaf:mapAlgorithm ?mapAlgorithm }
}
ORDER BY ?submitView"""
    dag_fragments = []

    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
    if name is None:
        logger.error("Need name for %s" % (str(submissionNode),))
        return []

    ddf_name = make_ddf_name(name)
    if outdir is not None:
        outfile = os.path.join(outdir, ddf_name)
        output = open(outfile, 'w')
    else:
        outfile = 'stdout:'
        output = sys.stdout

    formatted_query = query_template % {'submission': str(submissionNode.uri)}

    query = RDF.SPARQLQuery(formatted_query)
    results = query.execute(view_map.model)

    # filename goes first
    variables = view_map.get_daf_variables()
    output.write('\t'.join(variables))
    output.write(os.linesep)

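    # Group result rows by view: 'files' and 'md5sum' accumulate into
    # lists (comma-joined below), other variables keep the last value seen.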
    all_views = {}
    all_files = []
    for row in results:
        viewname = fromTypedNode(row['view'])
        current = all_views.setdefault(viewname, {})
        for variable_name in variables:
            value = fromTypedNode(row[variable_name])
            if value is None:
                logger.warning("{0}: {1} was None".format(outfile, variable_name))
            value = str(value)
            if variable_name in ('files', 'md5sum'):
                current.setdefault(variable_name, []).append(value)
            else:
                current[variable_name] = value

    for view in all_views.keys():
        line = []
        for variable_name in variables:
            if variable_name in ('files', 'md5sum'):
                line.append(','.join(all_views[view][variable_name]))
            else:
                line.append(all_views[view][variable_name])
        output.write("\t".join(line))
        output.write(os.linesep)
        all_files.extend(all_views[view]['files'])

    logger.info(
        "Examined {0}, found files: {1}".format(
            str(submissionNode), ", ".join(all_files)))

    all_files.append(daf_name)
    all_files.append(ddf_name)

    if output is not sys.stdout:
        output.close()

    if make_condor:
        archive_condor = make_condor_archive_script(name, all_files, outdir)
        upload_condor = make_condor_upload_script(name, outdir)

        dag_fragments.extend(
            make_dag_fragment(name, archive_condor, upload_condor)
        )

    return dag_fragments


def zip_ddfs(view_map, library_result_map, daf_name):
    """Zip up just the ddf & daf files for each library."""
    rootdir = os.getcwd()
    for lib_id, result_dir in library_result_map.items():
        submissionNode = view_map.get_submission_node(result_dir)
        nameNode = view_map.model.get_target(submissionNode,
                                             submissionOntology['name'])
        name = fromTypedNode(nameNode)
        if name is None:
            logger.error("Need name for %s" % (str(submissionNode),))
            continue

        zip_name = '../{0}.zip'.format(lib_id)
        os.chdir(os.path.join(rootdir, result_dir))
        with ZipFile(zip_name, 'w') as stream:
            stream.write(make_ddf_name(name))
            stream.write(daf_name)
        os.chdir(rootdir)


def make_condor_archive_script(name, files, outdir=None):
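    """Write a condor submit file that tars the listed files into ../<name>.tgz.

    Raises RuntimeError if any listed file is missing from outdir.
    """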
    script = """Universe = vanilla

Executable = %(tar)s
arguments = czvhf ../%(archivename)s %(filelist)s

Error = compress.out.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"
request_memory = 20

queue
"""
    if outdir is None:
        outdir = os.getcwd()
    for f in files:
        pathname = os.path.join(outdir, f)
        if not os.path.exists(pathname):
            raise RuntimeError("Missing %s from %s" % (f, outdir))

    context = {'archivename': make_submission_name(name),
               'filelist': " ".join(files),
               'initialdir': os.path.abspath(outdir),
               'user': os.getlogin(),
               'tar': TAR}

    condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
    with open(condor_script, 'w') as condor_stream:
        condor_stream.write(script % context)
    return condor_script


def make_condor_upload_script(name, outdir=None):
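    """Write a condor submit file that uploads the archive via lftp.

    FTP credentials come from a hard-coded netrc file (~diane/.netrc);
    the generated script is made owner read/write only because it embeds
    the password.
    """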
    script = """Universe = vanilla

Executable = %(lftp)s
arguments = -c put %(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s

Error = upload.out.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s

queue
"""
    if outdir is None:
        outdir = os.getcwd()

    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))

    encodeftp = 'encodeftp.cse.ucsc.edu'
    ftpuser = auth.hosts[encodeftp][0]
    ftppassword = auth.hosts[encodeftp][2]
    context = {'archivename': make_submission_name(name),
               'initialdir': os.path.abspath(outdir),
               'user': os.getlogin(),
               'ftpuser': ftpuser,
               'ftppassword': ftppassword,
               'ftphost': encodeftp,
               'lftp': LFTP}

    condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
    with open(condor_script, 'w') as condor_stream:
        condor_stream.write(script % context)
    os.chmod(condor_script, stat.S_IREAD | stat.S_IWRITE)

    return condor_script


def make_dag_fragment(ininame, archive_condor, upload_condor):
    """
    Make the pair of DAG fragments that archive and then upload the data.
    """
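    # For ininame '12345' this yields, roughly:
    #   JOB 12345_archive <cwd>/12345.archive.condor
    #   JOB 12345_upload <cwd>/12345.upload.condor
    #   PARENT 12345_archive CHILD 12345_upload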
    cur_dir = os.getcwd()
    archive_condor = os.path.join(cur_dir, archive_condor)
    upload_condor = os.path.join(cur_dir, upload_condor)
    job_basename = make_base_name(ininame)

    fragments = []
    fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
    fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
    fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))

    return fragments


def make_base_name(pathname):
    base = os.path.basename(pathname)
    name, ext = os.path.splitext(base)
    return name


def make_submission_name(ininame):
    name = make_base_name(ininame)
    return name + ".tgz"


def make_ddf_name(pathname):
    name = make_base_name(pathname)
    return name + ".ddf"


def make_condor_name(pathname, run_type=None):
    name = make_base_name(pathname)
    elements = [name]
    if run_type is not None:
        elements.append(run_type)
    elements.append("condor")
    return ".".join(elements)


def parse_filelist(file_string):
    return file_string.split(",")


def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list
    """
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("%s does not exist" % (f,))


if __name__ == "__main__":
    main()