extra/ucsc_encode_submission/ucsc_gather.py
#!/usr/bin/env python
from ConfigParser import SafeConfigParser
import fnmatch
from glob import glob
import json
import logging
import netrc
from optparse import OptionParser, OptionGroup
import os
from pprint import pprint, pformat
import shlex
from StringIO import StringIO
import stat
from subprocess import Popen, PIPE
import sys
import time
import types
import urllib
import urllib2
import urlparse

from htsworkflow.util import api
from htsworkflow.util.rdfhelp import \
     dafTermOntology, \
     fromTypedNode, \
     get_model, \
     get_serializer, \
     load_into_model, \
     submissionOntology
from htsworkflow.submission.daf import \
     DAFMapper, \
     MetadataLookupException, \
     get_submission_uri
from htsworkflow.submission.condorfastq import CondorFastqExtract

logger = logging.getLogger('ucsc_gather')

def main(cmdline=None):
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)

    if opts.debug:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.verbose:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig(level=logging.WARNING)

    apidata = api.make_auth_from_opts(opts, parser)

    model = get_model(opts.load_model)
    # mapper and submission_uri only exist when --name was given; the
    # commands that need them check for that below instead of raising
    # a NameError.
    mapper = None
    submission_uri = None
    if opts.name:
        mapper = DAFMapper(opts.name, opts.daf, model)
        submission_uri = get_submission_uri(opts.name)

    if opts.library_url is not None and mapper is not None:
        mapper.library_url = opts.library_url

    if opts.load_rdf is not None:
        load_into_model(model, 'turtle', opts.load_rdf, submission_uri)

    if opts.make_ddf and opts.daf is None:
        parser.error("Please specify your daf when making ddf files")

    if len(args) == 0:
        parser.error("I need at least one library submission-dir input file")

    library_result_map = []
    for a in args:
        library_result_map.extend(read_library_result_map(a))

    if opts.make_tree_from is not None:
        make_tree_from(opts.make_tree_from, library_result_map)

    if opts.link_daf:
        link_daf(opts.daf, library_result_map)

    if opts.fastq:
        extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
                                       force=opts.force)
        extractor.build_fastqs(library_result_map)

    if opts.scan_submission:
        if mapper is None:
            parser.error("--scan-submission requires --name")
        scan_submission_dirs(mapper, library_result_map)

    if opts.make_ddf:
        if mapper is None:
            parser.error("--make-ddf requires --name")
        make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)

    if opts.print_rdf:
        writer = get_serializer()
        print writer.serialize_model_to_string(model)

def make_parser():
    parser = OptionParser()

    model = OptionGroup(parser, 'model')
    model.add_option('--name', help="set submission name")
    model.add_option('--load-model', default=None,
                     help="load model database")
    model.add_option('--load-rdf', default=None,
                     help="load rdf statements into model")
    model.add_option('--print-rdf', action="store_true", default=False,
                     help="print ending model state")
    parser.add_option_group(model)
    # commands
    commands = OptionGroup(parser, 'commands')
    commands.add_option('--make-tree-from',
                        help="create directories & link data files",
                        default=None)
    commands.add_option('--fastq', default=False, action="store_true",
                        help="generate scripts for making fastq files")
    commands.add_option('--scan-submission', default=False, action="store_true",
                        help="import metadata for submission into our model")
    commands.add_option('--link-daf', default=False, action="store_true",
                        help="link daf into submission directories")
    commands.add_option('--make-ddf', help='make the ddfs', default=False,
                        action="store_true")
    parser.add_option_group(commands)

    parser.add_option('--force', default=False, action="store_true",
                      help="force regenerating fastqs")
    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--library-url', default=None,
                      help="specify an alternate source for library information")
    # debugging
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    api.add_auth_options(parser)

    return parser

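# A sketch of a typical invocation; the submission name, daf file, and
# library map file below are hypothetical:
#
#   ucsc_gather.py --name woldlab-rnaseq --daf encode.daf \
#       --scan-submission --make-ddf library_map.txt
#
# This would import metadata from each submission directory listed in
# library_map.txt into the model, then write one ddf per submission.
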
def make_tree_from(source_path, library_result_map):
    """Create a tree of directories and symlink in data files from source_path.
    """
    for lib_id, lib_path in library_result_map:
        if not os.path.exists(lib_path):
            logging.info("Making dir {0}".format(lib_path))
            os.mkdir(lib_path)
        source_lib_dir = os.path.abspath(os.path.join(source_path, lib_path))
        if not os.path.exists(source_lib_dir):
            raise IOError("{0} does not exist".format(source_lib_dir))
        for filename in os.listdir(source_lib_dir):
            source_pathname = os.path.join(source_lib_dir, filename)
            target_pathname = os.path.join(lib_path, filename)
            if not os.path.exists(source_pathname):
                raise IOError("{0} does not exist".format(source_pathname))
            if not os.path.exists(target_pathname):
                os.symlink(source_pathname, target_pathname)
                logging.info(
                    'LINK {0} to {1}'.format(source_pathname, target_pathname))

def link_daf(daf_path, library_result_map):
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    base_daf = os.path.basename(daf_path)

    for lib_id, result_dir in library_result_map:
        if not os.path.exists(result_dir):
            raise RuntimeError("Couldn't find target directory %s" % (result_dir,))
        submission_daf = os.path.join(result_dir, base_daf)
        if not os.path.exists(submission_daf):
            os.link(daf_path, submission_daf)


def scan_submission_dirs(view_map, library_result_map):
    """Look through our submission directories and collect needed information
    """
    for lib_id, result_dir in library_result_map:
        logging.info("Importing %s from %s" % (lib_id, result_dir))
        try:
            view_map.import_submission_dir(result_dir, lib_id)
        except MetadataLookupException, e:
            logging.error("Skipping %s: %s" % (lib_id, str(e)))

def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
    dag_fragment = []
    for lib_id, result_dir in library_result_map:
        submissionNode = view_map.get_submission_node(result_dir)
        dag_fragment.extend(
            make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)
        )

    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        if not force and os.path.exists(dag_filename):
            logging.warn("%s exists, please delete" % (dag_filename,))
        else:
            f = open(dag_filename, 'w')
            f.write(os.linesep.join(dag_fragment))
            f.write(os.linesep)
            f.close()


def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
    """
    Make ddf files, and bonus condor file
    """
    dag_fragments = []

    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
    if name is None:
        logging.error("Need name for %s" % (str(submissionNode),))
        return []

    ddf_name = name + '.ddf'
    if outdir is not None:
        outfile = os.path.join(outdir, ddf_name)
        output = open(outfile, 'w')
    else:
        output = sys.stdout

    # filename goes first
    variables = ['filename']
    variables.extend(view_map.get_daf_variables())
    output.write('\t'.join(variables))
    output.write(os.linesep)

    nameTerm = dafTermOntology['name']

    submission_views = view_map.model.get_targets(submissionNode, submissionOntology['has_view'])
    file_list = []
    for viewNode in submission_views:
        record = []
        for variable_name in variables:
            varNode = dafTermOntology[variable_name]
            values = list(view_map.model.get_targets(viewNode, varNode))

            if variable_name == 'view':
                nameNode = view_map.model.get_target(values[0], nameTerm)
                values = [fromTypedNode(nameNode)]
            else:
                values = [fromTypedNode(v) for v in values]
                if variable_name == 'filename':
                    file_list.extend(values)

            if len(values) == 0:
                attribute = "#None#"
            elif len(values) == 1:
                attribute = values[0]
            else:
                attribute = ",".join(values)
            record.append(attribute)
        output.write('\t'.join(record))
        output.write(os.linesep)

    logging.info(
        "Examined {0}, found files: {1}".format(
            str(submissionNode), ", ".join(file_list)))

    file_list.append(daf_name)
    file_list.append(ddf_name)

    if make_condor:
        logging.debug("%s: %s" % (name, file_list))
        archive_condor = make_condor_archive_script(name, file_list, outdir)
        upload_condor = make_condor_upload_script(name, outdir)

        dag_fragments.extend(
            make_dag_fragment(name, archive_condor, upload_condor)
        )

    return dag_fragments


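# For illustration, a ddf with a single daf variable "view" would start
# out like this (submission name and filename are hypothetical, columns
# are tab-separated):
#
#   filename        view
#   13327_heatshock.bam     Alignments
#
# One record is written per view node; "#None#" stands in for missing
# values, and multiple values for one variable are joined with commas.
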
def read_library_result_map(filename):
    """
    Read a file that maps library id to result directory.
    Does not support spaces in filenames.

    For example:
      10000 result/foo/bar
    """
    stream = open(filename, 'r')

    results = []
    for line in stream:
        line = line.rstrip()
        if not line.startswith('#') and len(line) > 0:
            library_id, result_dir = line.split()
            results.append((library_id, result_dir))
    stream.close()
    return results


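# A minimal sketch of the expected map file ("library_map.txt" is a
# hypothetical name):
#
#   # library_id  result_dir
#   10000 results/hepg2/rep1
#   10001 results/hepg2/rep2
#
# read_library_result_map("library_map.txt") would then return
# [("10000", "results/hepg2/rep1"), ("10001", "results/hepg2/rep2")].
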
def make_condor_archive_script(name, files, outdir=None):
    script = """Universe = vanilla

Executable = /bin/tar
arguments = czvhf ../%(archivename)s %(filelist)s

Error = compress.err.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"
request_memory = 20

queue
"""
    if outdir is None:
        outdir = os.getcwd()
    for f in files:
        pathname = os.path.join(outdir, f)
        if not os.path.exists(pathname):
            raise RuntimeError("Missing %s" % (f,))

    # run tar from the submission directory so ../%(archivename)s lands in
    # its parent, which is where the upload script expects to find it
    context = {'archivename': make_submission_name(name),
               'filelist': " ".join(files),
               'initialdir': outdir,
               'user': os.getlogin()}

    condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
    return condor_script


def make_condor_upload_script(name, outdir=None):
    script = """Universe = vanilla

Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s

Error = upload.err.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s

queue
"""
    if outdir is None:
        outdir = os.getcwd()

    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))

    encodeftp = 'encodeftp.cse.ucsc.edu'
    ftpuser = auth.hosts[encodeftp][0]
    ftppassword = auth.hosts[encodeftp][2]
    context = {'archivename': make_submission_name(name),
               'initialdir': outdir,
               'user': os.getlogin(),
               'ftpuser': ftpuser,
               'ftppassword': ftppassword,
               'ftphost': encodeftp}

    condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
    condor_stream = open(condor_script, 'w')
    condor_stream.write(script % context)
    condor_stream.close()
    # the submit file embeds the ftp password, so keep it owner-only
    os.chmod(condor_script, stat.S_IREAD | stat.S_IWRITE)

    return condor_script


def make_dag_fragment(ininame, archive_condor, upload_condor):
    """
    Make the pair of DAG fragments that compress and then upload the data.
    """
    cur_dir = os.getcwd()
    archive_condor = os.path.join(cur_dir, archive_condor)
    upload_condor = os.path.join(cur_dir, upload_condor)
    job_basename = make_base_name(ininame)

    fragments = []
    fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
    fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
    fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))

    return fragments


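# For an ininame of "woldlab-rnaseq.ini" (hypothetical), the fragment
# serializes into submission.dagman roughly as (paths abbreviated):
#
#   JOB woldlab-rnaseq_archive <cwd>/woldlab-rnaseq.archive.condor
#   JOB woldlab-rnaseq_upload <cwd>/woldlab-rnaseq.upload.condor
#   PARENT woldlab-rnaseq_archive CHILD woldlab-rnaseq_upload
#
# so Condor DAGMan starts the upload only after the archive job succeeds.
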
def get_library_info(host, apidata, library_id):
    url = api.library_url(host, library_id)
    contents = api.retrieve_info(url, apidata)
    return contents


def make_submission_section(line_counter, files, attributes):
    """
    Create a section in the submission ini file
    """
    inifile = ["[line%s]" % (line_counter,)]
    inifile += ["files=%s" % (",".join(files))]

    for k, v in attributes.items():
        inifile += ["%s=%s" % (k, v)]
    return inifile


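# A sketch of the section this emits, for line_counter=1, two files, and
# an attribute dict of {"type": "Alignments"} (all hypothetical values):
#
#   [line1]
#   files=foo.bam,foo.bai
#   type=Alignments
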
def make_base_name(pathname):
    base = os.path.basename(pathname)
    name, ext = os.path.splitext(base)
    return name


def make_submission_name(ininame):
    name = make_base_name(ininame)
    return name + ".tgz"


def make_ddf_name(pathname):
    name = make_base_name(pathname)
    return name + ".ddf"


def make_condor_name(pathname, run_type=None):
    name = make_base_name(pathname)
    elements = [name]
    if run_type is not None:
        elements.append(run_type)
    elements.append("condor")
    return ".".join(elements)


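# How the naming helpers compose, using a hypothetical path:
#
#   make_base_name("results/woldlab-rnaseq.ini")      -> "woldlab-rnaseq"
#   make_submission_name("woldlab-rnaseq.ini")        -> "woldlab-rnaseq.tgz"
#   make_ddf_name("woldlab-rnaseq.ini")               -> "woldlab-rnaseq.ddf"
#   make_condor_name("woldlab-rnaseq.ini", "upload")  -> "woldlab-rnaseq.upload.condor"
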
def parse_filelist(file_string):
    return file_string.split(",")


def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list
    """
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("%s does not exist" % (f,))

def make_md5sum(filename):
    """Quickly find the md5sum of a file, using a cached sum if available.
    """
    md5_cache = filename + ".md5"
    if os.path.exists(md5_cache):
        logging.debug("Found md5sum in {0}".format(md5_cache))
        stream = open(md5_cache, 'r')
        lines = stream.readlines()
        stream.close()
        md5sum = parse_md5sum_line(lines, filename)
    else:
        md5sum = make_md5sum_unix(filename, md5_cache)
    return md5sum

def make_md5sum_unix(filename, md5_cache):
    cmd = ["md5sum", filename]
    logging.debug("Running {0}".format(" ".join(cmd)))
    p = Popen(cmd, stdout=PIPE)
    # communicate() returns (stdout, stderr) and waits for the process
    stdout, stderr = p.communicate()
    retcode = p.returncode
    logging.debug("Finished {0} retcode {1}".format(" ".join(cmd), retcode))
    if retcode != 0:
        logging.error("Trouble with md5sum for {0}".format(filename))
        return None
    lines = stdout.split(os.linesep)
    md5sum = parse_md5sum_line(lines, filename)
    if md5sum is not None:
        logging.debug("Caching sum in {0}".format(md5_cache))
        stream = open(md5_cache, "w")
        stream.write(stdout)
        stream.close()
    return md5sum

def parse_md5sum_line(lines, filename):
    md5sum, md5sum_filename = lines[0].split()
    if md5sum_filename != filename:
        errmsg = "md5sum and I disagree about the filename. {0} != {1}"
        logging.error(errmsg.format(filename, md5sum_filename))
        return None
    return md5sum


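# md5sum emits "<hexdigest>  <filename>"; for example (hypothetical digest):
#
#   d41d8cd98f00b204e9800998ecf8427e  results/hepg2/rep1/foo.bam
#
# parse_md5sum_line returns the digest only when the reported filename
# matches the one we asked about, guarding against a stale or misnamed
# cache file.
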
if __name__ == "__main__":
    main()