2 Core information needed to inspect a runfolder.
15 from xml.etree import ElementTree
16 except ImportError, e:
17 from elementtree import ElementTree
19 EUROPEAN_STRPTIME = "%d-%m-%Y"
20 EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
21 VERSION_RE = "([0-9\.]+)"
22 USER_RE = "([a-zA-Z0-9]+)"
23 LANES_PER_FLOWCELL = 8
25 from htsworkflow.util.alphanum import alphanum
26 from htsworkflow.util.ethelp import indent, flatten
28 class PipelineRun(object):
30 Capture "interesting" information about a pipeline run
33 PIPELINE_RUN = 'PipelineRun'
34 FLOWCELL_ID = 'FlowcellID'
36 def __init__(self, pathname=None, xml=None):
37 if pathname is not None:
38 self.pathname = os.path.normpath(pathname)
42 self._flowcell_id = None
43 self.image_analysis = None
48 self.set_elements(xml)
50 def _get_flowcell_id(self):
52 if self._flowcell_id is None:
53 config_dir = os.path.join(self.pathname, 'Config')
54 flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
55 if os.path.exists(flowcell_id_path):
56 flowcell_id_tree = ElementTree.parse(flowcell_id_path)
57 self._flowcell_id = flowcell_id_tree.findtext('Text')
59 path_fields = self.pathname.split('_')
60 if len(path_fields) > 0:
61 # guessing last element of filename
62 flowcell_id = path_fields[-1]
64 flowcell_id = 'unknown'
67 "Flowcell id was not found, guessing %s" % (
69 self._flowcell_id = flowcell_id
70 return self._flowcell_id
71 flowcell_id = property(_get_flowcell_id)
73 def get_elements(self):
75 make one master xml file from all of our sub-components.
77 root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
78 flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
79 flowcell.text = self.flowcell_id
80 root.append(self.image_analysis.get_elements())
81 root.append(self.bustard.get_elements())
82 root.append(self.gerald.get_elements())
85 def set_elements(self, tree):
86 # this file gets imported by all the others,
87 # so we need to hide the imports to avoid a cyclic imports
88 from htsworkflow.pipelines import firecrest
89 from htsworkflow.pipelines import ipar
90 from htsworkflow.pipelines import bustard
91 from htsworkflow.pipelines import gerald
93 tag = tree.tag.lower()
94 if tag != PipelineRun.PIPELINE_RUN.lower():
95 raise ValueError('Pipeline Run Expecting %s got %s' % (
96 PipelineRun.PIPELINE_RUN, tag))
98 tag = element.tag.lower()
99 if tag == PipelineRun.FLOWCELL_ID.lower():
100 self._flowcell_id = element.text
101 #ok the xword.Xword.XWORD pattern for module.class.constant is lame
102 # you should only have Firecrest or IPAR, never both of them.
103 elif tag == firecrest.Firecrest.FIRECREST.lower():
104 self.image_analysis = firecrest.Firecrest(xml=element)
105 elif tag == ipar.IPAR.IPAR.lower():
106 self.image_analysis = ipar.IPAR(xml=element)
107 elif tag == bustard.Bustard.BUSTARD.lower():
108 self.bustard = bustard.Bustard(xml=element)
109 elif tag == gerald.Gerald.GERALD.lower():
110 self.gerald = gerald.Gerald(xml=element)
112 logging.warn('PipelineRun unrecognized tag %s' % (tag,))
114 def _get_run_name(self):
116 Given a run tuple, find the latest date and use that as our name
118 if self._name is None:
119 tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
120 timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
121 self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
123 name = property(_get_run_name)
125 def save(self, destdir=None):
128 logging.info("Saving run report "+ self.name)
129 xml = self.get_elements()
131 dest_pathname = os.path.join(destdir, self.name)
132 ElementTree.ElementTree(xml).write(dest_pathname)
134 def load(self, filename):
135 logging.info("Loading run report from " + filename)
136 tree = ElementTree.parse(filename).getroot()
137 self.set_elements(tree)
139 def get_runs(runfolder):
141 Search through a run folder for all the various sub component runs
142 and then return a PipelineRun for each different combination.
144 For example if there are two different GERALD runs, this will
145 generate two different PipelineRun objects, that differ
146 in there gerald component.
148 from htsworkflow.pipelines import firecrest
149 from htsworkflow.pipelines import ipar
150 from htsworkflow.pipelines import bustard
151 from htsworkflow.pipelines import gerald
153 def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
154 logging.info("Looking for bustard directories in %s" % (pathname,))
155 bustard_glob = os.path.join(pathname, "Bustard*")
156 for bustard_pathname in glob(bustard_glob):
157 logging.info("Found bustard directory %s" % (bustard_pathname,))
158 b = bustard.bustard(bustard_pathname)
159 gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
160 logging.info("Looking for gerald directories in %s" % (pathname,))
161 for gerald_pathname in glob(gerald_glob):
162 logging.info("Found gerald directory %s" % (gerald_pathname,))
164 g = gerald.gerald(gerald_pathname)
165 p = PipelineRun(runfolder)
166 p.image_analysis = image_analysis
171 print "Ignoring", str(e)
173 datadir = os.path.join(runfolder, 'Data')
175 logging.info('Searching for runs in ' + datadir)
177 # scan for firecrest directories
178 for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")):
179 logging.info('Found firecrest in ' + datadir)
180 image_analysis = firecrest.firecrest(firecrest_pathname)
181 if image_analysis is None:
183 "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
186 scan_post_image_analysis(
187 runs, runfolder, image_analysis, firecrest_pathname
189 # scan for IPAR directories
190 for ipar_pathname in glob(os.path.join(datadir,"IPAR_*")):
191 logging.info('Found ipar directories in ' + datadir)
192 image_analysis = ipar.ipar(ipar_pathname)
193 if image_analysis is None:
195 "%s is an empty or invalid IPAR directory" %(ipar_pathname,)
198 scan_post_image_analysis(
199 runs, runfolder, image_analysis, ipar_pathname
205 def extract_run_parameters(runs):
207 Search through runfolder_path for various runs and grab their parameters
212 def summarize_mapped_reads(mapped_reads):
214 Summarize per chromosome reads into a genome count
215 But handle spike-in/contamination symlinks seperately.
217 summarized_reads = {}
220 for k, v in mapped_reads.items():
221 path, k = os.path.split(k)
226 summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
227 summarized_reads[genome] = genome_reads
228 return summarized_reads
230 def summarize_lane(gerald, lane_id):
232 summary_results = gerald.summary.lane_results
233 for end in range(len(summary_results)):
234 eland_result = gerald.eland_results.results[end][lane_id]
235 report.append("Sample name %s" % (eland_result.sample_name))
236 report.append("Lane id %s end %s" % (eland_result.lane_id, end))
237 cluster = summary_results[end][eland_result.lane_id].cluster
238 report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
239 report.append("Total Reads: %d" % (eland_result.reads))
240 mc = eland_result._match_codes
242 nm_percent = float(nm)/eland_result.reads * 100
244 qc_percent = float(qc)/eland_result.reads * 100
246 report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
247 report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
248 report.append('Unique (0,1,2 mismatches) %d %d %d' % \
249 (mc['U0'], mc['U1'], mc['U2']))
250 report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
251 (mc['R0'], mc['R1'], mc['R2']))
252 report.append("Mapped Reads")
253 mapped_reads = summarize_mapped_reads(eland_result.mapped_reads)
254 for name, counts in mapped_reads.items():
255 report.append(" %s: %d" % (name, counts))
259 def summary_report(runs):
261 Summarize cluster numbers and mapped read counts for a runfolder
266 report.append('Summary for %s' % (run.name,))
268 eland_keys = run.gerald.eland_results.results[0].keys()
269 eland_keys.sort(alphanum)
271 for lane_id in eland_keys:
272 report.extend(summarize_lane(run.gerald, lane_id))
275 return os.linesep.join(report)
277 def is_compressed(filename):
278 if os.path.splitext(filename)[1] == ".gz":
280 elif os.path.splitext(filename)[1] == '.bz2':
285 def extract_results(runs, output_base_dir=None):
286 if output_base_dir is None:
287 output_base_dir = os.getcwd()
290 result_dir = os.path.join(output_base_dir, r.flowcell_id)
291 logging.info("Using %s as result directory" % (result_dir,))
292 if not os.path.exists(result_dir):
296 cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
297 logging.info("Filling in %s" % (cycle,))
298 cycle_dir = os.path.join(result_dir, cycle)
299 if os.path.exists(cycle_dir):
300 logging.error("%s already exists, not overwriting" % (cycle_dir,))
305 # copy stuff out of the main run
312 summary_path = os.path.join(r.gerald.pathname, 'Summary.htm')
313 if os.path.exists(summary_path):
314 logging.info('Copying %s to %s' % (summary_path, cycle_dir))
315 shutil.copy(summary_path, cycle_dir)
317 logging.info('Summary file %s was not found' % (summary_path,))
322 # check for g.pathname/Temp a new feature of 1.1rc1
323 scores_path = g.pathname
324 scores_path_temp = os.path.join(scores_path, 'Temp')
325 if os.path.isdir(scores_path_temp):
326 scores_path = scores_path_temp
328 # hopefully we have a directory that contains s_*_score files
329 for f in os.listdir(scores_path):
330 if re.match('.*_score.txt', f):
331 score_files.append(f)
333 tar_cmd = ['/bin/tar', 'c'] + score_files
334 bzip_cmd = [ 'bzip2', '-9', '-c' ]
335 tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2')
336 tar_dest = open(tar_dest_name, 'w')
337 logging.info("Compressing score files from %s" % (scores_path,))
338 logging.info("Running tar: " + " ".join(tar_cmd[:10]))
339 logging.info("Running bzip2: " + " ".join(bzip_cmd))
340 logging.info("Writing to %s" %(tar_dest_name))
342 tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False,
344 bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
347 # copy & bzip eland files
348 for lanes_dictionary in g.eland_results.results:
349 for eland_lane in lanes_dictionary.values():
350 source_name = eland_lane.pathname
351 path, name = os.path.split(eland_lane.pathname)
352 dest_name = os.path.join(cycle_dir, name)
353 logging.info("Saving eland file %s to %s" % \
354 (source_name, dest_name))
356 if is_compressed(name):
357 logging.info('Already compressed, Saving to %s' % (dest_name, ))
358 shutil.copy(source_name, dest_name)
362 args = ['bzip2', '-9', '-c', source_name]
363 logging.info('Running: %s' % ( " ".join(args) ))
364 bzip_dest = open(dest_name, 'w')
365 bzip = subprocess.Popen(args, stdout=bzip_dest)
366 logging.info('Saving to %s' % (dest_name, ))
369 def clean_runs(runs):
371 Clean up run folders to optimize for compression.
373 # TODO: implement this.
380 # cd Data/C1-*_Firecrest*
381 # make clean_intermediate