2 from ConfigParser import SafeConfigParser
6 from optparse import OptionParser
8 from pprint import pprint, pformat
10 from StringIO import StringIO
19 from htsworkflow.util import api
20 from htsworkflow.pipelines.sequences import \
21 create_sequence_table, \
# NOTE(review): this view is elided (sparse original line numbers); comments
# describe only the visible lines.  Entry point: parse options, then run the
# requested submission-preparation steps.
24 def main(cmdline=None):
25 parser = make_parser()
26 opts, args = parser.parse_args(cmdline)
# Logging level selection -- the guarding conditions are in elided lines,
# presumably the --debug / --verbose flags defined in make_parser; confirm.
29 logging.basicConfig(level = logging.DEBUG )
31 logging.basicConfig(level = logging.INFO )
33 logging.basicConfig(level = logging.WARNING )
# Credentials dict posted to the htsworkflow API server.
36 apidata = {'apiid': opts.apiid, 'apikey': opts.apikey }
36 apidata = {'apiid': opts.apiid, 'apikey': opts.apikey } if False else apidata if False else apidata
38 if opts.host is None or opts.apiid is None or opts.apikey is None:
39 parser.error("Please specify host url, apiid, apikey")
# At least one library->submission-dir mapping file is required (its guard
# condition is elided).
42 parser.error("I need at least one library submission-dir input file")
# Accumulate (library_id, result_dir) pairs from every input file given.
44 library_result_map = []
46 library_result_map.extend(read_library_result_map(a))
48 if opts.daf is not None:
49 link_daf(opts.daf, library_result_map)
# Each remaining step is gated by an option flag (condition lines elided):
# fastq generation, ini generation, ddf generation.
52 build_fastqs(opts.host,
59 make_submission_ini(opts.host, apidata, library_result_map)
62 make_all_ddfs(library_result_map, opts.daf)
# Interior of make_parser (its def line is elided from this view): builds the
# OptionParser, seeding option defaults from ~/.htsworkflow.ini or
# /etc/htsworkflow.ini.
65 # Load defaults from the config files
66 config = SafeConfigParser()
67 config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini'])
69 sequence_archive = None
73 SECTION = 'sequence_archive'
74 if config.has_section(SECTION):
# NOTE(review): SafeConfigParser.get()'s third positional argument is `raw`,
# not a default value -- passing the current value here does NOT act as a
# fallback and will raise NoOptionError when the option is absent.  Verify
# intent (config.get(section, option, raw=False, vars=None) in Python 2).
75 sequence_archive = config.get(SECTION, 'sequence_archive',sequence_archive)
76 sequence_archive = os.path.expanduser(sequence_archive)
77 apiid = config.get(SECTION, 'apiid', apiid)
78 apikey = config.get(SECTION, 'apikey', apikey)
79 apihost = config.get(SECTION, 'host', apihost)
81 parser = OptionParser()
# Action flags: which submission-preparation steps main() should run.
84 parser.add_option('--fastq', help="generate scripts for making fastq files",
85 default=False, action="store_true")
87 parser.add_option('--ini', help="generate submission ini file", default=False,
90 parser.add_option('--makeddf', help='make the ddfs', default=False,
93 parser.add_option('--daf', default=None, help='specify daf name')
94 parser.add_option('--force', default=False, action="store_true",
95 help="Force regenerating fastqs")
97 # configuration options
98 parser.add_option('--apiid', default=apiid, help="Specify API ID")
99 parser.add_option('--apikey', default=apikey, help="Specify API KEY")
100 parser.add_option('--host', default=apihost,
101 help="specify HTSWorkflow host",)
102 parser.add_option('--sequence', default=sequence_archive,
103 help="sequence repository")
# Logging verbosity flags consumed by main().
106 parser.add_option('--verbose', default=False, action="store_true",
107 help='verbose logging')
108 parser.add_option('--debug', default=False, action="store_true",
109 help='debug logging')
113 def build_fastqs(host, apidata, sequences_path, library_result_map,
116 Generate condor scripts to build any needed fastq files
119 host (str): root of the htsworkflow api server
120 apidata (dict): id & key to post to the server
121 sequences_path (str): root of the directory tree to scan for files
122 library_result_map (list): [(library_id, destination directory), ...]
# Condor submit-file header for qseq->fastq conversion jobs.
# NOTE(review): executable paths and environment are hard-coded to one
# user's home directory -- candidates for configuration.
124 qseq_condor_header = """
126 executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/qseq2fastq
127 error=log/qseq2fastq.err.$(process).log
128 output=log/qseq2fastq.out.$(process).log
129 log=log/qseq2fastq.log
132 qseq_condor_entries = []
# Condor submit-file header for srf->fastq conversion jobs.
133 srf_condor_header = """
135 executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/srf2fastq
136 output=log/srf_pair_fastq.out.$(process).log
137 error=log/srf_pair_fastq.err.$(process).log
138 log=log/srf_pair_fastq.log
139 environment="PYTHONPATH=/home/diane/lib/python2.6/site-packages:/home/diane/proj/solexa/gaworkflow PATH=/woldlab/rattus/lvol0/mus/home/diane/bin:/usr/bin:/bin"
142 srf_condor_entries = []
143 lib_db = find_archive_sequence_files(host,
# Determine which fastq targets still need to be generated.
148 needed_targets = find_missing_targets(library_result_map, lib_db, force)
150 for target_pathname, available_sources in needed_targets.items():
151 logging.debug(' target : %s' % (target_pathname,))
152 logging.debug(' candidate sources: %s' % (available_sources,))
# Prefer qseq sources over srf (Python 2 dict.has_key).
153 if available_sources.has_key('qseq'):
154 source = available_sources['qseq']
155 qseq_condor_entries.append(
156 condor_qseq_to_fastq(source.path,
161 elif available_sources.has_key('srf'):
162 source = available_sources['srf']
163 mid = getattr(source, 'mid_point', None)
164 srf_condor_entries.append(
165 condor_srf_to_fastq(source.path,
# No usable source found for this target (Python 2 print statement).
173 print " need file", target_pathname
# Only write out a submit script when there is at least one job entry.
175 if len(srf_condor_entries) > 0:
176 make_submit_script('srf.fastq.condor',
180 if len(qseq_condor_entries) > 0:
181 make_submit_script('qseq.fastq.condor',
185 def find_missing_targets(library_result_map, lib_db, force=False):
187 Check if the sequence file exists.
188 This requires computing what the sequence name is and checking
189 to see if it can be found in the sequence location.
191 Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
# Filename templates; %-keys are filled from filename_attributes below.
193 fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
194 fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
195 # find what targets we're missing
197 for lib_id, result_dir in library_result_map:
199 lane_dict = make_lane_dict(lib_db, lib_id)
201 for lane_key, sequences in lib['lanes'].items():
202 for seq in sequences:
# Annotate each sequence with paired-end status/lane status from metadata.
203 seq.paired = lane_dict[seq.flowcell]['paired_end']
204 lane_status = lane_dict[seq.flowcell]['status']
# Paired-end sequence missing its read number (handling elided).
206 if seq.paired and seq.read is None:
208 filename_attributes = {
209 'flowcell': seq.flowcell,
# Failed lanes are presumably skipped (the action line is elided) -- confirm.
216 if lane_status == 'Failed':
218 if seq.flowcell == '30DY0AAXX':
219 # 30DY0 only ran for 151 bases instead of 152
220 # it is actually 76 1st read, 75 2nd read
225 target_name = fastq_paired_template % filename_attributes
227 target_name = fastq_single_template % filename_attributes
229 target_pathname = os.path.join(result_dir, target_name)
# Record the target if it is missing (or force regeneration is requested),
# keyed by pathname then by source filetype (qseq/srf).
230 if force or not os.path.exists(target_pathname):
231 t = needed_targets.setdefault(target_pathname, {})
232 t[seq.filetype] = seq
234 return needed_targets
def link_daf(daf_path, library_result_map):
    """Hard-link the DAF file into every library result directory.

    Arguments:
      daf_path (str): path to an existing DAF file
      library_result_map (list): [(library_id, result_dir), ...]

    Raises:
      RuntimeError: if daf_path does not exist.
    """
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    daf_basename = os.path.basename(daf_path)
    for lib_id, result_dir in library_result_map:
        destination = os.path.join(result_dir, daf_basename)
        # already linked (or otherwise present): leave it alone
        if os.path.exists(destination):
            continue
        os.link(daf_path, destination)
# Build a submission .ini file in each result directory describing every
# recognized output file (view, map algorithm, read type, insert length).
247 def make_submission_ini(host, apidata, library_result_map, paired=True):
248 # ma is "map algorithm"
# Extension -> attribute map; 'view': None means "known but not submitted".
# Longest-match lookup is done by find_best_extension().
258 '.bai': {'view': None, 'MapAlgorithm': 'NA'},
259 '.bam': {'view': aligns, 'MapAlgorithm': ma},
260 '.splices.bam': {'view': 'Splices', 'MapAlgorithm': ma},
261 '.jnct': {'view': 'Junctions', 'MapAlgorithm': ma},
262 '.plus.bigwig': {'view': 'PlusSignal', 'MapAlgorithm': ma},
263 '.minus.bigwig': {'view': 'MinusSignal', 'MapAlgorithm': ma},
264 '.bigwig': {'view': 'Signal', 'MapAlgorithm': ma},
265 '.tar.bz2': {'view': None},
266 '.condor': {'view': None},
267 '.daf': {'view': None},
268 '.ddf': {'view': None},
269 'denovo.genes.expr': {'view': 'GeneDeNovo', 'MapAlgorithm': ma},
270 'denovo.transcripts.expr': {'view': 'TranscriptDeNovo', 'MapAlgorithm': ma},
271 'novel.genes.expr': {'view': 'GeneDeNovo', 'MapAlgorithm': ma},
272 'novel.transcripts.expr': {'view': 'TranscriptDeNovo', 'MapAlgorithm': ma},
273 '.genes.expr': {'view': 'GeneFPKM', 'MapAlgorithm': ma},
274 '.transcripts.expr': {'view': 'TranscriptFPKM', 'MapAlgorithm': ma},
275 '.transcript.expr': {'view': 'TranscriptFPKM', 'MapAlgorithm': ma},
276 '.fastq': {'view': 'Fastq', 'MapAlgorithm': 'NA' },
277 '_r1.fastq': {'view': 'FastqRd1', 'MapAlgorithm': 'NA'},
278 '_r2.fastq': {'view': 'FastqRd2', 'MapAlgorithm': 'NA'},
279 '.gtf': {'view': 'GeneModel', 'MapAlgorithm': ma},
280 '.ini': {'view': None},
281 '.log': {'view': None},
282 '.stats.txt': {'view': 'InsLength', 'MapAlgorithm': ma},
283 '.srf': {'view': None},
284 '.wig': {'view': None},
285 '.zip': {'view': None},
288 candidate_fastq_src = {}
290 for lib_id, result_dir in library_result_map:
# [config] section: the column ordering for the generated ddf.
291 order_by = ['order_by=files', 'view', 'replicate', 'cell',
292 'readType', 'mapAlgorithm', 'insertLength' ]
293 inifile = ['[config]']
294 inifile += [" ".join(order_by)]
297 lib_info = get_library_info(host, apidata, lib_id)
298 result_ini = os.path.join(result_dir, result_dir+'.ini')
300 if lib_info['cell_line'].lower() == 'unknown':
301 logging.warn("Library %s missing cell_line" % (lib_id,))
# Attributes shared by every file in this library's submission.
303 standard_attributes = {'cell': lib_info['cell_line'],
304 'replicate': lib_info['replicate'],
# Paired-end libraries carry insert length + 2x75 read type; the fallback
# to 200 mentioned in the warning happens in elided lines -- confirm.
307 if lib_info['insert_size'] is None:
308 errmsg = "Library %s is missing insert_size, assuming 200"
309 logging.warn(errmsg % (lib_id,))
312 insert_size = lib_info['insert_size']
313 standard_attributes['insertLength'] = insert_size
314 standard_attributes['readType'] = '2x75'
316 standard_attributes['insertLength'] = 'ilNA'
317 standard_attributes['readType'] = '1x75D'
# Classify every file already present in the result directory.
320 submission_files = os.listdir(result_dir)
322 for f in submission_files:
323 best_ext = find_best_extension(attributes, f)
325 if best_ext is not None:
# view None == recognized but intentionally not submitted.
326 if attributes[best_ext]['view'] is None:
# fastqs are grouped and emitted as one section after the loop.
329 elif best_ext.endswith('fastq'):
330 fastqs.setdefault(best_ext, set()).add(f)
333 make_submission_section(line_counter,
# Unknown extension: fail loudly rather than silently skipping files.
342 raise ValueError("Unrecognized file: %s" % (f,))
344 # add in fastqs on a single line.
345 for extension, fastq_set in fastqs.items():
347 make_submission_section(line_counter,
350 attributes[extension])
# NOTE(review): `f` here shadows the loop variable above, and no close()
# is visible in this view (it may be in elided lines) -- confirm.
355 f = open(result_ini,'w')
356 f.write(os.linesep.join(inifile))
358 def make_lane_dict(lib_db, lib_id):
360 Convert the lane_set in a lib_db to a dictionary
361 indexed by flowcell ID
# Collect (flowcell, lane) pairs; the accumulator initialization and the
# final conversion/return are in elided lines -- presumably dict(result).
364 for lane in lib_db[lib_id]['lane_set']:
365 result.append((lane['flowcell'], lane))
# Generate a ddf (and optional condor jobs) for every library that already
# has a submission ini, then write one dagman file tying the jobs together.
368 def make_all_ddfs(library_result_map, daf_name, make_condor=True):
370 for lib_id, result_dir in library_result_map:
371 ininame = result_dir+'.ini'
372 inipathname = os.path.join(result_dir, ininame)
# Only libraries whose ini was already generated get a ddf.
373 if os.path.exists(inipathname):
375 make_ddf(ininame, daf_name, True, make_condor, result_dir)
378 if make_condor and len(dag_fragment) > 0:
379 dag_filename = 'submission.dagman'
# Refuse to clobber an existing dagman file; the user must delete it.
380 if os.path.exists(dag_filename):
381 logging.warn("%s exists, please delete" % (dag_filename,))
383 f = open(dag_filename,'w')
384 f.write( os.linesep.join(dag_fragment))
385 f.write( os.linesep )
# NOTE(review): no close() visible in this view (may be in elided lines).
389 def make_ddf(ininame, daf_name, guess_ddf=False, make_condor=False, outdir=None):
391 Make ddf files, and bonus condor file
# When outdir is given, work is presumably done relative to it (the chdir
# or equivalent is in elided lines) -- confirm.
395 if outdir is not None:
400 ddf_name = make_ddf_name(ininame)
402 output = open(ddf_name,'w')
# Write the ddf contents and collect the list of files it references.
404 file_list = read_ddf_ini(ininame, output)
# The daf and the ddf itself are part of the submission archive.
406 file_list.append(daf_name)
407 if ddf_name is not None:
408 file_list.append(ddf_name)
# Optionally emit condor archive+upload jobs and a dag fragment linking them.
411 archive_condor = make_condor_archive_script(ininame, file_list)
412 upload_condor = make_condor_upload_script(ininame)
414 dag_fragments.extend(
415 make_dag_fragment(ininame, archive_condor, upload_condor)
422 def read_ddf_ini(filename, output=sys.stdout):
424 Read a ini file and dump out a tab delmited text file
427 config = SafeConfigParser()
428 config.read(filename)
# Column order comes from the [config] section's order_by option.
430 order_by = shlex.split(config.get("config", "order_by"))
# Header row.
432 output.write("\t".join(order_by))
433 output.write(os.linesep)
434 sections = config.sections()
436 for section in sections:
437 if section == "config":
438 # skip the config block
# One row per remaining section; each option value in order_by order
# (the per-key loop header is in elided lines).
442 v = config.get(section, key)
# The 'files' values are comma-separated; accumulate them for the caller.
445 file_list.extend(parse_filelist(v))
447 output.write("\t".join(values))
448 output.write(os.linesep)
# NOTE(review): the return of file_list is in elided lines -- callers
# (make_ddf) use the returned list.
451 def read_library_result_map(filename):
453 Read a file that maps library id to result directory.
454 Does not support spaces in filenames.
459 stream = open(filename,'r')
# Per-line loop header and whitespace stripping are in elided lines;
# '#'-prefixed lines are comments, blank lines are skipped.
464 if not line.startswith('#') and len(line) > 0 :
465 library_id, result_dir = line.split()
466 results.append((library_id, result_dir))
# NOTE(review): results init / return and stream close are in elided lines.
# Write a condor submit file that tars the submission files into an archive
# named after the ini file; returns the submit-file name (return is elided).
469 def make_condor_archive_script(ininame, files):
470 script = """Universe = vanilla
472 Executable = /bin/tar
473 arguments = czvf ../%(archivename)s %(filelist)s
475 Error = compress.err.$(Process).log
476 Output = compress.out.$(Process).log
477 Log = /tmp/submission-compress.log
478 initialdir = %(initialdir)s
# Validate every file up front so the condor job cannot fail on a missing
# input (the loop header is in elided lines).
483 if not os.path.exists(f):
484 raise RuntimeError("Missing %s" % (f,))
486 context = {'archivename': make_submission_name(ininame),
487 'filelist': " ".join(files),
488 'initialdir': os.getcwd()}
490 condor_script = make_condor_name(ininame, 'archive')
491 condor_stream = open(condor_script,'w')
492 condor_stream.write(script % context)
493 condor_stream.close()
# Write a condor submit file that lftp-uploads the archive to the UCSC
# ENCODE ftp drop; returns the submit-file name (return is elided).
# NOTE(review): the destination URL and username are hard-coded.
496 def make_condor_upload_script(ininame):
497 script = """Universe = vanilla
499 Executable = /usr/bin/lftp
500 arguments = -c put ../%(archivename)s -o ftp://detrout@encodeftp.cse.ucsc.edu/
502 Error = upload.err.$(Process).log
503 Output = upload.out.$(Process).log
504 Log = /tmp/submission-upload.log
505 initialdir = %(initialdir)s
509 context = {'archivename': make_submission_name(ininame),
510 'initialdir': os.getcwd()}
512 condor_script = make_condor_name(ininame, 'upload')
513 condor_stream = open(condor_script,'w')
514 condor_stream.write(script % context)
515 condor_stream.close()
518 def make_dag_fragment(ininame, archive_condor, upload_condor):
520 Make the couple of fragments compress and then upload the data.
# Absolute paths so the dagman file works regardless of submit directory.
522 cur_dir = os.getcwd()
523 archive_condor = os.path.join(cur_dir, archive_condor)
524 upload_condor = os.path.join(cur_dir, upload_condor)
525 job_basename = make_base_name(ininame)
# Two jobs: archive then upload, ordered by the PARENT/CHILD edge.
# (fragments list init and return are in elided lines.)
528 fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
529 fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
530 fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))
# Fetch one library's metadata from the htsworkflow API server.
534 def get_library_info(host, apidata, library_id):
535 url = api.library_url(host, library_id)
536 contents = api.retrieve_info(url, apidata)
# NOTE(review): the return statement is in elided lines (presumably
# returns contents).
# Build one condor queue entry converting an srf file to fastq; the argument
# assembly for the paired case derives the right-read filename from the
# left-read target.
539 def condor_srf_to_fastq(srf_file, target_pathname, paired, flowcell=None,
540 mid=None, force=False):
# Paired-end: target_pathname is the read-1 (left) output.
543 args.extend(['--left', target_pathname])
544 # this is ugly. I did it because I was pregenerating the target
545 # names before I tried to figure out what sources could generate
546 # those targets, and everything up to this point had been
547 # one-to-one. So I couldn't figure out how to pair the
549 # With this at least the command will run correctly.
550 # however if we rename the default targets, this'll break
551 # also I think it'll generate it twice.
552 args.extend(['--right',
553 target_pathname.replace('_r1.fastq', '_r2.fastq')])
# Single-ended branch (its condition line is elided).
555 args.extend(['--single', target_pathname ])
556 if flowcell is not None:
557 args.extend(['--flowcell', flowcell])
# Optional mid-point split for the srf pair, plus force regeneration.
560 args.extend(['-m', str(mid)])
563 args.extend(['--force'])
# Return a condor-entry string with the assembled arguments interpolated.
568 """ % (" ".join(args),)
# Build one condor queue entry converting a qseq file to fastq.
572 def condor_qseq_to_fastq(qseq_file, target_pathname, flowcell=None, force=False):
573 args = ['-i', qseq_file, '-o', target_pathname ]
574 if flowcell is not None:
575 args.extend(['-f', flowcell])
# Return a condor-entry string with the assembled arguments interpolated
# (the force handling and template literal are in elided lines).
579 """ % (" ".join(args))
583 def find_archive_sequence_files(host, apidata, sequences_path,
586 Find all the archive sequence files possibly associated with our results.
589 logging.debug("Searching for sequence files in: %s" %(sequences_path,))
593 #seq_dirs = set(os.path.join(sequences_path, 'srfs'))
# Build lib_db (library metadata keyed by lib_id) and the set of flowcell
# directories to scan, while remembering which (flowcell, lane) pairs we
# actually care about.
595 for lib_id, result_dir in library_result_map:
596 lib_info = get_library_info(host, apidata, lib_id)
597 lib_db[lib_id] = lib_info
599 for lane in lib_info['lane_set']:
600 lane_key = (lane['flowcell'], lane['lane_number'])
601 candidate_lanes[lane_key] = lib_id
602 seq_dirs.add(os.path.join(sequences_path,
605 logging.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
606 candidate_seq_list = scan_for_sequences(seq_dirs)
608 # at this point we have too many sequences as scan_for_sequences
609 # returns all the sequences in a flowcell directory
610 # so lets filter out the extras
612 for seq in candidate_seq_list:
613 lane_key = (seq.flowcell, seq.lane)
614 lib_id = candidate_lanes.get(lane_key, None)
# Keep only sequences belonging to a lane we asked about, grouped under
# lib_db[lib_id]['lanes'][lane_key].
615 if lib_id is not None:
616 lib_info = lib_db[lib_id]
617 lanes = lib_info.setdefault('lanes', {})
618 lanes.setdefault(lane_key, set()).add(seq)
# NOTE(review): the return of lib_db is in elided lines.
622 def find_best_extension(extension_map, filename):
624 Search through extension_map looking for the best extension
625 The 'best' is the longest match
628 extension_map (dict): '.ext' -> { 'view': 'name' or None }
629 filename (str): the filename whose extention we are about to examine
# NOTE(review): last_ext is computed but not visibly used in this view.
632 path, last_ext = os.path.splitext(filename)
# Longest-suffix match wins (best_ext init, the first-match branch, and the
# return are in elided lines).
634 for ext in extension_map.keys():
635 if filename.endswith(ext):
638 elif len(ext) > len(best_ext):
642 def make_submission_section(line_counter, files, standard_attributes, file_attributes):
644 Create a section in the submission ini file
# Section header [lineN] plus a comma-joined files= entry.
646 inifile = [ '[line%s]' % (line_counter,) ]
647 inifile += ["files=%s" % (",".join(files))]
# Per-file attributes override the library-wide standard attributes
# (cur_attributes init and the return are in elided lines).
649 cur_attributes.update(standard_attributes)
650 cur_attributes.update(file_attributes)
652 for k,v in cur_attributes.items():
653 inifile += ["%s=%s" % (k,v)]
# Strip directory and extension from a pathname; the return of `name` is
# in elided lines.
656 def make_base_name(pathname):
657 base = os.path.basename(pathname)
658 name, ext = os.path.splitext(base)
# Archive filename for a submission ini; the suffix append and return are
# in elided lines (presumably name + '.tgz' per the tar czvf job).
661 def make_submission_name(ininame):
662 name = make_base_name(ininame)
# ddf filename for a submission ini; the suffix append and return are in
# elided lines (presumably name + '.ddf').
665 def make_ddf_name(pathname):
666 name = make_base_name(pathname)
# Condor submit-file name: <base>[.<run_type>].condor
# (the `elements` list initialization is in an elided line).
669 def make_condor_name(pathname, run_type=None):
670 name = make_base_name(pathname)
672 if run_type is not None:
673 elements.append(run_type)
674 elements.append('condor')
675 return ".".join(elements)
677 def make_submit_script(target, header, body_list):
679 write out a text file
681 this was intended for condor submit scripts
684 target (str or stream):
685 if target is a string, we will open and close the file
686 if target is a stream, the caller is responsible.
689 header to write at the beginning of the file
690 body_list (list of strs):
691 a list of blocks to add to the file.
# Python 2 idiom: types.StringTypes covers str and unicode.  When target is
# a string we own the file handle (open/write/close lines partly elided).
693 if type(target) in types.StringTypes:
698 for entry in body_list:
700 if type(target) in types.StringTypes:
def parse_filelist(file_string):
    """Split a comma-separated list of file names into a list.

    No whitespace stripping or quote handling is performed.
    """
    names = file_string.split(',')
    return names
706 def validate_filelist(files):
708 Die if a file doesn't exist in a file list
# The per-file loop header is in an elided line; each entry must exist.
711 if not os.path.exists(f):
712 raise RuntimeError("%s does not exist" % (f,))
714 if __name__ == "__main__":