Renamed various django tests.py to test_module.py
authorDiane Trout <diane@caltech.edu>
Tue, 20 Nov 2012 22:29:20 +0000 (14:29 -0800)
committerDiane Trout <diane@caltech.edu>
Tue, 20 Nov 2012 22:29:20 +0000 (14:29 -0800)
It appears by default py.test was looking for packages named test_,
since it wasn't finding the tests.py modules. This seemed like
a reasonable alternative convention.

htsworkflow/frontend/experiments/test_experiments.py [new file with mode: 0644]
htsworkflow/frontend/experiments/tests.py [deleted file]
htsworkflow/frontend/inventory/test_inventory.py [new file with mode: 0644]
htsworkflow/frontend/inventory/tests.py [deleted file]
htsworkflow/frontend/labels/test_labels.py [new file with mode: 0644]
htsworkflow/frontend/labels/tests.py [deleted file]
htsworkflow/frontend/samples/test_samples.py [new file with mode: 0644]
htsworkflow/frontend/samples/tests.py [deleted file]

diff --git a/htsworkflow/frontend/experiments/test_experiments.py b/htsworkflow/frontend/experiments/test_experiments.py
new file mode 100644 (file)
index 0000000..0f24147
--- /dev/null
@@ -0,0 +1,677 @@
+import re
+from lxml.html import fromstring
+try:
+    import json
+except ImportError, e:
+    import simplejson as json
+import os
+import shutil
+import sys
+import tempfile
+from urlparse import urljoin
+
+from django.conf import settings
+from django.core import mail
+from django.core.exceptions import ObjectDoesNotExist
+from django.test import TestCase
+from htsworkflow.frontend.experiments import models
+from htsworkflow.frontend.experiments import experiments
+from htsworkflow.frontend.auth import apidata
+from htsworkflow.util.ethelp import validate_xhtml
+
+from htsworkflow.pipelines.test.simulate_runfolder import TESTDATA_DIR
+
+LANE_SET = range(1,9)
+
+NSMAP = {'libns':'http://jumpgate.caltech.edu/wiki/LibraryOntology#'}
+
+class ClusterStationTestCases(TestCase):
+    fixtures = ['test_flowcells.json']
+
+    def test_default(self):
+        c = models.ClusterStation.default()
+        self.assertEqual(c.id, 2)
+
+        c.isdefault = False
+        c.save()
+
+        total = models.ClusterStation.objects.filter(isdefault=True).count()
+        self.assertEqual(total, 0)
+
+        other_default = models.ClusterStation.default()
+        self.assertEqual(other_default.id, 3)
+
+
+    def test_update_default(self):
+        old_default = models.ClusterStation.default()
+
+        c = models.ClusterStation.objects.get(pk=3)
+        c.isdefault = True
+        c.save()
+
+        new_default = models.ClusterStation.default()
+
+        self.assertNotEqual(old_default, new_default)
+        self.assertEqual(new_default, c)
+
+        total = models.ClusterStation.objects.filter(isdefault=True).count()
+        self.assertEqual(total, 1)
+
+    def test_update_other(self):
+        old_default = models.ClusterStation.default()
+        total = models.ClusterStation.objects.filter(isdefault=True).count()
+        self.assertEqual(total, 1)
+
+        c = models.ClusterStation.objects.get(pk=1)
+        c.name = "Primary Key 1"
+        c.save()
+
+        total = models.ClusterStation.objects.filter(isdefault=True).count()
+        self.assertEqual(total, 1)
+
+        new_default = models.ClusterStation.default()
+        self.assertEqual(old_default, new_default)
+
+
+class SequencerTestCases(TestCase):
+    fixtures = ['test_flowcells.json']
+
+    def test_default(self):
+        # starting with no default
+        s = models.Sequencer.default()
+        self.assertEqual(s.id, 2)
+
+        total = models.Sequencer.objects.filter(isdefault=True).count()
+        self.assertEqual(total, 1)
+
+        s.isdefault = False
+        s.save()
+
+        total = models.Sequencer.objects.filter(isdefault=True).count()
+        self.assertEqual(total, 0)
+
+        other_default = models.Sequencer.default()
+        self.assertEqual(other_default.id, 7)
+
+    def test_update_default(self):
+        old_default = models.Sequencer.default()
+
+        s = models.Sequencer.objects.get(pk=1)
+        s.isdefault = True
+        s.save()
+
+        new_default = models.Sequencer.default()
+
+        self.assertNotEqual(old_default, new_default)
+        self.assertEqual(new_default, s)
+
+        total = models.Sequencer.objects.filter(isdefault=True).count()
+        self.assertEqual(total, 1)
+
+
+    def test_update_other(self):
+        old_default = models.Sequencer.default()
+        total = models.Sequencer.objects.filter(isdefault=True).count()
+        self.assertEqual(total, 1)
+
+        s = models.Sequencer.objects.get(pk=1)
+        s.name = "Primary Key 1"
+        s.save()
+
+        total = models.Sequencer.objects.filter(isdefault=True).count()
+        self.assertEqual(total, 1)
+
+        new_default = models.Sequencer.default()
+        self.assertEqual(old_default, new_default)
+
+
+class ExperimentsTestCases(TestCase):
+    fixtures = ['test_flowcells.json',
+                ]
+
+    def setUp(self):
+        self.tempdir = tempfile.mkdtemp(prefix='htsw-test-experiments-')
+        settings.RESULT_HOME_DIR = self.tempdir
+
+        self.fc1_id = 'FC12150'
+        self.fc1_root = os.path.join(self.tempdir, self.fc1_id)
+        os.mkdir(self.fc1_root)
+        self.fc1_dir = os.path.join(self.fc1_root, 'C1-37')
+        os.mkdir(self.fc1_dir)
+        runxml = 'run_FC12150_2007-09-27.xml'
+        shutil.copy(os.path.join(TESTDATA_DIR, runxml),
+                    os.path.join(self.fc1_dir, runxml))
+        for i in range(1,9):
+            shutil.copy(
+                os.path.join(TESTDATA_DIR,
+                             'woldlab_070829_USI-EAS44_0017_FC11055_1.srf'),
+                os.path.join(self.fc1_dir,
+                             'woldlab_070829_SERIAL_FC12150_%d.srf' %(i,))
+                )
+
+        self.fc2_dir = os.path.join(self.tempdir, '42JTNAAXX')
+        os.mkdir(self.fc2_dir)
+        os.mkdir(os.path.join(self.fc2_dir, 'C1-25'))
+        os.mkdir(os.path.join(self.fc2_dir, 'C1-37'))
+        os.mkdir(os.path.join(self.fc2_dir, 'C1-37', 'Plots'))
+
+    def tearDown(self):
+        shutil.rmtree(self.tempdir)
+
+    def test_flowcell_information(self):
+        """
+        Check the code that packs the django objects into simple types.
+        """
+        for fc_id in [u'FC12150', u"42JTNAAXX", "42JU1AAXX"]:
+            fc_dict = experiments.flowcell_information(fc_id)
+            fc_django = models.FlowCell.objects.get(flowcell_id=fc_id)
+            self.assertEqual(fc_dict['flowcell_id'], fc_id)
+            self.assertEqual(fc_django.flowcell_id, fc_id)
+            self.assertEqual(fc_dict['sequencer'], fc_django.sequencer.name)
+            self.assertEqual(fc_dict['read_length'], fc_django.read_length)
+            self.assertEqual(fc_dict['notes'], fc_django.notes)
+            self.assertEqual(fc_dict['cluster_station'], fc_django.cluster_station.name)
+
+            for lane in fc_django.lane_set.all():
+                lane_contents = fc_dict['lane_set'][lane.lane_number]
+                lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
+                self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate)
+                self.assertEqual(lane_dict['comment'], lane.comment)
+                self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id)
+                self.assertEqual(lane_dict['lane_number'], lane.lane_number)
+                self.assertEqual(lane_dict['library_name'], lane.library.library_name)
+                self.assertEqual(lane_dict['library_id'], lane.library.id)
+                self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM))
+                self.assertEqual(lane_dict['library_species'],
+                                     lane.library.library_species.scientific_name)
+
+            response = self.client.get('/experiments/config/%s/json' % (fc_id,), apidata)
+            # strptime isoformat string = '%Y-%m-%dT%H:%M:%S'
+            fc_json = json.loads(response.content)
+            self.assertEqual(fc_json['flowcell_id'], fc_id)
+            self.assertEqual(fc_json['sequencer'], fc_django.sequencer.name)
+            self.assertEqual(fc_json['read_length'], fc_django.read_length)
+            self.assertEqual(fc_json['notes'], fc_django.notes)
+            self.assertEqual(fc_json['cluster_station'], fc_django.cluster_station.name)
+
+
+            for lane in fc_django.lane_set.all():
+                lane_contents = fc_json['lane_set'][unicode(lane.lane_number)]
+                lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
+
+                self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate)
+                self.assertEqual(lane_dict['comment'], lane.comment)
+                self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id)
+                self.assertEqual(lane_dict['lane_number'], lane.lane_number)
+                self.assertEqual(lane_dict['library_name'], lane.library.library_name)
+                self.assertEqual(lane_dict['library_id'], lane.library.id)
+                self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM))
+                self.assertEqual(lane_dict['library_species'],
+                                     lane.library.library_species.scientific_name)
+
+    def test_invalid_flowcell(self):
+        """
+        Make sure we get a 404 if we request an invalid flowcell ID
+        """
+        response = self.client.get('/experiments/config/nottheone/json', apidata)
+        self.assertEqual(response.status_code, 404)
+
+    def test_no_key(self):
+        """
+        Require logging in to retrieve meta data
+        """
+        response = self.client.get(u'/experiments/config/FC12150/json')
+        self.assertEqual(response.status_code, 403)
+
+    def test_library_id(self):
+        """
+        Library IDs should be flexible, so make sure we can retrive a non-numeric ID
+        """
+        response = self.client.get('/experiments/config/FC12150/json', apidata)
+        self.assertEqual(response.status_code, 200)
+        flowcell = json.loads(response.content)
+
+        lane_contents = flowcell['lane_set']['3']
+        lane_library = lane_contents[0]
+        self.assertEqual(lane_library['library_id'], 'SL039')
+
+        response = self.client.get('/samples/library/SL039/json', apidata)
+        self.assertEqual(response.status_code, 200)
+        library_sl039 = json.loads(response.content)
+
+        self.assertEqual(library_sl039['library_id'], 'SL039')
+
+    def test_raw_id_field(self):
+        """
+        Test ticket:147
+
+        Library's have IDs, libraries also have primary keys,
+        we eventually had enough libraries that the drop down combo box was too
+        hard to filter through, unfortnately we want a field that uses our library
+        id and not the internal primary key, and raw_id_field uses primary keys.
+
+        This tests to make sure that the value entered in the raw library id field matches
+        the library id looked up.
+        """
+        expected_ids = [u'10981',u'11016',u'SL039',u'11060',
+                        u'11061',u'11062',u'11063',u'11064']
+        self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
+        response = self.client.get('/admin/experiments/flowcell/153/')
+
+        tree = fromstring(response.content)
+        for i in range(0,8):
+            xpath_expression = '//input[@id="id_lane_set-%d-library"]'
+            input_field = tree.xpath(xpath_expression % (i,))[0]
+            library_field = input_field.find('../strong')
+            library_id, library_name = library_field.text.split(':')
+            # strip leading '#' sign from name
+            library_id = library_id[1:]
+            self.assertEqual(library_id, expected_ids[i])
+            self.assertEqual(input_field.attrib['value'], library_id)
+
+    def test_library_to_flowcell_link(self):
+        """
+        Make sure the library page includes links to the flowcell pages.
+        That work with flowcell IDs that have parenthetical comments.
+        """
+        self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
+        response = self.client.get('/library/11070/')
+        self.assertEqual(response.status_code, 200)
+        status = validate_xhtml(response.content)
+        if status is not None: self.assertTrue(status)
+
+        tree = fromstring(response.content)
+        flowcell_spans = tree.xpath('//span[@property="libns:flowcell_id"]',
+                                    namespaces=NSMAP)
+        self.assertEqual(flowcell_spans[0].text, '30012AAXX (failed)')
+        failed_fc_span = flowcell_spans[0]
+        failed_fc_a = failed_fc_span.getparent()
+        # make sure some of our RDF made it.
+        self.assertEqual(failed_fc_a.get('typeof'), 'libns:IlluminaFlowcell')
+        self.assertEqual(failed_fc_a.get('href'), '/flowcell/30012AAXX/')
+        fc_response = self.client.get(failed_fc_a.get('href'))
+        self.assertEqual(fc_response.status_code, 200)
+        status = validate_xhtml(response.content)
+        if status is not None: self.assertTrue(status)
+
+        fc_lane_response = self.client.get('/flowcell/30012AAXX/8/')
+        self.assertEqual(fc_lane_response.status_code, 200)
+        status = validate_xhtml(response.content)
+        if status is not None: self.assertTrue(status)
+
+
+    def test_pooled_multiplex_id(self):
+        fc_dict = experiments.flowcell_information('42JU1AAXX')
+        lane_contents = fc_dict['lane_set'][3]
+        self.assertEqual(len(lane_contents), 2)
+        lane_dict = multi_lane_to_dict(lane_contents)
+
+        self.assertEqual(lane_dict['12044']['index_sequence'],
+                         {u'1': u'ATCACG',
+                          u'2': u'CGATGT',
+                          u'3': u'TTAGGC'})
+        self.assertEqual(lane_dict['11045']['index_sequence'],
+                         {u'1': u'ATCACG'})
+
+
+
+    def test_lanes_for(self):
+        """
+        Check the code that packs the django objects into simple types.
+        """
+        user = 'test'
+        lanes = experiments.lanes_for(user)
+        self.assertEqual(len(lanes), 5)
+
+        response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
+        lanes_json = json.loads(response.content)
+        self.assertEqual(len(lanes), len(lanes_json))
+        for i in range(len(lanes)):
+            self.assertEqual(lanes[i]['comment'], lanes_json[i]['comment'])
+            self.assertEqual(lanes[i]['lane_number'], lanes_json[i]['lane_number'])
+            self.assertEqual(lanes[i]['flowcell'], lanes_json[i]['flowcell'])
+            self.assertEqual(lanes[i]['run_date'], lanes_json[i]['run_date'])
+
+    def test_lanes_for_no_lanes(self):
+        """
+        Do we get something meaningful back when the user isn't attached to anything?
+        """
+        user = 'supertest'
+        lanes = experiments.lanes_for(user)
+        self.assertEqual(len(lanes), 0)
+
+        response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
+        lanes_json = json.loads(response.content)
+
+    def test_lanes_for_no_user(self):
+        """
+        Do we get something meaningful back when its the wrong user
+        """
+        user = 'not a real user'
+        self.assertRaises(ObjectDoesNotExist, experiments.lanes_for, user)
+
+        response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
+        self.assertEqual(response.status_code, 404)
+
+
+    def test_raw_data_dir(self):
+        """Raw data path generator check"""
+        flowcell_id = self.fc1_id
+        raw_dir = os.path.join(settings.RESULT_HOME_DIR, flowcell_id)
+
+        fc = models.FlowCell.objects.get(flowcell_id=flowcell_id)
+        self.assertEqual(fc.get_raw_data_directory(), raw_dir)
+
+        fc.flowcell_id = flowcell_id + " (failed)"
+        self.assertEqual(fc.get_raw_data_directory(), raw_dir)
+
+
+    def test_data_run_import(self):
+        srf_file_type = models.FileType.objects.get(name='SRF')
+        runxml_file_type = models.FileType.objects.get(name='run_xml')
+        flowcell_id = self.fc1_id
+        flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id)
+        flowcell.update_data_runs()
+        self.assertEqual(len(flowcell.datarun_set.all()), 1)
+
+        run = flowcell.datarun_set.all()[0]
+        result_files = run.datafile_set.all()
+        result_dict = dict(((rf.relative_pathname, rf) for rf in result_files))
+
+        srf4 = result_dict['FC12150/C1-37/woldlab_070829_SERIAL_FC12150_4.srf']
+        self.assertEqual(srf4.file_type, srf_file_type)
+        self.assertEqual(srf4.library_id, '11060')
+        self.assertEqual(srf4.data_run.flowcell.flowcell_id, 'FC12150')
+        self.assertEqual(
+            srf4.data_run.flowcell.lane_set.get(lane_number=4).library_id,
+            '11060')
+        self.assertEqual(
+            srf4.pathname,
+            os.path.join(settings.RESULT_HOME_DIR, srf4.relative_pathname))
+
+        lane_files = run.lane_files()
+        self.assertEqual(lane_files[4]['srf'], srf4)
+
+        runxml= result_dict['FC12150/C1-37/run_FC12150_2007-09-27.xml']
+        self.assertEqual(runxml.file_type, runxml_file_type)
+        self.assertEqual(runxml.library_id, None)
+
+        import1 = len(models.DataRun.objects.filter(result_dir='FC12150/C1-37'))
+        # what happens if we import twice?
+        flowcell.import_data_run('FC12150/C1-37',
+                                 'run_FC12150_2007-09-27.xml')
+        self.assertEqual(
+            len(models.DataRun.objects.filter(result_dir='FC12150/C1-37')),
+            import1)
+
+    def test_read_result_file(self):
+        """make sure we can return a result file
+        """
+        flowcell_id = self.fc1_id
+        flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id)
+        flowcell.update_data_runs()
+
+        #self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
+
+        result_files = flowcell.datarun_set.all()[0].datafile_set.all()
+        for f in result_files:
+            url = '/experiments/file/%s' % ( f.random_key,)
+            response = self.client.get(url)
+            self.assertEqual(response.status_code, 200)
+            mimetype = f.file_type.mimetype
+            if mimetype is None:
+                mimetype = 'application/octet-stream'
+
+            self.assertEqual(mimetype, response['content-type'])
+
+    def test_flowcell_rdf(self):
+        import RDF
+        from htsworkflow.util.rdfhelp import get_model, \
+             fromTypedNode, \
+             load_string_into_model, \
+             rdfNS, \
+             libraryOntology, \
+             dump_model
+
+        model = get_model()
+
+        expected = {'1': ['11034'],
+                    '2': ['11036'],
+                    '3': ['12044','11045'],
+                    '4': ['11047','13044'],
+                    '5': ['11055'],
+                    '6': ['11067'],
+                    '7': ['11069'],
+                    '8': ['11070']}
+        url = '/flowcell/42JU1AAXX/'
+        response = self.client.get(url)
+        self.assertEqual(response.status_code, 200)
+        status = validate_xhtml(response.content)
+        if status is not None: self.assertTrue(status)
+
+        ns = urljoin('http://localhost', url)
+        load_string_into_model(model, 'rdfa', response.content, ns=ns)
+        body = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
+        prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
+
+        select ?flowcell ?flowcell_id ?lane_id ?library_id
+        where {
+          ?flowcell a libns:IlluminaFlowcell ;
+                    libns:flowcell_id ?flowcell_id ;
+                    libns:has_lane ?lane .
+          ?lane libns:lane_number ?lane_id ;
+                libns:library ?library .
+          ?library libns:library_id ?library_id .
+        }"""
+        query = RDF.SPARQLQuery(body)
+        count = 0
+        for r in query.execute(model):
+            count += 1
+            self.assertEqual(fromTypedNode(r['flowcell_id']), u'42JU1AAXX')
+            lane_id = fromTypedNode(r['lane_id'])
+            library_id = fromTypedNode(r['library_id'])
+            self.assertTrue(library_id in expected[lane_id])
+        self.assertEqual(count, 10)
+
+
+class TestFileType(TestCase):
+    def test_file_type_unicode(self):
+        file_type_objects = models.FileType.objects
+        name = 'QSEQ tarfile'
+        file_type_object = file_type_objects.get(name=name)
+        self.assertEqual(u"<FileType: QSEQ tarfile>",
+                             unicode(file_type_object))
+
+class TestFileType(TestCase):
+    def test_find_file_type(self):
+        file_type_objects = models.FileType.objects
+        cases = [('woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
+                  'QSEQ tarfile', 7, 1),
+                 ('woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
+                  'SRF', 1, None),
+                 ('s_1_eland_extended.txt.bz2','ELAND Extended', 1, None),
+                 ('s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
+                 ('s_3_eland_result.txt.bz2','ELAND Result', 3, None),
+                 ('s_1_export.txt.bz2','ELAND Export', 1, None),
+                 ('s_1_percent_call.png', 'IVC Percent Call', 1, None),
+                 ('s_2_percent_base.png', 'IVC Percent Base', 2, None),
+                 ('s_3_percent_all.png', 'IVC Percent All', 3, None),
+                 ('s_4_call.png', 'IVC Call', 4, None),
+                 ('s_5_all.png', 'IVC All', 5, None),
+                 ('Summary.htm', 'Summary.htm', None, None),
+                 ('run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
+         ]
+        for filename, typename, lane, end in cases:
+            ft = models.find_file_type_metadata_from_filename(filename)
+            self.assertEqual(ft['file_type'],
+                                 file_type_objects.get(name=typename))
+            self.assertEqual(ft.get('lane', None), lane)
+            self.assertEqual(ft.get('end', None), end)
+
+    def test_assign_file_type_complex_path(self):
+        file_type_objects = models.FileType.objects
+        cases = [('/a/b/c/woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
+                  'QSEQ tarfile', 7, 1),
+                 ('foo/woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
+                  'SRF', 1, None),
+                 ('../s_1_eland_extended.txt.bz2','ELAND Extended', 1, None),
+                 ('/bleem/s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
+                 ('/qwer/s_3_eland_result.txt.bz2','ELAND Result', 3, None),
+                 ('/ty///1/s_1_export.txt.bz2','ELAND Export', 1, None),
+                 ('/help/s_1_percent_call.png', 'IVC Percent Call', 1, None),
+                 ('/bored/s_2_percent_base.png', 'IVC Percent Base', 2, None),
+                 ('/example1/s_3_percent_all.png', 'IVC Percent All', 3, None),
+                 ('amonkey/s_4_call.png', 'IVC Call', 4, None),
+                 ('fishie/s_5_all.png', 'IVC All', 5, None),
+                 ('/random/Summary.htm', 'Summary.htm', None, None),
+                 ('/notrandom/run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
+         ]
+        for filename, typename, lane, end in cases:
+            result = models.find_file_type_metadata_from_filename(filename)
+            self.assertEqual(result['file_type'],
+                                 file_type_objects.get(name=typename))
+            self.assertEqual(result.get('lane',None), lane)
+            self.assertEqual(result.get('end', None), end)
+
+class TestEmailNotify(TestCase):
+    fixtures = ['test_flowcells.json']
+
+    def test_started_email_not_logged_in(self):
+        response = self.client.get('/experiments/started/153/')
+        self.assertEqual(response.status_code, 302)
+
+    def test_started_email_logged_in_user(self):
+        self.client.login(username='test', password='BJOKL5kAj6aFZ6A5')
+        response = self.client.get('/experiments/started/153/')
+        self.assertEqual(response.status_code, 302)
+
+    def test_started_email_logged_in_staff(self):
+        self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5')
+        response = self.client.get('/experiments/started/153/')
+        self.assertEqual(response.status_code, 200)
+
+    def test_started_email_send(self):
+        self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5')
+        response = self.client.get('/experiments/started/153/')
+        self.assertEqual(response.status_code, 200)
+
+        self.assertTrue('pk1@example.com' in response.content)
+        self.assertTrue('Lane #8 : (11064) Paired ends 104' in response.content)
+
+        response = self.client.get('/experiments/started/153/', {'send':'1','bcc':'on'})
+        self.assertEqual(response.status_code, 200)
+        self.assertEqual(len(mail.outbox), 4)
+        bcc = set(settings.NOTIFICATION_BCC).intersect(set(settings.MANAGERS))
+        for m in mail.outbox:
+            self.assertTrue(len(m.body) > 0)
+            self.assertEqual(m.bcc, bcc)
+
+    def test_email_navigation(self):
+        """
+        Can we navigate between the flowcell and email forms properly?
+        """
+        self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
+        response = self.client.get('/experiments/started/153/')
+        self.assertEqual(response.status_code, 200)
+        self.assertTrue(re.search('Flowcell FC12150', response.content))
+        # require that navigation back to the admin page exists
+        self.assertTrue(re.search('<a href="/admin/experiments/flowcell/153/">[^<]+</a>', response.content))
+
+def multi_lane_to_dict(lane):
+    """Convert a list of lane entries into a dictionary indexed by library ID
+    """
+    return dict( ((x['library_id'],x) for x in lane) )
+
+class TestSequencer(TestCase):
+    fixtures = ['test_flowcells.json',
+                ]
+
+    def test_name_generation(self):
+        seq = models.Sequencer()
+        seq.name = "Seq1"
+        seq.instrument_name = "HWI-SEQ1"
+        seq.model = "Imaginary 5000"
+
+        self.assertEqual(unicode(seq), "Seq1 (HWI-SEQ1)")
+
+    def test_lookup(self):
+        fc = models.FlowCell.objects.get(pk=153)
+        self.assertEqual(fc.sequencer.model,
+                             "Illumina Genome Analyzer IIx")
+        self.assertEqual(fc.sequencer.instrument_name,
+                             "ILLUMINA-EC5D15")
+        # well actually we let the browser tack on the host name
+        url = fc.get_absolute_url()
+        self.assertEqual(url, '/flowcell/FC12150/')
+
+    def test_rdf(self):
+        response = self.client.get('/flowcell/FC12150/', apidata)
+        tree = fromstring(response.content)
+        seq_by = tree.xpath('//div[@rel="libns:sequenced_by"]',
+                            namespaces=NSMAP)
+        self.assertEqual(len(seq_by), 1)
+        self.assertEqual(seq_by[0].attrib['rel'], 'libns:sequenced_by')
+        seq = seq_by[0].getchildren()
+        self.assertEqual(len(seq), 1)
+        self.assertEqual(seq[0].attrib['about'], '/sequencer/2')
+        self.assertEqual(seq[0].attrib['typeof'], 'libns:Sequencer')
+
+        name = seq[0].xpath('./span[@property="libns:sequencer_name"]')
+        self.assertEqual(len(name), 1)
+        self.assertEqual(name[0].text, 'Tardigrade')
+        instrument = seq[0].xpath(
+            './span[@property="libns:sequencer_instrument"]')
+        self.assertEqual(len(instrument), 1)
+        self.assertEqual(instrument[0].text, 'ILLUMINA-EC5D15')
+        model = seq[0].xpath(
+            './span[@property="libns:sequencer_model"]')
+        self.assertEqual(len(model), 1)
+        self.assertEqual(model[0].text, 'Illumina Genome Analyzer IIx')
+
+    def test_flowcell_with_rdf_validation(self):
+        from htsworkflow.util.rdfhelp import add_default_schemas, \
+             dump_model, \
+             get_model, \
+             load_string_into_model
+        from htsworkflow.util.rdfinfer import Infer
+
+        model = get_model()
+        add_default_schemas(model)
+        inference = Infer(model)
+
+        url ='/flowcell/FC12150/'
+        response = self.client.get(url)
+        self.assertEqual(response.status_code, 200)
+        status = validate_xhtml(response.content)
+        if status is not None: self.assertTrue(status)
+
+        load_string_into_model(model, 'rdfa', response.content)
+
+        errmsgs = list(inference.run_validation())
+        self.assertEqual(len(errmsgs), 2)
+        for errmsg in errmsgs:
+            self.assertEqual(errmsg, 'Missing type for: http://localhost/')
+
+    def test_lane_with_rdf_validation(self):
+        from htsworkflow.util.rdfhelp import add_default_schemas, \
+             dump_model, \
+             get_model, \
+             load_string_into_model
+        from htsworkflow.util.rdfinfer import Infer
+
+        model = get_model()
+        add_default_schemas(model)
+        inference = Infer(model)
+
+        url = '/lane/1193'
+        response = self.client.get(url)
+        self.assertEqual(response.status_code, 200)
+        status = validate_xhtml(response.content)
+        if status is not None: self.assertTrue(status)
+
+        load_string_into_model(model, 'rdfa', response.content)
+
+        errmsgs = list(inference.run_validation())
+        self.assertEqual(len(errmsgs), 2)
+        for errmsg in errmsgs:
+            self.assertEqual(errmsg, 'Missing type for: http://localhost/')
diff --git a/htsworkflow/frontend/experiments/tests.py b/htsworkflow/frontend/experiments/tests.py
deleted file mode 100644 (file)
index 0f24147..0000000
+++ /dev/null
@@ -1,677 +0,0 @@
-import re
-from lxml.html import fromstring
-try:
-    import json
-except ImportError, e:
-    import simplejson as json
-import os
-import shutil
-import sys
-import tempfile
-from urlparse import urljoin
-
-from django.conf import settings
-from django.core import mail
-from django.core.exceptions import ObjectDoesNotExist
-from django.test import TestCase
-from htsworkflow.frontend.experiments import models
-from htsworkflow.frontend.experiments import experiments
-from htsworkflow.frontend.auth import apidata
-from htsworkflow.util.ethelp import validate_xhtml
-
-from htsworkflow.pipelines.test.simulate_runfolder import TESTDATA_DIR
-
-LANE_SET = range(1,9)
-
-NSMAP = {'libns':'http://jumpgate.caltech.edu/wiki/LibraryOntology#'}
-
-class ClusterStationTestCases(TestCase):
-    fixtures = ['test_flowcells.json']
-
-    def test_default(self):
-        c = models.ClusterStation.default()
-        self.assertEqual(c.id, 2)
-
-        c.isdefault = False
-        c.save()
-
-        total = models.ClusterStation.objects.filter(isdefault=True).count()
-        self.assertEqual(total, 0)
-
-        other_default = models.ClusterStation.default()
-        self.assertEqual(other_default.id, 3)
-
-
-    def test_update_default(self):
-        old_default = models.ClusterStation.default()
-
-        c = models.ClusterStation.objects.get(pk=3)
-        c.isdefault = True
-        c.save()
-
-        new_default = models.ClusterStation.default()
-
-        self.assertNotEqual(old_default, new_default)
-        self.assertEqual(new_default, c)
-
-        total = models.ClusterStation.objects.filter(isdefault=True).count()
-        self.assertEqual(total, 1)
-
-    def test_update_other(self):
-        old_default = models.ClusterStation.default()
-        total = models.ClusterStation.objects.filter(isdefault=True).count()
-        self.assertEqual(total, 1)
-
-        c = models.ClusterStation.objects.get(pk=1)
-        c.name = "Primary Key 1"
-        c.save()
-
-        total = models.ClusterStation.objects.filter(isdefault=True).count()
-        self.assertEqual(total, 1)
-
-        new_default = models.ClusterStation.default()
-        self.assertEqual(old_default, new_default)
-
-
-class SequencerTestCases(TestCase):
-    fixtures = ['test_flowcells.json']
-
-    def test_default(self):
-        # starting with no default
-        s = models.Sequencer.default()
-        self.assertEqual(s.id, 2)
-
-        total = models.Sequencer.objects.filter(isdefault=True).count()
-        self.assertEqual(total, 1)
-
-        s.isdefault = False
-        s.save()
-
-        total = models.Sequencer.objects.filter(isdefault=True).count()
-        self.assertEqual(total, 0)
-
-        other_default = models.Sequencer.default()
-        self.assertEqual(other_default.id, 7)
-
-    def test_update_default(self):
-        old_default = models.Sequencer.default()
-
-        s = models.Sequencer.objects.get(pk=1)
-        s.isdefault = True
-        s.save()
-
-        new_default = models.Sequencer.default()
-
-        self.assertNotEqual(old_default, new_default)
-        self.assertEqual(new_default, s)
-
-        total = models.Sequencer.objects.filter(isdefault=True).count()
-        self.assertEqual(total, 1)
-
-
-    def test_update_other(self):
-        old_default = models.Sequencer.default()
-        total = models.Sequencer.objects.filter(isdefault=True).count()
-        self.assertEqual(total, 1)
-
-        s = models.Sequencer.objects.get(pk=1)
-        s.name = "Primary Key 1"
-        s.save()
-
-        total = models.Sequencer.objects.filter(isdefault=True).count()
-        self.assertEqual(total, 1)
-
-        new_default = models.Sequencer.default()
-        self.assertEqual(old_default, new_default)
-
-
-class ExperimentsTestCases(TestCase):
-    fixtures = ['test_flowcells.json',
-                ]
-
-    def setUp(self):
-        self.tempdir = tempfile.mkdtemp(prefix='htsw-test-experiments-')
-        settings.RESULT_HOME_DIR = self.tempdir
-
-        self.fc1_id = 'FC12150'
-        self.fc1_root = os.path.join(self.tempdir, self.fc1_id)
-        os.mkdir(self.fc1_root)
-        self.fc1_dir = os.path.join(self.fc1_root, 'C1-37')
-        os.mkdir(self.fc1_dir)
-        runxml = 'run_FC12150_2007-09-27.xml'
-        shutil.copy(os.path.join(TESTDATA_DIR, runxml),
-                    os.path.join(self.fc1_dir, runxml))
-        for i in range(1,9):
-            shutil.copy(
-                os.path.join(TESTDATA_DIR,
-                             'woldlab_070829_USI-EAS44_0017_FC11055_1.srf'),
-                os.path.join(self.fc1_dir,
-                             'woldlab_070829_SERIAL_FC12150_%d.srf' %(i,))
-                )
-
-        self.fc2_dir = os.path.join(self.tempdir, '42JTNAAXX')
-        os.mkdir(self.fc2_dir)
-        os.mkdir(os.path.join(self.fc2_dir, 'C1-25'))
-        os.mkdir(os.path.join(self.fc2_dir, 'C1-37'))
-        os.mkdir(os.path.join(self.fc2_dir, 'C1-37', 'Plots'))
-
-    def tearDown(self):
-        shutil.rmtree(self.tempdir)
-
-    def test_flowcell_information(self):
-        """
-        Check the code that packs the django objects into simple types.
-        """
-        for fc_id in [u'FC12150', u"42JTNAAXX", "42JU1AAXX"]:
-            fc_dict = experiments.flowcell_information(fc_id)
-            fc_django = models.FlowCell.objects.get(flowcell_id=fc_id)
-            self.assertEqual(fc_dict['flowcell_id'], fc_id)
-            self.assertEqual(fc_django.flowcell_id, fc_id)
-            self.assertEqual(fc_dict['sequencer'], fc_django.sequencer.name)
-            self.assertEqual(fc_dict['read_length'], fc_django.read_length)
-            self.assertEqual(fc_dict['notes'], fc_django.notes)
-            self.assertEqual(fc_dict['cluster_station'], fc_django.cluster_station.name)
-
-            for lane in fc_django.lane_set.all():
-                lane_contents = fc_dict['lane_set'][lane.lane_number]
-                lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
-                self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate)
-                self.assertEqual(lane_dict['comment'], lane.comment)
-                self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id)
-                self.assertEqual(lane_dict['lane_number'], lane.lane_number)
-                self.assertEqual(lane_dict['library_name'], lane.library.library_name)
-                self.assertEqual(lane_dict['library_id'], lane.library.id)
-                self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM))
-                self.assertEqual(lane_dict['library_species'],
-                                     lane.library.library_species.scientific_name)
-
-            response = self.client.get('/experiments/config/%s/json' % (fc_id,), apidata)
-            # strptime isoformat string = '%Y-%m-%dT%H:%M:%S'
-            fc_json = json.loads(response.content)
-            self.assertEqual(fc_json['flowcell_id'], fc_id)
-            self.assertEqual(fc_json['sequencer'], fc_django.sequencer.name)
-            self.assertEqual(fc_json['read_length'], fc_django.read_length)
-            self.assertEqual(fc_json['notes'], fc_django.notes)
-            self.assertEqual(fc_json['cluster_station'], fc_django.cluster_station.name)
-
-
-            for lane in fc_django.lane_set.all():
-                lane_contents = fc_json['lane_set'][unicode(lane.lane_number)]
-                lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
-
-                self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate)
-                self.assertEqual(lane_dict['comment'], lane.comment)
-                self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id)
-                self.assertEqual(lane_dict['lane_number'], lane.lane_number)
-                self.assertEqual(lane_dict['library_name'], lane.library.library_name)
-                self.assertEqual(lane_dict['library_id'], lane.library.id)
-                self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM))
-                self.assertEqual(lane_dict['library_species'],
-                                     lane.library.library_species.scientific_name)
-
-    def test_invalid_flowcell(self):
-        """
-        Make sure we get a 404 if we request an invalid flowcell ID
-        """
-        response = self.client.get('/experiments/config/nottheone/json', apidata)
-        self.assertEqual(response.status_code, 404)
-
-    def test_no_key(self):
-        """
-        Require logging in to retrieve meta data
-        """
-        response = self.client.get(u'/experiments/config/FC12150/json')
-        self.assertEqual(response.status_code, 403)
-
-    def test_library_id(self):
-        """
-        Library IDs should be flexible, so make sure we can retrive a non-numeric ID
-        """
-        response = self.client.get('/experiments/config/FC12150/json', apidata)
-        self.assertEqual(response.status_code, 200)
-        flowcell = json.loads(response.content)
-
-        lane_contents = flowcell['lane_set']['3']
-        lane_library = lane_contents[0]
-        self.assertEqual(lane_library['library_id'], 'SL039')
-
-        response = self.client.get('/samples/library/SL039/json', apidata)
-        self.assertEqual(response.status_code, 200)
-        library_sl039 = json.loads(response.content)
-
-        self.assertEqual(library_sl039['library_id'], 'SL039')
-
-    def test_raw_id_field(self):
-        """
-        Test ticket:147
-
-        Library's have IDs, libraries also have primary keys,
-        we eventually had enough libraries that the drop down combo box was too
-        hard to filter through, unfortnately we want a field that uses our library
-        id and not the internal primary key, and raw_id_field uses primary keys.
-
-        This tests to make sure that the value entered in the raw library id field matches
-        the library id looked up.
-        """
-        expected_ids = [u'10981',u'11016',u'SL039',u'11060',
-                        u'11061',u'11062',u'11063',u'11064']
-        self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
-        response = self.client.get('/admin/experiments/flowcell/153/')
-
-        tree = fromstring(response.content)
-        for i in range(0,8):
-            xpath_expression = '//input[@id="id_lane_set-%d-library"]'
-            input_field = tree.xpath(xpath_expression % (i,))[0]
-            library_field = input_field.find('../strong')
-            library_id, library_name = library_field.text.split(':')
-            # strip leading '#' sign from name
-            library_id = library_id[1:]
-            self.assertEqual(library_id, expected_ids[i])
-            self.assertEqual(input_field.attrib['value'], library_id)
-
-    def test_library_to_flowcell_link(self):
-        """
-        Make sure the library page includes links to the flowcell pages.
-        That work with flowcell IDs that have parenthetical comments.
-        """
-        self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
-        response = self.client.get('/library/11070/')
-        self.assertEqual(response.status_code, 200)
-        status = validate_xhtml(response.content)
-        if status is not None: self.assertTrue(status)
-
-        tree = fromstring(response.content)
-        flowcell_spans = tree.xpath('//span[@property="libns:flowcell_id"]',
-                                    namespaces=NSMAP)
-        self.assertEqual(flowcell_spans[0].text, '30012AAXX (failed)')
-        failed_fc_span = flowcell_spans[0]
-        failed_fc_a = failed_fc_span.getparent()
-        # make sure some of our RDF made it.
-        self.assertEqual(failed_fc_a.get('typeof'), 'libns:IlluminaFlowcell')
-        self.assertEqual(failed_fc_a.get('href'), '/flowcell/30012AAXX/')
-        fc_response = self.client.get(failed_fc_a.get('href'))
-        self.assertEqual(fc_response.status_code, 200)
-        status = validate_xhtml(response.content)
-        if status is not None: self.assertTrue(status)
-
-        fc_lane_response = self.client.get('/flowcell/30012AAXX/8/')
-        self.assertEqual(fc_lane_response.status_code, 200)
-        status = validate_xhtml(response.content)
-        if status is not None: self.assertTrue(status)
-
-
-    def test_pooled_multiplex_id(self):
-        fc_dict = experiments.flowcell_information('42JU1AAXX')
-        lane_contents = fc_dict['lane_set'][3]
-        self.assertEqual(len(lane_contents), 2)
-        lane_dict = multi_lane_to_dict(lane_contents)
-
-        self.assertEqual(lane_dict['12044']['index_sequence'],
-                         {u'1': u'ATCACG',
-                          u'2': u'CGATGT',
-                          u'3': u'TTAGGC'})
-        self.assertEqual(lane_dict['11045']['index_sequence'],
-                         {u'1': u'ATCACG'})
-
-
-
-    def test_lanes_for(self):
-        """
-        Check the code that packs the django objects into simple types.
-        """
-        user = 'test'
-        lanes = experiments.lanes_for(user)
-        self.assertEqual(len(lanes), 5)
-
-        response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
-        lanes_json = json.loads(response.content)
-        self.assertEqual(len(lanes), len(lanes_json))
-        for i in range(len(lanes)):
-            self.assertEqual(lanes[i]['comment'], lanes_json[i]['comment'])
-            self.assertEqual(lanes[i]['lane_number'], lanes_json[i]['lane_number'])
-            self.assertEqual(lanes[i]['flowcell'], lanes_json[i]['flowcell'])
-            self.assertEqual(lanes[i]['run_date'], lanes_json[i]['run_date'])
-
-    def test_lanes_for_no_lanes(self):
-        """
-        Do we get something meaningful back when the user isn't attached to anything?
-        """
-        user = 'supertest'
-        lanes = experiments.lanes_for(user)
-        self.assertEqual(len(lanes), 0)
-
-        response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
-        lanes_json = json.loads(response.content)
-
-    def test_lanes_for_no_user(self):
-        """
-        Do we get something meaningful back when its the wrong user
-        """
-        user = 'not a real user'
-        self.assertRaises(ObjectDoesNotExist, experiments.lanes_for, user)
-
-        response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
-        self.assertEqual(response.status_code, 404)
-
-
-    def test_raw_data_dir(self):
-        """Raw data path generator check"""
-        flowcell_id = self.fc1_id
-        raw_dir = os.path.join(settings.RESULT_HOME_DIR, flowcell_id)
-
-        fc = models.FlowCell.objects.get(flowcell_id=flowcell_id)
-        self.assertEqual(fc.get_raw_data_directory(), raw_dir)
-
-        fc.flowcell_id = flowcell_id + " (failed)"
-        self.assertEqual(fc.get_raw_data_directory(), raw_dir)
-
-
-    def test_data_run_import(self):
-        srf_file_type = models.FileType.objects.get(name='SRF')
-        runxml_file_type = models.FileType.objects.get(name='run_xml')
-        flowcell_id = self.fc1_id
-        flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id)
-        flowcell.update_data_runs()
-        self.assertEqual(len(flowcell.datarun_set.all()), 1)
-
-        run = flowcell.datarun_set.all()[0]
-        result_files = run.datafile_set.all()
-        result_dict = dict(((rf.relative_pathname, rf) for rf in result_files))
-
-        srf4 = result_dict['FC12150/C1-37/woldlab_070829_SERIAL_FC12150_4.srf']
-        self.assertEqual(srf4.file_type, srf_file_type)
-        self.assertEqual(srf4.library_id, '11060')
-        self.assertEqual(srf4.data_run.flowcell.flowcell_id, 'FC12150')
-        self.assertEqual(
-            srf4.data_run.flowcell.lane_set.get(lane_number=4).library_id,
-            '11060')
-        self.assertEqual(
-            srf4.pathname,
-            os.path.join(settings.RESULT_HOME_DIR, srf4.relative_pathname))
-
-        lane_files = run.lane_files()
-        self.assertEqual(lane_files[4]['srf'], srf4)
-
-        runxml= result_dict['FC12150/C1-37/run_FC12150_2007-09-27.xml']
-        self.assertEqual(runxml.file_type, runxml_file_type)
-        self.assertEqual(runxml.library_id, None)
-
-        import1 = len(models.DataRun.objects.filter(result_dir='FC12150/C1-37'))
-        # what happens if we import twice?
-        flowcell.import_data_run('FC12150/C1-37',
-                                 'run_FC12150_2007-09-27.xml')
-        self.assertEqual(
-            len(models.DataRun.objects.filter(result_dir='FC12150/C1-37')),
-            import1)
-
-    def test_read_result_file(self):
-        """make sure we can return a result file
-        """
-        flowcell_id = self.fc1_id
-        flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id)
-        flowcell.update_data_runs()
-
-        #self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
-
-        result_files = flowcell.datarun_set.all()[0].datafile_set.all()
-        for f in result_files:
-            url = '/experiments/file/%s' % ( f.random_key,)
-            response = self.client.get(url)
-            self.assertEqual(response.status_code, 200)
-            mimetype = f.file_type.mimetype
-            if mimetype is None:
-                mimetype = 'application/octet-stream'
-
-            self.assertEqual(mimetype, response['content-type'])
-
-    def test_flowcell_rdf(self):
-        import RDF
-        from htsworkflow.util.rdfhelp import get_model, \
-             fromTypedNode, \
-             load_string_into_model, \
-             rdfNS, \
-             libraryOntology, \
-             dump_model
-
-        model = get_model()
-
-        expected = {'1': ['11034'],
-                    '2': ['11036'],
-                    '3': ['12044','11045'],
-                    '4': ['11047','13044'],
-                    '5': ['11055'],
-                    '6': ['11067'],
-                    '7': ['11069'],
-                    '8': ['11070']}
-        url = '/flowcell/42JU1AAXX/'
-        response = self.client.get(url)
-        self.assertEqual(response.status_code, 200)
-        status = validate_xhtml(response.content)
-        if status is not None: self.assertTrue(status)
-
-        ns = urljoin('http://localhost', url)
-        load_string_into_model(model, 'rdfa', response.content, ns=ns)
-        body = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
-        prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
-
-        select ?flowcell ?flowcell_id ?lane_id ?library_id
-        where {
-          ?flowcell a libns:IlluminaFlowcell ;
-                    libns:flowcell_id ?flowcell_id ;
-                    libns:has_lane ?lane .
-          ?lane libns:lane_number ?lane_id ;
-                libns:library ?library .
-          ?library libns:library_id ?library_id .
-        }"""
-        query = RDF.SPARQLQuery(body)
-        count = 0
-        for r in query.execute(model):
-            count += 1
-            self.assertEqual(fromTypedNode(r['flowcell_id']), u'42JU1AAXX')
-            lane_id = fromTypedNode(r['lane_id'])
-            library_id = fromTypedNode(r['library_id'])
-            self.assertTrue(library_id in expected[lane_id])
-        self.assertEqual(count, 10)
-
-
-class TestFileType(TestCase):
-    def test_file_type_unicode(self):
-        file_type_objects = models.FileType.objects
-        name = 'QSEQ tarfile'
-        file_type_object = file_type_objects.get(name=name)
-        self.assertEqual(u"<FileType: QSEQ tarfile>",
-                             unicode(file_type_object))
-
-class TestFileType(TestCase):
-    def test_find_file_type(self):
-        file_type_objects = models.FileType.objects
-        cases = [('woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
-                  'QSEQ tarfile', 7, 1),
-                 ('woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
-                  'SRF', 1, None),
-                 ('s_1_eland_extended.txt.bz2','ELAND Extended', 1, None),
-                 ('s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
-                 ('s_3_eland_result.txt.bz2','ELAND Result', 3, None),
-                 ('s_1_export.txt.bz2','ELAND Export', 1, None),
-                 ('s_1_percent_call.png', 'IVC Percent Call', 1, None),
-                 ('s_2_percent_base.png', 'IVC Percent Base', 2, None),
-                 ('s_3_percent_all.png', 'IVC Percent All', 3, None),
-                 ('s_4_call.png', 'IVC Call', 4, None),
-                 ('s_5_all.png', 'IVC All', 5, None),
-                 ('Summary.htm', 'Summary.htm', None, None),
-                 ('run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
-         ]
-        for filename, typename, lane, end in cases:
-            ft = models.find_file_type_metadata_from_filename(filename)
-            self.assertEqual(ft['file_type'],
-                                 file_type_objects.get(name=typename))
-            self.assertEqual(ft.get('lane', None), lane)
-            self.assertEqual(ft.get('end', None), end)
-
-    def test_assign_file_type_complex_path(self):
-        file_type_objects = models.FileType.objects
-        cases = [('/a/b/c/woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
-                  'QSEQ tarfile', 7, 1),
-                 ('foo/woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
-                  'SRF', 1, None),
-                 ('../s_1_eland_extended.txt.bz2','ELAND Extended', 1, None),
-                 ('/bleem/s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
-                 ('/qwer/s_3_eland_result.txt.bz2','ELAND Result', 3, None),
-                 ('/ty///1/s_1_export.txt.bz2','ELAND Export', 1, None),
-                 ('/help/s_1_percent_call.png', 'IVC Percent Call', 1, None),
-                 ('/bored/s_2_percent_base.png', 'IVC Percent Base', 2, None),
-                 ('/example1/s_3_percent_all.png', 'IVC Percent All', 3, None),
-                 ('amonkey/s_4_call.png', 'IVC Call', 4, None),
-                 ('fishie/s_5_all.png', 'IVC All', 5, None),
-                 ('/random/Summary.htm', 'Summary.htm', None, None),
-                 ('/notrandom/run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
-         ]
-        for filename, typename, lane, end in cases:
-            result = models.find_file_type_metadata_from_filename(filename)
-            self.assertEqual(result['file_type'],
-                                 file_type_objects.get(name=typename))
-            self.assertEqual(result.get('lane',None), lane)
-            self.assertEqual(result.get('end', None), end)
-
-class TestEmailNotify(TestCase):
-    fixtures = ['test_flowcells.json']
-
-    def test_started_email_not_logged_in(self):
-        response = self.client.get('/experiments/started/153/')
-        self.assertEqual(response.status_code, 302)
-
-    def test_started_email_logged_in_user(self):
-        self.client.login(username='test', password='BJOKL5kAj6aFZ6A5')
-        response = self.client.get('/experiments/started/153/')
-        self.assertEqual(response.status_code, 302)
-
-    def test_started_email_logged_in_staff(self):
-        self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5')
-        response = self.client.get('/experiments/started/153/')
-        self.assertEqual(response.status_code, 200)
-
-    def test_started_email_send(self):
-        self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5')
-        response = self.client.get('/experiments/started/153/')
-        self.assertEqual(response.status_code, 200)
-
-        self.assertTrue('pk1@example.com' in response.content)
-        self.assertTrue('Lane #8 : (11064) Paired ends 104' in response.content)
-
-        response = self.client.get('/experiments/started/153/', {'send':'1','bcc':'on'})
-        self.assertEqual(response.status_code, 200)
-        self.assertEqual(len(mail.outbox), 4)
-        bcc = set(settings.NOTIFICATION_BCC).intersect(set(settings.MANAGERS))
-        for m in mail.outbox:
-            self.assertTrue(len(m.body) > 0)
-            self.assertEqual(m.bcc, bcc)
-
-    def test_email_navigation(self):
-        """
-        Can we navigate between the flowcell and email forms properly?
-        """
-        self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
-        response = self.client.get('/experiments/started/153/')
-        self.assertEqual(response.status_code, 200)
-        self.assertTrue(re.search('Flowcell FC12150', response.content))
-        # require that navigation back to the admin page exists
-        self.assertTrue(re.search('<a href="/admin/experiments/flowcell/153/">[^<]+</a>', response.content))
-
-def multi_lane_to_dict(lane):
-    """Convert a list of lane entries into a dictionary indexed by library ID
-    """
-    return dict( ((x['library_id'],x) for x in lane) )
-
-class TestSequencer(TestCase):
-    fixtures = ['test_flowcells.json',
-                ]
-
-    def test_name_generation(self):
-        seq = models.Sequencer()
-        seq.name = "Seq1"
-        seq.instrument_name = "HWI-SEQ1"
-        seq.model = "Imaginary 5000"
-
-        self.assertEqual(unicode(seq), "Seq1 (HWI-SEQ1)")
-
-    def test_lookup(self):
-        fc = models.FlowCell.objects.get(pk=153)
-        self.assertEqual(fc.sequencer.model,
-                             "Illumina Genome Analyzer IIx")
-        self.assertEqual(fc.sequencer.instrument_name,
-                             "ILLUMINA-EC5D15")
-        # well actually we let the browser tack on the host name
-        url = fc.get_absolute_url()
-        self.assertEqual(url, '/flowcell/FC12150/')
-
-    def test_rdf(self):
-        response = self.client.get('/flowcell/FC12150/', apidata)
-        tree = fromstring(response.content)
-        seq_by = tree.xpath('//div[@rel="libns:sequenced_by"]',
-                            namespaces=NSMAP)
-        self.assertEqual(len(seq_by), 1)
-        self.assertEqual(seq_by[0].attrib['rel'], 'libns:sequenced_by')
-        seq = seq_by[0].getchildren()
-        self.assertEqual(len(seq), 1)
-        self.assertEqual(seq[0].attrib['about'], '/sequencer/2')
-        self.assertEqual(seq[0].attrib['typeof'], 'libns:Sequencer')
-
-        name = seq[0].xpath('./span[@property="libns:sequencer_name"]')
-        self.assertEqual(len(name), 1)
-        self.assertEqual(name[0].text, 'Tardigrade')
-        instrument = seq[0].xpath(
-            './span[@property="libns:sequencer_instrument"]')
-        self.assertEqual(len(instrument), 1)
-        self.assertEqual(instrument[0].text, 'ILLUMINA-EC5D15')
-        model = seq[0].xpath(
-            './span[@property="libns:sequencer_model"]')
-        self.assertEqual(len(model), 1)
-        self.assertEqual(model[0].text, 'Illumina Genome Analyzer IIx')
-
-    def test_flowcell_with_rdf_validation(self):
-        from htsworkflow.util.rdfhelp import add_default_schemas, \
-             dump_model, \
-             get_model, \
-             load_string_into_model
-        from htsworkflow.util.rdfinfer import Infer
-
-        model = get_model()
-        add_default_schemas(model)
-        inference = Infer(model)
-
-        url ='/flowcell/FC12150/'
-        response = self.client.get(url)
-        self.assertEqual(response.status_code, 200)
-        status = validate_xhtml(response.content)
-        if status is not None: self.assertTrue(status)
-
-        load_string_into_model(model, 'rdfa', response.content)
-
-        errmsgs = list(inference.run_validation())
-        self.assertEqual(len(errmsgs), 2)
-        for errmsg in errmsgs:
-            self.assertEqual(errmsg, 'Missing type for: http://localhost/')
-
-    def test_lane_with_rdf_validation(self):
-        from htsworkflow.util.rdfhelp import add_default_schemas, \
-             dump_model, \
-             get_model, \
-             load_string_into_model
-        from htsworkflow.util.rdfinfer import Infer
-
-        model = get_model()
-        add_default_schemas(model)
-        inference = Infer(model)
-
-        url = '/lane/1193'
-        response = self.client.get(url)
-        self.assertEqual(response.status_code, 200)
-        status = validate_xhtml(response.content)
-        if status is not None: self.assertTrue(status)
-
-        load_string_into_model(model, 'rdfa', response.content)
-
-        errmsgs = list(inference.run_validation())
-        self.assertEqual(len(errmsgs), 2)
-        for errmsg in errmsgs:
-            self.assertEqual(errmsg, 'Missing type for: http://localhost/')
diff --git a/htsworkflow/frontend/inventory/test_inventory.py b/htsworkflow/frontend/inventory/test_inventory.py
new file mode 100644 (file)
index 0000000..024a334
--- /dev/null
@@ -0,0 +1,117 @@
+import RDF
+import unittest
+
+from django.test import TestCase
+from django.contrib.auth.models import User
+from django.core import urlresolvers
+
+from htsworkflow.frontend.inventory.models import Item, Vendor
+from htsworkflow.util.rdfhelp import get_model, load_string_into_model, get_serializer, inventoryOntology, libraryOntology, fromTypedNode
+
+def localhostNode(url):
+    return RDF.Node(RDF.Uri('http://localhost%s' % (url,)))
+
+class InventoryTestCase(TestCase):
+    fixtures = ['test_user', 'test_harddisks']
+
+    def test_fixture(self):
+        # make sure that some of our test data is was loaded
+        # since there was no error message when I typoed the test fixture
+        hd1 = Item.objects.get(pk=1)
+        self.failUnlessEqual(hd1.uuid, '8a90b6ce522311de99b00015172ce556')
+
+        user = User.objects.get(pk=5)
+        self.failUnlessEqual(user.username, 'test')
+
+    def test_item(self):
+        url = '/inventory/8a90b6ce522311de99b00015172ce556/'
+        self.client.login(username='test', password='BJOKL5kAj6aFZ6A5')
+        response = self.client.get(url)
+        self.failUnlessEqual(response.status_code, 200)
+
+        model = get_model()
+        load_string_into_model(model, 'rdfa', response.content, url)
+
+        itemNode = RDF.Node(RDF.Uri(url))
+        item_type = fromTypedNode(model.get_target(itemNode, inventoryOntology['item_type']))
+        self.failUnlessEqual(item_type, u'Hard Drive')
+
+    def test_itemindex(self):
+        url = '/inventory/it/Hard Drive/'
+        indexNode = localhostNode(url)
+        diskNode = localhostNode('/inventory/8a90b6ce522311de99b00015172ce556/')
+        self.client.login(username='test', password='BJOKL5kAj6aFZ6A5')
+
+        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
+        self.failUnlessEqual(len(flowcells), 2)
+        self.failUnless('http://localhost/flowcell/11ONEAAXX/' in flowcells)
+        self.failUnless('http://localhost/flowcell/22TWOAAXX/' in flowcells)
+
+    def test_add_disk(self):
+        url = '/inventory/it/Hard Drive/'
+        #url_disk = '/inventory/8a90b6ce522311de99b00015172ce556/'
+        url_disk = '/inventory/b0792d425aa411de99b00015172ce556/'
+        indexNode = localhostNode(url)
+        diskNode = localhostNode(url_disk)
+        self.client.login(username='test', password='BJOKL5kAj6aFZ6A5')
+
+        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
+        self.failUnlessEqual(len(flowcells), 0)
+
+        # step two link the flowcell
+        flowcell = '22TWOAAXX'
+        serial = 'WCAU49042470'
+        link_url = urlresolvers.reverse(
+                'htsworkflow.frontend.inventory.views.link_flowcell_and_device',
+                args=(flowcell, serial))
+        link_response = self.client.get(link_url)
+        self.failUnlessEqual(link_response.status_code, 200)
+
+        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
+        self.failUnlessEqual(len(flowcells), 1)
+        self.failUnlessEqual('http://localhost/flowcell/%s/' % (flowcell),
+                             flowcells[0])
+
+    def test_add_disk_failed_flowcell(self):
+        url = '/inventory/it/Hard Drive/'
+        #url_disk = '/inventory/8a90b6ce522311de99b00015172ce556/'
+        url_disk = '/inventory/b0792d425aa411de99b00015172ce556/'
+        indexNode = localhostNode(url)
+        diskNode = localhostNode(url_disk)
+        self.client.login(username='test', password='BJOKL5kAj6aFZ6A5')
+
+        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
+        self.failUnlessEqual(len(flowcells), 0)
+
+        # step two link the flowcell
+        flowcell = '33THRAAXX'
+        serial = 'WCAU49042470'
+        link_url = urlresolvers.reverse(
+                'htsworkflow.frontend.inventory.views.link_flowcell_and_device',
+                args=(flowcell, serial))
+        link_response = self.client.get(link_url)
+        self.failUnlessEqual(link_response.status_code, 200)
+
+        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
+        self.failUnlessEqual(len(flowcells), 1)
+        self.failUnlessEqual('http://localhost/flowcell/%s/' % (flowcell),
+                             flowcells[0])
+
+
+    def get_flowcells_from_content(self, url, rootNode, diskNode):
+        model = get_model()
+
+        response = self.client.get(url)
+        self.failUnlessEqual(response.status_code, 200)
+
+        load_string_into_model(model, 'rdfa', response.content, rootNode.uri)
+        targets = model.get_targets(diskNode, libraryOntology['flowcell_id'])
+        flowcells = [ str(x.uri) for x in targets]
+        return flowcells
+
+def suite():
+    return unittest.makeSuite(InventoryTestCase, 'test')
+
+if __name__ == "__main__":
+    unittest.main(defaultTest="suite")
+
diff --git a/htsworkflow/frontend/inventory/tests.py b/htsworkflow/frontend/inventory/tests.py
deleted file mode 100644 (file)
index 024a334..0000000
+++ /dev/null
@@ -1,117 +0,0 @@
-import RDF
-import unittest
-
-from django.test import TestCase
-from django.contrib.auth.models import User
-from django.core import urlresolvers
-
-from htsworkflow.frontend.inventory.models import Item, Vendor
-from htsworkflow.util.rdfhelp import get_model, load_string_into_model, get_serializer, inventoryOntology, libraryOntology, fromTypedNode
-
-def localhostNode(url):
-    return RDF.Node(RDF.Uri('http://localhost%s' % (url,)))
-
-class InventoryTestCase(TestCase):
-    fixtures = ['test_user', 'test_harddisks']
-
-    def test_fixture(self):
-        # make sure that some of our test data is was loaded
-        # since there was no error message when I typoed the test fixture
-        hd1 = Item.objects.get(pk=1)
-        self.failUnlessEqual(hd1.uuid, '8a90b6ce522311de99b00015172ce556')
-
-        user = User.objects.get(pk=5)
-        self.failUnlessEqual(user.username, 'test')
-
-    def test_item(self):
-        url = '/inventory/8a90b6ce522311de99b00015172ce556/'
-        self.client.login(username='test', password='BJOKL5kAj6aFZ6A5')
-        response = self.client.get(url)
-        self.failUnlessEqual(response.status_code, 200)
-
-        model = get_model()
-        load_string_into_model(model, 'rdfa', response.content, url)
-
-        itemNode = RDF.Node(RDF.Uri(url))
-        item_type = fromTypedNode(model.get_target(itemNode, inventoryOntology['item_type']))
-        self.failUnlessEqual(item_type, u'Hard Drive')
-
-    def test_itemindex(self):
-        url = '/inventory/it/Hard Drive/'
-        indexNode = localhostNode(url)
-        diskNode = localhostNode('/inventory/8a90b6ce522311de99b00015172ce556/')
-        self.client.login(username='test', password='BJOKL5kAj6aFZ6A5')
-
-        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
-        self.failUnlessEqual(len(flowcells), 2)
-        self.failUnless('http://localhost/flowcell/11ONEAAXX/' in flowcells)
-        self.failUnless('http://localhost/flowcell/22TWOAAXX/' in flowcells)
-
-    def test_add_disk(self):
-        url = '/inventory/it/Hard Drive/'
-        #url_disk = '/inventory/8a90b6ce522311de99b00015172ce556/'
-        url_disk = '/inventory/b0792d425aa411de99b00015172ce556/'
-        indexNode = localhostNode(url)
-        diskNode = localhostNode(url_disk)
-        self.client.login(username='test', password='BJOKL5kAj6aFZ6A5')
-
-        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
-        self.failUnlessEqual(len(flowcells), 0)
-
-        # step two link the flowcell
-        flowcell = '22TWOAAXX'
-        serial = 'WCAU49042470'
-        link_url = urlresolvers.reverse(
-                'htsworkflow.frontend.inventory.views.link_flowcell_and_device',
-                args=(flowcell, serial))
-        link_response = self.client.get(link_url)
-        self.failUnlessEqual(link_response.status_code, 200)
-
-        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
-        self.failUnlessEqual(len(flowcells), 1)
-        self.failUnlessEqual('http://localhost/flowcell/%s/' % (flowcell),
-                             flowcells[0])
-
-    def test_add_disk_failed_flowcell(self):
-        url = '/inventory/it/Hard Drive/'
-        #url_disk = '/inventory/8a90b6ce522311de99b00015172ce556/'
-        url_disk = '/inventory/b0792d425aa411de99b00015172ce556/'
-        indexNode = localhostNode(url)
-        diskNode = localhostNode(url_disk)
-        self.client.login(username='test', password='BJOKL5kAj6aFZ6A5')
-
-        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
-        self.failUnlessEqual(len(flowcells), 0)
-
-        # step two link the flowcell
-        flowcell = '33THRAAXX'
-        serial = 'WCAU49042470'
-        link_url = urlresolvers.reverse(
-                'htsworkflow.frontend.inventory.views.link_flowcell_and_device',
-                args=(flowcell, serial))
-        link_response = self.client.get(link_url)
-        self.failUnlessEqual(link_response.status_code, 200)
-
-        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
-        self.failUnlessEqual(len(flowcells), 1)
-        self.failUnlessEqual('http://localhost/flowcell/%s/' % (flowcell),
-                             flowcells[0])
-
-
-    def get_flowcells_from_content(self, url, rootNode, diskNode):
-        model = get_model()
-
-        response = self.client.get(url)
-        self.failUnlessEqual(response.status_code, 200)
-
-        load_string_into_model(model, 'rdfa', response.content, rootNode.uri)
-        targets = model.get_targets(diskNode, libraryOntology['flowcell_id'])
-        flowcells = [ str(x.uri) for x in targets]
-        return flowcells
-
-def suite():
-    return unittest.makeSuite(InventoryTestCase, 'test')
-
-if __name__ == "__main__":
-    unittest.main(defaultTest="suite")
-
diff --git a/htsworkflow/frontend/labels/test_labels.py b/htsworkflow/frontend/labels/test_labels.py
new file mode 100644 (file)
index 0000000..2247054
--- /dev/null
@@ -0,0 +1,23 @@
+"""
+This file demonstrates two different styles of tests (one doctest and one
+unittest). These will both pass when you run "manage.py test".
+
+Replace these with more appropriate tests for your application.
+"""
+
+from django.test import TestCase
+
+class SimpleTest(TestCase):
+    def test_basic_addition(self):
+        """
+        Tests that 1 + 1 always equals 2.
+        """
+        self.failUnlessEqual(1 + 1, 2)
+
+__test__ = {"doctest": """
+Another way to test that 1 + 1 is equal to 2.
+
+>>> 1 + 1 == 2
+True
+"""}
+
diff --git a/htsworkflow/frontend/labels/tests.py b/htsworkflow/frontend/labels/tests.py
deleted file mode 100644 (file)
index 2247054..0000000
+++ /dev/null
@@ -1,23 +0,0 @@
-"""
-This file demonstrates two different styles of tests (one doctest and one
-unittest). These will both pass when you run "manage.py test".
-
-Replace these with more appropriate tests for your application.
-"""
-
-from django.test import TestCase
-
-class SimpleTest(TestCase):
-    def test_basic_addition(self):
-        """
-        Tests that 1 + 1 always equals 2.
-        """
-        self.failUnlessEqual(1 + 1, 2)
-
-__test__ = {"doctest": """
-Another way to test that 1 + 1 is equal to 2.
-
->>> 1 + 1 == 2
-True
-"""}
-
diff --git a/htsworkflow/frontend/samples/test_samples.py b/htsworkflow/frontend/samples/test_samples.py
new file mode 100644 (file)
index 0000000..4e0c2a3
--- /dev/null
@@ -0,0 +1,333 @@
+import datetime
+import unittest
+
+try:
+    import json
+except ImportError, e:
+    import simplejson as json
+
+from django.test import TestCase
+
+from htsworkflow.frontend.samples.models import \
+        Affiliation, \
+        ExperimentType, \
+        Species, \
+        Library
+
+from htsworkflow.frontend.samples.views import \
+     library_dict, \
+     library_json
+
+from htsworkflow.frontend.auth import apidata
+from htsworkflow.util.conversion import unicode_or_none
+from htsworkflow.util.ethelp import validate_xhtml
+
+class LibraryTestCase(TestCase):
+    fixtures = ['test_samples.json']
+
+    def setUp(self):
+        create_db(self)
+
+    def testOrganism(self):
+        self.assertEquals(self.library_10001.organism(), 'human')
+
+    def testAffiliations(self):
+        self.library_10001.affiliations.add(self.affiliation_alice)
+        self.library_10002.affiliations.add(
+                self.affiliation_alice,
+                self.affiliation_bob
+        )
+        self.failUnless(len(self.library_10001.affiliations.all()), 1)
+        self.failUnless(self.library_10001.affiliation(), 'Alice')
+
+        self.failUnless(len(self.library_10002.affiliations.all()), 2)
+        self.failUnless(self.library_10001.affiliation(), 'Alice, Bob')
+
+
+class SampleWebTestCase(TestCase):
+    """
+    Test returning data from our database in rest like ways.
+    (like returning json objects)
+    """
+    fixtures = ['test_samples.json']
+
+    def test_library_info(self):
+        for lib in Library.objects.all():
+            lib_dict = library_dict(lib.id)
+            url = '/samples/library/%s/json' % (lib.id,)
+            lib_response = self.client.get(url, apidata)
+            self.failUnlessEqual(lib_response.status_code, 200)
+            lib_json = json.loads(lib_response.content)
+
+            for d in [lib_dict, lib_json]:
+                # amplified_from_sample is a link to the library table,
+                # I want to use the "id" for the data lookups not
+                # the embedded primary key.
+                # It gets slightly confusing on how to implement sending the right id
+                # since amplified_from_sample can be null
+                #self.failUnlessEqual(d['amplified_from_sample'], lib.amplified_from_sample)
+                self.failUnlessEqual(d['antibody_id'], lib.antibody_id)
+                self.failUnlessEqual(d['cell_line_id'], lib.cell_line_id)
+                self.failUnlessEqual(d['cell_line'], unicode_or_none(lib.cell_line))
+                self.failUnlessEqual(d['experiment_type'], lib.experiment_type.name)
+                self.failUnlessEqual(d['experiment_type_id'], lib.experiment_type_id)
+                self.failUnlessEqual(d['gel_cut_size'], lib.gel_cut_size)
+                self.failUnlessEqual(d['hidden'], lib.hidden)
+                self.failUnlessEqual(d['id'], lib.id)
+                self.failUnlessEqual(d['insert_size'], lib.insert_size)
+                self.failUnlessEqual(d['library_name'], lib.library_name)
+                self.failUnlessEqual(d['library_species'], lib.library_species.scientific_name)
+                self.failUnlessEqual(d['library_species_id'], lib.library_species_id)
+                self.failUnlessEqual(d['library_type_id'], lib.library_type_id)
+                if lib.library_type_id is not None:
+                    self.failUnlessEqual(d['library_type'], lib.library_type.name)
+                else:
+                    self.failUnlessEqual(d['library_type'], None)
+                    self.failUnlessEqual(d['made_for'], lib.made_for)
+                    self.failUnlessEqual(d['made_by'], lib.made_by)
+                    self.failUnlessEqual(d['notes'], lib.notes)
+                    self.failUnlessEqual(d['replicate'], lib.replicate)
+                    self.failUnlessEqual(d['stopping_point'], lib.stopping_point)
+                    self.failUnlessEqual(d['successful_pM'], lib.successful_pM)
+                    self.failUnlessEqual(d['undiluted_concentration'],
+                                         unicode(lib.undiluted_concentration))
+                # some specific tests
+                if lib.id == '10981':
+                    # test a case where there is no known status
+                    lane_set = {u'status': u'Unknown',
+                                u'paired_end': True,
+                                u'read_length': 75,
+                                u'lane_number': 1,
+                                u'lane_id': 1193,
+                                u'flowcell': u'303TUAAXX',
+                                u'status_code': None}
+                    self.failUnlessEqual(len(d['lane_set']), 1)
+                    self.failUnlessEqual(d['lane_set'][0], lane_set)
+                elif lib.id == '11016':
+                    # test a case where there is a status
+                    lane_set = {u'status': 'Good',
+                                u'paired_end': True,
+                                u'read_length': 75,
+                                u'lane_number': 5,
+                                u'lane_id': 1197,
+                                u'flowcell': u'303TUAAXX',
+                                u'status_code': 2}
+                    self.failUnlessEqual(len(d['lane_set']), 1)
+                    self.failUnlessEqual(d['lane_set'][0], lane_set)
+
+
+    def test_invalid_library_json(self):
+        """
+        Make sure we get a 404 if we request an invalid library id
+        """
+        response = self.client.get('/samples/library/nottheone/json', apidata)
+        self.failUnlessEqual(response.status_code, 404)
+
+
+    def test_invalid_library(self):
+        response = self.client.get('/library/nottheone/')
+        self.failUnlessEqual(response.status_code, 404)
+
+
+    def test_library_no_key(self):
+        """
+        Make sure we get a 302 if we're not logged in
+        """
+        response = self.client.get('/samples/library/10981/json')
+        self.failUnlessEqual(response.status_code, 403)
+        response = self.client.get('/samples/library/10981/json', apidata)
+        self.failUnlessEqual(response.status_code, 200)
+
+    def test_library_rdf(self):
+        import RDF
+        from htsworkflow.util.rdfhelp import get_model, \
+             dump_model, \
+             fromTypedNode, \
+             load_string_into_model, \
+             rdfNS, \
+             libraryOntology
+        model = get_model()
+
+        response = self.client.get('/library/10981/')
+        self.assertEqual(response.status_code, 200)
+        content = response.content
+        load_string_into_model(model, 'rdfa', content)
+
+        body = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
+        prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
+
+        select ?library ?name ?library_id ?gel_cut ?made_by
+        where {
+           ?library a libns:library ;
+                    libns:name ?name ;
+                    libns:library_id ?library_id ;
+                    libns:gel_cut ?gel_cut ;
+                    libns:made_by ?made_by
+        }"""
+        query = RDF.SPARQLQuery(body)
+        for r in query.execute(model):
+            self.assertEqual(fromTypedNode(r['library_id']), u'10981')
+            self.assertEqual(fromTypedNode(r['name']),
+                             u'Paired End Multiplexed Sp-BAC')
+            self.assertEqual(fromTypedNode(r['gel_cut']), 400)
+            self.assertEqual(fromTypedNode(r['made_by']), u'Igor')
+
+        state = validate_xhtml(content)
+        if state is not None:
+            self.assertTrue(state)
+
+        # validate a library page.
+        from htsworkflow.util.rdfhelp import add_default_schemas
+        from htsworkflow.util.rdfinfer import Infer
+        add_default_schemas(model)
+        inference = Infer(model)
+        ignored = {'Missing type for: http://localhost/'}
+        errmsgs = [msg for msg in inference.run_validation()
+                   if msg not in ignored ]
+        self.assertEqual(len(errmsgs), 0)
+
+    def test_library_index_rdfa(self):
+        from htsworkflow.util.rdfhelp import \
+             add_default_schemas, get_model, load_string_into_model, \
+             dump_model
+        from htsworkflow.util.rdfinfer import Infer
+
+        model = get_model()
+        add_default_schemas(model)
+        inference = Infer(model)
+
+        response = self.client.get('/library/')
+        self.assertEqual(response.status_code, 200)
+        load_string_into_model(model, 'rdfa', response.content)
+
+        ignored = {'Missing type for: http://localhost/'}
+        errmsgs = [msg for msg in inference.run_validation()
+                   if msg not in ignored ]
+        self.assertEqual(len(errmsgs), 0)
+
+        body =  """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
+        prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
+
+        select ?library ?library_id ?name ?species_name
+        where {
+           ?library a libns:Library .
+           OPTIONAL { ?library libns:library_id ?library_id . }
+           OPTIONAL { ?library libns:species_name ?species_name . }
+           OPTIONAL { ?library libns:name ?name . }
+        }"""
+        bindings = set(['library', 'library_id', 'name', 'species_name'])
+        query = RDF.SPARQLQuery(body)
+        count = 0
+        for r in query.execute(model):
+            count += 1
+            for name, value in r.items():
+                self.assertTrue(name in bindings)
+                self.assertTrue(value is not None)
+
+        self.assertEqual(count, len(Library.objects.filter(hidden=False)))
+
+        state = validate_xhtml(response.content)
+        if state is not None: self.assertTrue(state)
+
+            
+# The django test runner flushes the database between test suites not cases,
+# so to be more compatible with running via nose we flush the database tables
+# of interest before creating our sample data.
+def create_db(obj):
+    obj.species_human = Species.objects.get(pk=8)
+    obj.experiment_rna_seq = ExperimentType.objects.get(pk=4)
+    obj.affiliation_alice = Affiliation.objects.get(pk=1)
+    obj.affiliation_bob = Affiliation.objects.get(pk=2)
+
+    Library.objects.all().delete()
+    obj.library_10001 = Library(
+        id = "10001",
+        library_name = 'C2C12 named poorly',
+        library_species = obj.species_human,
+        experiment_type = obj.experiment_rna_seq,
+        creation_date = datetime.datetime.now(),
+        made_for = 'scientist unit 2007',
+        made_by = 'microfludics system 7321',
+        stopping_point = '2A',
+        undiluted_concentration = '5.01',
+        hidden = False,
+    )
+    obj.library_10001.save()
+    obj.library_10002 = Library(
+        id = "10002",
+        library_name = 'Worm named poorly',
+        library_species = obj.species_human,
+        experiment_type = obj.experiment_rna_seq,
+        creation_date = datetime.datetime.now(),
+        made_for = 'scientist unit 2007',
+        made_by = 'microfludics system 7321',
+        stopping_point = '2A',
+        undiluted_concentration = '5.01',
+        hidden = False,
+    )
+    obj.library_10002.save()
+
+try:
+    import RDF
+    HAVE_RDF = True
+
+    rdfNS = RDF.NS("http://www.w3.org/1999/02/22-rdf-syntax-ns#")
+    xsdNS = RDF.NS("http://www.w3.org/2001/XMLSchema#")
+    libNS = RDF.NS("http://jumpgate.caltech.edu/wiki/LibraryOntology#")
+except ImportError,e:
+    HAVE_RDF = False
+
+
+class TestRDFaLibrary(TestCase):
+    fixtures = ['test_samples.json']
+
+    def test_parse_rdfa(self):
+        model = get_rdf_memory_model()
+        parser = RDF.Parser(name='rdfa')
+        url = '/library/10981/'
+        lib_response = self.client.get(url)
+        self.failIfEqual(len(lib_response.content), 0)
+
+        parser.parse_string_into_model(model,
+                                       lib_response.content,
+                                       'http://localhost'+url)
+        # http://jumpgate.caltech.edu/wiki/LibraryOntology#affiliation>
+        self.check_literal_object(model, ['Bob'], p=libNS['affiliation'])
+        self.check_literal_object(model, ['Multiplexed'], p=libNS['experiment_type'])
+        self.check_literal_object(model, ['400'], p=libNS['gel_cut'])
+        self.check_literal_object(model, ['Igor'], p=libNS['made_by'])
+        self.check_literal_object(model, ['Paired End Multiplexed Sp-BAC'], p=libNS['name'])
+        self.check_literal_object(model, ['Drosophila melanogaster'], p=libNS['species_name'])
+
+        self.check_uri_object(model,
+                              [u'http://localhost/lane/1193'],
+                              p=libNS['has_lane'])
+
+        fc_uri = RDF.Uri('http://localhost/flowcell/303TUAAXX/')
+        self.check_literal_object(model,
+                                  [u"303TUAAXX"],
+                                  s=fc_uri, p=libNS['flowcell_id'])
+
+    def check_literal_object(self, model, values, s=None, p=None, o=None):
+        statements = list(model.find_statements(
+            RDF.Statement(s,p,o)))
+        self.failUnlessEqual(len(statements), len(values),
+                        "Couln't find %s %s %s" % (s,p,o))
+        for s in statements:
+            self.failUnless(s.object.literal_value['string'] in values)
+
+
+    def check_uri_object(self, model, values, s=None, p=None, o=None):
+        statements = list(model.find_statements(
+            RDF.Statement(s,p,o)))
+        self.failUnlessEqual(len(statements), len(values),
+                        "Couln't find %s %s %s" % (s,p,o))
+        for s in statements:
+            self.failUnless(unicode(s.object.uri) in values)
+
+
+
+def get_rdf_memory_model():
+    storage = RDF.MemoryStorage()
+    model = RDF.Model(storage)
+    return model
diff --git a/htsworkflow/frontend/samples/tests.py b/htsworkflow/frontend/samples/tests.py
deleted file mode 100644 (file)
index 4e0c2a3..0000000
+++ /dev/null
@@ -1,333 +0,0 @@
-import datetime
-import unittest
-
-try:
-    import json
-except ImportError, e:
-    import simplejson as json
-
-from django.test import TestCase
-
-from htsworkflow.frontend.samples.models import \
-        Affiliation, \
-        ExperimentType, \
-        Species, \
-        Library
-
-from htsworkflow.frontend.samples.views import \
-     library_dict, \
-     library_json
-
-from htsworkflow.frontend.auth import apidata
-from htsworkflow.util.conversion import unicode_or_none
-from htsworkflow.util.ethelp import validate_xhtml
-
-class LibraryTestCase(TestCase):
-    fixtures = ['test_samples.json']
-
-    def setUp(self):
-        create_db(self)
-
-    def testOrganism(self):
-        self.assertEquals(self.library_10001.organism(), 'human')
-
-    def testAffiliations(self):
-        self.library_10001.affiliations.add(self.affiliation_alice)
-        self.library_10002.affiliations.add(
-                self.affiliation_alice,
-                self.affiliation_bob
-        )
-        self.failUnless(len(self.library_10001.affiliations.all()), 1)
-        self.failUnless(self.library_10001.affiliation(), 'Alice')
-
-        self.failUnless(len(self.library_10002.affiliations.all()), 2)
-        self.failUnless(self.library_10001.affiliation(), 'Alice, Bob')
-
-
-class SampleWebTestCase(TestCase):
-    """
-    Test returning data from our database in rest like ways.
-    (like returning json objects)
-    """
-    fixtures = ['test_samples.json']
-
-    def test_library_info(self):
-        for lib in Library.objects.all():
-            lib_dict = library_dict(lib.id)
-            url = '/samples/library/%s/json' % (lib.id,)
-            lib_response = self.client.get(url, apidata)
-            self.failUnlessEqual(lib_response.status_code, 200)
-            lib_json = json.loads(lib_response.content)
-
-            for d in [lib_dict, lib_json]:
-                # amplified_from_sample is a link to the library table,
-                # I want to use the "id" for the data lookups not
-                # the embedded primary key.
-                # It gets slightly confusing on how to implement sending the right id
-                # since amplified_from_sample can be null
-                #self.failUnlessEqual(d['amplified_from_sample'], lib.amplified_from_sample)
-                self.failUnlessEqual(d['antibody_id'], lib.antibody_id)
-                self.failUnlessEqual(d['cell_line_id'], lib.cell_line_id)
-                self.failUnlessEqual(d['cell_line'], unicode_or_none(lib.cell_line))
-                self.failUnlessEqual(d['experiment_type'], lib.experiment_type.name)
-                self.failUnlessEqual(d['experiment_type_id'], lib.experiment_type_id)
-                self.failUnlessEqual(d['gel_cut_size'], lib.gel_cut_size)
-                self.failUnlessEqual(d['hidden'], lib.hidden)
-                self.failUnlessEqual(d['id'], lib.id)
-                self.failUnlessEqual(d['insert_size'], lib.insert_size)
-                self.failUnlessEqual(d['library_name'], lib.library_name)
-                self.failUnlessEqual(d['library_species'], lib.library_species.scientific_name)
-                self.failUnlessEqual(d['library_species_id'], lib.library_species_id)
-                self.failUnlessEqual(d['library_type_id'], lib.library_type_id)
-                if lib.library_type_id is not None:
-                    self.failUnlessEqual(d['library_type'], lib.library_type.name)
-                else:
-                    self.failUnlessEqual(d['library_type'], None)
-                    self.failUnlessEqual(d['made_for'], lib.made_for)
-                    self.failUnlessEqual(d['made_by'], lib.made_by)
-                    self.failUnlessEqual(d['notes'], lib.notes)
-                    self.failUnlessEqual(d['replicate'], lib.replicate)
-                    self.failUnlessEqual(d['stopping_point'], lib.stopping_point)
-                    self.failUnlessEqual(d['successful_pM'], lib.successful_pM)
-                    self.failUnlessEqual(d['undiluted_concentration'],
-                                         unicode(lib.undiluted_concentration))
-                # some specific tests
-                if lib.id == '10981':
-                    # test a case where there is no known status
-                    lane_set = {u'status': u'Unknown',
-                                u'paired_end': True,
-                                u'read_length': 75,
-                                u'lane_number': 1,
-                                u'lane_id': 1193,
-                                u'flowcell': u'303TUAAXX',
-                                u'status_code': None}
-                    self.failUnlessEqual(len(d['lane_set']), 1)
-                    self.failUnlessEqual(d['lane_set'][0], lane_set)
-                elif lib.id == '11016':
-                    # test a case where there is a status
-                    lane_set = {u'status': 'Good',
-                                u'paired_end': True,
-                                u'read_length': 75,
-                                u'lane_number': 5,
-                                u'lane_id': 1197,
-                                u'flowcell': u'303TUAAXX',
-                                u'status_code': 2}
-                    self.failUnlessEqual(len(d['lane_set']), 1)
-                    self.failUnlessEqual(d['lane_set'][0], lane_set)
-
-
-    def test_invalid_library_json(self):
-        """
-        Make sure we get a 404 if we request an invalid library id
-        """
-        response = self.client.get('/samples/library/nottheone/json', apidata)
-        self.failUnlessEqual(response.status_code, 404)
-
-
-    def test_invalid_library(self):
-        response = self.client.get('/library/nottheone/')
-        self.failUnlessEqual(response.status_code, 404)
-
-
-    def test_library_no_key(self):
-        """
-        Make sure we get a 302 if we're not logged in
-        """
-        response = self.client.get('/samples/library/10981/json')
-        self.failUnlessEqual(response.status_code, 403)
-        response = self.client.get('/samples/library/10981/json', apidata)
-        self.failUnlessEqual(response.status_code, 200)
-
-    def test_library_rdf(self):
-        import RDF
-        from htsworkflow.util.rdfhelp import get_model, \
-             dump_model, \
-             fromTypedNode, \
-             load_string_into_model, \
-             rdfNS, \
-             libraryOntology
-        model = get_model()
-
-        response = self.client.get('/library/10981/')
-        self.assertEqual(response.status_code, 200)
-        content = response.content
-        load_string_into_model(model, 'rdfa', content)
-
-        body = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
-        prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
-
-        select ?library ?name ?library_id ?gel_cut ?made_by
-        where {
-           ?library a libns:library ;
-                    libns:name ?name ;
-                    libns:library_id ?library_id ;
-                    libns:gel_cut ?gel_cut ;
-                    libns:made_by ?made_by
-        }"""
-        query = RDF.SPARQLQuery(body)
-        for r in query.execute(model):
-            self.assertEqual(fromTypedNode(r['library_id']), u'10981')
-            self.assertEqual(fromTypedNode(r['name']),
-                             u'Paired End Multiplexed Sp-BAC')
-            self.assertEqual(fromTypedNode(r['gel_cut']), 400)
-            self.assertEqual(fromTypedNode(r['made_by']), u'Igor')
-
-        state = validate_xhtml(content)
-        if state is not None:
-            self.assertTrue(state)
-
-        # validate a library page.
-        from htsworkflow.util.rdfhelp import add_default_schemas
-        from htsworkflow.util.rdfinfer import Infer
-        add_default_schemas(model)
-        inference = Infer(model)
-        ignored = {'Missing type for: http://localhost/'}
-        errmsgs = [msg for msg in inference.run_validation()
-                   if msg not in ignored ]
-        self.assertEqual(len(errmsgs), 0)
-
-    def test_library_index_rdfa(self):
-        from htsworkflow.util.rdfhelp import \
-             add_default_schemas, get_model, load_string_into_model, \
-             dump_model
-        from htsworkflow.util.rdfinfer import Infer
-
-        model = get_model()
-        add_default_schemas(model)
-        inference = Infer(model)
-
-        response = self.client.get('/library/')
-        self.assertEqual(response.status_code, 200)
-        load_string_into_model(model, 'rdfa', response.content)
-
-        ignored = {'Missing type for: http://localhost/'}
-        errmsgs = [msg for msg in inference.run_validation()
-                   if msg not in ignored ]
-        self.assertEqual(len(errmsgs), 0)
-
-        body =  """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
-        prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
-
-        select ?library ?library_id ?name ?species_name
-        where {
-           ?library a libns:Library .
-           OPTIONAL { ?library libns:library_id ?library_id . }
-           OPTIONAL { ?library libns:species_name ?species_name . }
-           OPTIONAL { ?library libns:name ?name . }
-        }"""
-        bindings = set(['library', 'library_id', 'name', 'species_name'])
-        query = RDF.SPARQLQuery(body)
-        count = 0
-        for r in query.execute(model):
-            count += 1
-            for name, value in r.items():
-                self.assertTrue(name in bindings)
-                self.assertTrue(value is not None)
-
-        self.assertEqual(count, len(Library.objects.filter(hidden=False)))
-
-        state = validate_xhtml(response.content)
-        if state is not None: self.assertTrue(state)
-
-            
-# The django test runner flushes the database between test suites not cases,
-# so to be more compatible with running via nose we flush the database tables
-# of interest before creating our sample data.
-def create_db(obj):
-    obj.species_human = Species.objects.get(pk=8)
-    obj.experiment_rna_seq = ExperimentType.objects.get(pk=4)
-    obj.affiliation_alice = Affiliation.objects.get(pk=1)
-    obj.affiliation_bob = Affiliation.objects.get(pk=2)
-
-    Library.objects.all().delete()
-    obj.library_10001 = Library(
-        id = "10001",
-        library_name = 'C2C12 named poorly',
-        library_species = obj.species_human,
-        experiment_type = obj.experiment_rna_seq,
-        creation_date = datetime.datetime.now(),
-        made_for = 'scientist unit 2007',
-        made_by = 'microfludics system 7321',
-        stopping_point = '2A',
-        undiluted_concentration = '5.01',
-        hidden = False,
-    )
-    obj.library_10001.save()
-    obj.library_10002 = Library(
-        id = "10002",
-        library_name = 'Worm named poorly',
-        library_species = obj.species_human,
-        experiment_type = obj.experiment_rna_seq,
-        creation_date = datetime.datetime.now(),
-        made_for = 'scientist unit 2007',
-        made_by = 'microfludics system 7321',
-        stopping_point = '2A',
-        undiluted_concentration = '5.01',
-        hidden = False,
-    )
-    obj.library_10002.save()
-
-try:
-    import RDF
-    HAVE_RDF = True
-
-    rdfNS = RDF.NS("http://www.w3.org/1999/02/22-rdf-syntax-ns#")
-    xsdNS = RDF.NS("http://www.w3.org/2001/XMLSchema#")
-    libNS = RDF.NS("http://jumpgate.caltech.edu/wiki/LibraryOntology#")
-except ImportError,e:
-    HAVE_RDF = False
-
-
-class TestRDFaLibrary(TestCase):
-    fixtures = ['test_samples.json']
-
-    def test_parse_rdfa(self):
-        model = get_rdf_memory_model()
-        parser = RDF.Parser(name='rdfa')
-        url = '/library/10981/'
-        lib_response = self.client.get(url)
-        self.failIfEqual(len(lib_response.content), 0)
-
-        parser.parse_string_into_model(model,
-                                       lib_response.content,
-                                       'http://localhost'+url)
-        # http://jumpgate.caltech.edu/wiki/LibraryOntology#affiliation>
-        self.check_literal_object(model, ['Bob'], p=libNS['affiliation'])
-        self.check_literal_object(model, ['Multiplexed'], p=libNS['experiment_type'])
-        self.check_literal_object(model, ['400'], p=libNS['gel_cut'])
-        self.check_literal_object(model, ['Igor'], p=libNS['made_by'])
-        self.check_literal_object(model, ['Paired End Multiplexed Sp-BAC'], p=libNS['name'])
-        self.check_literal_object(model, ['Drosophila melanogaster'], p=libNS['species_name'])
-
-        self.check_uri_object(model,
-                              [u'http://localhost/lane/1193'],
-                              p=libNS['has_lane'])
-
-        fc_uri = RDF.Uri('http://localhost/flowcell/303TUAAXX/')
-        self.check_literal_object(model,
-                                  [u"303TUAAXX"],
-                                  s=fc_uri, p=libNS['flowcell_id'])
-
-    def check_literal_object(self, model, values, s=None, p=None, o=None):
-        statements = list(model.find_statements(
-            RDF.Statement(s,p,o)))
-        self.failUnlessEqual(len(statements), len(values),
-                        "Couln't find %s %s %s" % (s,p,o))
-        for s in statements:
-            self.failUnless(s.object.literal_value['string'] in values)
-
-
-    def check_uri_object(self, model, values, s=None, p=None, o=None):
-        statements = list(model.find_statements(
-            RDF.Statement(s,p,o)))
-        self.failUnlessEqual(len(statements), len(values),
-                        "Couln't find %s %s %s" % (s,p,o))
-        for s in statements:
-            self.failUnless(unicode(s.object.uri) in values)
-
-
-
-def get_rdf_memory_model():
-    storage = RDF.MemoryStorage()
-    model = RDF.Model(storage)
-    return model