2 Core information needed to inspect a runfolder.
15 from xml.etree import ElementTree
16 except ImportError, e:
17 from elementtree import ElementTree
19 EUROPEAN_STRPTIME = "%d-%m-%Y"
20 EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
21 VERSION_RE = "([0-9\.]+)"
22 USER_RE = "([a-zA-Z0-9]+)"
23 LANES_PER_FLOWCELL = 8
25 from htsworkflow.util.alphanum import alphanum
26 from htsworkflow.util.ethelp import indent, flatten
28 class PipelineRun(object):
30 Capture "interesting" information about a pipeline run
33 PIPELINE_RUN = 'PipelineRun'
34 FLOWCELL_ID = 'FlowcellID'
36 def __init__(self, pathname=None, xml=None):
37 if pathname is not None:
38 self.pathname = os.path.normpath(pathname)
42 self._flowcell_id = None
43 self.image_analysis = None
48 self.set_elements(xml)
50 def _get_flowcell_id(self):
52 if self._flowcell_id is None:
53 config_dir = os.path.join(self.pathname, 'Config')
54 flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
55 if os.path.exists(flowcell_id_path):
56 flowcell_id_tree = ElementTree.parse(flowcell_id_path)
57 self._flowcell_id = flowcell_id_tree.findtext('Text')
59 path_fields = self.pathname.split('_')
60 if len(path_fields) > 0:
61 # guessing last element of filename
62 flowcell_id = path_fields[-1]
64 flowcell_id = 'unknown'
67 "Flowcell id was not found, guessing %s" % (
69 self._flowcell_id = flowcell_id
70 return self._flowcell_id
71 flowcell_id = property(_get_flowcell_id)
73 def get_elements(self):
75 make one master xml file from all of our sub-components.
77 root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
78 flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
79 flowcell.text = self.flowcell_id
80 root.append(self.image_analysis.get_elements())
81 root.append(self.bustard.get_elements())
82 root.append(self.gerald.get_elements())
85 def set_elements(self, tree):
86 # this file gets imported by all the others,
87 # so we need to hide the imports to avoid a cyclic imports
88 from htsworkflow.pipelines import firecrest
89 from htsworkflow.pipelines import ipar
90 from htsworkflow.pipelines import bustard
91 from htsworkflow.pipelines import gerald
93 tag = tree.tag.lower()
94 if tag != PipelineRun.PIPELINE_RUN.lower():
95 raise ValueError('Pipeline Run Expecting %s got %s' % (
96 PipelineRun.PIPELINE_RUN, tag))
98 tag = element.tag.lower()
99 if tag == PipelineRun.FLOWCELL_ID.lower():
100 self._flowcell_id = element.text
101 #ok the xword.Xword.XWORD pattern for module.class.constant is lame
102 # you should only have Firecrest or IPAR, never both of them.
103 elif tag == firecrest.Firecrest.FIRECREST.lower():
104 self.image_analysis = firecrest.Firecrest(xml=element)
105 elif tag == ipar.IPAR.IPAR.lower():
106 self.image_analysis = ipar.IPAR(xml=element)
107 elif tag == bustard.Bustard.BUSTARD.lower():
108 self.bustard = bustard.Bustard(xml=element)
109 elif tag == gerald.Gerald.GERALD.lower():
110 self.gerald = gerald.Gerald(xml=element)
112 logging.warn('PipelineRun unrecognized tag %s' % (tag,))
114 def _get_run_name(self):
116 Given a run tuple, find the latest date and use that as our name
118 if self._name is None:
119 tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
120 timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
121 self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
123 name = property(_get_run_name)
125 def save(self, destdir=None):
128 logging.info("Saving run report "+ self.name)
129 xml = self.get_elements()
131 dest_pathname = os.path.join(destdir, self.name)
132 ElementTree.ElementTree(xml).write(dest_pathname)
134 def load(self, filename):
135 logging.info("Loading run report from " + filename)
136 tree = ElementTree.parse(filename).getroot()
137 self.set_elements(tree)
139 def load_pipeline_run_xml(pathname):
141 Load and instantiate a Pipeline run from a run xml file
144 - `pathname` : location of an run xml file
146 :Returns: initialized PipelineRun object
148 tree = ElementTree.parse(pathname).getroot()
149 run = PipelineRun(xml=tree)
152 def get_runs(runfolder):
154 Search through a run folder for all the various sub component runs
155 and then return a PipelineRun for each different combination.
157 For example if there are two different GERALD runs, this will
158 generate two different PipelineRun objects, that differ
159 in there gerald component.
161 from htsworkflow.pipelines import firecrest
162 from htsworkflow.pipelines import ipar
163 from htsworkflow.pipelines import bustard
164 from htsworkflow.pipelines import gerald
166 def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
167 logging.info("Looking for bustard directories in %s" % (pathname,))
168 bustard_glob = os.path.join(pathname, "Bustard*")
169 for bustard_pathname in glob(bustard_glob):
170 logging.info("Found bustard directory %s" % (bustard_pathname,))
171 b = bustard.bustard(bustard_pathname)
172 gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
173 logging.info("Looking for gerald directories in %s" % (pathname,))
174 for gerald_pathname in glob(gerald_glob):
175 logging.info("Found gerald directory %s" % (gerald_pathname,))
177 g = gerald.gerald(gerald_pathname)
178 p = PipelineRun(runfolder)
179 p.image_analysis = image_analysis
184 logging.error("Ignoring " + str(e))
186 datadir = os.path.join(runfolder, 'Data')
188 logging.info('Searching for runs in ' + datadir)
190 # scan for firecrest directories
191 for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")):
192 logging.info('Found firecrest in ' + datadir)
193 image_analysis = firecrest.firecrest(firecrest_pathname)
194 if image_analysis is None:
196 "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
199 scan_post_image_analysis(
200 runs, runfolder, image_analysis, firecrest_pathname
202 # scan for IPAR directories
203 for ipar_pathname in glob(os.path.join(datadir,"IPAR_*")):
204 logging.info('Found ipar directories in ' + datadir)
205 image_analysis = ipar.ipar(ipar_pathname)
206 if image_analysis is None:
208 "%s is an empty or invalid IPAR directory" %(ipar_pathname,)
211 scan_post_image_analysis(
212 runs, runfolder, image_analysis, ipar_pathname
217 def get_specific_run(gerald_dir):
219 Given a gerald directory, construct a PipelineRun out of its parents
221 Basically this allows specifying a particular run instead of the previous
222 get_runs which scans a runfolder for various combinations of
223 firecrest/ipar/bustard/gerald runs.
225 from htsworkflow.pipelines import firecrest
226 from htsworkflow.pipelines import ipar
227 from htsworkflow.pipelines import bustard
228 from htsworkflow.pipelines import gerald
230 bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
231 image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))
233 runfolder_dir = os.path.abspath(os.path.join(image_dir, '..','..'))
235 logging.info('--- use-run detected options ---')
236 logging.info('runfolder: %s' % (runfolder_dir,))
237 logging.info('image_dir: %s' % (image_dir,))
238 logging.info('bustard_dir: %s' % (bustard_dir,))
239 logging.info('gerald_dir: %s' % (gerald_dir,))
241 # find our processed image dir
243 # split into parent, and leaf directory
244 # leaf directory should be an IPAR or firecrest directory
245 data_dir, short_image_dir = os.path.split(image_dir)
246 logging.info('data_dir: %s' % (data_dir,))
247 logging.info('short_iamge_dir: %s' %(short_image_dir,))
249 # guess which type of image processing directory we have by looking
250 # in the leaf directory name
251 if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
252 image_run = firecrest.firecrest(image_dir)
253 elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
254 image_run = ipar.ipar(image_dir)
255 # if we din't find a run, report the error and return
256 if image_run is None:
257 msg = '%s does not contain an image processing step' % (image_dir,)
261 # find our base calling
262 base_calling_run = bustard.bustard(bustard_dir)
263 if base_calling_run is None:
264 logging.error('%s does not contain a bustard run' % (bustard_dir,))
268 gerald_run = gerald.gerald(gerald_dir)
269 if gerald_run is None:
270 logging.error('%s does not contain a gerald run' % (gerald_dir,))
273 p = PipelineRun(runfolder_dir)
274 p.image_analysis = image_run
275 p.bustard = base_calling_run
276 p.gerald = gerald_run
278 logging.info('Constructed PipelineRun from %s' % (gerald_dir,))
281 def extract_run_parameters(runs):
283 Search through runfolder_path for various runs and grab their parameters
288 def summarize_mapped_reads(genome_map, mapped_reads):
290 Summarize per chromosome reads into a genome count
291 But handle spike-in/contamination symlinks seperately.
293 summarized_reads = {}
296 for k, v in mapped_reads.items():
297 path, k = os.path.split(k)
298 if len(path) > 0 and not genome_map.has_key(path):
302 summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
303 summarized_reads[genome] = genome_reads
304 return summarized_reads
306 def summarize_lane(gerald, lane_id):
308 summary_results = gerald.summary.lane_results
309 for end in range(len(summary_results)):
310 eland_result = gerald.eland_results.results[end][lane_id]
311 report.append("Sample name %s" % (eland_result.sample_name))
312 report.append("Lane id %s end %s" % (eland_result.lane_id, end))
313 cluster = summary_results[end][eland_result.lane_id].cluster
314 report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
315 report.append("Total Reads: %d" % (eland_result.reads))
317 if hasattr(eland_result, 'match_codes'):
318 mc = eland_result.match_codes
320 nm_percent = float(nm)/eland_result.reads * 100
322 qc_percent = float(qc)/eland_result.reads * 100
324 report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
325 report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
326 report.append('Unique (0,1,2 mismatches) %d %d %d' % \
327 (mc['U0'], mc['U1'], mc['U2']))
328 report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
329 (mc['R0'], mc['R1'], mc['R2']))
331 if hasattr(eland_result, 'genome_map'):
332 report.append("Mapped Reads")
333 mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
334 for name, counts in mapped_reads.items():
335 report.append(" %s: %d" % (name, counts))
340 def summary_report(runs):
342 Summarize cluster numbers and mapped read counts for a runfolder
347 report.append('Summary for %s' % (run.name,))
349 eland_keys = run.gerald.eland_results.results[0].keys()
350 eland_keys.sort(alphanum)
352 for lane_id in eland_keys:
353 report.extend(summarize_lane(run.gerald, lane_id))
356 return os.linesep.join(report)
358 def is_compressed(filename):
359 if os.path.splitext(filename)[1] == ".gz":
361 elif os.path.splitext(filename)[1] == '.bz2':
366 def extract_results(runs, output_base_dir=None):
367 if output_base_dir is None:
368 output_base_dir = os.getcwd()
371 result_dir = os.path.join(output_base_dir, r.flowcell_id)
372 logging.info("Using %s as result directory" % (result_dir,))
373 if not os.path.exists(result_dir):
377 cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
378 logging.info("Filling in %s" % (cycle,))
379 cycle_dir = os.path.join(result_dir, cycle)
380 if os.path.exists(cycle_dir):
381 logging.error("%s already exists, not overwriting" % (cycle_dir,))
386 # copy stuff out of the main run
393 summary_path = os.path.join(r.gerald.pathname, 'Summary.htm')
394 if os.path.exists(summary_path):
395 logging.info('Copying %s to %s' % (summary_path, cycle_dir))
396 shutil.copy(summary_path, cycle_dir)
398 logging.info('Summary file %s was not found' % (summary_path,))
403 # check for g.pathname/Temp a new feature of 1.1rc1
404 scores_path = g.pathname
405 scores_path_temp = os.path.join(scores_path, 'Temp')
406 if os.path.isdir(scores_path_temp):
407 scores_path = scores_path_temp
409 # hopefully we have a directory that contains s_*_score files
410 for f in os.listdir(scores_path):
411 if re.match('.*_score.txt', f):
412 score_files.append(f)
414 tar_cmd = ['/bin/tar', 'c'] + score_files
415 bzip_cmd = [ 'bzip2', '-9', '-c' ]
416 tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2')
417 tar_dest = open(tar_dest_name, 'w')
418 logging.info("Compressing score files from %s" % (scores_path,))
419 logging.info("Running tar: " + " ".join(tar_cmd[:10]))
420 logging.info("Running bzip2: " + " ".join(bzip_cmd))
421 logging.info("Writing to %s" %(tar_dest_name))
424 tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
426 bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
429 # copy & bzip eland files
430 for lanes_dictionary in g.eland_results.results:
431 for eland_lane in lanes_dictionary.values():
432 source_name = eland_lane.pathname
433 path, name = os.path.split(eland_lane.pathname)
434 dest_name = os.path.join(cycle_dir, name)
435 logging.info("Saving eland file %s to %s" % \
436 (source_name, dest_name))
438 if is_compressed(name):
439 logging.info('Already compressed, Saving to %s' % (dest_name, ))
440 shutil.copy(source_name, dest_name)
444 args = ['bzip2', '-9', '-c', source_name]
445 logging.info('Running: %s' % ( " ".join(args) ))
446 bzip_dest = open(dest_name, 'w')
447 bzip = subprocess.Popen(args, stdout=bzip_dest)
448 logging.info('Saving to %s' % (dest_name, ))
451 def rm_list(files, dry_run=True):
453 if os.path.exists(f):
454 logging.info('deleting %s' % (f,))
461 logging.warn("%s doesn't exist."% (f,))
463 def clean_runs(runs, dry_run=True):
465 Clean up run folders to optimize for compression.
468 logging.info('In dry-run mode')
471 logging.info('Cleaninging %s' % (run.pathname,))
473 runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
474 rm_list(runlogs, dry_run)
476 pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
477 rm_list(pipeline_logs, dry_run)
479 # rm NetCopy.log? Isn't this robocopy?
480 logs = glob(os.path.join(run.pathname, '*.log'))
481 rm_list(logs, dry_run)
484 calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
485 rm_list(calibration_dir, dry_run)
487 logging.info("Cleaning images")
488 image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
489 rm_list(image_dirs, dry_run)
490 # cd Data/C1-*_Firecrest*
491 logging.info("Cleaning intermediate files")
492 # make clean_intermediate
493 if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
494 clean_process = subprocess.Popen(['make', 'clean_intermediate'],
495 cwd=run.image_analysis.pathname,)