--- /dev/null
+"""
+Core information needed to inspect a runfolder.
+"""
+from glob import glob
+import logging
+import os
+import re
+import stat
+import subprocess
+import sys
+import time
+
+try:
+ from xml.etree import ElementTree
+except ImportError, e:
+ from elementtree import ElementTree
+
+EUROPEAN_STRPTIME = "%d-%m-%Y"
+EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
+VERSION_RE = "([0-9\.]+)"
+USER_RE = "([a-zA-Z0-9]+)"
+LANES_PER_FLOWCELL = 8
+
+from gaworkflow.util.alphanum import alphanum
+from gaworkflow.util.ethelp import indent, flatten
+
+
+class PipelineRun(object):
+ """
+ Capture "interesting" information about a pipeline run
+ """
+ XML_VERSION = 1
+ PIPELINE_RUN = 'PipelineRun'
+ FLOWCELL_ID = 'FlowcellID'
+
+ def __init__(self, pathname=None, firecrest=None, bustard=None, gerald=None, xml=None):
+ self.pathname = os.path.normpath(pathname)
+ self._name = None
+ self._flowcell_id = None
+ self.firecrest = firecrest
+ self.bustard = bustard
+ self.gerald = gerald
+
+ if xml is not None:
+ self.set_elements(xml)
+
+ def _get_flowcell_id(self):
+ # extract flowcell ID
+ if self._flowcell_id is None:
+ config_dir = os.path.join(self.pathname, 'Config')
+ flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
+ if os.path.exists(flowcell_id_path):
+ flowcell_id_tree = ElementTree.parse(flowcell_id_path)
+ self._flowcell_id = flowcell_id_tree.findtext('Text')
+ else:
+ path_fields = self.pathname.split('_')
+ if len(path_fields) > 0:
+ # guessing last element of filename
+ flowcell_id = path_fields[-1]
+ else:
+ flowcell_id = 'unknown'
+
+ logging.warning(
+ "Flowcell id was not found, guessing %s" % (
+ flowcell_id))
+ self._flowcell_id = flowcell_id
+ return self._flowcell_id
+ flowcell_id = property(_get_flowcell_id)
+
+ def get_elements(self):
+ """
+ make one master xml file from all of our sub-components.
+ """
+ root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
+ flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
+ flowcell.text = self.flowcell_id
+ root.append(self.firecrest.get_elements())
+ root.append(self.bustard.get_elements())
+ root.append(self.gerald.get_elements())
+ return root
+
+ def set_elements(self, tree):
+ # this file gets imported by all the others,
+ # so we need to hide the imports to avoid a cyclic imports
+ from gaworkflow.pipeline import firecrest
+ from gaworkflow.pipeline import bustard
+ from gaworkflow.pipeline import gerald
+
+ tag = tree.tag.lower()
+ if tag != PipelineRun.PIPELINE_RUN.lower():
+ raise ValueError('Pipeline Run Expecting %s got %s' % (
+ PipelineRun.PIPELINE_RUN, tag))
+ for element in tree:
+ tag = element.tag.lower()
+ if tag == PipelineRun.FLOWCELL_ID.lower():
+ self._flowcell_id = element.text
+ #ok the xword.Xword.XWORD pattern for module.class.constant is lame
+ elif tag == firecrest.Firecrest.FIRECREST.lower():
+ self.firecrest = firecrest.Firecrest(xml=element)
+ elif tag == bustard.Bustard.BUSTARD.lower():
+ self.bustard = bustard.Bustard(xml=element)
+ elif tag == gerald.Gerald.GERALD.lower():
+ self.gerald = gerald.Gerald(xml=element)
+ else:
+ logging.warn('PipelineRun unrecognized tag %s' % (tag,))
+
+ def _get_run_name(self):
+ """
+ Given a run tuple, find the latest date and use that as our name
+ """
+ if self._name is None:
+ tmax = max(self.firecrest.time, self.bustard.time, self.gerald.time)
+ timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
+ self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
+ return self._name
+ name = property(_get_run_name)
+
+ def save(self, destdir=None):
+ if destdir is None:
+ destdir = ''
+ logging.info("Saving run report "+ self.name)
+ xml = self.get_elements()
+ indent(xml)
+ dest_pathname = os.path.join(destdir, self.name)
+ ElementTree.ElementTree(xml).write(dest_pathname)
+
+ def load(self, filename):
+ logging.info("Loading run report from " + filename)
+ tree = ElementTree.parse(filename).getroot()
+ self.set_elements(tree)
+
+def get_runs(runfolder):
+ """
+ Search through a run folder for all the various sub component runs
+ and then return a PipelineRun for each different combination.
+
+ For example if there are two different GERALD runs, this will
+ generate two different PipelineRun objects, that differ
+ in there gerald component.
+ """
+ from gaworkflow.pipeline import firecrest
+ from gaworkflow.pipeline import bustard
+ from gaworkflow.pipeline import gerald
+
+ datadir = os.path.join(runfolder, 'Data')
+
+ logging.info('Searching for runs in ' + datadir)
+ runs = []
+ for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")):
+ f = firecrest.firecrest(firecrest_pathname)
+ bustard_glob = os.path.join(firecrest_pathname, "Bustard*")
+ for bustard_pathname in glob(bustard_glob):
+ b = bustard.bustard(bustard_pathname)
+ gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
+ for gerald_pathname in glob(gerald_glob):
+ try:
+ g = gerald.gerald(gerald_pathname)
+ runs.append(PipelineRun(runfolder, f, b, g))
+ except IOError, e:
+ print "Ignoring", str(e)
+ return runs
+
+
+def extract_run_parameters(runs):
+ """
+ Search through runfolder_path for various runs and grab their parameters
+ """
+ for run in runs:
+ run.save()
+
+def summarize_mapped_reads(mapped_reads):
+ """
+ Summarize per chromosome reads into a genome count
+ But handle spike-in/contamination symlinks seperately.
+ """
+ summarized_reads = {}
+ genome_reads = 0
+ genome = 'unknown'
+ for k, v in mapped_reads.items():
+ path, k = os.path.split(k)
+ if len(path) > 0:
+ genome = path
+ genome_reads += v
+ else:
+ summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
+ summarized_reads[genome] = genome_reads
+ return summarized_reads
+
+def summary_report(runs):
+ """
+ Summarize cluster numbers and mapped read counts for a runfolder
+ """
+ report = []
+ for run in runs:
+ # print a run name?
+ report.append('Summary for %s' % (run.name,))
+ # sort the report
+ eland_keys = run.gerald.eland_results.results.keys()
+ eland_keys.sort(alphanum)
+
+ lane_results = run.gerald.summary.lane_results
+ for lane_id in eland_keys:
+ result = run.gerald.eland_results.results[lane_id]
+ report.append("Sample name %s" % (result.sample_name))
+ report.append("Lane id %s" % (result.lane_id,))
+ cluster = lane_results[result.lane_id].cluster
+ report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
+ report.append("Total Reads: %d" % (result.reads))
+ mc = result._match_codes
+ report.append("No Match: %d" % (mc['NM']))
+ report.append("QC Failed: %d" % (mc['QC']))
+ report.append('Unique (0,1,2 mismatches) %d %d %d' % \
+ (mc['U0'], mc['U1'], mc['U2']))
+ report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
+ (mc['R0'], mc['R1'], mc['R2']))
+ report.append("Mapped Reads")
+ mapped_reads = summarize_mapped_reads(result.mapped_reads)
+ for name, counts in mapped_reads.items():
+ report.append(" %s: %d" % (name, counts))
+ report.append('---')
+ report.append('')
+ return os.linesep.join(report)
+
+def extract_results(runs, output_base_dir=None):
+ if output_base_dir is None:
+ output_base_dir = os.getcwd()
+
+ for r in runs:
+ result_dir = os.path.join(output_base_dir, r.flowcell_id)
+ logging.info("Using %s as result directory" % (result_dir,))
+ if not os.path.exists(result_dir):
+ os.mkdir(result_dir)
+
+ # create cycle_dir
+ cycle = "C%d-%d" % (r.firecrest.start, r.firecrest.stop)
+ logging.info("Filling in %s" % (cycle,))
+ cycle_dir = os.path.join(result_dir, cycle)
+ if os.path.exists(cycle_dir):
+ logging.error("%s already exists, not overwriting" % (cycle_dir,))
+ continue
+ else:
+ os.mkdir(cycle_dir)
+
+ # copy stuff out of the main run
+ g = r.gerald
+
+ # save run file
+ r.save(cycle_dir)
+
+ # tar score files
+ score_files = []
+ for f in os.listdir(g.pathname):
+ if re.match('.*_score.txt', f):
+ score_files.append(f)
+
+ tar_cmd = ['/bin/tar', 'c'] + score_files
+ bzip_cmd = [ 'bzip2', '-9', '-c' ]
+ tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2')
+ tar_dest = open(tar_dest_name, 'w')
+ logging.info("Compressing score files in %s" % (g.pathname,))
+ logging.info("Running tar: " + " ".join(tar_cmd[:10]))
+ logging.info("Running bzip2: " + " ".join(bzip_cmd))
+ logging.info("Writing to %s" %(tar_dest_name))
+
+ tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, cwd=g.pathname)
+ bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
+ tar.wait()
+
+ # copy & bzip eland files
+ for eland_lane in g.eland_results.values():
+ source_name = eland_lane.pathname
+ path, name = os.path.split(eland_lane.pathname)
+ dest_name = os.path.join(cycle_dir, name+'.bz2')
+
+ args = ['bzip2', '-9', '-c', source_name]
+ logging.info('Running: %s' % ( " ".join(args) ))
+ bzip_dest = open(dest_name, 'w')
+ bzip = subprocess.Popen(args, stdout=bzip_dest)
+ logging.info('Saving to %s' % (dest_name, ))
+ bzip.wait()
+
+