2 from ConfigParser import SafeConfigParser
6 from optparse import OptionParser
8 from pprint import pprint, pformat
10 from StringIO import StringIO
19 from htsworkflow.util import api
20 from htsworkflow.pipelines.sequences import \
21 create_sequence_table, \
def make_submission_name(ininame):
    """Return the submission archive name for an ini file: '<base>.tgz'.

    e.g. '/path/foo.ini' -> 'foo.tgz'.  The result is used as the tar
    target in make_condor_archive_script.
    """
    base = os.path.basename(ininame)
    name, ext = os.path.splitext(base)
    # The sampled listing dropped the return; without it the function
    # returned None and the condor archive script had no archive name.
    return name + '.tgz'
def make_ddf_name(pathname):
    """Return the ddf (data definition file) name for *pathname*: '<base>.ddf'."""
    base = os.path.basename(pathname)
    name, ext = os.path.splitext(base)
    # Return reconstructed by analogy with make_condor_name; the sampled
    # listing dropped this line, leaving the function returning None.
    return name + '.ddf'
def make_condor_name(pathname):
    """Return the condor submit-script name for *pathname*: '<base>.condor'."""
    filename = os.path.basename(pathname)
    stem = os.path.splitext(filename)[0]
    return stem + '.condor'
def make_submit_script(target, header, body_list):
    """
    write out a text file

    this was intended for condor submit scripts

    Args:
      target (str or stream):
        if target is a string, we will open and close the file
        if target is a stream, the caller is responsible.

      header (str):
        header to write at the beginning of the file

      body_list (list of strs):
        a list of blocks to add to the file.
    """
    # NOTE(review): open/write/close lines were missing from the sampled
    # listing and are reconstructed around the three visible statements.
    # Python 2 types.StringTypes covers both str and unicode filenames.
    if type(target) in types.StringTypes:
        stream = open(target, 'w')
    else:
        stream = target
    stream.write(header)
    for entry in body_list:
        stream.write(entry)
    # Only close streams we opened; caller-provided streams stay open.
    if type(target) in types.StringTypes:
        stream.close()
def parse_filelist(file_string):
    """Split a comma separated string into a list of filenames."""
    names = file_string.split(',')
    return names
def validate_filelist(files):
    """
    Die if a file doesn't exist in a file list

    Raises RuntimeError naming the first missing file; returns None when
    every path exists (an empty list trivially passes).
    """
    # NOTE(review): the `for f in files:` line was missing from the sampled
    # listing (the body referenced an undefined `f`); reconstructed here.
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("%s does not exist" % (f,))
def read_ddf_ini(filename, output=sys.stdout):
    """
    Read a ini file and dump out a tab delimited text file
    """
    # NOTE(review): sampled listing -- several lines of this function are
    # not visible here (the config.read(filename) call, file_list
    # initialization, the per-key loop that builds `values`, and the final
    # return -- callers treat the result as the list of files named in the
    # ini, see make_ddf).
    config = SafeConfigParser()
    # "order_by" in the [config] section names the ddf columns, in order
    order_by = shlex.split(config.get("config", "order_by"))
    # header row: tab separated column names
    output.write("\t".join(order_by))
    output.write(os.linesep)
    sections = config.sections()
    for section in sections:
        if section == "config":
            # skip the config block
            # ... gap: presumably `continue`, then a loop over order_by keys ...
            v = config.get(section, key)
            # 'files' values are comma separated submission file lists
            file_list.extend(parse_filelist(v))
            output.write("\t".join(values))
            output.write(os.linesep)
def make_condor_archive_script(ininame, files):
    """Write a condor submit script that tars `files` into ../<submission>.tgz.

    Raises RuntimeError if any listed file is missing, so we never queue a
    job that would fail.  The script is written to make_condor_name(ininame)
    in the current directory.
    """
    # NOTE(review): the template's closing quote and a couple of template
    # lines were missing from the sampled listing; the Log/queue lines below
    # are reconstructed -- confirm against the original submit script.
    script = """Universe = vanilla

Executable = /bin/tar
arguments = czvf ../%(archivename)s %(filelist)s

Error = compress.err.$(Process).log
Output = compress.out.$(Process).log
Log = compress.log
initialdir = %(initialdir)s

queue
"""
    # refuse to build a script referencing files that aren't there
    for f in files:
        if not os.path.exists(f):
            raise RuntimeError("Missing %s" % (f,))

    context = {'archivename': make_submission_name(ininame),
               'filelist': " ".join(files),
               'initialdir': os.getcwd()}

    condor_script = make_condor_name(ininame)
    condor_stream = open(condor_script,'w')
    condor_stream.write(script % context)
    condor_stream.close()
