import types
from pkg_resources import resource_listdir, resource_string
+from rdflib import ConjunctiveGraph, Graph, Literal, BNode, URIRef, Namespace
+from rdflib.namespace import ClosedNamespace
+
import lxml.html
import lxml.html.clean
-import RDF
logger = logging.getLogger(__name__)
context = Context({'results': results,})
print(template.render(context))
-def blankOrUri(value=None):
- """Return a blank node for None or a resource node for strings.
- """
- node = None
- if value is None:
- node = RDF.Node()
- elif isinstance(value, six.string_types):
- node = RDF.Node(uri_string=value)
- elif isinstance(value, RDF.Node):
- node = value
-
- return node
-
-
-def toTypedNode(value, language="en"):
- """Convert a python variable to a RDF Node with its closest xsd type
- """
- if isinstance(value, bool):
- value_type = xsdNS['boolean'].uri
- if value:
- value = u'1'
- else:
- value = u'0'
- elif isinstance(value, int):
- value_type = xsdNS['decimal'].uri
- value = str(value)
- elif isinstance(value, float):
- value_type = xsdNS['float'].uri
- value = str(value)
- elif isinstance(value, datetime):
- value_type = xsdNS['dateTime'].uri
- if value.microsecond == 0:
- value = value.strftime(ISOFORMAT_SHORT)
- else:
- value = value.strftime(ISOFORMAT_MS)
- else:
- value_type = None
- if six.PY3:
- value = str(value)
- else:
- value = unicode(value).encode('utf-8')
-
- if value_type is not None:
- node = RDF.Node(literal=value, datatype=value_type)
- else:
- node = RDF.Node(literal=value, language=language)
- return node
-
-
-def fromTypedNode(node):
- """Convert a typed RDF Node to its closest python equivalent
- """
- if not isinstance(node, RDF.Node):
- return node
- if node.is_resource():
- return node
-
- value_type = get_node_type(node)
- literal = node.literal_value['string']
- literal_lower = literal.lower()
-
- if value_type == 'boolean':
- if literal_lower in ('1', 'yes', 'true'):
- return True
- elif literal_lower in ('0', 'no', 'false'):
- return False
- else:
- raise ValueError("Unrecognized boolean %s" % (literal,))
- elif value_type == 'integer':
- return int(literal)
- elif value_type == 'decimal' and literal.find('.') == -1:
- return int(literal)
- elif value_type in ('decimal', 'float', 'double'):
- return float(literal)
- elif value_type in ('string'):
- return literal
- elif value_type in ('dateTime'):
- try:
- return datetime.strptime(literal, ISOFORMAT_MS)
- except ValueError:
- return datetime.strptime(literal, ISOFORMAT_SHORT)
- return literal
-
def get_node_type(node):
"""Return just the base name of a XSD datatype:
e.g. http://www.w3.org/2001/XMLSchema#integer -> integer
"""
# chop off xml schema declaration
- value_type = node.literal_value['datatype']
+ value_type = node.datatype
if value_type is None:
return "string"
else:
- value_type = str(value_type)
- return value_type.replace(str(xsdNS[''].uri), '')
+ return value_type.replace(str(XSD), '').lower()
def simplify_rdf(value):
"""Return a short name for a RDF object
e.g. The last part of a URI or an untyped string.
"""
- if isinstance(value, RDF.Node):
- if value.is_resource():
- name = simplify_uri(str(value.uri))
- elif value.is_blank():
- name = '<BLANK>'
- else:
- name = value.literal_value['string']
- elif isinstance(value, RDF.Uri):
+ if isinstance(value, Literal):
+ name = value.value
+ elif isinstance(value, BNode):
+ name = '<BLANK>'
+ elif isinstance(value, URIRef):
name = split_uri(str(value))
else:
name = value
>>> simplify_uri('http://asdf.org/foo/bar?was=foo')
'was=foo'
"""
- if isinstance(uri, RDF.Node):
- if uri.is_resource():
- uri = uri.uri
- else:
- raise ValueError("Can't simplify an RDF literal")
- if isinstance(uri, RDF.Uri):
- uri = str(uri)
+ if isinstance(uri, Literal) and uri.datatype not in (XSD.anyURI,):
+ raise ValueError("Literal terms must be of URI type")
+
+ uri = str(uri)
parsed = urllib.parse.urlparse(uri)
if len(parsed.query) > 0:
returns None if they aren't in common
"""
- if isinstance(term, RDF.Node):
- if term.is_resource():
- term = term.uri
- else:
- raise ValueError("This works on resources")
- elif not isinstance(term, RDF.Uri):
- raise ValueError("This works on resources")
- term_s = str(term)
- if not term_s.startswith(namespace._prefix):
- return None
- return term_s.replace(namespace._prefix, "")
+ if not isinstance(namespace, (URIRef, Namespace, ClosedNamespace)):
+ raise ValueError("Requires a URIRef namespace")
+ if isinstance(term, Literal) and term.datatype not in (XSD.anyURI,):
+ raise ValueError("Term literals must be a URI type")
+ elif not isinstance(term, URIRef):
+ raise ValueError("Term must be a URI type")
-def get_model(model_name=None, directory=None, use_contexts=True):
- if directory is None:
- directory = os.getcwd()
-
- contexts = 'yes' if use_contexts else 'no'
-
- if model_name is None:
- storage = RDF.MemoryStorage(options_string="contexts='{}'".format(contexts))
- logger.info("Using RDF Memory model")
- else:
- options = "contexts='{0}',hash-type='bdb',dir='{1}'".format(contexts, directory)
- storage = RDF.HashStorage(model_name,
- options=options)
- logger.info("Using {0} with options {1}".format(model_name, options))
- model = RDF.Model(storage)
- return model
+ term_s = str(term)
+ if not term_s.startswith(str(namespace)):
+ return None
+ return term_s.replace(str(namespace), "")
def load_into_model(model, parser_name, path, ns=None):
if isinstance(ns, six.string_types):
- ns = RDF.Uri(ns)
+ ns = URIRef(ns)
- if isinstance(path, RDF.Node):
- if path.is_resource():
- path = str(path.uri)
- else:
- raise ValueError("url to load can't be a RDF literal")
+ if isinstance(path, URIRef):
+ path = str(path)
url_parts = list(urllib.parse.urlparse(path))
if len(url_parts[0]) == 0 or url_parts[0] == 'file':
url = urllib.parse.urlunparse(url_parts)
logger.info("Opening {0} with parser {1}".format(url, parser_name))
- rdf_parser = RDF.Parser(name=parser_name)
-
- statements = []
- retries = 3
- succeeded = False
- while retries > 0:
- try:
- retries -= 1
- statements = rdf_parser.parse_as_stream(url, ns)
- retries = 0
- succeeded = True
- except RDF.RedlandError as e:
- errmsg = "RDF.RedlandError: {0} {1} tries remaining"
- logger.error(errmsg.format(str(e), retries))
+ model.parse(url, format=parser_name, publicID=ns)
- if not succeeded:
- logger.warn("Unable to download %s", url)
-
- for s in statements:
- conditionally_add_statement(model, s, ns)
def load_string_into_model(model, parser_name, data, ns=None):
ns = fixup_namespace(ns)
logger.debug("load_string_into_model parser={0}, len={1}".format(
parser_name, len(data)))
- rdf_parser = RDF.Parser(name=str(parser_name))
- for s in rdf_parser.parse_string_as_stream(data, ns):
- conditionally_add_statement(model, s, ns)
+ model.parse(data=data, format=parser_name, publicID=ns)
+ add_imports(model, ns)
def fixup_namespace(ns):
if ns is None:
- ns = RDF.Uri("http://localhost/")
+ ns = URIRef("http://localhost/")
elif isinstance(ns, six.string_types):
- ns = RDF.Uri(ns)
- elif not(isinstance(ns, RDF.Uri)):
+ ns = URIRef(ns)
+ elif not(isinstance(ns, URIRef)):
errmsg = "Namespace should be string or uri not {0}"
raise ValueError(errmsg.format(str(type(ns))))
return ns
-def conditionally_add_statement(model, s, ns):
- imports = owlNS['imports']
- if s.predicate == imports:
- obj = str(s.object)
- logger.info("Importing %s" % (obj,))
- load_into_model(model, None, obj, ns)
- if s.object.is_literal():
- value_type = get_node_type(s.object)
- if value_type == 'string':
- s.object = sanitize_literal(s.object)
- model.add_statement(s)
-
-
+def add_imports(model, ns):
+ """Load every owl:imports target into model and drop the import triples
+
+ model is the rdflib graph to scan and extend; ns is passed through to
+ load_into_model as the base namespace for each imported document.
+ """
+ # snapshot the matches first: the loop body removes from and parses
+ # into the same graph, which must not happen under a live iterator
+ for s, p, o in list(model.triples((None, OWL.imports, None))):
+ model.remove((s, p, o))
+ load_into_model(model, None, o, ns)
+
def add_default_schemas(model, schema_path=None):
"""Add default schemas to a model
Looks for turtle files in either htsworkflow/util/schemas
or in the list of directories provided in schema_path
"""
-
schemas = resource_listdir(__name__, 'schemas')
for s in schemas:
schema = resource_string(__name__, 'schemas/' + s)
Main difference from 'load_into_model' is it tags it with
a RDFlib context so I can remove them later.
"""
- parser = RDF.Parser(name='turtle')
- context = RDF.Node(RDF.Uri(SCHEMAS_URL))
- for s in parser.parse_string_as_stream(schema, url):
- try:
- model.append(s, context)
- except RDF.RedlandError as e:
- logger.error("%s with %s", str(e), str(s))
+ if not isinstance(model, ConjunctiveGraph):
+ raise ValueError("Schemas requires a graph that supports quads")
+ context = URIRef(SCHEMAS_URL)
+ tmpmodel = Graph()
+ tmpmodel.parse(data=schema, format='turtle', publicID=url)
+ for s, p, o in tmpmodel:
+ model.add((s, p, o, context))
def remove_schemas(model):
"""Remove statements labeled with our schema context"""
- context = RDF.Node(RDF.Uri(SCHEMAS_URL))
- model.context_remove_statements(context)
-
+ context = URIRef(SCHEMAS_URL)
+ # pass a quad pattern: removing bare triples from a ConjunctiveGraph
+ # would delete them from every context, not just the schema one
+ model.remove((None, None, None, context))
def sanitize_literal(node):
"""Clean up a literal string
"""
- if not isinstance(node, RDF.Node):
- raise ValueError("sanitize_literal only works on RDF.Nodes")
+ if not isinstance(node, Literal):
+ raise ValueError("sanitize_literal only works on Literals")
- s = node.literal_value['string']
+ s = node.value
if len(s) > 0:
element = lxml.html.fromstring(s)
cleaner = lxml.html.clean.Cleaner(page_structure=False)
p_len = 3
slash_p_len = 4
- args = {'literal': text[p_len:-slash_p_len]}
+ value = text[p_len:-slash_p_len]
else:
- args = {'literal': ''}
- datatype = node.literal_value['datatype']
- if datatype is not None:
- args['datatype'] = datatype
- language = node.literal_value['language']
- if language is not None:
- args['language'] = language
- return RDF.Node(**args)
+ value = ''
+ args = {}
+ if node.datatype is not None:
+ args['datatype'] = node.datatype
+ if node.language is not None:
+ args['lang'] = node.language
+ return Literal(value, **args)
def guess_parser(content_type, pathname):
return 'turtle'
return 'guess'
-def get_serializer(name='turtle'):
+def add_default_namespaces(model):
- """Return a serializer with our standard prefixes loaded
+ """Bind our standard namespace prefixes on model and return model
"""
- writer = RDF.Serializer(name=name)
- # really standard stuff
- writer.set_namespace('rdf', rdfNS._prefix)
- writer.set_namespace('rdfs', rdfsNS._prefix)
- writer.set_namespace('owl', owlNS._prefix)
- writer.set_namespace('dc', dcNS._prefix)
- writer.set_namespace('xml', xmlNS._prefix)
- writer.set_namespace('xsd', xsdNS._prefix)
- writer.set_namespace('vs', vsNS._prefix)
- writer.set_namespace('wot', wotNS._prefix)
+ model.bind('rdf', RDF)
+ model.bind('rdfs', RDFS)
+ model.bind('owl', OWL)
+ model.bind('dc', DC)
+ model.bind('xml', XML)
+ model.bind('xsd', XSD)
+ model.bind('vs', VS)
+ model.bind('wot', WOT)
# should these be here, kind of specific to an application
- writer.set_namespace('htswlib', libraryOntology._prefix)
- writer.set_namespace('ucscSubmission', submissionOntology._prefix)
- writer.set_namespace('ucscDaf', dafTermOntology._prefix)
- writer.set_namespace('geoSoft', geoSoftNS._prefix)
- writer.set_namespace('encode3', encode3NS._prefix)
- return writer
+ model.bind('htswlib', libraryOntology)
+ model.bind('ucscSubmission', submissionOntology)
+ model.bind('ucscDaf', dafTermOntology)
+ model.bind('geoSoft', geoSoftNS)
+ model.bind('encode3', encode3NS)
+ return model
def get_turtle_header():
"""Return a turtle header with our typical namespaces
"""
- serializer = get_serializer()
- empty = get_model()
- return serializer.serialize_model_to_string(empty)
+ # get_model() is deleted by this change, so build the empty graph here;
+ # it only exists to carry the prefix bindings into the serializer
+ model = Graph()
+ add_default_namespaces(model)
+ return model.serialize(format='turtle')
def dump_model(model, destination=None):
if destination is None:
destination = sys.stdout
- serializer = get_serializer()
- destination.write(serializer.serialize_model_to_string(model))
- destination.write(os.linesep)
+ add_default_namespaces(model)
+ model.serialize(destination, format='turtle')
from datetime import datetime
import six
+from rdflib import BNode, ConjunctiveGraph, Literal, Namespace, URIRef, Graph
+
from htsworkflow.util.rdfhelp import \
add_default_schemas, \
- blankOrUri, \
- dcNS, \
+ DC, \
dump_model, \
- fromTypedNode, \
- get_model, \
guess_parser, \
guess_parser_by_extension, \
load_string_into_model, \
- owlNS, \
- rdfNS, \
- rdfsNS, \
+ OWL, \
remove_schemas, \
- toTypedNode, \
+ RDF, \
+ RDFS, \
strip_namespace, \
simplify_uri, \
sanitize_literal, \
- xsdNS
-
-try:
- import RDF
-
- class TestRDFHelp(TestCase):
- def test_from_none(self):
- self.assertEqual(fromTypedNode(None), None)
-
- def test_typed_node_boolean(self):
- node = toTypedNode(True)
- self.assertIn(node.literal_value['string'], (u'1', u'true'))
- self.assertEqual(str(node.literal_value['datatype']),
- 'http://www.w3.org/2001/XMLSchema#boolean')
-
- def test_bad_boolean(self):
- node = RDF.Node(literal='bad', datatype=xsdNS['boolean'].uri)
- # older versions of librdf ~< 1.0.16 left the literal
- # alone. and thus should fail the fromTypedNode call
- # newer versions coerced the odd value to false.
- try:
- self.assertFalse(fromTypedNode(node))
- except ValueError as e:
- pass
-
- def test_typed_node_string_node_attributes(self):
- node = toTypedNode('hello')
- self.assertEqual(node.literal_value['string'], u'hello')
- self.assertTrue(node.literal_value['datatype'] is None)
-
- def test_typed_real_like(self):
- num = 3.14
- node = toTypedNode(num)
- self.assertEqual(fromTypedNode(node), num)
-
- def test_typed_integer(self):
- num = 3
- node = toTypedNode(num)
- self.assertEqual(fromTypedNode(node), num)
- self.assertEqual(type(fromTypedNode(node)), type(num))
-
- def test_typed_node_string(self):
- s = "Argh matey"
- node = toTypedNode(s)
- self.assertEqual(fromTypedNode(node), s)
- self.assertTrue(isinstance(fromTypedNode(node), six.text_type))
-
- def test_blank_or_uri_blank(self):
- node = blankOrUri()
- self.assertEqual(node.is_blank(), True)
-
- def test_blank_or_uri_url(self):
- s = 'http://google.com'
- node = blankOrUri(s)
- self.assertEqual(node.is_resource(), True)
- self.assertEqual(str(node.uri), s)
-
- def test_blank_or_uri_node(self):
- s = RDF.Node(RDF.Uri('http://google.com'))
- node = blankOrUri(s)
- self.assertEqual(node.is_resource(), True)
- self.assertEqual(node, s)
-
- def test_unicode_node_roundtrip(self):
- literal = u'\u5927'
- roundtrip = fromTypedNode(toTypedNode(literal))
- self.assertTrue(isinstance(roundtrip, six.text_type))
-
- def test_datetime_no_microsecond(self):
- dateTimeType = xsdNS['dateTime'].uri
- short_isostamp = '2011-12-20T11:44:25'
- short_node = RDF.Node(literal=short_isostamp,
- datatype=dateTimeType)
- short_datetime = datetime(2011,12,20,11,44,25)
-
- self.assertEqual(fromTypedNode(short_node), short_datetime)
- self.assertEqual(toTypedNode(short_datetime), short_node)
- self.assertEqual(fromTypedNode(toTypedNode(short_datetime)),
- short_datetime)
-
- def test_datetime_with_microsecond(self):
- dateTimeType = xsdNS['dateTime'].uri
- long_isostamp = '2011-12-20T11:44:25.081776'
- long_node = RDF.Node(literal=long_isostamp,
- datatype=dateTimeType)
- long_datetime = datetime(2011,12,20,11,44,25,81776)
-
- self.assertEqual(fromTypedNode(long_node), long_datetime)
- self.assertEqual(toTypedNode(long_datetime), long_node)
- self.assertEqual(fromTypedNode(toTypedNode(long_datetime)),
- long_datetime)
-
- def test_strip_namespace_uri(self):
- nsOrg = RDF.NS('example.org/example#')
- nsCom = RDF.NS('example.com/example#')
-
- term = 'foo'
- node = nsOrg[term]
- self.assertEqual(strip_namespace(nsOrg, node), term)
- self.assertEqual(strip_namespace(nsCom, node), None)
- self.assertEqual(strip_namespace(nsOrg, node.uri), term)
-
- def test_strip_namespace_exceptions(self):
- nsOrg = RDF.NS('example.org/example#')
- nsCom = RDF.NS('example.com/example#')
-
- node = toTypedNode('bad')
- self.assertRaises(ValueError, strip_namespace, nsOrg, node)
- self.assertRaises(ValueError, strip_namespace, nsOrg, nsOrg)
-
- def test_simplify_uri(self):
- DATA = [('http://asdf.org/foo/bar', 'bar'),
- ('http://asdf.org/foo/bar#bleem', 'bleem'),
- ('http://asdf.org/foo/bar/', 'bar'),
- ('http://asdf.org/foo/bar?was=foo', 'was=foo')]
-
- for uri, expected in DATA:
- self.assertEqual(simplify_uri(uri), expected)
-
- for uri, expected in DATA:
- n = RDF.Uri(uri)
- self.assertEqual(simplify_uri(n), expected)
-
- for uri, expected in DATA:
- n = RDF.Node(RDF.Uri(uri))
- self.assertEqual(simplify_uri(n), expected)
-
- # decoding literals is questionable
- n = toTypedNode('http://foo/bar')
- self.assertRaises(ValueError, simplify_uri, n)
-
- def test_owl_import(self):
- path, name = os.path.split(__file__)
- #loc = 'file://'+os.path.abspath(path)+'/'
- loc = os.path.abspath(path)+'/'
- model = get_model()
- fragment = '''
+ XSD
+
+class TestRDFHelp(TestCase):
+ def test_typed_node_boolean(self):
+ node = Literal(True)
+ self.assertTrue(node.value)
+ self.assertEqual(str(node.datatype),
+ 'http://www.w3.org/2001/XMLSchema#boolean')
+
+ def test_typed_node_string_node_attributes(self):
+ node = Literal('hello')
+ self.assertEqual(node.value, 'hello')
+ self.assertTrue(node.datatype is None)
+
+ def test_typed_real_like(self):
+ num = 3.14
+ node = Literal(num)
+ self.assertEqual(node.toPython(), num)
+ self.assertEqual(type(node.toPython()), float)
+
+ def test_typed_integer(self):
+ num = 3
+ node = Literal(num)
+ self.assertEqual(node.toPython(), num)
+ self.assertEqual(type(node.toPython()), type(num))
+
+ def test_typed_node_string(self):
+ s = "Argh matey"
+ node = Literal(s)
+ self.assertEqual(node.toPython(), s)
+ self.assertTrue(isinstance(node.toPython(), six.text_type))
+
+ def test_unicode_node_roundtrip(self):
+ literal = u'\u5927'
+ roundtrip = Literal(literal).toPython()
+ self.assertTrue(isinstance(roundtrip, six.text_type))
+
+ def test_datetime_no_microsecond(self):
+ dateTimeType = XSD.dateTime
+ short_isostamp = '2011-12-20T11:44:25'
+ short_node = Literal(short_isostamp,
+ datatype=dateTimeType)
+ short_datetime = datetime(2011,12,20,11,44,25)
+
+ self.assertEqual(short_node.toPython(), short_datetime)
+ self.assertEqual(Literal(short_datetime), short_node)
+ self.assertEqual(Literal(short_datetime).toPython(),
+ short_datetime)
+
+ def test_datetime_with_microsecond(self):
+ dateTimeType = XSD.dateTime
+ long_isostamp = '2011-12-20T11:44:25.081776'
+ long_node = Literal(long_isostamp,
+ datatype=dateTimeType)
+ long_datetime = datetime(2011,12,20,11,44,25,81776)
+
+ self.assertEqual(long_node.toPython(), long_datetime)
+ self.assertEqual(Literal(long_datetime), long_node)
+ self.assertEqual(Literal(long_datetime).toPython(),
+ long_datetime)
+
+ def test_strip_namespace_uri(self):
+ nsOrg = Namespace('example.org/example#')
+ nsCom = Namespace('example.com/example#')
+
+ term = 'foo'
+ node = nsOrg[term]
+ self.assertEqual(strip_namespace(nsOrg, node), term)
+ self.assertEqual(strip_namespace(nsCom, node), None)
+
+ def test_strip_namespace_exceptions(self):
+ nsOrg = Namespace('example.org/example#')
+ nsCom = Namespace('example.com/example#')
+
+ node = Literal('bad')
+ self.assertRaises(ValueError, strip_namespace, nsOrg, node)
+ self.assertRaises(ValueError, strip_namespace, nsOrg, nsOrg)
+ self.assertRaises(ValueError, strip_namespace, nsOrg, str(node))
+
+ def test_simplify_uri(self):
+ DATA = [('http://asdf.org/foo/bar', 'bar'),
+ ('http://asdf.org/foo/bar#bleem', 'bleem'),
+ ('http://asdf.org/foo/bar/', 'bar'),
+ ('http://asdf.org/foo/bar?was=foo', 'was=foo')]
+
+ for uri, expected in DATA:
+ self.assertEqual(simplify_uri(uri), expected)
+
+ for uri, expected in DATA:
+ n = URIRef(uri)
+ self.assertEqual(simplify_uri(n), expected)
+
+ for uri, expected in DATA:
+ n = Literal(URIRef(uri), datatype=XSD.anyURI)
+ self.assertEqual(simplify_uri(n), expected)
+
+ # decoding literals is questionable
+ n = Literal('http://foo/bar')
+ self.assertRaises(ValueError, simplify_uri, n)
+
+ def test_owl_import(self):
+ path, name = os.path.split(__file__)
+ #loc = 'file://'+os.path.abspath(path)+'/'
+ loc = os.path.abspath(path)+'/'
+ model = Graph()
+ fragment = '''
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .
_:a owl:imports "{loc}extra.turtle" .
'''.format(loc=loc)
- load_string_into_model(model, 'turtle', fragment, loc)
- tc = RDF.Node(RDF.Uri('http://jumpgate.caltech.edu/wiki/TestCase'))
- query = RDF.Statement(tc, rdfsNS['label'], None)
- result = list(model.find_statements(query))
- self.assertEqual(len(result), 1)
- self.assertEqual(str(result[0].object), 'TestCase')
-
- def test_sanitize_literal_text(self):
- self.assertRaises(ValueError, sanitize_literal, "hi")
- hello_text = "hello"
- hello_none = RDF.Node(hello_text)
- self.assertEqual(str(sanitize_literal(hello_none)),
- hello_text)
- hello_str = RDF.Node(literal=hello_text,
- datatype=xsdNS['string'].uri)
- hello_clean = sanitize_literal(hello_str)
- self.assertEqual(hello_clean.literal_value['string'],
- hello_text)
-
- def test_sanitize_literal_empty_string(self):
- value = ""
- value_node = RDF.Node(value)
- self.assertEqual(str(sanitize_literal(value_node)), value)
-
- def test_sanitize_literal_html(self):
- hello = "hello <a onload='javascript:alert(\"foo\");' href='http://google.com'>google.com</a>, whats up?"
- hello_clean = 'hello <a href="http://google.com">google.com</a>, whats up?'
- hello_node = RDF.Node(literal=hello,
- datatype=xsdNS['string'].uri)
- hello_sanitized = sanitize_literal(hello_node)
- self.assertEqual(hello_sanitized.literal_value['string'],
- hello_clean)
-
- hostile = "hi <b>there</b><script type='text/javascript>alert('boo');</script><a href='javascript:alert('poke')>evil</a> scammer"
- hostile_node = RDF.Node(hostile)
- hostile_sanitized = sanitize_literal(hostile_node)
- # so it drops the stuff after the javascript link.
- # I suppose it could be worse
- hostile_result = """hi <b>there</b>"""
- self.assertEqual(str(hostile_sanitized), hostile_result)
-
- def test_guess_parser_from_file(self):
- DATA = [
- ('/a/b/c.rdf', 'rdfxml'),
- ('/a/b/c.xml', 'rdfxml'),
- ('/a/b/c.html', 'rdfa'),
- ('/a/b/c.turtle', 'turtle'),
- ('http://foo.bar/bleem.turtle', 'turtle')]
- for path, parser in DATA:
- self.assertEqual(guess_parser_by_extension(path), parser)
- self.assertEqual(guess_parser(None, path), parser)
-
- DATA = [
- ('application/rdf+xml', 'http://a.org/b/c', 'rdfxml'),
- ('application/x-turtle', 'http://a.org/b/c', 'turtle'),
- ('text/html', 'http://a.org/b/c', 'rdfa'),
- ('text/html', 'http://a.org/b/c.html', 'rdfa'),
- ('text/plain', 'http://a.org/b/c.turtle', 'turtle'),
- ('text/plain', 'http://a.org/b/c', 'guess')
- ]
- for contenttype, url, parser in DATA:
- self.assertEqual(guess_parser(contenttype, url), parser)
-
- class TestRDFSchemas(TestCase):
- def test_rdf_schema(self):
- """Does it basically work?
- """
- model = get_model()
- self.assertEqual(model.size(), 0)
- add_default_schemas(model)
- self.assertTrue(model.size() > 0)
- remove_schemas(model)
- self.assertEqual(model.size(), 0)
-
- def test_included_schemas(self):
- model = get_model()
- add_default_schemas(model)
-
- # rdf test
- s = RDF.Statement(rdfNS[''], dcNS['title'], None)
- title = model.get_target(rdfNS[''], dcNS['title'])
- self.assertTrue(title is not None)
-
- s = RDF.Statement(rdfNS['Property'], rdfNS['type'], rdfsNS['Class'])
- self.assertTrue(model.contains_statement(s))
-
- # rdfs test
- s = RDF.Statement(rdfsNS['Class'], rdfNS['type'], rdfsNS['Class'])
- self.assertTrue(model.contains_statement(s))
-
- s = RDF.Statement(owlNS['inverseOf'], rdfNS['type'],
- rdfNS['Property'])
- self.assertTrue(model.contains_statement(s))
-
-
-except ImportError as e:
- print("Unable to test rdfhelp")
+ load_string_into_model(model, 'turtle', fragment, loc)
+ tc = URIRef('http://jumpgate.caltech.edu/wiki/TestCase')
+ result = list(model.triples((tc, RDFS.label, None)))
+ self.assertEqual(len(result), 1)
+ self.assertEqual(str(result[0][2]), 'TestCase')
+
+ def test_sanitize_literal_text(self):
+ self.assertRaises(ValueError, sanitize_literal, "hi")
+ hello_text = "hello"
+ hello_none = Literal(hello_text)
+ self.assertEqual(str(sanitize_literal(hello_none)),
+ hello_text)
+ hello_str = Literal(hello_text,
+ datatype=XSD['string'])
+ hello_clean = sanitize_literal(hello_str)
+ self.assertEqual(hello_clean.value, hello_text)
+
+ def test_sanitize_literal_empty_string(self):
+ value = ""
+ value_node = Literal(value)
+ self.assertEqual(str(sanitize_literal(value_node)), value)
+
+ def test_sanitize_literal_html(self):
+ hello = "hello <a onload='javascript:alert(\"foo\");' href='http://google.com'>google.com</a>, whats up?"
+ hello_clean = 'hello <a href="http://google.com">google.com</a>, whats up?'
+ hello_node = Literal(hello,
+ datatype=XSD['string'])
+ hello_sanitized = sanitize_literal(hello_node)
+ self.assertEqual(hello_sanitized.value, hello_clean)
+
+ hostile = "hi <b>there</b><script type='text/javascript>alert('boo');</script><a href='javascript:alert('poke')>evil</a> scammer"
+ hostile_node = Literal(hostile)
+ hostile_sanitized = sanitize_literal(hostile_node)
+ # so it drops the stuff after the javascript link.
+ # I suppose it could be worse
+ hostile_result = """hi <b>there</b>"""
+ self.assertEqual(str(hostile_sanitized), hostile_result)
+
+ def test_guess_parser_from_file(self):
+ DATA = [
+ ('/a/b/c.rdf', 'rdfxml'),
+ ('/a/b/c.xml', 'rdfxml'),
+ ('/a/b/c.html', 'rdfa'),
+ ('/a/b/c.turtle', 'turtle'),
+ ('http://foo.bar/bleem.turtle', 'turtle')]
+ for path, parser in DATA:
+ self.assertEqual(guess_parser_by_extension(path), parser)
+ self.assertEqual(guess_parser(None, path), parser)
+
+ DATA = [
+ ('application/rdf+xml', 'http://a.org/b/c', 'rdfxml'),
+ ('application/x-turtle', 'http://a.org/b/c', 'turtle'),
+ ('text/html', 'http://a.org/b/c', 'rdfa'),
+ ('text/html', 'http://a.org/b/c.html', 'rdfa'),
+ ('text/plain', 'http://a.org/b/c.turtle', 'turtle'),
+ ('text/plain', 'http://a.org/b/c', 'guess')
+ ]
+ for contenttype, url, parser in DATA:
+ self.assertEqual(guess_parser(contenttype, url), parser)
+
+class TestRDFSchemas(TestCase):
+ def test_rdf_schema(self):
+ """Does it basically work?
+ """
+ model = ConjunctiveGraph()
+ self.assertEqual(len(model), 0)
+ add_default_schemas(model)
+ self.assertTrue(len(model) > 0)
+ remove_schemas(model)
+ self.assertEqual(len(model), 0)
+
+ def test_included_schemas(self):
+ """Spot-check statements expected from the bundled schemas
+ """
+ model = ConjunctiveGraph()
+ add_default_schemas(model)
+
+ # rdf test
+ # value() returns None when nothing matches; objects() returns a
+ # generator, which is never None and made this check vacuous
+ title = model.value(URIRef(str(RDF)), DC['title'])
+ self.assertTrue(title is not None)
+
+ s = [RDF['Property'], RDF['type'], RDFS['Class']]
+ self.assertIn(s, model)
+
+ # rdfs test
+ s = [RDFS['Class'], RDF['type'], RDFS['Class']]
+ self.assertIn(s, model)
+
+ s = [OWL['inverseOf'], RDF['type'], RDF['Property']]
+ self.assertIn(s, model)
def suite():
from unittest import TestSuite, defaultTestLoader