Split parts of build_fastqs into a seperate function.
[htsworkflow.git] / extra / ucsc_encode_submission / ucsc_gather.py
1 #!/usr/bin/env python
2 from ConfigParser import SafeConfigParser
3 from glob import glob
4 import json
5 import logging
6 from optparse import OptionParser
7 import os
8 from pprint import pprint, pformat
9 import shlex
10 from StringIO import StringIO
11 import time
12 import sys
13 import types
14 import urllib
15 import urllib2
16 import urlparse
17
18
19 from htsworkflow.util import api
20 from htsworkflow.pipelines.sequences import \
21     create_sequence_table, \
22     scan_for_sequences
23
24 def main(cmdline=None):
25     parser = make_parser()
26     opts, args = parser.parse_args(cmdline)
27     
28     if opts.debug:
29         logging.basicConfig(level = logging.DEBUG )
30     elif opts.verbose:
31         logging.basicConfig(level = logging.INFO )
32     else:
33         logging.basicConfig(level = logging.WARNING )
34         
35     
36     apidata = {'apiid': opts.apiid, 'apikey': opts.apikey }
37
38     if opts.host is None or opts.apiid is None or opts.apikey is None:
39         parser.error("Please specify host url, apiid, apikey")
40
41     if len(args) == 0:
42         parser.error("I need at least one library submission-dir input file")
43         
44     library_result_map = []
45     for a in args:
46         library_result_map.extend(read_library_result_map(a))
47
48     if opts.daf is not None:
49         link_daf(opts.daf, library_result_map)
50
51     if opts.fastq:
52         build_fastqs(opts.host, 
53                      apidata, 
54                      opts.sequence, 
55                      library_result_map,
56                      force=opts.force)
57
58     if opts.ini:
59         make_submission_ini(opts.host, apidata, library_result_map)
60
61     if opts.makeddf:
62         make_all_ddfs(library_result_map, opts.daf)
63
64 def make_parser():
65     # Load defaults from the config files
66     config = SafeConfigParser()
67     config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini'])
68     
69     sequence_archive = None
70     apiid = None
71     apikey = None
72     apihost = None
73     SECTION = 'sequence_archive'
74     if config.has_section(SECTION):
75         sequence_archive = config.get(SECTION, 'sequence_archive',sequence_archive)
76         sequence_archive = os.path.expanduser(sequence_archive)
77         apiid = config.get(SECTION, 'apiid', apiid)
78         apikey = config.get(SECTION, 'apikey', apikey)
79         apihost = config.get(SECTION, 'host', apihost)
80
81     parser = OptionParser()
82
83     # commands
84     parser.add_option('--fastq', help="generate scripts for making fastq files",
85                       default=False, action="store_true")
86
87     parser.add_option('--ini', help="generate submission ini file", default=False,
88                       action="store_true")
89
90     parser.add_option('--makeddf', help='make the ddfs', default=False,
91                       action="store_true")
92     
93     parser.add_option('--daf', default=None, help='specify daf name')
94     parser.add_option('--force', default=False, action="store_true",
95                       help="Force regenerating fastqs")
96
97     # configuration options
98     parser.add_option('--apiid', default=apiid, help="Specify API ID")
99     parser.add_option('--apikey', default=apikey, help="Specify API KEY")
100     parser.add_option('--host',  default=apihost,
101                       help="specify HTSWorkflow host",)
102     parser.add_option('--sequence', default=sequence_archive,
103                       help="sequence repository")
104
105     # debugging
106     parser.add_option('--verbose', default=False, action="store_true",
107                       help='verbose logging')
108     parser.add_option('--debug', default=False, action="store_true",
109                       help='debug logging')
110
111     return parser
112
113 def build_fastqs(host, apidata, sequences_path, library_result_map, 
114                  force=False ):
115     """
116     Generate condor scripts to build any needed fastq files
117     
118     Args:
119       host (str): root of the htsworkflow api server
120       apidata (dict): id & key to post to the server
121       sequences_path (str): root of the directory tree to scan for files
122       library_result_map (list):  [(library_id, destination directory), ...]
123     """
124     qseq_condor_header = """
125 Universe=vanilla
126 executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/qseq2fastq
127 error=log/qseq2fastq.err.$(process).log
128 output=log/qseq2fastq.out.$(process).log
129 log=log/qseq2fastq.log
130
131 """
132     qseq_condor_entries = []
133     srf_condor_header = """
134 Universe=vanilla
135 executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/srf2named_fastq.py
136 output=log/srf_pair_fastq.out.$(process).log
137 error=log/srf_pair_fastq.err.$(process).log
138 log=log/srf_pair_fastq.log
139 environment="PYTHONPATH=/home/diane/lib/python2.6/site-packages:/home/diane/proj/solexa/gaworkflow PATH=/woldlab/rattus/lvol0/mus/home/diane/bin:/usr/bin:/bin"
140
141 """
142     srf_condor_entries = []
143     lib_db = find_archive_sequence_files(host, 
144                                          apidata, 
145                                          sequences_path, 
146                                          library_result_map)
147
148     needed_targets = find_missing_targets(library_result_map, lib_db, force)
149
150     for target_pathname, available_sources in needed_targets.items():
151         logging.debug(' target : %s' % (target_pathname,))
152         logging.debug(' candidate sources: %s' % (available_sources,))
153         if available_sources.has_key('qseq'):
154             source = available_sources['qseq']
155             qseq_condor_entries.append(
156                 condor_qseq_to_fastq(source.path, 
157                                      target_pathname, 
158                                      source.flowcell,
159                                      force=force)
160             )
161         elif available_sources.has_key('srf'):
162             source = available_sources['srf']
163             mid = getattr(source, 'mid_point', None)
164             srf_condor_entries.append(
165                 condor_srf_to_fastq(source.path, 
166                                     target_pathname,
167                                     source.paired,
168                                     source.flowcell,
169                                     mid,
170                                     force=force)
171             )
172         else:
173             print " need file", target_pathname
174
175     if len(srf_condor_entries) > 0:
176         make_submit_script('srf.fastq.condor', 
177                            srf_condor_header,
178                            srf_condor_entries)
179
180     if len(qseq_condor_entries) > 0:
181         make_submit_script('qseq.fastq.condor', 
182                            qseq_condor_header,
183                            qseq_condor_entries)
184
185 def find_missing_targets(library_result_map, lib_db, force=False):
186     """
187     Check if the sequence file exists.
188     This requires computing what the sequence name is and checking
189     to see if it can be found in the sequence location.
190
191     Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
192     """
193     fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
194     fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
195     # find what targets we're missing
196     needed_targets = {}
197     for lib_id, result_dir in library_result_map:
198         lib = lib_db[lib_id]
199         lane_dict = make_lane_dict(lib_db, lib_id)
200         
201         for lane_key, sequences in lib['lanes'].items():
202             for seq in sequences:
203                 seq.paired = lane_dict[seq.flowcell]['paired_end']
204                 lane_status = lane_dict[seq.flowcell]['status']
205
206                 if seq.paired and seq.read is None:
207                     seq.read = 1
208                 filename_attributes = { 
209                     'flowcell': seq.flowcell,
210                     'lib_id': lib_id,
211                     'lane': seq.lane,
212                     'read': seq.read,
213                     'cycle': seq.cycle
214                     }
215                 # skip bad runs
216                 if lane_status == 'Failed':
217                     continue
218                 if seq.flowcell == '30DY0AAXX':
219                     # 30DY0 only ran for 151 bases instead of 152
220                     # it is actually 76 1st read, 75 2nd read
221                     seq.mid_point = 76
222
223                 # end filters
224                 if seq.paired:
225                     target_name = fastq_paired_template % filename_attributes
226                 else:
227                     target_name = fastq_single_template % filename_attributes
228
229                 target_pathname = os.path.join(result_dir, target_name)
230                 if force or not os.path.exists(target_pathname):
231                     t = needed_targets.setdefault(target_pathname, {})
232                     t[seq.filetype] = seq
233
234     return needed_targets
235
236 def link_daf(daf_path, library_result_map):
237     if not os.path.exists(daf_path):
238         raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))
239
240     base_daf = os.path.basename(daf_path)
241     
242     for lib_id, result_dir in library_result_map:
243         submission_daf = os.path.join(result_dir, base_daf)
244         if not os.path.exists(submission_daf):
245             os.link(daf_path, submission_daf)
246
247 def make_submission_ini(host, apidata, library_result_map, paired=True):
248     # ma is "map algorithm"
249     ma = 'TH1014'
250
251     if paired:
252         aligns = "Paired"
253     else:
254         aligns = "Aligns"
255
256     attributes = {
257       # bam index
258      '.bai':                   {'view': None, 'MapAlgorithm': 'NA'},
259      '.bam':                   {'view': aligns, 'MapAlgorithm': ma},
260      '.splices.bam':           {'view': 'Splices', 'MapAlgorithm': ma},
261      '.jnct':                  {'view': 'Junctions', 'MapAlgorithm': ma},
262      '.plus.bigwig':           {'view': 'PlusSignal', 'MapAlgorithm': ma},
263      '.minus.bigwig':          {'view': 'MinusSignal', 'MapAlgorithm': ma},
264      '.bigwig':                {'view': 'Signal', 'MapAlgorithm': ma},
265      '.tar.bz2':               {'view': None},
266      '.condor':                {'view': None},
267      '.daf':                   {'view': None},
268      '.ddf':                   {'view': None},
269      'denovo.genes.expr':       {'view': 'GeneDeNovo', 'MapAlgorithm': ma},
270      'denovo.transcripts.expr': {'view': 'TranscriptDeNovo', 'MapAlgorithm': ma},
271      'novel.genes.expr':       {'view': 'GeneDeNovo', 'MapAlgorithm': ma},
272      'novel.transcripts.expr': {'view': 'TranscriptDeNovo', 'MapAlgorithm': ma},
273      '.genes.expr':            {'view': 'GeneFPKM', 'MapAlgorithm': ma},
274      '.transcripts.expr':      {'view': 'TranscriptFPKM', 'MapAlgorithm': ma},
275      '.transcript.expr':       {'view': 'TranscriptFPKM', 'MapAlgorithm': ma},
276      '.fastq':                 {'view': 'Fastq', 'MapAlgorithm': 'NA' },
277      '_r1.fastq':              {'view': 'FastqRd1', 'MapAlgorithm': 'NA'},
278      '_r2.fastq':              {'view': 'FastqRd2', 'MapAlgorithm': 'NA'},
279      '.gtf':                   {'view': 'GeneModel', 'MapAlgorithm': ma},
280      '.ini':                   {'view': None},
281      '.log':                   {'view': None},
282      '.stats.txt':             {'view': 'InsLength', 'MapAlgorithm': ma},
283      '.srf':                   {'view': None},
284      '.wig':                   {'view': None},
285      '.zip':                   {'view': None},
286     }
287    
288     candidate_fastq_src = {}
289
290     for lib_id, result_dir in library_result_map:
291         order_by = ['order_by=files', 'view', 'replicate', 'cell', 
292                     'readType', 'mapAlgorithm', 'insertLength' ]
293         inifile =  ['[config]']
294         inifile += [" ".join(order_by)]
295         inifile += ['']
296         line_counter = 1
297         lib_info = get_library_info(host, apidata, lib_id)
298         result_ini = os.path.join(result_dir, result_dir+'.ini')
299
300         if lib_info['cell_line'].lower() == 'unknown':
301             logging.warn("Library %s missing cell_line" % (lib_id,))
302
303         standard_attributes = {'cell': lib_info['cell_line'],
304                                'replicate': lib_info['replicate'],
305                                }
306         if paired:
307             if lib_info['insert_size'] is None:
308                 errmsg = "Library %s is missing insert_size, assuming 200"
309                 logging.warn(errmsg % (lib_id,))
310                 insert_size = 200
311             else:
312                 insert_size = lib_info['insert_size']
313             standard_attributes['insertLength'] = insert_size
314             standard_attributes['readType'] = '2x75'
315         else:
316             standard_attributes['insertLength'] = 'ilNA'
317             standard_attributes['readType'] = '1x75D'
318
319         # write other lines
320         submission_files = os.listdir(result_dir)
321         fastqs = {}
322         for f in submission_files:
323             best_ext = find_best_extension(attributes, f)
324
325             if best_ext is not None:
326                if attributes[best_ext]['view'] is None:
327                    
328                    continue
329                elif best_ext.endswith('fastq'):
330                    fastqs.setdefault(best_ext, set()).add(f)
331                else:
332                    inifile.extend(
333                        make_submission_section(line_counter,
334                                                [f],
335                                                standard_attributes,
336                                                attributes[best_ext]
337                                                )
338                        )
339                    inifile += ['']
340                    line_counter += 1
341             else:
342                 raise ValueError("Unrecognized file: %s" % (f,))
343
344         # add in fastqs on a single line.
345         for extension, fastq_set in fastqs.items():
346             inifile.extend(
347                 make_submission_section(line_counter, 
348                                         fastq_set,
349                                         standard_attributes,
350                                         attributes[extension])
351             )
352             inifile += ['']
353             line_counter += 1
354             
355         f = open(result_ini,'w')
356         f.write(os.linesep.join(inifile))
357
358 def make_lane_dict(lib_db, lib_id):
359     """
360     Convert the lane_set in a lib_db to a dictionary
361     indexed by flowcell ID
362     """
363     result = []
364     for lane in lib_db[lib_id]['lane_set']:
365         result.append((lane['flowcell'], lane))
366     return dict(result)
367
368 def make_all_ddfs(library_result_map, daf_name, make_condor=True):
369     dag_fragment = []
370     for lib_id, result_dir in library_result_map:
371         ininame = result_dir+'.ini'
372         inipathname = os.path.join(result_dir, ininame)
373         if os.path.exists(inipathname):
374             dag_fragment.extend(
375                 make_ddf(ininame, daf_name, True, make_condor, result_dir)
376             )
377
378     if make_condor and len(dag_fragment) > 0:
379         dag_filename = 'submission.dagman'
380         if os.path.exists(dag_filename):
381             logging.warn("%s exists, please delete" % (dag_filename,))
382         else:
383             f = open(dag_filename,'w')
384             f.write( os.linesep.join(dag_fragment))
385             f.write( os.linesep )
386             f.close()
387             
388
389 def make_ddf(ininame,  daf_name, guess_ddf=False, make_condor=False, outdir=None):
390     """
391     Make ddf files, and bonus condor file
392     """
393     dag_fragments = []
394     curdir = os.getcwd()
395     if outdir is not None:
396         os.chdir(outdir)
397     output = sys.stdout
398     ddf_name = None
399     if guess_ddf:
400         ddf_name = make_ddf_name(ininame)
401         print ddf_name
402         output = open(ddf_name,'w')
403
404     file_list = read_ddf_ini(ininame, output)
405
406     file_list.append(daf_name)
407     if ddf_name is not None:
408         file_list.append(ddf_name)
409
410     if make_condor:
411         archive_condor = make_condor_archive_script(ininame, file_list)
412         upload_condor = make_condor_upload_script(ininame)
413         
414         dag_fragments.extend( 
415             make_dag_fragment(ininame, archive_condor, upload_condor)
416         ) 
417         
418     os.chdir(curdir)
419     
420     return dag_fragments
421
422 def read_ddf_ini(filename, output=sys.stdout):
423     """
424     Read a ini file and dump out a tab delmited text file
425     """
426     file_list = []
427     config = SafeConfigParser()
428     config.read(filename)
429
430     order_by = shlex.split(config.get("config", "order_by"))
431
432     output.write("\t".join(order_by))
433     output.write(os.linesep)
434     sections = config.sections()
435     sections.sort()
436     for section in sections:
437         if section == "config":
438             # skip the config block
439             continue
440         values = []
441         for key in order_by:
442             v = config.get(section, key)
443             values.append(v)
444             if key == 'files':
445                 file_list.extend(parse_filelist(v))
446                 
447         output.write("\t".join(values))
448         output.write(os.linesep)
449     return file_list
450     
451 def read_library_result_map(filename):
452     """
453     Read a file that maps library id to result directory.
454     Does not support spaces in filenames. 
455     
456     For example:
457       10000 result/foo/bar
458     """
459     stream = open(filename,'r')
460
461     results = []
462     for line in stream:
463         line = line.rstrip()
464         if not line.startswith('#') and len(line) > 0 :
465             library_id, result_dir = line.split()
466             results.append((library_id, result_dir))
467     return results
468             
469 def make_condor_archive_script(ininame, files):
470     script = """Universe = vanilla
471
472 Executable = /bin/tar
473 arguments = czvf ../%(archivename)s %(filelist)s
474
475 Error = compress.err.$(Process).log
476 Output = compress.out.$(Process).log
477 Log = /tmp/submission-compress.log
478 initialdir = %(initialdir)s
479
480 queue 
481 """
482     for f in files:
483         if not os.path.exists(f):
484             raise RuntimeError("Missing %s" % (f,))
485
486     context = {'archivename': make_submission_name(ininame),
487                'filelist': " ".join(files),
488                'initialdir': os.getcwd()}
489
490     condor_script = make_condor_name(ininame, 'archive')
491     condor_stream = open(condor_script,'w')
492     condor_stream.write(script % context)
493     condor_stream.close()
494     return condor_script
495
496 def make_condor_upload_script(ininame):
497     script = """Universe = vanilla
498
499 Executable = /usr/bin/lftp
500 arguments = -c put ../%(archivename)s -o ftp://detrout@encodeftp.cse.ucsc.edu/
501
502 Error = upload.err.$(Process).log
503 Output = upload.out.$(Process).log
504 Log = /tmp/submission-upload.log
505 initialdir = %(initialdir)s
506
507 queue 
508 """
509     context = {'archivename': make_submission_name(ininame),
510                'initialdir': os.getcwd()}
511
512     condor_script = make_condor_name(ininame, 'upload')
513     condor_stream = open(condor_script,'w')
514     condor_stream.write(script % context)
515     condor_stream.close()
516     return condor_script
517
518 def make_dag_fragment(ininame, archive_condor, upload_condor):
519     """
520     Make the couple of fragments compress and then upload the data.
521     """
522     cur_dir = os.getcwd()
523     archive_condor = os.path.join(cur_dir, archive_condor)
524     upload_condor = os.path.join(cur_dir, upload_condor)
525     job_basename = make_base_name(ininame)
526
527     fragments = []
528     fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
529     fragments.append('JOB %s_upload %s' % (job_basename,  upload_condor))
530     fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))
531
532     return fragments
533
534 def get_library_info(host, apidata, library_id):
535     url = api.library_url(host, library_id)
536     contents = api.retrieve_info(url, apidata)
537     return contents
538
539 def condor_srf_to_fastq(srf_file, target_pathname, paired, flowcell=None,
540                         mid=None, force=False):
541     args = [ srf_file, ]
542     if paired:
543         args.extend(['--left', target_pathname])
544         # this is ugly. I did it because I was pregenerating the target
545         # names before I tried to figure out what sources could generate
546         # those targets, and everything up to this point had been
547         # one-to-one. So I couldn't figure out how to pair the 
548         # target names. 
549         # With this at least the command will run correctly.
550         # however if we rename the default targets, this'll break
551         # also I think it'll generate it twice.
552         args.extend(['--right', 
553                      target_pathname.replace('_r1.fastq', '_r2.fastq')])
554     else:
555         args.extend(['--single', target_pathname ])
556     if flowcell is not None:
557         args.extend(['--flowcell', flowcell])
558
559     if mid is not None:
560         args.extend(['-m', str(mid)])
561
562     if force:
563         args.extend(['--force'])
564
565     script = """
566 arguments="%s"
567 queue
568 """ % (" ".join(args),)
569     
570     return  script 
571
572 def condor_qseq_to_fastq(qseq_file, target_pathname, flowcell=None, force=False):
573     args = ['-i', qseq_file, '-o', target_pathname ]
574     if flowcell is not None:
575         args.extend(['-f', flowcell])
576     script = """
577 arguments="%s"
578 queue
579 """ % (" ".join(args))
580
581     return script 
582
583 def find_archive_sequence_files(host, apidata, sequences_path, 
584                                 library_result_map):
585     """
586     Find all the archive sequence files possibly associated with our results.
587
588     """
589     logging.debug("Searching for sequence files in: %s" %(sequences_path,))
590
591     lib_db = {}
592     seq_dirs = set()
593     #seq_dirs = set(os.path.join(sequences_path, 'srfs'))
594     candidate_lanes = {}
595     for lib_id, result_dir in library_result_map:
596         lib_info = get_library_info(host, apidata, lib_id)
597         lib_db[lib_id] = lib_info
598
599         for lane in lib_info['lane_set']:
600             lane_key = (lane['flowcell'], lane['lane_number'])
601             candidate_lanes[lane_key] = lib_id
602             seq_dirs.add(os.path.join(sequences_path, 
603                                          'flowcells', 
604                                          lane['flowcell']))
605     logging.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
606     candidate_seq_list = scan_for_sequences(seq_dirs)
607
608     # at this point we have too many sequences as scan_for_sequences
609     # returns all the sequences in a flowcell directory
610     # so lets filter out the extras
611     
612     for seq in candidate_seq_list:
613         lane_key = (seq.flowcell, seq.lane)
614         lib_id = candidate_lanes.get(lane_key, None)
615         if lib_id is not None:
616             lib_info = lib_db[lib_id]
617             lanes = lib_info.setdefault('lanes', {})
618             lanes.setdefault(lane_key, set()).add(seq)
619     
620     return lib_db
621
622 def find_best_extension(extension_map, filename):
623     """
624     Search through extension_map looking for the best extension
625     The 'best' is the longest match
626
627     :Args:
628       extension_map (dict): '.ext' -> { 'view': 'name' or None }
629       filename (str): the filename whose extention we are about to examine
630     """
631     best_ext = None
632     path, last_ext = os.path.splitext(filename)
633
634     for ext in extension_map.keys():
635         if filename.endswith(ext):
636             if best_ext is None:
637                 best_ext = ext
638             elif len(ext) > len(best_ext):
639                 best_ext = ext
640     return best_ext
641     
642 def make_submission_section(line_counter, files, standard_attributes, file_attributes):
643     """
644     Create a section in the submission ini file
645     """
646     inifile = [ '[line%s]' % (line_counter,) ]
647     inifile += ["files=%s" % (",".join(files))]
648     cur_attributes = {}
649     cur_attributes.update(standard_attributes)
650     cur_attributes.update(file_attributes)
651     
652     for k,v in cur_attributes.items():
653         inifile += ["%s=%s" % (k,v)]
654     return inifile
655
656 def make_base_name(pathname):
657     base = os.path.basename(pathname)
658     name, ext = os.path.splitext(base)
659     return name
660
661 def make_submission_name(ininame):
662     name = make_base_name(ininame)
663     return name + '.tgz'
664
665 def make_ddf_name(pathname):
666     name = make_base_name(pathname)
667     return name + '.ddf'
668
669 def make_condor_name(pathname, run_type=None):
670     name = make_base_name(pathname)
671     elements = [name]
672     if run_type is not None:
673         elements.append(run_type)
674     elements.append('condor')
675     return ".".join(elements)
676     
677 def make_submit_script(target, header, body_list):
678     """
679     write out a text file
680
681     this was intended for condor submit scripts
682     
683     Args:
684       target (str or stream): 
685         if target is a string, we will open and close the file
686         if target is a stream, the caller is responsible.
687
688       header (str);
689         header to write at the beginning of the file
690       body_list (list of strs):
691         a list of blocks to add to the file.
692     """
693     if type(target) in types.StringTypes:
694         f = open(target,'w')
695     else:
696         f = target
697     f.write(header)
698     for entry in body_list:
699         f.write(entry)
700     if type(target) in types.StringTypes:
701         f.close()
702
703 def parse_filelist(file_string):
704     return file_string.split(',')
705
706 def validate_filelist(files):
707     """
708     Die if a file doesn't exist in a file list
709     """
710     for f in files:
711         if not os.path.exists(f):
712             raise RuntimeError("%s does not exist" % (f,))
713         
714 if __name__ == "__main__":
715     main()