def make_ddf(ininame, daf_name, guess_ddf=False, make_condor=False, outdir=None):
    """
    Make ddf files, and bonus condor file
    """
    # NOTE(review): sampled listing -- lines are missing between most of the
    # statements below (the chdir(outdir) bookkeeping under the `if outdir`
    # branch, output.close(), the `if make_condor:` guard, and a
    # restore-previous-cwd tail).  Gaps are flagged inline.
    if outdir is not None:
        # ... gap: presumably save os.getcwd() and os.chdir(outdir) ...
    # the ddf is named after the ini file
    ddf_name = make_ddf_name(ininame)
    output = open(ddf_name,'w')
    # read_ddf_ini writes the tab-delimited ddf and returns the files it names
    file_list = read_ddf_ini(ininame, output)
    # the daf and the ddf itself ride along in the submission archive
    file_list.append(daf_name)
    if ddf_name is not None:
        file_list.append(ddf_name)
    # ... gap: presumably guarded by `if make_condor:` ...
    make_condor_archive_script(ininame, file_list)
def get_library_info(host, apidata, library_id):
    """Fetch the library record for `library_id` from the htsworkflow API.

    Returns whatever api.retrieve_info produces; callers index the result
    like a dict (e.g. lib_info['lane_set'], lib_info['cell_line']).
    """
    url = api.library_url(host, library_id)
    contents = api.retrieve_info(url, apidata)
    # The sampled listing dropped the return; every caller uses the result.
    return contents
def read_library_result_map(filename):
    """
    Read a whitespace delimited (library_id, result_dir) mapping file.

    Lines starting with '#' and blank lines are skipped.
    Returns a list of (library_id, result_dir) tuples.
    """
    stream = open(filename,'r')
    results = []
    for line in stream:
        line = line.strip()
        # skip comments; also tolerate blank lines, which would otherwise
        # crash the two-field unpack below
        if not line or line.startswith('#'):
            continue
        library_id, result_dir = line.split()
        results.append((library_id, result_dir))
    # don't leak the file handle
    stream.close()
    return results
def condor_srf_to_fastq(srf_file, target_pathname):
    """Return a condor queue entry converting an srf file to fastq.

    NOTE(review): the template's closing quote and queue line were missing
    from the sampled listing and are reconstructed here.
    """
    script = """output=%(target_pathname)s
arguments="-c %(srf_file)s"
queue
"""
    params = {'srf_file': srf_file,
              'target_pathname': target_pathname}

    return script % params
def condor_qseq_to_fastq(qseq_file, target_pathname):
    """Return a condor queue entry converting a qseq file to fastq.

    NOTE(review): the template's opening/closing quotes were missing from
    the sampled listing and are reconstructed here.
    """
    script = """
arguments="-i %(qseq_file)s -o %(target_pathname)s"
queue
"""
    params = {'qseq_file': qseq_file,
              'target_pathname': target_pathname}

    return script % params
