1 """Provide access to information stored in the GERALD directory.
4 from datetime import datetime, date
11 from htsworkflow.pipelines.summary import Summary, SummaryGA, SummaryHiSeq
12 from htsworkflow.pipelines.eland import eland, ELAND
13 from htsworkflow.pipelines.samplekey import SampleKey
15 from htsworkflow.pipelines.runfolder import \
20 from htsworkflow.util.ethelp import indent, flatten
22 LOGGER = logging.getLogger(__name__)
24 class Alignment(object):
26 Capture meaning out of the GERALD directory
29 RUN_PARAMETERS='RunParameters'
32 def __init__(self, xml=None, pathname=None, tree=None):
33 self.pathname = pathname
36 # parse lane parameters out of the config.xml file
37 self.lanes = LaneSpecificRunParameters(self)
40 self.eland_results = None
43 self.set_elements(xml)
46 if self.pathname is not None:
47 epochstamp = os.stat(self.pathname)[stat.ST_MTIME]
48 return datetime.fromtimestamp(epochstamp)
49 return datetime.today()
52 return time.mktime(self.date.timetuple())
53 time = property(_get_time, doc='return run time as seconds since epoch')
55 def _get_chip_attribute(self, value):
56 return self.tree.findtext('ChipWideRunParameters/%s' % (value,))
60 Debugging function, report current object
# Report which alignment class, software version, run date and parsed
# config tree this object currently holds.
# The first statement used to read `'Software:'. self.__class__...`
# (a '.' instead of a ','), which is an attribute lookup on a string
# literal and raised AttributeError as soon as the dump was requested.
# Single-argument print(...) produces the same text whether print is a
# statement or a function.
print('Software: %s' % (self.__class__.__name__,))
print('Alignment version: %s' % (self.version,))
print('Run date: %s' % (self.date,))
print('config.xml: %s' % (self.tree,))
68 def get_elements(self, root_tag):
69 if self.tree is None or self.summary is None:
72 gerald = ElementTree.Element(root_tag,
73 {'version': unicode(Gerald.XML_VERSION)})
74 gerald.append(self.tree)
75 gerald.append(self.summary.get_elements())
76 if self.eland_results:
77 gerald.append(self.eland_results.get_elements())
def set_elements(self, tree, root_tag):
    """Restore this object from a serialized XML element.

    :param tree: root element; its children are dispatched by tag name
        (compared case-insensitively) to the run parameters, the summary
        and the ELAND results.
    :param root_tag: tag name the root element is required to have.
    :raises ValueError: when ``tree.tag`` differs from ``root_tag``.
    """
    if tree.tag != root_tag:
        # Report the tag we were actually asked to match; the previous
        # message looked up a GERALD attribute on the instance's class,
        # which not every class in this hierarchy defines.
        raise ValueError('expected %s' % (root_tag,))
    xml_version = int(tree.attrib.get('version', 0))
    if xml_version > Gerald.XML_VERSION:
        LOGGER.warning('XML tree is a higher version than this class')
    # Start from an empty result set so a tree without an ELAND child
    # still leaves a usable object behind.
    self.eland_results = ELAND()
    for element in list(tree):
        tag = element.tag.lower()
        if tag == Gerald.RUN_PARAMETERS.lower():
            self.tree = element
        elif tag == Gerald.SUMMARY.lower():
            self.summary = Summary(xml=element)
        elif tag == ELAND.ELAND.lower():
            self.eland_results = ELAND(xml=element)
        else:
            LOGGER.warning("Unrecognized tag %s" % (element.tag,))
98 class Gerald(Alignment):
102 if self.tree is None:
103 return datetime.today()
105 timestamp = self.tree.findtext('ChipWideRunParameters/TIME_STAMP')
106 if timestamp is not None:
107 epochstamp = time.mktime(time.strptime(timestamp))
108 return datetime.fromtimestamp(epochstamp)
109 return super(Gerald, self)._get_date()
110 date = property(_get_date)
def get_elements(self):
    """Serialize this run under the Gerald root tag."""
    root_tag = Gerald.GERALD
    return super(Gerald, self).get_elements(root_tag)
def set_elements(self, tree):
    """Load state from ``tree``, which must be rooted at the Gerald tag."""
    root_tag = Gerald.GERALD
    return super(Gerald, self).set_elements(tree, root_tag)
118 def _get_experiment_root(self):
119 if self.tree is None:
121 return self.tree.findtext('ChipWideRunParameters/EXPT_DIR_ROOT')
def _get_runfolder_name(self):
    """Return the runfolder name derived from the chip-wide parameters.

    The name is the first path component of
    ``ChipWideRunParameters/EXPT_DIR`` after the experiment root prefix
    (as reported by ``_get_experiment_root``) has been removed.

    Returns None when no tree is loaded, when either path is missing
    from the tree, or when the derived name is empty.
    """
    if self.tree is None:
        return None

    expt_root = self._get_experiment_root()
    chip_expt_dir = self.tree.findtext('ChipWideRunParameters/EXPT_DIR')
    # Both paths are required. Test for None *before* normalising:
    # os.path.normpath(None) raises, and without this guard the name
    # below would never be bound.
    if expt_root is None or chip_expt_dir is None:
        return None

    expt_root = os.path.normpath(expt_root)
    experiment_dir = chip_expt_dir.replace(expt_root + os.path.sep, '')
    experiment_dir = experiment_dir.split(os.path.sep)[0]

    if len(experiment_dir) == 0:
        return None
    return experiment_dir

runfolder_name = property(_get_runfolder_name)
140 def _get_software_version(self):
141 if self.tree is None:
143 ga_version = self.tree.findtext(
144 'ChipWideRunParameters/SOFTWARE_VERSION')
145 if ga_version is not None:
146 gerald = re.match("@.*GERALD.pl,v (?P<version>\d+(\.\d+)+)",
149 return ('GERALD', gerald.group('version'))
150 casava = re.match('CASAVA-(?P<version>\d+[.\d]*)',
153 return ('CASAVA', casava.group('version'))
def _get_software(self):
    """Name of the analysis software package, or None when unknown."""
    name_and_version = self._get_software_version()
    if name_and_version is None:
        return None
    return name_and_version[0]
software = property(_get_software)
def _get_version(self):
    """Version string of the analysis software, or None when unknown."""
    name_and_version = self._get_software_version()
    if name_and_version is None:
        return None
    return name_and_version[1]
version = property(_get_version)
167 class CASAVA(Alignment):
def __init__(self, xml=None, pathname=None, tree=None):
    """Initialise the shared alignment state, then add a time stamp.

    All three arguments are forwarded unchanged to the parent class.
    """
    parent = super(CASAVA, self)
    parent.__init__(xml=xml, pathname=pathname, tree=tree)

    # must run after the parent initialiser has stored the tree
    self._add_timestamp()
175 def _add_timestamp(self):
176 """Manually add a time stamp to CASAVA runs"""
177 if self.tree is None:
179 if len(self.tree.xpath('TIME_STAMP')) == 0:
180 time_stamp = self.date.strftime('%c')
181 time_element = ElementTree.Element('TIME_STAMP')
182 time_element.text = time_stamp
183 self.tree.append(time_element)
186 if self.tree is None:
188 time_element = self.tree.xpath('TIME_STAMP')
189 if len(time_element) == 1:
190 timetuple = time.strptime(
191 time_element[0].text.strip(),
192 "%a %d %b %Y %I:%M:%S %p")
193 return datetime(*timetuple[:6])
194 return super(CASAVA, self)._get_date()
195 date = property(_get_date)
197 def get_elements(self):
198 tree = super(CASAVA, self).get_elements(CASAVA.GERALD)
def set_elements(self, tree):
    """Load state from ``tree`` using the root tag CASAVA.GERALD."""
    root_tag = CASAVA.GERALD
    return super(CASAVA, self).set_elements(tree, root_tag)
def _get_runfolder_name(self):
    """Return the last path component of ``Defaults/EXPT_DIR``.

    Returns None when no tree is loaded, when the tree has no
    ``Defaults/EXPT_DIR`` entry, or when that entry has no final
    component.
    """
    if self.tree is None:
        return None

    # hiseqs renamed the experiment dir location
    defaults_expt_dir = self.tree.findtext('Defaults/EXPT_DIR')
    if defaults_expt_dir is None:
        # findtext() yields None for a missing element and the os.path
        # helpers raise on None, so stop here instead.
        return None
    experiment_dir = os.path.basename(defaults_expt_dir)

    if len(experiment_dir) == 0:
        return None
    return experiment_dir

runfolder_name = property(_get_runfolder_name)
218 def _get_software_version(self):
219 if self.tree is None:
221 if self.tree is None:
223 hiseq_software_node = self.tree.find('Software')
224 software_version = hiseq_software_node.attrib.get('Version',None)
225 if software_version is None:
227 return software_version.split('-')
def _get_software(self):
    """Return the first field of the software version, or None."""
    fields = self._get_software_version()
    return fields[0] if fields is not None else None
software = property(_get_software)
def _get_version(self):
    """Return the second field of the software version, or None."""
    fields = self._get_software_version()
    return fields[1] if fields is not None else None
version = property(_get_version)
class LaneParameters(object):
    """
    Make it easy to access elements of LaneSpecificRunParameters from python

    Abstract base: it only stores the owning object and the lane id.
    Every property raises NotImplementedError until a subclass overrides
    the matching ``_get_*`` method and re-declares the property.
    """
    def __init__(self, gerald, lane_id):
        self._gerald = gerald
        self._lane_id = lane_id

    # NotImplementedError is the exception class. NotImplemented is a
    # non-callable constant meant for rich comparisons, so calling it
    # raised TypeError instead of signalling a missing override.
    def _get_analysis(self):
        raise NotImplementedError("abstract class")
    analysis = property(_get_analysis)

    def _get_eland_genome(self):
        raise NotImplementedError("abstract class")
    eland_genome = property(_get_eland_genome)

    def _get_read_length(self):
        raise NotImplementedError("abstract class")
    read_length = property(_get_read_length)

    def _get_use_bases(self):
        raise NotImplementedError("abstract class")
    use_bases = property(_get_use_bases)
class LaneParametersGA(LaneParameters):
    """
    Make it easy to access elements of LaneSpecificRunParameters from python

    Values are read from the ``LaneSpecificRunParameters`` section of the
    owning object's config tree, one child element per lane.
    """
    def __init__(self, gerald, lane_id):
        super(LaneParametersGA, self).__init__(gerald, lane_id)

    def __get_attribute(self, xml_tag):
        """Return the text stored for this lane under ``xml_tag``.

        Returns None when the tag is absent or has no entry for this lane.
        Raises RuntimeError when the tag holds more entries than
        LANES_PER_FLOWCELL.
        """
        subtree = self._gerald.tree.find('LaneSpecificRunParameters')
        container = subtree.find(xml_tag)
        if container is None:
            return None
        # Iterate the element directly: Element.getchildren() is gone
        # from current ElementTree implementations.
        children = list(container)
        if len(children) > LANES_PER_FLOWCELL:
            raise RuntimeError('GERALD config.xml file changed')
        # child tags are split on '_' and the second field is the lane id
        lanes = [child.tag.split('_')[1] for child in children]
        try:
            index = lanes.index(self._lane_id)
        except ValueError:
            return None
        return children[index].text

    def _get_analysis(self):
        return self.__get_attribute('ANALYSIS')
    analysis = property(_get_analysis)

    def _get_eland_genome(self):
        genome = self.__get_attribute('ELAND_GENOME')
        # default to the chipwide parameters if there isn't an
        # entry in the lane specific parameters
        if genome is None:
            genome = self._gerald._get_chip_attribute('ELAND_GENOME')
        # this literal is a placeholder, not a real genome path
        if genome == 'Need_to_specify_ELAND_genome_directory':
            genome = None
        return genome
    eland_genome = property(_get_eland_genome)

    def _get_read_length(self):
        read_length = self.__get_attribute('READ_LENGTH')
        if read_length is None:
            read_length = self._gerald._get_chip_attribute('READ_LENGTH')
        return read_length
    read_length = property(_get_read_length)

    def _get_use_bases(self):
        return self.__get_attribute('USE_BASES')
    use_bases = property(_get_use_bases)
class LaneParametersHiSeq(LaneParameters):
    """
    Make it easy to access elements of LaneSpecificRunParameters from python

    Values are looked up as children of the element handed to the
    constructor.
    """
    def __init__(self, gerald, lane_id, element):
        super(LaneParametersHiSeq, self).__init__(gerald, lane_id)
        self.element = element

    def __get_attribute(self, xml_tag):
        """Text of the ``xml_tag`` child of this element, or None."""
        node = self.element.find(xml_tag)
        return None if node is None else node.text

    def _get_analysis(self):
        return self.__get_attribute('ANALYSIS')
    analysis = property(_get_analysis)

    def _get_eland_genome(self):
        lane_genome = self.__get_attribute('ELAND_GENOME')
        # fall back to the chipwide parameters if there isn't an
        # entry in the lane specific parameters
        if lane_genome is None:
            lane_genome = self._gerald._get_chip_attribute('ELAND_GENOME')
        # this literal is a placeholder, not a real genome path
        if lane_genome == 'Need_to_specify_ELAND_genome_directory':
            return None
        return lane_genome
    eland_genome = property(_get_eland_genome)

    def _get_read_length(self):
        return self.__get_attribute('READ_LENGTH1')
    read_length = property(_get_read_length)

    def _get_use_bases(self):
        return self.__get_attribute('USE_BASES1')
    use_bases = property(_get_use_bases)
356 class LaneSpecificRunParameters(collections.MutableMapping):
358 Provide access to LaneSpecificRunParameters
360 def __init__(self, gerald):
361 self._gerald = gerald
364 def _initialize_lanes(self):
366 build dictionary of LaneParameters
369 tree = self._gerald.tree
370 analysis = tree.find('LaneSpecificRunParameters/ANALYSIS')
371 if analysis is not None:
372 self._extract_ga_analysis_type(analysis)
373 analysis = tree.find('Projects')
374 if analysis is not None:
375 self._extract_hiseq_analysis_type(analysis)
def _extract_ga_analysis_type(self, analysis):
    """Register a LaneParametersGA for every child of ``analysis``."""
    # According to the pipeline specs the child tags appear to be
    # sampleName_laneID, with sampleName defaulting to "s". Lane ids
    # are constant, so they are used consistently as the lookup value.
    for lane_element in analysis:
        sample_name, lane_text = lane_element.tag.split('_')
        lane_key = SampleKey(lane=int(lane_text), sample=sample_name)
        # the parameter object keeps the lane id as the original string
        lane_params = LaneParametersGA(self._gerald, lane_text)
        self._lanes[lane_key] = lane_params
388 def _extract_hiseq_analysis_type(self, analysis):
389 """Extract from HiSeq style multiplexed analysis types"""
390 for element in analysis:
391 name = element.attrib['name']
392 key = SampleKey(sample=name)
393 self._lanes[key] = LaneParametersHiSeq(self._gerald,
398 if self._lanes is None:
399 self._initialize_lanes()
400 return self._lanes.iterkeys()
402 def __getitem__(self, key):
403 if self._lanes is None:
404 self._initialize_lanes()
405 value = self._lanes.get(key, None)
406 if value is not None:
408 real_key = self._find_key(key)
409 if real_key is not None:
410 return self._lanes[real_key]
411 raise KeyError("%s not found in %s" % (
413 ",".join((repr(k) for k in self._lanes.keys()))))
415 def __setitem__(self, key, value):
416 if len(self._lanes) > 100:
417 LOGGER.warn("many projects loaded, consider improving dictionary")
418 real_key = self._find_key(key)
419 if real_key is not None:
421 self._lanes[key] = value
423 def __delitem__(self, key):
424 if key in self._lanes:
427 real_key = self._find_key(key)
428 if real_key is not None:
429 del self._lanes[real_key]
432 if self._lanes is None:
433 self._initialize_lanes()
434 return len(self._lanes)
436 def _find_key(self, lookup_key):
437 if not isinstance(lookup_key, SampleKey):
438 lookup_key = SampleKey(lane=lookup_key)
441 for k in self._lanes:
442 if k.matches(lookup_key):
445 errmsg = "Key %s matched multiple keys: %s"
446 raise ValueError(errmsg % (str(lookup_key),
447 ",".join((str(x) for x in results))))
449 elif len(results) == 1:
454 def gerald(pathname):
455 LOGGER.info("Parsing gerald config.xml")
456 pathname = os.path.expanduser(pathname)
457 config_pathname = os.path.join(pathname, 'config.xml')
458 config_tree = ElementTree.parse(config_pathname).getroot()
460 # parse Summary.htm file
461 summary_xml = os.path.join(pathname, 'Summary.xml')
462 summary_htm = os.path.join(pathname, 'Summary.htm')
463 report_summary = os.path.join(pathname, '..', 'Data',
464 'reports', 'Summary', )
465 if os.path.exists(summary_xml):
466 g = Gerald(pathname = pathname, tree=config_tree)
467 LOGGER.info("Parsing Summary.xml")
468 g.summary = SummaryGA(summary_xml)
469 g.eland_results = eland(g.pathname, g)
470 elif os.path.exists(summary_htm):
471 g = Gerald(pathname=pathname, tree=config_tree)
472 LOGGER.info("Parsing Summary.htm")
473 g.summary = SummaryGA(summary_htm)
474 g.eland_results = eland(g.pathname, g)
475 elif os.path.isdir(report_summary):
476 g = CASAVA(pathname=pathname, tree=config_tree)
477 LOGGER.info("Parsing %s" % (report_summary,))
478 g.summary = SummaryHiSeq(report_summary)
479 g.eland_results = eland(g.pathname, g)
484 if __name__ == "__main__":
487 g = gerald(sys.argv[1])
488 #ElementTree.dump(g.get_elements())