2 Core information needed to inspect a runfolder.
16 from xml.etree import ElementTree
17 except ImportError, e:
18 from elementtree import ElementTree
20 EUROPEAN_STRPTIME = "%d-%m-%Y"
21 EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
22 VERSION_RE = "([0-9\.]+)"
23 USER_RE = "([a-zA-Z0-9]+)"
24 LANES_PER_FLOWCELL = 8
25 LANE_LIST = range(1, LANES_PER_FLOWCELL+1)
27 from htsworkflow.util.alphanum import alphanum
28 from htsworkflow.util.ethelp import indent, flatten
29 from htsworkflow.util.queuecommands import QueueCommands
31 from htsworkflow.pipelines import srf
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run
    """
    # XML element names used when serializing/deserializing run reports.
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, xml=None):
        # pathname: top of the runfolder (normalized for consistent joins).
        # xml: a previously saved run-report element tree to restore from.
        # NOTE(review): several attribute initializations (e.g. _name,
        # bustard, gerald) are elided in this view of the source — confirm
        # against the full file.
        if pathname is not None:
            self.pathname = os.path.normpath(pathname)
        self._flowcell_id = None
        self.image_analysis = None
        # (elided guard: presumably only invoked when xml is not None)
        self.set_elements(xml)

    def _get_flowcell_id(self):
        # Lazily resolve the flowcell id: prefer Config/FlowcellId.xml
        # inside the runfolder, otherwise guess from the folder name.
        if self._flowcell_id is None:
            config_dir = os.path.join(self.pathname, 'Config')
            flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
            if os.path.exists(flowcell_id_path):
                flowcell_id_tree = ElementTree.parse(flowcell_id_path)
                self._flowcell_id = flowcell_id_tree.findtext('Text')
            # (elided else branch: fall back to guessing from the pathname)
            path_fields = self.pathname.split('_')
            if len(path_fields) > 0:
                # guessing last element of filename
                flowcell_id = path_fields[-1]
            # (elided else:)
            flowcell_id = 'unknown'
            # (elided: logging.warn call wrapping the format string below)
            "Flowcell id was not found, guessing %s" % (
            self._flowcell_id = flowcell_id
        return self._flowcell_id
    # read-only accessor; computed lazily by _get_flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def get_elements(self):
        """
        make one master xml file from all of our sub-components.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        root.append(self.gerald.get_elements())
        # (elided: return root — implied by the callers, e.g. save())

    def set_elements(self, tree):
        # this file gets imported by all the others,
        # so we need to hide the imports to avoid a cyclic imports
        from htsworkflow.pipelines import firecrest
        from htsworkflow.pipelines import ipar
        from htsworkflow.pipelines import bustard
        from htsworkflow.pipelines import gerald

        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
            raise ValueError('Pipeline Run Expecting %s got %s' % (
                PipelineRun.PIPELINE_RUN, tag))
        # (elided loop header: iterates over the child elements of tree,
        # binding each to `element`)
        tag = element.tag.lower()
        if tag == PipelineRun.FLOWCELL_ID.lower():
            self._flowcell_id = element.text
        #ok the xword.Xword.XWORD pattern for module.class.constant is lame
        # you should only have Firecrest or IPAR, never both of them.
        elif tag == firecrest.Firecrest.FIRECREST.lower():
            self.image_analysis = firecrest.Firecrest(xml=element)
        elif tag == ipar.IPAR.IPAR.lower():
            self.image_analysis = ipar.IPAR(xml=element)
        elif tag == bustard.Bustard.BUSTARD.lower():
            self.bustard = bustard.Bustard(xml=element)
        elif tag == gerald.Gerald.GERALD.lower():
            self.gerald = gerald.Gerald(xml=element)
        # (elided else: fall through for unknown child tags)
        logging.warn('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """
        Given a run tuple, find the latest date and use that as our name
        """
        # Cached: the name combines the flowcell id with the newest
        # timestamp among the three pipeline components.
        if self._name is None:
            tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
            timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
            self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
        # (elided: return self._name)
    name = property(_get_run_name)

    def save(self, destdir=None):
        # Serialize this run's combined XML report into destdir.
        # (elided: destdir default handling when None)
        logging.info("Saving run report " + self.name)
        xml = self.get_elements()
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        # Restore this run's state from a previously saved XML report.
        logging.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters:
      - `pathname` : location of an run xml file

    :Returns: initialized PipelineRun object
    """
    tree = ElementTree.parse(pathname).getroot()
    run = PipelineRun(xml=tree)
    # the docstring promises an initialized PipelineRun object; return it
    return run
def get_runs(runfolder):
    """
    Search through a run folder for all the various sub component runs
    and then return a PipelineRun for each different combination.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in there gerald component.
    """
    # hide the imports to avoid cyclic imports (this module is imported
    # by the component modules below)
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
        # For one image-analysis directory, find every Bustard/GERALD
        # combination underneath it and build a PipelineRun per pair.
        logging.info("Looking for bustard directories in %s" % (pathname,))
        bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
        # RTA BaseCalls looks enough like Bustard.
        bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
        for bustard_pathname in bustard_dirs:
            logging.info("Found bustard directory %s" % (bustard_pathname,))
            b = bustard.bustard(bustard_pathname)
            gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
            logging.info("Looking for gerald directories in %s" % (pathname,))
            for gerald_pathname in glob(gerald_glob):
                logging.info("Found gerald directory %s" % (gerald_pathname,))
                # NOTE(review): lines elided here — presumably a try block
                # wrapping the gerald parse; confirm against full source.
                g = gerald.gerald(gerald_pathname)
                p = PipelineRun(runfolder)
                p.image_analysis = image_analysis
                # (elided: attach b/g to p and append p to runs; the line
                # below appears to be the matching except handler, where
                # `e` is the caught exception)
                logging.error("Ignoring " + str(e))

    datadir = os.path.join(runfolder, 'Data')

    logging.info('Searching for runs in ' + datadir)
    # (elided: `runs` accumulator initialization)
    # scan for firecrest directories
    for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")):
        logging.info('Found firecrest in ' + datadir)
        image_analysis = firecrest.firecrest(firecrest_pathname)
        if image_analysis is None:
            # (elided: logging call wrapping the message below)
            "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
        # (elided: else before the scan call)
        scan_post_image_analysis(
            runs, runfolder, image_analysis, firecrest_pathname
        # (elided: closing paren of the call above)
    # scan for IPAR directories
    ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
    # The Intensities directory from the RTA software looks a lot like IPAR
    ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
    for ipar_pathname in ipar_dirs:
        logging.info('Found ipar directories in ' + datadir)
        image_analysis = ipar.ipar(ipar_pathname)
        if image_analysis is None:
            # (elided: logging call wrapping the message below)
            "%s is an empty or invalid IPAR directory" %(ipar_pathname,)
        scan_post_image_analysis(
            runs, runfolder, image_analysis, ipar_pathname
        # (elided: closing paren and the final `return runs`)
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.
    """
    # hidden imports to avoid cyclic imports
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    # walk up the directory tree: gerald -> bustard -> image analysis
    gerald_dir = os.path.expanduser(gerald_dir)
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))

    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..','..'))

    logging.info('--- use-run detected options ---')
    logging.info('runfolder: %s' % (runfolder_dir,))
    logging.info('image_dir: %s' % (image_dir,))
    logging.info('bustard_dir: %s' % (bustard_dir,))
    logging.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    logging.info('data_dir: %s' % (data_dir,))
    # NOTE(review): 'short_iamge_dir' is a typo in the log label (runtime
    # string — left unchanged here)
    logging.info('short_iamge_dir: %s' %(short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        # RTA Intensities directories are parsed by the IPAR loader
        image_run = ipar.ipar(image_dir)
    # (elided: else branch — presumably sets image_run to None)

    # if we din't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        # (elided: error logging of msg and an early return)

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        logging.error('%s does not contain a bustard run' % (bustard_dir,))
        # (elided: early return)

    # find the gerald component
    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        logging.error('%s does not contain a gerald run' % (gerald_dir,))
        # (elided: early return)

    # assemble the PipelineRun from the three discovered components
    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    logging.info('Constructed PipelineRun from %s' % (gerald_dir,))
    # (elided: return p)
def extract_run_parameters(runs):
    """
    Search through runfolder_path for various runs and grab their parameters
    """
    # NOTE(review): the function body is elided in this view of the source;
    # only the signature and docstring are visible.
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks separately.

    :Parameters:
      - `genome_map`: mapping whose keys include known genome directory
        prefixes
      - `mapped_reads`: mapping of read-target name (possibly prefixed
        with a directory) to read count

    :Returns: dict of per-chromosome counts, plus one entry accumulating
      all reads whose directory prefix is not in genome_map
    """
    summarized_reads = {}
    # accumulator for reads mapped to a non-genome (spike-in) prefix;
    # assumes these defaults — confirm against the full source
    genome_reads = 0
    genome = 'unknown'
    for k, v in mapped_reads.items():
        path, k = os.path.split(k)
        # a directory prefix that is not a known genome path marks a
        # spike-in/contamination target: group those under the prefix name
        # (`in` replaces the deprecated dict.has_key)
        if len(path) > 0 and path not in genome_map:
            genome = path
            genome_reads += v
        else:
            summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
    summarized_reads[genome] = genome_reads
    return summarized_reads
def summarize_lane(gerald, lane_id):
    # Build a list of human-readable report lines for one lane, covering
    # each sequencing end present in the gerald summary.
    # NOTE(review): the `report` list initialization is elided in this view.
    summary_results = gerald.summary.lane_results
    for end in range(len(summary_results)):
        eland_result = gerald.eland_results.results[end][lane_id]
        report.append("Sample name %s" % (eland_result.sample_name))
        report.append("Lane id %s end %s" % (eland_result.lane_id, end))
        # cluster statistics are only present when the summary covers
        # this end/lane combination
        if end < len(summary_results) and summary_results[end].has_key(eland_result.lane_id):
            cluster = summary_results[end][eland_result.lane_id].cluster
            report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
        report.append("Total Reads: %d" % (eland_result.reads))

        if hasattr(eland_result, 'match_codes'):
            mc = eland_result.match_codes
            # (elided: extraction of `nm` and `qc` counts from mc)
            nm_percent = float(nm)/eland_result.reads * 100
            qc_percent = float(qc)/eland_result.reads * 100

            report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
            report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
            report.append('Unique (0,1,2 mismatches) %d %d %d' % \
                          (mc['U0'], mc['U1'], mc['U2']))
            report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
                          (mc['R0'], mc['R1'], mc['R2']))

        if hasattr(eland_result, 'genome_map'):
            report.append("Mapped Reads")
            mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
            for name, counts in mapped_reads.items():
                report.append("  %s: %d" % (name, counts))
    # (elided: return report)
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder
    """
    # NOTE(review): the report-list initialization and the
    # `for run in runs:` loop header are elided in this view.
    report.append('Summary for %s' % (run.name,))
    # sort lanes into human-friendly alphanumeric order
    eland_keys = run.gerald.eland_results.results[0].keys()
    eland_keys.sort(alphanum)  # Python 2 cmp-style sort
    for lane_id in eland_keys:
        report.extend(summarize_lane(run.gerald, lane_id))
    # join all collected lines into one platform-native-newline string
    return os.linesep.join(report)
def is_compressed(filename):
    """
    Return True if filename has a recognized compressed-file extension
    (.gz or .bz2), False otherwise.
    """
    # single splitext + membership test replaces the duplicated
    # if/elif comparisons against each extension
    return os.path.splitext(filename)[1] in ('.gz', '.bz2')
def save_flowcell_reports(data_dir, cycle_dir):
    """
    Save the flowcell quality reports
    """
    data_dir = os.path.abspath(data_dir)
    status_file = os.path.join(data_dir, 'Status.xml')
    reports_dir = os.path.join(data_dir, 'reports')
    reports_dest = os.path.join(cycle_dir, 'flowcell-reports.tar.bz2')
    if os.path.exists(reports_dir):
        # archive the reports directory (plus status files when present)
        # into the cycle directory
        cmd_list = [ 'tar', 'cjvf', reports_dest, 'reports/' ]
        if os.path.exists(status_file):
            cmd_list.extend(['Status.xml', 'Status.xsl'])
        logging.info("Saving reports from " + reports_dir)
        # NOTE(review): lines elided here — the relative tar paths suggest
        # a chdir into data_dir before running; confirm against full source
        q = QueueCommands([" ".join(cmd_list)])
        # (elided: running the queue and restoring the working directory)
def save_summary_file(gerald_object, cycle_dir):
    """
    Copy GERALD's Summary.htm into cycle_dir, logging when it is missing.
    """
    summary_path = os.path.join(gerald_object.pathname, 'Summary.htm')
    if os.path.exists(summary_path):
        logging.info('Copying %s to %s' % (summary_path, cycle_dir))
        shutil.copy(summary_path, cycle_dir)
    else:
        # restored else: the not-found message must only fire when the
        # summary file is absent
        logging.info('Summary file %s was not found' % (summary_path,))
def save_ivc_plot(bustard_object, cycle_dir):
    """
    Save the IVC page and its supporting images
    """
    plot_html = os.path.join(bustard_object.pathname, 'IVC.htm')
    plot_image_path = os.path.join(bustard_object.pathname, 'Plots')
    # glob pattern matching the per-lane plot images under Plots/
    plot_images = os.path.join(plot_image_path, 's_?_[a-z]*.png')

    plot_target_path = os.path.join(cycle_dir, 'Plots')

    if os.path.exists(plot_html):
        logging.debug("Saving %s" % ( plot_html, ))
        logging.debug("Saving %s" % ( plot_images, ))
        shutil.copy(plot_html, cycle_dir)
        if not os.path.exists(plot_target_path):
            os.mkdir(plot_target_path)
        for plot_file in glob(plot_images):
            shutil.copy(plot_file, plot_target_path)
    else:
        # restored else: only warn when the IVC page is actually missing
        logging.warning('Missing IVC.html file, not archiving')
def compress_score_files(bustard_object, cycle_dir):
    """
    Compress score files into our result directory
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = bustard_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    # NOTE(review): the `score_files` list initialization is elided in
    # this view of the source.
    for f in os.listdir(scores_path):
        if re.match('.*_score.txt', f):
            score_files.append(f)

    # tar the score files and pipe the stream through bzip2 into cycle_dir
    tar_cmd = ['tar', 'c'] + score_files
    bzip_cmd = [ 'bzip2', '-9', '-c' ]
    tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
    tar_dest = open(tar_dest_name, 'w')
    logging.info("Compressing score files from %s" % (scores_path,))
    logging.info("Running tar: " + " ".join(tar_cmd[:10]))
    logging.info("Running bzip2: " + " ".join(bzip_cmd))
    logging.info("Writing to %s" %(tar_dest_name,))

    # NOTE(review): `env` setup, the remaining Popen keyword arguments,
    # and the process waits/close are elided in this view.
    tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
    bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
def compress_eland_results(gerald_object, cycle_dir, num_jobs=1):
    """
    Compress eland result files into the archive directory
    """
    # copy & bzip eland files
    # NOTE(review): `bz_commands` list initialization is elided in this view.
    for lanes_dictionary in gerald_object.eland_results.results:
        for eland_lane in lanes_dictionary.values():
            source_name = eland_lane.pathname
            path, name = os.path.split(eland_lane.pathname)
            dest_name = os.path.join(cycle_dir, name)
            logging.info("Saving eland file %s to %s" % \
                         (source_name, dest_name))

            if is_compressed(name):
                # already compressed: a plain copy suffices
                logging.info('Already compressed, Saving to %s' % (dest_name, ))
                shutil.copy(source_name, dest_name)
            # (elided: else branch header before queuing a bzip2 command)
            args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
            bz_commands.append(" ".join(args))
            #logging.info('Running: %s' % ( " ".join(args) ))
            #bzip_dest = open(dest_name, 'w')
            #bzip = subprocess.Popen(args, stdout=bzip_dest)
            #logging.info('Saving to %s' % (dest_name, ))

    # run the queued compressions in parallel, up to num_jobs at a time
    if len(bz_commands) > 0:
        q = QueueCommands(bz_commands, num_jobs)
        # (elided: q.run())
def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1):
    """
    Iterate over runfolders in runs extracting the most useful information.
    * run parameters (in run-*.xml)
    * srf files (raw sequence & qualities)
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    # NOTE(review): the `for r in runs:` loop header is elided in this view.
    result_dir = os.path.join(output_base_dir, r.flowcell_id)
    logging.info("Using %s as result directory" % (result_dir,))
    if not os.path.exists(result_dir):
        # (elided: creation of result_dir)

    # name the per-run cycle directory after the image-analysis cycle range
    cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
    logging.info("Filling in %s" % (cycle,))
    cycle_dir = os.path.join(result_dir, cycle)
    if os.path.exists(cycle_dir):
        logging.error("%s already exists, not overwriting" % (cycle_dir,))
        # (elided: skip/continue and cycle_dir creation)

    # save illumina flowcell status report
    save_flowcell_reports( os.path.join(r.image_analysis.pathname, '..'), cycle_dir )

    # save stuff from bustard
    save_ivc_plot(r.bustard, cycle_dir)

    # build base call saving commands
    # (elided: `lanes` construction)
    run_name = srf.pathname_to_run_name(r.pathname)
    srf_cmds = srf.make_qseq_commands(run_name, r.bustard.pathname, lanes, site, cycle_dir)
    srf.run_commands(r.bustard.pathname, srf_cmds, num_jobs)

    # save stuff from GERALD
    # copy stuff out of the main run
    # (elided: binding of `g`, presumably r.gerald)
    save_summary_file(g, cycle_dir)

    # compress eland result files
    compress_eland_results(g, cycle_dir, num_jobs)

    # md5 all the compressed files once we're done
    md5_commands = srf.make_md5_commands(cycle_dir)
    srf.run_commands(cycle_dir, md5_commands, num_jobs)
def rm_list(files, dry_run=True):
    """
    Remove the listed filesystem paths, logging each action.

    When dry_run is true (the default), only log what would be deleted;
    nothing is removed. Paths that do not exist are logged as warnings.
    """
    for f in files:
        if os.path.exists(f):
            logging.info('deleting %s' % (f,))
            if not dry_run:
                # directories need a recursive delete; files use unlink
                if os.path.isdir(f):
                    shutil.rmtree(f)
                else:
                    os.unlink(f)
        else:
            logging.warn("%s doesn't exist."% (f,))
def clean_runs(runs, dry_run=True):
    """
    Clean up run folders to optimize for compression.
    """
    # (elided: dry_run guard before this message)
    logging.info('In dry-run mode')

    # NOTE(review): the `for run in runs:` loop header is elided in this view.
    # NOTE(review): 'Cleaninging' is a typo in this runtime log string
    # (left unchanged here).
    logging.info('Cleaninging %s' % (run.pathname,))
    # remove instrument run logs
    runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
    rm_list(runlogs, dry_run)
    # remove pipeline text logs
    pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
    rm_list(pipeline_logs, dry_run)
    # rm NetCopy.log? Isn't this robocopy?
    logs = glob(os.path.join(run.pathname, '*.log'))
    rm_list(logs, dry_run)
    # remove calibration directories
    calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
    rm_list(calibration_dir, dry_run)
    # remove the per-lane raw image directories — the bulk of the space
    logging.info("Cleaning images")
    image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
    rm_list(image_dirs, dry_run)
    # cd Data/C1-*_Firecrest*
    logging.info("Cleaning intermediate files")
    # make clean_intermediate
    if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
        clean_process = subprocess.Popen(['make', 'clean_intermediate'],
                                         cwd=run.image_analysis.pathname,)
        # (elided: waiting on clean_process)