import os
import sys
-import RDF
+from rdflib import ConjunctiveGraph, BNode, Literal, URIRef
+from rdflib.plugins.sparql import prepareQuery
from htsworkflow.util.rdfns import *
from htsworkflow.util.rdfhelp import SCHEMAS_URL
Provides a few default rules as methods starting with _rule_
"""
def __init__(self, model):
+ if not isinstance(model, ConjunctiveGraph):
+ raise ValueError("Inferences require a ConjunctiveGraph")
+
self.model = model
- self._context = RDF.Node(RDF.Uri(INFER_URL))
+ self._context = URIRef(INFER_URL)
def think(self, max_iterations=None):
?alias a ?class .
?obj a ?alias .
}"""
- query = RDF.SPARQLQuery(body)
- for r in query.execute(self.model):
- s = RDF.Statement(r['obj'], rdfNS['type'], r['class'])
+ for r in self.model.query(body):
+ s = (r['obj'], RDF['type'], r['class'], self._context)
if s not in self.model:
- self.model.append(s, self._context)
+ self.model.add(s)
def _rule_subclass(self):
"""A subclass is a parent class
?subclass rdfs:subClassOf ?parent .
?obj a ?subclass .
}"""
- query = RDF.SPARQLQuery(body)
- for r in query.execute(self.model):
- s = RDF.Statement(r['obj'], rdfNS['type'], r['parent'])
+ for r in self.model.query(body):
+ s = (r['obj'], RDF['type'], r['parent'], self._context)
if s not in self.model:
- self.model.append(s, self._context)
+ self.model.add(s)
def _rule_inverse_of(self):
"""Add statements computed with inverseOf
?reverse rdfs:domain ?object_type ;
rdfs:range ?subject_type .
}"""
- query = RDF.SPARQLQuery(body)
-
- statements = []
- for r in query.execute(self.model):
- s = RDF.Statement(r['o'], r['reverse'], r['s'])
+ for r in self.model.query(body):
+ s = (r['o'], r['reverse'], r['s'], self._context)
if s not in self.model:
- self.model.append(s, self._context)
-
+ self.model.add(s)
def _validate_types(self):
body = """
FILTER(?predicate != xhtmlv:stylesheet)
}
"""
- query = RDF.SPARQLQuery(body)
errmsg = "Missing type for: {0}"
- for r in query.execute(self.model):
- yield errmsg.format(str(r['subject']))
+ for r in self.model.query(body):
+ yield errmsg.format(str(r[0]))
def _validate_undefined_properties(self):
"""Find properties that aren't defined.
OPTIONAL { ?predicate a ?predicate_class }
FILTER(!bound(?predicate_class))
}"""
- query = RDF.SPARQLQuery(body)
msg = "Undefined property in {0} {1} {2}"
- for r in query.execute(self.model):
- yield msg.format(str(r['subject']),
- str(r['predicate']),
- str(r['object']))
+ for r in self.model.query(body):
+ yield msg.format(r['subject'],
+ r['predicate'],
+ r['object'])
def _validate_property_types(self):
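- """Find resources that don't have a type
+ """Check each statement's subject and object against the rdfs:domain and rdfs:range of its predicate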
"""
- property_template = """
+ property_query = prepareQuery("""
prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>
- select ?type
- where {{
- <{predicate}> a rdf:Property ;
- {space} ?type .
- }}"""
+ select ?type ?predicate
+ where {
+ ?predicate a rdf:Property ;
+ ?space ?type .
+ }""")
def check_node_space(node, predicate, space, errmsg):
"""Check that a node conforms to it's allowable space of types.
resource_error = "Expected resource for {0} in range {1}"
type_error = "Type of {0} was {1} not {2}"
# check domain
- query = RDF.SPARQLQuery(property_template.format(
- predicate=predicate.uri,
- space=space))
seen = set()
- for r in query.execute(self.model):
+    # Collect one message per declared type the node fails to match; the
+    # node is accepted (empty list) as soon as any declared type matches.
+    errors = []
+    for r in self.model.query(property_query,
+                              initBindings={
+                                  'predicate': predicate,
+                                  'space': space}):
# Make sure we have a resource if we're expecting one
- if r['type'] == rdfsNS['Resource']:
- if node.is_literal():
- return resource_error.format(str(node), space)
- continue
- seen.add(str(r['type'].uri))
- if node.is_literal():
- # literal is a generic type.
- nodetype = node.literal_value['datatype']
- if nodetype is None:
- # lets default to string
- nodetype = xsdNS['string'].uri
- if r['type'] == rdfsNS['Literal']:
- pass
- elif nodetype != r['type'].uri:
- return type_error.format(
- str(node), nodetype, r['type'])
- # check that node is the expetected class type
- check = RDF.Statement(node, rdfNS['type'], r['type'])
- if self.model.contains_statement(check):
- return
-
- # need the seen check, because we're surpressing checking
- # rdfs:Resource types
- if len(seen) > 0:
- return errmsg + ",".join(seen)
-
+        expected_type = r['type']
+
+        if expected_type == RDFS['Resource']:
+            # rdfs:Resource has to be tested before the literal branch,
+            # otherwise a literal never reaches the resource_error report.
+            if isinstance(node, Literal):
+                errors.append(resource_error.format(str(node), space))
+            else:
+                # any non-literal node satisfies rdfs:Resource
+                return []
+        elif isinstance(node, Literal):
+            if expected_type == RDFS['Literal'] or node.datatype == expected_type:
+                return []
+            # not currently handling type hierarchy.
+            # an integer could pass a range of decimal for instance.
+            errors.append(
+                "Type error: {} was type {}, expected {}".format(
+                    str(node),
+                    str(node.datatype),
+                    str(expected_type)))
+        else:
+            # a non-literal node must be explicitly typed as expected_type
+            check = (node, RDF['type'], expected_type)
+            if check in self.model:
+                return []
+            errors.append(errmsg + str(node) + ' was not a ' + str(expected_type))
+
+    return errors
+ ### End nested function
wrong_domain_type = "Domain of {0} was not in:"
wrong_range_type = "Range of {0} was not in:"
count = 0
- schema = RDF.Node(RDF.Uri(SCHEMAS_URL))
- for s, context in self.model.as_stream_context():
+ schema = ConjunctiveGraph(identifier=SCHEMAS_URL)
+ for subject, predicate, obj, context in self.model.quads():
+ stmt = (subject, predicate, obj)
+
if context == schema:
continue
# check domain
- msg = check_node_space(s.subject, s.predicate, 'rdfs:domain',
- wrong_domain_type.format(str(s)))
- if msg is not None: yield msg
+ for error in check_node_space(subject, predicate, RDFS.domain,
+ wrong_domain_type.format(str(stmt))):
+ yield error
# check range
- msg = check_node_space(s.object, s.predicate, 'rdfs:range',
- wrong_range_type.format(str(s)))
- if msg is not None: yield msg
- return
+ for error in check_node_space(obj, predicate, RDFS.range,
+ wrong_range_type.format(str(stmt))):
+ yield error
from unittest import TestCase
-import RDF
+from rdflib import ConjunctiveGraph, BNode, Literal, Namespace, URIRef
+from rdflib.plugins.sparql import prepareQuery
-from htsworkflow.util.rdfhelp import get_model, \
- add_default_schemas, add_schema, load_string_into_model, dump_model
+from htsworkflow.util.rdfhelp import \
+ add_default_schemas, load_string_into_model, dump_model
from htsworkflow.util.rdfns import *
from htsworkflow.util.rdfinfer import Infer
-foafNS = RDF.NS('http://xmlns.com/foaf/0.1/')
+from rdflib.namespace import FOAF
MINI_FOAF_ONTOLOGY = """
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
class TestInfer(TestCase):
def setUp(self):
- self.model = get_model()
+ self.model = ConjunctiveGraph()
add_default_schemas(self.model)
- load_string_into_model(self.model, 'turtle', MINI_FOAF_ONTOLOGY)
+ self.model.parse(data=MINI_FOAF_ONTOLOGY, format='turtle')
def test_class(self):
- fooNS = RDF.NS('http://example.org/')
- load_string_into_model(self.model, 'turtle', FOAF_DATA)
+ fooNS = Namespace('http://example.org/')
+ self.model.parse(data=FOAF_DATA, format='turtle')
inference = Infer(self.model)
- s = RDF.Statement(fooNS['me.jpg'], rdfNS['type'], rdfsNS['Class'])
- found = list(self.model.find_statements(s))
+ s = [fooNS['me.jpg'], RDF['type'], RDFS['Class']]
+ found = list(self.model.triples(s))
self.assertEqual(len(found), 0)
inference._rule_class()
- s = RDF.Statement(fooNS['me.jpg'], rdfNS['type'], rdfsNS['Class'])
- found = list(self.model.find_statements(s))
+ s = [fooNS['me.jpg'], RDF['type'], RDFS['Class']]
+ found = list(self.model.triples(s))
self.assertEqual(len(found), 1)
def test_inverse_of(self):
- fooNS = RDF.NS('http://example.org/')
- load_string_into_model(self.model, 'turtle', FOAF_DATA)
+ fooNS = Namespace('http://example.org/')
+ self.model.parse(data=FOAF_DATA, format='turtle')
inference = Infer(self.model)
- depiction = RDF.Statement(None,
- foafNS['depiction'],
- fooNS['me.jpg'])
- size = self.model.size()
- found_statements = list(self.model.find_statements(depiction))
+ depiction = (None, FOAF['depiction'], fooNS['me.jpg'])
+ size = len(self.model)
+ found_statements = list(self.model.triples(depiction))
self.assertEqual(len(found_statements), 0)
inference._rule_inverse_of()
- found_statements = list(self.model.find_statements(depiction))
+ found_statements = list(self.model.triples(depiction))
self.assertEqual(len(found_statements), 1)
# we should've added one statement.
- self.assertEqual(self.model.size(), size + 1)
+ self.assertEqual(len(self.model), size + 1)
- size = self.model.size()
+ size = len(self.model)
inference._rule_inverse_of()
# we should already have both versions in our model
- self.assertEqual(self.model.size(), size)
+ self.assertEqual(len(self.model), size)
def test_validate_types(self):
- fooNS = RDF.NS('http://example.org/')
- load_string_into_model(self.model, 'turtle', FOAF_DATA)
+ fooNS = Namespace('http://example.org/')
+ self.model.parse(data=FOAF_DATA, format='turtle')
inference = Infer(self.model)
errors = list(inference._validate_types())
self.assertEqual(len(errors), 0)
- s = RDF.Statement(fooNS['document'],
- dcNS['title'],
- RDF.Node("bleem"))
- self.model.append(s)
+ s = (fooNS['document'], DC['title'], Literal("bleem"))
+ self.model.add(s)
errors = list(inference._validate_types())
self.assertEqual(len(errors), 1)
- def test_validate_undefined_properties(self):
- fooNS = RDF.NS('http://example.org/')
+ def test_validate_undefined_properties_in_schemas(self):
+ """The default schemas alone must not report undefined properties."""
+ fooNS = Namespace('http://example.org/')
inference = Infer(self.model)
errors = list(inference._validate_undefined_properties())
self.assertEqual(len(errors), 0)
- load_string_into_model(self.model, 'turtle', FOAF_DATA)
+
+ def test_validate_undefined_properties_in_inference(self):
+ """Loading FOAF_DATA is expected to report two undefined properties."""
+ self.model.parse(data=FOAF_DATA, format='turtle')
+
+ inference = Infer(self.model)
errors = list(inference._validate_undefined_properties())
self.assertEqual(len(errors), 2)
-
- def test_validate_undefined_properties(self):
- fooNS = RDF.NS('http://example.org/')
- foafNS = RDF.NS('http://xmlns.com/foaf/0.1/')
- load_string_into_model(self.model, 'turtle', FOAF_DATA)
+
+ def test_validate_property_types(self):
+ """Domain and range violations are reported by _validate_property_types."""
+ # Kept as its own test so these assertions do not run under the
+ # undefined-properties test name.
+ fooNS = Namespace('http://example.org/')
+ self.model.parse(data=FOAF_DATA, format='turtle')
inference = Infer(self.model)
-
errors = list(inference._validate_property_types())
self.assertEqual(len(errors), 0)
- s = RDF.Statement(fooNS['me.jpg'],
- foafNS['firstName'],
- RDF.Node("name"))
- self.model.append(s)
+ s = (fooNS['me.jpg'], FOAF['firstName'], Literal("name"))
+ self.model.add(s)
errors = list(inference._validate_property_types())
self.assertEqual(len(errors), 1)
startswith = 'Domain of '
self.assertTrue('http://example.org/me.jpg' in errors[0])
endswith = 'http://xmlns.com/foaf/0.1/Person'
self.assertEqual(errors[0][-len(endswith):], endswith)
- del self.model[s]
+ self.model.remove(s)
errors = list(inference._validate_property_types())
self.assertEqual(len(errors), 0)
- s = RDF.Statement(fooNS['foo.txt'], rdfNS['type'], foafNS['Document'])
- self.model.append(s)
- s = RDF.Statement(fooNS['me.jpg'],
- foafNS['depicts'],
- foafNS['foo.txt'])
- self.model.append(s)
+ s = (fooNS['foo.txt'], RDF['type'], FOAF['Document'])
+ self.model.add(s)
+ s = (fooNS['me.jpg'], FOAF['depicts'], FOAF['foo.txt'])
+ self.model.add(s)
errors = list(inference._validate_property_types())
self.assertEqual(len(errors), 1)
self.assertTrue('http://example.org/me.jpg' in errors[0])
endswith = 'http://www.w3.org/2002/07/owl#Thing'
self.assertEqual(errors[0][-len(endswith):], endswith)
- del self.model[s]
+ self.model.remove(s)
def test_property_multiple_domain_types(self):
"""Can we process a property with multiple domain types?
bar:subject a bar:ABarClass ;
foo:aprop foo:object .
"""
- load_string_into_model(self.model, 'turtle', turtle)
+ self.model.parse(data=turtle, format='turtle')
inference = Infer(self.model)
errmsg = list(inference._validate_property_types())