2 from lxml.html import fromstring
6 import simplejson as json
11 from urlparse import urljoin
13 from django.conf import settings
14 from django.core import mail
15 from django.core.exceptions import ObjectDoesNotExist
16 from django.test import TestCase
17 from django.test.utils import setup_test_environment, teardown_test_environment
18 from django.db import connection
19 from django.conf import settings
20 from htsworkflow.frontend.experiments import models
21 from htsworkflow.frontend.experiments import experiments
22 from htsworkflow.frontend.auth import apidata
23 from htsworkflow.util.ethelp import validate_xhtml
25 from htsworkflow.pipelines.test.simulate_runfolder import TESTDATA_DIR
# Prefix-to-URI map that binds the 'libns' prefix to the LibraryOntology namespace.
29 NSMAP = {'libns':'http://jumpgate.caltech.edu/wiki/LibraryOntology#'}
31 from django.db import connection
# Tests for models.ClusterStation.default() and the isdefault flag,
# run against the test_flowcells.json fixture.
33 class ClusterStationTestCases(TestCase):
34 fixtures = ['test_flowcells.json']
# default() initially returns the station with id 2; the later assertions
# expect zero rows flagged isdefault and default() then returning id 3.
# NOTE(review): the statements that change the flag are not visible in this excerpt.
36 def test_default(self):
37 c = models.ClusterStation.default()
38 self.assertEqual(c.id, 2)
43 total = models.ClusterStation.objects.filter(isdefault=True).count()
44 self.assertEqual(total, 0)
46 other_default = models.ClusterStation.default()
47 self.assertEqual(other_default.id, 3)
# After working with the station pk=3, default() must differ from the old
# default, equal that station, and exactly one row may have isdefault=True.
# NOTE(review): the update/save step on `c` is not visible in this excerpt.
50 def test_update_default(self):
51 old_default = models.ClusterStation.default()
53 c = models.ClusterStation.objects.get(pk=3)
57 new_default = models.ClusterStation.default()
59 self.assertNotEqual(old_default, new_default)
60 self.assertEqual(new_default, c)
62 total = models.ClusterStation.objects.filter(isdefault=True).count()
63 self.assertEqual(total, 1)
# Renaming station pk=1 must leave exactly one default station, and it must
# be the same one as before the rename.
# NOTE(review): the save call after the rename is not visible in this excerpt.
65 def test_update_other(self):
66 old_default = models.ClusterStation.default()
67 total = models.ClusterStation.objects.filter(isdefault=True).count()
68 self.assertEqual(total, 1)
70 c = models.ClusterStation.objects.get(pk=1)
71 c.name = "Primary Key 1"
74 total = models.ClusterStation.objects.filter(isdefault=True).count()
75 self.assertEqual(total, 1)
77 new_default = models.ClusterStation.default()
78 self.assertEqual(old_default, new_default)
# Tests for models.Sequencer.default() and the isdefault flag,
# run against the test_flowcells.json fixture.
81 class SequencerTestCases(TestCase):
82 fixtures = ['test_flowcells.json']
# default() initially returns the sequencer with id 2 and one row is flagged
# isdefault; the later assertions expect zero flagged rows and default()
# then returning id 7.
# NOTE(review): the statements that change the flag are not visible in this excerpt.
84 def test_default(self):
85 # starting with no default
86 s = models.Sequencer.default()
87 self.assertEqual(s.id, 2)
89 total = models.Sequencer.objects.filter(isdefault=True).count()
90 self.assertEqual(total, 1)
95 total = models.Sequencer.objects.filter(isdefault=True).count()
96 self.assertEqual(total, 0)
98 other_default = models.Sequencer.default()
99 self.assertEqual(other_default.id, 7)
# After working with the sequencer pk=1, default() must differ from the old
# default, equal that sequencer, and exactly one row may have isdefault=True.
# NOTE(review): the update/save step on `s` is not visible in this excerpt.
101 def test_update_default(self):
102 old_default = models.Sequencer.default()
104 s = models.Sequencer.objects.get(pk=1)
108 new_default = models.Sequencer.default()
110 self.assertNotEqual(old_default, new_default)
111 self.assertEqual(new_default, s)
113 total = models.Sequencer.objects.filter(isdefault=True).count()
114 self.assertEqual(total, 1)
# Renaming sequencer pk=1 must leave exactly one default sequencer, and it
# must be the same one as before the rename.
# NOTE(review): the save call after the rename is not visible in this excerpt.
117 def test_update_other(self):
118 old_default = models.Sequencer.default()
119 total = models.Sequencer.objects.filter(isdefault=True).count()
120 self.assertEqual(total, 1)
122 s = models.Sequencer.objects.get(pk=1)
123 s.name = "Primary Key 1"
126 total = models.Sequencer.objects.filter(isdefault=True).count()
127 self.assertEqual(total, 1)
129 new_default = models.Sequencer.default()
130 self.assertEqual(old_default, new_default)
# Tests for the experiments helpers (flowcell_information, lanes_for), the
# FlowCell model's data-run handling, and the related JSON/HTML views.
133 class ExperimentsTestCases(TestCase):
134 fixtures = ['test_flowcells.json',
# Per-test directory setup (the enclosing def line is not visible in this
# excerpt): create a temporary directory, point settings.RESULT_HOME_DIR at
# it, and populate run folders for FC12150 and 42JTNAAXX.
138 self.tempdir = tempfile.mkdtemp(prefix='htsw-test-experiments-')
139 settings.RESULT_HOME_DIR = self.tempdir
141 self.fc1_id = 'FC12150'
142 self.fc1_root = os.path.join(self.tempdir, self.fc1_id)
143 os.mkdir(self.fc1_root)
144 self.fc1_dir = os.path.join(self.fc1_root, 'C1-37')
145 os.mkdir(self.fc1_dir)
146 runxml = 'run_FC12150_2007-09-27.xml'
147 shutil.copy(os.path.join(TESTDATA_DIR, runxml),
148 os.path.join(self.fc1_dir, runxml))
151 os.path.join(TESTDATA_DIR,
152 'woldlab_070829_USI-EAS44_0017_FC11055_1.srf'),
153 os.path.join(self.fc1_dir,
154 'woldlab_070829_SERIAL_FC12150_%d.srf' %(i,))
157 self.fc2_dir = os.path.join(self.tempdir, '42JTNAAXX')
158 os.mkdir(self.fc2_dir)
159 os.mkdir(os.path.join(self.fc2_dir, 'C1-25'))
160 os.mkdir(os.path.join(self.fc2_dir, 'C1-37'))
161 os.mkdir(os.path.join(self.fc2_dir, 'C1-37', 'Plots'))
# Clean-up: delete the temporary directory tree created above
# (the enclosing def line is not visible in this excerpt).
164 shutil.rmtree(self.tempdir)
# For three flowcell ids, compare experiments.flowcell_information() and the
# /experiments/config/<id>/json view against the FlowCell model and its lanes.
166 def test_flowcell_information(self):
168 Check the code that packs the django objects into simple types.
170 for fc_id in [u'FC12150', u"42JTNAAXX", "42JU1AAXX"]:
171 fc_dict = experiments.flowcell_information(fc_id)
172 fc_django = models.FlowCell.objects.get(flowcell_id=fc_id)
173 self.assertEqual(fc_dict['flowcell_id'], fc_id)
174 self.assertEqual(fc_django.flowcell_id, fc_id)
175 self.assertEqual(fc_dict['sequencer'], fc_django.sequencer.name)
176 self.assertEqual(fc_dict['read_length'], fc_django.read_length)
177 self.assertEqual(fc_dict['notes'], fc_django.notes)
178 self.assertEqual(fc_dict['cluster_station'], fc_django.cluster_station.name)
# The python dict is keyed by integer lane number.
180 for lane in fc_django.lane_set.all():
181 lane_contents = fc_dict['lane_set'][lane.lane_number]
182 lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
183 self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate)
184 self.assertEqual(lane_dict['comment'], lane.comment)
185 self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id)
186 self.assertEqual(lane_dict['lane_number'], lane.lane_number)
187 self.assertEqual(lane_dict['library_name'], lane.library.library_name)
188 self.assertEqual(lane_dict['library_id'], lane.library.id)
189 self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM))
190 self.assertEqual(lane_dict['library_species'],
191 lane.library.library_species.scientific_name)
193 response = self.client.get('/experiments/config/%s/json' % (fc_id,), apidata)
194 # strptime isoformat string = '%Y-%m-%dT%H:%M:%S'
195 fc_json = json.loads(response.content)
196 self.assertEqual(fc_json['flowcell_id'], fc_id)
197 self.assertEqual(fc_json['sequencer'], fc_django.sequencer.name)
198 self.assertEqual(fc_json['read_length'], fc_django.read_length)
199 self.assertEqual(fc_json['notes'], fc_django.notes)
200 self.assertEqual(fc_json['cluster_station'], fc_django.cluster_station.name)
# The decoded JSON is indexed with the lane number converted via unicode().
203 for lane in fc_django.lane_set.all():
204 lane_contents = fc_json['lane_set'][unicode(lane.lane_number)]
205 lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
207 self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate)
208 self.assertEqual(lane_dict['comment'], lane.comment)
209 self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id)
210 self.assertEqual(lane_dict['lane_number'], lane.lane_number)
211 self.assertEqual(lane_dict['library_name'], lane.library.library_name)
212 self.assertEqual(lane_dict['library_id'], lane.library.id)
213 self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM))
214 self.assertEqual(lane_dict['library_species'],
215 lane.library.library_species.scientific_name)
# An unknown flowcell id must produce HTTP 404 from the JSON config view.
217 def test_invalid_flowcell(self):
219 Make sure we get a 404 if we request an invalid flowcell ID
221 response = self.client.get('/experiments/config/nottheone/json', apidata)
222 self.assertEqual(response.status_code, 404)
# Requesting the config view without the apidata parameters must yield 403.
224 def test_no_key(self):
226 Require logging in to retrieve meta data
228 response = self.client.get(u'/experiments/config/FC12150/json')
229 self.assertEqual(response.status_code, 403)
# Lane 3 of FC12150 must report library id 'SL039', and the library JSON
# view must resolve that non-numeric id.
231 def test_library_id(self):
233 Library IDs should be flexible, so make sure we can retrive a non-numeric ID
235 response = self.client.get('/experiments/config/FC12150/json', apidata)
236 self.assertEqual(response.status_code, 200)
237 flowcell = json.loads(response.content)
239 lane_contents = flowcell['lane_set']['3']
240 lane_library = lane_contents[0]
241 self.assertEqual(lane_library['library_id'], 'SL039')
243 response = self.client.get('/samples/library/SL039/json', apidata)
244 self.assertEqual(response.status_code, 200)
245 library_sl039 = json.loads(response.content)
247 self.assertEqual(library_sl039['library_id'], 'SL039')
# Logged in as 'supertest', load the admin page for flowcell 153 and check
# each lane's library input field against expected_ids.
# NOTE(review): the loop header that defines `i` is not visible in this excerpt.
249 def test_raw_id_field(self):
253 Library's have IDs, libraries also have primary keys,
254 we eventually had enough libraries that the drop down combo box was too
255 hard to filter through, unfortnately we want a field that uses our library
256 id and not the internal primary key, and raw_id_field uses primary keys.
258 This tests to make sure that the value entered in the raw library id field matches
259 the library id looked up.
261 expected_ids = [u'10981',u'11016',u'SL039',u'11060',
262 u'11061',u'11062',u'11063',u'11064']
263 self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
264 response = self.client.get('/admin/experiments/flowcell/153/')
266 tree = fromstring(response.content)
268 xpath_expression = '//input[@id="id_lane_set-%d-library"]'
269 input_field = tree.xpath(xpath_expression % (i,))[0]
270 library_field = input_field.find('../strong')
271 library_id, library_name = library_field.text.split(':')
272 # strip leading '#' sign from name
273 library_id = library_id[1:]
274 self.assertEqual(library_id, expected_ids[i])
275 self.assertEqual(input_field.attrib['value'], library_id)
# The page /library/11070/ must link to /flowcell/30012AAXX/ for the flowcell
# labelled '30012AAXX (failed)'; the linked flowcell page and its lane 8 page
# must both return 200.
277 def test_library_to_flowcell_link(self):
279 Make sure the library page includes links to the flowcell pages.
280 That work with flowcell IDs that have parenthetical comments.
282 self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
283 response = self.client.get('/library/11070/')
284 self.assertEqual(response.status_code, 200)
285 status = validate_xhtml(response.content)
286 if status is not None: self.assertTrue(status)
288 tree = fromstring(response.content)
289 flowcell_spans = tree.xpath('//span[@property="libns:flowcell_id"]',
291 self.assertEqual(flowcell_spans[0].text, '30012AAXX (failed)')
292 failed_fc_span = flowcell_spans[0]
293 failed_fc_a = failed_fc_span.getparent()
294 # make sure some of our RDF made it.
295 self.assertEqual(failed_fc_a.get('typeof'), 'libns:IlluminaFlowcell')
296 self.assertEqual(failed_fc_a.get('href'), '/flowcell/30012AAXX/')
297 fc_response = self.client.get(failed_fc_a.get('href'))
298 self.assertEqual(fc_response.status_code, 200)
# NOTE(review): this validates `response` (the library page) again rather
# than `fc_response`; looks like a copy/paste slip -- confirm and fix.
299 status = validate_xhtml(response.content)
300 if status is not None: self.assertTrue(status)
302 fc_lane_response = self.client.get('/flowcell/30012AAXX/8/')
303 self.assertEqual(fc_lane_response.status_code, 200)
# NOTE(review): same issue -- `fc_lane_response.content` is never validated.
304 status = validate_xhtml(response.content)
305 if status is not None: self.assertTrue(status)
# Lane 3 of 42JU1AAXX holds two libraries (12044 and 11045); each one's
# index_sequence is checked (expected values are not visible in this excerpt).
308 def test_pooled_multiplex_id(self):
309 fc_dict = experiments.flowcell_information('42JU1AAXX')
310 lane_contents = fc_dict['lane_set'][3]
311 self.assertEqual(len(lane_contents), 2)
312 lane_dict = multi_lane_to_dict(lane_contents)
314 self.assertEqual(lane_dict['12044']['index_sequence'],
318 self.assertEqual(lane_dict['11045']['index_sequence'],
# experiments.lanes_for(user) must return 5 lanes and agree entry by entry
# with the /experiments/lanes_for/<user>/json view.
# NOTE(review): the assignment of `user` is not visible in this excerpt.
323 def test_lanes_for(self):
325 Check the code that packs the django objects into simple types.
328 lanes = experiments.lanes_for(user)
329 self.assertEqual(len(lanes), 5)
331 response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
332 lanes_json = json.loads(response.content)
333 self.assertEqual(len(lanes), len(lanes_json))
334 for i in range(len(lanes)):
335 self.assertEqual(lanes[i]['comment'], lanes_json[i]['comment'])
336 self.assertEqual(lanes[i]['lane_number'], lanes_json[i]['lane_number'])
337 self.assertEqual(lanes[i]['flowcell'], lanes_json[i]['flowcell'])
338 self.assertEqual(lanes[i]['run_date'], lanes_json[i]['run_date'])
# lanes_for() must return an empty list here; in the visible lines the JSON
# response is decoded but nothing further is asserted about it.
# NOTE(review): the assignment of `user` is not visible in this excerpt.
340 def test_lanes_for_no_lanes(self):
342 Do we get something meaningful back when the user isn't attached to anything?
345 lanes = experiments.lanes_for(user)
346 self.assertEqual(len(lanes), 0)
348 response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
349 lanes_json = json.loads(response.content)
# For an unknown user, lanes_for() raises ObjectDoesNotExist and the JSON
# view returns 404.
351 def test_lanes_for_no_user(self):
353 Do we get something meaningful back when its the wrong user
355 user = 'not a real user'
356 self.assertRaises(ObjectDoesNotExist, experiments.lanes_for, user)
358 response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
359 self.assertEqual(response.status_code, 404)
# get_raw_data_directory() must equal RESULT_HOME_DIR/<flowcell id>, and must
# stay the same after " (failed)" is appended to flowcell_id.
362 def test_raw_data_dir(self):
363 """Raw data path generator check"""
364 flowcell_id = self.fc1_id
365 raw_dir = os.path.join(settings.RESULT_HOME_DIR, flowcell_id)
367 fc = models.FlowCell.objects.get(flowcell_id=flowcell_id)
368 self.assertEqual(fc.get_raw_data_directory(), raw_dir)
370 fc.flowcell_id = flowcell_id + " (failed)"
371 self.assertEqual(fc.get_raw_data_directory(), raw_dir)
# update_data_runs() on FC12150 must create exactly one DataRun whose files
# include the lane 4 SRF and the run xml; the same directory is then imported
# a second time and the DataRun count is compared with `import1`.
374 def test_data_run_import(self):
375 srf_file_type = models.FileType.objects.get(name='SRF')
376 runxml_file_type = models.FileType.objects.get(name='run_xml')
377 flowcell_id = self.fc1_id
378 flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id)
379 flowcell.update_data_runs()
380 self.assertEqual(len(flowcell.datarun_set.all()), 1)
382 run = flowcell.datarun_set.all()[0]
383 result_files = run.datafile_set.all()
384 result_dict = dict(((rf.relative_pathname, rf) for rf in result_files))
386 srf4 = result_dict['FC12150/C1-37/woldlab_070829_SERIAL_FC12150_4.srf']
387 self.assertEqual(srf4.file_type, srf_file_type)
388 self.assertEqual(srf4.library_id, '11060')
389 self.assertEqual(srf4.data_run.flowcell.flowcell_id, 'FC12150')
391 srf4.data_run.flowcell.lane_set.get(lane_number=4).library_id,
395 os.path.join(settings.RESULT_HOME_DIR, srf4.relative_pathname))
397 lane_files = run.lane_files()
398 self.assertEqual(lane_files[4]['srf'], srf4)
400 runxml= result_dict['FC12150/C1-37/run_FC12150_2007-09-27.xml']
401 self.assertEqual(runxml.file_type, runxml_file_type)
402 self.assertEqual(runxml.library_id, None)
404 import1 = len(models.DataRun.objects.filter(result_dir='FC12150/C1-37'))
405 # what happens if we import twice?
406 flowcell.import_data_run('FC12150/C1-37',
407 'run_FC12150_2007-09-27.xml')
409 len(models.DataRun.objects.filter(result_dir='FC12150/C1-37')),
# Every DataFile of the imported run must be retrievable from
# /experiments/file/<random_key> with status 200 and a content-type equal to
# the file type's mimetype ('application/octet-stream' is a fallback whose
# condition is not visible in this excerpt).
412 def test_read_result_file(self):
413 """make sure we can return a result file
415 flowcell_id = self.fc1_id
416 flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id)
417 flowcell.update_data_runs()
419 #self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
421 result_files = flowcell.datarun_set.all()[0].datafile_set.all()
422 for f in result_files:
423 url = '/experiments/file/%s' % ( f.random_key,)
424 response = self.client.get(url)
425 self.assertEqual(response.status_code, 200)
426 mimetype = f.file_type.mimetype
428 mimetype = 'application/octet-stream'
430 self.assertEqual(mimetype, response['content-type'])
432 def test_flowcell_rdf(self):
434 from htsworkflow.util.rdfhelp import get_model, \
436 load_string_into_model, \
443 expected = {'1': ['11034'],
445 '3': ['12044','11045'],
446 '4': ['11047','13044'],
451 url = '/flowcell/42JU1AAXX/'
452 response = self.client.get(url)
453 self.assertEqual(response.status_code, 200)
454 status = validate_xhtml(response.content)
455 if status is not None: self.assertTrue(status)
457 ns = urljoin('http://localhost', url)
458 load_string_into_model(model, 'rdfa', response.content, ns=ns)
459 body = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
460 prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
462 select ?flowcell ?flowcell_id ?lane_id ?library_id
464 ?flowcell a libns:IlluminaFlowcell ;
465 libns:flowcell_id ?flowcell_id ;
466 libns:has_lane ?lane .
467 ?lane libns:lane_number ?lane_id ;
468 libns:library ?library .
469 ?library libns:library_id ?library_id .
# test_flowcell_rdf: run the SPARQL query over the RDFa loaded from the
# flowcell page; every row must name flowcell 42JU1AAXX and a library listed
# in `expected` for its lane, and `count` must end at 10.
471 query = RDF.SPARQLQuery(body)
473 for r in query.execute(model):
475 self.assertEqual(fromTypedNode(r['flowcell_id']), u'42JU1AAXX')
476 lane_id = fromTypedNode(r['lane_id'])
477 library_id = fromTypedNode(r['library_id'])
478 self.assertTrue(library_id in expected[lane_id])
479 self.assertEqual(count, 10)
# Tests for models.FileType and models.find_file_type_metadata_from_filename().
482 class TestFileType(TestCase):
# unicode() of the FileType named 'QSEQ tarfile' must equal that name.
483 def test_file_type_unicode(self):
484 file_type_objects = models.FileType.objects
485 name = 'QSEQ tarfile'
486 file_type_object = file_type_objects.get(name=name)
487 self.assertEqual(u"QSEQ tarfile",
488 unicode(file_type_object))
# Each case is (filename, FileType name, lane, end); the metadata returned
# for a bare filename must match all three expectations.
490 def test_find_file_type(self):
491 file_type_objects = models.FileType.objects
492 cases = [('woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
493 'QSEQ tarfile', 7, 1),
494 ('woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
496 ('s_1_eland_extended.txt.bz2','ELAND Extended', 1, None),
497 ('s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
498 ('s_3_eland_result.txt.bz2','ELAND Result', 3, None),
499 ('s_1_export.txt.bz2','ELAND Export', 1, None),
500 ('s_1_percent_call.png', 'IVC Percent Call', 1, None),
501 ('s_2_percent_base.png', 'IVC Percent Base', 2, None),
502 ('s_3_percent_all.png', 'IVC Percent All', 3, None),
503 ('s_4_call.png', 'IVC Call', 4, None),
504 ('s_5_all.png', 'IVC All', 5, None),
505 ('Summary.htm', 'Summary.htm', None, None),
506 ('run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
508 for filename, typename, lane, end in cases:
509 ft = models.find_file_type_metadata_from_filename(filename)
510 self.assertEqual(ft['file_type'],
511 file_type_objects.get(name=typename))
512 self.assertEqual(ft.get('lane', None), lane)
513 self.assertEqual(ft.get('end', None), end)
# Same expectations as test_find_file_type, but every filename carries a
# directory prefix (absolute, relative, '..', repeated slashes).
515 def test_assign_file_type_complex_path(self):
516 file_type_objects = models.FileType.objects
517 cases = [('/a/b/c/woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
518 'QSEQ tarfile', 7, 1),
519 ('foo/woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
521 ('../s_1_eland_extended.txt.bz2','ELAND Extended', 1, None),
522 ('/bleem/s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
523 ('/qwer/s_3_eland_result.txt.bz2','ELAND Result', 3, None),
524 ('/ty///1/s_1_export.txt.bz2','ELAND Export', 1, None),
525 ('/help/s_1_percent_call.png', 'IVC Percent Call', 1, None),
526 ('/bored/s_2_percent_base.png', 'IVC Percent Base', 2, None),
527 ('/example1/s_3_percent_all.png', 'IVC Percent All', 3, None),
528 ('amonkey/s_4_call.png', 'IVC Call', 4, None),
529 ('fishie/s_5_all.png', 'IVC All', 5, None),
530 ('/random/Summary.htm', 'Summary.htm', None, None),
531 ('/notrandom/run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
533 for filename, typename, lane, end in cases:
534 result = models.find_file_type_metadata_from_filename(filename)
535 self.assertEqual(result['file_type'],
536 file_type_objects.get(name=typename))
537 self.assertEqual(result.get('lane',None), lane)
538 self.assertEqual(result.get('end', None), end)
# Tests for the /experiments/started/<pk>/ notification view, exercised with
# flowcell 153 from the test_flowcells.json fixture.
540 class TestEmailNotify(TestCase):
541 fixtures = ['test_flowcells.json']
# An anonymous request must get a 302 response.
543 def test_started_email_not_logged_in(self):
544 response = self.client.get('/experiments/started/153/')
545 self.assertEqual(response.status_code, 302)
# A request made after logging in as user 'test' must also get a 302.
547 def test_started_email_logged_in_user(self):
548 self.client.login(username='test', password='BJOKL5kAj6aFZ6A5')
549 response = self.client.get('/experiments/started/153/')
550 self.assertEqual(response.status_code, 302)
# A request made after logging in as 'admintest' must get a 200.
552 def test_started_email_logged_in_staff(self):
553 self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5')
554 response = self.client.get('/experiments/started/153/')
555 self.assertEqual(response.status_code, 200)
# The preview page must mention a recipient address and the lane 8 summary;
# with send=1 and bcc=on, four messages must land in mail.outbox, each with a
# non-empty body and a bcc set equal to NOTIFICATION_BCC plus MANAGERS.
557 def test_started_email_send(self):
558 self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5')
559 response = self.client.get('/experiments/started/153/')
560 self.assertEqual(response.status_code, 200)
562 self.assertTrue('pk1@example.com' in response.content)
563 self.assertTrue('Lane #8 : (11064) Paired ends 104' in response.content)
565 response = self.client.get('/experiments/started/153/', {'send':'1','bcc':'on'})
566 self.assertEqual(response.status_code, 200)
567 self.assertEqual(len(mail.outbox), 4)
568 bcc = set(settings.NOTIFICATION_BCC).copy()
569 bcc.update(set(settings.MANAGERS))
570 for m in mail.outbox:
571 self.assertTrue(len(m.body) > 0)
572 self.assertEqual(set(m.bcc), bcc)
# The email page must name the flowcell and contain a link back to the
# flowcell's admin page.
574 def test_email_navigation(self):
576 Can we navigate between the flowcell and email forms properly?
578 self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
579 response = self.client.get('/experiments/started/153/')
580 self.assertEqual(response.status_code, 200)
581 self.assertTrue(re.search('Flowcell FC12150', response.content))
582 # require that navigation back to the admin page exists
583 self.assertTrue(re.search('<a href="/admin/experiments/flowcell/153/">[^<]+</a>', response.content))
def multi_lane_to_dict(lane):
    """Convert a list of lane entries into a dictionary indexed by library ID.

    :param lane: iterable of mappings, each providing a 'library_id' key.
    :return: dict mapping every entry's 'library_id' value to the entry
        itself. When several entries share a library ID the last one wins.
    :raises KeyError: if an entry has no 'library_id' key.
    """
    # dict() over a generator of pairs keeps the same construction the
    # callers already rely on and stays valid on older Python 2 releases.
    return dict((entry['library_id'], entry) for entry in lane)
# Tests for the Sequencer model's display name, its relation to FlowCell, and
# the RDFa emitted on flowcell pages.
590 class TestSequencer(TestCase):
591 fixtures = ['test_flowcells.json',
# unicode() of a Sequencer must render as "Seq1 (HWI-SEQ1)".
# NOTE(review): the line assigning the sequencer's name is not visible in this excerpt.
594 def test_name_generation(self):
595 seq = models.Sequencer()
597 seq.instrument_name = "HWI-SEQ1"
598 seq.model = "Imaginary 5000"
600 self.assertEqual(unicode(seq), "Seq1 (HWI-SEQ1)")
# Flowcell pk=153 must expose its sequencer's model and instrument name, have
# absolute url /flowcell/FC12150/, and its page must carry one
# libns:sequenced_by element wrapping a libns:Sequencer with name, instrument
# and model spans.
602 def test_lookup(self):
603 fc = models.FlowCell.objects.get(pk=153)
604 self.assertEqual(fc.sequencer.model,
605 "Illumina Genome Analyzer IIx")
606 self.assertEqual(fc.sequencer.instrument_name,
608 # well actually we let the browser tack on the host name
609 url = fc.get_absolute_url()
610 self.assertEqual(url, '/flowcell/FC12150/')
613 response = self.client.get('/flowcell/FC12150/', apidata)
614 tree = fromstring(response.content)
615 seq_by = tree.xpath('//div[@rel="libns:sequenced_by"]',
617 self.assertEqual(len(seq_by), 1)
618 self.assertEqual(seq_by[0].attrib['rel'], 'libns:sequenced_by')
619 seq = seq_by[0].getchildren()
620 self.assertEqual(len(seq), 1)
621 self.assertEqual(seq[0].attrib['about'], '/sequencer/2')
622 self.assertEqual(seq[0].attrib['typeof'], 'libns:Sequencer')
624 name = seq[0].xpath('./span[@property="libns:sequencer_name"]')
625 self.assertEqual(len(name), 1)
626 self.assertEqual(name[0].text, 'Tardigrade')
627 instrument = seq[0].xpath(
628 './span[@property="libns:sequencer_instrument"]')
629 self.assertEqual(len(instrument), 1)
630 self.assertEqual(instrument[0].text, 'ILLUMINA-EC5D15')
631 model = seq[0].xpath(
632 './span[@property="libns:sequencer_model"]')
633 self.assertEqual(len(model), 1)
634 self.assertEqual(model[0].text, 'Illumina Genome Analyzer IIx')
# Load the RDFa from /flowcell/FC12150/ into a model with the default schemas
# and require that inference.run_validation() yields no messages.
# NOTE(review): the creation of `model` is not visible in this excerpt.
636 def test_flowcell_with_rdf_validation(self):
637 from htsworkflow.util.rdfhelp import add_default_schemas, \
640 load_string_into_model
641 from htsworkflow.util.rdfinfer import Infer
644 add_default_schemas(model)
645 inference = Infer(model)
647 url ='/flowcell/FC12150/'
648 response = self.client.get(url)
649 self.assertEqual(response.status_code, 200)
650 status = validate_xhtml(response.content)
651 if status is not None: self.assertTrue(status)
653 load_string_into_model(model, 'rdfa', response.content)
655 errmsgs = list(inference.run_validation())
656 self.assertEqual(len(errmsgs), 0)
# Same validation as above for another page.
# NOTE(review): the assignments of `model` and `url` are not visible in this excerpt.
658 def test_lane_with_rdf_validation(self):
659 from htsworkflow.util.rdfhelp import add_default_schemas, \
662 load_string_into_model
663 from htsworkflow.util.rdfinfer import Infer
666 add_default_schemas(model)
667 inference = Infer(model)
670 response = self.client.get(url)
671 self.assertEqual(response.status_code, 200)
672 status = validate_xhtml(response.content)
673 if status is not None: self.assertTrue(status)
675 load_string_into_model(model, 'rdfa', response.content)
677 errmsgs = list(inference.run_validation())
678 self.assertEqual(len(errmsgs), 0)
# Suite construction: tests from each listed TestCase class are loaded with
# defaultTestLoader and added to `suite`.
# NOTE(review): the enclosing def line, the creation of `suite`, the rest of
# the class list and the return are not visible in this excerpt.
681 from unittest2 import TestSuite, defaultTestLoader
683 for testcase in [ClusterStationTestCases,
689 suite.addTests(defaultTestLoader.loadTestsFromTestCase(testcase))
# When executed as a script, run unittest2's main() with "suite" as the
# default test.
692 if __name__ == "__main__":
693 from unittest2 import main
694 main(defaultTest="suite")