import logging
import os
import shlex
import sys
import types

from ConfigParser import SafeConfigParser
from optparse import OptionParser
from pprint import pprint, pformat
from StringIO import StringIO

from htsworkflow.util import api
from htsworkflow.pipelines.sequences import \
    create_sequence_table, \
    scan_for_sequences

def main(cmdline=None):
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)

    if opts.debug:
        logging.basicConfig(level = logging.DEBUG )
    elif opts.verbose:
        logging.basicConfig(level = logging.INFO )
    else:
        logging.basicConfig(level = logging.WARNING )

    apidata = {'apiid': opts.apiid, 'apikey': opts.apikey }

    if opts.host is None or opts.apiid is None or opts.apikey is None:
        parser.error("Please specify host url, apiid, apikey")

    if len(args) == 0:
        parser.error("I need at least one library submission-dir input file")

    library_result_map = []
    for a in args:
        library_result_map.extend(read_library_result_map(a))

    if opts.daf is not None:
        link_daf(opts.daf, library_result_map)

    if opts.fastq:
        build_fastqs(opts.host, apidata, opts.sequence, library_result_map,
                     paired=not opts.single, force=opts.force)

    if opts.ini:
        make_submission_ini(opts.host, apidata, library_result_map, not opts.single)

    if opts.makeddf:
        make_all_ddfs(library_result_map, opts.daf)

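# Typical invocation (hypothetical host, credentials and file names, shown only
# to illustrate how the options defined in make_parser() combine):
#
#   <this script> --host http://jumpgate.example.org --apiid ID --apikey KEY \
#                 --daf chipseq.daf --fastq --ini --makeddf library_map.txt
#
# library_map.txt is a library-id / result-directory map file as read by
# read_library_result_map() below.
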
def make_parser():
    # Load defaults from the config files
    config = SafeConfigParser()
    config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini'])

    sequence_archive = None
    apiid = None
    apikey = None
    apihost = None
    SECTION = 'sequence_archive'
    if config.has_section(SECTION):
        # SafeConfigParser.get() has no default argument, so guard each lookup
        # instead of passing a fallback positionally.
        if config.has_option(SECTION, 'sequence_archive'):
            sequence_archive = os.path.expanduser(
                config.get(SECTION, 'sequence_archive'))
        if config.has_option(SECTION, 'apiid'):
            apiid = config.get(SECTION, 'apiid')
        if config.has_option(SECTION, 'apikey'):
            apikey = config.get(SECTION, 'apikey')
        if config.has_option(SECTION, 'host'):
            apihost = config.get(SECTION, 'host')

    parser = OptionParser()

    # commands
    parser.add_option('--fastq', help="generate scripts for making fastq files",
                      default=False, action="store_true")
    parser.add_option('--ini', help="generate submission ini file", default=False,
                      action="store_true")
    parser.add_option('--makeddf', help='make the ddfs', default=False,
                      action="store_true")

    parser.add_option('--daf', default=None, help='specify daf name')
    parser.add_option('--force', default=False, action="store_true",
                      help="Force regenerating fastqs")

    # configuration options
    parser.add_option('--apiid', default=apiid, help="Specify API ID")
    parser.add_option('--apikey', default=apikey, help="Specify API KEY")
    parser.add_option('--host', default=apihost,
                      help="specify HTSWorkflow host")
    parser.add_option('--sequence', default=sequence_archive,
                      help="sequence repository")
    parser.add_option('--single', default=False, action="store_true",
                      help="treat the sequences as single ended runs")

    # debugging
    parser.add_option('--verbose', default=False, action="store_true",
                      help='verbose logging')
    parser.add_option('--debug', default=False, action="store_true",
                      help='debug logging')

    return parser

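# The defaults above come from ~/.htsworkflow.ini or /etc/htsworkflow.ini.
# A minimal sketch of such a file (section and option names are taken from the
# config.get() calls above; the values are made up):
#
#   [sequence_archive]
#   sequence_archive = ~/sequence_archive
#   apiid = my-api-id
#   apikey = my-api-key
#   host = http://jumpgate.example.org
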
def build_fastqs(host, apidata, sequences_path, library_result_map,
                 paired=True, force=False ):
    """
    Generate condor scripts to build any needed fastq files

    Args:
      host (str): root of the htsworkflow api server
      apidata (dict): id & key to post to the server
      sequences_path (str): root of the directory tree to scan for files
      library_result_map (list): [(library_id, destination directory), ...]
      paired: should we assume that we are processing paired end records?
              if False, we will treat this as single ended.
    """
    qseq_condor_header = """
Universe=vanilla
executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/qseq2fastq
error=log/qseq2fastq.err.$(process).log
output=log/qseq2fastq.out.$(process).log
log=log/qseq2fastq.log

"""
    qseq_condor_entries = []
    srf_condor_header = """
Universe=vanilla
executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/srf2named_fastq.py
output=log/srf_pair_fastq.out.$(process).log
error=log/srf_pair_fastq.err.$(process).log
log=log/srf_pair_fastq.log
environment="PYTHONPATH=/home/diane/lib/python2.6/site-packages:/home/diane/proj/solexa/gaworkflow PATH=/woldlab/rattus/lvol0/mus/home/diane/bin:/usr/bin:/bin"

"""
    srf_condor_entries = []
    fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
    fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
    lib_db = find_archive_sequence_files(host,
                                         apidata,
                                         sequences_path,
                                         library_result_map)
    # find what targets we're missing
    needed_targets = {}
    for lib_id, result_dir in library_result_map:
        lib = lib_db[lib_id]
        for lane_key, sequences in lib['lanes'].items():
            for seq in sequences:
                if paired and seq.read is None:
                    # default readless sequences to read 1 so the paired
                    # filename template can still be filled in
                    seq.read = 1
                filename_attributes = {
                    'flowcell': seq.flowcell,
                    'lib_id': lib_id,
                    'lane': seq.lane,
                    'read': seq.read,
                    'cycle': seq.cycle,
                    }
                # throw out test runs
                # FIXME: this should probably be configurable
                if seq.flowcell == '30CUUAAXX':
                    # 30CUUAAXX run sucked
                    continue
                if seq.flowcell == '30DY0AAXX':
                    # 30DY0 only ran for 151 bases instead of 152
                    # it is actually 76 1st read, 75 2nd read
                    seq.mid_point = 76

                if paired:
                    target_name = fastq_paired_template % filename_attributes
                else:
                    target_name = fastq_single_template % filename_attributes

                target_pathname = os.path.join(result_dir, target_name)
                if force or not os.path.exists(target_pathname):
                    t = needed_targets.setdefault(target_pathname, {})
                    t[seq.filetype] = seq

    for target_pathname, available_sources in needed_targets.items():
        logging.debug(' target : %s' % (target_pathname,))
        logging.debug(' candidate sources: %s' % (available_sources,))
        if 'qseq' in available_sources:
            source = available_sources['qseq']
            qseq_condor_entries.append(
                condor_qseq_to_fastq(source.path,
                                     target_pathname,
                                     source.flowcell,
                                     force=force)
            )
        elif 'srf' in available_sources:
            source = available_sources['srf']
            mid = getattr(source, 'mid_point', None)
            srf_condor_entries.append(
                condor_srf_to_fastq(source.path,
                                    target_pathname,
                                    paired,
                                    source.flowcell,
                                    mid,
                                    force=force)
            )
        else:
            print " need file", target_pathname

    if len(srf_condor_entries) > 0:
        make_submit_script('srf.fastq.condor',
                           srf_condor_header,
                           srf_condor_entries)

    if len(qseq_condor_entries) > 0:
        make_submit_script('qseq.fastq.condor',
                           qseq_condor_header,
                           qseq_condor_entries)

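# The two make_submit_script() calls above write plain Condor submit files.
# A rough sketch of what qseq.fastq.condor could look like (one
# arguments/queue block per needed fastq; all paths and names below are
# illustrative, not real):
#
#   Universe=vanilla
#   executable=.../scripts/qseq2fastq
#   error=log/qseq2fastq.err.$(process).log
#   output=log/qseq2fastq.out.$(process).log
#   log=log/qseq2fastq.log
#
#   arguments="-i .../s_1_1_qseq.txt -o 11011_42AAAXX_c76_l1_r1.fastq -f 42AAAXX"
#   queue
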
def link_daf(daf_path, library_result_map):
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    base_daf = os.path.basename(daf_path)

    for lib_id, result_dir in library_result_map:
        submission_daf = os.path.join(result_dir, base_daf)
        if not os.path.exists(submission_daf):
            os.link(daf_path, submission_daf)

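# Note: os.link() creates hard links, so the DAF and every result directory
# must live on the same filesystem; the linked copy shares the original inode
# rather than duplicating the file.
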
def make_submission_ini(host, apidata, library_result_map, paired=True):
    # ma is "map algorithm"
    ma = 'NA'          # FIXME: fill in the real map algorithm label here
    # view name used for the alignment (.bam) files
    aligns = 'Paired' if paired else 'Aligns'

    attributes = {
      # bam index
      '.bai':                   {'view': None, 'MapAlgorithm': 'NA'},
      '.bam':                   {'view': aligns, 'MapAlgorithm': ma},
      '.splices.bam':           {'view': 'Splices', 'MapAlgorithm': ma},
      '.jnct':                  {'view': 'Junctions', 'MapAlgorithm': ma},
      '.plus.bigwig':           {'view': 'PlusSignal', 'MapAlgorithm': ma},
      '.minus.bigwig':          {'view': 'MinusSignal', 'MapAlgorithm': ma},
      '.bigwig':                {'view': 'Signal', 'MapAlgorithm': ma},
      '.tar.bz2':               {'view': None},
      '.condor':                {'view': None},
      '.daf':                   {'view': None},
      '.ddf':                   {'view': None},
      'novel.genes.expr':       {'view': 'GeneDeNovo', 'MapAlgorithm': ma},
      'novel.transcripts.expr': {'view': 'TranscriptDeNovo', 'MapAlgorithm': ma},
      '.genes.expr':            {'view': 'GeneFPKM', 'MapAlgorithm': ma},
      '.transcripts.expr':      {'view': 'TranscriptFPKM', 'MapAlgorithm': ma},
      '.fastq':                 {'view': 'Fastq', 'MapAlgorithm': 'NA' },
      '_r1.fastq':              {'view': 'FastqRd1', 'MapAlgorithm': 'NA'},
      '_r2.fastq':              {'view': 'FastqRd2', 'MapAlgorithm': 'NA'},
      '.gtf':                   {'view': 'GeneModel', 'MapAlgorithm': ma},
      '.ini':                   {'view': None},
      '.log':                   {'view': None},
      '.stats.txt':             {'view': 'InsLength', 'MapAlgorithm': ma},
      '.srf':                   {'view': None},
      '.wig':                   {'view': None},
      '.zip':                   {'view': None},
      }

    candidate_fastq_src = {}

    for lib_id, result_dir in library_result_map:
        order_by = ['order_by=files', 'view', 'replicate', 'cell',
                    'readType', 'mapAlgorithm', 'insertLength' ]
        inifile = ['[config]']
        inifile += [" ".join(order_by)]
        inifile += ['']
        line_counter = 1
        lib_info = get_library_info(host, apidata, lib_id)
        result_ini = os.path.join(result_dir, result_dir+'.ini')

        if lib_info['cell_line'].lower() == 'unknown':
            logging.warn("Library %s missing cell_line" % (lib_id,))

        standard_attributes = {'cell': lib_info['cell_line'],
                               'replicate': lib_info['replicate'],
                               }
        if paired:
            if lib_info['insert_size'] is None:
                errmsg = "Library %s is missing insert_size, assuming 200"
                logging.warn(errmsg % (lib_id,))
                insert_size = 200
            else:
                insert_size = lib_info['insert_size']
            standard_attributes['insertLength'] = insert_size
            standard_attributes['readType'] = '2x75'
        else:
            standard_attributes['insertLength'] = 'ilNA'
            standard_attributes['readType'] = '1x75D'

        submission_files = os.listdir(result_dir)
        fastqs = {}
        for f in submission_files:
            best_ext = find_best_extension(attributes, f)

            if best_ext is not None:
                if attributes[best_ext]['view'] is None:
                    continue
                elif best_ext.endswith('fastq'):
                    fastqs.setdefault(best_ext, set()).add(f)
                else:
                    inifile.extend(
                        make_submission_section(line_counter, [f],
                                                standard_attributes,
                                                attributes[best_ext]))
                    line_counter += 1
            else:
                raise ValueError("Unrecognized file: %s" % (f,))

        # add in fastqs on a single line.
        for extension, fastq_set in fastqs.items():
            inifile.extend(
                make_submission_section(line_counter, fastq_set,
                                        standard_attributes,
                                        attributes[extension]))
            line_counter += 1

        f = open(result_ini,'w')
        f.write(os.linesep.join(inifile))
        f.close()

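# Sketch of the per-library ini this writes (values are invented; the real
# ones come from get_library_info() and the attributes table above):
#
#   [config]
#   order_by=files view replicate cell readType mapAlgorithm insertLength
#
#   [line1]
#   files=11011_42AAAXX_c76_l1_r1.fastq
#   view=FastqRd1
#   cell=GM12878
#   replicate=1
#   readType=2x75
#   insertLength=200
#   MapAlgorithm=NA
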
def make_all_ddfs(library_result_map, daf_name, make_condor=True):
    dag_fragment = []
    for lib_id, result_dir in library_result_map:
        ininame = result_dir+'.ini'
        inipathname = os.path.join(result_dir, ininame)
        if os.path.exists(inipathname):
            dag_fragment.extend(
                make_ddf(ininame, daf_name, True, make_condor, result_dir)
            )

    if make_condor and len(dag_fragment) > 0:
        dag_filename = 'submission.dagman'
        if os.path.exists(dag_filename):
            logging.warn("%s exists, please delete" % (dag_filename,))
        else:
            f = open(dag_filename,'w')
            f.write( os.linesep.join(dag_fragment))
            f.write( os.linesep )
            f.close()

def make_ddf(ininame, daf_name, guess_ddf=False, make_condor=False, outdir=None):
    """
    Make ddf files, and bonus condor file
    """
    dag_fragments = []
    curdir = os.getcwd()
    if outdir is not None:
        os.chdir(outdir)
    output = sys.stdout
    ddf_name = None
    if guess_ddf:
        ddf_name = make_ddf_name(ininame)
        output = open(ddf_name,'w')

    file_list = read_ddf_ini(ininame, output)

    file_list.append(daf_name)
    if ddf_name is not None:
        file_list.append(ddf_name)

    if make_condor:
        archive_condor = make_condor_archive_script(ininame, file_list)
        upload_condor = make_condor_upload_script(ininame)
        dag_fragments.extend(
            make_dag_fragment(ininame, archive_condor, upload_condor)
        )

    os.chdir(curdir)
    return dag_fragments

def read_ddf_ini(filename, output=sys.stdout):
    """
    Read an ini file and dump out a tab delimited text file
    """
    file_list = []
    config = SafeConfigParser()
    config.read(filename)

    order_by = shlex.split(config.get("config", "order_by"))

    output.write("\t".join(order_by))
    output.write(os.linesep)

    sections = config.sections()
    for section in sections:
        if section == "config":
            # skip the config block
            continue
        values = []
        for key in order_by:
            v = config.get(section, key)
            values.append(v)
            if key == 'files':
                file_list.extend(parse_filelist(v))

        output.write("\t".join(values))
        output.write(os.linesep)

    return file_list

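# read_ddf_ini() flattens the ini written by make_submission_ini() into the
# tab-delimited DDF used for the UCSC submission: one header row taken from
# order_by, then one row per [lineN] section.  Hypothetical output for the ini
# sketched above (columns are tab-separated):
#
#   files   view   replicate   cell   readType   mapAlgorithm   insertLength
#   11011_42AAAXX_c76_l1_r1.fastq   FastqRd1   1   GM12878   2x75   NA   200
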
def read_library_result_map(filename):
    """
    Read a file that maps library id to result directory.
    Does not support spaces in filenames.
    """
    stream = open(filename,'r')

    results = []
    for line in stream:
        if not line.startswith('#'):
            library_id, result_dir = line.strip().split()
            results.append((library_id, result_dir))
    return results

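# Example map file (library ids and directory names here are made up; '#'
# lines are ignored, every other line is whitespace-split into
# (library_id, result_dir)):
#
#   # library_id   result_dir
#   11011          11011_k562_rnaseq
#   11012          11012_gm12878_rnaseq
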
def make_condor_archive_script(ininame, files):
    script = """Universe = vanilla

Executable = /bin/tar
arguments = czvf ../%(archivename)s %(filelist)s

Error = compress.err.$(Process).log
Output = compress.out.$(Process).log
Log = /tmp/submission-compress.log
initialdir = %(initialdir)s

queue
"""
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("Missing %s" % (f,))

    context = {'archivename': make_submission_name(ininame),
               'filelist': " ".join(files),
               'initialdir': os.getcwd()}

    condor_script = make_condor_name(ininame, 'archive')
    condor_stream = open(condor_script,'w')
    condor_stream.write(script % context)
    condor_stream.close()
    return condor_script

def make_condor_upload_script(ininame):
    script = """Universe = vanilla

Executable = /usr/bin/lftp
arguments = -c put ../%(archivename)s -o ftp://detrout@encodeftp.cse.ucsc.edu/

Error = upload.err.$(Process).log
Output = upload.out.$(Process).log
Log = /tmp/submission-upload.log
initialdir = %(initialdir)s

queue
"""
    context = {'archivename': make_submission_name(ininame),
               'initialdir': os.getcwd()}

    condor_script = make_condor_name(ininame, 'upload')
    condor_stream = open(condor_script,'w')
    condor_stream.write(script % context)
    condor_stream.close()
    return condor_script

def make_dag_fragment(ininame, archive_condor, upload_condor):
    """
    Make the couple of DAG fragments that compress and then upload the data.
    """
    cur_dir = os.getcwd()
    archive_condor = os.path.join(cur_dir, archive_condor)
    upload_condor = os.path.join(cur_dir, upload_condor)
    job_basename = make_base_name(ininame)

    fragments = []
    fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
    fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
    fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))

    return fragments

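# For an ini named 11011_k562_rnaseq.ini (hypothetical), the fragment written
# into submission.dagman would look roughly like:
#
#   JOB 11011_k562_rnaseq_archive /path/to/11011_k562_rnaseq.archive.condor
#   JOB 11011_k562_rnaseq_upload /path/to/11011_k562_rnaseq.upload.condor
#   PARENT 11011_k562_rnaseq_archive CHILD 11011_k562_rnaseq_upload
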
def get_library_info(host, apidata, library_id):
    url = api.library_url(host, library_id)
    contents = api.retrieve_info(url, apidata)
    return contents

def condor_srf_to_fastq(srf_file, target_pathname, paired, flowcell=None,
                        mid=None, force=False):
    args = ['-c', srf_file, ]
    if paired:
        args.extend(['--left', target_pathname])
        # this is ugly. I did it because I was pregenerating the target
        # names before I tried to figure out what sources could generate
        # those targets, and everything up to this point had been
        # one-to-one. So I couldn't figure out how to pair the
        # target names.
        # With this at least the command will run correctly.
        # however if we rename the default targets, this'll break
        # also I think it'll generate it twice.
        args.extend(['--right',
                     target_pathname.replace('_r1.fastq', '_r2.fastq')])
    else:
        args.extend(['--single', target_pathname ])
    if flowcell is not None:
        args.extend(['--flowcell', flowcell])

    if mid is not None:
        args.extend(['-m', str(mid)])

    if force:
        args.extend(['--force'])

    script = """
arguments="%s"
queue
""" % (" ".join(args),)

    return script

def condor_qseq_to_fastq(qseq_file, target_pathname, flowcell=None, force=False):
    args = ['-i', qseq_file, '-o', target_pathname ]
    if flowcell is not None:
        args.extend(['-f', flowcell])
    script = """
arguments="%s"
queue
""" % (" ".join(args))

    return script

def find_archive_sequence_files(host, apidata, sequences_path,
                                library_result_map):
    """
    Find all the archive sequence files possibly associated with our results.
    """
    logging.debug("Searching for sequence files in: %s" %(sequences_path,))

    lib_db = {}
    candidate_lanes = {}
    seq_dirs = set()
    #seq_dirs = set(os.path.join(sequences_path, 'srfs'))
    for lib_id, result_dir in library_result_map:
        lib_info = get_library_info(host, apidata, lib_id)
        lib_db[lib_id] = lib_info

        for lane in lib_info['lane_set']:
            lane_key = (lane['flowcell'], lane['lane_number'])
            candidate_lanes[lane_key] = lib_id
            # sequences are assumed to live under
            # <sequences_path>/flowcells/<flowcell id>
            seq_dirs.add(os.path.join(sequences_path,
                                      'flowcells',
                                      lane['flowcell']))
    logging.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
    candidate_seq_list = scan_for_sequences(seq_dirs)

    # at this point we have too many sequences as scan_for_sequences
    # returns all the sequences in a flowcell directory
    # so lets filter out the extras

    for seq in candidate_seq_list:
        lane_key = (seq.flowcell, seq.lane)
        lib_id = candidate_lanes.get(lane_key, None)
        if lib_id is not None:
            lib_info = lib_db[lib_id]
            lanes = lib_info.setdefault('lanes', {})
            lanes.setdefault(lane_key, set()).add(seq)

    return lib_db

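# lib_db maps library_id -> the library info dict from the API, with an extra
# 'lanes' key filled in above, e.g. (sketch, not real data):
#
#   lib_db['11011']['lanes'] == {('42AAAXX', 3): set([<srf seq>, <qseq seq>])}
#
# build_fastqs() walks that structure to decide which fastq targets still need
# to be generated.
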
def find_best_extension(extension_map, filename):
    """
    Search through extension_map looking for the best extension
    The 'best' is the longest match

    Arguments:
      extension_map (dict): '.ext' -> { 'view': 'name' or None }
      filename (str): the filename whose extension we are about to examine
    """
    best_ext = None
    path, last_ext = os.path.splitext(filename)

    for ext in extension_map.keys():
        if filename.endswith(ext):
            if best_ext is None:
                best_ext = ext
            elif len(ext) > len(best_ext):
                best_ext = ext
    return best_ext

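# Example: for 'foo_r1.fastq' both '.fastq' and '_r1.fastq' in the attributes
# table match; '_r1.fastq' is longer, so it wins and the file is reported with
# the FastqRd1 view.
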
def make_submission_section(line_counter, files, standard_attributes, file_attributes):
    """
    Create a section in the submission ini file
    """
    inifile = [ '[line%s]' % (line_counter,) ]
    inifile += ["files=%s" % (",".join(files))]
    cur_attributes = {}
    cur_attributes.update(standard_attributes)
    cur_attributes.update(file_attributes)

    for k,v in cur_attributes.items():
        inifile += ["%s=%s" % (k,v)]
    return inifile

def make_base_name(pathname):
    base = os.path.basename(pathname)
    name, ext = os.path.splitext(base)
    return name

def make_submission_name(ininame):
    name = make_base_name(ininame)
    # name of the gzipped tarball built by the archive condor job
    return name + '.tgz'

def make_ddf_name(pathname):
    name = make_base_name(pathname)
    return name + '.ddf'

def make_condor_name(pathname, run_type=None):
    name = make_base_name(pathname)
    elements = [name]
    if run_type is not None:
        elements.append(run_type)
    elements.append('condor')
    return ".".join(elements)

def make_submit_script(target, header, body_list):
    """
    write out a text file; this was intended for condor submit scripts

    Args:
      target (str or stream):
        if target is a string, we will open and close the file
        if target is a stream, the caller is responsible.
      header (str):
        header to write at the beginning of the file
      body_list (list of strs):
        a list of blocks to add to the file.
    """
    if isinstance(target, types.StringTypes):
        output = open(target,'w')
    else:
        output = target
    output.write(header)
    output.write(os.linesep)
    for entry in body_list:
        output.write(entry)
        output.write(os.linesep)
    if isinstance(target, types.StringTypes):
        output.close()

def parse_filelist(file_string):
    return file_string.split(',')

def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list
    """
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("%s does not exist" % (f,))

if __name__ == "__main__":
    main()