2 Core information needed to inspect a runfolder.
15 import lxml.etree as ElementTree
LOGGER = logging.getLogger(__name__)

# European day-month-year date conventions used in runfolder metadata
EUROPEAN_STRPTIME = "%d-%m-%Y"
EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
# raw string: keeps the pattern text identical while avoiding the
# invalid "\." escape sequence DeprecationWarning in non-raw strings
VERSION_RE = r"([0-9\.]+)"
USER_RE = "([a-zA-Z0-9]+)"
LANES_PER_FLOWCELL = 8
# NOTE: a range object under Python 3 (was a list under Python 2)
LANE_LIST = range(1, LANES_PER_FLOWCELL + 1)
26 from htsworkflow.util.alphanum import alphanum
27 from htsworkflow.util.ethelp import indent, flatten
28 from htsworkflow.util.queuecommands import QueueCommands
30 from htsworkflow.pipelines import srf
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run
    """
    # XML tag names used when (de)serializing a run report
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'
    def __init__(self, pathname=None, flowcell_id=None, xml=None):
        """Create a PipelineRun.

        :Parameters:
          - `pathname`: optional runfolder path; normalized when given
          - `flowcell_id`: optional explicit flowcell id (else discovered lazily)
          - `xml`: optional previously saved element tree to restore state from
        """
        if pathname is not None:
            self.pathname = os.path.normpath(pathname)
        # NOTE(review): the extraction dropped the original lines between these
        # statements (e.g. the pathname-is-None branch and other attribute
        # defaults such as self.bustard / self.gerald / self._name) — confirm
        # against the full file before relying on this block.
        self._flowcell_id = flowcell_id
        self.image_analysis = None
        # NOTE(review): presumably guarded by `if xml is not None:` in the
        # original (set_elements(None) would fail on tree.tag) — verify.
        self.set_elements(xml)
    def _get_flowcell_id(self):
        """Return the flowcell id, discovering and caching it on first use.

        Tries, in order: RunInfo.xml, Config/FlowcellId.xml, then the
        runfolder path; falls back to the literal string 'unknown'.
        """
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_runinfo()
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_flowcellid()
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_path()
        if self._flowcell_id is None:
            self._flowcell_id = 'unknown'
            # NOTE(review): the logger call wrapping this format string
            # (likely LOGGER.warning(...)) was lost in extraction — restore
            # from the full file; as-is this line is syntactically incomplete.
            "Flowcell id was not found, guessing %s" % (
        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)
    def _get_flowcell_id_from_flowcellid(self):
        """Extract flowcell id from a Config/FlowcellId.xml file.

        Returns the content of the <Text> element, or None when the file
        does not exist (or lacks a <Text> element).
        """
        config_dir = os.path.join(self.pathname, 'Config')
        flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
        if os.path.exists(flowcell_id_path):
            flowcell_id_tree = ElementTree.parse(flowcell_id_path)
            return flowcell_id_tree.findtext('Text')
82 def _get_flowcell_id_from_runinfo(self):
83 """Read RunInfo file for flowcell id
85 runinfo = os.path.join(self.pathname, 'RunInfo.xml')
86 if os.path.exists(runinfo):
87 tree = ElementTree.parse(runinfo)
89 fc_nodes = root.xpath('/RunInfo/Run/Flowcell')
90 if len(fc_nodes) == 1:
91 return fc_nodes[0].text
94 def _get_flowcell_id_from_path(self):
95 """Guess a flowcell name from the path
97 path_fields = self.pathname.split('_')
98 if len(path_fields) > 0:
99 # guessing last element of filename
100 return path_fields[-1]
    def _get_runfolder_name(self):
        """Return the runfolder name recorded by gerald or image analysis.

        NOTE(review): the leading `if ...:` condition guarding the first
        return was lost in extraction (likely a hasattr test on self.gerald),
        as were any trailing else/return lines — confirm in the full file.
        """
            return self.gerald.runfolder_name
        elif hasattr(self.image_analysis, 'runfolder_name'):
            return self.image_analysis.runfolder_name
    runfolder_name = property(_get_runfolder_name)
111 def get_elements(self):
113 make one master xml file from all of our sub-components.
115 root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
116 flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
117 flowcell.text = self.flowcell_id
118 root.append(self.image_analysis.get_elements())
119 root.append(self.bustard.get_elements())
120 root.append(self.gerald.get_elements())
123 def set_elements(self, tree):
124 # this file gets imported by all the others,
125 # so we need to hide the imports to avoid a cyclic imports
126 from htsworkflow.pipelines import firecrest
127 from htsworkflow.pipelines import ipar
128 from htsworkflow.pipelines import bustard
129 from htsworkflow.pipelines import gerald
131 tag = tree.tag.lower()
132 if tag != PipelineRun.PIPELINE_RUN.lower():
133 raise ValueError('Pipeline Run Expecting %s got %s' % (
134 PipelineRun.PIPELINE_RUN, tag))
136 tag = element.tag.lower()
137 if tag == PipelineRun.FLOWCELL_ID.lower():
138 self._flowcell_id = element.text
139 #ok the xword.Xword.XWORD pattern for module.class.constant is lame
140 # you should only have Firecrest or IPAR, never both of them.
141 elif tag == firecrest.Firecrest.FIRECREST.lower():
142 self.image_analysis = firecrest.Firecrest(xml=element)
143 elif tag == ipar.IPAR.IPAR.lower():
144 self.image_analysis = ipar.IPAR(xml=element)
145 elif tag == bustard.Bustard.BUSTARD.lower():
146 self.bustard = bustard.Bustard(xml=element)
147 elif tag == gerald.Gerald.GERALD.lower():
148 self.gerald = gerald.Gerald(xml=element)
149 elif tag == gerald.CASAVA.GERALD.lower():
150 self.gerald = gerald.CASAVA(xml=element)
152 LOGGER.warn('PipelineRun unrecognized tag %s' % (tag,))
154 def _get_run_name(self):
156 Given a run tuple, find the latest date and use that as our name
158 if self._name is None:
159 tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
160 timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
161 self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
163 name = property(_get_run_name)
    def save(self, destdir=None):
        """Serialize this run's XML report into destdir/<self.name>.

        NOTE(review): the original lines handling a None destdir were lost in
        extraction; as visible, destdir must already be a usable directory
        path string — confirm against the full file.
        """
        LOGGER.info("Saving run report " + self.name)
        xml = self.get_elements()
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)
174 def load(self, filename):
175 LOGGER.info("Loading run report from " + filename)
176 tree = ElementTree.parse(filename).getroot()
177 self.set_elements(tree)
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters:
      - `pathname` : location of an run xml file

    :Returns: initialized PipelineRun object
    """
    tree = ElementTree.parse(pathname).getroot()
    run = PipelineRun(xml=tree)
    # the docstring promises the constructed object; the return statement
    # was missing in the damaged view
    return run
