d34a5754038a74fa4c078878bbb51ba095fa4343
[htsworkflow.git] / extra / ucsc_encode_submission / ucsc_gather.py
1 #!/usr/bin/env python
2 from ConfigParser import SafeConfigParser
3 from glob import glob
4 import json
5 import logging
6 from optparse import OptionParser
7 import os
8 from pprint import pprint
9 import shlex
10 from StringIO import StringIO
11 import time
12 import sys
13 import urllib
14 import urllib2
15 import urlparse
16
17
18 from htsworkflow.util import api
19 from htsworkflow.pipelines.sequences import \
20     create_sequence_table, \
21     scan_for_sequences
22
def make_submission_name(ininame):
    """Return the submission tarball name (.tgz) for an ini file path."""
    stem = os.path.splitext(os.path.basename(ininame))[0]
    return stem + '.tgz'
27
def make_ddf_name(pathname):
    """Return the ddf filename (.ddf) corresponding to a pathname."""
    stem = os.path.splitext(os.path.basename(pathname))[0]
    return stem + '.ddf'
32
def make_condor_name(pathname):
    """Return the condor submit filename (.condor) corresponding to a pathname."""
    stem = os.path.splitext(os.path.basename(pathname))[0]
    return stem + '.condor'
37     
def parse_filelist(file_string):
    """Split a comma-delimited 'files' value from a ddf ini into a list."""
    pieces = file_string.split(',')
    return pieces
40
def validate_filelist(files):
    """
    Raise RuntimeError naming the first file in *files* that does not exist.
    """
    missing = [f for f in files if not os.path.exists(f)]
    if missing:
        raise RuntimeError("%s does not exist" % (missing[0],))
48
def read_ddf_ini(filename, output=sys.stdout):
    """
    Convert a ddf ini file into tab-delimited ddf text written to *output*.

    The [config] section's order_by option names the columns, in order;
    every other section becomes one tab-delimited row (sections sorted by
    name).  Returns the list of data files named by the 'files' keys.
    """
    config = SafeConfigParser()
    config.read(filename)

    # column names come from the [config] block, whitespace-separated
    columns = shlex.split(config.get("config", "order_by"))
    output.write("\t".join(columns))
    output.write(os.linesep)

    data_files = []
    for section in sorted(config.sections()):
        if section == "config":
            # the config block describes the layout, it is not a data row
            continue
        row = []
        for key in columns:
            value = config.get(section, key)
            row.append(value)
            if key == 'files':
                # 'files' values are comma separated lists of filenames
                data_files.extend(value.split(','))
        output.write("\t".join(row))
        output.write(os.linesep)
    return data_files
77             
def make_condor_archive_script(ininame, files):
    """
    Write a condor submit file that tars *files* into ../<ininame>.tgz.

    The submit file is written to <ininame>.condor in the current
    directory.  Raises RuntimeError if any of *files* is missing.
    """
    template = """Universe = vanilla

Executable = /bin/tar
arguments = czvf ../%(archivename)s %(filelist)s

Error = compress.err.$(Process).log
Output = compress.out.$(Process).log
Log = compress.log
initialdir = %(initialdir)s

queue 
"""
    # refuse to emit a script that would fail at tar time
    missing = [f for f in files if not os.path.exists(f)]
    if missing:
        raise RuntimeError("Missing %s" % (missing[0],))

    stem = os.path.splitext(os.path.basename(ininame))[0]
    fields = {'archivename': stem + '.tgz',
              'filelist': " ".join(files),
              'initialdir': os.getcwd()}

    stream = open(stem + '.condor', 'w')
    try:
        stream.write(template % fields)
    finally:
        stream.close()
