2 from ConfigParser import SafeConfigParser
7 from optparse import OptionParser
9 from pprint import pprint, pformat
11 from StringIO import StringIO
19 from htsworkflow.util import api
20 from htsworkflow.pipelines.sequences import \
21 create_sequence_table, \
def main(cmdline=None):
    """Command-line entry point for preparing ENCODE submissions.

    Parses options, requires host/apiid/apikey, loads the library ->
    result-directory map files given as arguments, then optionally links
    the DAF, generates fastq build scripts, writes submission ini files,
    and builds ddf files.

    NOTE(review): several guard/continuation lines are elided from this
    excerpt; indentation below is restored approximately.
    """
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)
    # The next three calls are alternatives chosen by --debug / --verbose
    # (the selecting if/elif lines are elided here).
    logging.basicConfig(level = logging.DEBUG )
    logging.basicConfig(level = logging.INFO )
    logging.basicConfig(level = logging.WARNING )

    # Credentials posted with every htsworkflow API request.
    apidata = {'apiid': opts.apiid, 'apikey': opts.apikey }

    if opts.host is None or opts.apiid is None or opts.apikey is None:
        parser.error("Please specify host url, apiid, apikey")
    # Raised when no positional map files were supplied (guard elided).
    parser.error("I need at least one library submission-dir input file")

    library_result_map = []
    # Each positional argument maps library ids to result directories.
    library_result_map.extend(read_library_result_map(a))

    if opts.daf is not None:
        link_daf(opts.daf, library_result_map)

    # Generate condor scripts for fastqs that still need building
    # (remaining call arguments elided from this excerpt).
    build_fastqs(opts.host,
    make_submission_ini(opts.host, apidata, library_result_map)
    make_all_ddfs(library_result_map, opts.daf, force=opts.force)
    # Load defaults from the config files
    config = SafeConfigParser()
    config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini'])

    sequence_archive = None

    # Pull archive location and API credentials from the config file.
    SECTION = 'sequence_archive'
    if config.has_section(SECTION):
        # NOTE(review): in Python 2 ConfigParser the third positional
        # argument of get() is `raw`, not a default/fallback value --
        # passing the fallback here looks like a latent bug; confirm
        # against the ConfigParser API.
        sequence_archive = config.get(SECTION, 'sequence_archive',sequence_archive)
        sequence_archive = os.path.expanduser(sequence_archive)
        apiid = config.get(SECTION, 'apiid', apiid)
        apikey = config.get(SECTION, 'apikey', apikey)
        apihost = config.get(SECTION, 'host', apihost)

    parser = OptionParser()

    # commands: which submission-preparation steps to run
    parser.add_option('--fastq', help="generate scripts for making fastq files",
                      default=False, action="store_true")
    parser.add_option('--ini', help="generate submission ini file", default=False,
    parser.add_option('--makeddf', help='make the ddfs', default=False,
    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")

    # configuration options
    parser.add_option('--apiid', default=apiid, help="Specify API ID")
    parser.add_option('--apikey', default=apikey, help="Specify API KEY")
    parser.add_option('--host', default=apihost,
                      help="specify HTSWorkflow host",)
    parser.add_option('--sequence', default=sequence_archive,
                      help="sequence repository")

    # logging verbosity switches
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')
def build_fastqs(host, apidata, sequences_path, library_result_map,
    """
    Generate condor scripts to build any needed fastq files

    Args:
      host (str): root of the htsworkflow api server
      apidata (dict): id & key to post to the server
      sequences_path (str): root of the directory tree to scan for files
      library_result_map (list): [(library_id, destination directory), ...]
    """
    # Condor submit-file header for qseq -> fastq conversion jobs
    # (closing quotes of this template are elided from this excerpt).
    qseq_condor_header = """
executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/qseq2fastq
error=log/qseq2fastq.err.$(process).log
output=log/qseq2fastq.out.$(process).log
log=log/qseq2fastq.log
    qseq_condor_entries = []
    # Condor submit-file header for srf -> fastq conversion jobs.
    srf_condor_header = """
executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/srf2fastq
output=log/srf_pair_fastq.out.$(process).log
error=log/srf_pair_fastq.err.$(process).log
log=log/srf_pair_fastq.log
environment="PYTHONPATH=/home/diane/lib/python2.6/site-packages:/home/diane/proj/solexa/gaworkflow PATH=/woldlab/rattus/lvol0/mus/home/diane/bin:/usr/bin:/bin"
    srf_condor_entries = []
    # Build a per-library database of archive sequence files
    # (remaining call arguments elided).
    lib_db = find_archive_sequence_files(host,
    needed_targets = find_missing_targets(library_result_map, lib_db, force)

    for target_pathname, available_sources in needed_targets.items():
        logging.debug(' target : %s' % (target_pathname,))
        logging.debug(' candidate sources: %s' % (available_sources,))
        # Prefer a qseq source over an srf source for the same target.
        if available_sources.has_key('qseq'):
            source = available_sources['qseq']
            qseq_condor_entries.append(
                condor_qseq_to_fastq(source.path,
        elif available_sources.has_key('srf'):
            source = available_sources['srf']
            mid = getattr(source, 'mid_point', None)
            srf_condor_entries.append(
                condor_srf_to_fastq(source.path,
            # No usable source found for this target (else branch elided).
            print " need file", target_pathname

    # Only emit a submit script when there is at least one job entry.
    if len(srf_condor_entries) > 0:
        make_submit_script('srf.fastq.condor',
    if len(qseq_condor_entries) > 0:
        make_submit_script('qseq.fastq.condor',
def find_missing_targets(library_result_map, lib_db, force=False):
    """
    Check if the sequence file exists.
    This requires computing what the sequence name is and checking
    to see if it can be found in the sequence location.

    Adds seq.paired flag to sequences listed in lib_db[*]['lanes']

    Returns a dict mapping target fastq pathname -> {filetype: sequence}.
    """
    fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
    fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
    # find what targets we're missing
    # NOTE(review): `needed_targets` is initialized on an elided line.
    for lib_id, result_dir in library_result_map:
        lane_dict = make_lane_dict(lib_db, lib_id)
        # NOTE(review): `lib` is presumably bound from lib_db[lib_id] on
        # an elided line -- confirm against the full source.
        for lane_key, sequences in lib['lanes'].items():
            for seq in sequences:
                # Annotate each sequence with its flowcell's paired status.
                seq.paired = lane_dict[seq.flowcell]['paired_end']
                lane_status = lane_dict[seq.flowcell]['status']

                # Paired-end reads must carry a read number (body elided).
                if seq.paired and seq.read is None:
                filename_attributes = {
                    'flowcell': seq.flowcell,
                # Failed lanes are skipped (body elided).
                if lane_status == 'Failed':
                if seq.flowcell == '30DY0AAXX':
                    # 30DY0 only ran for 151 bases instead of 152
                    # it is actually 76 1st read, 75 2nd read
                    target_name = fastq_paired_template % filename_attributes
                    target_name = fastq_single_template % filename_attributes

                target_pathname = os.path.join(result_dir, target_name)
                # Register targets that are absent (or forced to rebuild).
                if force or not os.path.exists(target_pathname):
                    t = needed_targets.setdefault(target_pathname, {})
                    t[seq.filetype] = seq

    return needed_targets
def link_daf(daf_path, library_result_map):
    """Hard-link the DAF file into every submission result directory.

    Raises RuntimeError when daf_path does not exist.  Result
    directories that already contain the link are left untouched.
    """
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    daf_filename = os.path.basename(daf_path)
    # Lay down one hard link per result directory, skipping existing ones.
    for _unused_lib_id, directory in library_result_map:
        destination = os.path.join(directory, daf_filename)
        if os.path.exists(destination):
            continue
        os.link(daf_path, destination)
def make_submission_ini(host, apidata, library_result_map, paired=True):
    """Write a <result_dir>.ini submission description per library.

    Scans each result directory, classifies every file through
    NameToViewMap, and emits one ini section per submission line.
    NOTE(review): `fastqs` and `line_counter` are bound on lines elided
    from this excerpt.
    """
    #attributes = get_filename_attribute_map(paired)
    view_map = NameToViewMap(host, apidata)

    candidate_fastq_src = {}

    for lib_id, result_dir in library_result_map:
        order_by = ['order_by=files', 'view', 'replicate', 'cell',
                    'readType', 'mapAlgorithm', 'insertLength' ]
        inifile = ['[config]']
        inifile += [" ".join(order_by)]

        # The ini file is written inside the result directory itself.
        result_ini = os.path.join(result_dir, result_dir+'.ini')

        # Classify every file already present in the result directory.
        submission_files = os.listdir(result_dir)
        for f in submission_files:
            attributes = view_map.find_attributes(f, lib_id)
            if attributes is None:
                raise ValueError("Unrecognized file: %s" % (f,))

            ext = attributes["extension"]
            # Files with no view are skipped (body elided).
            if attributes['view'] is None:
            elif attributes.get("type", None) == 'fastq':
                # Group fastqs by extension so they share one section.
                fastqs.setdefault(ext, set()).add(f)
                make_submission_section(line_counter,
        # add in fastqs on a single line.
        for extension, fastq_set in fastqs.items():
            make_submission_section(line_counter,
                attributes[extension])
        f = open(result_ini,'w')
        f.write(os.linesep.join(inifile))
def make_lane_dict(lib_db, lib_id):
    """
    Convert the lane_set in a lib_db to a dictionary
    indexed by flowcell ID
    """
    # NOTE(review): `result` is initialized, and converted to a dict on
    # return, on lines elided from this excerpt.
    for lane in lib_db[lib_id]['lane_set']:
        result.append((lane['flowcell'], lane))
def make_all_ddfs(library_result_map, daf_name, make_condor=True, force=False):
    """Build a ddf (plus condor scripts) for every library result dir.

    NOTE(review): `dag_fragment` is accumulated on lines elided from
    this excerpt.
    """
    for lib_id, result_dir in library_result_map:
        ininame = result_dir+'.ini'
        inipathname = os.path.join(result_dir, ininame)
        if os.path.exists(inipathname):
            make_ddf(ininame, daf_name, True, make_condor, result_dir)

    # Chain all per-library condor jobs into a single DAGMan file.
    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        if not force and os.path.exists(dag_filename):
            # Refuse to clobber an existing dagman unless forced.
            logging.warn("%s exists, please delete" % (dag_filename,))
        f = open(dag_filename,'w')
        f.write( os.linesep.join(dag_fragment))
        f.write( os.linesep )
def make_ddf(ininame, daf_name, guess_ddf=False, make_condor=False, outdir=None):
    """
    Make ddf files, and bonus condor file

    NOTE(review): the chdir-to-outdir handling and `dag_fragments`
    initialization/return are on lines elided from this excerpt.
    """
    if outdir is not None:
    ddf_name = make_ddf_name(ininame)
    output = open(ddf_name,'w')
    # Dump the ini contents as tab-delimited ddf rows; collect the files.
    file_list = read_ddf_ini(ininame, output)

    # The daf and the ddf itself are part of the archived file set.
    file_list.append(daf_name)
    if ddf_name is not None:
        file_list.append(ddf_name)
    archive_condor = make_condor_archive_script(ininame, file_list)
    upload_condor = make_condor_upload_script(ininame)
    dag_fragments.extend(
        make_dag_fragment(ininame, archive_condor, upload_condor)
def read_ddf_ini(filename, output=sys.stdout):
    """
    Read a ini file and dump out a tab delmited text file

    NOTE(review): `file_list` initialization/return, and the binding of
    `key`/`values` per section, are on lines elided from this excerpt.
    """
    config = SafeConfigParser()
    config.read(filename)

    # Column order comes from the [config] section's order_by entry.
    order_by = shlex.split(config.get("config", "order_by"))

    output.write("\t".join(order_by))
    output.write(os.linesep)
    sections = config.sections()
    for section in sections:
        if section == "config":
            # skip the config block
        v = config.get(section, key)
        # The "files" column is a comma-separated list; collect its parts.
        file_list.extend(parse_filelist(v))
        output.write("\t".join(values))
        output.write(os.linesep)
def read_library_result_map(filename):
    """
    Read a file that maps library id to result directory.
    Does not support spaces in filenames.

    Returns a list of (library_id, result_dir) tuples.
    NOTE(review): the line-iteration loop and `results` init/return are
    on lines elided from this excerpt.
    """
    stream = open(filename,'r')
    # Skip comments and blank lines; otherwise expect
    # "<library_id> <result_dir>" separated by whitespace.
    if not line.startswith('#') and len(line) > 0 :
        library_id, result_dir = line.split()
        results.append((library_id, result_dir))
def make_condor_archive_script(ininame, files):
    """Write a condor submit script that tars `files` into the archive.

    Returns nothing visible in this excerpt; the condor script name is
    presumably returned on an elided line -- confirm in full source.
    """
    # Submit-file template (closing quotes elided from this excerpt).
    script = """Universe = vanilla
Executable = /bin/tar
arguments = czvf ../%(archivename)s %(filelist)s
Error = compress.err.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
    # Fail early if any listed file is missing (loop header elided).
    if not os.path.exists(f):
        raise RuntimeError("Missing %s" % (f,))

    context = {'archivename': make_submission_name(ininame),
               'filelist': " ".join(files),
               'initialdir': os.getcwd(),
               'user': os.getlogin()}

    condor_script = make_condor_name(ininame, 'archive')
    condor_stream = open(condor_script,'w')
    condor_stream.write(script % context)
    condor_stream.close()
def make_condor_upload_script(ininame):
    """Write a condor submit script that uploads the archive via lftp.

    The condor script name is presumably returned on an elided line --
    confirm in full source.
    """
    # Submit-file template (closing quotes elided from this excerpt).
    script = """Universe = vanilla
Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://detrout@encodeftp.cse.ucsc.edu/%(archivename)s
Error = upload.err.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s
    context = {'archivename': make_submission_name(ininame),
               'initialdir': os.getcwd(),
               'user': os.getlogin()}

    condor_script = make_condor_name(ininame, 'upload')
    condor_stream = open(condor_script,'w')
    condor_stream.write(script % context)
    condor_stream.close()
def make_dag_fragment(ininame, archive_condor, upload_condor):
    """
    Make the couple of fragments compress and then upload the data.

    NOTE(review): `fragments` is initialized and returned on lines
    elided from this excerpt.
    """
    cur_dir = os.getcwd()
    # DAGMan needs absolute paths to the submit files.
    archive_condor = os.path.join(cur_dir, archive_condor)
    upload_condor = os.path.join(cur_dir, upload_condor)
    job_basename = make_base_name(ininame)

    # The archive job must finish before the upload job starts.
    fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
    fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
    fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))
def get_library_info(host, apidata, library_id):
    # Fetch library metadata from the htsworkflow REST API.
    # NOTE(review): the return of `contents` is on an elided line.
    url = api.library_url(host, library_id)
    contents = api.retrieve_info(url, apidata)
def condor_srf_to_fastq(srf_file, target_pathname, paired, flowcell=None,
                        mid=None, force=False):
    """Build a condor job entry that converts an srf file to fastq.

    NOTE(review): the `args` initialization, the paired/single guard,
    and the condor-entry template opening are on lines elided from this
    excerpt; guards noted inline below.
    """
    args.extend(['--left', target_pathname])
    # this is ugly. I did it because I was pregenerating the target
    # names before I tried to figure out what sources could generate
    # those targets, and everything up to this point had been
    # one-to-one. So I couldn't figure out how to pair the
    # With this at least the command will run correctly.
    # however if we rename the default targets, this'll break
    # also I think it'll generate it twice.
    args.extend(['--right',
                 target_pathname.replace('_r1.fastq', '_r2.fastq')])
    # Single-ended branch (guard elided).
    args.extend(['--single', target_pathname ])
    if flowcell is not None:
        args.extend(['--flowcell', flowcell])
    # Added when mid is not None (guard elided).
    args.extend(['-m', str(mid)])
    # Added when force is true (guard elided).
    args.extend(['--force'])
    """ % (" ".join(args),)
def condor_qseq_to_fastq(qseq_file, target_pathname, flowcell=None, force=False):
    """Build a condor job entry that converts a qseq file to fastq.

    NOTE(review): force handling and the condor-entry template opening
    are on lines elided from this excerpt.
    """
    args = ['-i', qseq_file, '-o', target_pathname ]
    if flowcell is not None:
        args.extend(['-f', flowcell])
    """ % (" ".join(args))
def find_archive_sequence_files(host, apidata, sequences_path,
    """
    Find all the archive sequence files possibly associated with our results.

    NOTE(review): the remaining parameters, and the initialization of
    `lib_db`, `candidate_lanes`, `seq_dirs` plus the final return, are
    on lines elided from this excerpt.
    """
    logging.debug("Searching for sequence files in: %s" %(sequences_path,))

    #seq_dirs = set(os.path.join(sequences_path, 'srfs'))
    for lib_id, result_dir in library_result_map:
        lib_info = get_library_info(host, apidata, lib_id)
        lib_info['lanes'] = {}
        lib_db[lib_id] = lib_info

        # Remember which (flowcell, lane) pairs belong to which library,
        # and which flowcell directories we need to scan.
        for lane in lib_info['lane_set']:
            lane_key = (lane['flowcell'], lane['lane_number'])
            candidate_lanes[lane_key] = lib_id
            seq_dirs.add(os.path.join(sequences_path,
    logging.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
    candidate_seq_list = scan_for_sequences(seq_dirs)

    # at this point we have too many sequences as scan_for_sequences
    # returns all the sequences in a flowcell directory
    # so lets filter out the extras
    for seq in candidate_seq_list:
        lane_key = (seq.flowcell, seq.lane)
        lib_id = candidate_lanes.get(lane_key, None)
        if lib_id is not None:
            # Keep only sequences for lanes our libraries actually used.
            lib_info = lib_db[lib_id]
            lib_info['lanes'].setdefault(lane_key, set()).add(seq)
class NameToViewMap(object):
    """Determine view attributes for a given submission file name

    NOTE(review): many lines of this class (list/dict openers, guards,
    returns, cache initializers) are elided from this excerpt;
    indentation is restored approximately and gaps noted inline.
    """
    def __init__(self, root_url, apidata):
        # Server root and API credentials used for library lookups.
        self.root_url = root_url
        self.apidata = apidata

        # ma is "map algorithm"
        # Filename-pattern -> view-name table (list opener elided).
        # Entries are tried longest-match-wins in find_attributes.
        ('*.bam', self._guess_bam_view),
        ('*.splices.bam', 'Splices'),
        ('*.jnct', 'Junctions'),
        ('*.plus.bigwig', 'PlusSignal'),
        ('*.minus.bigwig', 'MinusSignal'),
        ('*.bigwig', 'Signal'),
        ('cufflinks-0.9.0-genes.expr', 'GeneDeNovo'),
        ('cufflinks-0.9.0-transcripts.expr', 'TranscriptDeNovo'),
        ('cufflinks-0.9.0-transcripts.gtf', 'GeneModel'),
        ('GENCODE-v3c-genes.expr', 'GeneGencV3c'),
        ('GENCODE-v3c-transcripts.expr', 'TranscriptGencV3c'),
        ('GENCODE-v4-genes.expr', 'GeneGencV4'),
        ('GENCODE-v4-transcripts.expr', 'TranscriptGencV4'),
        ('GENCODE-v4-transcript.expr', 'TranscriptGencV4'),
        ('*_r1.fastq', 'FastqRd1'),
        ('*_r2.fastq', 'FastqRd2'),
        ('*.fastq', 'Fastq'),
        ('*.gtf', 'GeneModel'),
        ('*.stats.txt', 'InsLength'),
        # View-name -> default-attribute table (dict opener elided).
        # NOTE(review): "GeneModel" appears twice in this mapping; the
        # later entry wins in a dict literal -- worth deduplicating.
        None: {"MapAlgorithm": "NA"},
        "Paired": {"MapAlgorithm": ma},
        "Single": {"MapAlgorithm": ma},
        "Splices": {"MapAlgorithm": ma},
        "Junctions": {"MapAlgorithm": ma},
        "PlusSignal": {"MapAlgorithm": ma},
        "MinusSignal": {"MapAlgorithm": ma},
        "Signal": {"MapAlgorithm": ma},
        "GeneModel": {"MapAlgorithm": ma},
        "GeneDeNovo": {"MapAlgorithm": ma},
        "TranscriptDeNovo": {"MapAlgorithm": ma},
        "GeneGencV3c": {"MapAlgorithm": ma},
        "TranscriptGencV3c": {"MapAlgorithm": ma},
        "GeneGencV4": {"MapAlgorithm": ma},
        "TranscriptGencV4": {"MapAlgorithm": ma},
        "FastqRd1": {"MapAlgorithm": "NA", "type": "fastq"},
        "FastqRd2": {"MapAlgorithm": "NA", "type": "fastq"},
        "Fastq": {"MapAlgorithm": "NA", "type": "fastq" },
        "GeneModel": {"MapAlgorithm": ma},
        "InsLength": {"MapAlgorithm": ma},
        # view name is one of the attributes
        for v in self.views.keys():
            self.views[v]['view'] = v

    def find_attributes(self, pathname, lib_id):
        """Looking for the best extension
        The 'best' is the longest match

        Args:
          filename (str): the filename whose extention we are about to examine
        """
        path, filename = os.path.splitext(pathname)
        # Cache library metadata so each library is fetched only once.
        if not self.lib_cache.has_key(lib_id):
            self.lib_cache[lib_id] = get_library_info(self.root_url,
                                                      self.apidata, lib_id)

        lib_info = self.lib_cache[lib_id]
        if lib_info['cell_line'].lower() == 'unknown':
            logging.warn("Library %s missing cell_line" % (lib_id,))
        # Base attribute dict (dict opener elided).
            'cell': lib_info['cell_line'],
            'replicate': lib_info['replicate'],
        is_paired = self._is_paired(lib_id, lib_info)
        # Paired branch (guard elided).
        attributes.update(self.get_paired_attributes(lib_info))
        # Single branch (guard elided).
        attributes.update(self.get_single_attributes(lib_info))
        for pattern, view in self.patterns:
            if fnmatch.fnmatch(pathname, pattern):
                # A callable view (e.g. _guess_bam_view) is resolved now.
                view = view(is_paired=is_paired)
                attributes.update(self.views[view])
                attributes["extension"] = pattern

    def _guess_bam_view(self, is_paired=True):
        """Guess a view name based on library attributes
        """

    def _is_paired(self, lib_id, lib_info):
        """Determine if a library is paired end"""
        # An empty lane set gives no evidence either way (body elided).
        if len(lib_info["lane_set"]) == 0:
        # Only compute (and cache) the answer once per library.
        if not self.lib_paired.has_key(lib_id):
            # check to see if all the flowcells are the same.
            # otherwise we might need to do something complicated
            for flowcell in lib_info["lane_set"]:
                # yes there's also a status code, but this comparison
                if flowcell["status"].lower() == "failed":
                    # ignore failed flowcell
                elif flowcell["paired_end"]:
            logging.debug("Library %s: %d paired, %d single, %d failed" % \
                          (lib_info["library_id"], is_paired, isnot_paired, failed))

            # Majority vote decides the library's paired status.
            if is_paired > isnot_paired:
                self.lib_paired[lib_id] = True
            elif is_paired < isnot_paired:
                self.lib_paired[lib_id] = False
                raise RuntimeError("Equal number of paired & unpaired lanes."\
                                   "Can't guess library paired status")
        return self.lib_paired[lib_id]

    def get_paired_attributes(self, lib_info):
        # Missing insert_size falls back to a default (branch elided).
        if lib_info['insert_size'] is None:
            errmsg = "Library %s is missing insert_size, assuming 200"
            logging.warn(errmsg % (lib_info["library_id"],))
            insert_size = lib_info['insert_size']
        return {'insertLength': insert_size,

    def get_single_attributes(self, lib_info):
        return {'insertLength':'ilNA',
def make_submission_section(line_counter, files, attributes):
    """
    Create a section in the submission ini file

    NOTE(review): the return of `inifile` is on an elided line.
    """
    inifile = [ "[line%s]" % (line_counter,) ]
    inifile += ["files=%s" % (",".join(files))]

    # One "key=value" line per attribute.
    for k,v in attributes.items():
        inifile += ["%s=%s" % (k,v)]
def make_base_name(pathname):
    # Strip directory and extension to get the submission base name.
    # NOTE(review): the return of `name` is on an elided line.
    base = os.path.basename(pathname)
    name, ext = os.path.splitext(base)
def make_submission_name(ininame):
    # Derive the archive name from the ini base name
    # (the suffixed return value is on an elided line).
    name = make_base_name(ininame)
def make_ddf_name(pathname):
    # Derive the ddf filename from the base name
    # (the suffixed return value is on an elided line).
    name = make_base_name(pathname)
def make_condor_name(pathname, run_type=None):
    """Build a condor submit-script name like "<base>[.<run_type>].condor"."""
    name = make_base_name(pathname)
    # NOTE(review): `elements` is initialized (presumably [name]) on an
    # elided line.
    if run_type is not None:
        elements.append(run_type)
    elements.append("condor")
    return ".".join(elements)
def make_submit_script(target, header, body_list):
    """
    write out a text file

    this was intended for condor submit scripts

    Args:
      target (str or stream):
        if target is a string, we will open and close the file
        if target is a stream, the caller is responsible.
      header (str):
        header to write at the beginning of the file
      body_list (list of strs):
        a list of blocks to add to the file.
    """
    # Open the file ourselves only when given a name (body elided).
    if type(target) in types.StringTypes:
    # Append each body block (write call elided).
    for entry in body_list:
    # Close only what we opened (body elided).
    if type(target) in types.StringTypes:
def parse_filelist(file_string):
    """Split a comma-delimited "files=" value into a list of filenames."""
    separator = ","
    return file_string.split(separator)
def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list
    """
    # NOTE(review): the loop header `for f in files:` is on an elided
    # line of this excerpt.
    if not os.path.exists(f):
        raise RuntimeError("%s does not exist" % (f,))
821 if __name__ == "__main__":