whitespace-cleanup
[htsworkflow.git] / htsworkflow / pipelines / gerald.py
1 """Provide access to information stored in the GERALD directory.
2 """
3 from __future__ import print_function
4
5 import collections
6 from datetime import datetime, date
7 import logging
8 import os
9 import re
10 import stat
11 import time
12
13 from htsworkflow.pipelines.summary import Summary, SummaryGA, SummaryHiSeq
14 from htsworkflow.pipelines.eland import eland, ELAND
15 from htsworkflow.pipelines.samplekey import SampleKey
16
17 from htsworkflow.pipelines import \
18    ElementTree, \
19    EUROPEAN_STRPTIME, \
20    LANES_PER_FLOWCELL, \
21    VERSION_RE
22 from htsworkflow.util.ethelp import indent, flatten
23
# Module-level logger, named after this module, used by every class and
# function below for informational and warning messages.
LOGGER = logging.getLogger(__name__)
25
class Alignment(object):
    """Capture meaning out of the GERALD directory.

    Base class holding the parsed ``config.xml`` tree, the per-lane
    parameters, the run summary and the ELAND results of one alignment run.

    The methods here rely on attributes that only the subclasses in this
    module define: ``date`` and ``version`` properties, and one-argument
    ``get_elements()`` / ``set_elements(tree)`` wrappers that supply the
    root tag.  In particular, passing ``xml`` to the constructor only works
    on such a subclass.
    """
    XML_VERSION = 1
    RUN_PARAMETERS = 'RunParameters'
    SUMMARY = 'Summary'

    def __init__(self, xml=None, pathname=None, tree=None):
        """Create an alignment description.

        :param xml: optional serialized element (as produced by
            ``get_elements``) to restore the object from.
        :param pathname: directory the alignment results live in.
        :param tree: parsed ``config.xml`` root element.
        """
        self.pathname = pathname
        self.tree = tree

        # parse lane parameters out of the config.xml file
        self.lanes = LaneSpecificRunParameters(self)

        self.summary = None
        self.eland_results = None

        if xml is not None:
            # Dispatches to the subclass override, which adds the root tag.
            self.set_elements(xml)

    def _get_date(self):
        """Return the modification time of ``pathname``, or now if unset."""
        if self.pathname is not None:
            epochstamp = os.stat(self.pathname)[stat.ST_MTIME]
            return datetime.fromtimestamp(epochstamp)
        return datetime.today()

    def _get_time(self):
        """Convert the ``date`` property into seconds since the epoch."""
        return time.mktime(self.date.timetuple())
    time = property(_get_time, doc='return run time as seconds since epoch')

    def _get_chip_attribute(self, value):
        """Return the text of ``ChipWideRunParameters/<value>`` or None."""
        return self.tree.findtext('ChipWideRunParameters/%s' % (value,))

    def dump(self):
        """
        Debugging function, report current object
        """
        # The original used '.' instead of ',' here, which raised
        # AttributeError before anything was printed.
        print('Software:', self.__class__.__name__)
        print('Alignment version:', self.version)
        print('Run date:', self.date)
        print('config.xml:', self.tree)
        if self.summary is not None:
            self.summary.dump()

    def get_elements(self, root_tag):
        """Serialize this object into an element named ``root_tag``.

        Returns None when either the config tree or the summary is missing.
        ELAND results are appended only when present.
        """
        if self.tree is None or self.summary is None:
            return None

        # u'%d' yields a unicode string on Python 2 (as unicode() did) and a
        # plain str on Python 3, where unicode() no longer exists.
        root = ElementTree.Element(
            root_tag, {'version': u'%d' % (self.XML_VERSION,)})
        root.append(self.tree)
        root.append(self.summary.get_elements())
        if self.eland_results:
            root.append(self.eland_results.get_elements())
        return root

    def set_elements(self, tree, root_tag):
        """Restore this object from an element created by ``get_elements``.

        :param tree: element whose tag must equal ``root_tag``.
        :param root_tag: expected tag of ``tree``.
        :raises ValueError: if ``tree`` has a different tag.
        """
        if tree.tag != root_tag:
            raise ValueError('expected %s' % (root_tag,))
        xml_version = int(tree.attrib.get('version', 0))
        if xml_version > self.XML_VERSION:
            LOGGER.warning('XML tree is a higher version than this class')
        # Start from empty ELAND results; replaced below if the tree has any.
        self.eland_results = ELAND()
        run_parameters_tag = self.RUN_PARAMETERS.lower()
        summary_tag = self.SUMMARY.lower()
        eland_tag = ELAND.ELAND.lower()
        for element in list(tree):
            tag = element.tag.lower()
            if tag == run_parameters_tag:
                self.tree = element
            elif tag == summary_tag:
                self.summary = Summary(xml=element)
            elif tag == eland_tag:
                self.eland_results = ELAND(xml=element)
            else:
                LOGGER.warning("Unrecognized tag %s" % (element.tag,))
