2 from ConfigParser import SafeConfigParser
7 from optparse import OptionParser
9 from pprint import pprint, pformat
11 from StringIO import StringIO
19 from htsworkflow.util import api
20 from htsworkflow.pipelines.sequences import \
21 create_sequence_table, \
23 from htsworkflow.pipelines import qseq2fastq
24 from htsworkflow.pipelines import srf2fastq
# Command-line entry point: parse options, configure logging, then run the
# requested pipeline stages (fastq generation, submission .ini creation,
# ddf creation) over the library->result-directory map.
# NOTE(review): this is a numbered partial listing; several original lines
# (e.g. 29-30, 32, 34, 36, 41-42, 44, 46, 48, 51-59, 61-62) are missing here.
26 def main(cmdline=None):
27 parser = make_parser()
28 opts, args = parser.parse_args(cmdline)
# Logging level selection -- presumably guarded by opts.debug / opts.verbose
# tests on the missing lines; only one basicConfig call takes effect. TODO confirm.
31 logging.basicConfig(level = logging.DEBUG )
33 logging.basicConfig(level = logging.INFO )
35 logging.basicConfig(level = logging.WARNING )
# Credentials posted with every API request.
37 apidata = {'apiid': opts.apiid, 'apikey': opts.apikey }
39 if opts.host is None or opts.apiid is None or opts.apikey is None:
40 parser.error("Please specify host url, apiid, apikey")
43 parser.error("I need at least one library submission-dir input file")
# Accumulate (library_id, result_dir) pairs from each input file argument.
45 library_result_map = []
47 library_result_map.extend(read_library_result_map(a))
# Hard-link the shared DAF file into every submission directory.
49 if opts.daf is not None:
50 link_daf(opts.daf, library_result_map)
# Stage dispatch -- presumably each call below is guarded by the matching
# --fastq / --ini / --makeddf option on the missing lines. TODO confirm.
53 build_fastqs(opts.host,
60 make_submission_ini(opts.host, apidata, library_result_map)
63 make_all_ddfs(library_result_map, opts.daf, force=opts.force)
# Interior of make_parser() -- the `def make_parser():` line (orig. 66) and the
# trailing `return parser` are not visible in this excerpt. Builds an
# OptionParser whose defaults come from ~/.htsworkflow.ini / /etc/htsworkflow.ini.
67 # Load defaults from the config files
68 config = SafeConfigParser()
69 config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini'])
71 sequence_archive = None
75 SECTION = 'sequence_archive'
76 if config.has_section(SECTION):
# NOTE(review): in Python 2, ConfigParser.get(section, option, x) passes x as
# the `raw` flag, NOT as a default value -- these calls look like they intend
# "get with fallback" and will raise NoOptionError if the option is absent.
# Verify against the running Python version before relying on the fallback.
77 sequence_archive = config.get(SECTION, 'sequence_archive',sequence_archive)
78 sequence_archive = os.path.expanduser(sequence_archive)
79 apiid = config.get(SECTION, 'apiid', apiid)
80 apikey = config.get(SECTION, 'apikey', apikey)
81 apihost = config.get(SECTION, 'host', apihost)
83 parser = OptionParser()
# Pipeline-stage selection flags.
86 parser.add_option('--fastq', help="generate scripts for making fastq files",
87 default=False, action="store_true")
89 parser.add_option('--ini', help="generate submission ini file", default=False,
92 parser.add_option('--makeddf', help='make the ddfs', default=False,
95 parser.add_option('--daf', default=None, help='specify daf name')
96 parser.add_option('--force', default=False, action="store_true",
97 help="Force regenerating fastqs")
99 # configuration options
100 parser.add_option('--apiid', default=apiid, help="Specify API ID")
101 parser.add_option('--apikey', default=apikey, help="Specify API KEY")
102 parser.add_option('--host', default=apihost,
103 help="specify HTSWorkflow host",)
104 parser.add_option('--sequence', default=sequence_archive,
105 help="sequence repository")
# Logging verbosity flags consumed by main().
108 parser.add_option('--verbose', default=False, action="store_true",
109 help='verbose logging')
110 parser.add_option('--debug', default=False, action="store_true",
111 help='debug logging')
# Generate Condor submit scripts that convert archived qseq/srf sequence files
# into the fastq files the submission directories still need.
# NOTE(review): partial listing -- lines such as 117-118, 126-129, 133,
# 137-138, 143, 147-150, 152, 160-163, 169-175, 177, 180-182, 185-188 are
# missing (including the remaining parameters of the signature).
116 def build_fastqs(host, apidata, sequences_path, library_result_map,
119 Generate condor scripts to build any needed fastq files
122 host (str): root of the htsworkflow api server
123 apidata (dict): id & key to post to the server
124 sequences_path (str): root of the directory tree to scan for files
125 library_result_map (list): [(library_id, destination directory), ...]
# Condor submit-file header for qseq->fastq jobs; %(exe)s is this interpreter.
127 qseq_condor_header = """
130 error=log/qseq2fastq.err.$(process).log
131 output=log/qseq2fastq.out.$(process).log
132 log=log/qseq2fastq.log
134 """ % {'exe': sys.executable }
135 qseq_condor_entries = []
# Condor header for srf->fastq jobs.
# NOTE(review): hard-coded per-user PYTHONPATH/PATH below makes this
# non-portable; consider deriving from the environment.
136 srf_condor_header = """
139 output=log/srf_pair_fastq.out.$(process).log
140 error=log/srf_pair_fastq.err.$(process).log
141 log=log/srf_pair_fastq.log
142 environment="PYTHONPATH=/home/diane/lib/python2.6/site-packages:/home/diane/proj/solexa/gaworkflow PATH=/woldlab/rattus/lvol0/mus/home/diane/bin:/usr/bin:/bin"
144 """ % {'exe': sys.executable }
145 srf_condor_entries = []
146 lib_db = find_archive_sequence_files(host,
# Map of target fastq path -> {filetype: source sequence} for missing targets.
151 needed_targets = find_missing_targets(library_result_map, lib_db, force)
153 for target_pathname, available_sources in needed_targets.items():
154 logging.debug(' target : %s' % (target_pathname,))
155 logging.debug(' candidate sources: %s' % (available_sources,))
# Prefer qseq sources over srf when both can produce the target.
156 if available_sources.has_key('qseq'):
157 source = available_sources['qseq']
158 qseq_condor_entries.append(
159 condor_qseq_to_fastq(source.path,
164 elif available_sources.has_key('srf'):
165 source = available_sources['srf']
166 mid = getattr(source, 'mid_point', None)
167 srf_condor_entries.append(
168 condor_srf_to_fastq(source.path,
# Fallthrough (presumably an else branch on a missing line): no known source.
176 print " need file", target_pathname
# Only write a submit script when there is at least one job of that kind.
178 if len(srf_condor_entries) > 0:
179 make_submit_script('srf.fastq.condor',
183 if len(qseq_condor_entries) > 0:
184 make_submit_script('qseq.fastq.condor',
# Work out which destination fastq files do not yet exist and which archived
# sequences could produce them. Returns {target_pathname: {filetype: seq}}.
# NOTE(review): partial listing -- e.g. lines 190, 194, 196, 200, 202, 204,
# 209, 211, 214-219, 221, 225-228, 230, 232, 237 are missing (including the
# `needed_targets = {}` initialization and the binding of `lib`).
189 def find_missing_targets(library_result_map, lib_db, force=False):
191 Check if the sequence file exists.
192 This requires computing what the sequence name is and checking
193 to see if it can be found in the sequence location.
195 Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
# Target filename templates; keys filled from filename_attributes below.
197 fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
198 fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
199 # find what targets we're missing
201 for lib_id, result_dir in library_result_map:
203 lane_dict = make_lane_dict(lib_db, lib_id)
205 for lane_key, sequences in lib['lanes'].items():
206 for seq in sequences:
# Side effect documented above: annotate each sequence with its paired flag.
207 seq.paired = lane_dict[seq.flowcell]['paired_end']
208 lane_status = lane_dict[seq.flowcell]['status']
# Paired-end sequence with no read number -- handling is on missing lines.
210 if seq.paired and seq.read is None:
212 filename_attributes = {
213 'flowcell': seq.flowcell,
220 if lane_status == 'Failed':
222 if seq.flowcell == '30DY0AAXX':
223 # 30DY0 only ran for 151 bases instead of 152
224 # it is actually 76 1st read, 75 2nd read
# Choose template by paired/single (the if/else is on missing lines).
229 target_name = fastq_paired_template % filename_attributes
231 target_name = fastq_single_template % filename_attributes
233 target_pathname = os.path.join(result_dir, target_name)
# Record the target if it is absent on disk, or unconditionally when forced.
234 if force or not os.path.exists(target_pathname):
235 t = needed_targets.setdefault(target_pathname, {})
236 t[seq.filetype] = seq
238 return needed_targets
# Hard-link the shared DAF file into each submission result directory so the
# submission tarball can include it without copying.
# NOTE(review): os.link fails across filesystems (EXDEV) and on platforms
# without hard links; presumably all result dirs share a filesystem -- confirm.
241 def link_daf(daf_path, library_result_map):
242 if not os.path.exists(daf_path):
243 raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))
245 base_daf = os.path.basename(daf_path)
247 for lib_id, result_dir in library_result_map:
248 submission_daf = os.path.join(result_dir, base_daf)
# Idempotent: skip directories that already have the link.
249 if not os.path.exists(submission_daf):
250 os.link(daf_path, submission_daf)
# Write a <result_dir>/<result_dir>.ini submission description for every
# library: one [lineN] section per non-fastq file, plus one combined section
# per fastq extension group.
# NOTE(review): partial listing -- lines 256, 258, 264-265, 267-268, 270, 275,
# 278, 281-282, 284-290, 293, 295, 297-300, 303 are missing (including the
# initialization of `fastqs` and `line_counter`, and any f.close()).
253 def make_submission_ini(host, apidata, library_result_map, paired=True):
254 #attributes = get_filename_attribute_map(paired)
255 view_map = NameToViewMap(host, apidata)
257 candidate_fastq_src = {}
259 for lib_id, result_dir in library_result_map:
# Column order for the generated ddf, stored in the [config] section.
260 order_by = ['order_by=files', 'view', 'replicate', 'cell',
261 'readType', 'mapAlgorithm', 'insertLength' ]
262 inifile = ['[config]']
263 inifile += [" ".join(order_by)]
266 result_ini = os.path.join(result_dir, result_dir+'.ini')
269 submission_files = os.listdir(result_dir)
271 for f in submission_files:
272 attributes = view_map.find_attributes(f, lib_id)
273 if attributes is None:
274 raise ValueError("Unrecognized file: %s" % (f,))
276 ext = attributes["extension"]
# Files with no view are skipped; fastqs are grouped by extension so each
# read-pair group can be emitted as a single submission line later.
277 if attributes['view'] is None:
279 elif attributes.get("type", None) == 'fastq':
280 fastqs.setdefault(ext, set()).add(f)
283 make_submission_section(line_counter,
291 # add in fastqs on a single line.
292 for extension, fastq_set in fastqs.items():
294 make_submission_section(line_counter,
# NOTE(review): `attributes` here is left over from the last file iterated
# above, and `attributes[extension]` indexes it by glob pattern -- looks
# fragile; the intervening missing lines may rebind it. Verify.
296 attributes[extension])
# NOTE(review): no f.close() visible; confirm the file is closed (or use
# a with-statement) so the ini is flushed before make_all_ddfs reads it.
301 f = open(result_ini,'w')
302 f.write(os.linesep.join(inifile))
# Re-key a library's lane_set by flowcell ID.
# NOTE(review): the `result = []` initialization and the final conversion
# (presumably `return dict(result)`) are on lines missing from this excerpt.
305 def make_lane_dict(lib_db, lib_id):
307 Convert the lane_set in a lib_db to a dictionary
308 indexed by flowcell ID
311 for lane in lib_db[lib_id]['lane_set']:
312 result.append((lane['flowcell'], lane))
# Build a ddf (and per-submission Condor scripts) for each result directory
# that has a submission .ini, then write a single DAGMan file tying the
# archive and upload jobs together.
# NOTE(review): partial listing -- lines 317, 322, 324-325, 330, 334+ are
# missing (including `dag_fragment = []`, the extend from make_ddf's return,
# and the f.close()/else handling after the early-exists warning).
316 def make_all_ddfs(library_result_map, daf_name, make_condor=True, force=False):
318 for lib_id, result_dir in library_result_map:
319 ininame = result_dir+'.ini'
320 inipathname = os.path.join(result_dir, ininame)
# Only directories whose submission ini was generated get a ddf.
321 if os.path.exists(inipathname):
323 make_ddf(ininame, daf_name, True, make_condor, result_dir)
326 if make_condor and len(dag_fragment) > 0:
327 dag_filename = 'submission.dagman'
# Refuse to clobber an existing DAG unless --force was given.
328 if not force and os.path.exists(dag_filename):
329 logging.warn("%s exists, please delete" % (dag_filename,))
331 f = open(dag_filename,'w')
332 f.write( os.linesep.join(dag_fragment))
333 f.write( os.linesep )
# Produce one ddf file from a submission .ini and, when make_condor is set,
# the matching archive/upload Condor scripts plus their DAG fragment.
# NOTE(review): partial listing -- lines 338, 340-342, 344-347, 349, 351, 353,
# 357-358, 361, 364-370 are missing (including the chdir/outdir handling,
# `dag_fragments = []`, output.close(), and the return value).
337 def make_ddf(ininame, daf_name, guess_ddf=False, make_condor=False, outdir=None):
339 Make ddf files, and bonus condor file
# Presumably chdirs into outdir here (restored later) -- TODO confirm.
343 if outdir is not None:
348 ddf_name = make_ddf_name(ininame)
350 output = open(ddf_name,'w')
# read_ddf_ini both writes the tab-delimited ddf to `output` and returns the
# list of data files it references.
352 file_list = read_ddf_ini(ininame, output)
# The DAF and the ddf itself are part of the uploaded archive too.
354 file_list.append(daf_name)
355 if ddf_name is not None:
356 file_list.append(ddf_name)
359 archive_condor = make_condor_archive_script(ininame, file_list)
360 upload_condor = make_condor_upload_script(ininame)
362 dag_fragments.extend(
363 make_dag_fragment(ininame, archive_condor, upload_condor)
# Convert a submission .ini into tab-delimited ddf text written to `output`,
# returning the list of data files referenced by the ini's `files=` options.
# NOTE(review): partial listing -- lines 372, 374-375, 378, 380, 384, 388-390,
# 392-393, 395, 398-400 are missing (including `file_list = []`, the
# per-section `values` construction loop, and the return statement).
371 def read_ddf_ini(filename, output=sys.stdout):
373 Read an ini file and dump out a tab delimited text file
376 config = SafeConfigParser()
377 config.read(filename)
# The [config] section's order_by option fixes the ddf column order.
379 order_by = shlex.split(config.get("config", "order_by"))
381 output.write("\t".join(order_by))
382 output.write(os.linesep)
383 sections = config.sections()
385 for section in sections:
386 if section == "config":
387 # skip the config block
391 v = config.get(section, key)
# Collect every file named in this section's comma-separated files option.
394 file_list.extend(parse_filelist(v))
396 output.write("\t".join(values))
397 output.write(os.linesep)
# Parse a whitespace-delimited "library_id result_dir" file into a list of
# (library_id, result_dir) tuples, ignoring comment lines starting with '#'.
# NOTE(review): partial listing -- `results = []`, the line-iteration/strip,
# stream.close(), and the return statement are on missing lines.
401 def read_library_result_map(filename):
403 Read a file that maps library id to result directory.
404 Does not support spaces in filenames.
409 stream = open(filename,'r')
# line.split() raises ValueError on rows without exactly two fields --
# acceptable fail-fast for a hand-maintained map file.
414 if not line.startswith('#') and len(line) > 0 :
415 library_id, result_dir = line.split()
416 results.append((library_id, result_dir))
# Write a Condor submit file that tars the submission's files into
# ../<archivename> and return the script's filename.
# NOTE(review): partial listing -- lines 422, 425, 430-433, 436, 441, 446-448
# are missing (including the `for f in files:` header over the existence check
# and the final `return condor_script`).
420 def make_condor_archive_script(ininame, files):
421 script = """Universe = vanilla
423 Executable = /bin/tar
424 arguments = czvf ../%(archivename)s %(filelist)s
426 Error = compress.err.$(Process).log
427 Output = compress.out.$(Process).log
428 Log = /tmp/submission-compress-%(user)s.log
429 initialdir = %(initialdir)s
# Fail fast if any file that would go into the tarball is missing.
434 if not os.path.exists(f):
435 raise RuntimeError("Missing %s" % (f,))
# NOTE(review): os.getlogin() raises OSError without a controlling terminal
# (e.g. under cron/Condor itself); getpass.getuser() is the robust choice.
437 context = {'archivename': make_submission_name(ininame),
438 'filelist': " ".join(files),
439 'initialdir': os.getcwd(),
440 'user': os.getlogin()}
442 condor_script = make_condor_name(ininame, 'archive')
443 condor_stream = open(condor_script,'w')
444 condor_stream.write(script % context)
445 condor_stream.close()
# Write a Condor submit file that lftp-uploads the submission archive to the
# hard-coded ENCODE FTP account, and return the script's filename
# (the `return` is on a line missing from this excerpt).
# NOTE(review): destination user/host are hard-coded; parameterize if this
# ever needs to run for another submitter.
449 def make_condor_upload_script(ininame):
450 script = """Universe = vanilla
452 Executable = /usr/bin/lftp
453 arguments = -c put ../%(archivename)s -o ftp://detrout@encodeftp.cse.ucsc.edu/%(archivename)s
455 Error = upload.err.$(Process).log
456 Output = upload.out.$(Process).log
457 Log = /tmp/submission-upload-%(user)s.log
458 initialdir = %(initialdir)s
# NOTE(review): os.getlogin() fragility -- same concern as the archive script.
462 context = {'archivename': make_submission_name(ininame),
463 'initialdir': os.getcwd(),
464 'user': os.getlogin()}
466 condor_script = make_condor_name(ininame, 'upload')
467 condor_stream = open(condor_script,'w')
468 condor_stream.write(script % context)
469 condor_stream.close()
# Build the DAGMan lines for one submission: an archive job, an upload job,
# and a PARENT/CHILD edge so the upload runs only after the archive succeeds.
# NOTE(review): `fragments = []` and the `return fragments` are on lines
# missing from this excerpt.
473 def make_dag_fragment(ininame, archive_condor, upload_condor):
475 Make the couple of fragments compress and then upload the data.
# DAGMan requires absolute (or cwd-relative at submit time) submit-file
# paths, hence the join with the current directory.
477 cur_dir = os.getcwd()
478 archive_condor = os.path.join(cur_dir, archive_condor)
479 upload_condor = os.path.join(cur_dir, upload_condor)
480 job_basename = make_base_name(ininame)
483 fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
484 fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
485 fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))
# Fetch one library's metadata from the htsworkflow API.
# NOTE(review): the `return contents` (orig. line 493) is missing from this
# excerpt; callers treat the result as a dict with keys like 'lane_set',
# 'cell_line', 'replicate', 'insert_size', 'library_id'.
490 def get_library_info(host, apidata, library_id):
491 url = api.library_url(host, library_id)
492 contents = api.retrieve_info(url, apidata)
# Build one Condor queue entry (returned as a string) that runs srf2fastq to
# convert an srf archive into the target fastq file(s).
# NOTE(review): partial listing -- lines 500, 506, 512, 516-517, 519-520,
# 522-525, 527-528 are missing (including the `if paired:`/`else:` structure
# around --left/--single and the template string being formatted at 526).
496 def condor_srf_to_fastq(srf_file, target_pathname, paired, flowcell=None,
497 mid=None, force=False):
498 py = srf2fastq.__file__
499 args = [ py, srf_file, ]
# Paired branch: left read goes to the requested target...
501 args.extend(['--left', target_pathname])
502 # this is ugly. I did it because I was pregenerating the target
503 # names before I tried to figure out what sources could generate
504 # those targets, and everything up to this point had been
505 # one-to-one. So I couldn't figure out how to pair the
507 # With this at least the command will run correctly.
508 # however if we rename the default targets, this'll break
509 # also I think it'll generate it twice.
# ...and the right read is derived by the _r1 -> _r2 filename convention.
510 args.extend(['--right',
511 target_pathname.replace('_r1.fastq', '_r2.fastq')])
# Single-end branch (guarded by an else on a missing line).
513 args.extend(['--single', target_pathname ])
514 if flowcell is not None:
515 args.extend(['--flowcell', flowcell])
# Mid-point split for flowcells with uneven read lengths (see 30DY0AAXX note
# in find_missing_targets).
518 args.extend(['-m', str(mid)])
521 args.extend(['--force'])
526 """ % (" ".join(args),)
# Build one Condor queue entry (returned as a string) that runs qseq2fastq to
# convert a qseq tarball into the target fastq file.
# NOTE(review): partial listing -- the `force` handling and the triple-quoted
# entry template opened before line 539 are on missing lines (536-538, 540+).
531 def condor_qseq_to_fastq(qseq_file, target_pathname, flowcell=None, force=False):
# Run the qseq2fastq module as a script via the current interpreter.
532 py = qseq2fastq.__file__
533 args = [py, '-i', qseq_file, '-o', target_pathname ]
534 if flowcell is not None:
535 args.extend(['-f', flowcell])
539 """ % (" ".join(args))
# Scan the sequence archive for files belonging to the requested libraries.
# Builds lib_db[lib_id] = library-info dict augmented with a 'lanes' mapping
# of (flowcell, lane) -> set of sequence objects.
# NOTE(review): partial listing -- lines 544-545, 547-548, 550-552, 554, 559,
# 564-565, 568, 572, 579-582 are missing (remaining signature parameters, the
# seq_dirs/candidate_lanes/lib_db initializations, and the return statement).
543 def find_archive_sequence_files(host, apidata, sequences_path,
546 Find all the archive sequence files possibly associated with our results.
549 logging.debug("Searching for sequence files in: %s" %(sequences_path,))
553 #seq_dirs = set(os.path.join(sequences_path, 'srfs'))
555 for lib_id, result_dir in library_result_map:
556 lib_info = get_library_info(host, apidata, lib_id)
# 'lanes' is filled in by the filtering pass below.
557 lib_info['lanes'] = {}
558 lib_db[lib_id] = lib_info
560 for lane in lib_info['lane_set']:
561 lane_key = (lane['flowcell'], lane['lane_number'])
562 candidate_lanes[lane_key] = lib_id
563 seq_dirs.add(os.path.join(sequences_path,
566 logging.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
567 candidate_seq_list = scan_for_sequences(seq_dirs)
569 # at this point we have too many sequences as scan_for_sequences
570 # returns all the sequences in a flowcell directory
571 # so lets filter out the extras
# Keep only sequences whose (flowcell, lane) belongs to a requested library.
573 for seq in candidate_seq_list:
574 lane_key = (seq.flowcell, seq.lane)
575 lib_id = candidate_lanes.get(lane_key, None)
576 if lib_id is not None:
577 lib_info = lib_db[lib_id]
578 lib_info['lanes'].setdefault(lane_key, set()).add(seq)
# Maps submission filenames to ENCODE "view" names and their attribute dicts,
# consulting the htsworkflow API for per-library metadata (cached).
# NOTE(review): this class is a partial listing -- lines 585, 589-591, 593-596,
# 603-606, 619-620, 622-627, 648, 652, 656-657, 659, 664, 668, 671, 673-674,
# 676, 678, 681, 683, 686-688, 691-697, 701-702, 704-706, 711, 714-715,
# 717-720, 723, 728, 731, 733, 738-739, 742-743, 746+ are missing (including
# the lib_cache/lib_paired initializations, the `ma` binding, parts of
# self.patterns and self.views, and several return statements).
583 class NameToViewMap(object):
584 """Determine view attributes for a given submission file name
586 def __init__(self, root_url, apidata):
587 self.root_url = root_url
588 self.apidata = apidata
592 # ma is "map algorithm"
# Ordered (glob-pattern, view-or-callable) pairs; find_attributes uses the
# first match. NOTE(review): '*.bam' here precedes '*.splices.bam', so with
# first-match semantics the Splices entry looks unreachable -- verify against
# the missing lines (593-596) and the matching loop's control flow.
597 ('*.bam', self._guess_bam_view),
598 ('*.splices.bam', 'Splices'),
599 ('*.jnct', 'Junctions'),
600 ('*.plus.bigwig', 'PlusSignal'),
601 ('*.minus.bigwig', 'MinusSignal'),
602 ('*.bigwig', 'Signal'),
607 ('cufflinks-0.9.0-genes.expr', 'GeneDeNovo'),
608 ('cufflinks-0.9.0-transcripts.expr', 'TranscriptDeNovo'),
609 ('cufflinks-0.9.0-transcripts.gtf', 'GeneModel'),
610 ('GENCODE-v3c-genes.expr', 'GeneGencV3c'),
611 ('GENCODE-v3c-transcripts.expr', 'TranscriptGencV3c'),
612 ('GENCODE-v4-genes.expr', 'GeneGencV4'),
613 ('GENCODE-v4-transcripts.expr', 'TranscriptGencV4'),
614 ('GENCODE-v4-transcript.expr', 'TranscriptGencV4'),
615 ('*_r1.fastq', 'FastqRd1'),
616 ('*_r2.fastq', 'FastqRd2'),
617 ('*.fastq', 'Fastq'),
618 ('*.gtf', 'GeneModel'),
621 ('*.stats.txt', 'InsLength'),
# View-name -> base attribute dict. The None entry covers "no view".
628 None: {"MapAlgorithm": "NA"},
629 "Paired": {"MapAlgorithm": ma},
630 "Single": {"MapAlgorithm": ma},
631 "Splices": {"MapAlgorithm": ma},
632 "Junctions": {"MapAlgorithm": ma},
633 "PlusSignal": {"MapAlgorithm": ma},
634 "MinusSignal": {"MapAlgorithm": ma},
635 "Signal": {"MapAlgorithm": ma},
636 "GeneModel": {"MapAlgorithm": ma},
637 "GeneDeNovo": {"MapAlgorithm": ma},
638 "TranscriptDeNovo": {"MapAlgorithm": ma},
639 "GeneGencV3c": {"MapAlgorithm": ma},
640 "TranscriptGencV3c": {"MapAlgorithm": ma},
641 "GeneGencV4": {"MapAlgorithm": ma},
642 "TranscriptGencV4": {"MapAlgorithm": ma},
643 "FastqRd1": {"MapAlgorithm": "NA", "type": "fastq"},
644 "FastqRd2": {"MapAlgorithm": "NA", "type": "fastq"},
645 "Fastq": {"MapAlgorithm": "NA", "type": "fastq" },
# NOTE(review): duplicate "GeneModel" key (also at line 636); harmless here
# because the values are identical, but one entry should be removed.
646 "GeneModel": {"MapAlgorithm": ma},
647 "InsLength": {"MapAlgorithm": ma},
649 # view name is one of the attributes
650 for v in self.views.keys():
651 self.views[v]['view'] = v
# Return the attribute dict for a filename, or (presumably) None when no
# pattern matches -- the fallthrough return is on missing lines.
653 def find_attributes(self, pathname, lib_id):
654 """Looking for the best extension
655 The 'best' is the longest match
658 filename (str): the filename whose extension we are about to examine
660 path, filename = os.path.splitext(pathname)
# Cache library metadata so each library is fetched from the API only once.
661 if not self.lib_cache.has_key(lib_id):
662 self.lib_cache[lib_id] = get_library_info(self.root_url,
663 self.apidata, lib_id)
665 lib_info = self.lib_cache[lib_id]
666 if lib_info['cell_line'].lower() == 'unknown':
667 logging.warn("Library %s missing cell_line" % (lib_id,))
669 'cell': lib_info['cell_line'],
670 'replicate': lib_info['replicate'],
672 is_paired = self._is_paired(lib_id, lib_info)
675 attributes.update(self.get_paired_attributes(lib_info))
677 attributes.update(self.get_single_attributes(lib_info))
679 for pattern, view in self.patterns:
680 if fnmatch.fnmatch(pathname, pattern):
# Callable entries (e.g. _guess_bam_view) compute the view from pairedness.
682 view = view(is_paired=is_paired)
684 attributes.update(self.views[view])
685 attributes["extension"] = pattern
689 def _guess_bam_view(self, is_paired=True):
690 """Guess a view name based on library attributes
# Decide pairedness by majority vote over the library's non-failed lanes;
# the per-library result is cached in self.lib_paired.
698 def _is_paired(self, lib_id, lib_info):
699 """Determine if a library is paired end"""
700 if len(lib_info["lane_set"]) == 0:
703 if not self.lib_paired.has_key(lib_id):
707 # check to see if all the flowcells are the same.
708 # otherwise we might need to do something complicated
709 for flowcell in lib_info["lane_set"]:
710 # yes there's also a status code, but this comparison
712 if flowcell["status"].lower() == "failed":
713 # ignore failed flowcell
716 elif flowcell["paired_end"]:
721 logging.debug("Library %s: %d paired, %d single, %d failed" % \
722 (lib_info["library_id"], is_paired, isnot_paired, failed))
724 if is_paired > isnot_paired:
725 self.lib_paired[lib_id] = True
726 elif is_paired < isnot_paired:
727 self.lib_paired[lib_id] = False
# Ties are unresolvable -- refuse to guess rather than mislabel the library.
729 raise RuntimeError("Equal number of paired & unpaired lanes."\
730 "Can't guess library paired status")
732 return self.lib_paired[lib_id]
# Attributes contributed only for paired-end libraries (insert length etc.).
734 def get_paired_attributes(self, lib_info):
735 if lib_info['insert_size'] is None:
736 errmsg = "Library %s is missing insert_size, assuming 200"
737 logging.warn(errmsg % (lib_info["library_id"],))
740 insert_size = lib_info['insert_size']
741 return {'insertLength': insert_size,
# Single-end placeholder attributes ('ilNA' = insert length not applicable).
744 def get_single_attributes(self, lib_info):
745 return {'insertLength':'ilNA',
# Render one [lineN] ini section listing its files and attribute key=value
# pairs. NOTE(review): the `return inifile` (presumed, orig. ~758) is on a
# line missing from this excerpt.
749 def make_submission_section(line_counter, files, attributes):
751 Create a section in the submission ini file
753 inifile = [ "[line%s]" % (line_counter,) ]
754 inifile += ["files=%s" % (",".join(files))]
756 for k,v in attributes.items():
757 inifile += ["%s=%s" % (k,v)]
# Strip directory and extension from a pathname; the `return name` (orig. 764)
# is missing from this excerpt.
761 def make_base_name(pathname):
762 base = os.path.basename(pathname)
763 name, ext = os.path.splitext(base)
# Derive the submission tarball name from the ini name -- presumably
# name + '.tgz' on the missing return line (orig. 769). TODO confirm.
767 def make_submission_name(ininame):
768 name = make_base_name(ininame)
# Derive the ddf filename from an ini pathname -- presumably name + '.ddf'
# on the missing return line (orig. 774). TODO confirm.
772 def make_ddf_name(pathname):
773 name = make_base_name(pathname)
# Build a Condor submit-file name: <base>[.<run_type>].condor.
# NOTE(review): the `elements = [name]` initialization (orig. 779) is missing
# from this excerpt.
777 def make_condor_name(pathname, run_type=None):
778 name = make_base_name(pathname)
780 if run_type is not None:
781 elements.append(run_type)
782 elements.append("condor")
783 return ".".join(elements)
# Write header + body blocks to a filename or an already-open stream.
# NOTE(review): partial listing -- lines 787, 789, 791-792, 796-797, 801,
# 803-806, 808, 810-811 are missing (the open(), the header/entry writes,
# and the close()). `type(x) in types.StringTypes` is the Python 2 idiom for
# "is str or unicode"; isinstance(x, basestring) would be more conventional.
786 def make_submit_script(target, header, body_list):
788 write out a text file
790 this was intended for condor submit scripts
793 target (str or stream):
794 if target is a string, we will open and close the file
795 if target is a stream, the caller is responsible.
798 header to write at the beginning of the file
799 body_list (list of strs):
800 a list of blocks to add to the file.
# String target: open the file ourselves (close happens at the end).
802 if type(target) in types.StringTypes:
807 for entry in body_list:
# Only close what we opened; caller-supplied streams are left open.
809 if type(target) in types.StringTypes:
def parse_filelist(file_string):
    """Split a comma-separated list of file names into a list of strings.

    Note: an empty input yields [''], matching str.split semantics.
    """
    separator = ","
    return [entry for entry in file_string.split(separator)]
# Raise RuntimeError if any file in the list is missing on disk.
# NOTE(review): the `for f in files:` loop header (orig. ~819-820) is on a
# line missing from this excerpt.
816 def validate_filelist(files):
818 Die if a file doesn't exist in a file list
821 if not os.path.exists(f):
822 raise RuntimeError("%s does not exist" % (f,))
825 if __name__ == "__main__":