Add more tests for lanes_for
[htsworkflow.git] / experiments / test_experiments.py
index f0c36104de1ec00376e2eaba4da1022c8229bddb..1635560bbada748f402ff148aac973380c7ef05a 100644 (file)
@@ -1,16 +1,13 @@
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
 
 import re
 from lxml.html import fromstring
-try:
-    import json
-except ImportError, e:
-    import simplejson as json
+import json
 import os
 import shutil
 import sys
 import tempfile
-from urlparse import urljoin
+from six.moves.urllib.parse import urljoin
 
 from django.conf import settings
 from django.core import mail
@@ -19,10 +16,10 @@ from django.test import TestCase
 from django.test.utils import setup_test_environment, teardown_test_environment
 from django.db import connection
 from django.conf import settings
+from django.utils.encoding import smart_text
 
 from .models import ClusterStation, cluster_station_default, \
-    DataRun, Sequencer, FlowCell, FileType, \
-    find_file_type_metadata_from_filename
+    DataRun, Sequencer, FlowCell, FileType
 from samples.models import HTSUser
 from .experiments import flowcell_information, lanes_for
 from .experiments_factory import ClusterStationFactory, FlowCellFactory, LaneFactory
@@ -109,7 +106,7 @@ class ExperimentsTestCases(TestCase):
         fc42jtn = self.fc42jtn
         fc42ju1 = FlowCellFactory(flowcell_id='42JU1AAXX')
 
-        for fc_id in [u'FC12150', u"42JTNAAXX", "42JU1AAXX"]:
+        for fc_id in ['FC12150', '42JTNAAXX', '42JU1AAXX']:
             fc_dict = flowcell_information(fc_id)
             fc_django = FlowCell.objects.get(flowcell_id=fc_id)
             self.assertEqual(fc_dict['flowcell_id'], fc_id)
@@ -134,7 +131,7 @@ class ExperimentsTestCases(TestCase):
 
             response = self.client.get('/experiments/config/%s/json' % (fc_id,), apidata)
             # strptime isoformat string = '%Y-%m-%dT%H:%M:%S'
-            fc_json = json.loads(response.content)['result']
+            fc_json = json.loads(smart_text(response.content))['result']
             self.assertEqual(fc_json['flowcell_id'], fc_id)
             self.assertEqual(fc_json['sequencer'], fc_django.sequencer.name)
             self.assertEqual(fc_json['read_length'], fc_django.read_length)
@@ -143,7 +140,7 @@ class ExperimentsTestCases(TestCase):
 
 
             for lane in fc_django.lane_set.all():
-                lane_contents = fc_json['lane_set'][unicode(lane.lane_number)]
+                lane_contents = fc_json['lane_set'][str(lane.lane_number)]
                 lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
 
                 self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate)
@@ -167,7 +164,7 @@ class ExperimentsTestCases(TestCase):
         """
         Require logging in to retrieve meta data
         """
-        response = self.client.get(u'/experiments/config/FC12150/json')
+        response = self.client.get('/experiments/config/FC12150/json')
         self.assertEqual(response.status_code, 403)
 
     def test_library_id(self):
@@ -176,7 +173,7 @@ class ExperimentsTestCases(TestCase):
         """
         response = self.client.get('/experiments/config/FC12150/json', apidata)
         self.assertEqual(response.status_code, 200)
-        flowcell = json.loads(response.content)['result']
+        flowcell = json.loads(smart_text(response.content))['result']
 
         # library id is 12150 + lane number (1-8), so 12153
         lane_contents = flowcell['lane_set']['3']
@@ -185,7 +182,7 @@ class ExperimentsTestCases(TestCase):
 
         response = self.client.get('/samples/library/12153/json', apidata)
         self.assertEqual(response.status_code, 200)
-        library_12153 = json.loads(response.content)['result']
+        library_12153 = json.loads(smart_text(response.content))['result']
 
         self.assertEqual(library_12153['library_id'], '12153')
 
@@ -201,7 +198,7 @@ class ExperimentsTestCases(TestCase):
         This tests to make sure that the value entered in the raw library id field matches
         the library id looked up.
         """
-        expected_ids = [ u'1215{}'.format(i) for i in range(1,9) ]
+        expected_ids = [ '1215{}'.format(i) for i in range(1,9) ]
         self.assertTrue(self.client.login(username=self.admin.username, password=self.password))
         response = self.client.get('/admin/experiments/flowcell/{}/'.format(self.fc12150.id))
 
@@ -258,7 +255,36 @@ class ExperimentsTestCases(TestCase):
         self.assertTrue(self.fc42jtn_lanes[2].library.multiplex_id in \
                         lane_dict['13003']['index_sequence'])
 