def find_archive_sequence_files(host, apidata, sequences_path,
    # NOTE(review): sampled listing -- the rest of the signature (at least
    # library_result_map), the lib_db/candidate_lanes/seq_dirs
    # initializations, the remaining os.path.join arguments, and the final
    # return (presumably `return lib_db`, given how build_fastqs uses the
    # result) are not visible in this view.
    """
    Find all the archive sequence files possibly associated with our results.
    """
    logging.debug("Searching for sequence files in: %s" %(sequences_path,))
    #seq_dirs = set(os.path.join(sequences_path, 'srfs'))
    for lib_id, result_dir in library_result_map:
        # one API round trip per library; cached in lib_db by library id
        lib_info = get_library_info(host, apidata, lib_id)
        lib_db[lib_id] = lib_info

        for lane in lib_info['lane_set']:
            # a lane is identified by the (flowcell, lane_number) pair
            lane_key = (lane['flowcell'], lane['lane_number'])
            candidate_lanes[lane_key] = lib_id
            seq_dirs.add(os.path.join(sequences_path,
            # ... gap: remaining os.path.join arguments (flowcell subdir) ...
    logging.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
    candidate_seq_list = scan_for_sequences(seq_dirs)

    # at this point we have too many sequences as scan_for_sequences
    # returns all the sequences in a flowcell directory
    # so lets filter out the extras
    for seq in candidate_seq_list:
        lane_key = (seq.flowcell, seq.lane)
        lib_id = candidate_lanes.get(lane_key, None)
        if lib_id is not None:
            # group the matching sequences by lane under each library record
            lib_info = lib_db[lib_id]
            lanes = lib_info.setdefault('lanes', {})
            lanes.setdefault(lane_key, set()).add(seq)
def build_fastqs(host, apidata, sequences_path, library_result_map,
    # NOTE(review): sampled listing.  Missing from this view: the rest of
    # the signature (apparently a `paired` flag, per the docstring), the
    # closing quotes/queue lines of both condor header templates, the
    # initialization of `needed_targets`, the `lib = lib_db[lib_id]` lookup,
    # the remaining filename_attributes entries (cycle/lane/read), the
    # paired/single branch conditions, the raise for the srf paired-end
    # case, and the trailing arguments of both make_submit_script calls.
    """
    Generate condor scripts to build any needed fastq files

    Arguments:
      host (str): root of the htsworkflow api server
      apidata (dict): id & key to post to the server
      sequences_path (str): root of the directory tree to scan for files
      library_result_map (list): [(library_id, destination directory), ...]
      paired: should we assume that we are processing paired end records?
              if False, we will treat this as single ended.
    """
    qseq_condor_header = """
executable=/woldlab/rattus/lvol0/mus/home/diane/proj/gaworkflow/scripts/qseq2fastq
error=qseqfastq.err.$(process).log
output=qseqfastq.out.$(process).log
    qseq_condor_entries = []
    srf_condor_header = """
executable=/woldlab/rattus/lvol0/mus/home/diane/bin/srf2fastq
output=srf2fastq.out.$(process).log
error=srf2fastq.err.$(process).log
    srf_condor_entries = []
    fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
    fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
    lib_db = find_archive_sequence_files(host,
    # find what targets we're missing
    for lib_id, result_dir in library_result_map:
        for lane_key, sequences in lib['lanes'].items():
            for seq in sequences:
                filename_attributes = {
                    'flowcell': seq.flowcell,
                # throw out test runs
                # FIXME: this should probably be configurable
                if seq.flowcell == '30CUUAAXX':
                    # 30CUUAAXX run sucked
                target_name = fastq_paired_template % filename_attributes
                target_name = fastq_single_template % filename_attributes
                target_pathname = os.path.join(result_dir, target_name)
                if not os.path.exists(target_pathname):
                    # remember every source filetype available for this target
                    t = needed_targets.setdefault(target_pathname, {})
                    t[seq.filetype] = seq
    for target_pathname, available_sources in needed_targets.items():
        logging.debug(' target : %s' % (target_pathname,))
        logging.debug(' candidate sources: %s' % (available_sources,))
        # prefer qseq sources over srf (Python 2 dict.has_key)
        if available_sources.has_key('qseq'):
            source = available_sources['qseq']
            qseq_condor_entries.append(
                condor_qseq_to_fastq(source.path, target_pathname)
        elif available_sources.has_key('srf'):
            source = available_sources['srf']
                "srf -> fastq paired end doesn't work yet: %s" % (source,)
            srf_condor_entries.append(
                condor_srf_to_fastq(source.path, target_pathname)
            print " need file", target_pathname
    # only emit a submit script when there is at least one queue entry
    if len(srf_condor_entries) > 0:
        make_submit_script('srf.fastq.condor',
    if len(qseq_condor_entries) > 0:
        make_submit_script('qseq.fastq.condor',
def find_best_extension(extension_map, filename):
    """
    Search through extension_map looking for the best extension
    The 'best' is the longest match

    Arguments:
      extension_map (dict): '.ext' -> { 'view': 'name' or None }
      filename (str): the filename whose extention we are about to examine

    Returns the longest key of extension_map that filename ends with, or
    None when nothing matches.
    """
    # NOTE(review): the accumulator initialization and return were missing
    # from the sampled listing; reconstructed.  An unused os.path.splitext
    # call from the fragment was dropped.
    best_ext = None
    for ext in extension_map.keys():
        if filename.endswith(ext):
            # keep the longest matching suffix, e.g. prefer
            # '.splices.bam' over '.bam'
            if best_ext is None:
                best_ext = ext
            elif len(ext) > len(best_ext):
                best_ext = ext
    return best_ext
def make_submission_section(line_counter, files, standard_attributes, file_attributes):
    """
    Create a section in the submission ini file

    Arguments:
      line_counter (int): number for the [lineN] section header
      files (list of str): filenames, joined with commas on the files= line
      standard_attributes (dict): per-library attributes
      file_attributes (dict): per-file attributes; override the standard ones

    Returns the section as a list of 'key=value' strings.
    """
    # NOTE(review): the cur_attributes initialization and the return were
    # missing from the sampled listing; reconstructed.
    inifile = [ '[line%s]' % (line_counter,) ]
    inifile += ["files=%s" % (",".join(files))]
    cur_attributes = {}
    cur_attributes.update(standard_attributes)
    # file attributes win on key collisions
    cur_attributes.update(file_attributes)

    for k,v in cur_attributes.items():
        inifile += ["%s=%s" % (k,v)]
    return inifile
def make_submission_ini(host, apidata, library_result_map):
    # NOTE(review): sampled listing.  Missing from this view: the opening
    # `attributes = {` and closing `}` around the extension->view table,
    # initialization of `fastqs`/`line_counter`, part of the
    # standard_attributes dict (readType entry and closing brace), the
    # `continue`/`else` branches of the extension dispatch, most
    # make_submission_section call arguments, and the output file close.
    # NOTE(review): '.bigwig' maps to 'RawSigna' while '.wig' maps to
    # 'RawSignal' -- the former looks like a typo; confirm against the DAF
    # view names before relying on it.
        '.bai': {'view': None}, # bam index
        '.bam': {'view': 'Signal'},
        '.splices.bam': {'view': 'Splices'},
        '.bed': {'view': 'TopHatJunctions'},
        '.bigwig': {'view': 'RawSigna'},
        '.tar.bz2': {'view': None},
        '.condor': {'view': None},
        '.daf': {'view': None},
        '.ddf': {'view': None},
        'novel.genes.expr': {'view': 'GeneDeNovoFPKM'},
        'novel.transcripts.expr': {'view': 'TranscriptDeNovoFPKM'},
        '.genes.expr': {'view': 'GeneFPKM'},
        '.transcripts.expr': {'view': 'TranscriptFPKM'},
        '.fastq': {'view': 'Fastq' },
        '_r1.fastq': {'view': 'FastqRd1'},
        '_r2.fastq': {'view': 'FastqRd2'},
        '.gtf': {'view': 'CufflinksGeneModel'},
        '.ini': {'view': None},
        '.log': {'view': None},
        '.stats.txt': {'view': 'InsLength'},
        '.srf': {'view': None},
        '.wig': {'view': 'RawSignal'},

    candidate_fastq_src = {}

    for lib_id, result_dir in library_result_map:
        inifile = ['[config]']
        inifile += ['order_by=files view cell localization rnaExtract mapAlgorithm readType replicate labVersion']
        # one API lookup per library
        lib_info = get_library_info(host, apidata, lib_id)
        result_ini = os.path.join(result_dir, result_dir+'.ini')
        if lib_info['cell_line'].lower() == 'unknown':
            logging.warn("Library %s missing cell_line" % (lib_id,))
        standard_attributes = {'cell': lib_info['cell_line'],
                               'insertLength': '200',
                               'labVersion': 'TopHat',
                               'localization': 'cell',
                               'mapAlgorithm': 'TopHat',
                               'replicate': lib_info['replicate'],
                               'rnaExtract': 'longPolyA',
        submission_files = os.listdir(result_dir)
        for f in submission_files:
            # map each file in the result dir to a DAF view by extension
            best_ext = find_best_extension(attributes, f)
            if best_ext is not None:
                if attributes[best_ext]['view'] is None:
                    # ... gap: presumably `continue` (file carried but not listed) ...
                elif best_ext.endswith('fastq'):
                    # batch fastqs so they can be emitted on a single line
                    fastqs.setdefault(best_ext, set()).add(f)
                    make_submission_section(line_counter,
            raise ValueError("Unrecognized file: %s" % (f,))

        # add in fastqs on a single line.
        for extension, fastq_set in fastqs.items():
            make_submission_section(line_counter,
                                    attributes[extension])
        # NOTE(review): `f` here shadows the filename loop variable above;
        # a distinct name would be clearer.
        f = open(result_ini,'w')
        f.write(os.linesep.join(inifile))
def link_daf(daf_path, library_result_map):
    """Hard link the daf file into every submission result directory.

    Raises RuntimeError when daf_path itself is missing; directories that
    already contain the link are left untouched.
    """
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    daf_filename = os.path.basename(daf_path)

    for lib_id, result_dir in library_result_map:
        destination = os.path.join(result_dir, daf_filename)
        if not os.path.exists(destination):
            os.link(daf_path, destination)
def make_all_ddfs(library_result_map, daf_name):
    """Build a ddf (and condor archive script) for each submission directory
    whose ini file has already been generated; directories without an ini
    are silently skipped."""
    for lib_id, result_dir in library_result_map:
        ininame = result_dir + '.ini'
        if os.path.exists(os.path.join(result_dir, ininame)):
            make_ddf(ininame, daf_name, True, True, result_dir)
# NOTE(review): sampled listing -- these lines appear to be the body of a
# make_parser() function (main() below calls make_parser()), but its def
# line, the initializations of apiid/apikey/apihost, a few option
# continuation lines, and the final `return parser` are not visible here.
# Load defaults from the config files
config = SafeConfigParser()
config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini'])

sequence_archive = None
# ... gap: presumably apiid = apikey = apihost = None here ...
SECTION = 'sequence_archive'
if config.has_section(SECTION):
    # NOTE(review): ConfigParser.get()'s third positional argument is `raw`,
    # not a default value -- passing sequence_archive/apiid/apikey/apihost
    # here does not supply a fallback; a missing option would raise
    # NoOptionError.  Verify this is the intended behavior.
    sequence_archive = config.get(SECTION, 'sequence_archive',sequence_archive)
    sequence_archive = os.path.expanduser(sequence_archive)
    apiid = config.get(SECTION, 'apiid', apiid)
    apikey = config.get(SECTION, 'apikey', apikey)
    apihost = config.get(SECTION, 'host', apihost)

parser = OptionParser()

# commands
parser.add_option('--fastq', help="generate scripts for making fastq files",
                  default=False, action="store_true")
parser.add_option('--ini', help="generate submission ini file", default=False,
# ... gap: presumably action="store_true") ...
parser.add_option('--makeddf', help='make the ddfs', default=False,
# ... gap: presumably action="store_true") ...
parser.add_option('--daf', default=None, help='specify daf name')

# configuration options
parser.add_option('--apiid', default=apiid, help="Specify API ID")
parser.add_option('--apikey', default=apikey, help="Specify API KEY")
parser.add_option('--host', default=apihost,
                  help="specify HTSWorkflow host",)
parser.add_option('--sequence', default=sequence_archive,
                  help="sequence repository")
parser.add_option('--single', default=False, action="store_true",
                  help="treat the sequences as single ended runs")

# debugging
parser.add_option('--verbose', default=False, action="store_true",
                  help='verbose logging')
parser.add_option('--debug', default=False, action="store_true",
                  help='debug logging')
def main(cmdline=None):
    """Command line entry point.

    NOTE(review): sampled listing -- the if/elif/else guards selecting one
    logging.basicConfig call, the empty-args check, the `for a in args:`
    loop, the option guards (opts.fastq / opts.ini / opts.makeddf), and the
    remaining build_fastqs arguments are not visible in this view.
    """
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)
    # ... gap: presumably `if opts.debug:` / `elif opts.verbose:` / `else:`
    # choosing exactly one of these three logging levels ...
    logging.basicConfig(level = logging.DEBUG )
    logging.basicConfig(level = logging.INFO )
    logging.basicConfig(level = logging.WARNING )

    apidata = {'apiid': opts.apiid, 'apikey': opts.apikey }

    if opts.host is None or opts.apiid is None or opts.apikey is None:
        parser.error("Please specify host url, apiid, apikey")
    # ... gap: presumably guarded by a check that args is non-empty ...
    parser.error("I need at least one library submission-dir input file")

    library_result_map = []
    # ... gap: presumably `for a in args:` ...
    library_result_map.extend(read_library_result_map(a))

    if opts.daf is not None:
        link_daf(opts.daf, library_result_map)
    # ... gap: presumably `if opts.fastq:`; remaining call arguments missing ...
    build_fastqs(opts.host,
    # ... gap: presumably `if opts.ini:` ...
    make_submission_ini(opts.host, apidata, library_result_map)
    # ... gap: presumably `if opts.makeddf:` ...
    make_all_ddfs(library_result_map, opts.daf)
561 if __name__ == "__main__":