2 from ConfigParser import SafeConfigParser
7 from optparse import OptionParser
9 from pprint import pprint, pformat
11 from StringIO import StringIO
19 from htsworkflow.util import api
20 from htsworkflow.pipelines.sequences import \
21 create_sequence_table, \
# Command-line driver: parse options, configure logging, then run the
# requested submission steps (link daf, build fastqs, write submission ini,
# make ddfs).
# NOTE(review): this is a sparse line-numbered excerpt; the conditionals that
# choose the logging level (presumably opts.debug / opts.verbose) and that
# guard build_fastqs / make_submission_ini / make_all_ddfs (presumably
# opts.fastq / opts.ini / opts.makeddf) are on lines not shown here — confirm
# against the full file.
24 def main(cmdline=None):
25     parser = make_parser()
26     opts, args = parser.parse_args(cmdline)
# Exactly one of the basicConfig calls below runs, selected by unshown options.
29         logging.basicConfig(level = logging.DEBUG )
31         logging.basicConfig(level = logging.INFO )
33         logging.basicConfig(level = logging.WARNING )
# Credentials posted with every API request.
35     apidata = {'apiid': opts.apiid, 'apikey': opts.apikey }
# All three connection settings are mandatory; parser.error exits the program.
37     if opts.host is None or opts.apiid is None or opts.apikey is None:
38         parser.error("Please specify host url, apiid, apikey")
41         parser.error("I need at least one library submission-dir input file")
# Each positional argument is a file mapping library ids to result dirs.
43     library_result_map = []
45         library_result_map.extend(read_library_result_map(a))
47     if opts.daf is not None:
48         link_daf(opts.daf, library_result_map)
51         build_fastqs(opts.host,
58     make_submission_ini(opts.host, apidata, library_result_map)
61     make_all_ddfs(library_result_map, opts.daf, force=opts.force)
# Body of make_parser(); its "def make_parser():" line falls outside this
# excerpt. Loads sequence-archive location and API credentials from the user
# and system htsworkflow.ini files, then builds the OptionParser: action
# flags (--fastq/--ini/--makeddf), connection options, and logging verbosity.
65 # Load defaults from the config files
66 config = SafeConfigParser()
67 config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini'])
69 sequence_archive = None
73 SECTION = 'sequence_archive'
74 if config.has_section(SECTION):
# NOTE(review): Python 2 ConfigParser.get(section, option, raw, vars) has no
# positional "default" parameter — the third positional argument here is
# interpreted as `raw`, not as a fallback value. If the option is missing
# this raises NoOptionError instead of returning the local default. Likely
# intended: guard each get() with has_option(). Confirm before relying on it.
75 sequence_archive = config.get(SECTION, 'sequence_archive',sequence_archive)
76 sequence_archive = os.path.expanduser(sequence_archive)
77 apiid = config.get(SECTION, 'apiid', apiid)
78 apikey = config.get(SECTION, 'apikey', apikey)
79 apihost = config.get(SECTION, 'host', apihost)
81 parser = OptionParser()
# Action flags: which pipeline stages main() should run.
84 parser.add_option('--fastq', help="generate scripts for making fastq files",
85 default=False, action="store_true")
87 parser.add_option('--ini', help="generate submission ini file", default=False,
90 parser.add_option('--makeddf', help='make the ddfs', default=False,
93 parser.add_option('--daf', default=None, help='specify daf name')
94 parser.add_option('--force', default=False, action="store_true",
95 help="Force regenerating fastqs")
# Connection options; defaults come from the ini files read above.
97 # configuration options
98 parser.add_option('--apiid', default=apiid, help="Specify API ID")
99 parser.add_option('--apikey', default=apikey, help="Specify API KEY")
100 parser.add_option('--host', default=apihost,
101 help="specify HTSWorkflow host",)
102 parser.add_option('--sequence', default=sequence_archive,
103 help="sequence repository")
# Logging verbosity flags consumed by main().
106 parser.add_option('--verbose', default=False, action="store_true",
107 help='verbose logging')
108 parser.add_option('--debug', default=False, action="store_true",
109 help='debug logging')
# Generates condor submit scripts that convert archived qseq/srf files into
# the fastq files the submission needs. qseq sources are preferred over srf
# when both are available for a target.
114 def build_fastqs(host, apidata, sequences_path, library_result_map,
117     Generate condor scripts to build any needed fastq files
120     host (str): root of the htsworkflow api server
121     apidata (dict): id & key to post to the server
122     sequences_path (str): root of the directory tree to scan for files
123     library_result_map (list): [(library_id, destination directory), ...]
# Condor headers carry hard-coded absolute paths to the conversion scripts —
# these are machine-specific and will break on any other host.
125     qseq_condor_header = """
127 executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/qseq2fastq
128 error=log/qseq2fastq.err.$(process).log
129 output=log/qseq2fastq.out.$(process).log
130 log=log/qseq2fastq.log
133     qseq_condor_entries = []
134     srf_condor_header = """
136 executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/srf2fastq
137 output=log/srf_pair_fastq.out.$(process).log
138 error=log/srf_pair_fastq.err.$(process).log
139 log=log/srf_pair_fastq.log
140 environment="PYTHONPATH=/home/diane/lib/python2.6/site-packages:/home/diane/proj/solexa/gaworkflow PATH=/woldlab/rattus/lvol0/mus/home/diane/bin:/usr/bin:/bin"
143     srf_condor_entries = []
144     lib_db = find_archive_sequence_files(host,
# Map each missing fastq target to whatever source files could produce it.
149     needed_targets = find_missing_targets(library_result_map, lib_db, force)
151     for target_pathname, available_sources in needed_targets.items():
152         logging.debug(' target : %s' % (target_pathname,))
153         logging.debug(' candidate sources: %s' % (available_sources,))
# dict.has_key() is Python-2-only; qseq wins over srf when both exist.
154         if available_sources.has_key('qseq'):
155             source = available_sources['qseq']
156             qseq_condor_entries.append(
157                 condor_qseq_to_fastq(source.path,
162         elif available_sources.has_key('srf'):
163             source = available_sources['srf']
164             mid = getattr(source, 'mid_point', None)
165             srf_condor_entries.append(
166                 condor_srf_to_fastq(source.path,
# Fallthrough (unshown else): no usable source was found for this target.
174             print " need file", target_pathname
# Only write a submit script when there is at least one conversion to run.
176     if len(srf_condor_entries) > 0:
177         make_submit_script('srf.fastq.condor',
181     if len(qseq_condor_entries) > 0:
182         make_submit_script('qseq.fastq.condor',
# Computes the expected fastq filename for every lane of every library and
# returns a dict of {target_pathname: {filetype: sequence}} for targets that
# do not yet exist on disk (or all targets, when force is set).
187 def find_missing_targets(library_result_map, lib_db, force=False):
189     Check if the sequence file exists.
190     This requires computing what the sequence name is and checking
191     to see if it can be found in the sequence location.
193     Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
# Filename templates; paired-end reads get an extra _r%(read)s component.
195     fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
196     fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
197     # find what targets we're missing
199     for lib_id, result_dir in library_result_map:
201         lane_dict = make_lane_dict(lib_db, lib_id)
203         for lane_key, sequences in lib['lanes'].items():
204             for seq in sequences:
# Annotate each sequence with paired-end status from the lane metadata.
205                 seq.paired = lane_dict[seq.flowcell]['paired_end']
206                 lane_status = lane_dict[seq.flowcell]['status']
# A paired lane whose sequence has no read number is skipped/handled on an
# unshown line (presumably `continue`) — confirm against the full file.
208                 if seq.paired and seq.read is None:
210                 filename_attributes = {
211                     'flowcell': seq.flowcell,
# Failed lanes are presumably skipped (handler on an unshown line).
218                 if lane_status == 'Failed':
220                 if seq.flowcell == '30DY0AAXX':
221                     # 30DY0 only ran for 151 bases instead of 152
222                     # it is actually 76 1st read, 75 2nd read
227                     target_name = fastq_paired_template % filename_attributes
229                     target_name = fastq_single_template % filename_attributes
231                 target_pathname = os.path.join(result_dir, target_name)
232                 if force or not os.path.exists(target_pathname):
# One target can have several candidate sources, keyed by filetype.
233                     t = needed_targets.setdefault(target_pathname, {})
234                     t[seq.filetype] = seq
236     return needed_targets
def link_daf(daf_path, library_result_map):
    """Hard-link the DAF file into every library's result directory.

    Raises RuntimeError if daf_path does not exist; directories that
    already contain the link are left untouched.
    """
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    daf_name = os.path.basename(daf_path)
    for _lib_id, result_dir in library_result_map:
        destination = os.path.join(result_dir, daf_name)
        if os.path.exists(destination):
            continue
        os.link(daf_path, destination)
# Writes a <result_dir>.ini file for each library describing every file in
# its submission directory, with attributes resolved via NameToViewMap.
# Fastq files are grouped per extension into a single section at the end.
251 def make_submission_ini(host, apidata, library_result_map, paired=True):
252     #attributes = get_filename_attribute_map(paired)
253     view_map = NameToViewMap(host, apidata)
255     candidate_fastq_src = {}
257     for lib_id, result_dir in library_result_map:
# Column order for the generated ddf, written into the [config] section.
258         order_by = ['order_by=files', 'view', 'replicate', 'cell',
259                     'readType', 'mapAlgorithm', 'insertLength' ]
260         inifile = ['[config]']
261         inifile += [" ".join(order_by)]
264         result_ini = os.path.join(result_dir, result_dir+'.ini')
267         submission_files = os.listdir(result_dir)
269         for f in submission_files:
270             attributes = view_map.find_attributes(f, lib_id)
271             if attributes is None:
272                 raise ValueError("Unrecognized file: %s" % (f,))
274             ext = attributes["extension"]
# view None means "recognized but not submitted" (handler on unshown line).
275             if attributes['view'] is None:
# Fastqs are collected per extension and emitted together afterwards.
277             elif attributes.get("type", None) == 'fastq':
278                 fastqs.setdefault(ext, set()).add(f)
281                 make_submission_section(line_counter,
289         # add in fastqs on a single line.
290         for extension, fastq_set in fastqs.items():
292             make_submission_section(line_counter,
294                 attributes[extension])
# NOTE(review): `f` here shadows the filename loop variable above, and no
# close() is visible in this excerpt — confirm the handle is closed in the
# full file.
299         f = open(result_ini,'w')
300         f.write(os.linesep.join(inifile))
# Re-keys a library's lane_set by flowcell id. The accumulator init and the
# return (presumably `return dict(result)`) are on lines not shown here.
303 def make_lane_dict(lib_db, lib_id):
305     Convert the lane_set in a lib_db to a dictionary
306     indexed by flowcell ID
# NOTE(review): if a flowcell appears in several lanes, later entries would
# overwrite earlier ones in the resulting dict — confirm that is intended.
309     for lane in lib_db[lib_id]['lane_set']:
310         result.append((lane['flowcell'], lane))
# Runs make_ddf for every library that has a generated ini file, then (when
# make_condor is set) writes a DAGMan file chaining the archive/upload jobs.
314 def make_all_ddfs(library_result_map, daf_name, make_condor=True, force=False):
316     for lib_id, result_dir in library_result_map:
317         ininame = result_dir+'.ini'
318         inipathname = os.path.join(result_dir, ininame)
319         if os.path.exists(inipathname):
# dag fragments accumulate across libraries (init on an unshown line).
321             make_ddf(ininame, daf_name, True, make_condor, result_dir)
324     if make_condor and len(dag_fragment) > 0:
# Written to the current working directory, not to any result_dir.
325         dag_filename = 'submission.dagman'
# Without force, an existing dagman file is left alone and only warned about.
326         if not force and os.path.exists(dag_filename):
327             logging.warn("%s exists, please delete" % (dag_filename,))
329             f = open(dag_filename,'w')
330             f.write( os.linesep.join(dag_fragment))
331             f.write( os.linesep )
# Writes the ddf (tab-delimited data definition file) for one submission and
# the matching condor archive/upload scripts; returns dag fragments linking
# them (return on an unshown line).
335 def make_ddf(ininame, daf_name, guess_ddf=False, make_condor=False, outdir=None):
337     Make ddf files, and bonus condor file
# When outdir is given, the work presumably happens after a chdir into it
# (handling on unshown lines) — confirm against the full file.
341     if outdir is not None:
346     ddf_name = make_ddf_name(ininame)
# NOTE(review): no close() for `output` is visible in this excerpt.
348     output = open(ddf_name,'w')
350     file_list = read_ddf_ini(ininame, output)
# The archive tarball includes the daf and the ddf itself.
352     file_list.append(daf_name)
353     if ddf_name is not None:
354         file_list.append(ddf_name)
357     archive_condor = make_condor_archive_script(ininame, file_list)
358     upload_condor = make_condor_upload_script(ininame)
360     dag_fragments.extend(
361         make_dag_fragment(ininame, archive_condor, upload_condor)
# Converts a submission ini file into tab-delimited ddf rows written to
# `output`, returning the list of files referenced by the ini.
369 def read_ddf_ini(filename, output=sys.stdout):
371     Read an ini file and dump out a tab delimited text file
374     config = SafeConfigParser()
375     config.read(filename)
# The [config] section's order_by option lists the ddf column names.
377     order_by = shlex.split(config.get("config", "order_by"))
379     output.write("\t".join(order_by))
380     output.write(os.linesep)
381     sections = config.sections()
383     for section in sections:
384         if section == "config":
385             # skip the config block
# Each remaining [lineN] section becomes one tab-delimited row; its "files"
# value feeds the returned file list.
389             v = config.get(section, key)
392                 file_list.extend(parse_filelist(v))
394         output.write("\t".join(values))
395         output.write(os.linesep)
# Parses a whitespace-delimited two-column map file into (library_id,
# result_dir) tuples; '#'-prefixed lines are comments. The accumulator init,
# the per-line strip/loop, and the return are on lines not shown here.
399 def read_library_result_map(filename):
401     Read a file that maps library id to result directory.
402     Does not support spaces in filenames.
# NOTE(review): the stream is never closed in the visible lines.
407     stream = open(filename,'r')
412     if not line.startswith('#') and len(line) > 0 :
# line.split() with no argument is why spaces in paths are unsupported.
413         library_id, result_dir = line.split()
414         results.append((library_id, result_dir))
# Writes a condor submit script that tars the submission files into
# ../<submission_name>; returns the script name (on an unshown line).
418 def make_condor_archive_script(ininame, files):
419     script = """Universe = vanilla
421 Executable = /bin/tar
422 arguments = czvf ../%(archivename)s %(filelist)s
424 Error = compress.err.$(Process).log
425 Output = compress.out.$(Process).log
426 Log = /tmp/submission-compress.log
427 initialdir = %(initialdir)s
# Fail fast if any file that would go into the tarball is missing.
432         if not os.path.exists(f):
433             raise RuntimeError("Missing %s" % (f,))
435     context = {'archivename': make_submission_name(ininame),
436                'filelist': " ".join(files),
437                'initialdir': os.getcwd()}
439     condor_script = make_condor_name(ininame, 'archive')
440     condor_stream = open(condor_script,'w')
441     condor_stream.write(script % context)
442     condor_stream.close()
# Writes a condor submit script that lftp-uploads the archive tarball to the
# UCSC ENCODE ftp drop box; returns the script name (on an unshown line).
446 def make_condor_upload_script(ininame):
447     script = """Universe = vanilla
449 Executable = /usr/bin/lftp
# Hard-coded user/host in the ftp URL — machine/account specific.
450 arguments = -c put ../%(archivename)s -o ftp://detrout@encodeftp.cse.ucsc.edu/
452 Error = upload.err.$(Process).log
453 Output = upload.out.$(Process).log
454 Log = /tmp/submission-upload.log
455 initialdir = %(initialdir)s
459     context = {'archivename': make_submission_name(ininame),
460                'initialdir': os.getcwd()}
462     condor_script = make_condor_name(ininame, 'upload')
463     condor_stream = open(condor_script,'w')
464     condor_stream.write(script % context)
465     condor_stream.close()
# Builds the DAGMan lines declaring the archive and upload jobs and ordering
# archive before upload. The fragments list init and its return are on lines
# not shown here.
469 def make_dag_fragment(ininame, archive_condor, upload_condor):
471     Make the couple of fragments compress and then upload the data.
# DAGMan needs absolute paths to the submit scripts.
473     cur_dir = os.getcwd()
474     archive_condor = os.path.join(cur_dir, archive_condor)
475     upload_condor = os.path.join(cur_dir, upload_condor)
476     job_basename = make_base_name(ininame)
479     fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
480     fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
481     fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))
# Fetches one library's metadata from the htsworkflow API; the return
# (presumably the parsed contents) is on an unshown line.
486 def get_library_info(host, apidata, library_id):
487     url = api.library_url(host, library_id)
488     contents = api.retrieve_info(url, apidata)
# Builds one condor queue entry invoking srf2fastq for a single srf file.
# For paired data the right-read filename is derived from the left-read
# target by string substitution (see the author's caveat comments below).
492 def condor_srf_to_fastq(srf_file, target_pathname, paired, flowcell=None,
493                         mid=None, force=False):
496         args.extend(['--left', target_pathname])
497         # this is ugly. I did it because I was pregenerating the target
498         # names before I tried to figure out what sources could generate
499         # those targets, and everything up to this point had been
500         # one-to-one. So I couldn't figure out how to pair the
502         # With this at least the command will run correctly.
503         # however if we rename the default targets, this'll break
504         # also I think it'll generate it twice.
505         args.extend(['--right',
506                      target_pathname.replace('_r1.fastq', '_r2.fastq')])
508         args.extend(['--single', target_pathname ])
509     if flowcell is not None:
510         args.extend(['--flowcell', flowcell])
# mid is the paired-read split point within the combined read.
513         args.extend(['-m', str(mid)])
516         args.extend(['--force'])
# Returns a condor entry string built from args (template on unshown lines).
521 """ % (" ".join(args),)
# Builds one condor queue entry invoking qseq2fastq for a single qseq
# tarball; the entry template itself is on unshown lines.
526 def condor_qseq_to_fastq(qseq_file, target_pathname, flowcell=None, force=False):
527     args = ['-i', qseq_file, '-o', target_pathname ]
528     if flowcell is not None:
529         args.extend(['-f', flowcell])
533 """ % (" ".join(args))
# Builds lib_db: per-library metadata from the API plus the archived
# sequence files found on disk for each of that library's lanes. The return
# (presumably lib_db) is on an unshown line.
537 def find_archive_sequence_files(host, apidata, sequences_path,
540     Find all the archive sequence files possibly associated with our results.
543     logging.debug("Searching for sequence files in: %s" %(sequences_path,))
547     #seq_dirs = set(os.path.join(sequences_path, 'srfs'))
549     for lib_id, result_dir in library_result_map:
550         lib_info = get_library_info(host, apidata, lib_id)
551         lib_info['lanes'] = {}
552         lib_db[lib_id] = lib_info
# Remember which (flowcell, lane) pairs belong to which library so the scan
# results below can be filtered back to our libraries.
554         for lane in lib_info['lane_set']:
555             lane_key = (lane['flowcell'], lane['lane_number'])
556             candidate_lanes[lane_key] = lib_id
557             seq_dirs.add(os.path.join(sequences_path,
# unicode() is Python-2-only.
560     logging.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
561     candidate_seq_list = scan_for_sequences(seq_dirs)
563     # at this point we have too many sequences as scan_for_sequences
564     # returns all the sequences in a flowcell directory
565     # so lets filter out the extras
567     for seq in candidate_seq_list:
568         lane_key = (seq.flowcell, seq.lane)
569         lib_id = candidate_lanes.get(lane_key, None)
570         if lib_id is not None:
571             lib_info = lib_db[lib_id]
# A set per lane de-duplicates repeated scan hits.
572             lib_info['lanes'].setdefault(lane_key, set()).add(seq)
# Maps submission filenames to DCC "view" names and their attribute dicts,
# using glob patterns plus per-library metadata fetched from the API.
577 class NameToViewMap(object):
578     """Determine view attributes for a given submission file name
580     def __init__(self, root_url, apidata):
581         self.root_url = root_url
582         self.apidata = apidata
585         # ma is "map algorithm"
# (pattern, view) pairs; a callable view is invoked to pick the view name.
590             ('*.bam', self._guess_bam_view),
591             ('*.splices.bam', 'Splices'),
592             ('*.jnct', 'Junctions'),
593             ('*.plus.bigwig', 'PlusSignal'),
594             ('*.minus.bigwig', 'MinusSignal'),
595             ('*.bigwig', 'Signal'),
600             ('*denovo.genes.expr', 'GeneDeNovo'),
601             ('*denovo.transcripts.expr','TranscriptDeNovo'),
602             ('*novel.genes.expr', 'GeneDeNovo'),
603             ('*novel.transcripts.expr', 'TranscriptDeNovo'),
604             ('*genes.expr', 'GeneFPKM'),
605             ('*transcripts.expr', 'TranscriptFPKM'),
606             ('*transcript.expr', 'TranscriptFPKM'),
607             ('*_r1.fastq', 'FastqRd1'),
608             ('*_r2.fastq', 'FastqRd2'),
609             ('*.fastq', 'Fastq'),
610             ('*.gtf', 'GeneModel'),
613             ('*.stats.txt', 'InsLength'),
# Per-view attribute dicts; None is the "unsubmitted" catch-all.
620             None: {"MapAlgorithm": "NA"},
621             "Paired": {"MapAlgorithm": ma},
622             "Single": {"MapAlgorithm": ma},
623             "Splices": {"MapAlgorithm": ma},
624             "Junctions": {"MapAlgorithm": ma},
625             "PlusSignal": {"MapAlgorithm": ma},
626             "MinusSignal": {"MapAlgorithm": ma},
627             "Signal": {"MapAlgorithm": ma},
628             "GeneDeNovo": {"MapAlgorithm": ma},
629             "TranscriptDeNovo": {"MapAlgorithm": ma},
# NOTE(review): GeneDeNovo/TranscriptDeNovo/TranscriptFPKM keys are
# duplicated below; the later literal silently wins. Harmless here since the
# values are identical, but worth cleaning up.
630             "GeneDeNovo": {"MapAlgorithm": ma},
631             "TranscriptDeNovo": {"MapAlgorithm": ma},
632             "GeneFPKM": {"MapAlgorithm": ma},
633             "TranscriptFPKM": {"MapAlgorithm": ma},
634             "TranscriptFPKM": {"MapAlgorithm": ma},
635             "FastqRd1": {"MapAlgorithm": "NA", "type": "fastq"},
636             "FastqRd2": {"MapAlgorithm": "NA", "type": "fastq"},
637             "Fastq": {"MapAlgorithm": "NA", "type": "fastq" },
638             "GeneModel": {"MapAlgorithm": ma},
639             "InsLength": {"MapAlgorithm": ma},
641         # view name is one of the attributes
642         for v in self.views.keys():
643             self.views[v]['view'] = v
# Resolve one filename to its attribute dict, caching library metadata.
645     def find_attributes(self, pathname, lib_id):
646         """Looking for the best extension
647         The 'best' is the longest match
650         filename (str): the filename whose extention we are about to examine
# NOTE(review): splitext only splits off the final extension; the variable
# names (path, filename) do not match what splitext actually returns.
652         path, filename = os.path.splitext(pathname)
653         if not self.lib_cache.has_key(lib_id):
654             self.lib_cache[lib_id] = get_library_info(self.root_url,
655                                                       self.apidata, lib_id)
657         lib_info = self.lib_cache[lib_id]
658         if lib_info['cell_line'].lower() == 'unknown':
659             logging.warn("Library %s missing cell_line" % (lib_id,))
661             'cell': lib_info['cell_line'],
662             'replicate': lib_info['replicate'],
664         is_paired = self._is_paired(lib_info)
667             attributes.update(self.get_paired_attributes(lib_info))
669             attributes.update(self.get_single_attributes(lib_info))
# NOTE(review): patterns are checked in list order; whether the first or the
# last matching pattern wins depends on an unshown break — the docstring's
# "longest match" claim cannot be confirmed from this excerpt.
671         for pattern, view in self.patterns:
672             if fnmatch.fnmatch(pathname, pattern):
674                     view = view(is_paired=is_paired)
676                 attributes.update(self.views[view])
677                 attributes["extension"] = pattern
# Choose 'Paired' vs 'Single' for plain *.bam files from library metadata.
681     def _guess_bam_view(self, is_paired=True):
682         """Guess a view name based on library attributes
690     def _is_paired(self, lib_info):
691         """Determine if a library is paired end"""
692         if len(lib_info["lane_set"]) == 0:
697         # check to see if all the flowcells are the same.
698         # otherwise we might need to do something complicated
# Tally paired vs unpaired lanes and go with the majority.
699         for flowcell in lib_info["lane_set"]:
700             if flowcell["paired_end"]:
705         logging.debug("Library %s: %d were, %d were not paired" % \
706                       (lib_info["library_id"], is_paired, isnot_paired))
708         if is_paired > isnot_paired:
710         elif is_paired < isnot_paired:
# A tie is unexpected and treated as a hard error.
713             raise RuntimeError("Assumptions about paired vs not paired are wrong")
715     def get_paired_attributes(self, lib_info):
# Missing insert_size falls back to 200 (assignment on an unshown line).
716         if lib_info['insert_size'] is None:
717             errmsg = "Library %s is missing insert_size, assuming 200"
718             logging.warn(errmsg % (lib_info["library_id"],))
721             insert_size = lib_info['insert_size']
722         return {'insertLength': insert_size,
# Single-ended libraries report 'ilNA' for insert length.
725     def get_single_attributes(self, lib_info):
726         return {'insertLength':'ilNA',
# Formats one [lineN] ini section listing the files and their attributes;
# the return (presumably inifile) is on an unshown line.
730 def make_submission_section(line_counter, files, attributes):
732     Create a section in the submission ini file
734     inifile = [ "[line%s]" % (line_counter,) ]
735     inifile += ["files=%s" % (",".join(files))]
737     for k,v in attributes.items():
738         inifile += ["%s=%s" % (k,v)]
# Strips directory and final extension from a pathname; the return
# (presumably name) is on an unshown line.
742 def make_base_name(pathname):
743     base = os.path.basename(pathname)
744     name, ext = os.path.splitext(base)
# Archive tarball name for a submission ini; the return (presumably
# name + an archive suffix such as '.tgz') is on an unshown line.
748 def make_submission_name(ininame):
749     name = make_base_name(ininame)
# ddf filename for a submission ini; the return (presumably name + '.ddf')
# is on an unshown line.
753 def make_ddf_name(pathname):
754     name = make_base_name(pathname)
# Condor submit-script name: <base>[.<run_type>].condor. The elements list
# init (presumably [name]) is on an unshown line.
758 def make_condor_name(pathname, run_type=None):
759     name = make_base_name(pathname)
761     if run_type is not None:
762         elements.append(run_type)
763     elements.append("condor")
764     return ".".join(elements)
# Writes header followed by each body entry to target, which may be a
# filename (opened/closed here) or an already-open stream (caller's job).
767 def make_submit_script(target, header, body_list):
769     write out a text file
771     this was intended for condor submit scripts
774     target (str or stream):
775         if target is a string, we will open and close the file
776         if target is a stream, the caller is responsible.
779         header to write at the beginning of the file
780     body_list (list of strs):
781         a list of blocks to add to the file.
# types.StringTypes is Python-2-only (covers str and unicode).
783     if type(target) in types.StringTypes:
788     for entry in body_list:
# Close only the file we opened ourselves.
790     if type(target) in types.StringTypes:
def parse_filelist(file_string):
    """Split a comma-delimited string of filenames into a list.

    No quoting or escaping is supported: every comma is treated as a
    separator, and surrounding whitespace is preserved as-is.
    """
    separator = ","
    return file_string.split(separator)
# Raises RuntimeError on the first missing file; the loop header
# (presumably `for f in files:`) is on an unshown line.
797 def validate_filelist(files):
799     Die if a file doesn't exist in a file list
802     if not os.path.exists(f):
803         raise RuntimeError("%s does not exist" % (f,))
# Script entry point; the call to main() is on an unshown following line.
806 if __name__ == "__main__":