2 Core information needed to inspect a runfolder.
15 from xml.etree import ElementTree
16 except ImportError, e:
17 from elementtree import ElementTree
19 EUROPEAN_STRPTIME = "%d-%m-%Y"
20 EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
21 VERSION_RE = "([0-9\.]+)"
22 USER_RE = "([a-zA-Z0-9]+)"
23 LANES_PER_FLOWCELL = 8
25 from htsworkflow.util.alphanum import alphanum
26 from htsworkflow.util.ethelp import indent, flatten
28 class PipelineRun(object):
30 Capture "interesting" information about a pipeline run
33 PIPELINE_RUN = 'PipelineRun'
34 FLOWCELL_ID = 'FlowcellID'
36 def __init__(self, pathname=None, xml=None):
37 if pathname is not None:
38 self.pathname = os.path.normpath(pathname)
42 self._flowcell_id = None
43 self.image_analysis = None
48 self.set_elements(xml)
50 def _get_flowcell_id(self):
52 if self._flowcell_id is None:
53 config_dir = os.path.join(self.pathname, 'Config')
54 flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
55 if os.path.exists(flowcell_id_path):
56 flowcell_id_tree = ElementTree.parse(flowcell_id_path)
57 self._flowcell_id = flowcell_id_tree.findtext('Text')
59 path_fields = self.pathname.split('_')
60 if len(path_fields) > 0:
61 # guessing last element of filename
62 flowcell_id = path_fields[-1]
64 flowcell_id = 'unknown'
67 "Flowcell id was not found, guessing %s" % (
69 self._flowcell_id = flowcell_id
70 return self._flowcell_id
71 flowcell_id = property(_get_flowcell_id)
73 def get_elements(self):
75 make one master xml file from all of our sub-components.
77 root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
78 flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
79 flowcell.text = self.flowcell_id
80 root.append(self.image_analysis.get_elements())
81 root.append(self.bustard.get_elements())
82 root.append(self.gerald.get_elements())
85 def set_elements(self, tree):
86 # this file gets imported by all the others,
87 # so we need to hide the imports to avoid a cyclic imports
88 from htsworkflow.pipelines import firecrest
89 from htsworkflow.pipelines import ipar
90 from htsworkflow.pipelines import bustard
91 from htsworkflow.pipelines import gerald
93 tag = tree.tag.lower()
94 if tag != PipelineRun.PIPELINE_RUN.lower():
95 raise ValueError('Pipeline Run Expecting %s got %s' % (
96 PipelineRun.PIPELINE_RUN, tag))
98 tag = element.tag.lower()
99 if tag == PipelineRun.FLOWCELL_ID.lower():
100 self._flowcell_id = element.text
101 #ok the xword.Xword.XWORD pattern for module.class.constant is lame
102 # you should only have Firecrest or IPAR, never both of them.
103 elif tag == firecrest.Firecrest.FIRECREST.lower():
104 self.image_analysis = firecrest.Firecrest(xml=element)
105 elif tag == ipar.IPAR.IPAR.lower():
106 self.image_analysis = ipar.IPAR(xml=element)
107 elif tag == bustard.Bustard.BUSTARD.lower():
108 self.bustard = bustard.Bustard(xml=element)
109 elif tag == gerald.Gerald.GERALD.lower():
110 self.gerald = gerald.Gerald(xml=element)
112 logging.warn('PipelineRun unrecognized tag %s' % (tag,))
114 def _get_run_name(self):
116 Given a run tuple, find the latest date and use that as our name
118 if self._name is None:
119 tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
120 timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
121 self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
123 name = property(_get_run_name)
125 def save(self, destdir=None):
128 logging.info("Saving run report "+ self.name)
129 xml = self.get_elements()
131 dest_pathname = os.path.join(destdir, self.name)
132 ElementTree.ElementTree(xml).write(dest_pathname)
134 def load(self, filename):
135 logging.info("Loading run report from " + filename)
136 tree = ElementTree.parse(filename).getroot()
137 self.set_elements(tree)
139 def get_runs(runfolder):
141 Search through a run folder for all the various sub component runs
142 and then return a PipelineRun for each different combination.
144 For example if there are two different GERALD runs, this will
145 generate two different PipelineRun objects, that differ
146 in there gerald component.
148 from htsworkflow.pipelines import firecrest
149 from htsworkflow.pipelines import ipar
150 from htsworkflow.pipelines import bustard
151 from htsworkflow.pipelines import gerald
153 def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
154 logging.info("Looking for bustard directories in %s" % (pathname,))
155 bustard_glob = os.path.join(pathname, "Bustard*")
156 for bustard_pathname in glob(bustard_glob):
157 logging.info("Found bustard directory %s" % (bustard_pathname,))
158 b = bustard.bustard(bustard_pathname)
159 gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
160 logging.info("Looking for gerald directories in %s" % (pathname,))
161 for gerald_pathname in glob(gerald_glob):
162 logging.info("Found gerald directory %s" % (gerald_pathname,))
164 g = gerald.gerald(gerald_pathname)
165 p = PipelineRun(runfolder)
166 p.image_analysis = image_analysis
171 print "Ignoring", str(e)
173 datadir = os.path.join(runfolder, 'Data')
175 logging.info('Searching for runs in ' + datadir)
177 # scan for firecrest directories
178 for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")):
179 logging.info('Found firecrest in ' + datadir)
180 image_analysis = firecrest.firecrest(firecrest_pathname)
181 scan_post_image_analysis(runs, runfolder, image_analysis, firecrest_pathname)
182 # scan for IPAR directories
183 for ipar_pathname in glob(os.path.join(datadir,"IPAR_*")):
184 logging.info('Found ipar directories in ' + datadir)
185 image_analysis = ipar.ipar(ipar_pathname)
186 scan_post_image_analysis(runs, runfolder, image_analysis, ipar_pathname)
191 def extract_run_parameters(runs):
193 Search through runfolder_path for various runs and grab their parameters
198 def summarize_mapped_reads(mapped_reads):
200 Summarize per chromosome reads into a genome count
201 But handle spike-in/contamination symlinks seperately.
203 summarized_reads = {}
206 for k, v in mapped_reads.items():
207 path, k = os.path.split(k)
212 summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
213 summarized_reads[genome] = genome_reads
214 return summarized_reads
216 def summarize_lane(gerald, lane_id):
218 summary_results = gerald.summary.lane_results
219 for end in range(len(summary_results)):
220 eland_result = gerald.eland_results.results[end][lane_id]
221 report.append("Sample name %s" % (eland_result.sample_name))
222 report.append("Lane id %s end %s" % (eland_result.lane_id, end))
223 cluster = summary_results[end][eland_result.lane_id].cluster
224 report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
225 report.append("Total Reads: %d" % (eland_result.reads))
226 mc = eland_result._match_codes
228 nm_percent = float(nm)/eland_result.reads * 100
230 qc_percent = float(qc)/eland_result.reads * 100
232 report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
233 report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
234 report.append('Unique (0,1,2 mismatches) %d %d %d' % \
235 (mc['U0'], mc['U1'], mc['U2']))
236 report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
237 (mc['R0'], mc['R1'], mc['R2']))
238 report.append("Mapped Reads")
239 mapped_reads = summarize_mapped_reads(eland_result.mapped_reads)
240 for name, counts in mapped_reads.items():
241 report.append(" %s: %d" % (name, counts))
245 def summary_report(runs):
247 Summarize cluster numbers and mapped read counts for a runfolder
252 report.append('Summary for %s' % (run.name,))
254 eland_keys = run.gerald.eland_results.results[0].keys()
255 eland_keys.sort(alphanum)
257 for lane_id in eland_keys:
258 report.extend(summarize_lane(run.gerald, lane_id))
261 return os.linesep.join(report)
263 def is_compressed(filename):
264 if os.path.splitext(filename)[1] == ".gz":
266 elif os.path.splitext(filename)[1] == '.bz2':
271 def extract_results(runs, output_base_dir=None):
272 if output_base_dir is None:
273 output_base_dir = os.getcwd()
276 result_dir = os.path.join(output_base_dir, r.flowcell_id)
277 logging.info("Using %s as result directory" % (result_dir,))
278 if not os.path.exists(result_dir):
282 cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
283 logging.info("Filling in %s" % (cycle,))
284 cycle_dir = os.path.join(result_dir, cycle)
285 if os.path.exists(cycle_dir):
286 logging.error("%s already exists, not overwriting" % (cycle_dir,))
291 # copy stuff out of the main run
298 summary_path = os.path.join(r.gerald.pathname, 'Summary.htm')
299 if os.path.exists(summary_path):
300 logging.info('Copying %s to %s' % (summary_path, cycle_dir))
301 shutil.copy(summary_path, cycle_dir)
303 logging.info('Summary file %s was not found' % (summary_path,))
307 for f in os.listdir(g.pathname):
308 if re.match('.*_score.txt', f):
309 score_files.append(f)
311 tar_cmd = ['/bin/tar', 'c'] + score_files
312 bzip_cmd = [ 'bzip2', '-9', '-c' ]
313 tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2')
314 tar_dest = open(tar_dest_name, 'w')
315 logging.info("Compressing score files in %s" % (g.pathname,))
316 logging.info("Running tar: " + " ".join(tar_cmd[:10]))
317 logging.info("Running bzip2: " + " ".join(bzip_cmd))
318 logging.info("Writing to %s" %(tar_dest_name))
320 tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, cwd=g.pathname)
321 bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
324 # copy & bzip eland files
325 for lanes_dictionary in g.eland_results.results:
326 for eland_lane in lanes_dictionary.values():
327 source_name = eland_lane.pathname
328 path, name = os.path.split(eland_lane.pathname)
329 dest_name = os.path.join(cycle_dir, name)
330 if is_compressed(name):
331 logging.info('Already compressed, Saving to %s' % (dest_name, ))
332 shutil.copy(source_name, dest_name)
336 args = ['bzip2', '-9', '-c', source_name]
337 logging.info('Running: %s' % ( " ".join(args) ))
338 bzip_dest = open(dest_name, 'w')
339 bzip = subprocess.Popen(args, stdout=bzip_dest)
340 logging.info('Saving to %s' % (dest_name, ))
343 def clean_runs(runs):
345 Clean up run folders to optimize for compression.
347 # TODO: implement this.
354 # cd Data/C1-*_Firecrest*
355 # make clean_intermediate