Use the htsworkflow API to determine if a flowcell is paired end or not.
[htsworkflow.git] / extra / ucsc_encode_submission / ucsc_gather.py
1 #!/usr/bin/env python
2 from ConfigParser import SafeConfigParser
3 from glob import glob
4 import json
5 import logging
6 from optparse import OptionParser
7 import os
8 from pprint import pprint, pformat
9 import shlex
10 from StringIO import StringIO
11 import time
12 import sys
13 import types
14 import urllib
15 import urllib2
16 import urlparse
17
18
19 from htsworkflow.util import api
20 from htsworkflow.pipelines.sequences import \
21     create_sequence_table, \
22     scan_for_sequences
23
24
25 def main(cmdline=None):
26     parser = make_parser()
27     opts, args = parser.parse_args(cmdline)
28     
29     if opts.debug:
30         logging.basicConfig(level = logging.DEBUG )
31     elif opts.verbose:
32         logging.basicConfig(level = logging.INFO )
33     else:
34         logging.basicConfig(level = logging.WARNING )
35         
36     
37     apidata = {'apiid': opts.apiid, 'apikey': opts.apikey }
38
39     if opts.host is None or opts.apiid is None or opts.apikey is None:
40         parser.error("Please specify host url, apiid, apikey")
41
42     if len(args) == 0:
43         parser.error("I need at least one library submission-dir input file")
44         
45     library_result_map = []
46     for a in args:
47         library_result_map.extend(read_library_result_map(a))
48
49     if opts.daf is not None:
50         link_daf(opts.daf, library_result_map)
51
52     if opts.fastq:
53         build_fastqs(opts.host, 
54                      apidata, 
55                      opts.sequence, 
56                      library_result_map,
57                      force=opts.force)
58
59     if opts.ini:
60         make_submission_ini(opts.host, apidata, library_result_map)
61
62     if opts.makeddf:
63         make_all_ddfs(library_result_map, opts.daf)
64
65 def make_parser():
66     # Load defaults from the config files
67     config = SafeConfigParser()
68     config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini'])
69     
70     sequence_archive = None
71     apiid = None
72     apikey = None
73     apihost = None
74     SECTION = 'sequence_archive'
75     if config.has_section(SECTION):
76         sequence_archive = config.get(SECTION, 'sequence_archive',sequence_archive)
77         sequence_archive = os.path.expanduser(sequence_archive)
78         apiid = config.get(SECTION, 'apiid', apiid)
79         apikey = config.get(SECTION, 'apikey', apikey)
80         apihost = config.get(SECTION, 'host', apihost)
81
82     parser = OptionParser()
83
84     # commands
85     parser.add_option('--fastq', help="generate scripts for making fastq files",
86                       default=False, action="store_true")
87
88     parser.add_option('--ini', help="generate submission ini file", default=False,
89                       action="store_true")
90
91     parser.add_option('--makeddf', help='make the ddfs', default=False,
92                       action="store_true")
93     
94     parser.add_option('--daf', default=None, help='specify daf name')
95     parser.add_option('--force', default=False, action="store_true",
96                       help="Force regenerating fastqs")
97
98     # configuration options
99     parser.add_option('--apiid', default=apiid, help="Specify API ID")
100     parser.add_option('--apikey', default=apikey, help="Specify API KEY")
101     parser.add_option('--host',  default=apihost,
102                       help="specify HTSWorkflow host",)
103     parser.add_option('--sequence', default=sequence_archive,
104                       help="sequence repository")
105
106     # debugging
107     parser.add_option('--verbose', default=False, action="store_true",
108                       help='verbose logging')
109     parser.add_option('--debug', default=False, action="store_true",
110                       help='debug logging')
111
112     return parser
113
114 def build_fastqs(host, apidata, sequences_path, library_result_map, 
115                  force=False ):
116     """
117     Generate condor scripts to build any needed fastq files
118     
119     Args:
120       host (str): root of the htsworkflow api server
121       apidata (dict): id & key to post to the server
122       sequences_path (str): root of the directory tree to scan for files
123       library_result_map (list):  [(library_id, destination directory), ...]
124     """
125     qseq_condor_header = """
126 Universe=vanilla
127 executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/qseq2fastq
128 error=log/qseq2fastq.err.$(process).log
129 output=log/qseq2fastq.out.$(process).log
130 log=log/qseq2fastq.log
131
132 """
133     qseq_condor_entries = []
134     srf_condor_header = """
135 Universe=vanilla
136 executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/srf2named_fastq.py
137 output=log/srf_pair_fastq.out.$(process).log
138 error=log/srf_pair_fastq.err.$(process).log
139 log=log/srf_pair_fastq.log
140 environment="PYTHONPATH=/home/diane/lib/python2.6/site-packages:/home/diane/proj/solexa/gaworkflow PATH=/woldlab/rattus/lvol0/mus/home/diane/bin:/usr/bin:/bin"
141
142 """
143     srf_condor_entries = []
144     fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
145     fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
146     lib_db = find_archive_sequence_files(host, 
147                                          apidata, 
148                                          sequences_path, 
149                                          library_result_map)
150
151     # find what targets we're missing
152     needed_targets = {}
153     for lib_id, result_dir in library_result_map:
154         lib = lib_db[lib_id]
155         lane_dict = make_lane_dict(lib_db, lib_id)
156
157         for lane_key, sequences in lib['lanes'].items():
158             for seq in sequences:
159                 paired = lane_dict[seq.flowcell]['paired_end']
160                 if paired and seq.read is None:
161                     seq.read = 1
162                 filename_attributes = { 
163                     'flowcell': seq.flowcell,
164                     'lib_id': lib_id,
165                     'lane': seq.lane,
166                     'read': seq.read,
167                     'cycle': seq.cycle
168                     }
169                 # throw out test runs
170                 # FIXME: this should probably be configurable
171                 if seq.cycle < 50:
172                     continue
173                 if seq.flowcell == '30CUUAAXX':
174                     # 30CUUAAXX run sucked
175                     continue
176                 if seq.flowcell == '30DY0AAXX':
177                     # 30DY0 only ran for 151 bases instead of 152
178                     # it is actually 76 1st read, 75 2nd read
179                     seq.mid_point = 76
180
181                 # end filters
182                 if paired:
183                     target_name = fastq_paired_template % filename_attributes
184                 else:
185                     target_name = fastq_single_template % filename_attributes
186
187                 target_pathname = os.path.join(result_dir, target_name)
188                 if force or not os.path.exists(target_pathname):
189                     t = needed_targets.setdefault(target_pathname, {})
190                     t[seq.filetype] = seq
191
192     for target_pathname, available_sources in needed_targets.items():
193         logging.debug(' target : %s' % (target_pathname,))
194         logging.debug(' candidate sources: %s' % (available_sources,))
195         if available_sources.has_key('qseq'):
196             source = available_sources['qseq']
197             qseq_condor_entries.append(
198                 condor_qseq_to_fastq(source.path, 
199                                      target_pathname, 
200                                      source.flowcell,
201                                      force=force)
202             )
203         elif available_sources.has_key('srf'):
204             source = available_sources['srf']
205             mid = getattr(source, 'mid_point', None)
206             srf_condor_entries.append(
207                 condor_srf_to_fastq(source.path, 
208                                     target_pathname,
209                                     paired,
210                                     source.flowcell,
211                                     mid,
212                                     force=force)
213             )
214         else:
215             print " need file", target_pathname
216
217     if len(srf_condor_entries) > 0:
218         make_submit_script('srf.fastq.condor', 
219                            srf_condor_header,
220                            srf_condor_entries)
221
222     if len(qseq_condor_entries) > 0:
223         make_submit_script('qseq.fastq.condor', 
224                            qseq_condor_header,
225                            qseq_condor_entries)
226
227 def link_daf(daf_path, library_result_map):
228     if not os.path.exists(daf_path):
229         raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))
230
231     base_daf = os.path.basename(daf_path)
232     
233     for lib_id, result_dir in library_result_map:
234         submission_daf = os.path.join(result_dir, base_daf)
235         if not os.path.exists(submission_daf):
236             os.link(daf_path, submission_daf)
237
238 def make_submission_ini(host, apidata, library_result_map, paired=True):
239     # ma is "map algorithm"
240     ma = 'TH1014'
241
242     if paired:
243         aligns = "Paired"
244     else:
245         aligns = "Aligns"
246
247     attributes = {
248       # bam index
249      '.bai':                   {'view': None, 'MapAlgorithm': 'NA'},
250      '.bam':                   {'view': aligns, 'MapAlgorithm': ma},
251      '.splices.bam':           {'view': 'Splices', 'MapAlgorithm': ma},
252      '.jnct':                  {'view': 'Junctions', 'MapAlgorithm': ma},
253      '.plus.bigwig':           {'view': 'PlusSignal', 'MapAlgorithm': ma},
254      '.minus.bigwig':          {'view': 'MinusSignal', 'MapAlgorithm': ma},
255      '.bigwig':                {'view': 'Signal', 'MapAlgorithm': ma},
256      '.tar.bz2':               {'view': None},
257      '.condor':                {'view': None},
258      '.daf':                   {'view': None},
259      '.ddf':                   {'view': None},
260      'novel.genes.expr':       {'view': 'GeneDeNovo', 'MapAlgorithm': ma},
261      'novel.transcripts.expr': {'view': 'TranscriptDeNovo', 'MapAlgorithm': ma},
262      '.genes.expr':            {'view': 'GeneFPKM', 'MapAlgorithm': ma},
263      '.transcripts.expr':      {'view': 'TranscriptFPKM', 'MapAlgorithm': ma},
264      '.fastq':                 {'view': 'Fastq', 'MapAlgorithm': 'NA' },
265      '_r1.fastq':              {'view': 'FastqRd1', 'MapAlgorithm': 'NA'},
266      '_r2.fastq':              {'view': 'FastqRd2', 'MapAlgorithm': 'NA'},
267      '.gtf':                   {'view': 'GeneModel', 'MapAlgorithm': ma},
268      '.ini':                   {'view': None},
269      '.log':                   {'view': None},
270      '.stats.txt':             {'view': 'InsLength', 'MapAlgorithm': ma},
271      '.srf':                   {'view': None},
272      '.wig':                   {'view': None},
273      '.zip':                   {'view': None},
274     }
275    
276     candidate_fastq_src = {}
277
278     for lib_id, result_dir in library_result_map:
279         order_by = ['order_by=files', 'view', 'replicate', 'cell', 
280                     'readType', 'mapAlgorithm', 'insertLength' ]
281         inifile =  ['[config]']
282         inifile += [" ".join(order_by)]
283         inifile += ['']
284         line_counter = 1
285         lib_info = get_library_info(host, apidata, lib_id)
286         result_ini = os.path.join(result_dir, result_dir+'.ini')
287
288         if lib_info['cell_line'].lower() == 'unknown':
289             logging.warn("Library %s missing cell_line" % (lib_id,))
290
291         standard_attributes = {'cell': lib_info['cell_line'],
292                                'replicate': lib_info['replicate'],
293                                }
294         if paired:
295             if lib_info['insert_size'] is None:
296                 errmsg = "Library %s is missing insert_size, assuming 200"
297                 logging.warn(errmsg % (lib_id,))
298                 insert_size = 200
299             else:
300                 insert_size = lib_info['insert_size']
301             standard_attributes['insertLength'] = insert_size
302             standard_attributes['readType'] = '2x75'
303         else:
304             standard_attributes['insertLength'] = 'ilNA'
305             standard_attributes['readType'] = '1x75D'
306
307         # write other lines
308         submission_files = os.listdir(result_dir)
309         fastqs = {}
310         for f in submission_files:
311             best_ext = find_best_extension(attributes, f)
312
313             if best_ext is not None:
314                if attributes[best_ext]['view'] is None:
315                    
316                    continue
317                elif best_ext.endswith('fastq'):
318                    fastqs.setdefault(best_ext, set()).add(f)
319                else:
320                    inifile.extend(
321                        make_submission_section(line_counter,
322                                                [f],
323                                                standard_attributes,
324                                                attributes[best_ext]
325                                                )
326                        )
327                    inifile += ['']
328                    line_counter += 1
329             else:
330                 raise ValueError("Unrecognized file: %s" % (f,))
331
332         # add in fastqs on a single line.
333         for extension, fastq_set in fastqs.items():
334             inifile.extend(
335                 make_submission_section(line_counter, 
336                                         fastq_set,
337                                         standard_attributes,
338                                         attributes[extension])
339             )
340             inifile += ['']
341             line_counter += 1
342             
343         f = open(result_ini,'w')
344         f.write(os.linesep.join(inifile))
345
346 def make_lane_dict(lib_db, lib_id):
347     """
348     Convert the lane_set in a lib_db to a dictionary
349     indexed by flowcell ID
350     """
351     result = []
352     for lane in lib_db[lib_id]['lane_set']:
353         result.append((lane['flowcell'], lane))
354     return dict(result)
355
356 def make_all_ddfs(library_result_map, daf_name, make_condor=True):
357     dag_fragment = []
358     for lib_id, result_dir in library_result_map:
359         ininame = result_dir+'.ini'
360         inipathname = os.path.join(result_dir, ininame)
361         if os.path.exists(inipathname):
362             dag_fragment.extend(
363                 make_ddf(ininame, daf_name, True, make_condor, result_dir)
364             )
365
366     if make_condor and len(dag_fragment) > 0:
367         dag_filename = 'submission.dagman'
368         if os.path.exists(dag_filename):
369             logging.warn("%s exists, please delete" % (dag_filename,))
370         else:
371             f = open(dag_filename,'w')
372             f.write( os.linesep.join(dag_fragment))
373             f.write( os.linesep )
374             f.close()
375             
376
377 def make_ddf(ininame,  daf_name, guess_ddf=False, make_condor=False, outdir=None):
378     """
379     Make ddf files, and bonus condor file
380     """
381     dag_fragments = []
382     curdir = os.getcwd()
383     if outdir is not None:
384         os.chdir(outdir)
385     output = sys.stdout
386     ddf_name = None
387     if guess_ddf:
388         ddf_name = make_ddf_name(ininame)
389         print ddf_name
390         output = open(ddf_name,'w')
391
392     file_list = read_ddf_ini(ininame, output)
393
394     file_list.append(daf_name)
395     if ddf_name is not None:
396         file_list.append(ddf_name)
397
398     if make_condor:
399         archive_condor = make_condor_archive_script(ininame, file_list)
400         upload_condor = make_condor_upload_script(ininame)
401         
402         dag_fragments.extend( 
403             make_dag_fragment(ininame, archive_condor, upload_condor)
404         ) 
405         
406     os.chdir(curdir)
407     
408     return dag_fragments
409
410 def read_ddf_ini(filename, output=sys.stdout):
411     """
412     Read a ini file and dump out a tab delmited text file
413     """
414     file_list = []
415     config = SafeConfigParser()
416     config.read(filename)
417
418     order_by = shlex.split(config.get("config", "order_by"))
419
420     output.write("\t".join(order_by))
421     output.write(os.linesep)
422     sections = config.sections()
423     sections.sort()
424     for section in sections:
425         if section == "config":
426             # skip the config block
427             continue
428         values = []
429         for key in order_by:
430             v = config.get(section, key)
431             values.append(v)
432             if key == 'files':
433                 file_list.extend(parse_filelist(v))
434                 
435         output.write("\t".join(values))
436         output.write(os.linesep)
437     return file_list
438     
439 def read_library_result_map(filename):
440     """
441     Read a file that maps library id to result directory.
442     Does not support spaces in filenames. 
443     
444     For example:
445       10000 result/foo/bar
446     """
447     stream = open(filename,'r')
448
449     results = []
450     for line in stream:
451         if not line.startswith('#'):
452             library_id, result_dir = line.strip().split()
453             results.append((library_id, result_dir))
454     return results
455             
456 def make_condor_archive_script(ininame, files):
457     script = """Universe = vanilla
458
459 Executable = /bin/tar
460 arguments = czvf ../%(archivename)s %(filelist)s
461
462 Error = compress.err.$(Process).log
463 Output = compress.out.$(Process).log
464 Log = /tmp/submission-compress.log
465 initialdir = %(initialdir)s
466
467 queue 
468 """
469     for f in files:
470         if not os.path.exists(f):
471             raise RuntimeError("Missing %s" % (f,))
472
473     context = {'archivename': make_submission_name(ininame),
474                'filelist': " ".join(files),
475                'initialdir': os.getcwd()}
476
477     condor_script = make_condor_name(ininame, 'archive')
478     condor_stream = open(condor_script,'w')
479     condor_stream.write(script % context)
480     condor_stream.close()
481     return condor_script
482
483 def make_condor_upload_script(ininame):
484     script = """Universe = vanilla
485
486 Executable = /usr/bin/lftp
487 arguments = -c put ../%(archivename)s -o ftp://detrout@encodeftp.cse.ucsc.edu/
488
489 Error = upload.err.$(Process).log
490 Output = upload.out.$(Process).log
491 Log = /tmp/submission-upload.log
492 initialdir = %(initialdir)s
493
494 queue 
495 """
496     context = {'archivename': make_submission_name(ininame),
497                'initialdir': os.getcwd()}
498
499     condor_script = make_condor_name(ininame, 'upload')
500     condor_stream = open(condor_script,'w')
501     condor_stream.write(script % context)
502     condor_stream.close()
503     return condor_script
504
505 def make_dag_fragment(ininame, archive_condor, upload_condor):
506     """
507     Make the couple of fragments compress and then upload the data.
508     """
509     cur_dir = os.getcwd()
510     archive_condor = os.path.join(cur_dir, archive_condor)
511     upload_condor = os.path.join(cur_dir, upload_condor)
512     job_basename = make_base_name(ininame)
513
514     fragments = []
515     fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
516     fragments.append('JOB %s_upload %s' % (job_basename,  upload_condor))
517     fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))
518
519     return fragments
520
521 def get_library_info(host, apidata, library_id):
522     url = api.library_url(host, library_id)
523     contents = api.retrieve_info(url, apidata)
524     return contents
525
526 def condor_srf_to_fastq(srf_file, target_pathname, paired, flowcell=None,
527                         mid=None, force=False):
528     args = [ srf_file, ]
529     if paired:
530         args.extend(['--left', target_pathname])
531         # this is ugly. I did it because I was pregenerating the target
532         # names before I tried to figure out what sources could generate
533         # those targets, and everything up to this point had been
534         # one-to-one. So I couldn't figure out how to pair the 
535         # target names. 
536         # With this at least the command will run correctly.
537         # however if we rename the default targets, this'll break
538         # also I think it'll generate it twice.
539         args.extend(['--right', 
540                      target_pathname.replace('_r1.fastq', '_r2.fastq')])
541     else:
542         args.extend(['--single', target_pathname ])
543     if flowcell is not None:
544         args.extend(['--flowcell', flowcell])
545
546     if mid is not None:
547         args.extend(['-m', str(mid)])
548
549     if force:
550         args.extend(['--force'])
551
552     script = """
553 arguments="%s"
554 queue
555 """ % (" ".join(args),)
556     
557     return  script 
558
559 def condor_qseq_to_fastq(qseq_file, target_pathname, flowcell=None, force=False):
560     args = ['-i', qseq_file, '-o', target_pathname ]
561     if flowcell is not None:
562         args.extend(['-f', flowcell])
563     script = """
564 arguments="%s"
565 queue
566 """ % (" ".join(args))
567
568     return script 
569
570 def find_archive_sequence_files(host, apidata, sequences_path, 
571                                 library_result_map):
572     """
573     Find all the archive sequence files possibly associated with our results.
574
575     """
576     logging.debug("Searching for sequence files in: %s" %(sequences_path,))
577
578     lib_db = {}
579     seq_dirs = set()
580     #seq_dirs = set(os.path.join(sequences_path, 'srfs'))
581     candidate_lanes = {}
582     for lib_id, result_dir in library_result_map:
583         lib_info = get_library_info(host, apidata, lib_id)
584         lib_db[lib_id] = lib_info
585
586         for lane in lib_info['lane_set']:
587             lane_key = (lane['flowcell'], lane['lane_number'])
588             candidate_lanes[lane_key] = lib_id
589             seq_dirs.add(os.path.join(sequences_path, 
590                                          'flowcells', 
591                                          lane['flowcell']))
592     logging.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
593     candidate_seq_list = scan_for_sequences(seq_dirs)
594
595     # at this point we have too many sequences as scan_for_sequences
596     # returns all the sequences in a flowcell directory
597     # so lets filter out the extras
598     
599     for seq in candidate_seq_list:
600         lane_key = (seq.flowcell, seq.lane)
601         lib_id = candidate_lanes.get(lane_key, None)
602         if lib_id is not None:
603             lib_info = lib_db[lib_id]
604             lanes = lib_info.setdefault('lanes', {})
605             lanes.setdefault(lane_key, set()).add(seq)
606     
607     return lib_db
608
609 def find_best_extension(extension_map, filename):
610     """
611     Search through extension_map looking for the best extension
612     The 'best' is the longest match
613
614     :Args:
615       extension_map (dict): '.ext' -> { 'view': 'name' or None }
616       filename (str): the filename whose extention we are about to examine
617     """
618     best_ext = None
619     path, last_ext = os.path.splitext(filename)
620
621     for ext in extension_map.keys():
622         if filename.endswith(ext):
623             if best_ext is None:
624                 best_ext = ext
625             elif len(ext) > len(best_ext):
626                 best_ext = ext
627     return best_ext
628     
629 def make_submission_section(line_counter, files, standard_attributes, file_attributes):
630     """
631     Create a section in the submission ini file
632     """
633     inifile = [ '[line%s]' % (line_counter,) ]
634     inifile += ["files=%s" % (",".join(files))]
635     cur_attributes = {}
636     cur_attributes.update(standard_attributes)
637     cur_attributes.update(file_attributes)
638     
639     for k,v in cur_attributes.items():
640         inifile += ["%s=%s" % (k,v)]
641     return inifile
642
643 def make_base_name(pathname):
644     base = os.path.basename(pathname)
645     name, ext = os.path.splitext(base)
646     return name
647
648 def make_submission_name(ininame):
649     name = make_base_name(ininame)
650     return name + '.tgz'
651
652 def make_ddf_name(pathname):
653     name = make_base_name(pathname)
654     return name + '.ddf'
655
656 def make_condor_name(pathname, run_type=None):
657     name = make_base_name(pathname)
658     elements = [name]
659     if run_type is not None:
660         elements.append(run_type)
661     elements.append('condor')
662     return ".".join(elements)
663     
664 def make_submit_script(target, header, body_list):
665     """
666     write out a text file
667
668     this was intended for condor submit scripts
669     
670     Args:
671       target (str or stream): 
672         if target is a string, we will open and close the file
673         if target is a stream, the caller is responsible.
674
675       header (str);
676         header to write at the beginning of the file
677       body_list (list of strs):
678         a list of blocks to add to the file.
679     """
680     if type(target) in types.StringTypes:
681         f = open(target,'w')
682     else:
683         f = target
684     f.write(header)
685     for entry in body_list:
686         f.write(entry)
687     if type(target) in types.StringTypes:
688         f.close()
689
690 def parse_filelist(file_string):
691     return file_string.split(',')
692
693 def validate_filelist(files):
694     """
695     Die if a file doesn't exist in a file list
696     """
697     for f in files:
698         if not os.path.exists(f):
699             raise RuntimeError("%s does not exist" % (f,))
700         
701 if __name__ == "__main__":
702     main()