c929524abd6e97f606187e15929f2179758c6282
[htsworkflow.git] / encode_submission / ucsc_gather.py
#!/usr/bin/env python
from ConfigParser import SafeConfigParser
import fnmatch
import getpass
from glob import glob
import json
import logging
import netrc
from optparse import OptionParser, OptionGroup
import os
from pprint import pprint, pformat
import shlex
from StringIO import StringIO
import stat
import sys
import time
import types
import urllib
import urllib2
import urlparse
20
21 import RDF
22
23 from htsworkflow.util import api
24 from htsworkflow.util.rdfhelp import \
25      dafTermOntology, \
26      fromTypedNode, \
27      get_model, \
28      get_serializer, \
29      load_into_model, \
30      sparql_query, \
31      submissionOntology
32 from htsworkflow.submission.daf import \
33      DAFMapper, \
34      MetadataLookupException, \
35      get_submission_uri
36 from htsworkflow.submission.condorfastq import CondorFastqExtract
37
38 logger = logging.getLogger('ucsc_gather')
39
40 def main(cmdline=None):
41     parser = make_parser()
42     opts, args = parser.parse_args(cmdline)
43     submission_uri = None
44
45     if opts.debug:
46         logging.basicConfig(level = logging.DEBUG )
47     elif opts.verbose:
48         logging.basicConfig(level = logging.INFO )
49     else:
50         logging.basicConfig(level = logging.WARNING )
51
52     apidata = api.make_auth_from_opts(opts, parser)
53
54     model = get_model(opts.model, opts.db_path)
55     if opts.name:
56         mapper = DAFMapper(opts.name, opts.daf,  model)
57         if opts.library_url is not None:
58             mapper.library_url = opts.library_url
59         submission_uri = get_submission_uri(opts.name)
60
61
62     if opts.load_rdf is not None:
63         if submission_uri is None:
64             parser.error("Please specify the submission name")
65         load_into_model(model, 'turtle', opts.load_rdf, submission_uri)
66
67     if opts.make_ddf and opts.daf is None:
68         parser.error("Please specify your daf when making ddf files")
69
70     library_result_map = []
71     for a in args:
72         library_result_map.extend(read_library_result_map(a))
73
74     if opts.make_tree_from is not None:
75         make_tree_from(opts.make_tree_from, library_result_map)
76
77     if opts.link_daf:
78         if opts.daf is None:
79             parser.error("Please specify daf filename with --daf")
80         link_daf(opts.daf, library_result_map)
81
82     if opts.fastq:
83         extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
84                                        force=opts.force)
85         extractor.build_fastqs(library_result_map)
86
87     if opts.scan_submission:
88         scan_submission_dirs(mapper, library_result_map)
89
90     if opts.make_ddf:
91         make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)
92
93     if opts.sparql:
94         sparql_query(model, opts.sparql)
95
96     if opts.print_rdf:
97         writer = get_serializer()
98         print writer.serialize_model_to_string(model)
99
100
def make_parser():
    """Build the OptionParser for this script.

    Options are grouped into model-manipulation options, command
    options, and top-level/debugging flags; authentication options
    come from the htsworkflow api helper.
    """
    parser = OptionParser()

    model_group = OptionGroup(parser, 'model')
    model_group.add_option('--name', help="Set submission name")
    model_group.add_option('--db-path', default=None,
                           help="set rdf database path")
    model_group.add_option('--model', default=None,
                           help="Load model database")
    model_group.add_option('--load-rdf', default=None,
                           help="load rdf statements into model")
    model_group.add_option('--sparql', default=None, help="execute sparql query")
    model_group.add_option('--print-rdf', action="store_true", default=False,
                           help="print ending model state")
    parser.add_option_group(model_group)

    command_group = OptionGroup(parser, 'commands')
    command_group.add_option('--make-tree-from', default=None,
                             help="create directories & link data files")
    command_group.add_option('--fastq', default=False, action="store_true",
                             help="generate scripts for making fastq files")
    command_group.add_option('--scan-submission', default=False, action="store_true",
                             help="Import metadata for submission into our model")
    command_group.add_option('--link-daf', default=False, action="store_true",
                             help="link daf into submission directories")
    command_group.add_option('--make-ddf', default=False, action="store_true",
                             help='make the ddfs')
    parser.add_option_group(command_group)

    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")
    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--library-url', default=None,
                      help="specify an alternate source for library information")
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    api.add_auth_options(parser)

    return parser
