c65feecdfae6598260aba209ea5cabd24d8b2b6c
[htsworkflow.git] / extra / ucsc_encode_submission / ucsc_gather.py
1 #!/usr/bin/env python
2 from ConfigParser import SafeConfigParser
3 import fnmatch
4 from glob import glob
5 import json
6 import logging
7 import netrc
8 from optparse import OptionParser
9 import os
10 from pprint import pprint, pformat
11 import shlex
12 from StringIO import StringIO
13 import stat
14 from subprocess import Popen, PIPE
15 import sys
16 import time
17 import types
18 import urllib
19 import urllib2
20 import urlparse
21
22 from htsworkflow.util import api
23 from htsworkflow.submission.condorfastq import CondorFastqExtract
24
def main(cmdline=None):
    """Parse command line options and run the requested submission steps."""
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)

    # Pick the most verbose logging level that was requested.
    if opts.debug:
        level = logging.DEBUG
    elif opts.verbose:
        level = logging.INFO
    else:
        level = logging.WARNING
    logging.basicConfig(level=level)

    apidata = api.make_auth_from_opts(opts, parser)

    if opts.makeddf and opts.daf is None:
        parser.error("Please specify your daf when making ddf files")

    if not args:
        parser.error("I need at least one library submission-dir input file")

    # Each positional argument is a file mapping library ids to result dirs.
    library_result_map = []
    for map_filename in args:
        library_result_map.extend(read_library_result_map(map_filename))

    if opts.make_tree_from is not None:
        make_tree_from(opts.make_tree_from, library_result_map)

    if opts.daf is not None:
        link_daf(opts.daf, library_result_map)

    if opts.fastq:
        extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
                                       force=opts.force)
        extractor.build_fastqs(library_result_map)

    if opts.ini:
        make_submission_ini(opts.host, apidata, library_result_map)

    if opts.makeddf:
        make_all_ddfs(library_result_map, opts.daf, force=opts.force)
64
65
def make_parser():
    """Build the OptionParser for this script."""
    parser = OptionParser()

    # commands
    parser.add_option('--make-tree-from', default=None,
                      help="create directories & link data files")
    parser.add_option('--fastq', default=False, action="store_true",
                      help="generate scripts for making fastq files")
    parser.add_option('--ini', default=False, action="store_true",
                      help="generate submission ini file")
    parser.add_option('--makeddf', default=False, action="store_true",
                      help='make the ddfs')

    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")

    # debugging
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    # adds --host / authentication related options
    api.add_auth_options(parser)

    return parser
95
96
def make_tree_from(source_path, library_result_map):
    """Create a result tree by symlinking data files from source_path.

    For each (library id, library path) pair: create the local library
    directory if needed, then symlink every file from the matching
    directory under source_path into it.  Existing links are left alone.

    Raises IOError if a source directory or source file is missing.
    """
    for lib_id, lib_path in library_result_map:
        if not os.path.exists(lib_path):
            logging.info("Making dir {0}".format(lib_path))
            os.mkdir(lib_path)
        source_lib_dir = os.path.join(source_path, lib_path)
        # FIX: the original had a dead "if exists: pass" check here and
        # let os.listdir fail with an opaque error; fail fast instead.
        if not os.path.exists(source_lib_dir):
            raise IOError("{0} does not exist".format(source_lib_dir))
        for filename in os.listdir(source_lib_dir):
            source_pathname = os.path.join(source_lib_dir, filename)
            target_pathname = os.path.join(lib_path, filename)
            if not os.path.exists(source_pathname):
                raise IOError("{0} does not exist".format(source_pathname))
            if not os.path.exists(target_pathname):
                os.symlink(source_pathname, target_pathname)
                logging.info(
                    'LINK {0} to {1}'.format(source_pathname, target_pathname))
