2 Extract configuration from Illumina Bustard Directory.
4 This includes the version number, run date, bustard executable parameters, and
8 from datetime import date
16 from htsworkflow.pipelines.runfolder import \
22 __docformat__ = "restructuredtext en"
24 LANE_LIST = range(1,9)
26 class Phasing(object):
28 PREPHASING = 'Prephasing'
30 def __init__(self, fromfile=None, xml=None):
33 self.prephasing = None
35 if fromfile is not None:
36 self._initialize_from_file(fromfile)
38 self.set_elements(xml)
40 def _initialize_from_file(self, pathname):
41 path, name = os.path.split(pathname)
42 basename, ext = os.path.splitext(name)
43 # the last character of the param base filename should be the
45 tree = ElementTree.parse(pathname).getroot()
46 self.set_elements(tree)
47 self.lane = int(basename[-1])
49 def get_elements(self):
50 root = ElementTree.Element(Phasing.PHASING, {'lane': str(self.lane)})
51 root.tail = os.linesep
52 phasing = ElementTree.SubElement(root, Phasing.PHASING)
53 phasing.text = str(self.phasing)
54 phasing.tail = os.linesep
55 prephasing = ElementTree.SubElement(root, Phasing.PREPHASING)
56 prephasing.text = str(self.prephasing)
57 prephasing.tail = os.linesep
60 def set_elements(self, tree):
61 if tree.tag not in ('Phasing', 'Parameters'):
62 raise ValueError('exptected Phasing or Parameters')
63 lane = tree.attrib.get('lane', None)
66 for element in list(tree):
67 if element.tag == Phasing.PHASING:
68 self.phasing = float(element.text)
69 elif element.tag == Phasing.PREPHASING:
70 self.prephasing = float(element.text)
72 class CrosstalkMatrix(object):
73 CROSSTALK = "MatrixElements"
77 def __init__(self, fromfile=None, xml=None):
80 if fromfile is not None:
81 self._initialize_from_file(fromfile)
83 self.set_elements(xml)
85 def _initialize_from_file(self, pathname):
86 data = open(pathname).readlines()
87 auto_header = '# Auto-generated frequency response matrix'
88 if data[0].strip() == auto_header and len(data) == 9:
89 # skip over lines 1,2,3,4 which contain the 4 bases
90 self.base['A'] = [ float(v) for v in data[5].split() ]
91 self.base['C'] = [ float(v) for v in data[6].split() ]
92 self.base['G'] = [ float(v) for v in data[7].split() ]
93 self.base['T'] = [ float(v) for v in data[8].split() ]
95 self.base['A'] = [ float(v) for v in data[:4] ]
96 self.base['C'] = [ float(v) for v in data[4:8] ]
97 self.base['G'] = [ float(v) for v in data[8:12] ]
98 self.base['T'] = [ float(v) for v in data[12:16] ]
100 raise RuntimeError("matrix file %s is unusual" % (pathname,))
101 def get_elements(self):
102 root = ElementTree.Element(CrosstalkMatrix.CROSSTALK)
103 root.tail = os.linesep
104 base_order = ['A','C','G','T']
106 base_element = ElementTree.SubElement(root, CrosstalkMatrix.BASE)
107 base_element.text = b
108 base_element.tail = os.linesep
110 for value in self.base[b]:
111 crosstalk_value = ElementTree.SubElement(root, CrosstalkMatrix.ELEMENT)
112 crosstalk_value.text = unicode(value)
113 crosstalk_value.tail = os.linesep
117 def set_elements(self, tree):
118 if tree.tag != CrosstalkMatrix.CROSSTALK:
119 raise ValueError('Invalid run-xml exptected '+CrosstalkMatrix.CROSSTALK)
123 for element in tree.getchildren():
124 # read in the order of the bases
125 if element.tag == 'Base':
126 base_order.append(element.text)
127 elif element.tag == 'Element':
128 # we're done reading bases, now its just the 4x4 matrix
129 # written out as a list of elements
130 # if this is the first element, make a copy of the list
131 # to play with and initialize an empty list for the current base
132 if current_base is None:
133 current_base = copy(base_order)
134 self.base[current_base[0]] = []
135 # we found (probably) 4 bases go to the next base
136 if current_index == len(base_order):
139 self.base[current_base[0]] = []
140 value = float(element.text)
141 self.base[current_base[0]].append(value)
145 raise RuntimeError("Unrecognized tag in run xml: %s" %(element.tag,))
147 def crosstalk_matrix_from_bustard_config(bustard_path, bustard_config_tree):
149 Analyze the bustard config file and try to find the crosstalk matrix.
151 bustard_run = bustard_config_tree[0]
152 if bustard_run.tag != 'Run':
153 raise RuntimeError('Expected Run tag, got %s' % (bustard_run.tag,))
155 call_parameters = bustard_run.find('BaseCallParameters')
156 if call_parameters is None:
157 raise RuntimeError('Missing BaseCallParameters section')
159 matrix = call_parameters.find('Matrix')
161 raise RuntimeError('Expected to find Matrix in Bustard BaseCallParameters')
163 matrix_auto_flag = int(matrix.find('AutoFlag').text)
164 matrix_auto_lane = int(matrix.find('AutoLane').text)
168 # we estimated the matrix from something in this run.
169 # though we don't really care which lane it was
170 if matrix_auto_lane == 0:
171 auto_lane_fragment = ""
173 auto_lane_fragment = "_%d" % ( matrix_auto_lane,)
175 for matrix_name in ['s%s_02_matrix.txt' % (auto_lane_fragment,),
176 's%s_1_matrix.txt' % (auto_lane_fragment,),
178 matrix_path = os.path.join(bustard_path, 'Matrix', matrix_name)
179 if os.path.exists(matrix_path):
182 raise RuntimeError("Couldn't find matrix for lane %d" % \
185 crosstalk = CrosstalkMatrix(matrix_path)
187 matrix_elements = call_parameters.find('MatrixElements')
188 # the matrix was provided
189 if matrix_elements is not None:
190 crosstalk = CrosstalkMatrix(xml=matrix_elements)
192 # we have no crosstalk matrix?
197 class Bustard(object):
202 SOFTWARE_VERSION = 'version'
205 PARAMETERS = 'Parameters'
206 BUSTARD_CONFIG = 'BaseCallAnalysis'
208 def __init__(self, xml=None):
213 self.crosstalk = None
215 self.bustard_config = None
218 self.set_elements(xml)
221 if self.date is None:
223 return time.mktime(self.date.timetuple())
224 time = property(_get_time, doc='return run time as seconds since epoch')
227 #print ElementTree.tostring(self.get_elements())
228 ElementTree.dump(self.get_elements())
230 def get_elements(self):
231 root = ElementTree.Element('Bustard',
232 {'version': str(Bustard.XML_VERSION)})
233 version = ElementTree.SubElement(root, Bustard.SOFTWARE_VERSION)
234 version.text = self.version
235 if self.time is not None:
236 run_date = ElementTree.SubElement(root, Bustard.DATE)
237 run_date.text = str(self.time)
238 if self.user is not None:
239 user = ElementTree.SubElement(root, Bustard.USER)
240 user.text = self.user
241 params = ElementTree.SubElement(root, Bustard.PARAMETERS)
243 # add phasing parameters
244 for lane in LANE_LIST:
245 if self.phasing.has_key(lane):
246 params.append(self.phasing[lane].get_elements())
248 # add crosstalk matrix if it exists
249 if self.crosstalk is not None:
250 root.append(self.crosstalk.get_elements())
252 # add bustard config if it exists
253 if self.bustard_config is not None:
254 root.append(self.bustard_config)
257 def set_elements(self, tree):
258 if tree.tag != Bustard.BUSTARD:
259 raise ValueError('Expected "Bustard" SubElements')
260 xml_version = int(tree.attrib.get('version', 0))
261 if xml_version > Bustard.XML_VERSION:
262 logging.warn('Bustard XML tree is a higher version than this class')
263 for element in list(tree):
264 if element.tag == Bustard.SOFTWARE_VERSION:
265 self.version = element.text
266 elif element.tag == Bustard.DATE:
267 self.date = date.fromtimestamp(float(element.text))
268 elif element.tag == Bustard.USER:
269 self.user = element.text
270 elif element.tag == Bustard.PARAMETERS:
271 for param in element:
272 p = Phasing(xml=param)
273 self.phasing[p.lane] = p
274 elif element.tag == CrosstalkMatrix.CROSSTALK:
275 self.crosstalk = CrosstalkMatrix(xml=element)
276 elif element.tag == Bustard.BUSTARD_CONFIG:
277 self.bustard_config = element
279 raise ValueError("Unrecognized tag: %s" % (element.tag,))
281 def bustard(pathname):
283 Construct a Bustard object by analyzing an Illumina Bustard directory.
286 - `pathname`: A bustard directory
289 Fully initialized Bustard object.
292 pathname = os.path.abspath(pathname)
293 path, name = os.path.split(pathname)
294 groups = name.split("_")
295 if groups[0].lower().startswith('bustard'):
296 version = re.search(VERSION_RE, groups[0])
297 b.version = version.group(1)
298 t = time.strptime(groups[1], EUROPEAN_STRPTIME)
299 b.date = date(*t[0:3])
301 elif groups[0] == 'BaseCalls':
307 b.pathname = pathname
308 bustard_config_filename = os.path.join(pathname, 'config.xml')
309 paramfiles = glob(os.path.join(pathname, "params?.xml"))
310 for paramfile in paramfiles:
311 phasing = Phasing(paramfile)
312 assert (phasing.lane >= 1 and phasing.lane <= 8)
313 b.phasing[phasing.lane] = phasing
314 # I only found these in Bustard1.9.5/1.9.6 directories
315 if b.version in ('1.9.5', '1.9.6'):
316 # at least for our runfolders for 1.9.5 and 1.9.6 matrix[1-8].txt are always the same
317 crosstalk_file = os.path.join(pathname, "matrix1.txt")
318 b.crosstalk = CrosstalkMatrix(crosstalk_file)
319 # for version 1.3.2 of the pipeline the bustard version number went down
320 # to match the rest of the pipeline. However there's now a nifty
321 # new (useful) bustard config file.
322 elif os.path.exists(bustard_config_filename):
323 bustard_config_root = ElementTree.parse(bustard_config_filename)
324 b.bustard_config = bustard_config_root.getroot()
325 b.crosstalk = crosstalk_matrix_from_bustard_config(b.pathname, b.bustard_config)
326 software = bustard_config_root.find('*/Software')
327 b.version = software.attrib['Version']
328 #b.version = software.attrib['Name'] + "-" + software.attrib['Version']
334 Reconstruct a htsworkflow.pipelines.Bustard object from an xml block
340 def make_cmdline_parser():
341 from optparse import OptionParser
342 parser = OptionParser('%prog: bustard_directory')
346 parser = make_cmdline_parser()
347 opts, args = parser.parse_args(cmdline)
349 for bustard_dir in args:
350 print u'analyzing bustard directory: ' + unicode(bustard_dir)
351 bustard_object = bustard(bustard_dir)
352 bustard_object.dump()
354 bustard_object2 = Bustard(xml=bustard_object.get_elements())
355 print ('-------------------------------------')
356 bustard_object2.dump()
357 print ('=====================================')
358 b1_tree = bustard_object.get_elements()
359 b1 = ElementTree.tostring(b1_tree).split(os.linesep)
360 b2_tree = bustard_object2.get_elements()
361 b2 = ElementTree.tostring(b2_tree).split(os.linesep)
362 for line1, line2 in zip(b1, b2):
367 if __name__ == "__main__":