5878d726d7ee1d268382230a56a235aef7ca29e6
[htsworkflow.git] / htsworkflow / frontend / experiments / test_experiments.py
1 import re
2 from lxml.html import fromstring
3 try:
4     import json
5 except ImportError, e:
6     import simplejson as json
7 import os
8 import shutil
9 import sys
10 import tempfile
11 from urlparse import urljoin
12
13 from django.conf import settings
14 from django.core import mail
15 from django.core.exceptions import ObjectDoesNotExist
16 from django.test import TestCase
17 from django.test.utils import setup_test_environment, teardown_test_environment
18 from django.db import connection
19 from django.conf import settings
20 from htsworkflow.frontend.experiments import models
21 from htsworkflow.frontend.experiments import experiments
22 from htsworkflow.frontend.auth import apidata
23 from htsworkflow.util.ethelp import validate_xhtml
24
25 from htsworkflow.pipelines.test.simulate_runfolder import TESTDATA_DIR
26
# Lane numbers present on a flowcell (1 through 8).
LANE_SET = range(1,9)

# Namespace prefix map handed to lxml xpath() calls in the tests below.
NSMAP = {'libns':'http://jumpgate.caltech.edu/wiki/LibraryOntology#'}

from django.db import connection
# Name of the configured database, kept so tearDownModule() can pass it to
# destroy_test_db().  It is read through settings.DATABASES, the same lookup
# the OLD_DB constant near the end of this module uses, because the legacy
# settings.DATABASE_NAME attribute is not available on every Django release
# and its absence would abort the import of this whole test module.
OLD_DB_NAME = settings.DATABASES['default']['NAME']
VERBOSITY = 0

# NOTE: setUpModule/tearDownModule are defined a second time near the end of
# this module; the later definitions are the ones left bound after import.
def setUpModule():
    """Prepare the Django test environment and create the test database."""
    setup_test_environment()
    settings.DEBUG = False
    connection.creation.create_test_db(VERBOSITY)

def tearDownModule():
    """Destroy the test database and undo setup_test_environment()."""
    connection.creation.destroy_test_db(OLD_DB_NAME, VERBOSITY)
    teardown_test_environment()
42
class ClusterStationTestCases(TestCase):
    """Check how models.ClusterStation tracks which station is the default."""
    fixtures = ['test_flowcells.json']

    def _default_count(self):
        """Return the number of cluster stations flagged isdefault."""
        flagged = models.ClusterStation.objects.filter(isdefault=True)
        return flagged.count()

    def test_default(self):
        """default() is expected to still answer once no station is flagged."""
        station = models.ClusterStation.default()
        self.assertEqual(station.id, 2)

        # clear the flag on the current default
        station.isdefault = False
        station.save()
        self.assertEqual(self._default_count(), 0)

        fallback = models.ClusterStation.default()
        self.assertEqual(fallback.id, 3)

    def test_update_default(self):
        """Flagging another station is expected to leave exactly one default."""
        previous = models.ClusterStation.default()

        promoted = models.ClusterStation.objects.get(pk=3)
        promoted.isdefault = True
        promoted.save()

        current = models.ClusterStation.default()

        self.assertNotEqual(previous, current)
        self.assertEqual(current, promoted)
        self.assertEqual(self._default_count(), 1)

    def test_update_other(self):
        """Saving an unrelated field is expected to leave the default alone."""
        previous = models.ClusterStation.default()
        self.assertEqual(self._default_count(), 1)

        station = models.ClusterStation.objects.get(pk=1)
        station.name = "Primary Key 1"
        station.save()
        self.assertEqual(self._default_count(), 1)

        current = models.ClusterStation.default()
        self.assertEqual(previous, current)