116     
117
def link_daf(daf_path, library_result_map):
    """Hard link the daf file into each library result directory.

    Existing links are left alone, so the operation is idempotent.
    Raises RuntimeError if the daf or any result directory is missing.
    """
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    base_daf = os.path.basename(daf_path)

    for lib_id, result_dir in library_result_map:
        if not os.path.exists(result_dir):
            raise RuntimeError("Couldn't find target directory %s" % (result_dir,))
        submission_daf = os.path.join(result_dir, base_daf)
        # FIX: dropped the redundant re-check of daf_path inside the
        # loop; its existence was already verified at function entry.
        if not os.path.exists(submission_daf):
            os.link(daf_path, submission_daf)
132
133
def make_submission_ini(host, apidata, library_result_map, paired=True):
    """Write a <result_dir>.ini submission description in each result dir.

    Each recognized data file becomes its own [lineN] section; fastq
    files that share an extension are grouped onto a single line.

    Raises ValueError when a file in a result directory is not matched
    by any NameToViewMap pattern.
    """
    view_map = NameToViewMap(host, apidata)

    for lib_id, result_dir in library_result_map:
        order_by = ['order_by=files', 'view', 'replicate', 'cell',
                    'readType', 'mapAlgorithm', 'insertLength', 'md5sum' ]
        inifile =  ['[config]']
        inifile += [" ".join(order_by)]
        inifile += ['']
        line_counter = 1
        result_ini = os.path.join(result_dir, result_dir+'.ini')

        # write other lines
        submission_files = os.listdir(result_dir)
        fastqs = {}
        fastq_attributes = {}
        for f in submission_files:
            attributes = view_map.find_attributes(f, lib_id)
            if attributes is None:
                raise ValueError("Unrecognized file: %s" % (f,))
            attributes['md5sum'] = "None"

            ext = attributes["extension"]
            if attributes['view'] is None:
                # recognized but not part of the submission
                continue
            elif attributes.get("type", None) == 'fastq':
                # defer fastqs so each extension is written as one line
                fastqs.setdefault(ext, set()).add(f)
                fastq_attributes[ext] = attributes
            else:
                md5sum = make_md5sum(os.path.join(result_dir, f))
                if md5sum is not None:
                    attributes['md5sum'] = md5sum
                inifile.extend(
                    make_submission_section(line_counter, [f], attributes))
                inifile += ['']
                line_counter += 1

        # add in fastqs on a single line.
        for extension, fastq_files in fastqs.items():
            inifile.extend(
                make_submission_section(line_counter,
                                        fastq_files,
                                        fastq_attributes[extension]))
            inifile += ['']
            line_counter += 1

        # FIX: the original leaked this file handle
        stream = open(result_ini, 'w')
        try:
            stream.write(os.linesep.join(inifile))
        finally:
            stream.close()
190
191         
def make_all_ddfs(library_result_map, daf_name, make_condor=True, force=False):
    """Build a ddf for every result directory that has an ini file.

    When make_condor is set, also write a submission.dagman tying the
    per-library condor jobs together (refusing to overwrite an existing
    file unless force is given).
    """
    dag_fragment = []
    for lib_id, result_dir in library_result_map:
        ininame = result_dir + '.ini'
        inipathname = os.path.join(result_dir, ininame)
        # skip libraries that haven't had their ini generated yet
        if not os.path.exists(inipathname):
            continue
        dag_fragment.extend(
            make_ddf(ininame, daf_name, True, make_condor, result_dir))

    if make_condor and dag_fragment:
        dag_filename = 'submission.dagman'
        if os.path.exists(dag_filename) and not force:
            logging.warn("%s exists, please delete" % (dag_filename,))
        else:
            stream = open(dag_filename, 'w')
            stream.write(os.linesep.join(dag_fragment))
            stream.write(os.linesep)
            stream.close()
