Explicitly list initial_data.json for the test data loader fixtures
[htsworkflow.git] / htsworkflow / frontend / experiments / test_experiments.py
1 import re
2 from lxml.html import fromstring
3 try:
4     import json
5 except ImportError, e:
6     import simplejson as json
7 import os
8 import shutil
9 import sys
10 import tempfile
11 from urlparse import urljoin
12
13 from django.conf import settings
14 from django.core import mail
15 from django.core.exceptions import ObjectDoesNotExist
16 from django.test import TestCase
17 from django.test.utils import setup_test_environment, teardown_test_environment
18 from django.db import connection
19 from django.conf import settings
20 from htsworkflow.frontend.experiments import models
21 from htsworkflow.frontend.experiments import experiments
22 from htsworkflow.frontend.auth import apidata
23 from htsworkflow.util.ethelp import validate_xhtml
24
25 from htsworkflow.pipelines.test.simulate_runfolder import TESTDATA_DIR
26
# The integers 1 through 8 (range's upper bound is exclusive).
LANE_SET = range(1,9)

# Prefix-to-URI mapping handed to lxml ``xpath(..., namespaces=NSMAP)`` calls
# in the tests of this module.
NSMAP = {'libns':'http://jumpgate.caltech.edu/wiki/LibraryOntology#'}
30
31 from django.db import connection
32
class ClusterStationTestCases(TestCase):
    """Check how ``models.ClusterStation.default()`` tracks ``isdefault``."""
    fixtures = ['initial_data.json',
                'test_flowcells.json']

    def _flagged_count(self):
        """Return the number of cluster stations saved with isdefault=True."""
        flagged = models.ClusterStation.objects.filter(isdefault=True)
        return flagged.count()

    def test_default(self):
        """default() still answers after the flagged station is unflagged."""
        station = models.ClusterStation.default()
        self.assertEqual(station.id, 2)

        # Clear the flag on the current default and persist it.
        station.isdefault = False
        station.save()
        self.assertEqual(self._flagged_count(), 0)

        # With nothing flagged, the fixtures are expected to yield id 3.
        fallback = models.ClusterStation.default()
        self.assertEqual(fallback.id, 3)

    def test_update_default(self):
        """Flagging another station makes it the only default."""
        previous = models.ClusterStation.default()

        promoted = models.ClusterStation.objects.get(pk=3)
        promoted.isdefault = True
        promoted.save()

        current = models.ClusterStation.default()
        self.assertNotEqual(previous, current)
        self.assertEqual(current, promoted)

        # Exactly one station may carry the flag afterwards.
        self.assertEqual(self._flagged_count(), 1)

    def test_update_other(self):
        """Saving an unrelated field leaves the default untouched."""
        previous = models.ClusterStation.default()
        self.assertEqual(self._flagged_count(), 1)

        renamed = models.ClusterStation.objects.get(pk=1)
        renamed.name = "Primary Key 1"
        renamed.save()

        self.assertEqual(self._flagged_count(), 1)
        self.assertEqual(previous, models.ClusterStation.default())
