2 from ConfigParser import SafeConfigParser
7 from optparse import OptionParser
9 from pprint import pprint, pformat
11 from StringIO import StringIO
19 from htsworkflow.util import api
20 from htsworkflow.pipelines.sequences import \
21 create_sequence_table, \
23 from htsworkflow.pipelines import qseq2fastq
24 from htsworkflow.pipelines import srf2fastq
26 def main(cmdline=None):
    # Entry point: parse options, configure logging, read the
    # library->result-dir map files, then run whichever stages were
    # requested (--fastq / --ini / --makeddf).
    # cmdline: optional argument list (None means use sys.argv).
27     parser = make_parser()
28     opts, args = parser.parse_args(cmdline)
    # Logging level chosen by flags — presumably guarded by
    # opts.debug / opts.verbose checks; TODO confirm against full source.
31         logging.basicConfig(level = logging.DEBUG )
33         logging.basicConfig(level = logging.INFO )
35         logging.basicConfig(level = logging.WARNING )
37     apidata = {'apiid': opts.apiid, 'apikey': opts.apikey }
    # All three credentials are mandatory; bail out early otherwise.
39     if opts.host is None or opts.apiid is None or opts.apikey is None:
40         parser.error("Please specify host url, apiid, apikey")
43         parser.error("I need at least one library submission-dir input file")
45     library_result_map = []
    # Each positional argument is a map file; concatenate their entries.
47         library_result_map.extend(read_library_result_map(a))
    # Hard-link the DAF into every submission directory when requested.
