1 """Provide access to information stored in the GERALD directory.
4 from datetime import datetime, date
11 from htsworkflow.pipelines.summary import Summary, SummaryGA, SummaryHiSeq
12 from htsworkflow.pipelines.eland import eland, ELAND
13 from htsworkflow.pipelines.samplekey import SampleKey
15 from htsworkflow.pipelines.runfolder import \
20 from htsworkflow.util.ethelp import indent, flatten
22 LOGGER = logging.getLogger(__name__)
24 class Alignment(object):
26 Capture meaning out of the GERALD directory
29 RUN_PARAMETERS='RunParameters'
32 def __init__(self, xml=None, pathname=None, tree=None):
33 self.pathname = pathname
36 # parse lane parameters out of the config.xml file
37 self.lanes = LaneSpecificRunParameters(self)
40 self.eland_results = None
43 self.set_elements(xml)
46 if self.pathname is not None:
47 epochstamp = os.stat(self.pathname)[stat.ST_MTIME]
48 return datetime.fromtimestamp(epochstamp)
49 return datetime.today()
52 return time.mktime(self.date.timetuple())
53 time = property(_get_time, doc='return run time as seconds since epoch')
55 def _get_chip_attribute(self, value):
56 return self.tree.findtext('ChipWideRunParameters/%s' % (value,))
60 Debugging function, report current object
62 print 'Software:'. self.__class__.__name__
63 print 'Alignment version:', self.version
64 print 'Run date:', self.date
65 print 'config.xml:', self.tree
68 def get_elements(self, root_tag):
69 if self.tree is None or self.summary is None:
72 gerald = ElementTree.Element(root_tag,
73 {'version': unicode(Gerald.XML_VERSION)})
74 gerald.append(self.tree)
75 gerald.append(self.summary.get_elements())
76 if self.eland_results:
77 gerald.append(self.eland_results.get_elements())
80 def set_elements(self, tree, root_tag):
81 if tree.tag != root_tag:
82 raise ValueError('expected %s' % (self.__class__.GERALD,))
83 xml_version = int(tree.attrib.get('version', 0))
84 if xml_version > Gerald.XML_VERSION:
85 LOGGER.warn('XML tree is a higher version than this class')
86 self.eland_results = ELAND()
87 for element in list(tree):
88 tag = element.tag.lower()
89 if tag == Gerald.RUN_PARAMETERS.lower():
91 elif tag == Gerald.SUMMARY.lower():
92 self.summary = Summary(xml=element)
93 elif tag == ELAND.ELAND.lower():
94 self.eland_results = ELAND(xml=element)
96 LOGGER.warn("Unrecognized tag %s" % (element.tag,))
98 class Gerald(Alignment):
102 if self.tree is None:
103 return datetime.today()
105 timestamp = self.tree.findtext('ChipWideRunParameters/TIME_STAMP')
106 if timestamp is not None:
107 epochstamp = time.mktime(time.strptime(timestamp, '%c'))
108 return datetime.fromtimestamp(epochstamp)
109 return super(Gerald, self)._get_date()
110 date = property(_get_date)
def get_elements(self):
    """Return the parent class's element tree rooted at ``Gerald.GERALD``."""
    parent = super(Gerald, self)
    root_tag = Gerald.GERALD
    return parent.get_elements(root_tag)
def set_elements(self, tree):
    """Load state from *tree*, which must be rooted at ``Gerald.GERALD``.

    The work is delegated to the parent class ``set_elements``; its
    result is returned unchanged.
    """
    parent = super(Gerald, self)
    expected_root = Gerald.GERALD
    return parent.set_elements(tree, expected_root)
