Merge branch 'master' of mus.cacr.caltech.edu:htsworkflow
[htsworkflow.git] / encode_submission / ucsc_gather.py
1 #!/usr/bin/env python
2 from ConfigParser import SafeConfigParser
3 import fnmatch
4 from glob import glob
5 import json
6 import logging
7 import netrc
8 from optparse import OptionParser, OptionGroup
9 import os
10 from pprint import pprint, pformat
11 import shlex
12 from StringIO import StringIO
13 import stat
14 import sys
15 import time
16 import types
17 import urllib
18 import urllib2
19 import urlparse
20 from zipfile import ZipFile
21
22 import RDF
23
24 from htsworkflow.util import api
25 from htsworkflow.util.rdfhelp import \
26      dafTermOntology, \
27      fromTypedNode, \
28      get_model, \
29      get_serializer, \
30      load_into_model, \
31      sparql_query, \
32      submissionOntology
33 from htsworkflow.submission.daf import \
34      DAFMapper, \
35      MetadataLookupException, \
36      get_submission_uri
37 from htsworkflow.submission.condorfastq import CondorFastqExtract
38
39 logger = logging.getLogger('ucsc_gather')
40
41 def main(cmdline=None):
42     parser = make_parser()
43     opts, args = parser.parse_args(cmdline)
44     submission_uri = None
45
46     if opts.debug:
47         logging.basicConfig(level = logging.DEBUG )
48     elif opts.verbose:
49         logging.basicConfig(level = logging.INFO )
50     else:
51         logging.basicConfig(level = logging.WARNING )
52
53     apidata = api.make_auth_from_opts(opts, parser)
54
55     model = get_model(opts.model, opts.db_path)
56     if opts.name:
57         mapper = DAFMapper(opts.name, opts.daf,  model)
58         if opts.library_url is not None:
59             mapper.library_url = opts.library_url
60         submission_uri = get_submission_uri(opts.name)
61
62
63     if opts.load_rdf is not None:
64         if submission_uri is None:
65             parser.error("Please specify the submission name")
66         load_into_model(model, 'turtle', opts.load_rdf, submission_uri)
67
68     if opts.make_ddf and opts.daf is None:
69         parser.error("Please specify your daf when making ddf files")
70
71     library_result_map = []
72     for a in args:
73         library_result_map.extend(read_library_result_map(a))
74
75     if opts.make_tree_from is not None:
76         make_tree_from(opts.make_tree_from, library_result_map)
77
78     if opts.link_daf:
79         if opts.daf is None:
80             parser.error("Please specify daf filename with --daf")
81         link_daf(opts.daf, library_result_map)
82
83     if opts.fastq:
84         extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
85                                        force=opts.force)
86         extractor.create_scripts(library_result_map)
87
88     if opts.scan_submission:
89         scan_submission_dirs(mapper, library_result_map)
90
91     if opts.make_ddf:
92         make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)
93
94     if opts.zip_ddf:
95         zip_ddfs(mapper, library_result_map, opts.daf)
96
97     if opts.sparql:
98         sparql_query(model, opts.sparql)
99
100     if opts.print_rdf:
101         writer = get_serializer()
102         print writer.serialize_model_to_string(model)
103
104
def make_parser():
    """Build the OptionParser for ucsc_gather's command line interface."""
    parser = OptionParser()

    # options controlling the RDF model
    model_opts = OptionGroup(parser, 'model')
    model_opts.add_option('--name', help="Set submission name")
    model_opts.add_option('--db-path', default=None,
                          help="set rdf database path")
    model_opts.add_option('--model', default=None,
                          help="Load model database")
    model_opts.add_option('--load-rdf', default=None,
                          help="load rdf statements into model")
    model_opts.add_option('--sparql', default=None,
                          help="execute sparql query")
    model_opts.add_option('--print-rdf', action="store_true", default=False,
                          help="print ending model state")
    parser.add_option_group(model_opts)

    # options selecting which action(s) to perform
    command_opts = OptionGroup(parser, 'commands')
    command_opts.add_option('--make-tree-from', default=None,
                            help="create directories & link data files")
    command_opts.add_option('--fastq', default=False, action="store_true",
                            help="generate scripts for making fastq files")
    command_opts.add_option('--scan-submission', default=False,
                            action="store_true",
                            help="Import metadata for submission into our model")
    command_opts.add_option('--link-daf', default=False, action="store_true",
                            help="link daf into submission directories")
    command_opts.add_option('--make-ddf', default=False, action="store_true",
                            help='make the ddfs')
    command_opts.add_option('--zip-ddf', default=False, action='store_true',
                            help='zip up just the metadata')
    parser.add_option_group(command_opts)

    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")
    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--library-url', default=None,
                      help="specify an alternate source for library information")

    # logging verbosity
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    api.add_auth_options(parser)

    return parser