211             
212
def make_ddf(ininame,  daf_name, guess_ddf=False, make_condor=False, outdir=None):
    """
    Make ddf files, and bonus condor file

    When guess_ddf is set the ddf is written to a file named after the
    ini file, otherwise it goes to stdout.  Returns the dag fragment
    lines for the condor jobs (empty when make_condor is False).
    """
    dag_fragments = []
    curdir = os.getcwd()
    if outdir is not None:
        os.chdir(outdir)
    try:
        output = sys.stdout
        ddf_name = None
        if guess_ddf:
            ddf_name = make_ddf_name(ininame)
            # FIX: was a bare debug "print"; report via logging instead
            logging.info("Writing ddf %s", ddf_name)
            output = open(ddf_name, 'w')

        try:
            file_list = read_ddf_ini(ininame, output)
        finally:
            # FIX: close the ddf output file (the original leaked it)
            if output is not sys.stdout:
                output.close()
        logging.info(
            "Read config {0}, found files: {1}".format(
                ininame, ", ".join(file_list)))

        # the daf and the ddf itself go into the archive too
        file_list.append(daf_name)
        if ddf_name is not None:
            file_list.append(ddf_name)

        if make_condor:
            archive_condor = make_condor_archive_script(ininame, file_list)
            upload_condor = make_condor_upload_script(ininame)

            dag_fragments.extend(
                make_dag_fragment(ininame, archive_condor, upload_condor))
    finally:
        # FIX: restore the working directory even if something raised
        os.chdir(curdir)

    return dag_fragments
248
249
def read_ddf_ini(filename, output=sys.stdout):
    """
    Read an ini file and dump out a tab delimited text file.

    The [config] section's order_by option names the columns; every
    other section becomes one data row.  Returns the list of file
    names found in the 'files' columns.
    """
    file_list = []
    config = SafeConfigParser()
    config.read(filename)

    order_by = shlex.split(config.get("config", "order_by"))

    # header row
    output.write("\t".join(order_by))
    output.write(os.linesep)
    for section in sorted(config.sections()):
        # the config block is metadata, not a data line
        if section == "config":
            continue
        values = [config.get(section, key) for key in order_by]
        for key, value in zip(order_by, values):
            if key == 'files':
                file_list.extend(parse_filelist(value))
        output.write("\t".join(values))
        output.write(os.linesep)
    return file_list
278
279
def read_library_result_map(filename):
    """
    Read a file that maps library id to result directory.
    Does not support spaces in filenames.

    For example:
      10000 result/foo/bar

    Blank lines and lines starting with '#' are skipped.  Returns a
    list of (library_id, result_dir) tuples.
    """
    results = []
    # FIX: use a context manager so the file is always closed
    with open(filename, 'r') as stream:
        for line in stream:
            line = line.rstrip()
            if line and not line.startswith('#'):
                library_id, result_dir = line.split()
                results.append((library_id, result_dir))
    return results
297
298
def make_condor_archive_script(ininame, files):
    """Write a condor submit file that tars up the submission files.

    Raises RuntimeError if any listed file is missing.  Returns the
    name of the condor script written to the current directory.
    """
    script = """Universe = vanilla

Executable = /bin/tar
arguments = czvhf ../%(archivename)s %(filelist)s

Error = compress.err.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
environment="GZIP=-3"
request_memory = 20

queue 
"""
    # refuse to write the script if anything it would tar is missing
    missing = [f for f in files if not os.path.exists(f)]
    if missing:
        raise RuntimeError("Missing %s" % (missing[0],))

    context = {'archivename': make_submission_name(ininame),
               'filelist': " ".join(files),
               'initialdir': os.getcwd(),
               'user': os.getlogin()}

    condor_script = make_condor_name(ininame, 'archive')
    stream = open(condor_script, 'w')
    stream.write(script % context)
    stream.close()
    return condor_script
