Change unittest2 back into unittest.
[htsworkflow.git] / htsworkflow / submission / test / test_daf.py
index 3d31c95807ad38753c4c49735933916c924208e8..5bc9014267cfbbb2c1536e8c21ac83d3d7f57d23 100644 (file)
@@ -1,11 +1,12 @@
 from contextlib import contextmanager
+import logging
 import os
 from StringIO import StringIO
 import shutil
 import tempfile
-import unittest
+from unittest import TestCase, TestSuite, defaultTestLoader
 
-from htsworkflow.submission import daf
+from htsworkflow.submission import daf, results
 from htsworkflow.util.rdfhelp import \
      dafTermOntology, \
      fromTypedNode, \
@@ -15,6 +16,7 @@ from htsworkflow.util.rdfhelp import \
      get_model, \
      get_serializer
 
+from htsworkflow.submission.test import test_results
 import RDF
 
 test_daf = """# Lab and general info
@@ -59,7 +61,27 @@ hasReplicates    no
 required         no
 """
 
-class TestDAF(unittest.TestCase):
+test_daf_extra = """# Lab and general info
+grant             Hardison
+lab               Caltech-m
+dataType          ChipSeq
+variables         cell,antibody,sex,age,strain
+extraVariables    controlId,treatment
+compositeSuffix   CaltechHistone
+assembly          mm9
+dafVersion        2.0
+validationSettings validateFiles.bam:mismatches=2,bamPercent=99.9;validateFiles.fastq:quick=1000
+
+# Track/view definition
+view             FastqRd1
+longLabelPrefix  Caltech Fastq Read 1
+type             fastq
+hasReplicates    no
+required         no
+"""
+
+
+class TestDAF(TestCase):
     def test_parse(self):
 
         parsed = daf.fromstring(test_daf)
@@ -75,6 +97,7 @@ class TestDAF(unittest.TestCase):
         self.failUnlessEqual(signal['longLabelPrefix'],
                              'Caltech Histone Signal')
 
+
     def test_rdf(self):
 
         parsed = daf.fromstring(test_daf)
@@ -134,7 +157,7 @@ def load_daf_mapper(name, extra_statements=None, ns=None, test_daf=test_daf):
                                        ns)
 
     test_daf_stream = StringIO(test_daf)
-    mapper = daf.DAFMapper(name, daf_file = test_daf_stream, model=model)
+    mapper = daf.UCSCSubmission(name, daf_file = test_daf_stream, model=model)
     return mapper
 
 def dump_model(model):
@@ -142,7 +165,15 @@ def dump_model(model):
     turtle =  writer.serialize_model_to_string(model)
     print turtle
 
-class TestDAFMapper(unittest.TestCase):
+
+class TestUCSCSubmission(TestCase):
+    def setUp(self):
+        test_results.generate_sample_results_tree(self, 'daf_results')
+
+    def tearDown(self):
+        # see things created by temp_results.generate_sample_results_tree
+        shutil.rmtree(self.tempdir)
+
     def test_create_mapper_add_pattern(self):
         name = 'testsub'
         mapper = load_daf_mapper(name)
@@ -176,7 +207,7 @@ thisView:FastqRd1 dafTerm:filename_re ".*_r1\\\\.fastq" .
         # dump_model(daf_mapper.model)
         view_root = 'http://jumpgate.caltech.edu/wiki/SubmissionsLog/{0}/view/'
         view_root = view_root.format(name)
-        self.failUnlessEqual(str(view)[1:-1],
+        self.failUnlessEqual(str(view.uri),
                              '{0}{1}'.format(view_root,'FastqRd1'))
 
     def test_find_overlapping_view(self):
@@ -217,7 +248,7 @@ thisView:FastqRd1 dafTerm:filename_re ".*\\\\.fastq" ;
         # server is 500 for this library
         self.failUnlessEqual(gel_cut, 100)
 
-        species = daf_mapper._get_library_attribute(libNode, 'species')
+        species = daf_mapper._get_library_attribute(libNode, 'species_name')
         self.failUnlessEqual(species, "Homo sapiens")
 
         with mktempdir('analysis') as analysis_dir:
@@ -257,28 +288,55 @@ thisView:FastqRd1 dafTerm:filename_re ".*\\\\.fastq" ;
         self.failUnlessEqual(daf_mapper.need_replicate(), False)
         self.failUnless('replicate' not in daf_mapper.get_daf_variables())
 
+    def test_daf_with_extra(self):
+        daf_mapper = load_daf_mapper('test_rep',test_daf=test_daf_extra)
+        variables = daf_mapper.get_daf_variables()
+        self.assertEqual(len(variables), 11)
+        self.failUnless('treatment' in variables)
+        self.failUnless('controlId' in variables)
+
+
+    def test_link_daf(self):
+        name = 'testsub'
+        submission = load_daf_mapper(name, test_daf=test_daf)
+        result_map = results.ResultMap()
+        result_dir = os.path.join(self.sourcedir,
+                                  test_results.S1_NAME)
+        result_map['1000'] = result_dir
+
+        submission.link_daf(result_map)
+
+        # make sure daf gets linked
+        created_daf = os.path.join(result_dir, name+'.daf')
+        self.failUnless(os.path.exists(created_daf))
+        stream = open(created_daf,'r')
+        daf_body = stream.read()
+        stream.close()
+
+        self.failUnlessEqual(test_daf, daf_body)
+
+
 @contextmanager
 def mktempdir(prefix='tmp'):
     d = tempfile.mkdtemp(prefix=prefix)
-    print "made", d
     yield d
     shutil.rmtree(d)
-    print "unmade", d
+
 
 @contextmanager
 def mktempfile(suffix='', prefix='tmp', dir=None):
     fd, pathname = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir)
     yield pathname
-    print "made", pathname
     os.close(fd)
     os.unlink(pathname)
-    print "unmade", pathname
-
 
 def suite():
-    suite = unittest.makeSuite(TestDAF, 'test')
-    suite.addTest(unittest.makeSuite(TestDAFMapper, 'test'))
+    suite = TestSuite()
+    suite.addTests(defaultTestLoader.loadTestsFromTestCase(TestDAF))
+    suite.addTests(defaultTestLoader.loadTestsFromTestCase(TestUCSCSubmission))
     return suite
 
 if __name__ == "__main__":
-    unittest.main(defaultTest='suite')
+    logging.basicConfig(level=logging.DEBUG)
+    from unittest import main
+    main(defaultTest='suite')