import tempfile
import unittest
-from htsworkflow.submission import daf
+from htsworkflow.submission import daf, results
from htsworkflow.util.rdfhelp import \
dafTermOntology, \
fromTypedNode, \
get_model, \
get_serializer
+from htsworkflow.submission.test import test_results
import RDF
test_daf = """# Lab and general info
ns)
test_daf_stream = StringIO(test_daf)
- mapper = daf.DAFMapper(name, daf_file = test_daf_stream, model=model)
+ mapper = daf.UCSCSubmission(name, daf_file = test_daf_stream, model=model)
return mapper
def dump_model(model):
print turtle
-class TestDAFMapper(unittest.TestCase):
+class TestUCSCSubmission(unittest.TestCase):
    def setUp(self):
        """Create the sample on-disk results tree used by these tests.

        Populates attributes on this test case (tearDown reads
        self.tempdir, test_link_daf reads self.sourcedir) -- see
        test_results.generate_sample_results_tree.
        """
        test_results.generate_sample_results_tree(self)

    def tearDown(self):
        """Delete the temporary tree built by setUp."""
        # remove things created by test_results.generate_sample_results_tree
        # NOTE(review): requires shutil imported at module top -- confirm
        shutil.rmtree(self.tempdir)

def test_create_mapper_add_pattern(self):
name = 'testsub'
mapper = load_daf_mapper(name)
self.failUnless('controlId' in variables)
+ def test_link_daf(self):
+ name = 'testsub'
+ submission = load_daf_mapper(name, test_daf=test_daf)
+ result_map = results.ResultMap()
+ result_dir = os.path.join(self.sourcedir,
+ test_results.S1_NAME)
+ result_map.add_result('1000', result_dir)
+
+ submission.link_daf(result_map)
+
+ # make sure daf gets linked
+ created_daf = os.path.join(result_dir, name+'.daf')
+ self.failUnless(os.path.exists(created_daf))
+ stream = open(created_daf,'r')
+ daf_body = stream.read()
+ stream.close()
+
+ self.failUnlessEqual(test_daf, daf_body)
+
+
@contextmanager
def mktempdir(prefix='tmp'):
d = tempfile.mkdtemp(prefix=prefix)
def suite():
    """Collect this module's test cases into one unittest suite."""
    # local renamed so it does not shadow the function's own name
    tests = unittest.makeSuite(TestDAF, 'test')
    tests.addTest(unittest.makeSuite(TestUCSCSubmission, 'test'))
    return tests
if __name__ == "__main__":