Finish new RDF-based ddf construction.
[htsworkflow.git] / extra / ucsc_encode_submission / ucsc_gather.py
#!/usr/bin/env python
from ConfigParser import SafeConfigParser
import fnmatch
from glob import glob
import json
import logging
import netrc
from optparse import OptionParser, OptionGroup
import os
from pprint import pprint, pformat
import shlex
from StringIO import StringIO
import stat
from subprocess import Popen, PIPE
import sys
import time
import types
import urllib
import urllib2
import urlparse

from htsworkflow.util import api
from htsworkflow.util.rdfhelp import \
     dafTermOntology, \
     fromTypedNode, \
     get_model, \
     get_serializer, \
     load_into_model, \
     submissionOntology
from htsworkflow.submission.daf import \
     DAFMapper, \
     MetadataLookupException, \
     get_submission_uri
from htsworkflow.submission.condorfastq import CondorFastqExtract

logger = logging.getLogger('ucsc_gather')

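# A sketch of a typical invocation (the submission name, daf name, and
# library map filename are illustrative, not from the original source):
#
#   ucsc_gather.py --name mysub --daf mysub.daf \
#       --scan-submission --make-ddf library_map.txt
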
def main(cmdline=None):
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)

    if opts.debug:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.verbose:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig(level=logging.WARNING)

    apidata = api.make_auth_from_opts(opts, parser)

    model = get_model(opts.load_model)
    mapper = DAFMapper(opts.name, opts.daf, model)
    submission_uri = get_submission_uri(opts.name)

    if opts.library_url is not None:
        mapper.library_url = opts.library_url

    if opts.load_rdf is not None:
        load_into_model(model, 'turtle', opts.load_rdf, submission_uri)

    if opts.make_ddf and opts.daf is None:
        parser.error("Please specify your daf when making ddf files")

    if len(args) == 0:
        parser.error("I need at least one library submission-dir input file")

    library_result_map = []
    for a in args:
        library_result_map.extend(read_library_result_map(a))

    if opts.make_tree_from is not None:
        make_tree_from(opts.make_tree_from, library_result_map)

    if opts.link_daf:
        link_daf(opts.daf, library_result_map)

    if opts.fastq:
        extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
                                       force=opts.force)
        extractor.build_fastqs(library_result_map)

    if opts.scan_submission:
        scan_submission_dirs(mapper, library_result_map)

    if opts.make_ddf:
        make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)

    if opts.print_rdf:
        writer = get_serializer()
        print writer.serialize_model_to_string(model)


def make_parser():
    parser = OptionParser()

    model = OptionGroup(parser, 'model')
    model.add_option('--name', help="Set submission name")
    model.add_option('--load-model', default=None,
                     help="Load model database")
    model.add_option('--load-rdf', default=None,
                     help="load rdf statements into model")
    model.add_option('--print-rdf', action="store_true", default=False,
                     help="print ending model state")
    parser.add_option_group(model)
    # commands
    commands = OptionGroup(parser, 'commands')
    commands.add_option('--make-tree-from',
                        help="create directories & link data files",
                        default=None)
    commands.add_option('--fastq', default=False, action="store_true",
                        help="generate scripts for making fastq files")
    commands.add_option('--scan-submission', default=False, action="store_true",
                        help="Import metadata for submission into our model")
    commands.add_option('--link-daf', default=False, action="store_true",
                        help="link daf into submission directories")
    commands.add_option('--make-ddf', default=False, action="store_true",
                        help='make the ddfs')
    parser.add_option_group(commands)

    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")
    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--library-url', default=None,
                      help="specify an alternate source for library information")
    # debugging
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    api.add_auth_options(parser)

    return parser

def make_tree_from(source_path, library_result_map):
    """Create a tree using data files from source path.
    """
    for lib_id, lib_path in library_result_map:
        if not os.path.exists(lib_path):
            logging.info("Making dir {0}".format(lib_path))
            os.mkdir(lib_path)
        source_lib_dir = os.path.abspath(os.path.join(source_path, lib_path))
        if not os.path.exists(source_lib_dir):
            raise IOError("Missing source directory {0}".format(source_lib_dir))
        for filename in os.listdir(source_lib_dir):
            source_pathname = os.path.join(source_lib_dir, filename)
            target_pathname = os.path.join(lib_path, filename)
            if not os.path.exists(source_pathname):
                raise IOError("{0} does not exist".format(source_pathname))
            if not os.path.exists(target_pathname):
                os.symlink(source_pathname, target_pathname)
                logging.info(
                    'LINK {0} to {1}'.format(source_pathname, target_pathname))


def link_daf(daf_path, library_result_map):
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    base_daf = os.path.basename(daf_path)

    for lib_id, result_dir in library_result_map:
        if not os.path.exists(result_dir):
            raise RuntimeError("Couldn't find target directory %s" % (result_dir,))
        submission_daf = os.path.join(result_dir, base_daf)
        if not os.path.exists(submission_daf):
            os.link(daf_path, submission_daf)


def scan_submission_dirs(view_map, library_result_map):
    """Look through our submission directories and collect needed information
    """
    for lib_id, result_dir in library_result_map:
        logging.info("Importing %s from %s" % (lib_id, result_dir))
        try:
            view_map.import_submission_dir(result_dir, lib_id)
        except MetadataLookupException, e:
            logging.error("Skipping %s: %s" % (lib_id, str(e)))

def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
    dag_fragment = []
    for lib_id, result_dir in library_result_map:
        submissionNode = view_map.get_submission_node(result_dir)
        dag_fragment.extend(
            make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)
        )

    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        if not force and os.path.exists(dag_filename):
            logging.warn("%s exists, please delete" % (dag_filename,))
        else:
            f = open(dag_filename, 'w')
            f.write(os.linesep.join(dag_fragment))
            f.write(os.linesep)
            f.close()


def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
    """
    Make a ddf file, and optionally the companion condor scripts.
    """
    dag_fragments = []

    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
    if name is None:
        logging.error("Need name for %s" % (str(submissionNode)))
        return []

    ddf_name = name + '.ddf'
    if outdir is not None:
        outfile = os.path.join(outdir, ddf_name)
        output = open(outfile, 'w')
    else:
        output = sys.stdout

    # filename goes first
    variables = ['filename']
    variables.extend(view_map.get_daf_variables())
    output.write('\t'.join(variables))
    output.write(os.linesep)

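    # Each view attached to the submission becomes one tab-separated
    # record below; multi-valued fields are comma-joined and missing
    # values are written as "#None#".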
    nameTerm = dafTermOntology['name']

    submission_views = view_map.model.get_targets(submissionNode, submissionOntology['has_view'])
    file_list = []
    for viewNode in submission_views:
        record = []
        for variable_name in variables:
            varNode = dafTermOntology[variable_name]
            values = list(view_map.model.get_targets(viewNode, varNode))

            if variable_name == 'view':
                nameNode = view_map.model.get_target(values[0], nameTerm)
                values = [fromTypedNode(nameNode)]
            else:
                values = [fromTypedNode(v) for v in values]
                if variable_name == 'filename':
                    file_list.extend(values)

            if len(values) == 0:
                attribute = "#None#"
            elif len(values) == 1:
                attribute = values[0]
            else:
                attribute = ",".join(values)
            record.append(attribute)
        output.write('\t'.join(record))
        output.write(os.linesep)

    if output is not sys.stdout:
        output.close()

    logging.info(
        "Examined {0}, found files: {1}".format(
            str(submissionNode), ", ".join(file_list)))

    file_list.append(daf_name)
    file_list.append(ddf_name)

    if make_condor:
        logging.debug("%s: %s" % (name, str(file_list)))
        archive_condor = make_condor_archive_script(name, file_list, outdir)
        upload_condor = make_condor_upload_script(name, outdir)

        dag_fragments.extend(
            make_dag_fragment(name, archive_condor, upload_condor)
        )

    return dag_fragments


def read_library_result_map(filename):
    """
    Read a file that maps library id to result directory.
    Does not support spaces in filenames.

    For example:
      10000 result/foo/bar
    """
    stream = open(filename, 'r')

    results = []
    for line in stream:
        line = line.rstrip()
        if not line.startswith('#') and len(line) > 0:
            library_id, result_dir = line.split()
            results.append((library_id, result_dir))
    stream.close()
    return results


def make_condor_archive_script(name, files, outdir=None):
    script = """Universe = vanilla

Executable = /bin/tar
arguments = czvhf ../%(archivename)s %(filelist)s

Error = compress.err.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"
request_memory = 20

queue
"""
    if outdir is None:
        outdir = os.getcwd()
    for f in files:
        pathname = os.path.join(outdir, f)
        if not os.path.exists(pathname):
            raise RuntimeError("Missing %s" % (f,))

    context = {'archivename': make_submission_name(name),
               'filelist': " ".join(files),
               'initialdir': outdir,
               'user': os.getlogin()}

    condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
    return condor_script
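
# The generated archive script runs tar from the submission directory
# and writes <name>.tgz one directory up; the GZIP=-3 environment
# setting picks a faster, lighter compression level.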


def make_condor_upload_script(name, outdir=None):
    script = """Universe = vanilla

Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s

Error = upload.err.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s

queue
"""
    if outdir is None:
        outdir = os.getcwd()

    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))

    encodeftp = 'encodeftp.cse.ucsc.edu'
    ftpuser = auth.hosts[encodeftp][0]
    ftppassword = auth.hosts[encodeftp][2]
    context = {'archivename': make_submission_name(name),
               'initialdir': outdir,
               'user': os.getlogin(),
               'ftpuser': ftpuser,
               'ftppassword': ftppassword,
               'ftphost': encodeftp}

    condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
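    # The generated script embeds the FTP password, so restrict it to
    # owner read/write before leaving it on disk.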
    os.chmod(condor_script, stat.S_IREAD | stat.S_IWRITE)

    return condor_script


def make_dag_fragment(ininame, archive_condor, upload_condor):
    """
    Make the pair of DAG nodes that compress and then upload the data.
    """
    cur_dir = os.getcwd()
    archive_condor = os.path.join(cur_dir, archive_condor)
    upload_condor = os.path.join(cur_dir, upload_condor)
    job_basename = make_base_name(ininame)

    fragments = []
    fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
    fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
    fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))

    return fragments

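# A sketch of the DAG fragment produced for a hypothetical submission
# file "10000.ini" (paths abbreviated):
#
#   JOB 10000_archive <cwd>/10000.archive.condor
#   JOB 10000_upload <cwd>/10000.upload.condor
#   PARENT 10000_archive CHILD 10000_upload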

def get_library_info(host, apidata, library_id):
    url = api.library_url(host, library_id)
    contents = api.retrieve_info(url, apidata)
    return contents


def make_submission_section(line_counter, files, attributes):
    """
    Create a section in the submission ini file
    """
    inifile = ["[line%s]" % (line_counter,)]
    inifile += ["files=%s" % (",".join(files))]

    for k, v in attributes.items():
        inifile += ["%s=%s" % (k, v)]
    return inifile

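# For example, make_submission_section(1, ['foo.fastq'],
# {'cell': 'GM12878'}) (illustrative arguments) returns:
#   ['[line1]', 'files=foo.fastq', 'cell=GM12878']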

def make_base_name(pathname):
    base = os.path.basename(pathname)
    name, ext = os.path.splitext(base)
    return name


def make_submission_name(ininame):
    name = make_base_name(ininame)
    return name + ".tgz"


def make_ddf_name(pathname):
    name = make_base_name(pathname)
    return name + ".ddf"


def make_condor_name(pathname, run_type=None):
    name = make_base_name(pathname)
    elements = [name]
    if run_type is not None:
        elements.append(run_type)
    elements.append("condor")
    return ".".join(elements)

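# Naming helper examples (inputs are illustrative):
#   make_submission_name('sub/10000.ini')    -> '10000.tgz'
#   make_ddf_name('sub/10000.ini')           -> '10000.ddf'
#   make_condor_name('10000.ini', 'upload')  -> '10000.upload.condor'
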
def parse_filelist(file_string):
    return file_string.split(",")


def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list
    """
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("%s does not exist" % (f,))

def make_md5sum(filename):
    """Quickly find the md5sum of a file
    """
    md5_cache = filename + ".md5"
    if os.path.exists(md5_cache):
        logging.debug("Found md5sum in {0}".format(md5_cache))
        stream = open(md5_cache, 'r')
        lines = stream.readlines()
        stream.close()
        md5sum = parse_md5sum_line(lines, filename)
    else:
        md5sum = make_md5sum_unix(filename, md5_cache)
    return md5sum

def make_md5sum_unix(filename, md5_cache):
    cmd = ["md5sum", filename]
    logging.debug("Running {0}".format(" ".join(cmd)))
    p = Popen(cmd, stdout=PIPE)
    stdout, stderr = p.communicate()
    retcode = p.wait()
    logging.debug("Finished {0} retcode {1}".format(" ".join(cmd), retcode))
    if retcode != 0:
        logging.error("Trouble with md5sum for {0}".format(filename))
        return None
    lines = stdout.split(os.linesep)
    md5sum = parse_md5sum_line(lines, filename)
    if md5sum is not None:
        logging.debug("Caching sum in {0}".format(md5_cache))
        stream = open(md5_cache, "w")
        stream.write(stdout)
        stream.close()
    return md5sum

def parse_md5sum_line(lines, filename):
    md5sum, md5sum_filename = lines[0].split()
    if md5sum_filename != filename:
        errmsg = "MD5sum and I disagree about filename. {0} != {1}"
        logging.error(errmsg.format(filename, md5sum_filename))
        return None
    return md5sum

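# md5sum output is "<hex digest>  <filename>" per line, e.g. (digest
# and filename illustrative):
#   d41d8cd98f00b204e9800998ecf8427e  foo.fastq
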
if __name__ == "__main__":
    main()