3830dbef7d49b0f78d820662c0e1aaadf01335ee
[htsworkflow.git] / extra / ucsc_encode_submission / ucsc_gather.py
1 #!/usr/bin/env python
2 from ConfigParser import SafeConfigParser
3 from glob import glob
4 import json
5 import logging
6 from optparse import OptionParser
7 import os
8 from pprint import pprint, pformat
9 import shlex
10 from StringIO import StringIO
11 import time
12 import sys
13 import types
14 import urllib
15 import urllib2
16 import urlparse
17
18
19 from htsworkflow.util import api
20 from htsworkflow.pipelines.sequences import \
21     create_sequence_table, \
22     scan_for_sequences
23
24
25 def main(cmdline=None):
26     parser = make_parser()
27     opts, args = parser.parse_args(cmdline)
28     
29     if opts.debug:
30         logging.basicConfig(level = logging.DEBUG )
31     elif opts.verbose:
32         logging.basicConfig(level = logging.INFO )
33     else:
34         logging.basicConfig(level = logging.WARNING )
35         
36     
37     apidata = {'apiid': opts.apiid, 'apikey': opts.apikey }
38
39     if opts.host is None or opts.apiid is None or opts.apikey is None:
40         parser.error("Please specify host url, apiid, apikey")
41
42     if len(args) == 0:
43         parser.error("I need at least one library submission-dir input file")
44         
45     library_result_map = []
46     for a in args:
47         library_result_map.extend(read_library_result_map(a))
48
49     if opts.daf is not None:
50         link_daf(opts.daf, library_result_map)
51
52     if opts.fastq:
53         build_fastqs(opts.host, 
54                      apidata, 
55                      opts.sequence, 
56                      library_result_map,
57                      force=opts.force)
58
59     if opts.ini:
60         make_submission_ini(opts.host, apidata, library_result_map)
61
62     if opts.makeddf:
63         make_all_ddfs(library_result_map, opts.daf)
64
65 def make_parser():
66     # Load defaults from the config files
67     config = SafeConfigParser()
68     config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini'])
69     
70     sequence_archive = None
71     apiid = None
72     apikey = None
73     apihost = None
74     SECTION = 'sequence_archive'
75     if config.has_section(SECTION):
76         sequence_archive = config.get(SECTION, 'sequence_archive',sequence_archive)
77         sequence_archive = os.path.expanduser(sequence_archive)
78         apiid = config.get(SECTION, 'apiid', apiid)
79         apikey = config.get(SECTION, 'apikey', apikey)
80         apihost = config.get(SECTION, 'host', apihost)
81
82     parser = OptionParser()
83
84     # commands
85     parser.add_option('--fastq', help="generate scripts for making fastq files",
86                       default=False, action="store_true")
87
88     parser.add_option('--ini', help="generate submission ini file", default=False,
89                       action="store_true")
90
91     parser.add_option('--makeddf', help='make the ddfs', default=False,
92                       action="store_true")
93     
94     parser.add_option('--daf', default=None, help='specify daf name')
95     parser.add_option('--force', default=False, action="store_true",
96                       help="Force regenerating fastqs")
97
98     # configuration options
99     parser.add_option('--apiid', default=apiid, help="Specify API ID")
100     parser.add_option('--apikey', default=apikey, help="Specify API KEY")
101     parser.add_option('--host',  default=apihost,
102                       help="specify HTSWorkflow host",)
103     parser.add_option('--sequence', default=sequence_archive,
104                       help="sequence repository")
105
106     # debugging
107     parser.add_option('--verbose', default=False, action="store_true",
108                       help='verbose logging')
109     parser.add_option('--debug', default=False, action="store_true",
110                       help='debug logging')
111
112     return parser
113
114 def build_fastqs(host, apidata, sequences_path, library_result_map, 
115                  force=False ):
116     """
117     Generate condor scripts to build any needed fastq files
118     
119     Args:
120       host (str): root of the htsworkflow api server
121       apidata (dict): id & key to post to the server
122       sequences_path (str): root of the directory tree to scan for files
123       library_result_map (list):  [(library_id, destination directory), ...]
124     """
125     qseq_condor_header = """
126 Universe=vanilla
127 executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/qseq2fastq
128 error=log/qseq2fastq.err.$(process).log
129 output=log/qseq2fastq.out.$(process).log
130 log=log/qseq2fastq.log
131
132 """
133     qseq_condor_entries = []
134     srf_condor_header = """
135 Universe=vanilla
136 executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/srf2named_fastq.py
137 output=log/srf_pair_fastq.out.$(process).log
138 error=log/srf_pair_fastq.err.$(process).log
139 log=log/srf_pair_fastq.log
140 environment="PYTHONPATH=/home/diane/lib/python2.6/site-packages:/home/diane/proj/solexa/gaworkflow PATH=/woldlab/rattus/lvol0/mus/home/diane/bin:/usr/bin:/bin"
141
142 """
143     srf_condor_entries = []
144     fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
145     fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
146     lib_db = find_archive_sequence_files(host, 
147                                          apidata, 
148                                          sequences_path, 
149                                          library_result_map)
150
151     # find what targets we're missing
152     needed_targets = {}
153     for lib_id, result_dir in library_result_map:
154         lib = lib_db[lib_id]
155         lane_dict = make_lane_dict(lib_db, lib_id)
156
157         for lane_key, sequences in lib['lanes'].items():
158             for seq in sequences:
159                 paired = lane_dict[seq.flowcell]['paired_end']
160                 if paired and seq.read is None:
161                     seq.read = 1
162                 filename_attributes = { 
163                     'flowcell': seq.flowcell,
164                     'lib_id': lib_id,
165                     'lane': seq.lane,
166                     'read': seq.read,
167                     'cycle': seq.cycle
168                     }
169                 # throw out test runs
170                 # FIXME: this should probably be configurable
171                 if seq.cycle < 50:
172                     continue
173                 if seq.flowcell == '30CUUAAXX':
174                     # 30CUUAAXX run sucked
175                     continue
176                 if seq.flowcell == '30DY0AAXX':
177                     # 30DY0 only ran for 151 bases instead of 152
178                     # it is actually 76 1st read, 75 2nd read
179                     seq.mid_point = 76
180
181                 # end filters
182                 if paired:
183                     target_name = fastq_paired_template % filename_attributes
184                 else:
185                     target_name = fastq_single_template % filename_attributes
186
187                 target_pathname = os.path.join(result_dir, target_name)
188                 if force or not os.path.exists(target_pathname):
189                     t = needed_targets.setdefault(target_pathname, {})
190                     t[seq.filetype] = seq
191
192     for target_pathname, available_sources in needed_targets.items():
193         logging.debug(' target : %s' % (target_pathname,))
194         logging.debug(' candidate sources: %s' % (available_sources,))
195         if available_sources.has_key('qseq'):
196             source = available_sources['qseq']
197             qseq_condor_entries.append(
198                 condor_qseq_to_fastq(source.path, 
199                                      target_pathname, 
200                                      source.flowcell,
201                                      force=force)
202             )
203         elif available_sources.has_key('srf'):
204             source = available_sources['srf']
205             mid = getattr(source, 'mid_point', None)
206             srf_condor_entries.append(
207                 condor_srf_to_fastq(source.path, 
208                                     target_pathname,
209                                     paired,
210                                     source.flowcell,
211                                     mid,
212                                     force=force)
213             )
214         else:
215             print " need file", target_pathname
216
217     if len(srf_condor_entries) > 0:
218         make_submit_script('srf.fastq.condor', 
219                            srf_condor_header,
220                            srf_condor_entries)
221
222     if len(qseq_condor_entries) > 0:
223         make_submit_script('qseq.fastq.condor', 
224                            qseq_condor_header,
225                            qseq_condor_entries)
226
227 def link_daf(daf_path, library_result_map):
228     if not os.path.exists(daf_path):
229         raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))
230
231     base_daf = os.path.basename(daf_path)
232     
233     for lib_id, result_dir in library_result_map:
234         submission_daf = os.path.join(result_dir, base_daf)
235         if not os.path.exists(submission_daf):
236             os.link(daf_path, submission_daf)
237
238 def make_submission_ini(host, apidata, library_result_map, paired=True):
239     # ma is "map algorithm"
240     ma = 'TH1014'
241
242     if paired:
243         aligns = "Paired"
244     else:
245         aligns = "Aligns"
246
247     attributes = {
248       # bam index
249      '.bai':                   {'view': None, 'MapAlgorithm': 'NA'},
250      '.bam':                   {'view': aligns, 'MapAlgorithm': ma},
251      '.splices.bam':           {'view': 'Splices', 'MapAlgorithm': ma},
252      '.jnct':                  {'view': 'Junctions', 'MapAlgorithm': ma},
253      '.plus.bigwig':           {'view': 'PlusSignal', 'MapAlgorithm': ma},
254      '.minus.bigwig':          {'view': 'MinusSignal', 'MapAlgorithm': ma},
255      '.bigwig':                {'view': 'Signal', 'MapAlgorithm': ma},
256      '.tar.bz2':               {'view': None},
257      '.condor':                {'view': None},
258      '.daf':                   {'view': None},
259      '.ddf':                   {'view': None},
260      'denovo.genes.expr':       {'view': 'GeneDeNovo', 'MapAlgorithm': ma},
261      'denovo.transcripts.expr': {'view': 'TranscriptDeNovo', 'MapAlgorithm': ma},
262      'novel.genes.expr':       {'view': 'GeneDeNovo', 'MapAlgorithm': ma},
263      'novel.transcripts.expr': {'view': 'TranscriptDeNovo', 'MapAlgorithm': ma},
264      '.genes.expr':            {'view': 'GeneFPKM', 'MapAlgorithm': ma},
265      '.transcripts.expr':      {'view': 'TranscriptFPKM', 'MapAlgorithm': ma},
266      '.transcript.expr':       {'view': 'TranscriptFPKM', 'MapAlgorithm': ma},
267      '.fastq':                 {'view': 'Fastq', 'MapAlgorithm': 'NA' },
268      '_r1.fastq':              {'view': 'FastqRd1', 'MapAlgorithm': 'NA'},
269      '_r2.fastq':              {'view': 'FastqRd2', 'MapAlgorithm': 'NA'},
270      '.gtf':                   {'view': 'GeneModel', 'MapAlgorithm': ma},
271      '.ini':                   {'view': None},
272      '.log':                   {'view': None},
273      '.stats.txt':             {'view': 'InsLength', 'MapAlgorithm': ma},
274      '.srf':                   {'view': None},
275      '.wig':                   {'view': None},
276      '.zip':                   {'view': None},
277     }
278    
279     candidate_fastq_src = {}
280
281     for lib_id, result_dir in library_result_map:
282         order_by = ['order_by=files', 'view', 'replicate', 'cell', 
283                     'readType', 'mapAlgorithm', 'insertLength' ]
284         inifile =  ['[config]']
285         inifile += [" ".join(order_by)]
286         inifile += ['']
287         line_counter = 1
288         lib_info = get_library_info(host, apidata, lib_id)
289         result_ini = os.path.join(result_dir, result_dir+'.ini')
290
291         if lib_info['cell_line'].lower() == 'unknown':
292             logging.warn("Library %s missing cell_line" % (lib_id,))
293
294         standard_attributes = {'cell': lib_info['cell_line'],
295                                'replicate': lib_info['replicate'],
296                                }
297         if paired:
298             if lib_info['insert_size'] is None:
299                 errmsg = "Library %s is missing insert_size, assuming 200"
300                 logging.warn(errmsg % (lib_id,))
301                 insert_size = 200
302             else:
303                 insert_size = lib_info['insert_size']
304             standard_attributes['insertLength'] = insert_size
305             standard_attributes['readType'] = '2x75'
306         else:
307             standard_attributes['insertLength'] = 'ilNA'
308             standard_attributes['readType'] = '1x75D'
309
310         # write other lines
311         submission_files = os.listdir(result_dir)
312         fastqs = {}
313         for f in submission_files:
314             best_ext = find_best_extension(attributes, f)
315
316             if best_ext is not None:
317                if attributes[best_ext]['view'] is None:
318                    
319                    continue
320                elif best_ext.endswith('fastq'):
321                    fastqs.setdefault(best_ext, set()).add(f)
322                else:
323                    inifile.extend(
324                        make_submission_section(line_counter,
325                                                [f],
326                                                standard_attributes,
327                                                attributes[best_ext]
328                                                )
329                        )
330                    inifile += ['']
331                    line_counter += 1
332             else:
333                 raise ValueError("Unrecognized file: %s" % (f,))
334
335         # add in fastqs on a single line.
336         for extension, fastq_set in fastqs.items():
337             inifile.extend(
338                 make_submission_section(line_counter, 
339                                         fastq_set,
340                                         standard_attributes,
341                                         attributes[extension])
342             )
343             inifile += ['']
344             line_counter += 1
345             
346         f = open(result_ini,'w')
347         f.write(os.linesep.join(inifile))
348
349 def make_lane_dict(lib_db, lib_id):
350     """
351     Convert the lane_set in a lib_db to a dictionary
352     indexed by flowcell ID
353     """
354     result = []
355     for lane in lib_db[lib_id]['lane_set']:
356         result.append((lane['flowcell'], lane))
357     return dict(result)
358
359 def make_all_ddfs(library_result_map, daf_name, make_condor=True):
360     dag_fragment = []
361     for lib_id, result_dir in library_result_map:
362         ininame = result_dir+'.ini'
363         inipathname = os.path.join(result_dir, ininame)
364         if os.path.exists(inipathname):
365             dag_fragment.extend(
366                 make_ddf(ininame, daf_name, True, make_condor, result_dir)
367             )
368
369     if make_condor and len(dag_fragment) > 0:
370         dag_filename = 'submission.dagman'
371         if os.path.exists(dag_filename):
372             logging.warn("%s exists, please delete" % (dag_filename,))
373         else:
374             f = open(dag_filename,'w')
375             f.write( os.linesep.join(dag_fragment))
376             f.write( os.linesep )
377             f.close()
378             
379
380 def make_ddf(ininame,  daf_name, guess_ddf=False, make_condor=False, outdir=None):
381     """
382     Make ddf files, and bonus condor file
383     """
384     dag_fragments = []
385     curdir = os.getcwd()
386     if outdir is not None:
387         os.chdir(outdir)
388     output = sys.stdout
389     ddf_name = None
390     if guess_ddf:
391         ddf_name = make_ddf_name(ininame)
392         print ddf_name
393         output = open(ddf_name,'w')
394
395     file_list = read_ddf_ini(ininame, output)
396
397     file_list.append(daf_name)
398     if ddf_name is not None:
399         file_list.append(ddf_name)
400
401     if make_condor:
402         archive_condor = make_condor_archive_script(ininame, file_list)
403         upload_condor = make_condor_upload_script(ininame)
404         
405         dag_fragments.extend( 
406             make_dag_fragment(ininame, archive_condor, upload_condor)
407         ) 
408         
409     os.chdir(curdir)
410     
411     return dag_fragments
412
413 def read_ddf_ini(filename, output=sys.stdout):
414     """
415     Read a ini file and dump out a tab delmited text file
416     """
417     file_list = []
418     config = SafeConfigParser()
419     config.read(filename)
420
421     order_by = shlex.split(config.get("config", "order_by"))
422
423     output.write("\t".join(order_by))
424     output.write(os.linesep)
425     sections = config.sections()
426     sections.sort()
427     for section in sections:
428         if section == "config":
429             # skip the config block
430             continue
431         values = []
432         for key in order_by:
433             v = config.get(section, key)
434             values.append(v)
435             if key == 'files':
436                 file_list.extend(parse_filelist(v))
437                 
438         output.write("\t".join(values))
439         output.write(os.linesep)
440     return file_list
441     
442 def read_library_result_map(filename):
443     """
444     Read a file that maps library id to result directory.
445     Does not support spaces in filenames. 
446     
447     For example:
448       10000 result/foo/bar
449     """
450     stream = open(filename,'r')
451
452     results = []
453     for line in stream:
454         if not line.startswith('#'):
455             library_id, result_dir = line.strip().split()
456             results.append((library_id, result_dir))
457     return results
458             
459 def make_condor_archive_script(ininame, files):
460     script = """Universe = vanilla
461
462 Executable = /bin/tar
463 arguments = czvf ../%(archivename)s %(filelist)s
464
465 Error = compress.err.$(Process).log
466 Output = compress.out.$(Process).log
467 Log = /tmp/submission-compress.log
468 initialdir = %(initialdir)s
469
470 queue 
471 """
472     for f in files:
473         if not os.path.exists(f):
474             raise RuntimeError("Missing %s" % (f,))
475
476     context = {'archivename': make_submission_name(ininame),
477                'filelist': " ".join(files),
478                'initialdir': os.getcwd()}
479
480     condor_script = make_condor_name(ininame, 'archive')
481     condor_stream = open(condor_script,'w')
482     condor_stream.write(script % context)
483     condor_stream.close()
484     return condor_script
485
486 def make_condor_upload_script(ininame):
487     script = """Universe = vanilla
488
489 Executable = /usr/bin/lftp
490 arguments = -c put ../%(archivename)s -o ftp://detrout@encodeftp.cse.ucsc.edu/
491
492 Error = upload.err.$(Process).log
493 Output = upload.out.$(Process).log
494 Log = /tmp/submission-upload.log
495 initialdir = %(initialdir)s
496
497 queue 
498 """
499     context = {'archivename': make_submission_name(ininame),
500                'initialdir': os.getcwd()}
501
502     condor_script = make_condor_name(ininame, 'upload')
503     condor_stream = open(condor_script,'w')
504     condor_stream.write(script % context)
505     condor_stream.close()
506     return condor_script
507
508 def make_dag_fragment(ininame, archive_condor, upload_condor):
509     """
510     Make the couple of fragments compress and then upload the data.
511     """
512     cur_dir = os.getcwd()
513     archive_condor = os.path.join(cur_dir, archive_condor)
514     upload_condor = os.path.join(cur_dir, upload_condor)
515     job_basename = make_base_name(ininame)
516
517     fragments = []
518     fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
519     fragments.append('JOB %s_upload %s' % (job_basename,  upload_condor))
520     fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))
521
522     return fragments
523
524 def get_library_info(host, apidata, library_id):
525     url = api.library_url(host, library_id)
526     contents = api.retrieve_info(url, apidata)
527     return contents
528
529 def condor_srf_to_fastq(srf_file, target_pathname, paired, flowcell=None,
530                         mid=None, force=False):
531     args = [ srf_file, ]
532     if paired:
533         args.extend(['--left', target_pathname])
534         # this is ugly. I did it because I was pregenerating the target
535         # names before I tried to figure out what sources could generate
536         # those targets, and everything up to this point had been
537         # one-to-one. So I couldn't figure out how to pair the 
538         # target names. 
539         # With this at least the command will run correctly.
540         # however if we rename the default targets, this'll break
541         # also I think it'll generate it twice.
542         args.extend(['--right', 
543                      target_pathname.replace('_r1.fastq', '_r2.fastq')])
544     else:
545         args.extend(['--single', target_pathname ])
546     if flowcell is not None:
547         args.extend(['--flowcell', flowcell])
548
549     if mid is not None:
550         args.extend(['-m', str(mid)])
551
552     if force:
553         args.extend(['--force'])
554
555     script = """
556 arguments="%s"
557 queue
558 """ % (" ".join(args),)
559     
560     return  script 
561
562 def condor_qseq_to_fastq(qseq_file, target_pathname, flowcell=None, force=False):
563     args = ['-i', qseq_file, '-o', target_pathname ]
564     if flowcell is not None:
565         args.extend(['-f', flowcell])
566     script = """
567 arguments="%s"
568 queue
569 """ % (" ".join(args))
570
571     return script 
572
573 def find_archive_sequence_files(host, apidata, sequences_path, 
574                                 library_result_map):
575     """
576     Find all the archive sequence files possibly associated with our results.
577
578     """
579     logging.debug("Searching for sequence files in: %s" %(sequences_path,))
580
581     lib_db = {}
582     seq_dirs = set()
583     #seq_dirs = set(os.path.join(sequences_path, 'srfs'))
584     candidate_lanes = {}
585     for lib_id, result_dir in library_result_map:
586         lib_info = get_library_info(host, apidata, lib_id)
587         lib_db[lib_id] = lib_info
588
589         for lane in lib_info['lane_set']:
590             lane_key = (lane['flowcell'], lane['lane_number'])
591             candidate_lanes[lane_key] = lib_id
592             seq_dirs.add(os.path.join(sequences_path, 
593                                          'flowcells', 
594                                          lane['flowcell']))
595     logging.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
596     candidate_seq_list = scan_for_sequences(seq_dirs)
597
598     # at this point we have too many sequences as scan_for_sequences
599     # returns all the sequences in a flowcell directory
600     # so lets filter out the extras
601     
602     for seq in candidate_seq_list:
603         lane_key = (seq.flowcell, seq.lane)
604         lib_id = candidate_lanes.get(lane_key, None)
605         if lib_id is not None:
606             lib_info = lib_db[lib_id]
607             lanes = lib_info.setdefault('lanes', {})
608             lanes.setdefault(lane_key, set()).add(seq)
609     
610     return lib_db
611
612 def find_best_extension(extension_map, filename):
613     """
614     Search through extension_map looking for the best extension
615     The 'best' is the longest match
616
617     :Args:
618       extension_map (dict): '.ext' -> { 'view': 'name' or None }
619       filename (str): the filename whose extention we are about to examine
620     """
621     best_ext = None
622     path, last_ext = os.path.splitext(filename)
623
624     for ext in extension_map.keys():
625         if filename.endswith(ext):
626             if best_ext is None:
627                 best_ext = ext
628             elif len(ext) > len(best_ext):
629                 best_ext = ext
630     return best_ext
631     
632 def make_submission_section(line_counter, files, standard_attributes, file_attributes):
633     """
634     Create a section in the submission ini file
635     """
636     inifile = [ '[line%s]' % (line_counter,) ]
637     inifile += ["files=%s" % (",".join(files))]
638     cur_attributes = {}
639     cur_attributes.update(standard_attributes)
640     cur_attributes.update(file_attributes)
641     
642     for k,v in cur_attributes.items():
643         inifile += ["%s=%s" % (k,v)]
644     return inifile
645
646 def make_base_name(pathname):
647     base = os.path.basename(pathname)
648     name, ext = os.path.splitext(base)
649     return name
650
651 def make_submission_name(ininame):
652     name = make_base_name(ininame)
653     return name + '.tgz'
654
655 def make_ddf_name(pathname):
656     name = make_base_name(pathname)
657     return name + '.ddf'
658
659 def make_condor_name(pathname, run_type=None):
660     name = make_base_name(pathname)
661     elements = [name]
662     if run_type is not None:
663         elements.append(run_type)
664     elements.append('condor')
665     return ".".join(elements)
666     
667 def make_submit_script(target, header, body_list):
668     """
669     write out a text file
670
671     this was intended for condor submit scripts
672     
673     Args:
674       target (str or stream): 
675         if target is a string, we will open and close the file
676         if target is a stream, the caller is responsible.
677
678       header (str);
679         header to write at the beginning of the file
680       body_list (list of strs):
681         a list of blocks to add to the file.
682     """
683     if type(target) in types.StringTypes:
684         f = open(target,'w')
685     else:
686         f = target
687     f.write(header)
688     for entry in body_list:
689         f.write(entry)
690     if type(target) in types.StringTypes:
691         f.close()
692
693 def parse_filelist(file_string):
694     return file_string.split(',')
695
696 def validate_filelist(files):
697     """
698     Die if a file doesn't exist in a file list
699     """
700     for f in files:
701         if not os.path.exists(f):
702             raise RuntimeError("%s does not exist" % (f,))
703         
704 if __name__ == "__main__":
705     main()