2 from ConfigParser import SafeConfigParser
6 from optparse import OptionParser
8 from pprint import pprint, pformat
10 from StringIO import StringIO
18 from htsworkflow.util import api
19 from htsworkflow.pipelines.sequences import \
20 create_sequence_table, \
23 def main(cmdline=None):
24 parser = make_parser()
25 opts, args = parser.parse_args(cmdline)
28 logging.basicConfig(level = logging.DEBUG )
30 logging.basicConfig(level = logging.INFO )
32 logging.basicConfig(level = logging.WARNING )
34 apidata = {'apiid': opts.apiid, 'apikey': opts.apikey }
36 if opts.host is None or opts.apiid is None or opts.apikey is None:
37 parser.error("Please specify host url, apiid, apikey")
40 parser.error("I need at least one library submission-dir input file")
42 library_result_map = []
44 library_result_map.extend(read_library_result_map(a))
46 if opts.daf is not None:
47 link_daf(opts.daf, library_result_map)
50 build_fastqs(opts.host,
57 make_submission_ini(opts.host, apidata, library_result_map)
60 make_all_ddfs(library_result_map, opts.daf)
64 # Load defaults from the config files
65 config = SafeConfigParser()
66 config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini'])
68 sequence_archive = None
72 SECTION = 'sequence_archive'
73 if config.has_section(SECTION):
74 sequence_archive = config.get(SECTION, 'sequence_archive',sequence_archive)
75 sequence_archive = os.path.expanduser(sequence_archive)
76 apiid = config.get(SECTION, 'apiid', apiid)
77 apikey = config.get(SECTION, 'apikey', apikey)
78 apihost = config.get(SECTION, 'host', apihost)
80 parser = OptionParser()
83 parser.add_option('--fastq', help="generate scripts for making fastq files",
84 default=False, action="store_true")
86 parser.add_option('--ini', help="generate submission ini file", default=False,
89 parser.add_option('--makeddf', help='make the ddfs', default=False,
92 parser.add_option('--daf', default=None, help='specify daf name')
93 parser.add_option('--force', default=False, action="store_true",
94 help="Force regenerating fastqs")
96 # configuration options
97 parser.add_option('--apiid', default=apiid, help="Specify API ID")
98 parser.add_option('--apikey', default=apikey, help="Specify API KEY")
99 parser.add_option('--host', default=apihost,
100 help="specify HTSWorkflow host",)
101 parser.add_option('--sequence', default=sequence_archive,
102 help="sequence repository")
105 parser.add_option('--verbose', default=False, action="store_true",
106 help='verbose logging')
107 parser.add_option('--debug', default=False, action="store_true",
108 help='debug logging')
113 def build_fastqs(host, apidata, sequences_path, library_result_map,
116 Generate condor scripts to build any needed fastq files
119 host (str): root of the htsworkflow api server
120 apidata (dict): id & key to post to the server
121 sequences_path (str): root of the directory tree to scan for files
122 library_result_map (list): [(library_id, destination directory), ...]
124 qseq_condor_header = """
126 executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/qseq2fastq
127 error=log/qseq2fastq.err.$(process).log
128 output=log/qseq2fastq.out.$(process).log
129 log=log/qseq2fastq.log
132 qseq_condor_entries = []
133 srf_condor_header = """
135 executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/srf2fastq
136 output=log/srf_pair_fastq.out.$(process).log
137 error=log/srf_pair_fastq.err.$(process).log
138 log=log/srf_pair_fastq.log
139 environment="PYTHONPATH=/home/diane/lib/python2.6/site-packages:/home/diane/proj/solexa/gaworkflow PATH=/woldlab/rattus/lvol0/mus/home/diane/bin:/usr/bin:/bin"
142 srf_condor_entries = []
143 lib_db = find_archive_sequence_files(host,
148 needed_targets = find_missing_targets(library_result_map, lib_db, force)
150 for target_pathname, available_sources in needed_targets.items():
151 logging.debug(' target : %s' % (target_pathname,))
152 logging.debug(' candidate sources: %s' % (available_sources,))
153 if available_sources.has_key('qseq'):
154 source = available_sources['qseq']
155 qseq_condor_entries.append(
156 condor_qseq_to_fastq(source.path,
161 elif available_sources.has_key('srf'):
162 source = available_sources['srf']
163 mid = getattr(source, 'mid_point', None)
164 srf_condor_entries.append(
165 condor_srf_to_fastq(source.path,
173 print " need file", target_pathname
175 if len(srf_condor_entries) > 0:
176 make_submit_script('srf.fastq.condor',
180 if len(qseq_condor_entries) > 0:
181 make_submit_script('qseq.fastq.condor',
186 def find_missing_targets(library_result_map, lib_db, force=False):
188 Check if the sequence file exists.
189 This requires computing what the sequence name is and checking
190 to see if it can be found in the sequence location.
192 Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
194 fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
195 fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
196 # find what targets we're missing
198 for lib_id, result_dir in library_result_map:
200 lane_dict = make_lane_dict(lib_db, lib_id)
202 for lane_key, sequences in lib['lanes'].items():
203 for seq in sequences:
204 seq.paired = lane_dict[seq.flowcell]['paired_end']
205 lane_status = lane_dict[seq.flowcell]['status']
207 if seq.paired and seq.read is None:
209 filename_attributes = {
210 'flowcell': seq.flowcell,
217 if lane_status == 'Failed':
219 if seq.flowcell == '30DY0AAXX':
220 # 30DY0 only ran for 151 bases instead of 152
221 # it is actually 76 1st read, 75 2nd read
226 target_name = fastq_paired_template % filename_attributes
228 target_name = fastq_single_template % filename_attributes
230 target_pathname = os.path.join(result_dir, target_name)
231 if force or not os.path.exists(target_pathname):
232 t = needed_targets.setdefault(target_pathname, {})
233 t[seq.filetype] = seq
235 return needed_targets
238 def link_daf(daf_path, library_result_map):
239 if not os.path.exists(daf_path):
240 raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))
242 base_daf = os.path.basename(daf_path)
244 for lib_id, result_dir in library_result_map:
245 submission_daf = os.path.join(result_dir, base_daf)
246 if not os.path.exists(submission_daf):
247 os.link(daf_path, submission_daf)
250 def make_submission_ini(host, apidata, library_result_map, paired=True):
251 # ma is "map algorithm"
261 '.bai': {'view': None, 'MapAlgorithm': 'NA'},
262 '.bam': {'view': aligns, 'MapAlgorithm': ma},
263 '.splices.bam': {'view': 'Splices', 'MapAlgorithm': ma},
264 '.jnct': {'view': 'Junctions', 'MapAlgorithm': ma},
265 '.plus.bigwig': {'view': 'PlusSignal', 'MapAlgorithm': ma},
266 '.minus.bigwig': {'view': 'MinusSignal', 'MapAlgorithm': ma},
267 '.bigwig': {'view': 'Signal', 'MapAlgorithm': ma},
268 '.tar.bz2': {'view': None},
269 '.condor': {'view': None},
270 '.daf': {'view': None},
271 '.ddf': {'view': None},
272 'denovo.genes.expr': {'view': 'GeneDeNovo', 'MapAlgorithm': ma},
273 'denovo.transcripts.expr': {'view': 'TranscriptDeNovo', 'MapAlgorithm': ma},
274 'novel.genes.expr': {'view': 'GeneDeNovo', 'MapAlgorithm': ma},
275 'novel.transcripts.expr': {'view': 'TranscriptDeNovo', 'MapAlgorithm': ma},
276 '.genes.expr': {'view': 'GeneFPKM', 'MapAlgorithm': ma},
277 '.transcripts.expr': {'view': 'TranscriptFPKM', 'MapAlgorithm': ma},
278 '.transcript.expr': {'view': 'TranscriptFPKM', 'MapAlgorithm': ma},
279 '.fastq': {'view': 'Fastq', 'MapAlgorithm': 'NA' },
280 '_r1.fastq': {'view': 'FastqRd1', 'MapAlgorithm': 'NA'},
281 '_r2.fastq': {'view': 'FastqRd2', 'MapAlgorithm': 'NA'},
282 '.gtf': {'view': 'GeneModel', 'MapAlgorithm': ma},
283 '.ini': {'view': None},
284 '.log': {'view': None},
285 '.stats.txt': {'view': 'InsLength', 'MapAlgorithm': ma},
286 '.srf': {'view': None},
287 '.wig': {'view': None},
288 '.zip': {'view': None},
291 candidate_fastq_src = {}
293 for lib_id, result_dir in library_result_map:
294 order_by = ['order_by=files', 'view', 'replicate', 'cell',
295 'readType', 'mapAlgorithm', 'insertLength' ]
296 inifile = ['[config]']
297 inifile += [" ".join(order_by)]
300 lib_info = get_library_info(host, apidata, lib_id)
301 result_ini = os.path.join(result_dir, result_dir+'.ini')
303 if lib_info['cell_line'].lower() == 'unknown':
304 logging.warn("Library %s missing cell_line" % (lib_id,))
306 standard_attributes = {'cell': lib_info['cell_line'],
307 'replicate': lib_info['replicate'],
310 if lib_info['insert_size'] is None:
311 errmsg = "Library %s is missing insert_size, assuming 200"
312 logging.warn(errmsg % (lib_id,))
315 insert_size = lib_info['insert_size']
316 standard_attributes['insertLength'] = insert_size
317 standard_attributes['readType'] = '2x75'
319 standard_attributes['insertLength'] = 'ilNA'
320 standard_attributes['readType'] = '1x75D'
323 submission_files = os.listdir(result_dir)
325 for f in submission_files:
326 best_ext = find_best_extension(attributes, f)
328 if best_ext is not None:
329 if attributes[best_ext]['view'] is None:
332 elif best_ext.endswith('fastq'):
333 fastqs.setdefault(best_ext, set()).add(f)
336 make_submission_section(line_counter,
345 raise ValueError("Unrecognized file: %s" % (f,))
347 # add in fastqs on a single line.
348 for extension, fastq_set in fastqs.items():
350 make_submission_section(line_counter,
353 attributes[extension])
358 f = open(result_ini,'w')
359 f.write(os.linesep.join(inifile))
362 def make_lane_dict(lib_db, lib_id):
364 Convert the lane_set in a lib_db to a dictionary
365 indexed by flowcell ID
368 for lane in lib_db[lib_id]['lane_set']:
369 result.append((lane['flowcell'], lane))
373 def make_all_ddfs(library_result_map, daf_name, make_condor=True):
375 for lib_id, result_dir in library_result_map:
376 ininame = result_dir+'.ini'
377 inipathname = os.path.join(result_dir, ininame)
378 if os.path.exists(inipathname):
380 make_ddf(ininame, daf_name, True, make_condor, result_dir)
383 if make_condor and len(dag_fragment) > 0:
384 dag_filename = 'submission.dagman'
385 if os.path.exists(dag_filename):
386 logging.warn("%s exists, please delete" % (dag_filename,))
388 f = open(dag_filename,'w')
389 f.write( os.linesep.join(dag_fragment))
390 f.write( os.linesep )
394 def make_ddf(ininame, daf_name, guess_ddf=False, make_condor=False, outdir=None):
396 Make ddf files, and bonus condor file
400 if outdir is not None:
405 ddf_name = make_ddf_name(ininame)
407 output = open(ddf_name,'w')
409 file_list = read_ddf_ini(ininame, output)
411 file_list.append(daf_name)
412 if ddf_name is not None:
413 file_list.append(ddf_name)
416 archive_condor = make_condor_archive_script(ininame, file_list)
417 upload_condor = make_condor_upload_script(ininame)
419 dag_fragments.extend(
420 make_dag_fragment(ininame, archive_condor, upload_condor)
428 def read_ddf_ini(filename, output=sys.stdout):
430 Read a ini file and dump out a tab delmited text file
433 config = SafeConfigParser()
434 config.read(filename)
436 order_by = shlex.split(config.get("config", "order_by"))
438 output.write("\t".join(order_by))
439 output.write(os.linesep)
440 sections = config.sections()
442 for section in sections:
443 if section == "config":
444 # skip the config block
448 v = config.get(section, key)
451 file_list.extend(parse_filelist(v))
453 output.write("\t".join(values))
454 output.write(os.linesep)
458 def read_library_result_map(filename):
460 Read a file that maps library id to result directory.
461 Does not support spaces in filenames.
466 stream = open(filename,'r')
471 if not line.startswith('#') and len(line) > 0 :
472 library_id, result_dir = line.split()
473 results.append((library_id, result_dir))
477 def make_condor_archive_script(ininame, files):
478 script = """Universe = vanilla
480 Executable = /bin/tar
481 arguments = czvf ../%(archivename)s %(filelist)s
483 Error = compress.err.$(Process).log
484 Output = compress.out.$(Process).log
485 Log = /tmp/submission-compress.log
486 initialdir = %(initialdir)s
491 if not os.path.exists(f):
492 raise RuntimeError("Missing %s" % (f,))
494 context = {'archivename': make_submission_name(ininame),
495 'filelist': " ".join(files),
496 'initialdir': os.getcwd()}
498 condor_script = make_condor_name(ininame, 'archive')
499 condor_stream = open(condor_script,'w')
500 condor_stream.write(script % context)
501 condor_stream.close()
505 def make_condor_upload_script(ininame):
506 script = """Universe = vanilla
508 Executable = /usr/bin/lftp
509 arguments = -c put ../%(archivename)s -o ftp://detrout@encodeftp.cse.ucsc.edu/
511 Error = upload.err.$(Process).log
512 Output = upload.out.$(Process).log
513 Log = /tmp/submission-upload.log
514 initialdir = %(initialdir)s
518 context = {'archivename': make_submission_name(ininame),
519 'initialdir': os.getcwd()}
521 condor_script = make_condor_name(ininame, 'upload')
522 condor_stream = open(condor_script,'w')
523 condor_stream.write(script % context)
524 condor_stream.close()
528 def make_dag_fragment(ininame, archive_condor, upload_condor):
530 Make the couple of fragments compress and then upload the data.
532 cur_dir = os.getcwd()
533 archive_condor = os.path.join(cur_dir, archive_condor)
534 upload_condor = os.path.join(cur_dir, upload_condor)
535 job_basename = make_base_name(ininame)
538 fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
539 fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
540 fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))
545 def get_library_info(host, apidata, library_id):
546 url = api.library_url(host, library_id)
547 contents = api.retrieve_info(url, apidata)
551 def condor_srf_to_fastq(srf_file, target_pathname, paired, flowcell=None,
552 mid=None, force=False):
555 args.extend(['--left', target_pathname])
556 # this is ugly. I did it because I was pregenerating the target
557 # names before I tried to figure out what sources could generate
558 # those targets, and everything up to this point had been
559 # one-to-one. So I couldn't figure out how to pair the
561 # With this at least the command will run correctly.
562 # however if we rename the default targets, this'll break
563 # also I think it'll generate it twice.
564 args.extend(['--right',
565 target_pathname.replace('_r1.fastq', '_r2.fastq')])
567 args.extend(['--single', target_pathname ])
568 if flowcell is not None:
569 args.extend(['--flowcell', flowcell])
572 args.extend(['-m', str(mid)])
575 args.extend(['--force'])
580 """ % (" ".join(args),)
585 def condor_qseq_to_fastq(qseq_file, target_pathname, flowcell=None, force=False):
586 args = ['-i', qseq_file, '-o', target_pathname ]
587 if flowcell is not None:
588 args.extend(['-f', flowcell])
592 """ % (" ".join(args))
596 def find_archive_sequence_files(host, apidata, sequences_path,
599 Find all the archive sequence files possibly associated with our results.
602 logging.debug("Searching for sequence files in: %s" %(sequences_path,))
606 #seq_dirs = set(os.path.join(sequences_path, 'srfs'))
608 for lib_id, result_dir in library_result_map:
609 lib_info = get_library_info(host, apidata, lib_id)
610 lib_db[lib_id] = lib_info
612 for lane in lib_info['lane_set']:
613 lane_key = (lane['flowcell'], lane['lane_number'])
614 candidate_lanes[lane_key] = lib_id
615 seq_dirs.add(os.path.join(sequences_path,
618 logging.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
619 candidate_seq_list = scan_for_sequences(seq_dirs)
621 # at this point we have too many sequences as scan_for_sequences
622 # returns all the sequences in a flowcell directory
623 # so lets filter out the extras
625 for seq in candidate_seq_list:
626 lane_key = (seq.flowcell, seq.lane)
627 lib_id = candidate_lanes.get(lane_key, None)
628 if lib_id is not None:
629 lib_info = lib_db[lib_id]
630 lanes = lib_info.setdefault('lanes', {})
631 lanes.setdefault(lane_key, set()).add(seq)
636 def find_best_extension(extension_map, filename):
638 Search through extension_map looking for the best extension
639 The 'best' is the longest match
642 extension_map (dict): '.ext' -> { 'view': 'name' or None }
643 filename (str): the filename whose extention we are about to examine
646 path, last_ext = os.path.splitext(filename)
648 for ext in extension_map.keys():
649 if filename.endswith(ext):
652 elif len(ext) > len(best_ext):
657 def make_submission_section(line_counter, files, standard_attributes, file_attributes):
659 Create a section in the submission ini file
661 inifile = [ '[line%s]' % (line_counter,) ]
662 inifile += ["files=%s" % (",".join(files))]
664 cur_attributes.update(standard_attributes)
665 cur_attributes.update(file_attributes)
667 for k,v in cur_attributes.items():
668 inifile += ["%s=%s" % (k,v)]
672 def make_base_name(pathname):
673 base = os.path.basename(pathname)
674 name, ext = os.path.splitext(base)
678 def make_submission_name(ininame):
679 name = make_base_name(ininame)
683 def make_ddf_name(pathname):
684 name = make_base_name(pathname)
688 def make_condor_name(pathname, run_type=None):
689 name = make_base_name(pathname)
691 if run_type is not None:
692 elements.append(run_type)
693 elements.append('condor')
694 return ".".join(elements)
697 def make_submit_script(target, header, body_list):
699 write out a text file
701 this was intended for condor submit scripts
704 target (str or stream):
705 if target is a string, we will open and close the file
706 if target is a stream, the caller is responsible.
709 header to write at the beginning of the file
710 body_list (list of strs):
711 a list of blocks to add to the file.
713 if type(target) in types.StringTypes:
718 for entry in body_list:
720 if type(target) in types.StringTypes:
723 def parse_filelist(file_string):
724 return file_string.split(',')
727 def validate_filelist(files):
729 Die if a file doesn't exist in a file list
732 if not os.path.exists(f):
733 raise RuntimeError("%s does not exist" % (f,))
736 if __name__ == "__main__":