2 from lxml.html import fromstring
6 import simplejson as json
11 from urlparse import urljoin
13 from django.conf import settings
14 from django.core import mail
15 from django.core.exceptions import ObjectDoesNotExist
16 from django.test import TestCase
17 from django.test.utils import setup_test_environment, teardown_test_environment
18 from django.db import connection
19 from django.conf import settings
20 from htsworkflow.frontend.experiments import models
21 from htsworkflow.frontend.experiments import experiments
22 from htsworkflow.frontend.auth import apidata
23 from htsworkflow.util.ethelp import validate_xhtml
25 from htsworkflow.pipelines.test.simulate_runfolder import TESTDATA_DIR
# Prefix-to-URI map for the 'libns' (LibraryOntology) namespace.
29 NSMAP = {'libns':'http://jumpgate.caltech.edu/wiki/LibraryOntology#'}
31 from django.db import connection
# Name of the configured default database; handed to destroy_test_db() below.
33 OLD_DB_NAME = settings.DATABASES['default']['NAME']
# NOTE(review): the embedded numbering jumps 33 -> 36, so whatever introduces
# the next three statements (and defines VERBOSITY) is not visible in this
# view -- presumably a module-level setup hook; confirm against the full file.
36 setup_test_environment()
# DEBUG is switched off before the test database is created.
37 settings.DEBUG = False
38 connection.creation.create_test_db(VERBOSITY)
# NOTE(review): lines 39-40 are elided as well -- presumably the header of the
# matching teardown hook; confirm.
41 connection.creation.destroy_test_db(OLD_DB_NAME, VERBOSITY)
42 teardown_test_environment()
# Tests for how models.ClusterStation.default() and the isdefault flag behave,
# run against the test_flowcells.json fixture.
44 class ClusterStationTestCases(TestCase):
45 fixtures = ['test_flowcells.json']
# The fixture's default cluster station is expected to be id 2; a later call
# to default() is expected to return id 3.
47 def test_default(self):
48 c = models.ClusterStation.default()
49 self.assertEqual(c.id, 2)
# NOTE(review): embedded lines 50-53 are elided.  The step that leaves zero
# rows flagged isdefault=True before the next assertion is not visible here --
# confirm against the full file before editing.
54 total = models.ClusterStation.objects.filter(isdefault=True).count()
55 self.assertEqual(total, 0)
57 other_default = models.ClusterStation.default()
58 self.assertEqual(other_default.id, 3)
# After the (elided) update, station pk=3 is expected to be the default and
# exactly one row is expected to carry isdefault=True.
61 def test_update_default(self):
62 old_default = models.ClusterStation.default()
64 c = models.ClusterStation.objects.get(pk=3)
# NOTE(review): embedded lines 65-67 are elided; the statements that change
# `c` are not visible in this view.
68 new_default = models.ClusterStation.default()
70 self.assertNotEqual(old_default, new_default)
71 self.assertEqual(new_default, c)
73 total = models.ClusterStation.objects.filter(isdefault=True).count()
74 self.assertEqual(total, 1)
# Renaming station pk=1 is expected to leave both the default station and the
# count of isdefault=True rows unchanged.
76 def test_update_other(self):
77 old_default = models.ClusterStation.default()
78 total = models.ClusterStation.objects.filter(isdefault=True).count()
79 self.assertEqual(total, 1)
81 c = models.ClusterStation.objects.get(pk=1)
82 c.name = "Primary Key 1"
# NOTE(review): embedded lines 83-84 are elided (presumably where the renamed
# object is saved -- confirm).
85 total = models.ClusterStation.objects.filter(isdefault=True).count()
86 self.assertEqual(total, 1)
88 new_default = models.ClusterStation.default()
89 self.assertEqual(old_default, new_default)
# Tests for how models.Sequencer.default() and the isdefault flag behave, run
# against the test_flowcells.json fixture.
92 class SequencerTestCases(TestCase):
93 fixtures = ['test_flowcells.json']
# default() is expected to return sequencer id 2 with exactly one row flagged;
# a later call to default() is expected to return id 7.
95 def test_default(self):
96 # starting with no default
97 s = models.Sequencer.default()
98 self.assertEqual(s.id, 2)
100 total = models.Sequencer.objects.filter(isdefault=True).count()
101 self.assertEqual(total, 1)
# NOTE(review): embedded lines 102-105 are elided.  The step that drops the
# count of isdefault=True rows from 1 to 0 is not visible in this view --
# confirm against the full file before editing.
106 total = models.Sequencer.objects.filter(isdefault=True).count()
107 self.assertEqual(total, 0)
109 other_default = models.Sequencer.default()
110 self.assertEqual(other_default.id, 7)
# After the (elided) update, sequencer pk=1 is expected to be the default and
# exactly one row is expected to carry isdefault=True.
112 def test_update_default(self):
113 old_default = models.Sequencer.default()
115 s = models.Sequencer.objects.get(pk=1)
# NOTE(review): embedded lines 116-118 are elided; the statements that change
# `s` are not visible in this view.
119 new_default = models.Sequencer.default()
121 self.assertNotEqual(old_default, new_default)
122 self.assertEqual(new_default, s)
124 total = models.Sequencer.objects.filter(isdefault=True).count()
125 self.assertEqual(total, 1)
# Renaming sequencer pk=1 is expected to leave both the default sequencer and
# the count of isdefault=True rows unchanged.
128 def test_update_other(self):
129 old_default = models.Sequencer.default()
130 total = models.Sequencer.objects.filter(isdefault=True).count()
131 self.assertEqual(total, 1)
133 s = models.Sequencer.objects.get(pk=1)
134 s.name = "Primary Key 1"
# NOTE(review): embedded lines 135-136 are elided (presumably where the
# renamed object is saved -- confirm).
137 total = models.Sequencer.objects.filter(isdefault=True).count()
138 self.assertEqual(total, 1)
140 new_default = models.Sequencer.default()
141 self.assertEqual(old_default, new_default)
# Tests for the `experiments` helper functions and for the JSON / HTML views
# that expose flowcell, lane and result-file information.
# NOTE(review): the embedded line numbers skip wherever this view elides lines
# (docstring delimiters, some `def` headers, loop headers, call arguments).
# Gaps that affect reading the logic are flagged inline below.
144 class ExperimentsTestCases(TestCase):
# NOTE(review): the fixture list is opened here, but embedded lines 146-148
# (the rest of the list and the header of the per-test setup method) are not
# visible.
145 fixtures = ['test_flowcells.json',
# Create a scratch results tree and point settings.RESULT_HOME_DIR at it.
149 self.tempdir = tempfile.mkdtemp(prefix='htsw-test-experiments-')
150 settings.RESULT_HOME_DIR = self.tempdir
# First flowcell: <tempdir>/FC12150/C1-37 holding a copy of the sample run xml.
152 self.fc1_id = 'FC12150'
153 self.fc1_root = os.path.join(self.tempdir, self.fc1_id)
154 os.mkdir(self.fc1_root)
155 self.fc1_dir = os.path.join(self.fc1_root, 'C1-37')
156 os.mkdir(self.fc1_dir)
157 runxml = 'run_FC12150_2007-09-27.xml'
158 shutil.copy(os.path.join(TESTDATA_DIR, runxml),
159 os.path.join(self.fc1_dir, runxml))
# NOTE(review): embedded lines 160-161 and 166 are elided.  The four lines
# below look like the arguments of a per-lane copy of the sample SRF file
# inside a loop over `i` -- confirm against the full file.
162 os.path.join(TESTDATA_DIR,
163 'woldlab_070829_USI-EAS44_0017_FC11055_1.srf'),
164 os.path.join(self.fc1_dir,
165 'woldlab_070829_SERIAL_FC12150_%d.srf' %(i,))
# Second flowcell: empty C1-25, C1-37 and C1-37/Plots directories.
168 self.fc2_dir = os.path.join(self.tempdir, '42JTNAAXX')
169 os.mkdir(self.fc2_dir)
170 os.mkdir(os.path.join(self.fc2_dir, 'C1-25'))
171 os.mkdir(os.path.join(self.fc2_dir, 'C1-37'))
172 os.mkdir(os.path.join(self.fc2_dir, 'C1-37', 'Plots'))
# Remove the scratch tree.  NOTE(review): the enclosing method header
# (embedded lines 173-174) is not visible.
175 shutil.rmtree(self.tempdir)
# For three flowcell IDs, compare experiments.flowcell_information() and then
# the /experiments/config/<id>/json view against the FlowCell model and lanes.
177 def test_flowcell_information(self):
179 Check the code that packs the django objects into simple types.
181 for fc_id in [u'FC12150', u"42JTNAAXX", "42JU1AAXX"]:
182 fc_dict = experiments.flowcell_information(fc_id)
183 fc_django = models.FlowCell.objects.get(flowcell_id=fc_id)
184 self.assertEqual(fc_dict['flowcell_id'], fc_id)
185 self.assertEqual(fc_django.flowcell_id, fc_id)
186 self.assertEqual(fc_dict['sequencer'], fc_django.sequencer.name)
187 self.assertEqual(fc_dict['read_length'], fc_django.read_length)
188 self.assertEqual(fc_dict['notes'], fc_django.notes)
189 self.assertEqual(fc_dict['cluster_station'], fc_django.cluster_station.name)
# Lanes from the helper's dict are keyed by the integer lane number.
191 for lane in fc_django.lane_set.all():
192 lane_contents = fc_dict['lane_set'][lane.lane_number]
193 lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
194 self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate)
195 self.assertEqual(lane_dict['comment'], lane.comment)
196 self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id)
197 self.assertEqual(lane_dict['lane_number'], lane.lane_number)
198 self.assertEqual(lane_dict['library_name'], lane.library.library_name)
199 self.assertEqual(lane_dict['library_id'], lane.library.id)
200 self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM))
201 self.assertEqual(lane_dict['library_species'],
202 lane.library.library_species.scientific_name)
# Same checks against the JSON view, requested with the `apidata` parameters.
204 response = self.client.get('/experiments/config/%s/json' % (fc_id,), apidata)
205 # strptime isoformat string = '%Y-%m-%dT%H:%M:%S'
206 fc_json = json.loads(response.content)
207 self.assertEqual(fc_json['flowcell_id'], fc_id)
208 self.assertEqual(fc_json['sequencer'], fc_django.sequencer.name)
209 self.assertEqual(fc_json['read_length'], fc_django.read_length)
210 self.assertEqual(fc_json['notes'], fc_django.notes)
211 self.assertEqual(fc_json['cluster_station'], fc_django.cluster_station.name)
# In the decoded JSON the lane set is looked up by unicode(lane_number).
214 for lane in fc_django.lane_set.all():
215 lane_contents = fc_json['lane_set'][unicode(lane.lane_number)]
216 lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
218 self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate)
219 self.assertEqual(lane_dict['comment'], lane.comment)
220 self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id)
221 self.assertEqual(lane_dict['lane_number'], lane.lane_number)
222 self.assertEqual(lane_dict['library_name'], lane.library.library_name)
223 self.assertEqual(lane_dict['library_id'], lane.library.id)
224 self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM))
225 self.assertEqual(lane_dict['library_species'],
226 lane.library.library_species.scientific_name)
# An unknown flowcell ID in the config URL is expected to yield HTTP 404.
228 def test_invalid_flowcell(self):
230 Make sure we get a 404 if we request an invalid flowcell ID
232 response = self.client.get('/experiments/config/nottheone/json', apidata)
233 self.assertEqual(response.status_code, 404)
# Requesting the config JSON without the `apidata` parameters is expected to
# yield HTTP 403.
235 def test_no_key(self):
237 Require logging in to retrieve meta data
239 response = self.client.get(u'/experiments/config/FC12150/json')
240 self.assertEqual(response.status_code, 403)
# Lane 3 of FC12150 is expected to carry library 'SL039', and the library JSON
# view is expected to serve that same non-numeric ID.
242 def test_library_id(self):
244 Library IDs should be flexible, so make sure we can retrive a non-numeric ID
246 response = self.client.get('/experiments/config/FC12150/json', apidata)
247 self.assertEqual(response.status_code, 200)
248 flowcell = json.loads(response.content)
250 lane_contents = flowcell['lane_set']['3']
251 lane_library = lane_contents[0]
252 self.assertEqual(lane_library['library_id'], 'SL039')
254 response = self.client.get('/samples/library/SL039/json', apidata)
255 self.assertEqual(response.status_code, 200)
256 library_sl039 = json.loads(response.content)
258 self.assertEqual(library_sl039['library_id'], 'SL039')
# Checks the library inputs on the flowcell 153 admin page against the
# expected library IDs.
260 def test_raw_id_field(self):
264 Library's have IDs, libraries also have primary keys,
265 we eventually had enough libraries that the drop down combo box was too
266 hard to filter through, unfortnately we want a field that uses our library
267 id and not the internal primary key, and raw_id_field uses primary keys.
269 This tests to make sure that the value entered in the raw library id field matches
270 the library id looked up.
272 expected_ids = [u'10981',u'11016',u'SL039',u'11060',
273 u'11061',u'11062',u'11063',u'11064']
274 self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
275 response = self.client.get('/admin/experiments/flowcell/153/')
277 tree = fromstring(response.content)
# NOTE(review): embedded line 278 is elided; `i` below is presumably bound by
# a loop over the indexes of expected_ids -- confirm.
279 xpath_expression = '//input[@id="id_lane_set-%d-library"]'
280 input_field = tree.xpath(xpath_expression % (i,))[0]
281 library_field = input_field.find('../strong')
282 library_id, library_name = library_field.text.split(':')
283 # strip leading '#' sign from name
284 library_id = library_id[1:]
285 self.assertEqual(library_id, expected_ids[i])
286 self.assertEqual(input_field.attrib['value'], library_id)
# Follows the flowcell link found on the library 11070 page to the flowcell
# page and to its lane 8 page.
288 def test_library_to_flowcell_link(self):
290 Make sure the library page includes links to the flowcell pages.
291 That work with flowcell IDs that have parenthetical comments.
293 self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
294 response = self.client.get('/library/11070/')
295 self.assertEqual(response.status_code, 200)
296 status = validate_xhtml(response.content)
297 if status is not None: self.assertTrue(status)
299 tree = fromstring(response.content)
# NOTE(review): embedded line 301 (the rest of this xpath() call) is elided.
300 flowcell_spans = tree.xpath('//span[@property="libns:flowcell_id"]',
302 self.assertEqual(flowcell_spans[0].text, '30012AAXX (failed)')
303 failed_fc_span = flowcell_spans[0]
304 failed_fc_a = failed_fc_span.getparent()
305 # make sure some of our RDF made it.
306 self.assertEqual(failed_fc_a.get('typeof'), 'libns:IlluminaFlowcell')
307 self.assertEqual(failed_fc_a.get('href'), '/flowcell/30012AAXX/')
308 fc_response = self.client.get(failed_fc_a.get('href'))
309 self.assertEqual(fc_response.status_code, 200)
# NOTE(review): this validates `response` (the library page) a second time,
# not `fc_response` -- looks like a copy/paste slip; confirm intent.
310 status = validate_xhtml(response.content)
311 if status is not None: self.assertTrue(status)
313 fc_lane_response = self.client.get('/flowcell/30012AAXX/8/')
314 self.assertEqual(fc_lane_response.status_code, 200)
# NOTE(review): same here -- `response` is validated, not `fc_lane_response`.
315 status = validate_xhtml(response.content)
316 if status is not None: self.assertTrue(status)
# Lane 3 of 42JU1AAXX is expected to hold two libraries (12044 and 11045).
# NOTE(review): the expected index_sequence values (embedded lines 326-328 and
# 330-332) are elided, leaving both assertEqual calls unterminated here.
319 def test_pooled_multiplex_id(self):
320 fc_dict = experiments.flowcell_information('42JU1AAXX')
321 lane_contents = fc_dict['lane_set'][3]
322 self.assertEqual(len(lane_contents), 2)
323 lane_dict = multi_lane_to_dict(lane_contents)
325 self.assertEqual(lane_dict['12044']['index_sequence'],
329 self.assertEqual(lane_dict['11045']['index_sequence'],
# experiments.lanes_for(user) is expected to return 5 lanes, matching the JSON
# view entry by entry.
# NOTE(review): the assignment of `user` (embedded line 338) is elided.
334 def test_lanes_for(self):
336 Check the code that packs the django objects into simple types.
339 lanes = experiments.lanes_for(user)
340 self.assertEqual(len(lanes), 5)
342 response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
343 lanes_json = json.loads(response.content)
344 self.assertEqual(len(lanes), len(lanes_json))
345 for i in range(len(lanes)):
346 self.assertEqual(lanes[i]['comment'], lanes_json[i]['comment'])
347 self.assertEqual(lanes[i]['lane_number'], lanes_json[i]['lane_number'])
348 self.assertEqual(lanes[i]['flowcell'], lanes_json[i]['flowcell'])
349 self.assertEqual(lanes[i]['run_date'], lanes_json[i]['run_date'])
# A user with no lanes is expected to produce an empty list; the JSON response
# is only decoded, with no assertion made on it in the visible lines.
# NOTE(review): the assignment of `user` (embedded line 355) is elided.
351 def test_lanes_for_no_lanes(self):
353 Do we get something meaningful back when the user isn't attached to anything?
356 lanes = experiments.lanes_for(user)
357 self.assertEqual(len(lanes), 0)
359 response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
360 lanes_json = json.loads(response.content)
# An unknown user is expected to raise ObjectDoesNotExist from the helper and
# to yield HTTP 404 from the view.
362 def test_lanes_for_no_user(self):
364 Do we get something meaningful back when its the wrong user
366 user = 'not a real user'
367 self.assertRaises(ObjectDoesNotExist, experiments.lanes_for, user)
369 response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
370 self.assertEqual(response.status_code, 404)
373 def test_raw_data_dir(self):
374 """Raw data path generator check"""
375 flowcell_id = self.fc1_id
376 raw_dir = os.path.join(settings.RESULT_HOME_DIR, flowcell_id)
378 fc = models.FlowCell.objects.get(flowcell_id=flowcell_id)
379 self.assertEqual(fc.get_raw_data_directory(), raw_dir)
# Appending " (failed)" to the flowcell ID is expected to leave the raw data
# directory unchanged.
381 fc.flowcell_id = flowcell_id + " (failed)"
382 self.assertEqual(fc.get_raw_data_directory(), raw_dir)
# update_data_runs() on FC12150 is expected to create one DataRun whose data
# files include the lane 4 SRF file and the run xml.
385 def test_data_run_import(self):
386 srf_file_type = models.FileType.objects.get(name='SRF')
387 runxml_file_type = models.FileType.objects.get(name='run_xml')
388 flowcell_id = self.fc1_id
389 flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id)
390 flowcell.update_data_runs()
391 self.assertEqual(len(flowcell.datarun_set.all()), 1)
393 run = flowcell.datarun_set.all()[0]
394 result_files = run.datafile_set.all()
# Index the data files by their relative path for the lookups below.
395 result_dict = dict(((rf.relative_pathname, rf) for rf in result_files))
397 srf4 = result_dict['FC12150/C1-37/woldlab_070829_SERIAL_FC12150_4.srf']
398 self.assertEqual(srf4.file_type, srf_file_type)
399 self.assertEqual(srf4.library_id, '11060')
400 self.assertEqual(srf4.data_run.flowcell.flowcell_id, 'FC12150')
# NOTE(review): embedded lines 401 and 403-405 are elided; the two lines below
# are arguments of assertions whose opening lines are not visible.
402 srf4.data_run.flowcell.lane_set.get(lane_number=4).library_id,
406 os.path.join(settings.RESULT_HOME_DIR, srf4.relative_pathname))
408 lane_files = run.lane_files()
409 self.assertEqual(lane_files[4]['srf'], srf4)
411 runxml= result_dict['FC12150/C1-37/run_FC12150_2007-09-27.xml']
412 self.assertEqual(runxml.file_type, runxml_file_type)
413 self.assertEqual(runxml.library_id, None)
415 import1 = len(models.DataRun.objects.filter(result_dir='FC12150/C1-37'))
416 # what happens if we import twice?
417 flowcell.import_data_run('FC12150/C1-37',
418 'run_FC12150_2007-09-27.xml')
# NOTE(review): embedded lines 419 and 421 are elided; presumably an assertion
# comparing the DataRun count after the second import with `import1` --
# confirm.
420 len(models.DataRun.objects.filter(result_dir='FC12150/C1-37')),
# Every data file of the imported run is fetched through /experiments/file/<key>;
# status 200 and a matching content-type are expected.
423 def test_read_result_file(self):
424 """make sure we can return a result file
426 flowcell_id = self.fc1_id
427 flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id)
428 flowcell.update_data_runs()
430 #self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
432 result_files = flowcell.datarun_set.all()[0].datafile_set.all()
433 for f in result_files:
434 url = '/experiments/file/%s' % ( f.random_key,)
435 response = self.client.get(url)
436 self.assertEqual(response.status_code, 200)
437 mimetype = f.file_type.mimetype
# NOTE(review): embedded line 438 is elided; the fallback below is presumably
# conditional on the file type having no mimetype -- confirm.
439 mimetype = 'application/octet-stream'
441 self.assertEqual(mimetype, response['content-type'])
# Loads the RDFa of the 42JU1AAXX flowcell page into an RDF model and checks
# the flowcell/lane/library triples returned by a SPARQL query.
# NOTE(review): parts of the import list, the creation of `model`, most of the
# `expected` mapping, the query's where-clause delimiters and the `count`
# bookkeeping are elided from this view.
443 def test_flowcell_rdf(self):
445 from htsworkflow.util.rdfhelp import get_model, \
447 load_string_into_model, \
454 expected = {'1': ['11034'],
456 '3': ['12044','11045'],
457 '4': ['11047','13044'],
462 url = '/flowcell/42JU1AAXX/'
463 response = self.client.get(url)
464 self.assertEqual(response.status_code, 200)
465 status = validate_xhtml(response.content)
466 if status is not None: self.assertTrue(status)
# The page URL joined onto http://localhost is passed as `ns` when loading.
468 ns = urljoin('http://localhost', url)
469 load_string_into_model(model, 'rdfa', response.content, ns=ns)
470 body = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
471 prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
473 select ?flowcell ?flowcell_id ?lane_id ?library_id
475 ?flowcell a libns:IlluminaFlowcell ;
476 libns:flowcell_id ?flowcell_id ;
477 libns:has_lane ?lane .
478 ?lane libns:lane_number ?lane_id ;
479 libns:library ?library .
480 ?library libns:library_id ?library_id .
482 query = RDF.SPARQLQuery(body)
484 for r in query.execute(model):
486 self.assertEqual(fromTypedNode(r['flowcell_id']), u'42JU1AAXX')
487 lane_id = fromTypedNode(r['lane_id'])
488 library_id = fromTypedNode(r['library_id'])
489 self.assertTrue(library_id in expected[lane_id])
490 self.assertEqual(count, 10)
# Tests for models.FileType and models.find_file_type_metadata_from_filename().
493 class TestFileType(TestCase):
# unicode() of the 'QSEQ tarfile' FileType is expected to be its name.
494 def test_file_type_unicode(self):
495 file_type_objects = models.FileType.objects
496 name = 'QSEQ tarfile'
497 file_type_object = file_type_objects.get(name=name)
498 self.assertEqual(u"QSEQ tarfile",
499 unicode(file_type_object))
# Each case is (filename, expected FileType name, expected lane, expected end).
# The 'lane' / 'end' keys may be missing from the result, in which case None
# is expected.
501 def test_find_file_type(self):
502 file_type_objects = models.FileType.objects
503 cases = [('woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
504 'QSEQ tarfile', 7, 1),
# NOTE(review): embedded line 506 (the rest of this tuple) and line 518 (the
# end of the list) are elided from this view.
505 ('woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
507 ('s_1_eland_extended.txt.bz2','ELAND Extended', 1, None),
508 ('s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
509 ('s_3_eland_result.txt.bz2','ELAND Result', 3, None),
510 ('s_1_export.txt.bz2','ELAND Export', 1, None),
511 ('s_1_percent_call.png', 'IVC Percent Call', 1, None),
512 ('s_2_percent_base.png', 'IVC Percent Base', 2, None),
513 ('s_3_percent_all.png', 'IVC Percent All', 3, None),
514 ('s_4_call.png', 'IVC Call', 4, None),
515 ('s_5_all.png', 'IVC All', 5, None),
516 ('Summary.htm', 'Summary.htm', None, None),
517 ('run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
519 for filename, typename, lane, end in cases:
520 ft = models.find_file_type_metadata_from_filename(filename)
521 self.assertEqual(ft['file_type'],
522 file_type_objects.get(name=typename))
523 self.assertEqual(ft.get('lane', None), lane)
524 self.assertEqual(ft.get('end', None), end)
# Same expectations as above, with directory components (absolute, relative,
# '..' and repeated slashes) in front of each filename.
526 def test_assign_file_type_complex_path(self):
527 file_type_objects = models.FileType.objects
528 cases = [('/a/b/c/woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
529 'QSEQ tarfile', 7, 1),
# NOTE(review): embedded line 531 (the rest of this tuple) and line 543 (the
# end of the list) are elided from this view.
530 ('foo/woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
532 ('../s_1_eland_extended.txt.bz2','ELAND Extended', 1, None),
533 ('/bleem/s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
534 ('/qwer/s_3_eland_result.txt.bz2','ELAND Result', 3, None),
535 ('/ty///1/s_1_export.txt.bz2','ELAND Export', 1, None),
536 ('/help/s_1_percent_call.png', 'IVC Percent Call', 1, None),
537 ('/bored/s_2_percent_base.png', 'IVC Percent Base', 2, None),
538 ('/example1/s_3_percent_all.png', 'IVC Percent All', 3, None),
539 ('amonkey/s_4_call.png', 'IVC Call', 4, None),
540 ('fishie/s_5_all.png', 'IVC All', 5, None),
541 ('/random/Summary.htm', 'Summary.htm', None, None),
542 ('/notrandom/run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
544 for filename, typename, lane, end in cases:
545 result = models.find_file_type_metadata_from_filename(filename)
546 self.assertEqual(result['file_type'],
547 file_type_objects.get(name=typename))
548 self.assertEqual(result.get('lane',None), lane)
549 self.assertEqual(result.get('end', None), end)
class TestEmailNotify(TestCase):
    """Exercise the "run started" notification views for flowcell 153.

    Covers the status codes returned to anonymous, 'test' and 'admintest'
    clients, the messages placed in the mail outbox when sending is
    requested, and the link back to the flowcell admin page.
    """
    fixtures = ['test_flowcells.json']

    def test_started_email_not_logged_in(self):
        """A client that has not logged in is answered with a 302."""
        page = self.client.get('/experiments/started/153/')
        self.assertEqual(page.status_code, 302)

    def test_started_email_logged_in_user(self):
        """The 'test' account is answered with a 302 as well."""
        self.client.login(username='test', password='BJOKL5kAj6aFZ6A5')
        page = self.client.get('/experiments/started/153/')
        self.assertEqual(page.status_code, 302)

    def test_started_email_logged_in_staff(self):
        """The 'admintest' account receives the page itself (200)."""
        self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5')
        page = self.client.get('/experiments/started/153/')
        self.assertEqual(page.status_code, 200)

    def test_started_email_send(self):
        """Preview the notification, then send it and inspect the outbox."""
        started_url = '/experiments/started/153/'
        self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5')

        # Plain GET: the preview page lists a recipient and a lane summary.
        preview = self.client.get(started_url)
        self.assertEqual(preview.status_code, 200)
        preview_html = preview.content
        self.assertTrue('pk1@example.com' in preview_html)
        self.assertTrue('Lane #8 : (11064) Paired ends 104' in preview_html)

        # GET with send/bcc switched on: four messages are expected.
        sent = self.client.get(started_url, {'send':'1','bcc':'on'})
        self.assertEqual(sent.status_code, 200)
        self.assertEqual(len(mail.outbox), 4)

        # Every message must have a body and be bcc'ed to the union of the
        # NOTIFICATION_BCC and MANAGERS settings.
        expected_bcc = set(settings.NOTIFICATION_BCC) | set(settings.MANAGERS)
        for message in mail.outbox:
            self.assertTrue(len(message.body) > 0)
            self.assertEqual(set(message.bcc), expected_bcc)

    def test_email_navigation(self):
        """Can we navigate between the flowcell and email forms properly?"""
        self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
        page = self.client.get('/experiments/started/153/')
        self.assertEqual(page.status_code, 200)
        html = page.content
        self.assertTrue(re.search('Flowcell FC12150', html))
        # require that navigation back to the admin page exists
        admin_link = '<a href="/admin/experiments/flowcell/153/">[^<]+</a>'
        self.assertTrue(re.search(admin_link, html))
def multi_lane_to_dict(lane):
    """Index the entries of one lane by their library ID.

    :param lane: iterable of mappings, each providing a 'library_id' key
    :returns: dict mapping every entry's 'library_id' to the entry itself;
        when two entries share an ID the later one replaces the earlier
    """
    by_library = {}
    for entry in lane:
        by_library[entry['library_id']] = entry
    return by_library
# Tests for models.Sequencer naming and for the sequencer / RDF markup served
# by the flowcell pages.
# NOTE(review): the embedded line numbers skip wherever this view elides
# lines; the gaps that matter are flagged inline.
601 class TestSequencer(TestCase):
# NOTE(review): the fixture list is opened here but the rest of it (embedded
# lines 603-604) is not visible.
602 fixtures = ['test_flowcells.json',
# unicode() of a Sequencer is expected to read "Seq1 (HWI-SEQ1)".
605 def test_name_generation(self):
606 seq = models.Sequencer()
# NOTE(review): embedded line 607 is elided; presumably it sets the name that
# shows up as "Seq1" in the assertion below -- confirm.
608 seq.instrument_name = "HWI-SEQ1"
609 seq.model = "Imaginary 5000"
611 self.assertEqual(unicode(seq), "Seq1 (HWI-SEQ1)")
# Checks the sequencer attached to flowcell pk=153 on the model, then the
# libns:sequenced_by markup on the flowcell page.
613 def test_lookup(self):
614 fc = models.FlowCell.objects.get(pk=153)
615 self.assertEqual(fc.sequencer.model,
616 "Illumina Genome Analyzer IIx")
# NOTE(review): the expected value of this assertion (embedded line 618) is
# elided.
617 self.assertEqual(fc.sequencer.instrument_name,
619 # well actually we let the browser tack on the host name
620 url = fc.get_absolute_url()
621 self.assertEqual(url, '/flowcell/FC12150/')
624 response = self.client.get('/flowcell/FC12150/', apidata)
625 tree = fromstring(response.content)
# NOTE(review): the rest of this xpath() call (embedded line 627) is elided.
626 seq_by = tree.xpath('//div[@rel="libns:sequenced_by"]',
628 self.assertEqual(len(seq_by), 1)
629 self.assertEqual(seq_by[0].attrib['rel'], 'libns:sequenced_by')
# The sequenced_by div is expected to have a single child describing the
# sequencer.
630 seq = seq_by[0].getchildren()
631 self.assertEqual(len(seq), 1)
632 self.assertEqual(seq[0].attrib['about'], '/sequencer/2')
633 self.assertEqual(seq[0].attrib['typeof'], 'libns:Sequencer')
635 name = seq[0].xpath('./span[@property="libns:sequencer_name"]')
636 self.assertEqual(len(name), 1)
637 self.assertEqual(name[0].text, 'Tardigrade')
638 instrument = seq[0].xpath(
639 './span[@property="libns:sequencer_instrument"]')
640 self.assertEqual(len(instrument), 1)
641 self.assertEqual(instrument[0].text, 'ILLUMINA-EC5D15')
642 model = seq[0].xpath(
643 './span[@property="libns:sequencer_model"]')
644 self.assertEqual(len(model), 1)
645 self.assertEqual(model[0].text, 'Illumina Genome Analyzer IIx')
# Loads the RDFa of the FC12150 flowcell page into a model carrying the
# default schemas; Infer.run_validation() is expected to yield no messages.
# NOTE(review): part of the import list and the creation of `model` (embedded
# lines 649-650 and 653-654) are elided.
647 def test_flowcell_with_rdf_validation(self):
648 from htsworkflow.util.rdfhelp import add_default_schemas, \
651 load_string_into_model
652 from htsworkflow.util.rdfinfer import Infer
655 add_default_schemas(model)
656 inference = Infer(model)
658 url ='/flowcell/FC12150/'
659 response = self.client.get(url)
660 self.assertEqual(response.status_code, 200)
661 status = validate_xhtml(response.content)
662 if status is not None: self.assertTrue(status)
664 load_string_into_model(model, 'rdfa', response.content)
666 errmsgs = list(inference.run_validation())
667 self.assertEqual(len(errmsgs), 0)
# Same validation as above for another page.
# NOTE(review): part of the import list, the creation of `model` and the
# assignment of `url` (embedded lines 671-672, 675-676, 679-680) are elided,
# so the page under test cannot be identified from this view.
669 def test_lane_with_rdf_validation(self):
670 from htsworkflow.util.rdfhelp import add_default_schemas, \
673 load_string_into_model
674 from htsworkflow.util.rdfinfer import Infer
677 add_default_schemas(model)
678 inference = Infer(model)
681 response = self.client.get(url)
682 self.assertEqual(response.status_code, 200)
683 status = validate_xhtml(response.content)
684 if status is not None: self.assertTrue(status)
686 load_string_into_model(model, 'rdfa', response.content)
688 errmsgs = list(inference.run_validation())
689 self.assertEqual(len(errmsgs), 0)
# Name of the configured default database; handed to destroy_test_db() in
# tearDownModule() below.
692 OLD_DB = settings.DATABASES['default']['NAME']
# NOTE(review): embedded line 693 is elided; presumably the header of the
# module-level setup hook that owns the next two statements -- confirm.
694 setup_test_environment()
695 connection.creation.create_test_db()
# Module-level teardown: destroy the test database, then call
# teardown_test_environment().
697 def tearDownModule():
698 connection.creation.destroy_test_db(OLD_DB)
699 teardown_test_environment()
# NOTE(review): fragment of a suite builder.  Its header, the creation of
# `suite`, the remaining entries of the class list and any return statement
# (embedded lines 701, 703, 705-709, 711) are elided from this view.
702 from unittest2 import TestSuite, defaultTestLoader
# Every test loaded from each listed TestCase class is added to `suite`.
704 for testcase in [ClusterStationTestCases,
710 suite.addTests(defaultTestLoader.loadTestsFromTestCase(testcase))
if __name__ == "__main__":
    # Script entry point: hand control to unittest2, naming "suite" as the
    # default test to load.
    import unittest2
    unittest2.main(defaultTest="suite")