2 Core information needed to inspect a runfolder.
15 import lxml.etree as ElementTree
LOGGER = logging.getLogger(__name__)

# strptime/strftime pattern for European day-first dates, e.g. "31-12-2009".
EUROPEAN_STRPTIME = "%d-%m-%Y"
# Regex capturing one European-style date; {4,4} kept for byte-compatibility
# with the original pattern (it is equivalent to {4}).
EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
# Fix: the original non-raw "([0-9\.]+)" contains the invalid string escape
# '\.' (a DeprecationWarning/SyntaxWarning in modern Python). A raw string
# with an unescaped '.' inside the character class matches exactly the same
# strings.
VERSION_RE = r"([0-9.]+)"
USER_RE = "([a-zA-Z0-9]+)"
# Illumina GA flowcells have 8 lanes, numbered 1..8.
LANES_PER_FLOWCELL = 8
LANE_LIST = range(1, LANES_PER_FLOWCELL + 1)
26 from htsworkflow.util.alphanum import alphanum
27 from htsworkflow.util.ethelp import indent, flatten
28 from htsworkflow.util.queuecommands import QueueCommands
30 from htsworkflow.pipelines import srf
# PipelineRun bundles the image-analysis (firecrest/ipar), base-calling
# (bustard) and alignment (gerald) components of one sequencing run and can
# round-trip itself through xml via get_elements()/set_elements().
# NOTE(review): this excerpt is lossy -- original line numbers are fused into
# the text and interior lines are missing; code below is kept verbatim.
32 class PipelineRun(object):
34     Capture "interesting" information about a pipeline run
# xml element tag names used by the (de)serialization methods.
37     PIPELINE_RUN = 'PipelineRun'
38     FLOWCELL_ID = 'FlowcellID'
# Build a run either from a runfolder pathname (+ optional flowcell id) or
# from a previously serialized xml tree.
# NOTE(review): original lines 43-45, 47 and 49-52 are missing from this
# excerpt (presumably the remaining attribute defaults and the guard around
# set_elements) -- confirm against the full source.
40     def __init__(self, pathname=None, flowcell_id=None, xml=None):
41         if pathname is not None:
42             self.pathname = os.path.normpath(pathname)
# cached id; the flowcell_id property resolves it lazily when None.
46         self._flowcell_id = flowcell_id
48         self.image_analysis = None
# populates image_analysis / bustard / gerald from the supplied xml.
53             self.set_elements(xml)
# Lazily resolve the flowcell id: prefer Config/FlowcellId.xml inside the
# runfolder, otherwise fall back to guessing from the last '_'-separated
# field of the pathname, else 'unknown'.
# NOTE(review): lines 63, 68 and 70-74 (the else branches and the LOGGER
# warning call wrapping line 72) are missing from this excerpt.
55     def _get_flowcell_id(self):
57         if self._flowcell_id is None:
58             config_dir = os.path.join(self.pathname, 'Config')
59             flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
60             if os.path.exists(flowcell_id_path):
61                 flowcell_id_tree = ElementTree.parse(flowcell_id_path)
# FlowcellId.xml stores the id in a <Text> element.
62                 self._flowcell_id = flowcell_id_tree.findtext('Text')
64             path_fields = self.pathname.split('_')
65             if len(path_fields) > 0:
66                 # guessing last element of filename
67                 self._flowcell_id = path_fields[-1]
69                 self._flowcell_id = 'unknown'
72                     "Flowcell id was not found, guessing %s" % (
75         return self._flowcell_id
76     flowcell_id = property(_get_flowcell_id)
# Delegate the runfolder name to the gerald component.
# NOTE(review): lines 80-81 are missing from this excerpt -- presumably the
# error handling for the `self.gerald is None` case.
78     def _get_runfolder_name(self):
79         if self.gerald is None:
82             return self.gerald.runfolder_name
83     runfolder_name = property(_get_runfolder_name)
# Serialize this run: one root <PipelineRun> element containing the
# flowcell id plus the xml of each sub-component.
# NOTE(review): the trailing `return root` (original lines 95-96) is missing
# from this excerpt.
85     def get_elements(self):
87         make one master xml file from all of our sub-components.
89         root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
90         flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
91         flowcell.text = self.flowcell_id
92         root.append(self.image_analysis.get_elements())
93         root.append(self.bustard.get_elements())
94         root.append(self.gerald.get_elements())
# Deserialize: dispatch each child element of the <PipelineRun> tree to the
# matching component class by (case-insensitive) tag name.
# NOTE(review): the `for element in tree:` loop header (original line ~109)
# and the final `else:` before the warn are missing from this excerpt.
97     def set_elements(self, tree):
98         # this file gets imported by all the others,
99         # so we need to hide the imports to avoid a cyclic imports
100         from htsworkflow.pipelines import firecrest
101         from htsworkflow.pipelines import ipar
102         from htsworkflow.pipelines import bustard
103         from htsworkflow.pipelines import gerald
105         tag = tree.tag.lower()
106         if tag != PipelineRun.PIPELINE_RUN.lower():
107             raise ValueError('Pipeline Run Expecting %s got %s' % (
108                 PipelineRun.PIPELINE_RUN, tag))
110             tag = element.tag.lower()
111             if tag == PipelineRun.FLOWCELL_ID.lower():
112                 self._flowcell_id = element.text
113             #ok the xword.Xword.XWORD pattern for module.class.constant is lame
114             # you should only have Firecrest or IPAR, never both of them.
115             elif tag == firecrest.Firecrest.FIRECREST.lower():
116                 self.image_analysis = firecrest.Firecrest(xml=element)
117             elif tag == ipar.IPAR.IPAR.lower():
118                 self.image_analysis = ipar.IPAR(xml=element)
119             elif tag == bustard.Bustard.BUSTARD.lower():
120                 self.bustard = bustard.Bustard(xml=element)
121             elif tag == gerald.Gerald.GERALD.lower():
122                 self.gerald = gerald.Gerald(xml=element)
123             elif tag == gerald.CASAVA.GERALD.lower():
124                 self.gerald = gerald.CASAVA(xml=element)
# Unknown tags are logged and skipped rather than treated as errors.
126                 LOGGER.warn('PipelineRun unrecognized tag %s' % (tag,))
# Derive the saved-report filename from the flowcell id and the newest
# timestamp among the three components: run_<flowcell>_<YYYY-MM-DD>.xml.
# NOTE(review): `return self._name` (original line 136) is missing from this
# excerpt.
128     def _get_run_name(self):
130         Given a run tuple, find the latest date and use that as our name
132         if self._name is None:
133             tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
134             timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
135             self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
137     name = property(_get_run_name)
# Write this run's xml report into destdir under self.name.
# NOTE(review): lines 140-141 and 144 are missing from this excerpt --
# presumably the docstring and the default-destdir handling.
139     def save(self, destdir=None):
142         LOGGER.info("Saving run report " + self.name)
143         xml = self.get_elements()
145         dest_pathname = os.path.join(destdir, self.name)
146         ElementTree.ElementTree(xml).write(dest_pathname)
def load(self, filename):
    """Restore this run's state from a previously saved run xml report."""
    LOGGER.info("Loading run report from " + filename)
    document = ElementTree.parse(filename)
    self.set_elements(document.getroot())
# Module-level helper: parse a saved run xml file and build a PipelineRun
# from it.
# NOTE(review): the trailing `return run` (original lines 164-165) is
# missing from this excerpt.
153 def load_pipeline_run_xml(pathname):
155     Load and instantiate a Pipeline run from a run xml file
158     - `pathname` : location of an run xml file
160     :Returns: initialized PipelineRun object
162     tree = ElementTree.parse(pathname).getroot()
163     run = PipelineRun(xml=tree)
# Scan a runfolder for every firecrest/ipar x bustard x gerald combination
# and yield one PipelineRun per combination found.
# NOTE(review): this excerpt is lossy -- the try/except bodies around the
# gerald constructors and the `runs.append(p)` lines are missing; code below
# is kept verbatim.
166 def get_runs(runfolder, flowcell_id=None):
168     Search through a run folder for all the various sub component runs
169     and then return a PipelineRun for each different combination.
171     For example if there are two different GERALD runs, this will
172     generate two different PipelineRun objects, that differ
173     in there gerald component.
175     from htsworkflow.pipelines import firecrest
176     from htsworkflow.pipelines import ipar
177     from htsworkflow.pipelines import bustard
178     from htsworkflow.pipelines import gerald
# Inner helper: given one image-analysis dir, find its bustard dirs and
# build runs from the gerald / Aligned directories beneath/next to them.
180     def scan_post_image_analysis(runs, runfolder, datadir, image_analysis, pathname):
181         LOGGER.info("Looking for bustard directories in %s" % (pathname,))
182         bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
183         # RTA BaseCalls looks enough like Bustard.
184         bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
185         for bustard_pathname in bustard_dirs:
186             LOGGER.info("Found bustard directory %s" % (bustard_pathname,))
187             b = bustard.bustard(bustard_pathname)
188             build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder)
190             build_aligned_runs(image_analysis, runs, b, datadir, runfolder)
# Inner helper: one PipelineRun per GERALD* directory under a bustard dir.
192     def build_gerald_runs(runs, b, image_analysis, bustard_pathname, datadir, pathname, runfolder):
193         gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
194         LOGGER.info("Looking for gerald directories in %s" % (pathname,))
195         for gerald_pathname in glob(gerald_glob):
196             LOGGER.info("Found gerald directory %s" % (gerald_pathname,))
198                 g = gerald.gerald(gerald_pathname)
199                 p = PipelineRun(runfolder, flowcell_id)
201                 p.image_analysis = image_analysis
# Gerald parse failures are logged and skipped, not fatal.
206                 LOGGER.error("Ignoring " + str(e))
# Inner helper: one PipelineRun per CASAVA Aligned* directory in runfolder.
209     def build_aligned_runs(image_analysis, runs, b, datadir, runfolder):
210         aligned_glob = os.path.join(runfolder, 'Aligned*')
211         for aligned in glob(aligned_glob):
212             LOGGER.info("Found aligned directory %s" % (aligned,))
214                 g = gerald.gerald(aligned)
215                 p = PipelineRun(runfolder, flowcell_id)
217                 p.image_analysis = image_analysis
222                 LOGGER.error("Ignoring " + str(e))
224     datadir = os.path.join(runfolder, 'Data')
226     LOGGER.info('Searching for runs in ' + datadir)
228     # scan for firecrest directories
229     for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
230         LOGGER.info('Found firecrest in ' + datadir)
231         image_analysis = firecrest.firecrest(firecrest_pathname)
232         if image_analysis is None:
234             "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
237             scan_post_image_analysis(
238                 runs, runfolder, datadir, image_analysis, firecrest_pathname
240     # scan for IPAR directories
241     ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
242     # The Intensities directory from the RTA software looks a lot like IPAR
243     ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
244     for ipar_pathname in ipar_dirs:
245         LOGGER.info('Found ipar directories in ' + datadir)
246         image_analysis = ipar.ipar(ipar_pathname)
247         if image_analysis is None:
249             "%s is an empty or invalid IPAR directory" % (ipar_pathname,)
252             scan_post_image_analysis(
253                 runs, runfolder, datadir, image_analysis, ipar_pathname
# Build a single PipelineRun from one explicitly chosen gerald directory by
# walking up the directory tree to its bustard / image-analysis / runfolder
# parents, instead of scanning for all combinations like get_runs().
# NOTE(review): several lines are missing from this excerpt -- the
# `image_run = None` initializer, the early `return None`s after the error
# logs, and the trailing `return p`.
258 def get_specific_run(gerald_dir):
260     Given a gerald directory, construct a PipelineRun out of its parents
262     Basically this allows specifying a particular run instead of the previous
263     get_runs which scans a runfolder for various combinations of
264     firecrest/ipar/bustard/gerald runs.
266     from htsworkflow.pipelines import firecrest
267     from htsworkflow.pipelines import ipar
268     from htsworkflow.pipelines import bustard
269     from htsworkflow.pipelines import gerald
# Directory layout assumed: runfolder/Data/<image>/<bustard>/<gerald>.
271     gerald_dir = os.path.expanduser(gerald_dir)
272     bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
273     image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))
275     runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))
277     LOGGER.info('--- use-run detected options ---')
278     LOGGER.info('runfolder: %s' % (runfolder_dir,))
279     LOGGER.info('image_dir: %s' % (image_dir,))
280     LOGGER.info('bustard_dir: %s' % (bustard_dir,))
281     LOGGER.info('gerald_dir: %s' % (gerald_dir,))
283     # find our processed image dir
285     # split into parent, and leaf directory
286     # leaf directory should be an IPAR or firecrest directory
287     data_dir, short_image_dir = os.path.split(image_dir)
288     LOGGER.info('data_dir: %s' % (data_dir,))
# NOTE(review): 'short_iamge_dir' below is a typo in a runtime log string;
# left unchanged in this documentation-only pass.
289     LOGGER.info('short_iamge_dir: %s' % (short_image_dir,))
291     # guess which type of image processing directory we have by looking
292     # in the leaf directory name
293     if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
294         image_run = firecrest.firecrest(image_dir)
295     elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
296         image_run = ipar.ipar(image_dir)
297     elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
298         image_run = ipar.ipar(image_dir)
300     # if we din't find a run, report the error and return
301     if image_run is None:
302         msg = '%s does not contain an image processing step' % (image_dir,)
306     # find our base calling
307     base_calling_run = bustard.bustard(bustard_dir)
308     if base_calling_run is None:
309         LOGGER.error('%s does not contain a bustard run' % (bustard_dir,))
313     gerald_run = gerald.gerald(gerald_dir)
314     if gerald_run is None:
315         LOGGER.error('%s does not contain a gerald run' % (gerald_dir,))
318     p = PipelineRun(runfolder_dir)
319     p.image_analysis = image_run
320     p.bustard = base_calling_run
321     p.gerald = gerald_run
323     LOGGER.info('Constructed PipelineRun from %s' % (gerald_dir,))
# NOTE(review): the body of this function (original lines 329-331) is
# missing from this excerpt; presumably it iterates `runs` saving each run's
# parameters -- confirm against the full source.
326 def extract_run_parameters(runs):
328     Search through runfolder_path for various runs and grab their parameters
# Collapse per-chromosome mapped-read counts into per-genome totals, keeping
# spike-in/contamination entries (paths not present in genome_map) separate.
# NOTE(review): lines 337, 339-340 and 344-346 are missing from this
# excerpt (the `genome` / `genome_reads` accumulation). `has_key` is
# Python 2 only.
333 def summarize_mapped_reads(genome_map, mapped_reads):
335     Summarize per chromosome reads into a genome count
336     But handle spike-in/contamination symlinks seperately.
338     summarized_reads = {}
341     for k, v in mapped_reads.items():
342         path, k = os.path.split(k)
343         if len(path) > 0 and not genome_map.has_key(path):
347             summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
348     summarized_reads[genome] = genome_reads
349     return summarized_reads
# Produce a list of human-readable report lines for one lane: cluster
# counts, total reads, match-code breakdown and per-genome mapped reads,
# iterating over each sequencing end.
# NOTE(review): the `report = []` initializer, the `nm`/`qc` extraction from
# the match codes (lines 365, 367) and the trailing `return report` are
# missing from this excerpt. `has_key` is Python 2 only.
351 def summarize_lane(gerald, lane_id):
353     summary_results = gerald.summary.lane_results
# one entry per end (single- vs paired-end runs).
354     for end in range(len(summary_results)):
355         eland_result = gerald.eland_results.results[end][lane_id]
356         report.append("Sample name %s" % (eland_result.sample_name))
357         report.append("Lane id %s end %s" % (eland_result.lane_id, end))
358         if end < len(summary_results) and summary_results[end].has_key(eland_result.lane_id):
359             cluster = summary_results[end][eland_result.lane_id].cluster
# cluster is (mean, stddev).
360             report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
361         report.append("Total Reads: %d" % (eland_result.reads))
363         if hasattr(eland_result, 'match_codes'):
364             mc = eland_result.match_codes
366             nm_percent = float(nm) / eland_result.reads * 100
368             qc_percent = float(qc) / eland_result.reads * 100
370             report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
371             report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
372             report.append('Unique (0,1,2 mismatches) %d %d %d' % \
373                           (mc['U0'], mc['U1'], mc['U2']))
374             report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
375                           (mc['R0'], mc['R1'], mc['R2']))
377         if hasattr(eland_result, 'genome_map'):
378             report.append("Mapped Reads")
379             mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
380             for name, counts in mapped_reads.items():
381                 report.append("  %s: %d" % (name, counts))
# Join per-lane summaries for every run into a single newline-separated
# report string.
# NOTE(review): the `report = []` initializer and the `for run in runs:`
# loop header are missing from this excerpt. `eland_keys.sort(alphanum)`
# passes a cmp-style comparator -- Python 2 only.
386 def summary_report(runs):
388     Summarize cluster numbers and mapped read counts for a runfolder
393     report.append('Summary for %s' % (run.name,))
395     eland_keys = run.gerald.eland_results.results[0].keys()
# natural ("alphanum") ordering so lane 10 sorts after lane 9.
396     eland_keys.sort(alphanum)
398     for lane_id in eland_keys:
399         report.extend(summarize_lane(run.gerald, lane_id))
402     return os.linesep.join(report)
# True when filename ends in a recognized compression extension.
# NOTE(review): the return statements (original lines 406, 408-410) are
# missing from this excerpt.
404 def is_compressed(filename):
405     if os.path.splitext(filename)[1] == ".gz":
407     elif os.path.splitext(filename)[1] == '.bz2':
# Archive the flowcell quality reports directory (plus Status.xml/xsl when
# present) into cycle_dir as flowcell-reports.tar.bz2 via QueueCommands.
# NOTE(review): lines 425-426 and 428 are missing from this excerpt --
# presumably the chdir into data_dir and `q.run()`; the tar command uses
# relative paths ('reports/', 'Status.xml') so it must run from data_dir.
412 def save_flowcell_reports(data_dir, cycle_dir):
414     Save the flowcell quality reports
416     data_dir = os.path.abspath(data_dir)
417     status_file = os.path.join(data_dir, 'Status.xml')
418     reports_dir = os.path.join(data_dir, 'reports')
419     reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
420     if os.path.exists(reports_dir):
421         cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
422         if os.path.exists(status_file):
423             cmd_list.extend(['Status.xml', 'Status.xsl'])
424         LOGGER.info("Saving reports from " + reports_dir)
427         q = QueueCommands([" ".join(cmd_list)])
def save_summary_file(pipeline, cycle_dir):
    """Copy the run's Summary.htm report into cycle_dir.

    Looks first in the gerald directory, then falls back to the
    Data/Status_Files copy produced by newer pipeline versions.  When
    neither exists, the absence is logged and the function returns quietly.
    """
    gerald_object = pipeline.gerald
    gerald_summary = os.path.join(gerald_object.pathname, 'Summary.htm')
    status_files_summary = os.path.join(pipeline.datadir, 'Status_Files', 'Summary.htm')
    if os.path.exists(gerald_summary):
        LOGGER.info('Copying %s to %s' % (gerald_summary, cycle_dir))
        shutil.copy(gerald_summary, cycle_dir)
    elif os.path.exists(status_files_summary):
        LOGGER.info('Copying %s to %s' % (status_files_summary, cycle_dir))
        shutil.copy(status_files_summary, cycle_dir)
    else:
        # BUG FIX: the original logged the undefined name `summary_path`,
        # raising NameError whenever neither summary file existed.
        LOGGER.info('Summary file %s was not found' % (gerald_summary,))
