f818b3495fbf2fd2d8f2fba13d698188e7680d3f
[htsworkflow.git] / htsworkflow / pipelines / ipar.py
1 """
2 Extract information about the IPAR run
3
4 IPAR
5     class holding the properties we found
6 ipar
7     IPAR factory function initalized from a directory name
8 fromxml
9     IPAR factory function initalized from an xml dump from
10     the IPAR object.
11 """
12 __docformat__ = "restructuredtext en"
13
14 import datetime
15 from glob import glob
16 import logging
17 import os
18 import re
19 import stat
20 import time
21
22 from htsworkflow.pipelines import \
23    ElementTree, \
24    VERSION_RE, \
25    EUROPEAN_STRPTIME
26
27 LOGGER = logging.getLogger(__name__)
28 SOFTWARE_NAMES = ('IPAR_1.01', 'IPAR_1.3', 'Intensities')
29
30 class Tiles(object):
31   def __init__(self, tree):
32     self.tree = tree.find("TileSelection")
33
34   def keys(self):
35     key_list = []
36     for c in self.tree.getchildren():
37       k = c.attrib.get('Index', None)
38       if k is not None:
39         key_list.append(k)
40     return key_list
41
42   def values(self):
43     value_list = []
44     for lane in self.tree.getchildren():
45       attributes = {}
46       for child in lane.getchildren():
47         if child.tag == "Sample":
48           attributes['Sample'] = child.text
49         elif child.tag == 'TileRange':
50           attributes['TileRange'] = (int(child.attrib['Min']),int(child.attrib['Max']))
51       value_list.append(attributes)
52     return value_list
53
54   def items(self):
55     return zip(self.keys(), self.values())
56
57   def __getitem__(self, key):
58     # FIXME: this is inefficient. building the dictionary be rescanning the xml.
59     v = dict(self.items())
60     return v[key]
61
62 class IPAR(object):
63     XML_VERSION=1
64
65     # xml tag names
66     IPAR = 'IPAR'
67     TIMESTAMP = 'timestamp'
68     MATRIX = 'matrix'
69     RUN = 'Run'
70
71     def __init__(self, xml=None):
72         self.tree = None
73         self.date = datetime.datetime.today()
74         self._tiles = None
75         if xml is not None:
76             self.set_elements(xml)
77
78     def _get_software(self):
79         """Return software name"""
80         if self.tree is None:
81             raise ValueError("Can't determine software name, please load a run")
82         software = self.tree.xpath('Software')
83         if len(software) == 0:
84           return None
85         elif len(software) > 1:
86             raise RuntimeError("Too many software tags, please update ipar.py")
87         else:
88             return software[0].attrib['Name']
89     software = property(_get_software)
90
91     def _get_time(self):
92         return time.mktime(self.date.timetuple())
93     def _set_time(self, value):
94         mtime_tuple = time.localtime(value)
95         self.date = datetime.datetime(*(mtime_tuple[0:7]))
96     time = property(_get_time, _set_time,
97                     doc='run time as seconds since epoch')
98
99     def _get_cycles(self):
100         if self.tree is None:
101           raise RuntimeError("get cycles called before xml tree initalized")
102         cycles = self.tree.find("Cycles")
103         assert cycles is not None
104         if cycles is None:
105           return None
106         return cycles.attrib
107
108     def _get_start(self):
109         """
110         return cycle start
111         """
112         cycles = self._get_cycles()
113         if cycles is not None:
114           return int(cycles['First'])
115         else:
116           return None
117     start = property(_get_start, doc="get cycle start")
118
119     def _get_stop(self):
120         """
121         return cycle stop
122         """
123         cycles = self._get_cycles()
124         if cycles is not None:
125           return int(cycles['Last'])
126         else:
127           return None
128     stop = property(_get_stop, doc="get cycle stop")
129
130     def _get_tiles(self):
131       if self._tiles is None:
132         self._tiles = Tiles(self.tree)
133       return self._tiles
134     tiles = property(_get_tiles)
135
136     def _get_version(self):
137       software = self.tree.find('Software')
138       if software is not None:
139         return software.attrib['Version']
140     version = property(_get_version, "IPAR software version")
141
142
143     def file_list(self):
144         """
145         Generate list of all files that should be generated by the IPAR unit
146         """
147         suffix_node = self.tree.find('RunParameters/CompressionSuffix')
148         if suffix_node is None:
149           print "find compression suffix failed"
150           return None
151         suffix = suffix_node.text
152         files = []
153         format = "%s_%s_%04d_%s.txt%s"
154         for lane, attrib in self.tiles.items():
155           for file_type in ["int","nse"]:
156             start, stop = attrib['TileRange']
157             for tile in range(start, stop+1):
158               files.append(format % (attrib['Sample'], lane, tile, file_type, suffix))
159         return files
160
161     def dump(self):
162         print "Matrix:", self.matrix
163         print "Tree:", self.tree
164
165     def get_elements(self):
166         attribs = {'version': str(IPAR.XML_VERSION) }
167         root = ElementTree.Element(IPAR.IPAR, attrib=attribs)
168         timestamp = ElementTree.SubElement(root, IPAR.TIMESTAMP)
169         timestamp.text = str(int(self.time))
170         root.append(self.tree)
171         matrix = ElementTree.SubElement(root, IPAR.MATRIX)
172         matrix.text = self.matrix
173         return root
174
175     def set_elements(self, tree):
176         if tree.tag != IPAR.IPAR:
177             raise ValueError('Expected "IPAR" SubElements')
178         xml_version = int(tree.attrib.get('version', 0))
179         if xml_version > IPAR.XML_VERSION:
180             LOGGER.warn('IPAR XML tree is a higher version than this class')
181         for element in list(tree):
182             if element.tag == IPAR.RUN:
183                 self.tree = element
184             elif element.tag == IPAR.TIMESTAMP:
185                 self.time = int(element.text)
186             elif element.tag == IPAR.MATRIX:
187                 self.matrix = element.text
188             else:
189                 raise ValueError("Unrecognized tag: %s" % (element.tag,))
190
191 def load_ipar_param_tree(paramfile):
192     """
193     look for a .param file and load it if it is an IPAR tree
194     """
195
196     tree = ElementTree.parse(paramfile).getroot()
197     run = tree.find('Run')
198     if run.attrib.has_key('Name') and run.attrib['Name'] in SOFTWARE_NAMES:
199         return run
200     else:
201         LOGGER.info("No run found")
202         return None
203
204 def ipar(pathname):
205     """
206     Examine the directory at pathname and initalize a IPAR object
207     """
208     LOGGER.info("Searching IPAR directory %s" % (pathname,))
209     i = IPAR()
210     i.pathname = pathname
211
212     # parse firecrest directory name
213     path, name = os.path.split(pathname)
214     groups = name.split('_')
215     if not (groups[0] == 'IPAR' or groups[0] == 'Intensities'):
216       raise ValueError('ipar can only process IPAR directories')
217
218     bustard_pattern = os.path.join(pathname, 'Bustard*')
219     # contents of the matrix file?
220     matrix_pathname = os.path.join(pathname, 'Matrix', 's_matrix.txt')
221     if os.path.exists(matrix_pathname):
222         # this is IPAR_1.01
223         i.matrix = open(matrix_pathname, 'r').read()
224     elif glob(bustard_pattern) > 0:
225         i.matrix = None
226         # its still live.
227
228     # look for parameter xml file
229     paramfiles = [os.path.join(pathname, 'RTAConfig.xml'),
230                   os.path.join(pathname, 'config.xml'),
231                   os.path.join(path, '.params')]
232     for paramfile in paramfiles:
233         if os.path.exists(paramfile):
234             LOGGER.info("Found IPAR Config file at: %s" % ( paramfile, ))
235             i.tree = load_ipar_param_tree(paramfile)
236             mtime_local = os.stat(paramfile)[stat.ST_MTIME]
237             i.time = mtime_local
238             return i
239
240     return i
241
242 def fromxml(tree):
243     """
244     Initialize a IPAR object from an element tree node
245     """
246     f = IPAR()
247     f.set_elements(tree)
248     return f
249
250 #if __name__ == "__main__":
251   #i = ipar(os.path.expanduser('~/gec/081021_HWI-EAS229_0063_30HKUAAXX/Data/IPAR_1.01'))
252   #x = i.get_elements()
253   #j = fromxml(x)
254   #ElementTree.dump(x)
255   #print j.date
256   #print j.start
257   #print j.stop
258   #print i.tiles.keys()
259   #print j.tiles.keys()
260   #print j.tiles.items()
261   #print j.file_list()