103
def make_ddf(ininame,  daf_name, guess_ddf=False, make_condor=False, outdir=None):
    """
    Make a ddf file from a submission ini, and a bonus condor archive script.

    :Args:
      ininame (str): submission ini filename (relative to outdir if given)
      daf_name (str): daf filename to include in the archive file list
      guess_ddf (bool): write to <ininame>.ddf instead of stdout
      make_condor (bool): also write a condor tar script for the file list
      outdir (str or None): directory to chdir into while working
    """
    curdir = os.getcwd()
    if outdir is not None:
        os.chdir(outdir)
    try:
        output = sys.stdout
        ddf_name = None
        if guess_ddf:
            ddf_name = make_ddf_name(ininame)
            print(ddf_name)
            output = open(ddf_name, 'w')

        try:
            file_list = read_ddf_ini(ininame, output)
        finally:
            # the original leaked this handle; close it (but never stdout)
            if output is not sys.stdout:
                output.close()

        # the daf (and the ddf itself, if we wrote one) go in the archive too
        file_list.append(daf_name)
        if ddf_name is not None:
            file_list.append(ddf_name)

        if make_condor:
            make_condor_archive_script(ininame, file_list)
    finally:
        # always restore the caller's working directory, even on error
        os.chdir(curdir)
128
129
def get_library_info(host, apidata, library_id):
    """Fetch the library metadata for *library_id* from the htsworkflow API."""
    library_url = api.library_url(host, library_id)
    return api.retrieve_info(library_url, apidata)
134     
def read_library_result_map(filename):
    """
    Read a whitespace-delimited "library_id result_dir" map file.

    Blank lines are ignored.  Returns a list of (library_id, result_dir)
    tuples in file order.
    """
    results = []
    stream = open(filename, 'r')
    try:
        for line in stream:
            line = line.strip()
            if not line:
                # skip blank lines (e.g. a trailing newline) instead of
                # crashing on the tuple unpack below
                continue
            library_id, result_dir = line.split()
            results.append((library_id, result_dir))
    finally:
        # the original leaked the file handle; always close it
        stream.close()
    return results
143
def condor_srf_to_fastq(srf_file, target_pathname):
    """Return a condor queue entry converting *srf_file* to *target_pathname*."""
    entry = """output=%(target_pathname)s
arguments="-c %(srf_file)s"
queue
"""
    return entry % {'srf_file': srf_file,
                    'target_pathname': target_pathname}
153
def condor_qseq_to_fastq(qseq_file, target_pathname):
    """Return a condor queue entry converting *qseq_file* to *target_pathname*."""
    entry = """
arguments="-i %(qseq_file)s -o %(target_pathname)s"
queue
"""
    return entry % {'qseq_file': qseq_file,
                    'target_pathname': target_pathname}
163
def find_archive_sequence_files(host, apidata, sequences_path, 
                                library_result_map):
    """
    Find all the archive sequence files possibly associated with our results.

    For each (library_id, result_dir) pair, fetch the library info from the
    htsworkflow API, collect its flowcell/lane pairs, scan the matching
    flowcell directories under sequences_path, and attach the sequences that
    belong to our lanes.

    Returns lib_db, a dict of library_id -> library info dict, where each
    info dict gains a 'lanes' key: (flowcell, lane) -> set of sequence
    objects.  NOTE(review): libraries whose lanes matched no sequences never
    get a 'lanes' key -- callers must cope (build_fastqs assumes it exists).
    """
    logging.debug("Searching for sequence files in: %s" %(sequences_path,))

    lib_db = {}
    seq_dirs = set()
    #seq_dirs = set(os.path.join(sequences_path, 'srfs'))
    candidate_lanes = {}
    for lib_id, result_dir in library_result_map:
        lib_info = get_library_info(host, apidata, lib_id)
        lib_db[lib_id] = lib_info

        # assumes lib_info['lane_set'] is a list of dicts with 'flowcell'
        # and 'lane_number' keys -- TODO confirm against the API schema
        for lane in lib_info['lane_set']:
            lane_key = (lane['flowcell'], lane['lane_number'])
            candidate_lanes[lane_key] = lib_id
            seq_dirs.add(os.path.join(sequences_path, 
                                         'flowcells', 
                                         lane['flowcell']))
    logging.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
    candidate_seq_list = scan_for_sequences(seq_dirs)

    # at this point we have too many sequences as scan_for_sequences
    # returns all the sequences in a flowcell directory
    # so lets filter out the extras
    
    for seq in candidate_seq_list:
        lane_key = (seq.flowcell, seq.lane)
        lib_id = candidate_lanes.get(lane_key, None)
        if lib_id is not None:
            # this sequence belongs to one of our lanes; file it under
            # the owning library's 'lanes' mapping
            lib_info = lib_db[lib_id]
            lanes = lib_info.setdefault('lanes', {})
            lanes.setdefault(lane_key, set()).add(seq)
    
    return lib_db
