Further clean up ddf generation.
[htsworkflow.git] / extra / ucsc_encode_submission / ucsc_gather.py
1 #!/usr/bin/env python
2 from ConfigParser import SafeConfigParser
3 import fnmatch
4 from glob import glob
5 import json
6 import logging
7 import netrc
8 from optparse import OptionParser, OptionGroup
9 import os
10 from pprint import pprint, pformat
11 import shlex
12 from StringIO import StringIO
13 import stat
14 import sys
15 import time
16 import types
17 import urllib
18 import urllib2
19 import urlparse
20
21 import RDF
22
23 from htsworkflow.util import api
24 from htsworkflow.util.rdfhelp import \
25      dafTermOntology, \
26      fromTypedNode, \
27      get_model, \
28      get_serializer, \
29      load_into_model, \
30      sparql_query, \
31      submissionOntology 
32 from htsworkflow.submission.daf import \
33      DAFMapper, \
34      MetadataLookupException, \
35      get_submission_uri
36 from htsworkflow.submission.condorfastq import CondorFastqExtract
37
# Module-level logger named 'ucsc_gather'.
# NOTE(review): the functions visible in this file call the root ``logging``
# module directly instead of going through this logger -- confirm whether
# that is intentional.
logger = logging.getLogger('ucsc_gather')
39
def main(cmdline=None):
    """Run the ucsc_gather command line tool.

    Parses the options, loads the RDF model, reads every library-id to
    result-directory map file given as a positional argument, and then runs
    the requested commands in a fixed order: make-tree-from, link-daf,
    fastq, scan-submission, make-ddf, sparql and print-rdf.

    :param cmdline: list of argument strings; None lets optparse read
                    sys.argv[1:].
    """
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)

    if opts.debug:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.verbose:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig(level=logging.WARNING)

    apidata = api.make_auth_from_opts(opts, parser)

    # Validate option combinations before doing any work.  parser.error()
    # prints the message and exits.
    if opts.make_ddf and opts.daf is None:
        parser.error("Please specify your daf when making ddf files")
    if opts.link_daf and opts.daf is None:
        parser.error("Please specify your daf when linking daf files")

    # The mapper and the submission uri only exist when --name is given;
    # without this check the options below would fail with a NameError.
    needs_name = [flag for flag, requested in (
        ('--library-url', opts.library_url is not None),
        ('--load-rdf', opts.load_rdf is not None),
        ('--scan-submission', opts.scan_submission),
        ('--make-ddf', opts.make_ddf)) if requested]
    if needs_name and not opts.name:
        parser.error(
            "--name is required when using %s" % (", ".join(needs_name),))

    model = get_model(opts.load_model)
    mapper = None
    submission_uri = None
    if opts.name:
        mapper = DAFMapper(opts.name, opts.daf, model)
        submission_uri = get_submission_uri(opts.name)

    if opts.library_url is not None:
        mapper.library_url = opts.library_url

    if opts.load_rdf is not None:
        load_into_model(model, 'turtle', opts.load_rdf, submission_uri)

    library_result_map = []
    for a in args:
        library_result_map.extend(read_library_result_map(a))

    if opts.make_tree_from is not None:
        make_tree_from(opts.make_tree_from, library_result_map)

    if opts.link_daf:
        link_daf(opts.daf, library_result_map)

    if opts.fastq:
        extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
                                       force=opts.force)
        extractor.build_fastqs(library_result_map)

    if opts.scan_submission:
        scan_submission_dirs(mapper, library_result_map)

    if opts.make_ddf:
        make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)

    if opts.sparql:
        sparql_query(model, opts.sparql)

    if opts.print_rdf:
        writer = get_serializer()
        # Single-argument form behaves the same as the Python 2 print
        # statement and is also valid Python 3.
        print(writer.serialize_model_to_string(model))
