Add ability to override the locations of the tar and lftp commands
[htsworkflow.git] / encode_submission / ucsc_gather.py
1 #!/usr/bin/env python
2 from ConfigParser import SafeConfigParser
3 import fnmatch
4 from glob import glob
5 import json
6 import logging
7 import netrc
8 from optparse import OptionParser, OptionGroup
9 import os
10 from pprint import pprint, pformat
11 import shlex
12 from StringIO import StringIO
13 import stat
14 import sys
15 import time
16 import types
17 import urllib
18 import urllib2
19 import urlparse
20 from zipfile import ZipFile
21
22 import RDF
23
24 from htsworkflow.util import api
25 from htsworkflow.util.rdfhelp import \
26      dafTermOntology, \
27      fromTypedNode, \
28      get_model, \
29      get_serializer, \
30      load_into_model, \
31      sparql_query, \
32      submissionOntology
33 from htsworkflow.submission.daf import \
34      UCSCSubmission, \
35      MetadataLookupException, \
36      get_submission_uri
37 from htsworkflow.submission.results import ResultMap
38 from htsworkflow.submission.condorfastq import CondorFastqExtract
39
# Module-level logger for this submission script.
logger = logging.getLogger('ucsc_gather')

# Default locations of the tar and lftp executables.  main() overwrites
# these module globals from the --tar/--lftp command line options before
# the condor script generators below read them.
TAR = '/bin/tar'
LFTP = '/usr/bin/lftp'
44
def main(cmdline=None):
    """Entry point: parse options and run the requested submission steps.

    cmdline -- optional list of argument strings; when None optparse
               falls back to sys.argv[1:].
    """
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)
    submission_uri = None

    # Allow --tar/--lftp to override the compiled-in default paths;
    # the condor script generators read these module globals.
    global TAR
    global LFTP
    TAR = opts.tar
    LFTP = opts.lftp

    if opts.debug:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.verbose:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig(level=logging.WARNING)

    apidata = api.make_auth_from_opts(opts, parser)

    model = get_model(opts.model, opts.db_path)
    mapper = None
    if opts.name:
        mapper = UCSCSubmission(opts.name, opts.daf, model)
        if opts.library_url is not None:
            mapper.library_url = opts.library_url
        submission_uri = get_submission_uri(opts.name)

    if opts.load_rdf is not None:
        if submission_uri is None:
            parser.error("Please specify the submission name")
        load_into_model(model, 'turtle', opts.load_rdf, submission_uri)

    if opts.make_ddf and opts.daf is None:
        parser.error("Please specify your daf when making ddf files")

    results = ResultMap()
    for a in args:
        results.add_results_from_file(a)

    if opts.make_tree_from is not None:
        results.make_tree_from(opts.make_tree_from)

    if opts.link_daf:
        if mapper is None:
            parser.error("Specify a submission model")
        if mapper.daf is None:
            parser.error("Please load a daf first")
        mapper.link_daf(results)

    if opts.fastq:
        extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
                                       force=opts.force)
        extractor.create_scripts(results)

    if opts.scan_submission:
        # Previously crashed with AttributeError when --name was omitted.
        if mapper is None:
            parser.error("Please specify the submission name")
        mapper.scan_submission_dirs(results)

    if opts.make_ddf:
        # Previously crashed with AttributeError when --name was omitted.
        if mapper is None:
            parser.error("Please specify the submission name")
        if not os.path.exists(TAR):
            parser.error("%s does not exist, please specify --tar" % (TAR,))
        if not os.path.exists(LFTP):
            parser.error("%s does not exist, please specify --lftp" % (LFTP,))
        make_all_ddfs(mapper, results, opts.daf, force=opts.force)

    if opts.zip_ddf:
        zip_ddfs(mapper, results, opts.daf)

    if opts.sparql:
        sparql_query(model, opts.sparql)

    if opts.print_rdf:
        writer = get_serializer()
        # single-argument print call behaves identically on Python 2 and 3
        print(writer.serialize_model_to_string(model))
