Change unittest2 back into unittest.
[htsworkflow.git] / htsworkflow / frontend / experiments / test_experiments.py
1 import re
2 from lxml.html import fromstring
3 try:
4     import json
5 except ImportError, e:
6     import simplejson as json
7 from datetime import timedelta
8 import os
9 import shutil
10 import sys
11 import tempfile
12 from urlparse import urljoin
13
14 from django.conf import settings
15 from django.core import mail
16 from django.core.exceptions import ObjectDoesNotExist
17 from django.test import TestCase
18 from django.test.utils import setup_test_environment, teardown_test_environment
19 from django.db import connection
20 from django.conf import settings
21 from htsworkflow.frontend.experiments import models
22 from htsworkflow.frontend.experiments import experiments
23 from htsworkflow.frontend.auth import apidata
24 from htsworkflow.util.ethelp import validate_xhtml
25
26 from htsworkflow.pipelines.test.simulate_runfolder import TESTDATA_DIR
27
28 LANE_SET = range(1,9)
29
30 NSMAP = {'libns':'http://jumpgate.caltech.edu/wiki/LibraryOntology#'}
31
32 from django.db import connection
33
34 class ClusterStationTestCases(TestCase):
35     fixtures = ['initial_data.json',
36                 'test_flowcells.json']
37
38     def test_default(self):
39         c = models.ClusterStation.default()
40         self.assertEqual(c.id, 2)
41
42         c.isdefault = False
43         c.save()
44
45         total = models.ClusterStation.objects.filter(isdefault=True).count()
46         self.assertEqual(total, 0)
47
48         other_default = models.ClusterStation.default()
49         self.assertEqual(other_default.id, 3)
50
51
52     def test_update_default(self):
53         old_default = models.ClusterStation.default()
54
55         c = models.ClusterStation.objects.get(pk=3)
56         c.isdefault = True
57         c.save()
58
59         new_default = models.ClusterStation.default()
60
61         self.assertNotEqual(old_default, new_default)
62         self.assertEqual(new_default, c)
63
64         total = models.ClusterStation.objects.filter(isdefault=True).count()
65         self.assertEqual(total, 1)
66
67     def test_update_other(self):
68         old_default = models.ClusterStation.default()
69         total = models.ClusterStation.objects.filter(isdefault=True).count()
70         self.assertEqual(total, 1)
71
72         c = models.ClusterStation.objects.get(pk=1)
73         c.name = "Primary Key 1"
74         c.save()
75
76         total = models.ClusterStation.objects.filter(isdefault=True).count()
77         self.assertEqual(total, 1)
78
79         new_default = models.ClusterStation.default()
80         self.assertEqual(old_default, new_default)
81
82
83 class SequencerTestCases(TestCase):
84     fixtures = ['initial_data.json',
85                 'test_flowcells.json']
86
87     def test_default(self):
88         # starting with no default
89         s = models.Sequencer.default()
90         self.assertEqual(s.id, 2)
91
92         total = models.Sequencer.objects.filter(isdefault=True).count()
93         self.assertEqual(total, 1)
94
95         s.isdefault = False
96         s.save()
97
98         total = models.Sequencer.objects.filter(isdefault=True).count()
99         self.assertEqual(total, 0)
100
101         other_default = models.Sequencer.default()
102         self.assertEqual(other_default.id, 7)
103
104     def test_update_default(self):
105         old_default = models.Sequencer.default()
106
107         s = models.Sequencer.objects.get(pk=1)
108         s.isdefault = True
109         s.save()
110
111         new_default = models.Sequencer.default()
112
113         self.assertNotEqual(old_default, new_default)
114         self.assertEqual(new_default, s)
115
116         total = models.Sequencer.objects.filter(isdefault=True).count()
117         self.assertEqual(total, 1)
118
119
120     def test_update_other(self):
121         old_default = models.Sequencer.default()
122         total = models.Sequencer.objects.filter(isdefault=True).count()
123         self.assertEqual(total, 1)
124
125         s = models.Sequencer.objects.get(pk=1)
126         s.name = "Primary Key 1"
127         s.save()
128
129         total = models.Sequencer.objects.filter(isdefault=True).count()
130         self.assertEqual(total, 1)
131
132         new_default = models.Sequencer.default()
133         self.assertEqual(old_default, new_default)
134
135
136 class ExperimentsTestCases(TestCase):
137     fixtures = ['initial_data.json',
138                 'test_flowcells.json',
139                 ]
140
141     def setUp(self):
142         self.tempdir = tempfile.mkdtemp(prefix='htsw-test-experiments-')
143         settings.RESULT_HOME_DIR = self.tempdir
144
145         self.fc1_id = 'FC12150'
146         self.fc1_root = os.path.join(self.tempdir, self.fc1_id)
147         os.mkdir(self.fc1_root)
148         self.fc1_dir = os.path.join(self.fc1_root, 'C1-37')
149         os.mkdir(self.fc1_dir)
150         runxml = 'run_FC12150_2007-09-27.xml'
151         shutil.copy(os.path.join(TESTDATA_DIR, runxml),
152                     os.path.join(self.fc1_dir, runxml))
153         for i in range(1,9):
154             shutil.copy(
155                 os.path.join(TESTDATA_DIR,
156                              'woldlab_070829_USI-EAS44_0017_FC11055_1.srf'),
157                 os.path.join(self.fc1_dir,
158                              'woldlab_070829_SERIAL_FC12150_%d.srf' %(i,))
159                 )
160
161         self.fc2_dir = os.path.join(self.tempdir, '42JTNAAXX')
162         os.mkdir(self.fc2_dir)
163         os.mkdir(os.path.join(self.fc2_dir, 'C1-25'))
164         os.mkdir(os.path.join(self.fc2_dir, 'C1-37'))
165         os.mkdir(os.path.join(self.fc2_dir, 'C1-37', 'Plots'))
166
167     def tearDown(self):
168         shutil.rmtree(self.tempdir)
169
170     def test_flowcell_information(self):
171         """
172         Check the code that packs the django objects into simple types.
173         """
174         for fc_id in [u'FC12150', u"42JTNAAXX", "42JU1AAXX"]:
175             fc_dict = experiments.flowcell_information(fc_id)
176             fc_django = models.FlowCell.objects.get(flowcell_id=fc_id)
177             self.assertEqual(fc_dict['flowcell_id'], fc_id)
178             self.assertEqual(fc_django.flowcell_id, fc_id)
179             self.assertEqual(fc_dict['sequencer'], fc_django.sequencer.name)
180             self.assertEqual(fc_dict['read_length'], fc_django.read_length)
181             self.assertEqual(fc_dict['notes'], fc_django.notes)
182             self.assertEqual(fc_dict['cluster_station'], fc_django.cluster_station.name)
183
184             for lane in fc_django.lane_set.all():
185                 lane_contents = fc_dict['lane_set'][lane.lane_number]
186                 lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
187                 self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate)
188                 self.assertEqual(lane_dict['comment'], lane.comment)
189                 self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id)
190                 self.assertEqual(lane_dict['lane_number'], lane.lane_number)
191                 self.assertEqual(lane_dict['library_name'], lane.library.library_name)
192                 self.assertEqual(lane_dict['library_id'], lane.library.id)
193                 self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM))
194                 self.assertEqual(lane_dict['library_species'],
195                                      lane.library.library_species.scientific_name)
196
197             response = self.client.get('/experiments/config/%s/json' % (fc_id,), apidata)
198             # strptime isoformat string = '%Y-%m-%dT%H:%M:%S'
199             fc_json = json.loads(response.content)
200             self.assertEqual(fc_json['flowcell_id'], fc_id)
201             self.assertEqual(fc_json['sequencer'], fc_django.sequencer.name)
202             self.assertEqual(fc_json['read_length'], fc_django.read_length)
203             self.assertEqual(fc_json['notes'], fc_django.notes)
204             self.assertEqual(fc_json['cluster_station'], fc_django.cluster_station.name)
205
206
207             for lane in fc_django.lane_set.all():
208                 lane_contents = fc_json['lane_set'][unicode(lane.lane_number)]
209                 lane_dict = multi_lane_to_dict(lane_contents)[lane.library_id]
210
211                 self.assertEqual(lane_dict['cluster_estimate'], lane.cluster_estimate)
212                 self.assertEqual(lane_dict['comment'], lane.comment)
213                 self.assertEqual(lane_dict['flowcell'], lane.flowcell.flowcell_id)
214                 self.assertEqual(lane_dict['lane_number'], lane.lane_number)
215                 self.assertEqual(lane_dict['library_name'], lane.library.library_name)
216                 self.assertEqual(lane_dict['library_id'], lane.library.id)
217                 self.assertAlmostEqual(float(lane_dict['pM']), float(lane.pM))
218                 self.assertEqual(lane_dict['library_species'],
219                                      lane.library.library_species.scientific_name)
220
221     def test_invalid_flowcell(self):
222         """
223         Make sure we get a 404 if we request an invalid flowcell ID
224         """
225         response = self.client.get('/experiments/config/nottheone/json', apidata)
226         self.assertEqual(response.status_code, 404)
227
228     def test_no_key(self):
229         """
230         Require logging in to retrieve meta data
231         """
232         response = self.client.get(u'/experiments/config/FC12150/json')
233         self.assertEqual(response.status_code, 403)
234
235     def test_library_id(self):
236         """
237         Library IDs should be flexible, so make sure we can retrive a non-numeric ID
238         """
239         response = self.client.get('/experiments/config/FC12150/json', apidata)
240         self.assertEqual(response.status_code, 200)
241         flowcell = json.loads(response.content)
242
243         lane_contents = flowcell['lane_set']['3']
244         lane_library = lane_contents[0]
245         self.assertEqual(lane_library['library_id'], 'SL039')
246
247         response = self.client.get('/samples/library/SL039/json', apidata)
248         self.assertEqual(response.status_code, 200)
249         library_sl039 = json.loads(response.content)
250
251         self.assertEqual(library_sl039['library_id'], 'SL039')
252
253     def test_raw_id_field(self):
254         """
255         Test ticket:147
256
257         Library's have IDs, libraries also have primary keys,
258         we eventually had enough libraries that the drop down combo box was too
259         hard to filter through, unfortnately we want a field that uses our library
260         id and not the internal primary key, and raw_id_field uses primary keys.
261
262         This tests to make sure that the value entered in the raw library id field matches
263         the library id looked up.
264         """
265         expected_ids = [u'10981',u'11016',u'SL039',u'11060',
266                         u'11061',u'11062',u'11063',u'11064']
267         self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
268         response = self.client.get('/admin/experiments/flowcell/153/')
269
270         tree = fromstring(response.content)
271         for i in range(0,8):
272             xpath_expression = '//input[@id="id_lane_set-%d-library"]'
273             input_field = tree.xpath(xpath_expression % (i,))[0]
274             library_field = input_field.find('../strong')
275             library_id, library_name = library_field.text.split(':')
276             # strip leading '#' sign from name
277             library_id = library_id[1:]
278             self.assertEqual(library_id, expected_ids[i])
279             self.assertEqual(input_field.attrib['value'], library_id)
280
281     def test_library_to_flowcell_link(self):
282         """
283         Make sure the library page includes links to the flowcell pages.
284         That work with flowcell IDs that have parenthetical comments.
285         """
286         self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
287         response = self.client.get('/library/11070/')
288         self.assertEqual(response.status_code, 200)
289         status = validate_xhtml(response.content)
290         if status is not None: self.assertTrue(status)
291
292         tree = fromstring(response.content)
293         flowcell_spans = tree.xpath('//span[@property="libns:flowcell_id"]',
294                                     namespaces=NSMAP)
295         self.assertEqual(flowcell_spans[1].text, '30012AAXX (failed)')
296         failed_fc_span = flowcell_spans[1]
297         failed_fc_a = failed_fc_span.getparent()
298         # make sure some of our RDF made it.
299         self.assertEqual(failed_fc_a.get('typeof'), 'libns:IlluminaFlowcell')
300         self.assertEqual(failed_fc_a.get('href'), '/flowcell/30012AAXX/')
301         fc_response = self.client.get(failed_fc_a.get('href'))
302         self.assertEqual(fc_response.status_code, 200)
303         status = validate_xhtml(response.content)
304         if status is not None: self.assertTrue(status)
305
306         fc_lane_response = self.client.get('/flowcell/30012AAXX/8/')
307         self.assertEqual(fc_lane_response.status_code, 200)
308         status = validate_xhtml(response.content)
309         if status is not None: self.assertTrue(status)
310
311
312     def test_pooled_multiplex_id(self):
313         fc_dict = experiments.flowcell_information('42JU1AAXX')
314         lane_contents = fc_dict['lane_set'][3]
315         self.assertEqual(len(lane_contents), 2)
316         lane_dict = multi_lane_to_dict(lane_contents)
317
318         self.assertEqual(lane_dict['12044']['index_sequence'],
319                          {u'1': u'ATCACG',
320                           u'2': u'CGATGT',
321                           u'3': u'TTAGGC'})
322         self.assertEqual(lane_dict['11045']['index_sequence'],
323                          {u'1': u'ATCACG'})
324
325
326
327     def test_lanes_for(self):
328         """
329         Check the code that packs the django objects into simple types.
330         """
331         user = 'test'
332         lanes = experiments.lanes_for(user)
333         self.assertEqual(len(lanes), 5)
334
335         response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
336         lanes_json = json.loads(response.content)
337         self.assertEqual(len(lanes), len(lanes_json))
338         for i in range(len(lanes)):
339             self.assertEqual(lanes[i]['comment'], lanes_json[i]['comment'])
340             self.assertEqual(lanes[i]['lane_number'], lanes_json[i]['lane_number'])
341             self.assertEqual(lanes[i]['flowcell'], lanes_json[i]['flowcell'])
342             self.assertEqual(lanes[i]['run_date'], lanes_json[i]['run_date'])
343
344     def test_lanes_for_no_lanes(self):
345         """
346         Do we get something meaningful back when the user isn't attached to anything?
347         """
348         user = 'supertest'
349         lanes = experiments.lanes_for(user)
350         self.assertEqual(len(lanes), 0)
351
352         response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
353         lanes_json = json.loads(response.content)
354
355     def test_lanes_for_no_user(self):
356         """
357         Do we get something meaningful back when its the wrong user
358         """
359         user = 'not a real user'
360         self.assertRaises(ObjectDoesNotExist, experiments.lanes_for, user)
361
362         response = self.client.get('/experiments/lanes_for/%s/json' % (user,), apidata)
363         self.assertEqual(response.status_code, 404)
364
365
366     def test_raw_data_dir(self):
367         """Raw data path generator check"""
368         flowcell_id = self.fc1_id
369         raw_dir = os.path.join(settings.RESULT_HOME_DIR, flowcell_id)
370
371         fc = models.FlowCell.objects.get(flowcell_id=flowcell_id)
372         self.assertEqual(fc.get_raw_data_directory(), raw_dir)
373
374         fc.flowcell_id = flowcell_id + " (failed)"
375         self.assertEqual(fc.get_raw_data_directory(), raw_dir)
376
377
378     def test_data_run_import(self):
379         srf_file_type = models.FileType.objects.get(name='SRF')
380         runxml_file_type = models.FileType.objects.get(name='run_xml')
381         flowcell_id = self.fc1_id
382         flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id)
383         flowcell.update_data_runs()
384         self.assertEqual(len(flowcell.datarun_set.all()), 1)
385
386         run = flowcell.datarun_set.all()[0]
387         result_files = run.datafile_set.all()
388         result_dict = dict(((rf.relative_pathname, rf) for rf in result_files))
389
390         srf4 = result_dict['FC12150/C1-37/woldlab_070829_SERIAL_FC12150_4.srf']
391         self.assertEqual(srf4.file_type, srf_file_type)
392         self.assertEqual(srf4.library_id, '11060')
393         self.assertEqual(srf4.data_run.flowcell.flowcell_id, 'FC12150')
394         self.assertEqual(
395             srf4.data_run.flowcell.lane_set.get(lane_number=4).library_id,
396             '11060')
397         self.assertEqual(
398             srf4.pathname,
399             os.path.join(settings.RESULT_HOME_DIR, srf4.relative_pathname))
400
401         lane_files = run.lane_files()
402         self.assertEqual(lane_files[4]['srf'], srf4)
403
404         runxml= result_dict['FC12150/C1-37/run_FC12150_2007-09-27.xml']
405         self.assertEqual(runxml.file_type, runxml_file_type)
406         self.assertEqual(runxml.library_id, None)
407
408         import1 = len(models.DataRun.objects.filter(result_dir='FC12150/C1-37'))
409         # what happens if we import twice?
410         flowcell.import_data_run('FC12150/C1-37',
411                                  'run_FC12150_2007-09-27.xml')
412         self.assertEqual(
413             len(models.DataRun.objects.filter(result_dir='FC12150/C1-37')),
414             import1)
415
416     def test_read_result_file(self):
417         """make sure we can return a result file
418         """
419         flowcell_id = self.fc1_id
420         flowcell = models.FlowCell.objects.get(flowcell_id=flowcell_id)
421         flowcell.update_data_runs()
422
423         #self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
424
425         result_files = flowcell.datarun_set.all()[0].datafile_set.all()
426         for f in result_files:
427             url = '/experiments/file/%s' % ( f.random_key,)
428             response = self.client.get(url)
429             self.assertEqual(response.status_code, 200)
430             mimetype = f.file_type.mimetype
431             if mimetype is None:
432                 mimetype = 'application/octet-stream'
433
434             self.assertEqual(mimetype, response['content-type'])
435
436     def test_flowcell_rdf(self):
437         import RDF
438         from htsworkflow.util.rdfhelp import get_model, \
439              fromTypedNode, \
440              load_string_into_model, \
441              rdfNS, \
442              libraryOntology, \
443              dump_model
444
445         model = get_model()
446
447         expected = {'1': ['11034'],
448                     '2': ['11036'],
449                     '3': ['12044','11045'],
450                     '4': ['11047','13044'],
451                     '5': ['11055'],
452                     '6': ['11067'],
453                     '7': ['11069'],
454                     '8': ['11070']}
455         url = '/flowcell/42JU1AAXX/'
456         response = self.client.get(url)
457         self.assertEqual(response.status_code, 200)
458         status = validate_xhtml(response.content)
459         if status is not None: self.assertTrue(status)
460
461         ns = urljoin('http://localhost', url)
462         load_string_into_model(model, 'rdfa', response.content, ns=ns)
463         body = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
464         prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
465
466         select ?flowcell ?flowcell_id ?lane_id ?library_id
467         where {
468           ?flowcell a libns:IlluminaFlowcell ;
469                     libns:flowcell_id ?flowcell_id ;
470                     libns:has_lane ?lane .
471           ?lane libns:lane_number ?lane_id ;
472                 libns:library ?library .
473           ?library libns:library_id ?library_id .
474         }"""
475         query = RDF.SPARQLQuery(body)
476         count = 0
477         for r in query.execute(model):
478             count += 1
479             self.assertEqual(fromTypedNode(r['flowcell_id']), u'42JU1AAXX')
480             lane_id = fromTypedNode(r['lane_id'])
481             library_id = fromTypedNode(r['library_id'])
482             self.assertTrue(library_id in expected[lane_id])
483         self.assertEqual(count, 10)
484
485     def test_flowcell_estimates(self):
486         classic_flowcell = models.FlowCell.objects.get(pk=153)
487         classic_mid = experiments.estimateFlowcellDuration(classic_flowcell)
488
489         self.assertEqual(classic_mid, timedelta(4, 44000))
490
491         rapid_flowcell = models.FlowCell.objects.get(pk=300)
492         rapid_mid = experiments.estimateFlowcellDuration(rapid_flowcell)
493         self.assertEqual(rapid_mid, timedelta(seconds=60800))
494
495     def test_round_to_days(self):
496         data = [
497             [timedelta(2, 12345), (timedelta(days=2), timedelta(days=3))],
498             [timedelta(0, 345), (timedelta(days=0), timedelta(days=1))],
499         ]
500
501         for estimate, expected in data:
502             rounded = experiments.roundToDays(estimate)
503             self.assertEqual(rounded, expected)
504
505 class TestFileType(TestCase):
506     fixtures = ['initial_data.json',
507                 'test_flowcells.json',
508                 ]
509
510     def test_file_type_unicode(self):
511         file_type_objects = models.FileType.objects
512         name = 'QSEQ tarfile'
513         file_type_object = file_type_objects.get(name=name)
514         self.assertEqual(u"QSEQ tarfile",
515                              unicode(file_type_object))
516
517     def test_find_file_type(self):
518         file_type_objects = models.FileType.objects
519         cases = [('woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
520                   'QSEQ tarfile', 7, 1),
521                  ('woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
522                   'SRF', 1, None),
523                  ('s_1_eland_extended.txt.bz2','ELAND Extended', 1, None),
524                  ('s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
525                  ('s_3_eland_result.txt.bz2','ELAND Result', 3, None),
526                  ('s_1_export.txt.bz2','ELAND Export', 1, None),
527                  ('s_1_percent_call.png', 'IVC Percent Call', 1, None),
528                  ('s_2_percent_base.png', 'IVC Percent Base', 2, None),
529                  ('s_3_percent_all.png', 'IVC Percent All', 3, None),
530                  ('s_4_call.png', 'IVC Call', 4, None),
531                  ('s_5_all.png', 'IVC All', 5, None),
532                  ('Summary.htm', 'Summary.htm', None, None),
533                  ('run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
534          ]
535         for filename, typename, lane, end in cases:
536             ft = models.find_file_type_metadata_from_filename(filename)
537             self.assertEqual(ft['file_type'],
538                                  file_type_objects.get(name=typename))
539             self.assertEqual(ft.get('lane', None), lane)
540             self.assertEqual(ft.get('end', None), end)
541
542     def test_assign_file_type_complex_path(self):
543         file_type_objects = models.FileType.objects
544         cases = [('/a/b/c/woldlab_090921_HWUSI-EAS627_0009_42FC3AAXX_l7_r1.tar.bz2',
545                   'QSEQ tarfile', 7, 1),
546                  ('foo/woldlab_091005_HWUSI-EAS627_0010_42JT2AAXX_1.srf',
547                   'SRF', 1, None),
548                  ('../s_1_eland_extended.txt.bz2','ELAND Extended', 1, None),
549                  ('/bleem/s_7_eland_multi.txt.bz2', 'ELAND Multi', 7, None),
550                  ('/qwer/s_3_eland_result.txt.bz2','ELAND Result', 3, None),
551                  ('/ty///1/s_1_export.txt.bz2','ELAND Export', 1, None),
552                  ('/help/s_1_percent_call.png', 'IVC Percent Call', 1, None),
553                  ('/bored/s_2_percent_base.png', 'IVC Percent Base', 2, None),
554                  ('/example1/s_3_percent_all.png', 'IVC Percent All', 3, None),
555                  ('amonkey/s_4_call.png', 'IVC Call', 4, None),
556                  ('fishie/s_5_all.png', 'IVC All', 5, None),
557                  ('/random/Summary.htm', 'Summary.htm', None, None),
558                  ('/notrandom/run_42JT2AAXX_2009-10-07.xml', 'run_xml', None, None),
559          ]
560         for filename, typename, lane, end in cases:
561             result = models.find_file_type_metadata_from_filename(filename)
562             self.assertEqual(result['file_type'],
563                                  file_type_objects.get(name=typename))
564             self.assertEqual(result.get('lane',None), lane)
565             self.assertEqual(result.get('end', None), end)
566
567 class TestEmailNotify(TestCase):
568     fixtures = ['initial_data.json',
569                 'test_flowcells.json']
570
571     def test_started_email_not_logged_in(self):
572         response = self.client.get('/experiments/started/153/')
573         self.assertEqual(response.status_code, 302)
574
575     def test_started_email_logged_in_user(self):
576         self.client.login(username='test', password='BJOKL5kAj6aFZ6A5')
577         response = self.client.get('/experiments/started/153/')
578         self.assertEqual(response.status_code, 302)
579
580     def test_started_email_logged_in_staff(self):
581         self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5')
582         response = self.client.get('/experiments/started/153/')
583         self.assertEqual(response.status_code, 200)
584
585     def test_started_email_send(self):
586         self.client.login(username='admintest', password='BJOKL5kAj6aFZ6A5')
587         response = self.client.get('/experiments/started/153/')
588         self.assertEqual(response.status_code, 200)
589
590         self.assertTrue('pk1@example.com' in response.content)
591         self.assertTrue('Lane #8 : (11064) Paired ends 104' in response.content)
592
593         response = self.client.get('/experiments/started/153/', {'send':'1','bcc':'on'})
594         self.assertEqual(response.status_code, 200)
595         self.assertEqual(len(mail.outbox), 4)
596         bcc = set(settings.NOTIFICATION_BCC).copy()
597         bcc.update(set(settings.MANAGERS))
598         for m in mail.outbox:
599             self.assertTrue(len(m.body) > 0)
600             self.assertEqual(set(m.bcc), bcc)
601
602     def test_email_navigation(self):
603         """
604         Can we navigate between the flowcell and email forms properly?
605         """
606         self.client.login(username='supertest', password='BJOKL5kAj6aFZ6A5')
607         response = self.client.get('/experiments/started/153/')
608         self.assertEqual(response.status_code, 200)
609         self.assertTrue(re.search('Flowcell FC12150', response.content))
610         # require that navigation back to the admin page exists
611         self.assertTrue(re.search('<a href="/admin/experiments/flowcell/153/">[^<]+</a>', response.content))
612
613 def multi_lane_to_dict(lane):
614     """Convert a list of lane entries into a dictionary indexed by library ID
615     """
616     return dict( ((x['library_id'],x) for x in lane) )
617
618 class TestSequencer(TestCase):
619     fixtures = ['initial_data.json',
620                 'test_flowcells.json',
621                 ]
622
623     def test_name_generation(self):
624         seq = models.Sequencer()
625         seq.name = "Seq1"
626         seq.instrument_name = "HWI-SEQ1"
627         seq.model = "Imaginary 5000"
628
629         self.assertEqual(unicode(seq), "Seq1 (HWI-SEQ1)")
630
631     def test_lookup(self):
632         fc = models.FlowCell.objects.get(pk=153)
633         self.assertEqual(fc.sequencer.model,
634                              "Illumina Genome Analyzer IIx")
635         self.assertEqual(fc.sequencer.instrument_name,
636                              "ILLUMINA-EC5D15")
637         # well actually we let the browser tack on the host name
638         url = fc.get_absolute_url()
639         self.assertEqual(url, '/flowcell/FC12150/')
640
641     def test_rdf(self):
642         response = self.client.get('/flowcell/FC12150/', apidata)
643         tree = fromstring(response.content)
644         seq_by = tree.xpath('//div[@rel="libns:sequenced_by"]',
645                             namespaces=NSMAP)
646         self.assertEqual(len(seq_by), 1)
647         self.assertEqual(seq_by[0].attrib['rel'], 'libns:sequenced_by')
648         seq = seq_by[0].getchildren()
649         self.assertEqual(len(seq), 1)
650         self.assertEqual(seq[0].attrib['about'], '/sequencer/2')
651         self.assertEqual(seq[0].attrib['typeof'], 'libns:Sequencer')
652
653         name = seq[0].xpath('./span[@property="libns:sequencer_name"]')
654         self.assertEqual(len(name), 1)
655         self.assertEqual(name[0].text, 'Tardigrade')
656         instrument = seq[0].xpath(
657             './span[@property="libns:sequencer_instrument"]')
658         self.assertEqual(len(instrument), 1)
659         self.assertEqual(instrument[0].text, 'ILLUMINA-EC5D15')
660         model = seq[0].xpath(
661             './span[@property="libns:sequencer_model"]')
662         self.assertEqual(len(model), 1)
663         self.assertEqual(model[0].text, 'Illumina Genome Analyzer IIx')
664
665     def test_flowcell_with_rdf_validation(self):
666         from htsworkflow.util.rdfhelp import add_default_schemas, \
667              dump_model, \
668              get_model, \
669              load_string_into_model
670         from htsworkflow.util.rdfinfer import Infer
671
672         model = get_model()
673         add_default_schemas(model)
674         inference = Infer(model)
675
676         url ='/flowcell/FC12150/'
677         response = self.client.get(url)
678         self.assertEqual(response.status_code, 200)
679         status = validate_xhtml(response.content)
680         if status is not None: self.assertTrue(status)
681
682         load_string_into_model(model, 'rdfa', response.content)
683
684         errmsgs = list(inference.run_validation())
685         self.assertEqual(len(errmsgs), 0)
686
687     def test_lane_with_rdf_validation(self):
688         from htsworkflow.util.rdfhelp import add_default_schemas, \
689              dump_model, \
690              get_model, \
691              load_string_into_model
692         from htsworkflow.util.rdfinfer import Infer
693
694         model = get_model()
695         add_default_schemas(model)
696         inference = Infer(model)
697
698         url = '/lane/1193'
699         response = self.client.get(url)
700         self.assertEqual(response.status_code, 200)
701         status = validate_xhtml(response.content)
702         if status is not None: self.assertTrue(status)
703
704         load_string_into_model(model, 'rdfa', response.content)
705
706         errmsgs = list(inference.run_validation())
707         self.assertEqual(len(errmsgs), 0)
708
709 def suite():
710     from unittest import TestSuite, defaultTestLoader
711     suite = TestSuite()
712     for testcase in [ClusterStationTestCases,
713                      SequencerTestCases,
714                      ExerimentsTestCases,
715                      TestFileType,
716                      TestEmailNotify,
717                      TestSequencer]:
718         suite.addTests(defaultTestLoader.loadTestsFromTestCase(testcase))
719     return suite
720
721 if __name__ == "__main__":
722     from unittest import main
723     main(defaultTest="suite")