2 Core information needed to inspect a runfolder.
15 import lxml.etree as ElementTree
17 LOGGER = logging.getLogger(__name__)
# Date format / regex constants used when parsing runfolder and config names.
19 EUROPEAN_STRPTIME = "%d-%m-%Y"
20 EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
# NOTE(review): '\.' inside a character class is redundant ('.' is literal
# there) and the non-raw string triggers an invalid-escape warning on
# modern Pythons; consider r"([0-9.]+)".
21 VERSION_RE = "([0-9\.]+)"
22 USER_RE = "([a-zA-Z0-9]+)"
# GA/HiSeq-era flowcells have 8 lanes; LANE_LIST is the 1-based lane numbers.
23 LANES_PER_FLOWCELL = 8
24 LANE_LIST = range(1, LANES_PER_FLOWCELL + 1)
26 from htsworkflow.util.alphanum import alphanum
27 from htsworkflow.util.ethelp import indent, flatten
28 from htsworkflow.util.queuecommands import QueueCommands
30 from htsworkflow.pipelines import srf
32 class PipelineRun(object):
# Aggregates the three stages of one Illumina pipeline run --
# image analysis (Firecrest/IPAR), base calling (Bustard), and
# alignment (GERALD/CASAVA) -- and serializes them to/from one XML tree.
34 Capture "interesting" information about a pipeline run
# XML tag names used by get_elements()/set_elements().
37 PIPELINE_RUN = 'PipelineRun'
38 FLOWCELL_ID = 'FlowcellID'
# Construct either from a runfolder path (+ optional explicit flowcell id)
# or from a previously saved XML tree via the `xml` argument.
40 def __init__(self, pathname=None, flowcell_id=None, xml=None):
41 if pathname is not None:
42 self.pathname = os.path.normpath(pathname)
46 self._flowcell_id = flowcell_id
48 self.image_analysis = None
# If an XML tree was given, populate the component objects from it.
53 self.set_elements(xml)
# Lazily resolve the flowcell id, trying progressively weaker sources:
# RunInfo.xml, then Config/FlowcellId.xml, then the runfolder path itself,
# finally falling back to the literal string 'unknown'.
55 def _get_flowcell_id(self):
57 if self._flowcell_id is None:
58 self._flowcell_id = self._get_flowcell_id_from_runinfo()
59 if self._flowcell_id is None:
60 self._flowcell_id = self._get_flowcell_id_from_flowcellid()
61 if self._flowcell_id is None:
62 self._flowcell_id = self._get_flowcell_id_from_path()
63 if self._flowcell_id is None:
64 self._flowcell_id = 'unknown'
# (part of a LOGGER warning about having to guess the id)
67 "Flowcell id was not found, guessing %s" % (
70 return self._flowcell_id
71 flowcell_id = property(_get_flowcell_id)
# Read the flowcell id from the <Text> node of Config/FlowcellId.xml,
# if that file exists; implicitly returns None otherwise.
73 def _get_flowcell_id_from_flowcellid(self):
74 """Extract flowcell id from a Config/FlowcellId.xml file
76 config_dir = os.path.join(self.pathname, 'Config')
77 flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
78 if os.path.exists(flowcell_id_path):
79 flowcell_id_tree = ElementTree.parse(flowcell_id_path)
80 return flowcell_id_tree.findtext('Text')
# Read the flowcell id from RunInfo.xml via the lxml-only .xpath() API.
82 def _get_flowcell_id_from_runinfo(self):
83 """Read RunInfo file for flowcell id
85 runinfo = os.path.join(self.pathname, 'RunInfo.xml')
86 if os.path.exists(runinfo):
87 tree = ElementTree.parse(runinfo)
89 fc_nodes = root.xpath('/RunInfo/Run/Flowcell')
# Only trust the result if exactly one Flowcell node was found.
90 if len(fc_nodes) == 1:
91 return fc_nodes[0].text
# Last resort: runfolder names conventionally end with _<flowcellid>,
# so take whatever follows the final underscore.
94 def _get_flowcell_id_from_path(self):
95 """Guess a flowcell name from the path
97 path_fields = self.pathname.split('_')
98 if len(path_fields) > 0:
99 # guessing last element of filename
100 return path_fields[-1]
# Runfolder name is delegated to the gerald component when present.
102 def _get_runfolder_name(self):
103 if self.gerald is None:
106 return self.gerald.runfolder_name
107 runfolder_name = property(_get_runfolder_name)
# Serialize this run: a <PipelineRun> root holding the flowcell id plus
# the XML of each sub-component (image analysis, bustard, gerald).
109 def get_elements(self):
111 make one master xml file from all of our sub-components.
113 root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
114 flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
115 flowcell.text = self.flowcell_id
116 root.append(self.image_analysis.get_elements())
117 root.append(self.bustard.get_elements())
118 root.append(self.gerald.get_elements())
# Inverse of get_elements(): dispatch each child element to the matching
# component class by (case-insensitive) tag name.
121 def set_elements(self, tree):
122 # this file gets imported by all the others,
123 # so we need to hide the imports to avoid a cyclic imports
124 from htsworkflow.pipelines import firecrest
125 from htsworkflow.pipelines import ipar
126 from htsworkflow.pipelines import bustard
127 from htsworkflow.pipelines import gerald
129 tag = tree.tag.lower()
130 if tag != PipelineRun.PIPELINE_RUN.lower():
131 raise ValueError('Pipeline Run Expecting %s got %s' % (
132 PipelineRun.PIPELINE_RUN, tag))
# (iteration over the child elements of `tree`; loop header not visible here)
134 tag = element.tag.lower()
135 if tag == PipelineRun.FLOWCELL_ID.lower():
136 self._flowcell_id = element.text
137 #ok the xword.Xword.XWORD pattern for module.class.constant is lame
138 # you should only have Firecrest or IPAR, never both of them.
139 elif tag == firecrest.Firecrest.FIRECREST.lower():
140 self.image_analysis = firecrest.Firecrest(xml=element)
141 elif tag == ipar.IPAR.IPAR.lower():
142 self.image_analysis = ipar.IPAR(xml=element)
143 elif tag == bustard.Bustard.BUSTARD.lower():
144 self.bustard = bustard.Bustard(xml=element)
145 elif tag == gerald.Gerald.GERALD.lower():
146 self.gerald = gerald.Gerald(xml=element)
147 elif tag == gerald.CASAVA.GERALD.lower():
148 self.gerald = gerald.CASAVA(xml=element)
# NOTE(review): logging.Logger.warn() is a deprecated alias of .warning().
150 LOGGER.warn('PipelineRun unrecognized tag %s' % (tag,))
# Build (and cache) a report filename from the flowcell id and the
# newest timestamp among the three pipeline components.
152 def _get_run_name(self):
154 Given a run tuple, find the latest date and use that as our name
156 if self._name is None:
157 tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
158 timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
159 self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
161 name = property(_get_run_name)
# Write the serialized run report to destdir/<self.name>.
163 def save(self, destdir=None):
166 LOGGER.info("Saving run report " + self.name)
167 xml = self.get_elements()
169 dest_pathname = os.path.join(destdir, self.name)
170 ElementTree.ElementTree(xml).write(dest_pathname)
# Re-populate this object from a previously saved run report file.
172 def load(self, filename):
173 LOGGER.info("Loading run report from " + filename)
174 tree = ElementTree.parse(filename).getroot()
175 self.set_elements(tree)
# Module-level convenience: parse a saved run-report XML file into a
# fully initialized PipelineRun.
177 def load_pipeline_run_xml(pathname):
179 Load and instantiate a Pipeline run from a run xml file
182 - `pathname` : location of an run xml file
184 :Returns: initialized PipelineRun object
186 tree = ElementTree.parse(pathname).getroot()
187 run = PipelineRun(xml=tree)
# Scan a runfolder for every combination of image-analysis / bustard /
# gerald directories and return one PipelineRun per combination found.
190 def get_runs(runfolder, flowcell_id=None):
192 Search through a run folder for all the various sub component runs
193 and then return a PipelineRun for each different combination.
195 For example if there are two different GERALD runs, this will
196 generate two different PipelineRun objects, that differ
197 in there gerald component.
# Imports are deferred to avoid circular imports (see PipelineRun.set_elements).
199 from htsworkflow.pipelines import firecrest
200 from htsworkflow.pipelines import ipar
201 from htsworkflow.pipelines import bustard
202 from htsworkflow.pipelines import gerald
# For a given image-analysis directory, find each base-calling directory
# beneath it and build the downstream gerald/aligned runs.
204 def scan_post_image_analysis(runs, runfolder, datadir, image_analysis, pathname):
205 LOGGER.info("Looking for bustard directories in %s" % (pathname,))
206 bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
207 # RTA BaseCalls looks enough like Bustard.
208 bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
209 for bustard_pathname in bustard_dirs:
210 LOGGER.info("Found bustard directory %s" % (bustard_pathname,))
211 b = bustard.bustard(bustard_pathname)
212 build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder)
214 build_aligned_runs(image_analysis, runs, b, datadir, runfolder)
# One PipelineRun per GERALD* directory found under the bustard directory.
216 def build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder):
217 gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
218 LOGGER.info("Looking for gerald directories in %s" % (pathname,))
219 for gerald_pathname in glob(gerald_glob):
220 LOGGER.info("Found gerald directory %s" % (gerald_pathname,))
222 g = gerald.gerald(gerald_pathname)
223 p = PipelineRun(runfolder, flowcell_id)
225 p.image_analysis = image_analysis
# Failures to parse a single gerald directory are logged and skipped
# so the remaining combinations are still collected.
230 LOGGER.error("Ignoring " + str(e))
# CASAVA-style Aligned* directories at the runfolder top level also
# yield PipelineRun objects.
233 def build_aligned_runs(image_analysis, runs, b, datadir, runfolder):
234 aligned_glob = os.path.join(runfolder, 'Aligned*')
235 for aligned in glob(aligned_glob):
236 LOGGER.info("Found aligned directory %s" % (aligned,))
238 g = gerald.gerald(aligned)
239 p = PipelineRun(runfolder, flowcell_id)
241 p.image_analysis = image_analysis
246 LOGGER.error("Ignoring " + str(e))
# Image-analysis results live under <runfolder>/Data.
248 datadir = os.path.join(runfolder, 'Data')
250 LOGGER.info('Searching for runs in ' + datadir)
252 # scan for firecrest directories
253 for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
254 LOGGER.info('Found firecrest in ' + datadir)
255 image_analysis = firecrest.firecrest(firecrest_pathname)
# firecrest() returns None for directories it cannot parse.
256 if image_analysis is None:
258 "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
261 scan_post_image_analysis(
262 runs, runfolder, datadir, image_analysis, firecrest_pathname
264 # scan for IPAR directories
265 ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
266 # The Intensities directory from the RTA software looks a lot like IPAR
267 ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
268 for ipar_pathname in ipar_dirs:
269 LOGGER.info('Found ipar directories in ' + datadir)
270 image_analysis = ipar.ipar(ipar_pathname)
271 if image_analysis is None:
273 "%s is an empty or invalid IPAR directory" % (ipar_pathname,)
276 scan_post_image_analysis(
277 runs, runfolder, datadir, image_analysis, ipar_pathname
# Build a single PipelineRun by walking UP from a user-specified gerald
# directory (gerald -> bustard -> image analysis -> runfolder), instead of
# scanning a whole runfolder like get_runs().
282 def get_specific_run(gerald_dir):
284 Given a gerald directory, construct a PipelineRun out of its parents
286 Basically this allows specifying a particular run instead of the previous
287 get_runs which scans a runfolder for various combinations of
288 firecrest/ipar/bustard/gerald runs.
# Deferred imports: avoid circular imports with the component modules.
290 from htsworkflow.pipelines import firecrest
291 from htsworkflow.pipelines import ipar
292 from htsworkflow.pipelines import bustard
293 from htsworkflow.pipelines import gerald
# Derive each ancestor directory from the gerald path by relative '..' hops.
295 gerald_dir = os.path.expanduser(gerald_dir)
296 bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
297 image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))
299 runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))
301 LOGGER.info('--- use-run detected options ---')
302 LOGGER.info('runfolder: %s' % (runfolder_dir,))
303 LOGGER.info('image_dir: %s' % (image_dir,))
304 LOGGER.info('bustard_dir: %s' % (bustard_dir,))
305 LOGGER.info('gerald_dir: %s' % (gerald_dir,))
307 # find our processed image dir
309 # split into parent, and leaf directory
310 # leaf directory should be an IPAR or firecrest directory
311 data_dir, short_image_dir = os.path.split(image_dir)
312 LOGGER.info('data_dir: %s' % (data_dir,))
# NOTE(review): 'short_iamge_dir' in the log label is a typo for
# 'short_image_dir' (runtime string -- left unchanged here).
313 LOGGER.info('short_iamge_dir: %s' % (short_image_dir,))
315 # guess which type of image processing directory we have by looking
316 # in the leaf directory name
317 if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
318 image_run = firecrest.firecrest(image_dir)
319 elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
320 image_run = ipar.ipar(image_dir)
# RTA's Intensities directory is parsed with the IPAR loader as well.
321 elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
322 image_run = ipar.ipar(image_dir)
324 # if we din't find a run, report the error and return
325 if image_run is None:
326 msg = '%s does not contain an image processing step' % (image_dir,)
330 # find our base calling
331 base_calling_run = bustard.bustard(bustard_dir)
332 if base_calling_run is None:
333 LOGGER.error('%s does not contain a bustard run' % (bustard_dir,))
# find our alignment step
337 gerald_run = gerald.gerald(gerald_dir)
338 if gerald_run is None:
339 LOGGER.error('%s does not contain a gerald run' % (gerald_dir,))
# All three components found: assemble the PipelineRun.
342 p = PipelineRun(runfolder_dir)
343 p.image_analysis = image_run
344 p.bustard = base_calling_run
345 p.gerald = gerald_run
347 LOGGER.info('Constructed PipelineRun from %s' % (gerald_dir,))
# Collect run parameters for a set of runs (body not visible in this excerpt).
350 def extract_run_parameters(runs):
352 Search through runfolder_path for various runs and grab their parameters
# Collapse per-chromosome mapped-read counts into per-genome totals,
# keeping spike-in / contamination entries (path components not present
# in genome_map) as their own separate keys.
357 def summarize_mapped_reads(genome_map, mapped_reads):
359 Summarize per chromosome reads into a genome count
360 But handle spike-in/contamination symlinks seperately.
362 summarized_reads = {}
# (initialization of the genome accumulator is not visible in this excerpt)
365 for k, v in mapped_reads.items():
# Split "path/chromosome" keys; a path outside genome_map marks a spike-in.
366 path, k = os.path.split(k)
367 if len(path) > 0 and path not in genome_map:
371 summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
372 summarized_reads[genome] = genome_reads
373 return summarized_reads
# Produce a human-readable, line-by-line report for one lane/end:
# cluster counts, total reads, match-code breakdown, and per-genome
# mapped-read counts. Returns the accumulated list of report lines.
375 def summarize_lane(gerald, lane_id):
377 lane_results = gerald.summary.lane_results
378 eland_result = gerald.eland_results[lane_id]
379 report.append("Sample name %s" % (eland_result.sample_name))
380 report.append("Lane id %s end %s" % (lane_id.lane, lane_id.read))
# Cluster statistics are only present when the summary covers this read/lane.
382 if lane_id.read < len(lane_results) and \
383 lane_id.lane in lane_results[lane_id.read]:
384 summary_results = lane_results[lane_id.read][lane_id.lane]
# cluster is a (mean, stddev) pair.
385 cluster = summary_results.cluster
386 report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
387 report.append("Total Reads: %d" % (eland_result.reads))
# ELAND match codes: NM = no match, QC = quality-filtered,
# U0/U1/U2 = unique with 0-2 mismatches, R0/R1/R2 = repeat hits.
389 if hasattr(eland_result, 'match_codes'):
390 mc = eland_result.match_codes
392 nm_percent = float(nm) / eland_result.reads * 100
394 qc_percent = float(qc) / eland_result.reads * 100
396 report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
397 report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
398 report.append('Unique (0,1,2 mismatches) %d %d %d' % \
399 (mc['U0'], mc['U1'], mc['U2']))
400 report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
401 (mc['R0'], mc['R1'], mc['R2']))
# Per-genome totals via summarize_mapped_reads (spike-ins kept separate).
403 if hasattr(eland_result, 'genome_map'):
404 report.append("Mapped Reads")
405 mapped_reads = summarize_mapped_reads(eland_result.genome_map,
406 eland_result.mapped_reads)
407 for name, counts in mapped_reads.items():
408 report.append(" %s: %d" % (name, counts))
# Join per-lane summaries for every run into a single report string.
413 def summary_report(runs):
415 Summarize cluster numbers and mapped read counts for a runfolder
420 report.append('Summary for %s' % (run.name,))
# Sort lane ids for a stable, deterministic report ordering.
422 eland_keys = sorted(run.gerald.eland_results.keys())
423 for lane_id in eland_keys:
424 report.extend(summarize_lane(run.gerald, lane_id))
427 return os.linesep.join(report)
# Report whether a filename already carries a known compression
# extension (.gz or .bz2), judged purely by its suffix.
429 def is_compressed(filename):
430 if os.path.splitext(filename)[1] == ".gz":
432 elif os.path.splitext(filename)[1] == '.bz2':
# Archive the flowcell quality reports directory (plus Status.xml/xsl
# when present) into <cycle_dir>/flowcell-reports.tar.bz2 via an
# external tar command queued through QueueCommands.
437 def save_flowcell_reports(data_dir, cycle_dir):
439 Save the flowcell quality reports
441 data_dir = os.path.abspath(data_dir)
442 status_file = os.path.join(data_dir, 'Status.xml')
443 reports_dir = os.path.join(data_dir, 'reports')
444 reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
445 if os.path.exists(reports_dir):
# Paths in cmd_list are relative; tar is presumably run with data_dir as
# its working directory (not visible in this excerpt).
446 cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
447 if os.path.exists(status_file):
448 cmd_list.extend(['Status.xml', 'Status.xsl'])
449 LOGGER.info("Saving reports from " + reports_dir)
452 q = QueueCommands([" ".join(cmd_list)])
# Copy the run's Summary.htm into cycle_dir, preferring the GERALD copy
# and falling back to the Status_Files copy.
457 def save_summary_file(pipeline, cycle_dir):
459 gerald_object = pipeline.gerald
460 gerald_summary = os.path.join(gerald_object.pathname, 'Summary.htm')
461 status_files_summary = os.path.join(pipeline.datadir, 'Status_Files', 'Summary.htm')
462 if os.path.exists(gerald_summary):
463 LOGGER.info('Copying %s to %s' % (gerald_summary, cycle_dir))
464 shutil.copy(gerald_summary, cycle_dir)
465 elif os.path.exists(status_files_summary):
466 LOGGER.info('Copying %s to %s' % (status_files_summary, cycle_dir))
467 shutil.copy(status_files_summary, cycle_dir)
# NOTE(review): `summary_path` is not defined anywhere in the visible
# function -- this fallback branch looks like it would raise NameError
# if ever reached; verify against the full file.
469 LOGGER.info('Summary file %s was not found' % (summary_path,))
# Copy the Bustard IVC.htm page plus its per-lane plot PNGs into
# cycle_dir (plots go into a Plots/ subdirectory created on demand).
471 def save_ivc_plot(bustard_object, cycle_dir):
473 Save the IVC page and its supporting images
475 plot_html = os.path.join(bustard_object.pathname, 'IVC.htm')
476 plot_image_path = os.path.join(bustard_object.pathname, 'Plots')
# Glob pattern matching s_<lane>_<type>.png plot images.
477 plot_images = os.path.join(plot_image_path, 's_?_[a-z]*.png')
479 plot_target_path = os.path.join(cycle_dir, 'Plots')
481 if os.path.exists(plot_html):
482 LOGGER.debug("Saving %s" % (plot_html,))
483 LOGGER.debug("Saving %s" % (plot_images,))
484 shutil.copy(plot_html, cycle_dir)
485 if not os.path.exists(plot_target_path):
486 os.mkdir(plot_target_path)
487 for plot_file in glob(plot_images):
488 shutil.copy(plot_file, plot_target_path)
# Missing IVC page is treated as non-fatal: warn and continue.
490 LOGGER.warning('Missing IVC.html file, not archiving')
# Tar up the s_*_score.txt files from the bustard directory (or its Temp/
# subdirectory on pipeline >= 1.1rc1) and pipe the tar stream through
# bzip2 into <cycle_dir>/scores.tar.bz2.
493 def compress_score_files(bustard_object, cycle_dir):
495 Compress score files into our result directory
497 # check for g.pathname/Temp a new feature of 1.1rc1
498 scores_path = bustard_object.pathname
499 scores_path_temp = os.path.join(scores_path, 'Temp')
500 if os.path.isdir(scores_path_temp):
501 scores_path = scores_path_temp
503 # hopefully we have a directory that contains s_*_score files
505 for f in os.listdir(scores_path):
506 if re.match('.*_score.txt', f):
507 score_files.append(f)
# Build the tar | bzip2 pipeline writing to scores.tar.bz2.
509 tar_cmd = ['tar', 'c'] + score_files
510 bzip_cmd = [ 'bzip2', '-9', '-c' ]
511 tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
# NOTE(review): binary bz2 output written through a text-mode handle,
# and tar_dest is never explicitly closed in the visible code -- confirm
# against the full file.
512 tar_dest = open(tar_dest_name, 'w')
513 LOGGER.info("Compressing score files from %s" % (scores_path,))
# Only log the first few tar args; score_files can be long.
514 LOGGER.info("Running tar: " + " ".join(tar_cmd[:10]))
515 LOGGER.info("Running bzip2: " + " ".join(bzip_cmd))
516 LOGGER.info("Writing to %s" % (tar_dest_name,))
# `env` is not defined in the visible excerpt; presumably built above.
519 tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
521 bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
# Copy every ELAND result file into cycle_dir, bzip2-compressing any that
# are not already .gz/.bz2; compression jobs are batched through
# QueueCommands with up to num_jobs running in parallel.
525 def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
527 Compress eland result files into the archive directory
529 # copy & bzip eland files
532 for key in gerald_object.eland_results:
533 eland_lane = gerald_object.eland_results[key]
534 for source_name in eland_lane.pathnames:
# A lane without a backing file is logged and skipped.
535 if source_name is None:
537 "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
539 path, name = os.path.split(source_name)
540 dest_name = os.path.join(cycle_dir, name)
541 LOGGER.info("Saving eland file %s to %s" % \
542 (source_name, dest_name))
# Already-compressed files are copied verbatim instead of recompressed.
544 if is_compressed(name):
545 LOGGER.info('Already compressed, Saving to %s' % (dest_name,))
546 shutil.copy(source_name, dest_name)
# NOTE(review): the '>' redirect only works because these strings are
# presumably executed through a shell by QueueCommands -- confirm.
550 args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
551 bz_commands.append(" ".join(args))
552 #LOGGER.info('Running: %s' % ( " ".join(args) ))
553 #bzip_dest = open(dest_name, 'w')
554 #bzip = subprocess.Popen(args, stdout=bzip_dest)
555 #LOGGER.info('Saving to %s' % (dest_name, ))
# Run all queued compressions (if any) with num_jobs-way parallelism.
558 if len(bz_commands) > 0:
559 q = QueueCommands(bz_commands, num_jobs)
# Top-level archiver: for each PipelineRun, create
# <output_base_dir>/<flowcell_id>/C<start>-<stop>/ and fill it with the
# run report, flowcell reports, IVC plots, raw sequence (fastq/qseq/srf),
# the GERALD summary, compressed ELAND results, and md5 checksums.
563 def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format='qseq'):
565 Iterate over runfolders in runs extracting the most useful information.
566 * run parameters (in run-*.xml)
570 * srf files (raw sequence & qualities)
# Default the destination to the current working directory.
572 if output_base_dir is None:
573 output_base_dir = os.getcwd()
576 result_dir = os.path.join(output_base_dir, r.flowcell_id)
577 LOGGER.info("Using %s as result directory" % (result_dir,))
578 if not os.path.exists(result_dir):
# Cycle directory is named from the image-analysis cycle range, e.g. C1-36.
582 cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
583 LOGGER.info("Filling in %s" % (cycle,))
584 cycle_dir = os.path.join(result_dir, cycle)
585 cycle_dir = os.path.abspath(cycle_dir)
# Never clobber a previously extracted cycle directory.
586 if os.path.exists(cycle_dir):
587 LOGGER.error("%s already exists, not overwriting" % (cycle_dir,))
595 # save illumina flowcell status report
596 save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'), cycle_dir)
598 # save stuff from bustard
600 save_ivc_plot(r.bustard, cycle_dir)
602 # build base call saving commands
# Collect the lanes whose GERALD analysis is not 'none'.
605 for lane in range(1, 9):
606 lane_parameters = r.gerald.lanes.get(lane, None)
607 if lane_parameters is not None and lane_parameters.analysis != 'none':
610 run_name = srf.pathname_to_run_name(r.pathname)
612 LOGGER.info("Raw Format is: %s" % (raw_format, ))
# Dispatch on the requested raw-data format.
613 if raw_format == 'fastq':
614 rawpath = os.path.join(r.pathname, r.gerald.runfolder_name)
615 LOGGER.info("raw data = %s" % (rawpath,))
616 srf.copy_hiseq_project_fastqs(run_name, rawpath, site, cycle_dir)
617 elif raw_format == 'qseq':
618 seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
619 elif raw_format == 'srf':
620 seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir, 0)
622 raise ValueError('Unknown --raw-format=%s' % (raw_format))
623 srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)
625 # save stuff from GERALD
626 # copy stuff out of the main run
630 save_summary_file(r, cycle_dir)
632 # compress eland result files
633 compress_eland_results(g, cycle_dir, num_jobs)
635 # md5 all the compressed files once we're done
636 md5_commands = srf.make_md5_commands(cycle_dir)
637 srf.run_commands(cycle_dir, md5_commands, num_jobs)
# Delete each existing path in `files`; in dry-run mode (the default)
# only log what would be deleted. Missing paths produce a warning.
639 def rm_list(files, dry_run=True):
641 if os.path.exists(f):
642 LOGGER.info('deleting %s' % (f,))
# NOTE(review): LOGGER.warn() is a deprecated alias of LOGGER.warning().
649 LOGGER.warn("%s doesn't exist." % (f,))
# Strip each runfolder of bulky intermediates (logs, Calibration_*,
# Images, ReadPrep*, Thumbnail_Images, and `make clean_intermediate`
# output) so the folder compresses well; honors dry_run via rm_list().
651 def clean_runs(runs, dry_run=True):
653 Clean up run folders to optimize for compression.
656 LOGGER.info('In dry-run mode')
659 LOGGER.info('Cleaninging %s' % (run.pathname,))
# Run-level log files.
661 runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
662 rm_list(runlogs, dry_run)
664 pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
665 rm_list(pipeline_logs, dry_run)
667 # rm NetCopy.log? Isn't this robocopy?
668 logs = glob(os.path.join(run.pathname, '*.log'))
669 rm_list(logs, dry_run)
# Calibration directories from the instrument software.
672 calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
673 rm_list(calibration_dir, dry_run)
675 LOGGER.info("Cleaning images")
676 image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
677 rm_list(image_dirs, dry_run)
679 LOGGER.info("Cleaning ReadPrep*")
680 read_prep_dirs = glob(os.path.join(run.pathname, 'ReadPrep*'))
681 rm_list(read_prep_dirs, dry_run)
683 LOGGER.info("Cleaning Thubmnail_images")
684 thumbnail_dirs = glob(os.path.join(run.pathname, 'Thumbnail_Images'))
685 rm_list(thumbnail_dirs, dry_run)
687 # make clean_intermediate
# NOTE(review): uses the root logging module here instead of the
# module-level LOGGER used everywhere else -- likely unintentional.
688 logging.info("Cleaning intermediate files")
# If the image-analysis dir has a Makefile, delegate to its
# clean_intermediate target (note: not guarded by dry_run in visible code).
689 if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
690 clean_process = subprocess.Popen(['make', 'clean_intermediate'],
691 cwd=run.image_analysis.pathname,)