2 Core information needed to inspect a runfolder.
15 from xml.etree import ElementTree
16 except ImportError, e:
17 from elementtree import ElementTree
19 EUROPEAN_STRPTIME = "%d-%m-%Y"
20 EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
21 VERSION_RE = "([0-9\.]+)"
22 USER_RE = "([a-zA-Z0-9]+)"
23 LANES_PER_FLOWCELL = 8
24 LANE_LIST = range(1, LANES_PER_FLOWCELL+1)
26 from htsworkflow.util.alphanum import alphanum
27 from htsworkflow.util.ethelp import indent, flatten
29 from htsworkflow.pipelines import srf
class PipelineRun(object):
    """
    Capture "interesting" information about a pipeline run

    Holds the three sub-components of one analysis pass over a runfolder:
    image analysis (Firecrest/IPAR), base calling (Bustard), and
    alignment (Gerald), plus the flowcell id, and can round-trip them
    through an XML run report.
    """
    # XML tag names used when (de)serializing a run report
    PIPELINE_RUN = 'PipelineRun'
    FLOWCELL_ID = 'FlowcellID'

    def __init__(self, pathname=None, xml=None):
        """Initialize from a runfolder path and/or a saved XML tree.

        :Parameters:
          - `pathname`: path to a runfolder directory, or None
          - `xml`: ElementTree node of a previously saved run report, or None
        """
        if pathname is not None:
            self.pathname = os.path.normpath(pathname)
        else:
            self.pathname = None
        self._name = None
        self._flowcell_id = None
        self.image_analysis = None
        self.bustard = None
        self.gerald = None

        if xml is not None:
            self.set_elements(xml)

    def _get_flowcell_id(self):
        """Return the flowcell id, caching the result.

        Reads Config/FlowcellId.xml when present; otherwise guesses from
        the last '_'-delimited component of the runfolder name.
        """
        # extract flowcell ID
        if self._flowcell_id is None:
            config_dir = os.path.join(self.pathname, 'Config')
            flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
            if os.path.exists(flowcell_id_path):
                flowcell_id_tree = ElementTree.parse(flowcell_id_path)
                self._flowcell_id = flowcell_id_tree.findtext('Text')
            else:
                path_fields = self.pathname.split('_')
                if len(path_fields) > 0:
                    # guessing last element of filename
                    flowcell_id = path_fields[-1]
                else:
                    flowcell_id = 'unknown'
                logging.warning(
                    "Flowcell id was not found, guessing %s" % (flowcell_id,))
                self._flowcell_id = flowcell_id
        return self._flowcell_id
    flowcell_id = property(_get_flowcell_id)

    def get_elements(self):
        """
        make one master xml file from all of our sub-components.
        """
        root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
        flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
        flowcell.text = self.flowcell_id
        root.append(self.image_analysis.get_elements())
        root.append(self.bustard.get_elements())
        root.append(self.gerald.get_elements())
        return root

    def set_elements(self, tree):
        """Populate this run's components from a saved XML tree.

        :raises ValueError: if the root tag is not a PipelineRun element.
        """
        # this file gets imported by all the others,
        # so we need to hide the imports to avoid a cyclic imports
        from htsworkflow.pipelines import firecrest
        from htsworkflow.pipelines import ipar
        from htsworkflow.pipelines import bustard
        from htsworkflow.pipelines import gerald

        tag = tree.tag.lower()
        if tag != PipelineRun.PIPELINE_RUN.lower():
            raise ValueError('Pipeline Run Expecting %s got %s' % (
                PipelineRun.PIPELINE_RUN, tag))
        for element in tree:
            tag = element.tag.lower()
            if tag == PipelineRun.FLOWCELL_ID.lower():
                self._flowcell_id = element.text
            #ok the xword.Xword.XWORD pattern for module.class.constant is lame
            # you should only have Firecrest or IPAR, never both of them.
            elif tag == firecrest.Firecrest.FIRECREST.lower():
                self.image_analysis = firecrest.Firecrest(xml=element)
            elif tag == ipar.IPAR.IPAR.lower():
                self.image_analysis = ipar.IPAR(xml=element)
            elif tag == bustard.Bustard.BUSTARD.lower():
                self.bustard = bustard.Bustard(xml=element)
            elif tag == gerald.Gerald.GERALD.lower():
                self.gerald = gerald.Gerald(xml=element)
            else:
                logging.warning('PipelineRun unrecognized tag %s' % (tag,))

    def _get_run_name(self):
        """
        Given a run tuple, find the latest date and use that as our name
        """
        if self._name is None:
            tmax = max(self.image_analysis.time, self.bustard.time, self.gerald.time)
            timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
            self._name = 'run_' + self.flowcell_id + "_" + timestamp + '.xml'
        return self._name
    name = property(_get_run_name)

    def save(self, destdir=None):
        """Write this run's XML report into destdir (default: current dir)."""
        if destdir is None:
            destdir = ''
        logging.info("Saving run report " + self.name)
        xml = self.get_elements()
        # pretty-print before writing; helper from htsworkflow.util.ethelp
        # NOTE(review): reconstructed call — original line not visible here
        indent(xml)
        dest_pathname = os.path.join(destdir, self.name)
        ElementTree.ElementTree(xml).write(dest_pathname)

    def load(self, filename):
        """Read a saved run report and populate this object from it."""
        logging.info("Loading run report from " + filename)
        tree = ElementTree.parse(filename).getroot()
        self.set_elements(tree)
