import RDF
from htsworkflow.util.rdfhelp import libraryOntology as libNS
from htsworkflow.util.rdfhelp import toTypedNode, fromTypedNode, rdfNS, \
- stripNamespace, dump_model, simplify_uri
+ strip_namespace, dump_model, simplify_uri
LOGGER = logging.getLogger(__name__)
raise KeyError(u"%s not found" % (unicode(seq_id),))
seq_type_node = model.get_target(seq_id, libNS['file_type'])
- seq_type = stripNamespace(libNS, seq_type_node)
+ seq_type = strip_namespace(libNS, seq_type_node)
path = urlparse(str(seq_id.uri)).path
flowcellNode = get_one(seq_id, libNS['flowcell'])
from htsworkflow.submission.fastqname import FastqName
from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \
fromTypedNode, \
- stripNamespace
+ strip_namespace
from htsworkflow.util.rdfns import *
from htsworkflow.util.conversion import parse_flowcell_id
ispaired = property(_get_ispaired)
def _get_filetype(self):
- return stripNamespace(libraryOntology, self._filetype)
+ return strip_namespace(libraryOntology, self._filetype)
filetype = property(_get_filetype)
def _get_path(self):
from htsworkflow.util.rdfhelp import \
fromTypedNode, \
geoSoftNS, \
- stripNamespace, \
+ strip_namespace, \
submissionOntology
from django.conf import settings
def query_to_soft_dictionary(self, results, heading):
attributes = []
for r in results:
- name = stripNamespace(geoSoftNS, r['name'])
+ name = strip_namespace(geoSoftNS, r['name'])
if name is not None:
if name.lower() == heading.lower():
name = '^' + name
dump_model, \
fromTypedNode, \
get_model, \
- stripNamespace, \
+ strip_namespace, \
toTypedNode
from htsworkflow.util.rdfns import *
from htsworkflow.util.hashfile import make_md5sum
query = RDF.SPARQLQuery(query_body)
rdfstream = query.execute(model)
for row in rdfstream:
- s = stripNamespace(submissionLog, row['submission'])
+ s = strip_namespace(submissionLog, row['submission'])
if s[-1] in ['#', '/', '?']:
s = s[:-1]
yield s
from htsworkflow.util.rdfhelp import \
fromTypedNode, \
geoSoftNS, \
- stripNamespace, \
submissionOntology
from htsworkflow.util.url import parse_ssh_url
from htsworkflow.util.ucsc import bigWigInfo
return element
raise ValueError("Unable to simplify %s" % (uri,))
-def stripNamespace(namespace, term):
+def strip_namespace(namespace, term):
"""Remove the namespace portion of a term
returns None if they aren't in common
rdfsNS, \
remove_schemas, \
toTypedNode, \
- stripNamespace, \
+ strip_namespace, \
simplify_uri, \
sanitize_literal, \
xsdNS
term = 'foo'
node = nsOrg[term]
- self.assertEqual(stripNamespace(nsOrg, node), term)
- self.assertEqual(stripNamespace(nsCom, node), None)
- self.assertEqual(stripNamespace(nsOrg, node.uri), term)
+ self.assertEqual(strip_namespace(nsOrg, node), term)
+ self.assertEqual(strip_namespace(nsCom, node), None)
+ self.assertEqual(strip_namespace(nsOrg, node.uri), term)
def test_strip_namespace_exceptions(self):
nsOrg = RDF.NS('example.org/example#')
nsCom = RDF.NS('example.com/example#')
node = toTypedNode('bad')
- self.assertRaises(ValueError, stripNamespace, nsOrg, node)
- self.assertRaises(ValueError, stripNamespace, nsOrg, nsOrg)
+ self.assertRaises(ValueError, strip_namespace, nsOrg, node)
+ self.assertRaises(ValueError, strip_namespace, nsOrg, nsOrg)
def test_simplify_uri(self):
DATA = [('http://asdf.org/foo/bar', 'bar'),