2 Core information needed to inspect a runfolder.
15 from xml.etree import ElementTree
16 except ImportError, e:
17 from elementtree import ElementTree
19 EUROPEAN_STRPTIME = "%d-%m-%Y"
20 EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
21 VERSION_RE = "([0-9\.]+)"
22 USER_RE = "([a-zA-Z0-9]+)"
23 LANES_PER_FLOWCELL = 8
25 from htsworkflow.util.alphanum import alphanum
26 from htsworkflow.util.ethelp import indent, flatten
28 class PipelineRun(object):
30 Capture "interesting" information about a pipeline run
33 PIPELINE_RUN = 'PipelineRun'
34 FLOWCELL_ID = 'FlowcellID'
36 def __init__(self, pathname=None, xml=None):
37 if pathname is not None:
38 self.pathname = os.path.normpath(pathname)
42 self._flowcell_id = None
43 self.image_analysis = None
48 self.set_elements(xml)
50 def _get_flowcell_id(self):
52 if self._flowcell_id is None:
53 config_dir = os.path.join(self.pathname, 'Config')
54 flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
55 if os.path.exists(flowcell_id_path):
56 flowcell_id_tree = ElementTree.parse(flowcell_id_path)
57 self._flowcell_id = flowcell_id_tree.findtext('Text')
59 path_fields = self.pathname.split('_')
60 if len(path_fields) > 0:
61 # guessing last element of filename
62 flowcell_id = path_fields[-1]
64 flowcell_id = 'unknown'
67 "Flowcell id was not found, guessing %s" % (
69 self._flowcell_id = flowcell_id
70 return self._flowcell_id
71 flowcell_id = property(_get_flowcell_id)
73 def get_elements(self):
75 make one master xml file from all of our sub-components.
77 root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
78 flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
79 flowcell.text = self.flowcell_id
80 root.append(self.image_analysis.get_elements())
81 root.append(self.bustard.get_elements())
82 root.append(self.gerald.get_elements())
85 def set_elements(self, tree):
86 # this file gets imported by all the others,
87 # so we need to hide the imports to avoid a cyclic imports
88 from htsworkflow.pipelines import firecrest
89 from htsworkflow.pipelines import ipar
90 from htsworkflow.pipelines import bustard
91 from htsworkflow.pipelines import gerald
93 tag = tree.tag.lower()
94 if tag != PipelineRun.PIPELINE_RUN.lower():
95 raise ValueError('Pipeline Run Expecting %s got %s' % (
96 PipelineRun.PIPELINE_RUN, tag))
98 tag = element.tag.lower()
99 if tag == PipelineRun.FLOWCELL_ID.lower():
100 self._flowcell_id = element.text
101 #ok the xword.Xword.XWORD pattern for module.class.constant is lame
102 # you should only have Firecrest or IPAR, never both of them.
103 elif tag == firecrest.Firecrest.FIRECREST.lower():
104 self.image_analysis = firecrest.Firecrest(xml=element)
105 elif tag == ipar.IPAR.IPAR.lower():
106 self.image_analysis = ipar.IPAR(xml=element)
107 elif tag == bustard.Bustard.BUSTARD.lower():
108 self.bustard = bustard.Bustard(xml=element)
109 elif tag == gerald.Gerald.GERALD.lower():
110 self.gerald = gerald.Gerald(xml=element)
112 logging.warn('PipelineRun unrecognized tag %s' % (tag,))
114 def _get_run_name(self):
116 Given a run tuple, find the latest date and use that as our name
118 if self._name is None:
119 tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
120 timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
121 self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
123 name = property(_get_run_name)
125 def save(self, destdir=None):
128 logging.info("Saving run report "+ self.name)
129 xml = self.get_elements()
131 dest_pathname = os.path.join(destdir, self.name)
132 ElementTree.ElementTree(xml).write(dest_pathname)
134 def load(self, filename):
135 logging.info("Loading run report from " + filename)
136 tree = ElementTree.parse(filename).getroot()
137 self.set_elements(tree)
139 def load_pipeline_run_xml(pathname):
141 Load and instantiate a Pipeline run from a run xml file
144 - `pathname` : location of an run xml file
146 :Returns: initialized PipelineRun object
148 tree = ElementTree.parse(pathname).getroot()
149 run = PipelineRun(xml=tree)
152 def get_runs(runfolder):
154 Search through a run folder for all the various sub component runs
155 and then return a PipelineRun for each different combination.
157 For example if there are two different GERALD runs, this will
158 generate two different PipelineRun objects, that differ
159 in there gerald component.
161 from htsworkflow.pipelines import firecrest
162 from htsworkflow.pipelines import ipar
163 from htsworkflow.pipelines import bustard
164 from htsworkflow.pipelines import gerald
166 def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
167 logging.info("Looking for bustard directories in %s" % (pathname,))
168 bustard_glob = os.path.join(pathname, "Bustard*")
169 for bustard_pathname in glob(bustard_glob):
170 logging.info("Found bustard directory %s" % (bustard_pathname,))
171 b = bustard.bustard(bustard_pathname)
172 gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
173 logging.info("Looking for gerald directories in %s" % (pathname,))
174 for gerald_pathname in glob(gerald_glob):
175 logging.info("Found gerald directory %s" % (gerald_pathname,))
177 g = gerald.gerald(gerald_pathname)
178 p = PipelineRun(runfolder)
179 p.image_analysis = image_analysis
184 logging.error("Ignoring " + str(e))
186 datadir = os.path.join(runfolder, 'Data')
188 logging.info('Searching for runs in ' + datadir)
190 # scan for firecrest directories
191 for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")):
192 logging.info('Found firecrest in ' + datadir)
193 image_analysis = firecrest.firecrest(firecrest_pathname)
194 if image_analysis is None:
196 "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
199 scan_post_image_analysis(
200 runs, runfolder, image_analysis, firecrest_pathname
202 # scan for IPAR directories
203 for ipar_pathname in glob(os.path.join(datadir,"IPAR_*")):
204 logging.info('Found ipar directories in ' + datadir)
205 image_analysis = ipar.ipar(ipar_pathname)
206 if image_analysis is None:
208 "%s is an empty or invalid IPAR directory" %(ipar_pathname,)
211 scan_post_image_analysis(
212 runs, runfolder, image_analysis, ipar_pathname
218 def extract_run_parameters(runs):
220 Search through runfolder_path for various runs and grab their parameters
225 def summarize_mapped_reads(genome_map, mapped_reads):
227 Summarize per chromosome reads into a genome count
228 But handle spike-in/contamination symlinks seperately.
230 summarized_reads = {}
233 for k, v in mapped_reads.items():
234 path, k = os.path.split(k)
235 if len(path) > 0 and not genome_map.has_key(path):
239 summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
240 summarized_reads[genome] = genome_reads
241 return summarized_reads
243 def summarize_lane(gerald, lane_id):
245 summary_results = gerald.summary.lane_results
246 for end in range(len(summary_results)):
247 eland_result = gerald.eland_results.results[end][lane_id]
248 report.append("Sample name %s" % (eland_result.sample_name))
249 report.append("Lane id %s end %s" % (eland_result.lane_id, end))
250 cluster = summary_results[end][eland_result.lane_id].cluster
251 report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
252 report.append("Total Reads: %d" % (eland_result.reads))
253 mc = eland_result._match_codes
255 nm_percent = float(nm)/eland_result.reads * 100
257 qc_percent = float(qc)/eland_result.reads * 100
259 report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
260 report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
261 report.append('Unique (0,1,2 mismatches) %d %d %d' % \
262 (mc['U0'], mc['U1'], mc['U2']))
263 report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
264 (mc['R0'], mc['R1'], mc['R2']))
265 report.append("Mapped Reads")
266 mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
267 for name, counts in mapped_reads.items():
268 report.append(" %s: %d" % (name, counts))
272 def summary_report(runs):
274 Summarize cluster numbers and mapped read counts for a runfolder
279 report.append('Summary for %s' % (run.name,))
281 eland_keys = run.gerald.eland_results.results[0].keys()
282 eland_keys.sort(alphanum)
284 for lane_id in eland_keys:
285 report.extend(summarize_lane(run.gerald, lane_id))
288 return os.linesep.join(report)
290 def is_compressed(filename):
291 if os.path.splitext(filename)[1] == ".gz":
293 elif os.path.splitext(filename)[1] == '.bz2':
298 def extract_results(runs, output_base_dir=None):
299 if output_base_dir is None:
300 output_base_dir = os.getcwd()
303 result_dir = os.path.join(output_base_dir, r.flowcell_id)
304 logging.info("Using %s as result directory" % (result_dir,))
305 if not os.path.exists(result_dir):
309 cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
310 logging.info("Filling in %s" % (cycle,))
311 cycle_dir = os.path.join(result_dir, cycle)
312 if os.path.exists(cycle_dir):
313 logging.error("%s already exists, not overwriting" % (cycle_dir,))
318 # copy stuff out of the main run
325 summary_path = os.path.join(r.gerald.pathname, 'Summary.htm')
326 if os.path.exists(summary_path):
327 logging.info('Copying %s to %s' % (summary_path, cycle_dir))
328 shutil.copy(summary_path, cycle_dir)
330 logging.info('Summary file %s was not found' % (summary_path,))
335 # check for g.pathname/Temp a new feature of 1.1rc1
336 scores_path = g.pathname
337 scores_path_temp = os.path.join(scores_path, 'Temp')
338 if os.path.isdir(scores_path_temp):
339 scores_path = scores_path_temp
341 # hopefully we have a directory that contains s_*_score files
342 for f in os.listdir(scores_path):
343 if re.match('.*_score.txt', f):
344 score_files.append(f)
346 tar_cmd = ['/bin/tar', 'c'] + score_files
347 bzip_cmd = [ 'bzip2', '-9', '-c' ]
348 tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2')
349 tar_dest = open(tar_dest_name, 'w')
350 logging.info("Compressing score files from %s" % (scores_path,))
351 logging.info("Running tar: " + " ".join(tar_cmd[:10]))
352 logging.info("Running bzip2: " + " ".join(bzip_cmd))
353 logging.info("Writing to %s" %(tar_dest_name))
355 tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False,
357 bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
360 # copy & bzip eland files
361 for lanes_dictionary in g.eland_results.results:
362 for eland_lane in lanes_dictionary.values():
363 source_name = eland_lane.pathname
364 path, name = os.path.split(eland_lane.pathname)
365 dest_name = os.path.join(cycle_dir, name)
366 logging.info("Saving eland file %s to %s" % \
367 (source_name, dest_name))
369 if is_compressed(name):
370 logging.info('Already compressed, Saving to %s' % (dest_name, ))
371 shutil.copy(source_name, dest_name)
375 args = ['bzip2', '-9', '-c', source_name]
376 logging.info('Running: %s' % ( " ".join(args) ))
377 bzip_dest = open(dest_name, 'w')
378 bzip = subprocess.Popen(args, stdout=bzip_dest)
379 logging.info('Saving to %s' % (dest_name, ))
382 def clean_runs(runs):
384 Clean up run folders to optimize for compression.
386 # TODO: implement this.
393 # cd Data/C1-*_Firecrest*
394 # make clean_intermediate