2 Extract information about the IPAR run
5 class holding the properties we found
7 IPAR factory function initialized from a directory name
9 IPAR factory function initialized from an xml dump from
12 __docformat__ = "restructuredtext en"
22 from htsworkflow.pipelines.runfolder import \
28 def __init__(self, tree):
29 self.tree = tree.find("TileSelection")
33 for c in self.tree.getchildren():
34 k = c.attrib.get('Index', None)
41 for lane in self.tree.getchildren():
43 for child in lane.getchildren():
44 if child.tag == "Sample":
45 attributes['Sample'] = child.text
46 elif child.tag == 'TileRange':
47 attributes['TileRange'] = (int(child.attrib['Min']),int(child.attrib['Max']))
48 value_list.append(attributes)
52 return zip(self.keys(), self.values())
54 def __getitem__(self, key):
55 # FIXME: this is inefficient: building the dictionary by rescanning the xml.
56 v = dict(self.items())
64 TIMESTAMP = 'timestamp'
68 def __init__(self, xml=None):
70 self.date = datetime.datetime.today()
73 self.set_elements(xml)
76 return time.mktime(self.date.timetuple())
def _set_time(self, value):
    """
    Set the run date from a time given as seconds since the epoch.

    :param value: seconds since the epoch; converted to a local-time
                  tuple with ``time.localtime``.
    """
    mtime_tuple = time.localtime(value)
    # Only the first six struct_time fields (year .. second) map onto the
    # datetime constructor. The seventh field is tm_wday, which would
    # otherwise be passed as the microsecond argument.
    self.date = datetime.datetime(*mtime_tuple[:6])
80 time = property(_get_time, _set_time,
81 doc='run time as seconds since epoch')
83 def _get_cycles(self):
86 cycles = self.tree.find("Cycles")
95 cycles = self._get_cycles()
96 if cycles is not None:
97 return int(cycles['First'])
100 start = property(_get_start, doc="get cycle start")
106 cycles = self._get_cycles()
107 if cycles is not None:
108 return int(cycles['Last'])
111 stop = property(_get_stop, doc="get cycle stop")
113 def _get_tiles(self):
114 if self._tiles is None:
115 self._tiles = Tiles(self.tree)
117 tiles = property(_get_tiles)
def _get_version(self):
    """
    Return the ``Version`` attribute of the ``Software`` element.

    :return: the version string, or None when the tree has no
             ``Software`` child.
    """
    software = self.tree.find('Software')
    if software is not None:
        return software.attrib['Version']
    return None
# The second positional argument of property() is the setter, so the
# description has to be passed by keyword to become the docstring.
version = property(_get_version, doc="IPAR software version")
128 Generate list of all files that should be generated by the IPAR unit
130 suffix_node = self.tree.find('RunParameters/CompressionSuffix')
131 if suffix_node is None:
132 print "find compression suffix failed"
134 suffix = suffix_node.text
136 format = "%s_%s_%04d_%s.txt%s"
137 for lane, attrib in self.tiles.items():
138 for file_type in ["int","nse"]:
139 start, stop = attrib['TileRange']
140 for tile in range(start, stop+1):
141 files.append(format % (attrib['Sample'], lane, tile, file_type, suffix))
145 print "Matrix:", self.matrix
146 print "Tree:", self.tree
148 def get_elements(self):
149 attribs = {'version': str(IPAR.XML_VERSION) }
150 root = ElementTree.Element(IPAR.IPAR, attrib=attribs)
151 timestamp = ElementTree.SubElement(root, IPAR.TIMESTAMP)
152 timestamp.text = str(int(self.time))
153 root.append(self.tree)
154 matrix = ElementTree.SubElement(root, IPAR.MATRIX)
155 matrix.text = self.matrix
158 def set_elements(self, tree):
159 if tree.tag != IPAR.IPAR:
160 raise ValueError('Expected "IPAR" SubElements')
161 xml_version = int(tree.attrib.get('version', 0))
162 if xml_version > IPAR.XML_VERSION:
163 logging.warn('IPAR XML tree is a higher version than this class')
164 for element in list(tree):
165 if element.tag == IPAR.RUN:
167 elif element.tag == IPAR.TIMESTAMP:
168 self.time = int(element.text)
169 elif element.tag == IPAR.MATRIX:
170 self.matrix = element.text
172 raise ValueError("Unrecognized tag: %s" % (element.tag,))
174 def load_ipar_param_tree(paramfile):
176 look for a .param file and load it if it is an IPAR tree
179 tree = ElementTree.parse(paramfile).getroot()
180 run = tree.find('Run')
181 if run.attrib.has_key('Name') and run.attrib['Name'].startswith("IPAR"):
188 Examine the directory at pathname and initialize an IPAR object
190 logging.info("Searching IPAR directory")
193 # parse IPAR directory name
194 path, name = os.path.split(pathname)
195 groups = name.split('_')
196 if groups[0] != 'IPAR':
197 raise ValueError('ipar can only process IPAR directories')
199 bustard_pattern = os.path.join(pathname, 'Bustard*')
200 # contents of the matrix file?
201 matrix_pathname = os.path.join(pathname, 'Matrix', 's_matrix.txt')
202 if os.path.exists(matrix_pathname):
204 i.matrix = open(matrix_pathname, 'r').read()
205 elif glob(bustard_pattern) > 0:
211 # look for parameter xml file
212 paramfile = os.path.join(path, '.params')
213 if os.path.exists(paramfile):
214 i.tree = load_ipar_param_tree(paramfile)
215 mtime_local = os.stat(paramfile)[stat.ST_MTIME]
221 Initialize an IPAR object from an element tree node
227 if __name__ == "__main__":
228 i = ipar(os.path.expanduser('~/gec/081021_HWI-EAS229_0063_30HKUAAXX/Data/IPAR_1.01'))
237 print j.tiles.items()