# Collect fastqs by read and add them to the configuration ini file as a
# single line.
# [htsworkflow.git] / extra / ucsc_encode_submission / ucsc_gather.py
1 #!/usr/bin/env python
2 from ConfigParser import SafeConfigParser
3 from glob import glob
4 import json
5 import logging
6 from optparse import OptionParser
7 import os
8 from pprint import pprint, pformat
9 import shlex
10 from StringIO import StringIO
11 import time
12 import sys
13 import types
14 import urllib
15 import urllib2
16 import urlparse
17
18
19 from htsworkflow.util import api
20 from htsworkflow.pipelines.sequences import \
21     create_sequence_table, \
22     scan_for_sequences
23
def make_submission_name(ininame):
    """Return the .tgz archive name matching an ini file's basename."""
    stem, _ = os.path.splitext(os.path.basename(ininame))
    return stem + '.tgz'
28
def make_ddf_name(pathname):
    """Return the .ddf file name matching a pathname's basename."""
    stem, _ = os.path.splitext(os.path.basename(pathname))
    return stem + '.ddf'
33
def make_condor_name(pathname):
    """Return the .condor submit-script name matching a pathname's basename."""
    stem, _ = os.path.splitext(os.path.basename(pathname))
    return stem + '.condor'
38     
def make_submit_script(target, header, body_list):
    """
    write out a text file

    this was intended for condor submit scripts

    Args:
      target (str or stream):
        if target is a string, we will open and close the file
        if target is a stream, the caller is responsible.

      header (str):
        header to write at the beginning of the file
      body_list (list of strs):
        a list of blocks to add to the file.
    """
    # types.StringTypes is python2-only; fall back to (str,) elsewhere
    string_types = getattr(types, 'StringTypes', (str,))
    if isinstance(target, string_types):
        stream = open(target, 'w')
        owns_stream = True
    else:
        stream = target
        owns_stream = False
    try:
        stream.write(header)
        stream.writelines(body_list)
    finally:
        # only close handles we opened; caller-owned streams stay open.
        # (the original leaked the handle if a write raised)
        if owns_stream:
            stream.close()
64
def parse_filelist(file_string):
    """Split a comma separated string of filenames into a list."""
    separator = ','
    return file_string.split(separator)
67
def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list
    """
    missing = [f for f in files if not os.path.exists(f)]
    if missing:
        # report the first missing file, matching the original behavior
        raise RuntimeError("%s does not exist" % (missing[0],))
75
def read_ddf_ini(filename, output=sys.stdout):
    """
    Read a ini file and dump out a tab delmited text file

    Returns the list of data files named by the sections' 'files' keys.
    """
    collected_files = []
    parser = SafeConfigParser()
    parser.read(filename)

    column_order = shlex.split(parser.get("config", "order_by"))

    # header row, then one row per [lineN] section in sorted order
    output.write("\t".join(column_order))
    output.write(os.linesep)
    for section in sorted(parser.sections()):
        # the [config] section holds metadata, not a ddf row
        if section == "config":
            continue
        row = []
        for column in column_order:
            value = parser.get(section, column)
            row.append(value)
            if column == 'files':
                collected_files.extend(parse_filelist(value))
        output.write("\t".join(row))
        output.write(os.linesep)
    return collected_files
104             
def make_condor_archive_script(ininame, files):
    """Write a condor submit file that tars *files* into ../<submission>.tgz."""
    template = """Universe = vanilla

Executable = /bin/tar
arguments = czvf ../%(archivename)s %(filelist)s

Error = compress.err.$(Process).log
Output = compress.out.$(Process).log
Log = compress.log
initialdir = %(initialdir)s

