2 Core information needed to inspect a runfolder.
# ElementTree moved into the stdlib in Python 2.5; fall back to the
# standalone "elementtree" package on older interpreters.
# Fix: the old `except ImportError, e:` form is Python-2-only syntax and
# bound an exception object that was never used.
try:
    from xml.etree import ElementTree
except ImportError:
    from elementtree import ElementTree
# Module-level logger; handler/level configuration is left to the caller.
LOGGER = logging.getLogger(__name__)

# strptime format corresponding to EUROPEAN_DATE_RE below.
EUROPEAN_STRPTIME = "%d-%m-%Y"
# Regex patterns are raw strings so backslashes reach the re module intact
# ("\." in a plain string is an invalid escape under modern Python).
EUROPEAN_DATE_RE = r"([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
VERSION_RE = r"([0-9\.]+)"   # dotted version number, e.g. "1.4.0"
USER_RE = r"([a-zA-Z0-9]+)"  # alphanumeric user name
LANES_PER_FLOWCELL = 8
# Lanes on a flowcell are numbered 1..8 inclusive.
LANE_LIST = range(1, LANES_PER_FLOWCELL + 1)
29 from htsworkflow.util.alphanum import alphanum
30 from htsworkflow.util.ethelp import indent, flatten
31 from htsworkflow.util.queuecommands import QueueCommands
33 from htsworkflow.pipelines import srf
35 class PipelineRun(object):
# NOTE(review): this is a non-contiguous numbered listing -- the leading
# integers are the original file's line numbers and gaps mean source lines
# are missing from this view.  Code left byte-identical; comments only.
# Aggregates one image-analysis component (Firecrest or IPAR), a Bustard
# base-caller and a GERALD alignment run, and (de)serializes the trio to XML.
37 Capture "interesting" information about a pipeline run
# XML tag names shared by get_elements()/set_elements().
40 PIPELINE_RUN = 'PipelineRun'
41 FLOWCELL_ID = 'FlowcellID'
43 def __init__(self, pathname=None, flowcell_id=None, xml=None):
# pathname: runfolder location (normalized); flowcell_id: optional cached id,
# lazily discovered by _get_flowcell_id when None; xml: previously saved
# tree to rehydrate from via set_elements.
44 if pathname is not None:
45 self.pathname = os.path.normpath(pathname)
49 self._flowcell_id = flowcell_id
50 self.image_analysis = None
55 self.set_elements(xml)
57 def _get_flowcell_id(self):
# Lazily resolve the flowcell id: prefer Config/FlowcellId.xml inside the
# runfolder; otherwise guess the last '_'-separated field of the pathname;
# otherwise fall back to the literal 'unknown'.
59 if self._flowcell_id is None:
60 config_dir = os.path.join(self.pathname, 'Config')
61 flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
62 if os.path.exists(flowcell_id_path):
63 flowcell_id_tree = ElementTree.parse(flowcell_id_path)
64 self._flowcell_id = flowcell_id_tree.findtext('Text')
66 path_fields = self.pathname.split('_')
67 if len(path_fields) > 0:
68 # guessing last element of filename
69 self._flowcell_id = path_fields[-1]
71 self._flowcell_id = 'unknown'
# (The warning log call is split across missing lines 72-76.)
74 "Flowcell id was not found, guessing %s" % (
77 return self._flowcell_id
78 flowcell_id = property(_get_flowcell_id)
80 def _get_runfolder_name(self):
# Delegates to the gerald component; the no-gerald branch body is in the
# missing lines 82-83.
81 if self.gerald is None:
84 return self.gerald.runfolder_name
85 runfolder_name = property(_get_runfolder_name)
87 def get_elements(self):
89 make one master xml file from all of our sub-components.
# Root <PipelineRun> element wraps the flowcell id plus each component's
# own XML (the `return root` is presumably in the missing lines 97-98).
91 root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
92 flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
93 flowcell.text = self.flowcell_id
94 root.append(self.image_analysis.get_elements())
95 root.append(self.bustard.get_elements())
96 root.append(self.gerald.get_elements())
99 def set_elements(self, tree):
100 # this file gets imported by all the others,
101 # so we need to hide the imports to avoid a cyclic imports
102 from htsworkflow.pipelines import firecrest
103 from htsworkflow.pipelines import ipar
104 from htsworkflow.pipelines import bustard
105 from htsworkflow.pipelines import gerald
# Reject anything that is not a <PipelineRun> document.
107 tag = tree.tag.lower()
108 if tag != PipelineRun.PIPELINE_RUN.lower():
109 raise ValueError('Pipeline Run Expecting %s got %s' % (
110 PipelineRun.PIPELINE_RUN, tag))
# Dispatch each child element to the matching component class; the
# `for element in tree:` loop header is in the missing line 111.
112 tag = element.tag.lower()
113 if tag == PipelineRun.FLOWCELL_ID.lower():
114 self._flowcell_id = element.text
115 #ok the xword.Xword.XWORD pattern for module.class.constant is lame
116 # you should only have Firecrest or IPAR, never both of them.
117 elif tag == firecrest.Firecrest.FIRECREST.lower():
118 self.image_analysis = firecrest.Firecrest(xml=element)
119 elif tag == ipar.IPAR.IPAR.lower():
120 self.image_analysis = ipar.IPAR(xml=element)
121 elif tag == bustard.Bustard.BUSTARD.lower():
122 self.bustard = bustard.Bustard(xml=element)
123 elif tag == gerald.Gerald.GERALD.lower():
124 self.gerald = gerald.Gerald(xml=element)
# NOTE(review): Logger.warn is a deprecated alias of Logger.warning.
126 LOGGER.warn('PipelineRun unrecognized tag %s' % (tag,))
128 def _get_run_name(self):
130 Given a run tuple, find the latest date and use that as our name
# Cached: built once from the newest of the three component timestamps,
# formatted as run_<flowcell>_<YYYY-MM-DD>.xml.
132 if self._name is None:
133 tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
134 timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
135 self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
137 name = property(_get_run_name)
139 def save(self, destdir=None):
# Serialize this run to destdir/<self.name> as an XML document.
142 LOGGER.info("Saving run report " + self.name)
143 xml = self.get_elements()
145 dest_pathname = os.path.join(destdir, self.name)
146 ElementTree.ElementTree(xml).write(dest_pathname)
148 def load(self, filename):
# Rehydrate this object from a previously saved run-report XML file.
149 LOGGER.info("Loading run report from " + filename)
150 tree = ElementTree.parse(filename).getroot()
151 self.set_elements(tree)
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters:
      - `pathname` : location of an run xml file

    :Returns: initialized PipelineRun object
    """
    tree = ElementTree.parse(pathname).getroot()
    run = PipelineRun(xml=tree)
    # Fix: the constructed run must be returned, as the docstring promises;
    # without it callers receive None.
    return run
166 def get_runs(runfolder, flowcell_id=None):
168 Search through a run folder for all the various sub component runs
169 and then return a PipelineRun for each different combination.
171 For example if there are two different GERALD runs, this will
172 generate two different PipelineRun objects, that differ
173 in there gerald component.
# Local imports: avoids the cyclic-import problem noted in set_elements.
175 from htsworkflow.pipelines import firecrest
176 from htsworkflow.pipelines import ipar
177 from htsworkflow.pipelines import bustard
178 from htsworkflow.pipelines import gerald
# Helper: given one image-analysis dir, find each Bustard/GERALD pair under
# it and append a PipelineRun per combination to `runs`.
180 def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
181 LOGGER.info("Looking for bustard directories in %s" % (pathname,))
182 bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
183 # RTA BaseCalls looks enough like Bustard.
184 bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
185 for bustard_pathname in bustard_dirs:
186 LOGGER.info("Found bustard directory %s" % (bustard_pathname,))
187 b = bustard.bustard(bustard_pathname)
188 gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
189 LOGGER.info("Looking for gerald directories in %s" % (pathname,))
190 for gerald_pathname in glob(gerald_glob):
191 LOGGER.info("Found gerald directory %s" % (gerald_pathname,))
# (The try/except wrapping and runs.append(...) are in missing lines
# 192, 196-199.)
193 g = gerald.gerald(gerald_pathname)
194 p = PipelineRun(runfolder, flowcell_id)
195 p.image_analysis = image_analysis
# Best-effort scan: a broken gerald directory is logged and skipped.
200 LOGGER.error("Ignoring " + str(e))
202 datadir = os.path.join(runfolder, 'Data')
204 LOGGER.info('Searching for runs in ' + datadir)
# (The `runs = []` accumulator initialization is in a missing line.)
206 # scan for firecrest directories
207 for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
208 LOGGER.info('Found firecrest in ' + datadir)
209 image_analysis = firecrest.firecrest(firecrest_pathname)
210 if image_analysis is None:
# (Warning log call continues across missing lines 211, 213-214.)
212 "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
215 scan_post_image_analysis(
216 runs, runfolder, image_analysis, firecrest_pathname
218 # scan for IPAR directories
219 ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
220 # The Intensities directory from the RTA software looks a lot like IPAR
221 ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
222 for ipar_pathname in ipar_dirs:
223 LOGGER.info('Found ipar directories in ' + datadir)
224 image_analysis = ipar.ipar(ipar_pathname)
225 if image_analysis is None:
226 "%s is an empty or invalid IPAR directory" % (ipar_pathname,)
227 "%s is an empty or invalid IPAR directory" % (ipar_pathname,)
230 scan_post_image_analysis(
231 runs, runfolder, image_analysis, ipar_pathname
236 def get_specific_run(gerald_dir):
238 Given a gerald directory, construct a PipelineRun out of its parents
240 Basically this allows specifying a particular run instead of the previous
241 get_runs which scans a runfolder for various combinations of
242 firecrest/ipar/bustard/gerald runs.
# Local imports to dodge the cyclic-import problem (see set_elements).
244 from htsworkflow.pipelines import firecrest
245 from htsworkflow.pipelines import ipar
246 from htsworkflow.pipelines import bustard
247 from htsworkflow.pipelines import gerald
# Derive the parent directories by walking up from the gerald dir:
# gerald -> bustard -> image-analysis -> ... -> runfolder.
249 gerald_dir = os.path.expanduser(gerald_dir)
250 bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
251 image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))
253 runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))
255 LOGGER.info('--- use-run detected options ---')
256 LOGGER.info('runfolder: %s' % (runfolder_dir,))
257 LOGGER.info('image_dir: %s' % (image_dir,))
258 LOGGER.info('bustard_dir: %s' % (bustard_dir,))
259 LOGGER.info('gerald_dir: %s' % (gerald_dir,))
261 # find our processed image dir
263 # split into parent, and leaf directory
264 # leaf directory should be an IPAR or firecrest directory
265 data_dir, short_image_dir = os.path.split(image_dir)
266 LOGGER.info('data_dir: %s' % (data_dir,))
# NOTE(review): "short_iamge_dir" is a typo in this log label (should read
# "short_image_dir"); left untouched here since it is runtime output.
267 LOGGER.info('short_iamge_dir: %s' % (short_image_dir,))
269 # guess which type of image processing directory we have by looking
270 # in the leaf directory name
271 if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
272 image_run = firecrest.firecrest(image_dir)
273 elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
274 image_run = ipar.ipar(image_dir)
275 elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
# RTA "Intensities" directories are close enough to IPAR to reuse its parser.
276 image_run = ipar.ipar(image_dir)
278 # if we din't find a run, report the error and return
279 if image_run is None:
# (The logging/return for this error path is in missing lines 281-283.)
280 msg = '%s does not contain an image processing step' % (image_dir,)
284 # find our base calling
285 base_calling_run = bustard.bustard(bustard_dir)
286 if base_calling_run is None:
287 LOGGER.error('%s does not contain a bustard run' % (bustard_dir,))
# (The early return and the gerald heading comment are in missing lines.)
291 gerald_run = gerald.gerald(gerald_dir)
292 if gerald_run is None:
293 LOGGER.error('%s does not contain a gerald run' % (gerald_dir,))
# Assemble the PipelineRun from the three discovered components.
296 p = PipelineRun(runfolder_dir)
297 p.image_analysis = image_run
298 p.bustard = base_calling_run
299 p.gerald = gerald_run
301 LOGGER.info('Constructed PipelineRun from %s' % (gerald_dir,))
304 def extract_run_parameters(runs):
306 Search through runfolder_path for various runs and grab their parameters
# NOTE(review): the function body is absent from this listing (original
# lines 305, 307+ missing); only the signature and summary line are visible.
311 def summarize_mapped_reads(genome_map, mapped_reads):
313 Summarize per chromosome reads into a genome count
314 But handle spike-in/contamination symlinks seperately.
316 summarized_reads = {}
# Walk every mapped target; split off any directory prefix so per-chromosome
# entries can be folded into a single per-genome total.
319 for k, v in mapped_reads.items():
320 path, k = os.path.split(k)
# NOTE(review): dict.has_key() is Python-2-only (use `path not in genome_map`
# under Python 3).
321 if len(path) > 0 and not genome_map.has_key(path):
# (The branch bodies assigning `genome` and accumulating `genome_reads` are
# in the missing lines 322-324.)
325 summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
326 summarized_reads[genome] = genome_reads
327 return summarized_reads
329 def summarize_lane(gerald, lane_id):
# Build a human-readable, per-end report for one lane: cluster stats,
# read counts, eland match-code breakdown and per-genome mapped reads.
# (The `report = []` initialization is in a missing line.)
331 summary_results = gerald.summary.lane_results
# One iteration per sequencing end (single- or paired-end).
332 for end in range(len(summary_results)):
333 eland_result = gerald.eland_results.results[end][lane_id]
334 report.append("Sample name %s" % (eland_result.sample_name))
335 report.append("Lane id %s end %s" % (eland_result.lane_id, end))
# NOTE(review): dict.has_key() is Python-2-only.
336 if end < len(summary_results) and summary_results[end].has_key(eland_result.lane_id):
337 cluster = summary_results[end][eland_result.lane_id].cluster
338 report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
339 report.append("Total Reads: %d" % (eland_result.reads))
341 if hasattr(eland_result, 'match_codes'):
342 mc = eland_result.match_codes
# (The `nm`/`qc` extraction from mc is in the missing lines 343, 345.)
344 nm_percent = float(nm) / eland_result.reads * 100
346 qc_percent = float(qc) / eland_result.reads * 100
348 report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
349 report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
350 report.append('Unique (0,1,2 mismatches) %d %d %d' % \
351 (mc['U0'], mc['U1'], mc['U2']))
352 report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
353 (mc['R0'], mc['R1'], mc['R2']))
355 if hasattr(eland_result, 'genome_map'):
356 report.append("Mapped Reads")
357 mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
358 for name, counts in mapped_reads.items():
359 report.append(" %s: %d" % (name, counts))
# (The trailing blank-line append and `return report` are in missing lines.)
364 def summary_report(runs):
366 Summarize cluster numbers and mapped read counts for a runfolder
# Concatenates one summarize_lane() section per lane per run into a single
# newline-joined string.  (The `report = []` init and the `for run in runs:`
# header are in missing lines.)
371 report.append('Summary for %s' % (run.name,))
# NOTE(review): Python-2 idiom -- keys() returned a list and list.sort()
# accepted a comparison function (alphanum); under Python 3 this needs
# sorted(..., key=...).
373 eland_keys = run.gerald.eland_results.results[0].keys()
374 eland_keys.sort(alphanum)
376 for lane_id in eland_keys:
377 report.extend(summarize_lane(run.gerald, lane_id))
380 return os.linesep.join(report)
def is_compressed(filename):
    """Return True when *filename* ends in a recognized compression
    suffix (.gz or .bz2), False otherwise.

    Reconstructed from a listing whose return statements were dropped;
    the visible suffix tests collapse to a single membership check.
    """
    return os.path.splitext(filename)[1] in ('.gz', '.bz2')
390 def save_flowcell_reports(data_dir, cycle_dir):
392 Save the flowcell quality reports
# Tars the runfolder's Data/reports directory (plus Status.xml/xsl when
# present) into <cycle_dir>/flowcell-reports.tar.bz2.
394 data_dir = os.path.abspath(data_dir)
395 status_file = os.path.join(data_dir, 'Status.xml')
396 reports_dir = os.path.join(data_dir, 'reports')
397 reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
398 if os.path.exists(reports_dir):
# tar is given the relative 'reports/' path, so it must run with data_dir
# as the working directory (the chdir handling is in missing lines 403-409).
399 cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
400 if os.path.exists(status_file):
# NOTE(review): Status.xsl is added without its own existence check --
# assumed to always accompany Status.xml; confirm against real runfolders.
401 cmd_list.extend(['Status.xml', 'Status.xsl'])
402 LOGGER.info("Saving reports from " + reports_dir)
# Runs the single tar command through the shared QueueCommands runner.
405 q = QueueCommands([" ".join(cmd_list)])
def save_summary_file(gerald_object, cycle_dir):
    """Copy the GERALD Summary.htm report into *cycle_dir*.

    Logs (rather than raises) when the summary file is absent.
    Reconstruction note: the listing dropped the ``else:`` line between the
    copy and the "not found" log call; it is restored here, which is the
    only reading consistent with both log messages.
    """
    summary_path = os.path.join(gerald_object.pathname, 'Summary.htm')
    if os.path.exists(summary_path):
        LOGGER.info('Copying %s to %s' % (summary_path, cycle_dir))
        shutil.copy(summary_path, cycle_dir)
    else:
        LOGGER.info('Summary file %s was not found' % (summary_path,))
419 def save_ivc_plot(bustard_object, cycle_dir):
421 Save the IVC page and its supporting images
# Copies IVC.htm plus its Plots/s_?_*.png support images from the bustard
# directory into cycle_dir, creating cycle_dir/Plots when needed.
423 plot_html = os.path.join(bustard_object.pathname, 'IVC.htm')
424 plot_image_path = os.path.join(bustard_object.pathname, 'Plots')
425 plot_images = os.path.join(plot_image_path, 's_?_[a-z]*.png')
427 plot_target_path = os.path.join(cycle_dir, 'Plots')
429 if os.path.exists(plot_html):
430 LOGGER.debug("Saving %s" % (plot_html,))
431 LOGGER.debug("Saving %s" % (plot_images,))
432 shutil.copy(plot_html, cycle_dir)
433 if not os.path.exists(plot_target_path):
434 os.mkdir(plot_target_path)
435 for plot_file in glob(plot_images):
436 shutil.copy(plot_file, plot_target_path)
# Missing-file branch (the `else:` header is in the missing line 437).
# NOTE(review): the message says "IVC.html" while the code looks for
# "IVC.htm" -- runtime string left untouched.
438 LOGGER.warning('Missing IVC.html file, not archiving')
441 def compress_score_files(bustard_object, cycle_dir):
443 Compress score files into our result directory
445 # check for g.pathname/Temp a new feature of 1.1rc1
446 scores_path = bustard_object.pathname
447 scores_path_temp = os.path.join(scores_path, 'Temp')
448 if os.path.isdir(scores_path_temp):
449 scores_path = scores_path_temp
451 # hopefully we have a directory that contains s_*_score files
# (The `score_files = []` initialization is in the missing line 452.)
453 for f in os.listdir(scores_path):
# NOTE(review): the '.' before "txt" is unescaped, so this also matches
# e.g. "x_score1txt"; presumably intended as a literal dot.
454 if re.match('.*_score.txt', f):
455 score_files.append(f)
# Pipe `tar c` through `bzip2 -9` into cycle_dir/scores.tar.bz2.
457 tar_cmd = ['tar', 'c'] + score_files
458 bzip_cmd = [ 'bzip2', '-9', '-c' ]
459 tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
# NOTE(review): destination opened in text mode 'w'; binary bz2 output
# should use 'wb' (harmless on py2/POSIX, wrong on py3/Windows).
460 tar_dest = open(tar_dest_name, 'w')
461 LOGGER.info("Compressing score files from %s" % (scores_path,))
462 LOGGER.info("Running tar: " + " ".join(tar_cmd[:10]))
463 LOGGER.info("Running bzip2: " + " ".join(bzip_cmd))
464 LOGGER.info("Writing to %s" % (tar_dest_name,))
# (`env` setup and the Popen call's remaining kwargs/cwd are in the
# missing lines 465-466, 468; process waits are in 470-472.)
467 tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
469 bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
473 def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
475 Compress eland result files into the archive directory
477 # copy & bzip eland files
# (The `bz_commands = []` accumulator initialization is in a missing line.)
480 for lanes_dictionary in gerald_object.eland_results.results:
481 for eland_lane in lanes_dictionary.values():
482 source_name = eland_lane.pathname
483 if source_name is None:
# Lanes with no result file are logged and skipped (log call start and
# the `continue` are in missing lines 484, 486).
485 "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
487 path, name = os.path.split(source_name)
488 dest_name = os.path.join(cycle_dir, name)
489 LOGGER.info("Saving eland file %s to %s" % \
490 (source_name, dest_name))
492 if is_compressed(name):
# Already-compressed results are copied verbatim instead of re-compressed.
493 LOGGER.info('Already compressed, Saving to %s' % (dest_name,))
494 shutil.copy(source_name, dest_name)
# Uncompressed results get a queued shell command; the '>' redirection
# implies QueueCommands runs these strings through a shell.
498 args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
499 bz_commands.append(" ".join(args))
500 #LOGGER.info('Running: %s' % ( " ".join(args) ))
501 #bzip_dest = open(dest_name, 'w')
502 #bzip = subprocess.Popen(args, stdout=bzip_dest)
503 #LOGGER.info('Saving to %s' % (dest_name, ))
# Run all queued compressions with bounded parallelism (q.run() is in a
# missing line).
506 if len(bz_commands) > 0:
507 q = QueueCommands(bz_commands, num_jobs)
511 def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1, raw_format='qseq'):
513 Iterate over runfolders in runs extracting the most useful information.
514 * run parameters (in run-*.xml)
518 * srf files (raw sequence & qualities)
520 if output_base_dir is None:
521 output_base_dir = os.getcwd()
# (The `for r in runs:` loop header is in a missing line; everything below
# runs once per PipelineRun `r`.)
524 result_dir = os.path.join(output_base_dir, r.flowcell_id)
525 LOGGER.info("Using %s as result directory" % (result_dir,))
526 if not os.path.exists(result_dir):
# Results are grouped per cycle range, e.g. "C1-36".
530 cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
531 LOGGER.info("Filling in %s" % (cycle,))
532 cycle_dir = os.path.join(result_dir, cycle)
533 cycle_dir = os.path.abspath(cycle_dir)
534 if os.path.exists(cycle_dir):
# Existing cycle dirs are never overwritten (the skip/return is in
# missing lines 536-542).
535 LOGGER.error("%s already exists, not overwriting" % (cycle_dir,))
543 # save illumina flowcell status report
544 save_flowcell_reports(os.path.join(r.image_analysis.pathname, '..'), cycle_dir)
546 # save stuff from bustard
548 save_ivc_plot(r.bustard, cycle_dir)
550 # build base call saving commands
# Collect lanes whose gerald analysis is enabled (lane list build is in
# missing lines 551-556).
553 for lane in range(1, 9):
554 if r.gerald.lanes[lane].analysis != 'none':
557 run_name = srf.pathname_to_run_name(r.pathname)
# Dispatch on the requested raw output format.
559 if raw_format == 'fastq':
560 srf.copy_hiseq_project_fastqs(run_name, r.bustard.pathname, site, cycle_dir)
561 elif raw_format == 'qseq':
562 seq_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
563 elif raw_format == 'srf':
564 seq_cmds = srf.make_srf_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir, 0)
# Unknown formats abort extraction (the `else:` header is in line 565).
566 raise ValueError('Unknown --raw-format=%s' % (raw_format))
567 srf.run_commands(r.bustard.pathname, seq_cmds, num_jobs)
569 # save stuff from GERALD
570 # copy stuff out of the main run
# (`g = r.gerald` binding is presumably in the missing lines 571-573.)
574 save_summary_file(g, cycle_dir)
576 # compress eland result files
577 compress_eland_results(g, cycle_dir, num_jobs)
579 # md5 all the compressed files once we're done
580 md5_commands = srf.make_md5_commands(cycle_dir)
581 srf.run_commands(cycle_dir, md5_commands, num_jobs)
583 def rm_list(files, dry_run=True):
# Delete each existing path in `files`, honoring dry_run; warn about paths
# that do not exist.  (The `for f in files:` header and the actual
# delete-vs-dry-run branch bodies are in the missing lines 584, 587-592.)
585 if os.path.exists(f):
586 LOGGER.info('deleting %s' % (f,))
# NOTE(review): Logger.warn is a deprecated alias of Logger.warning.
593 LOGGER.warn("%s doesn't exist." % (f,))
595 def clean_runs(runs, dry_run=True):
597 Clean up run folders to optimize for compression.
# Removes logs, calibration data, images, ReadPrep and thumbnail dirs from
# each runfolder, then invokes `make clean_intermediate` when a Makefile is
# present.  (The dry-run guard and `for run in runs:` header are in missing
# lines 598-602; this definition appears to continue past the end of the
# visible listing.)
600 LOGGER.info('In dry-run mode')
# NOTE(review): "Cleaninging" is a typo in this log message -- runtime
# string left untouched.
603 LOGGER.info('Cleaninging %s' % (run.pathname,))
# rm RunLog*.xml
605 runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
606 rm_list(runlogs, dry_run)
# rm pipeline_*.txt
608 pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
609 rm_list(pipeline_logs, dry_run)
611 # rm NetCopy.log? Isn't this robocopy?
612 logs = glob(os.path.join(run.pathname, '*.log'))
613 rm_list(logs, dry_run)
# rm Calibration_* directories
616 calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
617 rm_list(calibration_dir, dry_run)
619 LOGGER.info("Cleaning images")
620 image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
621 rm_list(image_dirs, dry_run)
623 LOGGER.info("Cleaning ReadPrep*")
624 read_prep_dirs = glob(os.path.join(run.pathname, 'ReadPrep*'))
625 rm_list(read_prep_dirs, dry_run)
# NOTE(review): "Thubmnail_images" is a typo in this log message.
627 LOGGER.info("Cleaning Thubmnail_images")
628 thumbnail_dirs = glob(os.path.join(run.pathname, 'Thumbnail_Images'))
629 rm_list(thumbnail_dirs, dry_run)
631 # make clean_intermediate
# NOTE(review): uses the root `logging` module directly where the rest of
# the file uses the module-level LOGGER.
632 logging.info("Cleaning intermediate files")
633 if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
# Spawns `make clean_intermediate` inside the image-analysis directory;
# the wait()/cleanup presumably follows past the visible listing.
634 clean_process = subprocess.Popen(['make', 'clean_intermediate'],
635 cwd=run.image_analysis.pathname,)