0a0c9419827c7b4e9cb1c6d12e99fcb9100626e0
[htsworkflow.git] / htsworkflow / frontend / experiments / test_experiments.py
1 import re
2 from lxml.html import fromstring
3 try:
4     import json
5 except ImportError, e:
6     import simplejson as json
7 import os
8 import shutil
9 import sys
10 import tempfile
11 from urlparse import urljoin
12
13 from django.conf import settings
14 from django.core import mail
15 from django.core.exceptions import ObjectDoesNotExist
16 from django.test import TestCase
17 from django.test.utils import setup_test_environment, teardown_test_environment
18 from django.db import connection
19 from django.conf import settings
20 from htsworkflow.frontend.experiments import models
21 from htsworkflow.frontend.experiments import experiments
22 from htsworkflow.frontend.auth import apidata
23 from htsworkflow.util.ethelp import validate_xhtml
24
25 from htsworkflow.pipelines.test.simulate_runfolder import TESTDATA_DIR
26
# The integers 1 through 8 (presumably the lane numbers of a flowcell;
# nothing visible in this module reads it -- confirm before removing).
LANE_SET = range(1, 9)

# Prefix map handed to lxml xpath() calls that look for RDFa attributes
# using the "libns" library ontology prefix.
NSMAP = {
    'libns': 'http://jumpgate.caltech.edu/wiki/LibraryOntology#',
}
30
31 from django.db import connection
32
class ClusterStationTestCases(TestCase):
    """Check how models.ClusterStation picks and maintains its default row."""

    fixtures = ['initial_data.json',
                'test_flowcells.json']

    def _default_count(self):
        """Return the number of ClusterStation rows flagged isdefault."""
        flagged = models.ClusterStation.objects.filter(isdefault=True)
        return flagged.count()

    def test_default(self):
        """default() still returns a station after the flag is cleared."""
        station = models.ClusterStation.default()
        self.assertEqual(station.id, 2)

        station.isdefault = False
        station.save()

        # No row carries the flag any more ...
        self.assertEqual(self._default_count(), 0)

        # ... but default() still answers, with id 3 for these fixtures.
        fallback = models.ClusterStation.default()
        self.assertEqual(fallback.id, 3)

    def test_update_default(self):
        """Flagging another station moves the default and keeps it unique."""
        previous = models.ClusterStation.default()

        station = models.ClusterStation.objects.get(pk=3)
        station.isdefault = True
        station.save()

        current = models.ClusterStation.default()

        self.assertNotEqual(previous, current)
        self.assertEqual(current, station)

        self.assertEqual(self._default_count(), 1)

    def test_update_other(self):
        """Saving an unrelated field leaves the default untouched."""
        previous = models.ClusterStation.default()
        self.assertEqual(self._default_count(), 1)

        station = models.ClusterStation.objects.get(pk=1)
        station.name = "Primary Key 1"
        station.save()

        self.assertEqual(self._default_count(), 1)

        current = models.ClusterStation.default()
        self.assertEqual(previous, current)
80
81
class SequencerTestCases(TestCase):
    """Check how models.Sequencer picks and maintains its default row."""

    fixtures = ['initial_data.json',
                'woldlab.json',
                'test_flowcells.json']

    def _default_count(self):
        """Return the number of Sequencer rows flagged isdefault."""
        flagged = models.Sequencer.objects.filter(isdefault=True)
        return flagged.count()

    def test_default(self):
        """default() still returns a sequencer after the flag is cleared."""
        # With these fixtures exactly one sequencer (id 2) is flagged.
        sequencer = models.Sequencer.default()
        self.assertEqual(sequencer.id, 2)

        self.assertEqual(self._default_count(), 1)

        sequencer.isdefault = False
        sequencer.save()

        self.assertEqual(self._default_count(), 0)

        # default() still answers even though no row is flagged.
        fallback = models.Sequencer.default()
        self.assertEqual(fallback.id, 7)

    def test_update_default(self):
        """Flagging another sequencer moves the default and keeps it unique."""
        previous = models.Sequencer.default()

        sequencer = models.Sequencer.objects.get(pk=1)
        sequencer.isdefault = True
        sequencer.save()

        current = models.Sequencer.default()

        self.assertNotEqual(previous, current)
        self.assertEqual(current, sequencer)

        self.assertEqual(self._default_count(), 1)

    def test_update_other(self):
        """Saving an unrelated field leaves the default untouched."""
        previous = models.Sequencer.default()
        self.assertEqual(self._default_count(), 1)

        sequencer = models.Sequencer.objects.get(pk=1)
        sequencer.name = "Primary Key 1"
        sequencer.save()

        self.assertEqual(self._default_count(), 1)

        current = models.Sequencer.default()
        self.assertEqual(previous, current)
134
135
class ExperimentsTestCases(TestCase):
    """Tests for flowcell/lane metadata, related web pages and data-run import.

    setUp builds a scratch result tree and points settings.RESULT_HOME_DIR
    at it; tearDown removes the tree and restores the previous setting.
    """

    fixtures = ['initial_data.json',
                'test_flowcells.json',
                ]

    def setUp(self):
        """Create the temporary result directories the tests read from."""
        # Remember the configured value so tearDown can put it back;
        # otherwise later test cases inherit a path to a deleted directory.
        self._had_result_home = hasattr(settings, 'RESULT_HOME_DIR')
        self._saved_result_home = getattr(settings, 'RESULT_HOME_DIR', None)

        self.tempdir = tempfile.mkdtemp(prefix='htsw-test-experiments-')
        settings.RESULT_HOME_DIR = self.tempdir

        # Flowcell 1: <tempdir>/FC12150/C1-37 with a run xml and 8 SRF files.
        self.fc1_id = 'FC12150'
        self.fc1_root = os.path.join(self.tempdir, self.fc1_id)
        os.mkdir(self.fc1_root)
        self.fc1_dir = os.path.join(self.fc1_root, 'C1-37')
        os.mkdir(self.fc1_dir)
        runxml = 'run_FC12150_2007-09-27.xml'
        shutil.copy(os.path.join(TESTDATA_DIR, runxml),
                    os.path.join(self.fc1_dir, runxml))
        # Every lane gets a copy of the same sample SRF under its own name.
        srf_source = os.path.join(
            TESTDATA_DIR, 'woldlab_070829_USI-EAS44_0017_FC11055_1.srf')
        for lane_number in range(1, 9):
            srf_name = 'woldlab_070829_SERIAL_FC12150_%d.srf' % (lane_number,)
            shutil.copy(srf_source, os.path.join(self.fc1_dir, srf_name))

        # Flowcell 2: only empty directories.
        self.fc2_dir = os.path.join(self.tempdir, '42JTNAAXX')
        os.mkdir(self.fc2_dir)
        os.mkdir(os.path.join(self.fc2_dir, 'C1-25'))
        os.mkdir(os.path.join(self.fc2_dir, 'C1-37'))
        os.mkdir(os.path.join(self.fc2_dir, 'C1-37', 'Plots'))

    def tearDown(self):
        """Restore RESULT_HOME_DIR and delete the temporary result tree."""
        # Restore first so the setting is reset even if rmtree fails.
        if self._had_result_home:
            settings.RESULT_HOME_DIR = self._saved_result_home
        shutil.rmtree(self.tempdir)

    def _assert_valid_xhtml(self, content):
        """Fail when validate_xhtml reports a false status for content.

        A None status is skipped, as in the rest of this module
        (presumably the validator could not run -- confirm in ethelp).
        """
        status = validate_xhtml(content)
        if status is not None:
            self.assertTrue(status)

    def _assert_flowcell_matches(self, fc_info, fc_django, fc_id):
        """Compare the flowcell-level fields of fc_info with the model."""
        self.assertEqual(fc_info['flowcell_id'], fc_id)
        self.assertEqual(fc_info['sequencer'], fc_django.sequencer.name)
        self.assertEqual(fc_info['read_length'], fc_django.read_length)
        self.assertEqual(fc_info['notes'], fc_django.notes)
        self.assertEqual(fc_info['cluster_station'],
                         fc_django.cluster_station.name)

    def _assert_lanes_match(self, lane_set, fc_django, lane_key):
        """Compare every lane of fc_django with its entry in lane_set.

        lane_key converts a lane number into the key type lane_set uses
        (integers for the python dict, strings for decoded JSON).
        """
        for lane in fc_django.lane_set.all():
            lane_contents = lane_set[lane_key(lane.lane_number)]
            lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]

            self.assertEqual(lane_dict['cluster_estimate'],
                             lane.cluster_estimate)
            self.assertEqual(lane_dict['comment'], lane.comment)
            self.assertEqual(lane_dict['flowcell'],
                             lane.flowcell.flowcell_id)
            self.assertEqual(lane_dict['lane_number'], lane.lane_number)
            self.assertEqual(lane_dict['library_name'],
                             lane.library.library_name)
            self.assertEqual(lane_dict['library_id'], lane.library.id)
            self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM))
            self.assertEqual(lane_dict['library_species'],
                             lane.library.library_species.scientific_name)

    def test_flowcell_information(self):
        """
        Check the code that packs the django objects into simple types.
        """
        for fc_id in [u'FC12150', u"42JTNAAXX", "42JU1AAXX"]:
            fc_dict = experiments.flowcell_information(fc_id)
            fc_django = models.FlowCell.objects.get(flowcell_id=fc_id)
            self.assertEqual(fc_django.flowcell_id, fc_id)

            # First the in-process dictionary, keyed by integer lane number.
            self._assert_flowcell_matches(fc_dict, fc_django, fc_id)
            self._assert_lanes_match(fc_dict['lane_set'], fc_django,
                                     lambda number: number)

            response = self.client.get(
                '/experiments/config/%s/json' % (fc_id,), apidata)
            # strptime isoformat string = '%Y-%m-%dT%H:%M:%S'
            fc_json = json.loads(response.content)

            # Then the same data over HTTP; decoded JSON keys are strings.
            self._assert_flowcell_matches(fc_json, fc_django, fc_id)
            self._assert_lanes_match(fc_json['lane_set'], fc_django, unicode)

    def test_invalid_flowcell(self):
        """
        Make sure we get a 404 if we request an invalid flowcell ID
        """
        response = self.client.get('/experiments/config/nottheone/json',
                                   apidata)
        self.assertEqual(response.status_code, 404)

    def test_no_key(self):
        """
        Require logging in to retrieve meta data
        """
        response = self.client.get(u'/experiments/config/FC12150/json')
        self.assertEqual(response.status_code, 403)

    def test_library_id(self):
        """
        Library IDs should be flexible, so make sure we can retrieve a
        non-numeric ID
        """
        response = self.client.get('/experiments/config/FC12150/json',
                                   apidata)
        self.assertEqual(response.status_code, 200)
        flowcell = json.loads(response.content)

        lane_contents = flowcell['lane_set']['3']
        lane_library = lane_contents[0]
        self.assertEqual(lane_library['library_id'], 'SL039')

        response = self.client.get('/samples/library/SL039/json', apidata)
        self.assertEqual(response.status_code, 200)
        library_sl039 = json.loads(response.content)

        self.assertEqual(library_sl039['library_id'], 'SL039')

    def test_raw_id_field(self):
        """
        Test ticket:147

        Libraries have IDs, libraries also have primary keys,
        we eventually had enough libraries that the drop down combo box was
        too hard to filter through, unfortunately we want a field that uses
        our library id and not the internal primary key, and raw_id_field
        uses primary keys.

        This tests to make sure that the value entered in the raw library id
        field matches the library id looked up.
        """
        expected_ids = [u'10981', u'11016', u'SL039', u'11060',
                        u'11061', u'11062', u'11063', u'11064']
        self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
        response = self.client.get('/admin/experiments/flowcell/153/')

        tree = fromstring(response.content)
        xpath_expression = '//input[@id="id_lane_set-%d-library"]'
        for i, expected_id in enumerate(expected_ids):
            input_field = tree.xpath(xpath_expression % (i,))[0]
            library_field = input_field.find('../strong')
            library_id, library_name = library_field.text.split(':')
            # strip leading '#' sign from name
            library_id = library_id[1:]
            self.assertEqual(library_id, expected_id)
            self.assertEqual(input_field.attrib['value'], library_id)

    def test_library_to_flowcell_link(self):
        """
        Make sure the library page includes links to the flowcell pages.
        That work with flowcell IDs that have parenthetical comments.
        """
        self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
        response = self.client.get('/library/11070/')
        self.assertEqual(response.status_code, 200)
        self._assert_valid_xhtml(response.content)

        tree = fromstring(response.content)
        flowcell_spans = tree.xpath('//span[@property="libns:flowcell_id"]',
                                    namespaces=NSMAP)
        self.assertEqual(flowcell_spans[1].text, '30012AAXX (failed)')
        failed_fc_span = flowcell_spans[1]
        failed_fc_a = failed_fc_span.getparent()
        # make sure some of our RDF made it.
        self.assertEqual(failed_fc_a.get('typeof'), 'libns:IlluminaFlowcell')
        self.assertEqual(failed_fc_a.get('href'), '/flowcell/30012AAXX/')

        # Validate the page each request returned, not the library page
        # again (the original re-checked response.content every time).
        fc_response = self.client.get(failed_fc_a.get('href'))
        self.assertEqual(fc_response.status_code, 200)
        self._assert_valid_xhtml(fc_response.content)

        fc_lane_response = self.client.get('/flowcell/30012AAXX/8/')
        self.assertEqual(fc_lane_response.status_code, 200)
        self._assert_valid_xhtml(fc_lane_response.content)

    def test_pooled_multiplex_id(self):
        """A pooled lane reports the index sequences of each library in it."""
        fc_dict = experiments.flowcell_information('42JU1AAXX')
        lane_contents = fc_dict['lane_set'][3]
        self.assertEqual(len(lane_contents), 2)
        lane_dict = multi_lane_to_dict(lane_contents)

        self.assertEqual(lane_dict['12044']['index_sequence'],
                         {u'1': u'ATCACG',
                          u'2': u'CGATGT',
                          u'3': u'TTAGGC'})
        self.assertEqual(lane_dict['11045']['index_sequence'],
                         {u'1': u'ATCACG'})

    def test_lanes_for(self):
        """
        Check the code that packs the django objects into simple types.
        """
        user = 'test'
        lanes = experiments.lanes_for(user)
        self.assertEqual(len(lanes), 5)

        response = self.client.get(
            '/experiments/lanes_for/%s/json' % (user,), apidata)
        lanes_json = json.loads(response.content)
        self.assertEqual(len(lanes), len(lanes_json))
        # Lengths were just checked, so zip pairs every lane with its twin.
        for lane, lane_json in zip(lanes, lanes_json):
            self.assertEqual(lane['comment'], lane_json['comment'])
            self.assertEqual(lane['lane_number'], lane_json['lane_number'])
            self.assertEqual(lane['flowcell'], lane_json['flowcell'])
            self.assertEqual(lane['run_date'], lane_json['run_date'])

    def test_lanes_for_no_lanes(self):
        """
        Do we get something meaningful back when the user isn't attached to
        anything?
        """
        user = 'supertest'
        lanes = experiments.lanes_for(user)
        self.assertEqual(len(lanes), 0)

        response = self.client.get(
            '/experiments/lanes_for/%s/json' % (user,), apidata)
        # Only checks that the body decodes as JSON; its contents are not
        # asserted here.
        json.loads(response.content)

    def test_lanes_for_no_user(self):
        """
        Do we get something meaningful back when it's the wrong user
        """
        user = 'not a real user'
        self.assertRaises(ObjectDoesNotExist, experiments.lanes_for, user)

        response = self.client.get(
            '/experiments/lanes_for/%s/json' % (user,), apidata)
        self.assertEqual(response.status_code, 404)

    def test_raw_data_dir(self):
        """Raw data path generator check"""
        flowcell_id = self.fc1_id
        raw_dir = os.path.join(settings.RESULT_HOME_DIR, flowcell_id)

        fc = models.FlowCell.objects.get(flowcell_id=flowcell_id)
        self.assertEqual(fc.get_raw_data_directory(), raw_dir)

        # A parenthetical suffix on the id must not change the directory.
        fc.flowcell_id = flowcell_id + " (failed)"
        self.assertEqual(fc.get_raw_data_directory(), raw_dir)

    def test_data_run_import(self):
        """Importing the scratch result tree creates one run with its files."""
        srf_file_type = models.FileType.objects.get(name='SRF')
        runxml_file_type = models.FileType.objects.get(name='run_xml')
        flowcell_id = self.fc1_id
        flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id)
        flowcell.update_data_runs()
        self.assertEqual(len(flowcell.datarun_set.all()), 1)

        run = flowcell.datarun_set.all()[0]
        result_files = run.datafile_set.all()
        result_dict = dict(((rf.relative_pathname, rf) for rf in result_files))

        srf4 = result_dict['FC12150/C1-37/woldlab_070829_SERIAL_FC12150_4.srf']
        self.assertEqual(srf4.file_type, srf_file_type)
        self.assertEqual(srf4.library_id, '11060')
        self.assertEqual(srf4.data_run.flowcell.flowcell_id, 'FC12150')
        self.assertEqual(
            srf4.data_run.flowcell.lane_set.get(lane_number=4).library_id,
            '11060')
        self.assertEqual(
            srf4.pathname,
            os.path.join(settings.RESULT_HOME_DIR, srf4.relative_pathname))

        lane_files = run.lane_files()
        self.assertEqual(lane_files[4]['srf'], srf4)

        runxml = result_dict['FC12150/C1-37/run_FC12150_2007-09-27.xml']
        self.assertEqual(runxml.file_type, runxml_file_type)
        self.assertEqual(runxml.library_id, None)

        import1 = len(models.DataRun.objects.filter(result_dir='FC12150/C1-37'))
        # what happens if we import twice?
        flowcell.import_data_run('FC12150/C1-37',
                                 'run_FC12150_2007-09-27.xml')
        self.assertEqual(
            len(models.DataRun.objects.filter(result_dir='FC12150/C1-37')),
            import1)

    def test_read_result_file(self):
        """make sure we can return a result file
        """
        flowcell_id = self.fc1_id
        flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id)
        flowcell.update_data_runs()

        # No login is performed: each file is requested by its random key.
        result_files = flowcell.datarun_set.all()[0].datafile_set.all()
        for f in result_files:
            url = '/experiments/file/%s' % (f.random_key,)
            response = self.client.get(url)
            self.assertEqual(response.status_code, 200)
            mimetype = f.file_type.mimetype
            if mimetype is None:
                mimetype = 'application/octet-stream'

            self.assertEqual(mimetype, response['content-type'])

    def test_flowcell_rdf(self):
        """The flowcell page's RDFa lists the expected library for each lane."""
        import RDF
        from htsworkflow.util.rdfhelp import get_model, \
             fromTypedNode, \
             load_string_into_model

        model = get_model()

        # lane number -> library ids expected on that lane
        expected = {'1': ['11034'],
                    '2': ['11036'],
                    '3': ['12044', '11045'],
                    '4': ['11047', '13044'],
                    '5': ['11055'],
                    '6': ['11067'],
                    '7': ['11069'],
                    '8': ['11070']}
        url = '/flowcell/42JU1AAXX/'
        response = self.client.get(url)
        self.assertEqual(response.status_code, 200)
        self._assert_valid_xhtml(response.content)

        ns = urljoin('http://localhost', url)
        load_string_into_model(model, 'rdfa', response.content, ns=ns)
        body = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>

        select ?flowcell ?flowcell_id ?lane_id ?library_id
        where {
          ?flowcell a libns:IlluminaFlowcell ;
                    libns:flowcell_id ?flowcell_id ;
                    libns:has_lane ?lane .
          ?lane libns:lane_number ?lane_id ;
                libns:library ?library .
          ?library libns:library_id ?library_id .
        }"""
        query = RDF.SPARQLQuery(body)
        count = 0
        for r in query.execute(model):
            count += 1
            self.assertEqual(fromTypedNode(r['flowcell_id']), u'42JU1AAXX')
            lane_id = fromTypedNode(r['lane_id'])
            library_id = fromTypedNode(r['library_id'])
            self.assertTrue(library_id in expected[lane_id])
        # One result row per (lane, library) pair listed in expected.
        self.assertEqual(count, 10)
484
485
class TestFileType(TestCase):
    """Check FileType naming and filename-to-metadata detection."""

    fixtures = ['initial_data.json',
                'test_flowcells.json',
                ]

    def _check_file_type_cases(self, cases):
        """Verify detected file type, lane and end for each case tuple.

        Each case is (filename, FileType name, expected lane, expected end);
        a key missing from the detected metadata is compared as None.
        """
        known_types = models.FileType.objects
        for filename, typename, lane, end in cases:
            metadata = models.find_file_type_metadata_from_filename(filename)
            self.assertEqual(metadata['file_type'],
                             known_types.get(name=typename))
            self.assertEqual(metadata.get('lane', None), lane)
            self.assertEqual(metadata.get('end', None), end)

    def test_file_type_unicode(self):
        """unicode() of a FileType is its name."""
        qseq_type = models.FileType.objects.get(name='QSEQ tarfile')
        self.assertEqual(u"QSEQ tarfile", unicode(qseq_type))

    def test_find_file_type(self):
        """Bare filenames are classified correctly."""
        self._check_file_type_cases([
            ('woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
             'QSEQ tarfile', 7, 1),
            ('woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
             'SRF', 1, None),
            ('s_1_eland_extended.txt.bz2', 'ELAND Extended', 1, None),
            ('s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
            ('s_3_eland_result.txt.bz2', 'ELAND Result', 3, None),
            ('s_1_export.txt.bz2', 'ELAND Export', 1, None),
            ('s_1_percent_call.png', 'IVC Percent Call', 1, None),
            ('s_2_percent_base.png', 'IVC Percent Base', 2, None),
            ('s_3_percent_all.png', 'IVC Percent All', 3, None),
            ('s_4_call.png', 'IVC Call', 4, None),
            ('s_5_all.png', 'IVC All', 5, None),
            ('Summary.htm', 'Summary.htm', None, None),
            ('run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
        ])

    def test_assign_file_type_complex_path(self):
        """Filenames with leading directory components classify the same way."""
        self._check_file_type_cases([
            ('/a/b/c/woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
             'QSEQ tarfile', 7, 1),
            ('foo/woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
             'SRF', 1, None),
            ('../s_1_eland_extended.txt.bz2', 'ELAND Extended', 1, None),
            ('/bleem/s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
            ('/qwer/s_3_eland_result.txt.bz2', 'ELAND Result', 3, None),
            ('/ty///1/s_1_export.txt.bz2', 'ELAND Export', 1, None),
            ('/help/s_1_percent_call.png', 'IVC Percent Call', 1, None),
            ('/bored/s_2_percent_base.png', 'IVC Percent Base', 2, None),
            ('/example1/s_3_percent_all.png', 'IVC Percent All', 3, None),
            ('amonkey/s_4_call.png', 'IVC Call', 4, None),
            ('fishie/s_5_all.png', 'IVC All', 5, None),
            ('/random/Summary.htm', 'Summary.htm', None, None),
            ('/notrandom/run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
        ])
547
class TestEmailNotify(TestCase):
    """Check access control and output of the 'flowcell started' email page."""

    fixtures = ['initial_data.json',
                'test_flowcells.json']

    def _get_started_page(self, query=None):
        """GET the started-email page for flowcell 153, optionally with a query."""
        if query is None:
            return self.client.get('/experiments/started/153/')
        return self.client.get('/experiments/started/153/', query)

    def test_started_email_not_logged_in(self):
        """Anonymous requests are redirected."""
        page = self._get_started_page()
        self.assertEqual(page.status_code, 302)

    def test_started_email_logged_in_user(self):
        """The 'test' account is redirected as well."""
        self.client.login(username='test', password='BJOKL5kAj6aFZ6A5')
        page = self._get_started_page()
        self.assertEqual(page.status_code, 302)

    def test_started_email_logged_in_staff(self):
        """The 'admintest' account can view the page."""
        self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5')
        page = self._get_started_page()
        self.assertEqual(page.status_code, 200)

    def test_started_email_send(self):
        """Sending with bcc enabled produces four messages with the bcc list."""
        self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5')
        preview = self._get_started_page()
        self.assertEqual(preview.status_code, 200)

        self.assertTrue('pk1@example.com' in preview.content)
        self.assertTrue('Lane #8 : (11064) Paired ends 104' in preview.content)

        sent = self._get_started_page({'send':'1','bcc':'on'})
        self.assertEqual(sent.status_code, 200)
        self.assertEqual(len(mail.outbox), 4)

        # Every message must be bcc'd to NOTIFICATION_BCC plus MANAGERS.
        expected_bcc = set(settings.NOTIFICATION_BCC) | set(settings.MANAGERS)
        for message in mail.outbox:
            self.assertTrue(len(message.body) > 0)
            self.assertEqual(set(message.bcc), expected_bcc)

    def test_email_navigation(self):
        """
        Can we navigate between the flowcell and email forms properly?
        """
        self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
        page = self._get_started_page()
        self.assertEqual(page.status_code, 200)
        self.assertTrue(re.search('Flowcell FC12150', page.content))
        # the page has to link back to the flowcell's admin page
        admin_link = '<a href="/admin/experiments/flowcell/153/">[^<]+</a>'
        self.assertTrue(re.search(admin_link, page.content))
593
def multi_lane_to_dict(lane):
    """Index a sequence of lane entries by their 'library_id' value.

    Each entry must support entry['library_id'].  When two entries share
    a library id, the later one wins.
    """
    by_library = {}
    for entry in lane:
        by_library[entry['library_id']] = entry
    return by_library
598
class TestSequencer(TestCase):
    """Check Sequencer naming, lookup from a flowcell and RDFa output."""

    fixtures = ['initial_data.json',
                'test_flowcells.json',
                ]

    def _only(self, nodes):
        """Assert nodes holds exactly one element and return it."""
        self.assertEqual(len(nodes), 1)
        return nodes[0]

    def _assert_page_rdf_validates(self, url):
        """Fetch url, load its RDFa into a schema-backed model and validate.

        The page must return 200, pass validate_xhtml (a None status is
        skipped) and yield no messages from Infer.run_validation().
        """
        from htsworkflow.util.rdfhelp import add_default_schemas, \
             dump_model, \
             get_model, \
             load_string_into_model
        from htsworkflow.util.rdfinfer import Infer

        model = get_model()
        add_default_schemas(model)
        inference = Infer(model)

        response = self.client.get(url)
        self.assertEqual(response.status_code, 200)
        status = validate_xhtml(response.content)
        if status is not None:
            self.assertTrue(status)

        load_string_into_model(model, 'rdfa', response.content)

        errmsgs = list(inference.run_validation())
        self.assertEqual(len(errmsgs), 0)

    def test_name_generation(self):
        """unicode() of a Sequencer is 'name (instrument_name)'."""
        sequencer = models.Sequencer()
        sequencer.name = "Seq1"
        sequencer.instrument_name = "HWI-SEQ1"
        sequencer.model = "Imaginary 5000"

        self.assertEqual(unicode(sequencer), "Seq1 (HWI-SEQ1)")

    def test_lookup(self):
        """Flowcell 153 exposes its sequencer details and a relative URL."""
        flowcell = models.FlowCell.objects.get(pk=153)
        sequencer = flowcell.sequencer
        self.assertEqual(sequencer.model, "Illumina Genome Analyzer IIx")
        self.assertEqual(sequencer.instrument_name, "ILLUMINA-EC5D15")
        # no host name here: the browser resolves the relative URL
        self.assertEqual(flowcell.get_absolute_url(), '/flowcell/FC12150/')

    def test_rdf(self):
        """The flowcell page embeds one fully described libns:Sequencer."""
        response = self.client.get('/flowcell/FC12150/', apidata)
        tree = fromstring(response.content)

        container = self._only(
            tree.xpath('//div[@rel="libns:sequenced_by"]', namespaces=NSMAP))
        self.assertEqual(container.attrib['rel'], 'libns:sequenced_by')

        sequencer = self._only(container.getchildren())
        self.assertEqual(sequencer.attrib['about'], '/sequencer/2')
        self.assertEqual(sequencer.attrib['typeof'], 'libns:Sequencer')

        # (xpath relative to the sequencer element, expected text)
        expected_spans = [
            ('./span[@property="libns:sequencer_name"]',
             'Tardigrade'),
            ('./span[@property="libns:sequencer_instrument"]',
             'ILLUMINA-EC5D15'),
            ('./span[@property="libns:sequencer_model"]',
             'Illumina Genome Analyzer IIx'),
        ]
        for span_xpath, expected_text in expected_spans:
            span = self._only(sequencer.xpath(span_xpath))
            self.assertEqual(span.text, expected_text)

    def test_flowcell_with_rdf_validation(self):
        """RDFa on a flowcell page passes schema validation."""
        self._assert_page_rdf_validates('/flowcell/FC12150/')

    def test_lane_with_rdf_validation(self):
        """RDFa on a lane page passes schema validation."""
        self._assert_page_rdf_validates('/lane/1193')
689
def suite():
    """Build a TestSuite holding the tests of every TestCase in this module.

    Returns:
        unittest.TestSuite loaded from ClusterStationTestCases,
        SequencerTestCases, ExperimentsTestCases, TestFileType,
        TestEmailNotify and TestSequencer, in that order.
    """
    from unittest import TestSuite, defaultTestLoader

    all_tests = TestSuite()
    # ExperimentsTestCases was previously misspelled "ExerimentsTestCases",
    # which raised NameError as soon as suite() was called.
    for testcase in (ClusterStationTestCases,
                     SequencerTestCases,
                     ExperimentsTestCases,
                     TestFileType,
                     TestEmailNotify,
                     TestSequencer):
        all_tests.addTests(defaultTestLoader.loadTestsFromTestCase(testcase))
    return all_tests
701
if __name__ == "__main__":
    # Executed as a script: let unittest run whatever suite() returns.
    import unittest
    unittest.main(defaultTest="suite")