Try to make runfolder results extraction more robust
[htsworkflow.git] / htsworkflow / pipelines / ipar.py
1 """
2 Extract information about the IPAR run
3
4 IPAR - class holding the properties we found
5 IPAR - IPAR factory function initalized from a directory name
6 fromxml - IPAR factory function initalized from an xml dump from
7           the IPAR object.
8 """
9
10 import datetime
11 import logging
12 import os
13 import re
14 import stat
15 import time
16
17 from htsworkflow.pipelines.runfolder import \
18    ElementTree, \
19    VERSION_RE, \
20    EUROPEAN_STRPTIME
21
22 class Tiles(object):
23   def __init__(self, tree):
24     self.tree = tree.find("TileSelection")
25
26   def keys(self):
27     key_list = []
28     for c in self.tree.getchildren():
29       k = c.attrib.get('Index', None)
30       if k is not None:
31         key_list.append(k)
32     return key_list
33
34   def values(self):
35     value_list = []
36     for lane in self.tree.getchildren():
37       attributes = {}
38       for child in lane.getchildren():
39         if child.tag == "Sample":
40           attributes['Sample'] = child.text
41         elif child.tag == 'TileRange':
42           attributes['TileRange'] = (int(child.attrib['Min']),int(child.attrib['Max']))
43       value_list.append(attributes)
44     return value_list
45
46   def items(self):
47     return zip(self.keys(), self.values())
48
49   def __getitem__(self, key):
50     # FIXME: this is inefficient. building the dictionary be rescanning the xml.
51     v = dict(self.items())
52     return v[key]
53
54 class IPAR(object):
55     XML_VERSION=1
56
57     # xml tag names
58     IPAR = 'IPAR'
59     TIMESTAMP = 'timestamp'
60     MATRIX = 'matrix'
61     RUN = 'Run'
62
63     def __init__(self, xml=None):
64         self.tree = None
65         self.date = datetime.datetime.today()
66         self._tiles = None
67         if xml is not None:
68             self.set_elements(xml)
69
70     def _get_time(self):
71         return time.mktime(self.date.timetuple())
72     def _set_time(self, value):
73         mtime_tuple = time.localtime(value)
74         self.date = datetime.datetime(*(mtime_tuple[0:7]))
75     time = property(_get_time, _set_time,
76                     doc='run time as seconds since epoch')
77
78     def _get_cycles(self):
79         if self.tree is None:
80           return None
81         cycles = self.tree.find("Cycles")
82         if cycles is None:
83           return None
84         return cycles.attrib
85
86     def _get_start(self):
87         """
88         return cycle start
89         """
90         cycles = self._get_cycles()
91         if cycles is not None:
92           return int(cycles['First'])
93         else:
94           return None
95     start = property(_get_start, doc="get cycle start")
96
97     def _get_stop(self):
98         """
99         return cycle stop
100         """
101         cycles = self._get_cycles()
102         if cycles is not None:
103           return int(cycles['Last'])
104         else:
105           return None
106     stop = property(_get_stop, doc="get cycle stop")
107
108     def _get_tiles(self):
109       if self._tiles is None:
110         self._tiles = Tiles(self.tree)
111       return self._tiles
112     tiles = property(_get_tiles)
113
114     def _get_version(self):
115       software = self.tree.find('Software')
116       if software is not None:
117         return software.attrib['Version']
118     version = property(_get_version, "IPAR software version")
119
120
121     def file_list(self):
122         """
123         Generate list of all files that should be generated by the IPAR unit
124         """
125         suffix_node = self.tree.find('RunParameters/CompressionSuffix')
126         if suffix_node is None:
127           print "find compression suffix failed"
128           return None
129         suffix = suffix_node.text
130         files = []
131         format = "%s_%s_%04d_%s.txt%s"
132         for lane, attrib in self.tiles.items():
133           for file_type in ["int","nse"]:
134             start, stop = attrib['TileRange']
135             for tile in range(start, stop+1):
136               files.append(format % (attrib['Sample'], lane, tile, file_type, suffix))
137         return files
138
139     def dump(self):
140         print "Matrix:", self.matrix
141         print "Tree:", self.tree
142
143     def get_elements(self):
144         attribs = {'version': str(IPAR.XML_VERSION) }
145         root = ElementTree.Element(IPAR.IPAR, attrib=attribs)
146         timestamp = ElementTree.SubElement(root, IPAR.TIMESTAMP)
147         timestamp.text = str(int(self.time))
148         root.append(self.tree)
149         matrix = ElementTree.SubElement(root, IPAR.MATRIX)
150         matrix.text = self.matrix
151         return root
152
153     def set_elements(self, tree):
154         if tree.tag != IPAR.IPAR:
155             raise ValueError('Expected "IPAR" SubElements')
156         xml_version = int(tree.attrib.get('version', 0))
157         if xml_version > IPAR.XML_VERSION:
158             logging.warn('IPAR XML tree is a higher version than this class')
159         for element in list(tree):
160             if element.tag == IPAR.RUN:
161                 self.tree = element
162             elif element.tag == IPAR.TIMESTAMP:
163                 self.time = int(element.text)
164             elif element.tag == IPAR.MATRIX:
165                 self.matrix = element.text
166             else:
167                 raise ValueError("Unrecognized tag: %s" % (element.tag,))
168
169 def load_ipar_param_tree(paramfile):
170     """
171     look for a .param file and load it if it is an IPAR tree
172     """
173
174     tree = ElementTree.parse(paramfile).getroot()
175     run = tree.find('Run')
176     if run.attrib.has_key('Name') and run.attrib['Name'].startswith("IPAR"):
177         return run
178
179     return None
180
181 def ipar(pathname):
182     """
183     Examine the directory at pathname and initalize a IPAR object
184     """
185     logging.info("Searching IPAR directory")
186     i = IPAR()
187
188     # parse firecrest directory name
189     path, name = os.path.split(pathname)
190     groups = name.split('_')
191     if groups[0] != 'IPAR':
192       raise ValueError('ipar can only process IPAR directories')
193
194     # contents of the matrix file?
195     matrix_pathname = os.path.join(pathname, 'Matrix', 's_matrix.txt')
196     if not os.path.exists(matrix_pathname):
197         return None
198     i.matrix = open(matrix_pathname, 'r').read()
199
200     # look for parameter xml file
201     paramfile = os.path.join(path, '.params')
202     if os.path.exists(paramfile):
203       i.tree = load_ipar_param_tree(paramfile)
204       mtime_local = os.stat(paramfile)[stat.ST_MTIME]
205       i.time = mtime_local
206     return i
207
208 def fromxml(tree):
209     """
210     Initialize a IPAR object from an element tree node
211     """
212     f = IPAR()
213     f.set_elements(tree)
214     return f
215
216 if __name__ == "__main__":
217   i = ipar(os.path.expanduser('~/gec/081021_HWI-EAS229_0063_30HKUAAXX/Data/IPAR_1.01'))
218   x = i.get_elements()
219   j = fromxml(x)
220   #ElementTree.dump(x)
221   print j.date
222   print j.start
223   print j.stop
224   print i.tiles.keys()
225   print j.tiles.keys()
226   print j.tiles.items()
227   print j.file_list()