120
def make_parser():
    """Build the OptionParser used by main()."""
    parser = OptionParser()

    # options that affect the RDF model
    model_group = OptionGroup(parser, 'model')
    model_group.add_option('--name', help="Set submission name")
    model_group.add_option('--db-path', default=None,
                           help="set rdf database path")
    model_group.add_option('--model', default=None,
                           help="Load model database")
    model_group.add_option('--load-rdf', default=None,
                           help="load rdf statements into model")
    model_group.add_option('--sparql', default=None,
                           help="execute sparql query")
    model_group.add_option('--print-rdf', action="store_true", default=False,
                           help="print ending model state")
    model_group.add_option('--tar', default=TAR,
                           help="override path to tar command")
    model_group.add_option('--lftp', default=LFTP,
                           help="override path to lftp command")
    parser.add_option_group(model_group)

    # options that select which action(s) to perform
    command_group = OptionGroup(parser, 'commands')
    command_group.add_option('--make-tree-from', default=None,
                             help="create directories & link data files")
    command_group.add_option('--fastq', default=False, action="store_true",
                             help="generate scripts for making fastq files")
    command_group.add_option('--scan-submission', default=False,
                             action="store_true",
                             help="Import metadata for submission into our model")
    command_group.add_option('--link-daf', default=False, action="store_true",
                             help="link daf into submission directories")
    command_group.add_option('--make-ddf', default=False, action="store_true",
                             help='make the ddfs')
    command_group.add_option('--zip-ddf', default=False, action='store_true',
                             help='zip up just the metadata')
    parser.add_option_group(command_group)

    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")
    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--library-url', default=None,
                      help="specify an alternate source for library information")

    # debugging
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    api.add_auth_options(parser)

    return parser
172
173
def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
    """Generate a ddf for each library and optionally a DAGMan driver file.

    view_map -- UCSCSubmission object wrapping the RDF model
    library_result_map -- ResultMap of library id -> result directory
    daf_name -- daf filename passed through to make_ddf
    make_condor -- when True also write condor scripts and submission.dagman
    force -- overwrite an existing submission.dagman instead of warning
    """
    dag_fragment = []
    for lib_id, result_dir in library_result_map.items():
        submissionNode = view_map.get_submission_node(result_dir)
        dag_fragment.extend(
            make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)
        )

    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        if not force and os.path.exists(dag_filename):
            logger.warn("%s exists, please delete" % (dag_filename,))
        else:
            # with-statement guarantees the file is closed even if a
            # write fails (the old open/close pair leaked on error)
            with open(dag_filename, 'w') as f:
                f.write(os.linesep.join(dag_fragment))
                f.write(os.linesep)
191
192
def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
    """
    Make ddf files, and bonus condor file

    view_map -- UCSCSubmission object used to query the RDF model
    submissionNode -- RDF node identifying one submission
    daf_name -- daf filename added to the archive file list
    make_condor -- also write condor archive/upload scripts and return
                   DAGMan fragments chaining them
    outdir -- directory to write the ddf into; stdout when None

    Returns a list of DAGMan fragment lines (empty unless make_condor).
    """
    query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>

