--- /dev/null
+"""Test wrappers around ucsc file formats
+"""
+import os
+from unittest2 import TestCase
+from htsworkflow.util.test import TEST_DATA_DIR
+from htsworkflow.util.ucsc import bigWigInfo
+
+from distutils.spawn import find_executable
+
+class TestUCSC(TestCase):
+ def test_bigwig_info(self):
+ if not find_executable('bigWigInfo'):
+ self.skipTest('Need bigWigInfo on path to test')
+
+ filename = os.path.join(TEST_DATA_DIR, 'foo.bigWig')
+ info = bigWigInfo(filename)
+ self.assertEqual(info.version, 4)
+ self.assertEqual(info.isCompressed, True)
+ # what should i do for byteswapped arch?
+ self.assertEqual(info.isSwapped, True)
+ self.assertEqual(info.primaryDataSize, 48)
+ self.assertEqual(info.primaryIndexSize, 6204)
+ self.assertEqual(info.zoomLevels, 2)
+ self.assertEqual(info.basesCovered, 30)
+ self.assertAlmostEqual(info.mean, 0.0)
+ self.assertAlmostEqual(info.min, -5.5)
+ self.assertAlmostEqual(info.max, 5.5)
+ self.assertAlmostEqual(info.std, 4.567501)
+
--- /dev/null
+"""Wrap ucsc command line utilities
+"""
+
+import logging
+import os
+from subprocess import Popen, PIPE
+
+LOGGER = logging.getLogger(__name__)
+
+def parseNumber(number):
+ buffer = []
+ isFloat = False
+ for n in number:
+ if n == ',':
+ continue
+ if n == '.':
+ isFloat = True
+ buffer.append(n)
+ else:
+ buffer.append(n)
+ if isFloat:
+ return float(''.join(buffer))
+ else:
+ return int(''.join(buffer))
+
+def parseBoolean(value):
+ if value.lower() in ('yes', '1', 'true'):
+ return True
+ elif value.lower() in ('no', '0', 'false'):
+ return False
+
+class bigWigInfo:
+ def __init__(self, filename=None):
+ self.version = None
+ self.isCompressed = None
+ self.isSwapped = None
+ self.primaryDataSize = None
+ self.primaryIndexSize = None
+ self.zoomLevels = None
+ self.chromCount = None
+ self.basesCovered = None
+ self.mean = None
+ self.min = None
+ self.max = None
+ self.std = None
+ self.filename = None
+ if filename:
+ self.scan_file(filename)
+ self.filename = filename
+
+ def scan_file(self, filename):
+ cmd = ['bigWigInfo',
+ filename]
+ p = Popen(cmd, stdout=PIPE)
+ stdout, _ = p.communicate()
+ for line in stdout.split(os.linesep):
+ if len(line) > 0:
+ term, value = line.split(': ')
+ if term in ('isCompressed', 'isSwapped'):
+ value = parseBoolean(value)
+ else:
+ value = parseNumber(value)
+ LOGGER.debug('%s: %s', term, str(value))
+ setattr(self, term, value)
+
+
+