89
90
class SequencerTestCases(TestCase):
    """Check how models.Sequencer tracks which sequencer is the default."""
    fixtures = ['test_flowcells.json']

    def _default_count(self):
        """Return the number of sequencers flagged isdefault."""
        flagged = models.Sequencer.objects.filter(isdefault=True)
        return flagged.count()

    def test_default(self):
        """default() is expected to still answer once no sequencer is flagged."""
        sequencer = models.Sequencer.default()
        self.assertEqual(sequencer.id, 2)
        self.assertEqual(self._default_count(), 1)

        # clear the flag on the current default
        sequencer.isdefault = False
        sequencer.save()
        self.assertEqual(self._default_count(), 0)

        fallback = models.Sequencer.default()
        self.assertEqual(fallback.id, 7)

    def test_update_default(self):
        """Flagging another sequencer is expected to leave exactly one default."""
        previous = models.Sequencer.default()

        promoted = models.Sequencer.objects.get(pk=1)
        promoted.isdefault = True
        promoted.save()

        current = models.Sequencer.default()

        self.assertNotEqual(previous, current)
        self.assertEqual(current, promoted)
        self.assertEqual(self._default_count(), 1)

    def test_update_other(self):
        """Saving an unrelated field is expected to leave the default alone."""
        previous = models.Sequencer.default()
        self.assertEqual(self._default_count(), 1)

        sequencer = models.Sequencer.objects.get(pk=1)
        sequencer.name = "Primary Key 1"
        sequencer.save()
        self.assertEqual(self._default_count(), 1)

        current = models.Sequencer.default()
        self.assertEqual(previous, current)
