Put partial support back in for srf files.
[htsworkflow.git] / extra / ucsc_encode_submission / ucsc_gather.py
1 #!/usr/bin/env python
2 from ConfigParser import SafeConfigParser
3 from glob import glob
4 import json
5 import logging
6 from optparse import OptionParser
7 import os
8 from pprint import pprint, pformat
9 import shlex
10 from StringIO import StringIO
11 import time
12 import sys
13 import types
14 import urllib
15 import urllib2
16 import urlparse
17
18
19 from htsworkflow.util import api
20 from htsworkflow.pipelines.sequences import \
21     create_sequence_table, \
22     scan_for_sequences
23
def make_submission_name(ininame):
    """Return the submission tarball name derived from an ini filename."""
    stem, _ext = os.path.splitext(os.path.basename(ininame))
    return stem + '.tgz'
28
def make_ddf_name(pathname):
    """Return the .ddf filename derived from `pathname`'s basename."""
    stem, _ext = os.path.splitext(os.path.basename(pathname))
    return stem + '.ddf'
33
def make_condor_name(pathname):
    """Return the .condor filename derived from `pathname`'s basename."""
    stem, _ext = os.path.splitext(os.path.basename(pathname))
    return stem + '.condor'
38     
def make_submit_script(target, header, body_list):
    """
    write out a text file

    this was intended for condor submit scripts

    Args:
      target (str or stream):
        if target is a string, we will open and close the file
        if target is a stream, the caller is responsible.

      header (str):
        header to write at the beginning of the file
      body_list (list of strs):
        a list of blocks to add to the file.
    """
    # isinstance is the idiomatic type test; types.StringTypes covers
    # both str and unicode on python 2.
    opened_here = isinstance(target, types.StringTypes)
    if opened_here:
        stream = open(target, 'w')
    else:
        stream = target
    try:
        stream.write(header)
        for entry in body_list:
            stream.write(entry)
    finally:
        # only close files we opened; never close a caller-owned stream.
        # the finally guarantees cleanup even if a write fails.
        if opened_here:
            stream.close()
64
def parse_filelist(file_string):
    """Split a comma-separated list of filenames into a list."""
    return file_string.split(',')
67
def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list
    """
    missing = [name for name in files if not os.path.exists(name)]
    if missing:
        # report the first missing file, matching the original message
        raise RuntimeError("%s does not exist" % (missing[0],))
75
def read_ddf_ini(filename, output=sys.stdout):
    """
    Read an ini file and dump out a tab delimited text file.

    Args:
      filename (str): ini file with a [config] section whose `order_by`
        option names the columns, plus one section per output row.
      output (stream): where the tab-delimited table is written.

    Returns:
      list of filenames collected from every `files` option seen.
    """
    file_list = []
    config = SafeConfigParser()
    config.read(filename)

    order_by = shlex.split(config.get("config", "order_by"))

    output.write("\t".join(order_by))
    output.write(os.linesep)
    # sorted() is the idiomatic replacement for copy-then-sort-in-place
    for section in sorted(config.sections()):
        if section == "config":
            # skip the config block
            continue
        values = []
        for key in order_by:
            v = config.get(section, key)
            values.append(v)
            if key == 'files':
                # remember every submitted file so callers can archive them
                file_list.extend(v.split(','))

        output.write("\t".join(values))
        output.write(os.linesep)
    return file_list
104             
def make_condor_archive_script(ininame, files):
    """
    Write a condor submit script that tars `files` into ../<ininame>.tgz.

    Args:
      ininame (str): submission ini filename; determines the archive and
        condor script names.
      files (list of str): files to archive, relative to the current dir.

    Raises:
      RuntimeError: if any file in `files` does not exist.
    """
    script = """Universe = vanilla

Executable = /bin/tar
arguments = czvf ../%(archivename)s %(filelist)s

Error = compress.err.$(Process).log
Output = compress.out.$(Process).log
Log = compress.log
initialdir = %(initialdir)s