99
class Gerald(Alignment):
    """Alignment whose metadata comes from ``ChipWideRunParameters``.

    Run date, runfolder name and software version are all read from the
    ``ChipWideRunParameters`` section of the config.xml tree.
    """
    # Root tag used when (de)serializing through get_elements/set_elements.
    GERALD = 'Gerald'

    def _get_date(self):
        """Return the run date.

        Uses ``ChipWideRunParameters/TIME_STAMP`` when available, otherwise
        falls back to the base class; returns now when there is no tree.
        """
        if self.tree is None:
            return datetime.today()

        timestamp = self.tree.findtext('ChipWideRunParameters/TIME_STAMP')
        if timestamp is not None:
            # time.strptime's default format is '%a %b %d %H:%M:%S %Y'.
            epochstamp = time.mktime(time.strptime(timestamp))
            return datetime.fromtimestamp(epochstamp)
        return super(Gerald, self)._get_date()
    date = property(_get_date)

    def get_elements(self):
        """Serialize this object under the ``Gerald`` root tag."""
        return super(Gerald, self).get_elements(Gerald.GERALD)

    def set_elements(self, tree):
        """Restore this object from a ``Gerald`` rooted element."""
        return super(Gerald, self).set_elements(tree, Gerald.GERALD)

    def _get_experiment_root(self):
        """Return ``ChipWideRunParameters/EXPT_DIR_ROOT`` text or None."""
        if self.tree is None:
            return None
        return self.tree.findtext('ChipWideRunParameters/EXPT_DIR_ROOT')

    def _get_runfolder_name(self):
        """Return the first path component of EXPT_DIR below EXPT_DIR_ROOT.

        Returns None when there is no tree, when either directory entry is
        missing from the config, or when the resulting name is empty.
        """
        if self.tree is None:
            return None

        expt_root = self._get_experiment_root()
        chip_expt_dir = self.tree.findtext('ChipWideRunParameters/EXPT_DIR')
        # Previously a missing root crashed in normpath(None) and a missing
        # EXPT_DIR left experiment_dir unbound.
        if expt_root is None or chip_expt_dir is None:
            return None

        expt_root = os.path.normpath(expt_root)
        experiment_dir = chip_expt_dir.replace(expt_root + os.path.sep, '')
        experiment_dir = experiment_dir.split(os.path.sep)[0]
        return experiment_dir or None

    runfolder_name = property(_get_runfolder_name)

    def _get_software_version(self):
        """Return ``(package, version)`` parsed from SOFTWARE_VERSION.

        The package is 'GERALD' or 'CASAVA' depending on which pattern the
        ``ChipWideRunParameters/SOFTWARE_VERSION`` text matches.  Returns
        None when there is no tree, no such entry, or no pattern matches.
        """
        if self.tree is None:
            return None
        ga_version = self.tree.findtext(
            'ChipWideRunParameters/SOFTWARE_VERSION')
        if ga_version is None:
            return None

        # Raw strings keep the patterns identical while avoiding invalid
        # escape sequence warnings on newer Python versions.
        gerald = re.match(r"@.*GERALD.pl,v (?P<version>\d+(\.\d+)+)",
                          ga_version)
        if gerald:
            return ('GERALD', gerald.group('version'))
        casava = re.match(r'CASAVA-(?P<version>\d+[.\d]*)',
                          ga_version)
        if casava:
            return ('CASAVA', casava.group('version'))
        return None

    def _get_software(self):
        """Return name of analysis software package"""
        software_version = self._get_software_version()
        return software_version[0] if software_version is not None else None
    software = property(_get_software)

    def _get_version(self):
        """Return version number of software package"""
        software_version = self._get_software_version()
        return software_version[1] if software_version is not None else None
    version = property(_get_version)
