43d091101fd016b8ab1fbbf39936f3b97d59336a
[htsworkflow.git] / inventory / test_inventory.py
1 from __future__ import absolute_import, print_function, unicode_literals
2
3 import RDF
4
5 from django.test import TestCase
6 from django.test.utils import setup_test_environment, \
7      teardown_test_environment
8 from django.db import connection
9 from django.conf import settings
10
11 from django.contrib.auth.models import User
12 from django.core.urlresolvers import reverse
13 from django.utils.encoding import smart_text
14
15 from .models import Item, Vendor
16 from .inventory_factory import ItemFactory, LongTermStorageFactory
17 from samples.samples_factory import HTSUserFactory, LibraryFactory
18 from experiments.experiments_factory import FlowCellFactory
19 from htsworkflow.util.rdfhelp import get_model, load_string_into_model, get_serializer, inventoryOntology, libraryOntology, fromTypedNode
20
def localhostNode(url):
    """Return an RDF node whose URI is ``url`` prefixed with http://localhost."""
    absolute_url = 'http://localhost%s' % (url,)
    uri = RDF.Uri(absolute_url)
    return RDF.Node(uri)
23
class InventoryTestCase(TestCase):
    """Tests for the inventory views, checked through the RDFa they emit."""

    def setUp(self):
        """Create a user with a known password so the tests can log in."""
        self.password = 'foo'
        self.user = HTSUserFactory.create()
        self.user.set_password(self.password)
        self.user.save()

    def _login(self):
        """Log the test client in as the user created in setUp."""
        self.assertTrue(self.client.login(username=self.user.username,
                                          password=self.password))

    def _index_and_disk_nodes(self, item):
        """Return ``(index_url, indexNode, diskNode)`` for ``item``.

        ``index_url`` is the reversed item-type index url for the item's
        type; the two nodes are the localhost RDF nodes for that index url
        and for the item's summary-by-uuid url.
        """
        url = reverse('inventory.views.itemtype_index',
                      kwargs={'name': item.item_type.name})
        disk_url = reverse('inventory.views.item_summary_by_uuid',
                           kwargs={'uuid': item.uuid})
        return url, localhostNode(url), localhostNode(disk_url)

    def test_item(self):
        """The item page loads and its RDFa carries the item type name."""
        item = ItemFactory()
        # assertTrue(len(...), 32) treats 32 as the failure message and passes
        # for any non-empty uuid, so compare the length explicitly.
        self.assertEqual(len(item.uuid), 32)
        url = '/inventory/{}/'.format(item.uuid)
        self._login()
        response = self.client.get(url)
        self.assertEqual(response.status_code, 200)

        model = get_model()
        load_string_into_model(model, 'rdfa', smart_text(response.content), url)

        itemNode = RDF.Node(RDF.Uri(url))
        item_type = fromTypedNode(
            model.get_target(itemNode, inventoryOntology[b'item_type']))
        self.assertEqual(item_type, item.item_type.name)

    def test_itemindex(self):
        """An item used as a storage device lists its flowcell on the index."""
        item = ItemFactory()
        fc1 = FlowCellFactory()
        lib1 = LibraryFactory()
        # Only the factory's side effect is needed (presumably it stores the
        # flowcell/library/device association); the returned object is unused.
        LongTermStorageFactory(flowcell=fc1,
                               libraries=[lib1],
                               storage_devices=[item])

        url, indexNode, diskNode = self._index_and_disk_nodes(item)
        self._login()

        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
        self.assertEqual(len(flowcells), 1)
        flowcell_url = reverse('experiments.views.flowcell_detail',
                               kwargs={'flowcell_id': fc1.flowcell_id})
        self.assertTrue(flowcells[0].endswith(flowcell_url))

    def test_add_disk(self):
        """Linking a flowcell to a device makes it appear on the index page."""
        item = ItemFactory()
        url, indexNode, diskNode = self._index_and_disk_nodes(item)
        self._login()

        # step one: nothing is linked to the device yet
        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
        self.assertEqual(len(flowcells), 0)

        # step two link the flowcell
        flowcell = FlowCellFactory(flowcell_id='22TWOAAXX')
        link_url = reverse('inventory.views.link_flowcell_and_device',
                           args=(flowcell.flowcell_id,
                                 item.barcode_id))
        link_response = self.client.get(link_url)
        self.assertEqual(link_response.status_code, 200)

        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
        flowcell_url = reverse('experiments.views.flowcell_detail',
                               kwargs={'flowcell_id': flowcell.flowcell_id})
        self.assertEqual(len(flowcells), 1)
        self.assertTrue(flowcells[0].endswith(flowcell_url))

    def test_add_disk_failed_flowcell(self):
        """A flowcell whose id ends in ' (failed)' can still be linked.

        The flowcell is created and linked with the ' (failed)' suffix, but
        the url expected on the index page is built from the bare id.
        """
        item = ItemFactory()
        url, indexNode, diskNode = self._index_and_disk_nodes(item)
        self._login()

        # step one: nothing is linked to the device yet
        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
        self.assertEqual(len(flowcells), 0)

        # step two link the flowcell
        flowcell_id = '33THRAAXX'
        flowcell = FlowCellFactory(flowcell_id=flowcell_id + ' (failed)')
        link_url = reverse('inventory.views.link_flowcell_and_device',
                           args=(flowcell.flowcell_id, item.barcode_id))
        link_response = self.client.get(link_url)
        self.assertEqual(link_response.status_code, 200)

        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
        self.assertEqual(len(flowcells), 1)
        flowcell_url = reverse('experiments.views.flowcell_detail',
                               kwargs={'flowcell_id': flowcell_id})
        self.assertTrue(flowcells[0].endswith(flowcell_url))

    def get_flowcells_from_content(self, url, rootNode, diskNode):
        """Fetch ``url`` and return the flowcell URIs attached to ``diskNode``.

        The response must have status 200. Its body is parsed as RDFa with
        ``rootNode.uri`` passed as the base, and every target of
        ``libraryOntology['flowcell_id']`` on ``diskNode`` is returned as a
        string, in the order the model yields them.
        """
        model = get_model()

        response = self.client.get(url)
        self.assertEqual(response.status_code, 200)

        load_string_into_model(model, 'rdfa', response.content, rootNode.uri)
        targets = model.get_targets(diskNode, libraryOntology['flowcell_id'])
        return [str(x.uri) for x in targets]
141
def suite():
    """Return a TestSuite containing every test in InventoryTestCase."""
    from unittest import TestSuite, defaultTestLoader
    inventory_tests = defaultTestLoader.loadTestsFromTestCase(InventoryTestCase)
    collected = TestSuite()
    collected.addTests(inventory_tests)
    return collected
147
if __name__ == "__main__":
    # When executed directly, run the tests collected by suite().
    import unittest
    unittest.main(defaultTest="suite")