1 """Provide access to information stored in the GERALD directory.
3 from __future__ import print_function
6 from datetime import datetime, date
13 from htsworkflow.pipelines.summary import Summary, SummaryGA, SummaryHiSeq
14 from htsworkflow.pipelines.eland import eland, ELAND
15 from htsworkflow.pipelines.samplekey import SampleKey
17 from htsworkflow.pipelines import \
22 from htsworkflow.util.ethelp import indent, flatten
24 LOGGER = logging.getLogger(__name__)
26 class Alignment(object):
28 Capture meaning out of the GERALD directory
31 RUN_PARAMETERS='RunParameters'
34 def __init__(self, xml=None, pathname=None, tree=None):
35 self.pathname = pathname
38 # parse lane parameters out of the config.xml file
39 self.lanes = LaneSpecificRunParameters(self)
42 self.eland_results = None
45 self.set_elements(xml)
48 if self.pathname is not None:
49 epochstamp = os.stat(self.pathname)[stat.ST_MTIME]
50 return datetime.fromtimestamp(epochstamp)
51 return datetime.today()
54 return time.mktime(self.date.timetuple())
55 time = property(_get_time, doc='return run time as seconds since epoch')
def _get_chip_attribute(self, value):
    """Return the text of one ChipWideRunParameters child element.

    :param value: tag name to look up beneath ChipWideRunParameters
                  in self.tree.
    :return: the result of self.tree.findtext() for that path.
    """
    finder = self.tree.findtext
    chip_path = 'ChipWideRunParameters/%s' % (value,)
    return finder(chip_path)
62 Debugging function, report current object
64 print('Software:'. self.__class__.__name__)
65 print('Alignment version:', self.version)
66 print('Run date:', self.date)
67 print('config.xml:', self.tree)
70 def get_elements(self, root_tag):
71 if self.tree is None or self.summary is None:
74 gerald = ElementTree.Element(root_tag,
75 {'version': unicode(Gerald.XML_VERSION)})
76 gerald.append(self.tree)
77 gerald.append(self.summary.get_elements())
78 if self.eland_results:
79 gerald.append(self.eland_results.get_elements())
82 def set_elements(self, tree, root_tag):
83 if tree.tag != root_tag:
84 raise ValueError('expected %s' % (self.__class__.GERALD,))
85 xml_version = int(tree.attrib.get('version', 0))
86 if xml_version > Gerald.XML_VERSION:
87 LOGGER.warn('XML tree is a higher version than this class')
88 self.eland_results = ELAND()
89 for element in list(tree):
90 tag = element.tag.lower()
91 if tag == Gerald.RUN_PARAMETERS.lower():
93 elif tag == Gerald.SUMMARY.lower():
94 self.summary = Summary(xml=element)
95 elif tag == ELAND.ELAND.lower():
96 self.eland_results = ELAND(xml=element)
98 LOGGER.warn("Unrecognized tag %s" % (element.tag,))
100 class Gerald(Alignment):
104 if self.tree is None:
105 return datetime.today()
107 timestamp = self.tree.findtext('ChipWideRunParameters/TIME_STAMP')
108 if timestamp is not None:
109 epochstamp = time.mktime(time.strptime(timestamp))
110 return datetime.fromtimestamp(epochstamp)
111 return super(Gerald, self)._get_date()
112 date = property(_get_date)
def get_elements(self):
    """Build the element tree via the parent class.

    Delegates to the parent get_elements(), supplying Gerald.GERALD as
    the root tag, and hands back the parent's result unchanged.
    """
    parent = super(Gerald, self)
    root_tag = Gerald.GERALD
    return parent.get_elements(root_tag)
def set_elements(self, tree):
    """Load state from an element tree via the parent class.

    Delegates to the parent set_elements(), supplying Gerald.GERALD as
    the root tag, and hands back the parent's result unchanged.

    :param tree: element passed through to the parent implementation.
    """
    parent = super(Gerald, self)
    root_tag = Gerald.GERALD
    return parent.set_elements(tree, root_tag)
120 def _get_experiment_root(self):
121 if self.tree is None:
123 return self.tree.findtext('ChipWideRunParameters/EXPT_DIR_ROOT')
125 def _get_runfolder_name(self):
126 if self.tree is None:
129 expt_root = os.path.normpath(self._get_experiment_root())
130 chip_expt_dir = self.tree.findtext('ChipWideRunParameters/EXPT_DIR')
132 if expt_root is not None and chip_expt_dir is not None:
133 experiment_dir = chip_expt_dir.replace(expt_root+os.path.sep, '')
134 experiment_dir = experiment_dir.split(os.path.sep)[0]
136 if experiment_dir is None or len(experiment_dir) == 0:
138 return experiment_dir
140 runfolder_name = property(_get_runfolder_name)
142 def _get_software_version(self):
143 if self.tree is None:
145 ga_version = self.tree.findtext(
146 'ChipWideRunParameters/SOFTWARE_VERSION')
147 if ga_version is not None:
148 gerald = re.match("@.*GERALD.pl,v (?P<version>\d+(\.\d+)+)",
151 return ('GERALD', gerald.group('version'))
152 casava = re.match('CASAVA-(?P<version>\d+[.\d]*)',
155 return ('CASAVA', casava.group('version'))
def _get_software(self):
    """Return the software package name, or None when it is unknown.

    The name is the first item of whatever _get_software_version()
    reports; a None report is passed through as None.
    """
    name_and_version = self._get_software_version()
    if name_and_version is None:
        return None
    return name_and_version[0]
software = property(_get_software)
def _get_version(self):
    """Return the software version string, or None when it is unknown.

    The version is the second item of whatever _get_software_version()
    reports; a None report is passed through as None.
    """
    name_and_version = self._get_software_version()
    if name_and_version is None:
        return None
    return name_and_version[1]
version = property(_get_version)
169 class CASAVA(Alignment):
172 def __init__(self, xml=None, pathname=None, tree=None):
173 super(CASAVA, self).__init__(xml=xml, pathname=pathname, tree=tree)
175 self._add_timestamp()
177 def _add_timestamp(self):
178 """Manually add a time stamp to CASAVA runs"""
179 if self.tree is None:
181 if len(self.tree.xpath('TIME_STAMP')) == 0:
182 time_stamp = self.date.strftime('%a %b %d %H:%M:%S %Y')
183 time_element = ElementTree.Element('TIME_STAMP')
184 time_element.text = time_stamp
185 self.tree.append(time_element)
188 if self.tree is None:
190 time_element = self.tree.xpath('TIME_STAMP')
191 if len(time_element) == 1:
192 timetuple = time.strptime(
193 time_element[0].text.strip(),
194 "%a %b %d %H:%M:%S %Y")
195 return datetime(*timetuple[:6])
196 return super(CASAVA, self)._get_date()
197 date = property(_get_date)
199 def get_elements(self):
200 tree = super(CASAVA, self).get_elements(CASAVA.GERALD)
def set_elements(self, tree):
    """Load state from an element tree via the parent class.

    Delegates to the parent set_elements(), supplying CASAVA.GERALD as
    the root tag, and hands back the parent's result unchanged.

    :param tree: element passed through to the parent implementation.
    """
    parent = super(CASAVA, self)
    root_tag = CASAVA.GERALD
    return parent.set_elements(tree, root_tag)
206 def _get_runfolder_name(self):
207 if self.tree is None:
210 # hiseqs renamed the experiment dir location
211 defaults_expt_dir = self.tree.findtext('Defaults/EXPT_DIR')
212 _, experiment_dir = os.path.split(defaults_expt_dir)
214 if experiment_dir is None or len(experiment_dir) == 0:
216 return experiment_dir
218 runfolder_name = property(_get_runfolder_name)
220 def _get_software_version(self):
221 if self.tree is None:
223 if self.tree is None:
225 hiseq_software_node = self.tree.find('Software')
226 software_version = hiseq_software_node.attrib.get('Version',None)
227 if software_version is None:
229 return software_version.split('-')
231 def _get_software(self):
232 software_version = self._get_software_version()
233 if software_version is None:
235 return software_version[0]
236 software = property(_get_software)
238 def _get_version(self):
239 software_version = self._get_software_version()
240 if software_version is None:
242 return software_version[1]
243 version = property(_get_version)
246 class LaneParameters(object):
248 Make it easy to access elements of LaneSpecificRunParameters from python
def __init__(self, gerald, lane_id):
    """Remember the owning alignment object and the lane identifier.

    :param gerald: object stored as self._gerald.
    :param lane_id: identifier stored as self._lane_id.
    """
    self._gerald, self._lane_id = gerald, lane_id
def _get_analysis(self):
    """Abstract getter behind the ``analysis`` property.

    Subclasses are expected to override this.

    :raises NotImplementedError: always, in this base class.
    """
    # NotImplemented is a comparison sentinel, not an exception class;
    # calling it produced a TypeError instead of the intended signal.
    raise NotImplementedError("abstract class")
analysis = property(_get_analysis)
def _get_eland_genome(self):
    """Abstract getter behind the ``eland_genome`` property.

    Subclasses are expected to override this.

    :raises NotImplementedError: always, in this base class.
    """
    # NotImplemented is a comparison sentinel, not an exception class;
    # calling it produced a TypeError instead of the intended signal.
    raise NotImplementedError("abstract class")
eland_genome = property(_get_eland_genome)
def _get_read_length(self):
    """Abstract getter behind the ``read_length`` property.

    Subclasses are expected to override this.

    :raises NotImplementedError: always, in this base class.
    """
    # NotImplemented is a comparison sentinel, not an exception class;
    # calling it produced a TypeError instead of the intended signal.
    raise NotImplementedError("abstract class")
read_length = property(_get_read_length)
def _get_use_bases(self):
    """Abstract getter behind the ``use_bases`` property.

    Subclasses are expected to override this.

    :raises NotImplementedError: always, in this base class.
    """
    # NotImplemented is a comparison sentinel, not an exception class;
    # calling it produced a TypeError instead of the intended signal.
    raise NotImplementedError("abstract class")
use_bases = property(_get_use_bases)
271 class LaneParametersGA(LaneParameters):
273 Make it easy to access elements of LaneSpecificRunParameters from python
def __init__(self, gerald, lane_id):
    """Initialise through the parent constructor only.

    :param gerald: forwarded to the parent __init__.
    :param lane_id: forwarded to the parent __init__.
    """
    parent = super(LaneParametersGA, self)
    parent.__init__(gerald, lane_id)
278 def __get_attribute(self, xml_tag):
279 subtree = self._gerald.tree.find('LaneSpecificRunParameters')
280 container = subtree.find(xml_tag)
281 if container is None:
283 if len(container.getchildren()) > LANES_PER_FLOWCELL:
284 raise RuntimeError('GERALD config.xml file changed')
285 lanes = [x.tag.split('_')[1] for x in container.getchildren()]
287 index = lanes.index(self._lane_id)
288 except ValueError as e:
290 element = container[index]
def _get_analysis(self):
    """Return the 'ANALYSIS' value obtained from the private lookup helper."""
    analysis_value = self.__get_attribute('ANALYSIS')
    return analysis_value
analysis = property(_get_analysis)
296 def _get_eland_genome(self):
297 genome = self.__get_attribute('ELAND_GENOME')
298 # default to the chipwide parameters if there isn't an
299 # entry in the lane specific parameters
301 genome = self._gerald._get_chip_attribute('ELAND_GENOME')
303 if genome == 'Need_to_specify_ELAND_genome_directory':
306 eland_genome = property(_get_eland_genome)
308 def _get_read_length(self):
309 read_length = self.__get_attribute('READ_LENGTH')
310 if read_length is None:
311 read_length = self._gerald._get_chip_attribute('READ_LENGTH')
313 read_length = property(_get_read_length)
def _get_use_bases(self):
    """Return the 'USE_BASES' value obtained from the private lookup helper."""
    use_bases_value = self.__get_attribute('USE_BASES')
    return use_bases_value
use_bases = property(_get_use_bases)
320 class LaneParametersHiSeq(LaneParameters):
322 Make it easy to access elements of LaneSpecificRunParameters from python
def __init__(self, gerald, lane_id, element):
    """Initialise via the parent constructor and keep the lane's element.

    :param gerald: forwarded to the parent __init__.
    :param lane_id: forwarded to the parent __init__.
    :param element: stored as self.element; the attribute lookups in
                    this class search beneath it.
    """
    parent = super(LaneParametersHiSeq, self)
    parent.__init__(gerald, lane_id)
    self.element = element
328 def __get_attribute(self, xml_tag):
329 container = self.element.find(xml_tag)
330 if container is None:
332 return container.text
def _get_analysis(self):
    """Return the 'ANALYSIS' value obtained from the private lookup helper."""
    analysis_value = self.__get_attribute('ANALYSIS')
    return analysis_value
analysis = property(_get_analysis)
338 def _get_eland_genome(self):
339 genome = self.__get_attribute('ELAND_GENOME')
340 # default to the chipwide parameters if there isn't an
341 # entry in the lane specific parameters
343 genome = self._gerald._get_chip_attribute('ELAND_GENOME')
345 if genome == 'Need_to_specify_ELAND_genome_directory':
348 eland_genome = property(_get_eland_genome)
def _get_read_length(self):
    """Return the 'READ_LENGTH1' value obtained from the private lookup helper."""
    first_read_length = self.__get_attribute('READ_LENGTH1')
    return first_read_length
read_length = property(_get_read_length)
def _get_use_bases(self):
    """Return the 'USE_BASES1' value obtained from the private lookup helper."""
    first_use_bases = self.__get_attribute('USE_BASES1')
    return first_use_bases
use_bases = property(_get_use_bases)
358 class LaneSpecificRunParameters(collections.MutableMapping):
360 Provide access to LaneSpecificRunParameters
362 def __init__(self, gerald):
363 self._gerald = gerald
366 def _initialize_lanes(self):
368 build dictionary of LaneParameters
371 tree = self._gerald.tree
372 analysis = tree.find('LaneSpecificRunParameters/ANALYSIS')
373 if analysis is not None:
374 self._extract_ga_analysis_type(analysis)
375 analysis = tree.find('Projects')
376 if analysis is not None:
377 self._extract_hiseq_analysis_type(analysis)
def _extract_ga_analysis_type(self, analysis):
    """Register a LaneParametersGA entry for every child of ``analysis``.

    Each child tag is split on '_' into exactly two parts, a sample
    name and a lane id. The entry is stored in self._lanes under a
    SampleKey built from the integer lane and the sample name.

    :param analysis: iterable of elements whose ``tag`` has the form
                     ``<sample>_<lane>``.
    """
    # The pipeline specs appear to name these fields
    # sampleName_laneID, with sampleName defaulting to "s"; lane ids
    # are constant, so they are what we key on consistently.
    for child in analysis:
        sample_name, lane_text = child.tag.split('_')
        lane_key = SampleKey(lane=int(lane_text), sample=sample_name)
        lane_parameters = LaneParametersGA(self._gerald, lane_text)
        self._lanes[lane_key] = lane_parameters
390 def _extract_hiseq_analysis_type(self, analysis):
391 """Extract from HiSeq style multiplexed analysis types"""
392 for element in analysis:
393 name = element.attrib['name']
394 key = SampleKey(sample=name)
395 self._lanes[key] = LaneParametersHiSeq(self._gerald,
400 if self._lanes is None:
401 self._initialize_lanes()
402 return self._lanes.iterkeys()
404 def __getitem__(self, key):
405 if self._lanes is None:
406 self._initialize_lanes()
407 value = self._lanes.get(key, None)
408 if value is not None:
410 real_key = self._find_key(key)
411 if real_key is not None:
412 return self._lanes[real_key]
413 raise KeyError("%s not found in %s" % (
415 ",".join((repr(k) for k in self._lanes.keys()))))
417 def __setitem__(self, key, value):
418 if len(self._lanes) > 100:
419 LOGGER.warn("many projects loaded, consider improving dictionary")
420 real_key = self._find_key(key)
421 if real_key is not None:
423 self._lanes[key] = value
425 def __delitem__(self, key):
426 if key in self._lanes:
429 real_key = self._find_key(key)
430 if real_key is not None:
431 del self._lanes[real_key]
434 if self._lanes is None:
435 self._initialize_lanes()
436 return len(self._lanes)
438 def _find_key(self, lookup_key):
439 if not isinstance(lookup_key, SampleKey):
440 lookup_key = SampleKey(lane=lookup_key)
443 for k in self._lanes:
444 if k.matches(lookup_key):
447 errmsg = "Key %s matched multiple keys: %s"
448 raise ValueError(errmsg % (str(lookup_key),
449 ",".join((str(x) for x in results))))
451 elif len(results) == 1:
456 def gerald(pathname):
457 LOGGER.info("Parsing gerald config.xml")
458 pathname = os.path.expanduser(pathname)
459 config_pathname = os.path.join(pathname, 'config.xml')
460 config_tree = ElementTree.parse(config_pathname).getroot()
462 # parse Summary.htm file
463 summary_xml = os.path.join(pathname, 'Summary.xml')
464 summary_htm = os.path.join(pathname, 'Summary.htm')
465 report_summary = os.path.join(pathname, '..', 'Data',
466 'reports', 'Summary', )
467 if os.path.exists(summary_xml):
468 g = Gerald(pathname = pathname, tree=config_tree)
469 LOGGER.info("Parsing Summary.xml")
470 g.summary = SummaryGA(summary_xml)
471 g.eland_results = eland(g.pathname, g)
472 elif os.path.exists(summary_htm):
473 g = Gerald(pathname=pathname, tree=config_tree)
474 LOGGER.info("Parsing Summary.htm")
475 g.summary = SummaryGA(summary_htm)
476 g.eland_results = eland(g.pathname, g)
477 elif os.path.isdir(report_summary):
478 g = CASAVA(pathname=pathname, tree=config_tree)
479 LOGGER.info("Parsing %s" % (report_summary,))
480 g.summary = SummaryHiSeq(report_summary)
481 g.eland_results = eland(g.pathname, g)
486 if __name__ == "__main__":
489 g = gerald(sys.argv[1])
490 #ElementTree.dump(g.get_elements())