Flatten project hierarchy, moving djano applications out of htsworkflow.frontend...
[htsworkflow.git] / experiments / test_experiments.py
1 from __future__ import absolute_import, print_function
2
3 import re
4 from lxml.html import fromstring
5 try:
6     import json
7 except ImportError, e:
8     import simplejson as json
9 import os
10 import shutil
11 import sys
12 import tempfile
13 from urlparse import urljoin
14
15 from django.conf import settings
16 from django.core import mail
17 from django.core.exceptions import ObjectDoesNotExist
18 from django.test import TestCase
19 from django.test.utils import setup_test_environment, teardown_test_environment
20 from django.db import connection
21 from django.conf import settings
22
23 from .models import ClusterStation, DataRun, Sequencer, FlowCell, FileType, \
24     find_file_type_metadata_from_filename
25 from .experiments import flowcell_information, lanes_for
26 from htsworkflow.auth import apidata
27 from htsworkflow.util.ethelp import validate_xhtml
28
29 from htsworkflow.pipelines.test.simulate_runfolder import TESTDATA_DIR
30
31 LANE_SET = range(1,9)
32
33 NSMAP = {'libns':'http://jumpgate.caltech.edu/wiki/LibraryOntology#'}
34
35 from django.db import connection
36
37 class ClusterStationTestCases(TestCase):
38     fixtures = ['initial_data.json',
39                 'test_flowcells.json']
40
41     def test_default(self):
42         c = ClusterStation.default()
43         self.assertEqual(c.id, 2)
44
45         c.isdefault = False
46         c.save()
47
48         total = ClusterStation.objects.filter(isdefault=True).count()
49         self.assertEqual(total, 0)
50
51         other_default = ClusterStation.default()
52         self.assertEqual(other_default.id, 3)
53
54
55     def test_update_default(self):
56         old_default = ClusterStation.default()
57
58         c = ClusterStation.objects.get(pk=3)
59         c.isdefault = True
60         c.save()
61
62         new_default = ClusterStation.default()
63
64         self.assertNotEqual(old_default, new_default)
65         self.assertEqual(new_default, c)
66
67         total = ClusterStation.objects.filter(isdefault=True).count()
68         self.assertEqual(total, 1)
69
70     def test_update_other(self):
71         old_default = ClusterStation.default()
72         total = ClusterStation.objects.filter(isdefault=True).count()
73         self.assertEqual(total, 1)
74
75         c = ClusterStation.objects.get(pk=1)
76         c.name = "Primary Key 1"
77         c.save()
78
79         total = ClusterStation.objects.filter(isdefault=True).count()
80         self.assertEqual(total, 1)
81
82         new_default = ClusterStation.default()
83         self.assertEqual(old_default, new_default)
84
85
86 class SequencerTestCases(TestCase):
87     fixtures = ['initial_data.json',
88                 'woldlab.json',
89                 'test_flowcells.json']
90
91     def test_default(self):
92         # starting with no default
93         s = Sequencer.default()
94         self.assertEqual(s.id, 2)
95
96         total = Sequencer.objects.filter(isdefault=True).count()
97         self.assertEqual(total, 1)
98
99         s.isdefault = False
100         s.save()
101
102         total = Sequencer.objects.filter(isdefault=True).count()
103         self.assertEqual(total, 0)
104
105         other_default = Sequencer.default()
106         self.assertEqual(other_default.id, 7)
107
108     def test_update_default(self):
109         old_default = Sequencer.default()
110
111         s = Sequencer.objects.get(pk=1)
112         s.isdefault = True
113         s.save()
114
115         new_default = Sequencer.default()
116
117         self.assertNotEqual(old_default, new_default)
118         self.assertEqual(new_default, s)
119
120         total = Sequencer.objects.filter(isdefault=True).count()
121         self.assertEqual(total, 1)
122
123
124     def test_update_other(self):
125         old_default = Sequencer.default()
126         total = Sequencer.objects.filter(isdefault=True).count()
127         self.assertEqual(total, 1)
128
129         s = Sequencer.objects.get(pk=1)
130         s.name = "Primary Key 1"
131         s.save()
132
133         total = Sequencer.objects.filter(isdefault=True).count()
134         self.assertEqual(total, 1)
135
136         new_default = Sequencer.default()
137         self.assertEqual(old_default, new_default)
138
139
140 class ExperimentsTestCases(TestCase):
141     fixtures = ['initial_data.json',
142                 'test_flowcells.json',
143                 ]
144
145     def setUp(self):
146         self.tempdir = tempfile.mkdtemp(prefix='htsw-test-experiments-')
147         settings.RESULT_HOME_DIR = self.tempdir
148
149         self.fc1_id = 'FC12150'
150         self.fc1_root = os.path.join(self.tempdir, self.fc1_id)
151         os.mkdir(self.fc1_root)
152         self.fc1_dir = os.path.join(self.fc1_root, 'C1-37')
153         os.mkdir(self.fc1_dir)
154         runxml = 'run_FC12150_2007-09-27.xml'
155         shutil.copy(os.path.join(TESTDATA_DIR, runxml),
156                     os.path.join(self.fc1_dir, runxml))
157         for i in range(1,9):
158             shutil.copy(
159                 os.path.join(TESTDATA_DIR,
160                              'woldlab_070829_USI-EAS44_0017_FC11055_1.srf'),
161                 os.path.join(self.fc1_dir,
162                              'woldlab_070829_SERIAL_FC12150_%d.srf' %(i,))
163                 )
164
165         self.fc2_dir = os.path.join(self.tempdir, '42JTNAAXX')
166         os.mkdir(self.fc2_dir)
167         os.mkdir(os.path.join(self.fc2_dir, 'C1-25'))
168         os.mkdir(os.path.join(self.fc2_dir, 'C1-37'))
169         os.mkdir(os.path.join(self.fc2_dir, 'C1-37', 'Plots'))
170
171     def tearDown(self):
172         shutil.rmtree(self.tempdir)
173
174     def test_flowcell_information(self):
175         """
176         Check the code that packs the django objects into simple types.
177         """
178         for fc_id in [u'FC12150', u"42JTNAAXX", "42JU1AAXX"]:
179             fc_dict = flowcell_information(fc_id)
180             fc_django = FlowCell.objects.get(flowcell_id=fc_id)
181             self.assertEqual(fc_dict['flowcell_id'], fc_id)
182             self.assertEqual(fc_django.flowcell_id, fc_id)
183             self.assertEqual(fc_dict['sequencer'], fc_django.sequencer.name)
184             self.assertEqual(fc_dict['read_length'], fc_django.read_length)
185             self.assertEqual(fc_dict['notes'], fc_django.notes)
186             self.assertEqual(fc_dict['cluster_station'], fc_django.cluster_station.name)
187
188             for lane in fc_django.lane_set.all():
189                 lane_contents = fc_dict['lane_set'][lane.lane_number]
190                 lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
191                 self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate)
192                 self.assertEqual(lane_dict['comment'], lane.comment)
193                 self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id)
194                 self.assertEqual(lane_dict['lane_number'], lane.lane_number)
195                 self.assertEqual(lane_dict['library_name'], lane.library.library_name)
196                 self.assertEqual(lane_dict['library_id'], lane.library.id)
197                 self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM))
198                 self.assertEqual(lane_dict['library_species'],
199                                      lane.library.library_species.scientific_name)
200
201             response = self.client.get('/experiments/config/%s/json' % (fc_id,), apidata)
202             # strptime isoformat string = '%Y-%m-%dT%H:%M:%S'
203             fc_json = json.loads(response.content)
204             self.assertEqual(fc_json['flowcell_id'], fc_id)
205             self.assertEqual(fc_json['sequencer'], fc_django.sequencer.name)
206             self.assertEqual(fc_json['read_length'], fc_django.read_length)
207             self.assertEqual(fc_json['notes'], fc_django.notes)
208             self.assertEqual(fc_json['cluster_station'], fc_django.cluster_station.name)
209
210
211             for lane in fc_django.lane_set.all():
212                 lane_contents = fc_json['lane_set'][unicode(lane.lane_number)]
213                 lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
214
215                 self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate)
216                 self.assertEqual(lane_dict['comment'], lane.comment)
217                 self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id)
218                 self.assertEqual(lane_dict['lane_number'], lane.lane_number)
219                 self.assertEqual(lane_dict['library_name'], lane.library.library_name)
220                 self.assertEqual(lane_dict['library_id'], lane.library.id)
221                 self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM))
222                 self.assertEqual(lane_dict['library_species'],
223                                      lane.library.library_species.scientific_name)
224
225     def test_invalid_flowcell(self):
226         """
227         Make sure we get a 404 if we request an invalid flowcell ID
228         """
229         response = self.client.get('/experiments/config/nottheone/json', apidata)
230         self.assertEqual(response.status_code, 404)
231
232     def test_no_key(self):
233         """
234         Require logging in to retrieve meta data
235         """
236         response = self.client.get(u'/experiments/config/FC12150/json')
237         self.assertEqual(response.status_code, 403)
238
239     def test_library_id(self):
240         """
241         Library IDs should be flexible, so make sure we can retrive a non-numeric ID
242         """
243         response = self.client.get('/experiments/config/FC12150/json', apidata)
244         self.assertEqual(response.status_code, 200)
245         flowcell = json.loads(response.content)
246
247         lane_contents = flowcell['lane_set']['3']
248         lane_library = lane_contents[0]
249         self.assertEqual(lane_library['library_id'], 'SL039')
250
251         response = self.client.get('/samples/library/SL039/json', apidata)
252         self.assertEqual(response.status_code, 200)
253         library_sl039 = json.loads(response.content)
254
255         self.assertEqual(library_sl039['library_id'], 'SL039')
256
257     def test_raw_id_field(self):
258         """
259         Test ticket:147
260
261         Library's have IDs, libraries also have primary keys,
262         we eventually had enough libraries that the drop down combo box was too
263         hard to filter through, unfortnately we want a field that uses our library
264         id and not the internal primary key, and raw_id_field uses primary keys.
265
266         This tests to make sure that the value entered in the raw library id field matches
267         the library id looked up.
268         """
269         expected_ids = [u'10981',u'11016',u'SL039',u'11060',
270                         u'11061',u'11062',u'11063',u'11064']
271         self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
272         response = self.client.get('/admin/experiments/flowcell/153/')
273
274         tree = fromstring(response.content)
275         for i in range(0,8):
276             xpath_expression = '//input[@id="id_lane_set-%d-library"]'
277             input_field = tree.xpath(xpath_expression % (i,))[0]
278             library_field = input_field.find('../strong')
279             library_id, library_name = library_field.text.split(':')
280             # strip leading '#' sign from name
281             library_id = library_id[1:]
282             self.assertEqual(library_id, expected_ids[i])
283             self.assertEqual(input_field.attrib['value'], library_id)
284
285     def test_library_to_flowcell_link(self):
286         """
287         Make sure the library page includes links to the flowcell pages.
288         That work with flowcell IDs that have parenthetical comments.
289         """
290         self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
291         response = self.client.get('/library/11070/')
292         self.assertEqual(response.status_code, 200)
293         status = validate_xhtml(response.content)
294         if status is not None: self.assertTrue(status)
295
296         tree = fromstring(response.content)
297         flowcell_spans = tree.xpath('//span[@property="libns:flowcell_id"]',
298                                     namespaces=NSMAP)
299         self.assertEqual(flowcell_spans[1].text, '30012AAXX (failed)')
300         failed_fc_span = flowcell_spans[1]
301         failed_fc_a = failed_fc_span.getparent()
302         # make sure some of our RDF made it.
303         self.assertEqual(failed_fc_a.get('typeof'), 'libns:IlluminaFlowcell')
304         self.assertEqual(failed_fc_a.get('href'), '/flowcell/30012AAXX/')
305         fc_response = self.client.get(failed_fc_a.get('href'))
306         self.assertEqual(fc_response.status_code, 200)
307         status = validate_xhtml(response.content)
308         if status is not None: self.assertTrue(status)
309
310         fc_lane_response = self.client.get('/flowcell/30012AAXX/8/')
311         self.assertEqual(fc_lane_response.status_code, 200)
312         status = validate_xhtml(response.content)
313         if status is not None: self.assertTrue(status)
314
315
316     def test_pooled_multiplex_id(self):
317         fc_dict = flowcell_information('42JU1AAXX')
318         lane_contents = fc_dict['lane_set'][3]
319         self.assertEqual(len(lane_contents), 2)
320         lane_dict = multi_lane_to_dict(lane_contents)
321
322         self.assertEqual(lane_dict['12044']['index_sequence'],
323                          {u'1': u'ATCACG',
324                           u'2': u'CGATGT',
325                           u'3': u'TTAGGC'})
326         self.assertEqual(lane_dict['11045']['index_sequence'],
327                          {u'1': u'ATCACG'})
328
329
330
331     def test_lanes_for(self):
332         """
333         Check the code that packs the django objects into simple types.
334         """
335         user = 'test'
336         lanes = lanes_for(user)
337         self.assertEqual(len(lanes), 5)
338
339         response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
340         lanes_json = json.loads(response.content)
341         self.assertEqual(len(lanes), len(lanes_json))
342         for i in range(len(lanes)):
343             self.assertEqual(lanes[i]['comment'], lanes_json[i]['comment'])
344             self.assertEqual(lanes[i]['lane_number'], lanes_json[i]['lane_number'])
345             self.assertEqual(lanes[i]['flowcell'], lanes_json[i]['flowcell'])
346             self.assertEqual(lanes[i]['run_date'], lanes_json[i]['run_date'])
347
348     def test_lanes_for_no_lanes(self):
349         """
350         Do we get something meaningful back when the user isn't attached to anything?
351         """
352         user = 'supertest'
353         lanes = lanes_for(user)
354         self.assertEqual(len(lanes), 0)
355
356         response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
357         lanes_json = json.loads(response.content)
358
359     def test_lanes_for_no_user(self):
360         """
361         Do we get something meaningful back when its the wrong user
362         """
363         user = 'not a real user'
364         self.assertRaises(ObjectDoesNotExist, lanes_for, user)
365
366         response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
367         self.assertEqual(response.status_code, 404)
368
369
370     def test_raw_data_dir(self):
371         """Raw data path generator check"""
372         flowcell_id = self.fc1_id
373         raw_dir = os.path.join(settings.RESULT_HOME_DIR, flowcell_id)
374
375         fc = FlowCell.objects.get(flowcell_id=flowcell_id)
376         self.assertEqual(fc.get_raw_data_directory(), raw_dir)
377
378         fc.flowcell_id = flowcell_id + " (failed)"
379         self.assertEqual(fc.get_raw_data_directory(), raw_dir)
380
381
382     def test_data_run_import(self):
383         srf_file_type = FileType.objects.get(name='SRF')
384         runxml_file_type = FileType.objects.get(name='run_xml')
385         flowcell_id = self.fc1_id
386         flowcell = FlowCell.objects.get(flowcell_id=flowcell_id)
387         flowcell.update_data_runs()
388         self.assertEqual(len(flowcell.datarun_set.all()), 1)
389
390         run = flowcell.datarun_set.all()[0]
391         result_files = run.datafile_set.all()
392         result_dict = dict(((rf.relative_pathname, rf) for rf in result_files))
393
394         srf4 = result_dict['FC12150/C1-37/woldlab_070829_SERIAL_FC12150_4.srf']
395         self.assertEqual(srf4.file_type, srf_file_type)
396         self.assertEqual(srf4.library_id, '11060')
397         self.assertEqual(srf4.data_run.flowcell.flowcell_id, 'FC12150')
398         self.assertEqual(
399             srf4.data_run.flowcell.lane_set.get(lane_number=4).library_id,
400             '11060')
401         self.assertEqual(
402             srf4.pathname,
403             os.path.join(settings.RESULT_HOME_DIR, srf4.relative_pathname))
404
405         lane_files = run.lane_files()
406         self.assertEqual(lane_files[4]['srf'], srf4)
407
408         runxml= result_dict['FC12150/C1-37/run_FC12150_2007-09-27.xml']
409         self.assertEqual(runxml.file_type, runxml_file_type)
410         self.assertEqual(runxml.library_id, None)
411
412         import1 = len(DataRun.objects.filter(result_dir='FC12150/C1-37'))
413         # what happens if we import twice?
414         flowcell.import_data_run('FC12150/C1-37',
415                                  'run_FC12150_2007-09-27.xml')
416         self.assertEqual(
417             len(DataRun.objects.filter(result_dir='FC12150/C1-37')),
418             import1)
419
420     def test_read_result_file(self):
421         """make sure we can return a result file
422         """
423         flowcell_id = self.fc1_id
424         flowcell = FlowCell.objects.get(flowcell_id=flowcell_id)
425         flowcell.update_data_runs()
426
427         #self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
428
429         result_files = flowcell.datarun_set.all()[0].datafile_set.all()
430         for f in result_files:
431             url = '/experiments/file/%s' % ( f.random_key,)
432             response = self.client.get(url)
433             self.assertEqual(response.status_code, 200)
434             mimetype = f.file_type.mimetype
435             if mimetype is None:
436                 mimetype = 'application/octet-stream'
437
438             self.assertEqual(mimetype, response['content-type'])
439
440     def test_flowcell_rdf(self):
441         import RDF
442         from htsworkflow.util.rdfhelp import get_model, \
443              fromTypedNode, \
444              load_string_into_model, \
445              rdfNS, \
446              libraryOntology, \
447              dump_model
448
449         model = get_model()
450
451         expected = {'1': ['11034'],
452                     '2': ['11036'],
453                     '3': ['12044','11045'],
454                     '4': ['11047','13044'],
455                     '5': ['11055'],
456                     '6': ['11067'],
457                     '7': ['11069'],
458                     '8': ['11070']}
459         url = '/flowcell/42JU1AAXX/'
460         response = self.client.get(url)
461         self.assertEqual(response.status_code, 200)
462         status = validate_xhtml(response.content)
463         if status is not None: self.assertTrue(status)
464
465         ns = urljoin('http://localhost', url)
466         load_string_into_model(model, 'rdfa', response.content, ns=ns)
467         body = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
468         prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
469
470         select ?flowcell ?flowcell_id ?lane_id ?library_id
471         where {
472           ?flowcell a libns:IlluminaFlowcell ;
473                     libns:flowcell_id ?flowcell_id ;
474                     libns:has_lane ?lane .
475           ?lane libns:lane_number ?lane_id ;
476                 libns:library ?library .
477           ?library libns:library_id ?library_id .
478         }"""
479         query = RDF.SPARQLQuery(body)
480         count = 0
481         for r in query.execute(model):
482             count += 1
483             self.assertEqual(fromTypedNode(r['flowcell_id']), u'42JU1AAXX')
484             lane_id = fromTypedNode(r['lane_id'])
485             library_id = fromTypedNode(r['library_id'])
486             self.assertTrue(library_id in expected[lane_id])
487         self.assertEqual(count, 10)
488
489
490 class TestFileType(TestCase):
491     fixtures = ['initial_data.json',
492                 'test_flowcells.json',
493                 ]
494
495     def test_file_type_unicode(self):
496         file_type_objects = FileType.objects
497         name = 'QSEQ tarfile'
498         file_type_object = file_type_objects.get(name=name)
499         self.assertEqual(u"QSEQ tarfile",
500                              unicode(file_type_object))
501
502     def test_find_file_type(self):
503         file_type_objects = FileType.objects
504         cases = [('woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
505                   'QSEQ tarfile', 7, 1),
506                  ('woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
507                   'SRF', 1, None),
508                  ('s_1_eland_extended.txt.bz2','ELAND Extended', 1, None),
509                  ('s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
510                  ('s_3_eland_result.txt.bz2','ELAND Result', 3, None),
511                  ('s_1_export.txt.bz2','ELAND Export', 1, None),
512                  ('s_1_percent_call.png', 'IVC Percent Call', 1, None),
513                  ('s_2_percent_base.png', 'IVC Percent Base', 2, None),
514                  ('s_3_percent_all.png', 'IVC Percent All', 3, None),
515                  ('s_4_call.png', 'IVC Call', 4, None),
516                  ('s_5_all.png', 'IVC All', 5, None),
517                  ('Summary.htm', 'Summary.htm', None, None),
518                  ('run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
519          ]
520         for filename, typename, lane, end in cases:
521             ft = find_file_type_metadata_from_filename(filename)
522             self.assertEqual(ft['file_type'],
523                                  file_type_objects.get(name=typename))
524             self.assertEqual(ft.get('lane', None), lane)
525             self.assertEqual(ft.get('end', None), end)
526
527     def test_assign_file_type_complex_path(self):
528         file_type_objects = FileType.objects
529         cases = [('/a/b/c/woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
530                   'QSEQ tarfile', 7, 1),
531                  ('foo/woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
532                   'SRF', 1, None),
533                  ('../s_1_eland_extended.txt.bz2','ELAND Extended', 1, None),
534                  ('/bleem/s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
535                  ('/qwer/s_3_eland_result.txt.bz2','ELAND Result', 3, None),
536                  ('/ty///1/s_1_export.txt.bz2','ELAND Export', 1, None),
537                  ('/help/s_1_percent_call.png', 'IVC Percent Call', 1, None),
538                  ('/bored/s_2_percent_base.png', 'IVC Percent Base', 2, None),
539                  ('/example1/s_3_percent_all.png', 'IVC Percent All', 3, None),
540                  ('amonkey/s_4_call.png', 'IVC Call', 4, None),
541                  ('fishie/s_5_all.png', 'IVC All', 5, None),
542                  ('/random/Summary.htm', 'Summary.htm', None, None),
543                  ('/notrandom/run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
544          ]
545         for filename, typename, lane, end in cases:
546             result = find_file_type_metadata_from_filename(filename)
547             self.assertEqual(result['file_type'],
548                                  file_type_objects.get(name=typename))
549             self.assertEqual(result.get('lane',None), lane)
550             self.assertEqual(result.get('end', None), end)
551
552 class TestEmailNotify(TestCase):
553     fixtures = ['initial_data.json',
554                 'test_flowcells.json']
555
556     def test_started_email_not_logged_in(self):
557         response = self.client.get('/experiments/started/153/')
558         self.assertEqual(response.status_code, 302)
559
560     def test_started_email_logged_in_user(self):
561         self.client.login(username='test', password='BJOKL5kAj6aFZ6A5')
562         response = self.client.get('/experiments/started/153/')
563         self.assertEqual(response.status_code, 302)
564
565     def test_started_email_logged_in_staff(self):
566         self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5')
567         response = self.client.get('/experiments/started/153/')
568         self.assertEqual(response.status_code, 200)
569
570     def test_started_email_send(self):
571         self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5')
572         response = self.client.get('/experiments/started/153/')
573         self.assertEqual(response.status_code, 200)
574
575         self.assertTrue('pk1@example.com' in response.content)
576         self.assertTrue('Lane #8 : (11064) Paired ends 104' in response.content)
577
578         response = self.client.get('/experiments/started/153/', {'send':'1','bcc':'on'})
579         self.assertEqual(response.status_code, 200)
580         self.assertEqual(len(mail.outbox), 4)
581         bcc = set(settings.NOTIFICATION_BCC).copy()
582         bcc.update(set(settings.MANAGERS))
583         for m in mail.outbox:
584             self.assertTrue(len(m.body) > 0)
585             self.assertEqual(set(m.bcc), bcc)
586
587     def test_email_navigation(self):
588         """
589         Can we navigate between the flowcell and email forms properly?
590         """
591         self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
592         response = self.client.get('/experiments/started/153/')
593         self.assertEqual(response.status_code, 200)
594         self.assertTrue(re.search('Flowcell FC12150', response.content))
595         # require that navigation back to the admin page exists
596         self.assertTrue(re.search('<a href="/admin/experiments/flowcell/153/">[^<]+</a>', response.content))
597
598 def multi_lane_to_dict(lane):
599     """Convert a list of lane entries into a dictionary indexed by library ID
600     """
601     return dict( ((x['library_id'],x) for x in lane) )
602
603 class TestSequencer(TestCase):
604     fixtures = ['initial_data.json',
605                 'test_flowcells.json',
606                 ]
607
608     def test_name_generation(self):
609         seq = Sequencer()
610         seq.name = "Seq1"
611         seq.instrument_name = "HWI-SEQ1"
612         seq.model = "Imaginary 5000"
613
614         self.assertEqual(unicode(seq), "Seq1 (HWI-SEQ1)")
615
616     def test_lookup(self):
617         fc = FlowCell.objects.get(pk=153)
618         self.assertEqual(fc.sequencer.model,
619                              "Illumina Genome Analyzer IIx")
620         self.assertEqual(fc.sequencer.instrument_name,
621                              "ILLUMINA-EC5D15")
622         # well actually we let the browser tack on the host name
623         url = fc.get_absolute_url()
624         self.assertEqual(url, '/flowcell/FC12150/')
625
626     def test_rdf(self):
627         response = self.client.get('/flowcell/FC12150/', apidata)
628         tree = fromstring(response.content)
629         seq_by = tree.xpath('//div[@rel="libns:sequenced_by"]',
630                             namespaces=NSMAP)
631         self.assertEqual(len(seq_by), 1)
632         self.assertEqual(seq_by[0].attrib['rel'], 'libns:sequenced_by')
633         seq = seq_by[0].getchildren()
634         self.assertEqual(len(seq), 1)
635         self.assertEqual(seq[0].attrib['about'], '/sequencer/2')
636         self.assertEqual(seq[0].attrib['typeof'], 'libns:Sequencer')
637
638         name = seq[0].xpath('./span[@property="libns:sequencer_name"]')
639         self.assertEqual(len(name), 1)
640         self.assertEqual(name[0].text, 'Tardigrade')
641         instrument = seq[0].xpath(
642             './span[@property="libns:sequencer_instrument"]')
643         self.assertEqual(len(instrument), 1)
644         self.assertEqual(instrument[0].text, 'ILLUMINA-EC5D15')
645         model = seq[0].xpath(
646             './span[@property="libns:sequencer_model"]')
647         self.assertEqual(len(model), 1)
648         self.assertEqual(model[0].text, 'Illumina Genome Analyzer IIx')
649
650     def test_flowcell_with_rdf_validation(self):
651         from htsworkflow.util.rdfhelp import add_default_schemas, \
652              dump_model, \
653              get_model, \
654              load_string_into_model
655         from htsworkflow.util.rdfinfer import Infer
656
657         model = get_model()
658         add_default_schemas(model)
659         inference = Infer(model)
660
661         url ='/flowcell/FC12150/'
662         response = self.client.get(url)
663         self.assertEqual(response.status_code, 200)
664         status = validate_xhtml(response.content)
665         if status is not None: self.assertTrue(status)
666
667         load_string_into_model(model, 'rdfa', response.content)
668
669         errmsgs = list(inference.run_validation())
670         self.assertEqual(len(errmsgs), 0)
671
672     def test_lane_with_rdf_validation(self):
673         from htsworkflow.util.rdfhelp import add_default_schemas, \
674              dump_model, \
675              get_model, \
676              load_string_into_model
677         from htsworkflow.util.rdfinfer import Infer
678
679         model = get_model()
680         add_default_schemas(model)
681         inference = Infer(model)
682
683         url = '/lane/1193'
684         response = self.client.get(url)
685         self.assertEqual(response.status_code, 200)
686         status = validate_xhtml(response.content)
687         if status is not None: self.assertTrue(status)
688
689         load_string_into_model(model, 'rdfa', response.content)
690
691         errmsgs = list(inference.run_validation())
692         self.assertEqual(len(errmsgs), 0)
693
694 def suite():
695     from unittest import TestSuite, defaultTestLoader
696     suite = TestSuite()
697     for testcase in [ClusterStationTestCases,
698                      SequencerTestCases,
699                      ExerimentsTestCases,
700                      TestFileType,
701                      TestEmailNotify,
702                      TestSequencer]:
703         suite.addTests(defaultTestLoader.loadTestsFromTestCase(testcase))
704     return suite
705
706 if __name__ == "__main__":
707     from unittest import main
708     main(defaultTest="suite")