"""Helper features for working with librdf
"""
from datetime import datetime
+from urlparse import urlparse, urlunparse
+from urllib2 import urlopen
import logging
import os
import types
+import lxml.html
+import lxml.html.clean
import RDF
# Module-wide logger named after this module.
logger = logging.getLogger(__name__)

# Namespaces of public vocabularies.
owlNS = RDF.NS("http://www.w3.org/2002/07/owl#")
dublinCoreNS = RDF.NS("http://purl.org/dc/elements/1.1/")
rdfNS = RDF.NS("http://www.w3.org/1999/02/22-rdf-syntax-ns#")
rdfsNS = RDF.NS("http://www.w3.org/2000/01/rdf-schema#")
xsdNS = RDF.NS("http://www.w3.org/2001/XMLSchema#")

# internal ontologies
submissionOntology = RDF.NS(
    "http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#"
)
dafTermOntology = RDF.NS("http://jumpgate.caltech.edu/wiki/UcscDaf#")
libraryOntology = RDF.NS("http://jumpgate.caltech.edu/wiki/LibraryOntology#")
inventoryOntology = RDF.NS(
    "http://jumpgate.caltech.edu/wiki/InventoryOntology#"
)
submissionLog = RDF.NS("http://jumpgate.caltech.edu/wiki/SubmissionsLog/")
geoSoftNS = RDF.NS("http://www.ncbi.nlm.nih.gov/geo/info/soft2.html#")

# strptime patterns for timestamps with and without fractional seconds.
ISOFORMAT_MS = "%Y-%m-%dT%H:%M:%S.%f"
ISOFORMAT_SHORT = "%Y-%m-%dT%H:%M:%S"
+
def sparql_query(model, query_filename):
    """Execute sparql query from file

    :param model: model the query is executed against
    :param query_filename: path of a text file holding the SPARQL query

    The results are printed with display_query_results; nothing is returned.
    """
    logger.info("Opening: %s" % (query_filename,))
    # Read inside a context manager so the file handle is always closed.
    with open(query_filename, 'r') as query_file:
        query_body = query_file.read()
    query = RDF.SPARQLQuery(query_body)
    results = query.execute(model)
    display_query_results(results)
+
def display_query_results(results):
    """A very simple display of sparql query results showing name value pairs

    Every row is printed as one "name: value" line per binding, in the
    reverse of the order reported by row.items(), followed by an empty line.

    :param results: iterable of rows, each providing an items() method
    """
    for row in results:
        # list() keeps the pairs sliceable even when items() is not a list.
        for k, v in list(row.items())[::-1]:
            # Single-argument print(...) prints the same text whether print
            # is a statement or a function.
            print("{0}: {1}".format(k, v))
        # Empty separator line after each row.
        print("")
def blankOrUri(value=None):
+ """Return a blank node for None or a resource node for strings.
+ """
node = None
if value is None:
node = RDF.Node()
def toTypedNode(value):
+ """Convert a python variable to a RDF Node with its closest xsd type
+ """
if type(value) == types.BooleanType:
value_type = xsdNS['boolean'].uri
if value:
node = RDF.Node(literal=unicode(value).encode('utf-8'))
return node
+
def fromTypedNode(node):
+ """Convert a typed RDF Node to its closest python equivalent
+ """
if node is None:
return None
- value_type = str(node.literal_value['datatype'])
- # chop off xml schema declaration
- value_type = value_type.replace(str(xsdNS[''].uri),'')
+ value_type = get_node_type(node)
literal = node.literal_value['string']
literal_lower = literal.lower()
elif value_type in ('dateTime'):
try:
return datetime.strptime(literal, ISOFORMAT_MS)
- except ValueError, e:
+ except ValueError, _:
return datetime.strptime(literal, ISOFORMAT_SHORT)
return literal
def get_node_type(node):
    """Return the short name of a literal node's XSD datatype.

    e.g. http://www.w3.org/2001/XMLSchema#integer -> integer

    A literal that carries no datatype is reported as "string".
    """
    datatype = node.literal_value['datatype']
    if datatype is None:
        return "string"
    # chop off xml schema declaration
    xsd_prefix = str(xsdNS[''].uri)
    return str(datatype).replace(xsd_prefix, '')