152
def make_tree_from(source_path, library_result_map):
    """Create a tree using data files from source path.

    For each (lib_id, lib_path) pair, creates lib_path if needed and
    symlinks every file found in source_path/lib_path into it.

    :raises IOError: if the source library directory, or one of its
        entries, does not exist.
    """
    for lib_id, lib_path in library_result_map:
        if not os.path.exists(lib_path):
            logger.info("Making dir {0}".format(lib_path))
            os.mkdir(lib_path)
        source_lib_dir = os.path.abspath(os.path.join(source_path, lib_path))
        # previously a missing source dir was silently ignored here and
        # os.listdir raised a confusing OSError instead; fail clearly
        if not os.path.exists(source_lib_dir):
            raise IOError("{0} does not exist".format(source_lib_dir))
        for filename in os.listdir(source_lib_dir):
            source_pathname = os.path.join(source_lib_dir, filename)
            target_pathname = os.path.join(lib_path, filename)
            if not os.path.exists(source_pathname):
                raise IOError("{0} does not exist".format(source_pathname))
            # don't clobber links that already exist
            if not os.path.exists(target_pathname):
                os.symlink(source_pathname, target_pathname)
                logger.info(
                    'LINK {0} to {1}'.format(source_pathname, target_pathname))
173
def link_daf(daf_path, library_result_map):
    """Hard link the daf file into each submission result directory.

    Result directories that already contain the daf are left untouched.

    :raises RuntimeError: if daf_path or a result directory is missing.
    """
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    base_daf = os.path.basename(daf_path)

    for lib_id, result_dir in library_result_map:
        if not os.path.exists(result_dir):
            raise RuntimeError("Couldn't find target directory %s" %(result_dir,))
        submission_daf = os.path.join(result_dir, base_daf)
        if not os.path.exists(submission_daf):
            # daf_path was already validated above; the old second check
            # here was redundant
            os.link(daf_path, submission_daf)
189
190 def scan_submission_dirs(view_map, library_result_map):
191     """Look through our submission directories and collect needed information
192     """
193     for lib_id, result_dir in library_result_map:
194         logger.info("Importing %s from %s" % (lib_id, result_dir))
195         try:
196             view_map.import_submission_dir(result_dir, lib_id)
197         except MetadataLookupException, e:
198             logger.error("Skipping %s: %s" % (lib_id, str(e)))
199
200
def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
    """Generate a ddf for every submission directory.

    When make_condor is true also collects the condor dagman fragments
    from make_ddf and writes them to submission.dagman in the current
    directory (refusing to overwrite an existing file unless force).
    """
    dag_fragment = []
    for lib_id, result_dir in library_result_map:
        submissionNode = view_map.get_submission_node(result_dir)
        dag_fragment.extend(
            make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)
        )

    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        if not force and os.path.exists(dag_filename):
            logger.warn("%s exists, please delete" % (dag_filename,))
        else:
            # with-block guarantees the file is closed even if a write fails
            with open(dag_filename,'w') as f:
                f.write( os.linesep.join(dag_fragment))
                f.write( os.linesep )
219
def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
    """
    Make ddf files, and bonus condor file

    Queries the RDF model for every submitted view attached to
    submissionNode, writes one tab-delimited ddf row per view (to
    outdir/<name>.ddf, or stdout when outdir is None), and when
    make_condor is true also generates the archive/upload condor scripts
    and returns the dagman fragment tying them together.  Returns [] on
    failure or when make_condor is false.
    """
    # SPARQL query; %(submission)s is interpolated with the submission URI below
    query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>

