convert to unicode_literals
[htsworkflow.git] / inventory / test_inventory.py
1 from __future__ import absolute_import, print_function, unicode_literals
2
3 import RDF
4
5 from django.test import TestCase
6 from django.test.utils import setup_test_environment, \
7      teardown_test_environment
8 from django.db import connection
9 from django.conf import settings
10
11 from django.contrib.auth.models import User
12 from django.core.urlresolvers import reverse
13
14 from .models import Item, Vendor
15 from .inventory_factory import ItemFactory, LongTermStorageFactory
16 from samples.samples_factory import HTSUserFactory, LibraryFactory
17 from experiments.experiments_factory import FlowCellFactory
18 from htsworkflow.util.rdfhelp import get_model, load_string_into_model, get_serializer, inventoryOntology, libraryOntology, fromTypedNode
19
def localhostNode(url):
    """Return an RDF URI node for ``url`` prefixed with ``http://localhost``.

    ``url`` is appended verbatim after the host name, so callers pass a
    path such as the result of ``reverse(...)``.
    """
    absolute_url = 'http://localhost%s' % (url,)
    return RDF.Node(RDF.Uri(absolute_url))
22
class InventoryTestCase(TestCase):
    """Exercise the inventory views through the Django test client.

    Each test logs in as a freshly created user, fetches a page, loads the
    RDFa in the response into an RDF model and checks the resulting
    statements.
    """

    def setUp(self):
        """Create a user with a known password so the tests can log in."""
        self.password = 'foo'
        self.user = HTSUserFactory.create()
        self.user.set_password(self.password)
        self.user.save()

    def _login(self):
        """Log the test client in as the user created in setUp."""
        self.assertTrue(self.client.login(username=self.user.username,
                                          password=self.password))

    def _index_and_disk_nodes(self, item):
        """Return ``(index_url, indexNode, diskNode)`` for ``item``.

        ``index_url`` is the item-type index page for the item's type; the
        two nodes are the localhost RDF nodes for that index page and for
        the item's own summary page.
        """
        url = reverse('inventory.views.itemtype_index',
                      kwargs={'name': item.item_type.name})
        disk_url = reverse('inventory.views.item_summary_by_uuid',
                           kwargs={'uuid': item.uuid})
        return url, localhostNode(url), localhostNode(disk_url)

    def test_item(self):
        """The item page is served and its RDFa reports the item type name."""
        item = ItemFactory()
        # Was assertTrue(len(...), 32), which treated 32 as the failure
        # message and never compared the length.
        self.assertEqual(len(item.uuid), 32)
        url = '/inventory/{}/'.format(item.uuid)
        self._login()
        response = self.client.get(url)
        self.assertEqual(response.status_code, 200)

        model = get_model()
        load_string_into_model(model, 'rdfa', response.content, url)

        itemNode = RDF.Node(RDF.Uri(url))
        # NOTE(review): the bytes key is kept as in the original; the other
        # ontology lookup in this class uses a text key -- confirm which one
        # the namespace helper expects.
        item_type = fromTypedNode(
            model.get_target(itemNode, inventoryOntology[b'item_type']))
        self.assertEqual(item_type, item.item_type.name)

    def test_itemindex(self):
        """An item used by a long term storage record lists that flowcell."""
        item = ItemFactory()
        fc1 = FlowCellFactory()
        lib1 = LibraryFactory()
        # Called for its side effect of creating the storage record; the
        # returned object is not needed by the assertions below.
        LongTermStorageFactory(flowcell=fc1,
                               libraries=[lib1],
                               storage_devices=[item])

        url, indexNode, diskNode = self._index_and_disk_nodes(item)
        self._login()

        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
        self.assertEqual(len(flowcells), 1)
        flowcell_url = reverse('experiments.views.flowcell_detail',
                               kwargs={'flowcell_id': fc1.flowcell_id})
        self.assertTrue(flowcells[0].endswith(flowcell_url))

    def test_add_disk(self):
        """Linking a flowcell to a device makes it appear on the index page."""
        item = ItemFactory()
        url, indexNode, diskNode = self._index_and_disk_nodes(item)
        self._login()

        # Step one: nothing is linked to the device yet.
        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
        self.assertEqual(len(flowcells), 0)

        # Step two: link the flowcell.
        flowcell = FlowCellFactory(flowcell_id='22TWOAAXX')
        link_url = reverse('inventory.views.link_flowcell_and_device',
                           args=(flowcell.flowcell_id,
                                 item.barcode_id))
        link_response = self.client.get(link_url)
        self.assertEqual(link_response.status_code, 200)

        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
        flowcell_url = reverse('experiments.views.flowcell_detail',
                               kwargs={'flowcell_id': flowcell.flowcell_id})
        self.assertEqual(len(flowcells), 1)
        self.assertTrue(flowcells[0].endswith(flowcell_url))

    def test_add_disk_failed_flowcell(self):
        """A flowcell whose id ends in ' (failed)' can still be linked.

        The flowcell reported for the device is expected to end with the
        detail URL built from the id without the ' (failed)' suffix.
        """
        item = ItemFactory()
        url, indexNode, diskNode = self._index_and_disk_nodes(item)
        self._login()

        # Step one: nothing is linked to the device yet.
        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
        self.assertEqual(len(flowcells), 0)

        # Step two: link the flowcell.
        flowcell_id = '33THRAAXX'
        flowcell = FlowCellFactory(flowcell_id=flowcell_id + ' (failed)')
        link_url = reverse('inventory.views.link_flowcell_and_device',
                           args=(flowcell.flowcell_id, item.barcode_id))
        link_response = self.client.get(link_url)
        self.assertEqual(link_response.status_code, 200)

        flowcells = self.get_flowcells_from_content(url, indexNode, diskNode)
        self.assertEqual(len(flowcells), 1)
        flowcell_url = reverse('experiments.views.flowcell_detail',
                               kwargs={'flowcell_id': flowcell_id})
        self.assertTrue(flowcells[0].endswith(flowcell_url))

    def get_flowcells_from_content(self, url, rootNode, diskNode):
        """Fetch ``url`` and return the flowcell URIs attached to ``diskNode``.

        The response body is parsed as RDFa with ``rootNode.uri`` as the
        base URI. The return value is a list of strings, one for each
        ``libraryOntology['flowcell_id']`` target of ``diskNode``.
        """
        model = get_model()

        response = self.client.get(url)
        self.assertEqual(response.status_code, 200)

        load_string_into_model(model, 'rdfa', response.content, rootNode.uri)
        targets = model.get_targets(diskNode, libraryOntology['flowcell_id'])
        return [str(target.uri) for target in targets]
140
def suite():
    """Build a unittest suite holding every test in InventoryTestCase."""
    from unittest import TestSuite, defaultTestLoader
    inventory_tests = defaultTestLoader.loadTestsFromTestCase(
        InventoryTestCase)
    collected = TestSuite()
    collected.addTests(inventory_tests)
    return collected
146
# Allow running this module directly; unittest resolves the "suite" name
# in this module to pick the tests to run.
if __name__ == "__main__":
    import unittest
    unittest.main(defaultTest="suite")