def load_pipeline_run_xml(pathname):
    """
    Load and instantiate a Pipeline run from a run xml file

    :Parameters:
      - `pathname` : location of an run xml file

    :Returns: initialized PipelineRun object
    """
    tree = ElementTree.parse(pathname).getroot()
    run = PipelineRun(xml=tree)
    return run
def get_runs(runfolder):
    """
    Search through a run folder for all the various sub component runs
    and then return a PipelineRun for each different combination.

    For example if there are two different GERALD runs, this will
    generate two different PipelineRun objects, that differ
    in their gerald component.
    """
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    def scan_post_image_analysis(runs, runfolder, image_analysis, pathname):
        # for one image-analysis dir, find every bustard (base-calling)
        # dir beneath it, and every gerald (alignment) dir beneath those;
        # each complete chain becomes one PipelineRun appended to runs.
        logging.info("Looking for bustard directories in %s" % (pathname,))
        bustard_dirs = glob(os.path.join(pathname, "Bustard*"))
        # RTA BaseCalls looks enough like Bustard.
        bustard_dirs.extend(glob(os.path.join(pathname, "BaseCalls")))
        for bustard_pathname in bustard_dirs:
            logging.info("Found bustard directory %s" % (bustard_pathname,))
            b = bustard.bustard(bustard_pathname)
            gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
            logging.info("Looking for gerald directories in %s" % (pathname,))
            for gerald_pathname in glob(gerald_glob):
                logging.info("Found gerald directory %s" % (gerald_pathname,))
                try:
                    g = gerald.gerald(gerald_pathname)
                    p = PipelineRun(runfolder)
                    p.image_analysis = image_analysis
                    p.bustard = b
                    p.gerald = g
                    runs.append(p)
                # NOTE(review): reconstructed exception type — the original
                # except line is not visible here; confirm against history
                except IndexError as e:
                    logging.error("Ignoring " + str(e))

    datadir = os.path.join(runfolder, 'Data')

    logging.info('Searching for runs in ' + datadir)
    runs = []
    # scan for firecrest directories
    for firecrest_pathname in glob(os.path.join(datadir, "*Firecrest*")):
        logging.info('Found firecrest in ' + datadir)
        image_analysis = firecrest.firecrest(firecrest_pathname)
        if image_analysis is None:
            logging.warning(
                "%s is an empty or invalid firecrest directory" % (firecrest_pathname,))
        else:
            scan_post_image_analysis(
                runs, runfolder, image_analysis, firecrest_pathname)
    # scan for IPAR directories
    ipar_dirs = glob(os.path.join(datadir, "IPAR_*"))
    # The Intensities directory from the RTA software looks a lot like IPAR
    ipar_dirs.extend(glob(os.path.join(datadir, 'Intensities')))
    for ipar_pathname in ipar_dirs:
        logging.info('Found ipar directories in ' + datadir)
        image_analysis = ipar.ipar(ipar_pathname)
        if image_analysis is None:
            logging.warning(
                "%s is an empty or invalid IPAR directory" % (ipar_pathname,))
        else:
            scan_post_image_analysis(
                runs, runfolder, image_analysis, ipar_pathname)

    return runs