202
def build_fastqs(host, apidata, sequences_path, library_result_map):
    """
    Generate condor scripts to build any needed fastq files.

    Finds the archive sequences for each library, works out which fastq
    target files are missing from the result directories, and writes a
    'qseq.fastq.condor' submit file (in the current directory) with one
    qseq2fastq job per missing target.  srf conversion is currently
    disabled (see the commented-out branch below).
    """
    qseq_condor_header = """
Universe=vanilla
executable=/home/diane/proj/gaworkflow/scripts/qseq2fastq
error=qseqfastq.err.$(process).log
output=qseqfastq.out.$(process).log
log=qseqfastq.log

"""
    qseq_condor_entries = []
    srf_condor_header = """
Universe=vanilla
executable=/home/diane/bin/srf2fastq
error=srffastq.err.$(process)
log=srffastq.log

"""
    srf_condor_entries = []
    # target filename patterns; 'read' distinguishes paired-end mates
    fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
    fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
    lib_db = find_archive_sequence_files(host, 
                                         apidata, 
                                         sequences_path, 
                                         library_result_map)

    # find what targets we're missing
    needed_targets = {}
    for lib_id, result_dir in library_result_map:
        lib = lib_db[lib_id]
        # NOTE(review): 'lanes' is only present when sequences were found
        # for this library -- this raises KeyError otherwise; verify
        for lane_key, sequences in lib['lanes'].items():
            for seq in sequences:
                # skip srfs for the moment
                if seq.filetype == 'srf':
                    continue
                filename_attributes = { 
                    'flowcell': seq.flowcell,
                    'lib_id': lib_id,
                    'lane': seq.lane,
                    'read': seq.read,
                    'cycle': seq.cycle
                    }
                # throw out test runs
                # FIXME: this should probably be configurable
                if seq.cycle < 50:
                    continue
                if seq.flowcell == '30CUUAAXX':
                    # 30CUUAAXX run sucked
                    continue

                # end filters
                # paired-end sequences carry a read number; singles don't
                if seq.read is not None:
                    target_name = fastq_paired_template % filename_attributes
                else:
                    target_name = fastq_single_template % filename_attributes

                target_pathname = os.path.join(result_dir, target_name)
                if not os.path.exists(target_pathname):
                    # remember every source filetype that could make this target
                    t = needed_targets.setdefault(target_pathname, {})
                    t[seq.filetype] = seq
                    
    for target_pathname, available_sources in needed_targets.items():
        if available_sources.has_key('qseq'):
            source = available_sources['qseq']
            qseq_condor_entries.append(
                condor_qseq_to_fastq(source.path, target_pathname)
            )
        # NOTE(review): the disabled srf branch below references 'source'
        # before assigning it -- fix before re-enabling
        #elif available_sources.has_key('srf'):
        #    srf_condor_entries.append(condor_srf_to_fastq(source, target))
        else:
            print " need file", target_pathname

#
#        f = open('srf.fastq.condor','w')
#        f.write(srf_condor)
#        f.close()

    # only write the condor file when there is at least one job to queue
    if len(qseq_condor_entries) > 0:
        #f = sys.stdout
        f = open('qseq.fastq.condor','w')
        f.write(qseq_condor_header)
        for entry in qseq_condor_entries:
            f.write(entry)
        f.close()
289
def find_best_extension(extension_map, filename):
    """
    Search through extension_map looking for the best extension.
    The 'best' is the longest match.

    :Args:
      extension_map (dict): '.ext' -> { 'view': 'name' or None }
      filename (str): the filename whose extension we are about to examine

    :Returns:
      the longest matching extension, or None if nothing matches.
    """
    # (the original also computed os.path.splitext here but never used it)
    matches = [ext for ext in extension_map if filename.endswith(ext)]
    if not matches:
        return None
    return max(matches, key=len)
