2 from ConfigParser import SafeConfigParser
7 from optparse import OptionParser
9 from pprint import pprint, pformat
11 from StringIO import StringIO
19 from htsworkflow.util import api
20 from htsworkflow.pipelines.sequences import \
21 create_sequence_table, \
def main(cmdline=None):
    """Command-line driver: parse options and run the requested
    submission-preparation steps.

    cmdline (list or None): argument vector; None lets optparse use sys.argv.
    NOTE(review): several guard/branch lines are elided in this excerpt;
    only comments have been added below.
    """
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)
    # Logging level chosen by mutually exclusive branches (if/elif guards
    # on opts.debug / opts.verbose are elided here).
        logging.basicConfig(level = logging.DEBUG )
        logging.basicConfig(level = logging.INFO )
        logging.basicConfig(level = logging.WARNING )

    # Credentials sent with every htsworkflow API request.
    apidata = {'apiid': opts.apiid, 'apikey': opts.apikey }

    if opts.host is None or opts.apiid is None or opts.apikey is None:
        parser.error("Please specify host url, apiid, apikey")
        # (second error sits in an elided elif checking for input files)
        parser.error("I need at least one library submission-dir input file")

    # Accumulate (library_id, result_dir) pairs from each input file.
    library_result_map = []
        library_result_map.extend(read_library_result_map(a))

    if opts.daf is not None:
        link_daf(opts.daf, library_result_map)

    # Each step below is gated by its command line flag
    # (--fastq / --ini / --makeddf); the guards are elided in this excerpt.
        build_fastqs(opts.host,
    make_submission_ini(opts.host, apidata, library_result_map)
    make_all_ddfs(library_result_map, opts.daf, force=opts.force)
    # Load defaults from the config files
    # NOTE(review): the enclosing `def make_parser():` line is elided in
    # this excerpt; these statements build the OptionParser with defaults
    # taken from the user/system htsworkflow ini files.
    config = SafeConfigParser()
    config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini'])

    sequence_archive = None

    SECTION = 'sequence_archive'
    if config.has_section(SECTION):
        # NOTE(review): SafeConfigParser.get()'s third positional argument
        # is `raw`, not a default value -- passing the current value here
        # likely does not provide a fallback as intended; confirm against
        # the ConfigParser documentation.
        sequence_archive = config.get(SECTION, 'sequence_archive',sequence_archive)
        sequence_archive = os.path.expanduser(sequence_archive)
        apiid = config.get(SECTION, 'apiid', apiid)
        apikey = config.get(SECTION, 'apikey', apikey)
        apihost = config.get(SECTION, 'host', apihost)

    parser = OptionParser()

    # which pipeline steps to run
    parser.add_option('--fastq', help="generate scripts for making fastq files",
                      default=False, action="store_true")

    parser.add_option('--ini', help="generate submission ini file", default=False,

    parser.add_option('--makeddf', help='make the ddfs', default=False,

    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")

    # configuration options
    parser.add_option('--apiid', default=apiid, help="Specify API ID")
    parser.add_option('--apikey', default=apikey, help="Specify API KEY")
    parser.add_option('--host', default=apihost,
                      help="specify HTSWorkflow host",)
    parser.add_option('--sequence', default=sequence_archive,
                      help="sequence repository")

    # logging verbosity switches consumed by main()
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')
# NOTE(review): the signature continuation, string closers, and several
# call arguments of this function are elided in this excerpt; only
# comments have been added.
def build_fastqs(host, apidata, sequences_path, library_result_map,
    Generate condor scripts to build any needed fastq files

    host (str): root of the htsworkflow api server
    apidata (dict): id & key to post to the server
    sequences_path (str): root of the directory tree to scan for files
    library_result_map (list): [(library_id, destination directory), ...]
    # Condor submit-file header for qseq->fastq jobs; one entry per target
    # is appended to qseq_condor_entries below.
    qseq_condor_header = """
executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/qseq2fastq
error=log/qseq2fastq.err.$(process).log
output=log/qseq2fastq.out.$(process).log
log=log/qseq2fastq.log
    qseq_condor_entries = []
    # Matching header for srf->fastq conversion jobs.
    srf_condor_header = """
executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/srf2fastq
output=log/srf_pair_fastq.out.$(process).log
error=log/srf_pair_fastq.err.$(process).log
log=log/srf_pair_fastq.log
environment="PYTHONPATH=/home/diane/lib/python2.6/site-packages:/home/diane/proj/solexa/gaworkflow PATH=/woldlab/rattus/lvol0/mus/home/diane/bin:/usr/bin:/bin"
    srf_condor_entries = []
    # library id -> library info (including candidate lane sequences).
    lib_db = find_archive_sequence_files(host,

    # Which fastq targets do not exist yet (or must be regenerated).
    needed_targets = find_missing_targets(library_result_map, lib_db, force)

    for target_pathname, available_sources in needed_targets.items():
        logging.debug(' target : %s' % (target_pathname,))
        logging.debug(' candidate sources: %s' % (available_sources,))
        # Prefer qseq sources; fall back to srf when qseq is unavailable.
        if available_sources.has_key('qseq'):
            source = available_sources['qseq']
            qseq_condor_entries.append(
                condor_qseq_to_fastq(source.path,
        elif available_sources.has_key('srf'):
            source = available_sources['srf']
            mid = getattr(source, 'mid_point', None)
            srf_condor_entries.append(
                condor_srf_to_fastq(source.path,
            # (else branch: no usable source was found for this target)
            print " need file", target_pathname

    # Only write a submit script when there is at least one job to run.
    if len(srf_condor_entries) > 0:
        make_submit_script('srf.fastq.condor',

    if len(qseq_condor_entries) > 0:
        make_submit_script('qseq.fastq.condor',
def find_missing_targets(library_result_map, lib_db, force=False):
    Check if the sequence file exists.
    This requires computing what the sequence name is and checking
    to see if it can be found in the sequence location.

    Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
    # Target filename templates, filled from the per-sequence attribute dict.
    fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
    fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
    # find what targets we're missing
    # NOTE(review): the needed_targets initialization and the binding of
    # `lib` for each library are elided in this excerpt.
    for lib_id, result_dir in library_result_map:
        # flowcell id -> lane metadata for this library
        lane_dict = make_lane_dict(lib_db, lib_id)

        for lane_key, sequences in lib['lanes'].items():
            for seq in sequences:
                # Annotate the sequence with whether its flowcell ran paired-end.
                seq.paired = lane_dict[seq.flowcell]['paired_end']
                lane_status = lane_dict[seq.flowcell]['status']

                # A paired-end sequence without a read number can't be named.
                if seq.paired and seq.read is None:
                    filename_attributes = {
                        'flowcell': seq.flowcell,
                # Lanes that failed sequencing are skipped (handling elided).
                if lane_status == 'Failed':
                    if seq.flowcell == '30DY0AAXX':
                        # 30DY0 only ran for 151 bases instead of 152
                        # it is actually 76 1st read, 75 2nd read
                    target_name = fastq_paired_template % filename_attributes
                    target_name = fastq_single_template % filename_attributes

                target_pathname = os.path.join(result_dir, target_name)
                if force or not os.path.exists(target_pathname):
                    # Record every source filetype that could build this target.
                    t = needed_targets.setdefault(target_pathname, {})
                    t[seq.filetype] = seq

    return needed_targets
def link_daf(daf_path, library_result_map):
    """Hard-link the daf file into each library's result directory.

    Raises RuntimeError when daf_path itself does not exist; directories
    that already contain the daf are left untouched.
    """
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    daf_filename = os.path.basename(daf_path)

    for _lib_id, result_dir in library_result_map:
        destination = os.path.join(result_dir, daf_filename)
        if os.path.exists(destination):
            continue
        os.link(daf_path, destination)
def make_submission_ini(host, apidata, library_result_map, paired=True):
    #attributes = get_filename_attribute_map(paired)
    # Maps a submission filename to its DCC view/attributes via the API.
    view_map = NameToViewMap(host, apidata)

    candidate_fastq_src = {}

    # NOTE(review): initialization of fastqs/line_counter and several
    # branch bodies are elided in this excerpt.
    for lib_id, result_dir in library_result_map:
        order_by = ['order_by=files', 'view', 'replicate', 'cell',
                    'readType', 'mapAlgorithm', 'insertLength' ]
        inifile = ['[config]']
        inifile += [" ".join(order_by)]
        # One ini per result directory, named after the directory itself.
        result_ini = os.path.join(result_dir, result_dir+'.ini')

        # Classify every file found in the submission directory.
        submission_files = os.listdir(result_dir)

        for f in submission_files:
            attributes = view_map.find_attributes(f, lib_id)
            if attributes is None:
                raise ValueError("Unrecognized file: %s" % (f,))

            ext = attributes["extension"]
            if attributes['view'] is None:
            elif attributes.get("type", None) == 'fastq':
                # Fastqs with the same extension are grouped onto one line.
                fastqs.setdefault(ext, set()).add(f)
                make_submission_section(line_counter,

        # add in fastqs on a single line.
        for extension, fastq_set in fastqs.items():
            make_submission_section(line_counter,
                attributes[extension])

        f = open(result_ini,'w')
        f.write(os.linesep.join(inifile))
# NOTE(review): the result-list initialization and the return (presumably
# dict(result)) are elided in this excerpt. Flowcells appearing more than
# once would collapse to a single entry on conversion -- confirm.
def make_lane_dict(lib_db, lib_id):
    """
    Convert the lane_set in a lib_db to a dictionary
    indexed by flowcell ID
    for lane in lib_db[lib_id]['lane_set']:
        result.append((lane['flowcell'], lane))
def make_all_ddfs(library_result_map, daf_name, make_condor=True, force=False):
    # NOTE(review): dag_fragment accumulation and the else branch of the
    # exists-check are elided in this excerpt.
    for lib_id, result_dir in library_result_map:
        ininame = result_dir+'.ini'
        inipathname = os.path.join(result_dir, ininame)
        if os.path.exists(inipathname):
            make_ddf(ininame, daf_name, True, make_condor, result_dir)

    # Write one dagman file chaining all the archive/upload jobs together.
    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        # Refuse to clobber an existing dag unless --force was given.
        if not force and os.path.exists(dag_filename):
            logging.warn("%s exists, please delete" % (dag_filename,))
            f = open(dag_filename,'w')
            f.write( os.linesep.join(dag_fragment))
            f.write( os.linesep )
def make_ddf(ininame, daf_name, guess_ddf=False, make_condor=False, outdir=None):
    Make ddf files, and bonus condor file
    # When outdir is given the function operates from that directory
    # (the chdir bookkeeping is elided in this excerpt).
    if outdir is not None:
        # Derive the ddf output filename from the ini name.
        ddf_name = make_ddf_name(ininame)
        output = open(ddf_name,'w')

    # Dump the ini as tab-delimited text and collect the files it references.
    file_list = read_ddf_ini(ininame, output)

    file_list.append(daf_name)
    if ddf_name is not None:
        file_list.append(ddf_name)

    # Generate condor archive + upload scripts and chain them in a dag
    # fragment (the make_condor guard is elided in this excerpt).
        archive_condor = make_condor_archive_script(ininame, file_list)
        upload_condor = make_condor_upload_script(ininame)

        dag_fragments.extend(
            make_dag_fragment(ininame, archive_condor, upload_condor)
def read_ddf_ini(filename, output=sys.stdout):
    Read a ini file and dump out a tab delimited text file
    config = SafeConfigParser()
    config.read(filename)

    # Column order is declared by the [config] section's order_by entry.
    order_by = shlex.split(config.get("config", "order_by"))

    # Header row first, then one row per non-config section.
    output.write("\t".join(order_by))
    output.write(os.linesep)
    sections = config.sections()

    # NOTE(review): file_list/values initialization and the inner key loop
    # are elided in this excerpt.
    for section in sections:
        if section == "config":
            # skip the config block
            v = config.get(section, key)
            # Track every referenced file so callers can archive them.
            file_list.extend(parse_filelist(v))

        output.write("\t".join(values))
        output.write(os.linesep)
def read_library_result_map(filename):
    """
    Read a file that maps library id to result directory.
    Does not support spaces in filenames.
    # NOTE(review): the results-list initialization, the line loop header,
    # and the return are elided in this excerpt.
    stream = open(filename,'r')
        # '#' lines are comments; blank lines are skipped.
        if not line.startswith('#') and len(line) > 0 :
            library_id, result_dir = line.split()
            results.append((library_id, result_dir))
def make_condor_archive_script(ininame, files):
    # Condor job description: tar the submission files into ../<archivename>.
    script = """Universe = vanilla
Executable = /bin/tar
arguments = czvf ../%(archivename)s %(filelist)s
Error = compress.err.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress-%(user)s.log
initialdir = %(initialdir)s
        # Fail fast if any file slated for archiving is missing
        # (the enclosing `for f in files:` header is elided in this excerpt).
        if not os.path.exists(f):
            raise RuntimeError("Missing %s" % (f,))

    context = {'archivename': make_submission_name(ininame),
               'filelist': " ".join(files),
               'initialdir': os.getcwd(),
               # NOTE(review): os.getlogin() raises OSError when there is no
               # controlling terminal (e.g. under cron); getpass.getuser()
               # would be more robust -- confirm before changing.
               'user': os.getlogin()}

    condor_script = make_condor_name(ininame, 'archive')
    condor_stream = open(condor_script,'w')
    condor_stream.write(script % context)
    condor_stream.close()
def make_condor_upload_script(ininame):
    # Condor job description: lftp-upload the archive to the hard-coded
    # ENCODE ftp account at UCSC.
    script = """Universe = vanilla
Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://detrout@encodeftp.cse.ucsc.edu/%(archivename)s
Error = upload.err.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload-%(user)s.log
initialdir = %(initialdir)s
    context = {'archivename': make_submission_name(ininame),
               'initialdir': os.getcwd(),
               # NOTE(review): same os.getlogin() fragility as the archive
               # script -- fails without a controlling terminal.
               'user': os.getlogin()}

    condor_script = make_condor_name(ininame, 'upload')
    condor_stream = open(condor_script,'w')
    condor_stream.write(script % context)
    condor_stream.close()
def make_dag_fragment(ininame, archive_condor, upload_condor):
    Make the couple of fragments compress and then upload the data.
    # NOTE(review): the fragments-list initialization and the return are
    # elided in this excerpt.
    cur_dir = os.getcwd()
    # Dagman needs absolute paths to the submit files.
    archive_condor = os.path.join(cur_dir, archive_condor)
    upload_condor = os.path.join(cur_dir, upload_condor)
    job_basename = make_base_name(ininame)

    fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
    fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
    # Upload only runs after the archive job has succeeded.
    fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))
def get_library_info(host, apidata, library_id):
    # Fetch library metadata from the htsworkflow REST API.
    # (the parse/return of `contents` is elided in this excerpt)
    url = api.library_url(host, library_id)
    contents = api.retrieve_info(url, apidata)
def condor_srf_to_fastq(srf_file, target_pathname, paired, flowcell=None,
                        mid=None, force=False):
    # Build the srf2fastq argument list; paired-end runs get --left/--right
    # outputs, single-end runs get --single (the paired guard and the args
    # initialization are elided in this excerpt).
        args.extend(['--left', target_pathname])
        # this is ugly. I did it because I was pregenerating the target
        # names before I tried to figure out what sources could generate
        # those targets, and everything up to this point had been
        # one-to-one. So I couldn't figure out how to pair the
        # With this at least the command will run correctly.
        # however if we rename the default targets, this'll break
        # also I think it'll generate it twice.
        args.extend(['--right',
                     target_pathname.replace('_r1.fastq', '_r2.fastq')])
        args.extend(['--single', target_pathname ])
    if flowcell is not None:
        args.extend(['--flowcell', flowcell])
        # (guards on mid / force are elided in this excerpt)
        args.extend(['-m', str(mid)])
        args.extend(['--force'])
    # Returns a condor queue entry running srf2fastq with these arguments.
    """ % (" ".join(args),)
def condor_qseq_to_fastq(qseq_file, target_pathname, flowcell=None, force=False):
    # Build the qseq2fastq argument list (the force guard and the start of
    # the returned condor-entry string are elided in this excerpt).
    args = ['-i', qseq_file, '-o', target_pathname ]
    if flowcell is not None:
        args.extend(['-f', flowcell])
    # Returns a condor queue entry running qseq2fastq with these arguments.
    """ % (" ".join(args))
# NOTE(review): the signature continuation and the initialization of
# lib_db / seq_dirs / candidate_lanes, plus the return, are elided in
# this excerpt; only comments have been added.
def find_archive_sequence_files(host, apidata, sequences_path,
    Find all the archive sequence files possibly associated with our results.
    logging.debug("Searching for sequence files in: %s" %(sequences_path,))

    #seq_dirs = set(os.path.join(sequences_path, 'srfs'))
    for lib_id, result_dir in library_result_map:
        # Pull the library's metadata and index it by library id.
        lib_info = get_library_info(host, apidata, lib_id)
        lib_info['lanes'] = {}
        lib_db[lib_id] = lib_info

        for lane in lib_info['lane_set']:
            # (flowcell, lane_number) uniquely identifies a candidate lane.
            lane_key = (lane['flowcell'], lane['lane_number'])
            candidate_lanes[lane_key] = lib_id
            seq_dirs.add(os.path.join(sequences_path,
    logging.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
    candidate_seq_list = scan_for_sequences(seq_dirs)

    # at this point we have too many sequences as scan_for_sequences
    # returns all the sequences in a flowcell directory
    # so lets filter out the extras

    for seq in candidate_seq_list:
        lane_key = (seq.flowcell, seq.lane)
        lib_id = candidate_lanes.get(lane_key, None)
        if lib_id is not None:
            # Keep only sequences belonging to a lane we actually asked for.
            lib_info = lib_db[lib_id]
            lib_info['lanes'].setdefault(lane_key, set()).add(seq)
class NameToViewMap(object):
    """Determine view attributes for a given submission file name
    def __init__(self, root_url, apidata):
        # root_url/apidata are used to look up library metadata on demand.
        self.root_url = root_url
        self.apidata = apidata

        # ma is "map algorithm"
        # Ordered (glob pattern, view) pairs; a callable view is invoked
        # with is_paired to choose the view at match time. NOTE(review):
        # '*.bam' precedes '*.splices.bam' -- if matching is first-wins,
        # splice bams would never reach the 'Splices' view; the docstring
        # of find_attributes claims longest-match, but the selection logic
        # is partly elided here. Confirm.
            ('*.bam', self._guess_bam_view),
            ('*.splices.bam', 'Splices'),
            ('*.jnct', 'Junctions'),
            ('*.plus.bigwig', 'PlusSignal'),
            ('*.minus.bigwig', 'MinusSignal'),
            ('*.bigwig', 'Signal'),
            ('cufflinks-0.9.0-genes.expr', 'GeneDeNovo'),
            ('cufflinks-0.9.0-transcripts.expr', 'TranscriptDeNovo'),
            ('cufflinks-0.9.0-transcripts.gtf', 'GeneModel'),
            ('GENCODE-v3c-genes.expr', 'GeneGencV3c'),
            ('GENCODE-v3c-transcripts.expr', 'TranscriptGencV3c'),
            ('GENCODE-v4-genes.expr', 'GeneGencV4'),
            ('GENCODE-v4-transcripts.expr', 'TranscriptGencV4'),
            ('GENCODE-v4-transcript.expr', 'TranscriptGencV4'),
            ('*_r1.fastq', 'FastqRd1'),
            ('*_r2.fastq', 'FastqRd2'),
            ('*.fastq', 'Fastq'),
            ('*.gtf', 'GeneModel'),
            ('*.stats.txt', 'InsLength'),
        # view name -> default attribute dict for that view.
            None: {"MapAlgorithm": "NA"},
            "Paired": {"MapAlgorithm": ma},
            "Single": {"MapAlgorithm": ma},
            "Splices": {"MapAlgorithm": ma},
            "Junctions": {"MapAlgorithm": ma},
            "PlusSignal": {"MapAlgorithm": ma},
            "MinusSignal": {"MapAlgorithm": ma},
            "Signal": {"MapAlgorithm": ma},
            # NOTE(review): "GeneModel" appears twice in this dict literal;
            # the second occurrence below silently wins. Harmless here
            # (identical values) but should be deduplicated.
            "GeneModel": {"MapAlgorithm": ma},
            "GeneDeNovo": {"MapAlgorithm": ma},
            "TranscriptDeNovo": {"MapAlgorithm": ma},
            "GeneGencV3c": {"MapAlgorithm": ma},
            "TranscriptGencV3c": {"MapAlgorithm": ma},
            "GeneGencV4": {"MapAlgorithm": ma},
            "TranscriptGencV4": {"MapAlgorithm": ma},
            "FastqRd1": {"MapAlgorithm": "NA", "type": "fastq"},
            "FastqRd2": {"MapAlgorithm": "NA", "type": "fastq"},
            "Fastq": {"MapAlgorithm": "NA", "type": "fastq" },
            "GeneModel": {"MapAlgorithm": ma},
            "InsLength": {"MapAlgorithm": ma},
        # view name is one of the attributes
        for v in self.views.keys():
            self.views[v]['view'] = v

    def find_attributes(self, pathname, lib_id):
        """Looking for the best extension
        The 'best' is the longest match
        filename (str): the filename whose extension we are about to examine
        path, filename = os.path.splitext(pathname)
        # Cache library info so each library is fetched from the API once.
        if not self.lib_cache.has_key(lib_id):
            self.lib_cache[lib_id] = get_library_info(self.root_url,
                                                      self.apidata, lib_id)

        lib_info = self.lib_cache[lib_id]
        if lib_info['cell_line'].lower() == 'unknown':
            logging.warn("Library %s missing cell_line" % (lib_id,))
            # (attributes dict literal partially elided in this excerpt)
            'cell': lib_info['cell_line'],
            'replicate': lib_info['replicate'],
        is_paired = self._is_paired(lib_info)
            # (paired/single branch guards elided)
            attributes.update(self.get_paired_attributes(lib_info))
            attributes.update(self.get_single_attributes(lib_info))
        for pattern, view in self.patterns:
            if fnmatch.fnmatch(pathname, pattern):
                    # Callable views decide based on paired-ness.
                    view = view(is_paired=is_paired)
                attributes.update(self.views[view])
                attributes["extension"] = pattern

    def _guess_bam_view(self, is_paired=True):
        """Guess a view name based on library attributes

    def _is_paired(self, lib_info):
        """Determine if a library is paired end"""
        # No lanes at all: the elided branch handles this edge case.
        if len(lib_info["lane_set"]) == 0:

        # check to see if all the flowcells are the same.
        # otherwise we might need to do something complicated
        for flowcell in lib_info["lane_set"]:
            if flowcell["paired_end"]:

        logging.debug("Library %s: %d were, %d were not paired" % \
            (lib_info["library_id"], is_paired, isnot_paired))

        # Majority vote over lanes; a tie is treated as an error below.
        if is_paired > isnot_paired:
        elif is_paired < isnot_paired:
            raise RuntimeError("Assumptions about paired vs not paired are wrong")

    def get_paired_attributes(self, lib_info):
        # Fall back to an assumed insert size when the library lacks one
        # (the fallback assignment itself is elided in this excerpt).
        if lib_info['insert_size'] is None:
            errmsg = "Library %s is missing insert_size, assuming 200"
            logging.warn(errmsg % (lib_info["library_id"],))
            insert_size = lib_info['insert_size']
        return {'insertLength': insert_size,

    def get_single_attributes(self, lib_info):
        # Single-ended libraries have no meaningful insert length.
        return {'insertLength':'ilNA',
# NOTE(review): the return (presumably the inifile list) is elided in
# this excerpt.
def make_submission_section(line_counter, files, attributes):
    Create a section in the submission ini file
    inifile = [ "[line%s]" % (line_counter,) ]
    # Files for this section go on one comma-separated line.
    inifile += ["files=%s" % (",".join(files))]

    for k,v in attributes.items():
        inifile += ["%s=%s" % (k,v)]
def make_base_name(pathname):
    # Strip directory and extension: /a/b/foo.ini -> foo
    # (the return of `name` is elided in this excerpt)
    base = os.path.basename(pathname)
    name, ext = os.path.splitext(base)
def make_submission_name(ininame):
    # Derive the archive filename from the ini basename; the return
    # (presumably base name plus an archive suffix) is elided here.
    name = make_base_name(ininame)
def make_ddf_name(pathname):
    # Derive the ddf filename from the ini basename; the return
    # (presumably base name plus a ddf suffix) is elided here.
    name = make_base_name(pathname)
def make_condor_name(pathname, run_type=None):
    # Builds "<base>[.<run_type>].condor"; the `elements` list
    # initialization is elided in this excerpt.
    name = make_base_name(pathname)
    if run_type is not None:
        elements.append(run_type)
    elements.append("condor")
    return ".".join(elements)
def make_submit_script(target, header, body_list):
    write out a text file

    this was intended for condor submit scripts

    target (str or stream):
      if target is a string, we will open and close the file
      if target is a stream, the caller is responsible.
      header to write at the beginning of the file
    body_list (list of strs):
      a list of blocks to add to the file.
    # NOTE(review): types.StringTypes is Python 2 only; the open/write/close
    # statements between these guards are elided in this excerpt.
    if type(target) in types.StringTypes:
    for entry in body_list:
    if type(target) in types.StringTypes:
def parse_filelist(file_string):
    """Split a comma separated file list string into its filenames."""
    pieces = file_string.split(",")
    return pieces
def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list
        # (the enclosing `for f in files:` header is elided in this excerpt)
        if not os.path.exists(f):
            raise RuntimeError("%s does not exist" % (f,))
# Script entry point (the guarded call, presumably main(), is elided in
# this excerpt).
if __name__ == "__main__":