2 Core information needed to inspect a runfolder.
15 from xml.etree import ElementTree
16 except ImportError, e:
17 from elementtree import ElementTree
# Date / token patterns used when parsing run-folder metadata files.
19 EUROPEAN_STRPTIME = "%d-%m-%Y"
20 EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
# Dotted version numbers, e.g. "1.9.5".
21 VERSION_RE = "([0-9\.]+)"
22 USER_RE = "([a-zA-Z0-9]+)"
# Illumina flowcells carry 8 lanes; LANE_LIST enumerates them as 1..8.
23 LANES_PER_FLOWCELL = 8
24 LANE_LIST = range(1, LANES_PER_FLOWCELL+1)
26 from htsworkflow.util.alphanum import alphanum
27 from htsworkflow.util.ethelp import indent, flatten
# NOTE(review): this chunk is a sampled paste -- original file line numbers are
# fused onto each line and several interior lines are missing (e.g. the else
# branches and some loop headers). Comments below describe only what is visible;
# confirm elided logic against version control before relying on it.
29 class PipelineRun(object):
31 Capture "interesting" information about a pipeline run
# XML tag names emitted by get_elements() and recognized by set_elements().
34 PIPELINE_RUN = 'PipelineRun'
35 FLOWCELL_ID = 'FlowcellID'
# Construct either from a runfolder path or from a previously saved XML tree.
37 def __init__(self, pathname=None, xml=None):
38 if pathname is not None:
39 self.pathname = os.path.normpath(pathname)
43 self._flowcell_id = None
44 self.image_analysis = None
# When an xml tree is provided, rehydrate state from it (see set_elements).
49 self.set_elements(xml)
# Lazy flowcell-id lookup: prefer Config/FlowcellId.xml's <Text> element,
# otherwise guess from the last '_'-separated field of the runfolder name.
51 def _get_flowcell_id(self):
53 if self._flowcell_id is None:
54 config_dir = os.path.join(self.pathname, 'Config')
55 flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
56 if os.path.exists(flowcell_id_path):
57 flowcell_id_tree = ElementTree.parse(flowcell_id_path)
58 self._flowcell_id = flowcell_id_tree.findtext('Text')
# Fallback path (else branch elided in this view): guess from pathname.
60 path_fields = self.pathname.split('_')
61 if len(path_fields) > 0:
62 # guessing last element of filename
63 flowcell_id = path_fields[-1]
65 flowcell_id = 'unknown'
# A warning is logged about the guess (call partially elided here).
68 "Flowcell id was not found, guessing %s" % (
70 self._flowcell_id = flowcell_id
71 return self._flowcell_id
# Read-only property exposing the (possibly guessed) flowcell id.
72 flowcell_id = property(_get_flowcell_id)
# Serialize this run to one master XML element combining all sub-components.
74 def get_elements(self):
76 make one master xml file from all of our sub-components.
78 root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
79 flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
80 flowcell.text = self.flowcell_id
# Requires image_analysis/bustard/gerald to be set; will raise otherwise.
81 root.append(self.image_analysis.get_elements())
82 root.append(self.bustard.get_elements())
83 root.append(self.gerald.get_elements())
# Inverse of get_elements(): rebuild sub-component objects from an XML tree.
86 def set_elements(self, tree):
87 # this file gets imported by all the others,
88 # so we need to hide the imports to avoid a cyclic imports
89 from htsworkflow.pipelines import firecrest
90 from htsworkflow.pipelines import ipar
91 from htsworkflow.pipelines import bustard
92 from htsworkflow.pipelines import gerald
# Tag comparison is case-insensitive throughout.
94 tag = tree.tag.lower()
95 if tag != PipelineRun.PIPELINE_RUN.lower():
96 raise ValueError('Pipeline Run Expecting %s got %s' % (
97 PipelineRun.PIPELINE_RUN, tag))
# Dispatch each child element to the matching sub-component class.
99 tag = element.tag.lower()
100 if tag == PipelineRun.FLOWCELL_ID.lower():
101 self._flowcell_id = element.text
102 #ok the xword.Xword.XWORD pattern for module.class.constant is lame
103 # you should only have Firecrest or IPAR, never both of them.
104 elif tag == firecrest.Firecrest.FIRECREST.lower():
105 self.image_analysis = firecrest.Firecrest(xml=element)
106 elif tag == ipar.IPAR.IPAR.lower():
107 self.image_analysis = ipar.IPAR(xml=element)
108 elif tag == bustard.Bustard.BUSTARD.lower():
109 self.bustard = bustard.Bustard(xml=element)
110 elif tag == gerald.Gerald.GERALD.lower():
111 self.gerald = gerald.Gerald(xml=element)
# Unknown tags are logged and skipped, not fatal.
113 logging.warn('PipelineRun unrecognized tag %s' % (tag,))
# Derive a cached report filename from flowcell id plus the latest of the
# three sub-component timestamps: run_<flowcell>_<YYYY-MM-DD>.xml
115 def _get_run_name(self):
117 Given a run tuple, find the latest date and use that as our name
119 if self._name is None:
120 tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
121 timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
122 self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
124 name = property(_get_run_name)
# Write the combined XML report into destdir (filename from self.name).
126 def save(self, destdir=None):
129 logging.info("Saving run report "+ self.name)
130 xml = self.get_elements()
132 dest_pathname = os.path.join(destdir, self.name)
133 ElementTree.ElementTree(xml).write(dest_pathname)
# Reload state from a previously saved report file.
135 def load(self, filename):
136 logging.info("Loading run report from " + filename)
137 tree = ElementTree.parse(filename).getroot()
138 self.set_elements(tree)
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters:
      - `pathname` : location of an run xml file

    :Returns: initialized PipelineRun object
    """
    # Parse the saved report and let PipelineRun.set_elements() rebuild the
    # image-analysis / bustard / gerald sub-components from the tree.
    tree = ElementTree.parse(pathname).getroot()
    run = PipelineRun(xml=tree)
    return run
# NOTE(review): interior lines are elided in this view (the try/except around
# gerald loading, the warnings for invalid directories, the continue/return
# statements). Comments describe only the visible control flow.
153 def get_runs(runfolder):
155 Search through a run folder for all the various sub component runs
156 and then return a PipelineRun for each different combination.
158 For example if there are two different GERALD runs, this will
159 generate two different PipelineRun objects, that differ
160 in there gerald component.
# Local imports to avoid cyclic module dependencies (same pattern as
# PipelineRun.set_elements).
162 from htsworkflow.pipelines import firecrest
163 from htsworkflow.pipelines import ipar
164 from htsworkflow.pipelines import bustard
165 from htsworkflow.pipelines import gerald
# Helper: given one image-analysis result, find every Bustard/GERALD
# combination underneath it and append a PipelineRun per combination.
167 def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
168 logging.info("Looking for bustard directories in %s" % (pathname,))
169 bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
170 # RTA BaseCalls looks enough like Bustard.
171 bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
172 for bustard_pathname in bustard_dirs:
173 logging.info("Found bustard directory %s" % (bustard_pathname,))
174 b = bustard.bustard(bustard_pathname)
175 gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
176 logging.info("Looking for gerald directories in %s" % (pathname,))
177 for gerald_pathname in glob(gerald_glob):
178 logging.info("Found gerald directory %s" % (gerald_pathname,))
180 g = gerald.gerald(gerald_pathname)
181 p = PipelineRun(runfolder)
182 p.image_analysis = image_analysis
# Failures while building a combination are logged and ignored, so one
# broken gerald dir does not abort the whole scan.
187 logging.error("Ignoring " + str(e))
189 datadir = os.path.join(runfolder, 'Data')
191 logging.info('Searching for runs in ' + datadir)
193 # scan for firecrest directories
194 for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")):
195 logging.info('Found firecrest in ' + datadir)
196 image_analysis = firecrest.firecrest(firecrest_pathname)
# A None result means the directory was empty/invalid; presumably logged
# and skipped (warning call partially elided here).
197 if image_analysis is None:
199 "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
202 scan_post_image_analysis(
203 runs, runfolder, image_analysis, firecrest_pathname
205 # scan for IPAR directories
206 ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
207 # The Intensities directory from the RTA software looks a lot like IPAR
208 ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
209 for ipar_pathname in ipar_dirs:
210 logging.info('Found ipar directories in ' + datadir)
211 image_analysis = ipar.ipar(ipar_pathname)
212 if image_analysis is None:
214 "%s is an empty or invalid IPAR directory" %(ipar_pathname,)
217 scan_post_image_analysis(
218 runs, runfolder, image_analysis, ipar_pathname
# NOTE(review): error-path lines (the return/raise after each None check, and
# the final return of the constructed PipelineRun) are elided in this view.
223 def get_specific_run(gerald_dir):
225 Given a gerald directory, construct a PipelineRun out of its parents
227 Basically this allows specifying a particular run instead of the previous
228 get_runs which scans a runfolder for various combinations of
229 firecrest/ipar/bustard/gerald runs.
# Local imports to avoid cyclic module dependencies.
231 from htsworkflow.pipelines import firecrest
232 from htsworkflow.pipelines import ipar
233 from htsworkflow.pipelines import bustard
234 from htsworkflow.pipelines import gerald
# Walk up the directory tree: gerald lives inside bustard, bustard inside the
# image-analysis dir, which sits two levels below the runfolder root.
236 gerald_dir = os.path.expanduser(gerald_dir)
237 bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
238 image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))
240 runfolder_dir = os.path.abspath(os.path.join(image_dir, '..','..'))
242 logging.info('--- use-run detected options ---')
243 logging.info('runfolder: %s' % (runfolder_dir,))
244 logging.info('image_dir: %s' % (image_dir,))
245 logging.info('bustard_dir: %s' % (bustard_dir,))
246 logging.info('gerald_dir: %s' % (gerald_dir,))
248 # find our processed image dir
250 # split into parent, and leaf directory
251 # leaf directory should be an IPAR or firecrest directory
252 data_dir, short_image_dir = os.path.split(image_dir)
253 logging.info('data_dir: %s' % (data_dir,))
# NOTE(review): 'short_iamge_dir' is a typo in the log label (runtime string,
# left untouched here).
254 logging.info('short_iamge_dir: %s' %(short_image_dir,))
256 # guess which type of image processing directory we have by looking
257 # in the leaf directory name
258 if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
259 image_run = firecrest.firecrest(image_dir)
260 elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
261 image_run = ipar.ipar(image_dir)
# RTA Intensities directories are parsed with the IPAR loader as well.
262 elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
263 image_run = ipar.ipar(image_dir)
265 # if we din't find a run, report the error and return
266 if image_run is None:
267 msg = '%s does not contain an image processing step' % (image_dir,)
271 # find our base calling
272 base_calling_run = bustard.bustard(bustard_dir)
273 if base_calling_run is None:
274 logging.error('%s does not contain a bustard run' % (bustard_dir,))
# find our gerald (comment elided in this view)
278 gerald_run = gerald.gerald(gerald_dir)
279 if gerald_run is None:
280 logging.error('%s does not contain a gerald run' % (gerald_dir,))
# Assemble the PipelineRun from the three discovered components.
283 p = PipelineRun(runfolder_dir)
284 p.image_analysis = image_run
285 p.bustard = base_calling_run
286 p.gerald = gerald_run
288 logging.info('Constructed PipelineRun from %s' % (gerald_dir,))
# NOTE(review): the function body is not visible in this chunk -- only the
# signature and docstring summary. Confirm behavior against version control.
291 def extract_run_parameters(runs):
293 Search through runfolder_path for various runs and grab their parameters
# NOTE(review): the spike-in/contamination accumulation lines (where `genome`
# and `genome_reads` are bound) are elided in this view.
298 def summarize_mapped_reads(genome_map, mapped_reads):
300 Summarize per chromosome reads into a genome count
301 But handle spike-in/contamination symlinks seperately.
303 summarized_reads = {}
# Keys of mapped_reads look like paths; split off the directory component so
# per-chromosome counts can be folded into one per-target total.
306 for k, v in mapped_reads.items():
307 path, k = os.path.split(k)
# Python 2 idiom: has_key(). Entries whose directory is not part of the
# genome map are treated as spike-in/contamination (branch elided here).
308 if len(path) > 0 and not genome_map.has_key(path):
312 summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
313 summarized_reads[genome] = genome_reads
314 return summarized_reads
# NOTE(review): the `report` accumulator initialization and the nm/qc count
# extraction lines are elided in this view; the function presumably returns
# the report list -- confirm against version control.
316 def summarize_lane(gerald, lane_id):
318 summary_results = gerald.summary.lane_results
# One pass per read end (single- vs paired-end runs).
319 for end in range(len(summary_results)):
320 eland_result = gerald.eland_results.results[end][lane_id]
321 report.append("Sample name %s" % (eland_result.sample_name))
322 report.append("Lane id %s end %s" % (eland_result.lane_id, end))
# cluster is a (mean, stddev) pair per the format string below.
323 cluster = summary_results[end][eland_result.lane_id].cluster
324 report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
325 report.append("Total Reads: %d" % (eland_result.reads))
# Match-code breakdown (NM / QC / U0-U2 / R0-R2) when eland provides it.
327 if hasattr(eland_result, 'match_codes'):
328 mc = eland_result.match_codes
330 nm_percent = float(nm)/eland_result.reads * 100
332 qc_percent = float(qc)/eland_result.reads * 100
334 report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
335 report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
336 report.append('Unique (0,1,2 mismatches) %d %d %d' % \
337 (mc['U0'], mc['U1'], mc['U2']))
338 report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
339 (mc['R0'], mc['R1'], mc['R2']))
# Per-genome mapped-read totals, folded by summarize_mapped_reads().
341 if hasattr(eland_result, 'genome_map'):
342 report.append("Mapped Reads")
343 mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
344 for name, counts in mapped_reads.items():
345 report.append(" %s: %d" % (name, counts))
# NOTE(review): the `report` list initialization and the `for run in runs:`
# loop header are elided in this view.
350 def summary_report(runs):
352 Summarize cluster numbers and mapped read counts for a runfolder
357 report.append('Summary for %s' % (run.name,))
# Sort lane ids naturally (alphanum comparator) before reporting each lane.
359 eland_keys = run.gerald.eland_results.results[0].keys()
360 eland_keys.sort(alphanum)
362 for lane_id in eland_keys:
363 report.extend(summarize_lane(run.gerald, lane_id))
# Join with the platform line separator into one printable string.
366 return os.linesep.join(report)
def is_compressed(filename):
    """
    Return True if filename already carries a compressed-file extension.

    :Parameters:
      - `filename` : file name or path to test

    :Returns: True for ``.gz`` or ``.bz2`` extensions, False otherwise.
    """
    # Only the final extension is inspected, so "foo.tar.gz" counts but
    # "foo.gz.txt" does not -- extract_results uses this to decide whether
    # an eland file needs bzip2 compression or a plain copy.
    extension = os.path.splitext(filename)[1]
    if extension == ".gz":
        return True
    elif extension == '.bz2':
        return True
    else:
        return False
# NOTE(review): heavily sampled in this view -- the per-run loop header, the
# directory-creation, `continue` statements, process `wait()` calls, and
# several comment lines are elided. Comments describe only what is visible.
376 def extract_results(runs, output_base_dir=None):
377 if output_base_dir is None:
378 output_base_dir = os.getcwd()
# Per run `r`: results land in <output_base_dir>/<flowcell_id>/C<start>-<stop>.
381 result_dir = os.path.join(output_base_dir, r.flowcell_id)
382 logging.info("Using %s as result directory" % (result_dir,))
383 if not os.path.exists(result_dir):
# cycle directory named from the image-analysis start/stop cycle numbers.
387 cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
388 logging.info("Filling in %s" % (cycle,))
389 cycle_dir = os.path.join(result_dir, cycle)
# Existing cycle dirs are never clobbered (skip path elided here).
390 if os.path.exists(cycle_dir):
391 logging.error("%s already exists, not overwriting" % (cycle_dir,))
396 # copy stuff out of the main run
# Copy the GERALD Summary.htm report when present.
403 summary_path = os.path.join(r.gerald.pathname, 'Summary.htm')
404 if os.path.exists(summary_path):
405 logging.info('Copying %s to %s' % (summary_path, cycle_dir))
406 shutil.copy(summary_path, cycle_dir)
408 logging.info('Summary file %s was not found' % (summary_path,))
413 # check for g.pathname/Temp a new feature of 1.1rc1
414 scores_path = g.pathname
415 scores_path_temp = os.path.join(scores_path, 'Temp')
416 if os.path.isdir(scores_path_temp):
417 scores_path = scores_path_temp
419 # hopefully we have a directory that contains s_*_score files
420 for f in os.listdir(scores_path):
421 if re.match('.*_score.txt', f):
422 score_files.append(f)
# Pipe `tar c` through bzip2 into scores.tar.bz2 inside the cycle dir.
424 tar_cmd = ['/bin/tar', 'c'] + score_files
425 bzip_cmd = [ 'bzip2', '-9', '-c' ]
426 tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2')
427 tar_dest = open(tar_dest_name, 'w')
428 logging.info("Compressing score files from %s" % (scores_path,))
429 logging.info("Running tar: " + " ".join(tar_cmd[:10]))
430 logging.info("Running bzip2: " + " ".join(bzip_cmd))
431 logging.info("Writing to %s" %(tar_dest_name))
# NOTE(review): Popen kwargs continue on an elided line (likely cwd=); the
# corresponding wait() calls are also not visible here.
434 tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
436 bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
439 # copy & bzip eland files
440 for lanes_dictionary in g.eland_results.results:
441 for eland_lane in lanes_dictionary.values():
442 source_name = eland_lane.pathname
443 path, name = os.path.split(eland_lane.pathname)
444 dest_name = os.path.join(cycle_dir, name)
445 logging.info("Saving eland file %s to %s" % \
446 (source_name, dest_name))
# Already-compressed files are copied verbatim; others go through bzip2.
448 if is_compressed(name):
449 logging.info('Already compressed, Saving to %s' % (dest_name, ))
450 shutil.copy(source_name, dest_name)
454 args = ['bzip2', '-9', '-c', source_name]
455 logging.info('Running: %s' % ( " ".join(args) ))
456 bzip_dest = open(dest_name, 'w')
457 bzip = subprocess.Popen(args, stdout=bzip_dest)
458 logging.info('Saving to %s' % (dest_name, ))
# NOTE(review): the `for f in files:` loop header and the dry-run vs. actual
# deletion branch are elided in this view -- only logging lines are visible.
461 def rm_list(files, dry_run=True):
463 if os.path.exists(f):
464 logging.info('deleting %s' % (f,))
# Missing paths are only warned about, never an error.
471 logging.warn("%s doesn't exist."% (f,))
# NOTE(review): sampled view -- the `for run in runs:` loop header and the
# tail of the function (e.g. waiting on the make subprocess) are elided.
473 def clean_runs(runs, dry_run=True):
475 Clean up run folders to optimize for compression.
478 logging.info('In dry-run mode')
481 logging.info('Cleaninging %s' % (run.pathname,))
# Remove per-run log/report files; rm_list honors dry_run throughout.
483 runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
484 rm_list(runlogs, dry_run)
486 pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
487 rm_list(pipeline_logs, dry_run)
489 # rm NetCopy.log? Isn't this robocopy?
490 logs = glob(os.path.join(run.pathname, '*.log'))
491 rm_list(logs, dry_run)
# Calibration directories are regenerable and safe to drop.
494 calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
495 rm_list(calibration_dir, dry_run)
# Raw image tiles are the bulk of the space; remove per-lane image dirs.
497 logging.info("Cleaning images")
498 image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
499 rm_list(image_dirs, dry_run)
500 # cd Data/C1-*_Firecrest*
501 logging.info("Cleaning intermediate files")
502 # make clean_intermediate
# Delegate intermediate-file cleanup to the pipeline's own Makefile target.
503 if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
504 clean_process = subprocess.Popen(['make', 'clean_intermediate'],
505 cwd=run.image_analysis.pathname,)