168
class CASAVA(Alignment):
    """Alignment whose metadata comes from a CASAVA style config tree.

    The runfolder is read from ``Defaults/EXPT_DIR`` and the software
    version from the ``Version`` attribute of the ``Software`` element.
    A ``TIME_STAMP`` element is added to the tree at construction time when
    the tree does not already have one.
    """
    # Root tag used when (de)serializing through get_elements/set_elements.
    GERALD = 'Casava'

    def __init__(self, xml=None, pathname=None, tree=None):
        """Initialise like ``Alignment`` and make sure a time stamp exists."""
        super(CASAVA, self).__init__(xml=xml, pathname=pathname, tree=tree)

        self._add_timestamp()

    def _add_timestamp(self):
        """Manually add a time stamp to CASAVA runs"""
        if self.tree is None:
            return
        # NOTE(review): xpath() is not offered by the stdlib ElementTree
        # elements; presumably the project ElementTree is lxml - confirm.
        if len(self.tree.xpath('TIME_STAMP')) == 0:
            time_stamp = self.date.strftime('%a %b %d %H:%M:%S %Y')
            time_element = ElementTree.Element('TIME_STAMP')
            time_element.text = time_stamp
            self.tree.append(time_element)

    def _get_date(self):
        """Return the run date.

        Parses the single ``TIME_STAMP`` element when exactly one exists,
        otherwise defers to the base class.  Returns None without a tree.
        """
        if self.tree is None:
            return None
        time_element = self.tree.xpath('TIME_STAMP')
        if len(time_element) == 1:
            timetuple = time.strptime(
                time_element[0].text.strip(),
                "%a %b %d %H:%M:%S %Y")
            return datetime(*timetuple[:6])
        return super(CASAVA, self)._get_date()
    date = property(_get_date)

    def get_elements(self):
        """Serialize this object under the ``Casava`` root tag."""
        return super(CASAVA, self).get_elements(CASAVA.GERALD)

    def set_elements(self, tree):
        """Restore this object from a ``Casava`` rooted element."""
        return super(CASAVA, self).set_elements(tree, CASAVA.GERALD)

    def _get_runfolder_name(self):
        """Return the last path component of ``Defaults/EXPT_DIR``.

        Returns None when there is no tree, the entry is missing, or the
        final path component is empty.
        """
        if self.tree is None:
            return None

        # hiseqs renamed the experiment dir location
        defaults_expt_dir = self.tree.findtext('Defaults/EXPT_DIR')
        if defaults_expt_dir is None:
            # os.path.split(None) would raise TypeError.
            return None
        _, experiment_dir = os.path.split(defaults_expt_dir)
        return experiment_dir or None

    runfolder_name = property(_get_runfolder_name)

    def _get_software_version(self):
        """Return the ``Software`` Version attribute split on '-'.

        Returns None when the tree, the ``Software`` element or its
        ``Version`` attribute is missing.
        """
        if self.tree is None:
            return None
        hiseq_software_node = self.tree.find('Software')
        if hiseq_software_node is None:
            return None
        software_version = hiseq_software_node.attrib.get('Version', None)
        if software_version is None:
            return None
        return software_version.split('-')

    def _get_software(self):
        """Return the first '-' separated field of the version string."""
        software_version = self._get_software_version()
        if software_version is None:
            return None
        return software_version[0]
    software = property(_get_software)

    def _get_version(self):
        """Return the second '-' separated field of the version string.

        Returns None when the version string is missing or has no '-'.
        """
        software_version = self._get_software_version()
        if software_version is None or len(software_version) < 2:
            return None
        return software_version[1]
    version = property(_get_version)