+
+
def simplifyUri(namespace, term):
    """Remove the namespace portion of a term

    :param namespace: RDF.NS whose prefix should be removed
    :param term: a resource RDF.Node or an RDF.Uri
    :returns: the text following the namespace prefix, or None if the
              term does not start with that prefix
    :raises ValueError: if term is neither a resource node nor a Uri
    """
    if isinstance(term, RDF.Node):
        if term.is_resource():
            term = term.uri
        else:
            raise ValueError("This works on resources")
    elif not isinstance(term, RDF.Uri):
        raise ValueError("This works on resources")
    term_s = str(term)
    prefix = namespace._prefix
    if not term_s.startswith(prefix):
        return None
    # Slice instead of str.replace so only the leading prefix is removed;
    # replace() would also delete later occurrences inside the local name.
    return term_s[len(prefix):]
+
+
def get_model(model_name=None, directory=None):
if directory is None:
directory = os.getcwd()
return model
def load_into_model(model, parser_name, path, ns=None):
    """Load RDF from a local path or a URL into model.

    :param model: model that receives the parsed statements
    :param parser_name: name of the parser to use; when None the parser is
                        guessed from the Content-Type header and path
    :param path: filename or URL; a value without a URL scheme is treated
                 as a local file and converted to an absolute file URL
    :param ns: base namespace handed on to load_string_into_model
    """
    url_parts = list(urlparse(path))
    if not url_parts[0]:
        # No scheme given: treat path as a local file.
        url_parts[0] = 'file'
        url_parts[2] = os.path.abspath(url_parts[2])
    url = urlunparse(url_parts)
    logger.info("Opening %s" % (url,))
    req = urlopen(url)
    try:
        logger.debug("request status: %s" % (req.code,))
        if parser_name is None:
            content_type = req.headers.get('Content-Type', None)
            parser_name = guess_parser(content_type, path)
            logger.debug("Guessed parser: %s" % (parser_name,))
        data = req.read()
    finally:
        # Release the underlying file/socket even if reading fails.
        req.close()
    load_string_into_model(model, parser_name, data, ns)
def load_string_into_model(model, parser_name, data, ns=None):
    """Parse RDF text and add every resulting statement to model.

    A statement whose predicate is owl:imports additionally causes the
    referenced document to be loaded into the same model, and literals of
    type string are passed through sanitize_literal before being stored.

    :param model: model that receives the statements
    :param parser_name: name passed to RDF.Parser
    :param data: the RDF document as a string
    :param ns: base namespace for the parser; defaults to http://localhost/
    """
    # NOTE(review): the default is an RDF.NS object while the caller visible
    # in the tests passes a plain string -- confirm the parser accepts both.
    base = RDF.NS("http://localhost/") if ns is None else ns
    owl_imports = owlNS['imports']
    parser = RDF.Parser(name=parser_name)
    for statement in parser.parse_string_as_stream(data, base):
        if statement.predicate == owl_imports:
            target = str(statement.object)
            logger.info("Importing %s" % (target,))
            load_into_model(model, None, target, base)
        if (statement.object.is_literal()
                and get_node_type(statement.object) == 'string'):
            statement.object = sanitize_literal(statement.object)
        model.add_statement(statement)
+
+
def sanitize_literal(node):
    """Clean up a literal string

    The literal's text is parsed as an HTML fragment, run through
    lxml.html.clean.Cleaner and stored in a new literal node that keeps the
    original datatype and language.

    :param node: RDF.Node holding a literal
    :returns: an RDF.Node with the cleaned text; an empty or whitespace-only
              literal is returned unchanged
    :raises ValueError: if node is not an RDF.Node
    """
    if not isinstance(node, RDF.Node):
        raise ValueError("sanitize_literal only works on RDF.Nodes")

    raw = node.literal_value['string']
    if not raw or not raw.strip():
        # Nothing to clean, and lxml.html.fromstring raises on an empty
        # document, so hand the node back untouched.
        return node

    element = lxml.html.fromstring(raw)
    cleaner = lxml.html.clean.Cleaner(page_structure=False)
    element = cleaner.clean_html(element)
    text = lxml.html.tostring(element)

    # Bare text comes back wrapped in <p>...</p>.  Strip that wrapper only
    # when it is really there; an unconditional [3:-4] slice would chop
    # characters off output that starts with a different element.
    open_p = '<p>'
    close_p = '</p>'
    if text.startswith(open_p) and text.endswith(close_p):
        text = text[len(open_p):-len(close_p)]

    args = {'literal': text}
    datatype = node.literal_value['datatype']
    if datatype is not None:
        args['datatype'] = datatype
    language = node.literal_value['language']
    if language is not None:
        args['language'] = language
    return RDF.Node(**args)