141
142
class ExperimentsTestCases(TestCase):
    """Tests for the flowcell/lane helpers, their JSON and HTML views, and
    the import of data runs from a result directory.

    setUp() points settings.RESULT_HOME_DIR at a temporary directory holding
    a small fake result tree; tearDown() removes it again.
    """
    fixtures = ['test_flowcells.json',
                ]

    def setUp(self):
        """Build a temporary result tree and make it the RESULT_HOME_DIR."""
        self.tempdir = tempfile.mkdtemp(prefix='htsw-test-experiments-')
        # Remember the configured value so tearDown() can put it back
        # instead of leaving settings pointing at a deleted directory.
        self._had_result_home = hasattr(settings, 'RESULT_HOME_DIR')
        self._old_result_home = getattr(settings, 'RESULT_HOME_DIR', None)
        settings.RESULT_HOME_DIR = self.tempdir

        self.fc1_id = 'FC12150'
        self.fc1_root = os.path.join(self.tempdir, self.fc1_id)
        os.mkdir(self.fc1_root)
        self.fc1_dir = os.path.join(self.fc1_root, 'C1-37')
        os.mkdir(self.fc1_dir)
        runxml = 'run_FC12150_2007-09-27.xml'
        shutil.copy(os.path.join(TESTDATA_DIR, runxml),
                    os.path.join(self.fc1_dir, runxml))
        # The same sample SRF file is copied once per lane under a
        # lane-specific name.
        srf_source = os.path.join(
            TESTDATA_DIR, 'woldlab_070829_USI-EAS44_0017_FC11055_1.srf')
        for lane_number in LANE_SET:
            shutil.copy(
                srf_source,
                os.path.join(self.fc1_dir,
                             'woldlab_070829_SERIAL_FC12150_%d.srf' % (
                                 lane_number,))
                )

        self.fc2_dir = os.path.join(self.tempdir, '42JTNAAXX')
        os.mkdir(self.fc2_dir)
        os.mkdir(os.path.join(self.fc2_dir, 'C1-25'))
        os.mkdir(os.path.join(self.fc2_dir, 'C1-37'))
        os.mkdir(os.path.join(self.fc2_dir, 'C1-37', 'Plots'))

    def tearDown(self):
        """Remove the temporary result tree and restore RESULT_HOME_DIR."""
        try:
            shutil.rmtree(self.tempdir)
        finally:
            if self._had_result_home:
                settings.RESULT_HOME_DIR = self._old_result_home

    def _assert_valid_xhtml(self, content):
        """Assert that content validates, when validate_xhtml gives a verdict.

        A None result from validate_xhtml() is not treated as a failure.
        """
        status = validate_xhtml(content)
        if status is not None:
            self.assertTrue(status)

    def _assert_flowcell_matches(self, fc_info, fc_django, fc_id):
        """Compare the flowcell-level fields of a packed dict to the model."""
        self.assertEqual(fc_info['flowcell_id'], fc_id)
        self.assertEqual(fc_info['sequencer'], fc_django.sequencer.name)
        self.assertEqual(fc_info['read_length'], fc_django.read_length)
        self.assertEqual(fc_info['notes'], fc_django.notes)
        self.assertEqual(fc_info['cluster_station'],
                         fc_django.cluster_station.name)

    def _assert_lane_matches(self, lane_dict, lane):
        """Compare one packed lane/library entry to its Lane model object."""
        self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate)
        self.assertEqual(lane_dict['comment'], lane.comment)
        self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id)
        self.assertEqual(lane_dict['lane_number'], lane.lane_number)
        self.assertEqual(lane_dict['library_name'], lane.library.library_name)
        self.assertEqual(lane_dict['library_id'], lane.library.id)
        self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM))
        self.assertEqual(lane_dict['library_species'],
                         lane.library.library_species.scientific_name)

    def test_flowcell_information(self):
        """
        Check the code that packs the django objects into simple types.
        """
        for fc_id in [u'FC12150', u"42JTNAAXX", "42JU1AAXX"]:
            fc_dict = experiments.flowcell_information(fc_id)
            fc_django = models.FlowCell.objects.get(flowcell_id=fc_id)
            self.assertEqual(fc_django.flowcell_id, fc_id)

            # Direct call: lane_set is indexed by the lane number itself.
            self._assert_flowcell_matches(fc_dict, fc_django, fc_id)
            for lane in fc_django.lane_set.all():
                lane_contents = fc_dict['lane_set'][lane.lane_number]
                lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
                self._assert_lane_matches(lane_dict, lane)

            response = self.client.get('/experiments/config/%s/json' % (fc_id,), apidata)
            self.assertEqual(response.status_code, 200)
            # strptime isoformat string = '%Y-%m-%dT%H:%M:%S'
            fc_json = json.loads(response.content)

            # JSON view: lane_set is indexed by the lane number as a string.
            self._assert_flowcell_matches(fc_json, fc_django, fc_id)
            for lane in fc_django.lane_set.all():
                lane_contents = fc_json['lane_set'][unicode(lane.lane_number)]
                lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
                self._assert_lane_matches(lane_dict, lane)

    def test_invalid_flowcell(self):
        """
        Make sure we get a 404 if we request an invalid flowcell ID
        """
        response = self.client.get('/experiments/config/nottheone/json', apidata)
        self.assertEqual(response.status_code, 404)

    def test_no_key(self):
        """
        Require logging in to retrieve meta data
        """
        response = self.client.get(u'/experiments/config/FC12150/json')
        self.assertEqual(response.status_code, 403)

    def test_library_id(self):
        """
        Library IDs should be flexible, so make sure we can retrieve a non-numeric ID
        """
        response = self.client.get('/experiments/config/FC12150/json', apidata)
        self.assertEqual(response.status_code, 200)
        flowcell = json.loads(response.content)

        lane_contents = flowcell['lane_set']['3']
        lane_library = lane_contents[0]
        self.assertEqual(lane_library['library_id'], 'SL039')

        response = self.client.get('/samples/library/SL039/json', apidata)
        self.assertEqual(response.status_code, 200)
        library_sl039 = json.loads(response.content)

        self.assertEqual(library_sl039['library_id'], 'SL039')

    def test_raw_id_field(self):
        """
        Test ticket:147

        Libraries have IDs, libraries also have primary keys,
        we eventually had enough libraries that the drop down combo box was too
        hard to filter through, unfortunately we want a field that uses our library
        id and not the internal primary key, and raw_id_field uses primary keys.

        This tests to make sure that the value entered in the raw library id field matches
        the library id looked up.
        """
        expected_ids = [u'10981',u'11016',u'SL039',u'11060',
                        u'11061',u'11062',u'11063',u'11064']
        self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
        response = self.client.get('/admin/experiments/flowcell/153/')

        tree = fromstring(response.content)
        xpath_expression = '//input[@id="id_lane_set-%d-library"]'
        for i, expected_id in enumerate(expected_ids):
            input_field = tree.xpath(xpath_expression % (i,))[0]
            library_field = input_field.find('../strong')
            library_id, library_name = library_field.text.split(':')
            # strip leading '#' sign from the id
            library_id = library_id[1:]
            self.assertEqual(library_id, expected_id)
            self.assertEqual(input_field.attrib['value'], library_id)

    def test_library_to_flowcell_link(self):
        """
        Make sure the library page includes links to the flowcell pages.
        That work with flowcell IDs that have parenthetical comments.
        """
        self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
        response = self.client.get('/library/11070/')
        self.assertEqual(response.status_code, 200)
        self._assert_valid_xhtml(response.content)

        tree = fromstring(response.content)
        flowcell_spans = tree.xpath('//span[@property="libns:flowcell_id"]',
                                    namespaces=NSMAP)
        self.assertEqual(flowcell_spans[0].text, '30012AAXX (failed)')
        failed_fc_span = flowcell_spans[0]
        failed_fc_a = failed_fc_span.getparent()
        # make sure some of our RDF made it.
        self.assertEqual(failed_fc_a.get('typeof'), 'libns:IlluminaFlowcell')
        self.assertEqual(failed_fc_a.get('href'), '/flowcell/30012AAXX/')

        # Validate each page that was actually fetched, not the library
        # page over and over.
        fc_response = self.client.get(failed_fc_a.get('href'))
        self.assertEqual(fc_response.status_code, 200)
        self._assert_valid_xhtml(fc_response.content)

        fc_lane_response = self.client.get('/flowcell/30012AAXX/8/')
        self.assertEqual(fc_lane_response.status_code, 200)
        self._assert_valid_xhtml(fc_lane_response.content)

    def test_pooled_multiplex_id(self):
        """Lane 3 of 42JU1AAXX holds two libraries with index sequences."""
        fc_dict = experiments.flowcell_information('42JU1AAXX')
        lane_contents = fc_dict['lane_set'][3]
        self.assertEqual(len(lane_contents), 2)
        lane_dict = multi_lane_to_dict(lane_contents)

        self.assertEqual(lane_dict['12044']['index_sequence'],
                         {u'1': u'ATCACG',
                          u'2': u'CGATGT',
                          u'3': u'TTAGGC'})
        self.assertEqual(lane_dict['11045']['index_sequence'],
                         {u'1': u'ATCACG'})

    def test_lanes_for(self):
        """
        Check the code that packs the django objects into simple types.
        """
        user = 'test'
        lanes = experiments.lanes_for(user)
        self.assertEqual(len(lanes), 5)

        response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
        lanes_json = json.loads(response.content)
        self.assertEqual(len(lanes), len(lanes_json))
        # lengths were just checked, so zip() pairs every entry
        for lane, lane_json in zip(lanes, lanes_json):
            for key in ('comment', 'lane_number', 'flowcell', 'run_date'):
                self.assertEqual(lane[key], lane_json[key])

    def test_lanes_for_no_lanes(self):
        """
        Do we get something meaningful back when the user isn't attached to anything?
        """
        user = 'supertest'
        lanes = experiments.lanes_for(user)
        self.assertEqual(len(lanes), 0)

        response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
        self.assertEqual(response.status_code, 200)
        lanes_json = json.loads(response.content)
        # same length check test_lanes_for applies to this view
        self.assertEqual(len(lanes), len(lanes_json))

    def test_lanes_for_no_user(self):
        """
        Do we get something meaningful back when its the wrong user
        """
        user = 'not a real user'
        self.assertRaises(ObjectDoesNotExist, experiments.lanes_for, user)

        response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
        self.assertEqual(response.status_code, 404)

    def test_raw_data_dir(self):
        """Raw data path generator check"""
        flowcell_id = self.fc1_id
        raw_dir = os.path.join(settings.RESULT_HOME_DIR, flowcell_id)

        fc = models.FlowCell.objects.get(flowcell_id=flowcell_id)
        self.assertEqual(fc.get_raw_data_directory(), raw_dir)

        # a parenthetical status suffix must not change the directory
        fc.flowcell_id = flowcell_id + " (failed)"
        self.assertEqual(fc.get_raw_data_directory(), raw_dir)

    def test_data_run_import(self):
        """Import the fake result tree and check the files that were found."""
        srf_file_type = models.FileType.objects.get(name='SRF')
        runxml_file_type = models.FileType.objects.get(name='run_xml')
        flowcell_id = self.fc1_id
        flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id)
        flowcell.update_data_runs()
        self.assertEqual(len(flowcell.datarun_set.all()), 1)

        run = flowcell.datarun_set.all()[0]
        result_files = run.datafile_set.all()
        result_dict = dict(((rf.relative_pathname, rf) for rf in result_files))

        srf4 = result_dict['FC12150/C1-37/woldlab_070829_SERIAL_FC12150_4.srf']
        self.assertEqual(srf4.file_type, srf_file_type)
        self.assertEqual(srf4.library_id, '11060')
        self.assertEqual(srf4.data_run.flowcell.flowcell_id, 'FC12150')
        self.assertEqual(
            srf4.data_run.flowcell.lane_set.get(lane_number=4).library_id,
            '11060')
        self.assertEqual(
            srf4.pathname,
            os.path.join(settings.RESULT_HOME_DIR, srf4.relative_pathname))

        lane_files = run.lane_files()
        self.assertEqual(lane_files[4]['srf'], srf4)

        runxml= result_dict['FC12150/C1-37/run_FC12150_2007-09-27.xml']
        self.assertEqual(runxml.file_type, runxml_file_type)
        self.assertEqual(runxml.library_id, None)

        import1 = len(models.DataRun.objects.filter(result_dir='FC12150/C1-37'))
        # what happens if we import twice?
        flowcell.import_data_run('FC12150/C1-37',
                                 'run_FC12150_2007-09-27.xml')
        self.assertEqual(
            len(models.DataRun.objects.filter(result_dir='FC12150/C1-37')),
            import1)

    def test_read_result_file(self):
        """make sure we can return a result file
        """
        flowcell_id = self.fc1_id
        flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id)
        flowcell.update_data_runs()

        #self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')

        result_files = flowcell.datarun_set.all()[0].datafile_set.all()
        for f in result_files:
            url = '/experiments/file/%s' % ( f.random_key,)
            response = self.client.get(url)
            self.assertEqual(response.status_code, 200)
            mimetype = f.file_type.mimetype
            if mimetype is None:
                # expected content type when the file type declares none
                mimetype = 'application/octet-stream'

            self.assertEqual(mimetype, response['content-type'])

    def test_flowcell_rdf(self):
        """Parse the RDFa on a flowcell page and check lane/library pairs."""
        import RDF
        from htsworkflow.util.rdfhelp import get_model, \
             fromTypedNode, \
             load_string_into_model

        model = get_model()

        # lane number -> library ids expected on that lane
        expected = {'1': ['11034'],
                    '2': ['11036'],
                    '3': ['12044','11045'],
                    '4': ['11047','13044'],
                    '5': ['11055'],
                    '6': ['11067'],
                    '7': ['11069'],
                    '8': ['11070']}
        url = '/flowcell/42JU1AAXX/'
        response = self.client.get(url)
        self.assertEqual(response.status_code, 200)
        self._assert_valid_xhtml(response.content)

        ns = urljoin('http://localhost', url)
        load_string_into_model(model, 'rdfa', response.content, ns=ns)
        body = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>

        select ?flowcell ?flowcell_id ?lane_id ?library_id
        where {
          ?flowcell a libns:IlluminaFlowcell ;
                    libns:flowcell_id ?flowcell_id ;
                    libns:has_lane ?lane .
          ?lane libns:lane_number ?lane_id ;
                libns:library ?library .
          ?library libns:library_id ?library_id .
        }"""
        query = RDF.SPARQLQuery(body)
        count = 0
        for r in query.execute(model):
            count += 1
            self.assertEqual(fromTypedNode(r['flowcell_id']), u'42JU1AAXX')
            lane_id = fromTypedNode(r['lane_id'])
            library_id = fromTypedNode(r['library_id'])
            self.assertTrue(library_id in expected[lane_id])
        # one result row per (lane, library) pair listed in expected
        self.assertEqual(count, 10)
490
491
class TestFileType(TestCase):
    """Tests for models.FileType and filename-based file type detection."""

    def _assert_filename_metadata(self, cases):
        """Check find_file_type_metadata_from_filename() for every case.

        Each case is a (filename, file type name, lane, end) tuple; lane and
        end are None when the metadata is expected to omit them.
        """
        known_types = models.FileType.objects
        for filename, typename, lane, end in cases:
            metadata = models.find_file_type_metadata_from_filename(filename)
            self.assertEqual(metadata['file_type'],
                             known_types.get(name=typename))
            self.assertEqual(metadata.get('lane', None), lane)
            self.assertEqual(metadata.get('end', None), end)

    def test_file_type_unicode(self):
        """unicode() of a FileType is expected to be its name."""
        qseq_type = models.FileType.objects.get(name='QSEQ tarfile')
        self.assertEqual(u"QSEQ tarfile", unicode(qseq_type))

    def test_find_file_type(self):
        """Bare filenames are mapped to the expected type, lane and end."""
        self._assert_filename_metadata([
            ('woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
             'QSEQ tarfile', 7, 1),
            ('woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
             'SRF', 1, None),
            ('s_1_eland_extended.txt.bz2', 'ELAND Extended', 1, None),
            ('s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
            ('s_3_eland_result.txt.bz2', 'ELAND Result', 3, None),
            ('s_1_export.txt.bz2', 'ELAND Export', 1, None),
            ('s_1_percent_call.png', 'IVC Percent Call', 1, None),
            ('s_2_percent_base.png', 'IVC Percent Base', 2, None),
            ('s_3_percent_all.png', 'IVC Percent All', 3, None),
            ('s_4_call.png', 'IVC Call', 4, None),
            ('s_5_all.png', 'IVC All', 5, None),
            ('Summary.htm', 'Summary.htm', None, None),
            ('run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
        ])

    def test_assign_file_type_complex_path(self):
        """The same filenames are recognised behind assorted directories."""
        self._assert_filename_metadata([
            ('/a/b/c/woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
             'QSEQ tarfile', 7, 1),
            ('foo/woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
             'SRF', 1, None),
            ('../s_1_eland_extended.txt.bz2', 'ELAND Extended', 1, None),
            ('/bleem/s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
            ('/qwer/s_3_eland_result.txt.bz2', 'ELAND Result', 3, None),
            ('/ty///1/s_1_export.txt.bz2', 'ELAND Export', 1, None),
            ('/help/s_1_percent_call.png', 'IVC Percent Call', 1, None),
            ('/bored/s_2_percent_base.png', 'IVC Percent Base', 2, None),
            ('/example1/s_3_percent_all.png', 'IVC Percent All', 3, None),
            ('amonkey/s_4_call.png', 'IVC Call', 4, None),
            ('fishie/s_5_all.png', 'IVC All', 5, None),
            ('/random/Summary.htm', 'Summary.htm', None, None),
            ('/notrandom/run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
        ])
549
class TestEmailNotify(TestCase):
    """Tests for the "flowcell started" notification page of flowcell 153."""
    fixtures = ['test_flowcells.json']

    def _get_started_page(self, username=None, params=None):
        """Fetch the started page, optionally logging in as username first."""
        if username is not None:
            self.client.login(username=username, password='BJOKL5kAj6aFZ6A5')
        if params is None:
            return self.client.get('/experiments/started/153/')
        return self.client.get('/experiments/started/153/', params)

    def test_started_email_not_logged_in(self):
        """Anonymous requests are redirected."""
        page = self._get_started_page()
        self.assertEqual(page.status_code, 302)

    def test_started_email_logged_in_user(self):
        """The 'test' user is redirected as well."""
        page = self._get_started_page('test')
        self.assertEqual(page.status_code, 302)

    def test_started_email_logged_in_staff(self):
        """The 'admintest' user may see the page."""
        page = self._get_started_page('admintest')
        self.assertEqual(page.status_code, 200)

    def test_started_email_send(self):
        """Preview the notification, then send it and inspect the outbox."""
        preview = self._get_started_page('admintest')
        self.assertEqual(preview.status_code, 200)

        self.assertTrue('pk1@example.com' in preview.content)
        self.assertTrue('Lane #8 : (11064) Paired ends 104' in preview.content)

        sent = self._get_started_page(params={'send':'1','bcc':'on'})
        self.assertEqual(sent.status_code, 200)
        self.assertEqual(len(mail.outbox), 4)

        # every message should be bcc'd to the notification list and managers
        expected_bcc = set(settings.NOTIFICATION_BCC) | set(settings.MANAGERS)
        for message in mail.outbox:
            self.assertTrue(len(message.body) > 0)
            self.assertEqual(set(message.bcc), expected_bcc)

    def test_email_navigation(self):
        """
        Can we navigate between the flowcell and email forms properly?
        """
        page = self._get_started_page('supertest')
        self.assertEqual(page.status_code, 200)
        self.assertTrue(re.search('Flowcell FC12150', page.content))
        # require that navigation back to the admin page exists
        admin_link = '<a href="/admin/experiments/flowcell/153/">[^<]+</a>'
        self.assertTrue(re.search(admin_link, page.content))
594
def multi_lane_to_dict(lane):
    """Index a list of lane entries by their 'library_id' value.

    Each entry must support entry['library_id'].  If several entries share
    a library id, the last one wins.
    """
    by_library = {}
    for entry in lane:
        by_library[entry['library_id']] = entry
    return by_library
599
class TestSequencer(TestCase):
    """Tests for models.Sequencer naming, lookup and its RDFa rendering."""
    fixtures = ['test_flowcells.json',
                ]

    def _only(self, nodes):
        """Assert nodes holds exactly one item and return that item."""
        self.assertEqual(len(nodes), 1)
        return nodes[0]

    def _assert_rdfa_validates(self, url):
        """Fetch url and assert its XHTML and RDFa raise no validation errors."""
        from htsworkflow.util.rdfhelp import add_default_schemas, \
             dump_model, \
             get_model, \
             load_string_into_model
        from htsworkflow.util.rdfinfer import Infer

        rdf_model = get_model()
        add_default_schemas(rdf_model)
        validator = Infer(rdf_model)

        page = self.client.get(url)
        self.assertEqual(page.status_code, 200)
        xhtml_status = validate_xhtml(page.content)
        # a None result from validate_xhtml() is not treated as a failure
        if xhtml_status is not None:
            self.assertTrue(xhtml_status)

        load_string_into_model(rdf_model, 'rdfa', page.content)

        problems = list(validator.run_validation())
        self.assertEqual(len(problems), 0)

    def test_name_generation(self):
        """unicode() of a Sequencer combines name and instrument name."""
        sequencer = models.Sequencer()
        sequencer.name = "Seq1"
        sequencer.instrument_name = "HWI-SEQ1"
        sequencer.model = "Imaginary 5000"

        self.assertEqual(unicode(sequencer), "Seq1 (HWI-SEQ1)")

    def test_lookup(self):
        """Flowcell 153 links to the expected sequencer and URL."""
        flowcell = models.FlowCell.objects.get(pk=153)
        sequencer = flowcell.sequencer
        self.assertEqual(sequencer.model, "Illumina Genome Analyzer IIx")
        self.assertEqual(sequencer.instrument_name, "ILLUMINA-EC5D15")
        # well actually we let the browser tack on the host name
        self.assertEqual(flowcell.get_absolute_url(), '/flowcell/FC12150/')

    def test_rdf(self):
        """The flowcell page carries one RDFa-annotated sequencer element."""
        response = self.client.get('/flowcell/FC12150/', apidata)
        tree = fromstring(response.content)
        containers = tree.xpath('//div[@rel="libns:sequenced_by"]',
                                namespaces=NSMAP)
        container = self._only(containers)
        self.assertEqual(container.attrib['rel'], 'libns:sequenced_by')

        sequencer = self._only(container.getchildren())
        self.assertEqual(sequencer.attrib['about'], '/sequencer/2')
        self.assertEqual(sequencer.attrib['typeof'], 'libns:Sequencer')

        # (RDFa property, expected span text), checked in this order
        expected_spans = (
            ('libns:sequencer_name', 'Tardigrade'),
            ('libns:sequencer_instrument', 'ILLUMINA-EC5D15'),
            ('libns:sequencer_model', 'Illumina Genome Analyzer IIx'),
        )
        for rdf_property, text in expected_spans:
            spans = sequencer.xpath('./span[@property="%s"]' % (rdf_property,))
            self.assertEqual(self._only(spans).text, text)

    def test_flowcell_with_rdf_validation(self):
        """The flowcell page's RDFa passes inference validation."""
        self._assert_rdfa_validates('/flowcell/FC12150/')

    def test_lane_with_rdf_validation(self):
        """The lane page's RDFa passes inference validation."""
        self._assert_rdfa_validates('/lane/1193')
