2 Extract configuration from Illumina Bustard Directory.
4 This includes the version number, run date, bustard executable parameters, and
7 from __future__ import print_function
10 from datetime import date
18 from htsworkflow.pipelines import \
23 LOGGER = logging.getLogger(__name__)
26 __docformat__ = "restructuredtext en"
28 LANE_LIST = range(1,9)
30 class Phasing(object):
32 PREPHASING = 'Prephasing'
34 def __init__(self, fromfile=None, xml=None):
37 self.prephasing = None
39 if fromfile is not None:
40 self._initialize_from_file(fromfile)
42 self.set_elements(xml)
44 def _initialize_from_file(self, pathname):
45 path, name = os.path.split(pathname)
46 basename, ext = os.path.splitext(name)
47 # the last character of the param base filename should be the
49 tree = ElementTree.parse(pathname).getroot()
50 self.set_elements(tree)
51 self.lane = int(basename[-1])
53 def get_elements(self):
54 root = ElementTree.Element(Phasing.PHASING, {'lane': str(self.lane)})
55 root.tail = os.linesep
56 phasing = ElementTree.SubElement(root, Phasing.PHASING)
57 phasing.text = str(self.phasing)
58 phasing.tail = os.linesep
59 prephasing = ElementTree.SubElement(root, Phasing.PREPHASING)
60 prephasing.text = str(self.prephasing)
61 prephasing.tail = os.linesep
64 def set_elements(self, tree):
65 if tree.tag not in ('Phasing', 'Parameters'):
66 raise ValueError('exptected Phasing or Parameters')
67 lane = tree.attrib.get('lane', None)
70 for element in list(tree):
71 if element.tag == Phasing.PHASING:
72 self.phasing = float(element.text)
73 elif element.tag == Phasing.PREPHASING:
74 self.prephasing = float(element.text)
76 class CrosstalkMatrix(object):
77 CROSSTALK = "MatrixElements"
81 def __init__(self, fromfile=None, xml=None):
84 if fromfile is not None:
85 self._initialize_from_file(fromfile)
87 self.set_elements(xml)
89 def _initialize_from_file(self, pathname):
90 data = open(pathname).readlines()
91 auto_header = '# Auto-generated frequency response matrix'
92 if data[0].strip() == auto_header and len(data) == 9:
93 # skip over lines 1,2,3,4 which contain the 4 bases
94 self.base['A'] = [ float(v) for v in data[5].split() ]
95 self.base['C'] = [ float(v) for v in data[6].split() ]
96 self.base['G'] = [ float(v) for v in data[7].split() ]
97 self.base['T'] = [ float(v) for v in data[8].split() ]
99 self.base['A'] = [ float(v) for v in data[:4] ]
100 self.base['C'] = [ float(v) for v in data[4:8] ]
101 self.base['G'] = [ float(v) for v in data[8:12] ]
102 self.base['T'] = [ float(v) for v in data[12:16] ]
104 raise RuntimeError("matrix file %s is unusual" % (pathname,))
105 def get_elements(self):
106 root = ElementTree.Element(CrosstalkMatrix.CROSSTALK)
107 root.tail = os.linesep
108 base_order = ['A','C','G','T']
110 base_element = ElementTree.SubElement(root, CrosstalkMatrix.BASE)
111 base_element.text = b
112 base_element.tail = os.linesep
114 for value in self.base[b]:
115 crosstalk_value = ElementTree.SubElement(root, CrosstalkMatrix.ELEMENT)
116 crosstalk_value.text = unicode(value)
117 crosstalk_value.tail = os.linesep
121 def set_elements(self, tree):
122 if tree.tag != CrosstalkMatrix.CROSSTALK:
123 raise ValueError('Invalid run-xml exptected '+CrosstalkMatrix.CROSSTALK)
127 for element in tree.getchildren():
128 # read in the order of the bases
129 if element.tag == 'Base':
130 base_order.append(element.text)
131 elif element.tag == 'Element':
132 # we're done reading bases, now its just the 4x4 matrix
133 # written out as a list of elements
134 # if this is the first element, make a copy of the list
135 # to play with and initialize an empty list for the current base
136 if current_base is None:
137 current_base = copy(base_order)
138 self.base[current_base[0]] = []
139 # we found (probably) 4 bases go to the next base
140 if current_index == len(base_order):
143 self.base[current_base[0]] = []
144 value = float(element.text)
145 self.base[current_base[0]].append(value)
149 raise RuntimeError("Unrecognized tag in run xml: %s" %(element.tag,))
151 def crosstalk_matrix_from_bustard_config(bustard_path, bustard_config_tree):
153 Analyze the bustard config file and try to find the crosstalk matrix.
155 bustard_run = bustard_config_tree[0]
156 if bustard_run.tag != 'Run':
157 raise RuntimeError('Expected Run tag, got %s' % (bustard_run.tag,))
159 call_parameters = bustard_run.find('BaseCallParameters')
160 if call_parameters is None:
161 raise RuntimeError('Missing BaseCallParameters section')
163 matrix = call_parameters.find('Matrix')
165 raise RuntimeError('Expected to find Matrix in Bustard BaseCallParameters')
167 matrix_auto_flag = int(matrix.find('AutoFlag').text)
168 matrix_auto_lane = int(matrix.find('AutoLane').text)
172 # we estimated the matrix from something in this run.
173 # though we don't really care which lane it was
174 if matrix_auto_lane == 0:
175 # its defaulting to all of the lanes, so just pick one
176 auto_lane_fragment = "_1"
178 auto_lane_fragment = "_%d" % ( matrix_auto_lane,)
180 for matrix_name in ['s%s_02_matrix.txt' % (auto_lane_fragment,),
181 's%s_1_matrix.txt' % (auto_lane_fragment,),
183 matrix_path = os.path.join(bustard_path, 'Matrix', matrix_name)
184 if os.path.exists(matrix_path):
187 raise RuntimeError("Couldn't find matrix for lane %d" % \
190 crosstalk = CrosstalkMatrix(matrix_path)
192 matrix_elements = call_parameters.find('MatrixElements')
193 # the matrix was provided
194 if matrix_elements is not None:
195 crosstalk = CrosstalkMatrix(xml=matrix_elements)
197 # we have no crosstalk matrix?
202 class Bustard(object):
207 SOFTWARE_VERSION = 'version'
210 PARAMETERS = 'Parameters'
211 BUSTARD_CONFIG = 'BaseCallAnalysis'
213 def __init__(self, xml=None):
214 self._path_version = None # version number from directory name
218 self.crosstalk = None
220 self.bustard_config = None
223 self.set_elements(xml)
225 def update_attributes_from_pathname(self):
226 """Update version, date, user from bustard directory names
227 Obviously this wont work for BaseCalls or later runfolders
229 if self.pathname is None:
231 "Set pathname before calling update_attributes_from_pathname")
232 path, name = os.path.split(self.pathname)
234 if not re.match('bustard', name, re.IGNORECASE):
237 groups = name.split("_")
238 version = re.search(VERSION_RE, groups[0])
239 self._path_version = version.group(1)
240 t = time.strptime(groups[1], EUROPEAN_STRPTIME)
241 self.date = date(*t[0:3])
242 self.user = groups[2]
244 def _get_sequence_format(self):
245 """Guess sequence format"""
246 project_glob = os.path.join(self.pathname, 'Project_*')
247 LOGGER.debug("Scanning: %s" % (project_glob,))
248 projects = glob(project_glob)
249 if len(projects) > 0:
250 # Hey we look like a demultiplexed run
252 seqs = glob(os.path.join(self.pathname, '*_seq.txt'))
256 sequence_format = property(_get_sequence_format)
258 def _get_software_version(self):
259 """return software name, version tuple"""
260 if self.bustard_config is None:
261 if self._path_version is not None:
262 return 'Bustard', self._path_version
265 software_nodes = self.bustard_config.xpath('Run/Software')
266 if len(software_nodes) == 0:
268 elif len(software_nodes) > 1:
269 raise RuntimeError("Too many software XML elements for bustard.py")
271 return (software_nodes[0].attrib['Name'],
272 software_nodes[0].attrib['Version'])
274 def _get_software(self):
275 """Return software name"""
276 software_version = self._get_software_version()
277 return software_version[0] if software_version is not None else None
278 software = property(_get_software)
280 def _get_version(self):
281 """Return software name"""
282 software_version = self._get_software_version()
283 return software_version[1] if software_version is not None else None
284 version = property(_get_version)
288 if self.date is None:
290 return time.mktime(self.date.timetuple())
291 time = property(_get_time, doc='return run time as seconds since epoch')
294 #print(ElementTree.tostring(self.get_elements()))
295 ElementTree.dump(self.get_elements())
297 def get_elements(self):
298 root = ElementTree.Element('Bustard',
299 {'version': str(Bustard.XML_VERSION)})
300 version = ElementTree.SubElement(root, Bustard.SOFTWARE_VERSION)
301 version.text = self.version
302 if self.time is not None:
303 run_date = ElementTree.SubElement(root, Bustard.DATE)
304 run_date.text = str(self.time)
305 if self.user is not None:
306 user = ElementTree.SubElement(root, Bustard.USER)
307 user.text = self.user
308 params = ElementTree.SubElement(root, Bustard.PARAMETERS)
310 # add phasing parameters
311 for lane in LANE_LIST:
312 if self.phasing.has_key(lane):
313 params.append(self.phasing[lane].get_elements())
315 # add crosstalk matrix if it exists
316 if self.crosstalk is not None:
317 root.append(self.crosstalk.get_elements())
319 # add bustard config if it exists
320 if self.bustard_config is not None:
321 root.append(self.bustard_config)
324 def set_elements(self, tree):
325 if tree.tag != Bustard.BUSTARD:
326 raise ValueError('Expected "Bustard" SubElements')
327 xml_version = int(tree.attrib.get('version', 0))
328 if xml_version > Bustard.XML_VERSION:
329 LOGGER.warn('Bustard XML tree is a higher version than this class')
330 for element in list(tree):
331 if element.tag == Bustard.SOFTWARE_VERSION:
332 self._path_version = element.text
333 elif element.tag == Bustard.DATE:
334 self.date = date.fromtimestamp(float(element.text))
335 elif element.tag == Bustard.USER:
336 self.user = element.text
337 elif element.tag == Bustard.PARAMETERS:
338 for param in element:
339 p = Phasing(xml=param)
340 self.phasing[p.lane] = p
341 elif element.tag == CrosstalkMatrix.CROSSTALK:
342 self.crosstalk = CrosstalkMatrix(xml=element)
343 elif element.tag == Bustard.BUSTARD_CONFIG:
344 self.bustard_config = element
346 raise ValueError("Unrecognized tag: %s" % (element.tag,))
348 def bustard(pathname):
350 Construct a Bustard object by analyzing an Illumina Bustard directory.
353 - `pathname`: A bustard directory
356 Fully initialized Bustard object.
358 pathname = os.path.abspath(pathname)
359 bustard_filename = os.path.join(pathname, 'config.xml')
360 demultiplexed_filename = os.path.join(pathname,
361 'DemultiplexedBustardConfig.xml')
363 if os.path.exists(demultiplexed_filename):
364 b = bustard_from_hiseq(pathname, demultiplexed_filename)
365 elif os.path.exists(bustard_filename):
366 b = bustard_from_ga2(pathname, bustard_filename)
368 b = bustard_from_ga1(pathname)
371 raise RuntimeError("Unable to parse base-call directory at %s" % (pathname,))
375 def bustard_from_ga1(pathname):
376 """Initialize bustard class from ga1 era runfolders.
378 path, name = os.path.split(pathname)
380 groups = name.split("_")
382 msg = "Not enough information to create attributes"\
383 " from directory name: %s"
384 LOGGER.error(msg % (pathname,))
388 b.pathname = pathname
389 b.update_attributes_from_pathname()
390 version = re.search(VERSION_RE, groups[0])
391 b._path_version = version.group(1)
392 t = time.strptime(groups[1], EUROPEAN_STRPTIME)
393 b.date = date(*t[0:3])
396 # I only found these in Bustard1.9.5/1.9.6 directories
397 if b.version in ('1.9.5', '1.9.6'):
398 # at least for our runfolders for 1.9.5 and 1.9.6 matrix[1-8].txt are always the same
399 crosstalk_file = os.path.join(pathname, "matrix1.txt")
400 b.crosstalk = CrosstalkMatrix(crosstalk_file)
406 def bustard_from_ga2(pathname, config_filename):
407 """Initialize bustard class from ga2-era runfolder
408 Actually I don't quite remember if it is exactly the GA2s, but
409 its after the original runfolder style and before the HiSeq.
411 # for version 1.3.2 of the pipeline the bustard version number went down
412 # to match the rest of the pipeline. However there's now a nifty
413 # new (useful) bustard config file.
417 b.pathname = pathname
418 b.update_attributes_from_pathname()
419 bustard_config_root = ElementTree.parse(config_filename)
420 b.bustard_config = bustard_config_root.getroot()
421 b.crosstalk = crosstalk_matrix_from_bustard_config(b.pathname,
427 def bustard_from_hiseq(pathname, config_filename):
429 b.pathname = pathname
430 bustard_config_root = ElementTree.parse(config_filename)
431 b.bustard_config = bustard_config_root.getroot()
435 def add_phasing(bustard_obj):
436 paramfiles = glob(os.path.join(bustard_obj.pathname,
438 for paramfile in paramfiles:
439 phasing = Phasing(paramfile)
440 assert (phasing.lane >= 1 and phasing.lane <= 8)
441 bustard_obj.phasing[phasing.lane] = phasing
445 Reconstruct a htsworkflow.pipelines.Bustard object from an xml block
451 def make_cmdline_parser():
452 from optparse import OptionParser
453 parser = OptionParser('%prog: bustard_directory')
457 parser = make_cmdline_parser()
458 opts, args = parser.parse_args(cmdline)
460 for bustard_dir in args:
461 print(u'analyzing bustard directory: ' + unicode(bustard_dir))
462 bustard_object = bustard(bustard_dir)
463 bustard_object.dump()
465 bustard_object2 = Bustard(xml=bustard_object.get_elements())
466 print('-------------------------------------')
467 bustard_object2.dump()
468 print('=====================================')
469 b1_tree = bustard_object.get_elements()
470 b1 = ElementTree.tostring(b1_tree).split(os.linesep)
471 b2_tree = bustard_object2.get_elements()
472 b2 = ElementTree.tostring(b2_tree).split(os.linesep)
473 for line1, line2 in zip(b1, b2):
478 if __name__ == "__main__":