94
95         
def make_parser():
    """Build the optparse parser for ucsc_gather.

    Options are organised as a 'model' group (submission name and RDF model
    handling), a 'commands' group (the actions to perform), a handful of
    general options, and whatever api.add_auth_options() contributes.

    :return: the configured OptionParser
    """
    parser = OptionParser()

    # options controlling the RDF model
    model_group = OptionGroup(parser, 'model')
    model_group.add_option('--name', help="Set submission name")
    for flag, text in (('--load-model', "Load model database"),
                       ('--load-rdf', "load rdf statements into model"),
                       ('--sparql', "execute sparql query")):
        model_group.add_option(flag, default=None, help=text)
    model_group.add_option('--print-rdf', action="store_true", default=False,
                           help="print ending model state")
    parser.add_option_group(model_group)

    # commands
    command_group = OptionGroup(parser, 'commands')
    command_group.add_option('--make-tree-from', default=None,
                             help="create directories & link data files")
    for flag, text in (
            ('--fastq', "generate scripts for making fastq files"),
            ('--scan-submission',
             "Import metadata for submission into our model"),
            ('--link-daf', "link daf into submission directories"),
            ('--make-ddf', 'make the ddfs')):
        command_group.add_option(flag, action="store_true", default=False,
                                 help=text)
    parser.add_option_group(command_group)

    # general options
    parser.add_option('--force', action="store_true", default=False,
                      help="Force regenerating fastqs")
    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option(
        '--library-url', default=None,
        help="specify an alternate source for library information")

    # debugging
    for flag, text in (('--verbose', 'verbose logging'),
                       ('--debug', 'debug logging')):
        parser.add_option(flag, action="store_true", default=False,
                          help=text)

    api.add_auth_options(parser)

    return parser
138
def make_tree_from(source_path, library_result_map):
    """Create a tree using data files from source path.

    For every (lib_id, lib_path) pair the directory lib_path is created
    (relative to the current working directory) when it is missing, and
    each entry found in source_path/lib_path is symlinked into it.
    Targets that already exist are left untouched, so the function can be
    re-run safely.

    :param source_path: root directory holding the original data files
    :param library_result_map: iterable of (library id, result directory)
    :raises OSError: from os.listdir when source_path/lib_path is missing
    :raises IOError: when a source entry is itself a dangling link
    """
    for lib_id, lib_path in library_result_map:
        if not os.path.exists(lib_path):
            logging.info("Making dir {0}".format(lib_path))
            os.mkdir(lib_path)
        source_lib_dir = os.path.abspath(os.path.join(source_path, lib_path))
        for filename in os.listdir(source_lib_dir):
            source_pathname = os.path.join(source_lib_dir, filename)
            target_pathname = os.path.join(lib_path, filename)
            if not os.path.exists(source_pathname):
                raise IOError("{0} does not exist".format(source_pathname))
            # lexists() also sees dangling symlinks; exists() reports them
            # as missing and os.symlink would then fail with EEXIST.
            if os.path.lexists(target_pathname):
                continue
            os.symlink(source_pathname, target_pathname)
            logging.info(
                'LINK {0} to {1}'.format(source_pathname, target_pathname))
158     
159
def link_daf(daf_path, library_result_map):
    """Hard link the daf file into every result directory.

    A directory that already contains a file with the daf's base name is
    skipped.

    :param daf_path: path of the daf file to link
    :param library_result_map: iterable of (library id, result directory)
    :raises RuntimeError: when the daf or a result directory is missing
    """
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    daf_filename = os.path.basename(daf_path)

    for _, target_dir in library_result_map:
        if not os.path.exists(target_dir):
            raise RuntimeError("Couldn't find target directory %s" %(target_dir,))
        link_name = os.path.join(target_dir, daf_filename)
        if os.path.exists(link_name):
            # already linked (or a file of that name is present)
            continue
        if not os.path.exists(daf_path):
            raise RuntimeError("Couldn't find daf: %s" %(daf_path,))
        os.link(daf_path, link_name)
174
175
def scan_submission_dirs(view_map, library_result_map):
    """Import every submission directory into the view map.

    Calls view_map.import_submission_dir(result_dir, lib_id) for each entry.
    A MetadataLookupException is logged as an error and that library is
    skipped; the remaining libraries are still processed.
    """
    for library_id, submission_dir in library_result_map:
        logging.info("Importing %s from %s" % (library_id, submission_dir))
        try:
            view_map.import_submission_dir(submission_dir, library_id)
        except MetadataLookupException as error:
            logging.error("Skipping %s: %s" % (library_id, str(error)))
