#!/usr/bin/env python
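"""Gather files and metadata needed for a UCSC ENCODE submission.

Reads library-id to result-directory map files, imports submission
metadata into an RDF model, writes ddf (data definition) files, and
generates condor scripts that archive and upload each submission.

Example invocation (hypothetical names and paths):
  ucsc_gather.py --name mysub --daf mysub.daf --scan-submission --make-ddf library_map.txt
"""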
import logging
import netrc
from optparse import OptionParser, OptionGroup
import os
import stat
import sys

import RDF

from htsworkflow.util import api
from htsworkflow.util.rdfhelp import \
     fromTypedNode, \
     get_model, \
     get_serializer, \
     load_into_model, \
     sparql_query, \
     submissionOntology
from htsworkflow.submission.daf import \
     DAFMapper, \
     MetadataLookupException, \
     get_submission_uri
from htsworkflow.submission.condorfastq import CondorFastqExtract

logger = logging.getLogger('ucsc_gather')

def main(cmdline=None):
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)
    submission_uri = None
    mapper = None

    if opts.debug:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.verbose:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig(level=logging.WARNING)

    apidata = api.make_auth_from_opts(opts, parser)

    model = get_model(opts.load_model)
    if opts.name:
        mapper = DAFMapper(opts.name, opts.daf, model)
        if opts.library_url is not None:
            mapper.library_url = opts.library_url
        submission_uri = get_submission_uri(opts.name)

    if opts.load_rdf is not None:
        if submission_uri is None:
            parser.error("Please specify the submission name")
        load_into_model(model, 'turtle', opts.load_rdf, submission_uri)

    if opts.make_ddf and opts.daf is None:
        parser.error("Please specify your daf when making ddf files")

    library_result_map = []
    for a in args:
        library_result_map.extend(read_library_result_map(a))

    if opts.make_tree_from is not None:
        make_tree_from(opts.make_tree_from, library_result_map)

    if opts.link_daf:
        if opts.daf is None:
            parser.error("Please specify daf filename with --daf")
        link_daf(opts.daf, library_result_map)

    if opts.fastq:
        extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
                                       force=opts.force)
        extractor.build_fastqs(library_result_map)

    if opts.scan_submission:
        if mapper is None:
            parser.error("Scanning submissions requires a submission name (--name)")
        scan_submission_dirs(mapper, library_result_map)

    if opts.make_ddf:
        if mapper is None:
            parser.error("Making ddfs requires a submission name (--name)")
        make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)

    if opts.sparql:
        sparql_query(model, opts.sparql)

    if opts.print_rdf:
        writer = get_serializer()
        print writer.serialize_model_to_string(model)


def make_parser():
    parser = OptionParser()

    model = OptionGroup(parser, 'model')
    model.add_option('--name', help="Set submission name")
    model.add_option('--load-model', default=None,
                     help="Load model database")
    model.add_option('--load-rdf', default=None,
                     help="load rdf statements into model")
    model.add_option('--sparql', default=None, help="execute sparql query")
    model.add_option('--print-rdf', action="store_true", default=False,
                     help="print ending model state")
    parser.add_option_group(model)
    # commands
    commands = OptionGroup(parser, 'commands')
    commands.add_option('--make-tree-from',
                        help="create directories & link data files",
                        default=None)
    commands.add_option('--fastq', default=False, action="store_true",
                        help="generate scripts for making fastq files")
    commands.add_option('--scan-submission', default=False, action="store_true",
                        help="Import metadata for submission into our model")
    commands.add_option('--link-daf', default=False, action="store_true",
                        help="link daf into submission directories")
    commands.add_option('--make-ddf', help='make the ddfs', default=False,
                        action="store_true")
    parser.add_option_group(commands)

    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")
    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--library-url', default=None,
                      help="specify an alternate source for library information")
    # debugging
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    api.add_auth_options(parser)

    return parser

def make_tree_from(source_path, library_result_map):
    """Create a tree of library directories using data files from source path.
    """
    for lib_id, lib_path in library_result_map:
        if not os.path.exists(lib_path):
            logger.info("Making dir {0}".format(lib_path))
            os.mkdir(lib_path)
        source_lib_dir = os.path.abspath(os.path.join(source_path, lib_path))
        if not os.path.exists(source_lib_dir):
            raise IOError("{0} does not exist".format(source_lib_dir))
        for filename in os.listdir(source_lib_dir):
            source_pathname = os.path.join(source_lib_dir, filename)
            target_pathname = os.path.join(lib_path, filename)
            if not os.path.exists(source_pathname):
                raise IOError("{0} does not exist".format(source_pathname))
            if not os.path.exists(target_pathname):
                os.symlink(source_pathname, target_pathname)
                logger.info(
                    'LINK {0} to {1}'.format(source_pathname, target_pathname))


def link_daf(daf_path, library_result_map):
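    """Hard link the daf file into each library result directory."""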
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    base_daf = os.path.basename(daf_path)

    for lib_id, result_dir in library_result_map:
        if not os.path.exists(result_dir):
            raise RuntimeError("Couldn't find target directory %s" % (result_dir,))
        submission_daf = os.path.join(result_dir, base_daf)
        if not os.path.exists(submission_daf):
            os.link(daf_path, submission_daf)


def scan_submission_dirs(view_map, library_result_map):
    """Look through our submission directories and collect needed information.
    """
    for lib_id, result_dir in library_result_map:
        logger.info("Importing %s from %s" % (lib_id, result_dir))
        try:
            view_map.import_submission_dir(result_dir, lib_id)
        except MetadataLookupException, e:
            logger.error("Skipping %s: %s" % (lib_id, str(e)))


def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
    dag_fragment = []
    for lib_id, result_dir in library_result_map:
        submissionNode = view_map.get_submission_node(result_dir)
        dag_fragment.extend(
            make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)
        )

    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        if not force and os.path.exists(dag_filename):
            logger.warn("%s exists, please delete" % (dag_filename,))
        else:
            with open(dag_filename, 'w') as f:
                f.write(os.linesep.join(dag_fragment))
                f.write(os.linesep)


def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
    """
    Make a ddf (data definition file) for a submission, and a bonus condor file.
    """
    query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>

select ?submitView  ?files ?md5sum ?view ?cell ?antibody ?sex ?control ?controlId ?labExpId ?labVersion ?treatment ?protocol ?readType ?insertLength ?replicate
WHERE {
  ?file ucscDaf:filename ?files ;
        ucscDaf:md5sum ?md5sum .
  ?submitView ucscDaf:has_file ?file ;
              ucscDaf:view ?dafView ;
              ucscDaf:submission <%(submission)s> .
  ?dafView ucscDaf:name ?view .
  <%(submission)s> submissionOntology:library ?library .

  OPTIONAL { ?library libraryOntology:antibody ?antibody }
  OPTIONAL { ?library libraryOntology:cell_line ?cell }
  OPTIONAL { <%(submission)s> ucscDaf:control ?control }
  OPTIONAL { <%(submission)s> ucscDaf:controlId ?controlId }
  OPTIONAL { ?library ucscDaf:sex ?sex }
  OPTIONAL { ?library libraryOntology:library_id ?labExpId }
  OPTIONAL { ?library libraryOntology:library_id ?labVersion }
  OPTIONAL { ?library libraryOntology:replicate ?replicate }
  OPTIONAL { ?library libraryOntology:condition ?treatment }
  OPTIONAL { ?library ucscDaf:protocol ?protocol }
  OPTIONAL { ?library ucscDaf:readType ?readType }
  OPTIONAL { ?library libraryOntology:insert_size ?insertLength }
}
ORDER BY  ?submitView"""
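    # The query returns one row per (file, view) pair for this submission;
    # the OPTIONAL clauses let library metadata be absent without dropping
    # the file from the results.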
    dag_fragments = []

    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
    if name is None:
        logger.error("Need name for %s" % (str(submissionNode)))
        return []

    ddf_name = name + '.ddf'
    if outdir is not None:
        outfile = os.path.join(outdir, ddf_name)
        output = open(outfile, 'w')
    else:
        output = sys.stdout

    formatted_query = query_template % {'submission': str(submissionNode.uri)}

    query = RDF.SPARQLQuery(formatted_query)
    results = query.execute(view_map.model)

    # filename goes first
    variables = ['files']
    variables.extend(view_map.get_daf_variables())
    variables += ['labExpId', 'md5sum']
    output.write('\t'.join(variables))
    output.write(os.linesep)

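    # Group result rows by view; 'files' and 'md5sum' accumulate into
    # comma-separated lists, every other column keeps a single value.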
    all_views = {}
    all_files = []
    for row in results:
        viewname = fromTypedNode(row['view'])
        current = all_views.setdefault(viewname, {})
        for variable_name in variables:
            value = str(fromTypedNode(row[variable_name]))
            if variable_name in ('files', 'md5sum'):
                current.setdefault(variable_name, []).append(value)
            else:
                current[variable_name] = value

    for view in all_views.keys():
        line = []
        for variable_name in variables:
            if variable_name in ('files', 'md5sum'):
                line.append(','.join(all_views[view][variable_name]))
            else:
                line.append(all_views[view][variable_name])
        output.write("\t".join(line))
        output.write(os.linesep)
        all_files.extend(all_views[view]['files'])

    logger.info(
        "Examined {0}, found files: {1}".format(
            str(submissionNode), ", ".join(all_files)))

    all_files.append(daf_name)
    all_files.append(ddf_name)

    if make_condor:
        archive_condor = make_condor_archive_script(name, all_files, outdir)
        upload_condor = make_condor_upload_script(name, outdir)

        dag_fragments.extend(
            make_dag_fragment(name, archive_condor, upload_condor)
        )

    return dag_fragments


def read_library_result_map(filename):
    """
    Read a file that maps library id to result directory.
    Does not support spaces in filenames.

    For example:
      10000 result/foo/bar
    """
    results = []
    with open(filename, 'r') as stream:
        for line in stream:
            line = line.rstrip()
            if not line.startswith('#') and len(line) > 0:
                library_id, result_dir = line.split()
                results.append((library_id, result_dir))
    return results


def make_condor_archive_script(name, files, outdir=None):
    script = """Universe = vanilla

Executable = /bin/tar
arguments = czvhf ../%(archivename)s %(filelist)s

Error = compress.out.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"
request_memory = 20

queue
"""
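    # tar czvhf: create a gzipped archive, verbosely, dereferencing
    # symlinks (h) so the archive contains real files; GZIP=-3 favors
    # speed over compression ratio.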
    if outdir is None:
        outdir = os.getcwd()
    for f in files:
        pathname = os.path.join(outdir, f)
        if not os.path.exists(pathname):
            raise RuntimeError("Missing %s from %s" % (f, outdir))

    context = {'archivename': make_submission_name(name),
               'filelist': " ".join(files),
               'initialdir': os.path.abspath(outdir),
               'user': os.getlogin()}

    condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
    return condor_script


def make_condor_upload_script(name, outdir=None):
    script = """Universe = vanilla

Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s

Error = upload.out.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s

queue
"""
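    # lftp -c runs the single 'put' command and exits, pushing the archive
    # from the parent directory to the UCSC ENCODE ftp host.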
    if outdir is None:
        outdir = os.getcwd()

    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))

    encodeftp = 'encodeftp.cse.ucsc.edu'
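    # netrc entries are (login, account, password) tuples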
    ftpuser = auth.hosts[encodeftp][0]
    ftppassword = auth.hosts[encodeftp][2]
    context = {'archivename': make_submission_name(name),
               'initialdir': os.path.abspath(outdir),
               'user': os.getlogin(),
               'ftpuser': ftpuser,
               'ftppassword': ftppassword,
               'ftphost': encodeftp}

    condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
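    # the submit file embeds the ftp password, so restrict it to
    # owner read/write (0600)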
    os.chmod(condor_script, stat.S_IREAD|stat.S_IWRITE)

    return condor_script


def make_dag_fragment(ininame, archive_condor, upload_condor):
    """
    Make the pair of DAG fragments that compress and then upload the data.
    """
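    # For a hypothetical submission named "1000-mysub" this produces:
    #   JOB 1000-mysub_archive /cwd/1000-mysub.archive.condor
    #   JOB 1000-mysub_upload /cwd/1000-mysub.upload.condor
    #   PARENT 1000-mysub_archive CHILD 1000-mysub_upload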
    cur_dir = os.getcwd()
    archive_condor = os.path.join(cur_dir, archive_condor)
    upload_condor = os.path.join(cur_dir, upload_condor)
    job_basename = make_base_name(ininame)

    fragments = []
    fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
    fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
    fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))

    return fragments


def get_library_info(host, apidata, library_id):
    url = api.library_url(host, library_id)
    contents = api.retrieve_info(url, apidata)
    return contents


def make_submission_section(line_counter, files, attributes):
    """
    Create a section in the submission ini file
    """
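    # Example section (hypothetical attribute values):
    #   [line1]
    #   files=foo.bam,foo.bam.bai
    #   cell=GM12878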
    inifile = ["[line%s]" % (line_counter,)]
    inifile += ["files=%s" % (",".join(files))]

    for k, v in attributes.items():
        inifile += ["%s=%s" % (k, v)]
    return inifile


def make_base_name(pathname):
    base = os.path.basename(pathname)
    name, ext = os.path.splitext(base)
    return name


def make_submission_name(ininame):
    name = make_base_name(ininame)
    return name + ".tgz"


def make_ddf_name(pathname):
    name = make_base_name(pathname)
    return name + ".ddf"


def make_condor_name(pathname, run_type=None):
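    """Build a condor submit file name, e.g. ('foo.daf', 'archive') -> 'foo.archive.condor'."""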
    name = make_base_name(pathname)
    elements = [name]
    if run_type is not None:
        elements.append(run_type)
    elements.append("condor")
    return ".".join(elements)


def parse_filelist(file_string):
    return file_string.split(",")


def validate_filelist(files):
    """
    Raise RuntimeError if any file in the list does not exist.
    """
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("%s does not exist" % (f,))


if __name__ == "__main__":
    main()