queue 
"""
    # refuse to emit a script referencing files that aren't on disk
    missing = [f for f in files if not os.path.exists(f)]
    if missing:
        raise RuntimeError("Missing %s" % (missing[0],))

    values = {'archivename': make_submission_name(ininame),
              'filelist': " ".join(files),
              'initialdir': os.getcwd()}

    stream = open(make_condor_name(ininame), 'w')
    stream.write(template % values)
    stream.close()
130
131 def make_ddf(ininame,  daf_name, guess_ddf=False, make_condor=False, outdir=None):
132     """
133     Make ddf files, and bonus condor file
134     """
135     curdir = os.getcwd()
136     if outdir is not None:
137         os.chdir(outdir)
138     output = sys.stdout
139     ddf_name = None
140     if guess_ddf:
141         ddf_name = make_ddf_name(ininame)
142         print ddf_name
143         output = open(ddf_name,'w')
144
145     file_list = read_ddf_ini(ininame, output)
146
147     file_list.append(daf_name)
148     if ddf_name is not None:
149         file_list.append(ddf_name)
150
151     if make_condor:
152         make_condor_archive_script(ininame, file_list)
153         
154     os.chdir(curdir)
155
156
def get_library_info(host, apidata, library_id):
    """Fetch the library metadata dict for library_id from the htsworkflow API."""
    library_url = api.library_url(host, library_id)
    return api.retrieve_info(library_url, apidata)
161     
def read_library_result_map(filename):
    """
    Read a whitespace delimited mapping of library ids to result directories.

    Lines starting with '#' and blank lines are skipped.

    Returns:
      list of (library_id, result_dir) tuples.
    """
    stream = open(filename,'r')
    try:
        results = []
        for line in stream:
            line = line.strip()
            # skip comments and blank lines (a blank line used to crash
            # the two-value unpack below)
            if not line or line.startswith('#'):
                continue
            library_id, result_dir = line.split()
            results.append((library_id, result_dir))
        return results
    finally:
        # the original leaked this file handle
        stream.close()
171
def condor_srf_to_fastq(srf_file, target_pathname):
    """Return a condor queue entry that converts an srf file to fastq."""
    entry = ('output=%s\n'
             'arguments="-c %s"\n'
             'queue\n')
    return entry % (target_pathname, srf_file)
181
def condor_qseq_to_fastq(qseq_file, target_pathname):
    """Return a condor queue entry that converts a qseq file to fastq."""
    return '\narguments="-i %s -o %s"\nqueue\n' % (qseq_file, target_pathname)
191
def find_archive_sequence_files(host, apidata, sequences_path, 
                                library_result_map):
    """
    Find all the archive sequence files possibly associated with our results.

    Returns a dict of library_id -> library info; each info dict gains a
    'lanes' key mapping (flowcell, lane) to the set of matching sequences.
    """
    logging.debug("Searching for sequence files in: %s" %(sequences_path,))

    lib_db = {}
    seq_dirs = set()
    candidate_lanes = {}
    for lib_id, result_dir in library_result_map:
        info = get_library_info(host, apidata, lib_id)
        lib_db[lib_id] = info
        for lane in info['lane_set']:
            key = (lane['flowcell'], lane['lane_number'])
            candidate_lanes[key] = lib_id
            seq_dirs.add(
                os.path.join(sequences_path, 'flowcells', lane['flowcell']))
    logging.debug("Seq_dirs = %s" %(unicode(seq_dirs)))

    # scan_for_sequences returns every sequence in each flowcell directory,
    # so keep only the lanes that belong to one of our libraries
    for seq in scan_for_sequences(seq_dirs):
        owner = candidate_lanes.get((seq.flowcell, seq.lane))
        if owner is None:
            continue
        lanes = lib_db[owner].setdefault('lanes', {})
        lanes.setdefault((seq.flowcell, seq.lane), set()).add(seq)

    return lib_db
230
231 def build_fastqs(host, apidata, sequences_path, library_result_map, 
232                  paired=True ):
233     """
234     Generate condor scripts to build any needed fastq files
235     
236     Args:
237       host (str): root of the htsworkflow api server
238       apidata (dict): id & key to post to the server
239       sequences_path (str): root of the directory tree to scan for files
240       library_result_map (list):  [(library_id, destination directory), ...]
241       paired: should we assume that we are processing paired end records?
242               if False, we will treat this as single ended.
243     """
244     qseq_condor_header = """
245 Universe=vanilla
246 executable=/woldlab/rattus/lvol0/mus/home/diane/proj/gaworkflow/scripts/qseq2fastq
247 error=qseqfastq.err.$(process).log
248 output=qseqfastq.out.$(process).log
249 log=qseqfastq.log
250
251 """
252     qseq_condor_entries = []
253     srf_condor_header = """
254 Universe=vanilla
255 executable=/woldlab/rattus/lvol0/mus/home/diane/bin/srf2fastq
256 output=srf2fastq.out.$(process).log
257 error=srf2fastq.err.$(process).log
258 log=srffastq.log
259
260 """
261     srf_condor_entries = []
262     fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
263     fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
264     lib_db = find_archive_sequence_files(host, 
265                                          apidata, 
266                                          sequences_path, 
267                                          library_result_map)
268
269     # find what targets we're missing
270     needed_targets = {}
271     for lib_id, result_dir in library_result_map:
272         lib = lib_db[lib_id]
273         for lane_key, sequences in lib['lanes'].items():
274             for seq in sequences:
275                 filename_attributes = { 
276                     'flowcell': seq.flowcell,
277                     'lib_id': lib_id,
278                     'lane': seq.lane,
279                     'read': seq.read,
280                     'cycle': seq.cycle
281                     }
282                 # throw out test runs
283                 # FIXME: this should probably be configurable
284                 if seq.cycle < 50:
285                     continue
286                 if seq.flowcell == '30CUUAAXX':
287                     # 30CUUAAXX run sucked
288                     continue
289
290                 # end filters
291                 if paired:
292                     target_name = fastq_paired_template % filename_attributes
293                 else:
294                     target_name = fastq_single_template % filename_attributes
295
296                 target_pathname = os.path.join(result_dir, target_name)
297                 if not os.path.exists(target_pathname):
298                     t = needed_targets.setdefault(target_pathname, {})
299                     t[seq.filetype] = seq
300                     
301     for target_pathname, available_sources in needed_targets.items():
302         logging.debug(' target : %s' % (target_pathname,))
303         logging.debug(' candidate sources: %s' % (available_sources,))
304         if available_sources.has_key('qseq'):
305             source = available_sources['qseq']
306             qseq_condor_entries.append(
307                 condor_qseq_to_fastq(source.path, target_pathname)
308             )
309         elif available_sources.has_key('srf'):
310             source = available_sources['srf']
311             if source.read is not None:
312                 logging.warn(
313                     "srf -> fastq paired end doesn't work yet: %s" % (source,)
314                 )
315             else:
316                 srf_condor_entries.append(
317                     condor_srf_to_fastq(source.path, target_pathname)
318                 )
319         else:
320             print " need file", target_pathname
321
322     if len(srf_condor_entries) > 0:
323         make_submit_script('srf.fastq.condor', 
324                            srf_condor_header,
325                            srf_condor_entries)
326
327     if len(qseq_condor_entries) > 0:
328         make_submit_script('qseq.fastq.condor', 
329                            qseq_condor_header,
330                            qseq_condor_entries)
331
def find_best_extension(extension_map, filename):
    """
    Search through extension_map looking for the best extension
    The 'best' is the longest match

    :Args:
      extension_map (dict): '.ext' -> { 'view': 'name' or None }
      filename (str): the filename whose extention we are about to examine

    Returns the longest matching extension key, or None if nothing matches.
    """
    # (removed an unused os.path.splitext call from the original)
    matches = [ext for ext in extension_map if filename.endswith(ext)]
    if not matches:
        return None
    # max keeps the first of equally-long matches, like the original loop
    return max(matches, key=len)
351
def make_submission_section(line_counter, files, standard_attributes, file_attributes):
    """
    Create a section in the submission ini file

    Returns the section as a list of lines (no trailing blank line).
    """
    section = ['[line%s]' % (line_counter,),
               "files=%s" % (",".join(files))]
    # file-specific attributes override the standard ones
    merged = dict(standard_attributes)
    merged.update(file_attributes)
    section.extend("%s=%s" % (key, value) for key, value in merged.items())
    return section
365     
def make_submission_ini(host, apidata, library_result_map):
    """
    Write a submission ini file into each result directory.

    For each (library_id, result_dir) pair, scan result_dir for files whose
    suffix maps to a UCSC view and emit one [lineN] section per submitted
    file; fastqs with the same read suffix are grouped onto a single line.
    Unrecognized files raise ValueError.
    """
    # filename suffix -> submission attributes; a view of None means
    # "recognized, but not submitted"
    attributes = {
        '.bai':                   {'view': None}, # bam index
        '.bam':                   {'view': 'Signal'},
        '.splices.bam':           {'view': 'Splices'},
        '.bed':                   {'view': 'TopHatJunctions'},
        # FIXME(review): 'RawSigna' looks like a typo for 'RawSignal'
        # (compare the '.wig' entry below); confirm against the DAF
        # before changing, since submissions may depend on this value.
        '.bigwig':                {'view': 'RawSigna'},
        '.tar.bz2':               {'view': None},
        '.condor':                {'view': None},
        '.daf':                   {'view': None},
        '.ddf':                   {'view': None},
        'novel.genes.expr':       {'view': 'GeneDeNovoFPKM'},
        'novel.transcripts.expr': {'view': 'TranscriptDeNovoFPKM'},
        '.genes.expr':            {'view': 'GeneFPKM'},
        '.transcripts.expr':      {'view': 'TranscriptFPKM'},
        '.fastq':                 {'view': 'Fastq' },
        '_r1.fastq':              {'view': 'FastqRd1'},
        '_r2.fastq':              {'view': 'FastqRd2'},
        '.gtf':                   {'view': 'CufflinksGeneModel'},
        '.ini':                   {'view': None},
        '.log':                   {'view': None},
        '.stats.txt':             {'view': 'InsLength'},
        '.srf':                   {'view': None},
        '.wig':                   {'view': 'RawSignal'},
    }
    # (removed the unused candidate_fastq_src dict from the original)

    for lib_id, result_dir in library_result_map:
        inifile =  ['[config]']
        inifile += ['order_by=files view cell localization rnaExtract mapAlgorithm readType replicate labVersion']
        inifile += ['']
        line_counter = 1
        lib_info = get_library_info(host, apidata, lib_id)
        result_ini = os.path.join(result_dir, result_dir+'.ini')

        if lib_info['cell_line'].lower() == 'unknown':
            logging.warn("Library %s missing cell_line" % (lib_id,))

        # attributes shared by every line of this library's submission
        standard_attributes = {'cell': lib_info['cell_line'],
                               'insertLength': '200', 
                               'labVersion': 'TopHat',
                               'localization': 'cell',
                               'mapAlgorithm': 'TopHat',
                               'readType': '2x75', 
                               'replicate': lib_info['replicate'],
                               'rnaExtract': 'longPolyA',
                               }

        # write other lines
        submission_files = os.listdir(result_dir)
        fastqs = {}
        for f in submission_files:
            best_ext = find_best_extension(attributes, f)

            if best_ext is None:
                raise ValueError("Unrecognized file: %s" % (f,))
            if attributes[best_ext]['view'] is None:
                # recognized but not part of the submission
                continue
            elif best_ext.endswith('fastq'):
                # collect fastqs so each read's files share one line
                fastqs.setdefault(best_ext, set()).add(f)
            else:
                inifile.extend(
                    make_submission_section(line_counter,
                                            [f],
                                            standard_attributes,
                                            attributes[best_ext]
                                            )
                    )
                inifile += ['']
                line_counter += 1

        # add in fastqs on a single line.
        for extension, fastq_set in fastqs.items():
            inifile.extend(
                make_submission_section(line_counter, 
                                        fastq_set,
                                        standard_attributes,
                                        attributes[extension])
            )
            inifile += ['']
            line_counter += 1

        stream = open(result_ini,'w')
        try:
            stream.write(os.linesep.join(inifile))
        finally:
            # the original never closed this file handle
            stream.close()
453
def link_daf(daf_path, library_result_map):
    """Hard link daf_path into each result directory that doesn't have it yet."""
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    base_daf = os.path.basename(daf_path)
    for lib_id, result_dir in library_result_map:
        destination = os.path.join(result_dir, base_daf)
        if os.path.exists(destination):
            continue
        os.link(daf_path, destination)