689
690
# Name of the configured default database; handed to destroy_test_db().
OLD_DB = settings.DATABASES['default']['NAME']

def setUpModule():
    """Set up the Django test environment, then create the test database."""
    setup_test_environment()
    db_creation = connection.creation
    db_creation.create_test_db()

def tearDownModule():
    """Destroy the test database, then tear the test environment down."""
    db_creation = connection.creation
    db_creation.destroy_test_db(OLD_DB)
    teardown_test_environment()
699
def suite():
    """Return a TestSuite holding every test case class in this module.

    unittest2 is preferred, as in the original code; the standard library
    unittest module is used when unittest2 is not installed.
    """
    try:
        from unittest2 import TestSuite, defaultTestLoader
    except ImportError:
        from unittest import TestSuite, defaultTestLoader
    # named all_tests so the local does not shadow this function's own name
    all_tests = TestSuite()
    for testcase in [ClusterStationTestCases,
                     SequencerTestCases,
                     ExperimentsTestCases,
                     TestFileType,
                     TestEmailNotify,
                     TestSequencer]:
        all_tests.addTests(defaultTestLoader.loadTestsFromTestCase(testcase))
    return all_tests
711
if __name__ == "__main__":
    # Executed as a script: let unittest2 run the tests built by suite().
    import unittest2
    unittest2.main(defaultTest="suite")