239239ecbf17aa1247d6185d8dd528b0e87eb11d
[htsworkflow.git] / htsworkflow / pipelines / ipar.py
1 """
2 Extract information about the IPAR run
3
4 IPAR 
5     class holding the properties we found
6 ipar
7     IPAR factory function initalized from a directory name
8 fromxml
9     IPAR factory function initalized from an xml dump from
10     the IPAR object.
11 """
12 __docformat__ = "restructuredtext en"
13
14 import datetime
15 from glob import glob
16 import logging
17 import os
18 import re
19 import stat
20 import time
21
22 from htsworkflow.pipelines.runfolder import \
23    ElementTree, \
24    VERSION_RE, \
25    EUROPEAN_STRPTIME
26
27 class Tiles(object):
28   def __init__(self, tree):
29     self.tree = tree.find("TileSelection")
30
31   def keys(self):
32     key_list = []
33     for c in self.tree.getchildren():
34       k = c.attrib.get('Index', None)
35       if k is not None:
36         key_list.append(k)
37     return key_list
38
39   def values(self):
40     value_list = []
41     for lane in self.tree.getchildren():
42       attributes = {}
43       for child in lane.getchildren():
44         if child.tag == "Sample":
45           attributes['Sample'] = child.text
46         elif child.tag == 'TileRange':
47           attributes['TileRange'] = (int(child.attrib['Min']),int(child.attrib['Max']))
48       value_list.append(attributes)
49     return value_list
50
51   def items(self):
52     return zip(self.keys(), self.values())
53
54   def __getitem__(self, key):
55     # FIXME: this is inefficient. building the dictionary be rescanning the xml.
56     v = dict(self.items())
57     return v[key]
58
59 class IPAR(object):
60     XML_VERSION=1
61
62     # xml tag names
63     IPAR = 'IPAR'
64     TIMESTAMP = 'timestamp'
65     MATRIX = 'matrix'
66     RUN = 'Run'
67
68     def __init__(self, xml=None):
69         self.tree = None
70         self.date = datetime.datetime.today()
71         self._tiles = None
72         if xml is not None:
73             self.set_elements(xml)
74
75     def _get_time(self):
76         return time.mktime(self.date.timetuple())
77     def _set_time(self, value):
78         mtime_tuple = time.localtime(value)
79         self.date = datetime.datetime(*(mtime_tuple[0:7]))
80     time = property(_get_time, _set_time,
81                     doc='run time as seconds since epoch')
82
83     def _get_cycles(self):
84         if self.tree is None:
85           return None
86         cycles = self.tree.find("Cycles")
87         if cycles is None:
88           return None
89         return cycles.attrib
90
91     def _get_start(self):
92         """
93         return cycle start
94         """
95         cycles = self._get_cycles()
96         if cycles is not None:
97           return int(cycles['First'])
98         else:
99           return None
100     start = property(_get_start, doc="get cycle start")
101
102     def _get_stop(self):
103         """
104         return cycle stop
105         """
106         cycles = self._get_cycles()
107         if cycles is not None:
108           return int(cycles['Last'])
109         else:
110           return None
111     stop = property(_get_stop, doc="get cycle stop")
112
113     def _get_tiles(self):
114       if self._tiles is None:
115         self._tiles = Tiles(self.tree)
116       return self._tiles
117     tiles = property(_get_tiles)
118
119     def _get_version(self):
120       software = self.tree.find('Software')
121       if software is not None:
122         return software.attrib['Version']
123     version = property(_get_version, "IPAR software version")
124
125
126     def file_list(self):
127         """
128         Generate list of all files that should be generated by the IPAR unit
129         """
130         suffix_node = self.tree.find('RunParameters/CompressionSuffix')
131         if suffix_node is None:
132           print "find compression suffix failed"
133           return None
134         suffix = suffix_node.text
135         files = []
136         format = "%s_%s_%04d_%s.txt%s"
137         for lane, attrib in self.tiles.items():
138           for file_type in ["int","nse"]:
139             start, stop = attrib['TileRange']
140             for tile in range(start, stop+1):
141               files.append(format % (attrib['Sample'], lane, tile, file_type, suffix))
142         return files
143
144     def dump(self):
145         print "Matrix:", self.matrix
146         print "Tree:", self.tree
147
148     def get_elements(self):
149         attribs = {'version': str(IPAR.XML_VERSION) }
150         root = ElementTree.Element(IPAR.IPAR, attrib=attribs)
151         timestamp = ElementTree.SubElement(root, IPAR.TIMESTAMP)
152         timestamp.text = str(int(self.time))
153         root.append(self.tree)
154         matrix = ElementTree.SubElement(root, IPAR.MATRIX)
155         matrix.text = self.matrix
156         return root
157
158     def set_elements(self, tree):
159         if tree.tag != IPAR.IPAR:
160             raise ValueError('Expected "IPAR" SubElements')
161         xml_version = int(tree.attrib.get('version', 0))
162         if xml_version > IPAR.XML_VERSION:
163             logging.warn('IPAR XML tree is a higher version than this class')
164         for element in list(tree):
165             if element.tag == IPAR.RUN:
166                 self.tree = element
167             elif element.tag == IPAR.TIMESTAMP:
168                 self.time = int(element.text)
169             elif element.tag == IPAR.MATRIX:
170                 self.matrix = element.text
171             else:
172                 raise ValueError("Unrecognized tag: %s" % (element.tag,))
173
174 def load_ipar_param_tree(paramfile):
175     """
176     look for a .param file and load it if it is an IPAR tree
177     """
178
179     tree = ElementTree.parse(paramfile).getroot()
180     run = tree.find('Run')
181     if run.attrib.has_key('Name') and run.attrib['Name'].startswith("IPAR"):
182         return run
183
184     return None
185
186 def ipar(pathname):
187     """
188     Examine the directory at pathname and initalize a IPAR object
189     """
190     logging.info("Searching IPAR directory")
191     i = IPAR()
192     i.pathname = pathname
193
194     # parse firecrest directory name
195     path, name = os.path.split(pathname)
196     groups = name.split('_')
197     if groups[0] != 'IPAR':
198       raise ValueError('ipar can only process IPAR directories')
199
200     bustard_pattern = os.path.join(pathname, 'Bustard*')
201     # contents of the matrix file?
202     matrix_pathname = os.path.join(pathname, 'Matrix', 's_matrix.txt')
203     if os.path.exists(matrix_pathname):
204         # this is IPAR_1.01
205         i.matrix = open(matrix_pathname, 'r').read()
206     elif glob(bustard_pattern) > 0:
207         i.matrix = None
208         # its still live.
209     else:
210         return None
211
212     # look for parameter xml file
213     paramfile = os.path.join(path, '.params')
214     if os.path.exists(paramfile):
215       i.tree = load_ipar_param_tree(paramfile)
216       mtime_local = os.stat(paramfile)[stat.ST_MTIME]
217       i.time = mtime_local
218     return i
219
220 def fromxml(tree):
221     """
222     Initialize a IPAR object from an element tree node
223     """
224     f = IPAR()
225     f.set_elements(tree)
226     return f
227
228 if __name__ == "__main__":
229   i = ipar(os.path.expanduser('~/gec/081021_HWI-EAS229_0063_30HKUAAXX/Data/IPAR_1.01'))
230   x = i.get_elements()
231   j = fromxml(x)
232   #ElementTree.dump(x)
233   print j.date
234   print j.start
235   print j.stop
236   print i.tiles.keys()
237   print j.tiles.keys()
238   print j.tiles.items()
239   print j.file_list()