Recent IPAR xml config blocks include the runfolder name
[htsworkflow.git] / htsworkflow / pipelines / ipar.py
1 """
2 Extract information about the IPAR run
3
4 IPAR
5     class holding the properties we found
6 ipar
7     IPAR factory function initalized from a directory name
8 fromxml
9     IPAR factory function initalized from an xml dump from
10     the IPAR object.
11 """
12 __docformat__ = "restructuredtext en"
13
14 import datetime
15 from glob import glob
16 import logging
17 import os
18 import re
19 import stat
20 import time
21
22 from htsworkflow.pipelines import \
23    ElementTree, \
24    VERSION_RE, \
25    EUROPEAN_STRPTIME
26
27 LOGGER = logging.getLogger(__name__)
28 SOFTWARE_NAMES = ('IPAR_1.01', 'IPAR_1.3', 'Intensities')
29
30 class Tiles(object):
31   def __init__(self, tree):
32     self.tree = tree.find("TileSelection")
33
34   def keys(self):
35     key_list = []
36     for c in self.tree.getchildren():
37       k = c.attrib.get('Index', None)
38       if k is not None:
39         key_list.append(k)
40     return key_list
41
42   def values(self):
43     value_list = []
44     for lane in self.tree.getchildren():
45       attributes = {}
46       for child in lane.getchildren():
47         if child.tag == "Sample":
48           attributes['Sample'] = child.text
49         elif child.tag == 'TileRange':
50           attributes['TileRange'] = (int(child.attrib['Min']),int(child.attrib['Max']))
51       value_list.append(attributes)
52     return value_list
53
54   def items(self):
55     return zip(self.keys(), self.values())
56
57   def __getitem__(self, key):
58     # FIXME: this is inefficient. building the dictionary be rescanning the xml.
59     v = dict(self.items())
60     return v[key]
61
62 class IPAR(object):
63     XML_VERSION=1
64
65     # xml tag names
66     IPAR = 'IPAR'
67     TIMESTAMP = 'timestamp'
68     MATRIX = 'matrix'
69     RUN = 'Run'
70
71     def __init__(self, xml=None):
72         self.tree = None
73         self.date = datetime.datetime.today()
74         self._tiles = None
75         if xml is not None:
76             self.set_elements(xml)
77
78     def _get_runfolder_name(self):
79         """Return runfolder name"""
80         if self.tree is None:
81             raise ValueError("Can't query an empty run")
82         runfolder = self.tree.xpath('RunParameters/Runfolder')
83         return runfolder
84     runfolder_name = property(_get_runfolder)
85     
86     def _get_software(self):
87         """Return software name"""
88         if self.tree is None:
89             raise ValueError("Can't determine software name, please load a run")
90         software = self.tree.xpath('Software')
91         if len(software) == 0:
92           return None
93         elif len(software) > 1:
94             raise RuntimeError("Too many software tags, please update ipar.py")
95         else:
96             return software[0].attrib['Name']
97     software = property(_get_software)
98
99     def _get_time(self):
100         return time.mktime(self.date.timetuple())
101     def _set_time(self, value):
102         mtime_tuple = time.localtime(value)
103         self.date = datetime.datetime(*(mtime_tuple[0:7]))
104     time = property(_get_time, _set_time,
105                     doc='run time as seconds since epoch')
106
107     def _get_cycles(self):
108         if self.tree is None:
109           raise RuntimeError("get cycles called before xml tree initalized")
110         cycles = self.tree.find("Cycles")
111         assert cycles is not None
112         if cycles is None:
113           return None
114         return cycles.attrib
115
116     def _get_start(self):
117         """
118         return cycle start
119         """
120         cycles = self._get_cycles()
121         if cycles is not None:
122           return int(cycles['First'])
123         else:
124           return None
125     start = property(_get_start, doc="get cycle start")
126
127     def _get_stop(self):
128         """
129         return cycle stop
130         """
131         cycles = self._get_cycles()
132         if cycles is not None:
133           return int(cycles['Last'])
134         else:
135           return None
136     stop = property(_get_stop, doc="get cycle stop")
137
138     def _get_tiles(self):
139       if self._tiles is None:
140         self._tiles = Tiles(self.tree)
141       return self._tiles
142     tiles = property(_get_tiles)
143
144     def _get_version(self):
145       software = self.tree.find('Software')
146       if software is not None:
147         return software.attrib['Version']
148     version = property(_get_version, "IPAR software version")
149
150
151     def file_list(self):
152         """
153         Generate list of all files that should be generated by the IPAR unit
154         """
155         suffix_node = self.tree.find('RunParameters/CompressionSuffix')
156         if suffix_node is None:
157           print "find compression suffix failed"
158           return None
159         suffix = suffix_node.text
160         files = []
161         format = "%s_%s_%04d_%s.txt%s"
162         for lane, attrib in self.tiles.items():
163           for file_type in ["int","nse"]:
164             start, stop = attrib['TileRange']
165             for tile in range(start, stop+1):
166               files.append(format % (attrib['Sample'], lane, tile, file_type, suffix))
167         return files
168
169     def dump(self):
170         print "Matrix:", self.matrix
171         print "Tree:", self.tree
172
173     def get_elements(self):
174         attribs = {'version': str(IPAR.XML_VERSION) }
175         root = ElementTree.Element(IPAR.IPAR, attrib=attribs)
176         timestamp = ElementTree.SubElement(root, IPAR.TIMESTAMP)
177         timestamp.text = str(int(self.time))
178         root.append(self.tree)
179         matrix = ElementTree.SubElement(root, IPAR.MATRIX)
180         matrix.text = self.matrix
181         return root
182
183     def set_elements(self, tree):
184         if tree.tag != IPAR.IPAR:
185             raise ValueError('Expected "IPAR" SubElements')
186         xml_version = int(tree.attrib.get('version', 0))
187         if xml_version > IPAR.XML_VERSION:
188             LOGGER.warn('IPAR XML tree is a higher version than this class')
189         for element in list(tree):
190             if element.tag == IPAR.RUN:
191                 self.tree = element
192             elif element.tag == IPAR.TIMESTAMP:
193                 self.time = int(element.text)
194             elif element.tag == IPAR.MATRIX:
195                 self.matrix = element.text
196             else:
197                 raise ValueError("Unrecognized tag: %s" % (element.tag,))
198
199 def load_ipar_param_tree(paramfile):
200     """
201     look for a .param file and load it if it is an IPAR tree
202     """
203
204     tree = ElementTree.parse(paramfile).getroot()
205     run = tree.find('Run')
206     if run.attrib.has_key('Name') and run.attrib['Name'] in SOFTWARE_NAMES:
207         return run
208     else:
209         LOGGER.info("No run found")
210         return None
211
212 def ipar(pathname):
213     """
214     Examine the directory at pathname and initalize a IPAR object
215     """
216     LOGGER.info("Searching IPAR directory %s" % (pathname,))
217     i = IPAR()
218     i.pathname = pathname
219
220     # parse firecrest directory name
221     path, name = os.path.split(pathname)
222     groups = name.split('_')
223     if not (groups[0] == 'IPAR' or groups[0] == 'Intensities'):
224       raise ValueError('ipar can only process IPAR directories')
225
226     bustard_pattern = os.path.join(pathname, 'Bustard*')
227     # contents of the matrix file?
228     matrix_pathname = os.path.join(pathname, 'Matrix', 's_matrix.txt')
229     if os.path.exists(matrix_pathname):
230         # this is IPAR_1.01
231         i.matrix = open(matrix_pathname, 'r').read()
232     elif glob(bustard_pattern) > 0:
233         i.matrix = None
234         # its still live.
235
236     # look for parameter xml file
237     paramfiles = [os.path.join(pathname, 'RTAConfig.xml'),
238                   os.path.join(pathname, 'config.xml'),
239                   os.path.join(path, '.params')]
240     for paramfile in paramfiles:
241         if os.path.exists(paramfile):
242             LOGGER.info("Found IPAR Config file at: %s" % ( paramfile, ))
243             i.tree = load_ipar_param_tree(paramfile)
244             mtime_local = os.stat(paramfile)[stat.ST_MTIME]
245             i.time = mtime_local
246             return i
247
248     return i
249
250 def fromxml(tree):
251     """
252     Initialize a IPAR object from an element tree node
253     """
254     f = IPAR()
255     f.set_elements(tree)
256     return f
257
258 #if __name__ == "__main__":
259   #i = ipar(os.path.expanduser('~/gec/081021_HWI-EAS229_0063_30HKUAAXX/Data/IPAR_1.01'))
260   #x = i.get_elements()
261   #j = fromxml(x)
262   #ElementTree.dump(x)
263   #print j.date
264   #print j.start
265   #print j.stop
266   #print i.tiles.keys()
267   #print j.tiles.keys()
268   #print j.tiles.items()
269   #print j.file_list()