2 Core information needed to inspect a runfolder.
16 from xml.etree import ElementTree
17 except ImportError, e:
18 from elementtree import ElementTree
LOGGER = logging.getLogger(__name__)

# Regex / date constants used when parsing runfolder file names and logs.
# Raw strings are used for the regex patterns so backslashes are literal
# (`"[0-9\.]"` previously relied on a non-raw escape; inside a character
# class the dot needs no escape at all).
EUROPEAN_STRPTIME = "%d-%m-%Y"
EUROPEAN_DATE_RE = r"([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
VERSION_RE = r"([0-9.]+)"
USER_RE = r"([a-zA-Z0-9]+)"
LANES_PER_FLOWCELL = 8
LANE_LIST = range(1, LANES_PER_FLOWCELL + 1)
29 from htsworkflow.util.alphanum import alphanum
30 from htsworkflow.util.ethelp import indent, flatten
31 from htsworkflow.util.queuecommands import QueueCommands
33 from htsworkflow.pipelines import srf
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run
    """
    # XML tag names used when (de)serializing a run report
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, flowcell_id=None, xml=None):
        # pathname: filesystem location of the runfolder, normalized so the
        # trailing '_<flowcell>' component can be recovered by split('_') later
        if pathname is not None:
            self.pathname = os.path.normpath(pathname)
        # cached flowcell id; resolved lazily by _get_flowcell_id
        self._flowcell_id = flowcell_id
        # replaced later by a firecrest/ipar analysis object
        self.image_analysis = None
        # restore state from a previously saved xml report
        self.set_elements(xml)

    def _get_flowcell_id(self):
        # Lazy lookup: prefer Config/FlowcellId.xml inside the runfolder,
        # fall back to guessing from the runfolder name, finally 'unknown'.
        if self._flowcell_id is None:
            config_dir = os.path.join(self.pathname, 'Config')
            flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
            if os.path.exists(flowcell_id_path):
                flowcell_id_tree = ElementTree.parse(flowcell_id_path)
                self._flowcell_id = flowcell_id_tree.findtext('Text')
            path_fields = self.pathname.split('_')
            if len(path_fields) > 0:
                # guessing last element of filename
                self._flowcell_id = path_fields[-1]
            self._flowcell_id = 'unknown'
            "Flowcell id was not found, guessing %s" % (
        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def _get_runfolder_name(self):
        # delegate to the gerald component, which records the runfolder name
        if self.gerald is None:
        return self.gerald.runfolder_name
    runfolder_name = property(_get_runfolder_name)

    def get_elements(self):
        """
        make one master xml file from all of our sub-components.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        root.append(self.gerald.get_elements())

    def set_elements(self, tree):
        # this file gets imported by all the others,
        # so we need to hide the imports to avoid a cyclic imports
        from htsworkflow.pipelines import firecrest
        from htsworkflow.pipelines import ipar
        from htsworkflow.pipelines import bustard
        from htsworkflow.pipelines import gerald

        # root element must be a <PipelineRun>
        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
            raise ValueError('Pipeline Run Expecting %s got %s' % (
                PipelineRun.PIPELINE_RUN, tag))
        # dispatch each child element to the matching component parser
        tag = element.tag.lower()
        if tag == PipelineRun.FLOWCELL_ID.lower():
            self._flowcell_id = element.text
        #ok the xword.Xword.XWORD pattern for module.class.constant is lame
        # you should only have Firecrest or IPAR, never both of them.
        elif tag == firecrest.Firecrest.FIRECREST.lower():
            self.image_analysis = firecrest.Firecrest(xml=element)
        elif tag == ipar.IPAR.IPAR.lower():
            self.image_analysis = ipar.IPAR(xml=element)
        elif tag == bustard.Bustard.BUSTARD.lower():
            self.bustard = bustard.Bustard(xml=element)
        elif tag == gerald.Gerald.GERALD.lower():
            self.gerald = gerald.Gerald(xml=element)
        LOGGER.warn('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """
        Given a run tuple, find the latest date and use that as our name
        """
        if self._name is None:
            # the newest timestamp among the three pipeline stages names the run
            tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
            timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
            self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
    name = property(_get_run_name)

    def save(self, destdir=None):
        # serialize this run's xml report into destdir under self.name
        LOGGER.info("Saving run report " + self.name)
        xml = self.get_elements()
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        # rebuild this object's state from a previously saved xml report
        LOGGER.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters:
      - `pathname` : location of an run xml file

    :Returns: initialized PipelineRun object
    """
    tree = ElementTree.parse(pathname).getroot()
    run = PipelineRun(xml=tree)
def get_runs(runfolder, flowcell_id=None):
    """
    Search through a run folder for all the various sub component runs
    and then return a PipelineRun for each different combination.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in there gerald component.
    """
    # imports hidden here to avoid a cyclic import: this module is imported
    # by the pipeline sub-modules themselves
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    def scan_post_image_analysis(runs, runfolder, datadir, image_analysis, pathname):
        # For one image-analysis directory, find each base-calling directory
        # beneath it and collect the runs built from it.
        LOGGER.info("Looking for bustard directories in %s" % (pathname,))
        bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
        # RTA BaseCalls looks enough like Bustard.
        bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
        for bustard_pathname in bustard_dirs:
            LOGGER.info("Found bustard directory %s" % (bustard_pathname,))
            b = bustard.bustard(bustard_pathname)
            build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder)
            build_aligned_runs(image_analysis, runs, b, datadir, runfolder)

    def build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder):
        # Each GERALD* directory under the bustard directory yields one
        # PipelineRun combining this image analysis + bustard + gerald.
        gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
        LOGGER.info("Looking for gerald directories in %s" % (pathname,))
        for gerald_pathname in glob(gerald_glob):
            LOGGER.info("Found gerald directory %s" % (gerald_pathname,))
            g = gerald.gerald(gerald_pathname)
            p = PipelineRun(runfolder, flowcell_id)
            p.image_analysis = image_analysis
            # NOTE(review): 'e' is bound by an enclosing except clause
            LOGGER.error("Ignoring " + str(e))

    def build_aligned_runs(image_analysis, runs, b, datadir, runfolder):
        # HiSeq/RTA layout: Aligned* directories at the runfolder top level
        # play the role the GERALD directories do above.
        aligned_glob = os.path.join(runfolder, 'Aligned*')
        for aligned in glob(aligned_glob):
            LOGGER.info("Found aligned directory %s" % (aligned,))
            g = gerald.gerald(aligned)
            p = PipelineRun(runfolder, flowcell_id)
            p.image_analysis = image_analysis
            # NOTE(review): 'e' is bound by an enclosing except clause
            LOGGER.error("Ignoring " + str(e))

    datadir = os.path.join(runfolder, 'Data')

    LOGGER.info('Searching for runs in ' + datadir)
    # scan for firecrest directories
    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
        LOGGER.info('Found firecrest in ' + datadir)
        image_analysis = firecrest.firecrest(firecrest_pathname)
        if image_analysis is None:
            "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
        scan_post_image_analysis(
            runs, runfolder, datadir, image_analysis, firecrest_pathname
    # scan for IPAR directories
    ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
    # The Intensities directory from the RTA software looks a lot like IPAR
    ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
    for ipar_pathname in ipar_dirs:
        LOGGER.info('Found ipar directories in ' + datadir)
        image_analysis = ipar.ipar(ipar_pathname)
        if image_analysis is None:
            "%s is an empty or invalid IPAR directory" % (ipar_pathname,)
        scan_post_image_analysis(
            runs, runfolder, datadir, image_analysis, ipar_pathname
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.
    """
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    # walk up the directory tree: gerald lives inside the bustard dir,
    # which lives inside the image-analysis dir under the runfolder
    gerald_dir = os.path.expanduser(gerald_dir)
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))
    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))

    LOGGER.info('--- use-run detected options ---')
    LOGGER.info('runfolder: %s' % (runfolder_dir,))
    LOGGER.info('image_dir: %s' % (image_dir,))
    LOGGER.info('bustard_dir: %s' % (bustard_dir,))
    LOGGER.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    LOGGER.info('data_dir: %s' % (data_dir,))
    # NOTE(review): 'short_iamge_dir' label below has a typo ('iamge')
    LOGGER.info('short_iamge_dir: %s' % (short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        # RTA's Intensities directory is parsed with the IPAR reader
        image_run = ipar.ipar(image_dir)

    # if we din't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        LOGGER.error('%s does not contain a bustard run' % (bustard_dir,))

    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        LOGGER.error('%s does not contain a gerald run' % (gerald_dir,))

    # assemble the PipelineRun from the three discovered components
    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    LOGGER.info('Constructed PipelineRun from %s' % (gerald_dir,))
def extract_run_parameters(runs):
    """
    Search through runfolder_path for various runs and grab their parameters
    """
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks seperately.
    """
    summarized_reads = {}
    for k, v in mapped_reads.items():
        # keys look like paths; the final component is the chromosome name
        path, k = os.path.split(k)
        # entries whose directory is not a known genome are spike-in /
        # contamination and are counted individually below
        if len(path) > 0 and not genome_map.has_key(path):
        summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
    # NOTE(review): 'genome'/'genome_reads' are bound in lines not shown here
    summarized_reads[genome] = genome_reads
    return summarized_reads
def summarize_lane(gerald, lane_id):
    # Build human-readable report lines for one lane, covering each end
    # (more than one end for paired-end runs).
    summary_results = gerald.summary.lane_results
    for end in range(len(summary_results)):
        eland_result = gerald.eland_results.results[end][lane_id]
        report.append("Sample name %s" % (eland_result.sample_name))
        report.append("Lane id %s end %s" % (eland_result.lane_id, end))
        # cluster counts come from the Summary file; they may be missing
        if end < len(summary_results) and summary_results[end].has_key(eland_result.lane_id):
            cluster = summary_results[end][eland_result.lane_id].cluster
            report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
        report.append("Total Reads: %d" % (eland_result.reads))

        if hasattr(eland_result, 'match_codes'):
            mc = eland_result.match_codes
            # NOTE(review): 'nm' and 'qc' totals are computed in lines not shown
            nm_percent = float(nm) / eland_result.reads * 100
            qc_percent = float(qc) / eland_result.reads * 100

            report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
            report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
            report.append('Unique (0,1,2 mismatches) %d %d %d' % \
                          (mc['U0'], mc['U1'], mc['U2']))
            report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
                          (mc['R0'], mc['R1'], mc['R2']))

        if hasattr(eland_result, 'genome_map'):
            report.append("Mapped Reads")
            mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
            for name, counts in mapped_reads.items():
                report.append(" %s: %d" % (name, counts))
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder
    """
    report.append('Summary for %s' % (run.name,))
    # sort lane keys in "natural" alphanumeric order for stable output
    eland_keys = run.gerald.eland_results.results[0].keys()
    eland_keys.sort(alphanum)
    for lane_id in eland_keys:
        report.extend(summarize_lane(run.gerald, lane_id))
    return os.linesep.join(report)
def is_compressed(filename):
    """
    Return True if *filename* carries a recognized compression
    extension (.gz or .bz2), False otherwise.
    """
    # single splitext call and a membership test instead of a branch
    # per extension (the original computed splitext once per branch)
    return os.path.splitext(filename)[1] in ('.gz', '.bz2')
def save_flowcell_reports(data_dir, cycle_dir):
    """
    Save the flowcell quality reports
    """
    data_dir = os.path.abspath(data_dir)
    status_file = os.path.join(data_dir, 'Status.xml')
    reports_dir = os.path.join(data_dir, 'reports')
    reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
    if os.path.exists(reports_dir):
        # archive the reports/ tree; paths are relative to data_dir
        cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
        if os.path.exists(status_file):
            # include status xml + stylesheet when present
            cmd_list.extend(['Status.xml', 'Status.xsl'])
        LOGGER.info("Saving reports from " + reports_dir)
        # run the tar command through the command queue helper
        q = QueueCommands([" ".join(cmd_list)])
def save_summary_file(pipeline, cycle_dir):
    """
    Copy the run's Summary.htm into cycle_dir.

    Prefers the copy in the gerald directory; falls back to the one under
    Data/Status_Files; logs (without failing) when neither exists.
    """
    gerald_object = pipeline.gerald
    gerald_summary = os.path.join(gerald_object.pathname, 'Summary.htm')
    status_files_summary = os.path.join(pipeline.datadir, 'Status_Files', 'Summary.htm')
    if os.path.exists(gerald_summary):
        LOGGER.info('Copying %s to %s' % (gerald_summary, cycle_dir))
        shutil.copy(gerald_summary, cycle_dir)
    elif os.path.exists(status_files_summary):
        LOGGER.info('Copying %s to %s' % (status_files_summary, cycle_dir))
        shutil.copy(status_files_summary, cycle_dir)
    else:
        # BUG FIX: this branch previously formatted an undefined name
        # 'summary_path', raising NameError instead of logging; report the
        # primary (gerald) location that was searched.
        LOGGER.info('Summary file %s was not found' % (gerald_summary,))
def save_ivc_plot(bustard_object, cycle_dir):
    """
    Save the IVC page and its supporting images
    """
    plot_html = os.path.join(bustard_object.pathname, 'IVC.htm')
    plot_image_path = os.path.join(bustard_object.pathname, 'Plots')
    # glob pattern matching the per-lane plot images referenced by IVC.htm
    plot_images = os.path.join(plot_image_path, 's_?_[a-z]*.png')

    plot_target_path = os.path.join(cycle_dir, 'Plots')

    if os.path.exists(plot_html):
        LOGGER.debug("Saving %s" % (plot_html,))
        LOGGER.debug("Saving %s" % (plot_images,))
        shutil.copy(plot_html, cycle_dir)
        if not os.path.exists(plot_target_path):
            os.mkdir(plot_target_path)
        for plot_file in glob(plot_images):
            shutil.copy(plot_file, plot_target_path)
    LOGGER.warning('Missing IVC.html file, not archiving')
def compress_score_files(bustard_object, cycle_dir):
    """
    Compress score files into our result directory
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = bustard_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    for f in os.listdir(scores_path):
        if re.match('.*_score.txt', f):
            score_files.append(f)

    # tar the score files and pipe through bzip2 into scores.tar.bz2
    tar_cmd = ['tar', 'c'] + score_files
    bzip_cmd = [ 'bzip2', '-9', '-c' ]
    tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
    tar_dest = open(tar_dest_name, 'w')
    LOGGER.info("Compressing score files from %s" % (scores_path,))
    LOGGER.info("Running tar: " + " ".join(tar_cmd[:10]))
    LOGGER.info("Running bzip2: " + " ".join(bzip_cmd))
    LOGGER.info("Writing to %s" % (tar_dest_name,))

    tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
    bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
    """
    Compress eland result files into the archive directory
    """
    # copy & bzip eland files
    for lanes_dictionary in gerald_object.eland_results.results:
        for eland_lane in lanes_dictionary.values():
            source_name = eland_lane.pathname
            if source_name is None:
                "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
            path, name = os.path.split(source_name)
            dest_name = os.path.join(cycle_dir, name)
            LOGGER.info("Saving eland file %s to %s" % \
                        (source_name, dest_name))

            if is_compressed(name):
                # already .gz/.bz2: just copy rather than recompress
                LOGGER.info('Already compressed, Saving to %s' % (dest_name,))
                shutil.copy(source_name, dest_name)
            # queue a shell bzip2 command for later parallel execution
            args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
            bz_commands.append(" ".join(args))
            #LOGGER.info('Running: %s' % ( " ".join(args) ))
            #bzip_dest = open(dest_name, 'w')
            #bzip = subprocess.Popen(args, stdout=bzip_dest)
            #LOGGER.info('Saving to %s' % (dest_name, ))

    if len(bz_commands) > 0:
        q = QueueCommands(bz_commands, num_jobs)
def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format='qseq'):
    """
    Iterate over runfolders in runs extracting the most useful information.
      * run parameters (in run-*.xml)
      * srf files (raw sequence & qualities)
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    # NOTE(review): 'r' below is the current PipelineRun being extracted
    result_dir = os.path.join(output_base_dir, r.flowcell_id)
    LOGGER.info("Using %s as result directory" % (result_dir,))
    if not os.path.exists(result_dir):
    # cycle directory named from the image-analysis start/stop cycle numbers
    cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
    LOGGER.info("Filling in %s" % (cycle,))
    cycle_dir = os.path.join(result_dir, cycle)
    cycle_dir = os.path.abspath(cycle_dir)
    if os.path.exists(cycle_dir):
        LOGGER.error("%s already exists, not overwriting" % (cycle_dir,))

    # save illumina flowcell status report
    save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'), cycle_dir)

    # save stuff from bustard
    save_ivc_plot(r.bustard, cycle_dir)

    # build base call saving commands
    # only lanes that were actually analyzed get their raw data saved
    for lane in range(1, 9):
        lane_parameters = r.gerald.lanes.get(lane, None)
        if lane_parameters is not None and lane_parameters.analysis != 'none':
    run_name = srf.pathname_to_run_name(r.pathname)
    LOGGER.info("Raw Format is: %s" % (raw_format, ))
    if raw_format == 'fastq':
        rawpath = os.path.join(r.pathname, r.gerald.runfolder_name)
        LOGGER.info("raw data = %s" % (rawpath,))
        srf.copy_hiseq_project_fastqs(run_name, rawpath, site, cycle_dir)
    elif raw_format == 'qseq':
        seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
    elif raw_format == 'srf':
        seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir, 0)
        raise ValueError('Unknown --raw-format=%s' % (raw_format))
    srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)

    # save stuff from GERALD
    # copy stuff out of the main run
    save_summary_file(r, cycle_dir)

    # compress eland result files
    compress_eland_results(g, cycle_dir, num_jobs)

    # md5 all the compressed files once we're done
    md5_commands = srf.make_md5_commands(cycle_dir)
    srf.run_commands(cycle_dir, md5_commands, num_jobs)
def rm_list(files, dry_run=True):
    """Remove each existing path in files; in dry_run mode only log."""
    if os.path.exists(f):
        LOGGER.info('deleting %s' % (f,))
    LOGGER.warn("%s doesn't exist." % (f,))
def clean_runs(runs, dry_run=True):
    """
    Clean up run folders to optimize for compression.
    """
    LOGGER.info('In dry-run mode')
    LOGGER.info('Cleaninging %s' % (run.pathname,))
    # remove per-run log xml files
    runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
    rm_list(runlogs, dry_run)
    # remove pipeline text logs
    pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
    rm_list(pipeline_logs, dry_run)
    # rm NetCopy.log? Isn't this robocopy?
    logs = glob(os.path.join(run.pathname, '*.log'))
    rm_list(logs, dry_run)
    # remove calibration directories
    calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
    rm_list(calibration_dir, dry_run)

    LOGGER.info("Cleaning images")
    image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
    rm_list(image_dirs, dry_run)

    LOGGER.info("Cleaning ReadPrep*")
    read_prep_dirs = glob(os.path.join(run.pathname, 'ReadPrep*'))
    rm_list(read_prep_dirs, dry_run)

    LOGGER.info("Cleaning Thubmnail_images")
    thumbnail_dirs = glob(os.path.join(run.pathname, 'Thumbnail_Images'))
    rm_list(thumbnail_dirs, dry_run)

    # make clean_intermediate
    # NOTE(review): uses module-level logging.info instead of LOGGER here —
    # inconsistent with the rest of the file
    logging.info("Cleaning intermediate files")
    if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
        clean_process = subprocess.Popen(['make', 'clean_intermediate'],
                                         cwd=run.image_analysis.pathname,)