port ucsc_gather to rdflib
[htsworkflow.git] / encode_submission / ucsc_gather.py
1 #!/usr/bin/env python
2 from __future__ import print_function, unicode_literals
3
import fnmatch
import getpass
from glob import glob
import json
import logging
import netrc
from optparse import OptionParser, OptionGroup
import os
from pprint import pprint, pformat
import shlex
import stat
import sys
import time
import types
from zipfile import ZipFile

from six.moves import StringIO
from six.moves.configparser import SafeConfigParser
20
21 if not 'DJANGO_SETTINGS_MODULE' in os.environ:
22     os.environ['DJANGO_SETTINGS_MODULE'] = 'htsworkflow.settings'
23
24 from htsworkflow.util import api
25 from htsworkflow.util.rdfns import \
26      dafTermOntology, \
27      submissionOntology
28 from htsworkflow.submission.daf import \
29      UCSCSubmission, \
30      MetadataLookupException, \
31      get_submission_uri
32 from htsworkflow.submission.results import ResultMap
33 from htsworkflow.submission.condorfastq import CondorFastqExtract
34
35 logger = logging.getLogger('ucsc_gather')
36
37 TAR = '/bin/tar'
38 LFTP = '/usr/bin/lftp'
39
def main(cmdline=None):
    """Command line driver: build an RDF model of a UCSC submission and
    run the requested gather/packaging commands.

    cmdline -- optional argument list for OptionParser (None means
    sys.argv[1:] is used).
    """
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)
    submission_uri = None

    # tar/lftp overrides are stored in module globals so the condor
    # script builders below can see them.
    global TAR
    global LFTP
    TAR = opts.tar
    LFTP = opts.lftp

    if opts.debug:
        logging.basicConfig(level = logging.DEBUG )
    elif opts.verbose:
        logging.basicConfig(level = logging.INFO )
    else:
        logging.basicConfig(level = logging.WARNING )

    apidata = api.make_auth_from_opts(opts, parser)

    # NOTE(review): get_model is neither defined nor imported in this
    # file (presumably lost in the rdflib port) -- this raises NameError.
    model = get_model(opts.model, opts.db_path)
    mapper = None
    if opts.name:
        mapper = UCSCSubmission(opts.name, opts.daf,  model)
        if opts.library_url is not None:
            mapper.library_url = opts.library_url
        submission_uri = get_submission_uri(opts.name)


    if opts.load_rdf is not None:
        if submission_uri is None:
            parser.error("Please specify the submission name")
        # NOTE(review): load_into_model is also undefined in this file.
        load_into_model(model, 'turtle', opts.load_rdf, submission_uri)

    if opts.make_ddf and opts.daf is None:
        parser.error("Please specify your daf when making ddf files")

    results = ResultMap()
    for a in args:
        results.add_results_from_file(a)

    if opts.make_tree_from is not None:
        results.make_tree_from(opts.make_tree_from)

    if opts.link_daf:
        if mapper is None:
            parser.error("Specify a submission model")
        if mapper.daf is None:
            parser.error("Please load a daf first")
        mapper.link_daf(results)

    if opts.fastq:
        # NOTE(review): opts.sequence and opts.host are not defined by
        # make_parser -- confirm api.add_auth_options provides them.
        flowcells = os.path.join(opts.sequence, 'flowcells')
        extractor = CondorFastqExtract(opts.host, flowcells,
                                       force=opts.force)
        extractor.create_scripts(results)

    if opts.scan_submission:
        # NOTE(review): mapper is None when --name was not given; this
        # path would raise AttributeError.
        mapper.scan_submission_dirs(results)

    if opts.make_ddf:
        if not os.path.exists(TAR):
            parser.error("%s does not exist, please specify --tar" % (TAR,))
        if not os.path.exists(LFTP):
            parser.error("%s does not exist, please specify --lftp" % (LFTP,))
        make_all_ddfs(mapper, results, opts.daf, force=opts.force)

    if opts.zip_ddf:
        zip_ddfs(mapper, results, opts.daf)

    if opts.sparql:
        # NOTE(review): sparql_query is undefined in this file.
        sparql_query(model, opts.sparql)

    if opts.print_rdf:
        # NOTE(review): get_serializer is undefined in this file, and
        # serialize_model_to_string is redland-era API -- verify the
        # rdflib replacement.
        writer = get_serializer()
        print(writer.serialize_model_to_string(model))