464
def make_all_ddfs(library_result_map, daf_name):
    """Generate a ddf and condor archive script for each existing result ini."""
    for lib_id, result_dir in library_result_map:
        ini_name = result_dir + '.ini'
        ini_pathname = os.path.join(result_dir, ini_name)
        if not os.path.exists(ini_pathname):
            continue
        make_ddf(ini_name, daf_name, True, True, result_dir)
471             
def make_parser():
    """Build the OptionParser, seeding defaults from the htsworkflow ini files."""
    # Load defaults from the config files
    config = SafeConfigParser()
    config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini'])
    
    sequence_archive = None
    apiid = None
    apikey = None
    apihost = None
    SECTION = 'sequence_archive'
    if config.has_section(SECTION):
        # NOTE: SafeConfigParser.get()'s third positional argument is the
        # 'raw' flag, not a default value, so the original code raised
        # NoOptionError on a missing option instead of falling back.
        # Look options up explicitly to get real defaults.
        if config.has_option(SECTION, 'sequence_archive'):
            sequence_archive = config.get(SECTION, 'sequence_archive')
            sequence_archive = os.path.expanduser(sequence_archive)
        if config.has_option(SECTION, 'apiid'):
            apiid = config.get(SECTION, 'apiid')
        if config.has_option(SECTION, 'apikey'):
            apikey = config.get(SECTION, 'apikey')
        if config.has_option(SECTION, 'host'):
            apihost = config.get(SECTION, 'host')

    parser = OptionParser()

    # commands
    parser.add_option('--fastq', help="generate scripts for making fastq files",
                      default=False, action="store_true")

    parser.add_option('--ini', help="generate submission ini file", default=False,
                      action="store_true")

    parser.add_option('--makeddf', help='make the ddfs', default=False,
                      action="store_true")
    
    parser.add_option('--daf', default=None, help='specify daf name')

    # configuration options
    parser.add_option('--apiid', default=apiid, help="Specify API ID")
    parser.add_option('--apikey', default=apikey, help="Specify API KEY")
    parser.add_option('--host',  default=apihost,
                      help="specify HTSWorkflow host",)
    parser.add_option('--sequence', default=sequence_archive,
                      help="sequence repository")
    parser.add_option('--single', default=False, action="store_true", 
                      help="treat the sequences as single ended runs")

    # debugging
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    return parser
520
def main(cmdline=None):
    """Command line entry point: parse options and run the requested steps."""
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)

    if opts.debug:
        level = logging.DEBUG
    elif opts.verbose:
        level = logging.INFO
    else:
        level = logging.WARNING
    logging.basicConfig(level=level)

    apidata = {'apiid': opts.apiid, 'apikey': opts.apikey }

    if opts.host is None or opts.apiid is None or opts.apikey is None:
        parser.error("Please specify host url, apiid, apikey")

    if len(args) == 0:
        parser.error("I need at least one library submission-dir input file")

    # each input file contributes (library_id, result_dir) pairs
    library_result_map = []
    for name in args:
        library_result_map.extend(read_library_result_map(name))

    if opts.daf is not None:
        link_daf(opts.daf, library_result_map)

    if opts.fastq:
        build_fastqs(opts.host,
                     apidata,
                     opts.sequence,
                     library_result_map,
                     not opts.single)

    if opts.ini:
        make_submission_ini(opts.host, apidata, library_result_map)

    if opts.makeddf:
        make_all_ddfs(library_result_map, opts.daf)

if __name__ == "__main__":
    main()