X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=htsworkflow.git;a=blobdiff_plain;f=htsworkflow%2Fpipelines%2Frunfolder.py;h=26c592ce5f7eb6d65175031f9c9010dc400d9410;hp=e857c3e6ae04a5ab044cac5c105d5b187e87eafc;hb=5644e2bfe7c1f7e5af99289cd918f783b61c441a;hpb=d4fbf453f2e06a033f8ac1c503585e70124ee69b

diff --git a/htsworkflow/pipelines/runfolder.py b/htsworkflow/pipelines/runfolder.py
index e857c3e..26c592c 100644
--- a/htsworkflow/pipelines/runfolder.py
+++ b/htsworkflow/pipelines/runfolder.py
@@ -1,5 +1,4 @@
-"""
-Core information needed to inspect a runfolder.
+"""Core information needed to inspect a runfolder.
 """
 from glob import glob
 import logging
@@ -12,17 +11,17 @@ import sys
 import tarfile
 import time
 
-import lxml.etree as ElementTree
-
 LOGGER = logging.getLogger(__name__)
 
-EUROPEAN_STRPTIME = "%d-%m-%Y"
-EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
-VERSION_RE = "([0-9\.]+)"
-USER_RE = "([a-zA-Z0-9]+)"
-LANES_PER_FLOWCELL = 8
-LANE_LIST = range(1, LANES_PER_FLOWCELL + 1)
-
+from htsworkflow.pipelines import firecrest
+from htsworkflow.pipelines import ipar
+from htsworkflow.pipelines import bustard
+from htsworkflow.pipelines import gerald
+from htsworkflow.pipelines import ElementTree, \
+     EUROPEAN_STRPTIME, EUROPEAN_DATE_RE, \
+     VERSION_RE, USER_RE, \
+     LANES_PER_FLOWCELL, LANE_LIST
+from htsworkflow.pipelines.samplekey import LANE_SAMPLE_KEYS
 from htsworkflow.util.alphanum import alphanum
 from htsworkflow.util.ethelp import indent, flatten
 from htsworkflow.util.queuecommands import QueueCommands
@@ -30,14 +29,34 @@ from htsworkflow.util.queuecommands import QueueCommands
 from htsworkflow.pipelines import srf
 
 class PipelineRun(object):
-    """
-    Capture "interesting" information about a pipeline run
+    """Capture "interesting" information about a pipeline run
+
+    :Variables:
+      - `pathname` location of the root of this runfolder
+      - `serialization_filename` read-only property containing name of run xml file
+      - `flowcell_id` read-only property containing flowcell id (bar code)
+      - `datadir` location of the runfolder data dir.
+      - `image_analysis` generic name for Firecrest or IPAR image analysis
+      - `bustard` summary base caller
+      - `gerald` summary of sequence alignment and quality control metrics
     """
    XML_VERSION = 1
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, flowcell_id=None, xml=None):
+        """Initialize a PipelineRun object
+
+        :Parameters:
+          - `pathname` the root directory of this run folder.
+          - `flowcell_id` the flowcell ID in case it can't be determined
+          - `xml` Allows initializing an object from a serialized xml file.
+
+        :Types:
+          - `pathname` str
+          - `flowcell_id` str
+          - `xml` ElementTree
+        """
         if pathname is not None:
             self.pathname = os.path.normpath(pathname)
         else:
@@ -45,6 +64,7 @@ class PipelineRun(object):
         self._name = None
         self._flowcell_id = flowcell_id
         self.datadir = None
+        self.suffix = None
         self.image_analysis = None
         self.bustard = None
         self.gerald = None
@@ -53,55 +73,110 @@ class PipelineRun(object):
         self.set_elements(xml)
 
     def _get_flowcell_id(self):
+        """Return the flowcell ID
+
+        Attempts to find the flowcell ID through several mechanisms.
+ """ # extract flowcell ID if self._flowcell_id is None: - config_dir = os.path.join(self.pathname, 'Config') - flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml') - if os.path.exists(flowcell_id_path): - flowcell_id_tree = ElementTree.parse(flowcell_id_path) - self._flowcell_id = flowcell_id_tree.findtext('Text') - else: - path_fields = self.pathname.split('_') - if len(path_fields) > 0: - # guessing last element of filename - self._flowcell_id = path_fields[-1] - else: - self._flowcell_id = 'unknown' + self._flowcell_id = self._get_flowcell_id_from_runinfo() + if self._flowcell_id is None: + self._flowcell_id = self._get_flowcell_id_from_flowcellid() + if self._flowcell_id is None: + self._flowcell_id = self._get_flowcell_id_from_path() + if self._flowcell_id is None: + self._flowcell_id = 'unknown' - LOGGER.warning( - "Flowcell id was not found, guessing %s" % ( - self._flowcell_id)) + LOGGER.warning( + "Flowcell id was not found, guessing %s" % ( + self._flowcell_id)) return self._flowcell_id flowcell_id = property(_get_flowcell_id) + def _get_flowcell_id_from_flowcellid(self): + """Extract flowcell id from a Config/FlowcellId.xml file + + :return: flowcell_id or None if not found + """ + config_dir = os.path.join(self.pathname, 'Config') + flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml') + if os.path.exists(flowcell_id_path): + flowcell_id_tree = ElementTree.parse(flowcell_id_path) + return flowcell_id_tree.findtext('Text') + + def _get_flowcell_id_from_runinfo(self): + """Read RunInfo file for flowcell id + + :return: flowcell_id or None if not found + """ + runinfo = os.path.join(self.pathname, 'RunInfo.xml') + if os.path.exists(runinfo): + tree = ElementTree.parse(runinfo) + root = tree.getroot() + fc_nodes = root.xpath('/RunInfo/Run/Flowcell') + if len(fc_nodes) == 1: + return fc_nodes[0].text + + def _get_flowcell_id_from_path(self): + """Guess a flowcell name from the path + + :return: flowcell_id or None if not found + """ + path_fields = self.pathname.split('_') + if len(path_fields) > 0: + # guessing last element of filename + return path_fields[-1] + def _get_runfolder_name(self): - if self.gerald is None: - return None - else: + if self.gerald: return self.gerald.runfolder_name + elif hasattr(self.image_analysis, 'runfolder_name'): + return self.image_analysis.runfolder_name + else: + return None runfolder_name = property(_get_runfolder_name) - def get_elements(self): + def _get_run_dirname(self): + """Return name of directory to hold result files from one analysis + + For pre-multiplexing runs this is just the cycle range C1-123 + For post-multiplexing runs the "suffix" that we add to + differentiate runs will be added to the range. + E.g. Unaligned_6mm may produce C1-200_6mm """ - make one master xml file from all of our sub-components. + if self.image_analysis is None: + raise ValueError("Not initialized yet") + start = self.image_analysis.start + stop = self.image_analysis.stop + cycle_fragment = "C%d-%d" % (start, stop) + if self.suffix: + cycle_fragment += self.suffix + + return cycle_fragment + run_dirname = property(_get_run_dirname) + + def get_elements(self): + """make one master xml file from all of our sub-components. + + :return: an ElementTree containing all available pipeline + run xml compoents. 
""" root = ElementTree.Element(PipelineRun.PIPELINE_RUN) flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID) flowcell.text = self.flowcell_id root.append(self.image_analysis.get_elements()) root.append(self.bustard.get_elements()) - root.append(self.gerald.get_elements()) + if self.gerald: + root.append(self.gerald.get_elements()) return root def set_elements(self, tree): - # this file gets imported by all the others, - # so we need to hide the imports to avoid a cyclic imports - from htsworkflow.pipelines import firecrest - from htsworkflow.pipelines import ipar - from htsworkflow.pipelines import bustard - from htsworkflow.pipelines import gerald + """Initialize a PipelineRun object from an run.xml ElementTree. + :param tree: parsed ElementTree + :type tree: ElementTree + """ tag = tree.tag.lower() if tag != PipelineRun.PIPELINE_RUN.lower(): raise ValueError('Pipeline Run Expecting %s got %s' % ( @@ -125,27 +200,47 @@ class PipelineRun(object): else: LOGGER.warn('PipelineRun unrecognized tag %s' % (tag,)) - def _get_run_name(self): - """ - Given a run tuple, find the latest date and use that as our name + def _get_serialization_filename(self): + """Compute the filename for the run xml file + + Attempts to find the latest date from all of the run + components. + + :return: filename run_{flowcell id}_{timestamp}.xml + :rtype: str """ if self._name is None: - tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time) + components = [self.image_analysis, self.bustard, self.gerald] + tmax = max([ c.time for c in components if c ]) timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax)) self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml' return self._name - name = property(_get_run_name) + serialization_filename = property(_get_serialization_filename) def save(self, destdir=None): + """Save a run xml file. + + :param destdir: Directory name to save too, uses current directory + if not specified. + :type destdir: str + """ if destdir is None: destdir = '' - LOGGER.info("Saving run report " + self.name) + LOGGER.info("Saving run report " + self.serialization_filename) xml = self.get_elements() indent(xml) - dest_pathname = os.path.join(destdir, self.name) + dest_pathname = os.path.join(destdir, self.serialization_filename) ElementTree.ElementTree(xml).write(dest_pathname) def load(self, filename): + """Load a run xml into this object. + + :Parameters: + - `filename` location of a run xml file + + :Types: + - `filename` str + """ LOGGER.info("Loading run report from " + filename) tree = ElementTree.parse(filename).getroot() self.set_elements(tree) @@ -155,7 +250,7 @@ def load_pipeline_run_xml(pathname): Load and instantiate a Pipeline run from a run xml file :Parameters: - - `pathname` : location of an run xml file + - `pathname` location of an run xml file :Returns: initialized PipelineRun object """ @@ -164,63 +259,16 @@ def load_pipeline_run_xml(pathname): return run def get_runs(runfolder, flowcell_id=None): - """ - Search through a run folder for all the various sub component runs - and then return a PipelineRun for each different combination. + """Find all runs associated with a runfolder. + + We end up with multiple analysis runs as we sometimes + need to try with different parameters. This attempts + to return a list of all the various runs. For example if there are two different GERALD runs, this will generate two different PipelineRun objects, that differ in there gerald component. 
""" - from htsworkflow.pipelines import firecrest - from htsworkflow.pipelines import ipar - from htsworkflow.pipelines import bustard - from htsworkflow.pipelines import gerald - - def scan_post_image_analysis(runs, runfolder, datadir, image_analysis, pathname): - LOGGER.info("Looking for bustard directories in %s" % (pathname,)) - bustard_dirs = glob(os.path.join(pathname, "Bustard*")) - # RTA BaseCalls looks enough like Bustard. - bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls"))) - for bustard_pathname in bustard_dirs: - LOGGER.info("Found bustard directory %s" % (bustard_pathname,)) - b = bustard.bustard(bustard_pathname) - build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder) - - build_aligned_runs(image_analysis, runs, b, datadir, runfolder) - - def build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder): - gerald_glob = os.path.join(bustard_pathname, 'GERALD*') - LOGGER.info("Looking for gerald directories in %s" % (pathname,)) - for gerald_pathname in glob(gerald_glob): - LOGGER.info("Found gerald directory %s" % (gerald_pathname,)) - try: - g = gerald.gerald(gerald_pathname) - p = PipelineRun(runfolder, flowcell_id) - p.datadir = datadir - p.image_analysis = image_analysis - p.bustard = b - p.gerald = g - runs.append(p) - except IOError, e: - LOGGER.error("Ignoring " + str(e)) - - - def build_aligned_runs(image_analysis, runs, b, datadir, runfolder): - aligned_glob = os.path.join(runfolder, 'Aligned*') - for aligned in glob(aligned_glob): - LOGGER.info("Found aligned directory %s" % (aligned,)) - try: - g = gerald.HiSeq(aligned) - p = PipelineRun(runfolder, flowcell_id) - p.datadir = datadir - p.image_analysis = image_analysis - p.bustard = b - p.gerald = g - runs.append(p) - except IOError, e: - LOGGER.error("Ignoring " + str(e)) - datadir = os.path.join(runfolder, 'Data') LOGGER.info('Searching for runs in ' + datadir) @@ -235,7 +283,7 @@ def get_runs(runfolder, flowcell_id=None): ) else: scan_post_image_analysis( - runs, runfolder, datadir, image_analysis, firecrest_pathname + runs, runfolder, datadir, image_analysis, firecrest_pathname, flowcell_id ) # scan for IPAR directories ipar_dirs = glob(os.path.join(datadir, "IPAR_*")) @@ -250,11 +298,111 @@ def get_runs(runfolder, flowcell_id=None): ) else: scan_post_image_analysis( - runs, runfolder, datadir, image_analysis, ipar_pathname + runs, runfolder, datadir, image_analysis, ipar_pathname, flowcell_id ) return runs +def scan_post_image_analysis(runs, runfolder, datadir, image_analysis, + pathname, flowcell_id): + added = build_hiseq_runs(image_analysis, runs, datadir, runfolder, flowcell_id) + # If we're a multiplexed run, don't look for older run type. + if added > 0: + return + + LOGGER.info("Looking for bustard directories in %s" % (pathname,)) + bustard_dirs = glob(os.path.join(pathname, "Bustard*")) + # RTA BaseCalls looks enough like Bustard. 
+    bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
+    for bustard_pathname in bustard_dirs:
+        LOGGER.info("Found bustard directory %s" % (bustard_pathname,))
+        b = bustard.bustard(bustard_pathname)
+        build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname,
+                          runfolder, flowcell_id)
+
+
+def build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder,
+                      flowcell_id):
+    start = len(runs)
+    gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
+    LOGGER.info("Looking for gerald directories in %s" % (pathname,))
+    for gerald_pathname in glob(gerald_glob):
+        LOGGER.info("Found gerald directory %s" % (gerald_pathname,))
+        try:
+            g = gerald.gerald(gerald_pathname)
+            p = PipelineRun(runfolder, flowcell_id)
+            p.datadir = datadir
+            p.image_analysis = image_analysis
+            p.bustard = b
+            p.gerald = g
+            runs.append(p)
+        except IOError, e:
+            LOGGER.error("Ignoring " + str(e))
+    return len(runs) - start
+
+
+def build_hiseq_runs(image_analysis, runs, datadir, runfolder, flowcell_id):
+    start = len(runs)
+    aligned_glob = os.path.join(runfolder, 'Aligned*')
+    unaligned_glob = os.path.join(runfolder, 'Unaligned*')
+
+    aligned_paths = glob(aligned_glob)
+    unaligned_paths = glob(unaligned_glob)
+
+    matched_paths = hiseq_match_aligned_unaligned(aligned_paths, unaligned_paths)
+    LOGGER.debug("Matched HiSeq analysis: %s", str(matched_paths))
+
+    for aligned, unaligned, suffix in matched_paths:
+        if unaligned is None:
+            LOGGER.warn("Aligned directory %s without matching unaligned, skipping", aligned)
+            continue
+
+        try:
+            p = PipelineRun(runfolder, flowcell_id)
+            p.datadir = datadir
+            p.suffix = suffix
+            p.image_analysis = image_analysis
+            p.bustard = bustard.bustard(unaligned)
+            assert p.bustard
+            if aligned:
+                p.gerald = gerald.gerald(aligned)
+            runs.append(p)
+        except IOError, e:
+            LOGGER.error("Ignoring " + str(e))
+    return len(runs) - start
+
+def hiseq_match_aligned_unaligned(aligned, unaligned):
+    """Match aligned and unaligned folders from separate lists
+    """
+    unaligned_suffix_re = re.compile('Unaligned(?P<suffix>[\w]*)')
+
+    aligned_by_suffix = build_dir_dict_by_suffix('Aligned', aligned)
+    unaligned_by_suffix = build_dir_dict_by_suffix('Unaligned', unaligned)
+
+    keys = set(aligned_by_suffix.keys()).union(set(unaligned_by_suffix.keys()))
+
+    matches = []
+    for key in keys:
+        a = aligned_by_suffix.get(key)
+        u = unaligned_by_suffix.get(key)
+        matches.append((a, u, key))
+    return matches
+
+def build_dir_dict_by_suffix(prefix, dirnames):
+    """Build a dictionary indexed by suffix of last directory name.
+
+    It assumes a constant prefix
+    """
+    regex = re.compile('%s(?P<suffix>[\w]*)' % (prefix,))
+
+    by_suffix = {}
+    for absname in dirnames:
+        basename = os.path.basename(absname)
+        match = regex.match(basename)
+        if match:
+            by_suffix[match.group('suffix')] = absname
+    return by_suffix
+
 def get_specific_run(gerald_dir):
     """
     Given a gerald directory, construct a PipelineRun out of its parents
@@ -340,7 +488,7 @@ def summarize_mapped_reads(genome_map, mapped_reads):
     genome = 'unknown'
     for k, v in mapped_reads.items():
         path, k = os.path.split(k)
-        if len(path) > 0 and not genome_map.has_key(path):
+        if len(path) > 0 and path not in genome_map:
             genome = path
             genome_reads += v
         else:
@@ -350,37 +498,40 @@ def summarize_mapped_reads(genome_map, mapped_reads):
 
 def summarize_lane(gerald, lane_id):
     report = []
-    summary_results = gerald.summary.lane_results
-    for end in range(len(summary_results)):
-        eland_result = gerald.eland_results.results[end][lane_id]
-        report.append("Sample name %s" % (eland_result.sample_name))
-        report.append("Lane id %s end %s" % (eland_result.lane_id, end))
-        if end < len(summary_results) and summary_results[end].has_key(eland_result.lane_id):
-            cluster = summary_results[end][eland_result.lane_id].cluster
-            report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
-        report.append("Total Reads: %d" % (eland_result.reads))
-
-        if hasattr(eland_result, 'match_codes'):
-            mc = eland_result.match_codes
-            nm = mc['NM']
-            nm_percent = float(nm) / eland_result.reads * 100
-            qc = mc['QC']
-            qc_percent = float(qc) / eland_result.reads * 100
-
-            report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
-            report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
-            report.append('Unique (0,1,2 mismatches) %d %d %d' % \
-                          (mc['U0'], mc['U1'], mc['U2']))
-            report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
-                          (mc['R0'], mc['R1'], mc['R2']))
-
-        if hasattr(eland_result, 'genome_map'):
-            report.append("Mapped Reads")
-            mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
-            for name, counts in mapped_reads.items():
+    lane_results = gerald.summary.lane_results
+    eland_result = gerald.eland_results[lane_id]
+    report.append("Sample name %s" % (eland_result.sample_name))
+    report.append("Lane id %s end %s" % (lane_id.lane, lane_id.read))
+
+    if lane_id.read < len(lane_results) and \
+       lane_id.lane in lane_results[lane_id.read]:
+        summary_results = lane_results[lane_id.read][lane_id.lane]
+        cluster = summary_results.cluster
+        report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
+    report.append("Total Reads: %d" % (eland_result.reads))
+
+    if hasattr(eland_result, 'match_codes'):
+        mc = eland_result.match_codes
+        nm = mc['NM']
+        nm_percent = float(nm) / eland_result.reads * 100
+        qc = mc['QC']
+        qc_percent = float(qc) / eland_result.reads * 100
+
+        report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
+        report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
+        report.append('Unique (0,1,2 mismatches) %d %d %d' % \
+                      (mc['U0'], mc['U1'], mc['U2']))
+        report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
+                      (mc['R0'], mc['R1'], mc['R2']))
+
+    if hasattr(eland_result, 'genome_map'):
+        report.append("Mapped Reads")
+        mapped_reads = summarize_mapped_reads(eland_result.genome_map,
+                                              eland_result.mapped_reads)
+        for name, counts in mapped_reads.items():
             report.append("  %s: %d" % (name, counts))
-        report.append('')
+    report.append('')
     return report
 
 def summary_report(runs):
@@ -388,18 +539,21 @@ def summary_report(runs):
     """
     Summarize cluster numbers and mapped read counts for a runfolder
     """
     report = []
+    eland_keys = []
     for run in runs:
         # print a run name?
-        report.append('Summary for %s' % (run.name,))
+        report.append('Summary for %s' % (run.serialization_filename,))
         # sort the report
-        eland_keys = run.gerald.eland_results.results[0].keys()
-        eland_keys.sort(alphanum)
+        if run.gerald:
+            eland_keys = sorted(run.gerald.eland_results.keys())
+        else:
+            report.append("Alignment not done, no report possible")
 
-        for lane_id in eland_keys:
-            report.extend(summarize_lane(run.gerald, lane_id))
-            report.append('---')
-            report.append('')
-    return os.linesep.join(report)
+        for lane_id in eland_keys:
+            report.extend(summarize_lane(run.gerald, lane_id))
+            report.append('---')
+            report.append('')
+    return os.linesep.join(report)
 
 def is_compressed(filename):
     if os.path.splitext(filename)[1] == ".gz":
@@ -409,14 +563,14 @@ def is_compressed(filename):
     else:
         return False
 
-def save_flowcell_reports(data_dir, cycle_dir):
+def save_flowcell_reports(data_dir, run_dirname):
     """
     Save the flowcell quality reports
     """
     data_dir = os.path.abspath(data_dir)
     status_file = os.path.join(data_dir, 'Status.xml')
     reports_dir = os.path.join(data_dir, 'reports')
-    reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
+    reports_dest = os.path.join(run_dirname, 'flowcell-reports.tar.bz2')
     if os.path.exists(reports_dir):
         cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
         if os.path.exists(status_file):
@@ -429,21 +583,21 @@ def save_flowcell_reports(data_dir, cycle_dir):
         os.chdir(cwd)
 
 
-def save_summary_file(pipeline, cycle_dir):
+def save_summary_file(pipeline, run_dirname):
     # Copy Summary.htm
     gerald_object = pipeline.gerald
     gerald_summary = os.path.join(gerald_object.pathname, 'Summary.htm')
     status_files_summary = os.path.join(pipeline.datadir, 'Status_Files', 'Summary.htm')
     if os.path.exists(gerald_summary):
-        LOGGER.info('Copying %s to %s' % (gerald_summary, cycle_dir))
-        shutil.copy(gerald_summary, cycle_dir)
+        LOGGER.info('Copying %s to %s' % (gerald_summary, run_dirname))
+        shutil.copy(gerald_summary, run_dirname)
     elif os.path.exists(status_files_summary):
-        LOGGER.info('Copying %s to %s' % (status_files_summary, cycle_dir))
-        shutil.copy(status_files_summary, cycle_dir)
+        LOGGER.info('Copying %s to %s' % (status_files_summary, run_dirname))
+        shutil.copy(status_files_summary, run_dirname)
     else:
         LOGGER.info('Summary file %s was not found' % (summary_path,))
 
-def save_ivc_plot(bustard_object, cycle_dir):
+def save_ivc_plot(bustard_object, run_dirname):
     """
     Save the IVC page and its supporting images
     """
@@ -451,12 +605,12 @@ def save_ivc_plot(bustard_object, cycle_dir):
     plot_image_path = os.path.join(bustard_object.pathname, 'Plots')
     plot_images = os.path.join(plot_image_path, 's_?_[a-z]*.png')
 
-    plot_target_path = os.path.join(cycle_dir, 'Plots')
+    plot_target_path = os.path.join(run_dirname, 'Plots')
 
     if os.path.exists(plot_html):
         LOGGER.debug("Saving %s" % (plot_html,))
         LOGGER.debug("Saving %s" % (plot_images,))
-        shutil.copy(plot_html, cycle_dir)
+        shutil.copy(plot_html, run_dirname)
         if not os.path.exists(plot_target_path):
             os.mkdir(plot_target_path)
         for plot_file in glob(plot_images):
@@ -465,7 +619,7 @@ def save_ivc_plot(bustard_object, cycle_dir):
         LOGGER.warning('Missing IVC.html file, not archiving')
 
 
-def compress_score_files(bustard_object, cycle_dir):
+def compress_score_files(bustard_object, run_dirname):
     """
     Compress score files into our result directory
     """
@@ -483,7 +637,7 @@
     tar_cmd = ['tar', 'c'] + score_files
     bzip_cmd = [ 'bzip2', '-9', '-c' ]
-    tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
+    tar_dest_name = os.path.join(run_dirname, 'scores.tar.bz2')
     tar_dest = open(tar_dest_name, 'w')
     LOGGER.info("Compressing score files from %s" % (scores_path,))
     LOGGER.info("Running tar: " + " ".join(tar_cmd[:10]))
@@ -497,7 +651,7 @@
     tar.wait()
 
 
-def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
+def compress_eland_results(gerald_object, run_dirname, num_jobs=1):
     """
     Compress eland result files into the archive directory
     """
@@ -512,7 +666,7 @@
                 "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
             else:
                 path, name = os.path.split(source_name)
-                dest_name = os.path.join(cycle_dir, name)
+                dest_name = os.path.join(run_dirname, name)
                 LOGGER.info("Saving eland file %s to %s" % \
                             (source_name, dest_name))
@@ -535,7 +689,7 @@
 
     q.run()
 
-def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format='qseq'):
+def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format=None):
     """
     Iterate over runfolders in runs extracting the most useful information.
       * run parameters (in run-*.xml)
@@ -548,68 +702,80 @@ def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, r
         output_base_dir = os.getcwd()
 
     for r in runs:
-        result_dir = os.path.join(output_base_dir, r.flowcell_id)
-        LOGGER.info("Using %s as result directory" % (result_dir,))
-        if not os.path.exists(result_dir):
-            os.mkdir(result_dir)
-
-        # create cycle_dir
-        cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
-        LOGGER.info("Filling in %s" % (cycle,))
-        cycle_dir = os.path.join(result_dir, cycle)
-        cycle_dir = os.path.abspath(cycle_dir)
-        if os.path.exists(cycle_dir):
-            LOGGER.error("%s already exists, not overwriting" % (cycle_dir,))
-            continue
-        else:
-            os.mkdir(cycle_dir)
-
-        # save run file
-        r.save(cycle_dir)
-
-        # save illumina flowcell status report
-        save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'), cycle_dir)
-
-        # save stuff from bustard
-        # grab IVC plot
-        save_ivc_plot(r.bustard, cycle_dir)
-
-        # build base call saving commands
-        if site is not None:
-            lanes = []
-            for lane in range(1, 9):
-                lane_parameters = r.gerald.lanes.get(lane, None)
-                if lane_parameters is not None and lane_parameters.analysis != 'none':
-                    lanes.append(lane)
-
-            run_name = srf.pathname_to_run_name(r.pathname)
-            seq_cmds = []
-            LOGGER.info("Raw Format is: %s" % (raw_format, ))
-            if raw_format == 'fastq':
-                rawpath = os.path.join(r.pathname, r.gerald.runfolder_name)
-                LOGGER.info("raw data = %s" % (rawpath,))
-                srf.copy_hiseq_project_fastqs(run_name, rawpath, site, cycle_dir)
-            elif raw_format == 'qseq':
-                seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
-            elif raw_format == 'srf':
-                seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir, 0)
+        result_dir = os.path.join(output_base_dir, r.flowcell_id)
+        LOGGER.info("Using %s as result directory" % (result_dir,))
+        if not os.path.exists(result_dir):
+            os.mkdir(result_dir)
+
+        # create directory to add this run's results to
+        LOGGER.info("Filling in %s" % (r.run_dirname,))
+        run_dirname = os.path.join(result_dir, r.run_dirname)
+        run_dirname = os.path.abspath(run_dirname)
+        if os.path.exists(run_dirname):
+            LOGGER.error("%s already exists, not overwriting" % (run_dirname,))
+            continue
         else:
-            raise ValueError('Unknown --raw-format=%s' % (raw_format))
-        srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)
+            os.mkdir(run_dirname)
+
+        # save run file
+        r.save(run_dirname)
 
-        # save stuff from GERALD
-        # copy stuff out of the main run
-        g = r.gerald
+        # save illumina flowcell status report
+        save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'),
+                              run_dirname)
 
-        # save summary file
-        save_summary_file(r, cycle_dir)
+        # save stuff from bustard
+        # grab IVC plot
+        save_ivc_plot(r.bustard, run_dirname)
 
-        # compress eland result files
-        compress_eland_results(g, cycle_dir, num_jobs)
+        # build base call saving commands
+        if site is not None:
+            save_raw_data(num_jobs, r, site, raw_format, run_dirname)
 
-        # md5 all the compressed files once we're done
-        md5_commands = srf.make_md5_commands(cycle_dir)
-        srf.run_commands(cycle_dir, md5_commands, num_jobs)
+        # save stuff from GERALD
+        # copy stuff out of the main run
+        if r.gerald:
+            g = r.gerald
+
+            # save summary file
+            save_summary_file(r, run_dirname)
+
+            # compress eland result files
+            compress_eland_results(g, run_dirname, num_jobs)
+
+        # md5 all the compressed files once we're done
+        md5_commands = srf.make_md5_commands(run_dirname)
+        srf.run_commands(run_dirname, md5_commands, num_jobs)
+
+def save_raw_data(num_jobs, r, site, raw_format, run_dirname):
+    lanes = []
+    if r.gerald:
+        for lane in r.gerald.lanes:
+            lane_parameters = r.gerald.lanes.get(lane, None)
+            if lane_parameters is not None:
+                lanes.append(lane)
+    else:
+        # assume default list of lanes
+        lanes = LANE_SAMPLE_KEYS
+
+    run_name = srf.pathname_to_run_name(r.pathname)
+    seq_cmds = []
+    if raw_format is None:
+        raw_format = r.bustard.sequence_format
+
+    LOGGER.info("Raw Format is: %s" % (raw_format, ))
+    if raw_format == 'fastq':
+        LOGGER.info("Reading fastq files from %s", r.bustard.pathname)
+        rawpath = os.path.join(r.pathname, r.bustard.pathname)
+        LOGGER.info("raw data = %s" % (rawpath,))
+        srf.copy_hiseq_project_fastqs(run_name, rawpath, site, run_dirname)
+    elif raw_format == 'qseq':
+        seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, run_dirname)
+    elif raw_format == 'srf':
+        seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, run_dirname, 0)
+    else:
+        raise ValueError('Unknown --raw-format=%s' % (raw_format))
+    srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)
 
 def rm_list(files, dry_run=True):
     for f in files:
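
The suffix matching added in this change is easiest to see with a small example. The sketch below is hypothetical (the directory names are invented, and it assumes the patched htsworkflow.pipelines.runfolder module is importable); like the rest of the file it is Python 2. hiseq_match_aligned_unaligned() pairs each Aligned* directory with the Unaligned* directory carrying the same suffix, and a suffix present in only one list comes back with None in the missing slot:

    from htsworkflow.pipelines.runfolder import hiseq_match_aligned_unaligned

    aligned = ['/tmp/run/Aligned', '/tmp/run/Aligned_6mm']
    unaligned = ['/tmp/run/Unaligned', '/tmp/run/Unaligned_6mm',
                 '/tmp/run/Unaligned_1mm']

    # Expected triples, in arbitrary order since the suffixes form a set:
    #   ('/tmp/run/Aligned',     '/tmp/run/Unaligned',     '')
    #   ('/tmp/run/Aligned_6mm', '/tmp/run/Unaligned_6mm', '_6mm')
    #   (None,                   '/tmp/run/Unaligned_1mm', '_1mm')
    for a, u, suffix in hiseq_match_aligned_unaligned(aligned, unaligned):
        print suffix, a, u

build_hiseq_runs() keeps the unaligned-only '_1mm' entry (base calls without a gerald component) but skips an Aligned* directory that has no matching Unaligned*. The suffix is stored on the PipelineRun, and run_dirname appends it to the cycle range, so results from that analysis would be extracted into a directory such as C1-200_1mm.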