80
81
class SequencerTestCases(TestCase):
    """Check how ``models.Sequencer.default()`` tracks ``isdefault``."""
    fixtures = ['initial_data.json',
                'test_flowcells.json']

    def _flagged_count(self):
        """Return the number of sequencers saved with isdefault=True."""
        flagged = models.Sequencer.objects.filter(isdefault=True)
        return flagged.count()

    def test_default(self):
        """default() still answers after the flagged sequencer is unflagged."""
        # The fixtures start with exactly one flagged sequencer (id 2).
        sequencer = models.Sequencer.default()
        self.assertEqual(sequencer.id, 2)
        self.assertEqual(self._flagged_count(), 1)

        # Clear the flag on the current default and persist it.
        sequencer.isdefault = False
        sequencer.save()
        self.assertEqual(self._flagged_count(), 0)

        # With nothing flagged, the fixtures are expected to yield id 7.
        fallback = models.Sequencer.default()
        self.assertEqual(fallback.id, 7)

    def test_update_default(self):
        """Flagging another sequencer makes it the only default."""
        previous = models.Sequencer.default()

        promoted = models.Sequencer.objects.get(pk=1)
        promoted.isdefault = True
        promoted.save()

        current = models.Sequencer.default()
        self.assertNotEqual(previous, current)
        self.assertEqual(current, promoted)

        # Exactly one sequencer may carry the flag afterwards.
        self.assertEqual(self._flagged_count(), 1)

    def test_update_other(self):
        """Saving an unrelated field leaves the default untouched."""
        previous = models.Sequencer.default()
        self.assertEqual(self._flagged_count(), 1)

        renamed = models.Sequencer.objects.get(pk=1)
        renamed.name = "Primary Key 1"
        renamed.save()

        self.assertEqual(self._flagged_count(), 1)
        self.assertEqual(previous, models.Sequencer.default())
