"""
import collections
from datetime import datetime
+ from glob import glob
from urlparse import urlparse, urlunparse
from urllib2 import urlopen
import logging
logger = logging.getLogger(__name__)
- # standard ontology namespaces
- owlNS = RDF.NS('http://www.w3.org/2002/07/owl#')
- dublinCoreNS = RDF.NS("http://purl.org/dc/elements/1.1/")
- rdfNS = RDF.NS("http://www.w3.org/1999/02/22-rdf-syntax-ns#")
- rdfsNS = RDF.NS("http://www.w3.org/2000/01/rdf-schema#")
- xsdNS = RDF.NS("http://www.w3.org/2001/XMLSchema#")
-
- # internal ontologies
- submissionOntology = RDF.NS(
- "http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#")
- dafTermOntology = RDF.NS("http://jumpgate.caltech.edu/wiki/UcscDaf#")
- libraryOntology = RDF.NS("http://jumpgate.caltech.edu/wiki/LibraryOntology#")
- inventoryOntology = RDF.NS(
- "http://jumpgate.caltech.edu/wiki/InventoryOntology#")
- submissionLog = RDF.NS("http://jumpgate.caltech.edu/wiki/SubmissionsLog/")
- geoSoftNS = RDF.NS('http://www.ncbi.nlm.nih.gov/geo/info/soft2.html#')
+ from htsworkflow.util.rdfns import *
+
+ SCHEMAS_URL='http://jumpgate.caltech.edu/phony/schemas'
+ INFERENCE_URL='http://jumpgate.caltech.edu/phony/inference'
ISOFORMAT_MS = "%Y-%m-%dT%H:%M:%S.%f"
ISOFORMAT_SHORT = "%Y-%m-%dT%H:%M:%S"
-
def sparql_query(model, query_filename, output_format='text'):
"""Execute sparql query from file
"""
directory = os.getcwd()
if model_name is None:
- storage = RDF.MemoryStorage()
+ storage = RDF.MemoryStorage(options_string="contexts='yes'")
logger.info("Using RDF Memory model")
else:
- options = "hash-type='bdb',dir='{0}'".format(directory)
+ options = "contexts='yes',hash-type='bdb',dir='{0}'".format(directory)
storage = RDF.HashStorage(model_name,
options=options)
logger.info("Using {0} with options {1}".format(model_name, options))
if len(url_parts[0]) == 0 or url_parts[0] == 'file':
url_parts[0] = 'file'
url_parts[2] = os.path.abspath(url_parts[2])
- if parser_name is None or parser_name == 'guess':
- parser_name = guess_parser_by_extension(path)
+ if parser_name is None or parser_name == 'guess':
+ parser_name = guess_parser_by_extension(path)
url = urlunparse(url_parts)
logger.info("Opening {0} with parser {1}".format(url, parser_name))
rdf_parser = RDF.Parser(name=parser_name)
+ statements = []
retries = 3
while retries > 0:
try:
retries = 0
except RDF.RedlandError, e:
errmsg = "RDF.RedlandError: {0} {1} tries remaining"
- logger.error(errmsg.format(str(e), tries))
+ logger.error(errmsg.format(str(e), retries))
for s in statements:
conditionally_add_statement(model, s, ns)
for s in rdf_parser.parse_string_as_stream(data, ns):
conditionally_add_statement(model, s, ns)
+
def fixup_namespace(ns):
if ns is None:
ns = RDF.Uri("http://localhost/")
raise ValueError(errmsg.format(str(type(ns))))
return ns
+
def conditionally_add_statement(model, s, ns):
imports = owlNS['imports']
if s.predicate == imports:
s.object = sanitize_literal(s.object)
model.add_statement(s)
+
+ def add_default_schemas(model, schema_path=None):
+ """Add default schemas to a model
+ Looks for turtle files in either htsworkflow/util/schemas
+ or in the list of directories provided in schema_path
+ """
+
+ if schema_path is None:
+ path, _ = os.path.split(__file__)
+ schema_path = [os.path.join(path, 'schemas')]
+ elif type(schema_path) in types.StringTypes:
+ schema_path = [schema_path]
+
+ for p in schema_path:
+ for f in glob(os.path.join(p, '*.turtle')):
+ add_schema(model, f)
+
+ def add_schema(model, filename):
+ """Add a schema to a model.
+
+ Main difference from 'load_into_model' is it tags it with
+ a RDFlib context so I can remove them later.
+ """
+ parser = RDF.Parser(name='turtle')
+ context = RDF.Node(RDF.Uri(SCHEMAS_URL))
+ url = 'file://' + filename
+ for s in parser.parse_as_stream(url):
+ try:
+ model.append(s, context)
+ except RDF.RedlandError as e:
+ logger.error("%s with %s", str(e), str(s))
+
+
+ def remove_schemas(model):
+ """Remove statements labeled with our schema context"""
+ context = RDF.Node(RDF.Uri(SCHEMAS_URL))
+ model.context_remove_statements(context)
+
+
def sanitize_literal(node):
"""Clean up a literal string
"""
return 'turtle'
elif content_type in ('text/html',):
return 'rdfa'
- elif content_type is None:
+ elif content_type is None or content_type in ('text/plain',):
return guess_parser_by_extension(pathname)
def guess_parser_by_extension(pathname):
_, ext = os.path.splitext(pathname)
if ext in ('.xml', '.rdf'):
return 'rdfxml'
- elif ext in ('.html'):
+ elif ext in ('.html',):
return 'rdfa'
- elif ext in ('.turtle'):
+ elif ext in ('.turtle',):
return 'turtle'
return 'guess'
"""
writer = RDF.Serializer(name=name)
# really standard stuff
- writer.set_namespace('owl', owlNS._prefix)
writer.set_namespace('rdf', rdfNS._prefix)
writer.set_namespace('rdfs', rdfsNS._prefix)
+ writer.set_namespace('owl', owlNS._prefix)
+ writer.set_namespace('dc', dcNS._prefix)
+ writer.set_namespace('xml', xmlNS._prefix)
writer.set_namespace('xsd', xsdNS._prefix)
+ writer.set_namespace('vs', vsNS._prefix)
+ writer.set_namespace('wot', wotNS._prefix)
# should these be here, kind of specific to an application
writer.set_namespace('libraryOntology', libraryOntology._prefix)
from datetime import datetime
from htsworkflow.util.rdfhelp import \
+ add_default_schemas, \
blankOrUri, \
+ dcNS, \
dump_model, \
fromTypedNode, \
get_model, \
guess_parser, \
guess_parser_by_extension, \
load_string_into_model, \
+ owlNS, \
+ rdfNS, \
rdfsNS, \
+ remove_schemas, \
toTypedNode, \
stripNamespace, \
simplify_uri, \
('/a/b/c.rdf', 'rdfxml'),
('/a/b/c.xml', 'rdfxml'),
('/a/b/c.html', 'rdfa'),
- ('/a/b/c.turtle', 'turtle')]
+ ('/a/b/c.turtle', 'turtle'),
+ ('http://foo.bar/bleem.turtle', 'turtle')]
for path, parser in DATA:
self.assertEqual(guess_parser_by_extension(path), parser)
self.assertEqual(guess_parser(None, path), parser)
DATA = [
('application/rdf+xml', 'http://a.org/b/c', 'rdfxml'),
('application/x-turtle', 'http://a.org/b/c', 'turtle'),
- ('text/html', 'http://a.org/b/c', 'rdfa')
+ ('text/html', 'http://a.org/b/c', 'rdfa'),
+ ('text/html', 'http://a.org/b/c.html', 'rdfa'),
+ ('text/plain', 'http://a.org/b/c.turtle', 'turtle'),
+ ('text/plain', 'http://a.org/b/c', 'guess')
]
for contenttype, url, parser in DATA:
self.assertEqual(guess_parser(contenttype, url), parser)
+ class TestRDFSchemas(unittest.TestCase):
+ def test_rdf_schema(self):
+ """Does it basically work?
+ """
+ model = get_model()
+ self.assertEqual(model.size(), 0)
+ add_default_schemas(model)
+ self.assertGreater(model.size(), 0)
+ remove_schemas(model)
+ self.assertEqual(model.size(), 0)
+
+ def test_included_schemas(self):
+ model = get_model()
+ add_default_schemas(model)
+
+ # rdf test
+ s = RDF.Statement(rdfNS[''], dcNS['title'], None)
+ title = model.get_target(rdfNS[''], dcNS['title'])
+ self.assertTrue(title is not None)
+
+ s = RDF.Statement(rdfNS['Property'], rdfNS['type'], rdfsNS['Class'])
+ self.assertTrue(model.contains_statement(s))
+
+ # rdfs test
+ s = RDF.Statement(rdfsNS['Class'], rdfNS['type'], rdfsNS['Class'])
+ self.assertTrue(model.contains_statement(s))
+
+ s = RDF.Statement(owlNS['inverseOf'], rdfNS['type'],
+ rdfNS['Property'])
+ self.assertTrue(model.contains_statement(s))
+
+
def suite():
return unittest.makeSuite(TestRDFHelp, 'test')
except ImportError, e: