Add support for extracting data out of Illumina's new RTA runfolder.
[htsworkflow.git] / htsworkflow / pipelines / gerald.py
1 """
2 Provide access to information stored in the GERALD directory.
3 """
4 from datetime import datetime, date
5 import logging
6 import os
7 import time
8
9 from htsworkflow.pipelines.summary import Summary
10 from htsworkflow.pipelines.eland import eland, ELAND
11
12 from htsworkflow.pipelines.runfolder import \
13    ElementTree, \
14    EUROPEAN_STRPTIME, \
15    LANES_PER_FLOWCELL, \
16    VERSION_RE
17 from htsworkflow.util.ethelp import indent, flatten
18
class Gerald(object):
    """
    Capture meaning out of the GERALD directory

    A Gerald object holds three pieces of state:

      tree          -- root element of the parsed config.xml
                       (the RunParameters element)
      summary       -- Summary object
      eland_results -- ELAND object

    It can be serialized with get_elements() and restored with
    set_elements() (or by passing the element as xml= to the constructor).
    """
    XML_VERSION = 1
    GERALD = 'Gerald'
    RUN_PARAMETERS = 'RunParameters'
    SUMMARY = 'Summary'

    class LaneParameters(object):
        """
        Make it easy to access elements of LaneSpecificRunParameters from python

        Values are looked up in the owning Gerald object's tree each time
        a property is read, so nothing is cached here.
        """
        def __init__(self, gerald, lane_id):
            self._gerald = gerald
            self._lane_id = lane_id

        def __get_attribute(self, xml_tag):
            """
            Return the text of this lane's entry under
            LaneSpecificRunParameters/<xml_tag>.

            Returns None when the tree, the LaneSpecificRunParameters
            section, the requested tag, or this lane's entry is missing.
            Raises RuntimeError if the tag has more children than
            LANES_PER_FLOWCELL.
            """
            tree = self._gerald.tree
            if tree is None:
                return None
            subtree = tree.find('LaneSpecificRunParameters')
            if subtree is None:
                return None
            container = subtree.find(xml_tag)
            if container is None:
                return None
            # list(element) replaces Element.getchildren(), which newer
            # ElementTree releases no longer provide.
            entries = list(container)
            if len(entries) > LANES_PER_FLOWCELL:
                raise RuntimeError('GERALD config.xml file changed')
            # child tags look like <sample>_<lane id>, e.g. s_1
            lanes = [x.tag.split('_')[1] for x in entries]
            try:
                # lane ids taken from tags are strings; str() lets a
                # caller-supplied integer lane id match as well
                index = lanes.index(str(self._lane_id))
            except ValueError:
                return None
            return entries[index].text

        def _get_analysis(self):
            return self.__get_attribute('ANALYSIS')
        analysis = property(_get_analysis)

        def _get_eland_genome(self):
            genome = self.__get_attribute('ELAND_GENOME')
            # default to the chipwide parameters if there isn't an
            # entry in the lane specific parameters
            if genome is None and self._gerald.tree is not None:
                # findtext returns None instead of failing when the
                # chip wide section or tag is absent
                genome = self._gerald.tree.findtext(
                    'ChipWideRunParameters/ELAND_GENOME')
            return genome
        eland_genome = property(_get_eland_genome)

        def _get_read_length(self):
            return self.__get_attribute('READ_LENGTH')
        read_length = property(_get_read_length)

        def _get_use_bases(self):
            return self.__get_attribute('USE_BASES')
        use_bases = property(_get_use_bases)

    class LaneSpecificRunParameters(object):
        """
        Provide access to LaneSpecificRunParameters

        Behaves like a read-only dictionary mapping integer lane ids to
        LaneParameters objects.  The dictionary is built lazily and is
        rebuilt only when the owning Gerald object's tree attribute is
        replaced by a different object.
        """
        def __init__(self, gerald):
            self._gerald = gerald
            # cached {lane id: LaneParameters}; None until first use
            self._lanes = None
            # the tree object the cache was built from
            self._lanes_tree = None

        def _initalize_lanes(self):
            """
            build dictionary of LaneParameters

            A missing tree or a tree without
            LaneSpecificRunParameters/ANALYSIS yields an empty dictionary.
            """
            tree = self._gerald.tree
            lanes = {}
            analysis = None
            if tree is not None:
                analysis = tree.find('LaneSpecificRunParameters/ANALYSIS')
            if analysis is not None:
                # according to the pipeline specs I think their fields
                # are sampleName_laneID, with sampleName defaulting to s
                # since laneIDs are constant lets just try using
                # those consistently.
                for element in analysis:
                    sample, lane_id = element.tag.split('_')
                    lanes[int(lane_id)] = Gerald.LaneParameters(
                                              self._gerald, lane_id)
            self._lanes = lanes
            self._lanes_tree = tree

        def _get_lanes(self):
            """
            Return the lane dictionary, (re)building it when needed.
            """
            # Comparing tree identity keeps the cache honest when the
            # Gerald object gets its tree assigned after construction.
            if self._lanes is None or \
               self._lanes_tree is not self._gerald.tree:
                self._initalize_lanes()
            return self._lanes

        def __getitem__(self, key):
            return self._get_lanes()[key]

        def keys(self):
            return self._get_lanes().keys()

        def values(self):
            return self._get_lanes().values()

        def items(self):
            return self._get_lanes().items()

        def __len__(self):
            return len(self._get_lanes())

    def __init__(self, xml=None):
        """
        Create an empty Gerald object, or restore one from xml.

        xml, when given, must be an element accepted by set_elements().
        """
        self.pathname = None
        self.tree = None

        # parse lane parameters out of the config.xml file
        self.lanes = Gerald.LaneSpecificRunParameters(self)

        self.summary = None
        self.eland_results = None

        if xml is not None:
            self.set_elements(xml)

    def _get_date(self):
        """
        Return the run date as a datetime.

        Without a tree this is the current date and time; otherwise
        ChipWideRunParameters/TIME_STAMP is parsed with the '%c' format.
        NOTE(review): '%c' depends on the active locale, and a config.xml
        without TIME_STAMP makes strptime raise -- confirm both are
        acceptable for the files this is used on.
        """
        if self.tree is None:
            return datetime.today()
        timestamp = self.tree.findtext('ChipWideRunParameters/TIME_STAMP')
        epochstamp = time.mktime(time.strptime(timestamp, '%c'))
        return datetime.fromtimestamp(epochstamp)
    date = property(_get_date)

    def _get_time(self):
        return time.mktime(self.date.timetuple())
    time = property(_get_time, doc='return run time as seconds since epoch')

    def _get_version(self):
        """
        Return ChipWideRunParameters/SOFTWARE_VERSION, or None if unknown.
        """
        if self.tree is None:
            return None
        return self.tree.findtext('ChipWideRunParameters/SOFTWARE_VERSION')
    version = property(_get_version)

    def dump(self):
        """
        Debugging function, report current object
        """
        # single-argument print(...) produces the same output under both
        # the print statement and the print function
        print('Gerald version: %s' % (self.version,))
        print('Gerald run date: %s' % (self.date,))
        print('Gerald config.xml: %s' % (self.tree,))
        if self.summary is not None:
            self.summary.dump()

    def get_elements(self):
        """
        Return an element describing this object, or None when either
        the tree or the summary has not been loaded.

        The eland results are appended only when they are present.
        """
        if self.tree is None or self.summary is None:
            return None

        # u'%d' keeps the attribute a text string without relying on the
        # Python 2 only unicode() builtin
        gerald = ElementTree.Element(Gerald.GERALD,
                                     {'version': u'%d' % Gerald.XML_VERSION})
        gerald.append(self.tree)
        gerald.append(self.summary.get_elements())
        if self.eland_results:
            gerald.append(self.eland_results.get_elements())
        return gerald

    def set_elements(self, tree):
        """
        Restore this object from an element produced by get_elements().

        Raises ValueError if the root tag is not Gerald.GERALD.  A version
        attribute newer than XML_VERSION and unrecognized child tags are
        logged as warnings rather than treated as errors.
        """
        if tree.tag != Gerald.GERALD:
            raise ValueError('expected %s element, got %s' %
                             (Gerald.GERALD, tree.tag))
        xml_version = int(tree.attrib.get('version', 0))
        if xml_version > Gerald.XML_VERSION:
            logging.warning('XML tree is a higher version than this class')
        # start from empty eland results in case the tree has none
        self.eland_results = ELAND()
        for element in list(tree):
            tag = element.tag.lower()
            if tag == Gerald.RUN_PARAMETERS.lower():
                self.tree = element
            elif tag == Gerald.SUMMARY.lower():
                self.summary = Summary(xml=element)
            elif tag == ELAND.ELAND.lower():
                self.eland_results = ELAND(xml=element)
            else:
                logging.warning("Unrecognized tag %s", element.tag)