185         
def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
    """Make a ddf for every result directory and, optionally, a dagman file.

    make_ddf() is called once per (lib_id, result_dir) entry using the
    submission node that view_map reports for that directory.  When
    make_condor is true and any dag fragments were produced they are
    written to 'submission.dagman' in the current working directory.

    :param view_map: object providing get_submission_node() and whatever
                     make_ddf() needs
    :param library_result_map: iterable of (library id, result directory)
    :param daf_name: daf file name passed through to make_ddf()
    :param make_condor: also generate condor scripts and the dagman file
    :param force: overwrite an existing 'submission.dagman'
    """
    dag_fragment = []
    for lib_id, result_dir in library_result_map:
        submissionNode = view_map.get_submission_node(result_dir)
        dag_fragment.extend(
            make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)
        )

    if not (make_condor and dag_fragment):
        return

    dag_filename = 'submission.dagman'
    if not force and os.path.exists(dag_filename):
        logging.warning("%s exists, please delete" % (dag_filename,))
        return

    # The file is opened in text mode, so '\n' is the right terminator on
    # every platform; the with block closes the file even if a write fails.
    with open(dag_filename, 'w') as dag_stream:
        dag_stream.write('\n'.join(dag_fragment))
        dag_stream.write('\n')
203             
204
def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
    """
    Make ddf files, and bonus condor file

    Queries view_map.model for the files attached to submissionNode and
    writes a tab separated table: a header row followed by one row per
    view, with the filenames and md5sums of a view joined by commas.

    :param view_map: object providing .model and get_daf_variables()
    :param submissionNode: RDF node identifying the submission
    :param daf_name: daf file name to include in the archive file list
    :param make_condor: also write the archive/upload condor scripts
    :param outdir: directory receiving <name>.ddf; None writes the table
                   to sys.stdout
    :return: list of dagman lines from make_dag_fragment(); empty when
             make_condor is false or the submission has no name
    """
    # NOTE(review): ?labExpId below is bound through ucscDaf:labVersion, the
    # same predicate as ?labVersion -- confirm that is the intended term.
    query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>