115
116
def make_parser():
    """Build the OptionParser for the ucsc_gather command line."""
    parser = OptionParser()

    model_group = OptionGroup(parser, 'model')
    model_group.add_option('--name', help="Set submission name")
    model_group.add_option('--db-path', default=None,
                           help="set rdf database path")
    model_group.add_option('--model', default=None,
                           help="Load model database")
    model_group.add_option('--load-rdf', default=None,
                           help="load rdf statements into model")
    model_group.add_option('--sparql', default=None,
                           help="execute sparql query")
    model_group.add_option('--print-rdf', action="store_true", default=False,
                           help="print ending model state")
    model_group.add_option('--tar', default=TAR,
                           help="override path to tar command")
    model_group.add_option('--lftp', default=LFTP,
                           help="override path to lftp command")
    parser.add_option_group(model_group)

    command_group = OptionGroup(parser, 'commands')
    command_group.add_option('--make-tree-from', default=None,
                             help="create directories & link data files")
    command_group.add_option('--fastq', default=False, action="store_true",
                             help="generate scripts for making fastq files")
    command_group.add_option('--scan-submission', default=False,
                             action="store_true",
                             help="Import metadata for submission into our model")
    command_group.add_option('--link-daf', default=False, action="store_true",
                             help="link daf into submission directories")
    command_group.add_option('--make-ddf', default=False, action="store_true",
                             help='make the ddfs')
    command_group.add_option('--zip-ddf', default=False, action='store_true',
                             help='zip up just the metadata')
    parser.add_option_group(command_group)

    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")
    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--library-url', default=None,
                      help="specify an alternate source for library information")
    # debugging aids
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    api.add_auth_options(parser)

    return parser
168
169
def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
    """Generate a ddf for every library in the result map.

    view_map -- submission object holding the RDF model and daf info
    library_result_map -- ResultMap of library id -> result directory
    daf_name -- daf filename passed through to make_ddf
    make_condor -- also write condor scripts and a submission.dagman
    force -- overwrite an existing submission.dagman
    """
    dag_fragment = []
    for lib_id, result_dir in library_result_map.items():
        submissionNode = view_map.get_submission_node(result_dir)
        dag_fragment.extend(
            make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)
        )

    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        if not force and os.path.exists(dag_filename):
            # Logger.warn is a deprecated alias for warning
            logger.warning("%s exists, please delete", dag_filename)
        else:
            # with-block guarantees the dag file is closed even on error
            with open(dag_filename, 'w') as stream:
                stream.write(os.linesep.join(dag_fragment))
                stream.write(os.linesep)
187
188
def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
    """
    Make ddf files, and bonus condor file

    view_map -- submission object with an rdflib model and daf metadata
    submissionNode -- RDF node (URIRef) naming one submission
    daf_name -- daf filename, appended to the archive file list
    make_condor -- also write condor archive/upload scripts
    outdir -- directory to write the ddf into (None writes to stdout)

    Returns a list of condor DAG fragment lines (empty unless
    make_condor is True).
    """
    query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>