def get_runs(runfolder, flowcell_id=None):
    """
    Search through a run folder for all the various sub component runs
    and then return a PipelineRun for each different combination.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in their gerald component.

    NOTE(review): many original lines of this function were lost in
    extraction (nested-def signatures, try/except bodies, the `runs`
    accumulator, returns).  The gaps are flagged inline; confirm every
    flagged spot against the full file.
    """
    # imports deferred here to break the cyclic dependency with the
    # per-stage pipeline modules
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    # NOTE(review): the rest of this signature (and the start of the body)
    # was lost in extraction — this line is incomplete as visible.
    def scan_post_image_analysis(runs, runfolder, datadir, image_analysis,
        added = build_aligned_runs(image_analysis, runs, datadir, runfolder)
        # If we're a multiplexed run, don't look for older run type.
        # NOTE(review): the early-return guard using `added` was lost here.
        LOGGER.info("Looking for bustard directories in %s" % (pathname,))
        bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
        # RTA BaseCalls looks enough like Bustard.
        bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
        for bustard_pathname in bustard_dirs:
            LOGGER.info("Found bustard directory %s" % (bustard_pathname,))
            b = bustard.bustard(bustard_pathname)
            build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder)

    def build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder):
        # NOTE(review): the initialization of `start` (likely len(runs)) was
        # lost in extraction; it is consumed by the return below.
        gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
        LOGGER.info("Looking for gerald directories in %s" % (pathname,))
        for gerald_pathname in glob(gerald_glob):
            LOGGER.info("Found gerald directory %s" % (gerald_pathname,))
            # NOTE(review): the try: wrapper around this body was lost.
            g = gerald.gerald(gerald_pathname)
            p = PipelineRun(runfolder, flowcell_id)
            p.image_analysis = image_analysis
            # NOTE(review): assignments of p.bustard/p.gerald, runs.append(p)
            # and the except clause binding `e` were lost in extraction.
            LOGGER.error("Ignoring " + str(e))
        return len(runs) - start

    def build_aligned_runs(image_analysis, runs, datadir, runfolder):
        # NOTE(review): `start = len(runs)` was lost in extraction.
        aligned_glob = os.path.join(runfolder, 'Aligned*')
        for aligned in glob(aligned_glob):
            LOGGER.info("Found aligned directory %s" % (aligned,))
            # NOTE(review): the try: wrapper around this body was lost.
            g = gerald.gerald(aligned)
            p = PipelineRun(runfolder, flowcell_id)
            bustard_pathname = os.path.join(runfolder, g.runfolder_name)
            p.image_analysis = image_analysis
            p.bustard = bustard.bustard(bustard_pathname)
            # NOTE(review): p.gerald assignment, runs.append(p) and the
            # except clause binding `e` were lost in extraction.
            LOGGER.error("Ignoring " + str(e))
        return len(runs) - start

    datadir = os.path.join(runfolder, 'Data')

    LOGGER.info('Searching for runs in ' + datadir)
    # NOTE(review): the `runs = []` initialization was lost in extraction.
    # scan for firecrest directories
    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
        LOGGER.info('Found firecrest in ' + datadir)
        image_analysis = firecrest.firecrest(firecrest_pathname)
        if image_analysis is None:
            # NOTE(review): the logger call wrapping this message was lost.
            "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
        # NOTE(review): the else: branch header and the call's closing
        # parenthesis were lost in extraction.
        scan_post_image_analysis(
            runs, runfolder, datadir, image_analysis, firecrest_pathname
    # scan for IPAR directories
    ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
    # The Intensities directory from the RTA software looks a lot like IPAR
    ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
    for ipar_pathname in ipar_dirs:
        LOGGER.info('Found ipar directories in ' + datadir)
        image_analysis = ipar.ipar(ipar_pathname)
        if image_analysis is None:
            # NOTE(review): the logger call wrapping this message was lost.
            "%s is an empty or invalid IPAR directory" % (ipar_pathname,)
        # NOTE(review): the else: branch header, the call's closing
        # parenthesis, and the final `return runs` were lost in extraction.
        scan_post_image_analysis(
            runs, runfolder, datadir, image_analysis, ipar_pathname
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.
    """
    # imports deferred to avoid a cyclic import with the stage modules
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    gerald_dir = os.path.expanduser(gerald_dir)
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))
    # NOTE(review): an intervening original line was lost here in extraction.
    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))

    LOGGER.info('--- use-run detected options ---')
    LOGGER.info('runfolder: %s' % (runfolder_dir,))
    LOGGER.info('image_dir: %s' % (image_dir,))
    LOGGER.info('bustard_dir: %s' % (bustard_dir,))
    LOGGER.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    LOGGER.info('data_dir: %s' % (data_dir,))
    # NOTE(review): 'short_iamge_dir' is a typo in this runtime log label
    # (should be 'short_image_dir'); left untouched here as it is program
    # output, not a comment.
    LOGGER.info('short_iamge_dir: %s' % (short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    # NOTE(review): the original else branch (presumably image_run = None)
    # was lost in extraction.

    # if we didn't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        # NOTE(review): the error-log/return lines consuming msg were lost.

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        LOGGER.error('%s does not contain a bustard run' % (bustard_dir,))
        # NOTE(review): the early return here was lost in extraction.

    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        LOGGER.error('%s does not contain a gerald run' % (gerald_dir,))
        # NOTE(review): the early return here was lost in extraction.

    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    LOGGER.info('Constructed PipelineRun from %s' % (gerald_dir,))
    # NOTE(review): the final `return p` was lost in extraction.
def extract_run_parameters(runs):
    """
    Search through runfolder_path for various runs and grab their parameters

    NOTE(review): the body of this function was lost in extraction — only
    the signature and docstring are visible in this view; restore the body
    from the full file.
    """
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks separately.
    """
    summarized_reads = {}
    # NOTE(review): the lines initializing the genome accumulator
    # (`genome` / `genome_reads`) were lost in extraction; both names are
    # consumed below.
    for k, v in mapped_reads.items():
        # split off any directory component; keys with a path prefix that is
        # not in genome_map are treated as the spike-in/contamination case
        path, k = os.path.split(k)
        if len(path) > 0 and path not in genome_map:
            # NOTE(review): the branch body accumulating genome_reads was
            # lost in extraction — as visible this `if` has no body.
        summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
    summarized_reads[genome] = genome_reads
    return summarized_reads
def summarize_lane(gerald, lane_id):
    """Build a list of human-readable summary lines for one lane/end.

    NOTE(review): extraction dropped several original lines (the
    initialization of `report`, the NM/QC counts taken from match_codes,
    and the final return) — the visible statements alone are not runnable.
    """
    lane_results = gerald.summary.lane_results
    eland_result = gerald.eland_results[lane_id]
    report.append("Sample name %s" % (eland_result.sample_name))
    report.append("Lane id %s end %s" % (lane_id.lane, lane_id.read))

    # cluster statistics are only available when the summary covers this
    # read/lane combination
    if lane_id.read < len(lane_results) and \
       lane_id.lane in lane_results[lane_id.read]:
        summary_results = lane_results[lane_id.read][lane_id.lane]
        cluster = summary_results.cluster
        report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
    report.append("Total Reads: %d" % (eland_result.reads))

    if hasattr(eland_result, 'match_codes'):
        mc = eland_result.match_codes
        # NOTE(review): the bindings of `nm` and `qc` (presumably taken from
        # mc) were lost in extraction.
        nm_percent = float(nm) / eland_result.reads * 100
        qc_percent = float(qc) / eland_result.reads * 100

        report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
        report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
        report.append('Unique (0,1,2 mismatches) %d %d %d' % \
                      (mc['U0'], mc['U1'], mc['U2']))
        report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
                      (mc['R0'], mc['R1'], mc['R2']))

    if hasattr(eland_result, 'genome_map'):
        report.append("Mapped Reads")
        mapped_reads = summarize_mapped_reads(eland_result.genome_map,
                                              eland_result.mapped_reads)
        for name, counts in mapped_reads.items():
            report.append(" %s: %d" % (name, counts))
    # NOTE(review): the trailing lines (and `return report`) were lost.
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder

    NOTE(review): extraction dropped the initialization of `report` and the
    `for run in runs:` loop header; the statements below assume both.
    """
    report.append('Summary for %s' % (run.name,))
    # sort eland result keys so lanes are reported in a stable order
    eland_keys = sorted(run.gerald.eland_results.keys())
    for lane_id in eland_keys:
        report.extend(summarize_lane(run.gerald, lane_id))
    return os.linesep.join(report)
def is_compressed(filename):
    """Return True when filename carries a recognized compression suffix.

    Recognizes gzip (.gz) and bzip2 (.bz2) extensions; anything else is
    reported as not compressed.  The damaged original tested the same two
    extensions in an if/elif chain whose return lines were lost.
    """
    return os.path.splitext(filename)[1] in ('.gz', '.bz2')
def save_flowcell_reports(data_dir, cycle_dir):
    """
    Save the flowcell quality reports

    Archives data_dir/reports (plus Status.xml/Status.xsl when present)
    into cycle_dir/flowcell-reports.tar.bz2 via a queued tar command.
    """
    data_dir = os.path.abspath(data_dir)
    status_file = os.path.join(data_dir, 'Status.xml')
    reports_dir = os.path.join(data_dir, 'reports')
    reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
    if os.path.exists(reports_dir):
        cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
        if os.path.exists(status_file):
            # NOTE(review): assumes Status.xsl sits next to Status.xml —
            # only Status.xml's existence is checked here.
            cmd_list.extend(['Status.xml', 'Status.xsl'])
        LOGGER.info("Saving reports from " + reports_dir)
        # NOTE(review): the lines that changed into data_dir, ran the queued
        # command, and restored the cwd were lost in extraction.
        q = QueueCommands([" ".join(cmd_list)])
def save_summary_file(pipeline, cycle_dir):
    """Copy the run's Summary.htm report into cycle_dir.

    Prefers the gerald directory's Summary.htm, falling back to the
    Status_Files copy under pipeline.datadir; logs when neither exists.
    """
    gerald_object = pipeline.gerald
    gerald_summary = os.path.join(gerald_object.pathname, 'Summary.htm')
    status_files_summary = os.path.join(pipeline.datadir, 'Status_Files', 'Summary.htm')
    if os.path.exists(gerald_summary):
        LOGGER.info('Copying %s to %s' % (gerald_summary, cycle_dir))
        shutil.copy(gerald_summary, cycle_dir)
    elif os.path.exists(status_files_summary):
        LOGGER.info('Copying %s to %s' % (status_files_summary, cycle_dir))
        shutil.copy(status_files_summary, cycle_dir)
    else:
        # original referenced an undefined `summary_path` here (NameError);
        # log the primary candidate path instead
        LOGGER.info('Summary file %s was not found' % (gerald_summary,))
def save_ivc_plot(bustard_object, cycle_dir):
    """
    Save the IVC page and its supporting images

    Copies IVC.htm into cycle_dir and its s_?_*.png plots into
    cycle_dir/Plots; logs a warning when the page is absent.
    """
    plot_html = os.path.join(bustard_object.pathname, 'IVC.htm')
    plot_image_path = os.path.join(bustard_object.pathname, 'Plots')
    plot_images = os.path.join(plot_image_path, 's_?_[a-z]*.png')

    plot_target_path = os.path.join(cycle_dir, 'Plots')

    if os.path.exists(plot_html):
        LOGGER.debug("Saving %s" % (plot_html,))
        LOGGER.debug("Saving %s" % (plot_images,))
        shutil.copy(plot_html, cycle_dir)
        if not os.path.exists(plot_target_path):
            os.mkdir(plot_target_path)
        for plot_file in glob(plot_images):
            shutil.copy(plot_file, plot_target_path)
    else:
        # restored: this warning was orphaned without its else: header in
        # the damaged original.  (Message says IVC.html though the file
        # checked is IVC.htm — runtime string left as-is.)
        LOGGER.warning('Missing IVC.html file, not archiving')
def compress_score_files(bustard_object, cycle_dir):
    """
    Compress score files into our result directory

    Pipes `tar c <s_*_score files>` through bzip2 into
    cycle_dir/scores.tar.bz2.
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = bustard_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    # NOTE(review): the `score_files = []` initialization was lost in
    # extraction; it is appended to below.
    for f in os.listdir(scores_path):
        if re.match('.*_score.txt', f):
            score_files.append(f)

    tar_cmd = ['tar', 'c'] + score_files
    bzip_cmd = [ 'bzip2', '-9', '-c' ]
    tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
    tar_dest = open(tar_dest_name, 'w')
    LOGGER.info("Compressing score files from %s" % (scores_path,))
    LOGGER.info("Running tar: " + " ".join(tar_cmd[:10]))
    LOGGER.info("Running bzip2: " + " ".join(bzip_cmd))
    LOGGER.info("Writing to %s" % (tar_dest_name,))

    # NOTE(review): the `env` setup, this Popen call's remaining arguments /
    # closing parenthesis, and the trailing wait/cleanup lines were all lost
    # in extraction — this call is syntactically incomplete as visible.
    tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
    bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
    """
    Compress eland result files into the archive directory

    Already-compressed files are copied as-is; the rest are queued through
    bzip2 with up to num_jobs parallel jobs.
    """
    # copy & bzip eland files
    # NOTE(review): the `bz_commands = []` initialization was lost in
    # extraction; it is appended to below.
    for key in gerald_object.eland_results:
        eland_lane = gerald_object.eland_results[key]
        for source_name in eland_lane.pathnames:
            if source_name is None:
                # NOTE(review): the logger-call opening and the `continue`
                # that followed this message were lost in extraction — this
                # line is syntactically incomplete as visible.
                "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
            path, name = os.path.split(source_name)
            dest_name = os.path.join(cycle_dir, name)
            LOGGER.info("Saving eland file %s to %s" % \
                        (source_name, dest_name))

            if is_compressed(name):
                LOGGER.info('Already compressed, Saving to %s' % (dest_name,))
                shutil.copy(source_name, dest_name)
            # NOTE(review): the else: header (and the line appending a .bz2
            # suffix to dest_name) were lost in extraction.
            args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
            bz_commands.append(" ".join(args))
            #LOGGER.info('Running: %s' % ( " ".join(args) ))
            #bzip_dest = open(dest_name, 'w')
            #bzip = subprocess.Popen(args, stdout=bzip_dest)
            #LOGGER.info('Saving to %s' % (dest_name, ))

    if len(bz_commands) > 0:
        q = QueueCommands(bz_commands, num_jobs)
        # NOTE(review): the q.run() call was lost in extraction.
def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format=None):
    """
    Iterate over runfolders in runs extracting the most useful information.
      * run parameters (in run-*.xml)
      * srf files (raw sequence & qualities)

    NOTE(review): several original lines were lost in extraction (the loop
    over runs, directory creation, argument tails); gaps are flagged inline.
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    # NOTE(review): the `for r in runs:` loop header was lost in extraction;
    # everything below referencing `r` assumes it.
    result_dir = os.path.join(output_base_dir, r.flowcell_id)
    LOGGER.info("Using %s as result directory" % (result_dir,))
    if not os.path.exists(result_dir):
        # NOTE(review): the os.mkdir call was lost in extraction — this `if`
        # has no body as visible.

    # pick a result subdirectory named for the image-analysis cycle range
    cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
    LOGGER.info("Filling in %s" % (cycle,))
    cycle_dir = os.path.join(result_dir, cycle)
    cycle_dir = os.path.abspath(cycle_dir)
    if os.path.exists(cycle_dir):
        LOGGER.error("%s already exists, not overwriting" % (cycle_dir,))
        # NOTE(review): the continue/else + mkdir lines were lost here.

    # save illumina flowcell status report
    # NOTE(review): this call's remaining arguments / closing parenthesis
    # were lost in extraction.
    save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'),

    # save stuff from bustard
    save_ivc_plot(r.bustard, cycle_dir)

    # build base call saving commands
    save_raw_data(num_jobs, r, site, raw_format, cycle_dir)

    # save stuff from GERALD
    # copy stuff out of the main run
    # NOTE(review): the binding of `g` (presumably r.gerald) was lost.
    save_summary_file(r, cycle_dir)

    # compress eland result files
    compress_eland_results(g, cycle_dir, num_jobs)

    # md5 all the compressed files once we're done
    md5_commands = srf.make_md5_commands(cycle_dir)
    srf.run_commands(cycle_dir, md5_commands, num_jobs)
def save_raw_data(num_jobs, r, site, raw_format, cycle_dir):
    """Save raw base-call data (fastq/qseq/srf) for one run into cycle_dir.

    Defaults raw_format to the basecaller's reported sequence format.
    Fastq data is copied directly; qseq/srf conversions are queued as srf
    commands with up to num_jobs parallel jobs.

    :Raises: ValueError for an unrecognized raw_format

    NOTE(review): two lines lost in the damaged original are reconstructed
    here from their visible consumers — the `lanes` accumulator (passed to
    the qseq/srf command builders) and the `else:` heading the raise —
    confirm both against the full file.
    """
    # collect the lane numbers gerald actually has parameters for
    lanes = []
    for lane in r.gerald.lanes:
        lane_parameters = r.gerald.lanes.get(lane, None)
        if lane_parameters is not None:
            lanes.append(lane)

    run_name = srf.pathname_to_run_name(r.pathname)

    if raw_format is None:
        raw_format = r.bustard.sequence_format

    LOGGER.info("Raw Format is: %s" % (raw_format, ))
    if raw_format == 'fastq':
        rawpath = os.path.join(r.pathname, r.gerald.runfolder_name)
        LOGGER.info("raw data = %s" % (rawpath,))
        srf.copy_hiseq_project_fastqs(run_name, rawpath, site, cycle_dir)
    elif raw_format == 'qseq':
        seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
    elif raw_format == 'srf':
        seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir, 0)
    else:
        raise ValueError('Unknown --raw-format=%s' % (raw_format))
    srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)
def rm_list(files, dry_run=True):
    """Log and (unless dry_run) delete each existing path in files.

    NOTE(review): the loop header (`for f in files:`), the dry_run-guarded
    deletion code, and the `else:` heading the warning below were all lost
    in extraction — the remaining statements assume them; confirm against
    the full file.
    """
        if os.path.exists(f):
            LOGGER.info('deleting %s' % (f,))
            LOGGER.warn("%s doesn't exist." % (f,))
def clean_runs(runs, dry_run=True):
    """
    Clean up run folders to optimize for compression.

    Deletes (via rm_list, which honors dry_run) run logs, pipeline logs,
    calibration data, raw images, ReadPrep and thumbnail directories, then
    invokes `make clean_intermediate` in the image-analysis directory.

    NOTE(review): the `if dry_run:` guard and `for run in runs:` loop
    headers were lost in extraction; the statements below assume them.
    This definition also continues past the end of this view.
    """
    LOGGER.info('In dry-run mode')

    # NOTE(review): 'Cleaninging' is a typo in this runtime log message;
    # left untouched here as it is program output, not a comment.
    LOGGER.info('Cleaninging %s' % (run.pathname,))

    runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
    rm_list(runlogs, dry_run)

    pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
    rm_list(pipeline_logs, dry_run)

    # rm NetCopy.log? Isn't this robocopy?
    logs = glob(os.path.join(run.pathname, '*.log'))
    rm_list(logs, dry_run)

    calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
    rm_list(calibration_dir, dry_run)

    LOGGER.info("Cleaning images")
    image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
    rm_list(image_dirs, dry_run)

    LOGGER.info("Cleaning ReadPrep*")
    read_prep_dirs = glob(os.path.join(run.pathname, 'ReadPrep*'))
    rm_list(read_prep_dirs, dry_run)

    LOGGER.info("Cleaning Thubmnail_images")
    thumbnail_dirs = glob(os.path.join(run.pathname, 'Thumbnail_Images'))
    rm_list(thumbnail_dirs, dry_run)

    # make clean_intermediate
    # NOTE(review): uses the root logging module here rather than LOGGER —
    # likely an oversight in the original; left unchanged.
    logging.info("Cleaning intermediate files")
    if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
        clean_process = subprocess.Popen(['make', 'clean_intermediate'],
                                         cwd=run.image_analysis.pathname,)
        # NOTE(review): the function continues past the end of this view
        # (presumably waiting on clean_process).