def make_tree_from(source_path, library_result_map):
    """Create a tree using data files from source path.

    For each (library_id, lib_path) pair, create lib_path if missing
    and symlink every file found in source_path/lib_path into it.

    Raises IOError when a source directory or file does not exist.
    """
    for lib_id, lib_path in library_result_map:
        if not os.path.exists(lib_path):
            logger.info("Making dir {0}".format(lib_path))
            os.mkdir(lib_path)
        source_lib_dir = os.path.abspath(os.path.join(source_path, lib_path))
        # BUG FIX: the original tested os.path.exists(source_lib_dir) and
        # then did nothing with the result; fail with an explicit message
        # instead of letting os.listdir raise a less descriptive error.
        if not os.path.exists(source_lib_dir):
            raise IOError("{0} does not exist".format(source_lib_dir))
        for filename in os.listdir(source_lib_dir):
            source_pathname = os.path.join(source_lib_dir, filename)
            target_pathname = os.path.join(lib_path, filename)
            if not os.path.exists(source_pathname):
                raise IOError("{0} does not exist".format(source_pathname))
            if not os.path.exists(target_pathname):
                os.symlink(source_pathname, target_pathname)
                logger.info(
                    'LINK {0} to {1}'.format(source_pathname, target_pathname))
165
166
def link_daf(daf_path, library_result_map):
    """Hard-link the daf file into each submission result directory.

    Raises RuntimeError when the daf or a result directory is missing.
    """
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    daf_basename = os.path.basename(daf_path)

    for _lib_id, result_dir in library_result_map:
        if not os.path.exists(result_dir):
            raise RuntimeError("Couldn't find target directory %s" %(result_dir,))
        target = os.path.join(result_dir, daf_basename)
        if os.path.exists(target):
            continue
        # re-verify the source immediately before linking
        if not os.path.exists(daf_path):
            raise RuntimeError("Couldn't find daf: %s" %(daf_path,))
        os.link(daf_path, target)
181
182
183 def scan_submission_dirs(view_map, library_result_map):
184     """Look through our submission directories and collect needed information
185     """
186     for lib_id, result_dir in library_result_map:
187         logger.info("Importing %s from %s" % (lib_id, result_dir))
188         try:
189             view_map.import_submission_dir(result_dir, lib_id)
190         except MetadataLookupException, e:
191             logger.error("Skipping %s: %s" % (lib_id, str(e)))
192
def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
    """Write a ddf for every submission directory.

    When make_condor is true, also collect the condor DAG fragments
    produced by make_ddf and write them to 'submission.dagman' in the
    current directory (refusing to overwrite unless force is set).
    """
    fragments = []
    for lib_id, result_dir in library_result_map:
        submission_node = view_map.get_submission_node(result_dir)
        fragments.extend(
            make_ddf(view_map, submission_node, daf_name, make_condor, result_dir))

    if not (make_condor and fragments):
        return
    dag_filename = 'submission.dagman'
    if os.path.exists(dag_filename) and not force:
        logger.warn("%s exists, please delete" % (dag_filename,))
    else:
        stream = open(dag_filename, 'w')
        stream.write(os.linesep.join(fragments))
        stream.write(os.linesep)
        stream.close()
210
211
def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
    """
    Make ddf files, and bonus condor file

    view_map -- DAFMapper holding the RDF model and daf variable list
    submissionNode -- RDF node identifying the submission to dump
    daf_name -- daf filename; appended to the archive file list
    make_condor -- also generate condor archive/upload scripts and
                   return DAG fragments chaining them together
    outdir -- directory for the ddf file; writes to stdout when None

    Returns a list of DAGMan fragment lines (empty unless make_condor
    succeeds and a submission name could be found).
    """
    # SPARQL query, parameterized by submission URI, pulling each
    # submitted file's name/md5sum/view plus the library metadata
    # columns that may appear in the ddf.
    query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>

