From 08657a5a08347f0fa872ae1bf9ee0d542049e3c8 Mon Sep 17 00:00:00 2001 From: Diane Trout Date: Wed, 30 Sep 2009 22:16:41 +0000 Subject: [PATCH] Add building srf files to runfolder as part of --extract-results This required splitting the srf code out from the srf script into a new module in htsworkflow/pipelines/srf.py --- htsworkflow/pipelines/runfolder.py | 142 +++++++++++++++++------------ htsworkflow/pipelines/srf.py | 75 +++++++++++++++ scripts/runfolder | 20 ++-- scripts/srf | 74 +-------------- 4 files changed, 174 insertions(+), 137 deletions(-) create mode 100644 htsworkflow/pipelines/srf.py diff --git a/htsworkflow/pipelines/runfolder.py b/htsworkflow/pipelines/runfolder.py index c17ebeb..5e9bd0b 100644 --- a/htsworkflow/pipelines/runfolder.py +++ b/htsworkflow/pipelines/runfolder.py @@ -26,6 +26,8 @@ LANE_LIST = range(1, LANES_PER_FLOWCELL+1) from htsworkflow.util.alphanum import alphanum from htsworkflow.util.ethelp import indent, flatten +from htsworkflow.pipelines import srf + class PipelineRun(object): """ Capture "interesting" information about a pipeline run @@ -374,7 +376,74 @@ def is_compressed(filename): else: return False -def extract_results(runs, output_base_dir=None): +def save_summary_file(gerald_object, cycle_dir): + # Copy Summary.htm + summary_path = os.path.join(gerald_object.pathname, 'Summary.htm') + if os.path.exists(summary_path): + logging.info('Copying %s to %s' % (summary_path, cycle_dir)) + shutil.copy(summary_path, cycle_dir) + else: + logging.info('Summary file %s was not found' % (summary_path,)) + + +def compress_score_files(gerald_object, cycle_dir): + """ + Compress score files into our result directory + """ + # check for g.pathname/Temp a new feature of 1.1rc1 + scores_path = gerald_object.pathname + scores_path_temp = os.path.join(scores_path, 'Temp') + if os.path.isdir(scores_path_temp): + scores_path = scores_path_temp + + # hopefully we have a directory that contains s_*_score files + score_files = [] + for f in 
os.listdir(scores_path): + if re.match('.*_score.txt', f): + score_files.append(f) + + tar_cmd = ['/bin/tar', 'c'] + score_files + bzip_cmd = [ 'bzip2', '-9', '-c' ] + tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2') + tar_dest = open(tar_dest_name, 'w') + logging.info("Compressing score files from %s" % (scores_path,)) + logging.info("Running tar: " + " ".join(tar_cmd[:10])) + logging.info("Running bzip2: " + " ".join(bzip_cmd)) + logging.info("Writing to %s" %(tar_dest_name)) + + env = {'BZIP': '-9'} + tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env, + cwd=scores_path) + bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest) + tar.wait() + +def compress_eland_results(gerald_object, cycle_dir): + """ + Compress eland result files into the archive directory + """ + # copy & bzip eland files + for lanes_dictionary in gerald_object.eland_results.results: + for eland_lane in lanes_dictionary.values(): + source_name = eland_lane.pathname + path, name = os.path.split(eland_lane.pathname) + dest_name = os.path.join(cycle_dir, name) + logging.info("Saving eland file %s to %s" % \ + (source_name, dest_name)) + + if is_compressed(name): + logging.info('Already compressed, Saving to %s' % (dest_name, )) + shutil.copy(source_name, dest_name) + else: + # not compressed + dest_name += '.bz2' + args = ['bzip2', '-9', '-c', source_name] + logging.info('Running: %s' % ( " ".join(args) )) + bzip_dest = open(dest_name, 'w') + bzip = subprocess.Popen(args, stdout=bzip_dest) + logging.info('Saving to %s' % (dest_name, )) + bzip.wait() + +def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1): if output_base_dir is None: output_base_dir = os.getcwd() @@ -400,65 +469,20 @@ def extract_results(runs, output_base_dir=None): # save run file r.save(cycle_dir) - # Copy Summary.htm - summary_path = os.path.join(r.gerald.pathname, 'Summary.htm') - if os.path.exists(summary_path): - logging.info('Copying %s to %s' % 
(summary_path, cycle_dir)) - shutil.copy(summary_path, cycle_dir) - else: - logging.info('Summary file %s was not found' % (summary_path,)) - + # save summary file + save_summary_file(g, cycle_dir) + # tar score files - score_files = [] - - # check for g.pathname/Temp a new feature of 1.1rc1 - scores_path = g.pathname - scores_path_temp = os.path.join(scores_path, 'Temp') - if os.path.isdir(scores_path_temp): - scores_path = scores_path_temp - - # hopefully we have a directory that contains s_*_score files - for f in os.listdir(scores_path): - if re.match('.*_score.txt', f): - score_files.append(f) - - tar_cmd = ['/bin/tar', 'c'] + score_files - bzip_cmd = [ 'bzip2', '-9', '-c' ] - tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2') - tar_dest = open(tar_dest_name, 'w') - logging.info("Compressing score files from %s" % (scores_path,)) - logging.info("Running tar: " + " ".join(tar_cmd[:10])) - logging.info("Running bzip2: " + " ".join(bzip_cmd)) - logging.info("Writing to %s" %(tar_dest_name)) - - env = {'BZIP': '-9'} - tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env, - cwd=scores_path) - bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest) - tar.wait() - - # copy & bzip eland files - for lanes_dictionary in g.eland_results.results: - for eland_lane in lanes_dictionary.values(): - source_name = eland_lane.pathname - path, name = os.path.split(eland_lane.pathname) - dest_name = os.path.join(cycle_dir, name) - logging.info("Saving eland file %s to %s" % \ - (source_name, dest_name)) - - if is_compressed(name): - logging.info('Already compressed, Saving to %s' % (dest_name, )) - shutil.copy(source_name, dest_name) - else: - # not compressed - dest_name += '.bz2' - args = ['bzip2', '-9', '-c', source_name] - logging.info('Running: %s' % ( " ".join(args) )) - bzip_dest = open(dest_name, 'w') - bzip = subprocess.Popen(args, stdout=bzip_dest) - logging.info('Saving to %s' % (dest_name, )) - bzip.wait() - + 
compress_score_files(g, cycle_dir) + + # compress eland result files + compress_eland_results(g, cycle_dir) + + # build srf commands + lanes = range(1,9) + srf_cmds = srf.make_commands(srf.pathname_to_run_name(r.pathname), lanes, site, cycle_dir) + srf.run_srf_commands(r.bustard.pathname, srf_cmds, num_jobs) + def rm_list(files, dry_run=True): for f in files: if os.path.exists(f): diff --git a/htsworkflow/pipelines/srf.py b/htsworkflow/pipelines/srf.py new file mode 100644 index 0000000..3809bc6 --- /dev/null +++ b/htsworkflow/pipelines/srf.py @@ -0,0 +1,75 @@ +import logging +import os + +from htsworkflow.util import queuecommands + +SOLEXA2SRF = 0 +ILLUMINA2SRF10 = 1 +ILLUMINA2SRF11 = 2 + +def pathname_to_run_name(base): + """ + Convert a pathname to a base runfolder name + handle the case with a trailing / + """ + name = "" + while len(name) == 0: + base, name = os.path.split(base) + if len(base) == 0: + return None + return name + +def make_commands(run_name, lanes, site_name, destdir, cmdlevel=ILLUMINA2SRF11): + """ + make a subprocess-friendly list of command line arguments to run solexa2srf + generates files like: + woldlab:080514_HWI-EAS229_0029_20768AAXX:8.srf + site run name lane + + run_name - most of the file name (run folder name is a good choice) + lanes - list of integers corresponding to which lanes to process + site_name - name of your "sequencing site" or "Individual" + destdir - where to write all the srf files + """ + # clean up pathname + logging.info("run_name %s" % ( run_name, )) + + cmd_list = [] + for lane in lanes: + name_prefix = '%s_%%l_%%t_' % (run_name,) + destname = '%s_%s_%d.srf' % (site_name, run_name, lane) + destdir = os.path.normpath(destdir) + dest_path = os.path.join(destdir, destname) + seq_pattern = 's_%d_*_seq.txt' % (lane,) + + if cmdlevel == SOLEXA2SRF: + cmd = ['solexa2srf', + '-N', name_prefix, + '-n', '%3x:%3y', + '-o', dest_path, + seq_pattern] + elif cmdlevel == ILLUMINA2SRF10: + cmd = ['illumina2srf', + '-v1.0', + '-o', dest_path, + 
seq_pattern] + elif cmdlevel == ILLUMINA2SRF11: + seq_pattern = 's_%d_*_qseq.txt' % (lane,) + cmd = ['illumina2srf', + '-o', dest_path, + seq_pattern] + else: + raise ValueError("Unrecognized run level %d" % (cmdlevel,)) + + logging.info("Generated command: " + " ".join(cmd)) + cmd_list.append(" ".join(cmd)) + return cmd_list + +def run_srf_commands(bustard_dir, cmd_list, num_jobs): + logging.info("chdir to %s" % (bustard_dir,)) + curdir = os.getcwd() + os.chdir(bustard_dir) + q = queuecommands.QueueCommands(cmd_list, num_jobs) + q.run() + os.chdir(curdir) + diff --git a/scripts/runfolder b/scripts/runfolder index 1380fbf..2be3ab7 100644 --- a/scripts/runfolder +++ b/scripts/runfolder @@ -56,26 +56,28 @@ def make_parser(): commands.add_option('--extract-results', action='store_true', default=False, - help='create run-xml summary, compress the eland result files, and ' - 'copy them and the Summary.htm file into archival directory.') + help='create run-xml summary, compress the eland result files, build srf files and ' + 'copy all that and the Summary.htm file into an archival directory.') commands.add_option('-c', '--clean', action='store_true', default=False, help='Clean runfolder, preparing it for long-term storage') parser.add_option_group(commands) + parser.add_option('-j', '--max-jobs', default=1, + help='specify the maximum number of processes to run ' + '(used in extract-results)') parser.add_option('-o', '--output-dir', default=None, help="specify the default output directory for extract results") - + parser.add_option('--run-xml', dest='run_xml', + default=None, + help='specify a run_.xml file for summary reports') + parser.add_option('--site', default='individual', + help='specify the site name for srf files') parser.add_option('-u', '--use-run', dest='use_run', default=None, help='Specify which run to use instead of autoscanning ' 'the runfolder. 
You do this by providing the final ' ' GERALD directory, and it assumes the parent ' 'directories are the bustard and image processing ' 'directories.') - - parser.add_option('--run-xml', dest='run_xml', - default=None, - help='specify a run_.xml file for summary reports') - return parser @@ -119,7 +121,7 @@ def main(cmdlist=None): runfolder.extract_run_parameters(runs) command_run = True if opt.extract_results: - runfolder.extract_results(runs, opt.output_dir) + runfolder.extract_results(runs, opt.output_dir, opt.site, opt.max_jobs) command_run = True if opt.clean: runfolder.clean_runs(runs, opt.dry_run) diff --git a/scripts/srf b/scripts/srf index e7478a9..5a06d5d 100644 --- a/scripts/srf +++ b/scripts/srf @@ -1,72 +1,13 @@ #!/usr/bin/python -from glob import glob import logging import optparse import os -import subprocess import sys -from htsworkflow.util import queuecommands from htsworkflow.pipelines import runfolder - -SOLEXA2SRF = 0 -ILLUMINA2SRF10 = 1 -ILLUMINA2SRF11 = 2 - -def make_commands(run_name, lanes, site_name, destdir, cmdlevel=ILLUMINA2SRF11): - """ - make a subprocess-friendly list of command line arguments to run solexa2srf - generates files like: - woldlab:080514_HWI-EAS229_0029_20768AAXX:8.srf - site run name lane - - run_name - most of the file name (run folder name is a good choice) - lanes - list of integers corresponding to which lanes to process - site_name - name of your "sequencing site" or "Individual" - destdir - where to write all the srf files - """ - cmd_list = [] - for lane in lanes: - name_prefix = '%s_%%l_%%t_' % (run_name,) - destname = '%s_%s_%d.srf' % (site_name, run_name, lane) - destdir = os.path.normpath(destdir) - dest_path = os.path.join(destdir, destname) - seq_pattern = 's_%d_*_seq.txt' % (lane,) - - if cmdlevel == SOLEXA2SRF: - cmd = ['solexa2srf', - '-N', name_prefix, - '-n', '%3x:%3y', - '-o', dest_path, - seq_pattern] - elif cmdlevel == ILLUMINA2SRF10: - cmd = ['illumina2srf', - '-v1.0', - '-o', dest_path, - 
seq_pattern] - elif cmdlevel == ILLUMINA2SRF11: - seq_pattern = 's_%d_*_qseq.txt' % (lane,) - cmd = ['illumina2srf', - '-o', dest_path, - seq_pattern] - else: - raise ValueError("Unrecognized run level %d" % (cmdlevel,)) - - cmd_list.append(" ".join(cmd)) - return cmd_list - -def pathname_to_run_name(base): - """ - Convert a pathname to a base runfolder name - handle the case with a trailing / - """ - name = "" - while len(name) == 0: - base, name = os.path.split(base) - if len(base) == 0: - return None - return name +from htsworkflow.pipelines.srf import make_commands, run_srf_commands, pathname_to_run_name +from htsworkflow.pipelines.srf import ILLUMINA2SRF10, ILLUMINA2SRF11, SOLEXA2SRF def make_parser(): usage = '%prog: [options] runfolder -l 1,2,3 [runfolder -l 5,6 ...]' @@ -149,8 +90,7 @@ def main(cmdline=None): for runfolder_path, lanes in zip(args, lanes_list): # normalize paths, either relative to home dirs or current dir runfolder_path = os.path.abspath(runfolder_path) - # the last part of the path should be a runfolder name - name = pathname_to_run_name(runfolder_path) + run_name = pathname_to_run_name(runfolder_path) # so any bustard directories? runs = runfolder.get_runs(runfolder_path) # give up if there are anything other than 1 run @@ -159,18 +99,14 @@ def main(cmdline=None): return 1 elif len(runs) == 1: bustard_dir = runs[0].bustard.pathname - cmds[bustard_dir] = make_commands(name, lanes, opts.site, opts.dest_dir, opts.runfolder_version) + cmds[bustard_dir] = make_commands(run_name, lanes, opts.site, opts.dest_dir, opts.runfolder_version) else: print "ERROR: Couldn't find a bustard directory in", runfolder_path return 1 if not opts.dry_run: for cwd, cmd_list in cmds.items(): - curdir = os.getcwd() - os.chdir(cwd) - q = queuecommands.QueueCommands(cmd_list, opts.jobs) - q.run() - os.chdir(curdir) + run_srf_commands(cwd, cmd_list, opts.jobs) else: for cwd, cmd_list in cmds.items(): print cwd -- 2.30.2