After implementing proper tests get a working implementation of runfolder name from...
[htsworkflow.git] / htsworkflow / pipelines / ipar.py
1 """
2 Extract information about the IPAR run
3
4 IPAR
5     class holding the properties we found
6 ipar
7     IPAR factory function initalized from a directory name
8 fromxml
9     IPAR factory function initalized from an xml dump from
10     the IPAR object.
11 """
12 __docformat__ = "restructuredtext en"
13
14 import datetime
15 from glob import glob
16 import logging
17 import os
18 import re
19 import stat
20 import time
21
22 from htsworkflow.pipelines.runfolder import \
23    ElementTree, \
24    VERSION_RE, \
25    EUROPEAN_STRPTIME
26
27 LOGGER = logging.getLogger(__name__)
28 SOFTWARE_NAMES = ('IPAR_1.01', 'IPAR_1.3', 'Intensities')
29
30 class Tiles(object):
31   def __init__(self, tree):
32     self.tree = tree.find("TileSelection")
33
34   def keys(self):
35     key_list = []
36     for c in self.tree.getchildren():
37       k = c.attrib.get('Index', None)
38       if k is not None:
39         key_list.append(k)
40     return key_list
41
42   def values(self):
43     value_list = []
44     for lane in self.tree.getchildren():
45       attributes = {}
46       for child in lane.getchildren():
47         if child.tag == "Sample":
48           attributes['Sample'] = child.text
49         elif child.tag == 'TileRange':
50           attributes['TileRange'] = (int(child.attrib['Min']),int(child.attrib['Max']))
51       value_list.append(attributes)
52     return value_list
53
54   def items(self):
55     return zip(self.keys(), self.values())
56
57   def __getitem__(self, key):
58     # FIXME: this is inefficient. building the dictionary be rescanning the xml.
59     v = dict(self.items())
60     return v[key]
61
62 class IPAR(object):
63     XML_VERSION=1
64
65     # xml tag names
66     IPAR = 'IPAR'
67     TIMESTAMP = 'timestamp'
68     MATRIX = 'matrix'
69     RUN = 'Run'
70
71     def __init__(self, xml=None):
72         self.tree = None
73         self.date = datetime.datetime.today()
74         self._tiles = None
75         if xml is not None:
76             self.set_elements(xml)
77
78     def _get_runfolder_name(self):
79         """Return runfolder name"""
80         if self.tree is None:
81             raise ValueError("Can't query an empty run")
82         runfolder = self.tree.xpath('RunParameters/RunFolder')
83         if len(runfolder) == 0:
84             return None
85         elif len(runfolder) > 1:
86             raise RuntimeError("RunXml parse error looking for RunFolder")
87         else:
88             return runfolder[0].text
89     runfolder_name = property(_get_runfolder_name)
90     
91     def _get_software(self):
92         """Return software name"""
93         if self.tree is None:
94             raise ValueError("Can't determine software name, please load a run")
95         software = self.tree.xpath('Software')
96         if len(software) == 0:
97           return None
98         elif len(software) > 1:
99             raise RuntimeError("Too many software tags, please update ipar.py")
100         else:
101             return software[0].attrib['Name']
102     software = property(_get_software)
103
104     def _get_time(self):
105         return time.mktime(self.date.timetuple())
106     def _set_time(self, value):
107         mtime_tuple = time.localtime(value)
108         self.date = datetime.datetime(*(mtime_tuple[0:7]))
109     time = property(_get_time, _set_time,
110                     doc='run time as seconds since epoch')
111
112     def _get_cycles(self):
113         if self.tree is None:
114           raise RuntimeError("get cycles called before xml tree initalized")
115         cycles = self.tree.find("Cycles")
116         assert cycles is not None
117         if cycles is None:
118           return None
119         return cycles.attrib
120
121     def _get_start(self):
122         """
123         return cycle start
124         """
125         cycles = self._get_cycles()
126         if cycles is not None:
127           return int(cycles['First'])
128         else:
129           return None
130     start = property(_get_start, doc="get cycle start")
131
132     def _get_stop(self):
133         """
134         return cycle stop
135         """
136         cycles = self._get_cycles()
137         if cycles is not None:
138           return int(cycles['Last'])
139         else:
140           return None
141     stop = property(_get_stop, doc="get cycle stop")
142
143     def _get_tiles(self):
144       if self._tiles is None:
145         self._tiles = Tiles(self.tree)
146       return self._tiles
147     tiles = property(_get_tiles)
148
149     def _get_version(self):
150       software = self.tree.find('Software')
151       if software is not None:
152         return software.attrib['Version']
153     version = property(_get_version, "IPAR software version")
154
155
156     def file_list(self):
157         """
158         Generate list of all files that should be generated by the IPAR unit
159         """
160         suffix_node = self.tree.find('RunParameters/CompressionSuffix')
161         if suffix_node is None:
162           print "find compression suffix failed"
163           return None
164         suffix = suffix_node.text
165         files = []
166         format = "%s_%s_%04d_%s.txt%s"
167         for lane, attrib in self.tiles.items():
168           for file_type in ["int","nse"]:
169             start, stop = attrib['TileRange']
170             for tile in range(start, stop+1):
171               files.append(format % (attrib['Sample'], lane, tile, file_type, suffix))
172         return files
173
174     def dump(self):
175         print "Matrix:", self.matrix
176         print "Tree:", self.tree
177
178     def get_elements(self):
179         attribs = {'version': str(IPAR.XML_VERSION) }
180         root = ElementTree.Element(IPAR.IPAR, attrib=attribs)
181         timestamp = ElementTree.SubElement(root, IPAR.TIMESTAMP)
182         timestamp.text = str(int(self.time))
183         root.append(self.tree)
184         matrix = ElementTree.SubElement(root, IPAR.MATRIX)
185         matrix.text = self.matrix
186         return root
187
188     def set_elements(self, tree):
189         if tree.tag != IPAR.IPAR:
190             raise ValueError('Expected "IPAR" SubElements')
191         xml_version = int(tree.attrib.get('version', 0))
192         if xml_version > IPAR.XML_VERSION:
193             LOGGER.warn('IPAR XML tree is a higher version than this class')
194         for element in list(tree):
195             if element.tag == IPAR.RUN:
196                 self.tree = element
197             elif element.tag == IPAR.TIMESTAMP:
198                 self.time = int(element.text)
199             elif element.tag == IPAR.MATRIX:
200                 self.matrix = element.text
201             else:
202                 raise ValueError("Unrecognized tag: %s" % (element.tag,))
203
204 def load_ipar_param_tree(paramfile):
205     """
206     look for a .param file and load it if it is an IPAR tree
207     """
208
209     tree = ElementTree.parse(paramfile).getroot()
210     run = tree.find('Run')
211     if run.attrib.has_key('Name') and run.attrib['Name'] in SOFTWARE_NAMES:
212         return run
213     else:
214         LOGGER.info("No run found")
215         return None
216
217 def ipar(pathname):
218     """
219     Examine the directory at pathname and initalize a IPAR object
220     """
221     LOGGER.info("Searching IPAR directory %s" % (pathname,))
222     i = IPAR()
223     i.pathname = pathname
224
225     # parse firecrest directory name
226     path, name = os.path.split(pathname)
227     groups = name.split('_')
228     if not (groups[0] == 'IPAR' or groups[0] == 'Intensities'):
229       raise ValueError('ipar can only process IPAR directories')
230
231     bustard_pattern = os.path.join(pathname, 'Bustard*')
232     # contents of the matrix file?
233     matrix_pathname = os.path.join(pathname, 'Matrix', 's_matrix.txt')
234     if os.path.exists(matrix_pathname):
235         # this is IPAR_1.01
236         i.matrix = open(matrix_pathname, 'r').read()
237     elif glob(bustard_pattern) > 0:
238         i.matrix = None
239         # its still live.
240
241     # look for parameter xml file
242     paramfiles = [os.path.join(pathname, 'RTAConfig.xml'),
243                   os.path.join(pathname, 'config.xml'),
244                   os.path.join(path, '.params')]
245     for paramfile in paramfiles:
246         if os.path.exists(paramfile):
247             LOGGER.info("Found IPAR Config file at: %s" % ( paramfile, ))
248             i.tree = load_ipar_param_tree(paramfile)
249             mtime_local = os.stat(paramfile)[stat.ST_MTIME]
250             i.time = mtime_local
251             return i
252
253     return i
254
255 def fromxml(tree):
256     """
257     Initialize a IPAR object from an element tree node
258     """
259     f = IPAR()
260     f.set_elements(tree)
261     return f
262
263 #if __name__ == "__main__":
264   #i = ipar(os.path.expanduser('~/gec/081021_HWI-EAS229_0063_30HKUAAXX/Data/IPAR_1.01'))
265   #x = i.get_elements()
266   #j = fromxml(x)
267   #ElementTree.dump(x)
268   #print j.date
269   #print j.start
270   #print j.stop
271   #print i.tiles.keys()
272   #print j.tiles.keys()
273   #print j.tiles.items()
274   #print j.file_list()