328
329
def make_condor_upload_script(ininame):
    """Write a condor submit file that ftps the archive to UCSC.

    Credentials come from a netrc file; the resulting script is
    chmod'ed to owner read/write because it embeds the ftp password.
    Returns the name of the condor script written.
    """
    script = """Universe = vanilla

Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s

Error = upload.err.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s

queue 
"""
    # NOTE(review): the netrc path is hard-coded to one user's home
    # directory -- confirm whether this should be ~/.netrc instead.
    auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))

    encodeftp = 'encodeftp.cse.ucsc.edu'
    # netrc hosts entries are (login, account, password) triples
    login, account, password = auth.hosts[encodeftp]
    context = {'archivename': make_submission_name(ininame),
               'initialdir': os.getcwd(),
               'user': os.getlogin(),
               'ftpuser': login,
               'ftppassword': password,
               'ftphost': encodeftp}

    condor_script = make_condor_name(ininame, 'upload')
    stream = open(condor_script, 'w')
    stream.write(script % context)
    stream.close()
    # owner-only permissions: the script contains the ftp password
    os.chmod(condor_script, stat.S_IREAD | stat.S_IWRITE)

    return condor_script
362
363
def make_dag_fragment(ininame, archive_condor, upload_condor):
    """
    Make the couple of fragments compress and then upload the data.

    Returns DAGMan lines declaring the archive and upload jobs and the
    dependency between them.
    """
    here = os.getcwd()
    archive_path = os.path.join(here, archive_condor)
    upload_path = os.path.join(here, upload_condor)
    base = make_base_name(ininame)

    return [
        'JOB %s_archive %s' % (base, archive_path),
        'JOB %s_upload %s' % (base, upload_path),
        'PARENT %s_archive CHILD %s_upload' % (base, base),
    ]
379
380
def get_library_info(host, apidata, library_id):
    """Return the library metadata dictionary from the htsworkflow API."""
    return api.retrieve_info(api.library_url(host, library_id), apidata)
