2 from ConfigParser import SafeConfigParser
6 from optparse import OptionParser
8 from pprint import pprint, pformat
10 from StringIO import StringIO
19 from htsworkflow.util import api
20 from htsworkflow.pipelines.sequences import \
21 create_sequence_table, \
# Command-line entry point: parse options, set logging verbosity, read the
# library-id -> result-directory map files, then run the requested stages
# (daf linking, fastq condor-script generation, submission ini, ddf files).
# NOTE(review): this block is a mangled extract -- the leading integers are
# line-number artifacts and several original lines are missing.
25 def main(cmdline=None):
26 parser = make_parser()
27 opts, args = parser.parse_args(cmdline)
# logging level selected by --debug / --verbose flags; the guarding
# if/elif/else lines are not visible in this extract
30 logging.basicConfig(level = logging.DEBUG )
32 logging.basicConfig(level = logging.INFO )
34 logging.basicConfig(level = logging.WARNING )
# credentials dict posted with every htsworkflow API request
37 apidata = {'apiid': opts.apiid, 'apikey': opts.apikey }
39 if opts.host is None or opts.apiid is None or opts.apikey is None:
40 parser.error("Please specify host url, apiid, apikey")
# presumably guarded by a len(args) == 0 check on a missing line
43 parser.error("I need at least one library submission-dir input file")
45 library_result_map = []
# each positional argument is a file mapping library ids to result dirs
47 library_result_map.extend(read_library_result_map(a))
49 if opts.daf is not None:
50 link_daf(opts.daf, library_result_map)
# remaining stages are presumably gated on the --fastq/--ini/--makeddf
# flags; the guarding conditionals are not visible in this extract
53 build_fastqs(opts.host,
60 make_submission_ini(opts.host, apidata, library_result_map)
63 make_all_ddfs(library_result_map, opts.daf)
# Body of make_parser() -- the `def make_parser():` line itself is not
# visible in this extract.  Loads defaults from the per-user and system
# htsworkflow ini files, then builds the OptionParser.
66 # Load defaults from the config files
67 config = SafeConfigParser()
68 config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini'])
70 sequence_archive = None
74 SECTION = 'sequence_archive'
75 if config.has_section(SECTION):
# NOTE(review): the third positional argument of ConfigParser.get() is
# `raw`, not a default value -- passing the current value there looks
# like a latent bug; confirm against the Python 2 ConfigParser docs.
76 sequence_archive = config.get(SECTION, 'sequence_archive',sequence_archive)
77 sequence_archive = os.path.expanduser(sequence_archive)
78 apiid = config.get(SECTION, 'apiid', apiid)
79 apikey = config.get(SECTION, 'apikey', apikey)
80 apihost = config.get(SECTION, 'host', apihost)
82 parser = OptionParser()
# stage-selection flags: each enables one phase of main()
85 parser.add_option('--fastq', help="generate scripts for making fastq files",
86 default=False, action="store_true")
88 parser.add_option('--ini', help="generate submission ini file", default=False,
91 parser.add_option('--makeddf', help='make the ddfs', default=False,
94 parser.add_option('--daf', default=None, help='specify daf name')
95 parser.add_option('--force', default=False, action="store_true",
96 help="Force regenerating fastqs")
98 # configuration options
# defaults for these come from the ini files read above
99 parser.add_option('--apiid', default=apiid, help="Specify API ID")
100 parser.add_option('--apikey', default=apikey, help="Specify API KEY")
101 parser.add_option('--host', default=apihost,
102 help="specify HTSWorkflow host",)
103 parser.add_option('--sequence', default=sequence_archive,
104 help="sequence repository")
# logging verbosity flags consumed in main()
107 parser.add_option('--verbose', default=False, action="store_true",
108 help='verbose logging')
109 parser.add_option('--debug', default=False, action="store_true",
110 help='debug logging')
# Generate condor submit scripts that build any fastq files missing from
# the result directories, preferring qseq sources over srf sources.
# NOTE(review): mangled extract -- leading integers are line-number
# artifacts and a number of original lines are missing.
114 def build_fastqs(host, apidata, sequences_path, library_result_map,
117 Generate condor scripts to build any needed fastq files
120 host (str): root of the htsworkflow api server
121 apidata (dict): id & key to post to the server
122 sequences_path (str): root of the directory tree to scan for files
123 library_result_map (list): [(library_id, destination directory), ...]
# condor submit-description headers for the two converter executables;
# paths are hard-coded to a specific lab machine
125 qseq_condor_header = """
127 executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/qseq2fastq
128 error=log/qseq2fastq.err.$(process).log
129 output=log/qseq2fastq.out.$(process).log
130 log=log/qseq2fastq.log
133 qseq_condor_entries = []
134 srf_condor_header = """
136 executable=/woldlab/rattus/lvol0/mus/home/diane/proj/solexa/gaworkflow/scripts/srf2named_fastq.py
137 output=log/srf_pair_fastq.out.$(process).log
138 error=log/srf_pair_fastq.err.$(process).log
139 log=log/srf_pair_fastq.log
140 environment="PYTHONPATH=/home/diane/lib/python2.6/site-packages:/home/diane/proj/solexa/gaworkflow PATH=/woldlab/rattus/lvol0/mus/home/diane/bin:/usr/bin:/bin"
143 srf_condor_entries = []
# target fastq filename patterns (paired-end gets a _r<read> suffix)
144 fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
145 fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
146 lib_db = find_archive_sequence_files(host,
151 # find what targets we're missing
153 for lib_id, result_dir in library_result_map:
155 lane_dict = make_lane_dict(lib_db, lib_id)
157 for lane_key, sequences in lib['lanes'].items():
158 for seq in sequences:
159 paired = lane_dict[seq.flowcell]['paired_end']
# paired-end sequences with no read number are skipped (the
# action taken on this branch is on a missing line)
160 if paired and seq.read is None:
162 filename_attributes = {
163 'flowcell': seq.flowcell,
169 # throw out test runs
170 # FIXME: this should probably be configurable
173 if seq.flowcell == '30CUUAAXX':
174 # 30CUUAAXX run sucked
176 if seq.flowcell == '30DY0AAXX':
177 # 30DY0 only ran for 151 bases instead of 152
178 # it is actually 76 1st read, 75 2nd read
# pick a target name by paired/single layout
183 target_name = fastq_paired_template % filename_attributes
185 target_name = fastq_single_template % filename_attributes
187 target_pathname = os.path.join(result_dir, target_name)
# collect candidate source sequences per missing target, keyed
# by source filetype so we can prefer qseq below
188 if force or not os.path.exists(target_pathname):
189 t = needed_targets.setdefault(target_pathname, {})
190 t[seq.filetype] = seq
192 for target_pathname, available_sources in needed_targets.items():
193 logging.debug(' target : %s' % (target_pathname,))
194 logging.debug(' candidate sources: %s' % (available_sources,))
# prefer qseq sources; fall back to srf; otherwise report the gap
195 if available_sources.has_key('qseq'):
196 source = available_sources['qseq']
197 qseq_condor_entries.append(
198 condor_qseq_to_fastq(source.path,
203 elif available_sources.has_key('srf'):
204 source = available_sources['srf']
205 mid = getattr(source, 'mid_point', None)
206 srf_condor_entries.append(
207 condor_srf_to_fastq(source.path,
215 print " need file", target_pathname
# only emit a submit script when there is at least one job queued
217 if len(srf_condor_entries) > 0:
218 make_submit_script('srf.fastq.condor',
222 if len(qseq_condor_entries) > 0:
223 make_submit_script('qseq.fastq.condor',
def link_daf(daf_path, library_result_map):
    """Hard-link the DAF file into every library result directory.

    Arguments:
      daf_path (str): path to the source DAF file; must already exist.
      library_result_map (list): [(library_id, result_directory), ...]

    Raises:
      RuntimeError: if daf_path does not exist.
    """
    # Reconstructed from a mangled extract (embedded line numbers, lost
    # indentation); the visible statements are preserved unchanged.
    if not os.path.exists(daf_path):
        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))

    base_daf = os.path.basename(daf_path)

    for lib_id, result_dir in library_result_map:
        submission_daf = os.path.join(result_dir, base_daf)
        # only link once so re-running the script is a no-op here
        if not os.path.exists(submission_daf):
            os.link(daf_path, submission_daf)
# Write a <result_dir>.ini submission description per library: map each
# file in the result directory to an ENCODE "view" via its extension,
# attach library metadata fetched from the htsworkflow server, and group
# fastqs onto a single submission line.
# NOTE(review): mangled extract -- leading integers are line-number
# artifacts; `ma`, `aligns`, `attributes`, `line_counter` and several
# branches are defined on lines missing from this extract.
238 def make_submission_ini(host, apidata, library_result_map, paired=True):
239 # ma is "map algorithm"
# extension -> submission attributes; 'view': None means "known but
# not submitted" (the file is recognized and skipped)
249 '.bai': {'view': None, 'MapAlgorithm': 'NA'},
250 '.bam': {'view': aligns, 'MapAlgorithm': ma},
251 '.splices.bam': {'view': 'Splices', 'MapAlgorithm': ma},
252 '.jnct': {'view': 'Junctions', 'MapAlgorithm': ma},
253 '.plus.bigwig': {'view': 'PlusSignal', 'MapAlgorithm': ma},
254 '.minus.bigwig': {'view': 'MinusSignal', 'MapAlgorithm': ma},
255 '.bigwig': {'view': 'Signal', 'MapAlgorithm': ma},
256 '.tar.bz2': {'view': None},
257 '.condor': {'view': None},
258 '.daf': {'view': None},
259 '.ddf': {'view': None},
260 'denovo.genes.expr': {'view': 'GeneDeNovo', 'MapAlgorithm': ma},
261 'denovo.transcripts.expr': {'view': 'TranscriptDeNovo', 'MapAlgorithm': ma},
262 'novel.genes.expr': {'view': 'GeneDeNovo', 'MapAlgorithm': ma},
263 'novel.transcripts.expr': {'view': 'TranscriptDeNovo', 'MapAlgorithm': ma},
264 '.genes.expr': {'view': 'GeneFPKM', 'MapAlgorithm': ma},
265 '.transcripts.expr': {'view': 'TranscriptFPKM', 'MapAlgorithm': ma},
266 '.transcript.expr': {'view': 'TranscriptFPKM', 'MapAlgorithm': ma},
267 '.fastq': {'view': 'Fastq', 'MapAlgorithm': 'NA' },
268 '_r1.fastq': {'view': 'FastqRd1', 'MapAlgorithm': 'NA'},
269 '_r2.fastq': {'view': 'FastqRd2', 'MapAlgorithm': 'NA'},
270 '.gtf': {'view': 'GeneModel', 'MapAlgorithm': ma},
271 '.ini': {'view': None},
272 '.log': {'view': None},
273 '.stats.txt': {'view': 'InsLength', 'MapAlgorithm': ma},
274 '.srf': {'view': None},
275 '.wig': {'view': None},
276 '.zip': {'view': None},
279 candidate_fastq_src = {}
281 for lib_id, result_dir in library_result_map:
# column order for the generated ddf, written into [config]
282 order_by = ['order_by=files', 'view', 'replicate', 'cell',
283 'readType', 'mapAlgorithm', 'insertLength' ]
284 inifile = ['[config]']
285 inifile += [" ".join(order_by)]
288 lib_info = get_library_info(host, apidata, lib_id)
# ini is written inside the result dir, named after the dir itself
289 result_ini = os.path.join(result_dir, result_dir+'.ini')
291 if lib_info['cell_line'].lower() == 'unknown':
292 logging.warn("Library %s missing cell_line" % (lib_id,))
294 standard_attributes = {'cell': lib_info['cell_line'],
295 'replicate': lib_info['replicate'],
# paired libraries need an insert length; default assumed on a
# missing line when the server has none recorded
298 if lib_info['insert_size'] is None:
299 errmsg = "Library %s is missing insert_size, assuming 200"
300 logging.warn(errmsg % (lib_id,))
303 insert_size = lib_info['insert_size']
304 standard_attributes['insertLength'] = insert_size
305 standard_attributes['readType'] = '2x75'
# single-ended branch (guarding conditional on a missing line)
307 standard_attributes['insertLength'] = 'ilNA'
308 standard_attributes['readType'] = '1x75D'
311 submission_files = os.listdir(result_dir)
313 for f in submission_files:
314 best_ext = find_best_extension(attributes, f)
316 if best_ext is not None:
# view None -> recognized but deliberately not submitted
317 if attributes[best_ext]['view'] is None:
# fastqs are collected per extension and emitted together below
320 elif best_ext.endswith('fastq'):
321 fastqs.setdefault(best_ext, set()).add(f)
324 make_submission_section(line_counter,
# unrecognized files are a hard error so nothing silently escapes
333 raise ValueError("Unrecognized file: %s" % (f,))
335 # add in fastqs on a single line.
336 for extension, fastq_set in fastqs.items():
338 make_submission_section(line_counter,
341 attributes[extension])
# NOTE(review): no visible f.close() after this write -- confirm it is
# on one of the missing lines
346 f = open(result_ini,'w')
347 f.write(os.linesep.join(inifile))
# Re-index a library's lane_set by flowcell id.
# NOTE(review): the `result` initialization and the return statement
# (presumably dict(result)) are on lines missing from this extract.
349 def make_lane_dict(lib_db, lib_id):
351 Convert the lane_set in a lib_db to a dictionary
352 indexed by flowcell ID
355 for lane in lib_db[lib_id]['lane_set']:
356 result.append((lane['flowcell'], lane))
# Build a ddf per library result directory (from its generated .ini) and,
# optionally, one DAG-man file chaining all the condor archive/upload jobs.
359 def make_all_ddfs(library_result_map, daf_name, make_condor=True):
361 for lib_id, result_dir in library_result_map:
362 ininame = result_dir+'.ini'
363 inipathname = os.path.join(result_dir, ininame)
# skip directories where make_submission_ini was never run
364 if os.path.exists(inipathname):
366 make_ddf(ininame, daf_name, True, make_condor, result_dir)
369 if make_condor and len(dag_fragment) > 0:
370 dag_filename = 'submission.dagman'
# refuse to clobber an existing dagman file; the user must delete it
# (the skip action lives on a missing line)
371 if os.path.exists(dag_filename):
372 logging.warn("%s exists, please delete" % (dag_filename,))
374 f = open(dag_filename,'w')
375 f.write( os.linesep.join(dag_fragment))
376 f.write( os.linesep )
# Convert one submission .ini into a tab-delimited ddf, plus optional
# condor archive/upload scripts and the matching DAG fragment.
# NOTE(review): mangled extract -- chdir handling, dag_fragments init and
# the return statement are on missing lines.
380 def make_ddf(ininame, daf_name, guess_ddf=False, make_condor=False, outdir=None):
382 Make ddf files, and bonus condor file
386 if outdir is not None:
391 ddf_name = make_ddf_name(ininame)
393 output = open(ddf_name,'w')
395 file_list = read_ddf_ini(ininame, output)
# the daf and the ddf itself ride along in the archive tarball
397 file_list.append(daf_name)
398 if ddf_name is not None:
399 file_list.append(ddf_name)
402 archive_condor = make_condor_archive_script(ininame, file_list)
403 upload_condor = make_condor_upload_script(ininame)
405 dag_fragments.extend(
406 make_dag_fragment(ininame, archive_condor, upload_condor)
# Dump a submission ini as tab-delimited ddf text and collect the
# filenames mentioned in it.
# NOTE(review): `values` construction, the file_list init and the return
# statement are on lines missing from this extract.
413 def read_ddf_ini(filename, output=sys.stdout):
415 Read an ini file and dump out a tab delimited text file
418 config = SafeConfigParser()
419 config.read(filename)
# the [config] section's order_by option defines the ddf column order
421 order_by = shlex.split(config.get("config", "order_by"))
423 output.write("\t".join(order_by))
424 output.write(os.linesep)
425 sections = config.sections()
427 for section in sections:
428 if section == "config":
429 # skip the config block
433 v = config.get(section, key)
# 'files' values are comma-separated lists; track them for archiving
436 file_list.extend(parse_filelist(v))
438 output.write("\t".join(values))
439 output.write(os.linesep)
# Parse a whitespace-delimited "library_id result_dir" mapping file,
# skipping '#' comment lines.
# NOTE(review): the results init, the line-iteration statement and the
# return are on lines missing from this extract; also no visible
# stream.close() -- confirm on a missing line.
442 def read_library_result_map(filename):
444 Read a file that maps library id to result directory.
445 Does not support spaces in filenames.
450 stream = open(filename,'r')
454 if not line.startswith('#'):
455 library_id, result_dir = line.strip().split()
456 results.append((library_id, result_dir))
# Write a condor submit script that tars the submission files up one
# directory level; validates that every listed file exists first.
# NOTE(review): the `for f in files:` line, the Queue directive and the
# return of the script name are on lines missing from this extract.
459 def make_condor_archive_script(ininame, files):
460 script = """Universe = vanilla
462 Executable = /bin/tar
463 arguments = czvf ../%(archivename)s %(filelist)s
465 Error = compress.err.$(Process).log
466 Output = compress.out.$(Process).log
467 Log = /tmp/submission-compress.log
468 initialdir = %(initialdir)s
# fail early rather than building a tarball with missing members
473 if not os.path.exists(f):
474 raise RuntimeError("Missing %s" % (f,))
476 context = {'archivename': make_submission_name(ininame),
477 'filelist': " ".join(files),
478 'initialdir': os.getcwd()}
480 condor_script = make_condor_name(ininame, 'archive')
481 condor_stream = open(condor_script,'w')
482 condor_stream.write(script % context)
483 condor_stream.close()
# Write a condor submit script that lftp-uploads the archive tarball to
# the UCSC ENCODE ftp drop box.
# NOTE(review): the Queue directive and the return of the script name are
# on lines missing from this extract; the ftp credentials/host are
# hard-coded.
486 def make_condor_upload_script(ininame):
487 script = """Universe = vanilla
489 Executable = /usr/bin/lftp
490 arguments = -c put ../%(archivename)s -o ftp://detrout@encodeftp.cse.ucsc.edu/
492 Error = upload.err.$(Process).log
493 Output = upload.out.$(Process).log
494 Log = /tmp/submission-upload.log
495 initialdir = %(initialdir)s
499 context = {'archivename': make_submission_name(ininame),
500 'initialdir': os.getcwd()}
502 condor_script = make_condor_name(ininame, 'upload')
503 condor_stream = open(condor_script,'w')
504 condor_stream.write(script % context)
505 condor_stream.close()
# Build the DAG-man lines declaring the archive job, the upload job, and
# the archive->upload ordering for one submission.
# NOTE(review): the fragments list init and the return statement are on
# lines missing from this extract.
508 def make_dag_fragment(ininame, archive_condor, upload_condor):
510 Make the couple of fragments compress and then upload the data.
# condor scripts are referenced by absolute path so the DAG can run
# from any working directory
512 cur_dir = os.getcwd()
513 archive_condor = os.path.join(cur_dir, archive_condor)
514 upload_condor = os.path.join(cur_dir, upload_condor)
515 job_basename = make_base_name(ininame)
518 fragments.append('JOB %s_archive %s' % (job_basename, archive_condor))
519 fragments.append('JOB %s_upload %s' % (job_basename, upload_condor))
520 fragments.append('PARENT %s_archive CHILD %s_upload' % (job_basename, job_basename))
# Fetch one library's metadata record from the htsworkflow server.
# NOTE(review): the return statement (presumably `return contents`) is on
# a line missing from this extract.
524 def get_library_info(host, apidata, library_id):
525 url = api.library_url(host, library_id)
526 contents = api.retrieve_info(url, apidata)
# Build the condor queue-entry text for one srf -> fastq conversion,
# handling paired vs single-ended output naming.
# NOTE(review): the paired/single conditional and the args init are on
# lines missing from this extract.
529 def condor_srf_to_fastq(srf_file, target_pathname, paired, flowcell=None,
530 mid=None, force=False):
533 args.extend(['--left', target_pathname])
534 # this is ugly. I did it because I was pregenerating the target
535 # names before I tried to figure out what sources could generate
536 # those targets, and everything up to this point had been
537 # one-to-one. So I couldn't figure out how to pair the
539 # With this at least the command will run correctly.
540 # however if we rename the default targets, this'll break
541 # also I think it'll generate it twice.
# derive the right-read target by renaming _r1 -> _r2
542 args.extend(['--right',
543 target_pathname.replace('_r1.fastq', '_r2.fastq')])
545 args.extend(['--single', target_pathname ])
546 if flowcell is not None:
547 args.extend(['--flowcell', flowcell])
# mid-point split for flowcells where read lengths were uneven
550 args.extend(['-m', str(mid)])
553 args.extend(['--force'])
# the "arguments=... queue" template header is on missing lines
558 """ % (" ".join(args),)
# Build the condor queue-entry text for one qseq -> fastq conversion.
# NOTE(review): the force handling, the template string and the return
# are on lines missing from this extract.
562 def condor_qseq_to_fastq(qseq_file, target_pathname, flowcell=None, force=False):
563 args = ['-i', qseq_file, '-o', target_pathname ]
564 if flowcell is not None:
565 args.extend(['-f', flowcell])
569 """ % (" ".join(args))
# Scan the sequence archive for files belonging to the requested
# libraries' lanes, and attach them to each library record under 'lanes'.
# NOTE(review): mangled extract -- lib_db/candidate_lanes/seq_dirs
# initialization and the return statement are on missing lines.
573 def find_archive_sequence_files(host, apidata, sequences_path,
576 Find all the archive sequence files possibly associated with our results.
579 logging.debug("Searching for sequence files in: %s" %(sequences_path,))
583 #seq_dirs = set(os.path.join(sequences_path, 'srfs'))
585 for lib_id, result_dir in library_result_map:
586 lib_info = get_library_info(host, apidata, lib_id)
587 lib_db[lib_id] = lib_info
# remember which (flowcell, lane) pairs belong to which library
589 for lane in lib_info['lane_set']:
590 lane_key = (lane['flowcell'], lane['lane_number'])
591 candidate_lanes[lane_key] = lib_id
592 seq_dirs.add(os.path.join(sequences_path,
595 logging.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
596 candidate_seq_list = scan_for_sequences(seq_dirs)
598 # at this point we have too many sequences as scan_for_sequences
599 # returns all the sequences in a flowcell directory
600 # so lets filter out the extras
602 for seq in candidate_seq_list:
603 lane_key = (seq.flowcell, seq.lane)
604 lib_id = candidate_lanes.get(lane_key, None)
# keep only sequences whose lane maps back to a requested library
605 if lib_id is not None:
606 lib_info = lib_db[lib_id]
607 lanes = lib_info.setdefault('lanes', {})
608 lanes.setdefault(lane_key, set()).add(seq)
# Pick the longest extension from extension_map that matches filename's
# suffix (longest match wins, so '.splices.bam' beats '.bam').
# NOTE(review): the best_ext init and the return are on missing lines;
# `path`/`last_ext` appear unused in the visible portion.
612 def find_best_extension(extension_map, filename):
614 Search through extension_map looking for the best extension
615 The 'best' is the longest match
618 extension_map (dict): '.ext' -> { 'view': 'name' or None }
619 filename (str): the filename whose extension we are about to examine
622 path, last_ext = os.path.splitext(filename)
624 for ext in extension_map.keys():
625 if filename.endswith(ext):
# longer matching suffix replaces the current candidate
628 elif len(ext) > len(best_ext):
# Render one [lineN] ini section: the files list plus merged standard and
# per-file attributes (file attributes override standard ones).
# NOTE(review): the cur_attributes init and the return of `inifile` are
# on lines missing from this extract.
632 def make_submission_section(line_counter, files, standard_attributes, file_attributes):
634 Create a section in the submission ini file
636 inifile = [ '[line%s]' % (line_counter,) ]
637 inifile += ["files=%s" % (",".join(files))]
# file_attributes is applied last so it wins on key collisions
639 cur_attributes.update(standard_attributes)
640 cur_attributes.update(file_attributes)
642 for k,v in cur_attributes.items():
643 inifile += ["%s=%s" % (k,v)]
# Strip directory and extension from a pathname ("a/b.ini" -> "b").
# NOTE(review): the return statement is on a missing line.
646 def make_base_name(pathname):
647 base = os.path.basename(pathname)
648 name, ext = os.path.splitext(base)
# Derive the archive tarball name from the ini name; the return
# (presumably name + '.tgz') is on a missing line.
651 def make_submission_name(ininame):
652 name = make_base_name(ininame)
# Derive the ddf filename from a pathname; the return (presumably
# name + '.ddf') is on a missing line.
655 def make_ddf_name(pathname):
656 name = make_base_name(pathname)
# Build a condor script name: <base>[.<run_type>].condor
# NOTE(review): the `elements` initialization (presumably [name]) is on a
# missing line.
659 def make_condor_name(pathname, run_type=None):
660 name = make_base_name(pathname)
662 if run_type is not None:
663 elements.append(run_type)
664 elements.append('condor')
665 return ".".join(elements)
# Write header + body blocks to `target`, opening/closing the file only
# when target is a string (a stream is the caller's responsibility).
# NOTE(review): the open/write/close statements themselves are on lines
# missing from this extract; types.StringTypes is Python 2-only.
667 def make_submit_script(target, header, body_list):
669 write out a text file
671 this was intended for condor submit scripts
674 target (str or stream):
675 if target is a string, we will open and close the file
676 if target is a stream, the caller is responsible.
679 header to write at the beginning of the file
680 body_list (list of strs):
681 a list of blocks to add to the file.
683 if type(target) in types.StringTypes:
688 for entry in body_list:
# mirror check: only close what we opened ourselves
690 if type(target) in types.StringTypes:
def parse_filelist(file_string):
    """Split a comma-separated string of filenames into a list.

    No whitespace stripping is performed; an empty string yields ['']
    (plain str.split behavior), matching what read_ddf_ini expects.
    """
    # Reconstructed from a mangled extract (embedded line numbers, lost
    # indentation); logic is unchanged.
    return file_string.split(',')
# Sanity check: raise if any listed file is missing on disk.
# NOTE(review): the `for f in files:` line is missing from this extract.
696 def validate_filelist(files):
698 Die if a file doesn't exist in a file list
701 if not os.path.exists(f):
702 raise RuntimeError("%s does not exist" % (f,))
704 if __name__ == "__main__":