From 78482b7a2ffed37ff0e2c9dcc6c2d266ce614e4f Mon Sep 17 00:00:00 2001
From: Diane Trout
Date: Thu, 10 Jun 2010 00:55:11 +0000
Subject: [PATCH] Add script to try to build submissions to the UCSC ENCODE
 project.

This version supports generating qseq2fastq entries using the htsworkflow
api and scanning the flowcell repository directory.

There was code to generate the ddf files (from ini files). I need to update
the ini generation code to scan the submission directory for fastq files and
group them by read.
---
 extra/README.txt                            |   3 +
 extra/ucsc_encode_submission/README.txt     |   5 +
 extra/ucsc_encode_submission/ucsc_gather.py | 487 ++++++++++++++++++++
 3 files changed, 495 insertions(+)
 create mode 100644 extra/README.txt
 create mode 100644 extra/ucsc_encode_submission/README.txt
 create mode 100755 extra/ucsc_encode_submission/ucsc_gather.py

diff --git a/extra/README.txt b/extra/README.txt
new file mode 100644
index 0000000..ee6d069
--- /dev/null
+++ b/extra/README.txt
@@ -0,0 +1,3 @@
+This directory is intended for extra modules that might use some HTS
+Workflow components, but aren't actually important for running your
+own sequencing facility.
diff --git a/extra/ucsc_encode_submission/README.txt b/extra/ucsc_encode_submission/README.txt
new file mode 100644
index 0000000..bab7a55
--- /dev/null
+++ b/extra/ucsc_encode_submission/README.txt
@@ -0,0 +1,5 @@
+I was building a variety of scripts to handle submitting our data to the
+UCSC ENCODE pipeline. Some of them pull data out of the htsworkflow
+databases, and I needed an official place to put them.
+
+This directory is that place.
diff --git a/extra/ucsc_encode_submission/ucsc_gather.py b/extra/ucsc_encode_submission/ucsc_gather.py
new file mode 100755
index 0000000..d34a575
--- /dev/null
+++ b/extra/ucsc_encode_submission/ucsc_gather.py
@@ -0,0 +1,487 @@
+#!/usr/bin/env python
+from ConfigParser import SafeConfigParser
+from glob import glob
+import json
+import logging
+from optparse import OptionParser
+import os
+from pprint import pprint
+import shlex
+from StringIO import StringIO
+import time
+import sys
+import urllib
+import urllib2
+import urlparse
+
+
+from htsworkflow.util import api
+from htsworkflow.pipelines.sequences import \
+    create_sequence_table, \
+    scan_for_sequences
+
+def make_submission_name(ininame):
+    base = os.path.basename(ininame)
+    name, ext = os.path.splitext(base)
+    return name + '.tgz'
+
+def make_ddf_name(pathname):
+    base = os.path.basename(pathname)
+    name, ext = os.path.splitext(base)
+    return name + '.ddf'
+
+def make_condor_name(pathname):
+    base = os.path.basename(pathname)
+    name, ext = os.path.splitext(base)
+    return name + '.condor'
+
+def parse_filelist(file_string):
+    return file_string.split(',')
+
+def validate_filelist(files):
+    """
+    Die if a file doesn't exist in a file list
+    """
+    for f in files:
+        if not os.path.exists(f):
+            raise RuntimeError("%s does not exist" % (f,))
+
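+# An illustrative sketch (library id, flowcell, and attribute values are made
+# up) of the submission ini layout that read_ddf_ini() below consumes and
+# that make_submission_ini() further down generates: a [config] section
+# naming the DDF column order, then one [lineN] section per submitted file.
+#
+#   [config]
+#   order_by=files view cell localization rnaExtract mapAlgorithm readType replicate labVersion
+#
+#   [line1]
+#   files=11011_42CWWAAXX_c76_l3_r1.fastq
+#   view=FastqRd1
+#   cell=GM12878
+#   replicate=1
+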
+def read_ddf_ini(filename, output=sys.stdout):
+    """
+    Read an ini file and dump out a tab delimited text file
+    """
+    file_list = []
+    config = SafeConfigParser()
+    config.read(filename)
+
+    order_by = shlex.split(config.get("config", "order_by"))
+
+    output.write("\t".join(order_by))
+    output.write(os.linesep)
+    sections = config.sections()
+    sections.sort()
+    for section in sections:
+        if section == "config":
+            # skip the config block
+            continue
+        values = []
+        for key in order_by:
+            v = config.get(section, key)
+            values.append(v)
+            if key == 'files':
+                file_list.extend(parse_filelist(v))
+
+        output.write("\t".join(values))
+        output.write(os.linesep)
+    return file_list
+
+def make_condor_archive_script(ininame, files):
+    script = """Universe = vanilla
+
+Executable = /bin/tar
+arguments = czvf ../%(archivename)s %(filelist)s
+
+Error = compress.err.$(Process).log
+Output = compress.out.$(Process).log
+Log = compress.log
+initialdir = %(initialdir)s
+
+queue
+"""
+    for f in files:
+        if not os.path.exists(f):
+            raise RuntimeError("Missing %s" % (f,))
+
+    context = {'archivename': make_submission_name(ininame),
+               'filelist': " ".join(files),
+               'initialdir': os.getcwd()}
+
+    condor_script = make_condor_name(ininame)
+    condor_stream = open(condor_script,'w')
+    condor_stream.write(script % context)
+    condor_stream.close()
+
+def make_ddf(ininame, daf_name, guess_ddf=False, make_condor=False, outdir=None):
+    """
+    Make ddf files, and bonus condor file
+    """
+    curdir = os.getcwd()
+    if outdir is not None:
+        os.chdir(outdir)
+    output = sys.stdout
+    ddf_name = None
+    if guess_ddf:
+        ddf_name = make_ddf_name(ininame)
+        print ddf_name
+        output = open(ddf_name,'w')
+
+    file_list = read_ddf_ini(ininame, output)
+
+    file_list.append(daf_name)
+    if ddf_name is not None:
+        file_list.append(ddf_name)
+
+    if make_condor:
+        make_condor_archive_script(ininame, file_list)
+
+    os.chdir(curdir)
+
+
+def get_library_info(host, apidata, library_id):
+    url = api.library_url(host, library_id)
+    contents = api.retrieve_info(url, apidata)
+    return contents
+
+def read_library_result_map(filename):
+    stream = open(filename,'r')
+
+    results = []
+    for line in stream:
+        library_id, result_dir = line.strip().split()
+        results.append((library_id, result_dir))
+    return results
+
+def condor_srf_to_fastq(srf_file, target_pathname):
+    script = """output=%(target_pathname)s
+arguments="-c %(srf_file)s"
+queue
+"""
+    params = {'srf_file': srf_file,
+              'target_pathname': target_pathname}
+
+    return script % params
+
+def condor_qseq_to_fastq(qseq_file, target_pathname):
+    script = """
+arguments="-i %(qseq_file)s -o %(target_pathname)s"
+queue
+"""
+    params = {'qseq_file': qseq_file,
+              'target_pathname': target_pathname}
+
+    return script % params
+
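+# A sketch of the structure find_archive_sequence_files() below returns
+# (the ids here are hypothetical): lib_db is keyed by library id, each value
+# is the library info dict from the htsworkflow API, and this function adds
+# a 'lanes' dict mapping (flowcell, lane number) to the set of sequence
+# objects found for that lane, e.g.
+#
+#   lib_db['11011']['lanes'][('42CWWAAXX', 3)] -> set of qseq/srf sequences
+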
+def find_archive_sequence_files(host, apidata, sequences_path,
+                                library_result_map):
+    """
+    Find all the archive sequence files possibly associated with our results.
+
+    """
+    logging.debug("Searching for sequence files in: %s" %(sequences_path,))
+
+    lib_db = {}
+    seq_dirs = set()
+    #seq_dirs = set(os.path.join(sequences_path, 'srfs'))
+    candidate_lanes = {}
+    for lib_id, result_dir in library_result_map:
+        lib_info = get_library_info(host, apidata, lib_id)
+        lib_db[lib_id] = lib_info
+
+        for lane in lib_info['lane_set']:
+            lane_key = (lane['flowcell'], lane['lane_number'])
+            candidate_lanes[lane_key] = lib_id
+            seq_dirs.add(os.path.join(sequences_path,
+                                      'flowcells',
+                                      lane['flowcell']))
+    logging.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
+    candidate_seq_list = scan_for_sequences(seq_dirs)
+
+    # at this point we have too many sequences as scan_for_sequences
+    # returns all the sequences in a flowcell directory
+    # so lets filter out the extras
+
+    for seq in candidate_seq_list:
+        lane_key = (seq.flowcell, seq.lane)
+        lib_id = candidate_lanes.get(lane_key, None)
+        if lib_id is not None:
+            lib_info = lib_db[lib_id]
+            lanes = lib_info.setdefault('lanes', {})
+            lanes.setdefault(lane_key, set()).add(seq)
+
+    return lib_db
+
+def build_fastqs(host, apidata, sequences_path, library_result_map):
+    """
+    Generate condor scripts to build any needed fastq files
+    """
+    qseq_condor_header = """
+Universe=vanilla
+executable=/home/diane/proj/gaworkflow/scripts/qseq2fastq
+error=qseqfastq.err.$(process).log
+output=qseqfastq.out.$(process).log
+log=qseqfastq.log
+
+"""
+    qseq_condor_entries = []
+    srf_condor_header = """
+Universe=vanilla
+executable=/home/diane/bin/srf2fastq
+error=srffastq.err.$(process)
+log=srffastq.log
+
+"""
+    srf_condor_entries = []
+    fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
+    fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
+    lib_db = find_archive_sequence_files(host,
+                                         apidata,
+                                         sequences_path,
+                                         library_result_map)
+
+    # find what targets we're missing
+    needed_targets = {}
+    for lib_id, result_dir in library_result_map:
+        lib = lib_db[lib_id]
+        for lane_key, sequences in lib['lanes'].items():
+            for seq in sequences:
+                # skip srfs for the moment
+                if seq.filetype == 'srf':
+                    continue
+                filename_attributes = {
+                    'flowcell': seq.flowcell,
+                    'lib_id': lib_id,
+                    'lane': seq.lane,
+                    'read': seq.read,
+                    'cycle': seq.cycle
+                }
+                # throw out test runs
+                # FIXME: this should probably be configurable
+                if seq.cycle < 50:
+                    continue
+                if seq.flowcell == '30CUUAAXX':
+                    # 30CUUAAXX run sucked
+                    continue
+
+                # end filters
+                if seq.read is not None:
+                    target_name = fastq_paired_template % filename_attributes
+                else:
+                    target_name = fastq_single_template % filename_attributes
+
+                target_pathname = os.path.join(result_dir, target_name)
+                if not os.path.exists(target_pathname):
+                    t = needed_targets.setdefault(target_pathname, {})
+                    t[seq.filetype] = seq
+
+    for target_pathname, available_sources in needed_targets.items():
+        if available_sources.has_key('qseq'):
+            source = available_sources['qseq']
+            qseq_condor_entries.append(
+                condor_qseq_to_fastq(source.path, target_pathname)
+            )
+        #elif available_sources.has_key('srf'):
+        #    srf_condor_entries.append(condor_srf_to_fastq(source, target))
+        else:
+            print " need file", target_pathname
+
+#
+# f = open('srf.fastq.condor','w')
+# f.write(srf_condor)
+# f.close()
+
+    if len(qseq_condor_entries) > 0:
+        #f = sys.stdout
+        f = open('qseq.fastq.condor','w')
+        f.write(qseq_condor_header)
+        for entry in qseq_condor_entries:
+            f.write(entry)
+        f.close()
+
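+# For reference, the qseq.fastq.condor file written by build_fastqs() above
+# comes out roughly like this (the input qseq path and output fastq name are
+# hypothetical examples):
+#
+#   Universe=vanilla
+#   executable=/home/diane/proj/gaworkflow/scripts/qseq2fastq
+#   error=qseqfastq.err.$(process).log
+#   output=qseqfastq.out.$(process).log
+#   log=qseqfastq.log
+#
+#   arguments="-i <qseq file found under flowcells/42CWWAAXX> -o 11011_42CWWAAXX_c76_l3_r1.fastq"
+#   queue
+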
+def find_best_extension(extension_map, filename):
+    """
+    Search through extension_map looking for the best extension.
+    The 'best' is the longest match.
+
+    :Args:
+      extension_map (dict): '.ext' -> { 'view': 'name' or None }
+      filename (str): the filename whose extension we are about to examine
+    """
+    best_ext = None
+    path, last_ext = os.path.splitext(filename)
+
+    for ext in extension_map.keys():
+        if filename.endswith(ext):
+            if best_ext is None:
+                best_ext = ext
+            elif len(ext) > len(best_ext):
+                best_ext = ext
+    return best_ext
+
+def add_submission_section(line_counter, files, standard_attributes, file_attributes):
+    """
+    Create a section in the submission ini file
+    """
+    inifile = [ '[line%s]' % (line_counter,) ]
+    inifile += ["files=%s" % (",".join(files))]
+    cur_attributes = {}
+    cur_attributes.update(standard_attributes)
+    cur_attributes.update(file_attributes)
+
+    for k,v in cur_attributes.items():
+        inifile += ["%s=%s" % (k,v)]
+    return inifile
+
+def make_submission_ini(host, apidata, library_result_map):
+    attributes = {
+        '.bai': {'view': None},  # bam index
+        '.bam': {'view': 'Signal'},
+        '.condor': {'view': None},
+        '.daf': {'view': None},
+        '.ddf': {'view': None},
+        '.splices.bam': {'view': 'Splices'},
+        '.bed': {'view': 'TopHatJunctions'},
+        '.ini': {'view': None},
+        '.log': {'view': None},
+        '_r1.fastq': {'view': 'FastqRd1'},
+        '_r2.fastq': {'view': 'FastqRd2'},
+        '.tar.bz2': {'view': None},
+        'novel.genes.expr': {'view': 'GeneDeNovoFPKM'},
+        'novel.transcripts.expr': {'view': 'TranscriptDeNovoFPKM'},
+        '.genes.expr': {'view': 'GeneFPKM'},
+        '.transcripts.expr': {'view': 'TranscriptFPKM'},
+        '.stats.txt': {'view': 'InsLength'},
+        '.gtf': {'view': 'CufflinksGeneModel'},
+        '.wig': {'view': 'RawSignal'},
+    }
+
+    candidate_fastq_src = {}
+
+    for lib_id, result_dir in library_result_map:
+        inifile = ['[config]']
+        inifile += ['order_by=files view cell localization rnaExtract mapAlgorithm readType replicate labVersion']
+        inifile += ['']
+        line_counter = 1
+        lib_info = get_library_info(host, apidata, lib_id)
+        result_ini = os.path.join(result_dir, result_dir+'.ini')
+
+        standard_attributes = {'cell': lib_info['cell_line'],
+                               'insertLength': '200', # ali
+                               'labVersion': 'TopHat',
+                               'localization': 'cell',
+                               'mapAlgorithm': 'TopHat',
+                               'readType': '2x75', #ali
+                               'replicate': lib_info['replicate'],
+                               'rnaExtract': 'longPolyA',
+                               }
+
+        # write fastq line
+        #fastqs = []
+        #for lane in lib_info['lane_set']:
+        #    target_name = "%s_%s_%s.fastq" % (lane['flowcell'], lib_id, lane['lane_number'])
+        #    fastqs.append(target_name)
+        #inifile.extend(
+        #    make_run_block(line_counter, fastqs, standard_attributes, attributes['.fastq'])
+        #)
+        #inifile += ['']
+        #line_counter += 1
+
+        # write other lines
+        submission_files = os.listdir(result_dir)
+        for f in submission_files:
+            best_ext = find_best_extension(attributes, f)
+
+            if best_ext is not None:
+                if attributes[best_ext]['view'] is None:
+                    continue
+                inifile.extend(
+                    add_submission_section(line_counter,
+                                           [f],
+                                           standard_attributes,
+                                           attributes[best_ext]
+                                           )
+                )
+                inifile += ['']
+                line_counter += 1
+            else:
+                raise ValueError("Unrecognized file: %s" % (f,))
+
+        f = open(result_ini,'w')
+        f.write(os.linesep.join(inifile))
+        f.close()
+
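+# Note that find_best_extension() prefers the longest matching key in the
+# attributes map above, so a (hypothetical) file such as 11011.splices.bam
+# matches '.splices.bam' (view Splices) rather than the shorter '.bam'
+# (view Signal).
+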
+def link_daf(daf_path, library_result_map):
+    if not os.path.exists(daf_path):
+        raise RuntimeError("%s does not exist, how can I link to it?"
+                           % (daf_path,))
+
+    base_daf = os.path.basename(daf_path)
+
+    for lib_id, result_dir in library_result_map:
+        submission_daf = os.path.join(result_dir, base_daf)
+        if not os.path.exists(submission_daf):
+            os.link(daf_path, submission_daf)
+
+def make_all_ddfs(library_result_map, daf_name):
+    for lib_id, result_dir in library_result_map:
+        ininame = result_dir+'.ini'
+        inipathname = os.path.join(result_dir, ininame)
+        if os.path.exists(inipathname):
+            make_ddf(ininame, daf_name, True, True, result_dir)
+
+def make_parser():
+    # Load defaults from the config files
+    config = SafeConfigParser()
+    config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini'])
+
+    sequence_archive = None
+    apiid = None
+    apikey = None
+    apihost = None
+    SECTION = 'sequence_archive'
+    if config.has_section(SECTION):
+        if config.has_option(SECTION, 'sequence_archive'):
+            sequence_archive = config.get(SECTION, 'sequence_archive')
+            sequence_archive = os.path.expanduser(sequence_archive)
+        if config.has_option(SECTION, 'apiid'):
+            apiid = config.get(SECTION, 'apiid')
+        if config.has_option(SECTION, 'apikey'):
+            apikey = config.get(SECTION, 'apikey')
+        if config.has_option(SECTION, 'host'):
+            apihost = config.get(SECTION, 'host')
+
+    parser = OptionParser()
+
+    parser.add_option('--fastq', help="generate scripts for making fastq files",
+                      default=False, action="store_true")
+
+    parser.add_option('--ini', help="generate submission ini file", default=False,
+                      action="store_true")
+
+    parser.add_option('--makeddf', help='make the ddfs', default=False,
+                      action="store_true")
+
+    parser.add_option('--daf', default=None, help='specify daf name')
+    parser.add_option('--sequence', help="sequence repository", default=sequence_archive)
+    parser.add_option('--host', help="specify HTSWorkflow host", default=apihost)
+    parser.add_option('--apiid', help="Specify API ID", default=apiid)
+    parser.add_option('--apikey', help="Specify API KEY", default=apikey)
+
+    return parser
+
+def main(cmdline=None):
+    parser = make_parser()
+    opts, args = parser.parse_args(cmdline)
+
+    apidata = {'apiid': opts.apiid, 'apikey': opts.apikey }
+
+    if opts.host is None or opts.apiid is None or opts.apikey is None:
+        parser.error("Please specify host url, apiid, apikey")
+
+    if len(args) == 0:
+        parser.error("I need at least one library submission-dir input file")
+
+    library_result_map = []
+    for a in args:
+        library_result_map.extend(read_library_result_map(a))
+
+    if opts.daf is not None:
+        link_daf(opts.daf, library_result_map)
+
+    if opts.fastq:
+        build_fastqs(opts.host, apidata, opts.sequence, library_result_map)
+
+    if opts.ini:
+        make_submission_ini(opts.host, apidata, library_result_map)
+
+    if opts.makeddf:
+        make_all_ddfs(library_result_map, opts.daf)
+
+if __name__ == "__main__":
+    #logging.basicConfig(level = logging.WARNING )
+    logging.basicConfig(level = logging.DEBUG )
+    main()
-- 
2.30.2
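
A usage sketch (the library ids, directory names, and daf filename below are
hypothetical; host, apiid, and apikey are assumed to be set in
~/.htsworkflow.ini or passed with --host/--apiid/--apikey):

    library_map.txt, one "library_id result_dir" pair per line:

        11011 11011-hepg2-rnaseq
        11012 11012-gm12878-rnaseq

    then:

        ucsc_gather.py --daf regulation.daf --fastq --ini --makeddf library_map.txt

    --fastq writes the qseq.fastq.condor script, --ini writes a
    <result_dir>.ini in each submission directory, and --makeddf turns those
    ini files into ddf files and condor archive scripts.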