2 Core information needed to inspect a runfolder.
16 from xml.etree import ElementTree
17 except ImportError, e:
18 from elementtree import ElementTree
# strptime format for European-style dates (DD-MM-YYYY) in run directory names.
EUROPEAN_STRPTIME = "%d-%m-%Y"
# Matches a European date such as 3-1-2009 or 28-11-2009.
EUROPEAN_DATE_RE = r"([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
# Matches a dotted version string such as 1.9.5.
# Raw string: un-raw "\." is a DeprecationWarning since Python 3.6
# and a SyntaxError in 3.12; behavior is identical in Python 2.
VERSION_RE = r"([0-9\.]+)"
# Matches an alphanumeric user name.
USER_RE = r"([a-zA-Z0-9]+)"
# Illumina GA flowcells have 8 lanes, numbered 1..8.
LANES_PER_FLOWCELL = 8
LANE_LIST = range(1, LANES_PER_FLOWCELL + 1)
27 from htsworkflow.util.alphanum import alphanum
28 from htsworkflow.util.ethelp import indent, flatten
30 from htsworkflow.pipelines import srf
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run.

    Aggregates the three per-run components (image analysis =
    Firecrest/IPAR, base calling = Bustard, alignment = GERALD) and can
    serialize/restore them as one master XML document.

    NOTE(review): several source lines are missing from this view of the
    file; gaps are marked with "[not visible]" comments below.
    """
    # XML tag names used by get_elements()/set_elements()
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, xml=None):
        # pathname: runfolder location on disk (normalized below)
        # xml: previously serialized ElementTree element to restore from
        if pathname is not None:
            self.pathname = os.path.normpath(pathname)
        # [not visible: intervening lines]
        self._flowcell_id = None     # resolved lazily by _get_flowcell_id()
        self.image_analysis = None   # Firecrest or IPAR component
        # [not visible: remaining attribute defaults / guard before next line]
        self.set_elements(xml)

    def _get_flowcell_id(self):
        """Look up (and cache) the flowcell id.

        Prefers Config/FlowcellId.xml inside the runfolder; otherwise
        guesses from the last '_'-separated component of the runfolder
        name — TODO confirm fallback behavior, lines missing from view.
        """
        if self._flowcell_id is None:
            config_dir = os.path.join(self.pathname, 'Config')
            flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
            if os.path.exists(flowcell_id_path):
                flowcell_id_tree = ElementTree.parse(flowcell_id_path)
                self._flowcell_id = flowcell_id_tree.findtext('Text')
            # [not visible: else-branch opening]
            path_fields = self.pathname.split('_')
            if len(path_fields) > 0:
                # guessing last element of filename
                flowcell_id = path_fields[-1]
            # [not visible: else-branch opening]
            flowcell_id = 'unknown'
            # [not visible: logging call wrapping the message below]
            "Flowcell id was not found, guessing %s" % (
            # [not visible: close of logging call]
            self._flowcell_id = flowcell_id
        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def get_elements(self):
        """
        make one master xml file from all of our sub-components.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        # each component serializes itself under the master root
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        root.append(self.gerald.get_elements())
        # [not visible: return statement — presumably `return root`, confirm]

    def set_elements(self, tree):
        """Restore component state from a serialized PipelineRun element.

        Raises ValueError if the root tag is not PIPELINE_RUN.
        """
        # this file gets imported by all the others,
        # so we need to hide the imports to avoid a cyclic imports
        from htsworkflow.pipelines import firecrest
        from htsworkflow.pipelines import ipar
        from htsworkflow.pipelines import bustard
        from htsworkflow.pipelines import gerald

        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
            raise ValueError('Pipeline Run Expecting %s got %s' % (
                PipelineRun.PIPELINE_RUN, tag))
        # [not visible: loop over child elements binding `element`]
        tag = element.tag.lower()
        if tag == PipelineRun.FLOWCELL_ID.lower():
            self._flowcell_id = element.text
        #ok the xword.Xword.XWORD pattern for module.class.constant is lame
        # you should only have Firecrest or IPAR, never both of them.
        elif tag == firecrest.Firecrest.FIRECREST.lower():
            self.image_analysis = firecrest.Firecrest(xml=element)
        elif tag == ipar.IPAR.IPAR.lower():
            self.image_analysis = ipar.IPAR(xml=element)
        elif tag == bustard.Bustard.BUSTARD.lower():
            self.bustard = bustard.Bustard(xml=element)
        elif tag == gerald.Gerald.GERALD.lower():
            self.gerald = gerald.Gerald(xml=element)
        # [not visible: else-branch opening before warning]
        logging.warn('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """
        Given a run tuple, find the latest date and use that as our name
        """
        if self._name is None:
            # newest of the three component timestamps names the report
            tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
            timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
            self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
        # [not visible: return statement — presumably `return self._name`]
    name = property(_get_run_name)

    def save(self, destdir=None):
        """Serialize this run to destdir/<self.name> as XML."""
        # [not visible: destdir default handling]
        logging.info("Saving run report "+ self.name)
        xml = self.get_elements()
        # [not visible: intervening line(s)]
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """Restore this run's state from a previously saved run XML file."""
        logging.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters:
      - `pathname` : location of an run xml file

    :Returns: initialized PipelineRun object
    """
    tree = ElementTree.parse(pathname).getroot()
    run = PipelineRun(xml=tree)
    # [not visible: trailing line — presumably `return run`, confirm]
def get_runs(runfolder):
    """
    Search through a run folder for all the various sub component runs
    and then return a PipelineRun for each different combination.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in there gerald component.

    NOTE(review): several source lines are missing from this view;
    gaps are marked with "[not visible]" comments below.
    """
    # deferred imports: avoid cyclic imports with the component modules
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
        # Given one image-analysis directory, find every bustard/gerald
        # combination below it and append a PipelineRun for each to `runs`.
        logging.info("Looking for bustard directories in %s" % (pathname,))
        bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
        # RTA BaseCalls looks enough like Bustard.
        bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
        for bustard_pathname in bustard_dirs:
            logging.info("Found bustard directory %s" % (bustard_pathname,))
            b = bustard.bustard(bustard_pathname)
            gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
            logging.info("Looking for gerald directories in %s" % (pathname,))
            for gerald_pathname in glob(gerald_glob):
                logging.info("Found gerald directory %s" % (gerald_pathname,))
                # [not visible: try-block opening]
                g = gerald.gerald(gerald_pathname)
                p = PipelineRun(runfolder)
                p.image_analysis = image_analysis
                # [not visible: remaining component assignments, runs.append,
                #  and the except clause binding `e`]
                logging.error("Ignoring " + str(e))

    datadir = os.path.join(runfolder, 'Data')

    logging.info('Searching for runs in ' + datadir)
    # [not visible: initialization of the `runs` accumulator]
    # scan for firecrest directories
    for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")):
        logging.info('Found firecrest in ' + datadir)
        image_analysis = firecrest.firecrest(firecrest_pathname)
        if image_analysis is None:
            # [not visible: logging call wrapping the message below]
            "%s is an empty or invalid firecrest directory" % (firecrest_pathname,)
            # [not visible: else-branch opening]
            scan_post_image_analysis(
                runs, runfolder, image_analysis, firecrest_pathname
            # [not visible: close of call]
    # scan for IPAR directories
    ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
    # The Intensities directory from the RTA software looks a lot like IPAR
    ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
    for ipar_pathname in ipar_dirs:
        logging.info('Found ipar directories in ' + datadir)
        image_analysis = ipar.ipar(ipar_pathname)
        if image_analysis is None:
            # [not visible: logging call wrapping the message below]
            "%s is an empty or invalid IPAR directory" %(ipar_pathname,)
            # [not visible: else-branch opening]
            scan_post_image_analysis(
                runs, runfolder, image_analysis, ipar_pathname
            # [not visible: close of call and final return — presumably `return runs`]
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.

    NOTE(review): several source lines are missing from this view;
    gaps are marked with "[not visible]" comments below.
    """
    # deferred imports: avoid cyclic imports with the component modules
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    # walk up the directory tree: gerald lives inside bustard, which lives
    # inside the image-analysis dir, which lives under the runfolder's Data/
    gerald_dir = os.path.expanduser(gerald_dir)
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))
    # [not visible: intervening line]
    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..','..'))

    logging.info('--- use-run detected options ---')
    logging.info('runfolder: %s' % (runfolder_dir,))
    logging.info('image_dir: %s' % (image_dir,))
    logging.info('bustard_dir: %s' % (bustard_dir,))
    logging.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    # [not visible: intervening line]
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    logging.info('data_dir: %s' % (data_dir,))
    # NOTE(review): 'short_iamge_dir' is a typo in the log label below
    logging.info('short_iamge_dir: %s' %(short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        # RTA "Intensities" directories are treated like IPAR here
        image_run = ipar.ipar(image_dir)

    # if we din't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        # [not visible: error logging / early return]

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        logging.error('%s does not contain a bustard run' % (bustard_dir,))
        # [not visible: early return]

    # [not visible: intervening line(s)]
    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        logging.error('%s does not contain a gerald run' % (gerald_dir,))
        # [not visible: early return]

    # assemble the PipelineRun from the three discovered components
    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run
    # [not visible: intervening line]
    logging.info('Constructed PipelineRun from %s' % (gerald_dir,))
    # [not visible: final return — presumably `return p`, confirm]
def extract_run_parameters(runs):
    """
    Search through runfolder_path for various runs and grab their parameters
    """
    # [function body not visible in this view of the file]
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks seperately.
    """
    summarized_reads = {}
    # [not visible: initialization of the genome accumulator variables]
    for k, v in mapped_reads.items():
        # strip any directory component off the mapped-read key
        path, k = os.path.split(k)
        if len(path) > 0 and not genome_map.has_key(path):
            # path prefix not in genome_map: treated as spike-in /
            # contamination and kept under its own key
            summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
        # [not visible: else branch pooling reads into the genome total]
    summarized_reads[genome] = genome_reads
    return summarized_reads
def summarize_lane(gerald, lane_id):
    """Build a list of human-readable report lines for one lane.

    Covers each sequencing end: sample name, cluster stats, read totals,
    ELAND match-code breakdown and per-genome mapped-read counts.

    NOTE(review): some source lines are missing from this view;
    gaps are marked with "[not visible]" comments below.
    """
    # [not visible: initialization of the `report` list]
    summary_results = gerald.summary.lane_results
    for end in range(len(summary_results)):
        eland_result = gerald.eland_results.results[end][lane_id]
        report.append("Sample name %s" % (eland_result.sample_name))
        report.append("Lane id %s end %s" % (eland_result.lane_id, end))
        if end < len(summary_results) and summary_results[end].has_key(eland_result.lane_id):
            cluster = summary_results[end][eland_result.lane_id].cluster
            report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
        report.append("Total Reads: %d" % (eland_result.reads))

        if hasattr(eland_result, 'match_codes'):
            mc = eland_result.match_codes
            # [not visible: computation of `nm` (no-match count)]
            nm_percent = float(nm)/eland_result.reads * 100
            # [not visible: computation of `qc` (QC-failed count)]
            qc_percent = float(qc)/eland_result.reads * 100

            report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
            report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
            report.append('Unique (0,1,2 mismatches) %d %d %d' % \
                          (mc['U0'], mc['U1'], mc['U2']))
            report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
                          (mc['R0'], mc['R1'], mc['R2']))

        if hasattr(eland_result, 'genome_map'):
            report.append("Mapped Reads")
            mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
            for name, counts in mapped_reads.items():
                report.append("  %s: %d" % (name, counts))
    # [not visible: trailing lines and return — presumably `return report`]
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder
    """
    # [not visible: report-list initialization and loop over `runs` binding `run`]
    report.append('Summary for %s' % (run.name,))
    # sort eland lane keys in "natural" alphanumeric order
    eland_keys = run.gerald.eland_results.results[0].keys()
    eland_keys.sort(alphanum)
    for lane_id in eland_keys:
        report.extend(summarize_lane(run.gerald, lane_id))
    # [not visible: intervening line(s)]
    return os.linesep.join(report)
def is_compressed(filename):
    """Return True if *filename* carries a known compression suffix.

    Recognizes gzip (".gz") and bzip2 (".bz2") extensions; anything
    else is treated as uncompressed.
    """
    # one splitext call instead of repeating it for every extension test
    return os.path.splitext(filename)[1] in ('.gz', '.bz2')
def save_summary_file(gerald_object, cycle_dir):
    """Copy GERALD's Summary.htm report into cycle_dir, if it exists."""
    # [not visible: intervening line]
    summary_path = os.path.join(gerald_object.pathname, 'Summary.htm')
    if os.path.exists(summary_path):
        logging.info('Copying %s to %s' % (summary_path, cycle_dir))
        shutil.copy(summary_path, cycle_dir)
    # [not visible: else-branch opening before the warning below]
    logging.info('Summary file %s was not found' % (summary_path,))
def compress_score_files(bustard_object, cycle_dir):
    """
    Compress score files into our result directory

    Pipes `tar c` of the s_*_score files through `bzip2 -9` into
    cycle_dir/scores.tar.bz2.

    NOTE(review): some source lines are missing from this view;
    gaps are marked with "[not visible]" comments below.
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = bustard_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    # [not visible: initialization of `score_files` list]
    for f in os.listdir(scores_path):
        if re.match('.*_score.txt', f):
            score_files.append(f)

    # build the tar|bzip2 pipeline writing into the archive directory
    tar_cmd = ['tar', 'c'] + score_files
    bzip_cmd = [ 'bzip2', '-9', '-c' ]
    tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2')
    tar_dest = open(tar_dest_name, 'w')
    logging.info("Compressing score files from %s" % (scores_path,))
    logging.info("Running tar: " + " ".join(tar_cmd[:10]))
    logging.info("Running bzip2: " + " ".join(bzip_cmd))
    logging.info("Writing to %s" %(tar_dest_name,))

    # [not visible: setup of `env` passed to Popen below]
    tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
    # [not visible: continuation of the Popen call (cwd argument?) — confirm]
    bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
    # [not visible: waiting on the pipeline / closing tar_dest]
def compress_eland_results(gerald_object, cycle_dir):
    """
    Compress eland result files into the archive directory

    Already-compressed files are copied as-is; others are piped through
    `bzip2 -9` into cycle_dir.

    NOTE(review): some source lines are missing from this view;
    gaps are marked with "[not visible]" comments below.
    """
    # copy & bzip eland files
    for lanes_dictionary in gerald_object.eland_results.results:
        for eland_lane in lanes_dictionary.values():
            source_name = eland_lane.pathname
            path, name = os.path.split(eland_lane.pathname)
            dest_name = os.path.join(cycle_dir, name)
            logging.info("Saving eland file %s to %s" % \
                         (source_name, dest_name))

            if is_compressed(name):
                logging.info('Already compressed, Saving to %s' % (dest_name, ))
                shutil.copy(source_name, dest_name)
            # [not visible: else-branch opening for the uncompressed case]
                args = ['bzip2', '-9', '-c', source_name]
                logging.info('Running: %s' % ( " ".join(args) ))
                bzip_dest = open(dest_name, 'w')
                bzip = subprocess.Popen(args, stdout=bzip_dest)
                logging.info('Saving to %s' % (dest_name, ))
                # [not visible: waiting on bzip / closing bzip_dest]
def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1):
    """
    Iterate over runfolders in runs extracting the most useful information.
      * run parameters (in run-*.xml)
      [not visible: remaining items of this list]
      * srf files (raw sequence & qualities)

    NOTE(review): several source lines are missing from this view;
    gaps are marked with "[not visible]" comments below.
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    # [not visible: loop over `runs` binding `r`]
        result_dir = os.path.join(output_base_dir, r.flowcell_id)
        logging.info("Using %s as result directory" % (result_dir,))
        if not os.path.exists(result_dir):
            # [not visible: directory creation]

        # create cycle_dir named for the image-analysis cycle range
        cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
        logging.info("Filling in %s" % (cycle,))
        cycle_dir = os.path.join(result_dir, cycle)
        if os.path.exists(cycle_dir):
            logging.error("%s already exists, not overwriting" % (cycle_dir,))
            # [not visible: skip/continue, directory creation otherwise]

        # copy stuff out of the main run
        # [not visible: binding of `g` (gerald component) and related steps]
        save_summary_file(g, cycle_dir)
        # [not visible: intervening step(s)]
        compress_score_files(r.bustard, cycle_dir)

        # compress eland result files
        compress_eland_results(g, cycle_dir)

        # [not visible: lane selection binding `lanes`]
        run_name = srf.pathname_to_run_name(r.pathname)
        srf_cmds = srf.make_commands(run_name, lanes, site, cycle_dir)
        srf.run_srf_commands(r.bustard.pathname, srf_cmds, 2)
def rm_list(files, dry_run=True):
    """Remove each existing path in *files*; only log in dry-run mode."""
    # [not visible: loop over `files` binding `f`]
        if os.path.exists(f):
            logging.info('deleting %s' % (f,))
            # [not visible: dry-run guard, actual removal, else-branch opening]
            logging.warn("%s doesn't exist."% (f,))
def clean_runs(runs, dry_run=True):
    """
    Clean up run folders to optimize for compression.

    Deletes run logs, pipeline logs, calibration dirs and image tiles,
    then runs `make clean_intermediate` in the image-analysis directory.

    NOTE(review): several source lines are missing from this view;
    gaps are marked with "[not visible]" comments below.
    """
    # [not visible: dry-run guard]
        logging.info('In dry-run mode')

    # [not visible: loop over `runs` binding `run`]
        # NOTE(review): 'Cleaninging' is a typo in the log message below
        logging.info('Cleaninging %s' % (run.pathname,))
        # rm RunLog*.xml
        runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
        rm_list(runlogs, dry_run)
        # rm pipeline_*.txt
        pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
        rm_list(pipeline_logs, dry_run)

        # rm NetCopy.log? Isn't this robocopy?
        logs = glob(os.path.join(run.pathname, '*.log'))
        rm_list(logs, dry_run)
        # rm Calibration_* directories
        calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
        rm_list(calibration_dir, dry_run)

        logging.info("Cleaning images")
        image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
        rm_list(image_dirs, dry_run)
        # cd Data/C1-*_Firecrest*
        logging.info("Cleaning intermediate files")
        # make clean_intermediate
        if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
            clean_process = subprocess.Popen(['make', 'clean_intermediate'],
                                             cwd=run.image_analysis.pathname,)
            # [not visible: waiting on clean_process]