def get_specific_run(gerald_dir):
    """
    Given a gerald directory, construct a PipelineRun out of its parents

    Basically this allows specifying a particular run instead of the previous
    get_runs which scans a runfolder for various combinations of
    firecrest/ipar/bustard/gerald runs.

    :Returns: a PipelineRun, or None if any component is missing/invalid.
    """
    from htsworkflow.pipelines import firecrest
    from htsworkflow.pipelines import ipar
    from htsworkflow.pipelines import bustard
    from htsworkflow.pipelines import gerald

    # walk up the directory tree: gerald lives inside bustard, which
    # lives inside the image-analysis dir, which lives under Data/
    gerald_dir = os.path.expanduser(gerald_dir)
    bustard_dir = os.path.abspath(os.path.join(gerald_dir, '..'))
    image_dir = os.path.abspath(os.path.join(gerald_dir, '..', '..'))

    runfolder_dir = os.path.abspath(os.path.join(image_dir, '..', '..'))

    logging.info('--- use-run detected options ---')
    logging.info('runfolder: %s' % (runfolder_dir,))
    logging.info('image_dir: %s' % (image_dir,))
    logging.info('bustard_dir: %s' % (bustard_dir,))
    logging.info('gerald_dir: %s' % (gerald_dir,))

    # find our processed image dir
    image_run = None
    # split into parent, and leaf directory
    # leaf directory should be an IPAR or firecrest directory
    data_dir, short_image_dir = os.path.split(image_dir)
    logging.info('data_dir: %s' % (data_dir,))
    logging.info('short_image_dir: %s' % (short_image_dir,))

    # guess which type of image processing directory we have by looking
    # in the leaf directory name
    if re.search('Firecrest', short_image_dir, re.IGNORECASE) is not None:
        image_run = firecrest.firecrest(image_dir)
    elif re.search('IPAR', short_image_dir, re.IGNORECASE) is not None:
        image_run = ipar.ipar(image_dir)
    elif re.search('Intensities', short_image_dir, re.IGNORECASE) is not None:
        # RTA Intensities directories are parsed by the IPAR code
        image_run = ipar.ipar(image_dir)

    # if we didn't find a run, report the error and return
    if image_run is None:
        msg = '%s does not contain an image processing step' % (image_dir,)
        logging.error(msg)
        return None

    # find our base calling
    base_calling_run = bustard.bustard(bustard_dir)
    if base_calling_run is None:
        logging.error('%s does not contain a bustard run' % (bustard_dir,))
        return None

    # find our alignment
    gerald_run = gerald.gerald(gerald_dir)
    if gerald_run is None:
        logging.error('%s does not contain a gerald run' % (gerald_dir,))
        return None

    p = PipelineRun(runfolder_dir)
    p.image_analysis = image_run
    p.bustard = base_calling_run
    p.gerald = gerald_run

    logging.info('Constructed PipelineRun from %s' % (gerald_dir,))
    return p
def extract_run_parameters(runs):
    """
    Search through runfolder_path for various runs and grab their parameters

    Writes each run's XML report via PipelineRun.save().
    """
    # NOTE(review): loop body reconstructed — original lines not visible here
    for run in runs:
        run.save()
def summarize_mapped_reads(genome_map, mapped_reads):
    """
    Summarize per chromosome reads into a genome count
    But handle spike-in/contamination symlinks separately.

    :Parameters:
      - `genome_map`: mapping whose keys are recognized genome path prefixes
      - `mapped_reads`: mapping of (possibly path-qualified) chromosome
        names to read counts

    :Returns: dict of chromosome (or genome) name -> summed read count
    """
    summarized_reads = {}
    genome_reads = 0
    genome = 'unknown'
    for k, v in mapped_reads.items():
        path, k = os.path.split(k)
        # a path prefix that is NOT in the genome map is treated as the
        # primary genome and accumulated into a single total
        if len(path) > 0 and path not in genome_map:
            genome = path
            genome_reads += v
        else:
            summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
    summarized_reads[genome] = genome_reads
    return summarized_reads
