From 3842b2c1505bf20afd802a664b6fa15e7a2036e9 Mon Sep 17 00:00:00 2001 From: Diane Trout Date: Fri, 31 Aug 2012 12:08:08 -0700 Subject: [PATCH] Allow option to save/restore a sequence class to a RDF model. (After doing this I started having dreams of some set of mixins designed to persist data into different types of storage). I also renamed the sql save to indicate that its going to a SQL database. Also I renamed one of my simplify Uris to stripNamespace to make it clearer what it was actually doing. simplify_uri just returns the end of a uri -- regardless of type. stripNamespace removes a specific namespave from a uri. --- htsworkflow/pipelines/sequences.py | 184 +++++++++++++++++- htsworkflow/pipelines/test/test_sequences.py | 186 +++++++++++++++++-- htsworkflow/submission/geo.py | 4 +- htsworkflow/util/rdfhelp.py | 16 +- htsworkflow/util/test/test_rdfhelp.py | 38 +++- 5 files changed, 403 insertions(+), 25 deletions(-) diff --git a/htsworkflow/pipelines/sequences.py b/htsworkflow/pipelines/sequences.py index 0e5612a..462c034 100644 --- a/htsworkflow/pipelines/sequences.py +++ b/htsworkflow/pipelines/sequences.py @@ -6,6 +6,13 @@ import logging import os import types import re +import sys +from urlparse import urljoin, urlparse + +import RDF +from htsworkflow.util.rdfhelp import libraryOntology as libNS +from htsworkflow.util.rdfhelp import toTypedNode, fromTypedNode, rdfNS, \ + stripNamespace, dump_model, simplify_uri LOGGER = logging.getLogger(__name__) @@ -122,7 +129,7 @@ class SequenceFile(object): # information and thus are unique so we don't have to do anything return os.path.join(root, basename) - def save(self, cursor): + def save_to_sql(self, cursor): """ Add this entry to a DB2.0 database. """ @@ -148,6 +155,93 @@ class SequenceFile(object): return cursor.execute(sql, sql_values) + def save_to_model(self, model, base_url=None): + def add_lit(model, s, p, o): + if o is not None: + model.add_statement(RDF.Statement(s, p, toTypedNode(o))) + def add(model, s, p, o): + model.add_statement(RDF.Statement(s,p,o)) + fileNode = RDF.Node(RDF.Uri('file://' + os.path.abspath(self.path))) + add(model, fileNode, rdfNS['type'], libNS['raw_file']) + add_lit(model, fileNode, libNS['flowcell_id'], self.flowcell) + add_lit(model, fileNode, libNS['lane_number'], self.lane) + if self.read is not None: + add_lit(model, fileNode, libNS['read'], self.read) + else: + add_lit(model, fileNode, libNS['read'], '') + + add_lit(model, fileNode, libNS['library_id'], self.project) + add_lit(model, fileNode, libNS['multiplex_index'], self.index) + add_lit(model, fileNode, libNS['split_id'], self.split) + add_lit(model, fileNode, libNS['cycle'], self.cycle) + add_lit(model, fileNode, libNS['passed_filter'], self.pf) + add(model, fileNode, rdfNS['type'], libNS[self.filetype]) + + if base_url is not None: + flowcell = RDF.Node(RDF.Uri("{base}/flowcell/{flowcell}/".format( + base=base_url, + flowcell=self.flowcell))) + add(model, fileNode, libNS['flowcell'], flowcell) + if self.project is not None: + library = RDF.Node(RDF.Uri("{base}/library/{library}".format( + base=base_url, + library=self.project))) + add(model, fileNode, libNS['library'], library) + + + @classmethod + def load_from_model(cls, model, seq_id): + def get(s, p): + values = [] + stmts = model.find_statements(RDF.Statement(s, p, None)) + for s in stmts: + obj = s.object + if not obj.is_resource(): + values.append(fromTypedNode(obj)) + else: + values.append(obj) + return values + def get_one(s, p): + values = get(s, p) + if len(values) > 1: + errmsg = u"To many values for %s %s" + raise ValueError(errmsg % (unicode(s), unicode(p))) + elif len(values) == 1: + return values[0] + else: + return None + + if not isinstance(seq_id, RDF.Node): + seq_id = RDF.Node(RDF.Uri(seq_id)) + seqTypesStmt = RDF.Statement(seq_id, rdfNS['type'], None) + seqTypes = model.find_statements(seqTypesStmt) + isSequenceFile = False + for s in seqTypes: + if s.object == libNS['raw_file']: + isSequenceFile = True + else: + seq_type = stripNamespace(libNS, s.object) + + if not isSequenceFile: + raise KeyError(u"%s not found" % (unicode(seq_id),)) + + path = urlparse(str(seq_id.uri)).path + flowcellNode = get_one(seq_id, libNS['flowcell']) + flowcell = get_one(seq_id, libNS['flowcell_id']) + lane = get_one(seq_id, libNS['lane_number']) + read = get_one(seq_id, libNS['read']) + + obj = cls(seq_type, path, flowcell, lane) + obj.read = read if read != '' else None + obj.project = get_one(seq_id, libNS['library_id']) + obj.index = get_one(seq_id, libNS['multiplex_index']) + obj.split = get_one(seq_id, libNS['split_id']) + obj.cycle = get_one(seq_id, libNS['cycle'] ) + obj.pf = get_one(seq_id, libNS['passed_filter']) + obj.libraryNode = get_one(seq_id, libNS['library']) + return obj + + def get_flowcell_cycle(path): """ Extract flowcell, cycle from pathname @@ -316,3 +410,91 @@ def scan_for_sequences(dirs): LOGGER.debug("Found sequence at %s" % (f,)) return sequences + + +def update_model_sequence_library(model, base_url): + """Find sequence objects and add library information if its missing + """ + file_body = """ + prefix libNS: + select ?filenode ?flowcell_id ?lane_id ?library_id ?flowcell ?library + where { + ?filenode a libNS:raw_file ; + libNS:flowcell_id ?flowcell_id ; + libNS:lane_number ?lane_id . + OPTIONAL { ?filenode libNS:flowcell ?flowcell . } + OPTIONAL { ?filenode libNS:library ?library .} + OPTIONAL { ?filenode libNS:library_id ?library_id .} + } + """ + file_query = RDF.SPARQLQuery(file_body) + files = file_query.execute(model) + + libraryNS = RDF.NS(urljoin(base_url, 'library/')) + flowcellNS = RDF.NS(urljoin(base_url, 'flowcell/')) + for f in files: + filenode = f['filenode'] + lane_id = fromTypedNode(f['lane_id']) + if f['flowcell'] is None: + flowcell = flowcellNS[str(f['flowcell_id'])+'/'] + model.add_statement( + RDF.Statement(filenode, libNS['flowcell'], flowcell)) + else: + flowcell = f['flowcell'] + + if f['library'] is None: + if f['library_id'] is not None: + library = libraryNS[str(f['library_id']) + '/'] + else: + library = guess_library_from_model(model, base_url, + flowcell, + lane_id) + library_id = toTypedNode(simplify_uri(library)) + model.add_statement( + RDF.Statement(filenode, libNS['library_id'], library_id)) + if library is not None: + model.add_statement( + RDF.Statement(filenode, libNS['library'], library)) + + +def guess_library_from_model(model, base_url, flowcell, lane_id): + """Attempt to find library URI + """ + flowcellNode = RDF.Node(flowcell) + flowcell = str(flowcell.uri) + lane_body = """ + prefix libNS: + prefix rdf: + prefix xsd: + + select ?library ?lane + where {{ + <{flowcell}> libNS:has_lane ?lane ; + a libNS:illumina_flowcell . + ?lane libNS:lane_number {lane_id} ; + libNS:library ?library . + }} + """ + lane_body = lane_body.format(flowcell=flowcell, lane_id=lane_id) + lanes = [] + tries = 3 + while len(lanes) == 0 and tries > 0: + tries -= 1 + lane_query = RDF.SPARQLQuery(lane_body) + lanes = [ l for l in lane_query.execute(model)] + if len(lanes) > 1: + # CONFUSED! + errmsg = "Too many libraries for flowcell {flowcell} "\ + "lane {lane} = {count}" + LOGGER.error(errmsg.format(flowcell=flowcell, + lane=lane_id, + count=len(lanes))) + return None + elif len(lanes) == 1: + # success + return lanes[0]['library'] + else: + # try grabbing data + model.load(flowcellNode.uri, name="rdfa") + + diff --git a/htsworkflow/pipelines/test/test_sequences.py b/htsworkflow/pipelines/test/test_sequences.py index 7bbcc2e..d051c03 100644 --- a/htsworkflow/pipelines/test/test_sequences.py +++ b/htsworkflow/pipelines/test/test_sequences.py @@ -4,8 +4,11 @@ import shutil import tempfile import unittest -from htsworkflow.pipelines import sequences +import RDF +from htsworkflow.pipelines import sequences +from htsworkflow.util.rdfhelp import get_model, load_string_into_model, \ + rdfNS, libraryOntology, dump_model, fromTypedNode class SequenceFileTests(unittest.TestCase): """ @@ -294,15 +297,8 @@ class SequenceFileTests(unittest.TestCase): self.assertEqual(f.make_target_name('/tmp'), '/tmp/42BW9AAXX_152_s_4_1_eland_extended.txt.bz2') - def test_sql(self): - """ - Make sure that the quick and dirty sql interface in sequences works - """ - import sqlite3 - db = sqlite3.connect(":memory:") - c = db.cursor() - sequences.create_sequence_table(c) - + def _generate_sequences(self): + seqs = [] data = [('/root/42BW9AAXX/C1-152', 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r1.tar.bz2'), ('/root/42BW9AAXX/C1-152', @@ -313,12 +309,176 @@ class SequenceFileTests(unittest.TestCase): 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l2_r21.tar.bz2'),] for path, name in data: - seq = sequences.parse_qseq(path, name) - seq.save(c) + seqs.append(sequences.parse_qseq(path, name)) + + path = '/root/42BW9AAXX/C1-38/Project_12345' + name = '12345_AAATTT_L003_R1_001.fastq.gz' + pathname = os.path.join(path,name) + seqs.append(sequences.parse_fastq(path, name)) + self.assertEqual(len(seqs), 5) + return seqs + + + def test_sql(self): + """ + Make sure that the quick and dirty sql interface in sequences works + """ + import sqlite3 + db = sqlite3.connect(":memory:") + c = db.cursor() + sequences.create_sequence_table(c) + + for seq in self._generate_sequences(): + seq.save_to_sql(c) count = c.execute("select count(*) from sequences") row = count.fetchone() - self.assertEqual(row[0], 4) + self.assertEqual(row[0], 5) + + def test_basic_rdf_scan(self): + """Make sure we can save to RDF model""" + import RDF + model = get_model() + + for seq in self._generate_sequences(): + seq.save_to_model(model) + + files = list(model.find_statements( + RDF.Statement(None, rdfNS['type'], libraryOntology['raw_file']))) + self.assertEqual(len(files), 5) + files = list(model.find_statements( + RDF.Statement(None, rdfNS['type'], libraryOntology['qseq']))) + self.assertEqual(len(files), 4) + files = list(model.find_statements( + RDF.Statement(None, rdfNS['type'], libraryOntology['split_fastq']))) + self.assertEqual(len(files), 1) + + files = list(model.find_statements( + RDF.Statement(None, libraryOntology['library_id'], None))) + self.assertEqual(len(files), 1) + + files = list(model.find_statements( + RDF.Statement(None, libraryOntology['flowcell_id'], None))) + self.assertEqual(len(files), 5) + + files = list(model.find_statements( + RDF.Statement(None, libraryOntology['flowcell'], None))) + self.assertEqual(len(files), 0) + + files = list(model.find_statements( + RDF.Statement(None, libraryOntology['library'], None))) + self.assertEqual(len(files), 0) + + def test_rdf_scan_with_url(self): + """Make sure we can save to RDF model""" + import RDF + model = get_model() + base_url = 'http://localhost' + for seq in self._generate_sequences(): + seq.save_to_model(model, base_url=base_url) + localFC = RDF.NS(base_url + '/flowcell/') + localLibrary = RDF.NS(base_url + '/library/') + + files = list(model.find_statements( + RDF.Statement(None, libraryOntology['flowcell'], None))) + self.assertEqual(len(files), 5) + for f in files: + self.assertEqual(f.object, localFC['42BW9AAXX/']) + + files = list(model.find_statements( + RDF.Statement(None, libraryOntology['library'], None))) + self.assertEqual(len(files), 1) + self.assertEqual(files[0].object, localLibrary['12345']) + + def test_rdf_fixup_library(self): + """Make sure we can save to RDF model""" + base_url = 'http://localhost' + localLibrary = RDF.NS(base_url + '/library/') + + flowcellInfo = """@prefix libns: . + +<{base}/flowcell/42BW9AAXX/> + libns:flowcell_id "42BW9AXX"@en ; + libns:has_lane <{base}/lane/1169>, <{base}/lane/1170>, + <{base}/lane/1171>, <{base}/lane/1172> ; + libns:read_length 75 ; + a libns:illumina_flowcell . + +<{base}/lane/1169> + libns:lane_number 1 ; libns:library <{base}/library/10923/> . +<{base}/lane/1170> + libns:lane_number 2 ; libns:library <{base}/library/10924/> . +<{base}/lane/1171> + libns:lane_number 3 ; libns:library <{base}/library/12345/> . +<{base}/lane/1172> + libns:lane_number 3 ; libns:library <{base}/library/10930/> . +""".format(base=base_url) + model = get_model() + load_string_into_model(model, 'turtle', flowcellInfo) + for seq in self._generate_sequences(): + seq.save_to_model(model) + f = sequences.update_model_sequence_library(model, base_url=base_url) + + libTerm = libraryOntology['library'] + libIdTerm = libraryOntology['library_id'] + + url = 'file:///root/42BW9AAXX/C1-152/woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r2.tar.bz2' + nodes = list(model.get_targets(RDF.Uri(url), libTerm)) + self.assertEqual(len(nodes), 1) + self.assertEqual(nodes[0], localLibrary['10923/']) + nodes = list(model.get_targets(RDF.Uri(url), libIdTerm)) + self.assertEqual(len(nodes), 1) + self.assertEqual(fromTypedNode(nodes[0]), '10923') + + url = 'file:///root/42BW9AAXX/C1-152/woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l2_r1.tar.bz2' + nodes = list(model.get_targets(RDF.Uri(url), libTerm)) + self.assertEqual(len(nodes), 1) + self.assertEqual(nodes[0], localLibrary['10924/']) + nodes = list(model.get_targets(RDF.Uri(url), libIdTerm)) + self.assertEqual(len(nodes), 1) + self.assertEqual(fromTypedNode(nodes[0]), '10924') + + url = 'file:///root/42BW9AAXX/C1-38/Project_12345/12345_AAATTT_L003_R1_001.fastq.gz' + nodes = list(model.get_targets(RDF.Uri(url), libTerm)) + self.assertEqual(len(nodes), 1) + self.assertEqual(nodes[0], localLibrary['12345/']) + nodes = list(model.get_targets(RDF.Uri(url), libIdTerm)) + self.assertEqual(len(nodes), 1) + self.assertEqual(fromTypedNode(nodes[0]), '12345') + + def test_load_from_model(self): + """Can we round trip through a RDF model""" + model = get_model() + path = '/root/42BW9AAXX/C1-38/Project_12345/' + filename = '12345_AAATTT_L003_R1_001.fastq.gz' + seq = sequences.parse_fastq(path, filename) + seq.save_to_model(model) + + seq_id = 'file://'+path+filename + seqNode = RDF.Node(RDF.Uri(seq_id)) + libNode = RDF.Node(RDF.Uri('http://localhost/library/12345')) + model.add_statement( + RDF.Statement(seqNode, libraryOntology['library'], libNode)) + seq2 = sequences.SequenceFile.load_from_model(model, seq_id) + + self.assertEqual(seq.flowcell, seq2.flowcell) + self.assertEqual(seq.flowcell, '42BW9AAXX') + self.assertEqual(seq.filetype, seq2.filetype) + self.assertEqual(seq2.filetype, 'split_fastq') + self.assertEqual(seq.lane, seq2.lane) + self.assertEqual(seq2.lane, 3) + self.assertEqual(seq.read, seq2.read) + self.assertEqual(seq2.read, 1) + self.assertEqual(seq.project, seq2.project) + self.assertEqual(seq2.project, '12345') + self.assertEqual(seq.index, seq2.index) + self.assertEqual(seq2.index, 'AAATTT') + self.assertEqual(seq.split, seq2.split) + self.assertEqual(seq2.split, '001') + self.assertEqual(seq.cycle, seq2.cycle) + self.assertEqual(seq.pf, seq2.pf) + self.assertEqual(seq2.libraryNode, libNode) + self.assertEqual(seq.path, seq2.path) def test_scan_for_sequences(self): # simulate tree diff --git a/htsworkflow/submission/geo.py b/htsworkflow/submission/geo.py index a3ac2f1..6137875 100644 --- a/htsworkflow/submission/geo.py +++ b/htsworkflow/submission/geo.py @@ -8,7 +8,7 @@ from htsworkflow.submission.submission import Submission from htsworkflow.util.rdfhelp import \ fromTypedNode, \ geoSoftNS, \ - simplifyUri, \ + stripNamespace, \ submissionOntology from django.conf import settings @@ -203,7 +203,7 @@ class GEOSubmission(Submission): def query_to_soft_dictionary(self, results, heading): attributes = [] for r in results: - name = simplifyUri(geoSoftNS, r['name']) + name = stripNamespace(geoSoftNS, r['name']) if name is not None: if name.lower() == heading.lower(): name = '^' + name diff --git a/htsworkflow/util/rdfhelp.py b/htsworkflow/util/rdfhelp.py index 29df21e..73d32c6 100644 --- a/htsworkflow/util/rdfhelp.py +++ b/htsworkflow/util/rdfhelp.py @@ -203,6 +203,14 @@ def simplify_uri(uri): >>> simplify_uri('http://asdf.org/foo/bar?was=foo') 'was=foo' """ + if isinstance(uri, RDF.Node): + if uri.is_resource(): + uri = uri.uri + else: + raise ValueError("Can't simplify an RDF literal") + if isinstance(uri, RDF.Uri): + uri = str(uri) + parsed = urlparse(uri) if len(parsed.query) > 0: return parsed.query @@ -214,7 +222,7 @@ def simplify_uri(uri): return element raise ValueError("Unable to simplify %s" % (uri,)) -def simplifyUri(namespace, term): +def stripNamespace(namespace, term): """Remove the namespace portion of a term returns None if they aren't in common @@ -249,6 +257,12 @@ def get_model(model_name=None, directory=None): def load_into_model(model, parser_name, path, ns=None): + if isinstance(path, RDF.Node): + if path.is_resource(): + path = str(path.uri) + else: + raise ValueError("url to load can't be a RDF literal") + url_parts = list(urlparse(path)) if len(url_parts[0]) == 0: url_parts[0] = 'file' diff --git a/htsworkflow/util/test/test_rdfhelp.py b/htsworkflow/util/test/test_rdfhelp.py index 34c3200..2643aef 100644 --- a/htsworkflow/util/test/test_rdfhelp.py +++ b/htsworkflow/util/test/test_rdfhelp.py @@ -13,7 +13,8 @@ from htsworkflow.util.rdfhelp import \ load_string_into_model, \ rdfsNS, \ toTypedNode, \ - simplifyUri, \ + stripNamespace, \ + simplify_uri, \ sanitize_literal, \ xsdNS @@ -102,23 +103,44 @@ try: self.assertEqual(fromTypedNode(toTypedNode(long_datetime)), long_datetime) - def test_simplify_uri(self): + def test_strip_namespace_uri(self): nsOrg = RDF.NS('example.org/example#') nsCom = RDF.NS('example.com/example#') term = 'foo' node = nsOrg[term] - self.failUnlessEqual(simplifyUri(nsOrg, node), term) - self.failUnlessEqual(simplifyUri(nsCom, node), None) - self.failUnlessEqual(simplifyUri(nsOrg, node.uri), term) + self.failUnlessEqual(stripNamespace(nsOrg, node), term) + self.failUnlessEqual(stripNamespace(nsCom, node), None) + self.failUnlessEqual(stripNamespace(nsOrg, node.uri), term) - def test_simplify_uri_exceptions(self): + def test_strip_namespace_exceptions(self): nsOrg = RDF.NS('example.org/example#') nsCom = RDF.NS('example.com/example#') node = toTypedNode('bad') - self.failUnlessRaises(ValueError, simplifyUri, nsOrg, node) - self.failUnlessRaises(ValueError, simplifyUri, nsOrg, nsOrg) + self.failUnlessRaises(ValueError, stripNamespace, nsOrg, node) + self.failUnlessRaises(ValueError, stripNamespace, nsOrg, nsOrg) + + def test_simplify_uri(self): + DATA = [('http://asdf.org/foo/bar', 'bar'), + ('http://asdf.org/foo/bar#bleem', 'bleem'), + ('http://asdf.org/foo/bar/', 'bar'), + ('http://asdf.org/foo/bar?was=foo', 'was=foo')] + + for uri, expected in DATA: + self.assertEqual(simplify_uri(uri), expected) + + for uri, expected in DATA: + n = RDF.Uri(uri) + self.assertEqual(simplify_uri(n), expected) + + for uri, expected in DATA: + n = RDF.Node(RDF.Uri(uri)) + self.assertEqual(simplify_uri(n), expected) + + # decoding literals is questionable + n = toTypedNode('http://foo/bar') + self.assertRaises(ValueError, simplify_uri, n) def test_owl_import(self): path, name = os.path.split(__file__) -- 2.30.2