mark the example submission rule files as being raw, so the escapes don't get confused
encode_submission/ucsc_gather.py
#!/usr/bin/env python
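"""Gather files and metadata for an ENCODE submission to UCSC.

Builds tab-separated .ddf metadata files from an RDF model of a
submission, and can emit Condor submit files plus a DAGMan script that
tar up each submission directory and upload the archive to UCSC's
encodeftp server with lftp.
"""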
from ConfigParser import SafeConfigParser
import fnmatch
from glob import glob
import json
import logging
import netrc
from optparse import OptionParser, OptionGroup
import os
from pprint import pprint, pformat
import shlex
from StringIO import StringIO
import stat
import sys
import time
import types
import urllib
import urllib2
import urlparse
from zipfile import ZipFile

import RDF

if 'DJANGO_SETTINGS_MODULE' not in os.environ:
    os.environ['DJANGO_SETTINGS_MODULE'] = 'htsworkflow.settings'

from htsworkflow.util import api
from htsworkflow.util.rdfhelp import \
     dafTermOntology, \
     fromTypedNode, \
     get_model, \
     get_serializer, \
     load_into_model, \
     sparql_query, \
     submissionOntology
from htsworkflow.submission.daf import \
     UCSCSubmission, \
     MetadataLookupException, \
     get_submission_uri
from htsworkflow.submission.results import ResultMap
from htsworkflow.submission.condorfastq import CondorFastqExtract

logger = logging.getLogger('ucsc_gather')

TAR = '/bin/tar'
LFTP = '/usr/bin/lftp'
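# NOTE: these are only defaults; main() rebinds TAR and LFTP from the
# --tar/--lftp options before any Condor scripts are written.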


def main(cmdline=None):
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)
    submission_uri = None

    # rebind the tool-path globals from the command line
    global TAR
    global LFTP
    TAR = opts.tar
    LFTP = opts.lftp

    if opts.debug:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.verbose:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig(level=logging.WARNING)

    apidata = api.make_auth_from_opts(opts, parser)

    model = get_model(opts.model, opts.db_path)
    mapper = None
    if opts.name:
        mapper = UCSCSubmission(opts.name, opts.daf, model)
        if opts.library_url is not None:
            mapper.library_url = opts.library_url
        submission_uri = get_submission_uri(opts.name)

    if opts.load_rdf is not None:
        if submission_uri is None:
            parser.error("Please specify the submission name")
        load_into_model(model, 'turtle', opts.load_rdf, submission_uri)

    if opts.make_ddf and opts.daf is None:
        parser.error("Please specify your daf when making ddf files")

    results = ResultMap()
    for a in args:
        results.add_results_from_file(a)

    if opts.make_tree_from is not None:
        results.make_tree_from(opts.make_tree_from)

    if opts.link_daf:
        if mapper is None:
            parser.error("Specify a submission model")
        if mapper.daf is None:
            parser.error("Please load a daf first")
        mapper.link_daf(results)

    if opts.fastq:
        flowcells = os.path.join(opts.sequence, 'flowcells')
        extractor = CondorFastqExtract(opts.host, flowcells,
                                       force=opts.force)
        extractor.create_scripts(results)

    if opts.scan_submission:
        if mapper is None:
            parser.error("Specify a submission model")
        mapper.scan_submission_dirs(results)

    if opts.make_ddf:
        if not os.path.exists(TAR):
            parser.error("%s does not exist, please specify --tar" % (TAR,))
        if not os.path.exists(LFTP):
            parser.error("%s does not exist, please specify --lftp" % (LFTP,))
        make_all_ddfs(mapper, results, opts.daf, force=opts.force)

    if opts.zip_ddf:
        zip_ddfs(mapper, results, opts.daf)

    if opts.sparql:
        sparql_query(model, opts.sparql)

    if opts.print_rdf:
        writer = get_serializer()
        print writer.serialize_model_to_string(model)


def make_parser():
    parser = OptionParser()

    model = OptionGroup(parser, 'model')
    model.add_option('--name', help="set submission name")
    model.add_option('--db-path', default=None,
                     help="set rdf database path")
    model.add_option('--model', default=None,
                     help="load model database")
    model.add_option('--load-rdf', default=None,
                     help="load rdf statements into model")
    model.add_option('--sparql', default=None, help="execute sparql query")
    model.add_option('--print-rdf', action="store_true", default=False,
                     help="print ending model state")
    model.add_option('--tar', default=TAR,
                     help="override path to tar command")
    model.add_option('--lftp', default=LFTP,
                     help="override path to lftp command")
    parser.add_option_group(model)

    # commands
    commands = OptionGroup(parser, 'commands')
    commands.add_option('--make-tree-from', default=None,
                        help="create directories & link data files")
    commands.add_option('--fastq', default=False, action="store_true",
                        help="generate scripts for making fastq files")
    commands.add_option('--scan-submission', default=False, action="store_true",
                        help="import metadata for submission into our model")
    commands.add_option('--link-daf', default=False, action="store_true",
                        help="link daf into submission directories")
    commands.add_option('--make-ddf', default=False, action="store_true",
                        help="make the ddfs")
    commands.add_option('--zip-ddf', default=False, action="store_true",
                        help="zip up just the metadata")
    parser.add_option_group(commands)

    parser.add_option('--force', default=False, action="store_true",
                      help="force regenerating fastqs")
    parser.add_option('--daf', default=None, help="specify daf name")
    parser.add_option('--library-url', default=None,
                      help="specify an alternate source for library information")
    # referenced by the --fastq handling in main()
    parser.add_option('--sequence', default=None,
                      help="path to the sequence archive (used by --fastq)")
    # debugging
    parser.add_option('--verbose', default=False, action="store_true",
                      help="verbose logging")
    parser.add_option('--debug', default=False, action="store_true",
                      help="debug logging")

    api.add_auth_options(parser)

    return parser

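# A typical invocation (sketch; the submission name, daf, and result-map
# file are hypothetical placeholders):
#
#   ucsc_gather.py --name mysubmission --daf mysubmission.daf \
#       --scan-submission --make-ddf results.map
#
# Each positional argument is a result-map file handed to
# ResultMap.add_results_from_file().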

def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
    dag_fragment = []
    for lib_id, result_dir in library_result_map.items():
        submissionNode = view_map.get_submission_node(result_dir)
        dag_fragment.extend(
            make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)
        )

    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        if not force and os.path.exists(dag_filename):
            logger.warn("%s exists, please delete" % (dag_filename,))
        else:
            f = open(dag_filename, 'w')
            f.write(os.linesep.join(dag_fragment))
            f.write(os.linesep)
            f.close()


def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
    """Make ddf files, and a bonus condor file.
    """
    query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>

SELECT ?submitView ?files ?md5sum ?view ?cell ?antibody ?sex ?control ?strain ?controlId ?labExpId ?labVersion ?treatment ?protocol ?readType ?insertLength ?replicate ?mapAlgorithm
WHERE {
  ?file ucscDaf:filename ?files ;
        ucscDaf:md5sum ?md5sum .
  ?submitView ucscDaf:has_file ?file ;
              ucscDaf:view ?dafView ;
              ucscDaf:submission <%(submission)s> .
  ?dafView ucscDaf:name ?view .
  <%(submission)s> submissionOntology:library ?library .

  OPTIONAL { ?library libraryOntology:antibody ?antibody }
  OPTIONAL { ?library libraryOntology:cell_line ?cell }
  OPTIONAL { <%(submission)s> ucscDaf:control ?control }
  OPTIONAL { <%(submission)s> ucscDaf:controlId ?controlId }
  OPTIONAL { ?library ucscDaf:sex ?sex }
  OPTIONAL { ?library libraryOntology:library_id ?labExpId }
  OPTIONAL { ?library libraryOntology:library_id ?labVersion }
  OPTIONAL { ?library libraryOntology:replicate ?replicate }
  OPTIONAL { ?library libraryOntology:condition_term ?treatment }
  OPTIONAL { ?library ucscDaf:protocol ?protocol }
  OPTIONAL { ?library ucscDaf:readType ?readType }
  OPTIONAL { ?library ucscDaf:strain ?strain }
  OPTIONAL { ?library libraryOntology:insert_size ?insertLength }
  OPTIONAL { ?library ucscDaf:mapAlgorithm ?mapAlgorithm }
}
ORDER BY ?submitView"""
    dag_fragments = []

    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
    if name is None:
        logger.error("Need name for %s" % (str(submissionNode)))
        return []

    ddf_name = make_ddf_name(name)
    if outdir is not None:
        outfile = os.path.join(outdir, ddf_name)
        output = open(outfile, 'w')
    else:
        outfile = 'stdout:'
        output = sys.stdout

    formatted_query = query_template % {'submission': str(submissionNode.uri)}

    query = RDF.SPARQLQuery(formatted_query)
    results = query.execute(view_map.model)

    # filename goes first
    variables = view_map.get_daf_variables()
    output.write('\t'.join(variables))
    output.write(os.linesep)

    all_views = {}
    all_files = []
    for row in results:
        viewname = fromTypedNode(row['view'])
        current = all_views.setdefault(viewname, {})
        for variable_name in variables:
            value = fromTypedNode(row[variable_name])
            if value is None:
                logger.warn("{0}: {1} was None".format(outfile, variable_name))
            value = str(value)
            if variable_name in ('files', 'md5sum'):
                current.setdefault(variable_name, []).append(value)
            else:
                current[variable_name] = value

    for view in all_views.keys():
        line = []
        for variable_name in variables:
            if variable_name in ('files', 'md5sum'):
                line.append(','.join(all_views[view][variable_name]))
            else:
                line.append(all_views[view][variable_name])
        output.write("\t".join(line))
        output.write(os.linesep)
        all_files.extend(all_views[view]['files'])

    logger.info(
        "Examined {0}, found files: {1}".format(
            str(submissionNode), ", ".join(all_files)))

    all_files.append(daf_name)
    all_files.append(ddf_name)

    if make_condor:
        archive_condor = make_condor_archive_script(name, all_files, outdir)
        upload_condor = make_condor_upload_script(name, outdir)

        dag_fragments.extend(
            make_dag_fragment(name, archive_condor, upload_condor)
        )

    return dag_fragments

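# The resulting .ddf is tab-separated: a header row of DAF variable names
# (filename first), then one row per view, with the 'files' and 'md5sum'
# columns comma-joined.  A hypothetical fragment:
#
#   files                md5sum                     view
#   12345.bam,12345.bai  0f343b09...,a1b2c3d4...    Alignments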

def zip_ddfs(view_map, library_result_map, daf_name):
    """Zip up just the ddf & daf files.
    """
    rootdir = os.getcwd()
    for lib_id, result_dir in library_result_map.items():
        submissionNode = view_map.get_submission_node(result_dir)
        nameNode = view_map.model.get_target(submissionNode,
                                             submissionOntology['name'])
        name = fromTypedNode(nameNode)
        if name is None:
            logger.error("Need name for %s" % (str(submissionNode)))
            continue

        zip_name = '../{0}.zip'.format(lib_id)
        os.chdir(os.path.join(rootdir, result_dir))
        with ZipFile(zip_name, 'w') as stream:
            stream.write(make_ddf_name(name))
            stream.write(daf_name)
        os.chdir(rootdir)


def make_condor_archive_script(name, files, outdir=None):
    script = """Universe = vanilla

Executable = %(tar)s
arguments = czvhf ../%(archivename)s %(filelist)s

Error = compress.out.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"
request_memory = 20

queue
"""
    if outdir is None:
        outdir = os.getcwd()
    # make sure everything we want to archive actually exists
    for f in files:
        pathname = os.path.join(outdir, f)
        if not os.path.exists(pathname):
            raise RuntimeError("Missing %s from %s" % (f, outdir))

    context = {'archivename': make_submission_name(name),
               'filelist': " ".join(files),
               'initialdir': os.path.abspath(outdir),
               'user': os.getlogin(),
               'tar': TAR}

    condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
    return condor_script

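# The archive job effectively runs `tar czvhf ../<name>.tgz <files...>`
# with GZIP=-3 in the environment, i.e. gzip compression level 3.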

def make_condor_upload_script(name, outdir=None):
    script = """Universe = vanilla

Executable = %(lftp)s
arguments = -c put %(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s

Error = upload.out.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s

queue
"""
    if outdir is None:
        outdir = os.getcwd()

    # ftp credentials for encodeftp come from a hard-coded netrc file
    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))

    encodeftp = 'encodeftp.cse.ucsc.edu'
    ftpuser = auth.hosts[encodeftp][0]
    ftppassword = auth.hosts[encodeftp][2]
    context = {'archivename': make_submission_name(name),
               'initialdir': os.path.abspath(outdir),
               'user': os.getlogin(),
               'ftpuser': ftpuser,
               'ftppassword': ftppassword,
               'ftphost': encodeftp,
               'lftp': LFTP}

    condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
    # the submit file embeds the ftp password, so lock down its permissions
    os.chmod(condor_script, stat.S_IREAD | stat.S_IWRITE)

    return condor_script

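# The upload job effectively runs (credentials elided, archive name
# hypothetical):
#
#   lftp -c put 12345.tgz -o ftp://USER:PASS@encodeftp.cse.ucsc.edu/12345.tgz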

def make_dag_fragment(ininame, archive_condor, upload_condor):
    """Make the pair of DAGMan fragments that compress and then upload the data.
    """
    cur_dir = os.getcwd()
    archive_condor = os.path.join(cur_dir, archive_condor)
    upload_condor = os.path.join(cur_dir, upload_condor)
    job_basename = make_base_name(ininame)

    fragments = []
    fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
    fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
    fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))

    return fragments

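# For a submission named "12345" (hypothetical) this yields DAGMan lines like:
#
#   JOB 12345_archive /current/dir/12345.archive.condor
#   JOB 12345_upload /current/dir/12345.upload.condor
#   PARENT 12345_archive CHILD 12345_upload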

def make_base_name(pathname):
    base = os.path.basename(pathname)
    name, ext = os.path.splitext(base)
    return name


def make_submission_name(ininame):
    name = make_base_name(ininame)
    return name + ".tgz"


def make_ddf_name(pathname):
    name = make_base_name(pathname)
    return name + ".ddf"


def make_condor_name(pathname, run_type=None):
    name = make_base_name(pathname)
    elements = [name]
    if run_type is not None:
        elements.append(run_type)
    elements.append("condor")
    return ".".join(elements)

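# The naming helpers all strip the directory and extension before adding a
# suffix; e.g. a hypothetical 'sub/12345.ini' becomes '12345.tgz' (submission),
# '12345.ddf' (ddf), or '12345.upload.condor' (condor, run_type='upload').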

def parse_filelist(file_string):
    return file_string.split(",")


def validate_filelist(files):
    """Raise RuntimeError if any file in the list doesn't exist.
    """
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("%s does not exist" % (f,))


if __name__ == "__main__":
    main()