From baa70d7fd5c1309d02f1260532e52c9749bd82a5 Mon Sep 17 00:00:00 2001 From: Diane Trout Date: Thu, 9 Mar 2017 14:42:56 -0800 Subject: [PATCH] Port submission.py to rdflib --- htsworkflow/submission/submission.py | 216 ++++++++---------- .../submission/test/submission_test_common.py | 12 +- .../submission/test/test_submission.py | 49 ++-- 3 files changed, 126 insertions(+), 151 deletions(-) diff --git a/htsworkflow/submission/submission.py b/htsworkflow/submission/submission.py index 500f4e7..e4de00a 100644 --- a/htsworkflow/submission/submission.py +++ b/htsworkflow/submission/submission.py @@ -4,18 +4,18 @@ import logging import os import re -import RDF +from six.moves.urllib.error import HTTPError -from htsworkflow.util.rdfhelp import \ - dump_model, \ - fromTypedNode, \ - strip_namespace, \ - toTypedNode +from rdflib import Graph, Literal, Namespace, URIRef +from rdflib.namespace import RDF, RDFS + +from htsworkflow.util.rdfhelp import ( + dump_model, + strip_namespace, +) from htsworkflow.util.rdfns import ( dafTermOntology, libraryOntology, - rdfNS, - rdfsNS, submissionLog, submissionOntology, ) @@ -37,9 +37,9 @@ class Submission(object): self.model = model self.submissionSet = get_submission_uri(self.name) - self.submissionSetNS = RDF.NS(str(self.submissionSet) + '#') - self.libraryNS = RDF.NS('{0}/library/'.format(host)) - self.flowcellNS = RDF.NS('{0}/flowcell/'.format(host)) + self.submissionSetNS = Namespace(str(self.submissionSet) + '#') + self.libraryNS = Namespace('{0}/library/'.format(host)) + self.flowcellNS = Namespace('{0}/flowcell/'.format(host)) self.__view_map = None @@ -92,38 +92,34 @@ class Submission(object): an_analysis_name = self.make_submission_name(analysis_dir) an_analysis = self.get_submission_node(analysis_dir) - file_classification = self.model.get_target(file_type, - rdfNS['type']) - if file_classification is None: + file_classifications = list(self.model.objects(file_type, RDF['type'])) + if len(file_classifications) == 0: errmsg = 'Could not find class for {0}' LOGGER.warning(errmsg.format(str(file_type))) return - - self.model.add_statement( - RDF.Statement(self.submissionSetNS[''], - submissionOntology['has_submission'], - an_analysis)) - self.model.add_statement(RDF.Statement(an_analysis, - submissionOntology['name'], - toTypedNode(an_analysis_name))) - self.model.add_statement( - RDF.Statement(an_analysis, - rdfNS['type'], - submissionOntology['submission'])) - self.model.add_statement(RDF.Statement(an_analysis, - submissionOntology['library'], - libNode)) + file_classification = file_classifications[0] + + self.model.add((self.submissionSetNS[''], + submissionOntology['has_submission'], + an_analysis)) + self.model.add((an_analysis, + submissionOntology['name'], + Literal(an_analysis_name))) + self.model.add((an_analysis, + RDF['type'], + submissionOntology['submission'])) + self.model.add((an_analysis, + submissionOntology['library'], + libNode)) LOGGER.debug("Adding statements to {0}".format(str(an_analysis))) # add track specific information - self.model.add_statement( - RDF.Statement(an_analysis, - dafTermOntology['paired'], - toTypedNode(self._is_paired(libNode)))) - self.model.add_statement( - RDF.Statement(an_analysis, - dafTermOntology['submission'], - an_analysis)) + self.model.add((an_analysis, + dafTermOntology['paired'], + Literal(self._is_paired(libNode)))) + self.model.add((an_analysis, + dafTermOntology['submission'], + an_analysis)) # add file specific information fileNode = self.make_file_node(pathname, an_analysis) @@ -132,14 +128,12 @@ class Submission(object): self.add_read_length(filename, fileNode, analysis_dir) self.add_fastq_metadata(filename, fileNode) self.add_label(file_type, fileNode, libNode) - self.model.add_statement( - RDF.Statement(fileNode, - rdfNS['type'], - file_type)) - self.model.add_statement( - RDF.Statement(fileNode, - libraryOntology['library'], - libNode)) + self.model.add((fileNode, + RDF['type'], + file_type)) + self.model.add((fileNode, + libraryOntology['library'], + libNode)) LOGGER.debug("Done.") @@ -149,19 +143,16 @@ class Submission(object): # add file specific information path, filename = os.path.split(pathname) pathname = os.path.abspath(pathname) - fileNode = RDF.Node(RDF.Uri('file://'+ pathname)) - self.model.add_statement( - RDF.Statement(submissionNode, - dafTermOntology['has_file'], - fileNode)) - self.model.add_statement( - RDF.Statement(fileNode, - dafTermOntology['filename'], - filename)) - self.model.add_statement( - RDF.Statement(fileNode, - dafTermOntology['relative_path'], - os.path.relpath(pathname))) + fileNode = URIRef('file://'+ pathname) + self.model.add((submissionNode, + dafTermOntology['has_file'], + fileNode)) + self.model.add((fileNode, + dafTermOntology['filename'], + Literal(filename))) + self.model.add((fileNode, + dafTermOntology['relative_path'], + Literal(os.path.relpath(pathname)))) return fileNode def add_md5s(self, filename, fileNode, analysis_dir): @@ -172,14 +163,12 @@ class Submission(object): errmsg = "Unable to produce md5sum for {0}" LOGGER.warning(errmsg.format(submission_pathname)) else: - self.model.add_statement( - RDF.Statement(fileNode, dafTermOntology['md5sum'], md5)) + self.model.add((fileNode, dafTermOntology['md5sum'], Literal(md5))) def add_file_size(self, filename, fileNode, analysis_dir): submission_pathname = os.path.join(analysis_dir, filename) file_size = os.stat(submission_pathname).st_size - self.model.add_statement( - RDF.Statement(fileNode, dafTermOntology['file_size'], toTypedNode(file_size))) + self.model.add((fileNode, dafTermOntology['file_size'], Literal(file_size))) LOGGER.debug("Updating file size: %d", file_size) def add_read_length(self, filename, fileNode, analysis_dir): @@ -188,11 +177,9 @@ class Submission(object): header = stream.readline().strip() sequence = stream.readline().strip() read_length = len(sequence) - self.model.add_statement( - RDF.Statement(fileNode, - libraryOntology['read_length'], - toTypedNode(read_length)) - ) + self.model.add((fileNode, + libraryOntology['read_length'], + Literal(read_length))) LOGGER.debug("Updating read length: %d", read_length) def add_fastq_metadata(self, filename, fileNode): @@ -211,21 +198,22 @@ class Submission(object): for file_term, model_term in terms: value = fqname.get(file_term) if value is not None: - s = RDF.Statement(fileNode, model_term, toTypedNode(value)) - self.model.append(s) + s = (fileNode, model_term, value.toPython()) + self.model.add(s) if 'flowcell' in fqname: value = self.flowcellNS[fqname['flowcell'] + '/'] - s = RDF.Statement(fileNode, libraryOntology['flowcell'], value) - self.model.append(s) + s = (fileNode, libraryOntology['flowcell'], value) + self.model.add(s) def add_label(self, file_type, file_node, lib_node): """Add rdfs:label to a file node """ #template_term = libraryOntology['label_template'] template_term = libraryOntology['label_template'] - label_template = self.model.get_target(file_type, template_term) - if label_template: + label_templates = list(self.model.objects(file_type, template_term)) + if len(label_templates) > 0: + label_template = label_templates[0] template = loader.get_template('submission_view_rdfs_label_metadata.sparql') context = Context({ 'library': str(lib_node.uri), @@ -233,34 +221,36 @@ class Submission(object): for r in self.execute_query(template, context): context = Context(r) label = Template(label_template).render(context) - s = RDF.Statement(file_node, rdfsNS['label'], unicode(label)) - self.model.append(s) + s = (file_node, rdfsNS['label'], unicode(label)) + self.model.add(s) def _add_library_details_to_model(self, libNode): # attributes that can have multiple values set_attributes = set((libraryOntology['has_lane'], libraryOntology['has_mappings'], dafTermOntology['has_file'])) - parser = RDF.Parser(name='rdfa') + tmpmodel = Graph() try: - new_statements = parser.parse_as_stream(libNode.uri) - except RDF.RedlandError as e: + tmpmodel.parse(source=libNode, format='rdfa') + except HTTPError as e: LOGGER.error(e) return - LOGGER.debug("Scanning %s", str(libNode.uri)) + + LOGGER.debug("Scanning %s", str(libNode)) toadd = [] - for s in new_statements: + for stmt in tmpmodel: + s, p, o = stmt # always add "collections" - if s.predicate in set_attributes: - toadd.append(s) + if p in set_attributes: + toadd.append(stmt) continue # don't override things we already have in the model - targets = list(self.model.get_targets(s.subject, s.predicate)) + targets = list(self.model.get_targets(s, p)) if len(targets) == 0: - toadd.append(s) + toadd.append(stmt) - for s in toadd: - self.model.append(s) + for stmt in toadd: + self.model.add(stmt) self._add_lane_details(libNode) self._add_flowcell_details() @@ -268,32 +258,22 @@ class Submission(object): def _add_lane_details(self, libNode): """Import lane details """ - query = RDF.Statement(libNode, libraryOntology['has_lane'], None) + query = (libNode, libraryOntology['has_lane'], None) lanes = [] - for lane_stmt in self.model.find_statements(query): - lanes.append(lane_stmt.object) + for lane_stmt in self.model.triples(query): + lanes.append(lane_stmt[2]) - parser = RDF.Parser(name='rdfa') for lane in lanes: - LOGGER.debug("Importing %s" % (lane.uri,)) - try: - parser.parse_into_model(self.model, lane.uri) - except RDF.RedlandError as e: - LOGGER.error("Error accessing %s" % (lane.uri,)) - raise e + LOGGER.debug("Importing %s" % (lane,)) + self.model.parse(source=lane, format='rdfa') def _add_flowcell_details(self): template = loader.get_template('aws_flowcell.sparql') - parser = RDF.Parser(name='rdfa') for r in self.execute_query(template, Context()): flowcell = r['flowcell'] - try: - parser.parse_into_model(self.model, flowcell.uri) - except RDF.RedlandError as e: - LOGGER.error("Error accessing %s" % (str(flowcell))) - raise e + self.model.parse(source=flowcell, format='rdfa') def find_best_match(self, filename): @@ -322,13 +302,12 @@ class Submission(object): return a dictionary of compiled regular expressions to view names """ - filename_query = RDF.Statement( - None, dafTermOntology['filename_re'], None) + filename_query = (None, dafTermOntology['filename_re'], None) patterns = {} - for s in self.model.find_statements(filename_query): - view_name = s.subject - literal_re = s.object.literal_value['string'] + for s in self.model.triples(filename_query): + view_name = s[0] + literal_re = s[2].value LOGGER.debug("Found: %s" % (literal_re,)) try: filename_re = re.compile(literal_re) @@ -352,10 +331,12 @@ class Submission(object): return self.submissionSetNS[submission_name] def _get_library_attribute(self, libNode, attribute): - if not isinstance(attribute, RDF.Node): + if not isinstance(libNode, URIRef): + raise ValueError("libNode must be a URIRef") + if not isinstance(attribute, URIRef): attribute = libraryOntology[attribute] - targets = list(self.model.get_targets(libNode, attribute)) + targets = list(self.model.objects(libNode, attribute)) if len(targets) > 0: return self._format_library_attribute(targets) else: @@ -368,7 +349,7 @@ class Submission(object): # we don't know anything about this attribute self._add_library_details_to_model(libNode) - targets = list(self.model.get_targets(libNode, attribute)) + targets = list(self.model.objects(libNode, attribute)) if len(targets) > 0: return self._format_library_attribute(targets) @@ -378,9 +359,9 @@ class Submission(object): if len(targets) == 0: return None elif len(targets) == 1: - return fromTypedNode(targets[0]) + return targets[0].toPython() elif len(targets) > 1: - return [fromTypedNode(t) for t in targets] + return [t.toPython() for t in targets] def _is_paired(self, libNode): """Determine if a library is paired end""" @@ -413,13 +394,12 @@ class Submission(object): """ formatted_query = template.render(context) LOGGER.debug(formatted_query) - query = RDF.SPARQLQuery(str(formatted_query)) - rdfstream = query.execute(self.model) + rdfstream = self.model.query(str(formatted_query)) results = [] for record in rdfstream: d = {} for key, value in record.items(): - d[key] = fromTypedNode(value) + d[key] = value.toPython() results.append(d) return results @@ -433,10 +413,10 @@ def list_submissions(model): select distinct ?submission where { ?submission subns:has_submission ?library_dir } """ - q = RDF.Statement(None, submissionOntology['has_submission'], None) + q = (None, submissionOntology['has_submission'], None) submissions = set() - for statement in model.find_statements(q): - s = strip_namespace(submissionLog, statement.subject) + for statement in model.triples(q): + s = strip_namespace(submissionLog, statement[0]) if s[-1] in ['#', '/', '?']: s = s[:-1] submissions.add(s) diff --git a/htsworkflow/submission/test/submission_test_common.py b/htsworkflow/submission/test/submission_test_common.py index 47b6ed3..ecd5ab5 100644 --- a/htsworkflow/submission/test/submission_test_common.py +++ b/htsworkflow/submission/test/submission_test_common.py @@ -1,11 +1,10 @@ """Code shared between test cases. """ -import RDF import logging import os import tempfile -import htsworkflow.util.rdfhelp +from htsworkflow.util.rdfhelp import get_turtle_header S1_NAME = '1000-sample' S2_NAME = '2000-sample' SCOMBINED_NAME = 'directory' @@ -28,7 +27,7 @@ SCOMBINED_FILES = [ os.path.join(SCOMBINED_NAME, 's2_l4.read2.fastq'), ] -TURTLE_PREFIX = htsworkflow.util.rdfhelp.get_turtle_header() +TURTLE_PREFIX = get_turtle_header() S1_TURTLE = TURTLE_PREFIX + """ @@ -67,13 +66,12 @@ class MockAddDetails(object): self.add_turtle(turtle) def add_turtle(self, turtle): - parser = RDF.Parser('turtle') - parser.parse_string_into_model(self.model, turtle, "http://localhost") + self.model.parse(data=turtle, format='turtle', publicID="http://localhost") def __call__(self, libNode): - q = RDF.Statement(libNode, None, None) + q = (libNode, None, None) found = False - for s in self.model.find_statements(q): + for s in self.model.triples(q): found = True break assert found diff --git a/htsworkflow/submission/test/test_submission.py b/htsworkflow/submission/test/test_submission.py index ed2519a..1ededc5 100644 --- a/htsworkflow/submission/test/test_submission.py +++ b/htsworkflow/submission/test/test_submission.py @@ -3,11 +3,11 @@ import os import shutil from unittest import TestCase, TestSuite, defaultTestLoader +from rdflib import Graph, Namespace + from htsworkflow.util.rdfhelp import \ dafTermOntology, \ - get_turtle_header, \ - load_string_into_model, \ - get_model + get_turtle_header from htsworkflow.submission.submission import list_submissions, Submission from htsworkflow.submission.results import ResultMap from .submission_test_common import ( @@ -19,34 +19,31 @@ from .submission_test_common import ( MockAddDetails, ) -import RDF #import logging #logging.basicConfig(level=logging.DEBUG) class TestSubmissionModule(TestCase): def test_empty_list_submission(self): - model = get_model() + model = Graph() self.assertEqual(len(list(list_submissions(model))), 0) def test_one_submission(self): - model = get_model() - load_string_into_model(model, "turtle", - """ + model = Graph() + model.parse(data=""" @prefix subns: . @prefix test: . subns:has_submission test:lib1 ; subns:has_submission test:lib2. - """) + """, format='turtle') submissions = list(list_submissions(model)) self.assertEqual(len(submissions), 1) self.assertEqual(submissions[0], "test") def test_two_submission(self): - model = get_model() - load_string_into_model(model, "turtle", - """ + model = Graph() + model.parse(data=""" @prefix subns: . @prefix test: . @@ -54,7 +51,7 @@ class TestSubmissionModule(TestCase): subns:has_submission test:lib1 . subns:has_submission test:lib2 . - """) + """, format="turtle") submissions = list(list_submissions(model)) self.assertEqual(len(submissions), 2) truth = set(["test1", "test2"]) @@ -66,7 +63,7 @@ class TestSubmissionModule(TestCase): class TestSubmission(TestCase): def setUp(self): generate_sample_results_tree(self, 'submission_test') - self.model = get_model() + self.model = Graph() def tearDown(self): shutil.rmtree(self.tempdir) @@ -76,12 +73,12 @@ class TestSubmission(TestCase): self.assertEqual(str(s.submissionSet), "http://jumpgate.caltech.edu/wiki/SubmissionsLog/foo") self.assertEqual(str(s.submissionSetNS['']), - str(RDF.NS(str(s.submissionSet) + '#')[''])) + str(Namespace(str(s.submissionSet) + '#')[''])) self.assertEqual(str(s.libraryNS['']), - str(RDF.NS('http://localhost/library/')[''])) + str(Namespace('http://localhost/library/')[''])) def test_scan_submission_dirs(self): - turtle = get_turtle_header() + """ + turtle = get_turtle_header() + r""" @prefix thisView: . thisView:Fastq ucscDaf:filename_re ".*[^12]\\.fastq$" ; a geoSoft:raw ; @@ -101,18 +98,18 @@ thisView:alignments ucscDaf:filename_re ".*\\.bam$" ; ucscDaf:output_type "alignments" . """ - map = ResultMap() - map['1000'] = os.path.join(self.sourcedir, S1_NAME) - map['2000'] = os.path.join(self.sourcedir, S2_NAME) + resultmap = ResultMap() + resultmap['1000'] = os.path.join(self.sourcedir, S1_NAME) + resultmap['2000'] = os.path.join(self.sourcedir, S2_NAME) s = Submission('foo', self.model, 'http://localhost') mock = MockAddDetails(self.model, turtle) mock.add_turtle(S1_TURTLE) mock.add_turtle(S2_TURTLE) - s._add_library_details_to_model = mock - s.scan_submission_dirs(map) + #s._add_library_details_to_model(mock) + s.scan_submission_dirs(resultmap) - nodes = list(s.analysis_nodes(map)) + nodes = list(s.analysis_nodes(resultmap)) self.assertEqual(len(nodes), 2) expected = set(( 'http://jumpgate.caltech.edu/wiki/SubmissionsLog/foo#1000-sample', @@ -122,7 +119,7 @@ thisView:alignments ucscDaf:filename_re ".*\\.bam$" ; self.assertEqual(expected, got) def test_find_best_match(self): - turtle = get_turtle_header() + """ + turtle = get_turtle_header() + r""" @prefix thisView: . thisView:Fastq ucscDaf:filename_re ".*[^12]\\.fastq\\.bz2$" ; a geoSoft:raw ; @@ -142,9 +139,9 @@ thisView:alignments ucscDaf:filename_re ".*\\.bam$" ; ucscDaf:output_type "alignments" . """ - load_string_into_model(self.model, 'turtle', turtle) + self.model.parse(data=turtle, format='turtle') s = Submission('foo', self.model, 'http://localhost') - q = RDF.Statement(None, dafTermOntology['filename_re'], None) + q = (None, dafTermOntology['filename_re'], None) view_map = s._get_filename_view_map() self.assertEqual(len(view_map), 4) -- 2.30.2