Test htsworkflow under several different django & python versions
[htsworkflow.git] / inventory / test_inventory.py
1 from __future__ import absolute_import, print_function
2
3 from django.test import TestCase
4 from django.test.utils import setup_test_environment, \
5      teardown_test_environment
6 from django.db import connection
7 from django.conf import settings
8
9 from django.contrib.auth.models import User
10 from django.core.urlresolvers import reverse
11 from django.utils.encoding import smart_text
12
13 from rdflib import Graph, Literal, URIRef
14
15 from .models import Item, Vendor
16 from .inventory_factory import ItemFactory, LongTermStorageFactory
17 from samples.samples_factory import HTSUserFactory, LibraryFactory
18 from experiments.experiments_factory import FlowCellFactory
19 from htsworkflow.util.rdfns import inventoryOntology, libraryOntology
20
def localhostNode(url):
    """Return a URIRef for ``url`` prefixed with ``http://localhost``."""
    absolute = 'http://localhost%s' % (url,)
    return URIRef(absolute)
23
class InventoryTestCase(TestCase):
    """Exercise the inventory views through the Django test client.

    Each test logs in as a factory-made user, fetches a page, parses the
    response body as RDFa and checks the resulting triples.
    """

    def setUp(self):
        """Create a user with a known password for the tests to log in as."""
        self.password = 'foo'
        self.user = HTSUserFactory.create()
        self.user.set_password(self.password)
        self.user.save()

    def test_item(self):
        """The item page publishes the item's type name as RDFa."""
        item = ItemFactory()
        # assertTrue(x, 32) treats 32 as the failure message and passes for
        # any non-empty uuid; compare the length explicitly instead.
        self.assertEqual(len(item.uuid), 32)
        url = '/inventory/{}/'.format(item.uuid)
        self.assertTrue(self.client.login(username=self.user.username,
                                          password=self.password))
        response = self.client.get(url)
        self.assertEqual(response.status_code, 200)
        content = smart_text(response.content)

        model = Graph()
        model.parse(data=content, format='rdfa', publicID=url)

        itemNode = URIRef(url)
        items = list(model.objects(itemNode, inventoryOntology['item_type']))
        # Fail with a readable message rather than an IndexError below.
        self.assertTrue(items, 'no item_type triple found for %s' % (url,))
        item_type = items[0].toPython()

        self.assertEqual(item_type, item.item_type.name)

    def test_itemindex(self):
        """The item type index lists the flowcell stored on an item."""
        item = ItemFactory()
        fc1 = FlowCellFactory()
        lib1 = LibraryFactory()
        # Only the created record matters; the returned object is not used.
        LongTermStorageFactory(flowcell=fc1,
                               libraries=[lib1],
                               storage_devices=[item])

        url = reverse('itemtype_index',
                      kwargs={'name': item.item_type.name})
        disk_url = reverse('item_summary_by_uuid',
                           kwargs={'uuid': item.uuid})
        indexNode = localhostNode(url)
        diskNode = localhostNode(disk_url)
        self.assertTrue(self.client.login(username=self.user.username,
                                          password=self.password))

        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
        self.assertEqual(len(flowcells), 1)
        flowcell_url = reverse('flowcell_detail',
                               kwargs={'flowcell_id': fc1.flowcell_id})
        self.assertTrue(flowcells[0].endswith(flowcell_url))

    def test_add_disk(self):
        """Linking a flowcell to a device makes it appear on the index."""
        item = ItemFactory()
        url = reverse('itemtype_index',
                      kwargs={'name': item.item_type.name})
        disk_url = reverse('item_summary_by_uuid',
                           kwargs={'uuid': item.uuid})
        indexNode = localhostNode(url)
        diskNode = localhostNode(disk_url)

        self.assertTrue(self.client.login(username=self.user.username,
                                          password=self.password))

        # step one: nothing is linked to the device yet
        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
        self.assertEqual(len(flowcells), 0)

        # step two link the flowcell
        flowcell = FlowCellFactory(flowcell_id='22TWOAAXX')
        link_url = reverse('link_flowcell_and_device',
                           args=(flowcell.flowcell_id,
                                 item.barcode_id))
        link_response = self.client.get(link_url)
        self.assertEqual(link_response.status_code, 200)

        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
        flowcell_url = reverse('flowcell_detail',
                               kwargs={'flowcell_id': flowcell.flowcell_id})
        self.assertEqual(len(flowcells), 1)
        self.assertTrue(flowcells[0].endswith(flowcell_url))

    def test_add_disk_failed_flowcell(self):
        """Link a flowcell whose id carries a ' (failed)' suffix.

        The flowcell URL expected on the index page is built from the id
        without the suffix.
        """
        item = ItemFactory()
        url = reverse('itemtype_index', kwargs={'name': item.item_type.name})
        disk_url = reverse('item_summary_by_uuid', kwargs={'uuid': item.uuid})
        indexNode = localhostNode(url)
        diskNode = localhostNode(disk_url)

        self.assertTrue(self.client.login(username=self.user.username,
                                          password=self.password))

        # step one: nothing is linked to the device yet
        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
        self.assertEqual(len(flowcells), 0)

        # step two link the flowcell
        flowcell_id = '33THRAAXX'
        flowcell = FlowCellFactory(flowcell_id=flowcell_id + ' (failed)')
        link_url = reverse('link_flowcell_and_device',
                           args=(flowcell.flowcell_id, item.barcode_id))
        link_response = self.client.get(link_url)
        self.assertEqual(link_response.status_code, 200)

        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
        self.assertEqual(len(flowcells), 1)
        flowcell_url = reverse('flowcell_detail',
                               kwargs={'flowcell_id': flowcell_id})
        self.assertTrue(flowcells[0].endswith(flowcell_url))

    def get_flowcells_from_content(self, url, rootNode, diskNode):
        """Fetch ``url`` and return the flowcell ids attached to ``diskNode``.

        The response must have status 200. Its body is parsed as RDFa with
        ``rootNode`` as the public ID, and every object of
        ``libraryOntology['flowcell_id']`` on ``diskNode`` is returned as a
        string, in a list.
        """
        model = Graph()

        response = self.client.get(url)
        self.assertEqual(response.status_code, 200)

        content = smart_text(response.content)
        model.parse(data=content, format='rdfa', publicID=rootNode)
        targets = model.objects(diskNode, libraryOntology['flowcell_id'])
        return [str(x) for x in targets]
142
def suite():
    """Return a TestSuite holding every test in InventoryTestCase."""
    import unittest

    collected = unittest.TestSuite()
    loader = unittest.defaultTestLoader
    collected.addTests(loader.loadTestsFromTestCase(InventoryTestCase))
    return collected
148
# When executed as a script, run the tests collected by suite().
if __name__ == "__main__":
    import unittest

    unittest.main(defaultTest="suite")