# Copy the bustard IVC.htm quality page and its Plots/*.png images into
# cycle_dir, creating cycle_dir/Plots as needed.
# NOTE(review): the `else:` line (original ~464) introducing the warning
# branch is missing from this excerpt.
446 def save_ivc_plot(bustard_object, cycle_dir):
448     Save the IVC page and its supporting images
450     plot_html = os.path.join(bustard_object.pathname, 'IVC.htm')
451     plot_image_path = os.path.join(bustard_object.pathname, 'Plots')
# glob pattern for the per-lane plot images.
452     plot_images = os.path.join(plot_image_path, 's_?_[a-z]*.png')
454     plot_target_path = os.path.join(cycle_dir, 'Plots')
456     if os.path.exists(plot_html):
457         LOGGER.debug("Saving %s" % (plot_html,))
458         LOGGER.debug("Saving %s" % (plot_images,))
459         shutil.copy(plot_html, cycle_dir)
460         if not os.path.exists(plot_target_path):
461             os.mkdir(plot_target_path)
462         for plot_file in glob(plot_images):
463             shutil.copy(plot_file, plot_target_path)
465         LOGGER.warning('Missing IVC.html file, not archiving')
# Pipe `tar c <score files> | bzip2 -9 -c > cycle_dir/scores.tar.bz2` for
# every *_score.txt file found under the bustard directory (or its Temp/
# subdirectory on pipeline >= 1.1rc1).
# NOTE(review): several lines are missing from this excerpt -- the
# `score_files = []` initializer, the definition of `env`/cwd for the tar
# process, and the wait/close calls on the pipeline.  Also note tar_dest is
# opened in text mode 'w' for binary output and is not visibly closed --
# confirm against the full source.
468 def compress_score_files(bustard_object, cycle_dir):
470     Compress score files into our result directory
472     # check for g.pathname/Temp a new feature of 1.1rc1
473     scores_path = bustard_object.pathname
474     scores_path_temp = os.path.join(scores_path, 'Temp')
475     if os.path.isdir(scores_path_temp):
476         scores_path = scores_path_temp
478     # hopefully we have a directory that contains s_*_score files
480     for f in os.listdir(scores_path):
481         if re.match('.*_score.txt', f):
482             score_files.append(f)
484     tar_cmd = ['tar', 'c'] + score_files
485     bzip_cmd = [ 'bzip2', '-9', '-c' ]
486     tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
487     tar_dest = open(tar_dest_name, 'w')
488     LOGGER.info("Compressing score files from %s" % (scores_path,))
# only the first few args are logged to keep the log line short.
489     LOGGER.info("Running tar: " + " ".join(tar_cmd[:10]))
490     LOGGER.info("Running bzip2: " + " ".join(bzip_cmd))
491     LOGGER.info("Writing to %s" % (tar_dest_name,))
494     tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
496     bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
# Copy (already-compressed) or bzip2-compress each eland result file into
# cycle_dir, batching the bzip2 invocations through QueueCommands with up
# to num_jobs running in parallel.
# NOTE(review): the `bz_commands = []` initializer, the `continue`
# statements after the error/copy branches, the `else:` before the bzip2
# branch, and the final `q.run()` are missing from this excerpt.  The '>'
# redirection implies QueueCommands runs these strings through a shell.
500 def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
502     Compress eland result files into the archive directory
504     # copy & bzip eland files
507     for key in gerald_object.eland_results:
508         eland_lane = gerald_object.eland_results[key]
509         for source_name in eland_lane.pathnames:
510             if source_name is None:
512                 "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
514             path, name = os.path.split(source_name)
515             dest_name = os.path.join(cycle_dir, name)
516             LOGGER.info("Saving eland file %s to %s" % \
517                         (source_name, dest_name))
519             if is_compressed(name):
520                 LOGGER.info('Already compressed, Saving to %s' % (dest_name,))
521                 shutil.copy(source_name, dest_name)
525                 args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
526                 bz_commands.append(" ".join(args))
527                 #LOGGER.info('Running: %s' % ( " ".join(args) ))
528                 #bzip_dest = open(dest_name, 'w')
529                 #bzip = subprocess.Popen(args, stdout=bzip_dest)
530                 #LOGGER.info('Saving to %s' % (dest_name, ))
533     if len(bz_commands) > 0:
534         q = QueueCommands(bz_commands, num_jobs)
# Top-level archiver: for each PipelineRun, create <output>/<flowcell>/C<n>-<m>
# and fill it with the run xml, flowcell reports, IVC plots, raw sequence
# (fastq/qseq/srf), the summary file, compressed eland results, and md5sums.
# NOTE(review): many interleaved lines are missing from this excerpt --
# the `for r in runs:` loop header, the mkdir/continue handling, the
# `lanes = []` accumulation around line 580-583, the `g = r.gerald`
# assignment before line 608, and the r.save() call; code below is kept
# verbatim.
538 def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format='qseq'):
540     Iterate over runfolders in runs extracting the most useful information.
541       * run parameters (in run-*.xml)
545       * srf files (raw sequence & qualities)
547     if output_base_dir is None:
548         output_base_dir = os.getcwd()
551         result_dir = os.path.join(output_base_dir, r.flowcell_id)
552         LOGGER.info("Using %s as result directory" % (result_dir,))
553         if not os.path.exists(result_dir):
# cycle directory is named after the image-analysis start/stop cycles.
557         cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
558         LOGGER.info("Filling in %s" % (cycle,))
559         cycle_dir = os.path.join(result_dir, cycle)
560         cycle_dir = os.path.abspath(cycle_dir)
561         if os.path.exists(cycle_dir):
562             LOGGER.error("%s already exists, not overwriting" % (cycle_dir,))
570         # save illumina flowcell status report
571         save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'), cycle_dir)
573         # save stuff from bustard
575         save_ivc_plot(r.bustard, cycle_dir)
577         # build base call saving commands
# only lanes that were actually analyzed get their raw data archived.
580             for lane in range(1, 9):
581                 lane_parameters = r.gerald.lanes.get(lane, None)
582                 if lane_parameters is not None and lane_parameters.analysis != 'none':
585             run_name = srf.pathname_to_run_name(r.pathname)
587             LOGGER.info("Raw Format is: %s" % (raw_format, ))
588             if raw_format == 'fastq':
589                 rawpath = os.path.join(r.pathname, r.gerald.runfolder_name)
590                 LOGGER.info("raw data = %s" % (rawpath,))
591                 srf.copy_hiseq_project_fastqs(run_name, rawpath, site, cycle_dir)
592             elif raw_format == 'qseq':
593                 seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
594             elif raw_format == 'srf':
595                 seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir, 0)
597                 raise ValueError('Unknown --raw-format=%s' % (raw_format))
598             srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)
600         # save stuff from GERALD
601         # copy stuff out of the main run
605         save_summary_file(r, cycle_dir)
607         # compress eland result files
608         compress_eland_results(g, cycle_dir, num_jobs)
610         # md5 all the compressed files once we're done
611         md5_commands = srf.make_md5_commands(cycle_dir)
612         srf.run_commands(cycle_dir, md5_commands, num_jobs)
# Delete each existing path in `files`, honoring dry_run; warn about paths
# that do not exist.
# NOTE(review): the `for f in files:` loop header, the dry-run short-circuit
# and the rmtree/unlink branch (original lines 615, 618-623) are missing
# from this excerpt.
614 def rm_list(files, dry_run=True):
616         if os.path.exists(f):
617             LOGGER.info('deleting %s' % (f,))
624             LOGGER.warn("%s doesn't exist." % (f,))
# Shrink runfolders before archiving: delete run logs, pipeline logs,
# calibration dirs, raw images, ReadPrep and thumbnail directories, then run
# `make clean_intermediate` in the image-analysis directory when available.
# NOTE(review): this excerpt is lossy -- the `for run in runs:` loop header
# and several interleaved lines are missing, and the function appears to
# continue past the last visible line (the Popen result is presumably
# wait()ed on).  Line 663 uses module-level `logging.info` where the rest
# of the file uses LOGGER -- likely unintended, left unchanged here.
626 def clean_runs(runs, dry_run=True):
628     Clean up run folders to optimize for compression.
631         LOGGER.info('In dry-run mode')
634         LOGGER.info('Cleaninging %s' % (run.pathname,))
636         runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
637         rm_list(runlogs, dry_run)
639         pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
640         rm_list(pipeline_logs, dry_run)
642         # rm NetCopy.log? Isn't this robocopy?
643         logs = glob(os.path.join(run.pathname, '*.log'))
644         rm_list(logs, dry_run)
647         calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
648         rm_list(calibration_dir, dry_run)
650         LOGGER.info("Cleaning images")
651         image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
652         rm_list(image_dirs, dry_run)
654         LOGGER.info("Cleaning ReadPrep*")
655         read_prep_dirs = glob(os.path.join(run.pathname, 'ReadPrep*'))
656         rm_list(read_prep_dirs, dry_run)
658         LOGGER.info("Cleaning Thubmnail_images")
659         thumbnail_dirs = glob(os.path.join(run.pathname, 'Thumbnail_Images'))
660         rm_list(thumbnail_dirs, dry_run)
662         # make clean_intermediate
663         logging.info("Cleaning intermediate files")
664         if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
665             clean_process = subprocess.Popen(['make', 'clean_intermediate'],
666                                              cwd=run.image_analysis.pathname,)