244
245
class LaneParameters(object):
    """
    Make it easy to access elements of LaneSpecificRunParameters from python

    Abstract base: every property raises NotImplementedError until a
    subclass overrides the matching ``_get_*`` method and property.
    """
    def __init__(self, gerald, lane_id):
        """Remember the owning alignment object and the lane identifier."""
        self._gerald = gerald
        self._lane_id = lane_id

    # NotImplemented is a comparison sentinel, not an exception; calling it
    # raised TypeError.  NotImplementedError is the intended exception.
    def _get_analysis(self):
        raise NotImplementedError("abstract class")
    analysis = property(_get_analysis)

    def _get_eland_genome(self):
        raise NotImplementedError("abstract class")
    eland_genome = property(_get_eland_genome)

    def _get_read_length(self):
        raise NotImplementedError("abstract class")
    read_length = property(_get_read_length)

    def _get_use_bases(self):
        raise NotImplementedError("abstract class")
    use_bases = property(_get_use_bases)
269
270
class LaneParametersGA(LaneParameters):
    """
    Make it easy to access elements of LaneSpecificRunParameters from python

    Values are looked up in the ``LaneSpecificRunParameters`` section of
    the owning object's config tree, where each parameter element holds
    one child per lane whose tag ends in ``_<lane id>``.
    """
    def __init__(self, gerald, lane_id):
        super(LaneParametersGA, self).__init__(gerald, lane_id)

    def __get_attribute(self, xml_tag):
        """Return the text stored for this lane under ``xml_tag``.

        Returns None when the section, the parameter element, or an entry
        for this lane is missing.

        :raises RuntimeError: if the parameter has more children than
            LANES_PER_FLOWCELL.
        """
        subtree = self._gerald.tree.find('LaneSpecificRunParameters')
        if subtree is None:
            return None
        container = subtree.find(xml_tag)
        if container is None:
            return None
        # list(element) replaces Element.getchildren(), which newer
        # ElementTree implementations no longer provide.
        children = list(container)
        if len(children) > LANES_PER_FLOWCELL:
            raise RuntimeError('GERALD config.xml file changed')
        lanes = [child.tag.split('_')[1] for child in children]
        try:
            index = lanes.index(self._lane_id)
        except ValueError:
            return None
        return children[index].text

    def _get_analysis(self):
        """Return this lane's ANALYSIS entry."""
        return self.__get_attribute('ANALYSIS')
    analysis = property(_get_analysis)

    def _get_eland_genome(self):
        """Return this lane's ELAND genome, or None if unspecified."""
        genome = self.__get_attribute('ELAND_GENOME')
        # default to the chipwide parameters if there isn't an
        # entry in the lane specific parameters
        if genome is None:
            genome = self._gerald._get_chip_attribute('ELAND_GENOME')
        # ignore flag value
        if genome == 'Need_to_specify_ELAND_genome_directory':
            genome = None
        return genome
    eland_genome = property(_get_eland_genome)

    def _get_read_length(self):
        """Return this lane's READ_LENGTH, else the chip wide value."""
        read_length = self.__get_attribute('READ_LENGTH')
        if read_length is None:
            read_length = self._gerald._get_chip_attribute('READ_LENGTH')
        return read_length
    read_length = property(_get_read_length)

    def _get_use_bases(self):
        """Return this lane's USE_BASES entry."""
        return self.__get_attribute('USE_BASES')
    use_bases = property(_get_use_bases)