385
386
class NameToViewMap(object):
    """Determine view attributes for a given submission file name

    Maps submission file names (via glob patterns) to UCSC DAF "view"
    names and builds the per-file attribute dictionaries used when
    writing the submission ini files.
    """
    def __init__(self, root_url, apidata):
        # root_url/apidata are passed to get_library_info() for
        # per-library metadata lookups.
        self.root_url = root_url
        self.apidata = apidata

        # caches keyed by library id, to avoid repeated server queries
        self.lib_cache = {}
        self.lib_paired = {}
        # ma is "map algorithm"
        ma = 'TH1014'

        # (glob pattern, view) pairs checked in order -- first match
        # wins.  A view of None means "recognized but not submitted";
        # a callable view is invoked to choose the view dynamically.
        self.patterns = [
            # for 2011 Feb 18 elements submission
            ('final_Cufflinks_genes_*gtf',       'GeneDeNovo'),
            ('final_Cufflinks_transcripts_*gtf', 'TranscriptDeNovo'),
            ('final_exonFPKM-Cufflinks-0.9.3-GENCODE-v3c-*.gtf',
             'ExonsGencV3c'),
            ('final_GENCODE-v3-Cufflinks-0.9.3.genes-*gtf',
             'GeneGencV3c'),
            ('final_GENCODE-v3-Cufflinks-0.9.3.transcripts-*gtf',
             'TranscriptGencV3c'),
            ('final_TSS-Cufflinks-0.9.3-GENCODE-v3c-*.gtf', 'TSS'),
            ('final_junctions-*.bed6+3',                    'Junctions'),

            ('*.bai',                   None),
            ('*.splices.bam',           'Splices'),
            ('*.bam',                   self._guess_bam_view),
            ('junctions.bed',           'Junctions'),
            ('*.jnct',                  'Junctions'),
            ('*unique.bigwig',         None),
            ('*plus.bigwig',           'PlusSignal'),
            ('*minus.bigwig',          'MinusSignal'),
            ('*.bigwig',                'Signal'),
            ('*.tar.bz2',               None),
            ('*.condor',                None),
            ('*.daf',                   None),
            ('*.ddf',                   None),

            ('*ufflinks?0.9.3.genes.gtf',       'GeneDeNovo'),
            ('*ufflinks?0.9.3.transcripts.gtf', 'TranscriptDeNovo'),
            ('*GENCODE-v3c.exonFPKM.gtf',        'ExonsGencV3c'),
            ('*GENCODE-v3c.genes.gtf',           'GeneGencV3c'),
            ('*GENCODE-v3c.transcripts.gtf',     'TranscriptGencV3c'),
            ('*GENCODE-v3c.TSS.gtf',             'TSS'),
            ('*.junctions.bed6+3',                'Junctions'),

            ('*.?ufflinks-0.9.0?genes.expr',       'GeneDeNovo'),
            ('*.?ufflinks-0.9.0?transcripts.expr', 'TranscriptDeNovo'),
            ('*.?ufflinks-0.9.0?transcripts.gtf',  'GeneModel'),

            ('*.GENCODE-v3c?genes.expr',       'GeneGCV3c'),
            ('*.GENCODE-v3c?transcript*.expr', 'TranscriptGCV3c'),
            ('*.GENCODE-v3c?transcript*.gtf',  'TranscriptGencV3c'),
            ('*.GENCODE-v4?genes.expr',        None), #'GeneGCV4'),
            ('*.GENCODE-v4?transcript*.expr',  None), #'TranscriptGCV4'),
            ('*.GENCODE-v4?transcript*.gtf',   None), #'TranscriptGencV4'),
            ('*_1.75mers.fastq',              'FastqRd1'),
            ('*_2.75mers.fastq',              'FastqRd2'),
            ('*_r1.fastq',              'FastqRd1'),
            ('*_r2.fastq',              'FastqRd2'),
            ('*.fastq',                 'Fastq'),
            ('*.gtf',                   'GeneModel'),
            ('*.ini',                   None),
            ('*.log',                   None),
            ('*.md5',                   None),
            ('paired-end-distribution*', 'InsLength'),
            ('*.stats.txt',              'InsLength'),
            ('*.srf',                   None),
            ('*.wig',                   None),
            ('*.zip',                   None),
            ('transfer_log',            None),
            ]

        # per-view attribute dictionaries merged into a file's
        # attributes when its pattern matches
        self.views = {
            None: {"MapAlgorithm": "NA"},
            "Paired": {"MapAlgorithm": ma},
            "Aligns": {"MapAlgorithm": ma},
            "Single": {"MapAlgorithm": ma},
            "Splices": {"MapAlgorithm": ma},
            "Junctions": {"MapAlgorithm": ma},
            "PlusSignal": {"MapAlgorithm": ma},
            "MinusSignal": {"MapAlgorithm": ma},
            "Signal": {"MapAlgorithm": ma},
            "GeneModel": {"MapAlgorithm": ma},
            "GeneDeNovo": {"MapAlgorithm": ma},
            "TranscriptDeNovo": {"MapAlgorithm": ma},
            "ExonsGencV3c": {"MapAlgorithm": ma},
            "GeneGencV3c": {"MapAlgorithm": ma},
            "TSS": {"MapAlgorithm": ma},
            "GeneGCV3c": {"MapAlgorithm": ma},
            "TranscriptGCV3c": {"MapAlgorithm": ma},
            "TranscriptGencV3c": {"MapAlgorithm": ma},
            "GeneGCV4": {"MapAlgorithm": ma},
            "TranscriptGCV4": {"MapAlgorithm": ma},
            "FastqRd1": {"MapAlgorithm": "NA", "type": "fastq"},
            "FastqRd2": {"MapAlgorithm": "NA", "type": "fastq"},
            "Fastq": {"MapAlgorithm": "NA", "type": "fastq" },
            "InsLength": {"MapAlgorithm": ma},
            }
        # view name is one of the attributes
        for v in self.views.keys():
            self.views[v]['view'] = v

    def find_attributes(self, pathname, lib_id):
        """Looking for the best extension
        The 'best' is the longest match

        :Args:
        filename (str): the filename whose extension we are about to examine

        Returns an attribute dictionary (including 'view' and
        'extension') for the first pattern matching pathname, or None
        when no pattern matches.
        """
        # NOTE(review): splitext() returns (root, ext); these names are
        # swapped, and both values are unused below.
        path, filename = os.path.splitext(pathname)
        if not self.lib_cache.has_key(lib_id):
            self.lib_cache[lib_id] = get_library_info(self.root_url,
                                                      self.apidata, lib_id)

        lib_info = self.lib_cache[lib_id]
        if lib_info['cell_line'].lower() == 'unknown':
            logging.warn("Library %s missing cell_line" % (lib_id,))
        attributes = {
            'cell': lib_info['cell_line'],
            'replicate': lib_info['replicate'],
            }
        is_paired = self._is_paired(lib_id, lib_info)

        if is_paired:
            attributes.update(self.get_paired_attributes(lib_info))
        else:
            attributes.update(self.get_single_attributes(lib_info))

        for pattern, view in self.patterns:
            if fnmatch.fnmatch(pathname, pattern):
                if callable(view):
                    # e.g. _guess_bam_view picks Paired vs Aligns
                    view = view(is_paired=is_paired)

                attributes.update(self.views[view])
                attributes["extension"] = pattern
                return attributes


    def _guess_bam_view(self, is_paired=True):
        """Guess a view name based on library attributes
        """
        if is_paired:
            return "Paired"
        else:
            return "Aligns"


    def _is_paired(self, lib_id, lib_info):
        """Determine if a library is paired end"""
        # TODO: encode this information in the library type page.
        single = (1,3,6)
        if len(lib_info["lane_set"]) == 0:
            # we haven't sequenced anything so guess based on library type
            if lib_info['library_type_id'] in single:
                return False
            else:
                return True

        if not self.lib_paired.has_key(lib_id):
            is_paired = 0
            isnot_paired = 0
            failed = 0
            # check to see if all the flowcells are the same.
            # otherwise we might need to do something complicated
            for flowcell in lib_info["lane_set"]:
                # yes there's also a status code, but this comparison 
                # is easier to read
                if flowcell["status"].lower() == "failed":
                    # ignore failed flowcell
                    failed += 1
                    pass
                elif flowcell["paired_end"]:
                    is_paired += 1
                else:
                    isnot_paired += 1

            logging.debug("Library %s: %d paired, %d single, %d failed" % \
                     (lib_info["library_id"], is_paired, isnot_paired, failed))

            # majority vote across the non-failed lanes; a tie is an error
            if is_paired > isnot_paired:
                self.lib_paired[lib_id] = True
            elif is_paired < isnot_paired:
                self.lib_paired[lib_id] = False
            else:
                raise RuntimeError("Equal number of paired & unpaired lanes."\
                                   "Can't guess library paired status")

        return self.lib_paired[lib_id]

    def get_paired_attributes(self, lib_info):
        """Return insertLength/readType attributes for a paired library."""
        if lib_info['insert_size'] is None:
            errmsg = "Library %s is missing insert_size, assuming 200"
            logging.warn(errmsg % (lib_info["library_id"],))
            insert_size = 200
        else:
            insert_size = lib_info['insert_size']
        return {'insertLength': insert_size,
                'readType': '2x75'}

    def get_single_attributes(self, lib_info):
        """Return insertLength/readType attributes for a single-end library."""
        return {'insertLength':'ilNA',
                'readType': '1x75D'
                }
