8907192cf46376b1b8a28639f9a8c44207a5ef72
[htsworkflow.git] / htsworkflow / frontend / experiments / test_experiments.py
1 import re
2 from lxml.html import fromstring
3 try:
4     import json
5 except ImportError, e:
6     import simplejson as json
7 import os
8 import shutil
9 import sys
10 import tempfile
11 from urlparse import urljoin
12
13 from django.conf import settings
14 from django.core import mail
15 from django.core.exceptions import ObjectDoesNotExist
16 from django.test import TestCase
17 from django.test.utils import setup_test_environment, teardown_test_environment
18 from django.db import connection
19 from django.conf import settings
20 from htsworkflow.frontend.experiments import models
21 from htsworkflow.frontend.experiments import experiments
22 from htsworkflow.frontend.auth import apidata
23 from htsworkflow.util.ethelp import validate_xhtml
24
25 from htsworkflow.pipelines.test.simulate_runfolder import TESTDATA_DIR
26
27 LANE_SET = range(1,9)
28
29 NSMAP = {'libns':'http://jumpgate.caltech.edu/wiki/LibraryOntology#'}
30
31 from django.db import connection
32
33 OLD_DB_NAME = settings.DATABASES['default']['NAME']
34 VERBOSITY = 0
35 def setUpModule():
36     setup_test_environment()
37     settings.DEBUG = False
38     connection.creation.create_test_db(VERBOSITY)
39
40 def tearDownModule():
41     connection.creation.destroy_test_db(OLD_DB_NAME, VERBOSITY)
42     teardown_test_environment()
43
44 class ClusterStationTestCases(TestCase):
45     fixtures = ['test_flowcells.json']
46
47     def test_default(self):
48         c = models.ClusterStation.default()
49         self.assertEqual(c.id, 2)
50
51         c.isdefault = False
52         c.save()
53
54         total = models.ClusterStation.objects.filter(isdefault=True).count()
55         self.assertEqual(total, 0)
56
57         other_default = models.ClusterStation.default()
58         self.assertEqual(other_default.id, 3)
59
60
61     def test_update_default(self):
62         old_default = models.ClusterStation.default()
63
64         c = models.ClusterStation.objects.get(pk=3)
65         c.isdefault = True
66         c.save()
67
68         new_default = models.ClusterStation.default()
69
70         self.assertNotEqual(old_default, new_default)
71         self.assertEqual(new_default, c)
72
73         total = models.ClusterStation.objects.filter(isdefault=True).count()
74         self.assertEqual(total, 1)
75
76     def test_update_other(self):
77         old_default = models.ClusterStation.default()
78         total = models.ClusterStation.objects.filter(isdefault=True).count()
79         self.assertEqual(total, 1)
80
81         c = models.ClusterStation.objects.get(pk=1)
82         c.name = "Primary Key 1"
83         c.save()
84
85         total = models.ClusterStation.objects.filter(isdefault=True).count()
86         self.assertEqual(total, 1)
87
88         new_default = models.ClusterStation.default()
89         self.assertEqual(old_default, new_default)
90
91
92 class SequencerTestCases(TestCase):
93     fixtures = ['test_flowcells.json']
94
95     def test_default(self):
96         # starting with no default
97         s = models.Sequencer.default()
98         self.assertEqual(s.id, 2)
99
100         total = models.Sequencer.objects.filter(isdefault=True).count()
101         self.assertEqual(total, 1)
102
103         s.isdefault = False
104         s.save()
105
106         total = models.Sequencer.objects.filter(isdefault=True).count()
107         self.assertEqual(total, 0)
108
109         other_default = models.Sequencer.default()
110         self.assertEqual(other_default.id, 7)
111
112     def test_update_default(self):
113         old_default = models.Sequencer.default()
114
115         s = models.Sequencer.objects.get(pk=1)
116         s.isdefault = True
117         s.save()
118
119         new_default = models.Sequencer.default()
120
121         self.assertNotEqual(old_default, new_default)
122         self.assertEqual(new_default, s)
123
124         total = models.Sequencer.objects.filter(isdefault=True).count()
125         self.assertEqual(total, 1)
126
127
128     def test_update_other(self):
129         old_default = models.Sequencer.default()
130         total = models.Sequencer.objects.filter(isdefault=True).count()
131         self.assertEqual(total, 1)
132
133         s = models.Sequencer.objects.get(pk=1)
134         s.name = "Primary Key 1"
135         s.save()
136
137         total = models.Sequencer.objects.filter(isdefault=True).count()
138         self.assertEqual(total, 1)
139
140         new_default = models.Sequencer.default()
141         self.assertEqual(old_default, new_default)
142
143
144 class ExperimentsTestCases(TestCase):
145     fixtures = ['test_flowcells.json',
146                 ]
147
148     def setUp(self):
149         self.tempdir = tempfile.mkdtemp(prefix='htsw-test-experiments-')
150         settings.RESULT_HOME_DIR = self.tempdir
151
152         self.fc1_id = 'FC12150'
153         self.fc1_root = os.path.join(self.tempdir, self.fc1_id)
154         os.mkdir(self.fc1_root)
155         self.fc1_dir = os.path.join(self.fc1_root, 'C1-37')
156         os.mkdir(self.fc1_dir)
157         runxml = 'run_FC12150_2007-09-27.xml'
158         shutil.copy(os.path.join(TESTDATA_DIR, runxml),
159                     os.path.join(self.fc1_dir, runxml))
160         for i in range(1,9):
161             shutil.copy(
162                 os.path.join(TESTDATA_DIR,
163                              'woldlab_070829_USI-EAS44_0017_FC11055_1.srf'),
164                 os.path.join(self.fc1_dir,
165                              'woldlab_070829_SERIAL_FC12150_%d.srf' %(i,))
166                 )
167
168         self.fc2_dir = os.path.join(self.tempdir, '42JTNAAXX')
169         os.mkdir(self.fc2_dir)
170         os.mkdir(os.path.join(self.fc2_dir, 'C1-25'))
171         os.mkdir(os.path.join(self.fc2_dir, 'C1-37'))
172         os.mkdir(os.path.join(self.fc2_dir, 'C1-37', 'Plots'))
173
174     def tearDown(self):
175         shutil.rmtree(self.tempdir)
176
177     def test_flowcell_information(self):
178         """
179         Check the code that packs the django objects into simple types.
180         """
181         for fc_id in [u'FC12150', u"42JTNAAXX", "42JU1AAXX"]:
182             fc_dict = experiments.flowcell_information(fc_id)
183             fc_django = models.FlowCell.objects.get(flowcell_id=fc_id)
184             self.assertEqual(fc_dict['flowcell_id'], fc_id)
185             self.assertEqual(fc_django.flowcell_id, fc_id)
186             self.assertEqual(fc_dict['sequencer'], fc_django.sequencer.name)
187             self.assertEqual(fc_dict['read_length'], fc_django.read_length)
188             self.assertEqual(fc_dict['notes'], fc_django.notes)
189             self.assertEqual(fc_dict['cluster_station'], fc_django.cluster_station.name)
190
191             for lane in fc_django.lane_set.all():
192                 lane_contents = fc_dict['lane_set'][lane.lane_number]
193                 lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
194                 self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate)
195                 self.assertEqual(lane_dict['comment'], lane.comment)
196                 self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id)
197                 self.assertEqual(lane_dict['lane_number'], lane.lane_number)
198                 self.assertEqual(lane_dict['library_name'], lane.library.library_name)
199                 self.assertEqual(lane_dict['library_id'], lane.library.id)
200                 self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM))
201                 self.assertEqual(lane_dict['library_species'],
202                                      lane.library.library_species.scientific_name)
203
204             response = self.client.get('/experiments/config/%s/json' % (fc_id,), apidata)
205             # strptime isoformat string = '%Y-%m-%dT%H:%M:%S'
206             fc_json = json.loads(response.content)
207             self.assertEqual(fc_json['flowcell_id'], fc_id)
208             self.assertEqual(fc_json['sequencer'], fc_django.sequencer.name)
209             self.assertEqual(fc_json['read_length'], fc_django.read_length)
210             self.assertEqual(fc_json['notes'], fc_django.notes)
211             self.assertEqual(fc_json['cluster_station'], fc_django.cluster_station.name)
212
213
214             for lane in fc_django.lane_set.all():
215                 lane_contents = fc_json['lane_set'][unicode(lane.lane_number)]
216                 lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
217
218                 self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate)
219                 self.assertEqual(lane_dict['comment'], lane.comment)
220                 self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id)
221                 self.assertEqual(lane_dict['lane_number'], lane.lane_number)
222                 self.assertEqual(lane_dict['library_name'], lane.library.library_name)
223                 self.assertEqual(lane_dict['library_id'], lane.library.id)
224                 self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM))
225                 self.assertEqual(lane_dict['library_species'],
226                                      lane.library.library_species.scientific_name)
227
228     def test_invalid_flowcell(self):
229         """
230         Make sure we get a 404 if we request an invalid flowcell ID
231         """
232         response = self.client.get('/experiments/config/nottheone/json', apidata)
233         self.assertEqual(response.status_code, 404)
234
235     def test_no_key(self):
236         """
237         Require logging in to retrieve meta data
238         """
239         response = self.client.get(u'/experiments/config/FC12150/json')
240         self.assertEqual(response.status_code, 403)
241
242     def test_library_id(self):
243         """
244         Library IDs should be flexible, so make sure we can retrive a non-numeric ID
245         """
246         response = self.client.get('/experiments/config/FC12150/json', apidata)
247         self.assertEqual(response.status_code, 200)
248         flowcell = json.loads(response.content)
249
250         lane_contents = flowcell['lane_set']['3']
251         lane_library = lane_contents[0]
252         self.assertEqual(lane_library['library_id'], 'SL039')
253
254         response = self.client.get('/samples/library/SL039/json', apidata)
255         self.assertEqual(response.status_code, 200)
256         library_sl039 = json.loads(response.content)
257
258         self.assertEqual(library_sl039['library_id'], 'SL039')
259
260     def test_raw_id_field(self):
261         """
262         Test ticket:147
263
264         Library's have IDs, libraries also have primary keys,
265         we eventually had enough libraries that the drop down combo box was too
266         hard to filter through, unfortnately we want a field that uses our library
267         id and not the internal primary key, and raw_id_field uses primary keys.
268
269         This tests to make sure that the value entered in the raw library id field matches
270         the library id looked up.
271         """
272         expected_ids = [u'10981',u'11016',u'SL039',u'11060',
273                         u'11061',u'11062',u'11063',u'11064']
274         self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
275         response = self.client.get('/admin/experiments/flowcell/153/')
276
277         tree = fromstring(response.content)
278         for i in range(0,8):
279             xpath_expression = '//input[@id="id_lane_set-%d-library"]'
280             input_field = tree.xpath(xpath_expression % (i,))[0]
281             library_field = input_field.find('../strong')
282             library_id, library_name = library_field.text.split(':')
283             # strip leading '#' sign from name
284             library_id = library_id[1:]
285             self.assertEqual(library_id, expected_ids[i])
286             self.assertEqual(input_field.attrib['value'], library_id)
287
288     def test_library_to_flowcell_link(self):
289         """
290         Make sure the library page includes links to the flowcell pages.
291         That work with flowcell IDs that have parenthetical comments.
292         """
293         self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
294         response = self.client.get('/library/11070/')
295         self.assertEqual(response.status_code, 200)
296         status = validate_xhtml(response.content)
297         if status is not None: self.assertTrue(status)
298
299         tree = fromstring(response.content)
300         flowcell_spans = tree.xpath('//span[@property="libns:flowcell_id"]',
301                                     namespaces=NSMAP)
302         self.assertEqual(flowcell_spans[0].text, '30012AAXX (failed)')
303         failed_fc_span = flowcell_spans[0]
304         failed_fc_a = failed_fc_span.getparent()
305         # make sure some of our RDF made it.
306         self.assertEqual(failed_fc_a.get('typeof'), 'libns:IlluminaFlowcell')
307         self.assertEqual(failed_fc_a.get('href'), '/flowcell/30012AAXX/')
308         fc_response = self.client.get(failed_fc_a.get('href'))
309         self.assertEqual(fc_response.status_code, 200)
310         status = validate_xhtml(response.content)
311         if status is not None: self.assertTrue(status)
312
313         fc_lane_response = self.client.get('/flowcell/30012AAXX/8/')
314         self.assertEqual(fc_lane_response.status_code, 200)
315         status = validate_xhtml(response.content)
316         if status is not None: self.assertTrue(status)
317
318
319     def test_pooled_multiplex_id(self):
320         fc_dict = experiments.flowcell_information('42JU1AAXX')
321         lane_contents = fc_dict['lane_set'][3]
322         self.assertEqual(len(lane_contents), 2)
323         lane_dict = multi_lane_to_dict(lane_contents)
324
325         self.assertEqual(lane_dict['12044']['index_sequence'],
326                          {u'1': u'ATCACG',
327                           u'2': u'CGATGT',
328                           u'3': u'TTAGGC'})
329         self.assertEqual(lane_dict['11045']['index_sequence'],
330                          {u'1': u'ATCACG'})
331
332
333
334     def test_lanes_for(self):
335         """
336         Check the code that packs the django objects into simple types.
337         """
338         user = 'test'
339         lanes = experiments.lanes_for(user)
340         self.assertEqual(len(lanes), 5)
341
342         response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
343         lanes_json = json.loads(response.content)
344         self.assertEqual(len(lanes), len(lanes_json))
345         for i in range(len(lanes)):
346             self.assertEqual(lanes[i]['comment'], lanes_json[i]['comment'])
347             self.assertEqual(lanes[i]['lane_number'], lanes_json[i]['lane_number'])
348             self.assertEqual(lanes[i]['flowcell'], lanes_json[i]['flowcell'])
349             self.assertEqual(lanes[i]['run_date'], lanes_json[i]['run_date'])
350
351     def test_lanes_for_no_lanes(self):
352         """
353         Do we get something meaningful back when the user isn't attached to anything?
354         """
355         user = 'supertest'
356         lanes = experiments.lanes_for(user)
357         self.assertEqual(len(lanes), 0)
358
359         response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
360         lanes_json = json.loads(response.content)
361
362     def test_lanes_for_no_user(self):
363         """
364         Do we get something meaningful back when its the wrong user
365         """
366         user = 'not a real user'
367         self.assertRaises(ObjectDoesNotExist, experiments.lanes_for, user)
368
369         response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
370         self.assertEqual(response.status_code, 404)
371
372
373     def test_raw_data_dir(self):
374         """Raw data path generator check"""
375         flowcell_id = self.fc1_id
376         raw_dir = os.path.join(settings.RESULT_HOME_DIR, flowcell_id)
377
378         fc = models.FlowCell.objects.get(flowcell_id=flowcell_id)
379         self.assertEqual(fc.get_raw_data_directory(), raw_dir)
380
381         fc.flowcell_id = flowcell_id + " (failed)"
382         self.assertEqual(fc.get_raw_data_directory(), raw_dir)
383
384
385     def test_data_run_import(self):
386         srf_file_type = models.FileType.objects.get(name='SRF')
387         runxml_file_type = models.FileType.objects.get(name='run_xml')
388         flowcell_id = self.fc1_id
389         flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id)
390         flowcell.update_data_runs()
391         self.assertEqual(len(flowcell.datarun_set.all()), 1)
392
393         run = flowcell.datarun_set.all()[0]
394         result_files = run.datafile_set.all()
395         result_dict = dict(((rf.relative_pathname, rf) for rf in result_files))
396
397         srf4 = result_dict['FC12150/C1-37/woldlab_070829_SERIAL_FC12150_4.srf']
398         self.assertEqual(srf4.file_type, srf_file_type)
399         self.assertEqual(srf4.library_id, '11060')
400         self.assertEqual(srf4.data_run.flowcell.flowcell_id, 'FC12150')
401         self.assertEqual(
402             srf4.data_run.flowcell.lane_set.get(lane_number=4).library_id,
403             '11060')
404         self.assertEqual(
405             srf4.pathname,
406             os.path.join(settings.RESULT_HOME_DIR, srf4.relative_pathname))
407
408         lane_files = run.lane_files()
409         self.assertEqual(lane_files[4]['srf'], srf4)
410
411         runxml= result_dict['FC12150/C1-37/run_FC12150_2007-09-27.xml']
412         self.assertEqual(runxml.file_type, runxml_file_type)
413         self.assertEqual(runxml.library_id, None)
414
415         import1 = len(models.DataRun.objects.filter(result_dir='FC12150/C1-37'))
416         # what happens if we import twice?
417         flowcell.import_data_run('FC12150/C1-37',
418                                  'run_FC12150_2007-09-27.xml')
419         self.assertEqual(
420             len(models.DataRun.objects.filter(result_dir='FC12150/C1-37')),
421             import1)
422
423     def test_read_result_file(self):
424         """make sure we can return a result file
425         """
426         flowcell_id = self.fc1_id
427         flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id)
428         flowcell.update_data_runs()
429
430         #self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
431
432         result_files = flowcell.datarun_set.all()[0].datafile_set.all()
433         for f in result_files:
434             url = '/experiments/file/%s' % ( f.random_key,)
435             response = self.client.get(url)
436             self.assertEqual(response.status_code, 200)
437             mimetype = f.file_type.mimetype
438             if mimetype is None:
439                 mimetype = 'application/octet-stream'
440
441             self.assertEqual(mimetype, response['content-type'])
442
443     def test_flowcell_rdf(self):
444         import RDF
445         from htsworkflow.util.rdfhelp import get_model, \
446              fromTypedNode, \
447              load_string_into_model, \
448              rdfNS, \
449              libraryOntology, \
450              dump_model
451
452         model = get_model()
453
454         expected = {'1': ['11034'],
455                     '2': ['11036'],
456                     '3': ['12044','11045'],
457                     '4': ['11047','13044'],
458                     '5': ['11055'],
459                     '6': ['11067'],
460                     '7': ['11069'],
461                     '8': ['11070']}
462         url = '/flowcell/42JU1AAXX/'
463         response = self.client.get(url)
464         self.assertEqual(response.status_code, 200)
465         status = validate_xhtml(response.content)
466         if status is not None: self.assertTrue(status)
467
468         ns = urljoin('http://localhost', url)
469         load_string_into_model(model, 'rdfa', response.content, ns=ns)
470         body = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
471         prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
472
473         select ?flowcell ?flowcell_id ?lane_id ?library_id
474         where {
475           ?flowcell a libns:IlluminaFlowcell ;
476                     libns:flowcell_id ?flowcell_id ;
477                     libns:has_lane ?lane .
478           ?lane libns:lane_number ?lane_id ;
479                 libns:library ?library .
480           ?library libns:library_id ?library_id .
481         }"""
482         query = RDF.SPARQLQuery(body)
483         count = 0
484         for r in query.execute(model):
485             count += 1
486             self.assertEqual(fromTypedNode(r['flowcell_id']), u'42JU1AAXX')
487             lane_id = fromTypedNode(r['lane_id'])
488             library_id = fromTypedNode(r['library_id'])
489             self.assertTrue(library_id in expected[lane_id])
490         self.assertEqual(count, 10)
491
492
493 class TestFileType(TestCase):
494     def test_file_type_unicode(self):
495         file_type_objects = models.FileType.objects
496         name = 'QSEQ tarfile'
497         file_type_object = file_type_objects.get(name=name)
498         self.assertEqual(u"QSEQ tarfile",
499                              unicode(file_type_object))
500
501     def test_find_file_type(self):
502         file_type_objects = models.FileType.objects
503         cases = [('woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
504                   'QSEQ tarfile', 7, 1),
505                  ('woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
506                   'SRF', 1, None),
507                  ('s_1_eland_extended.txt.bz2','ELAND Extended', 1, None),
508                  ('s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
509                  ('s_3_eland_result.txt.bz2','ELAND Result', 3, None),
510                  ('s_1_export.txt.bz2','ELAND Export', 1, None),
511                  ('s_1_percent_call.png', 'IVC Percent Call', 1, None),
512                  ('s_2_percent_base.png', 'IVC Percent Base', 2, None),
513                  ('s_3_percent_all.png', 'IVC Percent All', 3, None),
514                  ('s_4_call.png', 'IVC Call', 4, None),
515                  ('s_5_all.png', 'IVC All', 5, None),
516                  ('Summary.htm', 'Summary.htm', None, None),
517                  ('run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
518          ]
519         for filename, typename, lane, end in cases:
520             ft = models.find_file_type_metadata_from_filename(filename)
521             self.assertEqual(ft['file_type'],
522                                  file_type_objects.get(name=typename))
523             self.assertEqual(ft.get('lane', None), lane)
524             self.assertEqual(ft.get('end', None), end)
525
526     def test_assign_file_type_complex_path(self):
527         file_type_objects = models.FileType.objects
528         cases = [('/a/b/c/woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
529                   'QSEQ tarfile', 7, 1),
530                  ('foo/woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
531                   'SRF', 1, None),
532                  ('../s_1_eland_extended.txt.bz2','ELAND Extended', 1, None),
533                  ('/bleem/s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
534                  ('/qwer/s_3_eland_result.txt.bz2','ELAND Result', 3, None),
535                  ('/ty///1/s_1_export.txt.bz2','ELAND Export', 1, None),
536                  ('/help/s_1_percent_call.png', 'IVC Percent Call', 1, None),
537                  ('/bored/s_2_percent_base.png', 'IVC Percent Base', 2, None),
538                  ('/example1/s_3_percent_all.png', 'IVC Percent All', 3, None),
539                  ('amonkey/s_4_call.png', 'IVC Call', 4, None),
540                  ('fishie/s_5_all.png', 'IVC All', 5, None),
541                  ('/random/Summary.htm', 'Summary.htm', None, None),
542                  ('/notrandom/run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
543          ]
544         for filename, typename, lane, end in cases:
545             result = models.find_file_type_metadata_from_filename(filename)
546             self.assertEqual(result['file_type'],
547                                  file_type_objects.get(name=typename))
548             self.assertEqual(result.get('lane',None), lane)
549             self.assertEqual(result.get('end', None), end)
550
551 class TestEmailNotify(TestCase):
552     fixtures = ['test_flowcells.json']
553
554     def test_started_email_not_logged_in(self):
555         response = self.client.get('/experiments/started/153/')
556         self.assertEqual(response.status_code, 302)
557
558     def test_started_email_logged_in_user(self):
559         self.client.login(username='test', password='BJOKL5kAj6aFZ6A5')
560         response = self.client.get('/experiments/started/153/')
561         self.assertEqual(response.status_code, 302)
562
563     def test_started_email_logged_in_staff(self):
564         self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5')
565         response = self.client.get('/experiments/started/153/')
566         self.assertEqual(response.status_code, 200)
567
568     def test_started_email_send(self):
569         self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5')
570         response = self.client.get('/experiments/started/153/')
571         self.assertEqual(response.status_code, 200)
572
573         self.assertTrue('pk1@example.com' in response.content)
574         self.assertTrue('Lane #8 : (11064) Paired ends 104' in response.content)
575
576         response = self.client.get('/experiments/started/153/', {'send':'1','bcc':'on'})
577         self.assertEqual(response.status_code, 200)
578         self.assertEqual(len(mail.outbox), 4)
579         bcc = set(settings.NOTIFICATION_BCC).copy()
580         bcc.update(set(settings.MANAGERS))
581         for m in mail.outbox:
582             self.assertTrue(len(m.body) > 0)
583             self.assertEqual(set(m.bcc), bcc)
584
585     def test_email_navigation(self):
586         """
587         Can we navigate between the flowcell and email forms properly?
588         """
589         self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
590         response = self.client.get('/experiments/started/153/')
591         self.assertEqual(response.status_code, 200)
592         self.assertTrue(re.search('Flowcell FC12150', response.content))
593         # require that navigation back to the admin page exists
594         self.assertTrue(re.search('<a href="/admin/experiments/flowcell/153/">[^<]+</a>', response.content))
595
596 def multi_lane_to_dict(lane):
597     """Convert a list of lane entries into a dictionary indexed by library ID
598     """
599     return dict( ((x['library_id'],x) for x in lane) )
600
601 class TestSequencer(TestCase):
602     fixtures = ['test_flowcells.json',
603                 ]
604
605     def test_name_generation(self):
606         seq = models.Sequencer()
607         seq.name = "Seq1"
608         seq.instrument_name = "HWI-SEQ1"
609         seq.model = "Imaginary 5000"
610
611         self.assertEqual(unicode(seq), "Seq1 (HWI-SEQ1)")
612
613     def test_lookup(self):
614         fc = models.FlowCell.objects.get(pk=153)
615         self.assertEqual(fc.sequencer.model,
616                              "Illumina Genome Analyzer IIx")
617         self.assertEqual(fc.sequencer.instrument_name,
618                              "ILLUMINA-EC5D15")
619         # well actually we let the browser tack on the host name
620         url = fc.get_absolute_url()
621         self.assertEqual(url, '/flowcell/FC12150/')
622
623     def test_rdf(self):
624         response = self.client.get('/flowcell/FC12150/', apidata)
625         tree = fromstring(response.content)
626         seq_by = tree.xpath('//div[@rel="libns:sequenced_by"]',
627                             namespaces=NSMAP)
628         self.assertEqual(len(seq_by), 1)
629         self.assertEqual(seq_by[0].attrib['rel'], 'libns:sequenced_by')
630         seq = seq_by[0].getchildren()
631         self.assertEqual(len(seq), 1)
632         self.assertEqual(seq[0].attrib['about'], '/sequencer/2')
633         self.assertEqual(seq[0].attrib['typeof'], 'libns:Sequencer')
634
635         name = seq[0].xpath('./span[@property="libns:sequencer_name"]')
636         self.assertEqual(len(name), 1)
637         self.assertEqual(name[0].text, 'Tardigrade')
638         instrument = seq[0].xpath(
639             './span[@property="libns:sequencer_instrument"]')
640         self.assertEqual(len(instrument), 1)
641         self.assertEqual(instrument[0].text, 'ILLUMINA-EC5D15')
642         model = seq[0].xpath(
643             './span[@property="libns:sequencer_model"]')
644         self.assertEqual(len(model), 1)
645         self.assertEqual(model[0].text, 'Illumina Genome Analyzer IIx')
646
647     def test_flowcell_with_rdf_validation(self):
648         from htsworkflow.util.rdfhelp import add_default_schemas, \
649              dump_model, \
650              get_model, \
651              load_string_into_model
652         from htsworkflow.util.rdfinfer import Infer
653
654         model = get_model()
655         add_default_schemas(model)
656         inference = Infer(model)
657
658         url ='/flowcell/FC12150/'
659         response = self.client.get(url)
660         self.assertEqual(response.status_code, 200)
661         status = validate_xhtml(response.content)
662         if status is not None: self.assertTrue(status)
663
664         load_string_into_model(model, 'rdfa', response.content)
665
666         errmsgs = list(inference.run_validation())
667         self.assertEqual(len(errmsgs), 0)
668
669     def test_lane_with_rdf_validation(self):
670         from htsworkflow.util.rdfhelp import add_default_schemas, \
671              dump_model, \
672              get_model, \
673              load_string_into_model
674         from htsworkflow.util.rdfinfer import Infer
675
676         model = get_model()
677         add_default_schemas(model)
678         inference = Infer(model)
679
680         url = '/lane/1193'
681         response = self.client.get(url)
682         self.assertEqual(response.status_code, 200)
683         status = validate_xhtml(response.content)
684         if status is not None: self.assertTrue(status)
685
686         load_string_into_model(model, 'rdfa', response.content)
687
688         errmsgs = list(inference.run_validation())
689         self.assertEqual(len(errmsgs), 0)
690
691
692 OLD_DB = settings.DATABASES['default']['NAME']
693 def setUpModule():
694     setup_test_environment()
695     connection.creation.create_test_db()
696
697 def tearDownModule():
698     connection.creation.destroy_test_db(OLD_DB)
699     teardown_test_environment()
700
701 def suite():
702     from unittest2 import TestSuite, defaultTestLoader
703     suite = TestSuite()
704     for testcase in [ClusterStationTestCases,
705                      SequencerTestCases,
706                      ExerimentsTestCases,
707                      TestFileType,
708                      TestEmailNotify,
709                      TestSequencer]:
710         suite.addTests(defaultTestLoader.loadTestsFromTestCase(testcase))
711     return suite
712
713 if __name__ == "__main__":
714     from unittest2 import main
715     main(defaultTest="suite")