select ?submitView  ?files ?md5sum ?view ?cell ?antibody ?sex ?control ?strain ?controlId ?labExpId ?labVersion ?treatment ?protocol ?readType ?insertLength ?replicate ?mapAlgorithm
WHERE {
  ?file ucscDaf:filename ?files ;
        ucscDaf:md5sum ?md5sum .
  ?submitView ucscDaf:has_file ?file ;
              ucscDaf:view ?dafView ;
              ucscDaf:submission <%(submission)s> .
  ?dafView ucscDaf:name ?view .
  <%(submission)s> submissionOntology:library ?library ;

  OPTIONAL { ?library libraryOntology:antibody ?antibody }
  OPTIONAL { ?library libraryOntology:cell_line ?cell }
  OPTIONAL { <%(submission)s> ucscDaf:control ?control }
  OPTIONAL { <%(submission)s> ucscDaf:controlId ?controlId }
  OPTIONAL { ?library ucscDaf:sex ?sex }
  OPTIONAL { ?library libraryOntology:library_id ?labExpId }
  OPTIONAL { ?library libraryOntology:library_id ?labVersion }
  OPTIONAL { ?library libraryOntology:replicate ?replicate }
  OPTIONAL { ?library libraryOntology:condition ?treatment }
  OPTIONAL { ?library ucscDaf:protocol ?protocol }
  OPTIONAL { ?library ucscDaf:readType ?readType }
  OPTIONAL { ?library ucscDaf:strain ?strain }
  OPTIONAL { ?library libraryOntology:insert_size ?insertLength }
  OPTIONAL { ?library ucscDaf:mapAlgorithm ?mapAlgorithm }
}
ORDER BY  ?submitView"""
    dag_fragments = []

    # the submission's name drives ddf/archive file naming; without it
    # we cannot produce anything useful
    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
    if name is None:
        logger.error("Need name for %s" % (str(submissionNode)))
        return []

    ddf_name = name + '.ddf'
    if outdir is not None:
        outfile = os.path.join(outdir, ddf_name)
        output = open(outfile,'w')
    else:
        outfile = 'stdout:'
        output = sys.stdout

    formatted_query = query_template % {'submission': str(submissionNode.uri)}

    query = RDF.SPARQLQuery(formatted_query)
    results = query.execute(view_map.model)

    # filename goes first
    variables = view_map.get_daf_variables()
    # 'controlId',
    # header row: tab-separated daf variable names
    output.write('\t'.join(variables))
    output.write(os.linesep)

    # Collapse result rows by view name; 'files' and 'md5sum' accumulate
    # into lists, every other variable keeps the last value seen.
    all_views = {}
    all_files = []
    for row in results:
        viewname = fromTypedNode(row['view'])
        current = all_views.setdefault(viewname, {})
        for variable_name in variables:
            value = str(fromTypedNode(row[variable_name]))
            # NOTE(review): str() has already run, so `value is None` can
            # never be true here; unbound query variables arrive as the
            # string 'None' and are still written to the ddf after this
            # warning -- confirm that is intended.
            if value is None or value == 'None':
                logger.warn("{0}: {1} was None".format(outfile, variable_name))
            if variable_name in ('files', 'md5sum'):
                current.setdefault(variable_name,[]).append(value)
            else:
                current[variable_name] = value

    # one ddf line per view; comma-join the accumulated file/md5sum lists
    for view in all_views.keys():
        line = []
        for variable_name in variables:
            if variable_name in ('files', 'md5sum'):
                line.append(','.join(all_views[view][variable_name]))
            else:
                line.append(all_views[view][variable_name])
        output.write("\t".join(line))
        output.write(os.linesep)
        all_files.extend(all_views[view]['files'])

    logger.info(
        "Examined {0}, found files: {1}".format(
            str(submissionNode), ", ".join(all_files)))

    # the daf and the ddf itself are shipped in the archive as well
    all_files.append(daf_name)
    all_files.append(ddf_name)

    if make_condor:
        archive_condor = make_condor_archive_script(name, all_files, outdir)
        upload_condor = make_condor_upload_script(name, outdir)

        dag_fragments.extend(
            make_dag_fragment(name, archive_condor, upload_condor)
        )

    return dag_fragments
313
314
def read_library_result_map(filename):
    """
    Read a file that maps library id to result directory.
    Does not support spaces in filenames.

    For example:
      10000 result/foo/bar

    Blank lines and lines starting with '#' are skipped.
    Returns a list of (library_id, result_dir) string tuples.
    """
    results = []
    # BUG FIX: the file handle was previously opened and never closed;
    # a context manager guarantees it is released.
    with open(filename, 'r') as stream:
        for line in stream:
            line = line.rstrip()
            if not line.startswith('#') and len(line) > 0 :
                library_id, result_dir = line.split()
                results.append((library_id, result_dir))
    return results
332
333
def make_condor_archive_script(name, files, outdir=None):
    """Write a condor submit script that tars up the submission files.

    name -- submission name; used to derive archive and script filenames
    files -- filenames (relative to outdir) to include in the archive
    outdir -- directory holding the files; defaults to the cwd

    Returns the path of the generated condor script.
    Raises RuntimeError if any listed file is missing.
    """
    script = """Universe = vanilla

Executable = /bin/tar
arguments = czvhf ../%(archivename)s %(filelist)s

Error = compress.out.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"
request_memory = 20

