import six
from six.moves.urllib.parse import urljoin, urlparse
-import RDF
-from htsworkflow.util.rdfhelp import libraryOntology as libNS
-from htsworkflow.util.rdfhelp import toTypedNode, fromTypedNode, rdfNS, \
- strip_namespace, dump_model, simplify_uri
+from rdflib import BNode, Literal, Namespace, URIRef
+from htsworkflow.util.rdfhelp import (
+ dump_model,
+ libraryOntology as libNS,
+ RDF,
+ simplify_uri,
+ strip_namespace,
+)
+
LOGGER = logging.getLogger(__name__)
def save_to_model(self, model, base_url=None):
def add_lit(model, s, p, o):
if o is not None:
- model.add_statement(RDF.Statement(s, p, toTypedNode(o)))
+ model.add((s, p, Literal(o)))
def add(model, s, p, o):
- model.add_statement(RDF.Statement(s,p,o))
+ model.add((s, p, o))
# a bit unreliable... assumes filesystem is encoded in utf-8
path = os.path.abspath(self.path)
- fileNode = RDF.Node(RDF.Uri('file://' + path))
- add(model, fileNode, rdfNS['type'], libNS['IlluminaResult'])
+ fileNode = URIRef('file://' + path)
+ add(model, fileNode, RDF['type'], libNS['IlluminaResult'])
add_lit(model, fileNode, libNS['flowcell_id'], self.flowcell)
add_lit(model, fileNode, libNS['lane_number'], self.lane)
if self.read is not None:
add(model, fileNode, libNS['file_type'], libNS[self.filetype])
if base_url is not None:
- flowcell = RDF.Node(RDF.Uri("{base}/flowcell/{flowcell}/".format(
+ flowcell = URIRef("{base}/flowcell/{flowcell}/".format(
base=base_url,
- flowcell=self.flowcell)))
+ flowcell=self.flowcell))
add(model, fileNode, libNS['flowcell'], flowcell)
if self.project is not None:
- library = RDF.Node(RDF.Uri("{base}/library/{library}".format(
+ library = URIRef("{base}/library/{library}".format(
base=base_url,
- library=self.project)))
+ library=self.project))
add(model, fileNode, libNS['library'], library)
def load_from_model(cls, model, seq_id):
def get(s, p):
values = []
- stmts = model.find_statements(RDF.Statement(s, p, None))
+ stmts = model.triples((s, p, None))
for s in stmts:
- obj = s.object
- if not obj.is_resource():
- values.append(fromTypedNode(obj))
+ obj = s[2]
+ if not isinstance(obj, URIRef):
+ values.append(obj.toPython())
else:
values.append(obj)
return values
else:
return None
- if not isinstance(seq_id, RDF.Node):
- seq_id = RDF.Node(RDF.Uri(seq_id))
- result_statement = RDF.Statement(seq_id,
- rdfNS['type'],
- libNS['IlluminaResult'])
- if not model.contains_statement(result_statement):
+ if not isinstance(seq_id, URIRef):
+ seq_id = URIRef(seq_id)
+ result_statement = (seq_id, RDF['type'], libNS['IlluminaResult'])
+ if not result_statement in model:
raise KeyError(u"%s not found" % (unicode(seq_id),))
- seq_type_node = model.get_target(seq_id, libNS['file_type'])
+ seq_type_node = list(model.objects(seq_id, libNS['file_type']))[0]
seq_type = strip_namespace(libNS, seq_type_node)
- path = urlparse(str(seq_id.uri)).path
+ path = urlparse(str(seq_id)).path
flowcellNode = get_one(seq_id, libNS['flowcell'])
flowcell = get_one(seq_id, libNS['flowcell_id'])
lane = get_one(seq_id, libNS['lane_number'])
}
"""
LOGGER.debug("update_model_sequence_library query %s", file_body)
- file_query = RDF.SPARQLQuery(file_body)
- files = file_query.execute(model)
+ files = model.query(file_body)
- libraryNS = RDF.NS(urljoin(base_url, 'library/'))
- flowcellNS = RDF.NS(urljoin(base_url, 'flowcell/'))
+ libraryNS = Namespace(urljoin(base_url, 'library/'))
+ flowcellNS = Namespace(urljoin(base_url, 'flowcell/'))
for f in files:
filenode = f['filenode']
LOGGER.debug("Updating file node %s", str(filenode))
- lane_id = fromTypedNode(f['lane_id'])
+ lane_id = f['lane_id'].toPython()
if f['flowcell'] is None:
flowcell = flowcellNS[str(f['flowcell_id'])+'/']
LOGGER.debug("Adding file (%s) to flowcell (%s) link",
str(filenode),
str(flowcell))
- model.add_statement(
- RDF.Statement(filenode, libNS['flowcell'], flowcell))
+ model.add((filenode, libNS['flowcell'], flowcell))
else:
flowcell = f['flowcell']
LOGGER.error("Unable to decypher: %s %s",
str(flowcell), str(lane_id))
continue
- library_id = toTypedNode(simplify_uri(library))
+ library_id = Literal(simplify_uri(library))
LOGGER.debug("Adding file (%s) to library (%s) link",
str(filenode),
str(library))
- model.add_statement(
- RDF.Statement(filenode, libNS['library_id'], library_id))
+ model.add((filenode, libNS['library_id'], library_id))
if library is not None:
- model.add_statement(
- RDF.Statement(filenode, libNS['library'], library))
+ model.add((filenode, libNS['library'], library))
def guess_library_from_model(model, base_url, flowcell, lane_id):
"""Attempt to find library URI
"""
- flowcellNode = RDF.Node(flowcell)
- flowcell = str(flowcell.uri)
+ flowcellNode = URIRef(flowcell)
+ flowcell = str(flowcell)
lane_body = """
prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
tries = 3
while len(lanes) == 0 and tries > 0:
tries -= 1
- lane_query = RDF.SPARQLQuery(lane_body)
- lanes = [ l for l in lane_query.execute(model)]
+ lanes = [ l for l in model.query(lane_body)]
if len(lanes) > 1:
# CONFUSED!
errmsg = "Too many libraries for flowcell {flowcell} "\
return lanes[0]['library']
else:
# try grabbing data
- model.load(flowcellNode.uri, name="rdfa")
+ model.parse(source=flowcellNode, format='rdfa')
import tempfile
from unittest import TestCase
-import RDF
+from rdflib import Graph, Namespace, URIRef
+from rdflib.namespace import RDF
from htsworkflow.pipelines import sequences
-from htsworkflow.util.rdfhelp import get_model, load_string_into_model, \
- rdfNS, libraryOntology, dump_model, fromTypedNode
+from htsworkflow.util.rdfns import libraryOntology
class SequenceFileTests(TestCase):
"""
def test_basic_rdf_scan(self):
"""Make sure we can save to RDF model"""
- import RDF
- model = get_model()
+ model = Graph()
for seq in self._generate_sequences():
seq.save_to_model(model)
- files = list(model.find_statements(
- RDF.Statement(None,
- rdfNS['type'],
- libraryOntology['IlluminaResult'])))
+ files = list(model.triples((None,
+ RDF['type'],
+ libraryOntology['IlluminaResult'])))
self.assertEqual(len(files), 5)
- files = list(model.find_statements(
- RDF.Statement(None,
- libraryOntology['file_type'],
- libraryOntology['qseq'])))
+ files = list(model.triples((None,
+ libraryOntology['file_type'],
+ libraryOntology['qseq'])))
self.assertEqual(len(files), 4)
- files = list(model.find_statements(
- RDF.Statement(None,
- libraryOntology['file_type'],
- libraryOntology['split_fastq'])))
+ files = list(model.triples((None,
+ libraryOntology['file_type'],
+ libraryOntology['split_fastq'])))
self.assertEqual(len(files), 1)
- files = list(model.find_statements(
- RDF.Statement(None, libraryOntology['library_id'], None)))
+ files = list(model.triples((None, libraryOntology['library_id'], None)))
self.assertEqual(len(files), 1)
- files = list(model.find_statements(
- RDF.Statement(None, libraryOntology['flowcell_id'], None)))
+ files = list(model.triples((None, libraryOntology['flowcell_id'], None)))
self.assertEqual(len(files), 5)
- files = list(model.find_statements(
- RDF.Statement(None, libraryOntology['flowcell'], None)))
+ files = list(model.triples((None, libraryOntology['flowcell'], None)))
self.assertEqual(len(files), 0)
- files = list(model.find_statements(
- RDF.Statement(None, libraryOntology['library'], None)))
+ files = list(model.triples((None, libraryOntology['library'], None)))
self.assertEqual(len(files), 0)
def test_rdf_scan_with_url(self):
"""Make sure we can save to RDF model"""
- import RDF
- model = get_model()
+ model = Graph()
base_url = 'http://localhost'
for seq in self._generate_sequences():
seq.save_to_model(model, base_url=base_url)
- localFC = RDF.NS(base_url + '/flowcell/')
- localLibrary = RDF.NS(base_url + '/library/')
+ localFC = Namespace(base_url + '/flowcell/')
+ localLibrary = Namespace(base_url + '/library/')
- files = list(model.find_statements(
- RDF.Statement(None, libraryOntology['flowcell'], None)))
+ files = list(model.triples((None, libraryOntology['flowcell'], None)))
self.assertEqual(len(files), 5)
for f in files:
- self.assertEqual(f.object, localFC['42BW9AAXX/'])
+ # each triple is a (subject, predicate, object) tuple, so index 2 is the object
+ self.assertEqual(f[2], localFC['42BW9AAXX/'])
- files = list(model.find_statements(
- RDF.Statement(None, libraryOntology['library'], None)))
+ files = list(model.triples((None, libraryOntology['library'], None)))
self.assertEqual(len(files), 1)
- self.assertEqual(files[0].object, localLibrary['12345'])
+ self.assertEqual(files[0][2], localLibrary['12345'])
def test_rdf_fixup_library(self):
"""Make sure we can save to RDF model"""
base_url = 'http://localhost'
- localLibrary = RDF.NS(base_url + '/library/')
+ localLibrary = Namespace(base_url + '/library/')
flowcellInfo = """@prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#> .
<{base}/lane/1172>
libns:lane_number "3" ; libns:library <{base}/library/10930/> .
""".format(base=base_url)
- model = get_model()
- load_string_into_model(model, 'turtle', flowcellInfo)
+ model = Graph()
+ model.parse(data=flowcellInfo, format='turtle')
for seq in self._generate_sequences():
seq.save_to_model(model)
f = sequences.update_model_sequence_library(model, base_url=base_url)
libIdTerm = libraryOntology['library_id']
url = 'file:///root/42BW9AAXX/C1-152/woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r2.tar.bz2'
- nodes = list(model.get_targets(RDF.Uri(url), libTerm))
+ nodes = list(model.objects(URIRef(url), libTerm))
self.assertEqual(len(nodes), 1)
self.assertEqual(nodes[0], localLibrary['10923/'])
- nodes = list(model.get_targets(RDF.Uri(url), libIdTerm))
+ nodes = list(model.objects(URIRef(url), libIdTerm))
self.assertEqual(len(nodes), 1)
- self.assertEqual(fromTypedNode(nodes[0]), '10923')
+ self.assertEqual(nodes[0].toPython(), '10923')
url = 'file:///root/42BW9AAXX/C1-152/woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l2_r1.tar.bz2'
- nodes = list(model.get_targets(RDF.Uri(url), libTerm))
+ nodes = list(model.objects(URIRef(url), libTerm))
self.assertEqual(len(nodes), 1)
self.assertEqual(nodes[0], localLibrary['10924/'])
- nodes = list(model.get_targets(RDF.Uri(url), libIdTerm))
+ nodes = list(model.objects(URIRef(url), libIdTerm))
self.assertEqual(len(nodes), 1)
- self.assertEqual(fromTypedNode(nodes[0]), '10924')
+ self.assertEqual(nodes[0].toPython(), '10924')
url = 'file:///root/42BW9AAXX/C1-38/Project_12345/12345_AAATTT_L003_R1_001.fastq.gz'
- nodes = list(model.get_targets(RDF.Uri(url), libTerm))
+ nodes = list(model.objects(URIRef(url), libTerm))
self.assertEqual(len(nodes), 1)
self.assertEqual(nodes[0], localLibrary['12345/'])
- nodes = list(model.get_targets(RDF.Uri(url), libIdTerm))
+ nodes = list(model.objects(URIRef(url), libIdTerm))
self.assertEqual(len(nodes), 1)
- self.assertEqual(fromTypedNode(nodes[0]), '12345')
+ self.assertEqual(nodes[0].toPython(), '12345')
def test_load_from_model(self):
"""Can we round trip through a RDF model"""
- model = get_model()
+ model = Graph()
path = '/root/42BW9AAXX/C1-38/Project_12345/'
filename = '12345_AAATTT_L003_R1_001.fastq.gz'
seq = sequences.parse_fastq(path, filename)
seq.save_to_model(model)
seq_id = 'file://'+path+filename
- seqNode = RDF.Node(RDF.Uri(seq_id))
- libNode = RDF.Node(RDF.Uri('http://localhost/library/12345'))
- model.add_statement(
- RDF.Statement(seqNode, libraryOntology['library'], libNode))
+ seqNode = URIRef(seq_id)
+ libNode = URIRef('http://localhost/library/12345')
+ model.add((seqNode, libraryOntology['library'], libNode))
seq2 = sequences.SequenceFile.load_from_model(model, seq_id)
self.assertEqual(seq.flowcell, seq2.flowcell)