select ?submitView  ?files ?md5sum ?view ?cell ?antibody ?sex ?control ?strain ?controlId ?labExpId ?labVersion ?treatment ?protocol ?readType ?insertLength ?replicate ?mapAlgorithm
WHERE {
  ?file ucscDaf:filename ?files ;
        ucscDaf:md5sum ?md5sum .
  ?submitView ucscDaf:has_file ?file ;
              ucscDaf:view ?dafView ;
              ucscDaf:submission <%(submission)s> .
  ?dafView ucscDaf:name ?view .
  <%(submission)s> submissionOntology:library ?library ;

  OPTIONAL { ?library libraryOntology:antibody ?antibody }
  OPTIONAL { ?library libraryOntology:cell_line ?cell }
  OPTIONAL { <%(submission)s> ucscDaf:control ?control }
  OPTIONAL { <%(submission)s> ucscDaf:controlId ?controlId }
  OPTIONAL { ?library ucscDaf:sex ?sex }
  OPTIONAL { ?library libraryOntology:library_id ?labExpId }
  OPTIONAL { ?library libraryOntology:library_id ?labVersion }
  OPTIONAL { ?library libraryOntology:replicate ?replicate }
  OPTIONAL { ?library libraryOntology:condition_term ?treatment }
  OPTIONAL { ?library ucscDaf:protocol ?protocol }
  OPTIONAL { ?library ucscDaf:readType ?readType }
  OPTIONAL { ?library ucscDaf:strain ?strain }
  OPTIONAL { ?library libraryOntology:insert_size ?insertLength }
  OPTIONAL { ?library ucscDaf:mapAlgorithm ?mapAlgorithm }
}
ORDER BY  ?submitView"""
    dag_fragments = []

    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
    if name is None:
        logger.error("Need name for %s" % (str(submissionNode)))
        return []

    ddf_name = make_ddf_name(name)
    if outdir is not None:
        outfile = os.path.join(outdir, ddf_name)
        output = open(outfile, 'w')
    else:
        outfile = 'stdout:'
        output = sys.stdout

    formatted_query = query_template % {'submission': str(submissionNode.uri)}

    query = RDF.SPARQLQuery(formatted_query)
    results = query.execute(view_map.model)

    # filename goes first
    variables = view_map.get_daf_variables()
    output.write('\t'.join(variables))
    output.write(os.linesep)

    # Group rows by view name: files/md5sum accumulate into lists,
    # every other variable keeps the last value seen for that view.
    all_views = {}
    all_files = []
    for row in results:
        viewname = fromTypedNode(row['view'])
        current = all_views.setdefault(viewname, {})
        for variable_name in variables:
            value = str(fromTypedNode(row[variable_name]))
            # str() never returns None, so the 'None' string is the only
            # marker of a missing value (old `value is None` test was dead)
            if value == 'None':
                logger.warn("{0}: {1} was None".format(outfile, variable_name))
            if variable_name in ('files', 'md5sum'):
                current.setdefault(variable_name, []).append(value)
            else:
                current[variable_name] = value

    for view in all_views.keys():
        line = []
        for variable_name in variables:
            if variable_name in ('files', 'md5sum'):
                line.append(','.join(all_views[view][variable_name]))
            else:
                line.append(all_views[view][variable_name])
        output.write("\t".join(line))
        output.write(os.linesep)
        all_files.extend(all_views[view]['files'])

    # close the ddf file we opened -- but never stdout
    if outdir is not None:
        output.close()

    logger.info(
        "Examined {0}, found files: {1}".format(
            str(submissionNode), ", ".join(all_files)))

    all_files.append(daf_name)
    all_files.append(ddf_name)

    if make_condor:
        archive_condor = make_condor_archive_script(name, all_files, outdir)
        # BUG FIX: outdir used to be passed positionally in the (unused)
        # lftp slot of make_condor_upload_script, so the upload script was
        # always written to os.getcwd() instead of the submission directory.
        upload_condor = make_condor_upload_script(name, LFTP, outdir)

        dag_fragments.extend(
            make_dag_fragment(name, archive_condor, upload_condor)
        )

    return dag_fragments
294
295
def zip_ddfs(view_map, library_result_map, daf_name):
    """zip up just the ddf & daf files

    view_map -- UCSCSubmission object wrapping the RDF model
    library_result_map -- ResultMap of library id -> result directory
    daf_name -- daf filename to include in each zip
    """
    rootdir = os.getcwd()
    # CONSISTENCY FIX: iterate .items() like make_all_ddfs does; bare
    # iteration over a mapping yields keys only and breaks the unpack.
    for lib_id, result_dir in library_result_map.items():
        submissionNode = view_map.get_submission_node(result_dir)
        nameNode = view_map.model.get_target(submissionNode,
                                             submissionOntology['name'])
        name = fromTypedNode(nameNode)
        if name is None:
            logger.error("Need name for %s" % (str(submissionNode)))
            continue

        zip_name = '../{0}.zip'.format(lib_id)
        os.chdir(os.path.join(rootdir, result_dir))
        try:
            with ZipFile(zip_name, 'w') as stream:
                stream.write(make_ddf_name(name))
                stream.write(daf_name)
        finally:
            # always restore the working directory, even if zipping fails
            os.chdir(rootdir)
315
316
def make_condor_archive_script(name, files, outdir=None):
    """Write a condor submit file that tars up *files* for submission *name*.

    name -- submission name; determines the archive and script filenames
    files -- filenames (relative to outdir) to put into the archive
    outdir -- directory holding the files and receiving the condor script
              (defaults to the current directory)

    Returns the path of the condor script written.
    Raises RuntimeError if any listed file is missing from outdir.
    """
    script = """Universe = vanilla

