46758ec141af2629838e439ce0747642d8f024e6
[htsworkflow.git] / htsworkflow / pipelines / ipar.py
1 """
2 Extract information about the IPAR run
3
4 IPAR
5     class holding the properties we found
6 ipar
7     IPAR factory function initalized from a directory name
8 fromxml
9     IPAR factory function initalized from an xml dump from
10     the IPAR object.
11 """
12 from __future__ import print_function
13
14 __docformat__ = "restructuredtext en"
15
16 import datetime
17 from glob import glob
18 import logging
19 import os
20 import re
21 import stat
22 import time
23
24 from htsworkflow.pipelines import \
25    ElementTree, \
26    VERSION_RE, \
27    EUROPEAN_STRPTIME
28
29 LOGGER = logging.getLogger(__name__)
30 SOFTWARE_NAMES = ('IPAR_1.01', 'IPAR_1.3', 'Intensities')
31
32 class Tiles(object):
33   def __init__(self, tree):
34     self.tree = tree.find("TileSelection")
35
36   def keys(self):
37     key_list = []
38     for c in self.tree.getchildren():
39       k = c.attrib.get('Index', None)
40       if k is not None:
41         key_list.append(k)
42     return key_list
43
44   def values(self):
45     value_list = []
46     for lane in self.tree.getchildren():
47       attributes = {}
48       for child in lane.getchildren():
49         if child.tag == "Sample":
50           attributes['Sample'] = child.text
51         elif child.tag == 'TileRange':
52           attributes['TileRange'] = (int(child.attrib['Min']),int(child.attrib['Max']))
53       value_list.append(attributes)
54     return value_list
55
56   def items(self):
57     return zip(self.keys(), self.values())
58
59   def __getitem__(self, key):
60     # FIXME: this is inefficient. building the dictionary be rescanning the xml.
61     v = dict(self.items())
62     return v[key]
63
64 class IPAR(object):
65     XML_VERSION=1
66
67     # xml tag names
68     IPAR = 'IPAR'
69     TIMESTAMP = 'timestamp'
70     MATRIX = 'matrix'
71     RUN = 'Run'
72
73     def __init__(self, xml=None):
74         self.tree = None
75         self.date = datetime.datetime.today()
76         self._tiles = None
77         if xml is not None:
78             self.set_elements(xml)
79
80     def _get_runfolder_name(self):
81         """Return runfolder name"""
82         if self.tree is None:
83             raise ValueError("Can't query an empty run")
84         runfolder = self.tree.xpath('RunParameters/RunFolder')
85         if len(runfolder) == 0:
86             return None
87         elif len(runfolder) > 1:
88             raise RuntimeError("RunXml parse error looking for RunFolder")
89         else:
90             return runfolder[0].text
91     runfolder_name = property(_get_runfolder_name)
92     
93     def _get_software(self):
94         """Return software name"""
95         if self.tree is None:
96             raise ValueError("Can't determine software name, please load a run")
97         software = self.tree.xpath('Software')
98         if len(software) == 0:
99           return None
100         elif len(software) > 1:
101             raise RuntimeError("Too many software tags, please update ipar.py")
102         else:
103             return software[0].attrib['Name']
104     software = property(_get_software)
105
106     def _get_time(self):
107         return time.mktime(self.date.timetuple())
108     def _set_time(self, value):
109         mtime_tuple = time.localtime(value)
110         self.date = datetime.datetime(*(mtime_tuple[0:7]))
111     time = property(_get_time, _set_time,
112                     doc='run time as seconds since epoch')
113
114     def _get_cycles(self):
115         if self.tree is None:
116           raise RuntimeError("get cycles called before xml tree initalized")
117         cycles = self.tree.find("Cycles")
118         assert cycles is not None
119         if cycles is None:
120           return None
121         return cycles.attrib
122
123     def _get_start(self):
124         """
125         return cycle start
126         """
127         cycles = self._get_cycles()
128         if cycles is not None:
129           return int(cycles['First'])
130         else:
131           return None
132     start = property(_get_start, doc="get cycle start")
133
134     def _get_stop(self):
135         """
136         return cycle stop
137         """
138         cycles = self._get_cycles()
139         if cycles is not None:
140           return int(cycles['Last'])
141         else:
142           return None
143     stop = property(_get_stop, doc="get cycle stop")
144
145     def _get_tiles(self):
146       if self._tiles is None:
147         self._tiles = Tiles(self.tree)
148       return self._tiles
149     tiles = property(_get_tiles)
150
151     def _get_version(self):
152       software = self.tree.find('Software')
153       if software is not None:
154         return software.attrib['Version']
155     version = property(_get_version, "IPAR software version")
156
157
158     def file_list(self):
159         """
160         Generate list of all files that should be generated by the IPAR unit
161         """
162         suffix_node = self.tree.find('RunParameters/CompressionSuffix')
163         if suffix_node is None:
164           print("find compression suffix failed")
165           return None
166         suffix = suffix_node.text
167         files = []
168         format = "%s_%s_%04d_%s.txt%s"
169         for lane, attrib in self.tiles.items():
170           for file_type in ["int","nse"]:
171             start, stop = attrib['TileRange']
172             for tile in range(start, stop+1):
173               files.append(format % (attrib['Sample'], lane, tile, file_type, suffix))
174         return files
175
176     def dump(self):
177         print("Matrix:", self.matrix)
178         print("Tree:", self.tree)
179
180     def get_elements(self):
181         attribs = {'version': str(IPAR.XML_VERSION) }
182         root = ElementTree.Element(IPAR.IPAR, attrib=attribs)
183         timestamp = ElementTree.SubElement(root, IPAR.TIMESTAMP)
184         timestamp.text = str(int(self.time))
185         root.append(self.tree)
186         matrix = ElementTree.SubElement(root, IPAR.MATRIX)
187         matrix.text = self.matrix
188         return root
189
190     def set_elements(self, tree):
191         if tree.tag != IPAR.IPAR:
192             raise ValueError('Expected "IPAR" SubElements')
193         xml_version = int(tree.attrib.get('version', 0))
194         if xml_version > IPAR.XML_VERSION:
195             LOGGER.warn('IPAR XML tree is a higher version than this class')
196         for element in list(tree):
197             if element.tag == IPAR.RUN:
198                 self.tree = element
199             elif element.tag == IPAR.TIMESTAMP:
200                 self.time = int(element.text)
201             elif element.tag == IPAR.MATRIX:
202                 self.matrix = element.text
203             else:
204                 raise ValueError("Unrecognized tag: %s" % (element.tag,))
205
206 def load_ipar_param_tree(paramfile):
207     """
208     look for a .param file and load it if it is an IPAR tree
209     """
210
211     tree = ElementTree.parse(paramfile).getroot()
212     run = tree.find('Run')
213     if run.attrib.has_key('Name') and run.attrib['Name'] in SOFTWARE_NAMES:
214         return run
215     else:
216         LOGGER.info("No run found")
217         return None
218
219 def ipar(pathname):
220     """
221     Examine the directory at pathname and initalize a IPAR object
222     """
223     LOGGER.info("Searching IPAR directory %s" % (pathname,))
224     i = IPAR()
225     i.pathname = pathname
226
227     # parse firecrest directory name
228     path, name = os.path.split(pathname)
229     groups = name.split('_')
230     if not (groups[0] == 'IPAR' or groups[0] == 'Intensities'):
231       raise ValueError('ipar can only process IPAR directories')
232
233     bustard_pattern = os.path.join(pathname, 'Bustard*')
234     # contents of the matrix file?
235     matrix_pathname = os.path.join(pathname, 'Matrix', 's_matrix.txt')
236     if os.path.exists(matrix_pathname):
237         # this is IPAR_1.01
238         i.matrix = open(matrix_pathname, 'r').read()
239     elif glob(bustard_pattern) > 0:
240         i.matrix = None
241         # its still live.
242
243     # look for parameter xml file
244     paramfiles = [os.path.join(pathname, 'RTAConfig.xml'),
245                   os.path.join(pathname, 'config.xml'),
246                   os.path.join(path, '.params')]
247     for paramfile in paramfiles:
248         if os.path.exists(paramfile):
249             LOGGER.info("Found IPAR Config file at: %s" % ( paramfile, ))
250             i.tree = load_ipar_param_tree(paramfile)
251             mtime_local = os.stat(paramfile)[stat.ST_MTIME]
252             i.time = mtime_local
253             return i
254
255     return i
256
257 def fromxml(tree):
258     """
259     Initialize a IPAR object from an element tree node
260     """
261     f = IPAR()
262     f.set_elements(tree)
263     return f
264
265 #if __name__ == "__main__":
266   #i = ipar(os.path.expanduser('~/gec/081021_HWI-EAS229_0063_30HKUAAXX/Data/IPAR_1.01'))
267   #x = i.get_elements()
268   #j = fromxml(x)
269   #ElementTree.dump(x)
270   #print j.date
271   #print j.start
272   #print j.stop
273   #print i.tiles.keys()
274   #print j.tiles.keys()
275   #print j.tiles.items()
276   #print j.file_list()