2 Core information needed to inspect a runfolder.
15 from xml.etree import ElementTree
16 except ImportError, e:
17 from elementtree import ElementTree
# strptime()/strftime() format for dates written day-month-year.
EUROPEAN_STRPTIME = "%d-%m-%Y"
# Regular-expression fragments, each with a single capturing group.  They are
# raw strings so that the backslash in "\." reaches the regex engine as
# written instead of relying on Python passing an unknown escape through.
EUROPEAN_DATE_RE = r"([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
VERSION_RE = r"([0-9\.]+)"
USER_RE = r"([a-zA-Z0-9]+)"
LANES_PER_FLOWCELL = 8
25 from htsworkflow.util.alphanum import alphanum
26 from htsworkflow.util.ethelp import indent, flatten
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run.

    A run bundles the firecrest, bustard and gerald components found for one
    runfolder and can be converted to, or restored from, a single XML tree.
    """
    # NOTE(review): this extract elides some source lines inside the class
    # (embedded line numbers skip); statements that had to be inferred are
    # marked below and should be confirmed against the complete file.
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, firecrest=None, bustard=None, gerald=None, xml=None):
        """
        Build a run from its components, or restore one from an XML element.

        pathname -- runfolder path; stored normalised, or None when absent
        firecrest, bustard, gerald -- component objects for this run
        xml -- optional root element; when given it is handed to set_elements()
        """
        if pathname is not None:
            self.pathname = os.path.normpath(pathname)
        else:
            self.pathname = None
        # cache for the `name` property
        self._name = None
        # cache for the `flowcell_id` property
        self._flowcell_id = None
        self.firecrest = firecrest
        self.bustard = bustard
        self.gerald = gerald

        if xml is not None:
            self.set_elements(xml)

    def _get_flowcell_id(self):
        """
        Return the flowcell id, looking it up on first use.

        The id is read from <pathname>/Config/FlowcellId.xml when that file
        exists.  Otherwise it is guessed from the text after the last '_' in
        the pathname, or set to 'unknown' when there is no pathname at all.
        """
        if self._flowcell_id is None:
            flowcell_id_path = None
            if self.pathname is not None:
                config_dir = os.path.join(self.pathname, 'Config')
                flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')

            if flowcell_id_path is not None and os.path.exists(flowcell_id_path):
                flowcell_id_tree = ElementTree.parse(flowcell_id_path)
                self._flowcell_id = flowcell_id_tree.findtext('Text')
            else:
                # str.split() never returns an empty list, so the 'unknown'
                # fallback is only reachable when no pathname is available.
                if self.pathname is not None:
                    path_fields = self.pathname.split('_')
                else:
                    path_fields = []
                if len(path_fields) > 0:
                    # guessing last element of filename
                    flowcell_id = path_fields[-1]
                else:
                    flowcell_id = 'unknown'

                logging.warning(
                    "Flowcell id was not found, guessing %s" % (
                        flowcell_id,))
                self._flowcell_id = flowcell_id
        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def get_elements(self):
        """
        make one master xml file from all of our sub-components.

        Returns the root element: a FlowcellID child followed by the elements
        produced by the firecrest, bustard and gerald components.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.firecrest.get_elements())
        root.append(self.bustard.get_elements())
        root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """
        Restore this run from the root element produced by get_elements().

        Raises ValueError when the root tag is not PipelineRun (compared
        case-insensitively).  Unrecognised child tags are logged and skipped.
        """
        # this file gets imported by all the others,
        # so we need to hide the imports to avoid a cyclic imports
        from htsworkflow.pipelines import firecrest
        from htsworkflow.pipelines import bustard
        from htsworkflow.pipelines import gerald

        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
            raise ValueError('Pipeline Run Expecting %s got %s' % (
                PipelineRun.PIPELINE_RUN, tag))
        for element in tree:
            tag = element.tag.lower()
            if tag == PipelineRun.FLOWCELL_ID.lower():
                self._flowcell_id = element.text
            #ok the xword.Xword.XWORD pattern for module.class.constant is lame
            elif tag == firecrest.Firecrest.FIRECREST.lower():
                self.firecrest = firecrest.Firecrest(xml=element)
            elif tag == bustard.Bustard.BUSTARD.lower():
                self.bustard = bustard.Bustard(xml=element)
            elif tag == gerald.Gerald.GERALD.lower():
                self.gerald = gerald.Gerald(xml=element)
            else:
                logging.warning('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """
        Given a run tuple, find the latest date and use that as our name

        The name is 'run_<flowcell id>_<YYYY-MM-DD>.xml', dated (local time)
        by the newest of the three component timestamps, and is cached.
        """
        if self._name is None:
            tmax = max(self.firecrest.time, self.bustard.time, self.gerald.time)
            timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
            self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
        return self._name
    name = property(_get_run_name)

    def save(self, destdir=None):
        """
        Write the run report XML into destdir, using self.name as file name.

        With destdir left as None the file name is used as a relative path.
        """
        # NOTE(review): the None handling and the indent() call stand in for
        # source lines that are not visible in this extract.  indent is
        # imported from htsworkflow.util.ethelp and is presumably an in-place
        # pretty-printer -- confirm both against the complete file.
        if destdir is None:
            destdir = ''
        logging.info("Saving run report "+ self.name)
        xml = self.get_elements()
        indent(xml)
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """
        Parse the XML run report in filename and restore this run from it.
        """
        logging.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
def get_runs(runfolder):
    """
    Search through a run folder for all the various sub component runs
    and then return a PipelineRun for each different combination.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in their gerald component.

    Looks for Data/*Firecrest*/Bustard*/GERALD* below runfolder.  A GERALD
    directory that fails to load is reported and skipped.
    """
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    datadir = os.path.join(runfolder, 'Data')

    logging.info('Searching for runs in ' + datadir)
    runs = []
    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
        f = firecrest.firecrest(firecrest_pathname)
        bustard_glob = os.path.join(firecrest_pathname, "Bustard*")
        for bustard_pathname in glob(bustard_glob):
            b = bustard.bustard(bustard_pathname)
            gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
            for gerald_pathname in glob(gerald_glob):
                try:
                    g = gerald.gerald(gerald_pathname)
                    runs.append(PipelineRun(runfolder, f, b, g))
                # NOTE(review): the except clause itself is not visible in
                # this extract; IOError is an assumption -- confirm the
                # exception type against the complete file.
                except IOError as e:
                    # single-argument print() emits the same text on
                    # Python 2 and Python 3
                    print("Ignoring %s" % (str(e),))
    return runs
# NOTE(review): the body of this function is not visible in this extract.
# The description below speaks of "runfolder_path", but the only parameter
# is `runs` -- confirm what is done with `runs` before relying on it.
168 def extract_run_parameters(runs):
170 Search through runfolder_path for various runs and grab their parameters
def summarize_mapped_reads(mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks separately.

    mapped_reads maps a file path to a read count.  Counts whose key has a
    directory part are added together and stored under that directory;
    counts whose key is a bare file name keep their own entry.  The genome
    entry is always written, as 'unknown' with 0 reads when no key had a
    directory part.
    """
    # NOTE(review): the two initialisers and the branch on `path` stand in
    # for source lines that are not visible in this extract; confirm the
    # exact condition against the complete file.  When keys come from more
    # than one directory, the directory seen last names the combined total.
    summarized_reads = {}
    genome_reads = 0
    genome = 'unknown'
    for k, v in mapped_reads.items():
        path, k = os.path.split(k)
        if len(path) > 0:
            genome = path
            genome_reads += v
        else:
            summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
    summarized_reads[genome] = genome_reads
    return summarized_reads
def _percent_of_reads(count, total):
    """Return count as a percentage of total, or 0.0 when total is zero."""
    if not total:
        return 0.0
    return float(count) / total * 100


def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder

    Returns one string with the report lines joined by os.linesep; an empty
    list of runs gives an empty string.
    """
    # imported locally so the module's import block does not need to change
    import functools

    # NOTE(review): the 'NM' / 'QC' lookups stand in for source lines that are
    # not visible in this extract (chosen to match the "No Match" and
    # "QC Failed" labels); two lines after the mapped-read loop are also not
    # visible and are not reproduced.  Confirm against the complete file.
    report = []
    for run in runs:
        report.append('Summary for %s' % (run.name,))
        eland_results = run.gerald.eland_results.results
        # alphanum is a cmp-style function, so adapt it for key-based sorting
        eland_keys = sorted(eland_results.keys(),
                            key=functools.cmp_to_key(alphanum))

        lane_results = run.gerald.summary.lane_results
        for lane_id in eland_keys:
            result = eland_results[lane_id]
            report.append("Sample name %s" % (result.sample_name,))
            report.append("Lane id %s" % (result.lane_id,))
            cluster = lane_results[result.lane_id].cluster
            report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
            report.append("Total Reads: %d" % (result.reads,))
            mc = result._match_codes
            nm = mc['NM']
            # a lane without reads would otherwise raise ZeroDivisionError
            nm_percent = _percent_of_reads(nm, result.reads)
            qc = mc['QC']
            qc_percent = _percent_of_reads(qc, result.reads)

            report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
            report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
            report.append('Unique (0,1,2 mismatches) %d %d %d' % \
                          (mc['U0'], mc['U1'], mc['U2']))
            report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
                          (mc['R0'], mc['R1'], mc['R2']))
            report.append("Mapped Reads")
            mapped_reads = summarize_mapped_reads(result.mapped_reads)
            for name, counts in mapped_reads.items():
                report.append(" %s: %d" % (name, counts))
    return os.linesep.join(report)
# Collect result files of the given runs below output_base_dir.
#
# For a run `r` the visible statements: build <output_base_dir>/<flowcell id>
# as the result directory, derive a cycle directory name "C<start>-<stop>"
# from the run's firecrest component, log an error when that cycle directory
# already exists, copy GERALD's Summary.htm when it is present, pipe the
# *_score.txt files through tar and bzip2 into scores.tar.bz2, and run
# bzip2 on each eland result file.
#
# output_base_dir defaults to the current working directory.
#
# NOTE(review): many statements of this function are not visible in this
# extract (embedded line numbers skip), among them whatever binds `r`, `g`
# and `score_files`, creates the directories, and waits for the child
# processes.  Confirm those against the complete file before changing it.
233 def extract_results(runs, output_base_dir=None):
234 if output_base_dir is None:
235 output_base_dir = os.getcwd()
238 result_dir = os.path.join(output_base_dir, r.flowcell_id)
239 logging.info("Using %s as result directory" % (result_dir,))
240 if not os.path.exists(result_dir):
# cycle directory is named after the first and last cycle of the firecrest run
244 cycle = "C%d-%d" % (r.firecrest.start, r.firecrest.stop)
245 logging.info("Filling in %s" % (cycle,))
246 cycle_dir = os.path.join(result_dir, cycle)
247 if os.path.exists(cycle_dir):
248 logging.error("%s already exists, not overwriting" % (cycle_dir,))
253 # copy stuff out of the main run
260 summary_path = os.path.join(r.gerald.pathname, 'Summary.htm')
261 if os.path.exists(summary_path):
262 logging.info('Copying %s to %s' % (summary_path, cycle_dir))
263 shutil.copy(summary_path, cycle_dir)
265 logging.info('Summary file %s was not found' % (summary_path,))
# collect the score files found directly in the GERALD directory
269 for f in os.listdir(g.pathname):
# NOTE(review): the '.' before 'txt' is unescaped, so it matches any character
270 if re.match('.*_score.txt', f):
271 score_files.append(f)
273 tar_cmd = ['/bin/tar', 'c'] + score_files
274 bzip_cmd = [ 'bzip2', '-9', '-c' ]
275 tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2')
# NOTE(review): this receives compressed (binary) data but is opened in text
# mode, and no close() is visible in this extract -- consider 'wb' and closing.
276 tar_dest = open(tar_dest_name, 'w')
277 logging.info("Compressing score files in %s" % (g.pathname,))
# only the first ten items of the tar command line are logged
278 logging.info("Running tar: " + " ".join(tar_cmd[:10]))
279 logging.info("Running bzip2: " + " ".join(bzip_cmd))
280 logging.info("Writing to %s" %(tar_dest_name))
# tar runs inside the GERALD directory and writes to a pipe; bzip2 reads that
# pipe and writes to tar_dest
282 tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, cwd=g.pathname)
283 bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
286 # copy & bzip eland files
# NOTE(review): summary_report reads eland_results.results[...], while this
# calls .values() on eland_results itself -- confirm both are supported.
287 for eland_lane in g.eland_results.values():
288 source_name = eland_lane.pathname
289 path, name = os.path.split(eland_lane.pathname)
290 dest_name = os.path.join(cycle_dir, name+'.bz2')
292 args = ['bzip2', '-9', '-c', source_name]
293 logging.info('Running: %s' % ( " ".join(args) ))
# NOTE(review): same text-mode / unclosed-handle concern as tar_dest above
294 bzip_dest = open(dest_name, 'w')
295 bzip = subprocess.Popen(args, stdout=bzip_dest)
296 logging.info('Saving to %s' % (dest_name, ))
# NOTE(review): only comments are visible after the signature in this
# extract, so whether this function performs any clean-up cannot be
# confirmed from here.
299 def clean_runs(runs):
301 Clean up run folders to optimize for compression.
303 # TODO: implement this.
# the remaining comments list shell commands, apparently the planned steps
310 # cd Data/C1-*_Firecrest*
311 # make clean_intermediate