def summarize_lane(gerald, lane_id):
    """Build a human-readable report for one lane of a gerald run.

    :Parameters:
      - `gerald`: gerald run object with .summary and .eland_results
      - `lane_id`: lane key into each end's eland results

    :Returns: list of report lines (one entry per line of output)
    """
    report = []
    summary_results = gerald.summary.lane_results
    # one pass per sequencing end (single or paired)
    for end in range(len(summary_results)):
        eland_result = gerald.eland_results.results[end][lane_id]
        report.append("Sample name %s" % (eland_result.sample_name))
        report.append("Lane id %s end %s" % (eland_result.lane_id, end))
        if end < len(summary_results) and eland_result.lane_id in summary_results[end]:
            cluster = summary_results[end][eland_result.lane_id].cluster
            report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
        report.append("Total Reads: %d" % (eland_result.reads))

        if hasattr(eland_result, 'match_codes'):
            mc = eland_result.match_codes
            nm = mc['NM']
            nm_percent = float(nm) / eland_result.reads * 100
            qc = mc['QC']
            qc_percent = float(qc) / eland_result.reads * 100

            report.append("No Match: %d (%2.2g %%)" % (nm, nm_percent))
            report.append("QC Failed: %d (%2.2g %%)" % (qc, qc_percent))
            report.append('Unique (0,1,2 mismatches) %d %d %d' %
                          (mc['U0'], mc['U1'], mc['U2']))
            report.append('Repeat (0,1,2 mismatches) %d %d %d' %
                          (mc['R0'], mc['R1'], mc['R2']))

        if hasattr(eland_result, 'genome_map'):
            report.append("Mapped Reads")
            mapped_reads = summarize_mapped_reads(eland_result.genome_map, eland_result.mapped_reads)
            for name, counts in mapped_reads.items():
                report.append("  %s: %d" % (name, counts))

        report.append('')
    return report
def summary_report(runs):
    """
    Summarize cluster numbers and mapped read counts for a runfolder

    :Returns: one newline-joined string covering every run and lane
    """
    report = []
    for run in runs:
        # header line for this run
        report.append('Summary for %s' % (run.name,))
        # sort the lanes into a human-friendly order
        eland_keys = run.gerald.eland_results.results[0].keys()
        # python2-style cmp sort using the alphanum comparator
        eland_keys.sort(alphanum)
        for lane_id in eland_keys:
            report.extend(summarize_lane(run.gerald, lane_id))
            report.append('')
    return os.linesep.join(report)
def is_compressed(filename):
    """Return True if filename carries a recognized compression suffix (.gz or .bz2)."""
    return os.path.splitext(filename)[1] in ('.gz', '.bz2')
def save_summary_file(gerald_object, cycle_dir):
    """Copy the gerald run's Summary.htm into cycle_dir, if it exists.

    Missing summary files are logged and silently skipped (best-effort).
    """
    summary_path = os.path.join(gerald_object.pathname, 'Summary.htm')
    if os.path.exists(summary_path):
        logging.info('Copying %s to %s' % (summary_path, cycle_dir))
        shutil.copy(summary_path, cycle_dir)
    else:
        logging.info('Summary file %s was not found' % (summary_path,))
def compress_score_files(gerald_object, cycle_dir):
    """
    Compress score files into our result directory

    Tars every *_score.txt file found in the gerald directory (or its
    Temp/ subdirectory) and pipes the tar through bzip2 into
    cycle_dir/scores.tar.bz2.
    """
    # check for g.pathname/Temp a new feature of 1.1rc1
    scores_path = gerald_object.pathname
    scores_path_temp = os.path.join(scores_path, 'Temp')
    if os.path.isdir(scores_path_temp):
        scores_path = scores_path_temp

    # hopefully we have a directory that contains s_*_score files
    score_files = []
    for f in os.listdir(scores_path):
        if re.match('.*_score.txt', f):
            score_files.append(f)

    tar_cmd = ['/bin/tar', 'c'] + score_files
    bzip_cmd = ['bzip2', '-9', '-c']
    tar_dest_name = os.path.join(cycle_dir, 'scores.tar.bz2')
    tar_dest = open(tar_dest_name, 'w')
    logging.info("Compressing score files from %s" % (scores_path,))
    logging.info("Running tar: " + " ".join(tar_cmd[:10]))
    logging.info("Running bzip2: " + " ".join(bzip_cmd))
    logging.info("Writing to %s" % (tar_dest_name,))

    # NOTE(review): env reconstructed — original line not visible here
    env = {'BZIP': '-9'}
    tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, env=env,
                           cwd=scores_path)
    bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
    tar.wait()