+
+
def guess_parser(content_type, pathname):
    """Guess the name of the RDF parser to use for a document.

    The MIME type is used when one is available; only when content_type is
    None is the file extension of pathname consulted instead.

    :param content_type: value of a Content-Type header, or None
    :param pathname: path or URL the document was loaded from
    :returns: 'rdfxml', 'turtle' or 'rdfa', or 'guess' if nothing matched
    """
    if content_type is None:
        # os.path.splitext keeps the leading dot, e.g. '.rdf'.
        _, ext = os.path.splitext(pathname)
        ext = ext.lower()
        if ext in ('.xml', '.rdf'):
            return 'rdfxml'
        if ext in ('.html',):
            return 'rdfa'
        if ext in ('.turtle', '.ttl'):
            return 'turtle'
        return 'guess'

    # Drop parameters such as "; charset=utf-8" before comparing.
    mime_type = content_type.split(';', 1)[0].strip().lower()
    # One-element tuples need the trailing comma; without it `in` performs
    # a substring test against the string.
    if mime_type in ('application/rdf+xml',):
        return 'rdfxml'
    if mime_type in ('application/x-turtle', 'text/turtle'):
        return 'turtle'
    if mime_type in ('text/html',):
        return 'rdfa'
    return 'guess'
def get_serializer(name='turtle'):
writer.set_namespace('ucscDaf', dafTermOntology._prefix)
return writer
+
def dump_model(model):
    """Print model to stdout, serialized with get_serializer()'s default."""
    writer = get_serializer()
    serialized = writer.serialize_model_to_string(model)
    print(serialized)