309
def add_submission_section(line_counter, files, standard_attributes, file_attributes):
    """
    Build the ini-file lines for one submission section.

    Returns a list of lines: the [lineN] header, a files= entry, then one
    key=value line per merged attribute (file_attributes overrides
    standard_attributes on key collisions).
    """
    merged = dict(standard_attributes)
    merged.update(file_attributes)

    section = ['[line%s]' % (line_counter,),
               "files=%s" % (",".join(files))]
    section.extend("%s=%s" % (k, v) for k, v in merged.items())
    return section
323     
def make_submission_ini(host, apidata, library_result_map):
    """
    Write a submission ini file into each result directory.

    For every (library_id, result_dir) pair, lists the directory's files,
    classifies each by its best-matching extension, and writes
    <result_dir>/<result_dir>.ini with one [lineN] section per file whose
    view is not None.  Raises ValueError on any unclassifiable file.
    """
    # extension -> submission view; view None means "known but not submitted"
    attributes = {
        '.bai':                   {'view': None}, # bam index
        '.bam':                   {'view': 'Signal'},
        '.condor':                {'view': None},
        '.daf':                   {'view': None},
        '.ddf':                   {'view': None},
        '.splices.bam':           {'view': 'Splices'},
        '.bed':                   {'view': 'TopHatJunctions'},
        '.ini':                   {'view': None},
        '.log':                   {'view': None},
        '_r1.fastq':              {'view': 'FastqRd1'},
        '_r2.fastq':              {'view': 'FastqRd2'},
        '.tar.bz2':               {'view': None},
        'novel.genes.expr':       {'view': 'GeneDeNovoFPKM'},
        'novel.transcripts.expr': {'view': 'TranscriptDeNovoFPKM'},
        '.genes.expr':            {'view': 'GeneFPKM'},
        '.transcripts.expr':      {'view': 'TranscriptFPKM'},
        '.stats.txt':             {'view': 'InsLength'},
        '.gtf':                   {'view': 'CufflinksGeneModel'},
        '.wig':                   {'view': 'RawSignal'},
    }
   
    # NOTE(review): unused -- appears to be leftover from the disabled
    # fastq block below
    candidate_fastq_src = {}

    for lib_id, result_dir in library_result_map:
        inifile =  ['[config]']
        inifile += ['order_by=files view cell localization rnaExtract mapAlgorithm readType replicate labVersion']
        inifile += ['']
        line_counter = 1
        lib_info = get_library_info(host, apidata, lib_id)
        result_ini = os.path.join(result_dir, result_dir+'.ini')

        # attributes shared by every section; assumes lib_info carries
        # 'cell_line' and 'replicate' keys -- TODO confirm API schema
        standard_attributes = {'cell': lib_info['cell_line'],
                               'insertLength': '200', # ali
                               'labVersion': 'TopHat',
                               'localization': 'cell',
                               'mapAlgorithm': 'TopHat',
                               'readType': '2x75', #ali
                               'replicate': lib_info['replicate'],
                               }

        # write fastq line
        #fastqs = []
        #for lane in lib_info['lane_set']:
        #    target_name = "%s_%s_%s.fastq" % (lane['flowcell'], lib_id, lane['lane_number'])
        #    fastqs.append(target_name)
        #inifile.extend(
        #    make_run_block(line_counter, fastqs, standard_attributes, attributes['.fastq'])
        #)
        #inifile += ['']
        #line_counter += 1

        # write other lines
        submission_files = os.listdir(result_dir)
        for f in submission_files:
            best_ext = find_best_extension(attributes, f)

            if best_ext is not None:
                if attributes[best_ext]['view'] is None:
                    # recognized but deliberately excluded from submission
                    continue
                inifile.extend(
                    add_submission_section(line_counter,
                                           [f],
                                           standard_attributes,
                                           attributes[best_ext]
                    )
                )
                inifile += ['']
                line_counter += 1
            else:
                raise ValueError("Unrecognized file: %s" % (f,))

        # NOTE(review): 'f' here shadows the loop variable above
        f = open(result_ini,'w')
        f.write(os.linesep.join(inifile))
        f.close()
