2 Core information needed to inspect a runfolder.
16 from xml.etree import ElementTree
17 except ImportError, e:
18 from elementtree import ElementTree
# Module-wide logger; handlers/levels are configured by the application.
LOGGER = logging.getLogger(__name__)

# strptime format for day-month-year dates found in run metadata.
EUROPEAN_STRPTIME = "%d-%m-%Y"
# Regexes are raw strings so backslashes are never reinterpreted as string
# escapes (the old "([0-9\.]+)" relied on an invalid escape sequence, which
# is a DeprecationWarning/SyntaxWarning in newer Pythons).
EUROPEAN_DATE_RE = r"([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
VERSION_RE = r"([0-9.]+)"
USER_RE = r"([a-zA-Z0-9]+)"
# Illumina GA flowcells have 8 lanes, numbered 1..8.
LANES_PER_FLOWCELL = 8
LANE_LIST = range(1, LANES_PER_FLOWCELL + 1)
29 from htsworkflow.util.alphanum import alphanum
30 from htsworkflow.util.ethelp import indent, flatten
31 from htsworkflow.util.queuecommands import QueueCommands
33 from htsworkflow.pipelines import srf
# Aggregates the three stages of an Illumina pipeline run (image analysis,
# base calling, alignment) and serializes them to/from a single XML report.
# NOTE(review): this is a line-sampled view — interior lines are missing
# throughout; comments below hedge where behavior depends on unseen lines.
35 class PipelineRun(object):
37 Capture "interesting" information about a pipeline run
# XML tag names used by get_elements()/set_elements() below.
40 PIPELINE_RUN = 'PipelineRun'
41 FLOWCELL_ID = 'FlowcellID'
# Either build from a runfolder path (+ optional flowcell id) or rehydrate
# from a previously saved XML tree.
43 def __init__(self, pathname=None, flowcell_id=None, xml=None):
44 if pathname is not None:
45 self.pathname = os.path.normpath(pathname)
49 self._flowcell_id = flowcell_id
51 self.image_analysis = None
# presumably self.bustard/self.gerald/self._name are also initialized in the
# missing lines — TODO confirm against the full source.
56 self.set_elements(xml)
# Lazy flowcell-id lookup: prefer Config/FlowcellId.xml, otherwise guess
# from the last '_'-separated component of the runfolder name.
58 def _get_flowcell_id(self):
60 if self._flowcell_id is None:
61 config_dir = os.path.join(self.pathname, 'Config')
62 flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
63 if os.path.exists(flowcell_id_path):
64 flowcell_id_tree = ElementTree.parse(flowcell_id_path)
65 self._flowcell_id = flowcell_id_tree.findtext('Text')
67 path_fields = self.pathname.split('_')
68 if len(path_fields) > 0:
69 # guessing last element of filename
70 self._flowcell_id = path_fields[-1]
72 self._flowcell_id = 'unknown'
75 "Flowcell id was not found, guessing %s" % (
78 return self._flowcell_id
79 flowcell_id = property(_get_flowcell_id)
# Delegates to the gerald component; the branch body for gerald is None is
# in missing lines (likely returns None or raises) — TODO confirm.
81 def _get_runfolder_name(self):
82 if self.gerald is None:
85 return self.gerald.runfolder_name
86 runfolder_name = property(_get_runfolder_name)
88 def get_elements(self):
90 make one master xml file from all of our sub-components.
92 root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
93 flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
94 flowcell.text = self.flowcell_id
95 root.append(self.image_analysis.get_elements())
96 root.append(self.bustard.get_elements())
97 root.append(self.gerald.get_elements())
# NOTE(review): `return root` not visible in this sample — confirm it exists.
100 def set_elements(self, tree):
101 # this file gets imported by all the others,
102 # so we need to hide the imports to avoid a cyclic imports
103 from htsworkflow.pipelines import firecrest
104 from htsworkflow.pipelines import ipar
105 from htsworkflow.pipelines import bustard
106 from htsworkflow.pipelines import gerald
108 tag = tree.tag.lower()
109 if tag != PipelineRun.PIPELINE_RUN.lower():
110 raise ValueError('Pipeline Run Expecting %s got %s' % (
111 PipelineRun.PIPELINE_RUN, tag))
# NOTE(review): `element` below comes from a `for element in tree:` loop in
# a missing line — confirm.
113 tag = element.tag.lower()
114 if tag == PipelineRun.FLOWCELL_ID.lower():
115 self._flowcell_id = element.text
116 #ok the xword.Xword.XWORD pattern for module.class.constant is lame
117 # you should only have Firecrest or IPAR, never both of them.
118 elif tag == firecrest.Firecrest.FIRECREST.lower():
119 self.image_analysis = firecrest.Firecrest(xml=element)
120 elif tag == ipar.IPAR.IPAR.lower():
121 self.image_analysis = ipar.IPAR(xml=element)
122 elif tag == bustard.Bustard.BUSTARD.lower():
123 self.bustard = bustard.Bustard(xml=element)
124 elif tag == gerald.Gerald.GERALD.lower():
125 self.gerald = gerald.Gerald(xml=element)
# NOTE(review): LOGGER.warn is deprecated; LOGGER.warning is the modern name.
127 LOGGER.warn('PipelineRun unrecognized tag %s' % (tag,))
# Report filename is derived from the newest timestamp among the three
# sub-components; cached in self._name.
129 def _get_run_name(self):
131 Given a run tuple, find the latest date and use that as our name
133 if self._name is None:
134 tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
135 timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
136 self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
138 name = property(_get_run_name)
# Serialize this run to <destdir>/<self.name>.
140 def save(self, destdir=None):
143 LOGGER.info("Saving run report " + self.name)
144 xml = self.get_elements()
146 dest_pathname = os.path.join(destdir, self.name)
147 ElementTree.ElementTree(xml).write(dest_pathname)
# Inverse of save(): parse an XML report and repopulate this object.
149 def load(self, filename):
150 LOGGER.info("Loading run report from " + filename)
151 tree = ElementTree.parse(filename).getroot()
152 self.set_elements(tree)
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters:
      - `pathname` : location of an run xml file

    :Returns: initialized PipelineRun object
    """
    tree = ElementTree.parse(pathname).getroot()
    run = PipelineRun(xml=tree)
    # the sampled source dropped the return statement; without it callers
    # would always receive None.
    return run
# Walk a runfolder's Data directory looking for every combination of
# image-analysis (Firecrest/IPAR/RTA Intensities) + Bustard + GERALD
# directories, yielding one PipelineRun per combination found.
167 def get_runs(runfolder, flowcell_id=None):
169 Search through a run folder for all the various sub component runs
170 and then return a PipelineRun for each different combination.
172 For example if there are two different GERALD runs, this will
173 generate two different PipelineRun objects, that differ
174 in their gerald component.
176 from htsworkflow.pipelines import firecrest
177 from htsworkflow.pipelines import ipar
178 from htsworkflow.pipelines import bustard
179 from htsworkflow.pipelines import gerald
# Inner helper: for one image-analysis dir, find Bustard/BaseCalls dirs and
# their GERALD children, appending assembled PipelineRuns to `runs`.
181 def scan_post_image_analysis(runs, runfolder, datadir, image_analysis, pathname):
182 LOGGER.info("Looking for bustard directories in %s" % (pathname,))
183 bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
184 # RTA BaseCalls looks enough like Bustard.
185 bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
186 for bustard_pathname in bustard_dirs:
187 LOGGER.info("Found bustard directory %s" % (bustard_pathname,))
188 b = bustard.bustard(bustard_pathname)
189 gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
190 LOGGER.info("Looking for gerald directories in %s" % (pathname,))
191 for gerald_pathname in glob(gerald_glob):
192 LOGGER.info("Found gerald directory %s" % (gerald_pathname,))
# missing lines here presumably open a try: block matched by the
# except logged at line 202 — TODO confirm.
194 g = gerald.gerald(gerald_pathname)
195 p = PipelineRun(runfolder, flowcell_id)
197 p.image_analysis = image_analysis
202 LOGGER.error("Ignoring " + str(e))
# Also pick up HiSeq-style Aligned* directories at the runfolder root.
204 aligned_glob = os.path.join(runfolder, 'Aligned*')
205 for aligned in glob(aligned_glob):
206 LOGGER.info("Found aligned directory %s" % (aligned,))
208 g = gerald.gerald(aligned)
209 p = PipelineRun(runfolder, flowcell_id)
211 p.image_analysis = image_analysis
216 LOGGER.error("Ignoring " + str(e))
218 datadir = os.path.join(runfolder, 'Data')
220 LOGGER.info('Searching for runs in ' + datadir)
222 # scan for firecrest directories
223 for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
224 LOGGER.info('Found firecrest in ' + datadir)
225 image_analysis = firecrest.firecrest(firecrest_pathname)
226 if image_analysis is None:
228 "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
# NOTE(review): this call passes 4 positional args but
# scan_post_image_analysis takes 5 (runs, runfolder, datadir,
# image_analysis, pathname) — the `datadir` argument is missing here while
# the IPAR branch below passes it; likely a TypeError at runtime.
231 scan_post_image_analysis(
232 runs, runfolder, image_analysis, firecrest_pathname
234 # scan for IPAR directories
235 ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
236 # The Intensities directory from the RTA software looks a lot like IPAR
237 ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
238 for ipar_pathname in ipar_dirs:
239 LOGGER.info('Found ipar directories in ' + datadir)
240 image_analysis = ipar.ipar(ipar_pathname)
241 if image_analysis is None:
243 "%s is an empty or invalid IPAR directory" % (ipar_pathname,)
246 scan_post_image_analysis(
247 runs, runfolder, datadir, image_analysis, ipar_pathname
# Build a single PipelineRun by walking *up* from a user-specified GERALD
# directory instead of scanning the whole runfolder.
252 def get_specific_run(gerald_dir):
254 Given a gerald directory, construct a PipelineRun out of its parents
256 Basically this allows specifying a particular run instead of the previous
257 get_runs which scans a runfolder for various combinations of
258 firecrest/ipar/bustard/gerald runs.
260 from htsworkflow.pipelines import firecrest
261 from htsworkflow.pipelines import ipar
262 from htsworkflow.pipelines import bustard
263 from htsworkflow.pipelines import gerald
# Layout assumed: runfolder/Data/<image>/<bustard>/<gerald>.
265 gerald_dir = os.path.expanduser(gerald_dir)
266 bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
267 image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))
269 runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))
271 LOGGER.info('--- use-run detected options ---')
272 LOGGER.info('runfolder: %s' % (runfolder_dir,))
273 LOGGER.info('image_dir: %s' % (image_dir,))
274 LOGGER.info('bustard_dir: %s' % (bustard_dir,))
275 LOGGER.info('gerald_dir: %s' % (gerald_dir,))
277 # find our processed image dir
279 # split into parent, and leaf directory
280 # leaf directory should be an IPAR or firecrest directory
281 data_dir, short_image_dir = os.path.split(image_dir)
282 LOGGER.info('data_dir: %s' % (data_dir,))
# NOTE(review): 'short_iamge_dir' below is a typo ("iamge") in the log
# message; left untouched here since it is a runtime string.
283 LOGGER.info('short_iamge_dir: %s' % (short_image_dir,))
285 # guess which type of image processing directory we have by looking
286 # in the leaf directory name
287 if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
288 image_run = firecrest.firecrest(image_dir)
289 elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
290 image_run = ipar.ipar(image_dir)
291 elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
292 image_run = ipar.ipar(image_dir)
# NOTE(review): if none of the branches match, image_run must be set to
# None in a missing line (likely an else here), otherwise line 295 raises
# NameError — TODO confirm.
294 # if we didn't find a run, report the error and return
295 if image_run is None:
296 msg = '%s does not contain an image processing step' % (image_dir,)
# error-logging and early `return None` presumably live in missing lines.
300 # find our base calling
301 base_calling_run = bustard.bustard(bustard_dir)
302 if base_calling_run is None:
303 LOGGER.error('%s does not contain a bustard run' % (bustard_dir,))
307 gerald_run = gerald.gerald(gerald_dir)
308 if gerald_run is None:
309 LOGGER.error('%s does not contain a gerald run' % (gerald_dir,))
# Assemble the PipelineRun from the three discovered components.
312 p = PipelineRun(runfolder_dir)
313 p.image_analysis = image_run
314 p.bustard = base_calling_run
315 p.gerald = gerald_run
317 LOGGER.info('Constructed PipelineRun from %s' % (gerald_dir,))
# `return p` is outside this sampled view — TODO confirm.
# Persist run parameters for each discovered run; the body is outside this
# sampled view (presumably iterates `runs` calling each run's save()).
320 def extract_run_parameters(runs):
322 Search through runfolder_path for various runs and grab their parameters
# Collapse per-chromosome mapped-read counts into per-genome totals; reads
# whose key carries a path prefix not present in genome_map are treated as
# belonging to a single spike-in/contamination genome.
327 def summarize_mapped_reads(genome_map, mapped_reads):
329 Summarize per chromosome reads into a genome count
330 But handle spike-in/contamination symlinks seperately.
332 summarized_reads = {}
# `genome` and a running total are initialized in missing lines (328/333-334)
# — TODO confirm; line 342 depends on them.
335 for k, v in mapped_reads.items():
336 path, k = os.path.split(k)
# NOTE(review): dict.has_key() is Python-2-only; `path not in genome_map`
# is the portable spelling.
337 if len(path) > 0 and not genome_map.has_key(path):
# the spike-in accumulation branch sits in missing lines 338-340.
341 summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
342 summarized_reads[genome] = genome_reads
343 return summarized_reads
# Produce a human-readable report (list of strings) for one lane across all
# sequencing ends: cluster counts, read totals, match-code breakdown and
# per-genome mapped-read counts.
345 def summarize_lane(gerald, lane_id):
# `report = []` is presumably initialized in a missing line — TODO confirm.
347 summary_results = gerald.summary.lane_results
348 for end in range(len(summary_results)):
349 eland_result = gerald.eland_results.results[end][lane_id]
350 report.append("Sample name %s" % (eland_result.sample_name))
351 report.append("Lane id %s end %s" % (eland_result.lane_id, end))
# NOTE(review): has_key() is Python-2-only.
352 if end < len(summary_results) and summary_results[end].has_key(eland_result.lane_id):
353 cluster = summary_results[end][eland_result.lane_id].cluster
354 report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
355 report.append("Total Reads: %d" % (eland_result.reads))
357 if hasattr(eland_result, 'match_codes'):
358 mc = eland_result.match_codes
# nm/qc are extracted from mc in missing lines 359/361.
# NOTE(review): these divisions raise ZeroDivisionError when
# eland_result.reads == 0 (e.g. an empty lane) — worth guarding.
360 nm_percent = float(nm) / eland_result.reads * 100
362 qc_percent = float(qc) / eland_result.reads * 100
364 report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
365 report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
366 report.append('Unique (0,1,2 mismatches) %d %d %d' % \
367 (mc['U0'], mc['U1'], mc['U2']))
368 report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
369 (mc['R0'], mc['R1'], mc['R2']))
371 if hasattr(eland_result, 'genome_map'):
372 report.append("Mapped Reads")
373 mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
374 for name, counts in mapped_reads.items():
375 report.append(" %s: %d" % (name, counts))
# `return report` is in missing trailing lines — TODO confirm.
# Build one newline-joined text report covering every run in `runs`, with
# lanes ordered naturally via the alphanum comparator.
380 def summary_report(runs):
382 Summarize cluster numbers and mapped read counts for a runfolder
# `report = []` and the `for run in runs:` loop header are in missing lines.
387 report.append('Summary for %s' % (run.name,))
# eland summary lanes, sorted alphanumerically
# NOTE(review): list.sort(cmp_function) and dict.keys() returning a sortable
# list are Python-2-only; Python 3 needs sorted(..., key=...).
389 eland_keys = run.gerald.eland_results.results[0].keys()
390 eland_keys.sort(alphanum)
392 for lane_id in eland_keys:
393 report.extend(summarize_lane(run.gerald, lane_id))
396 return os.linesep.join(report)
def is_compressed(filename):
    """Return True if *filename* has a recognized compressed extension.

    Recognized extensions are ``.gz`` and ``.bz2``; anything else
    (including no extension) returns False.
    """
    # Single splitext + membership test replaces the original duplicated
    # if/elif comparisons.
    return os.path.splitext(filename)[1] in ('.gz', '.bz2')
# Archive the flowcell quality reports (Data/reports plus Status.xml/xsl)
# into <cycle_dir>/flowcell-reports.tar.bz2 via an external tar command.
406 def save_flowcell_reports(data_dir, cycle_dir):
408 Save the flowcell quality reports
410 data_dir = os.path.abspath(data_dir)
411 status_file = os.path.join(data_dir, 'Status.xml')
412 reports_dir = os.path.join(data_dir, 'reports')
413 reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
414 if os.path.exists(reports_dir):
# 'reports/' is a relative path, so tar must run with cwd=data_dir; the
# chdir/cwd handling appears to be in missing lines 419-425 — TODO confirm.
415 cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
416 if os.path.exists(status_file):
# NOTE(review): Status.xsl is added without its own existence check —
# tar will fail if only Status.xml is present.
417 cmd_list.extend(['Status.xml', 'Status.xsl'])
418 LOGGER.info("Saving reports from " + reports_dir)
421 q = QueueCommands([" ".join(cmd_list)])
def save_summary_file(pipeline, cycle_dir):
    """Copy the run's Summary.htm into *cycle_dir*.

    Prefers the GERALD directory's Summary.htm; falls back to
    Data/Status_Files/Summary.htm. Logs (but does not raise) when neither
    location exists.
    """
    gerald_object = pipeline.gerald
    gerald_summary = os.path.join(gerald_object.pathname, 'Summary.htm')
    status_files_summary = os.path.join(pipeline.datadir, 'Status_Files', 'Summary.htm')
    if os.path.exists(gerald_summary):
        LOGGER.info('Copying %s to %s' % (gerald_summary, cycle_dir))
        shutil.copy(gerald_summary, cycle_dir)
    elif os.path.exists(status_files_summary):
        LOGGER.info('Copying %s to %s' % (status_files_summary, cycle_dir))
        shutil.copy(status_files_summary, cycle_dir)
    else:
        # Bug fix: the original logged `summary_path`, a name never defined
        # in this function, raising NameError on the not-found path.
        LOGGER.info('Summary file %s was not found' % (gerald_summary,))
def save_ivc_plot(bustard_object, cycle_dir):
    """
    Save the IVC page and its supporting images
    """
    # Locations inside the bustard directory.
    ivc_html = os.path.join(bustard_object.pathname, 'IVC.htm')
    image_glob = os.path.join(bustard_object.pathname, 'Plots', 's_?_[a-z]*.png')
    target_plot_dir = os.path.join(cycle_dir, 'Plots')

    # Guard clause: nothing to archive without the IVC page itself.
    if not os.path.exists(ivc_html):
        LOGGER.warning('Missing IVC.html file, not archiving')
        return

    LOGGER.debug("Saving %s" % (ivc_html,))
    LOGGER.debug("Saving %s" % (image_glob,))
    shutil.copy(ivc_html, cycle_dir)
    if not os.path.exists(target_plot_dir):
        os.mkdir(target_plot_dir)
    # Copy each supporting PNG alongside the page.
    for image in glob(image_glob):
        shutil.copy(image, target_plot_dir)
# Bundle the pipeline's *_score.txt files into <cycle_dir>/scores.tar.bz2
# by piping an external tar through bzip2.
462 def compress_score_files(bustard_object, cycle_dir):
464 Compress score files into our result directory
466 # check for g.pathname/Temp a new feature of 1.1rc1
467 scores_path = bustard_object.pathname
468 scores_path_temp = os.path.join(scores_path, 'Temp')
469 if os.path.isdir(scores_path_temp):
470 scores_path = scores_path_temp
472 # hopefully we have a directory that contains s_*_score files
# `score_files = []` is presumably initialized in a missing line.
474 for f in os.listdir(scores_path):
# NOTE(review): re.match is unanchored at the end, so this also matches
# e.g. "x_score.txt.bak"; '.*_score\.txt$' would be stricter.
475 if re.match('.*_score.txt', f):
476 score_files.append(f)
478 tar_cmd = ['tar', 'c'] + score_files
479 bzip_cmd = [ 'bzip2', '-9', '-c' ]
480 tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
# NOTE(review): binary bzip2 output written through a text-mode handle —
# should be open(tar_dest_name, 'wb'); also not closed in the visible lines.
481 tar_dest = open(tar_dest_name, 'w')
482 LOGGER.info("Compressing score files from %s" % (scores_path,))
483 LOGGER.info("Running tar: " + " ".join(tar_cmd[:10]))
484 LOGGER.info("Running bzip2: " + " ".join(bzip_cmd))
485 LOGGER.info("Writing to %s" % (tar_dest_name,))
# `env` setup and the tar.wait()/bzip.wait() calls are in missing lines.
488 tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
490 bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
# Copy each lane's eland result file into cycle_dir, bzip2-compressing any
# that are not already compressed; compression jobs run through
# QueueCommands with up to num_jobs in parallel.
494 def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
496 Compress eland result files into the archive directory
498 # copy & bzip eland files
# `bz_commands = []` is presumably initialized in a missing line.
501 for lanes_dictionary in gerald_object.eland_results.results:
502 for eland_lane in lanes_dictionary.values():
503 source_name = eland_lane.pathname
504 if source_name is None:
# the LOGGER call wrapping this message and its `continue` are in
# missing lines — TODO confirm.
506 "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
508 path, name = os.path.split(source_name)
509 dest_name = os.path.join(cycle_dir, name)
510 LOGGER.info("Saving eland file %s to %s" % \
511 (source_name, dest_name))
513 if is_compressed(name):
514 LOGGER.info('Already compressed, Saving to %s' % (dest_name,))
515 shutil.copy(source_name, dest_name)
# NOTE(review): the '>' redirection only works if QueueCommands runs
# these strings through a shell; the filenames are also unquoted, so
# paths with spaces/metacharacters would break.
519 args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
520 bz_commands.append(" ".join(args))
521 #LOGGER.info('Running: %s' % ( " ".join(args) ))
522 #bzip_dest = open(dest_name, 'w')
523 #bzip = subprocess.Popen(args, stdout=bzip_dest)
524 #LOGGER.info('Saving to %s' % (dest_name, ))
527 if len(bz_commands) > 0:
528 q = QueueCommands(bz_commands, num_jobs)
# q.run() is presumably in a missing trailing line.
# Top-level archiver: for each PipelineRun, create a per-flowcell result
# directory, then save run parameters, flowcell reports, IVC plots, raw
# sequence data (fastq/qseq/srf), GERALD summary, compressed eland results
# and md5 checksums.
532 def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format='qseq'):
534 Iterate over runfolders in runs extracting the most useful information.
535 * run parameters (in run-*.xml)
539 * srf files (raw sequence & qualities)
541 if output_base_dir is None:
542 output_base_dir = os.getcwd()
# the `for r in runs:` loop header is in a missing line.
545 result_dir = os.path.join(output_base_dir, r.flowcell_id)
546 LOGGER.info("Using %s as result directory" % (result_dir,))
547 if not os.path.exists(result_dir):
# os.mkdir(result_dir) presumably in a missing line.
# Result subdirectory named after the image-analysis cycle range.
551 cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
552 LOGGER.info("Filling in %s" % (cycle,))
553 cycle_dir = os.path.join(result_dir, cycle)
554 cycle_dir = os.path.abspath(cycle_dir)
555 if os.path.exists(cycle_dir):
556 LOGGER.error("%s already exists, not overwriting" % (cycle_dir,))
564 # save illumina flowcell status report
565 save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'), cycle_dir)
567 # save stuff from bustard
569 save_ivc_plot(r.bustard, cycle_dir)
571 # build base call saving commands
# `lanes = []` accumulation presumably in missing lines; only lanes with a
# configured analysis are archived.
574 for lane in range(1, 9):
575 lane_parameters = r.gerald.lanes.get(lane, None)
576 if lane_parameters is not None and lane_parameters.analysis != 'none':
579 run_name = srf.pathname_to_run_name(r.pathname)
581 LOGGER.info("Raw Format is: %s" % (raw_format, ))
582 if raw_format == 'fastq':
583 rawpath = os.path.join(r.pathname, r.gerald.runfolder_name)
584 LOGGER.info("raw data = %s" % (rawpath,))
585 srf.copy_hiseq_project_fastqs(run_name, rawpath, site, cycle_dir)
586 elif raw_format == 'qseq':
587 seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
588 elif raw_format == 'srf':
589 seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir, 0)
591 raise ValueError('Unknown --raw-format=%s' % (raw_format))
# NOTE(review): as sampled, the 'fastq' branch never sets seq_cmds, so this
# call would raise NameError for fastq runs unless a guard exists in the
# missing lines — TODO confirm.
592 srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)
594 # save stuff from GERALD
595 # copy stuff out of the main run
599 save_summary_file(r, cycle_dir)
601 # compress eland result files
# `g` is presumably bound (likely r.gerald) in a missing line.
602 compress_eland_results(g, cycle_dir, num_jobs)
604 # md5 all the compressed files once we're done
605 md5_commands = srf.make_md5_commands(cycle_dir)
606 srf.run_commands(cycle_dir, md5_commands, num_jobs)
# Delete each path in `files`, logging every action; with dry_run=True it
# only logs. The actual unlink/rmtree dispatch is in missing lines 612-617.
608 def rm_list(files, dry_run=True):
# `for f in files:` loop header is in a missing line.
610 if os.path.exists(f):
611 LOGGER.info('deleting %s' % (f,))
# NOTE(review): LOGGER.warn is deprecated in favor of LOGGER.warning.
618 LOGGER.warn("%s doesn't exist." % (f,))
# Strip each runfolder of logs, calibration data, images and thumbnails to
# shrink it before archival; honors dry_run via rm_list. The definition
# appears to continue past this sampled view (e.g. waiting on the Popen).
620 def clean_runs(runs, dry_run=True):
622 Clean up run folders to optimize for compression.
# `if dry_run:` guard presumably in a missing line.
625 LOGGER.info('In dry-run mode')
# `for run in runs:` loop header presumably in a missing line.
# NOTE(review): "Cleaninging" is a typo in the runtime log message (left
# unchanged here).
628 LOGGER.info('Cleaninging %s' % (run.pathname,))
630 runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
631 rm_list(runlogs, dry_run)
633 pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
634 rm_list(pipeline_logs, dry_run)
636 # rm NetCopy.log? Isn't this robocopy?
637 logs = glob(os.path.join(run.pathname, '*.log'))
638 rm_list(logs, dry_run)
641 calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
642 rm_list(calibration_dir, dry_run)
644 LOGGER.info("Cleaning images")
645 image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
646 rm_list(image_dirs, dry_run)
648 LOGGER.info("Cleaning ReadPrep*")
649 read_prep_dirs = glob(os.path.join(run.pathname, 'ReadPrep*'))
650 rm_list(read_prep_dirs, dry_run)
# NOTE(review): "Thubmnail" is a typo in the runtime log message.
652 LOGGER.info("Cleaning Thubmnail_images")
653 thumbnail_dirs = glob(os.path.join(run.pathname, 'Thumbnail_Images'))
654 rm_list(thumbnail_dirs, dry_run)
656 # make clean_intermediate
# NOTE(review): uses the root logging module here but the module-level
# LOGGER everywhere else — inconsistent; should be LOGGER.info. Also note
# this make invocation is not guarded by dry_run in the visible lines.
657 logging.info("Cleaning intermediate files")
658 if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
659 clean_process = subprocess.Popen(['make', 'clean_intermediate'],
660 cwd=run.image_analysis.pathname,)