b21353a12b432621d4f7b53ea0fb76f566b73c26
[htsworkflow.git] / extra / ucsc_encode_submission / ucsc_gather.py
1 #!/usr/bin/env python
2 from ConfigParser import SafeConfigParser
3 from glob import glob
4 import json
5 import logging
6 from optparse import OptionParser
7 import os
8 from pprint import pprint, pformat
9 import shlex
10 from StringIO import StringIO
11 import time
12 import sys
13 import types
14 import urllib
15 import urllib2
16 import urlparse
17
18
19 from htsworkflow.util import api
20 from htsworkflow.pipelines.sequences import \
21     create_sequence_table, \
22     scan_for_sequences
23
24
25 def main(cmdline=None):
26     parser = make_parser()
27     opts, args = parser.parse_args(cmdline)
28     
29     if opts.debug:
30         logging.basicConfig(level = logging.DEBUG )
31     elif opts.verbose:
32         logging.basicConfig(level = logging.INFO )
33     else:
34         logging.basicConfig(level = logging.WARNING )
35         
36     
37     apidata = {'apiid': opts.apiid, 'apikey': opts.apikey }
38
39     if opts.host is None or opts.apiid is None or opts.apikey is None:
40         parser.error("Please specify host url, apiid, apikey")
41
42     if len(args) == 0:
43         parser.error("I need at least one library submission-dir input file")
44         
45     library_result_map = []
46     for a in args:
47         library_result_map.extend(read_library_result_map(a))
48
49     if opts.daf is not None:
50         link_daf(opts.daf, library_result_map)
51
52     if opts.fastq:
53         build_fastqs(opts.host, 
54                      apidata, 
55                      opts.sequence, 
56                      library_result_map,
57                      not opts.single,
58                      force=opts.force)
59
60     if opts.ini:
61         make_submission_ini(opts.host, apidata, library_result_map, not opts.single)
62
63     if opts.makeddf:
64         make_all_ddfs(library_result_map, opts.daf)
65
66 def make_parser():
67     # Load defaults from the config files
68     config = SafeConfigParser()
69     config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini'])
70     
71     sequence_archive = None
72     apiid = None
73     apikey = None
74     apihost = None
75     SECTION = 'sequence_archive'
76     if config.has_section(SECTION):
77         sequence_archive = config.get(SECTION, 'sequence_archive',sequence_archive)
78         sequence_archive = os.path.expanduser(sequence_archive)
79         apiid = config.get(SECTION, 'apiid', apiid)
80         apikey = config.get(SECTION, 'apikey', apikey)
81         apihost = config.get(SECTION, 'host', apihost)
82
83     parser = OptionParser()
84
85     # commands
86     parser.add_option('--fastq', help="generate scripts for making fastq files",
87                       default=False, action="store_true")
88
89     parser.add_option('--ini', help="generate submission ini file", default=False,
90                       action="store_true")
91
92     parser.add_option('--makeddf', help='make the ddfs', default=False,
93                       action="store_true")
94     
95     parser.add_option('--daf', default=None, help='specify daf name')
96     parser.add_option('--force', default=False, action="store_true",
97                       help="Force regenerating fastqs")
98
99     # configuration options
100     parser.add_option('--apiid', default=apiid, help="Specify API ID")
101     parser.add_option('--apikey', default=apikey, help="Specify API KEY")
102     parser.add_option('--host',  default=apihost,
103                       help="specify HTSWorkflow host",)
104     parser.add_option('--sequence', default=sequence_archive,
105                       help="sequence repository")
106     parser.add_option('--single', default=False, action="store_true", 
107                       help="treat the sequences as single ended runs")
108
109     # debugging
110     parser.add_option('--verbose', default=False, action="store_true",
111                       help='verbose logging')
112     parser.add_option('--debug', default=False, action="store_true",
113                       help='debug logging')
114
115     return parser
116
117 def build_fastqs(host, apidata, sequences_path, library_result_map, 
118                  paired=True, force=False ):
119     """
120     Generate condor scripts to build any needed fastq files
121     
122     Args:
123       host (str): root of the htsworkflow api server
124       apidata (dict): id & key to post to the server
125       sequences_path (str): root of the directory tree to scan for files
126       library_result_map (list):  [(library_id, destination directory), ...]
127       paired: should we assume that we are processing paired end records?
128               if False, we will treat this as single ended.
129     """
130     qseq_condor_header = """
131 Universe=vanilla
132 executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/qseq2fastq
133 error=log/qseq2fastq.err.$(process).log
134 output=log/qseq2fastq.out.$(process).log
135 log=log/qseq2fastq.log
136
137 """
138     qseq_condor_entries = []
139     srf_condor_header = """
140 Universe=vanilla
141 executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/srf2named_fastq.py
142 output=log/srf_pair_fastq.out.$(process).log
143 error=log/srf_pair_fastq.err.$(process).log
144 log=log/srf_pair_fastq.log
145 environment="PYTHONPATH=/home/diane/lib/python2.6/site-packages:/home/diane/proj/solexa/gaworkflow PATH=/woldlab/rattus/lvol0/mus/home/diane/bin:/usr/bin:/bin"
146
147 """
148     srf_condor_entries = []
149     fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
150     fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
151     lib_db = find_archive_sequence_files(host, 
152                                          apidata, 
153                                          sequences_path, 
154                                          library_result_map)
155
156     # find what targets we're missing
157     needed_targets = {}
158     for lib_id, result_dir in library_result_map:
159         lib = lib_db[lib_id]
160         for lane_key, sequences in lib['lanes'].items():
161             for seq in sequences:
162                 if paired and seq.read is None:
163                     seq.read = 1
164                 filename_attributes = { 
165                     'flowcell': seq.flowcell,
166                     'lib_id': lib_id,
167                     'lane': seq.lane,
168                     'read': seq.read,
169                     'cycle': seq.cycle
170                     }
171                 # throw out test runs
172                 # FIXME: this should probably be configurable
173                 if seq.cycle < 50:
174                     continue
175                 if seq.flowcell == '30CUUAAXX':
176                     # 30CUUAAXX run sucked
177                     continue
178                 if seq.flowcell == '30DY0AAXX':
179                     # 30DY0 only ran for 151 bases instead of 152
180                     # it is actually 76 1st read, 75 2nd read
181                     seq.mid_point = 76
182
183                 # end filters
184                 if paired:
185                     target_name = fastq_paired_template % filename_attributes
186                 else:
187                     target_name = fastq_single_template % filename_attributes
188
189                 target_pathname = os.path.join(result_dir, target_name)
190                 if force or not os.path.exists(target_pathname):
191                     t = needed_targets.setdefault(target_pathname, {})
192                     t[seq.filetype] = seq
193
194     for target_pathname, available_sources in needed_targets.items():
195         logging.debug(' target : %s' % (target_pathname,))
196         logging.debug(' candidate sources: %s' % (available_sources,))
197         if available_sources.has_key('qseq'):
198             source = available_sources['qseq']
199             qseq_condor_entries.append(
200                 condor_qseq_to_fastq(source.path, 
201                                      target_pathname, 
202                                      source.flowcell,
203                                      force=force)
204             )
205         elif available_sources.has_key('srf'):
206             source = available_sources['srf']
207             mid = getattr(source, 'mid_point', None)
208             srf_condor_entries.append(
209                 condor_srf_to_fastq(source.path, 
210                                     target_pathname,
211                                     paired,
212                                     source.flowcell,
213                                     mid,
214                                     force=force)
215             )
216         else:
217             print " need file", target_pathname
218
219     if len(srf_condor_entries) > 0:
220         make_submit_script('srf.fastq.condor', 
221                            srf_condor_header,
222                            srf_condor_entries)
223
224     if len(qseq_condor_entries) > 0:
225         make_submit_script('qseq.fastq.condor', 
226                            qseq_condor_header,
227                            qseq_condor_entries)
228
229 def link_daf(daf_path, library_result_map):
230     if not os.path.exists(daf_path):
231         raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))
232
233     base_daf = os.path.basename(daf_path)
234     
235     for lib_id, result_dir in library_result_map:
236         submission_daf = os.path.join(result_dir, base_daf)
237         if not os.path.exists(submission_daf):
238             os.link(daf_path, submission_daf)
239
240 def make_submission_ini(host, apidata, library_result_map, paired=True):
241     # ma is "map algorithm"
242     ma = 'TH1014'
243
244     if paired:
245         aligns = "Paired"
246     else:
247         aligns = "Aligns"
248
249     attributes = {
250       # bam index
251      '.bai':                   {'view': None, 'MapAlgorithm': 'NA'},
252      '.bam':                   {'view': aligns, 'MapAlgorithm': ma},
253      '.splices.bam':           {'view': 'Splices', 'MapAlgorithm': ma},
254      '.jnct':                  {'view': 'Junctions', 'MapAlgorithm': ma},
255      '.plus.bigwig':           {'view': 'PlusSignal', 'MapAlgorithm': ma},
256      '.minus.bigwig':          {'view': 'MinusSignal', 'MapAlgorithm': ma},
257      '.bigwig':                {'view': 'Signal', 'MapAlgorithm': ma},
258      '.tar.bz2':               {'view': None},
259      '.condor':                {'view': None},
260      '.daf':                   {'view': None},
261      '.ddf':                   {'view': None},
262      'novel.genes.expr':       {'view': 'GeneDeNovo', 'MapAlgorithm': ma},
263      'novel.transcripts.expr': {'view': 'TranscriptDeNovo', 'MapAlgorithm': ma},
264      '.genes.expr':            {'view': 'GeneFPKM', 'MapAlgorithm': ma},
265      '.transcripts.expr':      {'view': 'TranscriptFPKM', 'MapAlgorithm': ma},
266      '.fastq':                 {'view': 'Fastq', 'MapAlgorithm': 'NA' },
267      '_r1.fastq':              {'view': 'FastqRd1', 'MapAlgorithm': 'NA'},
268      '_r2.fastq':              {'view': 'FastqRd2', 'MapAlgorithm': 'NA'},
269      '.gtf':                   {'view': 'GeneModel', 'MapAlgorithm': ma},
270      '.ini':                   {'view': None},
271      '.log':                   {'view': None},
272      '.stats.txt':             {'view': 'InsLength', 'MapAlgorithm': ma},
273      '.srf':                   {'view': None},
274      '.wig':                   {'view': None},
275      '.zip':                   {'view': None},
276     }
277    
278     candidate_fastq_src = {}
279
280     for lib_id, result_dir in library_result_map:
281         order_by = ['order_by=files', 'view', 'replicate', 'cell', 
282                     'readType', 'mapAlgorithm', 'insertLength' ]
283         inifile =  ['[config]']
284         inifile += [" ".join(order_by)]
285         inifile += ['']
286         line_counter = 1
287         lib_info = get_library_info(host, apidata, lib_id)
288         result_ini = os.path.join(result_dir, result_dir+'.ini')
289
290         if lib_info['cell_line'].lower() == 'unknown':
291             logging.warn("Library %s missing cell_line" % (lib_id,))
292
293         standard_attributes = {'cell': lib_info['cell_line'],
294                                'replicate': lib_info['replicate'],
295                                }
296         if paired:
297             if lib_info['insert_size'] is None:
298                 errmsg = "Library %s is missing insert_size, assuming 200"
299                 logging.warn(errmsg % (lib_id,))
300                 insert_size = 200
301             else:
302                 insert_size = lib_info['insert_size']
303             standard_attributes['insertLength'] = insert_size
304             standard_attributes['readType'] = '2x75'
305         else:
306             standard_attributes['insertLength'] = 'ilNA'
307             standard_attributes['readType'] = '1x75D'
308
309         # write other lines
310         submission_files = os.listdir(result_dir)
311         fastqs = {}
312         for f in submission_files:
313             best_ext = find_best_extension(attributes, f)
314
315             if best_ext is not None:
316                if attributes[best_ext]['view'] is None:
317                    
318                    continue
319                elif best_ext.endswith('fastq'):
320                    fastqs.setdefault(best_ext, set()).add(f)
321                else:
322                    inifile.extend(
323                        make_submission_section(line_counter,
324                                                [f],
325                                                standard_attributes,
326                                                attributes[best_ext]
327                                                )
328                        )
329                    inifile += ['']
330                    line_counter += 1
331             else:
332                 raise ValueError("Unrecognized file: %s" % (f,))
333
334         # add in fastqs on a single line.
335         for extension, fastq_set in fastqs.items():
336             inifile.extend(
337                 make_submission_section(line_counter, 
338                                         fastq_set,
339                                         standard_attributes,
340                                         attributes[extension])
341             )
342             inifile += ['']
343             line_counter += 1
344             
345         f = open(result_ini,'w')
346         f.write(os.linesep.join(inifile))
347
348
349 def make_all_ddfs(library_result_map, daf_name, make_condor=True):
350     dag_fragment = []
351     for lib_id, result_dir in library_result_map:
352         ininame = result_dir+'.ini'
353         inipathname = os.path.join(result_dir, ininame)
354         if os.path.exists(inipathname):
355             dag_fragment.extend(
356                 make_ddf(ininame, daf_name, True, make_condor, result_dir)
357             )
358
359     if make_condor and len(dag_fragment) > 0:
360         dag_filename = 'submission.dagman'
361         if os.path.exists(dag_filename):
362             logging.warn("%s exists, please delete" % (dag_filename,))
363         else:
364             f = open(dag_filename,'w')
365             f.write( os.linesep.join(dag_fragment))
366             f.write( os.linesep )
367             f.close()
368             
369
370 def make_ddf(ininame,  daf_name, guess_ddf=False, make_condor=False, outdir=None):
371     """
372     Make ddf files, and bonus condor file
373     """
374     dag_fragments = []
375     curdir = os.getcwd()
376     if outdir is not None:
377         os.chdir(outdir)
378     output = sys.stdout
379     ddf_name = None
380     if guess_ddf:
381         ddf_name = make_ddf_name(ininame)
382         print ddf_name
383         output = open(ddf_name,'w')
384
385     file_list = read_ddf_ini(ininame, output)
386
387     file_list.append(daf_name)
388     if ddf_name is not None:
389         file_list.append(ddf_name)
390
391     if make_condor:
392         archive_condor = make_condor_archive_script(ininame, file_list)
393         upload_condor = make_condor_upload_script(ininame)
394         
395         dag_fragments.extend( 
396             make_dag_fragment(ininame, archive_condor, upload_condor)
397         ) 
398         
399     os.chdir(curdir)
400     
401     return dag_fragments
402
403 def read_ddf_ini(filename, output=sys.stdout):
404     """
405     Read a ini file and dump out a tab delmited text file
406     """
407     file_list = []
408     config = SafeConfigParser()
409     config.read(filename)
410
411     order_by = shlex.split(config.get("config", "order_by"))
412
413     output.write("\t".join(order_by))
414     output.write(os.linesep)
415     sections = config.sections()
416     sections.sort()
417     for section in sections:
418         if section == "config":
419             # skip the config block
420             continue
421         values = []
422         for key in order_by:
423             v = config.get(section, key)
424             values.append(v)
425             if key == 'files':
426                 file_list.extend(parse_filelist(v))
427                 
428         output.write("\t".join(values))
429         output.write(os.linesep)
430     return file_list
431     
432 def read_library_result_map(filename):
433     """
434     Read a file that maps library id to result directory.
435     Does not support spaces in filenames. 
436     
437     For example:
438       10000 result/foo/bar
439     """
440     stream = open(filename,'r')
441
442     results = []
443     for line in stream:
444         if not line.startswith('#'):
445             library_id, result_dir = line.strip().split()
446             results.append((library_id, result_dir))
447     return results
448             
449 def make_condor_archive_script(ininame, files):
450     script = """Universe = vanilla
451
452 Executable = /bin/tar
453 arguments = czvf ../%(archivename)s %(filelist)s
454
455 Error = compress.err.$(Process).log
456 Output = compress.out.$(Process).log
457 Log = /tmp/submission-compress.log
458 initialdir = %(initialdir)s
459
460 queue 
461 """
462     for f in files:
463         if not os.path.exists(f):
464             raise RuntimeError("Missing %s" % (f,))
465
466     context = {'archivename': make_submission_name(ininame),
467                'filelist': " ".join(files),
468                'initialdir': os.getcwd()}
469
470     condor_script = make_condor_name(ininame, 'archive')
471     condor_stream = open(condor_script,'w')
472     condor_stream.write(script % context)
473     condor_stream.close()
474     return condor_script
475
476 def make_condor_upload_script(ininame):
477     script = """Universe = vanilla
478
479 Executable = /usr/bin/lftp
480 arguments = -c put ../%(archivename)s -o ftp://detrout@encodeftp.cse.ucsc.edu/
481
482 Error = upload.err.$(Process).log
483 Output = upload.out.$(Process).log
484 Log = /tmp/submission-upload.log
485 initialdir = %(initialdir)s
486
487 queue 
488 """
489     context = {'archivename': make_submission_name(ininame),
490                'initialdir': os.getcwd()}
491
492     condor_script = make_condor_name(ininame, 'upload')
493     condor_stream = open(condor_script,'w')
494     condor_stream.write(script % context)
495     condor_stream.close()
496     return condor_script
497
498 def make_dag_fragment(ininame, archive_condor, upload_condor):
499     """
500     Make the couple of fragments compress and then upload the data.
501     """
502     cur_dir = os.getcwd()
503     archive_condor = os.path.join(cur_dir, archive_condor)
504     upload_condor = os.path.join(cur_dir, upload_condor)
505     job_basename = make_base_name(ininame)
506
507     fragments = []
508     fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
509     fragments.append('JOB %s_upload %s' % (job_basename,  upload_condor))
510     fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))
511
512     return fragments
513
514 def get_library_info(host, apidata, library_id):
515     url = api.library_url(host, library_id)
516     contents = api.retrieve_info(url, apidata)
517     return contents
518
519 def condor_srf_to_fastq(srf_file, target_pathname, paired, flowcell=None,
520                         mid=None, force=False):
521     args = [ srf_file, ]
522     if paired:
523         args.extend(['--left', target_pathname])
524         # this is ugly. I did it because I was pregenerating the target
525         # names before I tried to figure out what sources could generate
526         # those targets, and everything up to this point had been
527         # one-to-one. So I couldn't figure out how to pair the 
528         # target names. 
529         # With this at least the command will run correctly.
530         # however if we rename the default targets, this'll break
531         # also I think it'll generate it twice.
532         args.extend(['--right', 
533                      target_pathname.replace('_r1.fastq', '_r2.fastq')])
534     else:
535         args.extend(['--single', target_pathname ])
536     if flowcell is not None:
537         args.extend(['--flowcell', flowcell])
538
539     if mid is not None:
540         args.extend(['-m', str(mid)])
541
542     if force:
543         args.extend(['--force'])
544
545     script = """
546 arguments="%s"
547 queue
548 """ % (" ".join(args),)
549     
550     return  script 
551
552 def condor_qseq_to_fastq(qseq_file, target_pathname, flowcell=None, force=False):
553     args = ['-i', qseq_file, '-o', target_pathname ]
554     if flowcell is not None:
555         args.extend(['-f', flowcell])
556     script = """
557 arguments="%s"
558 queue
559 """ % (" ".join(args))
560
561     return script 
562
563 def find_archive_sequence_files(host, apidata, sequences_path, 
564                                 library_result_map):
565     """
566     Find all the archive sequence files possibly associated with our results.
567
568     """
569     logging.debug("Searching for sequence files in: %s" %(sequences_path,))
570
571     lib_db = {}
572     seq_dirs = set()
573     #seq_dirs = set(os.path.join(sequences_path, 'srfs'))
574     candidate_lanes = {}
575     for lib_id, result_dir in library_result_map:
576         lib_info = get_library_info(host, apidata, lib_id)
577         lib_db[lib_id] = lib_info
578
579         for lane in lib_info['lane_set']:
580             lane_key = (lane['flowcell'], lane['lane_number'])
581             candidate_lanes[lane_key] = lib_id
582             seq_dirs.add(os.path.join(sequences_path, 
583                                          'flowcells', 
584                                          lane['flowcell']))
585     logging.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
586     candidate_seq_list = scan_for_sequences(seq_dirs)
587
588     # at this point we have too many sequences as scan_for_sequences
589     # returns all the sequences in a flowcell directory
590     # so lets filter out the extras
591     
592     for seq in candidate_seq_list:
593         lane_key = (seq.flowcell, seq.lane)
594         lib_id = candidate_lanes.get(lane_key, None)
595         if lib_id is not None:
596             lib_info = lib_db[lib_id]
597             lanes = lib_info.setdefault('lanes', {})
598             lanes.setdefault(lane_key, set()).add(seq)
599     
600     return lib_db
601
602 def find_best_extension(extension_map, filename):
603     """
604     Search through extension_map looking for the best extension
605     The 'best' is the longest match
606
607     :Args:
608       extension_map (dict): '.ext' -> { 'view': 'name' or None }
609       filename (str): the filename whose extention we are about to examine
610     """
611     best_ext = None
612     path, last_ext = os.path.splitext(filename)
613
614     for ext in extension_map.keys():
615         if filename.endswith(ext):
616             if best_ext is None:
617                 best_ext = ext
618             elif len(ext) > len(best_ext):
619                 best_ext = ext
620     return best_ext
621     
622 def make_submission_section(line_counter, files, standard_attributes, file_attributes):
623     """
624     Create a section in the submission ini file
625     """
626     inifile = [ '[line%s]' % (line_counter,) ]
627     inifile += ["files=%s" % (",".join(files))]
628     cur_attributes = {}
629     cur_attributes.update(standard_attributes)
630     cur_attributes.update(file_attributes)
631     
632     for k,v in cur_attributes.items():
633         inifile += ["%s=%s" % (k,v)]
634     return inifile
635
636 def make_base_name(pathname):
637     base = os.path.basename(pathname)
638     name, ext = os.path.splitext(base)
639     return name
640
641 def make_submission_name(ininame):
642     name = make_base_name(ininame)
643     return name + '.tgz'
644
645 def make_ddf_name(pathname):
646     name = make_base_name(pathname)
647     return name + '.ddf'
648
649 def make_condor_name(pathname, run_type=None):
650     name = make_base_name(pathname)
651     elements = [name]
652     if run_type is not None:
653         elements.append(run_type)
654     elements.append('condor')
655     return ".".join(elements)
656     
657 def make_submit_script(target, header, body_list):
658     """
659     write out a text file
660
661     this was intended for condor submit scripts
662     
663     Args:
664       target (str or stream): 
665         if target is a string, we will open and close the file
666         if target is a stream, the caller is responsible.
667
668       header (str);
669         header to write at the beginning of the file
670       body_list (list of strs):
671         a list of blocks to add to the file.
672     """
673     if type(target) in types.StringTypes:
674         f = open(target,'w')
675     else:
676         f = target
677     f.write(header)
678     for entry in body_list:
679         f.write(entry)
680     if type(target) in types.StringTypes:
681         f.close()
682
683 def parse_filelist(file_string):
684     return file_string.split(',')
685
686 def validate_filelist(files):
687     """
688     Die if a file doesn't exist in a file list
689     """
690     for f in files:
691         if not os.path.exists(f):
692             raise RuntimeError("%s does not exist" % (f,))
693         
694 if __name__ == "__main__":
695     main()