queue 
"""
    # fail fast before writing a condor script that would tar nothing
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("Missing %s" % (f,))

    context = {'archivename': make_submission_name(ininame),
               'filelist': " ".join(files),
               'initialdir': os.getcwd()}

    condor_script = make_condor_name(ininame)
    # with-statement guarantees the script is closed even if the write fails
    with open(condor_script, 'w') as condor_stream:
        condor_stream.write(script % context)
130
def make_ddf(ininame,  daf_name, guess_ddf=False, make_condor=False, outdir=None):
    """
    Make ddf files, and bonus condor file

    Args:
      ininame (str): submission ini file to convert
      daf_name (str): daf filename appended to the archive file list
      guess_ddf (bool): write to <ininame>.ddf instead of stdout
      make_condor (bool): also write a condor archive script
      outdir (str): directory to work in (default: current directory)
    """
    curdir = os.getcwd()
    if outdir is not None:
        os.chdir(outdir)
    try:
        output = sys.stdout
        ddf_name = None
        if guess_ddf:
            ddf_name = make_ddf_name(ininame)
            print(ddf_name)
            output = open(ddf_name,'w')

        file_list = read_ddf_ini(ininame, output)
        if ddf_name is not None:
            # close (and flush) the ddf before the condor script can tar it
            output.close()

        file_list.append(daf_name)
        if ddf_name is not None:
            file_list.append(ddf_name)

        if make_condor:
            make_condor_archive_script(ininame, file_list)
    finally:
        # always restore the working directory, even on error
        os.chdir(curdir)
155
156
def get_library_info(host, apidata, library_id):
    """Fetch the library metadata dictionary from the htsworkflow API."""
    return api.retrieve_info(api.library_url(host, library_id), apidata)
161     
def read_library_result_map(filename):
    """
    Read a whitespace-delimited "library_id result_dir" mapping file.

    Args:
      filename (str): file with one "library_id result_dir" pair per line.

    Returns:
      list of (library_id, result_dir) tuples.
    """
    results = []
    stream = open(filename,'r')
    try:
        for line in stream:
            line = line.strip()
            if not line:
                # tolerate blank lines (e.g. a trailing newline)
                continue
            library_id, result_dir = line.split()
            results.append((library_id, result_dir))
    finally:
        # the original leaked the file handle; always close it
        stream.close()
    return results
170
def condor_srf_to_fastq(srf_file, target_pathname):
    """Return a condor queue entry converting one srf file to fastq."""
    template = """output=%(target_pathname)s
arguments="-c %(srf_file)s"
queue
"""
    return template % {'srf_file': srf_file,
                       'target_pathname': target_pathname}
180
def condor_qseq_to_fastq(qseq_file, target_pathname):
    """Return a condor queue entry converting one qseq file to fastq."""
    template = """