select ?submitView  ?files ?md5sum ?view ?cell ?antibody ?sex ?control ?strain ?controlId ?labExpId ?labVersion ?treatment ?protocol ?readType ?insertLength ?replicate ?mapAlgorithm
WHERE {
  ?file ucscDaf:filename ?files ;
        ucscDaf:md5sum ?md5sum .
  ?submitView ucscDaf:has_file ?file ;
              ucscDaf:view ?dafView ;
              ucscDaf:submission <%(submission)s> .
  ?dafView ucscDaf:name ?view .
  <%(submission)s> submissionOntology:library ?library ;

  OPTIONAL { ?library libraryOntology:antibody ?antibody }
  OPTIONAL { ?library libraryOntology:cell_line ?cell }
  OPTIONAL { <%(submission)s> ucscDaf:control ?control }
  OPTIONAL { <%(submission)s> ucscDaf:controlId ?controlId }
  OPTIONAL { ?library ucscDaf:sex ?sex }
  OPTIONAL { ?library libraryOntology:library_id ?labExpId }
  OPTIONAL { ?library libraryOntology:library_id ?labVersion }
  OPTIONAL { ?library libraryOntology:replicate ?replicate }
  OPTIONAL { ?library libraryOntology:condition_term ?treatment }
  OPTIONAL { ?library ucscDaf:protocol ?protocol }
  OPTIONAL { ?library ucscDaf:readType ?readType }
  OPTIONAL { ?library ucscDaf:strain ?strain }
  OPTIONAL { ?library libraryOntology:insert_size ?insertLength }
  OPTIONAL { ?library ucscDaf:mapAlgorithm ?mapAlgorithm }
}
ORDER BY  ?submitView"""
    dag_fragments = []

    # the submission's name is required; it drives all generated filenames
    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
    if name is None:
        logger.error("Need name for %s" % (str(submissionNode)))
        return []

    ddf_name = make_ddf_name(name)
    # write to <outdir>/<name>.ddf, or stdout when no outdir was given
    if outdir is not None:
        outfile = os.path.join(outdir, ddf_name)
        output = open(outfile,'w')
    else:
        outfile = 'stdout:'
        output = sys.stdout

    formatted_query = query_template % {'submission': str(submissionNode.uri)}

    query = RDF.SPARQLQuery(formatted_query)
    results = query.execute(view_map.model)

    # filename goes first
    variables = view_map.get_daf_variables()
    # 'controlId',
    output.write('\t'.join(variables))
    output.write(os.linesep)

    # group rows by view; files/md5sum accumulate per view, other
    # variables are scalars where the last row seen wins
    all_views = {}
    all_files = []
    for row in results:
        viewname = fromTypedNode(row['view'])
        current = all_views.setdefault(viewname, {})
        for variable_name in variables:
            value = str(fromTypedNode(row[variable_name]))
            # NOTE(review): str() never returns None, so only the
            # == 'None' comparison can ever fire; `value is None` is dead
            if value is None or value == 'None':
                logger.warn("{0}: {1} was None".format(outfile, variable_name))
            if variable_name in ('files', 'md5sum'):
                current.setdefault(variable_name,[]).append(value)
            else:
                current[variable_name] = value

    # one ddf line per view; list-valued fields are comma-joined
    for view in all_views.keys():
        line = []
        for variable_name in variables:
            if variable_name in ('files', 'md5sum'):
                line.append(','.join(all_views[view][variable_name]))
            else:
                line.append(all_views[view][variable_name])
        output.write("\t".join(line))
        output.write(os.linesep)
        all_files.extend(all_views[view]['files'])

    logger.info(
        "Examined {0}, found files: {1}".format(
            str(submissionNode), ", ".join(all_files)))

    # the archive must also include the metadata files themselves
    all_files.append(daf_name)
    all_files.append(ddf_name)

    if make_condor:
        archive_condor = make_condor_archive_script(name, all_files, outdir)
        upload_condor = make_condor_upload_script(name, outdir)

        dag_fragments.extend(
            make_dag_fragment(name, archive_condor, upload_condor)
        )

    return dag_fragments
322
def zip_ddfs(view_map, library_result_map, daf_name):
    """zip up just the ddf & daf files

    Writes ../<lib_id>.zip relative to each result directory.
    Submissions with no name in the model are logged and skipped.
    """
    rootdir = os.getcwd()
    try:
        for lib_id, result_dir in library_result_map:
            submissionNode = view_map.get_submission_node(result_dir)
            nameNode = view_map.model.get_target(submissionNode,
                                                 submissionOntology['name'])
            name = fromTypedNode(nameNode)
            if name is None:
                logger.error("Need name for %s" % (str(submissionNode)))
                continue

            zip_name = '../{0}.zip'.format(lib_id)
            os.chdir(os.path.join(rootdir, result_dir))
            with ZipFile(zip_name, 'w') as stream:
                stream.write(make_ddf_name(name))
                stream.write(daf_name)
            os.chdir(rootdir)
    finally:
        # always restore the original working directory, even when a
        # missing ddf/daf raises inside the loop
        os.chdir(rootdir)
343
def read_library_result_map(filename):
    """
    Read a file that maps library id to result directory.
    Does not support spaces in filenames.

    For example:
      10000 result/foo/bar

    Blank lines and lines starting with '#' are ignored.
    Returns a list of (library_id, result_dir) tuples.
    """
    results = []
    # with-block closes the file; the old code leaked the handle
    with open(filename,'r') as stream:
        for line in stream:
            line = line.rstrip()
            if not line.startswith('#') and len(line) > 0 :
                library_id, result_dir = line.split()
                results.append((library_id, result_dir))
    return results
362
def make_condor_archive_script(name, files, outdir=None):
    """Write a condor submit script that tars `files` into ../<name>.tgz.

    Returns the path of the generated condor script.

    :raises RuntimeError: if any of the files are missing from outdir.
    """
    script = """Universe = vanilla

