From 41302335b13bb98b45a1f5599d01483ffbe18060 Mon Sep 17 00:00:00 2001 From: Diane Trout Date: Fri, 11 Dec 2009 23:56:10 +0000 Subject: [PATCH] Modify the srf utility to tar.bz2 the qseq files instead of the using the srf utility. Additionally I updated the runfolder script to capture a few more pieces of information (in addition to the switch to qseq files). I'm now capturing the IVC plot and pngs, and the flow cell reports generated by the 1.4 and later version of the pipeline. --- htsworkflow/pipelines/runfolder.py | 115 ++++++++++++++++++++++------- htsworkflow/pipelines/srf.py | 107 +++++++++++++++++++++++++-- scripts/srf | 15 +++- 3 files changed, 200 insertions(+), 37 deletions(-) diff --git a/htsworkflow/pipelines/runfolder.py b/htsworkflow/pipelines/runfolder.py index cfa68a4..038ae5b 100644 --- a/htsworkflow/pipelines/runfolder.py +++ b/htsworkflow/pipelines/runfolder.py @@ -26,6 +26,7 @@ LANE_LIST = range(1, LANES_PER_FLOWCELL+1) from htsworkflow.util.alphanum import alphanum from htsworkflow.util.ethelp import indent, flatten +from htsworkflow.util.queuecommands import QueueCommands from htsworkflow.pipelines import srf @@ -377,16 +378,57 @@ def is_compressed(filename): else: return False +def save_flowcell_reports(data_dir, cycle_dir): + """ + Save the flowcell quality reports + """ + data_dir = os.path.abspath(data_dir) + status_file = os.path.join(data_dir, 'Status.xml') + reports_dir = os.path.join(data_dir, 'reports') + reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2') + if os.path.exists(reports_dir): + cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ] + if os.path.exists(status_file): + cmd_list.extend(['Status.xml', 'Status.xsl']) + logging.info("Saving reports from " + reports_dir) + cwd = os.getcwd() + os.chdir(data_dir) + q = QueueCommands([" ".join(cmd_list)]) + q.run() + os.chdir(cwd) + + def save_summary_file(gerald_object, cycle_dir): - # Copy Summary.htm - summary_path = os.path.join(gerald_object.pathname, 'Summary.htm') - if os.path.exists(summary_path): - logging.info('Copying %s to %s' % (summary_path, cycle_dir)) - shutil.copy(summary_path, cycle_dir) - else: - logging.info('Summary file %s was not found' % (summary_path,)) + # Copy Summary.htm + summary_path = os.path.join(gerald_object.pathname, 'Summary.htm') + if os.path.exists(summary_path): + logging.info('Copying %s to %s' % (summary_path, cycle_dir)) + shutil.copy(summary_path, cycle_dir) + else: + logging.info('Summary file %s was not found' % (summary_path,)) + +def save_ivc_plot(bustard_object, cycle_dir): + """ + Save the IVC page and its supporting images + """ + plot_html = os.path.join(bustard_object.pathname, 'IVC.htm') + plot_image_path = os.path.join(bustard_object.pathname, 'Plots') + plot_images = os.path.join(plot_image_path, 's_?_[a-z]*.png') + + plot_target_path = os.path.join(cycle_dir, 'Plots') + + if os.path.exists(plot_html): + logging.debug("Saving %s" % ( plot_html, )) + logging.debug("Saving %s" % ( plot_images, )) + shutil.copy(plot_html, cycle_dir) + if not os.path.exists(plot_target_path): + os.mkdir(plot_target_path) + for plot_file in glob(plot_images): + shutil.copy(plot_file, plot_target_path) + else: + logging.warning('Missing IVC.html file, not archiving') + - def compress_score_files(bustard_object, cycle_dir): """ Compress score files into our result directory @@ -419,11 +461,13 @@ def compress_score_files(bustard_object, cycle_dir): tar.wait() -def compress_eland_results(gerald_object, cycle_dir): +def compress_eland_results(gerald_object, cycle_dir, num_jobs=1): """ Compress eland result files into the archive directory """ # copy & bzip eland files + bz_commands = [] + for lanes_dictionary in gerald_object.eland_results.results: for eland_lane in lanes_dictionary.values(): source_name = eland_lane.pathname @@ -438,12 +482,18 @@ def compress_eland_results(gerald_object, cycle_dir): else: # not compressed dest_name += '.bz2' - args = ['bzip2', '-9', '-c', source_name] - logging.info('Running: %s' % ( " ".join(args) )) - bzip_dest = open(dest_name, 'w') - bzip = subprocess.Popen(args, stdout=bzip_dest) - logging.info('Saving to %s' % (dest_name, )) - bzip.wait() + args = ['bzip2', '-9', '-c', source_name, '>', dest_name ] + bz_commands.append(" ".join(args)) + #logging.info('Running: %s' % ( " ".join(args) )) + #bzip_dest = open(dest_name, 'w') + #bzip = subprocess.Popen(args, stdout=bzip_dest) + #logging.info('Saving to %s' % (dest_name, )) + #bzip.wait() + + if len(bz_commands) > 0: + q = QueueCommands(bz_commands, num_jobs) + q.run() + def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1): """ @@ -473,27 +523,36 @@ def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1): else: os.mkdir(cycle_dir) - # copy stuff out of the main run - g = r.gerald - # save run file r.save(cycle_dir) - # save summary file - save_summary_file(g, cycle_dir) + # save illumina flowcell status report + save_flowcell_reports( os.path.join(r.image_analysis.pathname, '..'), cycle_dir ) - # tar score files - compress_score_files(r.bustard, cycle_dir) + # save stuff from bustard + # grab IVC plot + save_ivc_plot(r.bustard, cycle_dir) - # compress eland result files - compress_eland_results(g, cycle_dir) - - # build srf commands + # build base call saving commands if site is not None: lanes = range(1,9) run_name = srf.pathname_to_run_name(r.pathname) - srf_cmds = srf.make_commands(run_name, lanes, site, cycle_dir) - srf.run_srf_commands(r.bustard.pathname, srf_cmds, 2) + srf_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir) + srf.run_commands(r.bustard.pathname, srf_cmds, num_jobs) + + # save stuff from GERALD + # copy stuff out of the main run + g = r.gerald + + # save summary file + save_summary_file(g, cycle_dir) + + # compress eland result files + compress_eland_results(g, cycle_dir, num_jobs) + + # md5 all the compressed files once we're done + md5_commands = srf.make_md5_commands(cycle_dir) + srf.run_commands(cycle_dir, md5_commands, num_jobs) def rm_list(files, dry_run=True): for f in files: diff --git a/htsworkflow/pipelines/srf.py b/htsworkflow/pipelines/srf.py index 3809bc6..b27c33e 100644 --- a/htsworkflow/pipelines/srf.py +++ b/htsworkflow/pipelines/srf.py @@ -1,3 +1,4 @@ +from glob import glob import logging import os @@ -11,15 +12,28 @@ def pathname_to_run_name(base): """ Convert a pathname to a base runfolder name handle the case with a trailing / + + >>> print pathname_to_run_name("/a/b/c/run") + run + >>> print pathname_to_run_name("/a/b/c/run/") + run + >>> print pathname_to_run_name("run") + run + >>> print pathname_to_run_name("run/") + run + >>> print pathname_to_run_name("../run") + run + >>> print pathname_to_run_name("../run/") + run """ name = "" while len(name) == 0: base, name = os.path.split(base) if len(base) == 0: - return None + break return name -def make_commands(run_name, lanes, site_name, destdir, cmdlevel=ILLUMINA2SRF11): +def make_srf_commands(run_name, bustard_dir, lanes, site_name, destdir, cmdlevel=ILLUMINA2SRF11): """ make a subprocess-friendly list of command line arguments to run solexa2srf generates files like: @@ -65,11 +79,94 @@ def make_commands(run_name, lanes, site_name, destdir, cmdlevel=ILLUMINA2SRF11): cmd_list.append(" ".join(cmd)) return cmd_list -def run_srf_commands(bustard_dir, cmd_list, num_jobs): - logging.info("chdir to %s" % (bustard_dir,)) +def create_qseq_patterns(bustard_dir): + """ + Scan a bustard directory for qseq files and determine a glob pattern + """ + # grab one tile for each lane. + qseqs = glob(os.path.join(bustard_dir, '*_0001_qseq.txt')) + qseqs = [ os.path.split(x)[-1] for x in qseqs ] + if len(qseqs[0].split('_')) == 4: + # single ended + return [(None,"s_%d_[0-9][0-9][0-9][0-9]_qseq.txt")] + elif len(qseqs[0].split('_')) == 5: + # more than 1 read + # build a dictionary of read numbers by lane + # ( just in case we didn't run all 8 lanes ) + lanes = {} + for q in qseqs: + sample, lane, read, tile, extension = q.split('_') + lanes.setdefault(lane, []).append(read) + qseq_patterns = [] + # grab a lane from the dictionary + # I don't think it matters which one. + k = lanes.keys()[0] + # build the list of patterns + for read in lanes[k]: + read = int(read) + qseq_patterns.append((read, 's_%d_' + '%d_[0-9][0-9][0-9][0-9]_qseq.txt' % (read,))) + return qseq_patterns + else: + raise RuntimeError('unrecognized qseq pattern, not a single or multiple read pattern') + +def make_qseq_commands(run_name, bustard_dir, lanes, site_name, destdir, cmdlevel=ILLUMINA2SRF11): + """ + make a subprocess-friendly list of command line arguments to run solexa2srf + generates files like: + woldlab:080514_HWI-EAS229_0029_20768AAXX:8.srf + site run name lane + + run_name - most of the file name (run folder name is a good choice) + lanes - list of integers corresponding to which lanes to process + site_name - name of your "sequencing site" or "Individual" + destdir - where to write all the srf files + """ + # clean up pathname + logging.info("run_name %s" % ( run_name, )) + + cmd_list = [] + for lane in lanes: + name_prefix = '%s_%%l_%%t_' % (run_name,) + destdir = os.path.normpath(destdir) + qseq_patterns = create_qseq_patterns(bustard_dir) + + for read, pattern in qseq_patterns: + if read is None: + destname = '%s_%s_l%d.tar.bz2' % (site_name, run_name, lane) + dest_path = os.path.join(destdir, destname) + else: + destname = '%s_%s_l%d_r%d.tar.bz2' % (site_name, run_name, lane, read) + dest_path = os.path.join(destdir, destname) + + cmd = " ".join(['tar', 'cjf', dest_path, pattern % (lane,) ]) + logging.info("Generated command: " + cmd) + cmd_list.append(cmd) + + return cmd_list + +def run_commands(new_dir, cmd_list, num_jobs): + logging.info("chdir to %s" % (new_dir,)) curdir = os.getcwd() - os.chdir(bustard_dir) + os.chdir(new_dir) q = queuecommands.QueueCommands(cmd_list, num_jobs) q.run() os.chdir(curdir) +def make_md5_commands(destdir): + """ + Scan the cycle dir and create md5s for the contents + """ + cmd_list = [] + destdir = os.path.abspath(destdir) + bz2s = glob(os.path.join(destdir,"*.bz2")) + gzs = glob(os.path.join(destdir,"*gz")) + srfs = glob(os.path.join(destdir,"*.srf")) + + file_list = bz2s + gzs + srfs + + for f in file_list: + cmd = " ".join(['md5sum', f, '>', f+'.md5']) + logging.info('generated command: '+cmd) + cmd_list.append(cmd) + + return cmd_list diff --git a/scripts/srf b/scripts/srf index d363a4e..bcf835d 100644 --- a/scripts/srf +++ b/scripts/srf @@ -6,7 +6,8 @@ import os import sys from htsworkflow.pipelines import runfolder -from htsworkflow.pipelines.srf import make_commands, run_srf_commands, pathname_to_run_name +from htsworkflow.pipelines.srf import make_srf_commands, make_qseq_commands, \ + run_commands, pathname_to_run_name from htsworkflow.pipelines.srf import ILLUMINA2SRF10, ILLUMINA2SRF11, SOLEXA2SRF def make_parser(): @@ -84,7 +85,8 @@ def main(cmdline=None): parser.error( "Number of lane arguments must match number of runfolders" ) - + + make_commands = make_qseq_commands # build list of commands cmds = {} for runfolder_path, lanes in zip(args, lanes_list): @@ -99,14 +101,19 @@ def main(cmdline=None): return 1 elif len(runs) == 1: bustard_dir = runs[0].bustard.pathname - cmds[bustard_dir] = make_commands(run_name, lanes, opts.site, opts.dest_dir, opts.runfolder_version) + cmds[bustard_dir] = make_commands(run_name, + bustard_dir, + lanes, + opts.site, + opts.dest_dir, + opts.runfolder_version) else: print "ERROR: Couldn't find a bustard directory in", runfolder_path return 1 if not opts.dry_run: for cwd, cmd_list in cmds.items(): - run_srf_commands(cwd, cmd_list, opts.jobs) + run_commands(cwd, cmd_list, opts.jobs) else: for cwd, cmd_list in cmds.items(): print cwd -- 2.30.2