X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=htsworkflow.git;a=blobdiff_plain;f=htsworkflow%2Fpipelines%2Fipar.py;h=8203f5e0f29fda173728e3f504522f6cd579520f;hp=a559229ee83957068c8c96dcc3f538980e4fc9b4;hb=5644e2bfe7c1f7e5af99289cd918f783b61c441a;hpb=e5fd73d7bdcc8bd091898756674b38261ad5803b diff --git a/htsworkflow/pipelines/ipar.py b/htsworkflow/pipelines/ipar.py index a559229..8203f5e 100644 --- a/htsworkflow/pipelines/ipar.py +++ b/htsworkflow/pipelines/ipar.py @@ -1,24 +1,32 @@ """ Extract information about the IPAR run -IPAR - class holding the properties we found -IPAR - IPAR factory function initalized from a directory name -fromxml - IPAR factory function initalized from an xml dump from - the IPAR object. +IPAR + class holding the properties we found +ipar + IPAR factory function initalized from a directory name +fromxml + IPAR factory function initalized from an xml dump from + the IPAR object. """ +__docformat__ = "restructuredtext en" import datetime +from glob import glob import logging import os import re import stat import time -from htsworkflow.pipelines.runfolder import \ +from htsworkflow.pipelines import \ ElementTree, \ VERSION_RE, \ EUROPEAN_STRPTIME +LOGGER = logging.getLogger(__name__) +SOFTWARE_NAMES = ('IPAR_1.01', 'IPAR_1.3', 'Intensities') + class Tiles(object): def __init__(self, tree): self.tree = tree.find("TileSelection") @@ -63,10 +71,31 @@ class IPAR(object): def __init__(self, xml=None): self.tree = None self.date = datetime.datetime.today() - self._tiles = None + self._tiles = None if xml is not None: self.set_elements(xml) + def _get_runfolder_name(self): + """Return runfolder name""" + if self.tree is None: + raise ValueError("Can't query an empty run") + runfolder = self.tree.xpath('RunParameters/Runfolder') + return runfolder + runfolder_name = property(_get_runfolder) + + def _get_software(self): + """Return software name""" + if self.tree is None: + raise ValueError("Can't determine software name, please load a run") + software = self.tree.xpath('Software') + if len(software) == 0: + return None + elif len(software) > 1: + raise RuntimeError("Too many software tags, please update ipar.py") + else: + return software[0].attrib['Name'] + software = property(_get_software) + def _get_time(self): return time.mktime(self.date.timetuple()) def _set_time(self, value): @@ -77,8 +106,9 @@ class IPAR(object): def _get_cycles(self): if self.tree is None: - return None + raise RuntimeError("get cycles called before xml tree initalized") cycles = self.tree.find("Cycles") + assert cycles is not None if cycles is None: return None return cycles.attrib @@ -155,7 +185,7 @@ class IPAR(object): raise ValueError('Expected "IPAR" SubElements') xml_version = int(tree.attrib.get('version', 0)) if xml_version > IPAR.XML_VERSION: - logging.warn('IPAR XML tree is a higher version than this class') + LOGGER.warn('IPAR XML tree is a higher version than this class') for element in list(tree): if element.tag == IPAR.RUN: self.tree = element @@ -173,34 +203,48 @@ def load_ipar_param_tree(paramfile): tree = ElementTree.parse(paramfile).getroot() run = tree.find('Run') - if run.attrib.has_key('Name') and run.attrib['Name'].startswith("IPAR"): + if run.attrib.has_key('Name') and run.attrib['Name'] in SOFTWARE_NAMES: return run - - return None + else: + LOGGER.info("No run found") + return None def ipar(pathname): """ Examine the directory at pathname and initalize a IPAR object """ - logging.info("Searching IPAR directory") + LOGGER.info("Searching IPAR directory %s" % (pathname,)) i = IPAR() + i.pathname = pathname # parse firecrest directory name path, name = os.path.split(pathname) groups = name.split('_') - if groups[0] != 'IPAR': + if not (groups[0] == 'IPAR' or groups[0] == 'Intensities'): raise ValueError('ipar can only process IPAR directories') + bustard_pattern = os.path.join(pathname, 'Bustard*') # contents of the matrix file? matrix_pathname = os.path.join(pathname, 'Matrix', 's_matrix.txt') - i.matrix = open(matrix_pathname, 'r').read() + if os.path.exists(matrix_pathname): + # this is IPAR_1.01 + i.matrix = open(matrix_pathname, 'r').read() + elif glob(bustard_pattern) > 0: + i.matrix = None + # its still live. # look for parameter xml file - paramfile = os.path.join(path, '.params') - if os.path.exists(paramfile): - i.tree = load_ipar_param_tree(paramfile) - mtime_local = os.stat(paramfile)[stat.ST_MTIME] - i.time = mtime_local + paramfiles = [os.path.join(pathname, 'RTAConfig.xml'), + os.path.join(pathname, 'config.xml'), + os.path.join(path, '.params')] + for paramfile in paramfiles: + if os.path.exists(paramfile): + LOGGER.info("Found IPAR Config file at: %s" % ( paramfile, )) + i.tree = load_ipar_param_tree(paramfile) + mtime_local = os.stat(paramfile)[stat.ST_MTIME] + i.time = mtime_local + return i + return i def fromxml(tree): @@ -211,15 +255,15 @@ def fromxml(tree): f.set_elements(tree) return f -if __name__ == "__main__": - i = ipar(os.path.expanduser('~/gec/081021_HWI-EAS229_0063_30HKUAAXX/Data/IPAR_1.01')) - x = i.get_elements() - j = fromxml(x) +#if __name__ == "__main__": + #i = ipar(os.path.expanduser('~/gec/081021_HWI-EAS229_0063_30HKUAAXX/Data/IPAR_1.01')) + #x = i.get_elements() + #j = fromxml(x) #ElementTree.dump(x) - print j.date - print j.start - print j.stop - print i.tiles.keys() - print j.tiles.keys() - print j.tiles.items() - print j.file_list() \ No newline at end of file + #print j.date + #print j.start + #print j.stop + #print i.tiles.keys() + #print j.tiles.keys() + #print j.tiles.items() + #print j.file_list()