queue
"""
    if outdir is None:
        outdir = os.getcwd()
    # fail fast rather than building an incomplete archive
    for f in files:
        pathname = os.path.join(outdir, f)
        if not os.path.exists(pathname):
            raise RuntimeError("Missing %s from %s" % (f,outdir))

    context = {'archivename': make_submission_name(name),
               'filelist': " ".join(files),
               'initialdir': os.path.abspath(outdir),
               # BUG FIX: os.getlogin() raises OSError when there is no
               # controlling terminal (cron, condor); getpass.getuser()
               # is the documented portable replacement.
               'user': getpass.getuser()}

    condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
    condor_stream = open(condor_script,'w')
    condor_stream.write(script % context)
    condor_stream.close()
    return condor_script
366
367
def make_condor_upload_script(name, outdir=None):
    """Write a condor submit script that uploads the archive via lftp.

    name -- submission name; used to derive archive and script filenames
    outdir -- directory to write the script into; defaults to the cwd

    Returns the path of the generated condor script.
    """
    script = """Universe = vanilla

Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s

Error = upload.out.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s

queue
"""
    if outdir is None:
        outdir = os.getcwd()

    # NOTE(review): ftp credentials come from a hard-coded user's netrc
    # file; consider making this path configurable.
    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))

    encodeftp = 'encodeftp.cse.ucsc.edu'
    ftpuser = auth.hosts[encodeftp][0]
    ftppassword = auth.hosts[encodeftp][2]
    context = {'archivename': make_submission_name(name),
               'initialdir': os.path.abspath(outdir),
               # BUG FIX: os.getlogin() raises OSError when there is no
               # controlling terminal; use getpass.getuser() instead.
               'user': getpass.getuser(),
               'ftpuser': ftpuser,
               'ftppassword': ftppassword,
               'ftphost': encodeftp}

    condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
    condor_stream = open(condor_script,'w')
    condor_stream.write(script % context)
    condor_stream.close()
    # owner read/write only -- presumably because the script embeds the
    # ftp password; confirm intended permissions
    os.chmod(condor_script, stat.S_IREAD|stat.S_IWRITE)

    return condor_script
403
404
def make_dag_fragment(ininame, archive_condor, upload_condor):
    """
    Make the couple of fragments compress and then upload the data.

    Returns three DAGMan lines: the archive job, the upload job, and a
    PARENT/CHILD edge ordering archive before upload.
    """
    working_dir = os.getcwd()
    archive_path = os.path.join(working_dir, archive_condor)
    upload_path = os.path.join(working_dir, upload_condor)
    job = os.path.splitext(os.path.basename(ininame))[0]

    return [
        'JOB %s_archive %s' % (job, archive_path),
        'JOB %s_upload %s' % (job, upload_path),
        'PARENT %s_archive CHILD %s_upload' % (job, job),
    ]
420
421
def get_library_info(host, apidata, library_id):
    """Fetch library metadata from the htsworkflow API for library_id."""
    return api.retrieve_info(api.library_url(host, library_id), apidata)
426
427
def make_submission_section(line_counter, files, attributes):
    """
    Create a section in the submission ini file

    Returns a list of lines: the [lineN] header, a files= entry, and
    one key=value entry per attribute.
    """
    section = ["[line%s]" % (line_counter,),
               "files=%s" % (",".join(files))]
    section.extend("%s=%s" % (key, value)
                   for key, value in attributes.items())
    return section
438
439
def make_base_name(pathname):
    """Return the filename portion of pathname with its extension removed."""
    return os.path.splitext(os.path.basename(pathname))[0]
444
445
def make_submission_name(ininame):
    """Return the tar archive name derived from a submission ini name."""
    root, _ext = os.path.splitext(os.path.basename(ininame))
    return "%s.tgz" % (root,)
449
450
def make_ddf_name(pathname):
    """Return the ddf filename derived from pathname's base name."""
    root, _ext = os.path.splitext(os.path.basename(pathname))
    return "%s.ddf" % (root,)
454
455
def make_condor_name(pathname, run_type=None):
    """Build '<base>[.<run_type>].condor' from pathname's base name."""
    base = os.path.splitext(os.path.basename(pathname))[0]
    parts = [base]
    if run_type is not None:
        parts.append(run_type)
    parts.append("condor")
    return ".".join(parts)
463
464
def parse_filelist(file_string, sep=","):
    """Split a delimiter-separated list of filenames into a list.

    sep -- the delimiter; defaults to ',' for backward compatibility.
    """
    return file_string.split(sep)
467
468
def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list

    Raises RuntimeError naming the first missing file.
    """
    for filename in files:
        if os.path.exists(filename):
            continue
        raise RuntimeError("%s does not exist" % (filename,))
476
# Script entry point: run the command-line driver.
if __name__ == "__main__":
    main()