Add script to try and build submissions to the UCSC encode project.
authorDiane Trout <diane@caltech.edu>
Thu, 10 Jun 2010 00:55:11 +0000 (00:55 +0000)
committerDiane Trout <diane@caltech.edu>
Thu, 10 Jun 2010 00:55:11 +0000 (00:55 +0000)
This version supports generating qseq2fastq entries using the
htsworkflow api and scanning the flowcell repository directory.

There was code to generate the ddf files (from ini files).

I need to update the ini generation code to scan the submisison directory
for fastq files and group them by read.

extra/README.txt [new file with mode: 0644]
extra/ucsc_encode_submission/README.txt [new file with mode: 0644]
extra/ucsc_encode_submission/ucsc_gather.py [new file with mode: 0755]

diff --git a/extra/README.txt b/extra/README.txt
new file mode 100644 (file)
index 0000000..ee6d069
--- /dev/null
@@ -0,0 +1,3 @@
+This directory is intended for extra modules that might use some HTS 
+Workflow components, but aren't actually important for running your 
+own sequencing facility.
diff --git a/extra/ucsc_encode_submission/README.txt b/extra/ucsc_encode_submission/README.txt
new file mode 100644 (file)
index 0000000..bab7a55
--- /dev/null
@@ -0,0 +1,5 @@
+I was building a variety of scripts to handle submitting our data to the 
+UCSC ENCODE pipeline, some of them were pulling data out of the htsworkflow
+databases, and since I needed an official place to put the scripts
+
+I decided here.
diff --git a/extra/ucsc_encode_submission/ucsc_gather.py b/extra/ucsc_encode_submission/ucsc_gather.py
new file mode 100755 (executable)
index 0000000..d34a575
--- /dev/null
@@ -0,0 +1,487 @@
+#!/usr/bin/env python
+from ConfigParser import SafeConfigParser
+from glob import glob
+import json
+import logging
+from optparse import OptionParser
+import os
+from pprint import pprint
+import shlex
+from StringIO import StringIO
+import time
+import sys
+import urllib
+import urllib2
+import urlparse
+
+
+from htsworkflow.util import api
+from htsworkflow.pipelines.sequences import \
+    create_sequence_table, \
+    scan_for_sequences
+
+def make_submission_name(ininame):
+    base = os.path.basename(ininame)
+    name, ext = os.path.splitext(base)
+    return name + '.tgz'
+
+def make_ddf_name(pathname):
+    base = os.path.basename(pathname)
+    name, ext = os.path.splitext(base)
+    return name + '.ddf'
+
+def make_condor_name(pathname):
+    base = os.path.basename(pathname)
+    name, ext = os.path.splitext(base)
+    return name + '.condor'
+    
+def parse_filelist(file_string):
+    return file_string.split(',')
+
+def validate_filelist(files):
+    """
+    Die if a file doesn't exist in a file list
+    """
+    for f in files:
+        if not os.path.exists(f):
+            raise RuntimeError("%s does not exist" % (f,))
+
+def read_ddf_ini(filename, output=sys.stdout):
+    """
+    Read a ini file and dump out a tab delmited text file
+    """
+    file_list = []
+    config = SafeConfigParser()
+    config.read(filename)
+
+    order_by = shlex.split(config.get("config", "order_by"))
+
+    output.write("\t".join(order_by))
+    output.write(os.linesep)
+    sections = config.sections()
+    sections.sort()
+    for section in sections:
+        if section == "config":
+            # skip the config block
+            continue
+        values = []
+        for key in order_by:
+            v = config.get(section, key)
+            values.append(v)
+            if key == 'files':
+                file_list.extend(parse_filelist(v))
+                
+        output.write("\t".join(values))
+        output.write(os.linesep)
+    return file_list
+            
+def make_condor_archive_script(ininame, files):
+    script = """Universe = vanilla
+
+Executable = /bin/tar
+arguments = czvf ../%(archivename)s %(filelist)s
+
+Error = compress.err.$(Process).log
+Output = compress.out.$(Process).log
+Log = compress.log
+initialdir = %(initialdir)s
+
+queue 
+"""
+    for f in files:
+        if not os.path.exists(f):
+            raise RuntimeError("Missing %s" % (f,))
+
+    context = {'archivename': make_submission_name(ininame),
+               'filelist': " ".join(files),
+               'initialdir': os.getcwd()}
+
+    condor_script = make_condor_name(ininame)
+    condor_stream = open(condor_script,'w')
+    condor_stream.write(script % context)
+    condor_stream.close()
+
+def make_ddf(ininame,  daf_name, guess_ddf=False, make_condor=False, outdir=None):
+    """
+    Make ddf files, and bonus condor file
+    """
+    curdir = os.getcwd()
+    if outdir is not None:
+        os.chdir(outdir)
+    output = sys.stdout
+    ddf_name = None
+    if guess_ddf:
+        ddf_name = make_ddf_name(ininame)
+        print ddf_name
+        output = open(ddf_name,'w')
+
+    file_list = read_ddf_ini(ininame, output)
+
+    file_list.append(daf_name)
+    if ddf_name is not None:
+        file_list.append(ddf_name)
+
+    if make_condor:
+        make_condor_archive_script(ininame, file_list)
+        
+    os.chdir(curdir)
+
+
+def get_library_info(host, apidata, library_id):
+    url = api.library_url(host, library_id)
+    contents = api.retrieve_info(url, apidata)
+    return contents
+    
+def read_library_result_map(filename):
+    stream = open(filename,'r')
+
+    results = []
+    for line in stream:
+        library_id, result_dir = line.strip().split()
+        results.append((library_id, result_dir))
+    return results
+
+def condor_srf_to_fastq(srf_file, target_pathname):
+    script = """output=%(target_pathname)s
+arguments="-c %(srf_file)s"
+queue
+"""
+    params = {'srf_file': srf_file,
+              'target_pathname': target_pathname}
+    
+    return  script % params
+
+def condor_qseq_to_fastq(qseq_file, target_pathname):
+    script = """
+arguments="-i %(qseq_file)s -o %(target_pathname)s"
+queue
+"""
+    params = {'qseq_file': qseq_file,
+              'target_pathname': target_pathname}
+    
+    return script % params
+
+def find_archive_sequence_files(host, apidata, sequences_path, 
+                                library_result_map):
+    """
+    Find all the archive sequence files possibly associated with our results.
+
+    """
+    logging.debug("Searching for sequence files in: %s" %(sequences_path,))
+
+    lib_db = {}
+    seq_dirs = set()
+    #seq_dirs = set(os.path.join(sequences_path, 'srfs'))
+    candidate_lanes = {}
+    for lib_id, result_dir in library_result_map:
+        lib_info = get_library_info(host, apidata, lib_id)
+        lib_db[lib_id] = lib_info
+
+        for lane in lib_info['lane_set']:
+            lane_key = (lane['flowcell'], lane['lane_number'])
+            candidate_lanes[lane_key] = lib_id
+            seq_dirs.add(os.path.join(sequences_path, 
+                                         'flowcells', 
+                                         lane['flowcell']))
+    logging.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
+    candidate_seq_list = scan_for_sequences(seq_dirs)
+
+    # at this point we have too many sequences as scan_for_sequences
+    # returns all the sequences in a flowcell directory
+    # so lets filter out the extras
+    
+    for seq in candidate_seq_list:
+        lane_key = (seq.flowcell, seq.lane)
+        lib_id = candidate_lanes.get(lane_key, None)
+        if lib_id is not None:
+            lib_info = lib_db[lib_id]
+            lanes = lib_info.setdefault('lanes', {})
+            lanes.setdefault(lane_key, set()).add(seq)
+    
+    return lib_db
+
+def build_fastqs(host, apidata, sequences_path, library_result_map):
+    """
+    Generate condor scripts to build any needed fastq files
+    """
+    qseq_condor_header = """
+Universe=vanilla
+executable=/home/diane/proj/gaworkflow/scripts/qseq2fastq
+error=qseqfastq.err.$(process).log
+output=qseqfastq.out.$(process).log
+log=qseqfastq.log
+
+"""
+    qseq_condor_entries = []
+    srf_condor_header = """
+Universe=vanilla
+executable=/home/diane/bin/srf2fastq
+error=srffastq.err.$(process)
+log=srffastq.log
+
+"""
+    srf_condor_entries = []
+    fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
+    fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
+    lib_db = find_archive_sequence_files(host, 
+                                         apidata, 
+                                         sequences_path, 
+                                         library_result_map)
+
+    # find what targets we're missing
+    needed_targets = {}
+    for lib_id, result_dir in library_result_map:
+        lib = lib_db[lib_id]
+        for lane_key, sequences in lib['lanes'].items():
+            for seq in sequences:
+                # skip srfs for the moment
+                if seq.filetype == 'srf':
+                    continue
+                filename_attributes = { 
+                    'flowcell': seq.flowcell,
+                    'lib_id': lib_id,
+                    'lane': seq.lane,
+                    'read': seq.read,
+                    'cycle': seq.cycle
+                    }
+                # throw out test runs
+                # FIXME: this should probably be configurable
+                if seq.cycle < 50:
+                    continue
+                if seq.flowcell == '30CUUAAXX':
+                    # 30CUUAAXX run sucked
+                    continue
+
+                # end filters
+                if seq.read is not None:
+                    target_name = fastq_paired_template % filename_attributes
+                else:
+                    target_name = fastq_single_template % filename_attributes
+
+                target_pathname = os.path.join(result_dir, target_name)
+                if not os.path.exists(target_pathname):
+                    t = needed_targets.setdefault(target_pathname, {})
+                    t[seq.filetype] = seq
+                    
+    for target_pathname, available_sources in needed_targets.items():
+        if available_sources.has_key('qseq'):
+            source = available_sources['qseq']
+            qseq_condor_entries.append(
+                condor_qseq_to_fastq(source.path, target_pathname)
+            )
+        #elif available_sources.has_key('srf'):
+        #    srf_condor_entries.append(condor_srf_to_fastq(source, target))
+        else:
+            print " need file", target_pathname
+
+#
+#        f = open('srf.fastq.condor','w')
+#        f.write(srf_condor)
+#        f.close()
+
+    if len(qseq_condor_entries) > 0:
+        #f = sys.stdout
+        f = open('qseq.fastq.condor','w')
+        f.write(qseq_condor_header)
+        for entry in qseq_condor_entries:
+            f.write(entry)
+        f.close()
+
+def find_best_extension(extension_map, filename):
+    """
+    Search through extension_map looking for the best extension
+    The 'best' is the longest match
+
+    :Args:
+      extension_map (dict): '.ext' -> { 'view': 'name' or None }
+      filename (str): the filename whose extention we are about to examine
+    """
+    best_ext = None
+    path, last_ext = os.path.splitext(filename)
+
+    for ext in extension_map.keys():
+        if filename.endswith(ext):
+            if best_ext is None:
+                best_ext = ext
+            elif len(ext) > len(best_ext):
+                best_ext = ext
+    return best_ext
+
+def add_submission_section(line_counter, files, standard_attributes, file_attributes):
+    """
+    Create a section in the submission ini file
+    """
+    inifile = [ '[line%s]' % (line_counter,) ]
+    inifile += ["files=%s" % (",".join(files))]
+    cur_attributes = {}
+    cur_attributes.update(standard_attributes)
+    cur_attributes.update(file_attributes)
+    
+    for k,v in cur_attributes.items():
+        inifile += ["%s=%s" % (k,v)]
+    return inifile
+    
+def make_submission_ini(host, apidata, library_result_map):
+    attributes = {
+        '.bai':                   {'view': None}, # bam index
+        '.bam':                   {'view': 'Signal'},
+        '.condor':                {'view': None},
+        '.daf':                   {'view': None},
+        '.ddf':                   {'view': None},
+        '.splices.bam':           {'view': 'Splices'},
+        '.bed':                   {'view': 'TopHatJunctions'},
+        '.ini':                   {'view': None},
+        '.log':                   {'view': None},
+        '_r1.fastq':              {'view': 'FastqRd1'},
+        '_r2.fastq':              {'view': 'FastqRd2'},
+        '.tar.bz2':               {'view': None},
+        'novel.genes.expr':       {'view': 'GeneDeNovoFPKM'},
+        'novel.transcripts.expr': {'view': 'TranscriptDeNovoFPKM'},
+        '.genes.expr':            {'view': 'GeneFPKM'},
+        '.transcripts.expr':      {'view': 'TranscriptFPKM'},
+        '.stats.txt':             {'view': 'InsLength'},
+        '.gtf':                   {'view': 'CufflinksGeneModel'},
+        '.wig':                   {'view': 'RawSignal'},
+    }
+   
+    candidate_fastq_src = {}
+
+    for lib_id, result_dir in library_result_map:
+        inifile =  ['[config]']
+        inifile += ['order_by=files view cell localization rnaExtract mapAlgorithm readType replicate labVersion']
+        inifile += ['']
+        line_counter = 1
+        lib_info = get_library_info(host, apidata, lib_id)
+        result_ini = os.path.join(result_dir, result_dir+'.ini')
+
+        standard_attributes = {'cell': lib_info['cell_line'],
+                               'insertLength': '200', # ali
+                               'labVersion': 'TopHat',
+                               'localization': 'cell',
+                               'mapAlgorithm': 'TopHat',
+                               'readType': '2x75', #ali
+                               'replicate': lib_info['replicate'],
+                               'rnaExtract': 'longPolyA',
+                               }
+
+        # write fastq line
+        #fastqs = []
+        #for lane in lib_info['lane_set']:
+        #    target_name = "%s_%s_%s.fastq" % (lane['flowcell'], lib_id, lane['lane_number'])
+        #    fastqs.append(target_name)
+        #inifile.extend(
+        #    make_run_block(line_counter, fastqs, standard_attributes, attributes['.fastq'])
+        #)
+        #inifile += ['']
+        #line_counter += 1
+
+        # write other lines
+        submission_files = os.listdir(result_dir)
+        for f in submission_files:
+            best_ext = find_best_extension(attributes, f)
+
+            if best_ext is not None:
+               if attributes[best_ext]['view'] is None:
+                   continue
+               inifile.extend(
+                   add_submission_section(line_counter,
+                                          [f],
+                                          standard_attributes,
+                                          attributes[best_ext]
+                   )
+               )
+               inifile += ['']
+               line_counter += 1
+            else:
+                raise ValueError("Unrecognized file: %s" % (f,))
+
+        f = open(result_ini,'w')
+        f.write(os.linesep.join(inifile))
+        f.close()
+
+def link_daf(daf_path, library_result_map):
+    if not os.path.exists(daf_path):
+        raise RuntimeError("%s does not exist, how can I link to it?" % (daf_path,))
+
+    base_daf = os.path.basename(daf_path)
+    
+    for lib_id, result_dir in library_result_map:
+        submission_daf = os.path.join(result_dir, base_daf)
+        if not os.path.exists(submission_daf):
+            os.link(daf_path, submission_daf)
+
+def make_all_ddfs(library_result_map, daf_name):
+    for lib_id, result_dir in library_result_map:
+        ininame = result_dir+'.ini'
+        inipathname = os.path.join(result_dir, ininame)
+        if os.path.exists(inipathname):
+            make_ddf(ininame, daf_name, True, True, result_dir)
+            
+def make_parser():
+    # Load defaults from the config files
+    config = SafeConfigParser()
+    config.read([os.path.expanduser('~/.htsworkflow.ini'), '/etc/htsworkflow.ini'])
+    
+    sequence_archive = None
+    apiid = None
+    apikey = None
+    apihost = None
+    SECTION = 'sequence_archive'
+    if config.has_section(SECTION):
+        sequence_archive = config.get(SECTION, 'sequence_archive',sequence_archive)
+        sequence_archive = os.path.expanduser(sequence_archive)
+        apiid = config.get(SECTION, 'apiid', apiid)
+        apikey = config.get(SECTION, 'apikey', apikey)
+        apihost = config.get(SECTION, 'host', apihost)
+
+    parser = OptionParser()
+
+    parser.add_option('--fastq', help="generate scripts for making fastq files",
+                      default=False, action="store_true")
+
+    parser.add_option('--ini', help="generate submission ini file", default=False,
+                      action="store_true")
+
+    parser.add_option('--makeddf', help='make the ddfs', default=False,
+                      action="store_true")
+    
+    parser.add_option('--daf', default=None, help='specify daf name')
+    parser.add_option('--sequence', help="sequence repository", default=sequence_archive)
+    parser.add_option('--host', help="specify HTSWorkflow host", default=apihost)
+    parser.add_option('--apiid', help="Specify API ID", default=apiid)
+    parser.add_option('--apikey', help="Specify API KEY", default=apikey)
+    
+    return parser
+
+def main(cmdline=None):
+    parser = make_parser()
+    opts, args = parser.parse_args(cmdline)
+
+    apidata = {'apiid': opts.apiid, 'apikey': opts.apikey }
+
+    if opts.host is None or opts.apiid is None or opts.apikey is None:
+        parser.error("Please specify host url, apiid, apikey")
+
+    if len(args) == 0:
+        parser.error("I need at least one library submission-dir input file")
+        
+    library_result_map = []
+    for a in args:
+        library_result_map.extend(read_library_result_map(a))
+
+    if opts.daf is not None:
+        link_daf(opts.daf, library_result_map)
+
+    if opts.fastq:
+        build_fastqs(opts.host, apidata, opts.sequence, library_result_map)
+
+    if opts.ini:
+        make_submission_ini(opts.host, apidata, library_result_map)
+
+    if opts.makeddf:
+        make_all_ddfs(library_result_map, opts.daf)
+        
+if __name__ == "__main__":
+    #logging.basicConfig(level = logging.WARNING )
+    logging.basicConfig(level = logging.DEBUG )
+    main()