592
def make_submission_section(line_counter, files, attributes):
    """
    Create a section in the submission ini file

    Returns the section as a list of lines: a [lineN] header, the
    comma-joined files entry, then one key=value line per attribute.
    """
    section = ["[line%s]" % (line_counter,),
               "files=%s" % (",".join(files))]
    section.extend(["%s=%s" % (key, value)
                    for key, value in attributes.items()])
    return section
603
604
def make_base_name(pathname):
    """Return the file name without its directory or final extension."""
    return os.path.splitext(os.path.basename(pathname))[0]
609
610
def make_submission_name(ininame):
    """Return the .tgz archive name derived from an ini file name."""
    base, _ext = os.path.splitext(os.path.basename(ininame))
    return base + ".tgz"
614
615
def make_ddf_name(pathname):
    """Return the .ddf file name derived from pathname."""
    base, _ext = os.path.splitext(os.path.basename(pathname))
    return base + ".ddf"
619
620
def make_condor_name(pathname, run_type=None):
    """Return the condor submit-file name for pathname.

    run_type, when given, is inserted between the base name and the
    trailing "condor" suffix.
    """
    base, _ext = os.path.splitext(os.path.basename(pathname))
    if run_type is None:
        return base + ".condor"
    return "%s.%s.condor" % (base, run_type)
