Change unittest2 back into unittest.
[htsworkflow.git] / htsworkflow / pipelines / test / test_runfolder_rta1_12.py
index ed225bfd36b9ca4dbf5cf618e04eddd50d3af1af..c22535b6bcbf7da056d621e92bc4b9bce1c750d1 100644 (file)
@@ -5,14 +5,15 @@ import logging
 import os
 import tempfile
 import shutil
-import unittest
+from unittest import TestCase
 
 from htsworkflow.pipelines import eland
+from htsworkflow.pipelines.samplekey import SampleKey
 from htsworkflow.pipelines import ipar
 from htsworkflow.pipelines import bustard
 from htsworkflow.pipelines import gerald
 from htsworkflow.pipelines import runfolder
-from htsworkflow.pipelines.runfolder import ElementTree
+from htsworkflow.pipelines import ElementTree
 
 from htsworkflow.pipelines.test.simulate_runfolder import *
 
@@ -25,10 +26,13 @@ def make_runfolder(obj=None):
     flowcell_id = 'D07K6ACXX'
     temp_dir = tempfile.mkdtemp(prefix='tmp_runfolder_')
 
-    runfolder_dir = os.path.join(temp_dir,
-                                 '110815_SN787_0101_A{0}'.format(flowcell_id))
+    runfolder_dir = os.path.join(
+        temp_dir,
+        '110815_SN787_0101_A{0}'.format(flowcell_id))
     os.mkdir(runfolder_dir)
 
+    make_runinfo(runfolder_dir, flowcell_id)
+
     data_dir = os.path.join(runfolder_dir, 'Data')
     os.mkdir(data_dir)
 
@@ -42,6 +46,7 @@ def make_runfolder(obj=None):
     os.mkdir(unaligned_dir)
     make_unaligned_fastqs_1_12(unaligned_dir, flowcell_id)
     make_unaligned_config_1_12(unaligned_dir)
+    make_unaligned_status_1_12(unaligned_dir, flowcell_id)
 
     aligned_dir = os.path.join(runfolder_dir, "Aligned")
     os.mkdir(aligned_dir)
@@ -49,15 +54,17 @@ def make_runfolder(obj=None):
     make_aligned_config_1_12(aligned_dir)
 
     if obj is not None:
+        obj.flowcell_id = flowcell_id
         obj.temp_dir = temp_dir
         obj.runfolder_dir = runfolder_dir
         obj.data_dir = data_dir
         obj.image_analysis_dir = intensities_dir
         obj.bustard_dir = unaligned_dir
         obj.gerald_dir = aligned_dir
