Stop copying attributes from library to a submission node
[htsworkflow.git] extra/ucsc_encode_submission/ucsc_gather.py
#!/usr/bin/env python
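"""Gather files and metadata for an ENCODE submission to UCSC.

Reads "library_id result_dir" map files, links data and DAF files into
submission directories, imports submission metadata into an RDF model,
writes DDF files, and generates the Condor scripts and dagman file that
tar up and ftp each submission to UCSC.
"""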
from ConfigParser import SafeConfigParser
import fnmatch
from glob import glob
import json
import logging
import netrc
from optparse import OptionParser, OptionGroup
import os
from pprint import pprint, pformat
import shlex
from StringIO import StringIO
import stat
import sys
import time
import types
import urllib
import urllib2
import urlparse

import RDF

from htsworkflow.util import api
from htsworkflow.util.rdfhelp import \
     dafTermOntology, \
     fromTypedNode, \
     get_model, \
     get_serializer, \
     load_into_model, \
     sparql_query, \
     submissionOntology
from htsworkflow.submission.daf import \
     DAFMapper, \
     MetadataLookupException, \
     get_submission_uri
from htsworkflow.submission.condorfastq import CondorFastqExtract

logger = logging.getLogger('ucsc_gather')

def main(cmdline=None):
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)
    submission_uri = None
    mapper = None

    if opts.debug:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.verbose:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig(level=logging.WARNING)

    apidata = api.make_auth_from_opts(opts, parser)

    model = get_model(opts.load_model)
    if opts.name:
        mapper = DAFMapper(opts.name, opts.daf, model)
        if opts.library_url is not None:
            mapper.library_url = opts.library_url
        submission_uri = get_submission_uri(opts.name)

    if opts.load_rdf is not None:
        if submission_uri is None:
            parser.error("Please specify the submission name")
        load_into_model(model, 'turtle', opts.load_rdf, submission_uri)

    if opts.make_ddf and opts.daf is None:
        parser.error("Please specify your daf when making ddf files")

    library_result_map = []
    for a in args:
        library_result_map.extend(read_library_result_map(a))

    if opts.make_tree_from is not None:
        make_tree_from(opts.make_tree_from, library_result_map)

    if opts.link_daf:
        link_daf(opts.daf, library_result_map)

    if opts.fastq:
        extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
                                       force=opts.force)
        extractor.build_fastqs(library_result_map)

    if opts.scan_submission:
        if mapper is None:
            parser.error("Please specify the submission name")
        scan_submission_dirs(mapper, library_result_map)

    if opts.make_ddf:
        if mapper is None:
            parser.error("Please specify the submission name")
        make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)

    if opts.sparql:
        sparql_query(model, opts.sparql)

    if opts.print_rdf:
        writer = get_serializer()
        print writer.serialize_model_to_string(model)

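# Example invocation (hypothetical file names), driving the usual flow of
# scanning submission directories and then writing DDFs:
#   ucsc_gather.py --name mysubmission --daf mysubmission.daf \
#       --scan-submission --make-ddf library_map.txt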
def make_parser():
    parser = OptionParser()

    model = OptionGroup(parser, 'model')
    model.add_option('--name', help="Set submission name")
    model.add_option('--load-model', default=None,
                     help="Load model database")
    model.add_option('--load-rdf', default=None,
                     help="load rdf statements into model")
    model.add_option('--sparql', default=None, help="execute sparql query")
    model.add_option('--print-rdf', action="store_true", default=False,
                     help="print ending model state")
    parser.add_option_group(model)
    # commands
    commands = OptionGroup(parser, 'commands')
    commands.add_option('--make-tree-from',
                        help="create directories & link data files",
                        default=None)
    commands.add_option('--fastq', default=False, action="store_true",
                        help="generate scripts for making fastq files")
    commands.add_option('--scan-submission', default=False, action="store_true",
                        help="Import metadata for submission into our model")
    commands.add_option('--link-daf', default=False, action="store_true",
                        help="link daf into submission directories")
    commands.add_option('--make-ddf', help='make the ddfs', default=False,
                        action="store_true")
    parser.add_option_group(commands)

    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")
    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--library-url', default=None,
                      help="specify an alternate source for library information")
    # debugging
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    api.add_auth_options(parser)

    return parser

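# make_tree_from builds a shadow result tree: one directory per library,
# populated with symlinks pointing back into an existing result area.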
def make_tree_from(source_path, library_result_map):
    """Create a tree using data files from source path.
    """
    for lib_id, lib_path in library_result_map:
        if not os.path.exists(lib_path):
            logging.info("Making dir {0}".format(lib_path))
            os.mkdir(lib_path)
        source_lib_dir = os.path.abspath(os.path.join(source_path, lib_path))
        if not os.path.exists(source_lib_dir):
            raise IOError("{0} does not exist".format(source_lib_dir))
        for filename in os.listdir(source_lib_dir):
            source_pathname = os.path.join(source_lib_dir, filename)
            target_pathname = os.path.join(lib_path, filename)
            if not os.path.exists(source_pathname):
                raise IOError("{0} does not exist".format(source_pathname))
            if not os.path.exists(target_pathname):
                os.symlink(source_pathname, target_pathname)
                logging.info(
                    'LINK {0} to {1}'.format(source_pathname, target_pathname))

def link_daf(daf_path, library_result_map):
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    base_daf = os.path.basename(daf_path)

    for lib_id, result_dir in library_result_map:
        if not os.path.exists(result_dir):
            raise RuntimeError("Couldn't find target directory %s" % (result_dir,))
        submission_daf = os.path.join(result_dir, base_daf)
        if not os.path.exists(submission_daf):
            os.link(daf_path, submission_daf)

def scan_submission_dirs(view_map, library_result_map):
    """Look through our submission directories and collect needed information
    """
    for lib_id, result_dir in library_result_map:
        logging.info("Importing %s from %s" % (lib_id, result_dir))
        try:
            view_map.import_submission_dir(result_dir, lib_id)
        except MetadataLookupException, e:
            logging.error("Skipping %s: %s" % (lib_id, str(e)))

def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
    dag_fragment = []
    for lib_id, result_dir in library_result_map:
        submissionNode = view_map.get_submission_node(result_dir)
        dag_fragment.extend(
            make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)
        )

    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        if not force and os.path.exists(dag_filename):
            logging.warn("%s exists, please delete" % (dag_filename,))
        else:
            f = open(dag_filename, 'w')
            f.write(os.linesep.join(dag_fragment))
            f.write(os.linesep)
            f.close()

def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
    """
    Make a ddf file for a submission, plus the condor scripts to archive
    and upload it when make_condor is True.
    """
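    # One result row per (submission view, file): filename, md5sum, and view
    # name are required, the remaining DDF columns are OPTIONAL in the query
    # below. The DDF written from these rows is tab-separated; illustrative
    # output (real columns come from view_map.get_daf_variables()):
    #   files    view        labExpId  md5sum
    #   foo.bam  Alignments  11011     8d5c81...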
    query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>

select ?submitView ?files ?md5sum ?view ?cell ?antibody ?sex ?control ?controlId ?labExpId ?labVersion ?treatment ?protocol ?readType ?insertLength
WHERE {
  ?file ucscDaf:filename ?files ;
        ucscDaf:md5sum ?md5sum .
  ?submitView ucscDaf:has_file ?file ;
              ucscDaf:view ?dafView ;
              ucscDaf:submission <%(submission)s> .
  ?dafView ucscDaf:name ?view .
  <%(submission)s> submissionOntology:library ?library .

  OPTIONAL { ?library libraryOntology:antibody ?antibody }
  OPTIONAL { ?library libraryOntology:cell_line ?cell }
  OPTIONAL { <%(submission)s> ucscDaf:control ?control }
  OPTIONAL { <%(submission)s> ucscDaf:controlId ?controlId }
  OPTIONAL { ?library ucscDaf:sex ?sex }
  OPTIONAL { ?library libraryOntology:library_id ?labExpId }
  OPTIONAL { ?library libraryOntology:library_id ?labVersion }
  OPTIONAL { ?library libraryOntology:condition ?treatment }
  OPTIONAL { ?library ucscDaf:protocol ?protocol }
  OPTIONAL { ?library ucscDaf:readType ?readType }
  OPTIONAL { ?library libraryOntology:insert_size ?insertLength }
}
ORDER BY ?submitView"""
    dag_fragments = []

    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
    if name is None:
        logging.error("Need name for %s" % (str(submissionNode)))
        return []

    ddf_name = name + '.ddf'
    if outdir is not None:
        outfile = os.path.join(outdir, ddf_name)
        output = open(outfile, 'w')
    else:
        output = sys.stdout

    formatted_query = query_template % {'submission': str(submissionNode.uri)}

    query = RDF.SPARQLQuery(formatted_query)
    results = query.execute(view_map.model)

    variables = ['files']
    # filename goes first
    variables.extend(view_map.get_daf_variables())
    # 'controlId',
    variables += ['labExpId', 'md5sum']
    output.write('\t'.join(variables))
    output.write(os.linesep)

    all_views = {}
    all_files = []
    for row in results:
        viewname = fromTypedNode(row['view'])
        current = all_views.setdefault(viewname, {})
        for variable_name in variables:
            value = str(fromTypedNode(row[variable_name]))
            if variable_name in ('files', 'md5sum'):
                current.setdefault(variable_name, []).append(value)
            else:
                current[variable_name] = value

    for view in all_views.keys():
        line = []
        for variable_name in variables:
            if variable_name in ('files', 'md5sum'):
                line.append(','.join(all_views[view][variable_name]))
            else:
                line.append(all_views[view][variable_name])
        output.write("\t".join(line))
        output.write(os.linesep)
        all_files.extend(all_views[view]['files'])

    if output is not sys.stdout:
        output.close()

    logging.info(
        "Examined {0}, found files: {1}".format(
            str(submissionNode), ", ".join(all_files)))

    all_files.append(daf_name)
    all_files.append(ddf_name)

    if make_condor:
        archive_condor = make_condor_archive_script(name, all_files, outdir)
        upload_condor = make_condor_upload_script(name, outdir)

        dag_fragments.extend(
            make_dag_fragment(name, archive_condor, upload_condor)
        )

    return dag_fragments

def read_library_result_map(filename):
    """
    Read a file that maps library id to result directory.
    Does not support spaces in filenames.

    For example:
      10000 result/foo/bar
    """
    stream = open(filename, 'r')

    results = []
    for line in stream:
        line = line.rstrip()
        if not line.startswith('#') and len(line) > 0:
            library_id, result_dir = line.split()
            results.append((library_id, result_dir))
    stream.close()
    return results

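# Condor job that tars a submission directory up into ../<name>.tgz.
# tar's "h" flag dereferences the symlinks created by make_tree_from so
# the archive contains real files.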
def make_condor_archive_script(name, files, outdir=None):
    script = """Universe = vanilla

Executable = /bin/tar
arguments = czvhf ../%(archivename)s %(filelist)s

Error = compress.out.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"
request_memory = 20

queue
"""
    if outdir is None:
        outdir = os.getcwd()
    for f in files:
        pathname = os.path.join(outdir, f)
        if not os.path.exists(pathname):
            raise RuntimeError("Missing %s from %s" % (f, outdir))

    context = {'archivename': make_submission_name(name),
               'filelist': " ".join(files),
               'initialdir': os.path.abspath(outdir),
               'user': os.getlogin()}

    condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
    return condor_script

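# Condor job that uploads the archive with lftp to the UCSC ENCODE ftp
# host; credentials come from the (hardcoded) .netrc file read below.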
def make_condor_upload_script(name, outdir=None):
    script = """Universe = vanilla

Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s

Error = upload.out.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s

queue
"""
    if outdir is None:
        outdir = os.getcwd()

    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))

    encodeftp = 'encodeftp.cse.ucsc.edu'
    ftpuser = auth.hosts[encodeftp][0]
    ftppassword = auth.hosts[encodeftp][2]
    context = {'archivename': make_submission_name(name),
               'initialdir': os.path.abspath(outdir),
               'user': os.getlogin(),
               'ftpuser': ftpuser,
               'ftppassword': ftppassword,
               'ftphost': encodeftp}

    condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
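    # Owner read/write only: the script embeds the ftp password.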
    os.chmod(condor_script, stat.S_IREAD|stat.S_IWRITE)

    return condor_script

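# Each submission contributes three dagman lines: an archive JOB, an upload
# JOB, and a PARENT/CHILD edge so the upload runs after the archive, e.g.:
#   JOB <name>_archive /path/to/<name>.archive.condor
#   JOB <name>_upload /path/to/<name>.upload.condor
#   PARENT <name>_archive CHILD <name>_upload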
def make_dag_fragment(ininame, archive_condor, upload_condor):
    """
    Make the pair of dagman fragments that compress and then upload the data.
    """
    cur_dir = os.getcwd()
    archive_condor = os.path.join(cur_dir, archive_condor)
    upload_condor = os.path.join(cur_dir, upload_condor)
    job_basename = make_base_name(ininame)

    fragments = []
    fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
    fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
    fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))

    return fragments

def get_library_info(host, apidata, library_id):
    url = api.library_url(host, library_id)
    contents = api.retrieve_info(url, apidata)
    return contents

def make_submission_section(line_counter, files, attributes):
    """
    Create a section in the submission ini file
    """
    inifile = ["[line%s]" % (line_counter,)]
    inifile += ["files=%s" % (",".join(files))]

    for k, v in attributes.items():
        inifile += ["%s=%s" % (k, v)]
    return inifile

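# For example, make_submission_section(1, ['foo.bam'], {'view': 'Alignments'})
# returns ['[line1]', 'files=foo.bam', 'view=Alignments'].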

def make_base_name(pathname):
    base = os.path.basename(pathname)
    name, ext = os.path.splitext(base)
    return name


def make_submission_name(ininame):
    name = make_base_name(ininame)
    return name + ".tgz"


def make_ddf_name(pathname):
    name = make_base_name(pathname)
    return name + ".ddf"


def make_condor_name(pathname, run_type=None):
    name = make_base_name(pathname)
    elements = [name]
    if run_type is not None:
        elements.append(run_type)
    elements.append("condor")
    return ".".join(elements)

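# e.g. make_condor_name("mysubmission.ini", "archive") -> "mysubmission.archive.condor"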
def parse_filelist(file_string):
    return file_string.split(",")


def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list
    """
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("%s does not exist" % (f,))

if __name__ == "__main__":
    main()