2 Core information needed to inspect a runfolder.
16 from xml.etree import ElementTree
17 except ImportError, e:
18 from elementtree import ElementTree
20 LOGGER = logging.getLogger(__name__)
22 EUROPEAN_STRPTIME = "%d-%m-%Y"
23 EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
24 VERSION_RE = "([0-9\.]+)"
25 USER_RE = "([a-zA-Z0-9]+)"
26 LANES_PER_FLOWCELL = 8
27 LANE_LIST = range(1, LANES_PER_FLOWCELL + 1)
29 from htsworkflow.util.alphanum import alphanum
30 from htsworkflow.util.ethelp import indent, flatten
31 from htsworkflow.util.queuecommands import QueueCommands
33 from htsworkflow.pipelines import srf
35 class PipelineRun(object):
# Aggregates the components of one Illumina pipeline run -- image analysis
# (Firecrest/IPAR), base calling (Bustard), and alignment (GERALD) -- and
# can serialize/restore the whole set as a single XML document.
# NOTE(review): this dump embeds original line numbers and is missing
# statements; the visible code lines are left byte-identical.
37 Capture "interesting" information about a pipeline run
# XML tag names used by get_elements()/set_elements() below.
40 PIPELINE_RUN = 'PipelineRun'
41 FLOWCELL_ID = 'FlowcellID'
43 def __init__(self, pathname=None, flowcell_id=None, xml=None):
# pathname: runfolder location (normalized); flowcell_id: pre-known id,
# otherwise resolved lazily by _get_flowcell_id(); xml: a previously
# saved tree to restore state from via set_elements().
44 if pathname is not None:
45 self.pathname = os.path.normpath(pathname)
49 self._flowcell_id = flowcell_id
51 self.image_analysis = None
# presumably self.bustard, self.gerald and self._name are initialized in
# the lines not visible in this dump -- TODO confirm against full source.
56 self.set_elements(xml)
58 def _get_flowcell_id(self):
# Lazily resolve the flowcell id: prefer Config/FlowcellId.xml inside the
# runfolder; otherwise fall back to guessing from the last '_'-separated
# field of the runfolder pathname, or 'unknown' as a last resort.
60 if self._flowcell_id is None:
61 config_dir = os.path.join(self.pathname, 'Config')
62 flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
63 if os.path.exists(flowcell_id_path):
64 flowcell_id_tree = ElementTree.parse(flowcell_id_path)
65 self._flowcell_id = flowcell_id_tree.findtext('Text')
67 path_fields = self.pathname.split('_')
68 if len(path_fields) > 0:
69 # guessing last element of filename
70 self._flowcell_id = path_fields[-1]
72 self._flowcell_id = 'unknown'
# warn the operator that the id below is only a guess
75 "Flowcell id was not found, guessing %s" % (
78 return self._flowcell_id
79 flowcell_id = property(_get_flowcell_id)
81 def _get_runfolder_name(self):
# Delegates to the gerald component when present; the branch taken when
# self.gerald is None is in lines not visible here.
82 if self.gerald is None:
85 return self.gerald.runfolder_name
86 runfolder_name = property(_get_runfolder_name)
88 def get_elements(self):
# Build a single <PipelineRun> element containing the flowcell id plus
# the serialized image_analysis, bustard and gerald sub-trees.
# The 'return root' statement is presumably in a line not shown here.
90 make one master xml file from all of our sub-components.
92 root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
93 flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
94 flowcell.text = self.flowcell_id
95 root.append(self.image_analysis.get_elements())
96 root.append(self.bustard.get_elements())
97 root.append(self.gerald.get_elements())
100 def set_elements(self, tree):
# Restore component objects from a <PipelineRun> tree produced by
# get_elements(). Raises ValueError if the root tag is wrong.
101 # this file gets imported by all the others,
102 # so we need to hide the imports to avoid a cyclic imports
103 from htsworkflow.pipelines import firecrest
104 from htsworkflow.pipelines import ipar
105 from htsworkflow.pipelines import bustard
106 from htsworkflow.pipelines import gerald
108 tag = tree.tag.lower()
109 if tag != PipelineRun.PIPELINE_RUN.lower():
110 raise ValueError('Pipeline Run Expecting %s got %s' % (
111 PipelineRun.PIPELINE_RUN, tag))
# NOTE(review): the 'for element in tree:' loop header is missing from
# this dump; the body below dispatches on each child element's tag.
113 tag = element.tag.lower()
114 if tag == PipelineRun.FLOWCELL_ID.lower():
115 self._flowcell_id = element.text
116 #ok the xword.Xword.XWORD pattern for module.class.constant is lame
117 # you should only have Firecrest or IPAR, never both of them.
118 elif tag == firecrest.Firecrest.FIRECREST.lower():
119 self.image_analysis = firecrest.Firecrest(xml=element)
120 elif tag == ipar.IPAR.IPAR.lower():
121 self.image_analysis = ipar.IPAR(xml=element)
122 elif tag == bustard.Bustard.BUSTARD.lower():
123 self.bustard = bustard.Bustard(xml=element)
124 elif tag == gerald.Gerald.GERALD.lower():
125 self.gerald = gerald.Gerald(xml=element)
126 elif tag == gerald.CASAVA.GERALD.lower():
127 self.gerald = gerald.CASAVA(xml=element)
# unknown tags are logged and skipped rather than raising
129 LOGGER.warn('PipelineRun unrecognized tag %s' % (tag,))
131 def _get_run_name(self):
# Cached name of the form run_<flowcell_id>_<YYYY-MM-DD>.xml, dated by
# the newest timestamp among the three component objects.
133 Given a run tuple, find the latest date and use that as our name
135 if self._name is None:
136 tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
137 timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
138 self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
140 name = property(_get_run_name)
142 def save(self, destdir=None):
# Serialize this run to <destdir>/<self.name>; the default handling for
# destdir=None is in lines not visible here.
145 LOGGER.info("Saving run report " + self.name)
146 xml = self.get_elements()
148 dest_pathname = os.path.join(destdir, self.name)
149 ElementTree.ElementTree(xml).write(dest_pathname)
151 def load(self, filename):
# Re-populate this object from a run-report XML file written by save().
152 LOGGER.info("Loading run report from " + filename)
153 tree = ElementTree.parse(filename).getroot()
154 self.set_elements(tree)
156 def load_pipeline_run_xml(pathname):
# Module-level convenience wrapper around PipelineRun(xml=...).
158 Load and instantiate a Pipeline run from a run xml file
161 - `pathname` : location of a run xml file
163 :Returns: initialized PipelineRun object
165 tree = ElementTree.parse(pathname).getroot()
166 run = PipelineRun(xml=tree)
# NOTE(review): the 'return run' statement is presumably in a line not
# visible in this dump.
169 def get_runs(runfolder, flowcell_id=None):
# Scan a runfolder's Data directory for every Firecrest/IPAR x Bustard x
# GERALD combination and build one PipelineRun per combination, collected
# into `runs` (its initialization and the final return are in lines not
# visible in this dump).
171 Search through a run folder for all the various sub component runs
172 and then return a PipelineRun for each different combination.
174 For example if there are two different GERALD runs, this will
175 generate two different PipelineRun objects, that differ
176 in their gerald component.
# local imports to break the circular dependency between pipeline modules
178 from htsworkflow.pipelines import firecrest
179 from htsworkflow.pipelines import ipar
180 from htsworkflow.pipelines import bustard
181 from htsworkflow.pipelines import gerald
183 def scan_post_image_analysis(runs, runfolder, datadir, image_analysis, pathname):
# For one image-analysis directory, find its Bustard/BaseCalls children
# and from each build the gerald and aligned-run combinations.
184 LOGGER.info("Looking for bustard directories in %s" % (pathname,))
185 bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
186 # RTA BaseCalls looks enough like Bustard.
187 bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
188 for bustard_pathname in bustard_dirs:
189 LOGGER.info("Found bustard directory %s" % (bustard_pathname,))
190 b = bustard.bustard(bustard_pathname)
191 build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder)
193 build_aligned_runs(image_analysis, runs, b, datadir, runfolder)
195 def build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder):
# One PipelineRun per GERALD* directory under the bustard directory;
# errors from a bad gerald directory are logged and the directory skipped
# (the try/except structure is partially missing from this dump).
196 gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
197 LOGGER.info("Looking for gerald directories in %s" % (pathname,))
198 for gerald_pathname in glob(gerald_glob):
199 LOGGER.info("Found gerald directory %s" % (gerald_pathname,))
201 g = gerald.gerald(gerald_pathname)
202 p = PipelineRun(runfolder, flowcell_id)
204 p.image_analysis = image_analysis
209 LOGGER.error("Ignoring " + str(e))
212 def build_aligned_runs(image_analysis, runs, b, datadir, runfolder):
# Same idea for HiSeq-style Aligned* directories at the runfolder root.
213 aligned_glob = os.path.join(runfolder, 'Aligned*')
214 for aligned in glob(aligned_glob):
215 LOGGER.info("Found aligned directory %s" % (aligned,))
217 g = gerald.HiSeq(aligned)
218 p = PipelineRun(runfolder, flowcell_id)
220 p.image_analysis = image_analysis
225 LOGGER.error("Ignoring " + str(e))
227 datadir = os.path.join(runfolder, 'Data')
229 LOGGER.info('Searching for runs in ' + datadir)
231 # scan for firecrest directories
232 for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
233 LOGGER.info('Found firecrest in ' + datadir)
234 image_analysis = firecrest.firecrest(firecrest_pathname)
235 if image_analysis is None:
# empty/invalid directories are reported and skipped
237 "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
240 scan_post_image_analysis(
241 runs, runfolder, datadir, image_analysis, firecrest_pathname
243 # scan for IPAR directories
244 ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
245 # The Intensities directory from the RTA software looks a lot like IPAR
246 ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
247 for ipar_pathname in ipar_dirs:
248 LOGGER.info('Found ipar directories in ' + datadir)
249 image_analysis = ipar.ipar(ipar_pathname)
250 if image_analysis is None:
252 "%s is an empty or invalid IPAR directory" % (ipar_pathname,)
255 scan_post_image_analysis(
256 runs, runfolder, datadir, image_analysis, ipar_pathname
261 def get_specific_run(gerald_dir):
# Build a single PipelineRun by walking UP from a known gerald directory:
# gerald -> parent bustard dir -> grandparent image-analysis dir ->
# runfolder. Returns the constructed run (return statement is in lines
# not visible in this dump); error paths log and presumably return None.
263 Given a gerald directory, construct a PipelineRun out of its parents
265 Basically this allows specifying a particular run instead of the previous
266 get_runs which scans a runfolder for various combinations of
267 firecrest/ipar/bustard/gerald runs.
# local imports to avoid the cyclic-import problem noted elsewhere
269 from htsworkflow.pipelines import firecrest
270 from htsworkflow.pipelines import ipar
271 from htsworkflow.pipelines import bustard
272 from htsworkflow.pipelines import gerald
274 gerald_dir = os.path.expanduser(gerald_dir)
275 bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
276 image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))
278 runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))
280 LOGGER.info('--- use-run detected options ---')
281 LOGGER.info('runfolder: %s' % (runfolder_dir,))
282 LOGGER.info('image_dir: %s' % (image_dir,))
283 LOGGER.info('bustard_dir: %s' % (bustard_dir,))
284 LOGGER.info('gerald_dir: %s' % (gerald_dir,))
286 # find our processed image dir
288 # split into parent, and leaf directory
289 # leaf directory should be an IPAR or firecrest directory
290 data_dir, short_image_dir = os.path.split(image_dir)
291 LOGGER.info('data_dir: %s' % (data_dir,))
# NOTE(review): 'short_iamge_dir' below is a typo in the runtime log
# string ("iamge" for "image"); left untouched here since this edit only
# changes comments.
292 LOGGER.info('short_iamge_dir: %s' % (short_image_dir,))
294 # guess which type of image processing directory we have by looking
295 # in the leaf directory name
296 if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
297 image_run = firecrest.firecrest(image_dir)
298 elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
299 image_run = ipar.ipar(image_dir)
300 elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
# RTA Intensities directories are parsed with the IPAR loader
301 image_run = ipar.ipar(image_dir)
303 # if we didn't find a run, report the error and return
304 if image_run is None:
305 msg = '%s does not contain an image processing step' % (image_dir,)
309 # find our base calling
310 base_calling_run = bustard.bustard(bustard_dir)
311 if base_calling_run is None:
312 LOGGER.error('%s does not contain a bustard run' % (bustard_dir,))
# find our gerald/alignment step
316 gerald_run = gerald.gerald(gerald_dir)
317 if gerald_run is None:
318 LOGGER.error('%s does not contain a gerald run' % (gerald_dir,))
# assemble the PipelineRun from the three discovered components
321 p = PipelineRun(runfolder_dir)
322 p.image_analysis = image_run
323 p.bustard = base_calling_run
324 p.gerald = gerald_run
326 LOGGER.info('Constructed PipelineRun from %s' % (gerald_dir,))
329 def extract_run_parameters(runs):
# NOTE(review): the body of this function is not visible in this dump;
# only the docstring line below survives.
331 Search through runfolder_path for various runs and grab their parameters
336 def summarize_mapped_reads(genome_map, mapped_reads):
# Collapse per-chromosome read counts into per-genome totals. Keys with a
# directory component not present in genome_map are treated as spike-in /
# contamination entries and counted under their own basename.
338 Summarize per chromosome reads into a genome count
339 But handle spike-in/contamination symlinks separately.
341 summarized_reads = {}
# NOTE(review): lines initializing `genome` and `genome_reads` (used
# below) are missing from this dump -- consult the full source.
344 for k, v in mapped_reads.items():
345 path, k = os.path.split(k)
# dict.has_key is Python 2 only; `path in genome_map` is the modern form
346 if len(path) > 0 and not genome_map.has_key(path):
350 summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
351 summarized_reads[genome] = genome_reads
352 return summarized_reads
354 def summarize_lane(gerald, lane_id):
# Build a human-readable, per-end report (list of strings) for one lane:
# cluster stats, read counts, eland match-code percentages, and mapped
# read counts per genome. The `report` list initialization and the
# extraction of `nm`/`qc` from the match codes are in lines missing from
# this dump.
356 summary_results = gerald.summary.lane_results
# one pass per sequencing end (single- or paired-end)
357 for end in range(len(summary_results)):
358 eland_result = gerald.eland_results.results[end][lane_id]
359 report.append("Sample name %s" % (eland_result.sample_name))
360 report.append("Lane id %s end %s" % (eland_result.lane_id, end))
361 if end < len(summary_results) and summary_results[end].has_key(eland_result.lane_id):
362 cluster = summary_results[end][eland_result.lane_id].cluster
363 report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
364 report.append("Total Reads: %d" % (eland_result.reads))
# match codes are optional on the eland result object
366 if hasattr(eland_result, 'match_codes'):
367 mc = eland_result.match_codes
369 nm_percent = float(nm) / eland_result.reads * 100
371 qc_percent = float(qc) / eland_result.reads * 100
373 report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
374 report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
375 report.append('Unique (0,1,2 mismatches) %d %d %d' % \
376 (mc['U0'], mc['U1'], mc['U2']))
377 report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
378 (mc['R0'], mc['R1'], mc['R2']))
380 if hasattr(eland_result, 'genome_map'):
381 report.append("Mapped Reads")
382 mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
383 for name, counts in mapped_reads.items():
384 report.append(" %s: %d" % (name, counts))
389 def summary_report(runs):
# Join per-lane summaries for every run into one newline-separated string.
# The `report` list initialization and the `for run in runs:` loop header
# are in lines missing from this dump.
391 Summarize cluster numbers and mapped read counts for a runfolder
396 report.append('Summary for %s' % (run.name,))
# sort lane ids naturally (alphanum comparator) before reporting;
# list.sort(cmp_function) is Python 2 only
398 eland_keys = run.gerald.eland_results.results[0].keys()
399 eland_keys.sort(alphanum)
401 for lane_id in eland_keys:
402 report.extend(summarize_lane(run.gerald, lane_id))
405 return os.linesep.join(report)
407 def is_compressed(filename):
# Report whether a filename carries a known compression extension.
# The return statements for each branch are in lines missing from this
# dump (presumably True for .gz/.bz2, False otherwise -- TODO confirm).
408 if os.path.splitext(filename)[1] == ".gz":
410 elif os.path.splitext(filename)[1] == '.bz2':
415 def save_flowcell_reports(data_dir, cycle_dir):
# Archive the flowcell quality reports (Data/reports plus Status files,
# when present) into <cycle_dir>/flowcell-reports.tar.bz2 by running tar
# through QueueCommands. The chdir handling around the tar invocation and
# the queue run call are in lines missing from this dump.
417 Save the flowcell quality reports
419 data_dir = os.path.abspath(data_dir)
420 status_file = os.path.join(data_dir, 'Status.xml')
421 reports_dir = os.path.join(data_dir, 'reports')
422 reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
423 if os.path.exists(reports_dir):
# tar is given relative paths, so it is presumably run from data_dir
424 cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
425 if os.path.exists(status_file):
426 cmd_list.extend(['Status.xml', 'Status.xsl'])
427 LOGGER.info("Saving reports from " + reports_dir)
430 q = QueueCommands([" ".join(cmd_list)])
435 def save_summary_file(pipeline, cycle_dir):
# Copy the GERALD Summary.htm (preferred) or the Status_Files fallback
# into cycle_dir; log when neither exists.
437 gerald_object = pipeline.gerald
438 gerald_summary = os.path.join(gerald_object.pathname, 'Summary.htm')
439 status_files_summary = os.path.join(pipeline.datadir, 'Status_Files', 'Summary.htm')
440 if os.path.exists(gerald_summary):
441 LOGGER.info('Copying %s to %s' % (gerald_summary, cycle_dir))
442 shutil.copy(gerald_summary, cycle_dir)
443 elif os.path.exists(status_files_summary):
444 LOGGER.info('Copying %s to %s' % (status_files_summary, cycle_dir))
445 shutil.copy(status_files_summary, cycle_dir)
# NOTE(review): `summary_path` is not defined in any visible line of this
# function -- possibly a stale name; confirm against full source.
447 LOGGER.info('Summary file %s was not found' % (summary_path,))
449 def save_ivc_plot(bustard_object, cycle_dir):
# Copy the Bustard IVC.htm page plus its per-lane PNG plots into
# cycle_dir (plots go to a Plots/ subdirectory, created on demand).
451 Save the IVC page and its supporting images
453 plot_html = os.path.join(bustard_object.pathname, 'IVC.htm')
454 plot_image_path = os.path.join(bustard_object.pathname, 'Plots')
# glob pattern for the per-lane plot images, e.g. s_1_all.png
455 plot_images = os.path.join(plot_image_path, 's_?_[a-z]*.png')
457 plot_target_path = os.path.join(cycle_dir, 'Plots')
459 if os.path.exists(plot_html):
460 LOGGER.debug("Saving %s" % (plot_html,))
461 LOGGER.debug("Saving %s" % (plot_images,))
462 shutil.copy(plot_html, cycle_dir)
463 if not os.path.exists(plot_target_path):
464 os.mkdir(plot_target_path)
465 for plot_file in glob(plot_images):
466 shutil.copy(plot_file, plot_target_path)
# missing IVC.htm is tolerated: warn and skip
468 LOGGER.warning('Missing IVC.html file, not archiving')
471 def compress_score_files(bustard_object, cycle_dir):
# Pipe the *_score.txt files through tar | bzip2 into
# <cycle_dir>/scores.tar.bz2. The `score_files` list initialization, the
# `env`/cwd setup for Popen and the wait/cleanup calls are in lines
# missing from this dump.
473 Compress score files into our result directory
475 # check for g.pathname/Temp a new feature of 1.1rc1
476 scores_path = bustard_object.pathname
477 scores_path_temp = os.path.join(scores_path, 'Temp')
478 if os.path.isdir(scores_path_temp):
479 scores_path = scores_path_temp
481 # hopefully we have a directory that contains s_*_score files
483 for f in os.listdir(scores_path):
# re.match anchors at the start only; '.*_score.txt' matches any name
# containing '_score' followed by 'txt' (the '.' is an unescaped any-char)
484 if re.match('.*_score.txt', f):
485 score_files.append(f)
# assemble the tar | bzip2 pipeline
487 tar_cmd = ['tar', 'c'] + score_files
488 bzip_cmd = [ 'bzip2', '-9', '-c' ]
489 tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
# NOTE(review): text-mode 'w' for binary bzip2 output; 'wb' would be the
# safe mode (matters on Python 3 / Windows).
490 tar_dest = open(tar_dest_name, 'w')
491 LOGGER.info("Compressing score files from %s" % (scores_path,))
492 LOGGER.info("Running tar: " + " ".join(tar_cmd[:10]))
493 LOGGER.info("Running bzip2: " + " ".join(bzip_cmd))
494 LOGGER.info("Writing to %s" % (tar_dest_name,))
497 tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
499 bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
503 def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
# Copy already-compressed eland result files into cycle_dir and queue
# bzip2 shell commands (run via QueueCommands with num_jobs workers) for
# the uncompressed ones. The `bz_commands` list initialization and the
# queue run call are in lines missing from this dump.
505 Compress eland result files into the archive directory
507 # copy & bzip eland files
510 for lanes_dictionary in gerald_object.eland_results.results:
511 for eland_lane in lanes_dictionary.values():
512 source_name = eland_lane.pathname
# lanes without a result file are skipped (with a log message)
513 if source_name is None:
515 "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
517 path, name = os.path.split(source_name)
518 dest_name = os.path.join(cycle_dir, name)
519 LOGGER.info("Saving eland file %s to %s" % \
520 (source_name, dest_name))
522 if is_compressed(name):
523 LOGGER.info('Already compressed, Saving to %s' % (dest_name,))
524 shutil.copy(source_name, dest_name)
# NOTE(review): the '>' redirect implies these strings are executed
# through a shell by QueueCommands -- confirm in queuecommands module.
528 args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
529 bz_commands.append(" ".join(args))
530 #LOGGER.info('Running: %s' % ( " ".join(args) ))
531 #bzip_dest = open(dest_name, 'w')
532 #bzip = subprocess.Popen(args, stdout=bzip_dest)
533 #LOGGER.info('Saving to %s' % (dest_name, ))
536 if len(bz_commands) > 0:
537 q = QueueCommands(bz_commands, num_jobs)
541 def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format='qseq'):
# For each PipelineRun: create <output_base_dir>/<flowcell_id>/C<start>-<stop>,
# then archive flowcell reports, IVC plots, raw sequence data (fastq/
# qseq/srf per raw_format), the summary file and compressed eland
# results, finishing with md5 checksums. The `for r in runs:` loop
# header, the `lanes` list construction and several directory-creation
# lines are missing from this dump.
543 Iterate over runfolders in runs extracting the most useful information.
544 * run parameters (in run-*.xml)
548 * srf files (raw sequence & qualities)
550 if output_base_dir is None:
551 output_base_dir = os.getcwd()
554 result_dir = os.path.join(output_base_dir, r.flowcell_id)
555 LOGGER.info("Using %s as result directory" % (result_dir,))
556 if not os.path.exists(result_dir):
# cycle directory named after the image-analysis cycle range
560 cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
561 LOGGER.info("Filling in %s" % (cycle,))
562 cycle_dir = os.path.join(result_dir, cycle)
563 cycle_dir = os.path.abspath(cycle_dir)
564 if os.path.exists(cycle_dir):
565 LOGGER.error("%s already exists, not overwriting" % (cycle_dir,))
573 # save illumina flowcell status report
574 save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'), cycle_dir)
576 # save stuff from bustard
578 save_ivc_plot(r.bustard, cycle_dir)
580 # build base call saving commands
# collect the lane numbers whose gerald analysis isn't 'none'
583 for lane in range(1, 9):
584 lane_parameters = r.gerald.lanes.get(lane, None)
585 if lane_parameters is not None and lane_parameters.analysis != 'none':
588 run_name = srf.pathname_to_run_name(r.pathname)
590 LOGGER.info("Raw Format is: %s" % (raw_format, ))
591 if raw_format == 'fastq':
592 rawpath = os.path.join(r.pathname, r.gerald.runfolder_name)
593 LOGGER.info("raw data = %s" % (rawpath,))
594 srf.copy_hiseq_project_fastqs(run_name, rawpath, site, cycle_dir)
595 elif raw_format == 'qseq':
596 seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
597 elif raw_format == 'srf':
598 seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir, 0)
# any other raw_format value is rejected outright
600 raise ValueError('Unknown --raw-format=%s' % (raw_format))
601 srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)
603 # save stuff from GERALD
604 # copy stuff out of the main run
608 save_summary_file(r, cycle_dir)
610 # compress eland result files
# NOTE(review): `g` is presumably bound to r.gerald in a line missing
# from this dump -- confirm against full source.
611 compress_eland_results(g, cycle_dir, num_jobs)
613 # md5 all the compressed files once we're done
614 md5_commands = srf.make_md5_commands(cycle_dir)
615 srf.run_commands(cycle_dir, md5_commands, num_jobs)
617 def rm_list(files, dry_run=True):
# Delete each existing path in `files`, logging every action; when
# dry_run is true, presumably only log without deleting. The loop header
# and the actual removal calls are in lines missing from this dump.
619 if os.path.exists(f):
620 LOGGER.info('deleting %s' % (f,))
627 LOGGER.warn("%s doesn't exist." % (f,))
629 def clean_runs(runs, dry_run=True):
# Remove bulky, regenerable files from each runfolder before archiving:
# run logs, pipeline logs, *.log, Calibration_*, Images/L*, ReadPrep*,
# Thumbnail_Images, then 'make clean_intermediate' when the image
# analysis directory has a Makefile. The `for run in runs:` loop header
# and the Popen wait() are in lines missing from this dump.
631 Clean up run folders to optimize for compression.
634 LOGGER.info('In dry-run mode')
# NOTE(review): 'Cleaninging' below is a typo in the runtime log string;
# left untouched since this edit only changes comments.
637 LOGGER.info('Cleaninging %s' % (run.pathname,))
# rm RunLog*.xml
639 runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
640 rm_list(runlogs, dry_run)
# rm pipeline_*.txt
642 pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
643 rm_list(pipeline_logs, dry_run)
645 # rm NetCopy.log? Isn't this robocopy?
646 logs = glob(os.path.join(run.pathname, '*.log'))
647 rm_list(logs, dry_run)
# rm Calibration_* directories
650 calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
651 rm_list(calibration_dir, dry_run)
653 LOGGER.info("Cleaning images")
654 image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
655 rm_list(image_dirs, dry_run)
657 LOGGER.info("Cleaning ReadPrep*")
658 read_prep_dirs = glob(os.path.join(run.pathname, 'ReadPrep*'))
659 rm_list(read_prep_dirs, dry_run)
661 LOGGER.info("Cleaning Thubmnail_images")
662 thumbnail_dirs = glob(os.path.join(run.pathname, 'Thumbnail_Images'))
663 rm_list(thumbnail_dirs, dry_run)
665 # make clean_intermediate
# NOTE(review): uses the root logger (logging.info) here while the rest
# of the module uses the module LOGGER -- likely unintentional.
666 logging.info("Cleaning intermediate files")
667 if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
668 clean_process = subprocess.Popen(['make', 'clean_intermediate'],
669 cwd=run.image_analysis.pathname,)