make our API docstrings more epydoc friendly
[htsworkflow.git] / htsworkflow / pipelines / ipar.py
1 """
2 Extract information about the IPAR run
3
4 IPAR 
5     class holding the properties we found
6 ipar
7     IPAR factory function initalized from a directory name
8 fromxml
9     IPAR factory function initalized from an xml dump from
10     the IPAR object.
11 """
12 __docformat__ = "restructuredtext en"
13
14 import datetime
15 import logging
16 import os
17 import re
18 import stat
19 import time
20
21 from htsworkflow.pipelines.runfolder import \
22    ElementTree, \
23    VERSION_RE, \
24    EUROPEAN_STRPTIME
25
26 class Tiles(object):
27   def __init__(self, tree):
28     self.tree = tree.find("TileSelection")
29
30   def keys(self):
31     key_list = []
32     for c in self.tree.getchildren():
33       k = c.attrib.get('Index', None)
34       if k is not None:
35         key_list.append(k)
36     return key_list
37
38   def values(self):
39     value_list = []
40     for lane in self.tree.getchildren():
41       attributes = {}
42       for child in lane.getchildren():
43         if child.tag == "Sample":
44           attributes['Sample'] = child.text
45         elif child.tag == 'TileRange':
46           attributes['TileRange'] = (int(child.attrib['Min']),int(child.attrib['Max']))
47       value_list.append(attributes)
48     return value_list
49
50   def items(self):
51     return zip(self.keys(), self.values())
52
53   def __getitem__(self, key):
54     # FIXME: this is inefficient. building the dictionary be rescanning the xml.
55     v = dict(self.items())
56     return v[key]
57
58 class IPAR(object):
59     XML_VERSION=1
60
61     # xml tag names
62     IPAR = 'IPAR'
63     TIMESTAMP = 'timestamp'
64     MATRIX = 'matrix'
65     RUN = 'Run'
66
67     def __init__(self, xml=None):
68         self.tree = None
69         self.date = datetime.datetime.today()
70         self._tiles = None
71         if xml is not None:
72             self.set_elements(xml)
73
74     def _get_time(self):
75         return time.mktime(self.date.timetuple())
76     def _set_time(self, value):
77         mtime_tuple = time.localtime(value)
78         self.date = datetime.datetime(*(mtime_tuple[0:7]))
79     time = property(_get_time, _set_time,
80                     doc='run time as seconds since epoch')
81
82     def _get_cycles(self):
83         if self.tree is None:
84           return None
85         cycles = self.tree.find("Cycles")
86         if cycles is None:
87           return None
88         return cycles.attrib
89
90     def _get_start(self):
91         """
92         return cycle start
93         """
94         cycles = self._get_cycles()
95         if cycles is not None:
96           return int(cycles['First'])
97         else:
98           return None
99     start = property(_get_start, doc="get cycle start")
100
101     def _get_stop(self):
102         """
103         return cycle stop
104         """
105         cycles = self._get_cycles()
106         if cycles is not None:
107           return int(cycles['Last'])
108         else:
109           return None
110     stop = property(_get_stop, doc="get cycle stop")
111
112     def _get_tiles(self):
113       if self._tiles is None:
114         self._tiles = Tiles(self.tree)
115       return self._tiles
116     tiles = property(_get_tiles)
117
118     def _get_version(self):
119       software = self.tree.find('Software')
120       if software is not None:
121         return software.attrib['Version']
122     version = property(_get_version, "IPAR software version")
123
124
125     def file_list(self):
126         """
127         Generate list of all files that should be generated by the IPAR unit
128         """
129         suffix_node = self.tree.find('RunParameters/CompressionSuffix')
130         if suffix_node is None:
131           print "find compression suffix failed"
132           return None
133         suffix = suffix_node.text
134         files = []
135         format = "%s_%s_%04d_%s.txt%s"
136         for lane, attrib in self.tiles.items():
137           for file_type in ["int","nse"]:
138             start, stop = attrib['TileRange']
139             for tile in range(start, stop+1):
140               files.append(format % (attrib['Sample'], lane, tile, file_type, suffix))
141         return files
142
143     def dump(self):
144         print "Matrix:", self.matrix
145         print "Tree:", self.tree
146
147     def get_elements(self):
148         attribs = {'version': str(IPAR.XML_VERSION) }
149         root = ElementTree.Element(IPAR.IPAR, attrib=attribs)
150         timestamp = ElementTree.SubElement(root, IPAR.TIMESTAMP)
151         timestamp.text = str(int(self.time))
152         root.append(self.tree)
153         matrix = ElementTree.SubElement(root, IPAR.MATRIX)
154         matrix.text = self.matrix
155         return root
156
157     def set_elements(self, tree):
158         if tree.tag != IPAR.IPAR:
159             raise ValueError('Expected "IPAR" SubElements')
160         xml_version = int(tree.attrib.get('version', 0))
161         if xml_version > IPAR.XML_VERSION:
162             logging.warn('IPAR XML tree is a higher version than this class')
163         for element in list(tree):
164             if element.tag == IPAR.RUN:
165                 self.tree = element
166             elif element.tag == IPAR.TIMESTAMP:
167                 self.time = int(element.text)
168             elif element.tag == IPAR.MATRIX:
169                 self.matrix = element.text
170             else:
171                 raise ValueError("Unrecognized tag: %s" % (element.tag,))
172
173 def load_ipar_param_tree(paramfile):
174     """
175     look for a .param file and load it if it is an IPAR tree
176     """
177
178     tree = ElementTree.parse(paramfile).getroot()
179     run = tree.find('Run')
180     if run.attrib.has_key('Name') and run.attrib['Name'].startswith("IPAR"):
181         return run
182
183     return None
184
185 def ipar(pathname):
186     """
187     Examine the directory at pathname and initalize a IPAR object
188     """
189     logging.info("Searching IPAR directory")
190     i = IPAR()
191
192     # parse firecrest directory name
193     path, name = os.path.split(pathname)
194     groups = name.split('_')
195     if groups[0] != 'IPAR':
196       raise ValueError('ipar can only process IPAR directories')
197
198     # contents of the matrix file?
199     matrix_pathname = os.path.join(pathname, 'Matrix', 's_matrix.txt')
200     if not os.path.exists(matrix_pathname):
201         return None
202     i.matrix = open(matrix_pathname, 'r').read()
203
204     # look for parameter xml file
205     paramfile = os.path.join(path, '.params')
206     if os.path.exists(paramfile):
207       i.tree = load_ipar_param_tree(paramfile)
208       mtime_local = os.stat(paramfile)[stat.ST_MTIME]
209       i.time = mtime_local
210     return i
211
212 def fromxml(tree):
213     """
214     Initialize a IPAR object from an element tree node
215     """
216     f = IPAR()
217     f.set_elements(tree)
218     return f
219
220 if __name__ == "__main__":
221   i = ipar(os.path.expanduser('~/gec/081021_HWI-EAS229_0063_30HKUAAXX/Data/IPAR_1.01'))
222   x = i.get_elements()
223   j = fromxml(x)
224   #ElementTree.dump(x)
225   print j.date
226   print j.start
227   print j.stop
228   print i.tiles.keys()
229   print j.tiles.keys()
230   print j.tiles.items()
231   print j.file_list()