318
319
class LaneParametersHiSeq(LaneParameters):
    """
    Lane parameters backed by a single config element.

    Each value is the text of a direct child of ``element``; only the
    ELAND genome falls back to the chip wide parameters.
    """
    def __init__(self, gerald, lane_id, element):
        super(LaneParametersHiSeq, self).__init__(gerald, lane_id)
        self.element = element

    def __get_attribute(self, xml_tag):
        """Return the text of child ``xml_tag``, or None if it is absent."""
        node = self.element.find(xml_tag)
        return None if node is None else node.text

    def _get_analysis(self):
        """Return the ANALYSIS entry."""
        return self.__get_attribute('ANALYSIS')
    analysis = property(_get_analysis)

    def _get_eland_genome(self):
        """Return the ELAND genome, or None if it was never specified."""
        chosen = self.__get_attribute('ELAND_GENOME')
        if chosen is None:
            # nothing lane specific, so use the chip wide setting
            chosen = self._gerald._get_chip_attribute('ELAND_GENOME')
        # the placeholder value means no genome was configured
        if chosen == 'Need_to_specify_ELAND_genome_directory':
            return None
        return chosen
    eland_genome = property(_get_eland_genome)

    def _get_read_length(self):
        """Return the READ_LENGTH1 entry."""
        return self.__get_attribute('READ_LENGTH1')
    read_length = property(_get_read_length)

    def _get_use_bases(self):
        """Return the USE_BASES1 entry."""
        return self.__get_attribute('USE_BASES1')
    use_bases = property(_get_use_bases)
357
# The abstract base classes live in collections.abc on Python 3 (the old
# collections.MutableMapping alias is gone from recent versions); Python 2
# only has them in collections.
try:
    from collections.abc import MutableMapping as _MutableMapping
except ImportError:
    from collections import MutableMapping as _MutableMapping


class LaneSpecificRunParameters(_MutableMapping):
    """
    Provide access to LaneSpecificRunParameters

    A lazily built mapping from SampleKey to LaneParameters objects.  The
    mapping is filled from the owning object's config tree on first access.
    Lookups accept either an exact key or anything ``_find_key`` can match
    against exactly one stored key.
    """
    def __init__(self, gerald):
        self._gerald = gerald
        # None means "not parsed yet"; see _ensure_lanes.
        self._lanes = None

    def _ensure_lanes(self):
        """Build the lane dictionary the first time it is needed."""
        if self._lanes is None:
            self._initialize_lanes()

    def _initialize_lanes(self):
        """
        build dictionary of LaneParameters
        """
        self._lanes = {}
        tree = self._gerald.tree
        if tree is None:
            # No config available: behave as an empty mapping.
            return
        analysis = tree.find('LaneSpecificRunParameters/ANALYSIS')
        if analysis is not None:
            self._extract_ga_analysis_type(analysis)
        analysis = tree.find('Projects')
        if analysis is not None:
            self._extract_hiseq_analysis_type(analysis)

    def _extract_ga_analysis_type(self, analysis):
        """Add one LaneParametersGA per child of the ANALYSIS element."""
        # according to the pipeline specs I think their fields
        # are sampleName_laneID, with sampleName defaulting to s
        # since laneIDs are constant lets just try using
        # those consistently.
        for element in analysis:
            sample, lane_id = element.tag.split('_')
            key = SampleKey(lane=int(lane_id), sample=sample)
            self._lanes[key] = LaneParametersGA(self._gerald, lane_id)

    def _extract_hiseq_analysis_type(self, analysis):
        """Extract from HiSeq style multiplexed analysis types"""
        for element in analysis:
            name = element.attrib['name']
            key = SampleKey(sample=name)
            self._lanes[key] = LaneParametersHiSeq(self._gerald,
                                                   name,
                                                   element)

    def __iter__(self):
        self._ensure_lanes()
        # iter() works on both Python 2 and 3, unlike dict.iterkeys().
        return iter(self._lanes)

    def __getitem__(self, key):
        """Return the entry for ``key``, trying an exact match first.

        :raises KeyError: if nothing matches.
        :raises ValueError: if ``key`` matches several stored keys.
        """
        self._ensure_lanes()
        value = self._lanes.get(key, None)
        if value is not None:
            return value
        real_key = self._find_key(key)
        if real_key is not None:
            return self._lanes[real_key]
        raise KeyError("%s not found in %s" % (
            repr(key),
            ",".join((repr(k) for k in self._lanes.keys()))))

    def __setitem__(self, key, value):
        """Store ``value``, reusing an existing matching key if there is one."""
        self._ensure_lanes()
        if len(self._lanes) > 100:
            LOGGER.warning(
                "many projects loaded, consider improving dictionary")
        real_key = self._find_key(key)
        if real_key is not None:
            key = real_key
        self._lanes[key] = value

    def __delitem__(self, key):
        """Remove the entry matching ``key``; a missing key is ignored."""
        self._ensure_lanes()
        if key in self._lanes:
            del self._lanes[key]
        else:
            real_key = self._find_key(key)
            if real_key is not None:
                del self._lanes[real_key]

    def __len__(self):
        self._ensure_lanes()
        return len(self._lanes)

    def _find_key(self, lookup_key):
        """Return the single stored key matching ``lookup_key``, or None.

        A non SampleKey argument is treated as a lane identifier.

        :raises ValueError: if more than one stored key matches.
        """
        self._ensure_lanes()
        if not isinstance(lookup_key, SampleKey):
            lookup_key = SampleKey(lane=lookup_key)

        results = [k for k in self._lanes if k.matches(lookup_key)]
        if len(results) > 1:
            errmsg = "Key %s matched multiple keys: %s"
            raise ValueError(errmsg % (str(lookup_key),
                                       ",".join((str(x) for x in results))))
        if len(results) == 1:
            return results[0]
        return None