Executable = /bin/tar
arguments = czvhf ../%(archivename)s %(filelist)s

Error = compress.out.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"
request_memory = 20

queue
"""
    if outdir is None:
        outdir = os.getcwd()
    # fail early if anything we're about to archive is missing
    for f in files:
        pathname = os.path.join(outdir, f)
        if not os.path.exists(pathname):
            raise RuntimeError("Missing %s from %s" % (f,outdir))

    context = {'archivename': make_submission_name(name),
               'filelist': " ".join(files),
               'initialdir': os.path.abspath(outdir),
               # NOTE(review): os.getlogin() fails without a controlling
               # terminal (e.g. under cron); getpass.getuser() would be
               # more robust -- confirm before changing
               'user': os.getlogin()}

    condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
    # with-block ensures the script is flushed and closed even on error
    with open(condor_script,'w') as condor_stream:
        condor_stream.write(script % context)
    return condor_script
396
def make_condor_upload_script(name, outdir=None):
    """Write a condor submit script that lftp-uploads ../<name>.tgz to UCSC.

    FTP credentials are read from ~diane/.netrc.  The generated script is
    chmod'ed to owner read/write only because it embeds the ftp password.
    Returns the path of the generated condor script.
    """
    script = """Universe = vanilla

Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s

Error = upload.out.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s

queue
"""
    if outdir is None:
        outdir = os.getcwd()

    # NOTE(review): hard-coded to one user's netrc; consider
    # os.path.expanduser("~/.netrc") so other users can run this
    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))

    encodeftp = 'encodeftp.cse.ucsc.edu'
    ftpuser = auth.hosts[encodeftp][0]
    ftppassword = auth.hosts[encodeftp][2]
    context = {'archivename': make_submission_name(name),
               'initialdir': os.path.abspath(outdir),
               'user': os.getlogin(),
               'ftpuser': ftpuser,
               'ftppassword': ftppassword,
               'ftphost': encodeftp}

    condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
    # with-block ensures the script is closed even if the write fails
    with open(condor_script,'w') as condor_stream:
        condor_stream.write(script % context)
    # owner read/write only: the script contains the ftp password
    os.chmod(condor_script, stat.S_IREAD|stat.S_IWRITE)

    return condor_script
433
def make_dag_fragment(ininame, archive_condor, upload_condor):
    """
    Make the couple of fragments compress and then upload the data.

    Returns dagman statements declaring the two jobs (with absolute
    script paths) and the archive-before-upload dependency.
    """
    working_dir = os.getcwd()
    archive_path = os.path.join(working_dir, archive_condor)
    upload_path = os.path.join(working_dir, upload_condor)
    basename = make_base_name(ininame)

    return [
        'JOB %s_archive %s' % (basename, archive_path),
        'JOB %s_upload %s' % (basename,  upload_path),
        'PARENT %s_archive CHILD %s_upload' % (basename, basename),
    ]
450
def get_library_info(host, apidata, library_id):
    """Fetch library metadata for library_id from the htsworkflow server."""
    return api.retrieve_info(api.library_url(host, library_id), apidata)
456
def make_base_name(pathname):
    """Return pathname's filename with directory and extension stripped."""
    filename = os.path.basename(pathname)
    return os.path.splitext(filename)[0]
462
def make_submission_name(ininame):
    """Name of the .tgz archive derived from a submission ini filename."""
    return make_base_name(ininame) + ".tgz"
467
def make_ddf_name(pathname):
    """Name of the .ddf file derived from pathname."""
    return make_base_name(pathname) + ".ddf"
472
def make_condor_name(pathname, run_type=None):
    """Build a condor script name: <base>[.<run_type>].condor"""
    parts = [make_base_name(pathname)]
    if run_type is not None:
        parts.append(run_type)
    parts.append("condor")
    return ".".join(parts)
481
def parse_filelist(file_string):
    """Split a comma-separated list of filenames into a list."""
    return file_string.split(",")
485
def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list

    Raises RuntimeError naming the first missing file.
    """
    missing = [f for f in files if not os.path.exists(f)]
    if missing:
        raise RuntimeError("%s does not exist" % (missing[0],))
# Script entry point: parse sys.argv and run the selected commands.
if __name__ == "__main__":
    main()