"""
Extract information about the IPAR run
-IPAR
+IPAR
class holding the properties we found
ipar
IPAR factory function initalized from a directory name
import stat
import time
-from htsworkflow.pipelines.runfolder import \
+from htsworkflow.pipelines import \
ElementTree, \
VERSION_RE, \
EUROPEAN_STRPTIME
+LOGGER = logging.getLogger(__name__)
SOFTWARE_NAMES = ('IPAR_1.01', 'IPAR_1.3', 'Intensities')
class Tiles(object):
return value_list
def items(self):
- return zip(self.keys(), self.values())
+ return list(zip(list(self.keys()), list(self.values())))
def __getitem__(self, key):
# FIXME: this is inefficient. building the dictionary be rescanning the xml.
- v = dict(self.items())
+ v = dict(list(self.items()))
return v[key]
class IPAR(object):
def __init__(self, xml=None):
self.tree = None
self.date = datetime.datetime.today()
- self._tiles = None
+ self._tiles = None
if xml is not None:
self.set_elements(xml)
+ def _get_software(self):
+ """Return software name"""
+ if self.tree is None:
+ raise ValueError("Can't determine software name, please load a run")
+ software = self.tree.xpath('Software')
+ if len(software) == 0:
+ return None
+ elif len(software) > 1:
+ raise RuntimeError("Too many software tags, please update ipar.py")
+ else:
+ return software[0].attrib['Name']
+ software = property(_get_software)
+
def _get_time(self):
return time.mktime(self.date.timetuple())
def _set_time(self, value):
"""
suffix_node = self.tree.find('RunParameters/CompressionSuffix')
if suffix_node is None:
- print "find compression suffix failed"
+ print("find compression suffix failed")
return None
suffix = suffix_node.text
files = []
format = "%s_%s_%04d_%s.txt%s"
- for lane, attrib in self.tiles.items():
+ for lane, attrib in list(self.tiles.items()):
for file_type in ["int","nse"]:
start, stop = attrib['TileRange']
for tile in range(start, stop+1):
return files
def dump(self):
- print "Matrix:", self.matrix
- print "Tree:", self.tree
+ print("Matrix:", self.matrix)
+ print("Tree:", self.tree)
def get_elements(self):
attribs = {'version': str(IPAR.XML_VERSION) }
raise ValueError('Expected "IPAR" SubElements')
xml_version = int(tree.attrib.get('version', 0))
if xml_version > IPAR.XML_VERSION:
- logging.warn('IPAR XML tree is a higher version than this class')
+ LOGGER.warn('IPAR XML tree is a higher version than this class')
for element in list(tree):
if element.tag == IPAR.RUN:
self.tree = element
tree = ElementTree.parse(paramfile).getroot()
run = tree.find('Run')
- if run.attrib.has_key('Name') and run.attrib['Name'] in SOFTWARE_NAMES:
+ if 'Name' in run.attrib and run.attrib['Name'] in SOFTWARE_NAMES:
return run
else:
- logging.info("No run found")
+ LOGGER.info("No run found")
return None
def ipar(pathname):
"""
Examine the directory at pathname and initalize a IPAR object
"""
- logging.info("Searching IPAR directory %s" % (pathname,))
+ LOGGER.info("Searching IPAR directory %s" % (pathname,))
i = IPAR()
i.pathname = pathname
# its still live.
# look for parameter xml file
- paramfiles = [os.path.join(pathname, 'config.xml'),
+ paramfiles = [os.path.join(pathname, 'RTAConfig.xml'),
+ os.path.join(pathname, 'config.xml'),
os.path.join(path, '.params')]
for paramfile in paramfiles:
if os.path.exists(paramfile):
- logging.info("Found IPAR Config file at: %s" % ( paramfile, ))
+ LOGGER.info("Found IPAR Config file at: %s" % ( paramfile, ))
i.tree = load_ipar_param_tree(paramfile)
mtime_local = os.stat(paramfile)[stat.ST_MTIME]
i.time = mtime_local