2 Core information needed to inspect a runfolder.
14 from xml.etree import ElementTree
15 except ImportError, e:
16 from elementtree import ElementTree
18 EUROPEAN_STRPTIME = "%d-%m-%Y"
19 EUROPEAN_DATE_RE = "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4,4})"
20 VERSION_RE = "([0-9\.]+)"
21 USER_RE = "([a-zA-Z0-9]+)"
22 LANES_PER_FLOWCELL = 8
24 from gaworkflow.util.alphanum import alphanum
25 from gaworkflow.util.ethelp import indent, flatten
28 class PipelineRun(object):
30 Capture "interesting" information about a pipeline run
33 PIPELINE_RUN = 'PipelineRun'
34 FLOWCELL_ID = 'FlowcellID'
36 def __init__(self, pathname=None, firecrest=None, bustard=None, gerald=None, xml=None):
37 self.pathname = os.path.normpath(pathname)
39 self._flowcell_id = None
40 self.firecrest = firecrest
41 self.bustard = bustard
45 self.set_elements(xml)
47 def _get_flowcell_id(self):
49 if self._flowcell_id is None:
50 config_dir = os.path.join(self.pathname, 'Config')
51 flowcell_id_path = os.path.join(config_dir, 'FlowcellId.xml')
52 if os.path.exists(flowcell_id_path):
53 flowcell_id_tree = ElementTree.parse(flowcell_id_path)
54 self._flowcell_id = flowcell_id_tree.findtext('Text')
56 path_fields = self.pathname.split('_')
57 if len(path_fields) > 0:
58 # guessing last element of filename
59 flowcell_id = path_fields[-1]
61 flowcell_id = 'unknown'
64 "Flowcell id was not found, guessing %s" % (
66 self._flowcell_id = flowcell_id
67 return self._flowcell_id
68 flowcell_id = property(_get_flowcell_id)
70 def get_elements(self):
72 make one master xml file from all of our sub-components.
74 root = ElementTree.Element(PipelineRun.PIPELINE_RUN)
75 flowcell = ElementTree.SubElement(root, PipelineRun.FLOWCELL_ID)
76 flowcell.text = self.flowcell_id
77 root.append(self.firecrest.get_elements())
78 root.append(self.bustard.get_elements())
79 root.append(self.gerald.get_elements())
82 def set_elements(self, tree):
83 # this file gets imported by all the others,
84 # so we need to hide the imports to avoid a cyclic imports
85 from gaworkflow.pipeline import firecrest
86 from gaworkflow.pipeline import bustard
87 from gaworkflow.pipeline import gerald
89 tag = tree.tag.lower()
90 if tag != PipelineRun.PIPELINE_RUN.lower():
91 raise ValueError('Pipeline Run Expecting %s got %s' % (
92 PipelineRun.PIPELINE_RUN, tag))
94 tag = element.tag.lower()
95 if tag == PipelineRun.FLOWCELL_ID.lower():
96 self._flowcell_id = element.text
97 #ok the xword.Xword.XWORD pattern for module.class.constant is lame
98 elif tag == firecrest.Firecrest.FIRECREST.lower():
99 self.firecrest = firecrest.Firecrest(xml=element)
100 elif tag == bustard.Bustard.BUSTARD.lower():
101 self.bustard = bustard.Bustard(xml=element)
102 elif tag == gerald.Gerald.GERALD.lower():
103 self.gerald = gerald.Gerald(xml=element)
105 logging.warn('PipelineRun unrecognized tag %s' % (tag,))
107 def _get_run_name(self):
109 Given a run tuple, find the latest date and use that as our name
111 if self._name is None:
112 tmax = max(self.firecrest.time, self.bustard.time, self.gerald.time)
113 timestamp = time.strftime('%Y-%m-%d', time.localtime(tmax))
114 self._name = 'run_'+self.flowcell_id+"_"+timestamp+'.xml'
116 name = property(_get_run_name)
118 def save(self, destdir=None):
121 logging.info("Saving run report "+ self.name)
122 xml = self.get_elements()
124 dest_pathname = os.path.join(destdir, self.name)
125 ElementTree.ElementTree(xml).write(dest_pathname)
127 def load(self, filename):
128 logging.info("Loading run report from " + filename)
129 tree = ElementTree.parse(filename).getroot()
130 self.set_elements(tree)
132 def get_runs(runfolder):
134 Search through a run folder for all the various sub component runs
135 and then return a PipelineRun for each different combination.
137 For example if there are two different GERALD runs, this will
138 generate two different PipelineRun objects, that differ
139 in there gerald component.
141 from gaworkflow.pipeline import firecrest
142 from gaworkflow.pipeline import bustard
143 from gaworkflow.pipeline import gerald
145 datadir = os.path.join(runfolder, 'Data')
147 logging.info('Searching for runs in ' + datadir)
149 for firecrest_pathname in glob(os.path.join(datadir,"*Firecrest*")):
150 f = firecrest.firecrest(firecrest_pathname)
151 bustard_glob = os.path.join(firecrest_pathname, "Bustard*")
152 for bustard_pathname in glob(bustard_glob):
153 b = bustard.bustard(bustard_pathname)
154 gerald_glob = os.path.join(bustard_pathname, 'GERALD*')
155 for gerald_pathname in glob(gerald_glob):
157 g = gerald.gerald(gerald_pathname)
158 runs.append(PipelineRun(runfolder, f, b, g))
160 print "Ignoring", str(e)
164 def extract_run_parameters(runs):
166 Search through runfolder_path for various runs and grab their parameters
171 def summarize_mapped_reads(mapped_reads):
173 Summarize per chromosome reads into a genome count
174 But handle spike-in/contamination symlinks seperately.
176 summarized_reads = {}
179 for k, v in mapped_reads.items():
180 path, k = os.path.split(k)
185 summarized_reads[k] = summarized_reads.setdefault(k, 0) + v
186 summarized_reads[genome] = genome_reads
187 return summarized_reads
189 def summary_report(runs):
191 Summarize cluster numbers and mapped read counts for a runfolder
196 report.append('Summary for %s' % (run.name,))
198 eland_keys = run.gerald.eland_results.results.keys()
199 eland_keys.sort(alphanum)
201 lane_results = run.gerald.summary.lane_results
202 for lane_id in eland_keys:
203 result = run.gerald.eland_results.results[lane_id]
204 report.append("Sample name %s" % (result.sample_name))
205 report.append("Lane id %s" % (result.lane_id,))
206 cluster = lane_results[result.lane_id].cluster
207 report.append("Clusters %d +/- %d" % (cluster[0], cluster[1]))
208 report.append("Total Reads: %d" % (result.reads))
209 mc = result._match_codes
210 report.append("No Match: %d" % (mc['NM']))
211 report.append("QC Failed: %d" % (mc['QC']))
212 report.append('Unique (0,1,2 mismatches) %d %d %d' % \
213 (mc['U0'], mc['U1'], mc['U2']))
214 report.append('Repeat (0,1,2 mismatches) %d %d %d' % \
215 (mc['R0'], mc['R1'], mc['R2']))
216 report.append("Mapped Reads")
217 mapped_reads = summarize_mapped_reads(result.mapped_reads)
218 for name, counts in mapped_reads.items():
219 report.append(" %s: %d" % (name, counts))
222 return os.linesep.join(report)
224 def extract_results(runs, output_base_dir=None):
225 if output_base_dir is None:
226 output_base_dir = os.getcwd()
229 result_dir = os.path.join(output_base_dir, r.flowcell_id)
230 logging.info("Using %s as result directory" % (result_dir,))
231 if not os.path.exists(result_dir):
235 cycle = "C%d-%d" % (r.firecrest.start, r.firecrest.stop)
236 logging.info("Filling in %s" % (cycle,))
237 cycle_dir = os.path.join(result_dir, cycle)
238 if os.path.exists(cycle_dir):
239 logging.error("%s already exists, not overwriting" % (cycle_dir,))
244 # copy stuff out of the main run
252 for f in os.listdir(g.pathname):
253 if re.match('.*_score.txt', f):
254 score_files.append(f)
256 tar_cmd = ['/bin/tar', 'c'] + score_files
257 bzip_cmd = [ 'bzip2', '-9', '-c' ]
258 tar_dest_name =os.path.join(cycle_dir, 'scores.tar.bz2')
259 tar_dest = open(tar_dest_name, 'w')
260 logging.info("Compressing score files in %s" % (g.pathname,))
261 logging.info("Running tar: " + " ".join(tar_cmd[:10]))
262 logging.info("Running bzip2: " + " ".join(bzip_cmd))
263 logging.info("Writing to %s" %(tar_dest_name))
265 tar = subprocess.Popen(tar_cmd, stdout=subprocess.PIPE, shell=False, cwd=g.pathname)
266 bzip = subprocess.Popen(bzip_cmd, stdin=tar.stdout, stdout=tar_dest)
269 # copy & bzip eland files
270 for eland_lane in g.eland_results.values():
271 source_name = eland_lane.pathname
272 path, name = os.path.split(eland_lane.pathname)
273 dest_name = os.path.join(cycle_dir, name+'.bz2')
275 args = ['bzip2', '-9', '-c', source_name]
276 logging.info('Running: %s' % ( " ".join(args) ))
277 bzip_dest = open(dest_name, 'w')
278 bzip = subprocess.Popen(args, stdout=bzip_dest)
279 logging.info('Saving to %s' % (dest_name, ))