Add support for scanning for results in the IPAR directory.
[htsworkflow.git] / htsworkflow / pipelines / ipar.py
diff --git a/htsworkflow/pipelines/ipar.py b/htsworkflow/pipelines/ipar.py
new file mode 100644 (file)
index 0000000..a559229
--- /dev/null
@@ -0,0 +1,225 @@
+"""
+Extract information about the IPAR run
+
+IPAR - class holding the properties we found
+ipar - IPAR factory function initialized from a directory name
+fromxml - IPAR factory function initialized from an xml dump from
+          the IPAR object.
+"""
+
+import datetime
+import logging
+import os
+import re
+import stat
+import time
+
+from htsworkflow.pipelines.runfolder import \
+   ElementTree, \
+   VERSION_RE, \
+   EUROPEAN_STRPTIME
+
+class Tiles(object):
+    """
+    Read-only, dictionary-like view of the lanes below "TileSelection".
+
+    Keys are the 'Index' attribute of each lane element (kept as strings).
+    Values are dictionaries that may contain 'Sample' (the element text)
+    and 'TileRange' (a (min, max) tuple of ints).
+    """
+
+    def __init__(self, tree):
+        """
+        tree - element that is searched for a "TileSelection" child
+        """
+        self.tree = tree.find("TileSelection")
+
+    def _lane_attributes(self, lane):
+        """
+        Collect the Sample / TileRange information of one lane element.
+        """
+        attributes = {}
+        for child in lane:
+            if child.tag == "Sample":
+                attributes['Sample'] = child.text
+            elif child.tag == 'TileRange':
+                attributes['TileRange'] = (int(child.attrib['Min']),
+                                           int(child.attrib['Max']))
+        return attributes
+
+    def items(self):
+        """
+        Return a list of (lane index, attributes) pairs in document order.
+
+        A child without an 'Index' attribute is skipped as a whole, so a
+        key is never paired with the attributes of a different lane.
+        """
+        if self.tree is None:
+            # no TileSelection element was found, so there are no lanes
+            return []
+        pairs = []
+        # iterate the element directly; Element.getchildren() is gone
+        # from ElementTree as of Python 3.9
+        for lane in self.tree:
+            index = lane.attrib.get('Index', None)
+            if index is not None:
+                pairs.append((index, self._lane_attributes(lane)))
+        return pairs
+
+    def keys(self):
+        """
+        Return the list of lane indexes, in the same order as values().
+        """
+        return [index for index, attributes in self.items()]
+
+    def values(self):
+        """
+        Return the list of lane attribute dictionaries, in the same order
+        as keys().
+        """
+        return [attributes for index, attributes in self.items()]
+
+    def __getitem__(self, key):
+        """
+        Return the attributes of the lane whose index is key.
+
+        Raises KeyError if there is no such lane.
+        """
+        # NOTE: the xml is rescanned on every lookup; acceptable for the
+        # handful of lanes involved, but not efficient.
+        return dict(self.items())[key]
+
+class IPAR(object):
+    """
+    Properties found for one IPAR run.
+
+    tree   - the 'Run' element of the parameter xml, or None if not loaded
+    date   - datetime of the run (defaults to the time of construction)
+    matrix - text of the matrix file, or None if not loaded
+    """
+    XML_VERSION = 1
+
+    # xml tag names
+    IPAR = 'IPAR'
+    TIMESTAMP = 'timestamp'
+    MATRIX = 'matrix'
+    RUN = 'Run'
+
+    def __init__(self, xml=None):
+        """
+        xml - optional 'IPAR' element, as returned by get_elements(),
+              used to initialize the new object
+        """
+        self.tree = None
+        self.date = datetime.datetime.today()
+        # always define matrix so dump() and get_elements() also work on
+        # an object that has not been filled in yet
+        self.matrix = None
+        self._tiles = None
+        if xml is not None:
+            self.set_elements(xml)
+
+    def _get_time(self):
+        return time.mktime(self.date.timetuple())
+
+    def _set_time(self, value):
+        mtime_tuple = time.localtime(value)
+        # only the first six fields (year .. second) are datetime
+        # arguments; the seventh struct_time field is the weekday and
+        # would otherwise be stored as the microsecond
+        self.date = datetime.datetime(*(mtime_tuple[0:6]))
+    time = property(_get_time, _set_time,
+                    doc='run time as seconds since epoch')
+
+    def _get_cycles(self):
+        """
+        return the attributes of the Cycles element, or None if the tree
+        or the element is missing
+        """
+        if self.tree is None:
+            return None
+        cycles = self.tree.find("Cycles")
+        if cycles is None:
+            return None
+        return cycles.attrib
+
+    def _get_start(self):
+        """
+        return cycle start
+        """
+        cycles = self._get_cycles()
+        if cycles is None:
+            return None
+        return int(cycles['First'])
+    start = property(_get_start, doc="get cycle start")
+
+    def _get_stop(self):
+        """
+        return cycle stop
+        """
+        cycles = self._get_cycles()
+        if cycles is None:
+            return None
+        return int(cycles['Last'])
+    stop = property(_get_stop, doc="get cycle stop")
+
+    def _get_tiles(self):
+        """
+        return the (cached) Tiles view of the current tree
+        """
+        if self._tiles is None:
+            self._tiles = Tiles(self.tree)
+        return self._tiles
+    tiles = property(_get_tiles)
+
+    def _get_version(self):
+        """
+        return the Version attribute of the Software element, or None if
+        the tree or the element is missing
+        """
+        if self.tree is None:
+            return None
+        software = self.tree.find('Software')
+        if software is None:
+            return None
+        return software.attrib['Version']
+    # the description has to be passed as doc=; the second positional
+    # argument of property() is the setter
+    version = property(_get_version, doc="IPAR software version")
+
+    def file_list(self):
+        """
+        Generate list of all files that should be generated by the IPAR unit
+
+        Returns None, after logging a warning, when the tree has no
+        RunParameters/CompressionSuffix element.
+        """
+        suffix_node = self.tree.find('RunParameters/CompressionSuffix')
+        if suffix_node is None:
+            logging.warning("find compression suffix failed")
+            return None
+        suffix = suffix_node.text
+        filename_format = "%s_%s_%04d_%s.txt%s"
+        files = []
+        for lane, attrib in self.tiles.items():
+            # the tile range is inclusive at both ends
+            start, stop = attrib['TileRange']
+            for file_type in ["int", "nse"]:
+                for tile in range(start, stop + 1):
+                    files.append(filename_format % (
+                        attrib['Sample'], lane, tile, file_type, suffix))
+        return files
+
+    def dump(self):
+        """
+        print the matrix and the tree to stdout
+        """
+        print("Matrix: %s" % (self.matrix,))
+        print("Tree: %s" % (self.tree,))
+
+    def get_elements(self):
+        """
+        return an 'IPAR' element holding the timestamp, the Run tree
+        (when one is loaded) and the matrix; set_elements() reads it back
+        """
+        attribs = {'version': str(IPAR.XML_VERSION)}
+        root = ElementTree.Element(IPAR.IPAR, attrib=attribs)
+        timestamp = ElementTree.SubElement(root, IPAR.TIMESTAMP)
+        timestamp.text = str(int(self.time))
+        if self.tree is not None:
+            # None is not an element and cannot be appended
+            root.append(self.tree)
+        matrix = ElementTree.SubElement(root, IPAR.MATRIX)
+        matrix.text = self.matrix
+        return root
+
+    def set_elements(self, tree):
+        """
+        load tree, time and matrix from an 'IPAR' element
+
+        Raises ValueError if tree is not an 'IPAR' element or contains a
+        child with an unrecognized tag.
+        """
+        if tree.tag != IPAR.IPAR:
+            raise ValueError('Expected "IPAR" SubElements')
+        xml_version = int(tree.attrib.get('version', 0))
+        if xml_version > IPAR.XML_VERSION:
+            logging.warning('IPAR XML tree is a higher version than this class')
+        for element in list(tree):
+            if element.tag == IPAR.RUN:
+                self.tree = element
+                # the cached Tiles view belongs to the previous tree
+                self._tiles = None
+            elif element.tag == IPAR.TIMESTAMP:
+                self.time = int(element.text)
+            elif element.tag == IPAR.MATRIX:
+                self.matrix = element.text
+            else:
+                raise ValueError("Unrecognized tag: %s" % (element.tag,))
+
+def load_ipar_param_tree(paramfile):
+    """
+    look for a .param file and load it if it is an IPAR tree
+
+    paramfile - filename (or file object) of the xml parameter file
+
+    Returns the 'Run' element when its Name attribute starts with "IPAR";
+    returns None when there is no Run element or it has a different name.
+    """
+    tree = ElementTree.parse(paramfile).getroot()
+    run = tree.find('Run')
+    if run is None:
+        # not every parameter file has a Run element
+        return None
+    if run.attrib.get('Name', '').startswith("IPAR"):
+        return run
+
+    return None
+
+def ipar(pathname):
+    """
+    Examine the directory at pathname and initialize an IPAR object
+
+    pathname - path of a directory whose name starts with "IPAR_"
+
+    The matrix is read from Matrix/s_matrix.txt below pathname. If the
+    directory containing pathname has a .params file, the run tree is
+    loaded from it and its modification time becomes the run time.
+
+    Raises ValueError if the directory name does not start with "IPAR".
+    """
+    logging.info("Searching IPAR directory")
+    run = IPAR()
+
+    # parse IPAR directory name; trailing separators are dropped first so
+    # "Data/IPAR_1.01/" is accepted just like "Data/IPAR_1.01"
+    path, name = os.path.split(pathname.rstrip(os.sep))
+    groups = name.split('_')
+    if groups[0] != 'IPAR':
+        raise ValueError('ipar can only process IPAR directories')
+
+    # contents of the matrix file; the with block closes the file handle
+    matrix_pathname = os.path.join(pathname, 'Matrix', 's_matrix.txt')
+    with open(matrix_pathname, 'r') as matrix_file:
+        run.matrix = matrix_file.read()
+
+    # look for parameter xml file next to the IPAR directory
+    paramfile = os.path.join(path, '.params')
+    if os.path.exists(paramfile):
+        run.tree = load_ipar_param_tree(paramfile)
+        run.time = os.stat(paramfile)[stat.ST_MTIME]
+    return run
+
+def fromxml(tree):
+    """
+    Build an IPAR object from an 'IPAR' element tree node
+
+    tree - element handed to IPAR.set_elements()
+    """
+    restored = IPAR()
+    restored.set_elements(tree)
+    return restored
+
+if __name__ == "__main__":
+    # Manual smoke test: scan one run directory, round-trip the result
+    # through its xml form and print what comes back.
+    run_dir = os.path.expanduser('~/gec/081021_HWI-EAS229_0063_30HKUAAXX/Data/IPAR_1.01')
+    scanned = ipar(run_dir)
+    xml_form = scanned.get_elements()
+    restored = fromxml(xml_form)
+    #ElementTree.dump(xml_form)
+    print(restored.date)
+    print(restored.start)
+    print(restored.stop)
+    print(scanned.tiles.keys())
+    print(restored.tiles.keys())
+    print(restored.tiles.items())
+    print(restored.file_list())
\ No newline at end of file