select ?submitView  ?filename ?md5sum ?view ?cell ?antibody ?sex ?control ?controlId ?labExpId ?labVersion ?treatment ?protocol
WHERE {
  ?file ucscDaf:filename ?filename ;
        ucscDaf:md5sum ?md5sum .
  ?submitView ucscDaf:has_file ?file ;
              ucscDaf:view ?dafView ;
              ucscDaf:submission %(submission)s .
  ?dafView ucscDaf:name ?view .
  %(submission)s submissionOntology:library ?library .

  OPTIONAL { ?submitView ucscDaf:antibody ?antibody }
  OPTIONAL { ?submitView ucscDaf:cell ?cell }
  OPTIONAL { ?submitView ucscDaf:control ?control }
  OPTIONAL { ?library ucscDaf:controlId ?controlId }
  OPTIONAL { ?submitView ucscDaf:sex ?sex }
  OPTIONAL { ?submitView ucscDaf:labVersion ?labExpId }
  OPTIONAL { ?submitView ucscDaf:labVersion ?labVersion }
  OPTIONAL { ?library ucscDaf:treatment ?treatment }
  OPTIONAL { ?submitView ucscDaf:protocol ?protocol }
}
ORDER BY  ?submitView""" 
    dag_fragments = []

    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
    if name is None:
        logging.error("Need name for %s" % (str(submissionNode)))
        return []

    ddf_name = name + '.ddf'
    if outdir is not None:
        outfile = os.path.join(outdir, ddf_name)
        output = open(outfile,'w')
    else:
        output = sys.stdout

    all_files = []
    try:
        formatted_query = query_template % {'submission': str(submissionNode)}

        query = RDF.SPARQLQuery(formatted_query)
        results = query.execute(view_map.model)

        variables = ['filename']
        # filename goes first
        variables.extend(view_map.get_daf_variables())
        variables += ['controlId', 'labExpId', 'md5sum']
        # Both possible outputs are text-mode streams, so '\n' is the
        # correct line terminator on every platform.
        output.write('\t'.join(variables))
        output.write('\n')

        # Group the result rows by view; filename and md5sum accumulate
        # into lists, every other column keeps the last value seen.
        all_views = {}
        for row in results:
            viewname = fromTypedNode(row['view'])
            current = all_views.setdefault(viewname, {})
            for variable_name in variables:
                value = str(fromTypedNode(row[variable_name]))
                if variable_name in ('filename', 'md5sum'):
                    current.setdefault(variable_name,[]).append(value)
                else:
                    current[variable_name] = value

        for view, columns in all_views.items():
            line = []
            for variable_name in variables:
                if variable_name in ('filename', 'md5sum'):
                    line.append(','.join(columns[variable_name]))
                else:
                    line.append(columns[variable_name])
            output.write("\t".join(line))
            output.write('\n')
            all_files.extend(columns['filename'])
    finally:
        # Close (and thereby flush) the ddf before anything else looks at
        # it; never close the interpreter's stdout.
        if output is not sys.stdout:
            output.close()

    logging.info(
        "Examined {0}, found files: {1}".format(
            str(submissionNode), ", ".join(all_files)))

    all_files.append(daf_name)
    all_files.append(ddf_name)

    if make_condor:
        archive_condor = make_condor_archive_script(name, all_files, outdir)
        upload_condor = make_condor_upload_script(name, outdir)

        dag_fragments.extend(
            make_dag_fragment(name, archive_condor, upload_condor)
        )

    return dag_fragments
299
300
def read_library_result_map(filename):
    """
    Read a file that maps library id to result directory.
    Does not support spaces in filenames.

    Blank lines and lines whose first non-blank character is '#' are
    ignored.

    For example:
      10000 result/foo/bar

    :param filename: path of the map file
    :return: list of (library_id, result_dir) string tuples in file order
    :raises ValueError: when a line does not hold exactly two fields
    """
    results = []
    with open(filename, 'r') as stream:
        for line_number, line in enumerate(stream, 1):
            # strip() rather than rstrip() so indented comments and
            # entries are recognised too.
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            fields = line.split()
            if len(fields) != 2:
                raise ValueError(
                    "%s:%d: expected 'library_id result_dir', got %r" % (
                        filename, line_number, line))
            library_id, result_dir = fields
            results.append((library_id, result_dir))
    return results
318
319
def make_condor_archive_script(name, files, outdir=None):
    """Write a condor submit script that tars up a submission.

    The job runs /bin/tar from outdir and creates ../<name>.tgz from the
    listed files.

    :param name: submission name, used for the archive and script names
    :param files: file names, relative to outdir, to put in the archive
    :param outdir: directory holding the files and receiving the script;
                   defaults to the current working directory
    :return: path of the condor script that was written
    :raises RuntimeError: when one of the files is missing from outdir
    """
    script = """Universe = vanilla

Executable = /bin/tar
arguments = czvhf ../%(archivename)s %(filelist)s

Error = compress.out.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"
request_memory = 20

queue 
"""
    if outdir is None:
        outdir = os.getcwd()
    for f in files:
        pathname = os.path.join(outdir, f)
        if not os.path.exists(pathname):
            raise RuntimeError("Missing %s" % (f,))

    try:
        user = os.getlogin()
    except OSError:
        # os.getlogin() needs a controlling terminal and fails under cron
        # or a batch system; fall back to the environment / passwd lookup.
        import getpass
        user = getpass.getuser()

    context = {'archivename': make_submission_name(name),
               'filelist': " ".join(files),
               'initialdir': os.path.abspath(outdir),
               'user': user}

    condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
    with open(condor_script, 'w') as condor_stream:
        condor_stream.write(script % context)
    return condor_script
352
353
def make_condor_upload_script(name, outdir=None):
    """Write a condor submit script that uploads a submission archive.

    The job runs lftp from outdir and puts ../<name>.tgz on
    encodeftp.cse.ucsc.edu.  The ftp login and password are read from the
    hard-coded ~diane/.netrc and embedded in the script, so the script is
    only ever readable and writable by its owner.

    :param name: submission name, used for the archive and script names
    :param outdir: directory receiving the script; defaults to the current
                   working directory
    :return: path of the condor script that was written
    :raises KeyError: when the netrc file has no entry for the ftp host
    """
    script = """Universe = vanilla

Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s

