2 from lxml.html import fromstring
6 import simplejson as json
11 from urlparse import urljoin
13 from django.conf import settings
14 from django.core import mail
15 from django.core.exceptions import ObjectDoesNotExist
16 from django.test import TestCase
17 from django.test.utils import setup_test_environment, teardown_test_environment
18 from django.db import connection
19 from django.conf import settings
20 from htsworkflow.frontend.experiments import models
21 from htsworkflow.frontend.experiments import experiments
22 from htsworkflow.frontend.auth import apidata
23 from htsworkflow.util.ethelp import validate_xhtml
25 from htsworkflow.pipelines.test.simulate_runfolder import TESTDATA_DIR
29 NSMAP = {'libns':'http://jumpgate.caltech.edu/wiki/LibraryOntology#'}
31 from django.db import connection
33 class ClusterStationTestCases(TestCase):
34 fixtures = ['initial_data.json',
35 'test_flowcells.json']
37 def test_default(self):
38 c = models.ClusterStation.default()
39 self.assertEqual(c.id, 2)
44 total = models.ClusterStation.objects.filter(isdefault=True).count()
45 self.assertEqual(total, 0)
47 other_default = models.ClusterStation.default()
48 self.assertEqual(other_default.id, 3)
51 def test_update_default(self):
52 old_default = models.ClusterStation.default()
54 c = models.ClusterStation.objects.get(pk=3)
58 new_default = models.ClusterStation.default()
60 self.assertNotEqual(old_default, new_default)
61 self.assertEqual(new_default, c)
63 total = models.ClusterStation.objects.filter(isdefault=True).count()
64 self.assertEqual(total, 1)
66 def test_update_other(self):
67 old_default = models.ClusterStation.default()
68 total = models.ClusterStation.objects.filter(isdefault=True).count()
69 self.assertEqual(total, 1)
71 c = models.ClusterStation.objects.get(pk=1)
72 c.name = "Primary Key 1"
75 total = models.ClusterStation.objects.filter(isdefault=True).count()
76 self.assertEqual(total, 1)
78 new_default = models.ClusterStation.default()
79 self.assertEqual(old_default, new_default)
82 class SequencerTestCases(TestCase):
83 fixtures = ['initial_data.json',
84 'test_flowcells.json']
86 def test_default(self):
87 # starting with no default
88 s = models.Sequencer.default()
89 self.assertEqual(s.id, 2)
91 total = models.Sequencer.objects.filter(isdefault=True).count()
92 self.assertEqual(total, 1)
97 total = models.Sequencer.objects.filter(isdefault=True).count()
98 self.assertEqual(total, 0)
100 other_default = models.Sequencer.default()
101 self.assertEqual(other_default.id, 7)
103 def test_update_default(self):
104 old_default = models.Sequencer.default()
106 s = models.Sequencer.objects.get(pk=1)
110 new_default = models.Sequencer.default()
112 self.assertNotEqual(old_default, new_default)
113 self.assertEqual(new_default, s)
115 total = models.Sequencer.objects.filter(isdefault=True).count()
116 self.assertEqual(total, 1)
119 def test_update_other(self):
120 old_default = models.Sequencer.default()
121 total = models.Sequencer.objects.filter(isdefault=True).count()
122 self.assertEqual(total, 1)
124 s = models.Sequencer.objects.get(pk=1)
125 s.name = "Primary Key 1"
128 total = models.Sequencer.objects.filter(isdefault=True).count()
129 self.assertEqual(total, 1)
131 new_default = models.Sequencer.default()
132 self.assertEqual(old_default, new_default)
135 class ExperimentsTestCases(TestCase):
136 fixtures = ['initial_data.json',
137 'test_flowcells.json',
141 self.tempdir = tempfile.mkdtemp(prefix='htsw-test-experiments-')
142 settings.RESULT_HOME_DIR = self.tempdir
144 self.fc1_id = 'FC12150'
145 self.fc1_root = os.path.join(self.tempdir, self.fc1_id)
146 os.mkdir(self.fc1_root)
147 self.fc1_dir = os.path.join(self.fc1_root, 'C1-37')
148 os.mkdir(self.fc1_dir)
149 runxml = 'run_FC12150_2007-09-27.xml'
150 shutil.copy(os.path.join(TESTDATA_DIR, runxml),
151 os.path.join(self.fc1_dir, runxml))
154 os.path.join(TESTDATA_DIR,
155 'woldlab_070829_USI-EAS44_0017_FC11055_1.srf'),
156 os.path.join(self.fc1_dir,
157 'woldlab_070829_SERIAL_FC12150_%d.srf' %(i,))
160 self.fc2_dir = os.path.join(self.tempdir, '42JTNAAXX')
161 os.mkdir(self.fc2_dir)
162 os.mkdir(os.path.join(self.fc2_dir, 'C1-25'))
163 os.mkdir(os.path.join(self.fc2_dir, 'C1-37'))
164 os.mkdir(os.path.join(self.fc2_dir, 'C1-37', 'Plots'))
167 shutil.rmtree(self.tempdir)
169 def test_flowcell_information(self):
171 Check the code that packs the django objects into simple types.
173 for fc_id in [u'FC12150', u"42JTNAAXX", "42JU1AAXX"]:
174 fc_dict = experiments.flowcell_information(fc_id)
175 fc_django = models.FlowCell.objects.get(flowcell_id=fc_id)
176 self.assertEqual(fc_dict['flowcell_id'], fc_id)
177 self.assertEqual(fc_django.flowcell_id, fc_id)
178 self.assertEqual(fc_dict['sequencer'], fc_django.sequencer.name)
179 self.assertEqual(fc_dict['read_length'], fc_django.read_length)
180 self.assertEqual(fc_dict['notes'], fc_django.notes)
181 self.assertEqual(fc_dict['cluster_station'], fc_django.cluster_station.name)
183 for lane in fc_django.lane_set.all():
184 lane_contents = fc_dict['lane_set'][lane.lane_number]
185 lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
186 self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate)
187 self.assertEqual(lane_dict['comment'], lane.comment)
188 self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id)
189 self.assertEqual(lane_dict['lane_number'], lane.lane_number)
190 self.assertEqual(lane_dict['library_name'], lane.library.library_name)
191 self.assertEqual(lane_dict['library_id'], lane.library.id)
192 self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM))
193 self.assertEqual(lane_dict['library_species'],
194 lane.library.library_species.scientific_name)
196 response = self.client.get('/experiments/config/%s/json' % (fc_id,), apidata)
197 # strptime isoformat string = '%Y-%m-%dT%H:%M:%S'
198 fc_json = json.loads(response.content)
199 self.assertEqual(fc_json['flowcell_id'], fc_id)
200 self.assertEqual(fc_json['sequencer'], fc_django.sequencer.name)
201 self.assertEqual(fc_json['read_length'], fc_django.read_length)
202 self.assertEqual(fc_json['notes'], fc_django.notes)
203 self.assertEqual(fc_json['cluster_station'], fc_django.cluster_station.name)
206 for lane in fc_django.lane_set.all():
207 lane_contents = fc_json['lane_set'][unicode(lane.lane_number)]
208 lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
210 self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate)
211 self.assertEqual(lane_dict['comment'], lane.comment)
212 self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id)
213 self.assertEqual(lane_dict['lane_number'], lane.lane_number)
214 self.assertEqual(lane_dict['library_name'], lane.library.library_name)
215 self.assertEqual(lane_dict['library_id'], lane.library.id)
216 self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM))
217 self.assertEqual(lane_dict['library_species'],
218 lane.library.library_species.scientific_name)
220 def test_invalid_flowcell(self):
222 Make sure we get a 404 if we request an invalid flowcell ID
224 response = self.client.get('/experiments/config/nottheone/json', apidata)
225 self.assertEqual(response.status_code, 404)
227 def test_no_key(self):
229 Require logging in to retrieve meta data
231 response = self.client.get(u'/experiments/config/FC12150/json')
232 self.assertEqual(response.status_code, 403)
234 def test_library_id(self):
236 Library IDs should be flexible, so make sure we can retrive a non-numeric ID
238 response = self.client.get('/experiments/config/FC12150/json', apidata)
239 self.assertEqual(response.status_code, 200)
240 flowcell = json.loads(response.content)
242 lane_contents = flowcell['lane_set']['3']
243 lane_library = lane_contents[0]
244 self.assertEqual(lane_library['library_id'], 'SL039')
246 response = self.client.get('/samples/library/SL039/json', apidata)
247 self.assertEqual(response.status_code, 200)
248 library_sl039 = json.loads(response.content)
250 self.assertEqual(library_sl039['library_id'], 'SL039')
252 def test_raw_id_field(self):
256 Library's have IDs, libraries also have primary keys,
257 we eventually had enough libraries that the drop down combo box was too
258 hard to filter through, unfortnately we want a field that uses our library
259 id and not the internal primary key, and raw_id_field uses primary keys.
261 This tests to make sure that the value entered in the raw library id field matches
262 the library id looked up.
264 expected_ids = [u'10981',u'11016',u'SL039',u'11060',
265 u'11061',u'11062',u'11063',u'11064']
266 self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
267 response = self.client.get('/admin/experiments/flowcell/153/')
269 tree = fromstring(response.content)
271 xpath_expression = '//input[@id="id_lane_set-%d-library"]'
272 input_field = tree.xpath(xpath_expression % (i,))[0]
273 library_field = input_field.find('../strong')
274 library_id, library_name = library_field.text.split(':')
275 # strip leading '#' sign from name
276 library_id = library_id[1:]
277 self.assertEqual(library_id, expected_ids[i])
278 self.assertEqual(input_field.attrib['value'], library_id)
280 def test_library_to_flowcell_link(self):
282 Make sure the library page includes links to the flowcell pages.
283 That work with flowcell IDs that have parenthetical comments.
285 self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
286 response = self.client.get('/library/11070/')
287 self.assertEqual(response.status_code, 200)
288 status = validate_xhtml(response.content)
289 if status is not None: self.assertTrue(status)
291 tree = fromstring(response.content)
292 flowcell_spans = tree.xpath('//span[@property="libns:flowcell_id"]',
294 self.assertEqual(flowcell_spans[1].text, '30012AAXX (failed)')
295 failed_fc_span = flowcell_spans[1]
296 failed_fc_a = failed_fc_span.getparent()
297 # make sure some of our RDF made it.
298 self.assertEqual(failed_fc_a.get('typeof'), 'libns:IlluminaFlowcell')
299 self.assertEqual(failed_fc_a.get('href'), '/flowcell/30012AAXX/')
300 fc_response = self.client.get(failed_fc_a.get('href'))
301 self.assertEqual(fc_response.status_code, 200)
302 status = validate_xhtml(response.content)
303 if status is not None: self.assertTrue(status)
305 fc_lane_response = self.client.get('/flowcell/30012AAXX/8/')
306 self.assertEqual(fc_lane_response.status_code, 200)
307 status = validate_xhtml(response.content)
308 if status is not None: self.assertTrue(status)
311 def test_pooled_multiplex_id(self):
312 fc_dict = experiments.flowcell_information('42JU1AAXX')
313 lane_contents = fc_dict['lane_set'][3]
314 self.assertEqual(len(lane_contents), 2)
315 lane_dict = multi_lane_to_dict(lane_contents)
317 self.assertEqual(lane_dict['12044']['index_sequence'],
321 self.assertEqual(lane_dict['11045']['index_sequence'],
326 def test_lanes_for(self):
328 Check the code that packs the django objects into simple types.
331 lanes = experiments.lanes_for(user)
332 self.assertEqual(len(lanes), 5)
334 response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
335 lanes_json = json.loads(response.content)
336 self.assertEqual(len(lanes), len(lanes_json))
337 for i in range(len(lanes)):
338 self.assertEqual(lanes[i]['comment'], lanes_json[i]['comment'])
339 self.assertEqual(lanes[i]['lane_number'], lanes_json[i]['lane_number'])
340 self.assertEqual(lanes[i]['flowcell'], lanes_json[i]['flowcell'])
341 self.assertEqual(lanes[i]['run_date'], lanes_json[i]['run_date'])
343 def test_lanes_for_no_lanes(self):
345 Do we get something meaningful back when the user isn't attached to anything?
348 lanes = experiments.lanes_for(user)
349 self.assertEqual(len(lanes), 0)
351 response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
352 lanes_json = json.loads(response.content)
354 def test_lanes_for_no_user(self):
356 Do we get something meaningful back when its the wrong user
358 user = 'not a real user'
359 self.assertRaises(ObjectDoesNotExist, experiments.lanes_for, user)
361 response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
362 self.assertEqual(response.status_code, 404)
365 def test_raw_data_dir(self):
366 """Raw data path generator check"""
367 flowcell_id = self.fc1_id
368 raw_dir = os.path.join(settings.RESULT_HOME_DIR, flowcell_id)
370 fc = models.FlowCell.objects.get(flowcell_id=flowcell_id)
371 self.assertEqual(fc.get_raw_data_directory(), raw_dir)
373 fc.flowcell_id = flowcell_id + " (failed)"
374 self.assertEqual(fc.get_raw_data_directory(), raw_dir)
377 def test_data_run_import(self):
378 srf_file_type = models.FileType.objects.get(name='SRF')
379 runxml_file_type = models.FileType.objects.get(name='run_xml')
380 flowcell_id = self.fc1_id
381 flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id)
382 flowcell.update_data_runs()
383 self.assertEqual(len(flowcell.datarun_set.all()), 1)
385 run = flowcell.datarun_set.all()[0]
386 result_files = run.datafile_set.all()
387 result_dict = dict(((rf.relative_pathname, rf) for rf in result_files))
389 srf4 = result_dict['FC12150/C1-37/woldlab_070829_SERIAL_FC12150_4.srf']
390 self.assertEqual(srf4.file_type, srf_file_type)
391 self.assertEqual(srf4.library_id, '11060')
392 self.assertEqual(srf4.data_run.flowcell.flowcell_id, 'FC12150')
394 srf4.data_run.flowcell.lane_set.get(lane_number=4).library_id,
398 os.path.join(settings.RESULT_HOME_DIR, srf4.relative_pathname))
400 lane_files = run.lane_files()
401 self.assertEqual(lane_files[4]['srf'], srf4)
403 runxml= result_dict['FC12150/C1-37/run_FC12150_2007-09-27.xml']
404 self.assertEqual(runxml.file_type, runxml_file_type)
405 self.assertEqual(runxml.library_id, None)
407 import1 = len(models.DataRun.objects.filter(result_dir='FC12150/C1-37'))
408 # what happens if we import twice?
409 flowcell.import_data_run('FC12150/C1-37',
410 'run_FC12150_2007-09-27.xml')
412 len(models.DataRun.objects.filter(result_dir='FC12150/C1-37')),
415 def test_read_result_file(self):
416 """make sure we can return a result file
418 flowcell_id = self.fc1_id
419 flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id)
420 flowcell.update_data_runs()
422 #self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
424 result_files = flowcell.datarun_set.all()[0].datafile_set.all()
425 for f in result_files:
426 url = '/experiments/file/%s' % ( f.random_key,)
427 response = self.client.get(url)
428 self.assertEqual(response.status_code, 200)
429 mimetype = f.file_type.mimetype
431 mimetype = 'application/octet-stream'
433 self.assertEqual(mimetype, response['content-type'])
435 def test_flowcell_rdf(self):
437 from htsworkflow.util.rdfhelp import get_model, \
439 load_string_into_model, \
446 expected = {'1': ['11034'],
448 '3': ['12044','11045'],
449 '4': ['11047','13044'],
454 url = '/flowcell/42JU1AAXX/'
455 response = self.client.get(url)
456 self.assertEqual(response.status_code, 200)
457 status = validate_xhtml(response.content)
458 if status is not None: self.assertTrue(status)
460 ns = urljoin('http://localhost', url)
461 load_string_into_model(model, 'rdfa', response.content, ns=ns)
462 body = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
463 prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
465 select ?flowcell ?flowcell_id ?lane_id ?library_id
467 ?flowcell a libns:IlluminaFlowcell ;
468 libns:flowcell_id ?flowcell_id ;
469 libns:has_lane ?lane .
470 ?lane libns:lane_number ?lane_id ;
471 libns:library ?library .
472 ?library libns:library_id ?library_id .
474 query = RDF.SPARQLQuery(body)
476 for r in query.execute(model):
478 self.assertEqual(fromTypedNode(r['flowcell_id']), u'42JU1AAXX')
479 lane_id = fromTypedNode(r['lane_id'])
480 library_id = fromTypedNode(r['library_id'])
481 self.assertTrue(library_id in expected[lane_id])
482 self.assertEqual(count, 10)
485 class TestFileType(TestCase):
486 fixtures = ['initial_data.json',
487 'test_flowcells.json',
490 def test_file_type_unicode(self):
491 file_type_objects = models.FileType.objects
492 name = 'QSEQ tarfile'
493 file_type_object = file_type_objects.get(name=name)
494 self.assertEqual(u"QSEQ tarfile",
495 unicode(file_type_object))
497 def test_find_file_type(self):
498 file_type_objects = models.FileType.objects
499 cases = [('woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
500 'QSEQ tarfile', 7, 1),
501 ('woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
503 ('s_1_eland_extended.txt.bz2','ELAND Extended', 1, None),
504 ('s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
505 ('s_3_eland_result.txt.bz2','ELAND Result', 3, None),
506 ('s_1_export.txt.bz2','ELAND Export', 1, None),
507 ('s_1_percent_call.png', 'IVC Percent Call', 1, None),
508 ('s_2_percent_base.png', 'IVC Percent Base', 2, None),
509 ('s_3_percent_all.png', 'IVC Percent All', 3, None),
510 ('s_4_call.png', 'IVC Call', 4, None),
511 ('s_5_all.png', 'IVC All', 5, None),
512 ('Summary.htm', 'Summary.htm', None, None),
513 ('run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
515 for filename, typename, lane, end in cases:
516 ft = models.find_file_type_metadata_from_filename(filename)
517 self.assertEqual(ft['file_type'],
518 file_type_objects.get(name=typename))
519 self.assertEqual(ft.get('lane', None), lane)
520 self.assertEqual(ft.get('end', None), end)
522 def test_assign_file_type_complex_path(self):
523 file_type_objects = models.FileType.objects
524 cases = [('/a/b/c/woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
525 'QSEQ tarfile', 7, 1),
526 ('foo/woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
528 ('../s_1_eland_extended.txt.bz2','ELAND Extended', 1, None),
529 ('/bleem/s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
530 ('/qwer/s_3_eland_result.txt.bz2','ELAND Result', 3, None),
531 ('/ty///1/s_1_export.txt.bz2','ELAND Export', 1, None),
532 ('/help/s_1_percent_call.png', 'IVC Percent Call', 1, None),
533 ('/bored/s_2_percent_base.png', 'IVC Percent Base', 2, None),
534 ('/example1/s_3_percent_all.png', 'IVC Percent All', 3, None),
535 ('amonkey/s_4_call.png', 'IVC Call', 4, None),
536 ('fishie/s_5_all.png', 'IVC All', 5, None),
537 ('/random/Summary.htm', 'Summary.htm', None, None),
538 ('/notrandom/run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
540 for filename, typename, lane, end in cases:
541 result = models.find_file_type_metadata_from_filename(filename)
542 self.assertEqual(result['file_type'],
543 file_type_objects.get(name=typename))
544 self.assertEqual(result.get('lane',None), lane)
545 self.assertEqual(result.get('end', None), end)
547 class TestEmailNotify(TestCase):
548 fixtures = ['initial_data.json',
549 'test_flowcells.json']
551 def test_started_email_not_logged_in(self):
552 response = self.client.get('/experiments/started/153/')
553 self.assertEqual(response.status_code, 302)
555 def test_started_email_logged_in_user(self):
556 self.client.login(username='test', password='BJOKL5kAj6aFZ6A5')
557 response = self.client.get('/experiments/started/153/')
558 self.assertEqual(response.status_code, 302)
560 def test_started_email_logged_in_staff(self):
561 self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5')
562 response = self.client.get('/experiments/started/153/')
563 self.assertEqual(response.status_code, 200)
565 def test_started_email_send(self):
566 self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5')
567 response = self.client.get('/experiments/started/153/')
568 self.assertEqual(response.status_code, 200)
570 self.assertTrue('pk1@example.com' in response.content)
571 self.assertTrue('Lane #8 : (11064) Paired ends 104' in response.content)
573 response = self.client.get('/experiments/started/153/', {'send':'1','bcc':'on'})
574 self.assertEqual(response.status_code, 200)
575 self.assertEqual(len(mail.outbox), 4)
576 bcc = set(settings.NOTIFICATION_BCC).copy()
577 bcc.update(set(settings.MANAGERS))
578 for m in mail.outbox:
579 self.assertTrue(len(m.body) > 0)
580 self.assertEqual(set(m.bcc), bcc)
582 def test_email_navigation(self):
584 Can we navigate between the flowcell and email forms properly?
586 self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
587 response = self.client.get('/experiments/started/153/')
588 self.assertEqual(response.status_code, 200)
589 self.assertTrue(re.search('Flowcell FC12150', response.content))
590 # require that navigation back to the admin page exists
591 self.assertTrue(re.search('<a href="/admin/experiments/flowcell/153/">[^<]+</a>', response.content))
593 def multi_lane_to_dict(lane):
594 """Convert a list of lane entries into a dictionary indexed by library ID
596 return dict( ((x['library_id'],x) for x in lane) )
598 class TestSequencer(TestCase):
599 fixtures = ['initial_data.json',
600 'test_flowcells.json',
603 def test_name_generation(self):
604 seq = models.Sequencer()
606 seq.instrument_name = "HWI-SEQ1"
607 seq.model = "Imaginary 5000"
609 self.assertEqual(unicode(seq), "Seq1 (HWI-SEQ1)")
611 def test_lookup(self):
612 fc = models.FlowCell.objects.get(pk=153)
613 self.assertEqual(fc.sequencer.model,
614 "Illumina Genome Analyzer IIx")
615 self.assertEqual(fc.sequencer.instrument_name,
617 # well actually we let the browser tack on the host name
618 url = fc.get_absolute_url()
619 self.assertEqual(url, '/flowcell/FC12150/')
622 response = self.client.get('/flowcell/FC12150/', apidata)
623 tree = fromstring(response.content)
624 seq_by = tree.xpath('//div[@rel="libns:sequenced_by"]',
626 self.assertEqual(len(seq_by), 1)
627 self.assertEqual(seq_by[0].attrib['rel'], 'libns:sequenced_by')
628 seq = seq_by[0].getchildren()
629 self.assertEqual(len(seq), 1)
630 self.assertEqual(seq[0].attrib['about'], '/sequencer/2')
631 self.assertEqual(seq[0].attrib['typeof'], 'libns:Sequencer')
633 name = seq[0].xpath('./span[@property="libns:sequencer_name"]')
634 self.assertEqual(len(name), 1)
635 self.assertEqual(name[0].text, 'Tardigrade')
636 instrument = seq[0].xpath(
637 './span[@property="libns:sequencer_instrument"]')
638 self.assertEqual(len(instrument), 1)
639 self.assertEqual(instrument[0].text, 'ILLUMINA-EC5D15')
640 model = seq[0].xpath(
641 './span[@property="libns:sequencer_model"]')
642 self.assertEqual(len(model), 1)
643 self.assertEqual(model[0].text, 'Illumina Genome Analyzer IIx')
645 def test_flowcell_with_rdf_validation(self):
646 from htsworkflow.util.rdfhelp import add_default_schemas, \
649 load_string_into_model
650 from htsworkflow.util.rdfinfer import Infer
653 add_default_schemas(model)
654 inference = Infer(model)
656 url ='/flowcell/FC12150/'
657 response = self.client.get(url)
658 self.assertEqual(response.status_code, 200)
659 status = validate_xhtml(response.content)
660 if status is not None: self.assertTrue(status)
662 load_string_into_model(model, 'rdfa', response.content)
664 errmsgs = list(inference.run_validation())
665 self.assertEqual(len(errmsgs), 0)
667 def test_lane_with_rdf_validation(self):
668 from htsworkflow.util.rdfhelp import add_default_schemas, \
671 load_string_into_model
672 from htsworkflow.util.rdfinfer import Infer
675 add_default_schemas(model)
676 inference = Infer(model)
679 response = self.client.get(url)
680 self.assertEqual(response.status_code, 200)
681 status = validate_xhtml(response.content)
682 if status is not None: self.assertTrue(status)
684 load_string_into_model(model, 'rdfa', response.content)
686 errmsgs = list(inference.run_validation())
687 self.assertEqual(len(errmsgs), 0)
690 from unittest2 import TestSuite, defaultTestLoader
692 for testcase in [ClusterStationTestCases,
698 suite.addTests(defaultTestLoader.loadTestsFromTestCase(testcase))
701 if __name__ == "__main__":
702 from unittest2 import main
703 main(defaultTest="suite")