Add an option to save/restore a sequence class to an RDF model.
author Diane Trout <diane@caltech.edu>
Fri, 31 Aug 2012 19:08:08 +0000 (12:08 -0700)
committer Diane Trout <diane@caltech.edu>
Fri, 31 Aug 2012 19:16:59 +0000 (12:16 -0700)
(After doing this I started having dreams of some set of mixins
designed to persist data into different types of storage).
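
Roughly, the new RDF round trip looks like this (a minimal sketch based on
the new tests; the path, filename, and base_url are only illustrative):

    from htsworkflow.pipelines import sequences
    from htsworkflow.util.rdfhelp import get_model

    model = get_model()
    path = '/root/42BW9AAXX/C1-38/Project_12345/'
    name = '12345_AAATTT_L003_R1_001.fastq.gz'

    # save a parsed sequence file into the RDF model
    seq = sequences.parse_fastq(path, name)
    seq.save_to_model(model, base_url='http://localhost')

    # and restore it again by its file:// URI
    seq2 = sequences.SequenceFile.load_from_model(model, 'file://' + path + name)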

I also renamed the SQL save method to save_to_sql to make it clear
that it's writing to a SQL database.
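
For reference, the renamed call is used just like the old one, with a
DB-API cursor (sketch mirroring the updated test_sql test):

    import sqlite3
    from htsworkflow.pipelines import sequences

    db = sqlite3.connect(":memory:")
    cursor = db.cursor()
    sequences.create_sequence_table(cursor)

    seq = sequences.parse_qseq(
        '/root/42BW9AAXX/C1-152',
        'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r1.tar.bz2')
    seq.save_to_sql(cursor)  # previously seq.save(cursor)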

I also renamed one of my URI-simplifying functions, simplifyUri, to
stripNamespace to make it clearer what it actually does.

simplify_uri just returns the end of a URI, regardless of type.
stripNamespace removes a specific namespace from a URI.
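
A quick illustration of the difference (sketch; the library ontology
namespace is the one used elsewhere in this change, the other is made up):

    import RDF
    from htsworkflow.util.rdfhelp import simplify_uri, stripNamespace

    libNS = RDF.NS('http://jumpgate.caltech.edu/wiki/LibraryOntology#')
    otherNS = RDF.NS('http://example.com/example#')

    simplify_uri('http://asdf.org/foo/bar#bleem')  # -> 'bleem', works on any URI
    stripNamespace(libNS, libNS['raw_file'])       # -> 'raw_file'
    stripNamespace(otherNS, libNS['raw_file'])     # -> None, namespace doesn't match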

htsworkflow/pipelines/sequences.py
htsworkflow/pipelines/test/test_sequences.py
htsworkflow/submission/geo.py
htsworkflow/util/rdfhelp.py
htsworkflow/util/test/test_rdfhelp.py

index 0e5612a8e47b393684f9ee7f69cd779cca97c607..462c03460df700650aa82d81f0fff0c9fd9bd6b4 100644 (file)
@@ -6,6 +6,13 @@ import logging
 import os
 import types
 import re
+import sys
+from urlparse import urljoin, urlparse
+
+import RDF
+from htsworkflow.util.rdfhelp import libraryOntology as libNS
+from htsworkflow.util.rdfhelp import toTypedNode, fromTypedNode, rdfNS, \
+     stripNamespace, dump_model, simplify_uri
 
 LOGGER = logging.getLogger(__name__)
 
@@ -122,7 +129,7 @@ class SequenceFile(object):
         # information and thus are unique so we don't have to do anything
         return os.path.join(root, basename)
 
-    def save(self, cursor):
+    def save_to_sql(self, cursor):
         """
         Add this entry to a DB2.0 database.
         """
@@ -148,6 +155,93 @@ class SequenceFile(object):
 
         return cursor.execute(sql, sql_values)
 
+    def save_to_model(self, model, base_url=None):
+        def add_lit(model, s, p, o):
+            if o is not None:
+                model.add_statement(RDF.Statement(s, p, toTypedNode(o)))
+        def add(model, s, p, o):
+            model.add_statement(RDF.Statement(s,p,o))
+        fileNode = RDF.Node(RDF.Uri('file://' + os.path.abspath(self.path)))
+        add(model, fileNode, rdfNS['type'], libNS['raw_file'])
+        add_lit(model, fileNode, libNS['flowcell_id'], self.flowcell)
+        add_lit(model, fileNode, libNS['lane_number'], self.lane)
+        if self.read is not None:
+            add_lit(model, fileNode, libNS['read'], self.read)
+        else:
+            add_lit(model, fileNode, libNS['read'], '')
+
+        add_lit(model, fileNode, libNS['library_id'], self.project)
+        add_lit(model, fileNode, libNS['multiplex_index'], self.index)
+        add_lit(model, fileNode, libNS['split_id'], self.split)
+        add_lit(model, fileNode, libNS['cycle'], self.cycle)
+        add_lit(model, fileNode, libNS['passed_filter'], self.pf)
+        add(model, fileNode, rdfNS['type'], libNS[self.filetype])
+
+        if base_url is not None:
+            flowcell = RDF.Node(RDF.Uri("{base}/flowcell/{flowcell}/".format(
+                base=base_url,
+                flowcell=self.flowcell)))
+            add(model, fileNode, libNS['flowcell'], flowcell)
+            if self.project is not None:
+                library = RDF.Node(RDF.Uri("{base}/library/{library}".format(
+                    base=base_url,
+                    library=self.project)))
+                add(model, fileNode, libNS['library'], library)
+
+
+    @classmethod
+    def load_from_model(cls, model, seq_id):
+        def get(s, p):
+            values = []
+            stmts = model.find_statements(RDF.Statement(s, p, None))
+            for s in stmts:
+                obj = s.object
+                if not obj.is_resource():
+                    values.append(fromTypedNode(obj))
+                else:
+                    values.append(obj)
+            return values
+        def get_one(s, p):
+            values = get(s, p)
+            if len(values) > 1:
+                errmsg = u"Too many values for %s %s"
+                raise ValueError(errmsg % (unicode(s), unicode(p)))
+            elif len(values) == 1:
+                return values[0]
+            else:
+                return None
+
+        if not isinstance(seq_id, RDF.Node):
+            seq_id = RDF.Node(RDF.Uri(seq_id))
+        seqTypesStmt = RDF.Statement(seq_id, rdfNS['type'], None)
+        seqTypes = model.find_statements(seqTypesStmt)
+        isSequenceFile = False
+        for s in seqTypes:
+            if s.object == libNS['raw_file']:
+                isSequenceFile = True
+            else:
+                seq_type = stripNamespace(libNS, s.object)
+
+        if not isSequenceFile:
+            raise KeyError(u"%s not found" % (unicode(seq_id),))
+
+        path = urlparse(str(seq_id.uri)).path
+        flowcellNode = get_one(seq_id, libNS['flowcell'])
+        flowcell = get_one(seq_id, libNS['flowcell_id'])
+        lane = get_one(seq_id, libNS['lane_number'])
+        read = get_one(seq_id, libNS['read'])
+
+        obj = cls(seq_type, path, flowcell, lane)
+        obj.read = read if read != '' else None
+        obj.project = get_one(seq_id, libNS['library_id'])
+        obj.index = get_one(seq_id, libNS['multiplex_index'])
+        obj.split = get_one(seq_id, libNS['split_id'])
+        obj.cycle = get_one(seq_id, libNS['cycle'] )
+        obj.pf = get_one(seq_id, libNS['passed_filter'])
+        obj.libraryNode = get_one(seq_id, libNS['library'])
+        return obj
+
+
 def get_flowcell_cycle(path):
     """
     Extract flowcell, cycle from pathname
@@ -316,3 +410,91 @@ def scan_for_sequences(dirs):
                     LOGGER.debug("Found sequence at %s" % (f,))
 
     return sequences
+
+
+def update_model_sequence_library(model, base_url):
+    """Find sequence objects and add library information if it's missing
+    """
+    file_body = """
+    prefix libNS: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
+    select ?filenode ?flowcell_id ?lane_id ?library_id ?flowcell ?library
+    where {
+       ?filenode a libNS:raw_file ;
+                 libNS:flowcell_id ?flowcell_id ;
+                 libNS:lane_number ?lane_id .
+       OPTIONAL { ?filenode libNS:flowcell ?flowcell . }
+       OPTIONAL { ?filenode libNS:library ?library .}
+       OPTIONAL { ?filenode libNS:library_id ?library_id .}
+    }
+    """
+    file_query = RDF.SPARQLQuery(file_body)
+    files = file_query.execute(model)
+
+    libraryNS = RDF.NS(urljoin(base_url, 'library/'))
+    flowcellNS = RDF.NS(urljoin(base_url, 'flowcell/'))
+    for f in files:
+        filenode = f['filenode']
+        lane_id = fromTypedNode(f['lane_id'])
+        if f['flowcell'] is None:
+            flowcell = flowcellNS[str(f['flowcell_id'])+'/']
+            model.add_statement(
+                RDF.Statement(filenode, libNS['flowcell'], flowcell))
+        else:
+            flowcell = f['flowcell']
+
+        if f['library'] is None:
+            if f['library_id'] is not None:
+                library = libraryNS[str(f['library_id']) + '/']
+            else:
+                library = guess_library_from_model(model, base_url,
+                                                   flowcell,
+                                                   lane_id)
+                library_id = toTypedNode(simplify_uri(library))
+                model.add_statement(
+                    RDF.Statement(filenode, libNS['library_id'], library_id))
+            if library is not None:
+                model.add_statement(
+                    RDF.Statement(filenode, libNS['library'], library))
+
+
+def guess_library_from_model(model, base_url, flowcell, lane_id):
+    """Attempt to find library URI
+    """
+    flowcellNode = RDF.Node(flowcell)
+    flowcell = str(flowcell.uri)
+    lane_body = """
+    prefix libNS: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
+    prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
+    prefix xsd: <http://www.w3.org/2001/XMLSchema#>
+
+    select ?library ?lane
+    where {{
+      <{flowcell}> libNS:has_lane ?lane ;
+                   a libNS:illumina_flowcell .
+      ?lane libNS:lane_number {lane_id} ;
+            libNS:library ?library .
+    }}
+    """
+    lane_body = lane_body.format(flowcell=flowcell, lane_id=lane_id)
+    lanes = []
+    tries = 3
+    while len(lanes) == 0 and tries > 0:
+        tries -= 1
+        lane_query = RDF.SPARQLQuery(lane_body)
+        lanes = [ l for l in lane_query.execute(model)]
+        if len(lanes) > 1:
+            # CONFUSED!
+            errmsg = "Too many libraries for flowcell {flowcell} "\
+                     "lane {lane} = {count}"
+            LOGGER.error(errmsg.format(flowcell=flowcell,
+                                       lane=lane_id,
+                                       count=len(lanes)))
+            return None
+        elif len(lanes) == 1:
+            # success
+            return lanes[0]['library']
+        else:
+            # try grabbing data
+            model.load(flowcellNode.uri, name="rdfa")
+
+
index 7bbcc2e762f6361b1fd9ba97165e6d22c241e1a5..d051c036ca2eddda6ffbbc1cc3f1c1f31bb5fcea 100644 (file)
@@ -4,8 +4,11 @@ import shutil
 import tempfile
 import unittest
 
-from htsworkflow.pipelines import sequences
+import RDF
 
+from htsworkflow.pipelines import sequences
+from htsworkflow.util.rdfhelp import get_model, load_string_into_model, \
+     rdfNS, libraryOntology, dump_model, fromTypedNode
 
 class SequenceFileTests(unittest.TestCase):
     """
@@ -294,15 +297,8 @@ class SequenceFileTests(unittest.TestCase):
         self.assertEqual(f.make_target_name('/tmp'),
                          '/tmp/42BW9AAXX_152_s_4_1_eland_extended.txt.bz2')
 
-    def test_sql(self):
-        """
-        Make sure that the quick and dirty sql interface in sequences works
-        """
-        import sqlite3
-        db = sqlite3.connect(":memory:")
-        c = db.cursor()
-        sequences.create_sequence_table(c)
-
+    def _generate_sequences(self):
+        seqs = []
         data = [('/root/42BW9AAXX/C1-152',
                 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r1.tar.bz2'),
                 ('/root/42BW9AAXX/C1-152',
@@ -313,12 +309,176 @@ class SequenceFileTests(unittest.TestCase):
                 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l2_r21.tar.bz2'),]
 
         for path, name in data:
-            seq = sequences.parse_qseq(path, name)
-            seq.save(c)
+            seqs.append(sequences.parse_qseq(path, name))
+
+        path = '/root/42BW9AAXX/C1-38/Project_12345'
+        name = '12345_AAATTT_L003_R1_001.fastq.gz'
+        pathname = os.path.join(path,name)
+        seqs.append(sequences.parse_fastq(path, name))
+        self.assertEqual(len(seqs), 5)
+        return seqs
+
+
+    def test_sql(self):
+        """
+        Make sure that the quick and dirty sql interface in sequences works
+        """
+        import sqlite3
+        db = sqlite3.connect(":memory:")
+        c = db.cursor()
+        sequences.create_sequence_table(c)
+
+        for seq in self._generate_sequences():
+            seq.save_to_sql(c)
 
         count = c.execute("select count(*) from sequences")
         row = count.fetchone()
-        self.assertEqual(row[0], 4)
+        self.assertEqual(row[0], 5)
+
+    def test_basic_rdf_scan(self):
+        """Make sure we can save to an RDF model"""
+        import RDF
+        model = get_model()
+
+        for seq in self._generate_sequences():
+            seq.save_to_model(model)
+
+        files = list(model.find_statements(
+            RDF.Statement(None, rdfNS['type'], libraryOntology['raw_file'])))
+        self.assertEqual(len(files), 5)
+        files = list(model.find_statements(
+            RDF.Statement(None, rdfNS['type'], libraryOntology['qseq'])))
+        self.assertEqual(len(files), 4)
+        files = list(model.find_statements(
+            RDF.Statement(None, rdfNS['type'], libraryOntology['split_fastq'])))
+        self.assertEqual(len(files), 1)
+
+        files = list(model.find_statements(
+            RDF.Statement(None, libraryOntology['library_id'], None)))
+        self.assertEqual(len(files), 1)
+
+        files = list(model.find_statements(
+            RDF.Statement(None, libraryOntology['flowcell_id'], None)))
+        self.assertEqual(len(files), 5)
+
+        files = list(model.find_statements(
+            RDF.Statement(None, libraryOntology['flowcell'], None)))
+        self.assertEqual(len(files), 0)
+
+        files = list(model.find_statements(
+            RDF.Statement(None, libraryOntology['library'], None)))
+        self.assertEqual(len(files), 0)
+
+    def test_rdf_scan_with_url(self):
+        """Make sure we can save to an RDF model with a base_url"""
+        import RDF
+        model = get_model()
+        base_url = 'http://localhost'
+        for seq in self._generate_sequences():
+            seq.save_to_model(model, base_url=base_url)
+        localFC = RDF.NS(base_url + '/flowcell/')
+        localLibrary = RDF.NS(base_url + '/library/')
+
+        files = list(model.find_statements(
+            RDF.Statement(None, libraryOntology['flowcell'], None)))
+        self.assertEqual(len(files), 5)
+        for f in files:
+            self.assertEqual(f.object, localFC['42BW9AAXX/'])
+
+        files = list(model.find_statements(
+            RDF.Statement(None, libraryOntology['library'], None)))
+        self.assertEqual(len(files), 1)
+        self.assertEqual(files[0].object, localLibrary['12345'])
+
+    def test_rdf_fixup_library(self):
+        """Make sure update_model_sequence_library fills in missing library info"""
+        base_url = 'http://localhost'
+        localLibrary = RDF.NS(base_url + '/library/')
+
+        flowcellInfo = """@prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#> .
+
+<{base}/flowcell/42BW9AAXX/>
+    libns:flowcell_id "42BW9AXX"@en ;
+    libns:has_lane <{base}/lane/1169>, <{base}/lane/1170>,
+                   <{base}/lane/1171>, <{base}/lane/1172> ;
+    libns:read_length 75 ;
+    a libns:illumina_flowcell .
+
+<{base}/lane/1169>
+    libns:lane_number 1 ; libns:library <{base}/library/10923/> .
+<{base}/lane/1170>
+    libns:lane_number 2 ; libns:library <{base}/library/10924/> .
+<{base}/lane/1171>
+    libns:lane_number 3 ; libns:library <{base}/library/12345/> .
+<{base}/lane/1172>
+    libns:lane_number 3 ; libns:library <{base}/library/10930/> .
+""".format(base=base_url)
+        model = get_model()
+        load_string_into_model(model, 'turtle', flowcellInfo)
+        for seq in self._generate_sequences():
+            seq.save_to_model(model)
+        f = sequences.update_model_sequence_library(model, base_url=base_url)
+
+        libTerm = libraryOntology['library']
+        libIdTerm = libraryOntology['library_id']
+
+        url = 'file:///root/42BW9AAXX/C1-152/woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r2.tar.bz2'
+        nodes = list(model.get_targets(RDF.Uri(url), libTerm))
+        self.assertEqual(len(nodes), 1)
+        self.assertEqual(nodes[0], localLibrary['10923/'])
+        nodes = list(model.get_targets(RDF.Uri(url), libIdTerm))
+        self.assertEqual(len(nodes), 1)
+        self.assertEqual(fromTypedNode(nodes[0]), '10923')
+
+        url = 'file:///root/42BW9AAXX/C1-152/woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l2_r1.tar.bz2'
+        nodes = list(model.get_targets(RDF.Uri(url), libTerm))
+        self.assertEqual(len(nodes), 1)
+        self.assertEqual(nodes[0], localLibrary['10924/'])
+        nodes = list(model.get_targets(RDF.Uri(url), libIdTerm))
+        self.assertEqual(len(nodes), 1)
+        self.assertEqual(fromTypedNode(nodes[0]), '10924')
+
+        url = 'file:///root/42BW9AAXX/C1-38/Project_12345/12345_AAATTT_L003_R1_001.fastq.gz'
+        nodes = list(model.get_targets(RDF.Uri(url), libTerm))
+        self.assertEqual(len(nodes), 1)
+        self.assertEqual(nodes[0], localLibrary['12345/'])
+        nodes = list(model.get_targets(RDF.Uri(url), libIdTerm))
+        self.assertEqual(len(nodes), 1)
+        self.assertEqual(fromTypedNode(nodes[0]), '12345')
+
+    def test_load_from_model(self):
+        """Can we round trip through an RDF model"""
+        model = get_model()
+        path = '/root/42BW9AAXX/C1-38/Project_12345/'
+        filename = '12345_AAATTT_L003_R1_001.fastq.gz'
+        seq = sequences.parse_fastq(path, filename)
+        seq.save_to_model(model)
+
+        seq_id = 'file://'+path+filename
+        seqNode = RDF.Node(RDF.Uri(seq_id))
+        libNode = RDF.Node(RDF.Uri('http://localhost/library/12345'))
+        model.add_statement(
+            RDF.Statement(seqNode, libraryOntology['library'], libNode))
+        seq2 = sequences.SequenceFile.load_from_model(model, seq_id)
+
+        self.assertEqual(seq.flowcell, seq2.flowcell)
+        self.assertEqual(seq.flowcell, '42BW9AAXX')
+        self.assertEqual(seq.filetype, seq2.filetype)
+        self.assertEqual(seq2.filetype, 'split_fastq')
+        self.assertEqual(seq.lane, seq2.lane)
+        self.assertEqual(seq2.lane, 3)
+        self.assertEqual(seq.read, seq2.read)
+        self.assertEqual(seq2.read, 1)
+        self.assertEqual(seq.project, seq2.project)
+        self.assertEqual(seq2.project, '12345')
+        self.assertEqual(seq.index, seq2.index)
+        self.assertEqual(seq2.index, 'AAATTT')
+        self.assertEqual(seq.split, seq2.split)
+        self.assertEqual(seq2.split, '001')
+        self.assertEqual(seq.cycle, seq2.cycle)
+        self.assertEqual(seq.pf, seq2.pf)
+        self.assertEqual(seq2.libraryNode, libNode)
+        self.assertEqual(seq.path, seq2.path)
 
     def test_scan_for_sequences(self):
         # simulate tree
index a3ac2f17b904c832c9996336bb74d0186f0ddef9..6137875b1ad86048b0c08080f6fb947d090e9471 100644 (file)
@@ -8,7 +8,7 @@ from htsworkflow.submission.submission import Submission
 from htsworkflow.util.rdfhelp import \
      fromTypedNode, \
      geoSoftNS, \
-     simplifyUri, \
+     stripNamespace, \
      submissionOntology
 
 from django.conf import settings
@@ -203,7 +203,7 @@ class GEOSubmission(Submission):
     def query_to_soft_dictionary(self, results, heading):
         attributes = []
         for r in results:
-            name = simplifyUri(geoSoftNS, r['name'])
+            name = stripNamespace(geoSoftNS, r['name'])
             if name is not None:
                 if name.lower() == heading.lower():
                     name = '^' + name
index 29df21ec492beeaee42b54dc65e7216753f6a3a7..73d32c6f0e804c4229e8282acf73ade474b4efd2 100644 (file)
@@ -203,6 +203,14 @@ def simplify_uri(uri):
     >>> simplify_uri('http://asdf.org/foo/bar?was=foo')
     'was=foo'
     """
+    if isinstance(uri, RDF.Node):
+        if uri.is_resource():
+            uri = uri.uri
+        else:
+            raise ValueError("Can't simplify an RDF literal")
+    if isinstance(uri, RDF.Uri):
+        uri = str(uri)
+
     parsed = urlparse(uri)
     if len(parsed.query) > 0:
         return parsed.query
@@ -214,7 +222,7 @@ def simplify_uri(uri):
                 return element
     raise ValueError("Unable to simplify %s" % (uri,))
 
-def simplifyUri(namespace, term):
+def stripNamespace(namespace, term):
     """Remove the namespace portion of a term
 
     returns None if they aren't in common
@@ -249,6 +257,12 @@ def get_model(model_name=None, directory=None):
 
 
 def load_into_model(model, parser_name, path, ns=None):
+    if isinstance(path, RDF.Node):
+        if path.is_resource():
+            path = str(path.uri)
+        else:
+            raise ValueError("url to load can't be an RDF literal")
+
     url_parts = list(urlparse(path))
     if len(url_parts[0]) == 0:
         url_parts[0] = 'file'
index 34c3200a909de310c8fb9dd48bcb04605aa8a3c9..2643aef22029d48d00df0df58959a8dbf3fc42db 100644 (file)
@@ -13,7 +13,8 @@ from htsworkflow.util.rdfhelp import \
      load_string_into_model, \
      rdfsNS, \
      toTypedNode, \
-     simplifyUri, \
+     stripNamespace, \
+     simplify_uri, \
      sanitize_literal, \
      xsdNS
 
@@ -102,23 +103,44 @@ try:
             self.assertEqual(fromTypedNode(toTypedNode(long_datetime)),
                              long_datetime)
 
-        def test_simplify_uri(self):
+        def test_strip_namespace_uri(self):
             nsOrg = RDF.NS('example.org/example#')
             nsCom = RDF.NS('example.com/example#')
 
             term = 'foo'
             node = nsOrg[term]
-            self.failUnlessEqual(simplifyUri(nsOrg, node), term)
-            self.failUnlessEqual(simplifyUri(nsCom, node), None)
-            self.failUnlessEqual(simplifyUri(nsOrg, node.uri), term)
+            self.failUnlessEqual(stripNamespace(nsOrg, node), term)
+            self.failUnlessEqual(stripNamespace(nsCom, node), None)
+            self.failUnlessEqual(stripNamespace(nsOrg, node.uri), term)
 
-        def test_simplify_uri_exceptions(self):
+        def test_strip_namespace_exceptions(self):
             nsOrg = RDF.NS('example.org/example#')
             nsCom = RDF.NS('example.com/example#')
 
             node = toTypedNode('bad')
-            self.failUnlessRaises(ValueError, simplifyUri, nsOrg, node)
-            self.failUnlessRaises(ValueError, simplifyUri, nsOrg, nsOrg)
+            self.failUnlessRaises(ValueError, stripNamespace, nsOrg, node)
+            self.failUnlessRaises(ValueError, stripNamespace, nsOrg, nsOrg)
+
+        def test_simplify_uri(self):
+            DATA = [('http://asdf.org/foo/bar', 'bar'),
+                    ('http://asdf.org/foo/bar#bleem', 'bleem'),
+                    ('http://asdf.org/foo/bar/', 'bar'),
+                    ('http://asdf.org/foo/bar?was=foo', 'was=foo')]
+
+            for uri, expected in DATA:
+                self.assertEqual(simplify_uri(uri), expected)
+
+            for uri, expected in DATA:
+                n = RDF.Uri(uri)
+                self.assertEqual(simplify_uri(n), expected)
+
+            for uri, expected in DATA:
+                n = RDF.Node(RDF.Uri(uri))
+                self.assertEqual(simplify_uri(n), expected)
+
+            # decoding literals is questionable
+            n = toTypedNode('http://foo/bar')
+            self.assertRaises(ValueError, simplify_uri, n)
 
         def test_owl_import(self):
             path, name = os.path.split(__file__)