-    def test_lanes_for(self):
+
+    def test_lanes_for_view_user_odd(self):
+        """Make sure lanes_for HTML UI works.
+        """
+        user = self.user_odd.username
+        lanes = lanes_for(user)
+        self.assertEqual(len(lanes), 8)
+
+        response = self.client.get(
+            reverse('experiments.views.lanes_for',
+                    args=[user]))
+        self.assertEqual(response.status_code, 200)
+        tree = fromstring(response.content)
+        lane_trs = tree.xpath('//div[@id="changelist"]/table/tbody/tr')
+        self.assertEqual(len(lane_trs), len(lanes))
+        # lanes is in db order
+        # lane_trs is in newest to oldest order
+        for lane_tr, lane_db in zip(lane_trs, reversed(lanes)):
+            library_id = lane_tr.xpath('td[6]/a')[0].text
+            self.assertEqual(library_id, lane_db['library'])
+
+    def test_lanes_for_view_invalid_user(self):
+        """Make sure we don't find anything with an invalid user
+        """
+        response = self.client.get(
+            reverse('experiments.views.lanes_for',
+                    args=["doesntexist"]))
+        self.assertEqual(response.status_code, 404)
+
+    def test_lanes_for_json(self):
         """
         Check the code that packs the django objects into simple types.
         """
@@ -267,7 +293,7 @@ class ExperimentsTestCases(TestCase):
         self.assertEqual(len(lanes), 8)
 
         response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
-        lanes_json = json.loads(response.content)['result']
+        lanes_json = json.loads(smart_text(response.content))['result']
         self.assertEqual(len(lanes), len(lanes_json))
         for i in range(len(lanes)):
             self.assertEqual(lanes[i]['comment'], lanes_json[i]['comment'])
@@ -296,7 +322,6 @@ class ExperimentsTestCases(TestCase):
         response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
         self.assertEqual(response.status_code, 404)
 
-
     def test_raw_data_dir(self):
         """Raw data path generator check"""
         flowcell_id = self.fc1_id
@@ -393,7 +418,7 @@ class ExperimentsTestCases(TestCase):
         if status is not None: self.assertTrue(status)
 
         ns = urljoin('http://localhost', url)
-        load_string_into_model(model, 'rdfa', response.content, ns=ns)
+        load_string_into_model(model, 'rdfa', smart_text(response.content), ns=ns)
         body = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
         prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
 
@@ -410,71 +435,12 @@ class ExperimentsTestCases(TestCase):
         count = 0
         for r in query.execute(model):
             count += 1
-            self.assertEqual(fromTypedNode(r['flowcell_id']), u'FC12150')
+            self.assertEqual(fromTypedNode(r['flowcell_id']), 'FC12150')
             lane_id = fromTypedNode(r['lane_id'])
             library_id = fromTypedNode(r['library_id'])
             self.assertTrue(library_id in expected[lane_id])
         self.assertEqual(count, 8)
 
