e230e5c186d66cd06e74b74dcc2a7ca945bc31ba
[htsworkflow.git] / extra / ucsc_encode_submission / ucsc_gather.py
1 #!/usr/bin/env python
2 from ConfigParser import SafeConfigParser
3 from glob import glob
4 import json
5 import logging
6 from optparse import OptionParser
7 import os
8 from pprint import pprint, pformat
9 import shlex
10 from StringIO import StringIO
11 import time
12 import sys
13 import types
14 import urllib
15 import urllib2
16 import urlparse
17
18
19 from htsworkflow.util import api
20 from htsworkflow.pipelines.sequences import \
21     create_sequence_table, \
22     scan_for_sequences
23
24
25 def main(cmdline=None):
26     parser = make_parser()
27     opts, args = parser.parse_args(cmdline)
28     
29     if opts.debug:
30         logging.basicConfig(level = logging.DEBUG )
31     elif opts.verbose:
32         logging.basicConfig(level = logging.INFO )
33     else:
34         logging.basicConfig(level = logging.WARNING )
35         
36     
37     apidata = {'apiid': opts.apiid, 'apikey': opts.apikey }
38
39     if opts.host is None or opts.apiid is None or opts.apikey is None:
40         parser.error("Please specify host url, apiid, apikey")
41
42     if len(args) == 0:
43         parser.error("I need at least one library submission-dir input file")
44         
45     library_result_map = []
46     for a in args:
47         library_result_map.extend(read_library_result_map(a))
48
49     if opts.daf is not None:
50         link_daf(opts.daf, library_result_map)
51
52     if opts.fastq:
53         build_fastqs(opts.host, 
54                      apidata, 
55                      opts.sequence, 
56                      library_result_map,
57                      not opts.single)
58
59     if opts.ini:
60         make_submission_ini(opts.host, apidata, library_result_map, not opts.single)
61
62     if opts.makeddf:
63         make_all_ddfs(library_result_map, opts.daf)
64
65 def make_parser():
66     # Load defaults from the config files
67     config = SafeConfigParser()
68     config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini'])
69     
70     sequence_archive = None
71     apiid = None
72     apikey = None
73     apihost = None
74     SECTION = 'sequence_archive'
75     if config.has_section(SECTION):
76         sequence_archive = config.get(SECTION, 'sequence_archive',sequence_archive)
77         sequence_archive = os.path.expanduser(sequence_archive)
78         apiid = config.get(SECTION, 'apiid', apiid)
79         apikey = config.get(SECTION, 'apikey', apikey)
80         apihost = config.get(SECTION, 'host', apihost)
81
82     parser = OptionParser()
83
84     # commands
85     parser.add_option('--fastq', help="generate scripts for making fastq files",
86                       default=False, action="store_true")
87
88     parser.add_option('--ini', help="generate submission ini file", default=False,
89                       action="store_true")
90
91     parser.add_option('--makeddf', help='make the ddfs', default=False,
92                       action="store_true")
93     
94     parser.add_option('--daf', default=None, help='specify daf name')
95
96     # configuration options
97     parser.add_option('--apiid', default=apiid, help="Specify API ID")
98     parser.add_option('--apikey', default=apikey, help="Specify API KEY")
99     parser.add_option('--host',  default=apihost,
100                       help="specify HTSWorkflow host",)
101     parser.add_option('--sequence', default=sequence_archive,
102                       help="sequence repository")
103     parser.add_option('--single', default=False, action="store_true", 
104                       help="treat the sequences as single ended runs")
105
106     # debugging
107     parser.add_option('--verbose', default=False, action="store_true",
108                       help='verbose logging')
109     parser.add_option('--debug', default=False, action="store_true",
110                       help='debug logging')
111
112     return parser
113
114 def build_fastqs(host, apidata, sequences_path, library_result_map, 
115                  paired=True ):
116     """
117     Generate condor scripts to build any needed fastq files
118     
119     Args:
120       host (str): root of the htsworkflow api server
121       apidata (dict): id & key to post to the server
122       sequences_path (str): root of the directory tree to scan for files
123       library_result_map (list):  [(library_id, destination directory), ...]
124       paired: should we assume that we are processing paired end records?
125               if False, we will treat this as single ended.
126     """
127     qseq_condor_header = """
128 Universe=vanilla
129 executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/qseq2fastq
130 error=log/qseq2fastq.err.$(process).log
131 output=log/qseq2fastq.out.$(process).log
132 log=log/qseq2fastq.log
133
134 """
135     qseq_condor_entries = []
136     srf_condor_header = """
137 Universe=vanilla
138 executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/srf2named_fastq.py
139 output=log/srf_pair_fastq.out.$(process).log
140 error=log/srf_pair_fastq.err.$(process).log
141 log=log/srf_pair_fastq.log
142 environment="PYTHONPATH=/home/diane/lib/python2.6/site-packages:/home/diane/proj/solexa/gaworkflow PATH=/woldlab/rattus/lvol0/mus/home/diane/bin:/usr/bin:/bin"
143
144 """
145     srf_condor_entries = []
146     fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
147     fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
148     lib_db = find_archive_sequence_files(host, 
149                                          apidata, 
150                                          sequences_path, 
151                                          library_result_map)
152
153     # find what targets we're missing
154     needed_targets = {}
155     for lib_id, result_dir in library_result_map:
156         lib = lib_db[lib_id]
157         for lane_key, sequences in lib['lanes'].items():
158             for seq in sequences:
159                 if paired and seq.read is None:
160                     seq.read = 1
161                 filename_attributes = { 
162                     'flowcell': seq.flowcell,
163                     'lib_id': lib_id,
164                     'lane': seq.lane,
165                     'read': seq.read,
166                     'cycle': seq.cycle
167                     }
168                 # throw out test runs
169                 # FIXME: this should probably be configurable
170                 if seq.cycle < 50:
171                     continue
172                 if seq.flowcell == '30CUUAAXX':
173                     # 30CUUAAXX run sucked
174                     continue
175                 if seq.flowcell == '30DY0AAXX':
176                     # 30DY0 only ran for 151 bases instead of 152
177                     # it is actually 76 1st read, 75 2nd read
178                     seq.mid_point = 76
179
180                 # end filters
181                 if paired:
182                     target_name = fastq_paired_template % filename_attributes
183                 else:
184                     target_name = fastq_single_template % filename_attributes
185
186                 target_pathname = os.path.join(result_dir, target_name)
187                 if not os.path.exists(target_pathname):
188                     t = needed_targets.setdefault(target_pathname, {})
189                     t[seq.filetype] = seq
190                     
191     for target_pathname, available_sources in needed_targets.items():
192         logging.debug(' target : %s' % (target_pathname,))
193         logging.debug(' candidate sources: %s' % (available_sources,))
194         if available_sources.has_key('qseq'):
195             source = available_sources['qseq']
196             qseq_condor_entries.append(
197                 condor_qseq_to_fastq(source.path, 
198                                      target_pathname, 
199                                      source.flowcell)
200             )
201         elif available_sources.has_key('srf'):
202             source = available_sources['srf']
203             mid = getattr(source, 'mid_point', None)
204             srf_condor_entries.append(
205                 condor_srf_to_fastq(source.path, 
206                                     target_pathname,
207                                     paired,
208                                     source.flowcell,
209                                     mid)
210             )
211         else:
212             print " need file", target_pathname
213
214     if len(srf_condor_entries) > 0:
215         make_submit_script('srf.fastq.condor', 
216                            srf_condor_header,
217                            srf_condor_entries)
218
219     if len(qseq_condor_entries) > 0:
220         make_submit_script('qseq.fastq.condor', 
221                            qseq_condor_header,
222                            qseq_condor_entries)
223
224 def link_daf(daf_path, library_result_map):
225     if not os.path.exists(daf_path):
226         raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))
227
228     base_daf = os.path.basename(daf_path)
229     
230     for lib_id, result_dir in library_result_map:
231         submission_daf = os.path.join(result_dir, base_daf)
232         if not os.path.exists(submission_daf):
233             os.link(daf_path, submission_daf)
234
235 def make_submission_ini(host, apidata, library_result_map, paired=True):
236     # ma is "map algorithm"
237     ma = 'TH1014'
238
239     if paired:
240         aligns = "Paired"
241     else:
242         aligns = "Aligns"
243
244     attributes = {
245       # bam index
246      '.bai':                   {'view': None, 'MapAlgorithm': 'NA'},
247      '.bam':                   {'view': aligns, 'MapAlgorithm': ma},
248      '.splices.bam':           {'view': 'Splices', 'MapAlgorithm': ma},
249      '.jnct':                  {'view': 'Junctions', 'MapAlgorithm': ma},
250      '.plus.bigwig':           {'view': 'PlusSignal', 'MapAlgorithm': ma},
251      '.minus.bigwig':          {'view': 'MinusSignal', 'MapAlgorithm': ma},
252      '.bigwig':                {'view': 'Signal', 'MapAlgorithm': ma},
253      '.tar.bz2':               {'view': None},
254      '.condor':                {'view': None},
255      '.daf':                   {'view': None},
256      '.ddf':                   {'view': None},
257      'novel.genes.expr':       {'view': 'GeneDeNovo', 'MapAlgorithm': ma},
258      'novel.transcripts.expr': {'view': 'TranscriptDeNovo', 'MapAlgorithm': ma},
259      '.genes.expr':            {'view': 'GeneFPKM', 'MapAlgorithm': ma},
260      '.transcripts.expr':      {'view': 'TranscriptFPKM', 'MapAlgorithm': ma},
261      '.fastq':                 {'view': 'Fastq', 'MapAlgorithm': 'NA' },
262      '_r1.fastq':              {'view': 'FastqRd1', 'MapAlgorithm': 'NA'},
263      '_r2.fastq':              {'view': 'FastqRd2', 'MapAlgorithm': 'NA'},
264      '.gtf':                   {'view': 'GeneModel', 'MapAlgorithm': ma},
265      '.ini':                   {'view': None},
266      '.log':                   {'view': None},
267      '.stats.txt':             {'view': 'InsLength', 'MapAlgorithm': ma},
268      '.srf':                   {'view': None},
269      '.wig':                   {'view': None},
270      '.zip':                   {'view': None},
271     }
272    
273     candidate_fastq_src = {}
274
275     for lib_id, result_dir in library_result_map:
276         order_by = ['order_by=files', 'view', 'replicate', 'cell', 
277                     'readType', 'mapAlgorithm', 'insertLength' ]
278         inifile =  ['[config]']
279         inifile += [" ".join(order_by)]
280         inifile += ['']
281         line_counter = 1
282         lib_info = get_library_info(host, apidata, lib_id)
283         result_ini = os.path.join(result_dir, result_dir+'.ini')
284
285         if lib_info['cell_line'].lower() == 'unknown':
286             logging.warn("Library %s missing cell_line" % (lib_id,))
287
288         standard_attributes = {'cell': lib_info['cell_line'],
289                                'replicate': lib_info['replicate'],
290                                }
291         if paired:
292             if lib_info['insert_size'] is None:
293                 errmsg = "Library %s is missing insert_size, assuming 200"
294                 logging.warn(errmsg % (lib_id,))
295                 insert_size = 200
296             else:
297                 insert_size = lib_info['insert_size']
298             standard_attributes['insertLength'] = insert_size
299             standard_attributes['readType'] = '2x75'
300         else:
301             standard_attributes['insertLength'] = 'ilNA'
302             standard_attributes['readType'] = '1x75D'
303
304         # write other lines
305         submission_files = os.listdir(result_dir)
306         fastqs = {}
307         for f in submission_files:
308             best_ext = find_best_extension(attributes, f)
309
310             if best_ext is not None:
311                if attributes[best_ext]['view'] is None:
312                    
313                    continue
314                elif best_ext.endswith('fastq'):
315                    fastqs.setdefault(best_ext, set()).add(f)
316                else:
317                    inifile.extend(
318                        make_submission_section(line_counter,
319                                                [f],
320                                                standard_attributes,
321                                                attributes[best_ext]
322                                                )
323                        )
324                    inifile += ['']
325                    line_counter += 1
326             else:
327                 raise ValueError("Unrecognized file: %s" % (f,))
328
329         # add in fastqs on a single line.
330         for extension, fastq_set in fastqs.items():
331             inifile.extend(
332                 make_submission_section(line_counter, 
333                                         fastq_set,
334                                         standard_attributes,
335                                         attributes[extension])
336             )
337             inifile += ['']
338             line_counter += 1
339             
340         f = open(result_ini,'w')
341         f.write(os.linesep.join(inifile))
342
343
344 def make_all_ddfs(library_result_map, daf_name, make_condor=True):
345     dag_fragment = []
346     for lib_id, result_dir in library_result_map:
347         ininame = result_dir+'.ini'
348         inipathname = os.path.join(result_dir, ininame)
349         if os.path.exists(inipathname):
350             dag_fragment.extend(
351                 make_ddf(ininame, daf_name, True, make_condor, result_dir)
352             )
353
354     if make_condor and len(dag_fragment) > 0:
355         dag_filename = 'submission.dagman'
356         if os.path.exists(dag_filename):
357             logging.warn("%s exists, please delete" % (dag_filename,))
358         else:
359             f = open(dag_filename,'w')
360             f.write( os.linesep.join(dag_fragment))
361             f.write( os.linesep )
362             f.close()
363             
364
365 def make_ddf(ininame,  daf_name, guess_ddf=False, make_condor=False, outdir=None):
366     """
367     Make ddf files, and bonus condor file
368     """
369     dag_fragments = []
370     curdir = os.getcwd()
371     if outdir is not None:
372         os.chdir(outdir)
373     output = sys.stdout
374     ddf_name = None
375     if guess_ddf:
376         ddf_name = make_ddf_name(ininame)
377         print ddf_name
378         output = open(ddf_name,'w')
379
380     file_list = read_ddf_ini(ininame, output)
381
382     file_list.append(daf_name)
383     if ddf_name is not None:
384         file_list.append(ddf_name)
385
386     if make_condor:
387         archive_condor = make_condor_archive_script(ininame, file_list)
388         upload_condor = make_condor_upload_script(ininame)
389         
390         dag_fragments.extend( 
391             make_dag_fragment(ininame, archive_condor, upload_condor)
392         ) 
393         
394     os.chdir(curdir)
395     
396     return dag_fragments
397
398 def read_ddf_ini(filename, output=sys.stdout):
399     """
400     Read a ini file and dump out a tab delmited text file
401     """
402     file_list = []
403     config = SafeConfigParser()
404     config.read(filename)
405
406     order_by = shlex.split(config.get("config", "order_by"))
407
408     output.write("\t".join(order_by))
409     output.write(os.linesep)
410     sections = config.sections()
411     sections.sort()
412     for section in sections:
413         if section == "config":
414             # skip the config block
415             continue
416         values = []
417         for key in order_by:
418             v = config.get(section, key)
419             values.append(v)
420             if key == 'files':
421                 file_list.extend(parse_filelist(v))
422                 
423         output.write("\t".join(values))
424         output.write(os.linesep)
425     return file_list
426     
427 def read_library_result_map(filename):
428     """
429     Read a file that maps library id to result directory.
430     Does not support spaces in filenames. 
431     
432     For example:
433       10000 result/foo/bar
434     """
435     stream = open(filename,'r')
436
437     results = []
438     for line in stream:
439         if not line.startswith('#'):
440             library_id, result_dir = line.strip().split()
441             results.append((library_id, result_dir))
442     return results
443             
444 def make_condor_archive_script(ininame, files):
445     script = """Universe = vanilla
446
447 Executable = /bin/tar
448 arguments = czvf ../%(archivename)s %(filelist)s
449
450 Error = compress.err.$(Process).log
451 Output = compress.out.$(Process).log
452 Log = /tmp/submission-compress.log
453 initialdir = %(initialdir)s
454
455 queue 
456 """
457     for f in files:
458         if not os.path.exists(f):
459             raise RuntimeError("Missing %s" % (f,))
460
461     context = {'archivename': make_submission_name(ininame),
462                'filelist': " ".join(files),
463                'initialdir': os.getcwd()}
464
465     condor_script = make_condor_name(ininame, 'archive')
466     condor_stream = open(condor_script,'w')
467     condor_stream.write(script % context)
468     condor_stream.close()
469     return condor_script
470
471 def make_condor_upload_script(ininame):
472     script = """Universe = vanilla
473
474 Executable = /usr/bin/lftp
475 arguments = -c put ../%(archivename)s -o ftp://detrout@encodeftp.cse.ucsc.edu/
476
477 Error = upload.err.$(Process).log
478 Output = upload.out.$(Process).log
479 Log = /tmp/submission-upload.log
480 initialdir = %(initialdir)s
481
482 queue 
483 """
484     context = {'archivename': make_submission_name(ininame),
485                'initialdir': os.getcwd()}
486
487     condor_script = make_condor_name(ininame, 'upload')
488     condor_stream = open(condor_script,'w')
489     condor_stream.write(script % context)
490     condor_stream.close()
491     return condor_script
492
493 def make_dag_fragment(ininame, archive_condor, upload_condor):
494     """
495     Make the couple of fragments compress and then upload the data.
496     """
497     cur_dir = os.getcwd()
498     archive_condor = os.path.join(cur_dir, archive_condor)
499     upload_condor = os.path.join(cur_dir, upload_condor)
500     job_basename = make_base_name(ininame)
501
502     fragments = []
503     fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
504     fragments.append('JOB %s_upload %s' % (job_basename,  upload_condor))
505     fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))
506
507     return fragments
508
509 def get_library_info(host, apidata, library_id):
510     url = api.library_url(host, library_id)
511     contents = api.retrieve_info(url, apidata)
512     return contents
513
514 def condor_srf_to_fastq(srf_file, target_pathname, paired, flowcell=None,
515                         mid=None):
516     args = ['-c', srf_file, ]
517     if paired:
518         args.extend(['--left', target_pathname])
519         # this is ugly. I did it because I was pregenerating the target
520         # names before I tried to figure out what sources could generate
521         # those targets, and everything up to this point had been
522         # one-to-one. So I couldn't figure out how to pair the 
523         # target names. 
524         # With this at least the command will run correctly.
525         # however if we rename the default targets, this'll break
526         # also I think it'll generate it twice.
527         args.extend(['--right', 
528                      target_pathname.replace('_r1.fastq', '_r2.fastq')])
529     else:
530         args.extend(['--single', target_pathname ])
531     if flowcell is not None:
532         args.extend(['--flowcell', flowcell])
533
534     if mid is not None:
535         args.extend(['-m', str(mid)])
536
537     script = """
538 arguments="%s"
539 queue
540 """ % (" ".join(args),)
541     
542     return  script 
543
544 def condor_qseq_to_fastq(qseq_file, target_pathname, flowcell=None):
545     args = ['-i', qseq_file, '-o', target_pathname ]
546     if flowcell is not None:
547         args.extend(['-f', flowcell])
548     script = """
549 arguments="%s"
550 queue
551 """ % (" ".join(args))
552
553     return script 
554
555 def find_archive_sequence_files(host, apidata, sequences_path, 
556                                 library_result_map):
557     """
558     Find all the archive sequence files possibly associated with our results.
559
560     """
561     logging.debug("Searching for sequence files in: %s" %(sequences_path,))
562
563     lib_db = {}
564     seq_dirs = set()
565     #seq_dirs = set(os.path.join(sequences_path, 'srfs'))
566     candidate_lanes = {}
567     for lib_id, result_dir in library_result_map:
568         lib_info = get_library_info(host, apidata, lib_id)
569         lib_db[lib_id] = lib_info
570
571         for lane in lib_info['lane_set']:
572             lane_key = (lane['flowcell'], lane['lane_number'])
573             candidate_lanes[lane_key] = lib_id
574             seq_dirs.add(os.path.join(sequences_path, 
575                                          'flowcells', 
576                                          lane['flowcell']))
577     logging.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
578     candidate_seq_list = scan_for_sequences(seq_dirs)
579
580     # at this point we have too many sequences as scan_for_sequences
581     # returns all the sequences in a flowcell directory
582     # so lets filter out the extras
583     
584     for seq in candidate_seq_list:
585         lane_key = (seq.flowcell, seq.lane)
586         lib_id = candidate_lanes.get(lane_key, None)
587         if lib_id is not None:
588             lib_info = lib_db[lib_id]
589             lanes = lib_info.setdefault('lanes', {})
590             lanes.setdefault(lane_key, set()).add(seq)
591     
592     return lib_db
593
594 def find_best_extension(extension_map, filename):
595     """
596     Search through extension_map looking for the best extension
597     The 'best' is the longest match
598
599     :Args:
600       extension_map (dict): '.ext' -> { 'view': 'name' or None }
601       filename (str): the filename whose extention we are about to examine
602     """
603     best_ext = None
604     path, last_ext = os.path.splitext(filename)
605
606     for ext in extension_map.keys():
607         if filename.endswith(ext):
608             if best_ext is None:
609                 best_ext = ext
610             elif len(ext) > len(best_ext):
611                 best_ext = ext
612     return best_ext
613     
614 def make_submission_section(line_counter, files, standard_attributes, file_attributes):
615     """
616     Create a section in the submission ini file
617     """
618     inifile = [ '[line%s]' % (line_counter,) ]
619     inifile += ["files=%s" % (",".join(files))]
620     cur_attributes = {}
621     cur_attributes.update(standard_attributes)
622     cur_attributes.update(file_attributes)
623     
624     for k,v in cur_attributes.items():
625         inifile += ["%s=%s" % (k,v)]
626     return inifile
627
628 def make_base_name(pathname):
629     base = os.path.basename(pathname)
630     name, ext = os.path.splitext(base)
631     return name
632
633 def make_submission_name(ininame):
634     name = make_base_name(ininame)
635     return name + '.tgz'
636
637 def make_ddf_name(pathname):
638     name = make_base_name(pathname)
639     return name + '.ddf'
640
641 def make_condor_name(pathname, run_type=None):
642     name = make_base_name(pathname)
643     elements = [name]
644     if run_type is not None:
645         elements.append(run_type)
646     elements.append('condor')
647     return ".".join(elements)
648     
649 def make_submit_script(target, header, body_list):
650     """
651     write out a text file
652
653     this was intended for condor submit scripts
654     
655     Args:
656       target (str or stream): 
657         if target is a string, we will open and close the file
658         if target is a stream, the caller is responsible.
659
660       header (str);
661         header to write at the beginning of the file
662       body_list (list of strs):
663         a list of blocks to add to the file.
664     """
665     if type(target) in types.StringTypes:
666         f = open(target,'w')
667     else:
668         f = target
669     f.write(header)
670     for entry in body_list:
671         f.write(entry)
672     if type(target) in types.StringTypes:
673         f.close()
674
675 def parse_filelist(file_string):
676     return file_string.split(',')
677
678 def validate_filelist(files):
679     """
680     Die if a file doesn't exist in a file list
681     """
682     for f in files:
683         if not os.path.exists(f):
684             raise RuntimeError("%s does not exist" % (f,))
685         
686 if __name__ == "__main__":
687     main()