455
def gerald(pathname):
    """Build an alignment object from the results found in ``pathname``.

    ``config.xml`` inside ``pathname`` is always parsed.  The kind of object
    returned depends on which summary source exists, checked in this order:

    * ``Summary.xml`` or ``Summary.htm`` in ``pathname``: a ``Gerald``
      object with a ``SummaryGA`` summary.
    * the directory ``../Data/reports/Summary`` relative to ``pathname``:
      a ``CASAVA`` object with a ``SummaryHiSeq`` summary.

    In every case the ELAND results are loaded as well.

    :param pathname: alignment directory; ``~`` is expanded.
    :raises IOError: if none of the summary sources exists (this used to
        fail with UnboundLocalError on the final return).
    """
    LOGGER.info("Parsing gerald config.xml")
    pathname = os.path.expanduser(pathname)
    config_pathname = os.path.join(pathname, 'config.xml')
    config_tree = ElementTree.parse(config_pathname).getroot()

    summary_xml = os.path.join(pathname, 'Summary.xml')
    summary_htm = os.path.join(pathname, 'Summary.htm')
    report_summary = os.path.join(pathname, '..', 'Data',
                                  'reports', 'Summary')

    # Both GA style summaries are handled identically; the XML one wins.
    for summary_path in (summary_xml, summary_htm):
        if os.path.exists(summary_path):
            g = Gerald(pathname=pathname, tree=config_tree)
            LOGGER.info("Parsing %s", os.path.basename(summary_path))
            g.summary = SummaryGA(summary_path)
            g.eland_results = eland(g.pathname, g)
            return g

    if os.path.isdir(report_summary):
        g = CASAVA(pathname=pathname, tree=config_tree)
        LOGGER.info("Parsing %s", report_summary)
        g.summary = SummaryHiSeq(report_summary)
        g.eland_results = eland(g.pathname, g)
        return g

    raise IOError(
        "No Summary.xml, Summary.htm or %s directory found for %s" % (
            report_summary, pathname))
485
if __name__ == "__main__":
    # Ad-hoc smoke test: parse the alignment directory given as the first
    # command line argument and keep the result in ``g`` for inspection.
    import sys
    g = gerald(sys.argv[1])
    # ElementTree.dump(g.get_elements())