2 Core information needed to inspect a runfolder.
from glob import glob
import logging
import os
import re
import shutil
import subprocess
import sys
import time

try:
    from xml.etree import ElementTree
except ImportError:
    from elementtree import ElementTree
# Date handling: runfolders historically embed dates as dd-mm-yyyy.
EUROPEAN_STRPTIME = "%d-%m-%Y"
EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
# '.' is literal inside a character class; the old "[0-9\.]" form triggered
# an invalid-escape warning on modern Pythons without changing the match.
VERSION_RE = "([0-9.]+)"
USER_RE = "([a-zA-Z0-9]+)"
# An Illumina flowcell always has 8 lanes, numbered 1..8.
LANES_PER_FLOWCELL = 8
# Materialize as a list so indexing/iteration behaves the same on Python 3
# as the original Python 2 range() did.
LANE_LIST = list(range(1, LANES_PER_FLOWCELL + 1))
27 from htsworkflow.util.alphanum import alphanum
28 from htsworkflow.util.ethelp import indent, flatten
29 from htsworkflow.util.queuecommands import QueueCommands
31 from htsworkflow.pipelines import srf
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run

    Aggregates the three sub-components of one sequencing analysis:
    image analysis (Firecrest or IPAR), base calling (Bustard) and
    alignment (GERALD), and can serialize them to/from a single xml file.
    """
    # xml tag names used by get_elements/set_elements
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, xml=None):
        """Initialize from a runfolder path and/or a previously saved xml tree.

        :Parameters:
          - `pathname`: path to the runfolder (normalized and stored)
          - `xml`: an ElementTree element previously produced by get_elements()
        """
        if pathname is not None:
            self.pathname = os.path.normpath(pathname)
        else:
            self.pathname = None
        self._name = None
        self._flowcell_id = None
        self.image_analysis = None
        self.bustard = None
        self.gerald = None

        if xml is not None:
            self.set_elements(xml)

    def _get_flowcell_id(self):
        """Return the flowcell id, preferring Config/FlowcellId.xml,
        falling back to the last '_'-separated field of the runfolder name."""
        if self._flowcell_id is None:
            config_dir = os.path.join(self.pathname, 'Config')
            flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
            if os.path.exists(flowcell_id_path):
                flowcell_id_tree = ElementTree.parse(flowcell_id_path)
                self._flowcell_id = flowcell_id_tree.findtext('Text')
            else:
                path_fields = self.pathname.split('_')
                if len(path_fields) > 0:
                    # guessing last element of filename
                    flowcell_id = path_fields[-1]
                else:
                    flowcell_id = 'unknown'
                logging.warning(
                    "Flowcell id was not found, guessing %s" % (
                        flowcell_id,))
                self._flowcell_id = flowcell_id
        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def _get_runfolder_name(self):
        # The runfolder name is tracked by the gerald component.
        if self.gerald is None:
            # NOTE(review): original fallback lines were missing from the
            # mangled source; returning None when gerald is absent.
            return None
        return self.gerald.runfolder_name
    runfolder_name = property(_get_runfolder_name)

    def get_elements(self):
        """
        make one master xml file from all of our sub-components.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """Populate this run from an xml tree produced by get_elements().

        Raises ValueError if the root tag is not a PipelineRun.
        """
        # this file gets imported by all the others,
        # so we need to hide the imports to avoid a cyclic imports
        from htsworkflow.pipelines import firecrest
        from htsworkflow.pipelines import ipar
        from htsworkflow.pipelines import bustard
        from htsworkflow.pipelines import gerald

        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
            raise ValueError('Pipeline Run Expecting %s got %s' % (
                PipelineRun.PIPELINE_RUN, tag))
        for element in tree:
            tag = element.tag.lower()
            if tag == PipelineRun.FLOWCELL_ID.lower():
                self._flowcell_id = element.text
            #ok the xword.Xword.XWORD pattern for module.class.constant is lame
            # you should only have Firecrest or IPAR, never both of them.
            elif tag == firecrest.Firecrest.FIRECREST.lower():
                self.image_analysis = firecrest.Firecrest(xml=element)
            elif tag == ipar.IPAR.IPAR.lower():
                self.image_analysis = ipar.IPAR(xml=element)
            elif tag == bustard.Bustard.BUSTARD.lower():
                self.bustard = bustard.Bustard(xml=element)
            elif tag == gerald.Gerald.GERALD.lower():
                self.gerald = gerald.Gerald(xml=element)
            else:
                logging.warning('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """
        Given a run tuple, find the latest date and use that as our name
        """
        if self._name is None:
            tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
            timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
            self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
        return self._name
    name = property(_get_run_name)

    def save(self, destdir=None):
        """Write this run's xml summary to destdir (default: cwd-relative)."""
        if destdir is None:
            destdir = ''
        logging.info("Saving run report " + self.name)
        xml = self.get_elements()
        # pretty-print the tree before writing (helper from util.ethelp)
        indent(xml)
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """Replace this run's state with the contents of a saved xml file."""
        logging.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters:
      - `pathname` : location of an run xml file

    :Returns: initialized PipelineRun object
    """
    tree = ElementTree.parse(pathname).getroot()
    run = PipelineRun(xml=tree)
    # original 'return' line was lost in the mangled source; without it the
    # function would always return None
    return run
def get_runs(runfolder):
    """
    Search through a run folder for all the various sub component runs
    and then return a PipelineRun for each different combination.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in there gerald component.
    """
    # hidden imports to avoid cyclic module dependencies
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
        # for each bustard dir under an image-analysis dir, pair it with
        # every gerald dir under it and emit one PipelineRun per triple
        logging.info("Looking for bustard directories in %s" % (pathname,))
        bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
        # RTA BaseCalls looks enough like Bustard.
        bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
        for bustard_pathname in bustard_dirs:
            logging.info("Found bustard directory %s" % (bustard_pathname,))
            b = bustard.bustard(bustard_pathname)
            gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
            logging.info("Looking for gerald directories in %s" % (pathname,))
            for gerald_pathname in glob(gerald_glob):
                logging.info("Found gerald directory %s" % (gerald_pathname,))
                try:
                    g = gerald.gerald(gerald_pathname)
                    p = PipelineRun(runfolder)
                    p.image_analysis = image_analysis
                    p.bustard = b
                    p.gerald = g
                    runs.append(p)
                except IOError as e:
                    # a broken gerald directory should not abort the scan
                    logging.error("Ignoring " + str(e))

    datadir = os.path.join(runfolder, 'Data')

    runs = []
    logging.info('Searching for runs in ' + datadir)

    # scan for firecrest directories
    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
        logging.info('Found firecrest in ' + datadir)
        image_analysis = firecrest.firecrest(firecrest_pathname)
        if image_analysis is None:
            logging.warning(
                "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, image_analysis, firecrest_pathname
            )

    # scan for IPAR directories
    ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
    # The Intensities directory from the RTA software looks a lot like IPAR
    ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
    for ipar_pathname in ipar_dirs:
        logging.info('Found ipar directories in ' + datadir)
        image_analysis = ipar.ipar(ipar_pathname)
        if image_analysis is None:
            logging.warning(
                "%s is an empty or invalid IPAR directory" % (ipar_pathname,)
            )
        else:
            scan_post_image_analysis(
                runs, runfolder, image_analysis, ipar_pathname
            )

    return runs
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.

    :Returns: a PipelineRun, or None if any component is missing/invalid.
    """
    # hidden imports to avoid cyclic module dependencies
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    gerald_dir = os.path.expanduser(gerald_dir)
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))
    # image dir lives at <runfolder>/Data/<C...>, so the runfolder is two up
    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))

    logging.info('--- use-run detected options ---')
    logging.info('runfolder: %s' % (runfolder_dir,))
    logging.info('image_dir: %s' % (image_dir,))
    logging.info('bustard_dir: %s' % (bustard_dir,))
    logging.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    logging.info('data_dir: %s' % (data_dir,))
    # (fixed typo in the original log label: 'short_iamge_dir')
    logging.info('short_image_dir: %s' % (short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    else:
        image_run = None

    # if we din't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        logging.error(msg)
        return None

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        logging.error('%s does not contain a bustard run' % (bustard_dir,))
        return None

    # find alignments
    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        logging.error('%s does not contain a gerald run' % (gerald_dir,))
        return None

    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    logging.info('Constructed PipelineRun from %s' % (gerald_dir,))
    return p
def extract_run_parameters(runs):
    """
    Search through runfolder_path for various runs and grab their parameters

    Saves each run's xml summary via run.save().
    (NOTE(review): body reconstructed — the original statements were missing
    from the mangled source; a save-loop matches how run reports are produced
    elsewhere in this module.)
    """
    for run in runs:
        run.save()
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks seperately.

    :Parameters:
      - `genome_map`: mapping whose keys are the "main" genome path prefixes
      - `mapped_reads`: dict of {path-or-chromosome: read count}

    :Returns: dict of {chromosome-or-genome: summed read count}
    """
    summarized_reads = {}
    genome_reads = 0
    genome = 'unknown'
    for k, v in mapped_reads.items():
        path, k = os.path.split(k)
        # dict.has_key() was removed in Python 3; use the `in` operator
        if len(path) > 0 and path not in genome_map:
            # reads mapped under an unknown prefix are rolled up per-genome
            genome = path
            genome_reads += v
        else:
            summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
    summarized_reads[genome] = genome_reads
    return summarized_reads
def summarize_lane(gerald, lane_id):
    """Build a human-readable text report (list of lines) for one lane.

    Walks each read end in the gerald summary, reporting cluster counts,
    total reads, eland match-code statistics and per-genome mapped reads.
    """
    report = []
    summary_results = gerald.summary.lane_results
    for end in range(len(summary_results)):
        eland_result = gerald.eland_results.results[end][lane_id]
        report.append("Sample name %s" % (eland_result.sample_name))
        report.append("Lane id %s end %s" % (eland_result.lane_id, end))
        # has_key() was removed in Python 3; `in` is the equivalent test
        if end < len(summary_results) and eland_result.lane_id in summary_results[end]:
            cluster = summary_results[end][eland_result.lane_id].cluster
            report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
        report.append("Total Reads: %d" % (eland_result.reads))

        if hasattr(eland_result, 'match_codes'):
            mc = eland_result.match_codes
            nm = mc['NM']
            nm_percent = float(nm) / eland_result.reads * 100
            qc = mc['QC']
            qc_percent = float(qc) / eland_result.reads * 100

            report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
            report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
            report.append('Unique (0,1,2 mismatches) %d %d %d' % \
                          (mc['U0'], mc['U1'], mc['U2']))
            report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
                          (mc['R0'], mc['R1'], mc['R2']))

        if hasattr(eland_result, 'genome_map'):
            report.append("Mapped Reads")
            mapped_reads = summarize_mapped_reads(eland_result.genome_map,
                                                  eland_result.mapped_reads)
            for name, counts in mapped_reads.items():
                report.append("  %s: %d" % (name, counts))
        # blank separator line between ends
        report.append('')
    return report
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder

    :Returns: a single newline-joined report string covering every run.
    """
    import functools

    report = []
    for run in runs:
        # header identifying which run the following lanes belong to
        report.append('Summary for %s' % (run.name,))
        # sort the report lanes naturally; alphanum is a cmp-style comparator
        # (list.sort(cmp) was removed in Python 3, cmp_to_key adapts it)
        eland_keys = sorted(run.gerald.eland_results.results[0].keys(),
                            key=functools.cmp_to_key(alphanum))
        for lane_id in eland_keys:
            report.extend(summarize_lane(run.gerald, lane_id))
            report.append('')
    return os.linesep.join(report)
def is_compressed(filename):
    """Return True if *filename* has a gzip (.gz) or bzip2 (.bz2) extension."""
    # hoist the splitext call; both branches need the same extension
    ext = os.path.splitext(filename)[1]
    if ext == ".gz":
        return True
    elif ext == '.bz2':
        return True
    else:
        return False
def save_flowcell_reports(data_dir, cycle_dir):
    """
    Save the flowcell quality reports

    Tars data_dir/reports (plus Status.xml/Status.xsl when present) into
    cycle_dir/flowcell-reports.tar.bz2. Does nothing when reports/ is absent.
    """
    data_dir = os.path.abspath(data_dir)
    status_file = os.path.join(data_dir, 'Status.xml')
    reports_dir = os.path.join(data_dir, 'reports')
    reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
    if os.path.exists(reports_dir):
        cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
        if os.path.exists(status_file):
            cmd_list.extend(['Status.xml', 'Status.xsl'])
        logging.info("Saving reports from " + reports_dir)
        # tar is run with relative paths, so run it from data_dir and
        # restore the previous working directory afterwards
        # NOTE(review): chdir/run lines reconstructed from the mangled source
        cwd = os.getcwd()
        os.chdir(data_dir)
        q = QueueCommands([" ".join(cmd_list)])
        q.run()
        os.chdir(cwd)
def save_summary_file(gerald_object, cycle_dir):
    """Copy the gerald Summary.htm into cycle_dir, logging if it's missing."""
    summary_path = os.path.join(gerald_object.pathname, 'Summary.htm')
    if os.path.exists(summary_path):
        logging.info('Copying %s to %s' % (summary_path, cycle_dir))
        shutil.copy(summary_path, cycle_dir)
    else:
        logging.info('Summary file %s was not found' % (summary_path,))
def save_ivc_plot(bustard_object, cycle_dir):
    """
    Save the IVC page and its supporting images

    Copies IVC.htm plus the matching Plots/s_?_*.png images into cycle_dir.
    Logs a warning (and copies nothing) when IVC.htm is absent.
    """
    plot_html = os.path.join(bustard_object.pathname, 'IVC.htm')
    plot_image_path = os.path.join(bustard_object.pathname, 'Plots')
    # glob pattern matching the per-lane plot images
    plot_images = os.path.join(plot_image_path, 's_?_[a-z]*.png')

    plot_target_path = os.path.join(cycle_dir, 'Plots')

    if os.path.exists(plot_html):
        logging.debug("Saving %s" % (plot_html,))
        logging.debug("Saving %s" % (plot_images,))
        shutil.copy(plot_html, cycle_dir)
        if not os.path.exists(plot_target_path):
            os.mkdir(plot_target_path)
        for plot_file in glob(plot_images):
            shutil.copy(plot_file, plot_target_path)
    else:
        logging.warning('Missing IVC.html file, not archiving')
def compress_score_files(bustard_object, cycle_dir):
    """
    Compress score files into our result directory

    Pipes `tar c <s_*_score files>` through `bzip2 -9` into
    cycle_dir/scores.tar.bz2.
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = bustard_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    score_files = []
    for f in os.listdir(scores_path):
        if re.match('.*_score.txt', f):
            score_files.append(f)

    tar_cmd = ['tar', 'c'] + score_files
    bzip_cmd = [ 'bzip2', '-9', '-c' ]
    tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
    # 'wb': the destination receives raw bzip2 bytes; text mode ('w') would
    # break on Python 3
    tar_dest = open(tar_dest_name, 'wb')
    logging.info("Compressing score files from %s" % (scores_path,))
    logging.info("Running tar: " + " ".join(tar_cmd[:10]))
    logging.info("Running bzip2: " + " ".join(bzip_cmd))
    logging.info("Writing to %s" % (tar_dest_name,))

    # NOTE(review): env line reconstructed; original env assignment was
    # missing from the mangled source
    env = {'BZIP': '-9'}
    tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
                           cwd=scores_path)
    bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
    tar.wait()
def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
    """
    Compress eland result files into the archive directory

    Already-compressed files are copied as-is; the rest are queued as
    `bzip2 -9 -c src > dest` shell commands run num_jobs at a time.
    """
    # copy & bzip eland files
    bz_commands = []

    for lanes_dictionary in gerald_object.eland_results.results:
        for eland_lane in lanes_dictionary.values():
            source_name = eland_lane.pathname
            if source_name is None:
                logging.info(
                    "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
                continue
            path, name = os.path.split(source_name)
            dest_name = os.path.join(cycle_dir, name)
            logging.info("Saving eland file %s to %s" % \
                         (source_name, dest_name))

            if is_compressed(name):
                logging.info('Already compressed, Saving to %s' % (dest_name, ))
                shutil.copy(source_name, dest_name)
            else:
                # not compressed yet: queue a bzip2 command instead of
                # spawning one process per file immediately
                dest_name += '.bz2'
                args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
                bz_commands.append(" ".join(args))
                #logging.info('Running: %s' % ( " ".join(args) ))
                #bzip_dest = open(dest_name, 'w')
                #bzip = subprocess.Popen(args, stdout=bzip_dest)
                #logging.info('Saving to %s' % (dest_name, ))

    if len(bz_commands) > 0:
        q = QueueCommands(bz_commands, num_jobs)
        q.run()
def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1):
    """
    Iterate over runfolders in runs extracting the most useful information.
    * run parameters (in run-*.xml)
    * eland result files
    * IVC plot
    * srf files (raw sequence & qualities)

    Results for each run land in <output_base_dir>/<flowcell_id>/C<start>-<stop>.
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    for r in runs:
        result_dir = os.path.join(output_base_dir, r.flowcell_id)
        logging.info("Using %s as result directory" % (result_dir,))
        if not os.path.exists(result_dir):
            os.mkdir(result_dir)

        # create cycle_dir named after the image-analysis cycle range
        cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
        logging.info("Filling in %s" % (cycle,))
        cycle_dir = os.path.join(result_dir, cycle)
        cycle_dir = os.path.abspath(cycle_dir)
        if os.path.exists(cycle_dir):
            logging.error("%s already exists, not overwriting" % (cycle_dir,))
            continue
        else:
            os.mkdir(cycle_dir)

        # save run summary xml
        r.save(cycle_dir)

        # save illumina flowcell status report
        save_flowcell_reports( os.path.join(r.image_analysis.pathname, '..'), cycle_dir )

        # save stuff from bustard
        save_ivc_plot(r.bustard, cycle_dir)

        # build base call saving commands
        lanes = []
        for lane in range(1,9):
            if r.gerald.lanes[lane].analysis != 'none':
                lanes.append(lane)

        run_name = srf.pathname_to_run_name(r.pathname)
        srf_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
        srf.run_commands(r.bustard.pathname, srf_cmds, num_jobs)

        # save stuff from GERALD
        # copy stuff out of the main run
        g = r.gerald
        save_summary_file(g, cycle_dir)

        # compress eland result files
        compress_eland_results(g, cycle_dir, num_jobs)

        # md5 all the compressed files once we're done
        md5_commands = srf.make_md5_commands(cycle_dir)
        srf.run_commands(cycle_dir, md5_commands, num_jobs)
def rm_list(files, dry_run=True):
    """Delete each path in *files* (files or whole trees).

    With dry_run=True (the default) only logs what would be removed.
    Nonexistent paths produce a warning instead of an error.
    """
    for f in files:
        if os.path.exists(f):
            logging.info('deleting %s' % (f,))
            if not dry_run:
                # directories need a recursive remove, plain files don't
                if os.path.isdir(f):
                    shutil.rmtree(f)
                else:
                    os.unlink(f)
        else:
            logging.warning("%s doesn't exist."% (f,))
585 def clean_runs(runs, dry_run=True):
587 Clean up run folders to optimize for compression.
590 logging.info('In dry-run mode')
593 logging.info('Cleaninging %s' % (run.pathname,))
595 runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
596 rm_list(runlogs, dry_run)
598 pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
599 rm_list(pipeline_logs, dry_run)
601 # rm NetCopy.log? Isn't this robocopy?
602 logs = glob(os.path.join(run.pathname, '*.log'))
603 rm_list(logs, dry_run)
606 calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
607 rm_list(calibration_dir, dry_run)
609 logging.info("Cleaning images")
610 image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
611 rm_list(image_dirs, dry_run)
612 # cd Data/C1-*_Firecrest*
613 logging.info("Cleaning intermediate files")
614 # make clean_intermediate
615 if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
616 clean_process = subprocess.Popen(['make', 'clean_intermediate'],
617 cwd=run.image_analysis.pathname,)