arguments="-i %(qseq_file)s -o %(target_pathname)s"
queue
"""
    return template % {'qseq_file': qseq_file,
                       'target_pathname': target_pathname}
190
def find_archive_sequence_files(host, apidata, sequences_path, 
                                library_result_map):
    """
    Find all the archive sequence files possibly associated with our results.

    """
    logging.debug("Searching for sequence files in: %s" %(sequences_path,))

    library_db = {}
    flowcell_dirs = set()
    lane_to_library = {}
    for library_id, result_dir in library_result_map:
        info = get_library_info(host, apidata, library_id)
        library_db[library_id] = info

        for lane in info['lane_set']:
            key = (lane['flowcell'], lane['lane_number'])
            lane_to_library[key] = library_id
            flowcell_dirs.add(
                os.path.join(sequences_path, 'flowcells', lane['flowcell']))
    logging.debug("Seq_dirs = %s" % (unicode(flowcell_dirs)))

    # scan_for_sequences returns everything under each flowcell directory,
    # so keep only the lanes that belong to one of our libraries.
    for sequence in scan_for_sequences(flowcell_dirs):
        key = (sequence.flowcell, sequence.lane)
        library_id = lane_to_library.get(key, None)
        if library_id is not None:
            lanes = library_db[library_id].setdefault('lanes', {})
            lanes.setdefault(key, set()).add(sequence)

    return library_db
229
230 def build_fastqs(host, apidata, sequences_path, library_result_map, 
231                  paired=True ):
232     """
233     Generate condor scripts to build any needed fastq files
234     
235     Args:
236       host (str): root of the htsworkflow api server
237       apidata (dict): id & key to post to the server
238       sequences_path (str): root of the directory tree to scan for files
239       library_result_map (list):  [(library_id, destination directory), ...]
240       paired: should we assume that we are processing paired end records?
241               if False, we will treat this as single ended.
242     """
243     qseq_condor_header = """
244 Universe=vanilla
245 executable=/woldlab/rattus/lvol0/mus/home/diane/proj/gaworkflow/scripts/qseq2fastq
246 error=qseqfastq.err.$(process).log
247 output=qseqfastq.out.$(process).log
248 log=qseqfastq.log
249
250 """
251     qseq_condor_entries = []
252     srf_condor_header = """
253 Universe=vanilla
254 executable=/woldlab/rattus/lvol0/mus/home/diane/bin/srf2fastq
255 output=srf2fastq.out.$(process).log
256 error=srf2fastq.err.$(process).log
257 log=srffastq.log
258
259 """
260     srf_condor_entries = []
261     fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
262     fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
263     lib_db = find_archive_sequence_files(host, 
264                                          apidata, 
265                                          sequences_path, 
266                                          library_result_map)
267
268     # find what targets we're missing
269     needed_targets = {}
270     for lib_id, result_dir in library_result_map:
271         lib = lib_db[lib_id]
272         for lane_key, sequences in lib['lanes'].items():
273             for seq in sequences:
274                 filename_attributes = { 
275                     'flowcell': seq.flowcell,
276                     'lib_id': lib_id,
277                     'lane': seq.lane,
278                     'read': seq.read,
279                     'cycle': seq.cycle
280                     }
281                 # throw out test runs
282                 # FIXME: this should probably be configurable
283                 if seq.cycle < 50:
284                     continue
285                 if seq.flowcell == '30CUUAAXX':
286                     # 30CUUAAXX run sucked
287                     continue
288
289                 # end filters
290                 if paired:
291                     target_name = fastq_paired_template % filename_attributes
292                 else:
293                     target_name = fastq_single_template % filename_attributes
294
295                 target_pathname = os.path.join(result_dir, target_name)
296                 if not os.path.exists(target_pathname):
297                     t = needed_targets.setdefault(target_pathname, {})
298                     t[seq.filetype] = seq
299                     
300     for target_pathname, available_sources in needed_targets.items():
301         logging.debug(' target : %s' % (target_pathname,))
302         logging.debug(' candidate sources: %s' % (available_sources,))
303         if available_sources.has_key('qseq'):
304             source = available_sources['qseq']
305             qseq_condor_entries.append(
306                 condor_qseq_to_fastq(source.path, target_pathname)
307             )
308         elif available_sources.has_key('srf'):
309             source = available_sources['srf']
310             if source.read is not None:
311                 logging.warn(
312                     "srf -> fastq paired end doesn't work yet: %s" % (source,)
313                 )
314             else:
315                 srf_condor_entries.append(
316                     condor_srf_to_fastq(source.path, target_pathname)
317                 )
318         else:
319             print " need file", target_pathname
320
321     if len(srf_condor_entries) > 0:
322         make_submit_script('srf.fastq.condor', 
323                            srf_condor_header,
324                            srf_condor_entries)
325
326     if len(qseq_condor_entries) > 0:
327         make_submit_script('qseq.fastq.condor', 
328                            qseq_condor_header,
329                            qseq_condor_entries)
330
def find_best_extension(extension_map, filename):
    """
    Search through extension_map looking for the best extension
    The 'best' is the longest match

    :Args:
      extension_map (dict): '.ext' -> { 'view': 'name' or None }
      filename (str): the filename whose extension we are about to examine

    Returns the longest matching extension, or None if nothing matches.
    """
    best_ext = None
    for ext in extension_map:
        if filename.endswith(ext):
            # keep the most specific (longest) suffix seen so far
            if best_ext is None or len(ext) > len(best_ext):
                best_ext = ext
    return best_ext
350
def add_submission_section(line_counter, files, standard_attributes, file_attributes):
    """
    Create a section in the submission ini file
    """
    merged = dict(standard_attributes)
    # file-specific attributes override the standard ones
    merged.update(file_attributes)

    section = ['[line%s]' % (line_counter,),
               "files=%s" % (",".join(files))]
    for key, value in merged.items():
        section.append("%s=%s" % (key, value))
    return section
364     
def make_submission_ini(host, apidata, library_result_map):
    """
    Write a submission ini file into each result directory.

    Args:
      host (str): root of the htsworkflow api server
      apidata (dict): id & key to post to the server
      library_result_map (list): [(library_id, result directory), ...]

    Raises:
      ValueError: if a file in a result directory has no known extension.
    """
    # maps a filename suffix to its submission view; view None means the
    # file is recognized but should not get its own submission section
    attributes = {
        '.bai':                   {'view': None}, # bam index
        '.bam':                   {'view': 'Signal'},
        '.condor':                {'view': None},
        '.daf':                   {'view': None},
        '.ddf':                   {'view': None},
        '.splices.bam':           {'view': 'Splices'},
        '.bed':                   {'view': 'TopHatJunctions'},
        '.ini':                   {'view': None},
        '.log':                   {'view': None},
        '_r1.fastq':              {'view': 'FastqRd1'},
        '_r2.fastq':              {'view': 'FastqRd2'},
        '.tar.bz2':               {'view': None},
        'novel.genes.expr':       {'view': 'GeneDeNovoFPKM'},
        'novel.transcripts.expr': {'view': 'TranscriptDeNovoFPKM'},
        '.genes.expr':            {'view': 'GeneFPKM'},
        '.transcripts.expr':      {'view': 'TranscriptFPKM'},
        '.stats.txt':             {'view': 'InsLength'},
        '.gtf':                   {'view': 'CufflinksGeneModel'},
        '.wig':                   {'view': 'RawSignal'},
    }

    for lib_id, result_dir in library_result_map:
        inifile =  ['[config]']
        inifile += ['order_by=files view cell localization rnaExtract mapAlgorithm readType replicate labVersion']
        inifile += ['']
        line_counter = 1
        lib_info = get_library_info(host, apidata, lib_id)
        result_ini = os.path.join(result_dir, result_dir+'.ini')

        standard_attributes = {'cell': lib_info['cell_line'],
                               'insertLength': '200', # ali
                               'labVersion': 'TopHat',
                               'localization': 'cell',
                               'mapAlgorithm': 'TopHat',
                               'readType': '2x75', #ali
                               'replicate': lib_info['replicate'],
                               'rnaExtract': 'longPolyA',
                               }

        # write a section for every recognized, submittable file
        submission_files = os.listdir(result_dir)
        for f in submission_files:
            best_ext = find_best_extension(attributes, f)
            if best_ext is None:
                raise ValueError("Unrecognized file: %s" % (f,))
            if attributes[best_ext]['view'] is None:
                # recognized bookkeeping file (log, index, ...) - skip it
                continue
            inifile.extend(
                add_submission_section(line_counter,
                                       [f],
                                       standard_attributes,
                                       attributes[best_ext])
            )
            inifile += ['']
            line_counter += 1

        stream = open(result_ini,'w')
        try:
            stream.write(os.linesep.join(inifile))
        finally:
            stream.close()
442
def link_daf(daf_path, library_result_map):
    """Hard-link the daf file into every result directory that lacks it."""
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    daf_filename = os.path.basename(daf_path)
    for lib_id, result_dir in library_result_map:
        destination = os.path.join(result_dir, daf_filename)
        # skip directories that already have the link
        if not os.path.exists(destination):
            os.link(daf_path, destination)
453
def make_all_ddfs(library_result_map, daf_name):
    """Generate a ddf (plus condor archive script) per submission dir."""
    for lib_id, result_dir in library_result_map:
        ini_name = result_dir + '.ini'
        ini_path = os.path.join(result_dir, ini_name)
        # only directories that already have a submission ini get a ddf
        if os.path.exists(ini_path):
            make_ddf(ini_name, daf_name, True, True, result_dir)
460             
def make_parser():
    """
    Build the command line option parser, seeding defaults from the
    ~/.htsworkflow.ini or /etc/htsworkflow.ini config files.
    """
    # Load defaults from the config files
    config = SafeConfigParser()
    config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini'])

    sequence_archive = None
    apiid = None
    apikey = None
    apihost = None
    SECTION = 'sequence_archive'
    if config.has_section(SECTION):
        # ConfigParser.get()'s third positional argument is `raw`, not a
        # default value, so guard each lookup with has_option() instead of
        # passing the default in (which raised NoOptionError for missing
        # options and never applied the default).
        if config.has_option(SECTION, 'sequence_archive'):
            sequence_archive = config.get(SECTION, 'sequence_archive')
            sequence_archive = os.path.expanduser(sequence_archive)
        if config.has_option(SECTION, 'apiid'):
            apiid = config.get(SECTION, 'apiid')
        if config.has_option(SECTION, 'apikey'):
            apikey = config.get(SECTION, 'apikey')
        if config.has_option(SECTION, 'host'):
            apihost = config.get(SECTION, 'host')

    parser = OptionParser()

    # commands
    parser.add_option('--fastq', help="generate scripts for making fastq files",
                      default=False, action="store_true")

    parser.add_option('--ini', help="generate submission ini file", default=False,
                      action="store_true")

    parser.add_option('--makeddf', help='make the ddfs', default=False,
                      action="store_true")

    parser.add_option('--daf', default=None, help='specify daf name')

    # configuration options
    parser.add_option('--apiid', default=apiid, help="Specify API ID")
    parser.add_option('--apikey', default=apikey, help="Specify API KEY")
    parser.add_option('--host',  default=apihost,
                      help="specify HTSWorkflow host",)
    parser.add_option('--sequence', default=sequence_archive,
                      help="sequence repository")
    parser.add_option('--single', default=False, action="store_true", 
                      help="treat the sequences as single ended runs")

    # debugging
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    return parser
509
def main(cmdline=None):
    """Command line entry point: parse options and run requested steps."""
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)

    # pick logging verbosity from the command line flags
    if opts.debug:
        log_level = logging.DEBUG
    elif opts.verbose:
        log_level = logging.INFO
    else:
        log_level = logging.WARNING
    logging.basicConfig(level=log_level)

    apidata = {'apiid': opts.apiid, 'apikey': opts.apikey}

    if opts.host is None or opts.apiid is None or opts.apikey is None:
        parser.error("Please specify host url, apiid, apikey")

    if not args:
        parser.error("I need at least one library submission-dir input file")

    library_result_map = []
    for map_filename in args:
        library_result_map.extend(read_library_result_map(map_filename))

    if opts.daf is not None:
        link_daf(opts.daf, library_result_map)

    if opts.fastq:
        build_fastqs(opts.host,
                     apidata,
                     opts.sequence,
                     library_result_map,
                     not opts.single)

    if opts.ini:
        make_submission_ini(opts.host, apidata, library_result_map)

    if opts.makeddf:
        make_all_ddfs(library_result_map, opts.daf)

if __name__ == "__main__":
    main()