-
-class TestFileType(TestCase):
-    def test_file_type_unicode(self):
-        file_type_objects = FileType.objects
-        name = 'QSEQ tarfile'
-        file_type_object = file_type_objects.get(name=name)
-        self.assertEqual(u"QSEQ tarfile",
-                             unicode(file_type_object))
-
-    def test_find_file_type(self):
-        file_type_objects = FileType.objects
-        cases = [('woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
-                  'QSEQ tarfile', 7, 1),
-                 ('woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
-                  'SRF', 1, None),
-                 ('s_1_eland_extended.txt.bz2','ELAND Extended', 1, None),
-                 ('s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
-                 ('s_3_eland_result.txt.bz2','ELAND Result', 3, None),
-                 ('s_1_export.txt.bz2','ELAND Export', 1, None),
-                 ('s_1_percent_call.png', 'IVC Percent Call', 1, None),
-                 ('s_2_percent_base.png', 'IVC Percent Base', 2, None),
-                 ('s_3_percent_all.png', 'IVC Percent All', 3, None),
-                 ('s_4_call.png', 'IVC Call', 4, None),
-                 ('s_5_all.png', 'IVC All', 5, None),
-                 ('Summary.htm', 'Summary.htm', None, None),
-                 ('run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
-         ]
-        for filename, typename, lane, end in cases:
-            ft = find_file_type_metadata_from_filename(filename)
-            self.assertEqual(ft['file_type'],
-                                 file_type_objects.get(name=typename))
-            self.assertEqual(ft.get('lane', None), lane)
-            self.assertEqual(ft.get('end', None), end)
-
-    def test_assign_file_type_complex_path(self):
-        file_type_objects = FileType.objects
-        cases = [('/a/b/c/woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
-                  'QSEQ tarfile', 7, 1),
-                 ('foo/woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
-                  'SRF', 1, None),
-                 ('../s_1_eland_extended.txt.bz2','ELAND Extended', 1, None),
-                 ('/bleem/s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
-                 ('/qwer/s_3_eland_result.txt.bz2','ELAND Result', 3, None),
-                 ('/ty///1/s_1_export.txt.bz2','ELAND Export', 1, None),
-                 ('/help/s_1_percent_call.png', 'IVC Percent Call', 1, None),
-                 ('/bored/s_2_percent_base.png', 'IVC Percent Base', 2, None),
-                 ('/example1/s_3_percent_all.png', 'IVC Percent All', 3, None),
-                 ('amonkey/s_4_call.png', 'IVC Call', 4, None),
-                 ('fishie/s_5_all.png', 'IVC All', 5, None),
-                 ('/random/Summary.htm', 'Summary.htm', None, None),
-                 ('/notrandom/run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
-         ]
-        for filename, typename, lane, end in cases:
-            result = find_file_type_metadata_from_filename(filename)
-            self.assertEqual(result['file_type'],
-                                 file_type_objects.get(name=typename))
-            self.assertEqual(result.get('lane',None), lane)
-            self.assertEqual(result.get('end', None), end)
-
 class TestEmailNotify(TestCase):
     def setUp(self):
         self.password = 'foo27'
@@ -520,8 +486,8 @@ class TestEmailNotify(TestCase):
         response = self.client.get(self.url)
         self.assertEqual(response.status_code, 200)
 
-        self.assertTrue(self.affiliation.email in response.content)
-        self.assertTrue(self.library.library_name in response.content)
+        self.assertTrue(self.affiliation.email in smart_text(response.content))
+        self.assertTrue(self.library.library_name in smart_text(response.content))
 
         response = self.client.get(self.url, {'send':'1','bcc':'on'})
         self.assertEqual(response.status_code, 200)
@@ -541,9 +507,10 @@ class TestEmailNotify(TestCase):
         response = self.client.get(self.url)
         self.assertEqual(response.status_code, 200)
         #print("email navigation content:", response.content)
-        self.assertTrue(re.search(self.fc.flowcell_id, response.content))
+        self.assertTrue(re.search(self.fc.flowcell_id, smart_text(response.content)))
         # require that navigation back to the admin page exists
-        self.assertTrue(re.search('<a href="{}">[^<]+</a>'.format(admin_url), response.content))
+        self.assertTrue(re.search('<a href="{}">[^<]+</a>'.format(admin_url),
+                                  smart_text(response.content)))
 
 def multi_lane_to_dict(lane):
     """Convert a list of lane entries into a dictionary indexed by library ID
@@ -562,7 +529,7 @@ class TestSequencer(TestCase):
         seq.instrument_name = "HWI-SEQ1"
         seq.model = "Imaginary 5000"
 
-        self.assertEqual(unicode(seq), "Seq1 (HWI-SEQ1)")
+        self.assertEqual(str(seq), "Seq1 (HWI-SEQ1)")
 
     def test_lookup(self):
         fc = self.fc12150
@@ -614,7 +581,7 @@ class TestSequencer(TestCase):
         status = validate_xhtml(response.content)
         if status is not None: self.assertTrue(status)
 
-        load_string_into_model(model, 'rdfa', response.content)
+        load_string_into_model(model, 'rdfa', smart_text(response.content))
 
         errmsgs = list(inference.run_validation())
         self.assertEqual(len(errmsgs), 0)
@@ -632,11 +599,12 @@ class TestSequencer(TestCase):
 
         url = '/lane/{}'.format(self.lane.id)
         response = self.client.get(url)
+        rdfbody = smart_text(response.content)
         self.assertEqual(response.status_code, 200)
-        status = validate_xhtml(response.content)
+        status = validate_xhtml(rdfbody)
         if status is not None: self.assertTrue(status)
 
-        load_string_into_model(model, 'rdfa', response.content)
+        load_string_into_model(model, 'rdfa', rdfbody)
 
         errmsgs = list(inference.run_validation())
         self.assertEqual(len(errmsgs), 0)
@@ -644,10 +612,7 @@ class TestSequencer(TestCase):
 def suite():
     from unittest import TestSuite, defaultTestLoader
     suite = TestSuite()
-    for testcase in [ClusterStationTestCases,
-                     SequencerTestCases,
-                     ExerimentsTestCases,
-                     TestFileType,
+    for testcase in [ExerimentsTestCases,
                      TestEmailNotify,
                      TestSequencer]:
         suite.addTests(defaultTestLoader.loadTestsFromTestCase(testcase))