2 from lxml.html import fromstring
6 import simplejson as json
11 from urlparse import urljoin
13 from django.conf import settings
14 from django.core import mail
15 from django.core.exceptions import ObjectDoesNotExist
16 from django.test import TestCase
17 from django.test.utils import setup_test_environment, teardown_test_environment
18 from django.db import connection
19 from django.conf import settings
20 from htsworkflow.frontend.experiments import models
21 from htsworkflow.frontend.experiments import experiments
22 from htsworkflow.frontend.auth import apidata
23 from htsworkflow.util.ethelp import validate_xhtml
25 from htsworkflow.pipelines.test.simulate_runfolder import TESTDATA_DIR
# Prefix-to-URI map binding the "libns" prefix to the LibraryOntology namespace.
29 NSMAP = {'libns':'http://jumpgate.caltech.edu/wiki/LibraryOntology#'}
31 from django.db import connection
# Module-level test database fixture using the single-database setting name.
# NOTE(review): the embedded listing numbers skip 33-34 and 38-39, and
# VERBOSITY is never assigned in the visible lines; presumably the dropped
# lines held that constant and the setUpModule()/tearDownModule() headers
# -- confirm against the original file.
# NOTE(review): a second fixture at the end of this file reads
# settings.DATABASES['default']['NAME'] instead of settings.DATABASE_NAME;
# only one of the two variants should survive.
32 OLD_DB_NAME = settings.DATABASE_NAME
# Prepare the test environment and create the test database.
35 setup_test_environment()
36 settings.DEBUG = False
37 connection.creation.create_test_db(VERBOSITY)
# Destroy the test database and undo the test environment setup.
40 connection.creation.destroy_test_db(OLD_DB_NAME, VERBOSITY)
41 teardown_test_environment()
# Tests for the "default" flag handling of models.ClusterStation, run against
# the test_flowcells.json fixture.
# NOTE(review): this listing has lost its indentation and its embedded line
# numbers have gaps, so some statements are not visible below.
43 class ClusterStationTestCases(TestCase):
44 fixtures = ['test_flowcells.json']
# The fixture's default cluster station is expected to have id 2.
46 def test_default(self):
47 c = models.ClusterStation.default()
48 self.assertEqual(c.id, 2)
# NOTE(review): listing lines 49-52 are missing; the next check expects no
# row flagged as default, so presumably the flag is cleared and saved here.
53 total = models.ClusterStation.objects.filter(isdefault=True).count()
54 self.assertEqual(total, 0)
# with no row flagged, default() is expected to return the station with id 3
56 other_default = models.ClusterStation.default()
57 self.assertEqual(other_default.id, 3)
# Switching the default to station 3 must leave exactly one default row.
60 def test_update_default(self):
61 old_default = models.ClusterStation.default()
63 c = models.ClusterStation.objects.get(pk=3)
# NOTE(review): listing lines 64-66 are missing; presumably station 3 is
# flagged as default and saved here -- confirm.
67 new_default = models.ClusterStation.default()
69 self.assertNotEqual(old_default, new_default)
70 self.assertEqual(new_default, c)
72 total = models.ClusterStation.objects.filter(isdefault=True).count()
73 self.assertEqual(total, 1)
# Renaming station 1 must not change which station is the default.
75 def test_update_other(self):
76 old_default = models.ClusterStation.default()
77 total = models.ClusterStation.objects.filter(isdefault=True).count()
78 self.assertEqual(total, 1)
80 c = models.ClusterStation.objects.get(pk=1)
81 c.name = "Primary Key 1"
# NOTE(review): listing lines 82-83 are missing; presumably c.save() -- confirm.
84 total = models.ClusterStation.objects.filter(isdefault=True).count()
85 self.assertEqual(total, 1)
87 new_default = models.ClusterStation.default()
88 self.assertEqual(old_default, new_default)
# Tests for the "default" flag handling of models.Sequencer, run against the
# test_flowcells.json fixture.
# NOTE(review): this listing has lost its indentation and its embedded line
# numbers have gaps, so some statements are not visible below.
91 class SequencerTestCases(TestCase):
92 fixtures = ['test_flowcells.json']
# The fixture's default sequencer is expected to have id 2 and be the only
# row flagged as default.
94 def test_default(self):
95 # starting with no default
96 s = models.Sequencer.default()
97 self.assertEqual(s.id, 2)
99 total = models.Sequencer.objects.filter(isdefault=True).count()
100 self.assertEqual(total, 1)
# NOTE(review): listing lines 101-104 are missing; the next check expects no
# row flagged as default, so presumably the flag is cleared and saved here.
105 total = models.Sequencer.objects.filter(isdefault=True).count()
106 self.assertEqual(total, 0)
# with no row flagged, default() is expected to return the sequencer with id 7
108 other_default = models.Sequencer.default()
109 self.assertEqual(other_default.id, 7)
# Switching the default to sequencer 1 must leave exactly one default row.
111 def test_update_default(self):
112 old_default = models.Sequencer.default()
114 s = models.Sequencer.objects.get(pk=1)
# NOTE(review): listing lines 115-117 are missing; presumably sequencer 1 is
# flagged as default and saved here -- confirm.
118 new_default = models.Sequencer.default()
120 self.assertNotEqual(old_default, new_default)
121 self.assertEqual(new_default, s)
123 total = models.Sequencer.objects.filter(isdefault=True).count()
124 self.assertEqual(total, 1)
# Renaming sequencer 1 must not change which sequencer is the default.
127 def test_update_other(self):
128 old_default = models.Sequencer.default()
129 total = models.Sequencer.objects.filter(isdefault=True).count()
130 self.assertEqual(total, 1)
132 s = models.Sequencer.objects.get(pk=1)
133 s.name = "Primary Key 1"
# NOTE(review): listing lines 134-135 are missing; presumably s.save() -- confirm.
136 total = models.Sequencer.objects.filter(isdefault=True).count()
137 self.assertEqual(total, 1)
139 new_default = models.Sequencer.default()
140 self.assertEqual(old_default, new_default)
# Tests for the experiments helper functions, their JSON views and the
# flowcell result-file bookkeeping.
# NOTE(review): this listing has lost its indentation and its embedded line
# numbers have gaps: docstring delimiters, short statements and the
# setUp()/tearDown() headers are not visible. tempfile, os and shutil are used
# below but no import for them is visible either -- confirm against the
# original file.
143 class ExperimentsTestCases(TestCase):
144 fixtures = ['test_flowcells.json',
# NOTE(review): listing lines 145-147 are missing (rest of the fixtures list
# and, presumably, the setUp() header).
# Point RESULT_HOME_DIR at a fresh temporary directory for the run folders.
148 self.tempdir = tempfile.mkdtemp(prefix='htsw-test-experiments-')
149 settings.RESULT_HOME_DIR = self.tempdir
# Flowcell FC12150: <tempdir>/FC12150/C1-37 holding a copy of the run xml.
151 self.fc1_id = 'FC12150'
152 self.fc1_root = os.path.join(self.tempdir, self.fc1_id)
153 os.mkdir(self.fc1_root)
154 self.fc1_dir = os.path.join(self.fc1_root, 'C1-37')
155 os.mkdir(self.fc1_dir)
156 runxml = 'run_FC12150_2007-09-27.xml'
157 shutil.copy(os.path.join(TESTDATA_DIR, runxml),
158 os.path.join(self.fc1_dir, runxml))
# NOTE(review): listing lines 159-160 are missing; `i` is unbound in the
# visible text, so presumably a loop header and a copy call were dropped.
161 os.path.join(TESTDATA_DIR,
162 'woldlab_070829_USI-EAS44_0017_FC11055_1.srf'),
163 os.path.join(self.fc1_dir,
164 'woldlab_070829_SERIAL_FC12150_%d.srf' %(i,))
# Flowcell 42JTNAAXX: only empty C1-25, C1-37 and C1-37/Plots directories.
167 self.fc2_dir = os.path.join(self.tempdir, '42JTNAAXX')
168 os.mkdir(self.fc2_dir)
169 os.mkdir(os.path.join(self.fc2_dir, 'C1-25'))
170 os.mkdir(os.path.join(self.fc2_dir, 'C1-37'))
171 os.mkdir(os.path.join(self.fc2_dir, 'C1-37', 'Plots'))
# Delete the temporary result tree (presumably the body of tearDown()).
174 shutil.rmtree(self.tempdir)
176 def test_flowcell_information(self):
178 Check the code that packs the django objects into simple types.
180 for fc_id in [u'FC12150', u"42JTNAAXX", "42JU1AAXX"]:
181 fc_dict = experiments.flowcell_information(fc_id)
182 fc_django = models.FlowCell.objects.get(flowcell_id=fc_id)
183 self.assertEqual(fc_dict['flowcell_id'], fc_id)
184 self.assertEqual(fc_django.flowcell_id, fc_id)
185 self.assertEqual(fc_dict['sequencer'], fc_django.sequencer.name)
186 self.assertEqual(fc_dict['read_length'], fc_django.read_length)
187 self.assertEqual(fc_dict['notes'], fc_django.notes)
188 self.assertEqual(fc_dict['cluster_station'], fc_django.cluster_station.name)
# every lane row must match the helper's entry for the same library
190 for lane in fc_django.lane_set.all():
191 lane_contents = fc_dict['lane_set'][lane.lane_number]
192 lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
193 self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate)
194 self.assertEqual(lane_dict['comment'], lane.comment)
195 self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id)
196 self.assertEqual(lane_dict['lane_number'], lane.lane_number)
197 self.assertEqual(lane_dict['library_name'], lane.library.library_name)
198 self.assertEqual(lane_dict['library_id'], lane.library.id)
199 self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM))
200 self.assertEqual(lane_dict['library_species'],
201 lane.library.library_species.scientific_name)
# the JSON view for the same flowcell must report the same values
203 response = self.client.get('/experiments/config/%s/json' % (fc_id,), apidata)
204 # strptime isoformat string = '%Y-%m-%dT%H:%M:%S'
205 fc_json = json.loads(response.content)
206 self.assertEqual(fc_json['flowcell_id'], fc_id)
207 self.assertEqual(fc_json['sequencer'], fc_django.sequencer.name)
208 self.assertEqual(fc_json['read_length'], fc_django.read_length)
209 self.assertEqual(fc_json['notes'], fc_django.notes)
210 self.assertEqual(fc_json['cluster_station'], fc_django.cluster_station.name)
# the decoded JSON is indexed with the lane number converted to text
213 for lane in fc_django.lane_set.all():
214 lane_contents = fc_json['lane_set'][unicode(lane.lane_number)]
215 lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
217 self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate)
218 self.assertEqual(lane_dict['comment'], lane.comment)
219 self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id)
220 self.assertEqual(lane_dict['lane_number'], lane.lane_number)
221 self.assertEqual(lane_dict['library_name'], lane.library.library_name)
222 self.assertEqual(lane_dict['library_id'], lane.library.id)
223 self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM))
224 self.assertEqual(lane_dict['library_species'],
225 lane.library.library_species.scientific_name)
227 def test_invalid_flowcell(self):
229 Make sure we get a 404 if we request an invalid flowcell ID
231 response = self.client.get('/experiments/config/nottheone/json', apidata)
232 self.assertEqual(response.status_code, 404)
# The request below is sent without apidata and must be answered with 403.
234 def test_no_key(self):
236 Require logging in to retrieve meta data
238 response = self.client.get(u'/experiments/config/FC12150/json')
239 self.assertEqual(response.status_code, 403)
241 def test_library_id(self):
243 Library IDs should be flexible, so make sure we can retrive a non-numeric ID
245 response = self.client.get('/experiments/config/FC12150/json', apidata)
246 self.assertEqual(response.status_code, 200)
247 flowcell = json.loads(response.content)
# lane 3 of FC12150 is expected to carry the non-numeric library SL039
249 lane_contents = flowcell['lane_set']['3']
250 lane_library = lane_contents[0]
251 self.assertEqual(lane_library['library_id'], 'SL039')
# the library view must resolve the same non-numeric ID
253 response = self.client.get('/samples/library/SL039/json', apidata)
254 self.assertEqual(response.status_code, 200)
255 library_sl039 = json.loads(response.content)
257 self.assertEqual(library_sl039['library_id'], 'SL039')
259 def test_raw_id_field(self):
263 Library's have IDs, libraries also have primary keys,
264 we eventually had enough libraries that the drop down combo box was too
265 hard to filter through, unfortnately we want a field that uses our library
266 id and not the internal primary key, and raw_id_field uses primary keys.
268 This tests to make sure that the value entered in the raw library id field matches
269 the library id looked up.
271 expected_ids = [u'10981',u'11016',u'SL039',u'11060',
272 u'11061',u'11062',u'11063',u'11064']
273 self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
274 response = self.client.get('/admin/experiments/flowcell/153/')
276 tree = fromstring(response.content)
# NOTE(review): listing line 277 is missing; `i` is unbound in the visible
# text, so presumably a loop over the indexes of expected_ids was dropped.
278 xpath_expression = '//input[@id="id_lane_set-%d-library"]'
279 input_field = tree.xpath(xpath_expression % (i,))[0]
280 library_field = input_field.find('../strong')
281 library_id, library_name = library_field.text.split(':')
282 # strip leading '#' sign from name
283 library_id = library_id[1:]
284 self.assertEqual(library_id, expected_ids[i])
285 self.assertEqual(input_field.attrib['value'], library_id)
287 def test_library_to_flowcell_link(self):
289 Make sure the library page includes links to the flowcell pages.
290 That work with flowcell IDs that have parenthetical comments.
292 self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
293 response = self.client.get('/library/11070/')
294 self.assertEqual(response.status_code, 200)
295 status = validate_xhtml(response.content)
296 if status is not None: self.assertTrue(status)
298 tree = fromstring(response.content)
299 flowcell_spans = tree.xpath('//span[@property="libns:flowcell_id"]',
# NOTE(review): listing line 300 (the rest of this xpath call) is missing.
301 self.assertEqual(flowcell_spans[0].text, '30012AAXX (failed)')
302 failed_fc_span = flowcell_spans[0]
303 failed_fc_a = failed_fc_span.getparent()
304 # make sure some of our RDF made it.
305 self.assertEqual(failed_fc_a.get('typeof'), 'libns:IlluminaFlowcell')
306 self.assertEqual(failed_fc_a.get('href'), '/flowcell/30012AAXX/')
307 fc_response = self.client.get(failed_fc_a.get('href'))
308 self.assertEqual(fc_response.status_code, 200)
# NOTE(review): this validates `response` (the library page) a second time
# rather than `fc_response`, so the flowcell page markup is never checked --
# looks like a copy/paste slip; confirm and switch to fc_response.content.
309 status = validate_xhtml(response.content)
310 if status is not None: self.assertTrue(status)
312 fc_lane_response = self.client.get('/flowcell/30012AAXX/8/')
313 self.assertEqual(fc_lane_response.status_code, 200)
# NOTE(review): same slip as above -- `fc_lane_response.content` is
# presumably what should be validated here.
314 status = validate_xhtml(response.content)
315 if status is not None: self.assertTrue(status)
# Lane 3 of 42JU1AAXX is expected to hold two libraries (12044 and 11045).
318 def test_pooled_multiplex_id(self):
319 fc_dict = experiments.flowcell_information('42JU1AAXX')
320 lane_contents = fc_dict['lane_set'][3]
321 self.assertEqual(len(lane_contents), 2)
322 lane_dict = multi_lane_to_dict(lane_contents)
# NOTE(review): the expected index sequences (listing lines 325-327 and
# 329-331) are missing, so both calls below are incomplete as shown.
324 self.assertEqual(lane_dict['12044']['index_sequence'],
328 self.assertEqual(lane_dict['11045']['index_sequence'],
# NOTE(review): `user` is read below but never assigned in the visible lines
# (listing line 337 is missing) -- confirm which account is meant.
333 def test_lanes_for(self):
335 Check the code that packs the django objects into simple types.
338 lanes = experiments.lanes_for(user)
339 self.assertEqual(len(lanes), 5)
# the JSON view must return the same lanes, in the same order
341 response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
342 lanes_json = json.loads(response.content)
343 self.assertEqual(len(lanes), len(lanes_json))
344 for i in range(len(lanes)):
345 self.assertEqual(lanes[i]['comment'], lanes_json[i]['comment'])
346 self.assertEqual(lanes[i]['lane_number'], lanes_json[i]['lane_number'])
347 self.assertEqual(lanes[i]['flowcell'], lanes_json[i]['flowcell'])
348 self.assertEqual(lanes[i]['run_date'], lanes_json[i]['run_date'])
# NOTE(review): `user` is read below but never assigned in the visible lines
# (listing line 354 is missing) -- confirm which account is meant.
350 def test_lanes_for_no_lanes(self):
352 Do we get something meaningful back when the user isn't attached to anything?
355 lanes = experiments.lanes_for(user)
356 self.assertEqual(len(lanes), 0)
358 response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
# NOTE(review): the decoded JSON is never asserted on in the visible lines;
# the view is only checked to the extent that its body parses as JSON.
359 lanes_json = json.loads(response.content)
361 def test_lanes_for_no_user(self):
363 Do we get something meaningful back when its the wrong user
365 user = 'not a real user'
# the helper raises for an unknown user, while the view answers with 404
366 self.assertRaises(ObjectDoesNotExist, experiments.lanes_for, user)
368 response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
369 self.assertEqual(response.status_code, 404)
372 def test_raw_data_dir(self):
373 """Raw data path generator check"""
374 flowcell_id = self.fc1_id
375 raw_dir = os.path.join(settings.RESULT_HOME_DIR, flowcell_id)
377 fc = models.FlowCell.objects.get(flowcell_id=flowcell_id)
378 self.assertEqual(fc.get_raw_data_directory(), raw_dir)
# a " (failed)" suffix on the flowcell id must not change the directory
380 fc.flowcell_id = flowcell_id + " (failed)"
381 self.assertEqual(fc.get_raw_data_directory(), raw_dir)
# Importing the FC12150 run folder must register its data run and files.
384 def test_data_run_import(self):
385 srf_file_type = models.FileType.objects.get(name='SRF')
386 runxml_file_type = models.FileType.objects.get(name='run_xml')
387 flowcell_id = self.fc1_id
388 flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id)
389 flowcell.update_data_runs()
390 self.assertEqual(len(flowcell.datarun_set.all()), 1)
# index the imported files by their path relative to RESULT_HOME_DIR
392 run = flowcell.datarun_set.all()[0]
393 result_files = run.datafile_set.all()
394 result_dict = dict(((rf.relative_pathname, rf) for rf in result_files))
396 srf4 = result_dict['FC12150/C1-37/woldlab_070829_SERIAL_FC12150_4.srf']
397 self.assertEqual(srf4.file_type, srf_file_type)
398 self.assertEqual(srf4.library_id, '11060')
399 self.assertEqual(srf4.data_run.flowcell.flowcell_id, 'FC12150')
# NOTE(review): listing lines 400 and 402-404 are missing; the two fragments
# below are arguments of calls whose opening lines are not visible.
401 srf4.data_run.flowcell.lane_set.get(lane_number=4).library_id,
405 os.path.join(settings.RESULT_HOME_DIR, srf4.relative_pathname))
407 lane_files = run.lane_files()
408 self.assertEqual(lane_files[4]['srf'], srf4)
410 runxml= result_dict['FC12150/C1-37/run_FC12150_2007-09-27.xml']
411 self.assertEqual(runxml.file_type, runxml_file_type)
412 self.assertEqual(runxml.library_id, None)
414 import1 = len(models.DataRun.objects.filter(result_dir='FC12150/C1-37'))
415 # what happens if we import twice?
416 flowcell.import_data_run('FC12150/C1-37',
417 'run_FC12150_2007-09-27.xml')
# NOTE(review): listing lines 418 and 420 are missing; presumably the count
# after the second import is compared with import1 -- confirm.
419 len(models.DataRun.objects.filter(result_dir='FC12150/C1-37')),
422 def test_read_result_file(self):
423 """make sure we can return a result file
425 flowcell_id = self.fc1_id
426 flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id)
427 flowcell.update_data_runs()
429 #self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
# every imported file must be downloadable through its random key
431 result_files = flowcell.datarun_set.all()[0].datafile_set.all()
432 for f in result_files:
433 url = '/experiments/file/%s' % ( f.random_key,)
434 response = self.client.get(url)
435 self.assertEqual(response.status_code, 200)
436 mimetype = f.file_type.mimetype
# NOTE(review): listing line 437 is missing; as shown the mimetype is always
# overwritten, so presumably a guard for a missing mimetype was dropped.
438 mimetype = 'application/octet-stream'
440 self.assertEqual(mimetype, response['content-type'])
# The RDFa on the 42JU1AAXX flowcell page must list the expected library IDs
# for each lane.
442 def test_flowcell_rdf(self):
# NOTE(review): listing lines 445 and 447-452 are missing, so the import
# list below is incomplete; RDF, fromTypedNode and `model` are used further
# down without a visible definition.
444 from htsworkflow.util.rdfhelp import get_model, \
446 load_string_into_model, \
453 expected = {'1': ['11034'],
455 '3': ['12044','11045'],
456 '4': ['11047','13044'],
# NOTE(review): listing lines 454 and 457-460 (the remaining lanes and the
# closing brace) are missing.
461 url = '/flowcell/42JU1AAXX/'
462 response = self.client.get(url)
463 self.assertEqual(response.status_code, 200)
464 status = validate_xhtml(response.content)
465 if status is not None: self.assertTrue(status)
# parse the page's RDFa, using the page URL on localhost as the namespace
467 ns = urljoin('http://localhost', url)
468 load_string_into_model(model, 'rdfa', response.content, ns=ns)
469 body = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
470 prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
472 select ?flowcell ?flowcell_id ?lane_id ?library_id
474 ?flowcell a libns:IlluminaFlowcell ;
475 libns:flowcell_id ?flowcell_id ;
476 libns:has_lane ?lane .
477 ?lane libns:lane_number ?lane_id ;
478 libns:library ?library .
479 ?library libns:library_id ?library_id .
481 query = RDF.SPARQLQuery(body)
# NOTE(review): `count` is asserted after the loop but is never initialised or
# incremented in the visible lines (listing lines 482 and 484 are missing).
483 for r in query.execute(model):
485 self.assertEqual(fromTypedNode(r['flowcell_id']), u'42JU1AAXX')
486 lane_id = fromTypedNode(r['lane_id'])
487 library_id = fromTypedNode(r['library_id'])
488 self.assertTrue(library_id in expected[lane_id])
489 self.assertEqual(count, 10)
# Tests for models.FileType and models.find_file_type_metadata_from_filename.
# NOTE(review): this listing has lost its indentation and its embedded line
# numbers have gaps, so some tuple and list continuation lines are not visible.
492 class TestFileType(TestCase):
# The unicode() form of the 'QSEQ tarfile' file type must equal its name.
493 def test_file_type_unicode(self):
494 file_type_objects = models.FileType.objects
495 name = 'QSEQ tarfile'
496 file_type_object = file_type_objects.get(name=name)
497 self.assertEqual(u"QSEQ tarfile",
498 unicode(file_type_object))
# Bare file names must map to the expected file type, lane and end.
500 def test_find_file_type(self):
501 file_type_objects = models.FileType.objects
# each case is (filename, FileType name, expected lane, expected end)
502 cases = [('woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
503 'QSEQ tarfile', 7, 1),
504 ('woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
# NOTE(review): listing line 505 (the rest of the tuple above) is missing.
506 ('s_1_eland_extended.txt.bz2','ELAND Extended', 1, None),
507 ('s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
508 ('s_3_eland_result.txt.bz2','ELAND Result', 3, None),
509 ('s_1_export.txt.bz2','ELAND Export', 1, None),
510 ('s_1_percent_call.png', 'IVC Percent Call', 1, None),
511 ('s_2_percent_base.png', 'IVC Percent Base', 2, None),
512 ('s_3_percent_all.png', 'IVC Percent All', 3, None),
513 ('s_4_call.png', 'IVC Call', 4, None),
514 ('s_5_all.png', 'IVC All', 5, None),
515 ('Summary.htm', 'Summary.htm', None, None),
516 ('run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
# NOTE(review): listing line 517 is missing; presumably it closes the list.
518 for filename, typename, lane, end in cases:
519 ft = models.find_file_type_metadata_from_filename(filename)
520 self.assertEqual(ft['file_type'],
521 file_type_objects.get(name=typename))
# lane and end are optional keys of the returned metadata
522 self.assertEqual(ft.get('lane', None), lane)
523 self.assertEqual(ft.get('end', None), end)
# The same lookups must succeed when the file name carries a directory part.
525 def test_assign_file_type_complex_path(self):
526 file_type_objects = models.FileType.objects
# each case is (path, FileType name, expected lane, expected end)
527 cases = [('/a/b/c/woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
528 'QSEQ tarfile', 7, 1),
529 ('foo/woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
# NOTE(review): listing line 530 (the rest of the tuple above) is missing.
531 ('../s_1_eland_extended.txt.bz2','ELAND Extended', 1, None),
532 ('/bleem/s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
533 ('/qwer/s_3_eland_result.txt.bz2','ELAND Result', 3, None),
534 ('/ty///1/s_1_export.txt.bz2','ELAND Export', 1, None),
535 ('/help/s_1_percent_call.png', 'IVC Percent Call', 1, None),
536 ('/bored/s_2_percent_base.png', 'IVC Percent Base', 2, None),
537 ('/example1/s_3_percent_all.png', 'IVC Percent All', 3, None),
538 ('amonkey/s_4_call.png', 'IVC Call', 4, None),
539 ('fishie/s_5_all.png', 'IVC All', 5, None),
540 ('/random/Summary.htm', 'Summary.htm', None, None),
541 ('/notrandom/run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
# NOTE(review): listing line 542 is missing; presumably it closes the list.
543 for filename, typename, lane, end in cases:
544 result = models.find_file_type_metadata_from_filename(filename)
545 self.assertEqual(result['file_type'],
546 file_type_objects.get(name=typename))
547 self.assertEqual(result.get('lane',None), lane)
548 self.assertEqual(result.get('end', None), end)
# Tests for the "flowcell started" e-mail notification view of flowcell 153.
# NOTE(review): this listing has lost its indentation and its embedded line
# numbers have gaps (docstring delimiters and possibly short statements).
550 class TestEmailNotify(TestCase):
551 fixtures = ['test_flowcells.json']
# Without a login the view must answer with a redirect (302).
553 def test_started_email_not_logged_in(self):
554 response = self.client.get('/experiments/started/153/')
555 self.assertEqual(response.status_code, 302)
# The 'test' account (presumably non-staff -- verify in the fixture) is also
# redirected.
557 def test_started_email_logged_in_user(self):
558 self.client.login(username='test', password='BJOKL5kAj6aFZ6A5')
559 response = self.client.get('/experiments/started/153/')
560 self.assertEqual(response.status_code, 302)
# The 'admintest' account must be served the page (200).
562 def test_started_email_logged_in_staff(self):
563 self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5')
564 response = self.client.get('/experiments/started/153/')
565 self.assertEqual(response.status_code, 200)
# Requesting the view with send=1 and bcc=on must put four messages in the
# outbox, each with a body and the expected bcc recipients.
567 def test_started_email_send(self):
568 self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5')
569 response = self.client.get('/experiments/started/153/')
570 self.assertEqual(response.status_code, 200)
# the preview page must show a recipient address and a lane summary line
572 self.assertTrue('pk1@example.com' in response.content)
573 self.assertTrue('Lane #8 : (11064) Paired ends 104' in response.content)
575 response = self.client.get('/experiments/started/153/', {'send':'1','bcc':'on'})
576 self.assertEqual(response.status_code, 200)
577 self.assertEqual(len(mail.outbox), 4)
# expected bcc = NOTIFICATION_BCC plus MANAGERS
# NOTE(review): set(...) already builds a new set, so .copy() is redundant.
578 bcc = set(settings.NOTIFICATION_BCC).copy()
579 bcc.update(set(settings.MANAGERS))
580 for m in mail.outbox:
581 self.assertTrue(len(m.body) > 0)
582 self.assertEqual(set(m.bcc), bcc)
584 def test_email_navigation(self):
586 Can we navigate between the flowcell and email forms properly?
588 self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
589 response = self.client.get('/experiments/started/153/')
590 self.assertEqual(response.status_code, 200)
591 self.assertTrue(re.search('Flowcell FC12150', response.content))
592 # require that navigation back to the admin page exists
593 self.assertTrue(re.search('<a href="/admin/experiments/flowcell/153/">[^<]+</a>', response.content))
def multi_lane_to_dict(lane):
    """Convert a list of lane entries into a dictionary indexed by library ID.

    :param lane: iterable of mappings, each carrying a 'library_id' key.
    :return: dict mapping every entry's 'library_id' value to the entry
        itself (the same object, not a copy); when two entries share an ID
        the later one wins.
    :raises KeyError: if an entry has no 'library_id' key.
    """
    # dict() over a generator (rather than a dict comprehension) keeps this
    # helper usable on the older interpreters this file targets.
    return dict((entry['library_id'], entry) for entry in lane)
# Tests for models.Sequencer naming and for the sequencer markup and RDF
# validation of the flowcell pages.
# NOTE(review): this listing has lost its indentation and its embedded line
# numbers have gaps, so some statements are not visible below.
600 class TestSequencer(TestCase):
601 fixtures = ['test_flowcells.json',
# NOTE(review): listing lines 602-603 (the rest of the fixtures list) are
# missing.
# The unicode() form of a sequencer is expected to read "<name> (<instrument>)".
604 def test_name_generation(self):
605 seq = models.Sequencer()
# NOTE(review): listing line 606 is missing; the expected text below starts
# with "Seq1", so presumably it assigns seq.name -- confirm.
607 seq.instrument_name = "HWI-SEQ1"
608 seq.model = "Imaginary 5000"
610 self.assertEqual(unicode(seq), "Seq1 (HWI-SEQ1)")
# Flowcell 153 (FC12150) must expose its sequencer both through the ORM and
# in the markup of its flowcell page.
612 def test_lookup(self):
613 fc = models.FlowCell.objects.get(pk=153)
614 self.assertEqual(fc.sequencer.model,
615 "Illumina Genome Analyzer IIx")
616 self.assertEqual(fc.sequencer.instrument_name,
# NOTE(review): listing line 617 (the expected instrument name) is missing.
618 # well actually we let the browser tack on the host name
619 url = fc.get_absolute_url()
620 self.assertEqual(url, '/flowcell/FC12150/')
623 response = self.client.get('/flowcell/FC12150/', apidata)
624 tree = fromstring(response.content)
625 seq_by = tree.xpath('//div[@rel="libns:sequenced_by"]',
# NOTE(review): listing line 626 (the rest of this xpath call) is missing.
627 self.assertEqual(len(seq_by), 1)
628 self.assertEqual(seq_by[0].attrib['rel'], 'libns:sequenced_by')
# the sequenced_by div must wrap exactly one libns:Sequencer element
629 seq = seq_by[0].getchildren()
630 self.assertEqual(len(seq), 1)
631 self.assertEqual(seq[0].attrib['about'], '/sequencer/2')
632 self.assertEqual(seq[0].attrib['typeof'], 'libns:Sequencer')
# and that element must carry one name, one instrument and one model span
634 name = seq[0].xpath('./span[@property="libns:sequencer_name"]')
635 self.assertEqual(len(name), 1)
636 self.assertEqual(name[0].text, 'Tardigrade')
637 instrument = seq[0].xpath(
638 './span[@property="libns:sequencer_instrument"]')
639 self.assertEqual(len(instrument), 1)
640 self.assertEqual(instrument[0].text, 'ILLUMINA-EC5D15')
641 model = seq[0].xpath(
642 './span[@property="libns:sequencer_model"]')
643 self.assertEqual(len(model), 1)
644 self.assertEqual(model[0].text, 'Illumina Genome Analyzer IIx')
# The RDFa of the FC12150 flowcell page must produce no validation messages.
646 def test_flowcell_with_rdf_validation(self):
# NOTE(review): listing lines 648-649 and 652-653 are missing; `model` is
# used below without a visible assignment.
647 from htsworkflow.util.rdfhelp import add_default_schemas, \
650 load_string_into_model
651 from htsworkflow.util.rdfinfer import Infer
654 add_default_schemas(model)
655 inference = Infer(model)
657 url ='/flowcell/FC12150/'
658 response = self.client.get(url)
659 self.assertEqual(response.status_code, 200)
660 status = validate_xhtml(response.content)
661 if status is not None: self.assertTrue(status)
663 load_string_into_model(model, 'rdfa', response.content)
665 errmsgs = list(inference.run_validation())
666 self.assertEqual(len(errmsgs), 0)
# Same validation as above for a lane page.
# NOTE(review): `url` is read below but never assigned in the visible lines
# (listing lines 678-679 are missing), so the page under test is not shown.
668 def test_lane_with_rdf_validation(self):
# NOTE(review): listing lines 670-671 and 674-675 are missing; `model` is
# used below without a visible assignment.
669 from htsworkflow.util.rdfhelp import add_default_schemas, \
672 load_string_into_model
673 from htsworkflow.util.rdfinfer import Infer
676 add_default_schemas(model)
677 inference = Infer(model)
680 response = self.client.get(url)
681 self.assertEqual(response.status_code, 200)
682 status = validate_xhtml(response.content)
683 if status is not None: self.assertTrue(status)
685 load_string_into_model(model, 'rdfa', response.content)
687 errmsgs = list(inference.run_validation())
688 self.assertEqual(len(errmsgs), 0)
# Module-level test database fixture using the multi-database setting name.
# NOTE(review): an earlier fixture near the top of this file does the same job
# with settings.DATABASE_NAME and an explicit verbosity argument; only one of
# the two variants should survive.
691 OLD_DB = settings.DATABASES['default']['NAME']
# NOTE(review): listing line 692 is missing; presumably the setUpModule()
# header, since the matching tearDownModule() follows -- confirm.
693 setup_test_environment()
694 connection.creation.create_test_db()
# Destroy the test database and undo the test environment setup.
696 def tearDownModule():
697 connection.creation.destroy_test_db(OLD_DB)
698 teardown_test_environment()
# Collect the test cases of this module into a unittest2 suite.
# NOTE(review): listing lines 700, 702, 704-708 and 710 are missing;
# presumably the `def suite():` header, the TestSuite() construction, the rest
# of the test case list and the return statement -- confirm.
701 from unittest2 import TestSuite, defaultTestLoader
703 for testcase in [ClusterStationTestCases,
709 suite.addTests(defaultTestLoader.loadTestsFromTestCase(testcase))
# When run as a script, hand the name "suite" to unittest2 as the default test.
712 if __name__ == "__main__":
713 from unittest2 import main
714 main(defaultTest="suite")