49     if opts.daf is not None:
50         link_daf(opts.daf, library_result_map)
53         build_fastqs(opts.host,
60     make_submission_ini(opts.host, apidata, library_result_map)
63     make_all_ddfs(library_result_map, opts.daf, force=opts.force)
67     # Load defaults from the config files
    # System-wide /etc config is read after the per-user file, so its
    # values win when both define the same option.
68     config = SafeConfigParser()
69     config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini'])
71     sequence_archive = None
75     SECTION = 'sequence_archive'
76     if config.has_section(SECTION):
        # NOTE(review): stdlib SafeConfigParser.get's 3rd positional
        # parameter is `raw`, not a default value — passing the fallback
        # here looks suspicious; verify this is an htsworkflow shim or a bug.
77         sequence_archive = config.get(SECTION, 'sequence_archive',sequence_archive)
78         sequence_archive = os.path.expanduser(sequence_archive)
79         apiid = config.get(SECTION, 'apiid', apiid)
80         apikey = config.get(SECTION, 'apikey', apikey)
81         apihost = config.get(SECTION, 'host', apihost)
    # Command-line flags; config-file values above become the defaults.
83     parser = OptionParser()
86     parser.add_option('--fastq', help="generate scripts for making fastq files",
87                       default=False, action="store_true")
89     parser.add_option('--ini', help="generate submission ini file", default=False,
92     parser.add_option('--makeddf', help='make the ddfs', default=False,
95     parser.add_option('--daf', default=None, help='specify daf name')
96     parser.add_option('--force', default=False, action="store_true",
97                       help="Force regenerating fastqs")
99     # configuration options
100     parser.add_option('--apiid', default=apiid, help="Specify API ID")
101     parser.add_option('--apikey', default=apikey, help="Specify API KEY")
102     parser.add_option('--host', default=apihost,
103                       help="specify HTSWorkflow host",)
104     parser.add_option('--sequence', default=sequence_archive,
105                       help="sequence repository")
108     parser.add_option('--verbose', default=False, action="store_true",
109                       help='verbose logging')
110     parser.add_option('--debug', default=False, action="store_true",
111                       help='debug logging')
116 def build_fastqs(host, apidata, sequences_path, library_result_map,
119     Generate condor scripts to build any needed fastq files
122     host (str): root of the htsworkflow api server
123     apidata (dict): id & key to post to the server
124     sequences_path (str): root of the directory tree to scan for files
125     library_result_map (list): [(library_id, destination directory), ...]
    # Condor submit-file headers; %(exe)s is this interpreter so the
    # converter scripts run under the same Python.
127     qseq_condor_header = """
130 error=log/qseq2fastq.err.$(process).log
131 output=log/qseq2fastq.out.$(process).log
132 log=log/qseq2fastq.log
134 """ % {'exe': sys.executable }
135     qseq_condor_entries = []
136     srf_condor_header = """
139 output=log/srf_pair_fastq.out.$(process).log
140 error=log/srf_pair_fastq.err.$(process).log
141 log=log/srf_pair_fastq.log
142 environment="PYTHONPATH=/home/diane/lib/python2.6/site-packages:/home/diane/proj/solexa/gaworkflow PATH=/woldlab/rattus/lvol0/mus/home/diane/bin:/usr/bin:/bin"
144 """ % {'exe': sys.executable }
145     srf_condor_entries = []
    # Inventory what sequence files the archive already has ...
146     lib_db = find_archive_sequence_files(host,
    # ... and which fastq targets are still missing from the result dirs.
151     needed_targets = find_missing_targets(library_result_map, lib_db, force)
153     for target_pathname, available_sources in needed_targets.items():
154         logging.debug(' target : %s' % (target_pathname,))
155         logging.debug(' candidate sources: %s' % (available_sources,))
        # Prefer qseq sources over srf when both can produce the target.
156         if available_sources.has_key('qseq'):
157             source = available_sources['qseq']
158             qseq_condor_entries.append(
159                 condor_qseq_to_fastq(source.path,
164         elif available_sources.has_key('srf'):
165             source = available_sources['srf']
166             mid = getattr(source, 'mid_point', None)
167             srf_condor_entries.append(
168                 condor_srf_to_fastq(source.path,
            # No usable source at all — report it for the operator.
176             print " need file", target_pathname
    # Only emit a submit script when there is at least one job for it.
178     if len(srf_condor_entries) > 0:
179         make_submit_script('srf.fastq.condor',
183     if len(qseq_condor_entries) > 0:
184         make_submit_script('qseq.fastq.condor',
189 def find_missing_targets(library_result_map, lib_db, force=False):
191     Check if the sequence file exists.
192     This requires computing what the sequence name is and checking
193     to see if it can be found in the sequence location.
195     Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
    # Expected fastq file names; paired-end runs get an extra _r<read> part.
197     fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
198     fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
199     # find what targets we're missing
201     for lib_id, result_dir in library_result_map:
203         lane_dict = make_lane_dict(lib_db, lib_id)
205         for lane_key, sequences in lib['lanes'].items():
206             for seq in sequences:
                # Side effect documented above: annotate each sequence
                # with its flowcell's paired_end flag.
207                 seq.paired = lane_dict[seq.flowcell]['paired_end']
208                 lane_status = lane_dict[seq.flowcell]['status']
                # presumably read-less sequences on paired runs are
                # skipped or defaulted here — TODO confirm.
210                 if seq.paired and seq.read is None:
212                 filename_attributes = {
213                     'flowcell': seq.flowcell,
                # Failed lanes do not produce submission targets.
220                 if lane_status == 'Failed':
222                 if seq.flowcell == '30DY0AAXX':
223                     # 30DY0 only ran for 151 bases instead of 152
224                     # it is actually 76 1st read, 75 2nd read
229                     target_name = fastq_paired_template % filename_attributes
231                     target_name = fastq_single_template % filename_attributes
233                 target_pathname = os.path.join(result_dir, target_name)
                # A target is "needed" when absent, or always when force=True.
234                 if force or not os.path.exists(target_pathname):
235                     t = needed_targets.setdefault(target_pathname, {})
                    # Index candidate sources by filetype (qseq/srf).
236                     t[seq.filetype] = seq
    # Mapping of target pathname -> {filetype: sequence} for build_fastqs.
238     return needed_targets
def link_daf(daf_path, library_result_map):
    """Hard-link the DAF file into every library submission directory.

    daf_path: path to the source DAF file; must already exist.
    library_result_map: iterable of (library_id, result_dir) pairs.

    Raises RuntimeError when daf_path does not exist.  A directory that
    already contains a file with the DAF's basename is left untouched.
    """
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    daf_filename = os.path.basename(daf_path)

    # Compute every destination first, then create only the missing links.
    destinations = [os.path.join(result_dir, daf_filename)
                    for _lib_id, result_dir in library_result_map]
    for submission_daf in destinations:
        if not os.path.exists(submission_daf):
            os.link(daf_path, submission_daf)
253 def make_submission_ini(host, apidata, library_result_map, paired=True):
    # Write one <result_dir>.ini per library describing every file in the
    # submission directory, using NameToViewMap to classify each file.
254     #attributes = get_filename_attribute_map(paired)
255     view_map = NameToViewMap(host, apidata)
257     candidate_fastq_src = {}
259     for lib_id, result_dir in library_result_map:
        # Column order for the generated ddf, written into [config].
260         order_by = ['order_by=files', 'view', 'replicate', 'cell',
261                     'readType', 'mapAlgorithm', 'insertLength' ]
262         inifile =  ['[config]']
263         inifile += [" ".join(order_by)]
        # The ini file is named after its own directory.
266         result_ini = os.path.join(result_dir, result_dir+'.ini')
269         submission_files = os.listdir(result_dir)
271         fastq_attributes = {}
272         for f in submission_files:
273             attributes = view_map.find_attributes(f, lib_id)
274             if attributes is None:
275                 raise ValueError("Unrecognized file: %s" % (f,))
277             ext = attributes["extension"]
            # Files with no view are ignored; fastqs are batched by
            # extension so each read group becomes one section.
278             if attributes['view'] is None:
280             elif attributes.get("type", None) == 'fastq':
281                 fastqs.setdefault(ext, set()).add(f)
282                 fastq_attributes[ext] = attributes
285                 make_submission_section(line_counter,
292         # add in fastqs on a single line.
294         for extension, fastq_files in fastqs.items():
296                 make_submission_section(line_counter,
298                                         fastq_attributes[extension])
        # NOTE(review): file handle is not explicitly closed on a
        # visible line — confirm against full source.
303         f = open(result_ini,'w')
304         f.write(os.linesep.join(inifile))
307 def make_lane_dict(lib_db, lib_id):
309     Convert the lane_set in a lib_db to a dictionary
310     indexed by flowcell ID
    # Collect (flowcell, lane) pairs; presumably returned as dict(result)
    # so a later flowcell entry wins on duplicates — TODO confirm.
313     for lane in lib_db[lib_id]['lane_set']:
314         result.append((lane['flowcell'], lane))
318 def make_all_ddfs(library_result_map, daf_name, make_condor=True, force=False):
    # Build a ddf for every library that already has its ini file, then
    # stitch the per-library condor jobs into one DAGMan file.
320     for lib_id, result_dir in library_result_map:
321         ininame = result_dir+'.ini'
322         inipathname = os.path.join(result_dir, ininame)
        # Skip libraries whose ini was never generated.
323         if os.path.exists(inipathname):
325             make_ddf(ininame, daf_name, True, make_condor, result_dir)
328     if make_condor and len(dag_fragment) > 0:
329         dag_filename = 'submission.dagman'
        # Refuse to clobber an existing DAG unless --force was given.
330         if not force and os.path.exists(dag_filename):
331             logging.warn("%s exists, please delete" % (dag_filename,))
333             f = open(dag_filename,'w')
334             f.write( os.linesep.join(dag_fragment))
335             f.write( os.linesep )
339 def make_ddf(ininame, daf_name, guess_ddf=False, make_condor=False, outdir=None):
341     Make ddf files, and bonus condor file
    # Optionally chdir-style relocation into outdir happens here
    # (the body of this branch is not shown) — TODO confirm.
345     if outdir is not None:
350     ddf_name = make_ddf_name(ininame)
352     output = open(ddf_name,'w')
    # Dump the ini as a tab-delimited ddf and remember the files listed.
354     file_list = read_ddf_ini(ininame, output)
    # The archive must also carry the DAF and the ddf itself.
356     file_list.append(daf_name)
357     if ddf_name is not None:
358         file_list.append(ddf_name)
361     archive_condor = make_condor_archive_script(ininame, file_list)
362     upload_condor = make_condor_upload_script(ininame)
    # Chain archive -> upload as a DAG fragment for the caller.
364     dag_fragments.extend(
365         make_dag_fragment(ininame, archive_condor, upload_condor)
373 def read_ddf_ini(filename, output=sys.stdout):
375     Read a ini file and dump out a tab delmited text file
378     config = SafeConfigParser()
379     config.read(filename)
    # The [config] section's order_by entry defines the ddf columns.
381     order_by = shlex.split(config.get("config", "order_by"))
383     output.write("\t".join(order_by))
384     output.write(os.linesep)
385     sections = config.sections()
387     for section in sections:
388         if section == "config":
389             # skip the config block
393             v = config.get(section, key)
            # Track every file mentioned so the caller can archive them.
396             file_list.extend(parse_filelist(v))
398         output.write("\t".join(values))
399         output.write(os.linesep)
403 def read_library_result_map(filename):
405     Read a file that maps library id to result directory.
406     Does not support spaces in filenames.
    # NOTE(review): stream is never explicitly closed on a visible line.
411     stream = open(filename,'r')
        # Skip comment lines and blanks; everything else is
        # whitespace-separated "library_id result_dir".
416         if not line.startswith('#') and len(line) > 0 :
417             library_id, result_dir = line.split()
418             results.append((library_id, result_dir))
422 def make_condor_archive_script(ininame, files):
    # Write a condor submit file that tars `files` into ../<submission>.tgz.
    # Returns the generated script name (return line not shown).
423     script = """Universe = vanilla
425 Executable = /bin/tar
426 arguments = czvf ../%(archivename)s %(filelist)s
428 Error = compress.err.$(Process).log
429 Output = compress.out.$(Process).log
430 Log = /tmp/submission-compress-%(user)s.log
431 initialdir = %(initialdir)s
    # Fail fast if any file to be archived is missing.
436         if not os.path.exists(f):
437             raise RuntimeError("Missing %s" % (f,))
439     context = {'archivename': make_submission_name(ininame),
440                'filelist': " ".join(files),
441                'initialdir': os.getcwd(),
               # NOTE(review): os.getlogin() raises OSError when there is
               # no controlling terminal (e.g. cron); getpass.getuser()
               # is the usual safer choice.
442                'user': os.getlogin()}
444     condor_script = make_condor_name(ininame, 'archive')
445     condor_stream = open(condor_script,'w')
446     condor_stream.write(script % context)
447     condor_stream.close()
451 def make_condor_upload_script(ininame):
    # Write a condor submit file that lftp-uploads the archive to the
    # UCSC ENCODE ftp drop box.  Returns the script name (line not shown).
452     script = """Universe = vanilla
454 Executable = /usr/bin/lftp
455 arguments = -c put ../%(archivename)s -o ftp://detrout@encodeftp.cse.ucsc.edu/%(archivename)s
457 Error = upload.err.$(Process).log
458 Output = upload.out.$(Process).log
459 Log = /tmp/submission-upload-%(user)s.log
460 initialdir = %(initialdir)s
464     context = {'archivename': make_submission_name(ininame),
465                'initialdir': os.getcwd(),
               # NOTE(review): same os.getlogin() fragility as the
               # archive script — fails without a controlling terminal.
466                'user': os.getlogin()}
468     condor_script = make_condor_name(ininame, 'upload')
469     condor_stream = open(condor_script,'w')
470     condor_stream.write(script % context)
471     condor_stream.close()
475 def make_dag_fragment(ininame, archive_condor, upload_condor):
477     Make the couple of fragments compress and then upload the data.
    # DAGMan wants absolute paths to the submit files.
479     cur_dir = os.getcwd()
480     archive_condor = os.path.join(cur_dir, archive_condor)
481     upload_condor = os.path.join(cur_dir, upload_condor)
482     job_basename = make_base_name(ininame)
    # archive job must finish before the upload job starts.
485     fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
486     fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
487     fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))
492 def get_library_info(host, apidata, library_id):
    # Fetch library metadata from the htsworkflow REST API; presumably
    # returns the parsed `contents` (return line not shown).
493     url = api.library_url(host, library_id)
494     contents = api.retrieve_info(url, apidata)
498 def condor_srf_to_fastq(srf_file, target_pathname, paired, flowcell=None,
499                         mid=None, force=False):
    # Build one condor queue entry that runs srf2fastq on srf_file,
    # producing target_pathname (and its _r2 mate for paired runs).
500     py = srf2fastq.__file__
501     args = [ py, srf_file, ]
        # Paired branch: left read goes to the requested target ...
503         args.extend(['--left', target_pathname])
504         # this is ugly. I did it because I was pregenerating the target
505         # names before I tried to figure out what sources could generate
506         # those targets, and everything up to this point had been
507         # one-to-one. So I couldn't figure out how to pair the
509         # With this at least the command will run correctly.
510         # however if we rename the default targets, this'll break
511         # also I think it'll generate it twice.
        # ... right read name is derived by textual substitution.
512         args.extend(['--right',
513                      target_pathname.replace('_r1.fastq', '_r2.fastq')])
515         args.extend(['--single', target_pathname ])
516     if flowcell is not None:
517         args.extend(['--flowcell', flowcell])
        # mid: split point for runs where both reads share one srf.
520         args.extend(['-m', str(mid)])
523         args.extend(['--force'])
528     """ % (" ".join(args),)
533 def condor_qseq_to_fastq(qseq_file, target_pathname, flowcell=None, force=False):
    # Build one condor queue entry that runs qseq2fastq on qseq_file,
    # writing target_pathname.  Returned as a text fragment for the
    # submit file (template lines not all shown).
534     py = qseq2fastq.__file__
535     args = [py, '-i', qseq_file, '-o', target_pathname ]
536     if flowcell is not None:
537         args.extend(['-f', flowcell])
541     """ % (" ".join(args))
545 def find_archive_sequence_files(host, apidata, sequences_path,
548     Find all the archive sequence files possibly associated with our results.
551     logging.debug("Searching for sequence files in: %s" %(sequences_path,))
555     #seq_dirs = set(os.path.join(sequences_path, 'srfs'))
557     for lib_id, result_dir in library_result_map:
        # Pull library metadata once per library and index it by id.
558         lib_info = get_library_info(host, apidata, lib_id)
559         lib_info['lanes'] = {}
560         lib_db[lib_id] = lib_info
562         for lane in lib_info['lane_set']:
            # Remember which (flowcell, lane) belongs to which library
            # so extra sequences can be filtered out below.
563             lane_key = (lane['flowcell'], lane['lane_number'])
564             candidate_lanes[lane_key] = lib_id
565             seq_dirs.add(os.path.join(sequences_path,
568     logging.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
569     candidate_seq_list = scan_for_sequences(seq_dirs)
571     # at this point we have too many sequences as scan_for_sequences
572     # returns all the sequences in a flowcell directory
573     # so lets filter out the extras
575     for seq in candidate_seq_list:
576         lane_key = (seq.flowcell, seq.lane)
577         lib_id = candidate_lanes.get(lane_key, None)
        # Keep only sequences whose lane maps back to one of our libraries.
578         if lib_id is not None:
579             lib_info = lib_db[lib_id]
580             lib_info['lanes'].setdefault(lane_key, set()).add(seq)
585 class NameToViewMap(object):
586     """Determine view attributes for a given submission file name
588     def __init__(self, root_url, apidata):
        # root_url/apidata: htsworkflow server location and credentials,
        # used to look up library metadata on demand.
589         self.root_url = root_url
590         self.apidata = apidata
594         # ma is "map algorithm"
        # Filename-pattern -> view-name table; checked in order, so more
        # specific patterns must come before catch-alls (e.g. *_r1.fastq
        # before *.fastq).  A callable view is resolved at match time.
599           ('*.bam', self._guess_bam_view),
600           ('*.splices.bam', 'Splices'),
601           ('junctions.bed', 'Junctions'),
602           ('*.jnct', 'Junctions'),
603           ('*.plus.bigwig', 'PlusSignal'),
604           ('*.minus.bigwig', 'MinusSignal'),
605           ('*.bigwig', 'Signal'),
610           ('cufflinks-0.9.0-genes.expr', 'GeneDeNovo'),
611           ('cufflinks-0.9.0-transcripts.expr', 'TranscriptDeNovo'),
612           ('cufflinks-0.9.0-transcripts.gtf', 'GeneModel'),
613           ('GENCODE-v3c-genes.expr', 'GeneGencV3c'),
614           ('GENCODE-v3c-transcripts.expr', 'TranscriptGencV3c'),
615           ('GENCODE-v4-genes.expr', 'GeneGencV4'),
616           ('GENCODE-v4-transcripts.expr', 'TranscriptGencV4'),
          # Singular "transcript" variant maps to the same view —
          # presumably covers a misnamed upstream file; TODO confirm.
617           ('GENCODE-v4-transcript.expr', 'TranscriptGencV4'),
618           ('*_r1.fastq', 'FastqRd1'),
619           ('*_r2.fastq', 'FastqRd2'),
620           ('*.fastq', 'Fastq'),
621           ('*.gtf', 'GeneModel'),
624           ('*.stats.txt', 'InsLength'),
        # Per-view attribute blocks merged into a file's attributes.
631             None: {"MapAlgorithm": "NA"},
632             "Paired": {"MapAlgorithm": ma},
633             "Single": {"MapAlgorithm": ma},
634             "Splices": {"MapAlgorithm": ma},
635             "Junctions": {"MapAlgorithm": ma},
636             "PlusSignal": {"MapAlgorithm": ma},
637             "MinusSignal": {"MapAlgorithm": ma},
638             "Signal": {"MapAlgorithm": ma},
639             "GeneModel": {"MapAlgorithm": ma},
640             "GeneDeNovo": {"MapAlgorithm": ma},
641             "TranscriptDeNovo": {"MapAlgorithm": ma},
642             "GeneGencV3c": {"MapAlgorithm": ma},
643             "TranscriptGencV3c": {"MapAlgorithm": ma},
644             "GeneGencV4": {"MapAlgorithm": ma},
645             "TranscriptGencV4": {"MapAlgorithm": ma},
646             "FastqRd1": {"MapAlgorithm": "NA", "type": "fastq"},
647             "FastqRd2": {"MapAlgorithm": "NA", "type": "fastq"},
648             "Fastq": {"MapAlgorithm": "NA", "type": "fastq" },
            # NOTE(review): duplicate "GeneModel" key (also defined
            # above) — identical value so harmless, but redundant.
649             "GeneModel": {"MapAlgorithm": ma},
650             "InsLength": {"MapAlgorithm": ma},
652         # view name is one of the attributes
653         for v in self.views.keys():
654             self.views[v]['view'] = v
656     def find_attributes(self, pathname, lib_id):
657         """Looking for the best extension
658         The 'best' is the longest match
661         filename (str): the filename whose extention we are about to examine
663         path, filename = os.path.splitext(pathname)
        # Library metadata is fetched at most once per lib_id.
664         if not self.lib_cache.has_key(lib_id):
665             self.lib_cache[lib_id] = get_library_info(self.root_url,
666                                                       self.apidata, lib_id)
668         lib_info = self.lib_cache[lib_id]
669         if lib_info['cell_line'].lower() == 'unknown':
670             logging.warn("Library %s missing cell_line" % (lib_id,))
672             'cell': lib_info['cell_line'],
673             'replicate': lib_info['replicate'],
675         is_paired = self._is_paired(lib_id, lib_info)
678             attributes.update(self.get_paired_attributes(lib_info))
680             attributes.update(self.get_single_attributes(lib_info))
        # First pattern that fnmatches wins; callables pick the view
        # from the library's paired/single status.
682         for pattern, view in self.patterns:
683             if fnmatch.fnmatch(pathname, pattern):
685                     view = view(is_paired=is_paired)
687                 attributes.update(self.views[view])
688                 attributes["extension"] = pattern
692     def _guess_bam_view(self, is_paired=True):
693         """Guess a view name based on library attributes
701     def _is_paired(self, lib_id, lib_info):
702         """Determine if a library is paired end"""
        # presumably libraries with no lanes fall back to the `paired`
        # default — the early-return body is not shown; TODO confirm.
703         if len(lib_info["lane_set"]) == 0:
        # Result is memoized per library id.
706         if not self.lib_paired.has_key(lib_id):
710             # check to see if all the flowcells are the same.
711             # otherwise we might need to do something complicated
712             for flowcell in lib_info["lane_set"]:
713                 # yes there's also a status code, but this comparison
715                 if flowcell["status"].lower() == "failed":
716                     # ignore failed flowcell
719                 elif flowcell["paired_end"]:
724             logging.debug("Library %s: %d paired, %d single, %d failed" % \
725                           (lib_info["library_id"], is_paired, isnot_paired, failed))
            # Majority vote over lanes; a tie is unresolvable.
727             if is_paired > isnot_paired:
728                 self.lib_paired[lib_id] = True
729             elif is_paired < isnot_paired:
730                 self.lib_paired[lib_id] = False
732                 raise RuntimeError("Equal number of paired & unpaired lanes."\
733                                    "Can't guess library paired status")
735         return self.lib_paired[lib_id]
737     def get_paired_attributes(self, lib_info):
        # Missing insert_size is tolerated with a warning (200 assumed
        # per the message; assignment line not shown).
738         if lib_info['insert_size'] is None:
739             errmsg = "Library %s is missing insert_size, assuming 200"
740             logging.warn(errmsg % (lib_info["library_id"],))
743         insert_size = lib_info['insert_size']
744         return {'insertLength': insert_size,
747     def get_single_attributes(self, lib_info):
        # Single-ended libraries: insert length is not applicable.
748         return {'insertLength':'ilNA',
752 def make_submission_section(line_counter, files, attributes):
754     Create a section in the submission ini file
    # Section header is [line<N>], followed by the comma-joined file
    # list and one key=value pair per attribute.
756     inifile = [ "[line%s]" % (line_counter,) ]
757     inifile += ["files=%s" % (",".join(files))]
759     for k,v in attributes.items():
760         inifile += ["%s=%s" % (k,v)]
764 def make_base_name(pathname):
    # Basename of pathname with its final extension stripped;
    # presumably returns `name` (return line not shown).
765     base = os.path.basename(pathname)
766     name, ext = os.path.splitext(base)
770 def make_submission_name(ininame):
    # Archive name derived from the ini's base name (suffix added on a
    # line not shown — presumably name + '.tgz'; TODO confirm).
771     name = make_base_name(ininame)
775 def make_ddf_name(pathname):
    # ddf filename derived from the ini's base name (suffix added on a
    # line not shown — presumably name + '.ddf'; TODO confirm).
776     name = make_base_name(pathname)
780 def make_condor_name(pathname, run_type=None):
    # Build "<base>[.<run_type>].condor" from pathname's base name.
781     name = make_base_name(pathname)
783     if run_type is not None:
784         elements.append(run_type)
785     elements.append("condor")
786     return ".".join(elements)
789 def make_submit_script(target, header, body_list):
791     write out a text file
793     this was intended for condor submit scripts
796     target (str or stream):
797         if target is a string, we will open and close the file
798         if target is a stream, the caller is responsible.
801         header to write at the beginning of the file
802     body_list (list of strs):
803         a list of blocks to add to the file.
    # Python 2 idiom: types.StringTypes covers str and unicode.
    # Open our own file only when given a name, mirroring the docstring.
805     if type(target) in types.StringTypes:
810     for entry in body_list:
    # Close only what we opened; caller-supplied streams stay open.
812     if type(target) in types.StringTypes:
def parse_filelist(file_string):
    """Break a comma separated string of filenames into a list.

    No quoting or escaping is supported; every comma is a separator,
    and an empty input yields a single empty name.
    """
    names = file_string.split(",")
    return names
819 def validate_filelist(files):
821     Die if a file doesn't exist in a file list
        # Raise on the first missing file; silently succeeds otherwise.
824         if not os.path.exists(f):
825             raise RuntimeError("%s does not exist" % (f,))
# Standard script entry-point guard; presumably invokes main().
828 if __name__ == "__main__":