Split lane parameters into seperate classes for GA & HiSeq config files.
[htsworkflow.git] / htsworkflow / pipelines / gerald.py
1 """
2 Provide access to information stored in the GERALD directory.
3 """
4 from datetime import datetime, date
5 import logging
6 import os
7 import stat
8 import time
9
10 from htsworkflow.pipelines.summary import Summary, SummaryGA, SummaryHiSeq
11 from htsworkflow.pipelines.eland import eland, ELAND
12
13 from htsworkflow.pipelines.runfolder import \
14    ElementTree, \
15    EUROPEAN_STRPTIME, \
16    LANES_PER_FLOWCELL, \
17    VERSION_RE
18 from htsworkflow.util.ethelp import indent, flatten
19
20 LOGGER = logging.getLogger(__name__)
21
22 class Gerald(object):
23     """
24     Capture meaning out of the GERALD directory
25     """
26     XML_VERSION = 1
27     GERALD='Gerald'
28     RUN_PARAMETERS='RunParameters'
29     SUMMARY='Summary'
30
31     def __init__(self, xml=None):
32         self.pathname = None
33         self.tree = None
34
35         # parse lane parameters out of the config.xml file
36         self.lanes = LaneSpecificRunParameters(self)
37
38         self.summary = None
39         self.eland_results = None
40
41         if xml is not None:
42             self.set_elements(xml)
43
44     def _get_date(self):
45         if self.tree is None:
46             return datetime.today()
47         timestamp = self.tree.findtext('ChipWideRunParameters/TIME_STAMP')
48         if timestamp is not None:
49             epochstamp = time.mktime(time.strptime(timestamp, '%c'))
50             return datetime.fromtimestamp(epochstamp)
51         if self.pathname is not None:
52             epochstamp = os.stat(self.pathname)[stat.ST_MTIME]
53             return datetime.fromtimestamp(epochstamp)
54         return datetime.today()
55     date = property(_get_date)
56
57     def _get_time(self):
58         return time.mktime(self.date.timetuple())
59     time = property(_get_time, doc='return run time as seconds since epoch')
60
61     def _get_experiment_root(self):
62         if self.tree is None:
63             return None
64         return self.tree.findtext('ChipWideRunParameters/EXPT_DIR_ROOT')
65
66     def _get_runfolder_name(self):
67         if self.tree is None:
68             return None
69
70         expt_root = os.path.normpath(self._get_experiment_root())
71         chip_expt_dir = self.tree.findtext('ChipWideRunParameters/EXPT_DIR')
72         # hiseqs renamed the experiment dir location
73         defaults_expt_dir = self.tree.findtext('Defaults/EXPT_DIR')
74
75         experiment_dir = None
76         if defaults_expt_dir is not None:
77             _, experiment_dir = os.path.split(defaults_expt_dir)
78         elif expt_root is not None and chip_expt_dir is not None:
79             experiment_dir = chip_expt_dir.replace(expt_root+os.path.sep, '')
80             experiment_dir = experiment_dir.split(os.path.sep)[0]
81
82         if experiment_dir is None or len(experiment_dir) == 0:
83             return None
84         return experiment_dir
85
86     runfolder_name = property(_get_runfolder_name)
87
88     def _get_version(self):
89         if self.tree is None:
90             return None
91         ga_version = self.tree.findtext(
92             'ChipWideRunParameters/SOFTWARE_VERSION')
93         if ga_version is not None:
94             return ga_version
95         hiseq_software_node = self.tree.find('Software')
96         hiseq_version = hiseq_software_node.attrib['Version']
97         return hiseq_version
98
99     version = property(_get_version)
100
101     def _get_chip_attribute(self, value):
102         return self.tree.findtext('ChipWideRunParameters/%s' % (value,))
103
104     def dump(self):
105         """
106         Debugging function, report current object
107         """
108         print 'Gerald version:', self.version
109         print 'Gerald run date:', self.date
110         print 'Gerald config.xml:', self.tree
111         self.summary.dump()
112
113     def get_elements(self):
114         if self.tree is None or self.summary is None:
115             return None
116
117         gerald = ElementTree.Element(Gerald.GERALD,
118                                      {'version': unicode(Gerald.XML_VERSION)})
119         gerald.append(self.tree)
120         gerald.append(self.summary.get_elements())
121         if self.eland_results:
122             gerald.append(self.eland_results.get_elements())
123         return gerald
124
125     def set_elements(self, tree):
126         if tree.tag !=  Gerald.GERALD:
127             raise ValueError('exptected GERALD')
128         xml_version = int(tree.attrib.get('version', 0))
129         if xml_version > Gerald.XML_VERSION:
130             LOGGER.warn('XML tree is a higher version than this class')
131         self.eland_results = ELAND()
132         for element in list(tree):
133             tag = element.tag.lower()
134             if tag == Gerald.RUN_PARAMETERS.lower():
135                 self.tree = element
136             elif tag == Gerald.SUMMARY.lower():
137                 self.summary = Summary(xml=element)
138             elif tag == ELAND.ELAND.lower():
139                 self.eland_results = ELAND(xml=element)
140             else:
141                 LOGGER.warn("Unrecognized tag %s" % (element.tag,))
142
143
144 class LaneParameters(object):
145     """
146     Make it easy to access elements of LaneSpecificRunParameters from python
147     """
148     def __init__(self, gerald, lane_id):
149         self._gerald = gerald
150         self._lane_id = lane_id
151
152     def _get_analysis(self):
153         raise NotImplemented("abstract class")
154     analysis = property(_get_analysis)
155
156     def _get_eland_genome(self):
157         raise NotImplemented("abstract class")
158     eland_genome = property(_get_eland_genome)
159
160     def _get_read_length(self):
161         raise NotImplemented("abstract class")
162     read_length = property(_get_read_length)
163
164     def _get_use_bases(self):
165         raise NotImplemented("abstract class")
166     use_bases = property(_get_use_bases)
167
168
169 class LaneParametersGA(LaneParameters):
170     """
171     Make it easy to access elements of LaneSpecificRunParameters from python
172     """
173     def __init__(self, gerald, lane_id):
174         super(LaneParametersGA, self).__init__(gerald, lane_id)
175
176     def __get_attribute(self, xml_tag):
177         subtree = self._gerald.tree.find('LaneSpecificRunParameters')
178         container = subtree.find(xml_tag)
179         if container is None:
180             return None
181         if len(container.getchildren()) > LANES_PER_FLOWCELL:
182             raise RuntimeError('GERALD config.xml file changed')
183         lanes = [x.tag.split('_')[1] for x in container.getchildren()]
184         try:
185             index = lanes.index(self._lane_id)
186         except ValueError, e:
187             return None
188         element = container[index]
189         return element.text
190     def _get_analysis(self):
191         return self.__get_attribute('ANALYSIS')
192     analysis = property(_get_analysis)
193
194     def _get_eland_genome(self):
195         genome = self.__get_attribute('ELAND_GENOME')
196         # default to the chipwide parameters if there isn't an
197         # entry in the lane specific paramaters
198         if genome is None:
199             genome = self._gerald._get_chip_attribute('ELAND_GENOME')
200         # ignore flag value
201         if genome == 'Need_to_specify_ELAND_genome_directory':
202             genome = None
203         return genome
204     eland_genome = property(_get_eland_genome)
205
206     def _get_read_length(self):
207         read_length = self.__get_attribute('READ_LENGTH')
208         if read_length is None:
209             read_length = self._gerald._get_chip_attribute('READ_LENGTH')
210         return read_length
211     read_length = property(_get_read_length)
212
213     def _get_use_bases(self):
214         return self.__get_attribute('USE_BASES')
215     use_bases = property(_get_use_bases)
216
217
218 class LaneParametersHiSeq(LaneParameters):
219     """
220     Make it easy to access elements of LaneSpecificRunParameters from python
221     """
222     def __init__(self, gerald, lane_id, element):
223         super(LaneParametersHiSeq, self).__init__(gerald, lane_id)
224         self.element = element
225
226     def __get_attribute(self, xml_tag):
227         container = self.element.find(xml_tag)
228         if container is None:
229             return None
230         return container.text
231
232     def _get_analysis(self):
233         return self.__get_attribute('ANALYSIS')
234     analysis = property(_get_analysis)
235
236     def _get_eland_genome(self):
237         genome = self.__get_attribute('ELAND_GENOME')
238         # default to the chipwide parameters if there isn't an
239         # entry in the lane specific paramaters
240         if genome is None:
241             genome = self._gerald._get_chip_attribute('ELAND_GENOME')
242         # ignore flag value
243         if genome == 'Need_to_specify_ELAND_genome_directory':
244             genome = None
245         return genome
246     eland_genome = property(_get_eland_genome)
247
248     def _get_read_length(self):
249         return self.__get_attribute('READ_LENGTH1')
250     read_length = property(_get_read_length)
251
252     def _get_use_bases(self):
253         return self.__get_attribute('USE_BASES1')
254     use_bases = property(_get_use_bases)
255
256 class LaneSpecificRunParameters(object):
257     """
258     Provide access to LaneSpecificRunParameters
259     """
260     def __init__(self, gerald):
261         self._gerald = gerald
262         self._lane = None
263
264     def _initalize_lanes(self):
265         """
266         build dictionary of LaneParameters
267         """
268         self._lanes = {}
269         tree = self._gerald.tree
270         analysis = tree.find('LaneSpecificRunParameters/ANALYSIS')
271         if analysis is not None:
272             self._extract_ga_analysis_type(analysis)
273         analysis = tree.find('Projects')
274         if analysis is not None:
275             self._extract_hiseq_analysis_type(analysis)
276
277     def _extract_ga_analysis_type(self, analysis):
278         # according to the pipeline specs I think their fields
279         # are sampleName_laneID, with sampleName defaulting to s
280         # since laneIDs are constant lets just try using
281         # those consistently.
282         for element in analysis:
283             sample, lane_id = element.tag.split('_')
284             self._lanes[int(lane_id)] = LaneParametersGA(
285                                           self._gerald, lane_id)
286
287     def _extract_hiseq_analysis_type(self, analysis):
288         """Extract from HiSeq style multiplexed analysis types"""
289         for element in analysis:
290             name = element.attrib['name']
291             self._lanes[name] = LaneParametersHiSeq(self._gerald,
292                                                     name,
293                                                     element)
294
295     def __iter__(self):
296         return self._lanes.iterkeys()
297     def __getitem__(self, key):
298         if self._lane is None:
299             self._initalize_lanes()
300         return self._lanes[key]
301     def get(self, key, default):
302         if self._lane is None:
303             self._initalize_lanes()
304         return self._lanes.get(key, None)
305     def keys(self):
306         if self._lane is None:
307             self._initalize_lanes()
308         return self._lanes.keys()
309     def values(self):
310         if self._lane is None:
311             self._initalize_lanes()
312         return self._lanes.values()
313     def items(self):
314         if self._lane is None:
315             self._initalize_lanes()
316         return self._lanes.items()
317     def __len__(self):
318         if self._lane is None:
319             self._initalize_lanes()
320         return len(self._lanes)
321
322
323 def gerald(pathname):
324     g = Gerald()
325     g.pathname = os.path.expanduser(pathname)
326     path, name = os.path.split(g.pathname)
327     LOGGER.info("Parsing gerald config.xml")
328     config_pathname = os.path.join(g.pathname, 'config.xml')
329     g.tree = ElementTree.parse(config_pathname).getroot()
330
331     # parse Summary.htm file
332     summary_xml = os.path.join(g.pathname, 'Summary.xml')
333     summary_htm = os.path.join(g.pathname, 'Summary.htm')
334     report_summary = os.path.join(g.pathname, '..', 'Data',
335                                   'reports', 'Summary', )
336     if os.path.exists(summary_xml):
337         LOGGER.info("Parsing Summary.xml")
338         g.summary = SummaryGA(summary_xml)
339         g.eland_results = eland(g.pathname, g)
340     elif os.path.exists(summary_htm):
341         LOGGER.info("Parsing Summary.htm")
342         g.summary = SummaryGA(summary_htm)
343         g.eland_results = eland(g.pathname, g)
344     elif os.path.isdir(report_summary):
345         LOGGER.info("Parsing %s" % (report_summary,))
346         g.summary = SummaryHiSeq(report_summary)
347
348     # parse eland files
349     return g
350
351 if __name__ == "__main__":
352   # quick test code
353   import sys
354   g = gerald(sys.argv[1])
355   #ElementTree.dump(g.get_elements())