-"""
-Core information needed to inspect a runfolder.
+"""Core information needed to inspect a runfolder.
"""
from glob import glob
import logging
from htsworkflow.pipelines import srf
class PipelineRun(object):
- """
- Capture "interesting" information about a pipeline run
+ """Capture "interesting" information about a pipeline run
+
+ :Variables:
+ - `pathname` location of the root of this runfolder
+ - `name` read only property containing name of run xml file
+ - `flowcell_id` read-only property containing flowcell id (bar code)
+ - `datadir` location of the runfolder data dir.
+ - `image_analysis` generic name for Firecrest or IPAR image analysis
+ - `bustard` summary base caller
+ - `gerald` summary of sequence alignment and quality control metrics
"""
XML_VERSION = 1
PIPELINE_RUN = 'PipelineRun'
FLOWCELL_ID = 'FlowcellID'
def __init__(self, pathname=None, flowcell_id=None, xml=None):
+ """Initialize a PipelineRun object
+
+ :Parameters:
+ - `pathname` the root directory of this run folder.
+ - `flowcell_id` the flowcell ID in case it can't be determined
+ - `xml` Allows initializing an object from a serialized xml file.
+
+ :Types:
+ - `pathname` str
+ - `flowcell_id` str
+ - `xml` ElementTree
+ """
if pathname is not None:
self.pathname = os.path.normpath(pathname)
else:
self.set_elements(xml)
def _get_flowcell_id(self):
+ """Return the flowcell ID
+
+ Attempts to find the flowcell ID through several mechanisms.
+ """
# extract flowcell ID
if self._flowcell_id is None:
self._flowcell_id = self._get_flowcell_id_from_runinfo()
def _get_flowcell_id_from_flowcellid(self):
"""Extract flowcell id from a Config/FlowcellId.xml file
+
+ :return: flowcell_id or None if not found
"""
config_dir = os.path.join(self.pathname, 'Config')
flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
def _get_flowcell_id_from_runinfo(self):
"""Read RunInfo file for flowcell id
+
+ :return: flowcell_id or None if not found
"""
runinfo = os.path.join(self.pathname, 'RunInfo.xml')
if os.path.exists(runinfo):
if len(fc_nodes) == 1:
return fc_nodes[0].text
-
def _get_flowcell_id_from_path(self):
"""Guess a flowcell name from the path
+
+ :return: flowcell_id or None if not found
"""
path_fields = self.pathname.split('_')
if len(path_fields) > 0:
return self.gerald.runfolder_name
runfolder_name = property(_get_runfolder_name)
- def get_elements(self):
+ def _get_run_id(self):
+ """Return an identifier for a run.
+
+ For pre-multiplexing runs this is just the cycle range C1-123
+ For post-multiplexing runs the "suffix" that we add to
+ differentiate runs will be added to the range.
+ E.g. Unaligned_6mm may produce C1-200_6mm
"""
- make one master xml file from all of our sub-components.
+ pass
+
+ def get_elements(self):
+ """make one master xml file from all of our sub-components.
+
+ :return: an ElementTree containing all available pipeline
+ run xml components.
"""
root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
flowcell.text = self.flowcell_id
root.append(self.image_analysis.get_elements())
root.append(self.bustard.get_elements())
- root.append(self.gerald.get_elements())
+ if self.gerald:
+ root.append(self.gerald.get_elements())
return root
def set_elements(self, tree):
- # this file gets imported by all the others,
- # so we need to hide the imports to avoid a cyclic imports
+ """Initialize a PipelineRun object from a run.xml ElementTree.
+ :param tree: parsed ElementTree
+ :type tree: ElementTree
+ """
tag = tree.tag.lower()
if tag != PipelineRun.PIPELINE_RUN.lower():
raise ValueError('Pipeline Run Expecting %s got %s' % (
LOGGER.warn('PipelineRun unrecognized tag %s' % (tag,))
def _get_run_name(self):
- """
- Given a run tuple, find the latest date and use that as our name
+ """Compute the run name for the run xml file
+
+ Attempts to find the latest date from all of the run
+ components.
+
+ :return: run xml name
+ :rtype: str
"""
if self._name is None:
tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
name = property(_get_run_name)
def save(self, destdir=None):
+ """Save a run xml file.
+
+ :param destdir: Directory name to save to, uses current directory
+ if not specified.
+ :type destdir: str
+ """
if destdir is None:
destdir = ''
LOGGER.info("Saving run report " + self.name)
ElementTree.ElementTree(xml).write(dest_pathname)
def load(self, filename):
+ """Load a run xml into this object.
+
+ :Parameters:
+ - `filename` location of a run xml file
+
+ :Types:
+ - `filename` str
+ """
LOGGER.info("Loading run report from " + filename)
tree = ElementTree.parse(filename).getroot()
self.set_elements(tree)
Load and instantiate a Pipeline run from a run xml file
:Parameters:
- - `pathname` : location of an run xml file
+ - `pathname` location of a run xml file
:Returns: initialized PipelineRun object
"""
return run
def get_runs(runfolder, flowcell_id=None):
- """
- Search through a run folder for all the various sub component runs
- and then return a PipelineRun for each different combination.
+ """Find all runs associated with a runfolder.
+
+ We end up with multiple analysis runs as we sometimes
+ need to try with different parameters. This attempts
+ to return a list of all the various runs.
For example if there are two different GERALD runs, this will
generate two different PipelineRun objects, that differ
in there gerald component.
"""
- from htsworkflow.pipelines import firecrest
- from htsworkflow.pipelines import ipar
- from htsworkflow.pipelines import bustard
- from htsworkflow.pipelines import gerald
-
datadir = os.path.join(runfolder, 'Data')
LOGGER.info('Searching for runs in ' + datadir)
LOGGER.warn("Aligned directory %s without matching unalinged, skipping", aligned)
continue
- g = gerald.gerald(aligned)
print "scan for aligned then remove them from unaligned list"
try:
p = PipelineRun(runfolder, flowcell_id)