133
134
class ExperimentsTestCases(TestCase):
    """Tests for the flowcell/lane packing helpers and the experiments views.

    ``setUp`` builds a scratch result tree inside a temporary directory and
    points ``settings.RESULT_HOME_DIR`` at it; ``tearDown`` restores the
    setting and removes the tree.
    """
    fixtures = ['initial_data.json',
                'test_flowcells.json',
                ]

    def setUp(self):
        """Create the temporary result directories used by the tests."""
        self.tempdir = tempfile.mkdtemp(prefix='htsw-test-experiments-')
        # Remember the global setting so tearDown can put it back; otherwise
        # the temporary path would leak into tests that run afterwards.
        self._had_result_home = hasattr(settings, 'RESULT_HOME_DIR')
        self._old_result_home = getattr(settings, 'RESULT_HOME_DIR', None)
        settings.RESULT_HOME_DIR = self.tempdir

        self.fc1_id = 'FC12150'
        self.fc1_root = os.path.join(self.tempdir, self.fc1_id)
        os.mkdir(self.fc1_root)
        self.fc1_dir = os.path.join(self.fc1_root, 'C1-37')
        os.mkdir(self.fc1_dir)
        runxml = 'run_FC12150_2007-09-27.xml'
        shutil.copy(os.path.join(TESTDATA_DIR, runxml),
                    os.path.join(self.fc1_dir, runxml))
        # One sample SRF file is copied under eight names, numbered 1-8.
        srf_source = os.path.join(
            TESTDATA_DIR, 'woldlab_070829_USI-EAS44_0017_FC11055_1.srf')
        for i in range(1, 9):
            shutil.copy(
                srf_source,
                os.path.join(self.fc1_dir,
                             'woldlab_070829_SERIAL_FC12150_%d.srf' % (i,)))

        self.fc2_dir = os.path.join(self.tempdir, '42JTNAAXX')
        os.mkdir(self.fc2_dir)
        os.mkdir(os.path.join(self.fc2_dir, 'C1-25'))
        os.mkdir(os.path.join(self.fc2_dir, 'C1-37'))
        os.mkdir(os.path.join(self.fc2_dir, 'C1-37', 'Plots'))

    def tearDown(self):
        """Restore ``settings.RESULT_HOME_DIR`` and delete the scratch tree."""
        # Restore the setting first so it is reset even if rmtree fails.
        if self._had_result_home:
            settings.RESULT_HOME_DIR = self._old_result_home
        else:
            del settings.RESULT_HOME_DIR
        shutil.rmtree(self.tempdir)

    def _assert_flowcell_matches(self, fc_info, fc_django, fc_id, lane_key):
        """Compare a packed flowcell mapping against its django object.

        fc_info is the mapping under test, fc_django the FlowCell it was
        built from and fc_id the expected flowcell id.  lane_key converts a
        lane number into the key used by ``fc_info['lane_set']``.
        """
        self.assertEqual(fc_info['flowcell_id'], fc_id)
        self.assertEqual(fc_info['sequencer'], fc_django.sequencer.name)
        self.assertEqual(fc_info['read_length'], fc_django.read_length)
        self.assertEqual(fc_info['notes'], fc_django.notes)
        self.assertEqual(fc_info['cluster_station'],
                         fc_django.cluster_station.name)

        for lane in fc_django.lane_set.all():
            lane_contents = fc_info['lane_set'][lane_key(lane.lane_number)]
            lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]

            self.assertEqual(lane_dict['cluster_estimate'],
                             lane.cluster_estimate)
            self.assertEqual(lane_dict['comment'], lane.comment)
            self.assertEqual(lane_dict['flowcell'],
                             lane.flowcell.flowcell_id)
            self.assertEqual(lane_dict['lane_number'], lane.lane_number)
            self.assertEqual(lane_dict['library_name'],
                             lane.library.library_name)
            self.assertEqual(lane_dict['library_id'], lane.library.id)
            self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM))
            self.assertEqual(lane_dict['library_species'],
                             lane.library.library_species.scientific_name)

    def test_flowcell_information(self):
        """
        Check the code that packs the django objects into simple types.
        """
        for fc_id in [u'FC12150', u"42JTNAAXX", "42JU1AAXX"]:
            fc_dict = experiments.flowcell_information(fc_id)
            fc_django = models.FlowCell.objects.get(flowcell_id=fc_id)
            self.assertEqual(fc_django.flowcell_id, fc_id)
            # The python-level result is keyed by the lane number itself.
            self._assert_flowcell_matches(fc_dict, fc_django, fc_id,
                                          lambda number: number)

            response = self.client.get(
                '/experiments/config/%s/json' % (fc_id,), apidata)
            # strptime isoformat string = '%Y-%m-%dT%H:%M:%S'
            fc_json = json.loads(response.content)
            # In the decoded JSON the lane_set keys are strings.
            self._assert_flowcell_matches(fc_json, fc_django, fc_id, unicode)

    def test_invalid_flowcell(self):
        """
        Make sure we get a 404 if we request an invalid flowcell ID
        """
        response = self.client.get('/experiments/config/nottheone/json',
                                   apidata)
        self.assertEqual(response.status_code, 404)

    def test_no_key(self):
        """
        Require logging in to retrieve meta data
        """
        response = self.client.get(u'/experiments/config/FC12150/json')
        self.assertEqual(response.status_code, 403)

    def test_library_id(self):
        """
        Library IDs should be flexible, so make sure we can retrieve a
        non-numeric ID
        """
        response = self.client.get('/experiments/config/FC12150/json',
                                   apidata)
        self.assertEqual(response.status_code, 200)
        flowcell = json.loads(response.content)

        lane_contents = flowcell['lane_set']['3']
        lane_library = lane_contents[0]
        self.assertEqual(lane_library['library_id'], 'SL039')

        response = self.client.get('/samples/library/SL039/json', apidata)
        self.assertEqual(response.status_code, 200)
        library_sl039 = json.loads(response.content)

        self.assertEqual(library_sl039['library_id'], 'SL039')

    def test_raw_id_field(self):
        """
        Test ticket:147

        Libraries have IDs, libraries also have primary keys,
        we eventually had enough libraries that the drop down combo box was
        too hard to filter through, unfortunately we want a field that uses
        our library id and not the internal primary key, and raw_id_field
        uses primary keys.

        This tests to make sure that the value entered in the raw library id
        field matches the library id looked up.
        """
        expected_ids = [u'10981',u'11016',u'SL039',u'11060',
                        u'11061',u'11062',u'11063',u'11064']
        self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
        response = self.client.get('/admin/experiments/flowcell/153/')

        tree = fromstring(response.content)
        xpath_expression = '//input[@id="id_lane_set-%d-library"]'
        for i, expected_id in enumerate(expected_ids):
            input_field = tree.xpath(xpath_expression % (i,))[0]
            library_field = input_field.find('../strong')
            library_id, library_name = library_field.text.split(':')
            # strip leading '#' sign from the id
            library_id = library_id[1:]
            self.assertEqual(library_id, expected_id)
            self.assertEqual(input_field.attrib['value'], library_id)

    def test_library_to_flowcell_link(self):
        """
        Make sure the library page includes links to the flowcell pages.
        That work with flowcell IDs that have parenthetical comments.
        """
        self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
        response = self.client.get('/library/11070/')
        self.assertEqual(response.status_code, 200)
        status = validate_xhtml(response.content)
        if status is not None: self.assertTrue(status)

        tree = fromstring(response.content)
        flowcell_spans = tree.xpath('//span[@property="libns:flowcell_id"]',
                                    namespaces=NSMAP)
        self.assertEqual(flowcell_spans[1].text, '30012AAXX (failed)')
        failed_fc_span = flowcell_spans[1]
        failed_fc_a = failed_fc_span.getparent()
        # make sure some of our RDF made it.
        self.assertEqual(failed_fc_a.get('typeof'), 'libns:IlluminaFlowcell')
        self.assertEqual(failed_fc_a.get('href'), '/flowcell/30012AAXX/')
        fc_response = self.client.get(failed_fc_a.get('href'))
        self.assertEqual(fc_response.status_code, 200)
        # Validate the page just fetched, not the library page again.
        status = validate_xhtml(fc_response.content)
        if status is not None: self.assertTrue(status)

        fc_lane_response = self.client.get('/flowcell/30012AAXX/8/')
        self.assertEqual(fc_lane_response.status_code, 200)
        status = validate_xhtml(fc_lane_response.content)
        if status is not None: self.assertTrue(status)

    def test_pooled_multiplex_id(self):
        """Lane 3 of 42JU1AAXX holds two libraries with index sequences."""
        fc_dict = experiments.flowcell_information('42JU1AAXX')
        lane_contents = fc_dict['lane_set'][3]
        self.assertEqual(len(lane_contents), 2)
        lane_dict = multi_lane_to_dict(lane_contents)

        self.assertEqual(lane_dict['12044']['index_sequence'],
                         {u'1': u'ATCACG',
                          u'2': u'CGATGT',
                          u'3': u'TTAGGC'})
        self.assertEqual(lane_dict['11045']['index_sequence'],
                         {u'1': u'ATCACG'})

    def test_lanes_for(self):
        """
        Check that lanes_for() and its JSON view report the same lanes.
        """
        user = 'test'
        lanes = experiments.lanes_for(user)
        self.assertEqual(len(lanes), 5)

        response = self.client.get('/experiments/lanes_for/%s/json' % (user,),
                                   apidata)
        lanes_json = json.loads(response.content)
        self.assertEqual(len(lanes), len(lanes_json))
        for lane, lane_json in zip(lanes, lanes_json):
            self.assertEqual(lane['comment'], lane_json['comment'])
            self.assertEqual(lane['lane_number'], lane_json['lane_number'])
            self.assertEqual(lane['flowcell'], lane_json['flowcell'])
            self.assertEqual(lane['run_date'], lane_json['run_date'])

    def test_lanes_for_no_lanes(self):
        """
        Do we get something meaningful back when the user isn't attached to
        anything?
        """
        user = 'supertest'
        lanes = experiments.lanes_for(user)
        self.assertEqual(len(lanes), 0)

        response = self.client.get('/experiments/lanes_for/%s/json' % (user,),
                                   apidata)
        # Decoding is the only check made on the view's reply.
        # NOTE(review): the decoded value itself is never asserted on --
        # confirm what the view should return for a user with no lanes.
        json.loads(response.content)

    def test_lanes_for_no_user(self):
        """
        Do we get something meaningful back when it's the wrong user
        """
        user = 'not a real user'
        self.assertRaises(ObjectDoesNotExist, experiments.lanes_for, user)

        response = self.client.get('/experiments/lanes_for/%s/json' % (user,),
                                   apidata)
        self.assertEqual(response.status_code, 404)

    def test_raw_data_dir(self):
        """Raw data path generator check"""
        flowcell_id = self.fc1_id
        raw_dir = os.path.join(settings.RESULT_HOME_DIR, flowcell_id)

        fc = models.FlowCell.objects.get(flowcell_id=flowcell_id)
        self.assertEqual(fc.get_raw_data_directory(), raw_dir)

        # A " (failed)" suffix on the id must not change the directory.
        fc.flowcell_id = flowcell_id + " (failed)"
        self.assertEqual(fc.get_raw_data_directory(), raw_dir)

    def test_data_run_import(self):
        """Import the FC12150 result files and check the recorded metadata."""
        srf_file_type = models.FileType.objects.get(name='SRF')
        runxml_file_type = models.FileType.objects.get(name='run_xml')
        flowcell_id = self.fc1_id
        flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id)
        flowcell.update_data_runs()
        self.assertEqual(len(flowcell.datarun_set.all()), 1)

        run = flowcell.datarun_set.all()[0]
        result_files = run.datafile_set.all()
        result_dict = dict(((rf.relative_pathname, rf) for rf in result_files))

        srf4 = result_dict['FC12150/C1-37/woldlab_070829_SERIAL_FC12150_4.srf']
        self.assertEqual(srf4.file_type, srf_file_type)
        self.assertEqual(srf4.library_id, '11060')
        self.assertEqual(srf4.data_run.flowcell.flowcell_id, 'FC12150')
        self.assertEqual(
            srf4.data_run.flowcell.lane_set.get(lane_number=4).library_id,
            '11060')
        self.assertEqual(
            srf4.pathname,
            os.path.join(settings.RESULT_HOME_DIR, srf4.relative_pathname))

        lane_files = run.lane_files()
        self.assertEqual(lane_files[4]['srf'], srf4)

        runxml = result_dict['FC12150/C1-37/run_FC12150_2007-09-27.xml']
        self.assertEqual(runxml.file_type, runxml_file_type)
        self.assertEqual(runxml.library_id, None)

        import1 = len(models.DataRun.objects.filter(result_dir='FC12150/C1-37'))
        # what happens if we import twice?
        flowcell.import_data_run('FC12150/C1-37',
                                 'run_FC12150_2007-09-27.xml')
        self.assertEqual(
            len(models.DataRun.objects.filter(result_dir='FC12150/C1-37')),
            import1)

    def test_read_result_file(self):
        """make sure we can return a result file
        """
        flowcell_id = self.fc1_id
        flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id)
        flowcell.update_data_runs()

        # No login is performed here; the files are requested anonymously
        # through the URL built from each file's random_key.
        result_files = flowcell.datarun_set.all()[0].datafile_set.all()
        for f in result_files:
            url = '/experiments/file/%s' % ( f.random_key,)
            response = self.client.get(url)
            self.assertEqual(response.status_code, 200)
            mimetype = f.file_type.mimetype
            if mimetype is None:
                mimetype = 'application/octet-stream'

            self.assertEqual(mimetype, response['content-type'])

    def test_flowcell_rdf(self):
        """Parse the flowcell page's RDFa and check lane/library pairs."""
        import RDF
        from htsworkflow.util.rdfhelp import get_model, \
             fromTypedNode, \
             load_string_into_model

        model = get_model()

        # lane number -> library ids expected in that lane
        expected = {'1': ['11034'],
                    '2': ['11036'],
                    '3': ['12044','11045'],
                    '4': ['11047','13044'],
                    '5': ['11055'],
                    '6': ['11067'],
                    '7': ['11069'],
                    '8': ['11070']}
        url = '/flowcell/42JU1AAXX/'
        response = self.client.get(url)
        self.assertEqual(response.status_code, 200)
        status = validate_xhtml(response.content)
        if status is not None: self.assertTrue(status)

        ns = urljoin('http://localhost', url)
        load_string_into_model(model, 'rdfa', response.content, ns=ns)
        body = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>

        select ?flowcell ?flowcell_id ?lane_id ?library_id
        where {
          ?flowcell a libns:IlluminaFlowcell ;
                    libns:flowcell_id ?flowcell_id ;
                    libns:has_lane ?lane .
          ?lane libns:lane_number ?lane_id ;
                libns:library ?library .
          ?library libns:library_id ?library_id .
        }"""
        query = RDF.SPARQLQuery(body)
        count = 0
        for r in query.execute(model):
            count += 1
            self.assertEqual(fromTypedNode(r['flowcell_id']), u'42JU1AAXX')
            lane_id = fromTypedNode(r['lane_id'])
            library_id = fromTypedNode(r['library_id'])
            self.assertTrue(library_id in expected[lane_id])
        # One result row per (lane, library) pair listed in ``expected``.
        self.assertEqual(count, 10)
483
484
class TestFileType(TestCase):
    """Tests for ``models.FileType`` and filename-based type detection."""
    fixtures = ['initial_data.json',
                'test_flowcells.json',
                ]

    def _check_filename_cases(self, cases):
        """Run the filename metadata lookup over (name, type, lane, end) rows.

        Each row names a file, the FileType name it should map to, and the
        lane and end values expected in the returned metadata (None when the
        key is expected to be absent or None).
        """
        known_types = models.FileType.objects
        for pathname, type_name, want_lane, want_end in cases:
            found = models.find_file_type_metadata_from_filename(pathname)
            self.assertEqual(found['file_type'],
                             known_types.get(name=type_name))
            self.assertEqual(found.get('lane', None), want_lane)
            self.assertEqual(found.get('end', None), want_end)

    def test_file_type_unicode(self):
        """unicode() of a FileType gives its name."""
        qseq = models.FileType.objects.get(name='QSEQ tarfile')
        self.assertEqual(u"QSEQ tarfile", unicode(qseq))

    def test_find_file_type(self):
        """Bare filenames are mapped to the right type, lane and end."""
        self._check_filename_cases([
            ('woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
             'QSEQ tarfile', 7, 1),
            ('woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
             'SRF', 1, None),
            ('s_1_eland_extended.txt.bz2', 'ELAND Extended', 1, None),
            ('s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
            ('s_3_eland_result.txt.bz2', 'ELAND Result', 3, None),
            ('s_1_export.txt.bz2', 'ELAND Export', 1, None),
            ('s_1_percent_call.png', 'IVC Percent Call', 1, None),
            ('s_2_percent_base.png', 'IVC Percent Base', 2, None),
            ('s_3_percent_all.png', 'IVC Percent All', 3, None),
            ('s_4_call.png', 'IVC Call', 4, None),
            ('s_5_all.png', 'IVC All', 5, None),
            ('Summary.htm', 'Summary.htm', None, None),
            ('run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
        ])

    def test_assign_file_type_complex_path(self):
        """The same lookup works when the filename has a directory prefix."""
        self._check_filename_cases([
            ('/a/b/c/woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
             'QSEQ tarfile', 7, 1),
            ('foo/woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
             'SRF', 1, None),
            ('../s_1_eland_extended.txt.bz2', 'ELAND Extended', 1, None),
            ('/bleem/s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
            ('/qwer/s_3_eland_result.txt.bz2', 'ELAND Result', 3, None),
            ('/ty///1/s_1_export.txt.bz2', 'ELAND Export', 1, None),
            ('/help/s_1_percent_call.png', 'IVC Percent Call', 1, None),
            ('/bored/s_2_percent_base.png', 'IVC Percent Base', 2, None),
            ('/example1/s_3_percent_all.png', 'IVC Percent All', 3, None),
            ('amonkey/s_4_call.png', 'IVC Call', 4, None),
            ('fishie/s_5_all.png', 'IVC All', 5, None),
            ('/random/Summary.htm', 'Summary.htm', None, None),
            ('/notrandom/run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
        ])
546
class TestEmailNotify(TestCase):
    """Tests for the "flowcell started" e-mail notification view."""
    fixtures = ['initial_data.json',
                'test_flowcells.json']

    # Page under test and the password shared by the fixture accounts.
    _STARTED_URL = '/experiments/started/153/'
    _PASSWORD = 'BJOKL5kAj6aFZ6A5'

    def _login(self, username):
        """Log the test client in as one of the fixture users."""
        self.client.login(username=username, password=self._PASSWORD)

    def test_started_email_not_logged_in(self):
        """Anonymous requests are redirected."""
        page = self.client.get(self._STARTED_URL)
        self.assertEqual(page.status_code, 302)

    def test_started_email_logged_in_user(self):
        """The 'test' user is redirected as well."""
        self._login('test')
        page = self.client.get(self._STARTED_URL)
        self.assertEqual(page.status_code, 302)

    def test_started_email_logged_in_staff(self):
        """The 'admintest' user gets the page."""
        self._login('admintest')
        page = self.client.get(self._STARTED_URL)
        self.assertEqual(page.status_code, 200)

    def test_started_email_send(self):
        """Preview the notification, send it, and inspect the outbox."""
        self._login('admintest')
        preview = self.client.get(self._STARTED_URL)
        self.assertEqual(preview.status_code, 200)

        self.assertTrue('pk1@example.com' in preview.content)
        self.assertTrue(
            'Lane #8 : (11064) Paired ends 104' in preview.content)

        sent = self.client.get(self._STARTED_URL, {'send':'1','bcc':'on'})
        self.assertEqual(sent.status_code, 200)
        self.assertEqual(len(mail.outbox), 4)

        # Every message must be bcc'ed to the union of both settings.
        expected_bcc = set(settings.NOTIFICATION_BCC) | set(settings.MANAGERS)
        for message in mail.outbox:
            self.assertTrue(len(message.body) > 0)
            self.assertEqual(set(message.bcc), expected_bcc)

    def test_email_navigation(self):
        """
        Can we navigate between the flowcell and email forms properly?
        """
        self._login('supertest')
        page = self.client.get(self._STARTED_URL)
        self.assertEqual(page.status_code, 200)
        self.assertTrue(re.search('Flowcell FC12150', page.content))
        # require that navigation back to the admin page exists
        admin_link = '<a href="/admin/experiments/flowcell/153/">[^<]+</a>'
        self.assertTrue(re.search(admin_link, page.content))
592
def multi_lane_to_dict(lane):
    """Index the entries of one lane by their library ID.

    lane is an iterable of mappings that each have a 'library_id' key.  The
    result maps each library ID to its entry; if an ID occurs more than once
    the last entry wins.
    """
    by_library = {}
    for entry in lane:
        by_library[entry['library_id']] = entry
    return by_library
597
class TestSequencer(TestCase):
    """Tests for ``models.Sequencer`` naming and its RDFa markup."""
    fixtures = ['initial_data.json',
                'test_flowcells.json',
                ]

    def _assert_rdfa_validates(self, url):
        """Fetch url and require its RDFa to pass the inference validation.

        The page must return 200; when ``validate_xhtml`` returns a
        non-None status that status must be true; and running the
        validation over the loaded RDFa must yield no messages.
        """
        from htsworkflow.util.rdfhelp import add_default_schemas, \
             dump_model, \
             get_model, \
             load_string_into_model
        from htsworkflow.util.rdfinfer import Infer

        rdf_model = get_model()
        add_default_schemas(rdf_model)
        checker = Infer(rdf_model)

        page = self.client.get(url)
        self.assertEqual(page.status_code, 200)
        xhtml_status = validate_xhtml(page.content)
        if xhtml_status is not None:
            self.assertTrue(xhtml_status)

        load_string_into_model(rdf_model, 'rdfa', page.content)

        problems = list(checker.run_validation())
        self.assertEqual(len(problems), 0)

    def test_name_generation(self):
        """unicode() of a sequencer is "<name> (<instrument_name>)"."""
        sequencer = models.Sequencer()
        sequencer.name = "Seq1"
        sequencer.instrument_name = "HWI-SEQ1"
        sequencer.model = "Imaginary 5000"

        self.assertEqual(unicode(sequencer), "Seq1 (HWI-SEQ1)")

    def test_lookup(self):
        """Flowcell 153 links to the expected sequencer and URL."""
        flowcell = models.FlowCell.objects.get(pk=153)
        self.assertEqual(flowcell.sequencer.model,
                         "Illumina Genome Analyzer IIx")
        self.assertEqual(flowcell.sequencer.instrument_name,
                         "ILLUMINA-EC5D15")
        # well actually we let the browser tack on the host name
        self.assertEqual(flowcell.get_absolute_url(), '/flowcell/FC12150/')

    def test_rdf(self):
        """The flowcell page describes its sequencer in RDFa."""
        page = self.client.get('/flowcell/FC12150/', apidata)
        tree = fromstring(page.content)
        sequenced_by = tree.xpath('//div[@rel="libns:sequenced_by"]',
                                  namespaces=NSMAP)
        self.assertEqual(len(sequenced_by), 1)
        self.assertEqual(sequenced_by[0].attrib['rel'], 'libns:sequenced_by')

        children = sequenced_by[0].getchildren()
        self.assertEqual(len(children), 1)
        sequencer_node = children[0]
        self.assertEqual(sequencer_node.attrib['about'], '/sequencer/2')
        self.assertEqual(sequencer_node.attrib['typeof'], 'libns:Sequencer')

        names = sequencer_node.xpath(
            './span[@property="libns:sequencer_name"]')
        self.assertEqual(len(names), 1)
        self.assertEqual(names[0].text, 'Tardigrade')

        instruments = sequencer_node.xpath(
            './span[@property="libns:sequencer_instrument"]')
        self.assertEqual(len(instruments), 1)
        self.assertEqual(instruments[0].text, 'ILLUMINA-EC5D15')

        sequencer_models = sequencer_node.xpath(
            './span[@property="libns:sequencer_model"]')
        self.assertEqual(len(sequencer_models), 1)
        self.assertEqual(sequencer_models[0].text,
                         'Illumina Genome Analyzer IIx')

    def test_flowcell_with_rdf_validation(self):
        """The flowcell page's RDFa passes validation."""
        self._assert_rdfa_validates('/flowcell/FC12150/')

    def test_lane_with_rdf_validation(self):
        """The lane page's RDFa passes validation."""
        self._assert_rdfa_validates('/lane/1193')
688
def suite():
    """Return a TestSuite holding the tests of this module's six TestCases.

    unittest2 is used when it is installed; otherwise the standard library
    ``unittest`` module supplies the same two names.
    """
    try:
        from unittest2 import TestSuite, defaultTestLoader
    except ImportError:
        from unittest import TestSuite, defaultTestLoader
    suite = TestSuite()
    # ExperimentsTestCases used to be misspelled here, which raised a
    # NameError as soon as suite() was called.
    for testcase in [ClusterStationTestCases,
                     SequencerTestCases,
                     ExperimentsTestCases,
                     TestFileType,
                     TestEmailNotify,
                     TestSequencer]:
        suite.addTests(defaultTestLoader.loadTestsFromTestCase(testcase))
    return suite
700
if __name__ == "__main__":
    # Executed as a script: let unittest2 run the suite() defined above.
    import unittest2
    unittest2.main(defaultTest="suite")