2 Core information needed to inspect a runfolder.
15 import lxml.etree as ElementTree
# Module-level logger, shared by every function below.
LOGGER = logging.getLogger(__name__)

# Patterns used when parsing runfolder names / pipeline metadata.
EUROPEAN_STRPTIME = "%d-%m-%Y"
EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
# NOTE(review): '\.' inside a non-raw string works but a raw string r"..."
# would be the conventional spelling for a regex.
VERSION_RE = "([0-9\.]+)"
USER_RE = "([a-zA-Z0-9]+)"
# Classic Illumina flowcells have 8 lanes, numbered 1..8.
LANES_PER_FLOWCELL = 8
LANE_LIST = range(1, LANES_PER_FLOWCELL + 1)
26 from htsworkflow.util.alphanum import alphanum
27 from htsworkflow.util.ethelp import indent, flatten
28 from htsworkflow.util.queuecommands import QueueCommands
30 from htsworkflow.pipelines import srf
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run

    Wraps the image-analysis (Firecrest/IPAR), base-calling (Bustard)
    and alignment (GERALD/CASAVA) components of one runfolder and can
    serialize them to/from a single XML report.

    NOTE(review): this is a lossy excerpt of the original file -- elided
    original lines are flagged inline; confirm against the full source.
    """
    # XML tag names used by get_elements()/set_elements().
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, flowcell_id=None, xml=None):
        # pathname: runfolder path; flowcell_id: optional explicit id;
        # xml: previously saved report to rehydrate from.
        if pathname is not None:
            self.pathname = os.path.normpath(pathname)
        # orig lines 43-45 elided -- presumably the else branch leaving
        # self.pathname unset/None; confirm.
        self._flowcell_id = flowcell_id
        self.image_analysis = None
        # orig lines 49-52 elided -- presumably self.bustard / self.gerald /
        # self._name initialization and an `if xml is not None:` guard for
        # the call below; confirm.
        self.set_elements(xml)

    def _get_flowcell_id(self):
        # Lazily resolve the flowcell id, trying progressively weaker
        # sources: RunInfo.xml, Config/FlowcellId.xml, then the path itself.
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_runinfo()
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_flowcellid()
        if self._flowcell_id is None:
            self._flowcell_id = self._get_flowcell_id_from_path()
        if self._flowcell_id is None:
            self._flowcell_id = 'unknown'
        # orig lines 65-66 and 68-69 elided -- the fragment below is the
        # middle of a LOGGER warning call reporting the guessed id; confirm.
            "Flowcell id was not found, guessing %s" % (
        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def _get_flowcell_id_from_flowcellid(self):
        """Extract flowcell id from a Config/FlowcellId.xml file
        """
        config_dir = os.path.join(self.pathname, 'Config')
        flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
        if os.path.exists(flowcell_id_path):
            flowcell_id_tree = ElementTree.parse(flowcell_id_path)
            return flowcell_id_tree.findtext('Text')
        # Implicitly returns None when the file is absent.

    def _get_flowcell_id_from_runinfo(self):
        """Read RunInfo file for flowcell id
        """
        runinfo = os.path.join(self.pathname, 'RunInfo.xml')
        if os.path.exists(runinfo):
            tree = ElementTree.parse(runinfo)
            # orig line 88 elided -- presumably `root = tree.getroot()`;
            # `root` is used on the next line. Confirm.
            fc_nodes = root.xpath('/RunInfo/Run/Flowcell')
            if len(fc_nodes) == 1:
                return fc_nodes[0].text
        # orig lines 92-93 elided (likely an error log for the != 1 case).

    def _get_flowcell_id_from_path(self):
        """Guess a flowcell name from the path
        """
        path_fields = self.pathname.split('_')
        if len(path_fields) > 0:
            # guessing last element of filename
            return path_fields[-1]

    def _get_runfolder_name(self):
        # Delegates to the gerald component's notion of the runfolder name.
        if self.gerald is None:
        # orig lines 104-105 elided -- presumably the no-gerald fallback
        # (return None or raise); confirm.
            return self.gerald.runfolder_name
    runfolder_name = property(_get_runfolder_name)

    def get_elements(self):
        """
        make one master xml file from all of our sub-components.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        root.append(self.gerald.get_elements())
        # orig lines 119-120 elided -- presumably `return root`; confirm.

    def set_elements(self, tree):
        # Rehydrate this object from a previously saved master XML tree.
        # this file gets imported by all the others,
        # so we need to hide the imports to avoid a cyclic imports
        from htsworkflow.pipelines import firecrest
        from htsworkflow.pipelines import ipar
        from htsworkflow.pipelines import bustard
        from htsworkflow.pipelines import gerald

        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
            raise ValueError('Pipeline Run Expecting %s got %s' % (
                PipelineRun.PIPELINE_RUN, tag))
        # orig line 133 elided -- presumably `for element in tree:`, which
        # the per-child dispatch below runs under; confirm.
            tag = element.tag.lower()
            if tag == PipelineRun.FLOWCELL_ID.lower():
                self._flowcell_id = element.text
            #ok the xword.Xword.XWORD pattern for module.class.constant is lame
            # you should only have Firecrest or IPAR, never both of them.
            elif tag == firecrest.Firecrest.FIRECREST.lower():
                self.image_analysis = firecrest.Firecrest(xml=element)
            elif tag == ipar.IPAR.IPAR.lower():
                self.image_analysis = ipar.IPAR(xml=element)
            elif tag == bustard.Bustard.BUSTARD.lower():
                self.bustard = bustard.Bustard(xml=element)
            elif tag == gerald.Gerald.GERALD.lower():
                self.gerald = gerald.Gerald(xml=element)
            elif tag == gerald.CASAVA.GERALD.lower():
                self.gerald = gerald.CASAVA(xml=element)
            # orig line 149 elided -- presumably `else:` guarding the warn.
                LOGGER.warn('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """
        Given a run tuple, find the latest date and use that as our name
        """
        if self._name is None:
            # Newest timestamp among the three sub-components names the run.
            tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
            timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
            self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
        # orig line 160 elided -- presumably `return self._name`; confirm.
    name = property(_get_run_name)

    def save(self, destdir=None):
        # Serialize this run's XML report into destdir under self.name.
        # orig lines 164-165 elided -- likely docstring and/or the
        # `destdir is None` default handling; confirm.
        LOGGER.info("Saving run report " + self.name)
        xml = self.get_elements()
        # orig line 168 elided (possibly indent(xml) pretty-printing).
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        # Inverse of save(): parse a saved report and rehydrate self.
        LOGGER.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters:
      - `pathname` : location of an run xml file

    :Returns: initialized PipelineRun object
    """
    tree = ElementTree.parse(pathname).getroot()
    run = PipelineRun(xml=tree)
    # orig line 188 elided -- presumably `return run`; confirm.
def get_runs(runfolder, flowcell_id=None):
    """
    Search through a run folder for all the various sub component runs
    and then return a PipelineRun for each different combination.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in there gerald component.

    NOTE(review): lossy excerpt -- elided original lines are flagged
    inline; confirm against the full source.
    """
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    # Nested helper: given one image-analysis dir, find the Bustard and
    # GERALD/aligned runs under it and append PipelineRuns to `runs`.
    def scan_post_image_analysis(runs, runfolder, datadir, image_analysis,
    # orig line 205 elided -- remainder of the parameter list (likely
    # `pathname):`); confirm.
        added = build_aligned_runs(image_analysis, runs, datadir, runfolder)
        # If we're a multiplexed run, don't look for older run type.
        # orig lines 208-210 elided -- presumably an early return when
        # `added > 0`; confirm.
        LOGGER.info("Looking for bustard directories in %s" % (pathname,))
        bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
        # RTA BaseCalls looks enough like Bustard.
        bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
        for bustard_pathname in bustard_dirs:
            LOGGER.info("Found bustard directory %s" % (bustard_pathname,))
            b = bustard.bustard(bustard_pathname)
            build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder)

    # Nested helper: one PipelineRun per GERALD dir under a Bustard dir.
    def build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder):
        # orig line 222 elided -- presumably `start = len(runs)`, used by
        # the return below; confirm.
        gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
        LOGGER.info("Looking for gerald directories in %s" % (pathname,))
        for gerald_pathname in glob(gerald_glob):
            LOGGER.info("Found gerald directory %s" % (gerald_pathname,))
            # orig line 227 elided -- presumably `try:` wrapping the body.
                g = gerald.gerald(gerald_pathname)
                p = PipelineRun(runfolder, flowcell_id)
                # orig line 230 elided (likely p.datadir = datadir).
                p.image_analysis = image_analysis
                # orig lines 232-235 elided -- presumably p.bustard/p.gerald
                # assignments, runs.append(p), and the `except ... as e:`
                # clause the next line belongs to; confirm.
                LOGGER.error("Ignoring " + str(e))
        return len(runs) - start

    # Nested helper: one PipelineRun per CASAVA 'Aligned*' directory.
    def build_aligned_runs(image_analysis, runs, datadir, runfolder):
        # orig line 241 elided -- presumably `start = len(runs)`; confirm.
        aligned_glob = os.path.join(runfolder, 'Aligned*')
        for aligned in glob(aligned_glob):
            LOGGER.info("Found aligned directory %s" % (aligned,))
            # orig line 245 elided -- presumably `try:`.
                g = gerald.gerald(aligned)
                p = PipelineRun(runfolder, flowcell_id)
                bustard_pathname = os.path.join(runfolder, g.runfolder_name)
                # orig lines 249-250 elided (likely p.datadir assignment).
                p.image_analysis = image_analysis
                p.bustard = bustard.bustard(bustard_pathname)
                # orig lines 253-255 elided -- presumably p.gerald = g,
                # runs.append(p), and the `except ... as e:` clause; confirm.
                LOGGER.error("Ignoring " + str(e))
        return len(runs) - start

    datadir = os.path.join(runfolder, 'Data')

    LOGGER.info('Searching for runs in ' + datadir)
    # orig line 261 elided -- presumably `runs = []`; confirm.
    # scan for firecrest directories
    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
        LOGGER.info('Found firecrest in ' + datadir)
        image_analysis = firecrest.firecrest(firecrest_pathname)
        if image_analysis is None:
            # orig line 267 elided -- presumably the LOGGER warn call this
            # string fragment belongs to; confirm.
            "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
            # orig lines 269-270 elided -- presumably call close + `else:`.
            scan_post_image_analysis(
                runs, runfolder, datadir, image_analysis, firecrest_pathname
            # orig line 273 elided -- closing `)` of the call.
    # scan for IPAR directories
    ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
    # The Intensities directory from the RTA software looks a lot like IPAR
    ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
    for ipar_pathname in ipar_dirs:
        LOGGER.info('Found ipar directories in ' + datadir)
        image_analysis = ipar.ipar(ipar_pathname)
        if image_analysis is None:
            # orig line 282 elided -- presumably the LOGGER warn call this
            # string fragment belongs to; confirm.
            "%s is an empty or invalid IPAR directory" % (ipar_pathname,)
            # orig lines 284-285 elided -- presumably call close + `else:`.
            scan_post_image_analysis(
                runs, runfolder, datadir, image_analysis, ipar_pathname
            # orig lines 288-290 elided -- presumably closing `)` and
            # `return runs`; confirm.
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.

    NOTE(review): lossy excerpt -- elided original lines flagged inline.
    """
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    # Walk up the directory tree: gerald lives under bustard, which lives
    # under the image-analysis dir, which lives under Data/ in the runfolder.
    gerald_dir = os.path.expanduser(gerald_dir)
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))
    # orig line 308 elided.
    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))

    LOGGER.info('--- use-run detected options ---')
    LOGGER.info('runfolder: %s' % (runfolder_dir,))
    LOGGER.info('image_dir: %s' % (image_dir,))
    LOGGER.info('bustard_dir: %s' % (bustard_dir,))
    LOGGER.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    # orig line 318 elided -- presumably `image_run = None`, tested below.
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    LOGGER.info('data_dir: %s' % (data_dir,))
    # NOTE(review): 'short_iamge_dir' is a typo in the log text; left as-is
    # since it is a runtime string.
    LOGGER.info('short_iamge_dir: %s' % (short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)

    # if we didn't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        # orig lines 337-339 elided -- presumably LOGGER.error(msg) and
        # `return None`; confirm.

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        LOGGER.error('%s does not contain a bustard run' % (bustard_dir,))
        # orig lines 344-346 elided -- presumably `return None`.

    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        LOGGER.error('%s does not contain a gerald run' % (gerald_dir,))
        # orig lines 350-351 elided -- presumably `return None`.

    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    LOGGER.info('Constructed PipelineRun from %s' % (gerald_dir,))
    # orig line 358 elided -- presumably `return p`; confirm.
def extract_run_parameters(runs):
    """
    Search through runfolder_path for various runs and grab their parameters
    """
    # orig lines 363-365 elided -- body not visible in this excerpt
    # (presumably iterates `runs` saving each run's parameters); confirm.
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks separately.
    """
    summarized_reads = {}
    # orig lines 373-374 elided -- presumably `genome = ...` and
    # `genome_reads = 0`, used after the loop; confirm.
    for k, v in mapped_reads.items():
        # Split 'path/chrom' keys; a non-empty path outside genome_map
        # marks a spike-in/contamination entry counted under its own key.
        path, k = os.path.split(k)
        if len(path) > 0 and path not in genome_map:
            # orig lines 378-380 elided (the genome-accumulation branch).
            summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
    summarized_reads[genome] = genome_reads
    return summarized_reads
def summarize_lane(gerald, lane_id):
    # Build a list of human-readable report lines for one lane.
    # orig line 386 elided -- presumably `report = []`, appended to below
    # and returned at the end; confirm.
    lane_results = gerald.summary.lane_results
    eland_result = gerald.eland_results[lane_id]
    report.append("Sample name %s" % (eland_result.sample_name))
    report.append("Lane id %s end %s" % (lane_id.lane, lane_id.read))

    # Cluster stats are only present when the summary covers this read/lane.
    if lane_id.read < len(lane_results) and \
       lane_id.lane in lane_results[lane_id.read]:
        summary_results = lane_results[lane_id.read][lane_id.lane]
        cluster = summary_results.cluster
        report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
    report.append("Total Reads: %d" % (eland_result.reads))

    if hasattr(eland_result, 'match_codes'):
        mc = eland_result.match_codes
        # orig lines 401 and 403 elided -- presumably `nm = mc['NM']` and
        # `qc = mc['QC']`; confirm.
        nm_percent = float(nm) / eland_result.reads * 100
        qc_percent = float(qc) / eland_result.reads * 100

        report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
        report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
        report.append('Unique (0,1,2 mismatches) %d %d %d' % \
                      (mc['U0'], mc['U1'], mc['U2']))
        report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
                      (mc['R0'], mc['R1'], mc['R2']))

    if hasattr(eland_result, 'genome_map'):
        report.append("Mapped Reads")
        mapped_reads = summarize_mapped_reads(eland_result.genome_map,
                                              eland_result.mapped_reads)
        for name, counts in mapped_reads.items():
            report.append("  %s: %d" % (name, counts))
    # orig lines 419-421 elided -- presumably trailing blank-line appends
    # and `return report`; confirm.
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder
    """
    # orig lines 426-429 elided -- presumably `report = []` and the
    # `for run in runs:` header the indented body below runs under; confirm.
        report.append('Summary for %s' % (run.name,))
        # orig line 431 elided.
        # Sort lane ids so lanes appear in a stable order.
        eland_keys = sorted(run.gerald.eland_results.keys())
        for lane_id in eland_keys:
            report.extend(summarize_lane(run.gerald, lane_id))
            # orig lines 435-436 elided (likely a blank-line separator).
    return os.linesep.join(report)
def is_compressed(filename):
    # True when the filename already carries a known compressed extension.
    if os.path.splitext(filename)[1] == ".gz":
        # orig line 441 elided -- presumably `return True`; confirm.
    elif os.path.splitext(filename)[1] == '.bz2':
        # orig lines 443-445 elided -- presumably `return True` and a
        # final `return False`; confirm.
def save_flowcell_reports(data_dir, cycle_dir):
    """
    Save the flowcell quality reports
    """
    data_dir = os.path.abspath(data_dir)
    status_file = os.path.join(data_dir, 'Status.xml')
    reports_dir = os.path.join(data_dir, 'reports')
    reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
    if os.path.exists(reports_dir):
        # Tar up reports/ (plus Status.xml/.xsl when present) into cycle_dir.
        cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
        if os.path.exists(status_file):
            cmd_list.extend(['Status.xml', 'Status.xsl'])
        LOGGER.info("Saving reports from " + reports_dir)
        # orig lines 460-461 elided -- presumably chdir into data_dir so the
        # relative tar paths resolve; confirm.
        q = QueueCommands([" ".join(cmd_list)])
        # orig lines 463-465 elided -- presumably q.run() and restoring the
        # previous working directory; confirm.
def save_summary_file(pipeline, cycle_dir):
    # Copy the run's Summary.htm into cycle_dir, preferring the GERALD
    # copy and falling back to Status_Files/Summary.htm.
    gerald_object = pipeline.gerald
    gerald_summary = os.path.join(gerald_object.pathname, 'Summary.htm')
    status_files_summary = os.path.join(pipeline.datadir, 'Status_Files', 'Summary.htm')
    if os.path.exists(gerald_summary):
        LOGGER.info('Copying %s to %s' % (gerald_summary, cycle_dir))
        shutil.copy(gerald_summary, cycle_dir)
    elif os.path.exists(status_files_summary):
        LOGGER.info('Copying %s to %s' % (status_files_summary, cycle_dir))
        shutil.copy(status_files_summary, cycle_dir)
    # orig line 478 elided -- presumably `else:` guarding the log below.
        # NOTE(review): `summary_path` is not defined anywhere in the visible
        # code -- this line looks like a latent NameError; confirm against
        # the full file.
        LOGGER.info('Summary file %s was not found' % (summary_path,))
def save_ivc_plot(bustard_object, cycle_dir):
    """
    Save the IVC page and its supporting images
    """
    plot_html = os.path.join(bustard_object.pathname, 'IVC.htm')
    plot_image_path = os.path.join(bustard_object.pathname, 'Plots')
    # Glob pattern for the per-lane PNGs referenced by IVC.htm.
    plot_images = os.path.join(plot_image_path, 's_?_[a-z]*.png')

    plot_target_path = os.path.join(cycle_dir, 'Plots')

    if os.path.exists(plot_html):
        LOGGER.debug("Saving %s" % (plot_html,))
        LOGGER.debug("Saving %s" % (plot_images,))
        shutil.copy(plot_html, cycle_dir)
        if not os.path.exists(plot_target_path):
            os.mkdir(plot_target_path)
        for plot_file in glob(plot_images):
            shutil.copy(plot_file, plot_target_path)
    # orig line 499 elided -- presumably `else:` guarding the warning.
        LOGGER.warning('Missing IVC.html file, not archiving')
def compress_score_files(bustard_object, cycle_dir):
    """
    Compress score files into our result directory
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = bustard_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    # orig line 514 elided -- presumably `score_files = []`; confirm.
    for f in os.listdir(scores_path):
        if re.match('.*_score.txt', f):
            score_files.append(f)

    # Pipe `tar c <files>` into `bzip2 -9 -c` writing scores.tar.bz2.
    tar_cmd = ['tar', 'c'] + score_files
    bzip_cmd = [ 'bzip2', '-9', '-c' ]
    tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
    # NOTE(review): opened in text mode 'w' for binary bz2 output -- should
    # likely be 'wb'; confirm against the full file / target Python version.
    tar_dest = open(tar_dest_name, 'w')
    LOGGER.info("Compressing score files from %s" % (scores_path,))
    LOGGER.info("Running tar: " + " ".join(tar_cmd[:10]))
    LOGGER.info("Running bzip2: " + " ".join(bzip_cmd))
    LOGGER.info("Writing to %s" % (tar_dest_name,))

    # orig lines 527-528 elided -- presumably `env` setup for the Popen
    # below; confirm.
    tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
    # orig line 530 elided -- remainder of the Popen call (likely
    # `cwd=scores_path)`); confirm.
    bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
    # orig lines 532-534 elided -- presumably tar.wait()/bzip.wait() and
    # tar_dest.close(); confirm.
def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
    """
    Compress eland result files into the archive directory
    """
    # copy & bzip eland files
    # orig lines 540-541 elided -- presumably `bz_commands = []`, appended
    # to below and run at the end; confirm.
    for key in gerald_object.eland_results:
        eland_lane = gerald_object.eland_results[key]
        for source_name in eland_lane.pathnames:
            if source_name is None:
                # orig line 546 elided -- presumably the LOGGER call this
                # string fragment is the argument of; confirm.
                "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
                # orig line 548 elided -- presumably `else:` for the rest.
                path, name = os.path.split(source_name)
                dest_name = os.path.join(cycle_dir, name)
                LOGGER.info("Saving eland file %s to %s" % \
                            (source_name, dest_name))
                # orig line 553 elided.
                if is_compressed(name):
                    LOGGER.info('Already compressed, Saving to %s' % (dest_name,))
                    shutil.copy(source_name, dest_name)
                # orig lines 557-559 elided -- presumably `else:` queuing a
                # bzip2 shell command (note the '>' redirect implies the
                # command string is run through a shell by QueueCommands).
                    args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
                    bz_commands.append(" ".join(args))
                    #LOGGER.info('Running: %s' % ( " ".join(args) ))
                    #bzip_dest = open(dest_name, 'w')
                    #bzip = subprocess.Popen(args, stdout=bzip_dest)
                    #LOGGER.info('Saving to %s' % (dest_name, ))

    if len(bz_commands) > 0:
        q = QueueCommands(bz_commands, num_jobs)
        # orig lines 570-571 elided -- presumably `q.run()`; confirm.
def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format=None):
    """
    Iterate over runfolders in runs extracting the most useful information.
      * run parameters (in run-*.xml)
      * (orig lines 577-579 elided: additional bullet items)
      * srf files (raw sequence & qualities)
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    # orig lines 584-585 elided -- presumably `for r in runs:`, which the
    # indented body below runs under; confirm.
        result_dir = os.path.join(output_base_dir, r.flowcell_id)
        LOGGER.info("Using %s as result directory" % (result_dir,))
        if not os.path.exists(result_dir):
            # orig lines 589-591 elided -- presumably os.mkdir(result_dir).

        # Per-run cycle directory named after the image-analysis cycle range.
        cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
        LOGGER.info("Filling in %s" % (cycle,))
        cycle_dir = os.path.join(result_dir, cycle)
        cycle_dir = os.path.abspath(cycle_dir)
        if os.path.exists(cycle_dir):
            LOGGER.error("%s already exists, not overwriting" % (cycle_dir,))
            # orig lines 598-604 elided -- presumably `continue`, then
            # os.mkdir(cycle_dir) and r.save(cycle_dir); confirm.

        # save illumina flowcell status report
        save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'),
        # orig lines 607-608 elided -- remainder of the call (likely
        # `cycle_dir)`); confirm.

        # save stuff from bustard
        save_ivc_plot(r.bustard, cycle_dir)

        # build base call saving commands
        # orig line 614 elided (possibly a guard around the call below).
        save_raw_data(num_jobs, r, site, raw_format, cycle_dir)

        # save stuff from GERALD
        # copy stuff out of the main run
        # orig lines 619-621 elided -- presumably `g = r.gerald`, used by
        # compress_eland_results below; confirm.
        save_summary_file(r, cycle_dir)

        # compress eland result files
        compress_eland_results(g, cycle_dir, num_jobs)

        # md5 all the compressed files once we're done
        md5_commands = srf.make_md5_commands(cycle_dir)
        srf.run_commands(cycle_dir, md5_commands, num_jobs)
def save_raw_data(num_jobs, r, site, raw_format, cycle_dir):
    # Save the run's raw base-call data in the requested format
    # (fastq copy, or queued qseq/srf conversion commands).
    # orig line 632 elided -- presumably `lanes = []`, filled below and
    # passed to the srf helpers; confirm.
    for lane in r.gerald.lanes:
        lane_parameters = r.gerald.lanes.get(lane, None)
        if lane_parameters is not None:
            # orig lines 636-637 elided -- presumably `lanes.append(lane)`.

    run_name = srf.pathname_to_run_name(r.pathname)

    if raw_format is None:
        # Default to whatever format bustard produced.
        raw_format = r.bustard.sequence_format

    LOGGER.info("Raw Format is: %s" % (raw_format, ))
    if raw_format == 'fastq':
        rawpath = os.path.join(r.pathname, r.gerald.runfolder_name)
        LOGGER.info("raw data = %s" % (rawpath,))
        srf.copy_hiseq_project_fastqs(run_name, rawpath, site, cycle_dir)
    elif raw_format == 'qseq':
        seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
    elif raw_format == 'srf':
        seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir, 0)
    # orig line 652 elided -- presumably `else:` guarding the raise.
        raise ValueError('Unknown --raw-format=%s' % (raw_format))
    # NOTE(review): in the 'fastq' branch `seq_cmds` is never assigned, so
    # this call would fail there unless elided lines guard it; confirm.
    srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)
def rm_list(files, dry_run=True):
    # Delete every path in `files`, honoring dry_run; logs each action.
    # orig line 657 elided -- presumably `for f in files:`, which the
    # indented body below runs under; confirm.
        if os.path.exists(f):
            LOGGER.info('deleting %s' % (f,))
            # orig lines 660-665 elided -- presumably the dry_run guard and
            # the actual delete (shutil.rmtree for dirs / os.unlink for
            # files) plus the `else:` the next line belongs to; confirm.
            LOGGER.warn("%s doesn't exist." % (f,))
def clean_runs(runs, dry_run=True):
    """
    Clean up run folders to optimize for compression.

    NOTE(review): lossy excerpt; the definition may also continue past the
    last visible line (e.g. waiting on the make subprocess) -- confirm.
    """
    # orig lines 671-672 elided -- presumably `if dry_run:` guarding the log.
        LOGGER.info('In dry-run mode')

    # orig lines 674-675 elided -- presumably `for run in runs:`, which the
    # indented body below runs under; confirm.
        # NOTE(review): 'Cleaninging' typo is in a runtime string; left as-is.
        LOGGER.info('Cleaninging %s' % (run.pathname,))
        # Remove per-run logs, calibration data, images and thumbnails.
        runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
        rm_list(runlogs, dry_run)

        pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
        rm_list(pipeline_logs, dry_run)

        # rm NetCopy.log? Isn't this robocopy?
        logs = glob(os.path.join(run.pathname, '*.log'))
        rm_list(logs, dry_run)

        calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
        rm_list(calibration_dir, dry_run)

        LOGGER.info("Cleaning images")
        image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
        rm_list(image_dirs, dry_run)

        LOGGER.info("Cleaning ReadPrep*")
        read_prep_dirs = glob(os.path.join(run.pathname, 'ReadPrep*'))
        rm_list(read_prep_dirs, dry_run)

        # NOTE(review): 'Thubmnail_images' typo is in a runtime string.
        LOGGER.info("Cleaning Thubmnail_images")
        thumbnail_dirs = glob(os.path.join(run.pathname, 'Thumbnail_Images'))
        rm_list(thumbnail_dirs, dry_run)

        # make clean_intermediate
        # NOTE(review): uses the root logger (logging.info) while every other
        # call uses the module LOGGER -- likely an oversight; confirm.
        logging.info("Cleaning intermediate files")
        if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
            clean_process = subprocess.Popen(['make', 'clean_intermediate'],
                                             cwd=run.image_analysis.pathname,)