from contextlib import contextmanager
+import logging
import os
from StringIO import StringIO
import shutil
import tempfile
-import unittest
+from unittest import TestCase, TestSuite, defaultTestLoader
-from htsworkflow.submission import daf
+from htsworkflow.submission import daf, results
from htsworkflow.util.rdfhelp import \
dafTermOntology, \
fromTypedNode, \
get_model, \
get_serializer
+from htsworkflow.submission.test import test_results
import RDF
test_daf = """# Lab and general info
"""
-class TestDAF(unittest.TestCase):
+class TestDAF(TestCase):
def test_parse(self):
parsed = daf.fromstring(test_daf)
ns)
test_daf_stream = StringIO(test_daf)
- mapper = daf.DAFMapper(name, daf_file = test_daf_stream, model=model)
+ mapper = daf.UCSCSubmission(name, daf_file = test_daf_stream, model=model)
return mapper
def dump_model(model):
print turtle
-class TestDAFMapper(unittest.TestCase):
+class TestUCSCSubmission(TestCase):
+ def setUp(self):
+ test_results.generate_sample_results_tree(self, 'daf_results')
+
+ def tearDown(self):
+ # see things created by temp_results.generate_sample_results_tree
+ shutil.rmtree(self.tempdir)
+
def test_create_mapper_add_pattern(self):
name = 'testsub'
mapper = load_daf_mapper(name)
# dump_model(daf_mapper.model)
view_root = 'http://jumpgate.caltech.edu/wiki/SubmissionsLog/{0}/view/'
view_root = view_root.format(name)
- self.failUnlessEqual(str(view)[1:-1],
+ self.failUnlessEqual(str(view.uri),
'{0}{1}'.format(view_root,'FastqRd1'))
def test_find_overlapping_view(self):
# server is 500 for this library
self.failUnlessEqual(gel_cut, 100)
- species = daf_mapper._get_library_attribute(libNode, 'species')
+ species = daf_mapper._get_library_attribute(libNode, 'species_name')
self.failUnlessEqual(species, "Homo sapiens")
with mktempdir('analysis') as analysis_dir:
self.failUnless('controlId' in variables)
+ def test_link_daf(self):
+ name = 'testsub'
+ submission = load_daf_mapper(name, test_daf=test_daf)
+ result_map = results.ResultMap()
+ result_dir = os.path.join(self.sourcedir,
+ test_results.S1_NAME)
+ result_map['1000'] = result_dir
+
+ submission.link_daf(result_map)
+
+ # make sure daf gets linked
+ created_daf = os.path.join(result_dir, name+'.daf')
+ self.failUnless(os.path.exists(created_daf))
+ stream = open(created_daf,'r')
+ daf_body = stream.read()
+ stream.close()
+
+ self.failUnlessEqual(test_daf, daf_body)
+
+
@contextmanager
def mktempdir(prefix='tmp'):
d = tempfile.mkdtemp(prefix=prefix)
os.close(fd)
os.unlink(pathname)
-
def suite():
- suite = unittest.makeSuite(TestDAF, 'test')
- suite.addTest(unittest.makeSuite(TestDAFMapper, 'test'))
+ suite = TestSuite()
+ suite.addTests(defaultTestLoader.loadTestsFromTestCase(TestDAF))
+ suite.addTests(defaultTestLoader.loadTestsFromTestCase(TestUCSCSubmission))
return suite
if __name__ == "__main__":
- unittest.main(defaultTest='suite')
+ logging.basicConfig(level=logging.DEBUG)
+ from unittest import main
+ main(defaultTest='suite')