def compress_eland_results(gerald_object, cycle_dir):
    """
    Compress eland result files into the archive directory

    Already-compressed files are copied as-is; everything else is piped
    through bzip2 -9 into cycle_dir.
    """
    # copy & bzip eland files
    for lanes_dictionary in gerald_object.eland_results.results:
        for eland_lane in lanes_dictionary.values():
            source_name = eland_lane.pathname
            path, name = os.path.split(eland_lane.pathname)
            dest_name = os.path.join(cycle_dir, name)
            logging.info("Saving eland file %s to %s" %
                         (source_name, dest_name))

            if is_compressed(name):
                logging.info('Already compressed, Saving to %s' % (dest_name, ))
                shutil.copy(source_name, dest_name)
            else:
                # not compressed yet; append the suffix we're about to create
                dest_name += '.bz2'
                args = ['bzip2', '-9', '-c', source_name]
                logging.info('Running: %s' % (" ".join(args)))
                bzip_dest = open(dest_name, 'w')
                bzip = subprocess.Popen(args, stdout=bzip_dest)
                logging.info('Saving to %s' % (dest_name, ))
                bzip.wait()
def extract_results(runs, output_base_dir=None, site="individual", num_jobs=1):
    """Archive each run's interesting files into a per-flowcell result dir.

    For every run, creates output_base_dir/<flowcell_id>/C<start>-<stop>/
    and fills it with the summary file, compressed score files, compressed
    eland results, and srf files.

    :Parameters:
      - `runs`: sequence of PipelineRun objects
      - `output_base_dir`: destination root (default: current directory)
      - `site`, `num_jobs`: not used in the visible portion of this function
    """
    if output_base_dir is None:
        output_base_dir = os.getcwd()

    for r in runs:
        result_dir = os.path.join(output_base_dir, r.flowcell_id)
        logging.info("Using %s as result directory" % (result_dir,))
        if not os.path.exists(result_dir):
            os.mkdir(result_dir)

        # create cycle_dir
        cycle = "C%d-%d" % (r.image_analysis.start, r.image_analysis.stop)
        logging.info("Filling in %s" % (cycle,))
        cycle_dir = os.path.join(result_dir, cycle)
        if os.path.exists(cycle_dir):
            logging.error("%s already exists, not overwriting" % (cycle_dir,))
            continue
        else:
            os.mkdir(cycle_dir)

        # copy stuff out of the main run
        g = r.gerald

        # save the summary file
        save_summary_file(g, cycle_dir)

        # compress the score files
        compress_score_files(g, cycle_dir)

        # compress eland result files
        compress_eland_results(g, cycle_dir)

        # build srf files for each lane
        # NOTE(review): lane selection reconstructed — original line not visible
        lanes = LANE_LIST
        srf_cmds = srf.make_commands(r.pathname, lanes, "woldlab", cycle_dir)
        srf.run_srf_commands(r.bustard.pathname, srf_cmds, 2)
def rm_list(files, dry_run=True):
    """Remove each path in files (plain files or whole directory trees).

    When dry_run is True only logs what would be deleted; nonexistent
    paths are logged as warnings and skipped.
    """
    for f in files:
        if os.path.exists(f):
            logging.info('deleting %s' % (f,))
            if not dry_run:
                if os.path.isdir(f):
                    shutil.rmtree(f)
                else:
                    os.unlink(f)
        else:
            logging.warning("%s doesn't exist." % (f,))
498 def clean_runs(runs, dry_run=True):
500 Clean up run folders to optimize for compression.
503 logging.info('In dry-run mode')
506 logging.info('Cleaninging %s' % (run.pathname,))
508 runlogs = glob(os.path.join(run.pathname, 'RunLog*xml'))
509 rm_list(runlogs, dry_run)
511 pipeline_logs = glob(os.path.join(run.pathname, 'pipeline*.txt'))
512 rm_list(pipeline_logs, dry_run)
514 # rm NetCopy.log? Isn't this robocopy?
515 logs = glob(os.path.join(run.pathname, '*.log'))
516 rm_list(logs, dry_run)
519 calibration_dir = glob(os.path.join(run.pathname, 'Calibration_*'))
520 rm_list(calibration_dir, dry_run)
522 logging.info("Cleaning images")
523 image_dirs = glob(os.path.join(run.pathname, 'Images', 'L*'))
524 rm_list(image_dirs, dry_run)
525 # cd Data/C1-*_Firecrest*
526 logging.info("Cleaning intermediate files")
527 # make clean_intermediate
528 if os.path.exists(os.path.join(run.image_analysis.pathname, 'Makefile')):
529 clean_process = subprocess.Popen(['make', 'clean_intermediate'],
530 cwd=run.image_analysis.pathname,)