Executable = %(tar)s
arguments = czvhf ../%(archivename)s %(filelist)s

Error = compress.out.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"
request_memory = 20

queue
"""
    if outdir is None:
        outdir = os.getcwd()
    # fail early instead of producing a condor job that can't run
    for f in files:
        pathname = os.path.join(outdir, f)
        if not os.path.exists(pathname):
            raise RuntimeError("Missing %s from %s" % (f, outdir))

    context = {'archivename': make_submission_name(name),
               'filelist': " ".join(files),
               'initialdir': os.path.abspath(outdir),
               'user': os.getlogin(),
               'tar': TAR}

    condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
    # with-statement closes the script even if the write fails
    with open(condor_script, 'w') as condor_stream:
        condor_stream.write(script % context)
    return condor_script
350
351
def make_condor_upload_script(name, lftp, outdir=None):
    """Write a condor submit file that uploads the archive for *name*.

    name -- submission name; determines the archive and script filenames
    lftp -- NOTE(review): currently ignored; the module-level LFTP path
            is used instead.  The caller in make_ddf passes outdir in
            this position -- confirm before wiring this parameter up.
    outdir -- directory receiving the condor script (defaults to cwd)

    Returns the path of the condor script written.
    """
    template = """Universe = vanilla

Executable = %(lftp)s
arguments = -c put %(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s

Error = upload.out.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s

queue
"""
    if outdir is None:
        outdir = os.getcwd()

    # credentials come from diane's netrc entry for the ENCODE ftp host
    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))
    encodeftp = 'encodeftp.cse.ucsc.edu'
    credentials = auth.hosts[encodeftp]

    context = {
        'archivename': make_submission_name(name),
        'initialdir': os.path.abspath(outdir),
        'user': os.getlogin(),
        'ftpuser': credentials[0],
        'ftppassword': credentials[2],
        'ftphost': encodeftp,
        'lftp': LFTP,
    }

    condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
    with open(condor_script, 'w') as stream:
        stream.write(template % context)
    # owner read/write only: the script embeds the ftp password
    os.chmod(condor_script, stat.S_IREAD | stat.S_IWRITE)

    return condor_script
388
389
def make_dag_fragment(ininame, archive_condor, upload_condor):
    """Return DAGMan lines that compress and then upload one submission.

    ininame -- submission name used to derive the DAG job basename
    archive_condor -- condor script that builds the archive
    upload_condor -- condor script that uploads it
    """
    cwd = os.getcwd()
    archive_path = os.path.join(cwd, archive_condor)
    upload_path = os.path.join(cwd, upload_condor)
    job = make_base_name(ininame)

    return [
        'JOB %s_archive %s' % (job, archive_path),
        'JOB %s_upload %s' % (job, upload_path),
        'PARENT %s_archive CHILD %s_upload' % (job, job),
    ]
405
406
def make_base_name(pathname):
    """Return the filename part of *pathname* without its last extension."""
    stem, _unused_ext = os.path.splitext(os.path.basename(pathname))
    return stem
411
412
def make_submission_name(ininame):
    """Return the tar archive name for submission *ininame*."""
    return make_base_name(ininame) + ".tgz"
416
417
def make_ddf_name(pathname):
    """Return the ddf filename corresponding to *pathname*."""
    return make_base_name(pathname) + ".ddf"
421
422
def make_condor_name(pathname, run_type=None):
    """Build '<base>[.<run_type>].condor' from *pathname*."""
    parts = [make_base_name(pathname)]
    if run_type is not None:
        parts.append(run_type)
    parts.append("condor")
    return ".".join(parts)
430
431
def parse_filelist(file_string):
    """Split a comma-separated string of filenames into a list."""
    return file_string.split(',')
434
435
def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list
    """
    missing = [f for f in files if not os.path.exists(f)]
    if missing:
        # report the first missing file, matching the original behavior
        raise RuntimeError("%s does not exist" % (missing[0],))
443
# Allow running this module directly as a command line script.
if __name__ == "__main__":
    main()