#!/usr/bin/env python
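"""Gather files for an ENCODE submission to UCSC.

Reads library/result-directory maps, builds fastq extraction scripts,
links the DAF into each submission directory, imports submission
metadata into an RDF model, writes DDF files, and generates condor
scripts (plus a DAG) to archive and upload the results.
"""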
import logging
import netrc
from optparse import OptionParser, OptionGroup
import os
import stat
import sys

import RDF

from htsworkflow.util import api
from htsworkflow.util.rdfhelp import \
     fromTypedNode, \
     get_model, \
     get_serializer, \
     load_into_model, \
     sparql_query, \
     submissionOntology
from htsworkflow.submission.daf import \
     DAFMapper, \
     MetadataLookupException, \
     get_submission_uri
from htsworkflow.submission.condorfastq import CondorFastqExtract

logger = logging.getLogger('ucsc_gather')

def main(cmdline=None):
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)
    submission_uri = None
    mapper = None

    if opts.debug:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.verbose:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig(level=logging.WARNING)

    apidata = api.make_auth_from_opts(opts, parser)

    model = get_model(opts.load_model)
    if opts.name:
        mapper = DAFMapper(opts.name, opts.daf, model)
        if opts.library_url is not None:
            mapper.library_url = opts.library_url
        submission_uri = get_submission_uri(opts.name)

    if opts.load_rdf is not None:
        if submission_uri is None:
            parser.error("Please specify the submission name")
        load_into_model(model, 'turtle', opts.load_rdf, submission_uri)

    if opts.make_ddf and opts.daf is None:
        parser.error("Please specify your daf when making ddf files")

    if (opts.scan_submission or opts.make_ddf) and mapper is None:
        parser.error("Please specify the submission name with --name")

    library_result_map = []
    for a in args:
        library_result_map.extend(read_library_result_map(a))

    if opts.make_tree_from is not None:
        make_tree_from(opts.make_tree_from, library_result_map)

    if opts.link_daf:
        if opts.daf is None:
            parser.error("Please specify daf filename with --daf")
        link_daf(opts.daf, library_result_map)

    if opts.fastq:
        extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
                                       force=opts.force)
        extractor.build_fastqs(library_result_map)

    if opts.scan_submission:
        scan_submission_dirs(mapper, library_result_map)

    if opts.make_ddf:
        make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)

    if opts.sparql:
        sparql_query(model, opts.sparql)

    if opts.print_rdf:
        writer = get_serializer()
        print writer.serialize_model_to_string(model)


def make_parser():
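    """Construct the command line option parser."""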
    parser = OptionParser()

    model = OptionGroup(parser, 'model')
    model.add_option('--name', help="Set submission name")
    model.add_option('--load-model', default=None,
                     help="Load model database")
    model.add_option('--load-rdf', default=None,
                     help="load rdf statements into model")
    model.add_option('--sparql', default=None, help="execute sparql query")
    model.add_option('--print-rdf', action="store_true", default=False,
                     help="print ending model state")
    parser.add_option_group(model)
    # commands
    commands = OptionGroup(parser, 'commands')
    commands.add_option('--make-tree-from',
                        help="create directories & link data files",
                        default=None)
    commands.add_option('--fastq', default=False, action="store_true",
                        help="generate scripts for making fastq files")
    commands.add_option('--scan-submission', default=False, action="store_true",
                        help="Import metadata for submission into our model")
    commands.add_option('--link-daf', default=False, action="store_true",
                        help="link daf into submission directories")
    commands.add_option('--make-ddf', help='make the ddfs', default=False,
                        action="store_true")
    parser.add_option_group(commands)

    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")
    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--library-url', default=None,
                      help="specify an alternate source for library information")
    # debugging
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    api.add_auth_options(parser)

    return parser

def make_tree_from(source_path, library_result_map):
    """Create a tree of library directories, symlinking data files from source_path.
    """
    for lib_id, lib_path in library_result_map:
        if not os.path.exists(lib_path):
            logger.info("Making dir {0}".format(lib_path))
            os.mkdir(lib_path)
        source_lib_dir = os.path.abspath(os.path.join(source_path, lib_path))
        if not os.path.exists(source_lib_dir):
            raise IOError("{0} does not exist".format(source_lib_dir))
        for filename in os.listdir(source_lib_dir):
            source_pathname = os.path.join(source_lib_dir, filename)
            target_pathname = os.path.join(lib_path, filename)
            if not os.path.exists(source_pathname):
                raise IOError("{0} does not exist".format(source_pathname))
            if not os.path.exists(target_pathname):
                os.symlink(source_pathname, target_pathname)
                logger.info(
                    'LINK {0} to {1}'.format(source_pathname, target_pathname))


def link_daf(daf_path, library_result_map):
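    """Hard link the daf file into each library result directory."""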
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    base_daf = os.path.basename(daf_path)

    for lib_id, result_dir in library_result_map:
        if not os.path.exists(result_dir):
            raise RuntimeError("Couldn't find target directory %s" % (result_dir,))
        submission_daf = os.path.join(result_dir, base_daf)
        if not os.path.exists(submission_daf):
            os.link(daf_path, submission_daf)


def scan_submission_dirs(view_map, library_result_map):
    """Look through our submission directories and collect needed information.
    """
    for lib_id, result_dir in library_result_map:
        logger.info("Importing %s from %s" % (lib_id, result_dir))
        try:
            view_map.import_submission_dir(result_dir, lib_id)
        except MetadataLookupException, e:
            logger.error("Skipping %s: %s" % (lib_id, str(e)))

def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
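    """Write a ddf file for each submission directory, and optionally a
    condor DAG (submission.dagman) that archives and uploads the results.
    """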
    dag_fragment = []
    for lib_id, result_dir in library_result_map:
        submissionNode = view_map.get_submission_node(result_dir)
        dag_fragment.extend(
            make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)
        )

    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        if not force and os.path.exists(dag_filename):
            logger.warn("%s exists, please delete" % (dag_filename,))
        else:
            f = open(dag_filename, 'w')
            f.write(os.linesep.join(dag_fragment))
            f.write(os.linesep)
            f.close()


def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
    """
    Make a ddf file for a submission, and optionally its condor scripts.
    """
    query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>

select ?submitView  ?files ?md5sum ?view ?cell ?antibody ?sex ?control ?strain ?controlId ?labExpId ?labVersion ?treatment ?protocol ?readType ?insertLength ?replicate ?mapAlgorithm
WHERE {
  ?file ucscDaf:filename ?files ;
        ucscDaf:md5sum ?md5sum .
  ?submitView ucscDaf:has_file ?file ;
              ucscDaf:view ?dafView ;
              ucscDaf:submission <%(submission)s> .
  ?dafView ucscDaf:name ?view .
  <%(submission)s> submissionOntology:library ?library .

  OPTIONAL { ?library libraryOntology:antibody ?antibody }
  OPTIONAL { ?library libraryOntology:cell_line ?cell }
  OPTIONAL { <%(submission)s> ucscDaf:control ?control }
  OPTIONAL { <%(submission)s> ucscDaf:controlId ?controlId }
  OPTIONAL { ?library ucscDaf:sex ?sex }
  OPTIONAL { ?library libraryOntology:library_id ?labExpId }
  OPTIONAL { ?library libraryOntology:library_id ?labVersion }
  OPTIONAL { ?library libraryOntology:replicate ?replicate }
  OPTIONAL { ?library libraryOntology:condition ?treatment }
  OPTIONAL { ?library ucscDaf:protocol ?protocol }
  OPTIONAL { ?library ucscDaf:readType ?readType }
  OPTIONAL { ?library ucscDaf:strain ?strain }
  OPTIONAL { ?library libraryOntology:insert_size ?insertLength }
  OPTIONAL { ?library ucscDaf:mapAlgorithm ?mapAlgorithm }
}
ORDER BY  ?submitView"""
    dag_fragments = []

    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
    if name is None:
        logger.error("Need name for %s" % (str(submissionNode)))
        return []

    ddf_name = name + '.ddf'
    if outdir is not None:
        outfile = os.path.join(outdir, ddf_name)
        output = open(outfile, 'w')
    else:
        outfile = 'stdout:'
        output = sys.stdout

    formatted_query = query_template % {'submission': str(submissionNode.uri)}

    query = RDF.SPARQLQuery(formatted_query)
    results = query.execute(view_map.model)

    # filename goes first
    variables = view_map.get_daf_variables()
    output.write('\t'.join(variables))
    output.write(os.linesep)

    all_views = {}
    all_files = []
    for row in results:
        viewname = fromTypedNode(row['view'])
        current = all_views.setdefault(viewname, {})
        for variable_name in variables:
            value = fromTypedNode(row[variable_name])
            if value is None:
                logger.warn("{0}: {1} was None".format(outfile, variable_name))
            value = str(value)
            if variable_name in ('files', 'md5sum'):
                current.setdefault(variable_name, []).append(value)
            else:
                current[variable_name] = value

    for view in all_views.keys():
        line = []
        for variable_name in variables:
            if variable_name in ('files', 'md5sum'):
                line.append(','.join(all_views[view][variable_name]))
            else:
                line.append(all_views[view][variable_name])
        output.write("\t".join(line))
        output.write(os.linesep)
        all_files.extend(all_views[view]['files'])

    logger.info(
        "Examined {0}, found files: {1}".format(
            str(submissionNode), ", ".join(all_files)))

    all_files.append(daf_name)
    all_files.append(ddf_name)

    if make_condor:
        archive_condor = make_condor_archive_script(name, all_files, outdir)
        upload_condor = make_condor_upload_script(name, outdir)

        dag_fragments.extend(
            make_dag_fragment(name, archive_condor, upload_condor)
        )

    return dag_fragments


def read_library_result_map(filename):
    """
    Read a file that maps library id to result directory.
    Does not support spaces in filenames.

    For example:
      10000 result/foo/bar
    """
    stream = open(filename, 'r')

    results = []
    for line in stream:
        line = line.rstrip()
        if not line.startswith('#') and len(line) > 0:
            library_id, result_dir = line.split()
            results.append((library_id, result_dir))
    stream.close()
    return results


def make_condor_archive_script(name, files, outdir=None):
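    """Write a condor submit file that tars the submission files into
    ../<name>.tgz, and return the path to the submit file.
    """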
    script = """Universe = vanilla

Executable = /bin/tar
arguments = czvhf ../%(archivename)s %(filelist)s

Error = compress.out.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"
request_memory = 20

queue
"""
    if outdir is None:
        outdir = os.getcwd()
    for f in files:
        pathname = os.path.join(outdir, f)
        if not os.path.exists(pathname):
            raise RuntimeError("Missing %s from %s" % (f, outdir))

    context = {'archivename': make_submission_name(name),
               'filelist': " ".join(files),
               'initialdir': os.path.abspath(outdir),
               'user': os.getlogin()}

    condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
    return condor_script


def make_condor_upload_script(name, outdir=None):
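    """Write a condor submit file that uploads the submission archive to
    encodeftp.cse.ucsc.edu with lftp, and return the path to the submit file.
    """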
    script = """Universe = vanilla

Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s

Error = upload.out.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s

queue
"""
    if outdir is None:
        outdir = os.getcwd()

    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))

    encodeftp = 'encodeftp.cse.ucsc.edu'
    ftpuser = auth.hosts[encodeftp][0]
    ftppassword = auth.hosts[encodeftp][2]
    context = {'archivename': make_submission_name(name),
               'initialdir': os.path.abspath(outdir),
               'user': os.getlogin(),
               'ftpuser': ftpuser,
               'ftppassword': ftppassword,
               'ftphost': encodeftp}

    condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
    # the submit file contains the ftp password, so restrict it to
    # owner read/write only
    os.chmod(condor_script, stat.S_IREAD|stat.S_IWRITE)

    return condor_script


def make_dag_fragment(ininame, archive_condor, upload_condor):
    """
    Make the dag fragment that compresses and then uploads the data.
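
    Example output (illustrative, for ininame 'mysub.daf' run from /work):
      JOB mysub_archive /work/mysub.archive.condor
      JOB mysub_upload /work/mysub.upload.condor
      PARENT mysub_archive CHILD mysub_upload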
    """
    cur_dir = os.getcwd()
    archive_condor = os.path.join(cur_dir, archive_condor)
    upload_condor = os.path.join(cur_dir, upload_condor)
    job_basename = make_base_name(ininame)

    fragments = []
    fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
    fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
    fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))

    return fragments


def get_library_info(host, apidata, library_id):
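    """Fetch the library metadata from the htsworkflow web API."""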
    url = api.library_url(host, library_id)
    contents = api.retrieve_info(url, apidata)
    return contents


def make_submission_section(line_counter, files, attributes):
    """
    Create a section in the submission ini file
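
    Example:
    >>> make_submission_section(1, ['a.bam'], {'view': 'Signal'})
    ['[line1]', 'files=a.bam', 'view=Signal']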
    """
    inifile = ["[line%s]" % (line_counter,)]
    inifile += ["files=%s" % (",".join(files))]

    for k, v in attributes.items():
        inifile += ["%s=%s" % (k, v)]
    return inifile


def make_base_name(pathname):
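    """Strip the directory and extension from a pathname.

    >>> make_base_name('results/10000/mysub.daf')
    'mysub'
    """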
    base = os.path.basename(pathname)
    name, ext = os.path.splitext(base)
    return name


def make_submission_name(ininame):
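    """Name the submission archive after the ini/daf file.

    >>> make_submission_name('mysub.daf')
    'mysub.tgz'
    """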
    name = make_base_name(ininame)
    return name + ".tgz"


def make_ddf_name(pathname):
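    """Name the ddf file after the submission pathname.

    >>> make_ddf_name('mysub.daf')
    'mysub.ddf'
    """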
    name = make_base_name(pathname)
    return name + ".ddf"


def make_condor_name(pathname, run_type=None):
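    """Name a condor submit file, optionally tagged with a run type.

    >>> make_condor_name('mysub.daf', 'archive')
    'mysub.archive.condor'
    """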
    name = make_base_name(pathname)
    elements = [name]
    if run_type is not None:
        elements.append(run_type)
    elements.append("condor")
    return ".".join(elements)


def parse_filelist(file_string):
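    """Split a comma separated list of filenames.

    >>> parse_filelist('a.bam,b.bam')
    ['a.bam', 'b.bam']
    """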
    return file_string.split(",")


def validate_filelist(files):
    """
    Raise RuntimeError if any file in the list does not exist.
    """
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("%s does not exist" % (f,))

if __name__ == "__main__":
    main()