Add support for extracting data out of Illumina's new RTA runfolder.
[htsworkflow.git] / htsworkflow / pipelines / ipar.py
1 """
2 Extract information about the IPAR run
3
4 IPAR 
5     class holding the properties we found
6 ipar
7     IPAR factory function initalized from a directory name
8 fromxml
9     IPAR factory function initalized from an xml dump from
10     the IPAR object.
11 """
12 __docformat__ = "restructuredtext en"
13
14 import datetime
15 from glob import glob
16 import logging
17 import os
18 import re
19 import stat
20 import time
21
22 from htsworkflow.pipelines.runfolder import \
23    ElementTree, \
24    VERSION_RE, \
25    EUROPEAN_STRPTIME
26
27 SOFTWARE_NAMES = ('IPAR_1.01', 'IPAR_1.3', 'Intensities')
28
29 class Tiles(object):
30   def __init__(self, tree):
31     self.tree = tree.find("TileSelection")
32
33   def keys(self):
34     key_list = []
35     for c in self.tree.getchildren():
36       k = c.attrib.get('Index', None)
37       if k is not None:
38         key_list.append(k)
39     return key_list
40
41   def values(self):
42     value_list = []
43     for lane in self.tree.getchildren():
44       attributes = {}
45       for child in lane.getchildren():
46         if child.tag == "Sample":
47           attributes['Sample'] = child.text
48         elif child.tag == 'TileRange':
49           attributes['TileRange'] = (int(child.attrib['Min']),int(child.attrib['Max']))
50       value_list.append(attributes)
51     return value_list
52
53   def items(self):
54     return zip(self.keys(), self.values())
55
56   def __getitem__(self, key):
57     # FIXME: this is inefficient. building the dictionary be rescanning the xml.
58     v = dict(self.items())
59     return v[key]
60
61 class IPAR(object):
62     XML_VERSION=1
63
64     # xml tag names
65     IPAR = 'IPAR'
66     TIMESTAMP = 'timestamp'
67     MATRIX = 'matrix'
68     RUN = 'Run'
69
70     def __init__(self, xml=None):
71         self.tree = None
72         self.date = datetime.datetime.today()
73         self._tiles = None
74         if xml is not None:
75             self.set_elements(xml)
76
77     def _get_time(self):
78         return time.mktime(self.date.timetuple())
79     def _set_time(self, value):
80         mtime_tuple = time.localtime(value)
81         self.date = datetime.datetime(*(mtime_tuple[0:7]))
82     time = property(_get_time, _set_time,
83                     doc='run time as seconds since epoch')
84
85     def _get_cycles(self):
86         if self.tree is None:
87           raise RuntimeError("get cycles called before xml tree initalized")
88         cycles = self.tree.find("Cycles")
89         assert cycles is not None
90         if cycles is None:
91           return None
92         return cycles.attrib
93
94     def _get_start(self):
95         """
96         return cycle start
97         """
98         cycles = self._get_cycles()
99         if cycles is not None:
100           return int(cycles['First'])
101         else:
102           return None
103     start = property(_get_start, doc="get cycle start")
104
105     def _get_stop(self):
106         """
107         return cycle stop
108         """
109         cycles = self._get_cycles()
110         if cycles is not None:
111           return int(cycles['Last'])
112         else:
113           return None
114     stop = property(_get_stop, doc="get cycle stop")
115
116     def _get_tiles(self):
117       if self._tiles is None:
118         self._tiles = Tiles(self.tree)
119       return self._tiles
120     tiles = property(_get_tiles)
121
122     def _get_version(self):
123       software = self.tree.find('Software')
124       if software is not None:
125         return software.attrib['Version']
126     version = property(_get_version, "IPAR software version")
127
128
129     def file_list(self):
130         """
131         Generate list of all files that should be generated by the IPAR unit
132         """
133         suffix_node = self.tree.find('RunParameters/CompressionSuffix')
134         if suffix_node is None:
135           print "find compression suffix failed"
136           return None
137         suffix = suffix_node.text
138         files = []
139         format = "%s_%s_%04d_%s.txt%s"
140         for lane, attrib in self.tiles.items():
141           for file_type in ["int","nse"]:
142             start, stop = attrib['TileRange']
143             for tile in range(start, stop+1):
144               files.append(format % (attrib['Sample'], lane, tile, file_type, suffix))
145         return files
146
147     def dump(self):
148         print "Matrix:", self.matrix
149         print "Tree:", self.tree
150
151     def get_elements(self):
152         attribs = {'version': str(IPAR.XML_VERSION) }
153         root = ElementTree.Element(IPAR.IPAR, attrib=attribs)
154         timestamp = ElementTree.SubElement(root, IPAR.TIMESTAMP)
155         timestamp.text = str(int(self.time))
156         root.append(self.tree)
157         matrix = ElementTree.SubElement(root, IPAR.MATRIX)
158         matrix.text = self.matrix
159         return root
160
161     def set_elements(self, tree):
162         if tree.tag != IPAR.IPAR:
163             raise ValueError('Expected "IPAR" SubElements')
164         xml_version = int(tree.attrib.get('version', 0))
165         if xml_version > IPAR.XML_VERSION:
166             logging.warn('IPAR XML tree is a higher version than this class')
167         for element in list(tree):
168             if element.tag == IPAR.RUN:
169                 self.tree = element
170             elif element.tag == IPAR.TIMESTAMP:
171                 self.time = int(element.text)
172             elif element.tag == IPAR.MATRIX:
173                 self.matrix = element.text
174             else:
175                 raise ValueError("Unrecognized tag: %s" % (element.tag,))
176
177 def load_ipar_param_tree(paramfile):
178     """
179     look for a .param file and load it if it is an IPAR tree
180     """
181
182     tree = ElementTree.parse(paramfile).getroot()
183     run = tree.find('Run')
184     if run.attrib.has_key('Name') and run.attrib['Name'] in SOFTWARE_NAMES:
185         return run
186     else:
187         logging.info("No run found")
188         return None
189
190 def ipar(pathname):
191     """
192     Examine the directory at pathname and initalize a IPAR object
193     """
194     logging.info("Searching IPAR directory %s" % (pathname,))
195     i = IPAR()
196     i.pathname = pathname
197
198     # parse firecrest directory name
199     path, name = os.path.split(pathname)
200     groups = name.split('_')
201     if not (groups[0] == 'IPAR' or groups[0] == 'Intensities'):
202       raise ValueError('ipar can only process IPAR directories')
203
204     bustard_pattern = os.path.join(pathname, 'Bustard*')
205     # contents of the matrix file?
206     matrix_pathname = os.path.join(pathname, 'Matrix', 's_matrix.txt')
207     if os.path.exists(matrix_pathname):
208         # this is IPAR_1.01
209         i.matrix = open(matrix_pathname, 'r').read()
210     elif glob(bustard_pattern) > 0:
211         i.matrix = None
212         # its still live.
213
214     # look for parameter xml file
215     paramfiles = [os.path.join(pathname, 'config.xml'),
216                   os.path.join(path, '.params')]
217     for paramfile in paramfiles:
218         if os.path.exists(paramfile):
219             logging.info("Found IPAR Config file at: %s" % ( paramfile, ))
220             i.tree = load_ipar_param_tree(paramfile)
221             mtime_local = os.stat(paramfile)[stat.ST_MTIME]
222             i.time = mtime_local
223             return i
224
225     return i
226
227 def fromxml(tree):
228     """
229     Initialize a IPAR object from an element tree node
230     """
231     f = IPAR()
232     f.set_elements(tree)
233     return f
234
235 #if __name__ == "__main__":
236   #i = ipar(os.path.expanduser('~/gec/081021_HWI-EAS229_0063_30HKUAAXX/Data/IPAR_1.01'))
237   #x = i.get_elements()
238   #j = fromxml(x)
239   #ElementTree.dump(x)
240   #print j.date
241   #print j.start
242   #print j.stop
243   #print i.tiles.keys()
244   #print j.tiles.keys()
245   #print j.tiles.items()
246   #print j.file_list()