select ?submitView  ?files ?md5sum ?view ?cell ?antibody ?sex ?control ?strain ?controlId ?labExpId ?labVersion ?treatment ?protocol ?readType ?insertLength ?replicate ?mapAlgorithm
WHERE {
  ?file ucscDaf:filename ?files ;
        ucscDaf:md5sum ?md5sum .
  ?submitView ucscDaf:has_file ?file ;
              ucscDaf:view ?dafView ;
              ucscDaf:submission <%(submission)s> .
  ?dafView ucscDaf:name ?view .
  <%(submission)s> submissionOntology:library ?library ;

  OPTIONAL { ?library libraryOntology:antibody ?antibody }
  OPTIONAL { ?library libraryOntology:cell_line ?cell }
  OPTIONAL { <%(submission)s> ucscDaf:control ?control }
  OPTIONAL { <%(submission)s> ucscDaf:controlId ?controlId }
  OPTIONAL { ?library ucscDaf:sex ?sex }
  OPTIONAL { ?library libraryOntology:library_id ?labExpId }
  OPTIONAL { ?library libraryOntology:library_id ?labVersion }
  OPTIONAL { ?library libraryOntology:replicate ?replicate }
  OPTIONAL { ?library libraryOntology:condition_term ?treatment }
  OPTIONAL { ?library ucscDaf:protocol ?protocol }
  OPTIONAL { ?library ucscDaf:readType ?readType }
  OPTIONAL { ?library ucscDaf:strain ?strain }
  OPTIONAL { ?library libraryOntology:insert_size ?insertLength }
  OPTIONAL { ?library ucscDaf:mapAlgorithm ?mapAlgorithm }
}
ORDER BY  ?submitView"""
    dag_fragments = []

    names = list(view_map.model.objects(submissionNode, submissionOntology['name']))
    if len(names) == 0:
        logger.error("Need name for %s" % (str(submissionNode)))
        return []

    # BUG FIX: bind the submission name once; the condor-script calls at
    # the bottom previously referenced an undefined ``name`` variable.
    name = names[0].toPython()
    ddf_name = make_ddf_name(name)
    if outdir is not None:
        outfile = os.path.join(outdir, ddf_name)
        output = open(outfile, 'w')
    else:
        outfile = 'stdout:'
        output = sys.stdout

    # rdflib URIRef is itself the URI string; the redland-era
    # ``submissionNode.uri`` attribute does not exist on rdflib terms.
    formatted_query = query_template % {'submission': str(submissionNode)}

    query_results = view_map.model.query(formatted_query)

    def _node_value(node):
        # Unbound OPTIONAL variables come back as None; rdflib terms
        # convert to plain python values with toPython().
        return node.toPython() if node is not None else None

    # filename goes first
    variables = view_map.get_daf_variables()
    output.write('\t'.join(variables))
    output.write(os.linesep)

    all_views = {}
    all_files = []
    for row in query_results:
        viewname = _node_value(row['view'])
        current = all_views.setdefault(viewname, {})
        for variable_name in variables:
            value = str(_node_value(row[variable_name]))
            # str() never returns None, so only the 'None' string marks
            # a missing value (previous ``value is None`` test was dead)
            if value == 'None':
                logger.warning("%s: %s was None", outfile, variable_name)
            if variable_name in ('files', 'md5sum'):
                # multiple files per view are accumulated, comma-joined below
                current.setdefault(variable_name, []).append(value)
            else:
                current[variable_name] = value

    for view in all_views.keys():
        line = []
        for variable_name in variables:
            if variable_name in ('files', 'md5sum'):
                line.append(','.join(all_views[view][variable_name]))
            else:
                line.append(all_views[view][variable_name])
        output.write("\t".join(line))
        output.write(os.linesep)
        all_files.extend(all_views[view]['files'])

    # close the ddf we opened (but never close the shared sys.stdout)
    if output is not sys.stdout:
        output.close()

    logger.info(
        "Examined {0}, found files: {1}".format(
            str(submissionNode), ", ".join(all_files)))

    all_files.append(daf_name)
    all_files.append(ddf_name)

    if make_condor:
        archive_condor = make_condor_archive_script(name, all_files, outdir)
        upload_condor = make_condor_upload_script(name, outdir)

        dag_fragments.extend(
            make_dag_fragment(name, archive_condor, upload_condor)
        )

    return dag_fragments
289
290
def zip_ddfs(view_map, library_result_map, daf_name):
    """zip up just the ddf & daf files

    view_map -- submission object holding the RDF model
    library_result_map -- ResultMap of library id -> result directory
    daf_name -- daf filename to include in each zip

    Writes ../<lib_id>.zip relative to each result directory.
    """
    rootdir = os.getcwd()
    # BUG FIX: iterate items() like make_all_ddfs does; iterating the
    # map directly yields only keys, which cannot unpack into two names.
    for lib_id, result_dir in library_result_map.items():
        submissionNode = view_map.get_submission_node(result_dir)
        nameNodes = list(view_map.model.objects(submissionNode,
                                                submissionOntology['name']))
        if len(nameNodes) == 0:
            logger.error("Need name for %s" % (str(submissionNode)))
            continue
        name = nameNodes[0].toPython()

        zip_name = '../{0}.zip'.format(lib_id)
        try:
            os.chdir(os.path.join(rootdir, result_dir))
            with ZipFile(zip_name, 'w') as stream:
                stream.write(make_ddf_name(name))
                stream.write(daf_name)
        finally:
            # restore the working directory even if zipping fails
            os.chdir(rootdir)
310
311
def make_condor_archive_script(name, files, outdir=None):
    """Write a condor submit file that tars up the submission files.

    name -- submission name, used to derive archive & script filenames
    files -- filenames (relative to outdir) to include in the archive
    outdir -- directory holding the files and receiving the script
              (default: current directory)

    Raises RuntimeError if any listed file is missing.
    Returns the path of the generated condor submit file.
    """
    script = """Universe = vanilla