118 def _get_experiment_root(self):
119 if self.tree is None:
121 return self.tree.findtext('ChipWideRunParameters/EXPT_DIR_ROOT')
123 def _get_runfolder_name(self):
124 if self.tree is None:
127 expt_root = os.path.normpath(self._get_experiment_root())
128 chip_expt_dir = self.tree.findtext('ChipWideRunParameters/EXPT_DIR')
130 if expt_root is not None and chip_expt_dir is not None:
131 experiment_dir = chip_expt_dir.replace(expt_root+os.path.sep, '')
132 experiment_dir = experiment_dir.split(os.path.sep)[0]
134 if experiment_dir is None or len(experiment_dir) == 0:
136 return experiment_dir
138 runfolder_name = property(_get_runfolder_name)
140 def _get_software_version(self):
141 if self.tree is None:
143 ga_version = self.tree.findtext(
144 'ChipWideRunParameters/SOFTWARE_VERSION')
145 if ga_version is not None:
146 gerald = re.match("@.*GERALD.pl,v (?P<version>\d+(\.\d+)+)",
149 return ('GERALD', gerald.group('version'))
150 casava = re.match('CASAVA-(?P<version>\d+[.\d]*)',
153 return ('CASAVA', casava.group('version'))
155 def _get_software(self):
156 """Return name of analysis software package"""
157 software_version = self._get_software_version()
158 return software_version[0] if software_version is not None else None
159 software = property(_get_software)
161 def _get_version(self):
162 """Return version number of software package"""
163 software_version = self._get_software_version()
164 return software_version[1] if software_version is not None else None
165 version = property(_get_version)
167 class CASAVA(Alignment):
170 def __init__(self, xml=None, pathname=None, tree=None):
171 super(CASAVA, self).__init__(xml=xml, pathname=pathname, tree=tree)
173 self._add_timestamp()
175 def _add_timestamp(self):
176 """Manually add a time stamp to CASAVA runs"""
177 if self.tree is None:
179 if len(self.tree.xpath('TIME_STAMP')) == 0:
180 time_stamp = self.date.strftime('%c')
181 time_element = ElementTree.Element('TIME_STAMP')
182 time_element.text = time_stamp
183 self.tree.append(time_element)
186 if self.tree is None:
188 time_element = self.tree.xpath('TIME_STAMP')
189 if len(time_element) == 1:
190 return datetime.strptime(time_element[0].text, '%c')
191 return super(CASAVA, self)._get_date()
192 date = property(_get_date)
194 def get_elements(self):
195 tree = super(CASAVA, self).get_elements(CASAVA.GERALD)
def set_elements(self, tree):
    """Load state from *tree*, which must be rooted at ``CASAVA.GERALD``.

    The work is delegated to the parent class ``set_elements``; its
    result is returned unchanged.
    """
    parent = super(CASAVA, self)
    expected_root = CASAVA.GERALD
    return parent.set_elements(tree, expected_root)
201 def _get_runfolder_name(self):
202 if self.tree is None:
205 # hiseqs renamed the experiment dir location
206 defaults_expt_dir = self.tree.findtext('Defaults/EXPT_DIR')
207 _, experiment_dir = os.path.split(defaults_expt_dir)
209 if experiment_dir is None or len(experiment_dir) == 0:
211 return experiment_dir
213 runfolder_name = property(_get_runfolder_name)
215 def _get_software_version(self):
216 if self.tree is None:
218 if self.tree is None:
220 hiseq_software_node = self.tree.find('Software')
221 software_version = hiseq_software_node.attrib.get('Version',None)
222 if software_version is None:
224 return software_version.split('-')
226 def _get_software(self):
227 software_version = self._get_software_version()
228 if software_version is None:
230 return software_version[0]
231 software = property(_get_software)
233 def _get_version(self):
234 software_version = self._get_software_version()
235 if software_version is None:
237 return software_version[1]
238 version = property(_get_version)
class LaneParameters(object):
    """Abstract base for per-lane parameter accessors.

    Make it easy to access elements of LaneSpecificRunParameters from python.

    Every accessor defined here raises NotImplementedError; concrete
    subclasses override the ``_get_*`` methods and rebind the matching
    properties.
    """

    def __init__(self, gerald, lane_id):
        """Store the owning object and the lane identifier.

        :param gerald: kept as ``self._gerald`` for use by subclasses
        :param lane_id: kept as ``self._lane_id`` for use by subclasses
        """
        self._gerald = gerald
        self._lane_id = lane_id

    # NOTE: ``NotImplemented`` is the rich-comparison sentinel and is not
    # callable, so ``raise NotImplemented(...)`` produced a TypeError.
    # ``NotImplementedError`` is the exception meant for abstract methods.

    def _get_analysis(self):
        """Abstract accessor; always raises NotImplementedError."""
        raise NotImplementedError("abstract class")
    analysis = property(_get_analysis)

    def _get_eland_genome(self):
        """Abstract accessor; always raises NotImplementedError."""
        raise NotImplementedError("abstract class")
    eland_genome = property(_get_eland_genome)

    def _get_read_length(self):
        """Abstract accessor; always raises NotImplementedError."""
        raise NotImplementedError("abstract class")
    read_length = property(_get_read_length)

    def _get_use_bases(self):
        """Abstract accessor; always raises NotImplementedError."""
        raise NotImplementedError("abstract class")
    use_bases = property(_get_use_bases)
266 class LaneParametersGA(LaneParameters):
268 Make it easy to access elements of LaneSpecificRunParameters from python
def __init__(self, gerald, lane_id):
    """Initialise through the parent class with *gerald* and *lane_id*."""
    parent = super(LaneParametersGA, self)
    parent.__init__(gerald, lane_id)
273 def __get_attribute(self, xml_tag):
274 subtree = self._gerald.tree.find('LaneSpecificRunParameters')
275 container = subtree.find(xml_tag)
276 if container is None:
278 if len(container.getchildren()) > LANES_PER_FLOWCELL:
279 raise RuntimeError('GERALD config.xml file changed')
280 lanes = [x.tag.split('_')[1] for x in container.getchildren()]
282 index = lanes.index(self._lane_id)
283 except ValueError, e:
285 element = container[index]
287 def _get_analysis(self):
288 return self.__get_attribute('ANALYSIS')
289 analysis = property(_get_analysis)
291 def _get_eland_genome(self):
292 genome = self.__get_attribute('ELAND_GENOME')
293 # default to the chipwide parameters if there isn't an
294 # entry in the lane specific paramaters
296 genome = self._gerald._get_chip_attribute('ELAND_GENOME')
298 if genome == 'Need_to_specify_ELAND_genome_directory':
301 eland_genome = property(_get_eland_genome)
303 def _get_read_length(self):
304 read_length = self.__get_attribute('READ_LENGTH')
305 if read_length is None:
306 read_length = self._gerald._get_chip_attribute('READ_LENGTH')
308 read_length = property(_get_read_length)
310 def _get_use_bases(self):
311 return self.__get_attribute('USE_BASES')
312 use_bases = property(_get_use_bases)
315 class LaneParametersHiSeq(LaneParameters):
317 Make it easy to access elements of LaneSpecificRunParameters from python
def __init__(self, gerald, lane_id, element):
    """Keep the XML *element* for this entry and initialise the parent.

    :param gerald: forwarded to the parent class initialiser
    :param lane_id: forwarded to the parent class initialiser
    :param element: stored as ``self.element``
    """
    self.element = element
    parent = super(LaneParametersHiSeq, self)
    parent.__init__(gerald, lane_id)
323 def __get_attribute(self, xml_tag):
324 container = self.element.find(xml_tag)
325 if container is None:
327 return container.text
329 def _get_analysis(self):
330 return self.__get_attribute('ANALYSIS')
331 analysis = property(_get_analysis)
333 def _get_eland_genome(self):
334 genome = self.__get_attribute('ELAND_GENOME')
335 # default to the chipwide parameters if there isn't an
336 # entry in the lane specific paramaters
338 genome = self._gerald._get_chip_attribute('ELAND_GENOME')
340 if genome == 'Need_to_specify_ELAND_genome_directory':
343 eland_genome = property(_get_eland_genome)
345 def _get_read_length(self):
346 return self.__get_attribute('READ_LENGTH1')
347 read_length = property(_get_read_length)
349 def _get_use_bases(self):
350 return self.__get_attribute('USE_BASES1')
351 use_bases = property(_get_use_bases)
353 class LaneSpecificRunParameters(collections.MutableMapping):
355 Provide access to LaneSpecificRunParameters
357 def __init__(self, gerald):
358 self._gerald = gerald
361 def _initialize_lanes(self):
363 build dictionary of LaneParameters
366 tree = self._gerald.tree
367 analysis = tree.find('LaneSpecificRunParameters/ANALYSIS')
368 if analysis is not None:
369 self._extract_ga_analysis_type(analysis)
370 analysis = tree.find('Projects')
371 if analysis is not None:
372 self._extract_hiseq_analysis_type(analysis)
def _extract_ga_analysis_type(self, analysis):
    """Add one LaneParametersGA entry to ``self._lanes`` per child of *analysis*.

    Each child tag is split on '_' into a sample name and a lane id. The
    entry is keyed by a SampleKey built from the integer lane id and the
    sample name, while the LaneParametersGA receives the lane id as the
    original string.
    """
    # The pipeline specs appear to name these fields sampleName_laneID,
    # with sampleName defaulting to "s". Lane ids are constant, so they
    # are used consistently as the lookup value.
    for child in analysis:
        sample_name, lane = child.tag.split('_')
        sample_key = SampleKey(lane=int(lane), sample=sample_name)
        lane_parameters = LaneParametersGA(self._gerald, lane)
        self._lanes[sample_key] = lane_parameters
385 def _extract_hiseq_analysis_type(self, analysis):
386 """Extract from HiSeq style multiplexed analysis types"""
387 for element in analysis:
388 name = element.attrib['name']
389 key = SampleKey(sample=name)
390 self._lanes[key] = LaneParametersHiSeq(self._gerald,
395 if self._lanes is None:
396 self._initialize_lanes()
397 return self._lanes.iterkeys()
399 def __getitem__(self, key):
400 if self._lanes is None:
401 self._initialize_lanes()
402 value = self._lanes.get(key, None)
403 if value is not None:
405 real_key = self._find_key(key)
406 if real_key is not None:
407 return self._lanes[real_key]
408 raise KeyError("%s not found in %s" % (
410 ",".join((repr(k) for k in self._lanes.keys()))))
412 def __setitem__(self, key, value):
413 if len(self._lanes) > 100:
414 LOGGER.warn("many projects loaded, consider improving dictionary")
415 real_key = self._find_key(key)
416 if real_key is not None:
418 self._lanes[key] = value
420 def __delitem__(self, key):
421 if key in self._lanes:
424 real_key = self._find_key(key)
425 if real_key is not None:
426 del self._lanes[real_key]
429 if self._lanes is None:
430 self._initialize_lanes()
431 return len(self._lanes)
433 def _find_key(self, lookup_key):
434 if not isinstance(lookup_key, SampleKey):
435 lookup_key = SampleKey(lane=lookup_key)
438 for k in self._lanes:
439 if k.matches(lookup_key):
442 errmsg = "Key %s matched multiple keys: %s"
443 raise ValueError(errmsg % (str(lookup_key),
444 ",".join((str(x) for x in results))))
446 elif len(results) == 1:
451 def gerald(pathname):
452 LOGGER.info("Parsing gerald config.xml")
453 pathname = os.path.expanduser(pathname)
454 config_pathname = os.path.join(pathname, 'config.xml')
455 config_tree = ElementTree.parse(config_pathname).getroot()
457 # parse Summary.htm file
458 summary_xml = os.path.join(pathname, 'Summary.xml')
459 summary_htm = os.path.join(pathname, 'Summary.htm')
460 report_summary = os.path.join(pathname, '..', 'Data',
461 'reports', 'Summary', )
462 if os.path.exists(summary_xml):
463 g = Gerald(pathname = pathname, tree=config_tree)
464 LOGGER.info("Parsing Summary.xml")
465 g.summary = SummaryGA(summary_xml)
466 g.eland_results = eland(g.pathname, g)
467 elif os.path.exists(summary_htm):
468 g = Gerald(pathname=pathname, tree=config_tree)
469 LOGGER.info("Parsing Summary.htm")
470 g.summary = SummaryGA(summary_htm)
471 g.eland_results = eland(g.pathname, g)
472 elif os.path.isdir(report_summary):
473 g = CASAVA(pathname=pathname, tree=config_tree)
474 LOGGER.info("Parsing %s" % (report_summary,))
475 g.summary = SummaryHiSeq(report_summary)
476 g.eland_results = eland(g.pathname, g)
481 if __name__ == "__main__":
484 g = gerald(sys.argv[1])
485 #ElementTree.dump(g.get_elements())