Examine the DAF to determine if the DDF needs to include replicate information
htsworkflow.git: extra/ucsc_encode_submission/ucsc_gather.py
#!/usr/bin/env python
from ConfigParser import SafeConfigParser
import fnmatch
from glob import glob
import json
import logging
import netrc
from optparse import OptionParser, OptionGroup
import os
from pprint import pprint, pformat
import shlex
from StringIO import StringIO
import stat
import sys
import time
import types
import urllib
import urllib2
import urlparse

import RDF

from htsworkflow.util import api
from htsworkflow.util.rdfhelp import \
     dafTermOntology, \
     fromTypedNode, \
     get_model, \
     get_serializer, \
     load_into_model, \
     sparql_query, \
     submissionOntology
from htsworkflow.submission.daf import \
     DAFMapper, \
     MetadataLookupException, \
     get_submission_uri
from htsworkflow.submission.condorfastq import CondorFastqExtract

logger = logging.getLogger('ucsc_gather')

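# Typical invocation, as a sketch (the submission name, daf file, and map
# file below are hypothetical; auth options added by api.add_auth_options
# are omitted):
#
#   ucsc_gather.py --name mysubmission --daf mysubmission.daf \
#       --scan-submission --make-ddf library_map.txt
#
# Positional arguments are library result map files; see
# read_library_result_map() for the format.
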
def main(cmdline=None):
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)
    submission_uri = None
    mapper = None

    if opts.debug:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.verbose:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig(level=logging.WARNING)

    apidata = api.make_auth_from_opts(opts, parser)

    model = get_model(opts.load_model)
    if opts.name:
        mapper = DAFMapper(opts.name, opts.daf, model)
        if opts.library_url is not None:
            mapper.library_url = opts.library_url
        submission_uri = get_submission_uri(opts.name)

    if opts.load_rdf is not None:
        if submission_uri is None:
            parser.error("Please specify the submission name")
        load_into_model(model, 'turtle', opts.load_rdf, submission_uri)

    if opts.make_ddf and opts.daf is None:
        parser.error("Please specify your daf when making ddf files")

    library_result_map = []
    for a in args:
        library_result_map.extend(read_library_result_map(a))

    if opts.make_tree_from is not None:
        make_tree_from(opts.make_tree_from, library_result_map)

    if opts.link_daf:
        link_daf(opts.daf, library_result_map)

    if opts.fastq:
        extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
                                       force=opts.force)
        extractor.build_fastqs(library_result_map)

    if opts.scan_submission:
        if mapper is None:
            parser.error("Scanning submissions requires a submission name")
        scan_submission_dirs(mapper, library_result_map)

    if opts.make_ddf:
        if mapper is None:
            parser.error("Making ddfs requires a submission name")
        make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)

    if opts.sparql:
        sparql_query(model, opts.sparql)

    if opts.print_rdf:
        writer = get_serializer()
        print writer.serialize_model_to_string(model)

def make_parser():
    parser = OptionParser()

    model = OptionGroup(parser, 'model')
    model.add_option('--name', help="Set submission name")
    model.add_option('--load-model', default=None,
                     help="Load model database")
    model.add_option('--load-rdf', default=None,
                     help="load rdf statements into model")
    model.add_option('--sparql', default=None, help="execute sparql query")
    model.add_option('--print-rdf', action="store_true", default=False,
                     help="print ending model state")
    parser.add_option_group(model)
    # commands
    commands = OptionGroup(parser, 'commands')
    commands.add_option('--make-tree-from',
                        help="create directories & link data files",
                        default=None)
    commands.add_option('--fastq', default=False, action="store_true",
                        help="generate scripts for making fastq files")
    commands.add_option('--scan-submission', default=False, action="store_true",
                        help="Import metadata for submission into our model")
    commands.add_option('--link-daf', default=False, action="store_true",
                        help="link daf into submission directories")
    commands.add_option('--make-ddf', help='make the ddfs', default=False,
                        action="store_true")
    parser.add_option_group(commands)

    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")
    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--library-url', default=None,
                      help="specify an alternate source for library information")
    # debugging
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    api.add_auth_options(parser)

    return parser

def make_tree_from(source_path, library_result_map):
    """Create a tree using data files from source path.
    """
    for lib_id, lib_path in library_result_map:
        if not os.path.exists(lib_path):
            logging.info("Making dir {0}".format(lib_path))
            os.mkdir(lib_path)
        source_lib_dir = os.path.abspath(os.path.join(source_path, lib_path))
        if not os.path.exists(source_lib_dir):
            raise IOError("{0} does not exist".format(source_lib_dir))
        for filename in os.listdir(source_lib_dir):
            source_pathname = os.path.join(source_lib_dir, filename)
            target_pathname = os.path.join(lib_path, filename)
            if not os.path.exists(source_pathname):
                raise IOError("{0} does not exist".format(source_pathname))
            if not os.path.exists(target_pathname):
                os.symlink(source_pathname, target_pathname)
                logging.info(
                    'LINK {0} to {1}'.format(source_pathname, target_pathname))

def link_daf(daf_path, library_result_map):
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    base_daf = os.path.basename(daf_path)

    for lib_id, result_dir in library_result_map:
        if not os.path.exists(result_dir):
            raise RuntimeError("Couldn't find target directory %s" % (result_dir,))
        submission_daf = os.path.join(result_dir, base_daf)
        if not os.path.exists(submission_daf):
            os.link(daf_path, submission_daf)

def scan_submission_dirs(view_map, library_result_map):
    """Look through our submission directories and collect needed information
    """
    for lib_id, result_dir in library_result_map:
        logging.info("Importing %s from %s" % (lib_id, result_dir))
        try:
            view_map.import_submission_dir(result_dir, lib_id)
        except MetadataLookupException, e:
            logging.error("Skipping %s: %s" % (lib_id, str(e)))


def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
    dag_fragment = []
    for lib_id, result_dir in library_result_map:
        submissionNode = view_map.get_submission_node(result_dir)
        dag_fragment.extend(
            make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)
        )

    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        if not force and os.path.exists(dag_filename):
            logging.warn("%s exists, please delete" % (dag_filename,))
        else:
            f = open(dag_filename, 'w')
            f.write(os.linesep.join(dag_fragment))
            f.write(os.linesep)
            f.close()

def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
    """
    Make ddf files, and bonus condor file
    """
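    # The query below returns one row per (submitView, file) pair in the
    # RDF model; the loops further down fold those rows into one DDF line
    # per view, comma-joining the files and md5sum columns.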
    query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>

select ?submitView ?files ?md5sum ?view ?cell ?antibody ?sex ?control ?controlId ?labExpId ?labVersion ?treatment ?protocol ?readType ?insertLength ?replicate
WHERE {
  ?file ucscDaf:filename ?files ;
        ucscDaf:md5sum ?md5sum .
  ?submitView ucscDaf:has_file ?file ;
              ucscDaf:view ?dafView ;
              ucscDaf:submission <%(submission)s> .
  ?dafView ucscDaf:name ?view .
  <%(submission)s> submissionOntology:library ?library .

  OPTIONAL { ?library libraryOntology:antibody ?antibody }
  OPTIONAL { ?library libraryOntology:cell_line ?cell }
  OPTIONAL { <%(submission)s> ucscDaf:control ?control }
  OPTIONAL { <%(submission)s> ucscDaf:controlId ?controlId }
  OPTIONAL { ?library ucscDaf:sex ?sex }
  OPTIONAL { ?library libraryOntology:library_id ?labExpId }
  OPTIONAL { ?library libraryOntology:library_id ?labVersion }
  OPTIONAL { ?library libraryOntology:replicate ?replicate }
  OPTIONAL { ?library libraryOntology:condition ?treatment }
  OPTIONAL { ?library ucscDaf:protocol ?protocol }
  OPTIONAL { ?library ucscDaf:readType ?readType }
  OPTIONAL { ?library libraryOntology:insert_size ?insertLength }
}
ORDER BY ?submitView"""
    dag_fragments = []

    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
    if name is None:
        logging.error("Need name for %s" % (str(submissionNode)))
        return []

    ddf_name = name + '.ddf'
    if outdir is not None:
        outfile = os.path.join(outdir, ddf_name)
        output = open(outfile, 'w')
    else:
        output = sys.stdout

    formatted_query = query_template % {'submission': str(submissionNode.uri)}

    query = RDF.SPARQLQuery(formatted_query)
    results = query.execute(view_map.model)

    variables = ['files']
    # filename goes first
    variables.extend(view_map.get_daf_variables())
    # 'controlId',
    variables += ['labExpId', 'md5sum']
    output.write('\t'.join(variables))
    output.write(os.linesep)

    all_views = {}
    all_files = []
    for row in results:
        viewname = fromTypedNode(row['view'])
        current = all_views.setdefault(viewname, {})
        for variable_name in variables:
            value = str(fromTypedNode(row[variable_name]))
            if variable_name in ('files', 'md5sum'):
                current.setdefault(variable_name, []).append(value)
            else:
                current[variable_name] = value

    for view in all_views.keys():
        line = []
        for variable_name in variables:
            if variable_name in ('files', 'md5sum'):
                line.append(','.join(all_views[view][variable_name]))
            else:
                line.append(all_views[view][variable_name])
        output.write("\t".join(line))
        output.write(os.linesep)
        all_files.extend(all_views[view]['files'])

    logging.info(
        "Examined {0}, found files: {1}".format(
            str(submissionNode), ", ".join(all_files)))

    all_files.append(daf_name)
    all_files.append(ddf_name)

    # flush the ddf to disk before the archive script checks for it
    if output is not sys.stdout:
        output.close()

    if make_condor:
        archive_condor = make_condor_archive_script(name, all_files, outdir)
        upload_condor = make_condor_upload_script(name, outdir)

        dag_fragments.extend(
            make_dag_fragment(name, archive_condor, upload_condor)
        )

    return dag_fragments


def read_library_result_map(filename):
    """
    Read a file that maps library id to result directory.
    Does not support spaces in filenames.

    For example:
      10000 result/foo/bar
    """
    stream = open(filename, 'r')

    results = []
    for line in stream:
        line = line.rstrip()
        if not line.startswith('#') and len(line) > 0:
            library_id, result_dir = line.split()
            results.append((library_id, result_dir))
    stream.close()
    return results
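
# For the example above, read_library_result_map would return
# [('10000', 'result/foo/bar')].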


def make_condor_archive_script(name, files, outdir=None):
    script = """Universe = vanilla

Executable = /bin/tar
arguments = czvhf ../%(archivename)s %(filelist)s

Error = compress.out.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"
request_memory = 20

queue
"""
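    # The template above is a condor submit description; the %()s
    # placeholders are filled from the context dictionary below, so the
    # generated job runs tar inside the submission directory and writes
    # the archive one level up.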
    if outdir is None:
        outdir = os.getcwd()
    for f in files:
        pathname = os.path.join(outdir, f)
        if not os.path.exists(pathname):
            raise RuntimeError("Missing %s from %s" % (f, outdir))

    context = {'archivename': make_submission_name(name),
               'filelist': " ".join(files),
               'initialdir': os.path.abspath(outdir),
               'user': os.getlogin()}

    condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
    return condor_script


def make_condor_upload_script(name, outdir=None):
    script = """Universe = vanilla

Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s

Error = upload.out.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s

queue
"""
    if outdir is None:
        outdir = os.getcwd()

    # Note: the ftp credentials come from a hard-coded netrc file.
    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))

    encodeftp = 'encodeftp.cse.ucsc.edu'
    ftpuser = auth.hosts[encodeftp][0]
    ftppassword = auth.hosts[encodeftp][2]
    context = {'archivename': make_submission_name(name),
               'initialdir': os.path.abspath(outdir),
               'user': os.getlogin(),
               'ftpuser': ftpuser,
               'ftppassword': ftppassword,
               'ftphost': encodeftp}

    condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
    # The generated script embeds the ftp password, so restrict it to
    # owner read/write.
    os.chmod(condor_script, stat.S_IREAD|stat.S_IWRITE)

    return condor_script


def make_dag_fragment(ininame, archive_condor, upload_condor):
    """
    Make the pair of DAG fragments that compress and then upload the data.
    """
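    # For ininame "12345.ini" (hypothetical), this produces lines like:
    #   JOB 12345_archive <cwd>/12345.archive.condor
    #   JOB 12345_upload <cwd>/12345.upload.condor
    #   PARENT 12345_archive CHILD 12345_upload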
    cur_dir = os.getcwd()
    archive_condor = os.path.join(cur_dir, archive_condor)
    upload_condor = os.path.join(cur_dir, upload_condor)
    job_basename = make_base_name(ininame)

    fragments = []
    fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
    fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
    fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))

    return fragments


def get_library_info(host, apidata, library_id):
    url = api.library_url(host, library_id)
    contents = api.retrieve_info(url, apidata)
    return contents

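
# A sketch of the output: make_submission_section(1, ["foo.bam"],
# {"view": "Signal"}) would return ["[line1]", "files=foo.bam",
# "view=Signal"] (filename and view value are hypothetical).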
def make_submission_section(line_counter, files, attributes):
    """
    Create a section in the submission ini file
    """
    inifile = ["[line%s]" % (line_counter,)]
    inifile += ["files=%s" % (",".join(files))]

    for k, v in attributes.items():
        inifile += ["%s=%s" % (k, v)]
    return inifile


def make_base_name(pathname):
    base = os.path.basename(pathname)
    name, ext = os.path.splitext(base)
    return name


def make_submission_name(ininame):
    name = make_base_name(ininame)
    return name + ".tgz"


def make_ddf_name(pathname):
    name = make_base_name(pathname)
    return name + ".ddf"


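# For example, make_condor_name("12345.ini", "archive") returns
# "12345.archive.condor", and make_condor_name("12345.ini") returns
# "12345.condor" (the ini name is hypothetical).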
def make_condor_name(pathname, run_type=None):
    name = make_base_name(pathname)
    elements = [name]
    if run_type is not None:
        elements.append(run_type)
    elements.append("condor")
    return ".".join(elements)


def parse_filelist(file_string):
    return file_string.split(",")


def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list
    """
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("%s does not exist" % (f,))


if __name__ == "__main__":
    main()