+        obj.reads = 2
 
 
-class RunfolderTests(unittest.TestCase):
+class RunfolderTests(TestCase):
     """
     Test components of the runfolder processing code
     which includes firecrest, bustard, and gerald
@@ -72,6 +79,7 @@ class RunfolderTests(unittest.TestCase):
     def test_bustard(self):
         """Construct a bustard object"""
         b = bustard.bustard(self.bustard_dir)
+        self.failUnlessEqual(b.software, 'RTA')
         self.failUnlessEqual(b.version, '1.12.4.2')
         self.failUnlessEqual(b.date,    None)
         self.failUnlessEqual(b.user,    None)
@@ -79,15 +87,17 @@ class RunfolderTests(unittest.TestCase):
 
         xml = b.get_elements()
         b2 = bustard.Bustard(xml=xml)
-        self.failUnlessEqual(b.version, b2.version)
-        self.failUnlessEqual(b.date,    b2.date )
-        self.failUnlessEqual(b.user,    b2.user)
+        self.failUnlessEqual(b.software, b2.software)
+        self.failUnlessEqual(b.version,  b2.version)
+        self.failUnlessEqual(b.date,     b2.date )
+        self.failUnlessEqual(b.user,     b2.user)
 
     def test_gerald(self):
         # need to update gerald and make tests for it
         g = gerald.gerald(self.gerald_dir)
 
-        self.failUnlessEqual(g.version, 'CASAVA-1.8.1')
+        self.failUnlessEqual(g.software, 'CASAVA')
+        self.failUnlessEqual(g.version, '1.8.1')
         self.failUnlessEqual(len(g.lanes), len(g.lanes.keys()))
         self.failUnlessEqual(len(g.lanes), len(g.lanes.items()))
 
@@ -95,38 +105,28 @@ class RunfolderTests(unittest.TestCase):
         # make_gerald_config.
         # the first None is to offset the genomes list to be 1..9
         # instead of pythons default 0..8
-        genomes = [None,
-                   '/g/mm9',
-                   '/g/mm9',
-                   '/g/elegans190',
-                   '/g/arabidopsis01222004',
-                   '/g/mm9',
-                   '/g/mm9',
-                   '/g/mm9',
-                   '/g/mm9', ]
-
         # test lane specific parameters from gerald config file
-        for i in range(1,9):
-            cur_lane = g.lanes[i]
-            self.failUnlessEqual(cur_lane.analysis, 'eland_extended')
-            self.failUnlessEqual(cur_lane.eland_genome, genomes[i])
-            self.failUnlessEqual(cur_lane.read_length, '37')
-            self.failUnlessEqual(cur_lane.use_bases, 'Y'*37)
-
-        # I want to be able to use a simple iterator
-        for l in g.lanes.values():
-          self.failUnlessEqual(l.analysis, 'eland_extended')
-          self.failUnlessEqual(l.read_length, '37')
-          self.failUnlessEqual(l.use_bases, 'Y'*37)
+
+        undetermined = g.lanes[SampleKey(sample='Undetermined_indices')]
+        self.failUnlessEqual(undetermined.analysis, 'none')
+        self.failUnlessEqual(undetermined.read_length, None)
+        self.failUnlessEqual(undetermined.use_bases, None)
+
+        project = g.lanes[SampleKey(sample='11115')]
+        self.failUnlessEqual(project.analysis, 'eland_extended')
+        self.failUnlessEqual(project.eland_genome, '/g/hg18/chromosomes/')
+        self.failUnlessEqual(project.read_length, '49')
+        self.failUnlessEqual(project.use_bases, 'y'*49+'n')
 
         # test data extracted from summary file
         clusters = [None,
-                    (281331, 11169), (203841, 13513),
-                    (220889, 15653), (137294, 14666),
-                    (129388, 14525), (262092, 10751),
-                    (185754, 13503), (233765, 9537),]
+                    (3878755,  579626.0), (3920639, 1027332.4),
+                    (5713049,  876187.3), (5852907,  538640.6),
+                    (4006751, 1265247.4), (5678021,  627070.7),
+                    (1854131,  429053.2), (4777517,  592904.0),
+                   ]
 
-        self.failUnlessEqual(len(g.summary), 1)
+        self.failUnlessEqual(len(g.summary), self.reads)
         for i in range(1,9):
             summary_lane = g.summary[0][i]
             self.failUnlessEqual(summary_lane.cluster, clusters[i])
@@ -135,17 +135,17 @@ class RunfolderTests(unittest.TestCase):
         xml = g.get_elements()
         # just make sure that element tree can serialize the tree
         xml_str = ElementTree.tostring(xml)
-        g2 = gerald.Gerald(xml=xml)
-        return
+        g2 = gerald.CASAVA(xml=xml)
 
         # do it all again after extracting from the xml file
+        self.failUnlessEqual(g.software, g2.software)
         self.failUnlessEqual(g.version, g2.version)
         self.failUnlessEqual(g.date, g2.date)
         self.failUnlessEqual(len(g.lanes.keys()), len(g2.lanes.keys()))
         self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
 
         # test lane specific parameters from gerald config file
-        for i in range(1,9):
+        for i in g.lanes.keys():
             g_lane = g.lanes[i]
             g2_lane = g2.lanes[i]
             self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
@@ -154,7 +154,7 @@ class RunfolderTests(unittest.TestCase):
             self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
 
         # test (some) summary elements
-        self.failUnlessEqual(len(g.summary), 1)
+        self.failUnlessEqual(len(g.summary), self.reads)
         for i in range(1,9):
             g_summary = g.summary[0][i]
             g2_summary = g2.summary[0][i]
@@ -163,9 +163,9 @@ class RunfolderTests(unittest.TestCase):
 
             g_eland = g.eland_results
             g2_eland = g2.eland_results
-            for lane in g_eland.results[0].keys():
-                g_results = g_eland.results[0][lane]
-                g2_results = g2_eland.results[0][lane]
+            for key in g_eland:
+                g_results = g_eland[key]
+                g2_results = g2_eland[key]
                 self.failUnlessEqual(g_results.reads,
                                      g2_results.reads)
                 if isinstance(g_results, eland.ElandLane):
@@ -183,62 +183,60 @@ class RunfolderTests(unittest.TestCase):
 
 
     def test_eland(self):
-        return
         hg_map = {'Lambda.fa': 'Lambda.fa'}
         for i in range(1,22):
           short_name = 'chr%d.fa' % (i,)
           long_name = 'hg18/chr%d.fa' % (i,)
           hg_map[short_name] = long_name
 
-        genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
-                        5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
+        samples = set(('11111', '11112', '11113', '11114', '11115',
+                       '11116', '11117', '11118', '11119', '11120'))
+        genome_maps = {}
+        for i in range(1,9):
+            genome_maps[i] = hg_map
+
         eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
 
-        # I added sequence lanes to the last 2 lanes of this test case
-        for i in range(1,7):
-            lane = eland_container.results[0][i]
-            self.failUnlessEqual(lane.reads, 6)
-            self.failUnlessEqual(lane.sample_name, "s")
-            self.failUnlessEqual(lane.lane_id, i)
-            self.failUnlessEqual(len(lane.mapped_reads), 17)
-            self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
-            self.failUnlessEqual(lane.match_codes['U0'], 3)
-            self.failUnlessEqual(lane.match_codes['R0'], 2)
-            self.failUnlessEqual(lane.match_codes['U1'], 1)
-            self.failUnlessEqual(lane.match_codes['R1'], 9)
-            self.failUnlessEqual(lane.match_codes['U2'], 0)
-            self.failUnlessEqual(lane.match_codes['R2'], 12)
-            self.failUnlessEqual(lane.match_codes['NM'], 1)
-            self.failUnlessEqual(lane.match_codes['QC'], 0)
-
-        # test scarf
-        lane = eland_container.results[0][7]
-        self.failUnlessEqual(lane.reads, 5)
-        self.failUnlessEqual(lane.sample_name, 's')
-        self.failUnlessEqual(lane.lane_id, 7)
-        self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
-
-        # test fastq
-        lane = eland_container.results[0][8]
-        self.failUnlessEqual(lane.reads, 3)
-        self.failUnlessEqual(lane.sample_name, 's')
-        self.failUnlessEqual(lane.lane_id, 8)
-        self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
+        for lane in eland_container.values():
+            # I added sequence lanes to the last 2 lanes of this test case
+            if lane.sample_name == '11113':
+                self.assertEqual(lane.reads, 24)
+                self.assertEqual(lane.mapped_reads['hg18/chr9.fa'], 6)
+                self.assertEqual(lane.match_codes['U0'], 6)
+                self.assertEqual(lane.match_codes['R0'], 18)
+                self.assertEqual(lane.match_codes['R1'], 24)
+                self.assertEqual(lane.match_codes['R2'], 18)
+                self.assertEqual(lane.match_codes['NM'], 12)
+            else:
+                self.assertEqual(lane.reads, 8)
+                self.assertEqual(lane.mapped_reads['hg18/chr9.fa'], 2)
+                self.assertEqual(lane.match_codes['U0'], 2)
+                self.assertEqual(lane.match_codes['R0'], 6)
+                self.assertEqual(lane.match_codes['R1'], 8)
+                self.assertEqual(lane.match_codes['R2'], 6)
+                self.assertEqual(lane.match_codes['NM'], 4)
+
+            self.assertTrue(lane.sample_name in samples)
+            #self.assertEqual(lane.lane_id, 1)
+            self.assertEqual(len(lane.mapped_reads), 1)
+            self.assertEqual(lane.match_codes['U1'], 0)
+            self.assertEqual(lane.match_codes['U2'], 0)
+            self.assertEqual(lane.match_codes['QC'], 0)
 
         xml = eland_container.get_elements()
         # just make sure that element tree can serialize the tree
         xml_str = ElementTree.tostring(xml)
         e2 = gerald.ELAND(xml=xml)
 
-        for i in range(1,9):
-            l1 = eland_container.results[0][i]
-            l2 = e2.results[0][i]
+        for key in eland_container.results:
+            l1 = eland_container.results[key]
+            l2 = e2.results[key]
             self.failUnlessEqual(l1.reads, l2.reads)
             self.failUnlessEqual(l1.sample_name, l2.sample_name)
             self.failUnlessEqual(l1.lane_id, l2.lane_id)
             if isinstance(l1, eland.ElandLane):
               self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
-              self.failUnlessEqual(len(l1.mapped_reads), 17)
+              self.failUnlessEqual(len(l1.mapped_reads), 1)
               for k in l1.mapped_reads.keys():
                   self.failUnlessEqual(l1.mapped_reads[k],
                                        l2.mapped_reads[k])
@@ -252,36 +250,37 @@ class RunfolderTests(unittest.TestCase):
                 self.failUnlessEqual(l1.sequence_type, l2.sequence_type)
 
     def test_runfolder(self):
-        return
         runs = runfolder.get_runs(self.runfolder_dir)
 
         # do we get the flowcell id from the filename?
         self.failUnlessEqual(len(runs), 1)
-        name = 'run_4286GAAXX_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
-        self.failUnlessEqual(runs[0].name, name)
-
-        # do we get the flowcell id from the FlowcellId.xml file
-        make_flowcell_id(self.runfolder_dir, '207BTAAXY')
-        runs = runfolder.get_runs(self.runfolder_dir)
-        self.failUnlessEqual(len(runs), 1)
-        name = 'run_207BTAAXY_%s.xml' % ( date.today().strftime('%Y-%m-%d'),)
-        self.failUnlessEqual(runs[0].name, name)
+        self.assertEqual(runs[0].flowcell_id, self.flowcell_id)
+        name = 'run_%s_%s.xml' % ( self.flowcell_id,
+                                   date.today().strftime('%Y-%m-%d'),)
+        self.failUnlessEqual(runs[0].serialization_filename, name)
 
+        bustard_dir = os.path.join(self.runfolder_dir, 'Unaligned')
         r1 = runs[0]
+        self.failUnlessEqual(r1.bustard.sequence_format, 'fastq')
+        self.failUnlessEqual(r1.bustard.pathname, bustard_dir)
+        self.failUnlessEqual(r1.gerald.runfolder_name, 'Unaligned')
+
         xml = r1.get_elements()
         xml_str = ElementTree.tostring(xml)
 
         r2 = runfolder.PipelineRun(xml=xml)
-        self.failUnlessEqual(r1.name, r2.name)
+        self.failUnlessEqual(r1.serialization_filename, r2.serialization_filename)
         self.failIfEqual(r2.image_analysis, None)
         self.failIfEqual(r2.bustard, None)
         self.failIfEqual(r2.gerald, None)
 
-
 def suite():
-    return unittest.makeSuite(RunfolderTests,'test')
+    from unittest import TestSuite, defaultTestLoader
+    suite = TestSuite()
+    suite.addTests(defaultTestLoader.loadTestsFromTestCase(RunfolderTests))
+    return suite
 
-if __name__ == "__main__":
-    logging.basicConfig(level=logging.WARN)
-    unittest.main(defaultTest="suite")
 
+if __name__ == "__main__":
+    from unittest import main
+    main(defaultTest="suite")