Executable = %(tar)s
arguments = czvhf ../%(archivename)s %(filelist)s

Error = compress.out.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"
request_memory = 20

queue
"""
    if outdir is None:
        outdir = os.getcwd()
    # fail early rather than submitting a condor job that can't find its input
    for f in files:
        pathname = os.path.join(outdir, f)
        if not os.path.exists(pathname):
            raise RuntimeError("Missing %s from %s" % (f,outdir))

    context = {'archivename': make_submission_name(name),
               'filelist': " ".join(files),
               'initialdir': os.path.abspath(outdir),
               # getpass.getuser works without a controlling terminal,
               # unlike os.getlogin which raises under condor/cron.
               'user': getpass.getuser(),
               'tar': TAR}

    condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
    with open(condor_script, 'w') as condor_stream:
        condor_stream.write(script % context)
    return condor_script
345
346
def make_condor_upload_script(name, outdir=None, lftp=None):
    """Write a condor submit file that uploads the archive via lftp.

    name -- submission name, used to derive archive & script filenames
    outdir -- directory to write the script into (default: cwd)
    lftp -- path to the lftp binary (default: module-level LFTP)

    BUG FIX: the old signature was (name, lftp, outdir=None) while the
    only caller passes (name, outdir), so the output directory was
    silently swallowed by the unused lftp parameter and the script
    landed in the current directory.

    Returns the path of the generated condor submit file.
    """
    script = """Universe = vanilla

Executable = %(lftp)s
arguments = -c put %(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s

Error = upload.out.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s

queue
"""
    if outdir is None:
        outdir = os.getcwd()
    if lftp is None:
        lftp = LFTP

    # NOTE(review): credentials come from a hard-coded user's netrc
    # file -- confirm this is intentional for shared deployments.
    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))

    encodeftp = 'encodeftp.cse.ucsc.edu'
    ftpuser = auth.hosts[encodeftp][0]
    ftppassword = auth.hosts[encodeftp][2]
    context = {'archivename': make_submission_name(name),
               'initialdir': os.path.abspath(outdir),
               # see make_condor_archive_script: os.getlogin fails
               # without a controlling terminal
               'user': getpass.getuser(),
               'ftpuser': ftpuser,
               'ftppassword': ftppassword,
               'ftphost': encodeftp,
               'lftp': lftp}

    condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
    with open(condor_script, 'w') as condor_stream:
        condor_stream.write(script % context)
    # owner read/write only: the script embeds the ftp password
    os.chmod(condor_script, stat.S_IREAD|stat.S_IWRITE)

    return condor_script
383
384
def make_dag_fragment(ininame, archive_condor, upload_condor):
    """
    Make the couple of fragments compress and then upload the data.
    """
    here = os.getcwd()
    archive_path = os.path.join(here, archive_condor)
    upload_path = os.path.join(here, upload_condor)
    job = make_base_name(ininame)

    return [
        'JOB %s_archive %s' % (job, archive_path),
        'JOB %s_upload %s' % (job, upload_path),
        'PARENT %s_archive CHILD %s_upload' % (job, job),
    ]
400
401
def make_base_name(pathname):
    """Return pathname's final component with its extension removed."""
    stem, _extension = os.path.splitext(os.path.basename(pathname))
    return stem
406
407
def make_submission_name(ininame):
    """Derive the .tgz archive filename for a submission ini name."""
    return '{0}.tgz'.format(make_base_name(ininame))
411
412
def make_ddf_name(pathname):
    """Derive the .ddf filename from a pathname's base name."""
    return '{0}.ddf'.format(make_base_name(pathname))
416
417
def make_condor_name(pathname, run_type=None):
    """Derive a condor submit-script filename.

    run_type, when given, is inserted between the base name and the
    trailing 'condor' suffix (e.g. name.archive.condor).
    """
    pieces = [make_base_name(pathname)]
    if run_type is not None:
        pieces.append(run_type)
    pieces.append("condor")
    return ".".join(pieces)
425
426
def parse_filelist(file_string):
    """Split a comma separated string of filenames into a list."""
    filenames = file_string.split(',')
    return filenames
429
430
def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list
    """
    missing = [f for f in files if not os.path.exists(f)]
    if missing:
        # report the first missing file, matching the original behavior
        raise RuntimeError("%s does not exist" % (missing[0],))
438
# Script entry point when run directly (not imported).
if __name__ == "__main__":
    main()