Error = upload.out.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s

queue 
"""
    if outdir is None:
        outdir = os.getcwd()

    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))

    encodeftp = 'encodeftp.cse.ucsc.edu'
    ftpuser = auth.hosts[encodeftp][0]
    ftppassword = auth.hosts[encodeftp][2]

    try:
        user = os.getlogin()
    except OSError:
        # os.getlogin() needs a controlling terminal and fails under cron
        # or a batch system; fall back to the environment / passwd lookup.
        import getpass
        user = getpass.getuser()

    context = {'archivename': make_submission_name(name),
               'initialdir': os.path.abspath(outdir),
               'user': user,
               'ftpuser': ftpuser,
               'ftppassword': ftppassword,
               'ftphost': encodeftp}

    condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
    owner_only = stat.S_IREAD|stat.S_IWRITE
    # Create the file owner-only from the start so the password is never
    # on disk with the default (umask derived) permissions.
    descriptor = os.open(condor_script,
                         os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                         owner_only)
    condor_stream = os.fdopen(descriptor, 'w')
    try:
        # The mode given to os.open() only applies to newly created files;
        # tighten a pre-existing script before writing the secret.
        os.chmod(condor_script, owner_only)
        condor_stream.write(script % context)
    finally:
        condor_stream.close()

    return condor_script
389
390
def make_dag_fragment(ininame, archive_condor, upload_condor):
    """
    Build the dagman lines that archive a submission and then upload it.

    The condor script paths are resolved against the current working
    directory and the job names are derived from the base name of ininame.
    Returns a list of three strings: the two JOB lines and the
    PARENT/CHILD dependency line.
    """
    here = os.getcwd()
    archive_path = os.path.join(here, archive_condor)
    upload_path = os.path.join(here, upload_condor)
    job = make_base_name(ininame)

    return [
        'JOB %s_archive %s' % (job, archive_path),
        'JOB %s_upload %s' % (job, upload_path),
        'PARENT %s_archive CHILD %s_upload' % (job, job),
    ]
406
407
def get_library_info(host, apidata, library_id):
    """Return what api.retrieve_info() yields for the library's url on host."""
    library_url = api.library_url(host, library_id)
    return api.retrieve_info(library_url, apidata)
412
413
def make_submission_section(line_counter, files, attributes):
    """
    Build the lines of one section of the submission ini file.

    Returns a list of strings: the "[line<counter>]" header, a "files="
    line with the comma joined file names, then one "key=value" line per
    attribute in the dictionary's iteration order.
    """
    section = [
        "[line%s]" % (line_counter,),
        "files=%s" % (",".join(files)),
    ]
    section.extend("%s=%s" % (key, value) for key, value in attributes.items())
    return section
424
425
def make_base_name(pathname):
    """Return the final path component of pathname without its last extension."""
    stem, _ = os.path.splitext(os.path.basename(pathname))
    return stem
430
431
def make_submission_name(ininame):
    """Return the archive file name for ininame: its base name plus '.tgz'."""
    return make_base_name(ininame) + ".tgz"
435
436
def make_ddf_name(pathname):
    """Return the ddf file name for pathname: its base name plus '.ddf'."""
    return make_base_name(pathname) + ".ddf"
440
441
def make_condor_name(pathname, run_type=None):
    """Return '<base>.condor', or '<base>.<run_type>.condor' when run_type is given."""
    parts = [make_base_name(pathname)]
    if run_type is not None:
        parts.append(run_type)
    return ".".join(parts + ["condor"])
449
450
def parse_filelist(file_string):
    """Split a comma separated string of file names into a list.

    No whitespace is stripped, and an empty string yields [''].
    """
    filenames = file_string.split(",")
    return filenames
453
454
def validate_filelist(files):
    """
    Raise RuntimeError for the first path in files that does not exist.

    Returns None when every path exists (or files is empty).
    """
    for candidate in files:
        if os.path.exists(candidate):
            continue
        raise RuntimeError("%s does not exist" % (candidate,))
462
# Run the command line tool only when executed as a script, not on import.
if __name__ == "__main__":
    main()