+import os
import unittest
import types
+
from datetime import datetime
from htsworkflow.util.rdfhelp import \
blankOrUri, \
- toTypedNode, \
+ dump_model, \
fromTypedNode, \
+ get_model, \
+ load_string_into_model, \
+ rdfsNS, \
+ toTypedNode, \
+ simplifyUri, \
+ sanitize_literal, \
xsdNS
try:
    import RDF

    class TestRDFHelp(unittest.TestCase):
        """Unit tests for the htsworkflow.util.rdfhelp helpers."""

        # --- toTypedNode / fromTypedNode ---------------------------------

        def test_from_none(self):
            self.failUnlessEqual(fromTypedNode(None), None)

        def test_typed_node_boolean(self):
            node = toTypedNode(True)
            self.failUnlessEqual(node.literal_value['string'], u'1')
            self.failUnlessEqual(str(node.literal_value['datatype']),
                                 'http://www.w3.org/2001/XMLSchema#boolean')

        def test_bad_boolean(self):
            node = RDF.Node(literal='bad', datatype=xsdNS['boolean'].uri)
            self.failUnlessRaises(ValueError, fromTypedNode, node)

        def test_typed_node_string(self):
            node = toTypedNode('hello')
            self.failUnlessEqual(node.literal_value['string'], u'hello')
            self.failUnless(node.literal_value['datatype'] is None)

        def test_typed_real_like(self):
            num = 3.14
            node = toTypedNode(num)
            self.failUnlessEqual(fromTypedNode(node), num)

        def test_typed_integer(self):
            num = 3
            node = toTypedNode(num)
            self.failUnlessEqual(fromTypedNode(node), num)
            self.failUnlessEqual(type(fromTypedNode(node)), type(num))

        # This test used to share its name with test_typed_node_string
        # above; the later definition replaced the earlier one in the class
        # body, so the earlier test never ran.
        def test_typed_node_string_roundtrip(self):
            s = "Argh matey"
            node = toTypedNode(s)
            self.failUnlessEqual(fromTypedNode(node), s)
            self.failUnlessEqual(type(fromTypedNode(node)), types.UnicodeType)

        # --- blankOrUri ---------------------------------------------------

        def test_blank_or_uri_blank(self):
            node = blankOrUri()
            self.failUnlessEqual(node.is_blank(), True)

        def test_blank_or_uri_url(self):
            s = 'http://google.com'
            node = blankOrUri(s)
            self.failUnlessEqual(node.is_resource(), True)
            self.failUnlessEqual(str(node.uri), s)

        def test_blank_or_uri_node(self):
            s = RDF.Node(RDF.Uri('http://google.com'))
            node = blankOrUri(s)
            self.failUnlessEqual(node.is_resource(), True)
            self.failUnlessEqual(node, s)

        # --- unicode and datetime round trips -----------------------------

        def test_unicode_node_roundtrip(self):
            literal = u'\u5927'
            roundtrip = fromTypedNode(toTypedNode(literal))
            self.failUnlessEqual(roundtrip, literal)
            self.failUnlessEqual(type(roundtrip), types.UnicodeType)

        def test_datetime_no_microsecond(self):
            dateTimeType = xsdNS['dateTime'].uri
            short_isostamp = '2011-12-20T11:44:25'
            short_node = RDF.Node(literal=short_isostamp,
                                  datatype=dateTimeType)
            short_datetime = datetime(2011, 12, 20, 11, 44, 25)

            self.assertEqual(fromTypedNode(short_node), short_datetime)
            self.assertEqual(toTypedNode(short_datetime), short_node)
            self.assertEqual(fromTypedNode(toTypedNode(short_datetime)),
                             short_datetime)

        def test_datetime_with_microsecond(self):
            dateTimeType = xsdNS['dateTime'].uri
            long_isostamp = '2011-12-20T11:44:25.081776'
            long_node = RDF.Node(literal=long_isostamp,
                                 datatype=dateTimeType)
            long_datetime = datetime(2011, 12, 20, 11, 44, 25, 81776)

            self.assertEqual(fromTypedNode(long_node), long_datetime)
            self.assertEqual(toTypedNode(long_datetime), long_node)
            self.assertEqual(fromTypedNode(toTypedNode(long_datetime)),
                             long_datetime)

        # --- simplifyUri --------------------------------------------------

        def test_simplify_uri(self):
            nsOrg = RDF.NS('example.org/example#')
            nsCom = RDF.NS('example.com/example#')

            term = 'foo'
            node = nsOrg[term]
            self.failUnlessEqual(simplifyUri(nsOrg, node), term)
            self.failUnlessEqual(simplifyUri(nsCom, node), None)
            self.failUnlessEqual(simplifyUri(nsOrg, node.uri), term)

        def test_simplify_uri_exceptions(self):
            nsOrg = RDF.NS('example.org/example#')
            nsCom = RDF.NS('example.com/example#')

            node = toTypedNode('bad')
            self.failUnlessRaises(ValueError, simplifyUri, nsOrg, node)
            self.failUnlessRaises(ValueError, simplifyUri, nsOrg, nsOrg)

        # --- loading and sanitizing ---------------------------------------

        def test_owl_import(self):
            # Expects a file named extra.turtle next to this test module.
            path, name = os.path.split(__file__)
            loc = 'file://'+os.path.abspath(path)+'/'
            model = get_model()
            fragment = '''
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .

_:a owl:imports "{loc}extra.turtle"
'''.format(loc=loc)
            load_string_into_model(model, 'turtle', fragment, loc)
            tc = RDF.Node(RDF.Uri('http://jumpgate.caltech.edu/wiki/TestCase'))
            query = RDF.Statement(tc, rdfsNS['label'], None)
            result = list(model.find_statements(query))
            self.failUnlessEqual(len(result), 1)
            self.failUnlessEqual(str(result[0].object), 'TestCase')

        def test_sanitize_literal_text(self):
            self.failUnlessRaises(ValueError, sanitize_literal, "hi")
            hello_text = "hello"
            hello_none = RDF.Node(hello_text)
            self.failUnlessEqual(str(sanitize_literal(hello_none)),
                                 hello_text)
            hello_str = RDF.Node(literal=hello_text,
                                 datatype=xsdNS['string'].uri)
            self.failUnlessEqual(str(sanitize_literal(hello_str)),
                                 hello_text)

        def test_sanitize_literal_html(self):
            hello = "hello <a onload='javascript:alert(\"foo\");' href='http://google.com'>google.com</a>, whats up?"
            hello_clean = 'hello <a href="http://google.com">google.com</a>, whats up?'
            hello_node = RDF.Node(literal=hello,
                                  datatype=xsdNS['string'].uri)
            hello_sanitized = sanitize_literal(hello_node)
            self.failUnlessEqual(str(hello_sanitized),
                                 hello_clean)

            hostile = "hi <b>there</b><script type='text/javascript>alert('boo');</script><a href='javascript:alert('poke')>evil</a> scammer"
            hostile_node = RDF.Node(hostile)
            hostile_sanitized = sanitize_literal(hostile_node)
            # so it drops the stuff after the javascript link.
            # I suppose it could be worse
            hostile_result = """hi <b>there</b>"""
            self.failUnlessEqual(str(hostile_sanitized), hostile_result)

    def suite():
        # Collect every test_* method of TestRDFHelp.
        return unittest.makeSuite(TestRDFHelp, 'test')
except ImportError:
    # The RDF bindings are unavailable; report it instead of failing.
    print("Unable to test rdfhelp")