From: Diane Trout Date: Tue, 20 Nov 2012 22:29:20 +0000 (-0800) Subject: Renamed various django tests.py to test_module.py X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=htsworkflow.git;a=commitdiff_plain;h=07ecbb168eeb659f35f8a1b92c0cb9d760e35082 Renamed various django tests.py to test_module.py It appears by default py.test was looking for packages named test_, since it wasn't finding the tests.py modules. This seemed like a reasonable alternative convention. --- diff --git a/htsworkflow/frontend/experiments/test_experiments.py b/htsworkflow/frontend/experiments/test_experiments.py new file mode 100644 index 0000000..0f24147 --- /dev/null +++ b/htsworkflow/frontend/experiments/test_experiments.py @@ -0,0 +1,677 @@ +import re +from lxml.html import fromstring +try: + import json +except ImportError, e: + import simplejson as json +import os +import shutil +import sys +import tempfile +from urlparse import urljoin + +from django.conf import settings +from django.core import mail +from django.core.exceptions import ObjectDoesNotExist +from django.test import TestCase +from htsworkflow.frontend.experiments import models +from htsworkflow.frontend.experiments import experiments +from htsworkflow.frontend.auth import apidata +from htsworkflow.util.ethelp import validate_xhtml + +from htsworkflow.pipelines.test.simulate_runfolder import TESTDATA_DIR + +LANE_SET = range(1,9) + +NSMAP = {'libns':'http://jumpgate.caltech.edu/wiki/LibraryOntology#'} + +class ClusterStationTestCases(TestCase): + fixtures = ['test_flowcells.json'] + + def test_default(self): + c = models.ClusterStation.default() + self.assertEqual(c.id, 2) + + c.isdefault = False + c.save() + + total = models.ClusterStation.objects.filter(isdefault=True).count() + self.assertEqual(total, 0) + + other_default = models.ClusterStation.default() + self.assertEqual(other_default.id, 3) + + + def test_update_default(self): + old_default = models.ClusterStation.default() + + c = models.ClusterStation.objects.get(pk=3) + c.isdefault = True + c.save() + + new_default = models.ClusterStation.default() + + self.assertNotEqual(old_default, new_default) + self.assertEqual(new_default, c) + + total = models.ClusterStation.objects.filter(isdefault=True).count() + self.assertEqual(total, 1) + + def test_update_other(self): + old_default = models.ClusterStation.default() + total = models.ClusterStation.objects.filter(isdefault=True).count() + self.assertEqual(total, 1) + + c = models.ClusterStation.objects.get(pk=1) + c.name = "Primary Key 1" + c.save() + + total = models.ClusterStation.objects.filter(isdefault=True).count() + self.assertEqual(total, 1) + + new_default = models.ClusterStation.default() + self.assertEqual(old_default, new_default) + + +class SequencerTestCases(TestCase): + fixtures = ['test_flowcells.json'] + + def test_default(self): + # starting with no default + s = models.Sequencer.default() + self.assertEqual(s.id, 2) + + total = models.Sequencer.objects.filter(isdefault=True).count() + self.assertEqual(total, 1) + + s.isdefault = False + s.save() + + total = models.Sequencer.objects.filter(isdefault=True).count() + self.assertEqual(total, 0) + + other_default = models.Sequencer.default() + self.assertEqual(other_default.id, 7) + + def test_update_default(self): + old_default = models.Sequencer.default() + + s = models.Sequencer.objects.get(pk=1) + s.isdefault = True + s.save() + + new_default = models.Sequencer.default() + + self.assertNotEqual(old_default, new_default) + self.assertEqual(new_default, s) + + total = models.Sequencer.objects.filter(isdefault=True).count() + self.assertEqual(total, 1) + + + def test_update_other(self): + old_default = models.Sequencer.default() + total = models.Sequencer.objects.filter(isdefault=True).count() + self.assertEqual(total, 1) + + s = models.Sequencer.objects.get(pk=1) + s.name = "Primary Key 1" + s.save() + + total = models.Sequencer.objects.filter(isdefault=True).count() + self.assertEqual(total, 1) + + new_default = models.Sequencer.default() + self.assertEqual(old_default, new_default) + + +class ExperimentsTestCases(TestCase): + fixtures = ['test_flowcells.json', + ] + + def setUp(self): + self.tempdir = tempfile.mkdtemp(prefix='htsw-test-experiments-') + settings.RESULT_HOME_DIR = self.tempdir + + self.fc1_id = 'FC12150' + self.fc1_root = os.path.join(self.tempdir, self.fc1_id) + os.mkdir(self.fc1_root) + self.fc1_dir = os.path.join(self.fc1_root, 'C1-37') + os.mkdir(self.fc1_dir) + runxml = 'run_FC12150_2007-09-27.xml' + shutil.copy(os.path.join(TESTDATA_DIR, runxml), + os.path.join(self.fc1_dir, runxml)) + for i in range(1,9): + shutil.copy( + os.path.join(TESTDATA_DIR, + 'woldlab_070829_USI-EAS44_0017_FC11055_1.srf'), + os.path.join(self.fc1_dir, + 'woldlab_070829_SERIAL_FC12150_%d.srf' %(i,)) + ) + + self.fc2_dir = os.path.join(self.tempdir, '42JTNAAXX') + os.mkdir(self.fc2_dir) + os.mkdir(os.path.join(self.fc2_dir, 'C1-25')) + os.mkdir(os.path.join(self.fc2_dir, 'C1-37')) + os.mkdir(os.path.join(self.fc2_dir, 'C1-37', 'Plots')) + + def tearDown(self): + shutil.rmtree(self.tempdir) + + def test_flowcell_information(self): + """ + Check the code that packs the django objects into simple types. + """ + for fc_id in [u'FC12150', u"42JTNAAXX", "42JU1AAXX"]: + fc_dict = experiments.flowcell_information(fc_id) + fc_django = models.FlowCell.objects.get(flowcell_id=fc_id) + self.assertEqual(fc_dict['flowcell_id'], fc_id) + self.assertEqual(fc_django.flowcell_id, fc_id) + self.assertEqual(fc_dict['sequencer'], fc_django.sequencer.name) + self.assertEqual(fc_dict['read_length'], fc_django.read_length) + self.assertEqual(fc_dict['notes'], fc_django.notes) + self.assertEqual(fc_dict['cluster_station'], fc_django.cluster_station.name) + + for lane in fc_django.lane_set.all(): + lane_contents = fc_dict['lane_set'][lane.lane_number] + lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id] + self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate) + self.assertEqual(lane_dict['comment'], lane.comment) + self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id) + self.assertEqual(lane_dict['lane_number'], lane.lane_number) + self.assertEqual(lane_dict['library_name'], lane.library.library_name) + self.assertEqual(lane_dict['library_id'], lane.library.id) + self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM)) + self.assertEqual(lane_dict['library_species'], + lane.library.library_species.scientific_name) + + response = self.client.get('/experiments/config/%s/json' % (fc_id,), apidata) + # strptime isoformat string = '%Y-%m-%dT%H:%M:%S' + fc_json = json.loads(response.content) + self.assertEqual(fc_json['flowcell_id'], fc_id) + self.assertEqual(fc_json['sequencer'], fc_django.sequencer.name) + self.assertEqual(fc_json['read_length'], fc_django.read_length) + self.assertEqual(fc_json['notes'], fc_django.notes) + self.assertEqual(fc_json['cluster_station'], fc_django.cluster_station.name) + + + for lane in fc_django.lane_set.all(): + lane_contents = fc_json['lane_set'][unicode(lane.lane_number)] + lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id] + + self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate) + self.assertEqual(lane_dict['comment'], lane.comment) + self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id) + self.assertEqual(lane_dict['lane_number'], lane.lane_number) + self.assertEqual(lane_dict['library_name'], lane.library.library_name) + self.assertEqual(lane_dict['library_id'], lane.library.id) + self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM)) + self.assertEqual(lane_dict['library_species'], + lane.library.library_species.scientific_name) + + def test_invalid_flowcell(self): + """ + Make sure we get a 404 if we request an invalid flowcell ID + """ + response = self.client.get('/experiments/config/nottheone/json', apidata) + self.assertEqual(response.status_code, 404) + + def test_no_key(self): + """ + Require logging in to retrieve meta data + """ + response = self.client.get(u'/experiments/config/FC12150/json') + self.assertEqual(response.status_code, 403) + + def test_library_id(self): + """ + Library IDs should be flexible, so make sure we can retrive a non-numeric ID + """ + response = self.client.get('/experiments/config/FC12150/json', apidata) + self.assertEqual(response.status_code, 200) + flowcell = json.loads(response.content) + + lane_contents = flowcell['lane_set']['3'] + lane_library = lane_contents[0] + self.assertEqual(lane_library['library_id'], 'SL039') + + response = self.client.get('/samples/library/SL039/json', apidata) + self.assertEqual(response.status_code, 200) + library_sl039 = json.loads(response.content) + + self.assertEqual(library_sl039['library_id'], 'SL039') + + def test_raw_id_field(self): + """ + Test ticket:147 + + Library's have IDs, libraries also have primary keys, + we eventually had enough libraries that the drop down combo box was too + hard to filter through, unfortnately we want a field that uses our library + id and not the internal primary key, and raw_id_field uses primary keys. + + This tests to make sure that the value entered in the raw library id field matches + the library id looked up. + """ + expected_ids = [u'10981',u'11016',u'SL039',u'11060', + u'11061',u'11062',u'11063',u'11064'] + self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5') + response = self.client.get('/admin/experiments/flowcell/153/') + + tree = fromstring(response.content) + for i in range(0,8): + xpath_expression = '//input[@id="id_lane_set-%d-library"]' + input_field = tree.xpath(xpath_expression % (i,))[0] + library_field = input_field.find('../strong') + library_id, library_name = library_field.text.split(':') + # strip leading '#' sign from name + library_id = library_id[1:] + self.assertEqual(library_id, expected_ids[i]) + self.assertEqual(input_field.attrib['value'], library_id) + + def test_library_to_flowcell_link(self): + """ + Make sure the library page includes links to the flowcell pages. + That work with flowcell IDs that have parenthetical comments. + """ + self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5') + response = self.client.get('/library/11070/') + self.assertEqual(response.status_code, 200) + status = validate_xhtml(response.content) + if status is not None: self.assertTrue(status) + + tree = fromstring(response.content) + flowcell_spans = tree.xpath('//span[@property="libns:flowcell_id"]', + namespaces=NSMAP) + self.assertEqual(flowcell_spans[0].text, '30012AAXX (failed)') + failed_fc_span = flowcell_spans[0] + failed_fc_a = failed_fc_span.getparent() + # make sure some of our RDF made it. + self.assertEqual(failed_fc_a.get('typeof'), 'libns:IlluminaFlowcell') + self.assertEqual(failed_fc_a.get('href'), '/flowcell/30012AAXX/') + fc_response = self.client.get(failed_fc_a.get('href')) + self.assertEqual(fc_response.status_code, 200) + status = validate_xhtml(response.content) + if status is not None: self.assertTrue(status) + + fc_lane_response = self.client.get('/flowcell/30012AAXX/8/') + self.assertEqual(fc_lane_response.status_code, 200) + status = validate_xhtml(response.content) + if status is not None: self.assertTrue(status) + + + def test_pooled_multiplex_id(self): + fc_dict = experiments.flowcell_information('42JU1AAXX') + lane_contents = fc_dict['lane_set'][3] + self.assertEqual(len(lane_contents), 2) + lane_dict = multi_lane_to_dict(lane_contents) + + self.assertEqual(lane_dict['12044']['index_sequence'], + {u'1': u'ATCACG', + u'2': u'CGATGT', + u'3': u'TTAGGC'}) + self.assertEqual(lane_dict['11045']['index_sequence'], + {u'1': u'ATCACG'}) + + + + def test_lanes_for(self): + """ + Check the code that packs the django objects into simple types. + """ + user = 'test' + lanes = experiments.lanes_for(user) + self.assertEqual(len(lanes), 5) + + response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata) + lanes_json = json.loads(response.content) + self.assertEqual(len(lanes), len(lanes_json)) + for i in range(len(lanes)): + self.assertEqual(lanes[i]['comment'], lanes_json[i]['comment']) + self.assertEqual(lanes[i]['lane_number'], lanes_json[i]['lane_number']) + self.assertEqual(lanes[i]['flowcell'], lanes_json[i]['flowcell']) + self.assertEqual(lanes[i]['run_date'], lanes_json[i]['run_date']) + + def test_lanes_for_no_lanes(self): + """ + Do we get something meaningful back when the user isn't attached to anything? + """ + user = 'supertest' + lanes = experiments.lanes_for(user) + self.assertEqual(len(lanes), 0) + + response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata) + lanes_json = json.loads(response.content) + + def test_lanes_for_no_user(self): + """ + Do we get something meaningful back when its the wrong user + """ + user = 'not a real user' + self.assertRaises(ObjectDoesNotExist, experiments.lanes_for, user) + + response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata) + self.assertEqual(response.status_code, 404) + + + def test_raw_data_dir(self): + """Raw data path generator check""" + flowcell_id = self.fc1_id + raw_dir = os.path.join(settings.RESULT_HOME_DIR, flowcell_id) + + fc = models.FlowCell.objects.get(flowcell_id=flowcell_id) + self.assertEqual(fc.get_raw_data_directory(), raw_dir) + + fc.flowcell_id = flowcell_id + " (failed)" + self.assertEqual(fc.get_raw_data_directory(), raw_dir) + + + def test_data_run_import(self): + srf_file_type = models.FileType.objects.get(name='SRF') + runxml_file_type = models.FileType.objects.get(name='run_xml') + flowcell_id = self.fc1_id + flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id) + flowcell.update_data_runs() + self.assertEqual(len(flowcell.datarun_set.all()), 1) + + run = flowcell.datarun_set.all()[0] + result_files = run.datafile_set.all() + result_dict = dict(((rf.relative_pathname, rf) for rf in result_files)) + + srf4 = result_dict['FC12150/C1-37/woldlab_070829_SERIAL_FC12150_4.srf'] + self.assertEqual(srf4.file_type, srf_file_type) + self.assertEqual(srf4.library_id, '11060') + self.assertEqual(srf4.data_run.flowcell.flowcell_id, 'FC12150') + self.assertEqual( + srf4.data_run.flowcell.lane_set.get(lane_number=4).library_id, + '11060') + self.assertEqual( + srf4.pathname, + os.path.join(settings.RESULT_HOME_DIR, srf4.relative_pathname)) + + lane_files = run.lane_files() + self.assertEqual(lane_files[4]['srf'], srf4) + + runxml= result_dict['FC12150/C1-37/run_FC12150_2007-09-27.xml'] + self.assertEqual(runxml.file_type, runxml_file_type) + self.assertEqual(runxml.library_id, None) + + import1 = len(models.DataRun.objects.filter(result_dir='FC12150/C1-37')) + # what happens if we import twice? + flowcell.import_data_run('FC12150/C1-37', + 'run_FC12150_2007-09-27.xml') + self.assertEqual( + len(models.DataRun.objects.filter(result_dir='FC12150/C1-37')), + import1) + + def test_read_result_file(self): + """make sure we can return a result file + """ + flowcell_id = self.fc1_id + flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id) + flowcell.update_data_runs() + + #self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5') + + result_files = flowcell.datarun_set.all()[0].datafile_set.all() + for f in result_files: + url = '/experiments/file/%s' % ( f.random_key,) + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + mimetype = f.file_type.mimetype + if mimetype is None: + mimetype = 'application/octet-stream' + + self.assertEqual(mimetype, response['content-type']) + + def test_flowcell_rdf(self): + import RDF + from htsworkflow.util.rdfhelp import get_model, \ + fromTypedNode, \ + load_string_into_model, \ + rdfNS, \ + libraryOntology, \ + dump_model + + model = get_model() + + expected = {'1': ['11034'], + '2': ['11036'], + '3': ['12044','11045'], + '4': ['11047','13044'], + '5': ['11055'], + '6': ['11067'], + '7': ['11069'], + '8': ['11070']} + url = '/flowcell/42JU1AAXX/' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + status = validate_xhtml(response.content) + if status is not None: self.assertTrue(status) + + ns = urljoin('http://localhost', url) + load_string_into_model(model, 'rdfa', response.content, ns=ns) + body = """prefix rdf: + prefix libns: + + select ?flowcell ?flowcell_id ?lane_id ?library_id + where { + ?flowcell a libns:IlluminaFlowcell ; + libns:flowcell_id ?flowcell_id ; + libns:has_lane ?lane . + ?lane libns:lane_number ?lane_id ; + libns:library ?library . + ?library libns:library_id ?library_id . + }""" + query = RDF.SPARQLQuery(body) + count = 0 + for r in query.execute(model): + count += 1 + self.assertEqual(fromTypedNode(r['flowcell_id']), u'42JU1AAXX') + lane_id = fromTypedNode(r['lane_id']) + library_id = fromTypedNode(r['library_id']) + self.assertTrue(library_id in expected[lane_id]) + self.assertEqual(count, 10) + + +class TestFileType(TestCase): + def test_file_type_unicode(self): + file_type_objects = models.FileType.objects + name = 'QSEQ tarfile' + file_type_object = file_type_objects.get(name=name) + self.assertEqual(u"", + unicode(file_type_object)) + +class TestFileType(TestCase): + def test_find_file_type(self): + file_type_objects = models.FileType.objects + cases = [('woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2', + 'QSEQ tarfile', 7, 1), + ('woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf', + 'SRF', 1, None), + ('s_1_eland_extended.txt.bz2','ELAND Extended', 1, None), + ('s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None), + ('s_3_eland_result.txt.bz2','ELAND Result', 3, None), + ('s_1_export.txt.bz2','ELAND Export', 1, None), + ('s_1_percent_call.png', 'IVC Percent Call', 1, None), + ('s_2_percent_base.png', 'IVC Percent Base', 2, None), + ('s_3_percent_all.png', 'IVC Percent All', 3, None), + ('s_4_call.png', 'IVC Call', 4, None), + ('s_5_all.png', 'IVC All', 5, None), + ('Summary.htm', 'Summary.htm', None, None), + ('run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None), + ] + for filename, typename, lane, end in cases: + ft = models.find_file_type_metadata_from_filename(filename) + self.assertEqual(ft['file_type'], + file_type_objects.get(name=typename)) + self.assertEqual(ft.get('lane', None), lane) + self.assertEqual(ft.get('end', None), end) + + def test_assign_file_type_complex_path(self): + file_type_objects = models.FileType.objects + cases = [('/a/b/c/woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2', + 'QSEQ tarfile', 7, 1), + ('foo/woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf', + 'SRF', 1, None), + ('../s_1_eland_extended.txt.bz2','ELAND Extended', 1, None), + ('/bleem/s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None), + ('/qwer/s_3_eland_result.txt.bz2','ELAND Result', 3, None), + ('/ty///1/s_1_export.txt.bz2','ELAND Export', 1, None), + ('/help/s_1_percent_call.png', 'IVC Percent Call', 1, None), + ('/bored/s_2_percent_base.png', 'IVC Percent Base', 2, None), + ('/example1/s_3_percent_all.png', 'IVC Percent All', 3, None), + ('amonkey/s_4_call.png', 'IVC Call', 4, None), + ('fishie/s_5_all.png', 'IVC All', 5, None), + ('/random/Summary.htm', 'Summary.htm', None, None), + ('/notrandom/run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None), + ] + for filename, typename, lane, end in cases: + result = models.find_file_type_metadata_from_filename(filename) + self.assertEqual(result['file_type'], + file_type_objects.get(name=typename)) + self.assertEqual(result.get('lane',None), lane) + self.assertEqual(result.get('end', None), end) + +class TestEmailNotify(TestCase): + fixtures = ['test_flowcells.json'] + + def test_started_email_not_logged_in(self): + response = self.client.get('/experiments/started/153/') + self.assertEqual(response.status_code, 302) + + def test_started_email_logged_in_user(self): + self.client.login(username='test', password='BJOKL5kAj6aFZ6A5') + response = self.client.get('/experiments/started/153/') + self.assertEqual(response.status_code, 302) + + def test_started_email_logged_in_staff(self): + self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5') + response = self.client.get('/experiments/started/153/') + self.assertEqual(response.status_code, 200) + + def test_started_email_send(self): + self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5') + response = self.client.get('/experiments/started/153/') + self.assertEqual(response.status_code, 200) + + self.assertTrue('pk1@example.com' in response.content) + self.assertTrue('Lane #8 : (11064) Paired ends 104' in response.content) + + response = self.client.get('/experiments/started/153/', {'send':'1','bcc':'on'}) + self.assertEqual(response.status_code, 200) + self.assertEqual(len(mail.outbox), 4) + bcc = set(settings.NOTIFICATION_BCC).intersect(set(settings.MANAGERS)) + for m in mail.outbox: + self.assertTrue(len(m.body) > 0) + self.assertEqual(m.bcc, bcc) + + def test_email_navigation(self): + """ + Can we navigate between the flowcell and email forms properly? + """ + self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5') + response = self.client.get('/experiments/started/153/') + self.assertEqual(response.status_code, 200) + self.assertTrue(re.search('Flowcell FC12150', response.content)) + # require that navigation back to the admin page exists + self.assertTrue(re.search('[^<]+', response.content)) + +def multi_lane_to_dict(lane): + """Convert a list of lane entries into a dictionary indexed by library ID + """ + return dict( ((x['library_id'],x) for x in lane) ) + +class TestSequencer(TestCase): + fixtures = ['test_flowcells.json', + ] + + def test_name_generation(self): + seq = models.Sequencer() + seq.name = "Seq1" + seq.instrument_name = "HWI-SEQ1" + seq.model = "Imaginary 5000" + + self.assertEqual(unicode(seq), "Seq1 (HWI-SEQ1)") + + def test_lookup(self): + fc = models.FlowCell.objects.get(pk=153) + self.assertEqual(fc.sequencer.model, + "Illumina Genome Analyzer IIx") + self.assertEqual(fc.sequencer.instrument_name, + "ILLUMINA-EC5D15") + # well actually we let the browser tack on the host name + url = fc.get_absolute_url() + self.assertEqual(url, '/flowcell/FC12150/') + + def test_rdf(self): + response = self.client.get('/flowcell/FC12150/', apidata) + tree = fromstring(response.content) + seq_by = tree.xpath('//div[@rel="libns:sequenced_by"]', + namespaces=NSMAP) + self.assertEqual(len(seq_by), 1) + self.assertEqual(seq_by[0].attrib['rel'], 'libns:sequenced_by') + seq = seq_by[0].getchildren() + self.assertEqual(len(seq), 1) + self.assertEqual(seq[0].attrib['about'], '/sequencer/2') + self.assertEqual(seq[0].attrib['typeof'], 'libns:Sequencer') + + name = seq[0].xpath('./span[@property="libns:sequencer_name"]') + self.assertEqual(len(name), 1) + self.assertEqual(name[0].text, 'Tardigrade') + instrument = seq[0].xpath( + './span[@property="libns:sequencer_instrument"]') + self.assertEqual(len(instrument), 1) + self.assertEqual(instrument[0].text, 'ILLUMINA-EC5D15') + model = seq[0].xpath( + './span[@property="libns:sequencer_model"]') + self.assertEqual(len(model), 1) + self.assertEqual(model[0].text, 'Illumina Genome Analyzer IIx') + + def test_flowcell_with_rdf_validation(self): + from htsworkflow.util.rdfhelp import add_default_schemas, \ + dump_model, \ + get_model, \ + load_string_into_model + from htsworkflow.util.rdfinfer import Infer + + model = get_model() + add_default_schemas(model) + inference = Infer(model) + + url ='/flowcell/FC12150/' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + status = validate_xhtml(response.content) + if status is not None: self.assertTrue(status) + + load_string_into_model(model, 'rdfa', response.content) + + errmsgs = list(inference.run_validation()) + self.assertEqual(len(errmsgs), 2) + for errmsg in errmsgs: + self.assertEqual(errmsg, 'Missing type for: http://localhost/') + + def test_lane_with_rdf_validation(self): + from htsworkflow.util.rdfhelp import add_default_schemas, \ + dump_model, \ + get_model, \ + load_string_into_model + from htsworkflow.util.rdfinfer import Infer + + model = get_model() + add_default_schemas(model) + inference = Infer(model) + + url = '/lane/1193' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + status = validate_xhtml(response.content) + if status is not None: self.assertTrue(status) + + load_string_into_model(model, 'rdfa', response.content) + + errmsgs = list(inference.run_validation()) + self.assertEqual(len(errmsgs), 2) + for errmsg in errmsgs: + self.assertEqual(errmsg, 'Missing type for: http://localhost/') diff --git a/htsworkflow/frontend/experiments/tests.py b/htsworkflow/frontend/experiments/tests.py deleted file mode 100644 index 0f24147..0000000 --- a/htsworkflow/frontend/experiments/tests.py +++ /dev/null @@ -1,677 +0,0 @@ -import re -from lxml.html import fromstring -try: - import json -except ImportError, e: - import simplejson as json -import os -import shutil -import sys -import tempfile -from urlparse import urljoin - -from django.conf import settings -from django.core import mail -from django.core.exceptions import ObjectDoesNotExist -from django.test import TestCase -from htsworkflow.frontend.experiments import models -from htsworkflow.frontend.experiments import experiments -from htsworkflow.frontend.auth import apidata -from htsworkflow.util.ethelp import validate_xhtml - -from htsworkflow.pipelines.test.simulate_runfolder import TESTDATA_DIR - -LANE_SET = range(1,9) - -NSMAP = {'libns':'http://jumpgate.caltech.edu/wiki/LibraryOntology#'} - -class ClusterStationTestCases(TestCase): - fixtures = ['test_flowcells.json'] - - def test_default(self): - c = models.ClusterStation.default() - self.assertEqual(c.id, 2) - - c.isdefault = False - c.save() - - total = models.ClusterStation.objects.filter(isdefault=True).count() - self.assertEqual(total, 0) - - other_default = models.ClusterStation.default() - self.assertEqual(other_default.id, 3) - - - def test_update_default(self): - old_default = models.ClusterStation.default() - - c = models.ClusterStation.objects.get(pk=3) - c.isdefault = True - c.save() - - new_default = models.ClusterStation.default() - - self.assertNotEqual(old_default, new_default) - self.assertEqual(new_default, c) - - total = models.ClusterStation.objects.filter(isdefault=True).count() - self.assertEqual(total, 1) - - def test_update_other(self): - old_default = models.ClusterStation.default() - total = models.ClusterStation.objects.filter(isdefault=True).count() - self.assertEqual(total, 1) - - c = models.ClusterStation.objects.get(pk=1) - c.name = "Primary Key 1" - c.save() - - total = models.ClusterStation.objects.filter(isdefault=True).count() - self.assertEqual(total, 1) - - new_default = models.ClusterStation.default() - self.assertEqual(old_default, new_default) - - -class SequencerTestCases(TestCase): - fixtures = ['test_flowcells.json'] - - def test_default(self): - # starting with no default - s = models.Sequencer.default() - self.assertEqual(s.id, 2) - - total = models.Sequencer.objects.filter(isdefault=True).count() - self.assertEqual(total, 1) - - s.isdefault = False - s.save() - - total = models.Sequencer.objects.filter(isdefault=True).count() - self.assertEqual(total, 0) - - other_default = models.Sequencer.default() - self.assertEqual(other_default.id, 7) - - def test_update_default(self): - old_default = models.Sequencer.default() - - s = models.Sequencer.objects.get(pk=1) - s.isdefault = True - s.save() - - new_default = models.Sequencer.default() - - self.assertNotEqual(old_default, new_default) - self.assertEqual(new_default, s) - - total = models.Sequencer.objects.filter(isdefault=True).count() - self.assertEqual(total, 1) - - - def test_update_other(self): - old_default = models.Sequencer.default() - total = models.Sequencer.objects.filter(isdefault=True).count() - self.assertEqual(total, 1) - - s = models.Sequencer.objects.get(pk=1) - s.name = "Primary Key 1" - s.save() - - total = models.Sequencer.objects.filter(isdefault=True).count() - self.assertEqual(total, 1) - - new_default = models.Sequencer.default() - self.assertEqual(old_default, new_default) - - -class ExperimentsTestCases(TestCase): - fixtures = ['test_flowcells.json', - ] - - def setUp(self): - self.tempdir = tempfile.mkdtemp(prefix='htsw-test-experiments-') - settings.RESULT_HOME_DIR = self.tempdir - - self.fc1_id = 'FC12150' - self.fc1_root = os.path.join(self.tempdir, self.fc1_id) - os.mkdir(self.fc1_root) - self.fc1_dir = os.path.join(self.fc1_root, 'C1-37') - os.mkdir(self.fc1_dir) - runxml = 'run_FC12150_2007-09-27.xml' - shutil.copy(os.path.join(TESTDATA_DIR, runxml), - os.path.join(self.fc1_dir, runxml)) - for i in range(1,9): - shutil.copy( - os.path.join(TESTDATA_DIR, - 'woldlab_070829_USI-EAS44_0017_FC11055_1.srf'), - os.path.join(self.fc1_dir, - 'woldlab_070829_SERIAL_FC12150_%d.srf' %(i,)) - ) - - self.fc2_dir = os.path.join(self.tempdir, '42JTNAAXX') - os.mkdir(self.fc2_dir) - os.mkdir(os.path.join(self.fc2_dir, 'C1-25')) - os.mkdir(os.path.join(self.fc2_dir, 'C1-37')) - os.mkdir(os.path.join(self.fc2_dir, 'C1-37', 'Plots')) - - def tearDown(self): - shutil.rmtree(self.tempdir) - - def test_flowcell_information(self): - """ - Check the code that packs the django objects into simple types. - """ - for fc_id in [u'FC12150', u"42JTNAAXX", "42JU1AAXX"]: - fc_dict = experiments.flowcell_information(fc_id) - fc_django = models.FlowCell.objects.get(flowcell_id=fc_id) - self.assertEqual(fc_dict['flowcell_id'], fc_id) - self.assertEqual(fc_django.flowcell_id, fc_id) - self.assertEqual(fc_dict['sequencer'], fc_django.sequencer.name) - self.assertEqual(fc_dict['read_length'], fc_django.read_length) - self.assertEqual(fc_dict['notes'], fc_django.notes) - self.assertEqual(fc_dict['cluster_station'], fc_django.cluster_station.name) - - for lane in fc_django.lane_set.all(): - lane_contents = fc_dict['lane_set'][lane.lane_number] - lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id] - self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate) - self.assertEqual(lane_dict['comment'], lane.comment) - self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id) - self.assertEqual(lane_dict['lane_number'], lane.lane_number) - self.assertEqual(lane_dict['library_name'], lane.library.library_name) - self.assertEqual(lane_dict['library_id'], lane.library.id) - self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM)) - self.assertEqual(lane_dict['library_species'], - lane.library.library_species.scientific_name) - - response = self.client.get('/experiments/config/%s/json' % (fc_id,), apidata) - # strptime isoformat string = '%Y-%m-%dT%H:%M:%S' - fc_json = json.loads(response.content) - self.assertEqual(fc_json['flowcell_id'], fc_id) - self.assertEqual(fc_json['sequencer'], fc_django.sequencer.name) - self.assertEqual(fc_json['read_length'], fc_django.read_length) - self.assertEqual(fc_json['notes'], fc_django.notes) - self.assertEqual(fc_json['cluster_station'], fc_django.cluster_station.name) - - - for lane in fc_django.lane_set.all(): - lane_contents = fc_json['lane_set'][unicode(lane.lane_number)] - lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id] - - self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate) - self.assertEqual(lane_dict['comment'], lane.comment) - self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id) - self.assertEqual(lane_dict['lane_number'], lane.lane_number) - self.assertEqual(lane_dict['library_name'], lane.library.library_name) - self.assertEqual(lane_dict['library_id'], lane.library.id) - self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM)) - self.assertEqual(lane_dict['library_species'], - lane.library.library_species.scientific_name) - - def test_invalid_flowcell(self): - """ - Make sure we get a 404 if we request an invalid flowcell ID - """ - response = self.client.get('/experiments/config/nottheone/json', apidata) - self.assertEqual(response.status_code, 404) - - def test_no_key(self): - """ - Require logging in to retrieve meta data - """ - response = self.client.get(u'/experiments/config/FC12150/json') - self.assertEqual(response.status_code, 403) - - def test_library_id(self): - """ - Library IDs should be flexible, so make sure we can retrive a non-numeric ID - """ - response = self.client.get('/experiments/config/FC12150/json', apidata) - self.assertEqual(response.status_code, 200) - flowcell = json.loads(response.content) - - lane_contents = flowcell['lane_set']['3'] - lane_library = lane_contents[0] - self.assertEqual(lane_library['library_id'], 'SL039') - - response = self.client.get('/samples/library/SL039/json', apidata) - self.assertEqual(response.status_code, 200) - library_sl039 = json.loads(response.content) - - self.assertEqual(library_sl039['library_id'], 'SL039') - - def test_raw_id_field(self): - """ - Test ticket:147 - - Library's have IDs, libraries also have primary keys, - we eventually had enough libraries that the drop down combo box was too - hard to filter through, unfortnately we want a field that uses our library - id and not the internal primary key, and raw_id_field uses primary keys. - - This tests to make sure that the value entered in the raw library id field matches - the library id looked up. - """ - expected_ids = [u'10981',u'11016',u'SL039',u'11060', - u'11061',u'11062',u'11063',u'11064'] - self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5') - response = self.client.get('/admin/experiments/flowcell/153/') - - tree = fromstring(response.content) - for i in range(0,8): - xpath_expression = '//input[@id="id_lane_set-%d-library"]' - input_field = tree.xpath(xpath_expression % (i,))[0] - library_field = input_field.find('../strong') - library_id, library_name = library_field.text.split(':') - # strip leading '#' sign from name - library_id = library_id[1:] - self.assertEqual(library_id, expected_ids[i]) - self.assertEqual(input_field.attrib['value'], library_id) - - def test_library_to_flowcell_link(self): - """ - Make sure the library page includes links to the flowcell pages. - That work with flowcell IDs that have parenthetical comments. - """ - self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5') - response = self.client.get('/library/11070/') - self.assertEqual(response.status_code, 200) - status = validate_xhtml(response.content) - if status is not None: self.assertTrue(status) - - tree = fromstring(response.content) - flowcell_spans = tree.xpath('//span[@property="libns:flowcell_id"]', - namespaces=NSMAP) - self.assertEqual(flowcell_spans[0].text, '30012AAXX (failed)') - failed_fc_span = flowcell_spans[0] - failed_fc_a = failed_fc_span.getparent() - # make sure some of our RDF made it. - self.assertEqual(failed_fc_a.get('typeof'), 'libns:IlluminaFlowcell') - self.assertEqual(failed_fc_a.get('href'), '/flowcell/30012AAXX/') - fc_response = self.client.get(failed_fc_a.get('href')) - self.assertEqual(fc_response.status_code, 200) - status = validate_xhtml(response.content) - if status is not None: self.assertTrue(status) - - fc_lane_response = self.client.get('/flowcell/30012AAXX/8/') - self.assertEqual(fc_lane_response.status_code, 200) - status = validate_xhtml(response.content) - if status is not None: self.assertTrue(status) - - - def test_pooled_multiplex_id(self): - fc_dict = experiments.flowcell_information('42JU1AAXX') - lane_contents = fc_dict['lane_set'][3] - self.assertEqual(len(lane_contents), 2) - lane_dict = multi_lane_to_dict(lane_contents) - - self.assertEqual(lane_dict['12044']['index_sequence'], - {u'1': u'ATCACG', - u'2': u'CGATGT', - u'3': u'TTAGGC'}) - self.assertEqual(lane_dict['11045']['index_sequence'], - {u'1': u'ATCACG'}) - - - - def test_lanes_for(self): - """ - Check the code that packs the django objects into simple types. - """ - user = 'test' - lanes = experiments.lanes_for(user) - self.assertEqual(len(lanes), 5) - - response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata) - lanes_json = json.loads(response.content) - self.assertEqual(len(lanes), len(lanes_json)) - for i in range(len(lanes)): - self.assertEqual(lanes[i]['comment'], lanes_json[i]['comment']) - self.assertEqual(lanes[i]['lane_number'], lanes_json[i]['lane_number']) - self.assertEqual(lanes[i]['flowcell'], lanes_json[i]['flowcell']) - self.assertEqual(lanes[i]['run_date'], lanes_json[i]['run_date']) - - def test_lanes_for_no_lanes(self): - """ - Do we get something meaningful back when the user isn't attached to anything? - """ - user = 'supertest' - lanes = experiments.lanes_for(user) - self.assertEqual(len(lanes), 0) - - response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata) - lanes_json = json.loads(response.content) - - def test_lanes_for_no_user(self): - """ - Do we get something meaningful back when its the wrong user - """ - user = 'not a real user' - self.assertRaises(ObjectDoesNotExist, experiments.lanes_for, user) - - response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata) - self.assertEqual(response.status_code, 404) - - - def test_raw_data_dir(self): - """Raw data path generator check""" - flowcell_id = self.fc1_id - raw_dir = os.path.join(settings.RESULT_HOME_DIR, flowcell_id) - - fc = models.FlowCell.objects.get(flowcell_id=flowcell_id) - self.assertEqual(fc.get_raw_data_directory(), raw_dir) - - fc.flowcell_id = flowcell_id + " (failed)" - self.assertEqual(fc.get_raw_data_directory(), raw_dir) - - - def test_data_run_import(self): - srf_file_type = models.FileType.objects.get(name='SRF') - runxml_file_type = models.FileType.objects.get(name='run_xml') - flowcell_id = self.fc1_id - flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id) - flowcell.update_data_runs() - self.assertEqual(len(flowcell.datarun_set.all()), 1) - - run = flowcell.datarun_set.all()[0] - result_files = run.datafile_set.all() - result_dict = dict(((rf.relative_pathname, rf) for rf in result_files)) - - srf4 = result_dict['FC12150/C1-37/woldlab_070829_SERIAL_FC12150_4.srf'] - self.assertEqual(srf4.file_type, srf_file_type) - self.assertEqual(srf4.library_id, '11060') - self.assertEqual(srf4.data_run.flowcell.flowcell_id, 'FC12150') - self.assertEqual( - srf4.data_run.flowcell.lane_set.get(lane_number=4).library_id, - '11060') - self.assertEqual( - srf4.pathname, - os.path.join(settings.RESULT_HOME_DIR, srf4.relative_pathname)) - - lane_files = run.lane_files() - self.assertEqual(lane_files[4]['srf'], srf4) - - runxml= result_dict['FC12150/C1-37/run_FC12150_2007-09-27.xml'] - self.assertEqual(runxml.file_type, runxml_file_type) - self.assertEqual(runxml.library_id, None) - - import1 = len(models.DataRun.objects.filter(result_dir='FC12150/C1-37')) - # what happens if we import twice? - flowcell.import_data_run('FC12150/C1-37', - 'run_FC12150_2007-09-27.xml') - self.assertEqual( - len(models.DataRun.objects.filter(result_dir='FC12150/C1-37')), - import1) - - def test_read_result_file(self): - """make sure we can return a result file - """ - flowcell_id = self.fc1_id - flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id) - flowcell.update_data_runs() - - #self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5') - - result_files = flowcell.datarun_set.all()[0].datafile_set.all() - for f in result_files: - url = '/experiments/file/%s' % ( f.random_key,) - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - mimetype = f.file_type.mimetype - if mimetype is None: - mimetype = 'application/octet-stream' - - self.assertEqual(mimetype, response['content-type']) - - def test_flowcell_rdf(self): - import RDF - from htsworkflow.util.rdfhelp import get_model, \ - fromTypedNode, \ - load_string_into_model, \ - rdfNS, \ - libraryOntology, \ - dump_model - - model = get_model() - - expected = {'1': ['11034'], - '2': ['11036'], - '3': ['12044','11045'], - '4': ['11047','13044'], - '5': ['11055'], - '6': ['11067'], - '7': ['11069'], - '8': ['11070']} - url = '/flowcell/42JU1AAXX/' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - status = validate_xhtml(response.content) - if status is not None: self.assertTrue(status) - - ns = urljoin('http://localhost', url) - load_string_into_model(model, 'rdfa', response.content, ns=ns) - body = """prefix rdf: - prefix libns: - - select ?flowcell ?flowcell_id ?lane_id ?library_id - where { - ?flowcell a libns:IlluminaFlowcell ; - libns:flowcell_id ?flowcell_id ; - libns:has_lane ?lane . - ?lane libns:lane_number ?lane_id ; - libns:library ?library . - ?library libns:library_id ?library_id . - }""" - query = RDF.SPARQLQuery(body) - count = 0 - for r in query.execute(model): - count += 1 - self.assertEqual(fromTypedNode(r['flowcell_id']), u'42JU1AAXX') - lane_id = fromTypedNode(r['lane_id']) - library_id = fromTypedNode(r['library_id']) - self.assertTrue(library_id in expected[lane_id]) - self.assertEqual(count, 10) - - -class TestFileType(TestCase): - def test_file_type_unicode(self): - file_type_objects = models.FileType.objects - name = 'QSEQ tarfile' - file_type_object = file_type_objects.get(name=name) - self.assertEqual(u"", - unicode(file_type_object)) - -class TestFileType(TestCase): - def test_find_file_type(self): - file_type_objects = models.FileType.objects - cases = [('woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2', - 'QSEQ tarfile', 7, 1), - ('woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf', - 'SRF', 1, None), - ('s_1_eland_extended.txt.bz2','ELAND Extended', 1, None), - ('s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None), - ('s_3_eland_result.txt.bz2','ELAND Result', 3, None), - ('s_1_export.txt.bz2','ELAND Export', 1, None), - ('s_1_percent_call.png', 'IVC Percent Call', 1, None), - ('s_2_percent_base.png', 'IVC Percent Base', 2, None), - ('s_3_percent_all.png', 'IVC Percent All', 3, None), - ('s_4_call.png', 'IVC Call', 4, None), - ('s_5_all.png', 'IVC All', 5, None), - ('Summary.htm', 'Summary.htm', None, None), - ('run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None), - ] - for filename, typename, lane, end in cases: - ft = models.find_file_type_metadata_from_filename(filename) - self.assertEqual(ft['file_type'], - file_type_objects.get(name=typename)) - self.assertEqual(ft.get('lane', None), lane) - self.assertEqual(ft.get('end', None), end) - - def test_assign_file_type_complex_path(self): - file_type_objects = models.FileType.objects - cases = [('/a/b/c/woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2', - 'QSEQ tarfile', 7, 1), - ('foo/woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf', - 'SRF', 1, None), - ('../s_1_eland_extended.txt.bz2','ELAND Extended', 1, None), - ('/bleem/s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None), - ('/qwer/s_3_eland_result.txt.bz2','ELAND Result', 3, None), - ('/ty///1/s_1_export.txt.bz2','ELAND Export', 1, None), - ('/help/s_1_percent_call.png', 'IVC Percent Call', 1, None), - ('/bored/s_2_percent_base.png', 'IVC Percent Base', 2, None), - ('/example1/s_3_percent_all.png', 'IVC Percent All', 3, None), - ('amonkey/s_4_call.png', 'IVC Call', 4, None), - ('fishie/s_5_all.png', 'IVC All', 5, None), - ('/random/Summary.htm', 'Summary.htm', None, None), - ('/notrandom/run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None), - ] - for filename, typename, lane, end in cases: - result = models.find_file_type_metadata_from_filename(filename) - self.assertEqual(result['file_type'], - file_type_objects.get(name=typename)) - self.assertEqual(result.get('lane',None), lane) - self.assertEqual(result.get('end', None), end) - -class TestEmailNotify(TestCase): - fixtures = ['test_flowcells.json'] - - def test_started_email_not_logged_in(self): - response = self.client.get('/experiments/started/153/') - self.assertEqual(response.status_code, 302) - - def test_started_email_logged_in_user(self): - self.client.login(username='test', password='BJOKL5kAj6aFZ6A5') - response = self.client.get('/experiments/started/153/') - self.assertEqual(response.status_code, 302) - - def test_started_email_logged_in_staff(self): - self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5') - response = self.client.get('/experiments/started/153/') - self.assertEqual(response.status_code, 200) - - def test_started_email_send(self): - self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5') - response = self.client.get('/experiments/started/153/') - self.assertEqual(response.status_code, 200) - - self.assertTrue('pk1@example.com' in response.content) - self.assertTrue('Lane #8 : (11064) Paired ends 104' in response.content) - - response = self.client.get('/experiments/started/153/', {'send':'1','bcc':'on'}) - self.assertEqual(response.status_code, 200) - self.assertEqual(len(mail.outbox), 4) - bcc = set(settings.NOTIFICATION_BCC).intersect(set(settings.MANAGERS)) - for m in mail.outbox: - self.assertTrue(len(m.body) > 0) - self.assertEqual(m.bcc, bcc) - - def test_email_navigation(self): - """ - Can we navigate between the flowcell and email forms properly? - """ - self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5') - response = self.client.get('/experiments/started/153/') - self.assertEqual(response.status_code, 200) - self.assertTrue(re.search('Flowcell FC12150', response.content)) - # require that navigation back to the admin page exists - self.assertTrue(re.search('[^<]+', response.content)) - -def multi_lane_to_dict(lane): - """Convert a list of lane entries into a dictionary indexed by library ID - """ - return dict( ((x['library_id'],x) for x in lane) ) - -class TestSequencer(TestCase): - fixtures = ['test_flowcells.json', - ] - - def test_name_generation(self): - seq = models.Sequencer() - seq.name = "Seq1" - seq.instrument_name = "HWI-SEQ1" - seq.model = "Imaginary 5000" - - self.assertEqual(unicode(seq), "Seq1 (HWI-SEQ1)") - - def test_lookup(self): - fc = models.FlowCell.objects.get(pk=153) - self.assertEqual(fc.sequencer.model, - "Illumina Genome Analyzer IIx") - self.assertEqual(fc.sequencer.instrument_name, - "ILLUMINA-EC5D15") - # well actually we let the browser tack on the host name - url = fc.get_absolute_url() - self.assertEqual(url, '/flowcell/FC12150/') - - def test_rdf(self): - response = self.client.get('/flowcell/FC12150/', apidata) - tree = fromstring(response.content) - seq_by = tree.xpath('//div[@rel="libns:sequenced_by"]', - namespaces=NSMAP) - self.assertEqual(len(seq_by), 1) - self.assertEqual(seq_by[0].attrib['rel'], 'libns:sequenced_by') - seq = seq_by[0].getchildren() - self.assertEqual(len(seq), 1) - self.assertEqual(seq[0].attrib['about'], '/sequencer/2') - self.assertEqual(seq[0].attrib['typeof'], 'libns:Sequencer') - - name = seq[0].xpath('./span[@property="libns:sequencer_name"]') - self.assertEqual(len(name), 1) - self.assertEqual(name[0].text, 'Tardigrade') - instrument = seq[0].xpath( - './span[@property="libns:sequencer_instrument"]') - self.assertEqual(len(instrument), 1) - self.assertEqual(instrument[0].text, 'ILLUMINA-EC5D15') - model = seq[0].xpath( - './span[@property="libns:sequencer_model"]') - self.assertEqual(len(model), 1) - self.assertEqual(model[0].text, 'Illumina Genome Analyzer IIx') - - def test_flowcell_with_rdf_validation(self): - from htsworkflow.util.rdfhelp import add_default_schemas, \ - dump_model, \ - get_model, \ - load_string_into_model - from htsworkflow.util.rdfinfer import Infer - - model = get_model() - add_default_schemas(model) - inference = Infer(model) - - url ='/flowcell/FC12150/' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - status = validate_xhtml(response.content) - if status is not None: self.assertTrue(status) - - load_string_into_model(model, 'rdfa', response.content) - - errmsgs = list(inference.run_validation()) - self.assertEqual(len(errmsgs), 2) - for errmsg in errmsgs: - self.assertEqual(errmsg, 'Missing type for: http://localhost/') - - def test_lane_with_rdf_validation(self): - from htsworkflow.util.rdfhelp import add_default_schemas, \ - dump_model, \ - get_model, \ - load_string_into_model - from htsworkflow.util.rdfinfer import Infer - - model = get_model() - add_default_schemas(model) - inference = Infer(model) - - url = '/lane/1193' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - status = validate_xhtml(response.content) - if status is not None: self.assertTrue(status) - - load_string_into_model(model, 'rdfa', response.content) - - errmsgs = list(inference.run_validation()) - self.assertEqual(len(errmsgs), 2) - for errmsg in errmsgs: - self.assertEqual(errmsg, 'Missing type for: http://localhost/') diff --git a/htsworkflow/frontend/inventory/test_inventory.py b/htsworkflow/frontend/inventory/test_inventory.py new file mode 100644 index 0000000..024a334 --- /dev/null +++ b/htsworkflow/frontend/inventory/test_inventory.py @@ -0,0 +1,117 @@ +import RDF +import unittest + +from django.test import TestCase +from django.contrib.auth.models import User +from django.core import urlresolvers + +from htsworkflow.frontend.inventory.models import Item, Vendor +from htsworkflow.util.rdfhelp import get_model, load_string_into_model, get_serializer, inventoryOntology, libraryOntology, fromTypedNode + +def localhostNode(url): + return RDF.Node(RDF.Uri('http://localhost%s' % (url,))) + +class InventoryTestCase(TestCase): + fixtures = ['test_user', 'test_harddisks'] + + def test_fixture(self): + # make sure that some of our test data is was loaded + # since there was no error message when I typoed the test fixture + hd1 = Item.objects.get(pk=1) + self.failUnlessEqual(hd1.uuid, '8a90b6ce522311de99b00015172ce556') + + user = User.objects.get(pk=5) + self.failUnlessEqual(user.username, 'test') + + def test_item(self): + url = '/inventory/8a90b6ce522311de99b00015172ce556/' + self.client.login(username='test', password='BJOKL5kAj6aFZ6A5') + response = self.client.get(url) + self.failUnlessEqual(response.status_code, 200) + + model = get_model() + load_string_into_model(model, 'rdfa', response.content, url) + + itemNode = RDF.Node(RDF.Uri(url)) + item_type = fromTypedNode(model.get_target(itemNode, inventoryOntology['item_type'])) + self.failUnlessEqual(item_type, u'Hard Drive') + + def test_itemindex(self): + url = '/inventory/it/Hard Drive/' + indexNode = localhostNode(url) + diskNode = localhostNode('/inventory/8a90b6ce522311de99b00015172ce556/') + self.client.login(username='test', password='BJOKL5kAj6aFZ6A5') + + flowcells = self.get_flowcells_from_content(url, indexNode, diskNode) + self.failUnlessEqual(len(flowcells), 2) + self.failUnless('http://localhost/flowcell/11ONEAAXX/' in flowcells) + self.failUnless('http://localhost/flowcell/22TWOAAXX/' in flowcells) + + def test_add_disk(self): + url = '/inventory/it/Hard Drive/' + #url_disk = '/inventory/8a90b6ce522311de99b00015172ce556/' + url_disk = '/inventory/b0792d425aa411de99b00015172ce556/' + indexNode = localhostNode(url) + diskNode = localhostNode(url_disk) + self.client.login(username='test', password='BJOKL5kAj6aFZ6A5') + + flowcells = self.get_flowcells_from_content(url, indexNode, diskNode) + self.failUnlessEqual(len(flowcells), 0) + + # step two link the flowcell + flowcell = '22TWOAAXX' + serial = 'WCAU49042470' + link_url = urlresolvers.reverse( + 'htsworkflow.frontend.inventory.views.link_flowcell_and_device', + args=(flowcell, serial)) + link_response = self.client.get(link_url) + self.failUnlessEqual(link_response.status_code, 200) + + flowcells = self.get_flowcells_from_content(url, indexNode, diskNode) + self.failUnlessEqual(len(flowcells), 1) + self.failUnlessEqual('http://localhost/flowcell/%s/' % (flowcell), + flowcells[0]) + + def test_add_disk_failed_flowcell(self): + url = '/inventory/it/Hard Drive/' + #url_disk = '/inventory/8a90b6ce522311de99b00015172ce556/' + url_disk = '/inventory/b0792d425aa411de99b00015172ce556/' + indexNode = localhostNode(url) + diskNode = localhostNode(url_disk) + self.client.login(username='test', password='BJOKL5kAj6aFZ6A5') + + flowcells = self.get_flowcells_from_content(url, indexNode, diskNode) + self.failUnlessEqual(len(flowcells), 0) + + # step two link the flowcell + flowcell = '33THRAAXX' + serial = 'WCAU49042470' + link_url = urlresolvers.reverse( + 'htsworkflow.frontend.inventory.views.link_flowcell_and_device', + args=(flowcell, serial)) + link_response = self.client.get(link_url) + self.failUnlessEqual(link_response.status_code, 200) + + flowcells = self.get_flowcells_from_content(url, indexNode, diskNode) + self.failUnlessEqual(len(flowcells), 1) + self.failUnlessEqual('http://localhost/flowcell/%s/' % (flowcell), + flowcells[0]) + + + def get_flowcells_from_content(self, url, rootNode, diskNode): + model = get_model() + + response = self.client.get(url) + self.failUnlessEqual(response.status_code, 200) + + load_string_into_model(model, 'rdfa', response.content, rootNode.uri) + targets = model.get_targets(diskNode, libraryOntology['flowcell_id']) + flowcells = [ str(x.uri) for x in targets] + return flowcells + +def suite(): + return unittest.makeSuite(InventoryTestCase, 'test') + +if __name__ == "__main__": + unittest.main(defaultTest="suite") + diff --git a/htsworkflow/frontend/inventory/tests.py b/htsworkflow/frontend/inventory/tests.py deleted file mode 100644 index 024a334..0000000 --- a/htsworkflow/frontend/inventory/tests.py +++ /dev/null @@ -1,117 +0,0 @@ -import RDF -import unittest - -from django.test import TestCase -from django.contrib.auth.models import User -from django.core import urlresolvers - -from htsworkflow.frontend.inventory.models import Item, Vendor -from htsworkflow.util.rdfhelp import get_model, load_string_into_model, get_serializer, inventoryOntology, libraryOntology, fromTypedNode - -def localhostNode(url): - return RDF.Node(RDF.Uri('http://localhost%s' % (url,))) - -class InventoryTestCase(TestCase): - fixtures = ['test_user', 'test_harddisks'] - - def test_fixture(self): - # make sure that some of our test data is was loaded - # since there was no error message when I typoed the test fixture - hd1 = Item.objects.get(pk=1) - self.failUnlessEqual(hd1.uuid, '8a90b6ce522311de99b00015172ce556') - - user = User.objects.get(pk=5) - self.failUnlessEqual(user.username, 'test') - - def test_item(self): - url = '/inventory/8a90b6ce522311de99b00015172ce556/' - self.client.login(username='test', password='BJOKL5kAj6aFZ6A5') - response = self.client.get(url) - self.failUnlessEqual(response.status_code, 200) - - model = get_model() - load_string_into_model(model, 'rdfa', response.content, url) - - itemNode = RDF.Node(RDF.Uri(url)) - item_type = fromTypedNode(model.get_target(itemNode, inventoryOntology['item_type'])) - self.failUnlessEqual(item_type, u'Hard Drive') - - def test_itemindex(self): - url = '/inventory/it/Hard Drive/' - indexNode = localhostNode(url) - diskNode = localhostNode('/inventory/8a90b6ce522311de99b00015172ce556/') - self.client.login(username='test', password='BJOKL5kAj6aFZ6A5') - - flowcells = self.get_flowcells_from_content(url, indexNode, diskNode) - self.failUnlessEqual(len(flowcells), 2) - self.failUnless('http://localhost/flowcell/11ONEAAXX/' in flowcells) - self.failUnless('http://localhost/flowcell/22TWOAAXX/' in flowcells) - - def test_add_disk(self): - url = '/inventory/it/Hard Drive/' - #url_disk = '/inventory/8a90b6ce522311de99b00015172ce556/' - url_disk = '/inventory/b0792d425aa411de99b00015172ce556/' - indexNode = localhostNode(url) - diskNode = localhostNode(url_disk) - self.client.login(username='test', password='BJOKL5kAj6aFZ6A5') - - flowcells = self.get_flowcells_from_content(url, indexNode, diskNode) - self.failUnlessEqual(len(flowcells), 0) - - # step two link the flowcell - flowcell = '22TWOAAXX' - serial = 'WCAU49042470' - link_url = urlresolvers.reverse( - 'htsworkflow.frontend.inventory.views.link_flowcell_and_device', - args=(flowcell, serial)) - link_response = self.client.get(link_url) - self.failUnlessEqual(link_response.status_code, 200) - - flowcells = self.get_flowcells_from_content(url, indexNode, diskNode) - self.failUnlessEqual(len(flowcells), 1) - self.failUnlessEqual('http://localhost/flowcell/%s/' % (flowcell), - flowcells[0]) - - def test_add_disk_failed_flowcell(self): - url = '/inventory/it/Hard Drive/' - #url_disk = '/inventory/8a90b6ce522311de99b00015172ce556/' - url_disk = '/inventory/b0792d425aa411de99b00015172ce556/' - indexNode = localhostNode(url) - diskNode = localhostNode(url_disk) - self.client.login(username='test', password='BJOKL5kAj6aFZ6A5') - - flowcells = self.get_flowcells_from_content(url, indexNode, diskNode) - self.failUnlessEqual(len(flowcells), 0) - - # step two link the flowcell - flowcell = '33THRAAXX' - serial = 'WCAU49042470' - link_url = urlresolvers.reverse( - 'htsworkflow.frontend.inventory.views.link_flowcell_and_device', - args=(flowcell, serial)) - link_response = self.client.get(link_url) - self.failUnlessEqual(link_response.status_code, 200) - - flowcells = self.get_flowcells_from_content(url, indexNode, diskNode) - self.failUnlessEqual(len(flowcells), 1) - self.failUnlessEqual('http://localhost/flowcell/%s/' % (flowcell), - flowcells[0]) - - - def get_flowcells_from_content(self, url, rootNode, diskNode): - model = get_model() - - response = self.client.get(url) - self.failUnlessEqual(response.status_code, 200) - - load_string_into_model(model, 'rdfa', response.content, rootNode.uri) - targets = model.get_targets(diskNode, libraryOntology['flowcell_id']) - flowcells = [ str(x.uri) for x in targets] - return flowcells - -def suite(): - return unittest.makeSuite(InventoryTestCase, 'test') - -if __name__ == "__main__": - unittest.main(defaultTest="suite") - diff --git a/htsworkflow/frontend/labels/test_labels.py b/htsworkflow/frontend/labels/test_labels.py new file mode 100644 index 0000000..2247054 --- /dev/null +++ b/htsworkflow/frontend/labels/test_labels.py @@ -0,0 +1,23 @@ +""" +This file demonstrates two different styles of tests (one doctest and one +unittest). These will both pass when you run "manage.py test". + +Replace these with more appropriate tests for your application. +""" + +from django.test import TestCase + +class SimpleTest(TestCase): + def test_basic_addition(self): + """ + Tests that 1 + 1 always equals 2. + """ + self.failUnlessEqual(1 + 1, 2) + +__test__ = {"doctest": """ +Another way to test that 1 + 1 is equal to 2. + +>>> 1 + 1 == 2 +True +"""} + diff --git a/htsworkflow/frontend/labels/tests.py b/htsworkflow/frontend/labels/tests.py deleted file mode 100644 index 2247054..0000000 --- a/htsworkflow/frontend/labels/tests.py +++ /dev/null @@ -1,23 +0,0 @@ -""" -This file demonstrates two different styles of tests (one doctest and one -unittest). These will both pass when you run "manage.py test". - -Replace these with more appropriate tests for your application. -""" - -from django.test import TestCase - -class SimpleTest(TestCase): - def test_basic_addition(self): - """ - Tests that 1 + 1 always equals 2. - """ - self.failUnlessEqual(1 + 1, 2) - -__test__ = {"doctest": """ -Another way to test that 1 + 1 is equal to 2. - ->>> 1 + 1 == 2 -True -"""} - diff --git a/htsworkflow/frontend/samples/test_samples.py b/htsworkflow/frontend/samples/test_samples.py new file mode 100644 index 0000000..4e0c2a3 --- /dev/null +++ b/htsworkflow/frontend/samples/test_samples.py @@ -0,0 +1,333 @@ +import datetime +import unittest + +try: + import json +except ImportError, e: + import simplejson as json + +from django.test import TestCase + +from htsworkflow.frontend.samples.models import \ + Affiliation, \ + ExperimentType, \ + Species, \ + Library + +from htsworkflow.frontend.samples.views import \ + library_dict, \ + library_json + +from htsworkflow.frontend.auth import apidata +from htsworkflow.util.conversion import unicode_or_none +from htsworkflow.util.ethelp import validate_xhtml + +class LibraryTestCase(TestCase): + fixtures = ['test_samples.json'] + + def setUp(self): + create_db(self) + + def testOrganism(self): + self.assertEquals(self.library_10001.organism(), 'human') + + def testAffiliations(self): + self.library_10001.affiliations.add(self.affiliation_alice) + self.library_10002.affiliations.add( + self.affiliation_alice, + self.affiliation_bob + ) + self.failUnless(len(self.library_10001.affiliations.all()), 1) + self.failUnless(self.library_10001.affiliation(), 'Alice') + + self.failUnless(len(self.library_10002.affiliations.all()), 2) + self.failUnless(self.library_10001.affiliation(), 'Alice, Bob') + + +class SampleWebTestCase(TestCase): + """ + Test returning data from our database in rest like ways. + (like returning json objects) + """ + fixtures = ['test_samples.json'] + + def test_library_info(self): + for lib in Library.objects.all(): + lib_dict = library_dict(lib.id) + url = '/samples/library/%s/json' % (lib.id,) + lib_response = self.client.get(url, apidata) + self.failUnlessEqual(lib_response.status_code, 200) + lib_json = json.loads(lib_response.content) + + for d in [lib_dict, lib_json]: + # amplified_from_sample is a link to the library table, + # I want to use the "id" for the data lookups not + # the embedded primary key. + # It gets slightly confusing on how to implement sending the right id + # since amplified_from_sample can be null + #self.failUnlessEqual(d['amplified_from_sample'], lib.amplified_from_sample) + self.failUnlessEqual(d['antibody_id'], lib.antibody_id) + self.failUnlessEqual(d['cell_line_id'], lib.cell_line_id) + self.failUnlessEqual(d['cell_line'], unicode_or_none(lib.cell_line)) + self.failUnlessEqual(d['experiment_type'], lib.experiment_type.name) + self.failUnlessEqual(d['experiment_type_id'], lib.experiment_type_id) + self.failUnlessEqual(d['gel_cut_size'], lib.gel_cut_size) + self.failUnlessEqual(d['hidden'], lib.hidden) + self.failUnlessEqual(d['id'], lib.id) + self.failUnlessEqual(d['insert_size'], lib.insert_size) + self.failUnlessEqual(d['library_name'], lib.library_name) + self.failUnlessEqual(d['library_species'], lib.library_species.scientific_name) + self.failUnlessEqual(d['library_species_id'], lib.library_species_id) + self.failUnlessEqual(d['library_type_id'], lib.library_type_id) + if lib.library_type_id is not None: + self.failUnlessEqual(d['library_type'], lib.library_type.name) + else: + self.failUnlessEqual(d['library_type'], None) + self.failUnlessEqual(d['made_for'], lib.made_for) + self.failUnlessEqual(d['made_by'], lib.made_by) + self.failUnlessEqual(d['notes'], lib.notes) + self.failUnlessEqual(d['replicate'], lib.replicate) + self.failUnlessEqual(d['stopping_point'], lib.stopping_point) + self.failUnlessEqual(d['successful_pM'], lib.successful_pM) + self.failUnlessEqual(d['undiluted_concentration'], + unicode(lib.undiluted_concentration)) + # some specific tests + if lib.id == '10981': + # test a case where there is no known status + lane_set = {u'status': u'Unknown', + u'paired_end': True, + u'read_length': 75, + u'lane_number': 1, + u'lane_id': 1193, + u'flowcell': u'303TUAAXX', + u'status_code': None} + self.failUnlessEqual(len(d['lane_set']), 1) + self.failUnlessEqual(d['lane_set'][0], lane_set) + elif lib.id == '11016': + # test a case where there is a status + lane_set = {u'status': 'Good', + u'paired_end': True, + u'read_length': 75, + u'lane_number': 5, + u'lane_id': 1197, + u'flowcell': u'303TUAAXX', + u'status_code': 2} + self.failUnlessEqual(len(d['lane_set']), 1) + self.failUnlessEqual(d['lane_set'][0], lane_set) + + + def test_invalid_library_json(self): + """ + Make sure we get a 404 if we request an invalid library id + """ + response = self.client.get('/samples/library/nottheone/json', apidata) + self.failUnlessEqual(response.status_code, 404) + + + def test_invalid_library(self): + response = self.client.get('/library/nottheone/') + self.failUnlessEqual(response.status_code, 404) + + + def test_library_no_key(self): + """ + Make sure we get a 302 if we're not logged in + """ + response = self.client.get('/samples/library/10981/json') + self.failUnlessEqual(response.status_code, 403) + response = self.client.get('/samples/library/10981/json', apidata) + self.failUnlessEqual(response.status_code, 200) + + def test_library_rdf(self): + import RDF + from htsworkflow.util.rdfhelp import get_model, \ + dump_model, \ + fromTypedNode, \ + load_string_into_model, \ + rdfNS, \ + libraryOntology + model = get_model() + + response = self.client.get('/library/10981/') + self.assertEqual(response.status_code, 200) + content = response.content + load_string_into_model(model, 'rdfa', content) + + body = """prefix rdf: + prefix libns: + + select ?library ?name ?library_id ?gel_cut ?made_by + where { + ?library a libns:library ; + libns:name ?name ; + libns:library_id ?library_id ; + libns:gel_cut ?gel_cut ; + libns:made_by ?made_by + }""" + query = RDF.SPARQLQuery(body) + for r in query.execute(model): + self.assertEqual(fromTypedNode(r['library_id']), u'10981') + self.assertEqual(fromTypedNode(r['name']), + u'Paired End Multiplexed Sp-BAC') + self.assertEqual(fromTypedNode(r['gel_cut']), 400) + self.assertEqual(fromTypedNode(r['made_by']), u'Igor') + + state = validate_xhtml(content) + if state is not None: + self.assertTrue(state) + + # validate a library page. + from htsworkflow.util.rdfhelp import add_default_schemas + from htsworkflow.util.rdfinfer import Infer + add_default_schemas(model) + inference = Infer(model) + ignored = {'Missing type for: http://localhost/'} + errmsgs = [msg for msg in inference.run_validation() + if msg not in ignored ] + self.assertEqual(len(errmsgs), 0) + + def test_library_index_rdfa(self): + from htsworkflow.util.rdfhelp import \ + add_default_schemas, get_model, load_string_into_model, \ + dump_model + from htsworkflow.util.rdfinfer import Infer + + model = get_model() + add_default_schemas(model) + inference = Infer(model) + + response = self.client.get('/library/') + self.assertEqual(response.status_code, 200) + load_string_into_model(model, 'rdfa', response.content) + + ignored = {'Missing type for: http://localhost/'} + errmsgs = [msg for msg in inference.run_validation() + if msg not in ignored ] + self.assertEqual(len(errmsgs), 0) + + body = """prefix rdf: + prefix libns: + + select ?library ?library_id ?name ?species_name + where { + ?library a libns:Library . + OPTIONAL { ?library libns:library_id ?library_id . } + OPTIONAL { ?library libns:species_name ?species_name . } + OPTIONAL { ?library libns:name ?name . } + }""" + bindings = set(['library', 'library_id', 'name', 'species_name']) + query = RDF.SPARQLQuery(body) + count = 0 + for r in query.execute(model): + count += 1 + for name, value in r.items(): + self.assertTrue(name in bindings) + self.assertTrue(value is not None) + + self.assertEqual(count, len(Library.objects.filter(hidden=False))) + + state = validate_xhtml(response.content) + if state is not None: self.assertTrue(state) + + +# The django test runner flushes the database between test suites not cases, +# so to be more compatible with running via nose we flush the database tables +# of interest before creating our sample data. +def create_db(obj): + obj.species_human = Species.objects.get(pk=8) + obj.experiment_rna_seq = ExperimentType.objects.get(pk=4) + obj.affiliation_alice = Affiliation.objects.get(pk=1) + obj.affiliation_bob = Affiliation.objects.get(pk=2) + + Library.objects.all().delete() + obj.library_10001 = Library( + id = "10001", + library_name = 'C2C12 named poorly', + library_species = obj.species_human, + experiment_type = obj.experiment_rna_seq, + creation_date = datetime.datetime.now(), + made_for = 'scientist unit 2007', + made_by = 'microfludics system 7321', + stopping_point = '2A', + undiluted_concentration = '5.01', + hidden = False, + ) + obj.library_10001.save() + obj.library_10002 = Library( + id = "10002", + library_name = 'Worm named poorly', + library_species = obj.species_human, + experiment_type = obj.experiment_rna_seq, + creation_date = datetime.datetime.now(), + made_for = 'scientist unit 2007', + made_by = 'microfludics system 7321', + stopping_point = '2A', + undiluted_concentration = '5.01', + hidden = False, + ) + obj.library_10002.save() + +try: + import RDF + HAVE_RDF = True + + rdfNS = RDF.NS("http://www.w3.org/1999/02/22-rdf-syntax-ns#") + xsdNS = RDF.NS("http://www.w3.org/2001/XMLSchema#") + libNS = RDF.NS("http://jumpgate.caltech.edu/wiki/LibraryOntology#") +except ImportError,e: + HAVE_RDF = False + + +class TestRDFaLibrary(TestCase): + fixtures = ['test_samples.json'] + + def test_parse_rdfa(self): + model = get_rdf_memory_model() + parser = RDF.Parser(name='rdfa') + url = '/library/10981/' + lib_response = self.client.get(url) + self.failIfEqual(len(lib_response.content), 0) + + parser.parse_string_into_model(model, + lib_response.content, + 'http://localhost'+url) + # http://jumpgate.caltech.edu/wiki/LibraryOntology#affiliation> + self.check_literal_object(model, ['Bob'], p=libNS['affiliation']) + self.check_literal_object(model, ['Multiplexed'], p=libNS['experiment_type']) + self.check_literal_object(model, ['400'], p=libNS['gel_cut']) + self.check_literal_object(model, ['Igor'], p=libNS['made_by']) + self.check_literal_object(model, ['Paired End Multiplexed Sp-BAC'], p=libNS['name']) + self.check_literal_object(model, ['Drosophila melanogaster'], p=libNS['species_name']) + + self.check_uri_object(model, + [u'http://localhost/lane/1193'], + p=libNS['has_lane']) + + fc_uri = RDF.Uri('http://localhost/flowcell/303TUAAXX/') + self.check_literal_object(model, + [u"303TUAAXX"], + s=fc_uri, p=libNS['flowcell_id']) + + def check_literal_object(self, model, values, s=None, p=None, o=None): + statements = list(model.find_statements( + RDF.Statement(s,p,o))) + self.failUnlessEqual(len(statements), len(values), + "Couln't find %s %s %s" % (s,p,o)) + for s in statements: + self.failUnless(s.object.literal_value['string'] in values) + + + def check_uri_object(self, model, values, s=None, p=None, o=None): + statements = list(model.find_statements( + RDF.Statement(s,p,o))) + self.failUnlessEqual(len(statements), len(values), + "Couln't find %s %s %s" % (s,p,o)) + for s in statements: + self.failUnless(unicode(s.object.uri) in values) + + + +def get_rdf_memory_model(): + storage = RDF.MemoryStorage() + model = RDF.Model(storage) + return model diff --git a/htsworkflow/frontend/samples/tests.py b/htsworkflow/frontend/samples/tests.py deleted file mode 100644 index 4e0c2a3..0000000 --- a/htsworkflow/frontend/samples/tests.py +++ /dev/null @@ -1,333 +0,0 @@ -import datetime -import unittest - -try: - import json -except ImportError, e: - import simplejson as json - -from django.test import TestCase - -from htsworkflow.frontend.samples.models import \ - Affiliation, \ - ExperimentType, \ - Species, \ - Library - -from htsworkflow.frontend.samples.views import \ - library_dict, \ - library_json - -from htsworkflow.frontend.auth import apidata -from htsworkflow.util.conversion import unicode_or_none -from htsworkflow.util.ethelp import validate_xhtml - -class LibraryTestCase(TestCase): - fixtures = ['test_samples.json'] - - def setUp(self): - create_db(self) - - def testOrganism(self): - self.assertEquals(self.library_10001.organism(), 'human') - - def testAffiliations(self): - self.library_10001.affiliations.add(self.affiliation_alice) - self.library_10002.affiliations.add( - self.affiliation_alice, - self.affiliation_bob - ) - self.failUnless(len(self.library_10001.affiliations.all()), 1) - self.failUnless(self.library_10001.affiliation(), 'Alice') - - self.failUnless(len(self.library_10002.affiliations.all()), 2) - self.failUnless(self.library_10001.affiliation(), 'Alice, Bob') - - -class SampleWebTestCase(TestCase): - """ - Test returning data from our database in rest like ways. - (like returning json objects) - """ - fixtures = ['test_samples.json'] - - def test_library_info(self): - for lib in Library.objects.all(): - lib_dict = library_dict(lib.id) - url = '/samples/library/%s/json' % (lib.id,) - lib_response = self.client.get(url, apidata) - self.failUnlessEqual(lib_response.status_code, 200) - lib_json = json.loads(lib_response.content) - - for d in [lib_dict, lib_json]: - # amplified_from_sample is a link to the library table, - # I want to use the "id" for the data lookups not - # the embedded primary key. - # It gets slightly confusing on how to implement sending the right id - # since amplified_from_sample can be null - #self.failUnlessEqual(d['amplified_from_sample'], lib.amplified_from_sample) - self.failUnlessEqual(d['antibody_id'], lib.antibody_id) - self.failUnlessEqual(d['cell_line_id'], lib.cell_line_id) - self.failUnlessEqual(d['cell_line'], unicode_or_none(lib.cell_line)) - self.failUnlessEqual(d['experiment_type'], lib.experiment_type.name) - self.failUnlessEqual(d['experiment_type_id'], lib.experiment_type_id) - self.failUnlessEqual(d['gel_cut_size'], lib.gel_cut_size) - self.failUnlessEqual(d['hidden'], lib.hidden) - self.failUnlessEqual(d['id'], lib.id) - self.failUnlessEqual(d['insert_size'], lib.insert_size) - self.failUnlessEqual(d['library_name'], lib.library_name) - self.failUnlessEqual(d['library_species'], lib.library_species.scientific_name) - self.failUnlessEqual(d['library_species_id'], lib.library_species_id) - self.failUnlessEqual(d['library_type_id'], lib.library_type_id) - if lib.library_type_id is not None: - self.failUnlessEqual(d['library_type'], lib.library_type.name) - else: - self.failUnlessEqual(d['library_type'], None) - self.failUnlessEqual(d['made_for'], lib.made_for) - self.failUnlessEqual(d['made_by'], lib.made_by) - self.failUnlessEqual(d['notes'], lib.notes) - self.failUnlessEqual(d['replicate'], lib.replicate) - self.failUnlessEqual(d['stopping_point'], lib.stopping_point) - self.failUnlessEqual(d['successful_pM'], lib.successful_pM) - self.failUnlessEqual(d['undiluted_concentration'], - unicode(lib.undiluted_concentration)) - # some specific tests - if lib.id == '10981': - # test a case where there is no known status - lane_set = {u'status': u'Unknown', - u'paired_end': True, - u'read_length': 75, - u'lane_number': 1, - u'lane_id': 1193, - u'flowcell': u'303TUAAXX', - u'status_code': None} - self.failUnlessEqual(len(d['lane_set']), 1) - self.failUnlessEqual(d['lane_set'][0], lane_set) - elif lib.id == '11016': - # test a case where there is a status - lane_set = {u'status': 'Good', - u'paired_end': True, - u'read_length': 75, - u'lane_number': 5, - u'lane_id': 1197, - u'flowcell': u'303TUAAXX', - u'status_code': 2} - self.failUnlessEqual(len(d['lane_set']), 1) - self.failUnlessEqual(d['lane_set'][0], lane_set) - - - def test_invalid_library_json(self): - """ - Make sure we get a 404 if we request an invalid library id - """ - response = self.client.get('/samples/library/nottheone/json', apidata) - self.failUnlessEqual(response.status_code, 404) - - - def test_invalid_library(self): - response = self.client.get('/library/nottheone/') - self.failUnlessEqual(response.status_code, 404) - - - def test_library_no_key(self): - """ - Make sure we get a 302 if we're not logged in - """ - response = self.client.get('/samples/library/10981/json') - self.failUnlessEqual(response.status_code, 403) - response = self.client.get('/samples/library/10981/json', apidata) - self.failUnlessEqual(response.status_code, 200) - - def test_library_rdf(self): - import RDF - from htsworkflow.util.rdfhelp import get_model, \ - dump_model, \ - fromTypedNode, \ - load_string_into_model, \ - rdfNS, \ - libraryOntology - model = get_model() - - response = self.client.get('/library/10981/') - self.assertEqual(response.status_code, 200) - content = response.content - load_string_into_model(model, 'rdfa', content) - - body = """prefix rdf: - prefix libns: - - select ?library ?name ?library_id ?gel_cut ?made_by - where { - ?library a libns:library ; - libns:name ?name ; - libns:library_id ?library_id ; - libns:gel_cut ?gel_cut ; - libns:made_by ?made_by - }""" - query = RDF.SPARQLQuery(body) - for r in query.execute(model): - self.assertEqual(fromTypedNode(r['library_id']), u'10981') - self.assertEqual(fromTypedNode(r['name']), - u'Paired End Multiplexed Sp-BAC') - self.assertEqual(fromTypedNode(r['gel_cut']), 400) - self.assertEqual(fromTypedNode(r['made_by']), u'Igor') - - state = validate_xhtml(content) - if state is not None: - self.assertTrue(state) - - # validate a library page. - from htsworkflow.util.rdfhelp import add_default_schemas - from htsworkflow.util.rdfinfer import Infer - add_default_schemas(model) - inference = Infer(model) - ignored = {'Missing type for: http://localhost/'} - errmsgs = [msg for msg in inference.run_validation() - if msg not in ignored ] - self.assertEqual(len(errmsgs), 0) - - def test_library_index_rdfa(self): - from htsworkflow.util.rdfhelp import \ - add_default_schemas, get_model, load_string_into_model, \ - dump_model - from htsworkflow.util.rdfinfer import Infer - - model = get_model() - add_default_schemas(model) - inference = Infer(model) - - response = self.client.get('/library/') - self.assertEqual(response.status_code, 200) - load_string_into_model(model, 'rdfa', response.content) - - ignored = {'Missing type for: http://localhost/'} - errmsgs = [msg for msg in inference.run_validation() - if msg not in ignored ] - self.assertEqual(len(errmsgs), 0) - - body = """prefix rdf: - prefix libns: - - select ?library ?library_id ?name ?species_name - where { - ?library a libns:Library . - OPTIONAL { ?library libns:library_id ?library_id . } - OPTIONAL { ?library libns:species_name ?species_name . } - OPTIONAL { ?library libns:name ?name . } - }""" - bindings = set(['library', 'library_id', 'name', 'species_name']) - query = RDF.SPARQLQuery(body) - count = 0 - for r in query.execute(model): - count += 1 - for name, value in r.items(): - self.assertTrue(name in bindings) - self.assertTrue(value is not None) - - self.assertEqual(count, len(Library.objects.filter(hidden=False))) - - state = validate_xhtml(response.content) - if state is not None: self.assertTrue(state) - - -# The django test runner flushes the database between test suites not cases, -# so to be more compatible with running via nose we flush the database tables -# of interest before creating our sample data. -def create_db(obj): - obj.species_human = Species.objects.get(pk=8) - obj.experiment_rna_seq = ExperimentType.objects.get(pk=4) - obj.affiliation_alice = Affiliation.objects.get(pk=1) - obj.affiliation_bob = Affiliation.objects.get(pk=2) - - Library.objects.all().delete() - obj.library_10001 = Library( - id = "10001", - library_name = 'C2C12 named poorly', - library_species = obj.species_human, - experiment_type = obj.experiment_rna_seq, - creation_date = datetime.datetime.now(), - made_for = 'scientist unit 2007', - made_by = 'microfludics system 7321', - stopping_point = '2A', - undiluted_concentration = '5.01', - hidden = False, - ) - obj.library_10001.save() - obj.library_10002 = Library( - id = "10002", - library_name = 'Worm named poorly', - library_species = obj.species_human, - experiment_type = obj.experiment_rna_seq, - creation_date = datetime.datetime.now(), - made_for = 'scientist unit 2007', - made_by = 'microfludics system 7321', - stopping_point = '2A', - undiluted_concentration = '5.01', - hidden = False, - ) - obj.library_10002.save() - -try: - import RDF - HAVE_RDF = True - - rdfNS = RDF.NS("http://www.w3.org/1999/02/22-rdf-syntax-ns#") - xsdNS = RDF.NS("http://www.w3.org/2001/XMLSchema#") - libNS = RDF.NS("http://jumpgate.caltech.edu/wiki/LibraryOntology#") -except ImportError,e: - HAVE_RDF = False - - -class TestRDFaLibrary(TestCase): - fixtures = ['test_samples.json'] - - def test_parse_rdfa(self): - model = get_rdf_memory_model() - parser = RDF.Parser(name='rdfa') - url = '/library/10981/' - lib_response = self.client.get(url) - self.failIfEqual(len(lib_response.content), 0) - - parser.parse_string_into_model(model, - lib_response.content, - 'http://localhost'+url) - # http://jumpgate.caltech.edu/wiki/LibraryOntology#affiliation> - self.check_literal_object(model, ['Bob'], p=libNS['affiliation']) - self.check_literal_object(model, ['Multiplexed'], p=libNS['experiment_type']) - self.check_literal_object(model, ['400'], p=libNS['gel_cut']) - self.check_literal_object(model, ['Igor'], p=libNS['made_by']) - self.check_literal_object(model, ['Paired End Multiplexed Sp-BAC'], p=libNS['name']) - self.check_literal_object(model, ['Drosophila melanogaster'], p=libNS['species_name']) - - self.check_uri_object(model, - [u'http://localhost/lane/1193'], - p=libNS['has_lane']) - - fc_uri = RDF.Uri('http://localhost/flowcell/303TUAAXX/') - self.check_literal_object(model, - [u"303TUAAXX"], - s=fc_uri, p=libNS['flowcell_id']) - - def check_literal_object(self, model, values, s=None, p=None, o=None): - statements = list(model.find_statements( - RDF.Statement(s,p,o))) - self.failUnlessEqual(len(statements), len(values), - "Couln't find %s %s %s" % (s,p,o)) - for s in statements: - self.failUnless(s.object.literal_value['string'] in values) - - - def check_uri_object(self, model, values, s=None, p=None, o=None): - statements = list(model.find_statements( - RDF.Statement(s,p,o))) - self.failUnlessEqual(len(statements), len(values), - "Couln't find %s %s %s" % (s,p,o)) - for s in statements: - self.failUnless(unicode(s.object.uri) in values) - - - -def get_rdf_memory_model(): - storage = RDF.MemoryStorage() - model = RDF.Model(storage) - return model