628
629
def parse_filelist(file_string):
    """Split a comma separated files value into a list of names."""
    return file_string.split(",")
632
633
def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list

    Raises RuntimeError naming the first missing file.
    """
    missing = [f for f in files if not os.path.exists(f)]
    if missing:
        raise RuntimeError("%s does not exist" % (missing[0],))
641
def make_md5sum(filename):
    """Quickly find the md5sum of a file

    Reads a cached <filename>.md5 if present, otherwise runs md5sum
    and caches the result.  Returns the checksum string, or None when
    it could not be computed.
    """
    # FIX: filename + ".md5" needs no os.path.join (single argument),
    # and the stray debug "print" of the cache path was removed.
    md5_cache = filename + ".md5"
    if os.path.exists(md5_cache):
        logging.debug("Found md5sum in {0}".format(md5_cache))
        # FIX: close the cache file (the original leaked the handle)
        stream = open(md5_cache, 'r')
        try:
            lines = stream.readlines()
        finally:
            stream.close()
        md5sum = parse_md5sum_line(lines, filename)
    else:
        md5sum = make_md5sum_unix(filename, md5_cache)
    return md5sum
655     
def make_md5sum_unix(filename, md5_cache):
    """Run the md5sum command on filename and cache its output.

    Returns the checksum string, or None when md5sum failed or its
    output did not name the expected file.
    """
    cmd = ["md5sum", filename]
    logging.debug("Running {0}".format(" ".join(cmd)))
    p = Popen(cmd, stdout=PIPE)
    # FIX: communicate() returns (stdout, stderr); the original bound
    # these to misleading names "stdin, stdout".
    stdout_data, stderr_data = p.communicate()
    retcode = p.wait()
    logging.debug("Finished {0} retcode {1}".format(" ".join(cmd), retcode))
    if retcode != 0:
        logging.error("Trouble with md5sum for {0}".format(filename))
        return None
    # NOTE(review): md5sum emits "\n"; splitting on os.linesep only
    # matches on Unix, which is where md5sum exists anyway.
    lines = stdout_data.split(os.linesep)
    md5sum = parse_md5sum_line(lines, filename)
    if md5sum is not None:
        logging.debug("Caching sum in {0}".format(md5_cache))
        stream = open(md5_cache, "w")
        try:
            stream.write(stdout_data)
        finally:
            stream.close()
    return md5sum
674
def parse_md5sum_line(lines, filename):
    """Extract the checksum from md5sum output for filename.

    lines is md5sum's output split into lines ("<sum>  <name>").
    Returns the checksum from the first line, or None (after logging an
    error) when the reported file name does not match filename.
    """
    md5sum, md5sum_filename = lines[0].split()
    if md5sum_filename != filename:
        # FIX: corrected typo ("disagre") in the error message
        errmsg = "MD5sum and I disagree about filename. {0} != {1}"
        logging.error(errmsg.format(filename, md5sum_filename))
        return None
    return md5sum
682
# Run as a command line program when executed directly.
if __name__ == "__main__":
    main()