187
def gerald(pathname):
    """
    Build a Gerald object from the files in a GERALD directory.

    pathname is the GERALD directory; a leading ~ is expanded.  The
    directory's config.xml is parsed into the tree attribute and its
    Summary.htm is handed to Summary.  The directory and the partly
    built object are then passed to eland() to collect the eland results.

    Returns the populated Gerald object.
    """
    g = Gerald()
    g.pathname = os.path.expanduser(pathname)

    logging.info("Parsing gerald config.xml")
    config_pathname = os.path.join(g.pathname, 'config.xml')
    g.tree = ElementTree.parse(config_pathname).getroot()

    # parse Summary.htm file
    logging.info("Parsing Summary.htm")
    summary_pathname = os.path.join(g.pathname, 'Summary.htm')
    g.summary = Summary(summary_pathname)

    # parse eland files
    g.eland_results = eland(g.pathname, g)
    return g
203
if __name__ == "__main__":
    # quick test code: parse the GERALD directory named on the command line
    import sys
    if len(sys.argv) < 2:
        # report the missing argument instead of failing with IndexError
        sys.stderr.write('usage: %s <gerald directory>\n' % (sys.argv[0],))
        sys.exit(1)
    g = gerald(sys.argv[1])
    #ElementTree.dump(g.get_elements())