2 Extract information about the IPAR run
5 class holding the properties we found
7 IPAR factory function initalized from a directory name
9 IPAR factory function initalized from an xml dump from
12 from __future__ import print_function
14 __docformat__ = "restructuredtext en"
24 from htsworkflow.pipelines import \
29 LOGGER = logging.getLogger(__name__)
30 SOFTWARE_NAMES = ('IPAR_1.01', 'IPAR_1.3', 'Intensities')
33 def __init__(self, tree):
34 self.tree = tree.find("TileSelection")
38 for c in self.tree.getchildren():
39 k = c.attrib.get('Index', None)
46 for lane in self.tree.getchildren():
48 for child in lane.getchildren():
49 if child.tag == "Sample":
50 attributes['Sample'] = child.text
51 elif child.tag == 'TileRange':
52 attributes['TileRange'] = (int(child.attrib['Min']),int(child.attrib['Max']))
53 value_list.append(attributes)
57 return zip(self.keys(), self.values())
59 def __getitem__(self, key):
60 # FIXME: this is inefficient. building the dictionary be rescanning the xml.
61 v = dict(self.items())
69 TIMESTAMP = 'timestamp'
73 def __init__(self, xml=None):
75 self.date = datetime.datetime.today()
78 self.set_elements(xml)
80 def _get_runfolder_name(self):
81 """Return runfolder name"""
83 raise ValueError("Can't query an empty run")
84 runfolder = self.tree.xpath('RunParameters/RunFolder')
85 if len(runfolder) == 0:
87 elif len(runfolder) > 1:
88 raise RuntimeError("RunXml parse error looking for RunFolder")
90 return runfolder[0].text
91 runfolder_name = property(_get_runfolder_name)
93 def _get_software(self):
94 """Return software name"""
96 raise ValueError("Can't determine software name, please load a run")
97 software = self.tree.xpath('Software')
98 if len(software) == 0:
100 elif len(software) > 1:
101 raise RuntimeError("Too many software tags, please update ipar.py")
103 return software[0].attrib['Name']
104 software = property(_get_software)
107 return time.mktime(self.date.timetuple())
108 def _set_time(self, value):
109 mtime_tuple = time.localtime(value)
110 self.date = datetime.datetime(*(mtime_tuple[0:7]))
111 time = property(_get_time, _set_time,
112 doc='run time as seconds since epoch')
114 def _get_cycles(self):
115 if self.tree is None:
116 raise RuntimeError("get cycles called before xml tree initalized")
117 cycles = self.tree.find("Cycles")
118 assert cycles is not None
123 def _get_start(self):
127 cycles = self._get_cycles()
128 if cycles is not None:
129 return int(cycles['First'])
132 start = property(_get_start, doc="get cycle start")
138 cycles = self._get_cycles()
139 if cycles is not None:
140 return int(cycles['Last'])
143 stop = property(_get_stop, doc="get cycle stop")
145 def _get_tiles(self):
146 if self._tiles is None:
147 self._tiles = Tiles(self.tree)
149 tiles = property(_get_tiles)
151 def _get_version(self):
152 software = self.tree.find('Software')
153 if software is not None:
154 return software.attrib['Version']
155 version = property(_get_version, "IPAR software version")
160 Generate list of all files that should be generated by the IPAR unit
162 suffix_node = self.tree.find('RunParameters/CompressionSuffix')
163 if suffix_node is None:
164 print("find compression suffix failed")
166 suffix = suffix_node.text
168 format = "%s_%s_%04d_%s.txt%s"
169 for lane, attrib in self.tiles.items():
170 for file_type in ["int","nse"]:
171 start, stop = attrib['TileRange']
172 for tile in range(start, stop+1):
173 files.append(format % (attrib['Sample'], lane, tile, file_type, suffix))
177 print("Matrix:", self.matrix)
178 print("Tree:", self.tree)
180 def get_elements(self):
181 attribs = {'version': str(IPAR.XML_VERSION) }
182 root = ElementTree.Element(IPAR.IPAR, attrib=attribs)
183 timestamp = ElementTree.SubElement(root, IPAR.TIMESTAMP)
184 timestamp.text = str(int(self.time))
185 root.append(self.tree)
186 matrix = ElementTree.SubElement(root, IPAR.MATRIX)
187 matrix.text = self.matrix
190 def set_elements(self, tree):
191 if tree.tag != IPAR.IPAR:
192 raise ValueError('Expected "IPAR" SubElements')
193 xml_version = int(tree.attrib.get('version', 0))
194 if xml_version > IPAR.XML_VERSION:
195 LOGGER.warn('IPAR XML tree is a higher version than this class')
196 for element in list(tree):
197 if element.tag == IPAR.RUN:
199 elif element.tag == IPAR.TIMESTAMP:
200 self.time = int(element.text)
201 elif element.tag == IPAR.MATRIX:
202 self.matrix = element.text
204 raise ValueError("Unrecognized tag: %s" % (element.tag,))
206 def load_ipar_param_tree(paramfile):
208 look for a .param file and load it if it is an IPAR tree
211 tree = ElementTree.parse(paramfile).getroot()
212 run = tree.find('Run')
213 if run.attrib.has_key('Name') and run.attrib['Name'] in SOFTWARE_NAMES:
216 LOGGER.info("No run found")
221 Examine the directory at pathname and initalize a IPAR object
223 LOGGER.info("Searching IPAR directory %s" % (pathname,))
225 i.pathname = pathname
227 # parse firecrest directory name
228 path, name = os.path.split(pathname)
229 groups = name.split('_')
230 if not (groups[0] == 'IPAR' or groups[0] == 'Intensities'):
231 raise ValueError('ipar can only process IPAR directories')
233 bustard_pattern = os.path.join(pathname, 'Bustard*')
234 # contents of the matrix file?
235 matrix_pathname = os.path.join(pathname, 'Matrix', 's_matrix.txt')
236 if os.path.exists(matrix_pathname):
238 i.matrix = open(matrix_pathname, 'r').read()
239 elif glob(bustard_pattern) > 0:
243 # look for parameter xml file
244 paramfiles = [os.path.join(pathname, 'RTAConfig.xml'),
245 os.path.join(pathname, 'config.xml'),
246 os.path.join(path, '.params')]
247 for paramfile in paramfiles:
248 if os.path.exists(paramfile):
249 LOGGER.info("Found IPAR Config file at: %s" % ( paramfile, ))
250 i.tree = load_ipar_param_tree(paramfile)
251 mtime_local = os.stat(paramfile)[stat.ST_MTIME]
259 Initialize a IPAR object from an element tree node
265 #if __name__ == "__main__":
266 #i = ipar(os.path.expanduser('~/gec/081021_HWI-EAS229_0063_30HKUAAXX/Data/IPAR_1.01'))
267 #x = i.get_elements()
273 #print i.tiles.keys()
274 #print j.tiles.keys()
275 #print j.tiles.items()