2 Core information needed to inspect a runfolder.
14 from xml.etree import ElementTree
15 except ImportError, e:
16 from elementtree import ElementTree
18 EUROPEAN_STRPTIME = "%d-%m-%Y"
19 EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
20 VERSION_RE = "([0-9\.]+)"
21 USER_RE = "([a-zA-Z0-9]+)"
22 LANES_PER_FLOWCELL = 8
24 from gaworkflow.util.alphanum import alphanum
25 from gaworkflow.util.ethelp import indent, flatten
28 class PipelineRun(object):
# NOTE(review): this chunk is a numbered listing with gaps — the leading
# integers are the original file's line numbers, and several of those lines
# (e.g. 29, 31-32) are missing here, including the docstring's quote lines.
30 Capture "interesting" information about a pipeline run
# XML tag names shared by get_elements()/set_elements() below.
33 PIPELINE_RUN = 'PipelineRun'
34 FLOWCELL_ID = 'FlowcellID'
# Build a run record either from component objects (firecrest, bustard,
# gerald) or by parsing a previously saved XML tree (xml=...).
36 def __init__(self, pathname=None, firecrest=None, bustard=None, gerald=None, xml=None):
37 if pathname is not None:
38 self.pathname = os.path.normpath(pathname)
# original lines 39-41 missing — presumably the else branch and/or the
# self._name initialization used by _get_run_name(); TODO confirm.
42 self._flowcell_id = None
43 self.firecrest = firecrest
44 self.bustard = bustard
# original lines 45-47 missing — presumably `self.gerald = gerald` (see
# get_elements below) and an `if xml is not None:` guard; TODO confirm.
48 self.set_elements(xml)
# Lazily resolve the flowcell id: prefer Config/FlowcellId.xml under the
# runfolder, otherwise guess from the runfolder name; cached in
# self._flowcell_id.
50 def _get_flowcell_id(self):
# original line 51 missing (likely a docstring).
52 if self._flowcell_id is None:
53 config_dir = os.path.join(self.pathname, 'Config')
54 flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
55 if os.path.exists(flowcell_id_path):
56 flowcell_id_tree = ElementTree.parse(flowcell_id_path)
57 self._flowcell_id = flowcell_id_tree.findtext('Text')
# original line 58 missing — presumably `else:` introducing the
# guess-from-pathname fallback below; TODO confirm.
59 path_fields = self.pathname.split('_')
60 if len(path_fields) > 0:
61 # guessing last element of filename
62 flowcell_id = path_fields[-1]
# original line 63 missing — presumably `else:`.
64 flowcell_id = 'unknown'
# original lines 65-66 and 68 missing — presumably a logging call whose
# format string is line 67 below; TODO confirm.
67 "Flowcell id was not found, guessing %s" % (
69 self._flowcell_id = flowcell_id
70 return self._flowcell_id
71 flowcell_id = property(_get_flowcell_id)
# Assemble one master XML tree: flowcell id plus the firecrest, bustard,
# and gerald sub-component elements.
73 def get_elements(self):
75 make one master xml file from all of our sub-components.
77 root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
78 flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
79 flowcell.text = self.flowcell_id
80 root.append(self.firecrest.get_elements())
81 root.append(self.bustard.get_elements())
82 root.append(self.gerald.get_elements())
# original lines 83-84 missing — presumably `return root`; TODO confirm.
# Inverse of get_elements(): repopulate this object from a parsed XML
# tree. Raises ValueError if the root tag is not PIPELINE_RUN.
85 def set_elements(self, tree):
86 # this file gets imported by all the others,
87 # so we need to hide the imports to avoid a cyclic imports
88 from gaworkflow.pipeline import firecrest
89 from gaworkflow.pipeline import bustard
90 from gaworkflow.pipeline import gerald
92 tag = tree.tag.lower()
93 if tag != PipelineRun.PIPELINE_RUN.lower():
94 raise ValueError('Pipeline Run Expecting %s got %s' % (
95 PipelineRun.PIPELINE_RUN, tag))
# original line 96 missing — presumably `for element in tree:`, which
# would bind `element` as used below; TODO confirm.
97 tag = element.tag.lower()
98 if tag == PipelineRun.FLOWCELL_ID.lower():
99 self._flowcell_id = element.text
100 #ok the xword.Xword.XWORD pattern for module.class.constant is lame
101 elif tag == firecrest.Firecrest.FIRECREST.lower():
102 self.firecrest = firecrest.Firecrest(xml=element)
103 elif tag == bustard.Bustard.BUSTARD.lower():
104 self.bustard = bustard.Bustard(xml=element)
105 elif tag == gerald.Gerald.GERALD.lower():
106 self.gerald = gerald.Gerald(xml=element)
# original line 107 missing — presumably `else:` before the warning.
108 logging.warn('PipelineRun unrecognized tag %s' % (tag,))
# Cached report filename: run_<flowcell_id>_<YYYY-MM-DD>.xml, dated by the
# newest of the three components' timestamps.
110 def _get_run_name(self):
112 Given a run tuple, find the latest date and use that as our name
114 if self._name is None:
# latest component timestamp decides the report date
115 tmax = max(self.firecrest.time, self.bustard.time, self.gerald.time)
116 timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
117 self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
# original line 118 missing — presumably `return self._name`.
119 name = property(_get_run_name)
# Serialize this run's master XML to <destdir>/<self.name>.
121 def save(self, destdir=None):
# original lines 122-123 missing — presumably a default for destdir
# (e.g. current directory); TODO confirm.
124 logging.info("Saving run report "+ self.name)
125 xml = self.get_elements()
127 dest_pathname = os.path.join(destdir, self.name)
128 ElementTree.ElementTree(xml).write(dest_pathname)
# Re-load a run report previously written by save().
130 def load(self, filename):
131 logging.info("Loading run report from " + filename)
132 tree = ElementTree.parse(filename).getroot()
133 self.set_elements(tree)
135 def get_runs(runfolder):
137 Search through a run folder for all the various sub component runs
138 and then return a PipelineRun for each different combination.
140 For example if there are two different GERALD runs, this will
141 generate two different PipelineRun objects, that differ
142 in their gerald component.
# imports hidden in the function to avoid cyclic imports (same pattern as
# noted elsewhere in this module).
144 from gaworkflow.pipeline import firecrest
145 from gaworkflow.pipeline import bustard
146 from gaworkflow.pipeline import gerald
148 datadir = os.path.join(runfolder, 'Data')
# original line 149 missing — presumably `runs = []`, appended to below.
150 logging.info('Searching for runs in ' + datadir)
# Nested scan of Data/*Firecrest*/Bustard*/GERALD* directories; one
# PipelineRun per (firecrest, bustard, gerald) combination found.
152 for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")):
153 f = firecrest.firecrest(firecrest_pathname)
154 bustard_glob = os.path.join(firecrest_pathname, "Bustard*")
155 for bustard_pathname in glob(bustard_glob):
156 b = bustard.bustard(bustard_pathname)
157 gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
158 for gerald_pathname in glob(gerald_glob):
# original line 159 missing — presumably `try:` guarding the gerald
# parse, matched by the except that binds `e` used below; TODO confirm.
160 g = gerald.gerald(gerald_pathname)
161 runs.append(PipelineRun(runfolder, f, b, g))
# original line 162 missing — presumably `except ... , e:` (Python 2
# syntax); the function's `return runs` is also on a missing line.
163 print "Ignoring", str(e)
167 def extract_run_parameters(runs):
169 Search through runfolder_path for various runs and grab their parameters
# NOTE(review): the function body (original lines 170+) is not visible in
# this listing; behavior beyond the docstring above cannot be documented
# from here.
174 def summarize_mapped_reads(mapped_reads):
176 Summarize per chromosome reads into a genome count
177 But handle spike-in/contamination symlinks separately.
179 summarized_reads = {}
# original lines 180-181 missing — presumably initialize `genome` and
# `genome_reads`, both referenced below; TODO confirm.
182 for k, v in mapped_reads.items():
# keep only the filename component of the mapped-reads key
183 path, k = os.path.split(k)
# original lines 184-187 missing — presumably the branch deciding between
# accumulating into genome_reads vs. the per-key bucket below; TODO confirm.
188 summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
189 summarized_reads[genome] = genome_reads
190 return summarized_reads
192 def summary_report(runs):
194 Summarize cluster numbers and mapped read counts for a runfolder
# original lines 195-198 missing — presumably `report = []` and the
# `for run in runs:` loop header that binds `run`/`report` used below.
199 report.append('Summary for %s' % (run.name,))
# Python 2 idiom: keys() returns a list, sorted in place with the
# `alphanum` comparison function imported at the top of the file.
201 eland_keys = run.gerald.eland_results.results.keys()
202 eland_keys.sort(alphanum)
204 lane_results = run.gerald.summary.lane_results
205 for lane_id in eland_keys:
206 result = run.gerald.eland_results.results[lane_id]
207 report.append("Sample name %s" % (result.sample_name))
208 report.append("Lane id %s" % (result.lane_id,))
# cluster is indexed [0] and [1] below, formatted as "N +/- M" —
# presumably a (value, spread) pair; confirm against the Gerald summary.
209 cluster = lane_results[result.lane_id].cluster
210 report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
211 report.append("Total Reads: %d" % (result.reads))
# match-code buckets, per the labels printed below: NM = no match,
# QC = qc-failed, U0-U2 / R0-R2 = unique/repeat with 0-2 mismatches.
212 mc = result._match_codes
213 report.append("No Match: %d" % (mc['NM']))
214 report.append("QC Failed: %d" % (mc['QC']))
215 report.append('Unique (0,1,2 mismatches) %d %d %d' % \
216 (mc['U0'], mc['U1'], mc['U2']))
217 report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
218 (mc['R0'], mc['R1'], mc['R2']))
219 report.append("Mapped Reads")
220 mapped_reads = summarize_mapped_reads(result.mapped_reads)
221 for name, counts in mapped_reads.items():
222 report.append(" %s: %d" % (name, counts))
# original lines 223-224 missing — presumably blank-line separators.
225 return os.linesep.join(report)
227 def extract_results(runs, output_base_dir=None):
228 if output_base_dir is None:
229 output_base_dir = os.getcwd()
232 result_dir = os.path.join(output_base_dir, r.flowcell_id)
233 logging.info("Using %s as result directory" % (result_dir,))
234 if not os.path.exists(result_dir):
238 cycle = "C%d-%d" % (r.firecrest.start, r.firecrest.stop)
239 logging.info("Filling in %s" % (cycle,))
240 cycle_dir = os.path.join(result_dir, cycle)
241 if os.path.exists(cycle_dir):
242 logging.error("%s already exists, not overwriting" % (cycle_dir,))
247 # copy stuff out of the main run
255 for f in os.listdir(g.pathname):
256 if re.match('.*_score.txt', f):
257 score_files.append(f)
259 tar_cmd = ['/bin/tar', 'c'] + score_files
260 bzip_cmd = [ 'bzip2', '-9', '-c' ]
261 tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2')
262 tar_dest = open(tar_dest_name, 'w')
263 logging.info("Compressing score files in %s" % (g.pathname,))
264 logging.info("Running tar: " + " ".join(tar_cmd[:10]))
265 logging.info("Running bzip2: " + " ".join(bzip_cmd))
266 logging.info("Writing to %s" %(tar_dest_name))
268 tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, cwd=g.pathname)
269 bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
272 # copy & bzip eland files
273 for eland_lane in g.eland_results.values():
274 source_name = eland_lane.pathname
275 path, name = os.path.split(eland_lane.pathname)
276 dest_name = os.path.join(cycle_dir, name+'.bz2')
278 args = ['bzip2', '-9', '-c', source_name]
279 logging.info('Running: %s' % ( " ".join(args) ))
280 bzip_dest = open(dest_name, 'w')
281 bzip = subprocess.Popen(args, stdout=bzip_dest)
282 logging.info('Saving to %s' % (dest_name, ))