401
def link_daf(daf_path, library_result_map):
    """
    Hard-link daf_path into every result directory in library_result_map.

    Raises RuntimeError if daf_path does not exist.  Directories that
    already contain the link are left untouched.
    """
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    daf_filename = os.path.basename(daf_path)
    for lib_id, result_dir in library_result_map:
        target = os.path.join(result_dir, daf_filename)
        if not os.path.exists(target):
            os.link(daf_path, target)
412
def make_all_ddfs(library_result_map, daf_name):
    """
    Run make_ddf (ddf + condor script) for every result directory whose
    submission ini file already exists; directories without one are skipped.
    """
    for lib_id, result_dir in library_result_map:
        ininame = result_dir + '.ini'
        if os.path.exists(os.path.join(result_dir, ininame)):
            make_ddf(ininame, daf_name, True, True, result_dir)
419             
def make_parser():
    """
    Build the command-line option parser.

    Defaults for --sequence, --host, --apiid and --apikey are seeded from
    the [sequence_archive] section of ~/.htsworkflow.ini or
    /etc/htsworkflow.ini when present.
    """
    # Load defaults from the config files
    config = SafeConfigParser()
    config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini'])

    sequence_archive = None
    apiid = None
    apikey = None
    apihost = None
    SECTION = 'sequence_archive'
    if config.has_section(SECTION):
        # BUGFIX: the old code passed the default as ConfigParser.get()'s
        # third positional argument, which is actually the 'raw' flag, not
        # a fallback value -- a missing option raised NoOptionError.  Guard
        # each option with has_option instead.
        if config.has_option(SECTION, 'sequence_archive'):
            sequence_archive = config.get(SECTION, 'sequence_archive')
            sequence_archive = os.path.expanduser(sequence_archive)
        if config.has_option(SECTION, 'apiid'):
            apiid = config.get(SECTION, 'apiid')
        if config.has_option(SECTION, 'apikey'):
            apikey = config.get(SECTION, 'apikey')
        if config.has_option(SECTION, 'host'):
            apihost = config.get(SECTION, 'host')

    parser = OptionParser()

    parser.add_option('--fastq', help="generate scripts for making fastq files",
                      default=False, action="store_true")

    parser.add_option('--ini', help="generate submission ini file", default=False,
                      action="store_true")

    parser.add_option('--makeddf', help='make the ddfs', default=False,
                      action="store_true")

    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--sequence', help="sequence repository", default=sequence_archive)
    parser.add_option('--host', help="specify HTSWorkflow host", default=apihost)
    parser.add_option('--apiid', help="Specify API ID", default=apiid)
    parser.add_option('--apikey', help="Specify API KEY", default=apikey)

    return parser
455
def main(cmdline=None):
    """
    Command-line entry point: parse options, then run whichever of the
    daf-link / fastq / ini / ddf steps were requested.
    """
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)

    # talking to the htsworkflow API requires all three credentials
    if opts.host is None or opts.apiid is None or opts.apikey is None:
        parser.error("Please specify host url, apiid, apikey")

    if len(args) == 0:
        parser.error("I need at least one library submission-dir input file")

    apidata = {'apiid': opts.apiid, 'apikey': opts.apikey }

    # each input file maps library ids to result directories
    library_result_map = []
    for mapfile in args:
        library_result_map.extend(read_library_result_map(mapfile))

    if opts.daf is not None:
        link_daf(opts.daf, library_result_map)

    if opts.fastq:
        build_fastqs(opts.host, apidata, opts.sequence, library_result_map)

    if opts.ini:
        make_submission_ini(opts.host, apidata, library_result_map)

    if opts.makeddf:
        make_all_ddfs(library_result_map, opts.daf)
483         
if __name__ == "__main__":
    # configure root logging before running; DEBUG is intentionally verbose
    #logging.basicConfig(level = logging.WARNING )
    logging.basicConfig(level = logging.DEBUG )
    main()