X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=htsworkflow.git;a=blobdiff_plain;f=htsworkflow%2Fpipelines%2Frunfolder.py;h=26c592ce5f7eb6d65175031f9c9010dc400d9410;hp=016ae873b3579966ad40ad8c1da5b3e44bd4aeed;hb=5644e2bfe7c1f7e5af99289cd918f783b61c441a;hpb=82991cfec133d9ea8a33b481dbf6645ba87c4dae diff --git a/htsworkflow/pipelines/runfolder.py b/htsworkflow/pipelines/runfolder.py index 016ae87..26c592c 100644 --- a/htsworkflow/pipelines/runfolder.py +++ b/htsworkflow/pipelines/runfolder.py @@ -1,5 +1,4 @@ -""" -Core information needed to inspect a runfolder. +"""Core information needed to inspect a runfolder. """ from glob import glob import logging @@ -12,18 +11,17 @@ import sys import tarfile import time -try: - from xml.etree import ElementTree -except ImportError, e: - from elementtree import ElementTree - -EUROPEAN_STRPTIME = "%d-%m-%Y" -EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})" -VERSION_RE = "([0-9\.]+)" -USER_RE = "([a-zA-Z0-9]+)" -LANES_PER_FLOWCELL = 8 -LANE_LIST = range(1, LANES_PER_FLOWCELL+1) - +LOGGER = logging.getLogger(__name__) + +from htsworkflow.pipelines import firecrest +from htsworkflow.pipelines import ipar +from htsworkflow.pipelines import bustard +from htsworkflow.pipelines import gerald +from htsworkflow.pipelines import ElementTree, \ + EUROPEAN_STRPTIME, EUROPEAN_DATE_RE, \ + VERSION_RE, USER_RE, \ + LANES_PER_FLOWCELL, LANE_LIST +from htsworkflow.pipelines.samplekey import LANE_SAMPLE_KEYS from htsworkflow.util.alphanum import alphanum from htsworkflow.util.ethelp import indent, flatten from htsworkflow.util.queuecommands import QueueCommands @@ -31,20 +29,42 @@ from htsworkflow.util.queuecommands import QueueCommands from htsworkflow.pipelines import srf class PipelineRun(object): - """ - Capture "interesting" information about a pipeline run + """Capture "interesting" information about a pipeline run + + :Variables: + - `pathname` location of the root of this runfolder + - `serialization_filename` read only property containing name of run xml file + - `flowcell_id` read-only property containing flowcell id (bar code) + - `datadir` location of the runfolder data dir. + - `image_analysis` generic name for Firecrest or IPAR image analysis + - `bustard` summary base caller + - `gerald` summary of sequence alignment and quality control metrics """ XML_VERSION = 1 PIPELINE_RUN = 'PipelineRun' FLOWCELL_ID = 'FlowcellID' - def __init__(self, pathname=None, xml=None): + def __init__(self, pathname=None, flowcell_id=None, xml=None): + """Initialize a PipelineRun object + + :Parameters: + - `pathname` the root directory of this run folder. + - `flowcell_id` the flowcell ID in case it can't be determined + - `xml` Allows initializing an object from a serialized xml file. + + :Types: + - `pathname` str + - `flowcell_id` str + - `ElementTree` str + """ if pathname is not None: self.pathname = os.path.normpath(pathname) else: self.pathname = None self._name = None - self._flowcell_id = None + self._flowcell_id = flowcell_id + self.datadir = None + self.suffix = None self.image_analysis = None self.bustard = None self.gerald = None @@ -53,48 +73,110 @@ class PipelineRun(object): self.set_elements(xml) def _get_flowcell_id(self): + """Return the flowcell ID + + Attempts to find the flowcell ID through several mechanisms. + """ # extract flowcell ID if self._flowcell_id is None: - config_dir = os.path.join(self.pathname, 'Config') - flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml') - if os.path.exists(flowcell_id_path): - flowcell_id_tree = ElementTree.parse(flowcell_id_path) - self._flowcell_id = flowcell_id_tree.findtext('Text') - else: - path_fields = self.pathname.split('_') - if len(path_fields) > 0: - # guessing last element of filename - flowcell_id = path_fields[-1] - else: - flowcell_id = 'unknown' + self._flowcell_id = self._get_flowcell_id_from_runinfo() + if self._flowcell_id is None: + self._flowcell_id = self._get_flowcell_id_from_flowcellid() + if self._flowcell_id is None: + self._flowcell_id = self._get_flowcell_id_from_path() + if self._flowcell_id is None: + self._flowcell_id = 'unknown' + + LOGGER.warning( + "Flowcell id was not found, guessing %s" % ( + self._flowcell_id)) - logging.warning( - "Flowcell id was not found, guessing %s" % ( - flowcell_id)) - self._flowcell_id = flowcell_id return self._flowcell_id flowcell_id = property(_get_flowcell_id) - def get_elements(self): + def _get_flowcell_id_from_flowcellid(self): + """Extract flowcell id from a Config/FlowcellId.xml file + + :return: flowcell_id or None if not found """ - make one master xml file from all of our sub-components. + config_dir = os.path.join(self.pathname, 'Config') + flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml') + if os.path.exists(flowcell_id_path): + flowcell_id_tree = ElementTree.parse(flowcell_id_path) + return flowcell_id_tree.findtext('Text') + + def _get_flowcell_id_from_runinfo(self): + """Read RunInfo file for flowcell id + + :return: flowcell_id or None if not found + """ + runinfo = os.path.join(self.pathname, 'RunInfo.xml') + if os.path.exists(runinfo): + tree = ElementTree.parse(runinfo) + root = tree.getroot() + fc_nodes = root.xpath('/RunInfo/Run/Flowcell') + if len(fc_nodes) == 1: + return fc_nodes[0].text + + def _get_flowcell_id_from_path(self): + """Guess a flowcell name from the path + + :return: flowcell_id or None if not found + """ + path_fields = self.pathname.split('_') + if len(path_fields) > 0: + # guessing last element of filename + return path_fields[-1] + + def _get_runfolder_name(self): + if self.gerald: + return self.gerald.runfolder_name + elif hasattr(self.image_analysis, 'runfolder_name'): + return self.image_analysis.runfolder_name + else: + return None + runfolder_name = property(_get_runfolder_name) + + def _get_run_dirname(self): + """Return name of directory to hold result files from one analysis + + For pre-multiplexing runs this is just the cycle range C1-123 + For post-multiplexing runs the "suffix" that we add to + differentiate runs will be added to the range. + E.g. Unaligned_6mm may produce C1-200_6mm + """ + if self.image_analysis is None: + raise ValueError("Not initialized yet") + start = self.image_analysis.start + stop = self.image_analysis.stop + cycle_fragment = "C%d-%d" % (start, stop) + if self.suffix: + cycle_fragment += self.suffix + + return cycle_fragment + run_dirname = property(_get_run_dirname) + + def get_elements(self): + """make one master xml file from all of our sub-components. + + :return: an ElementTree containing all available pipeline + run xml compoents. """ root = ElementTree.Element(PipelineRun.PIPELINE_RUN) flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID) flowcell.text = self.flowcell_id root.append(self.image_analysis.get_elements()) root.append(self.bustard.get_elements()) - root.append(self.gerald.get_elements()) + if self.gerald: + root.append(self.gerald.get_elements()) return root def set_elements(self, tree): - # this file gets imported by all the others, - # so we need to hide the imports to avoid a cyclic imports - from htsworkflow.pipelines import firecrest - from htsworkflow.pipelines import ipar - from htsworkflow.pipelines import bustard - from htsworkflow.pipelines import gerald + """Initialize a PipelineRun object from an run.xml ElementTree. + :param tree: parsed ElementTree + :type tree: ElementTree + """ tag = tree.tag.lower() if tag != PipelineRun.PIPELINE_RUN.lower(): raise ValueError('Pipeline Run Expecting %s got %s' % ( @@ -113,31 +195,53 @@ class PipelineRun(object): self.bustard = bustard.Bustard(xml=element) elif tag == gerald.Gerald.GERALD.lower(): self.gerald = gerald.Gerald(xml=element) + elif tag == gerald.CASAVA.GERALD.lower(): + self.gerald = gerald.CASAVA(xml=element) else: - logging.warn('PipelineRun unrecognized tag %s' % (tag,)) - - def _get_run_name(self): - """ - Given a run tuple, find the latest date and use that as our name + LOGGER.warn('PipelineRun unrecognized tag %s' % (tag,)) + + def _get_serialization_filename(self): + """Compute the filename for the run xml file + + Attempts to find the latest date from all of the run + components. + + :return: filename run_{flowcell id}_{timestamp}.xml + :rtype: str """ if self._name is None: - tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time) + components = [self.image_analysis, self.bustard, self.gerald] + tmax = max([ c.time for c in components if c ]) timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax)) - self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml' + self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml' return self._name - name = property(_get_run_name) + serialization_filename = property(_get_serialization_filename) def save(self, destdir=None): + """Save a run xml file. + + :param destdir: Directory name to save too, uses current directory + if not specified. + :type destdir: str + """ if destdir is None: destdir = '' - logging.info("Saving run report "+ self.name) + LOGGER.info("Saving run report " + self.serialization_filename) xml = self.get_elements() indent(xml) - dest_pathname = os.path.join(destdir, self.name) + dest_pathname = os.path.join(destdir, self.serialization_filename) ElementTree.ElementTree(xml).write(dest_pathname) def load(self, filename): - logging.info("Loading run report from " + filename) + """Load a run xml into this object. + + :Parameters: + - `filename` location of a run xml file + + :Types: + - `filename` str + """ + LOGGER.info("Loading run report from " + filename) tree = ElementTree.parse(filename).getroot() self.set_elements(tree) @@ -145,8 +249,8 @@ def load_pipeline_run_xml(pathname): """ Load and instantiate a Pipeline run from a run xml file - :Parameters: - - `pathname` : location of an run xml file + :Parameters: + - `pathname` location of an run xml file :Returns: initialized PipelineRun object """ @@ -154,76 +258,151 @@ def load_pipeline_run_xml(pathname): run = PipelineRun(xml=tree) return run -def get_runs(runfolder): - """ - Search through a run folder for all the various sub component runs - and then return a PipelineRun for each different combination. +def get_runs(runfolder, flowcell_id=None): + """Find all runs associated with a runfolder. + + We end up with multiple analysis runs as we sometimes + need to try with different parameters. This attempts + to return a list of all the various runs. For example if there are two different GERALD runs, this will generate two different PipelineRun objects, that differ in there gerald component. """ - from htsworkflow.pipelines import firecrest - from htsworkflow.pipelines import ipar - from htsworkflow.pipelines import bustard - from htsworkflow.pipelines import gerald - - def scan_post_image_analysis(runs, runfolder, image_analysis, pathname): - logging.info("Looking for bustard directories in %s" % (pathname,)) - bustard_dirs = glob(os.path.join(pathname, "Bustard*")) - # RTA BaseCalls looks enough like Bustard. - bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls"))) - for bustard_pathname in bustard_dirs: - logging.info("Found bustard directory %s" % (bustard_pathname,)) - b = bustard.bustard(bustard_pathname) - gerald_glob = os.path.join(bustard_pathname, 'GERALD*') - logging.info("Looking for gerald directories in %s" % (pathname,)) - for gerald_pathname in glob(gerald_glob): - logging.info("Found gerald directory %s" % (gerald_pathname,)) - try: - g = gerald.gerald(gerald_pathname) - p = PipelineRun(runfolder) - p.image_analysis = image_analysis - p.bustard = b - p.gerald = g - runs.append(p) - except IOError, e: - logging.error("Ignoring " + str(e)) - datadir = os.path.join(runfolder, 'Data') - logging.info('Searching for runs in ' + datadir) + LOGGER.info('Searching for runs in ' + datadir) runs = [] # scan for firecrest directories - for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")): - logging.info('Found firecrest in ' + datadir) + for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")): + LOGGER.info('Found firecrest in ' + datadir) image_analysis = firecrest.firecrest(firecrest_pathname) if image_analysis is None: - logging.warn( + LOGGER.warn( "%s is an empty or invalid firecrest directory" % (firecrest_pathname,) ) else: scan_post_image_analysis( - runs, runfolder, image_analysis, firecrest_pathname + runs, runfolder, datadir, image_analysis, firecrest_pathname, flowcell_id ) # scan for IPAR directories ipar_dirs = glob(os.path.join(datadir, "IPAR_*")) # The Intensities directory from the RTA software looks a lot like IPAR ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities'))) for ipar_pathname in ipar_dirs: - logging.info('Found ipar directories in ' + datadir) + LOGGER.info('Found ipar directories in ' + datadir) image_analysis = ipar.ipar(ipar_pathname) if image_analysis is None: - logging.warn( - "%s is an empty or invalid IPAR directory" %(ipar_pathname,) + LOGGER.warn( + "%s is an empty or invalid IPAR directory" % (ipar_pathname,) ) else: scan_post_image_analysis( - runs, runfolder, image_analysis, ipar_pathname + runs, runfolder, datadir, image_analysis, ipar_pathname, flowcell_id ) return runs +def scan_post_image_analysis(runs, runfolder, datadir, image_analysis, + pathname, flowcell_id): + added = build_hiseq_runs(image_analysis, runs, datadir, runfolder, flowcell_id) + # If we're a multiplexed run, don't look for older run type. + if added > 0: + return + + LOGGER.info("Looking for bustard directories in %s" % (pathname,)) + bustard_dirs = glob(os.path.join(pathname, "Bustard*")) + # RTA BaseCalls looks enough like Bustard. + bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls"))) + for bustard_pathname in bustard_dirs: + LOGGER.info("Found bustard directory %s" % (bustard_pathname,)) + b = bustard.bustard(bustard_pathname) + build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, + runfolder, flowcell_id) + + +def build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder, + flowcell_id): + start = len(runs) + gerald_glob = os.path.join(bustard_pathname, 'GERALD*') + LOGGER.info("Looking for gerald directories in %s" % (pathname,)) + for gerald_pathname in glob(gerald_glob): + LOGGER.info("Found gerald directory %s" % (gerald_pathname,)) + try: + g = gerald.gerald(gerald_pathname) + p = PipelineRun(runfolder, flowcell_id) + p.datadir = datadir + p.image_analysis = image_analysis + p.bustard = b + p.gerald = g + runs.append(p) + except IOError, e: + LOGGER.error("Ignoring " + str(e)) + return len(runs) - start + + +def build_hiseq_runs(image_analysis, runs, datadir, runfolder, flowcell_id): + start = len(runs) + aligned_glob = os.path.join(runfolder, 'Aligned*') + unaligned_glob = os.path.join(runfolder, 'Unaligned*') + + aligned_paths = glob(aligned_glob) + unaligned_paths = glob(unaligned_glob) + + matched_paths = hiseq_match_aligned_unaligned(aligned_paths, unaligned_paths) + LOGGER.debug("Matched HiSeq analysis: %s", str(matched_paths)) + + for aligned, unaligned, suffix in matched_paths: + if unaligned is None: + LOGGER.warn("Aligned directory %s without matching unalinged, skipping", aligned) + continue + + try: + p = PipelineRun(runfolder, flowcell_id) + p.datadir = datadir + p.suffix = suffix + p.image_analysis = image_analysis + p.bustard = bustard.bustard(unaligned) + assert p.bustard + if aligned: + p.gerald = gerald.gerald(aligned) + runs.append(p) + except IOError, e: + LOGGER.error("Ignoring " + str(e)) + return len(runs) - start + +def hiseq_match_aligned_unaligned(aligned, unaligned): + """Match aligned and unaligned folders from seperate lists + """ + unaligned_suffix_re = re.compile('Unaligned(?P[\w]*)') + + aligned_by_suffix = build_dir_dict_by_suffix('Aligned', aligned) + unaligned_by_suffix = build_dir_dict_by_suffix('Unaligned', unaligned) + + keys = set(aligned_by_suffix.keys()).union(set(unaligned_by_suffix.keys())) + + matches = [] + for key in keys: + a = aligned_by_suffix.get(key) + u = unaligned_by_suffix.get(key) + matches.append((a, u, key)) + return matches + +def build_dir_dict_by_suffix(prefix, dirnames): + """Build a dictionary indexed by suffix of last directory name. + + It assumes a constant prefix + """ + regex = re.compile('%s(?P[\w]*)' % (prefix,)) + + by_suffix = {} + for absname in dirnames: + basename = os.path.basename(absname) + match = regex.match(basename) + if match: + by_suffix[match.group('suffix')] = absname + return by_suffix + def get_specific_run(gerald_dir): """ Given a gerald directory, construct a PipelineRun out of its parents @@ -241,21 +420,21 @@ def get_specific_run(gerald_dir): bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..')) image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..')) - runfolder_dir = os.path.abspath(os.path.join(image_dir, '..','..')) - - logging.info('--- use-run detected options ---') - logging.info('runfolder: %s' % (runfolder_dir,)) - logging.info('image_dir: %s' % (image_dir,)) - logging.info('bustard_dir: %s' % (bustard_dir,)) - logging.info('gerald_dir: %s' % (gerald_dir,)) + runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..')) + + LOGGER.info('--- use-run detected options ---') + LOGGER.info('runfolder: %s' % (runfolder_dir,)) + LOGGER.info('image_dir: %s' % (image_dir,)) + LOGGER.info('bustard_dir: %s' % (bustard_dir,)) + LOGGER.info('gerald_dir: %s' % (gerald_dir,)) # find our processed image dir image_run = None # split into parent, and leaf directory # leaf directory should be an IPAR or firecrest directory data_dir, short_image_dir = os.path.split(image_dir) - logging.info('data_dir: %s' % (data_dir,)) - logging.info('short_iamge_dir: %s' %(short_image_dir,)) + LOGGER.info('data_dir: %s' % (data_dir,)) + LOGGER.info('short_iamge_dir: %s' % (short_image_dir,)) # guess which type of image processing directory we have by looking # in the leaf directory name @@ -266,30 +445,30 @@ def get_specific_run(gerald_dir): elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None: image_run = ipar.ipar(image_dir) - # if we din't find a run, report the error and return + # if we din't find a run, report the error and return if image_run is None: msg = '%s does not contain an image processing step' % (image_dir,) - logging.error(msg) + LOGGER.error(msg) return None # find our base calling base_calling_run = bustard.bustard(bustard_dir) if base_calling_run is None: - logging.error('%s does not contain a bustard run' % (bustard_dir,)) + LOGGER.error('%s does not contain a bustard run' % (bustard_dir,)) return None # find alignments gerald_run = gerald.gerald(gerald_dir) if gerald_run is None: - logging.error('%s does not contain a gerald run' % (gerald_dir,)) + LOGGER.error('%s does not contain a gerald run' % (gerald_dir,)) return None p = PipelineRun(runfolder_dir) p.image_analysis = image_run p.bustard = base_calling_run p.gerald = gerald_run - - logging.info('Constructed PipelineRun from %s' % (gerald_dir,)) + + LOGGER.info('Constructed PipelineRun from %s' % (gerald_dir,)) return p def extract_run_parameters(runs): @@ -309,7 +488,7 @@ def summarize_mapped_reads(genome_map, mapped_reads): genome = 'unknown' for k, v in mapped_reads.items(): path, k = os.path.split(k) - if len(path) > 0 and not genome_map.has_key(path): + if len(path) > 0 and path not in genome_map: genome = path genome_reads += v else: @@ -319,37 +498,40 @@ def summarize_mapped_reads(genome_map, mapped_reads): def summarize_lane(gerald, lane_id): report = [] - summary_results = gerald.summary.lane_results - for end in range(len(summary_results)): - eland_result = gerald.eland_results.results[end][lane_id] - report.append("Sample name %s" % (eland_result.sample_name)) - report.append("Lane id %s end %s" % (eland_result.lane_id, end)) - if end < len(summary_results) and summary_results[end].has_key(eland_result.lane_id): - cluster = summary_results[end][eland_result.lane_id].cluster - report.append("Clusters %d +/- %d" % (cluster[0], cluster[1])) - report.append("Total Reads: %d" % (eland_result.reads)) - - if hasattr(eland_result, 'match_codes'): - mc = eland_result.match_codes - nm = mc['NM'] - nm_percent = float(nm)/eland_result.reads * 100 - qc = mc['QC'] - qc_percent = float(qc)/eland_result.reads * 100 - - report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent)) - report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent)) - report.append('Unique (0,1,2 mismatches) %d %d %d' % \ - (mc['U0'], mc['U1'], mc['U2'])) - report.append('Repeat (0,1,2 mismatches) %d %d %d' % \ - (mc['R0'], mc['R1'], mc['R2'])) - - if hasattr(eland_result, 'genome_map'): - report.append("Mapped Reads") - mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads) - for name, counts in mapped_reads.items(): + lane_results = gerald.summary.lane_results + eland_result = gerald.eland_results[lane_id] + report.append("Sample name %s" % (eland_result.sample_name)) + report.append("Lane id %s end %s" % (lane_id.lane, lane_id.read)) + + if lane_id.read < len(lane_results) and \ + lane_id.lane in lane_results[lane_id.read]: + summary_results = lane_results[lane_id.read][lane_id.lane] + cluster = summary_results.cluster + report.append("Clusters %d +/- %d" % (cluster[0], cluster[1])) + report.append("Total Reads: %d" % (eland_result.reads)) + + if hasattr(eland_result, 'match_codes'): + mc = eland_result.match_codes + nm = mc['NM'] + nm_percent = float(nm) / eland_result.reads * 100 + qc = mc['QC'] + qc_percent = float(qc) / eland_result.reads * 100 + + report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent)) + report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent)) + report.append('Unique (0,1,2 mismatches) %d %d %d' % \ + (mc['U0'], mc['U1'], mc['U2'])) + report.append('Repeat (0,1,2 mismatches) %d %d %d' % \ + (mc['R0'], mc['R1'], mc['R2'])) + + if hasattr(eland_result, 'genome_map'): + report.append("Mapped Reads") + mapped_reads = summarize_mapped_reads(eland_result.genome_map, + eland_result.mapped_reads) + for name, counts in mapped_reads.items(): report.append(" %s: %d" % (name, counts)) - report.append('') + report.append('') return report def summary_report(runs): @@ -357,18 +539,21 @@ def summary_report(runs): Summarize cluster numbers and mapped read counts for a runfolder """ report = [] + eland_keys = [] for run in runs: # print a run name? - report.append('Summary for %s' % (run.name,)) + report.append('Summary for %s' % (run.serialization_filename,)) # sort the report - eland_keys = run.gerald.eland_results.results[0].keys() - eland_keys.sort(alphanum) + if run.gerald: + eland_keys = sorted(run.gerald.eland_results.keys()) + else: + report.append("Alignment not done, no report possible") - for lane_id in eland_keys: - report.extend(summarize_lane(run.gerald, lane_id)) - report.append('---') - report.append('') - return os.linesep.join(report) + for lane_id in eland_keys: + report.extend(summarize_lane(run.gerald, lane_id)) + report.append('---') + report.append('') + return os.linesep.join(report) def is_compressed(filename): if os.path.splitext(filename)[1] == ".gz": @@ -378,19 +563,19 @@ def is_compressed(filename): else: return False -def save_flowcell_reports(data_dir, cycle_dir): +def save_flowcell_reports(data_dir, run_dirname): """ Save the flowcell quality reports """ data_dir = os.path.abspath(data_dir) status_file = os.path.join(data_dir, 'Status.xml') reports_dir = os.path.join(data_dir, 'reports') - reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2') + reports_dest = os.path.join(run_dirname, 'flowcell-reports.tar.bz2') if os.path.exists(reports_dir): cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ] if os.path.exists(status_file): cmd_list.extend(['Status.xml', 'Status.xsl']) - logging.info("Saving reports from " + reports_dir) + LOGGER.info("Saving reports from " + reports_dir) cwd = os.getcwd() os.chdir(data_dir) q = QueueCommands([" ".join(cmd_list)]) @@ -398,16 +583,21 @@ def save_flowcell_reports(data_dir, cycle_dir): os.chdir(cwd) -def save_summary_file(gerald_object, cycle_dir): +def save_summary_file(pipeline, run_dirname): # Copy Summary.htm - summary_path = os.path.join(gerald_object.pathname, 'Summary.htm') - if os.path.exists(summary_path): - logging.info('Copying %s to %s' % (summary_path, cycle_dir)) - shutil.copy(summary_path, cycle_dir) + gerald_object = pipeline.gerald + gerald_summary = os.path.join(gerald_object.pathname, 'Summary.htm') + status_files_summary = os.path.join(pipeline.datadir, 'Status_Files', 'Summary.htm') + if os.path.exists(gerald_summary): + LOGGER.info('Copying %s to %s' % (gerald_summary, run_dirname)) + shutil.copy(gerald_summary, run_dirname) + elif os.path.exists(status_files_summary): + LOGGER.info('Copying %s to %s' % (status_files_summary, run_dirname)) + shutil.copy(status_files_summary, run_dirname) else: - logging.info('Summary file %s was not found' % (summary_path,)) + LOGGER.info('Summary file %s was not found' % (summary_path,)) -def save_ivc_plot(bustard_object, cycle_dir): +def save_ivc_plot(bustard_object, run_dirname): """ Save the IVC page and its supporting images """ @@ -415,21 +605,21 @@ def save_ivc_plot(bustard_object, cycle_dir): plot_image_path = os.path.join(bustard_object.pathname, 'Plots') plot_images = os.path.join(plot_image_path, 's_?_[a-z]*.png') - plot_target_path = os.path.join(cycle_dir, 'Plots') + plot_target_path = os.path.join(run_dirname, 'Plots') if os.path.exists(plot_html): - logging.debug("Saving %s" % ( plot_html, )) - logging.debug("Saving %s" % ( plot_images, )) - shutil.copy(plot_html, cycle_dir) + LOGGER.debug("Saving %s" % (plot_html,)) + LOGGER.debug("Saving %s" % (plot_images,)) + shutil.copy(plot_html, run_dirname) if not os.path.exists(plot_target_path): - os.mkdir(plot_target_path) + os.mkdir(plot_target_path) for plot_file in glob(plot_images): shutil.copy(plot_file, plot_target_path) else: - logging.warning('Missing IVC.html file, not archiving') + LOGGER.warning('Missing IVC.html file, not archiving') -def compress_score_files(bustard_object, cycle_dir): +def compress_score_files(bustard_object, run_dirname): """ Compress score files into our result directory """ @@ -447,12 +637,12 @@ def compress_score_files(bustard_object, cycle_dir): tar_cmd = ['tar', 'c'] + score_files bzip_cmd = [ 'bzip2', '-9', '-c' ] - tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2') + tar_dest_name = os.path.join(run_dirname, 'scores.tar.bz2') tar_dest = open(tar_dest_name, 'w') - logging.info("Compressing score files from %s" % (scores_path,)) - logging.info("Running tar: " + " ".join(tar_cmd[:10])) - logging.info("Running bzip2: " + " ".join(bzip_cmd)) - logging.info("Writing to %s" %(tar_dest_name,)) + LOGGER.info("Compressing score files from %s" % (scores_path,)) + LOGGER.info("Running tar: " + " ".join(tar_cmd[:10])) + LOGGER.info("Running bzip2: " + " ".join(bzip_cmd)) + LOGGER.info("Writing to %s" % (tar_dest_name,)) env = {'BZIP': '-9'} tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env, @@ -461,45 +651,45 @@ def compress_score_files(bustard_object, cycle_dir): tar.wait() -def compress_eland_results(gerald_object, cycle_dir, num_jobs=1): +def compress_eland_results(gerald_object, run_dirname, num_jobs=1): """ Compress eland result files into the archive directory """ # copy & bzip eland files bz_commands = [] - - for lanes_dictionary in gerald_object.eland_results.results: - for eland_lane in lanes_dictionary.values(): - source_name = eland_lane.pathname + + for key in gerald_object.eland_results: + eland_lane = gerald_object.eland_results[key] + for source_name in eland_lane.pathnames: if source_name is None: - logging.info( + LOGGER.info( "Lane ID %s does not have a filename." % (eland_lane.lane_id,)) else: path, name = os.path.split(source_name) - dest_name = os.path.join(cycle_dir, name) - logging.info("Saving eland file %s to %s" % \ + dest_name = os.path.join(run_dirname, name) + LOGGER.info("Saving eland file %s to %s" % \ (source_name, dest_name)) - + if is_compressed(name): - logging.info('Already compressed, Saving to %s' % (dest_name, )) + LOGGER.info('Already compressed, Saving to %s' % (dest_name,)) shutil.copy(source_name, dest_name) else: # not compressed dest_name += '.bz2' args = ['bzip2', '-9', '-c', source_name, '>', dest_name ] bz_commands.append(" ".join(args)) - #logging.info('Running: %s' % ( " ".join(args) )) + #LOGGER.info('Running: %s' % ( " ".join(args) )) #bzip_dest = open(dest_name, 'w') #bzip = subprocess.Popen(args, stdout=bzip_dest) - #logging.info('Saving to %s' % (dest_name, )) + #LOGGER.info('Saving to %s' % (dest_name, )) #bzip.wait() - + if len(bz_commands) > 0: q = QueueCommands(bz_commands, num_jobs) q.run() - - -def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1): + + +def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format=None): """ Iterate over runfolders in runs extracting the most useful information. * run parameters (in run-*.xml) @@ -512,78 +702,102 @@ def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1): output_base_dir = os.getcwd() for r in runs: - result_dir = os.path.join(output_base_dir, r.flowcell_id) - logging.info("Using %s as result directory" % (result_dir,)) - if not os.path.exists(result_dir): - os.mkdir(result_dir) - - # create cycle_dir - cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop) - logging.info("Filling in %s" % (cycle,)) - cycle_dir = os.path.join(result_dir, cycle) - cycle_dir = os.path.abspath(cycle_dir) - if os.path.exists(cycle_dir): - logging.error("%s already exists, not overwriting" % (cycle_dir,)) - continue - else: - os.mkdir(cycle_dir) - - # save run file - r.save(cycle_dir) - - # save illumina flowcell status report - save_flowcell_reports( os.path.join(r.image_analysis.pathname, '..'), cycle_dir ) - - # save stuff from bustard - # grab IVC plot - save_ivc_plot(r.bustard, cycle_dir) - - # build base call saving commands - if site is not None: - lanes = [] - for lane in range(1,9): - if r.gerald.lanes[lane].analysis != 'none': - lanes.append(lane) - - run_name = srf.pathname_to_run_name(r.pathname) - srf_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir) - srf.run_commands(r.bustard.pathname, srf_cmds, num_jobs) - - # save stuff from GERALD - # copy stuff out of the main run - g = r.gerald - - # save summary file - save_summary_file(g, cycle_dir) - - # compress eland result files - compress_eland_results(g, cycle_dir, num_jobs) - - # md5 all the compressed files once we're done - md5_commands = srf.make_md5_commands(cycle_dir) - srf.run_commands(cycle_dir, md5_commands, num_jobs) - + result_dir = os.path.join(output_base_dir, r.flowcell_id) + LOGGER.info("Using %s as result directory" % (result_dir,)) + if not os.path.exists(result_dir): + os.mkdir(result_dir) + + # create directory to add this runs results to + LOGGER.info("Filling in %s" % (r.run_dirname,)) + run_dirname = os.path.join(result_dir, r.run_dirname) + run_dirname = os.path.abspath(run_dirname) + if os.path.exists(run_dirname): + LOGGER.error("%s already exists, not overwriting" % (run_dirname,)) + continue + else: + os.mkdir(run_dirname) + + # save run file + r.save(run_dirname) + + # save illumina flowcell status report + save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'), + run_dirname) + + # save stuff from bustard + # grab IVC plot + save_ivc_plot(r.bustard, run_dirname) + + # build base call saving commands + if site is not None: + save_raw_data(num_jobs, r, site, raw_format, run_dirname) + + # save stuff from GERALD + # copy stuff out of the main run + if r.gerald: + g = r.gerald + + # save summary file + save_summary_file(r, run_dirname) + + # compress eland result files + compress_eland_results(g, run_dirname, num_jobs) + + # md5 all the compressed files once we're done + md5_commands = srf.make_md5_commands(run_dirname) + srf.run_commands(run_dirname, md5_commands, num_jobs) + +def save_raw_data(num_jobs, r, site, raw_format, run_dirname): + lanes = [] + if r.gerald: + for lane in r.gerald.lanes: + lane_parameters = r.gerald.lanes.get(lane, None) + if lane_parameters is not None: + lanes.append(lane) + else: + # assume default list of lanes + lanes = LANE_SAMPLE_KEYS + + run_name = srf.pathname_to_run_name(r.pathname) + seq_cmds = [] + if raw_format is None: + raw_format = r.bustard.sequence_format + + LOGGER.info("Raw Format is: %s" % (raw_format, )) + if raw_format == 'fastq': + LOGGER.info("Reading fastq files from %s", r.bustard.pathname) + rawpath = os.path.join(r.pathname, r.bustard.pathname) + LOGGER.info("raw data = %s" % (rawpath,)) + srf.copy_hiseq_project_fastqs(run_name, rawpath, site, run_dirname) + elif raw_format == 'qseq': + seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, run_dirname) + elif raw_format == 'srf': + seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, run_dirname, 0) + else: + raise ValueError('Unknown --raw-format=%s' % (raw_format)) + srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs) + def rm_list(files, dry_run=True): for f in files: if os.path.exists(f): - logging.info('deleting %s' % (f,)) + LOGGER.info('deleting %s' % (f,)) if not dry_run: if os.path.isdir(f): shutil.rmtree(f) else: os.unlink(f) else: - logging.warn("%s doesn't exist."% (f,)) + LOGGER.warn("%s doesn't exist." % (f,)) def clean_runs(runs, dry_run=True): """ Clean up run folders to optimize for compression. """ if dry_run: - logging.info('In dry-run mode') + LOGGER.info('In dry-run mode') for run in runs: - logging.info('Cleaninging %s' % (run.pathname,)) + LOGGER.info('Cleaninging %s' % (run.pathname,)) # rm RunLog*.xml runlogs = glob(os.path.join(run.pathname, 'RunLog*xml')) rm_list(runlogs, dry_run) @@ -599,14 +813,22 @@ def clean_runs(runs, dry_run=True): calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*')) rm_list(calibration_dir, dry_run) # rm Images/L* - logging.info("Cleaning images") + LOGGER.info("Cleaning images") image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*')) rm_list(image_dirs, dry_run) - # cd Data/C1-*_Firecrest* - logging.info("Cleaning intermediate files") + # rm ReadPrep + LOGGER.info("Cleaning ReadPrep*") + read_prep_dirs = glob(os.path.join(run.pathname, 'ReadPrep*')) + rm_list(read_prep_dirs, dry_run) + # rm ReadPrep + LOGGER.info("Cleaning Thubmnail_images") + thumbnail_dirs = glob(os.path.join(run.pathname, 'Thumbnail_Images')) + rm_list(thumbnail_dirs, dry_run) + # make clean_intermediate + logging.info("Cleaning intermediate files") if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')): - clean_process = subprocess.Popen(['make', 'clean_intermediate'], + clean_process = subprocess.Popen(['make', 'clean_intermediate'], cwd=run.image_analysis.pathname,) clean_process.wait()