Further attempts to validate RDF models.
[htsworkflow.git] / htsworkflow / pipelines / sequences.py
index 0e5612a8e47b393684f9ee7f69cd779cca97c607..87212dddbb0acda92894cf49fec9937b7f2d9e21 100644 (file)
@@ -6,6 +6,13 @@ import logging
 import os
 import types
 import re
+import sys
+from urlparse import urljoin, urlparse
+
+import RDF
+from htsworkflow.util.rdfhelp import libraryOntology as libNS
+from htsworkflow.util.rdfhelp import toTypedNode, fromTypedNode, rdfNS, \
+     stripNamespace, dump_model, simplify_uri
 
 LOGGER = logging.getLogger(__name__)
 
@@ -122,7 +129,7 @@ class SequenceFile(object):
         # information and thus are unique so we don't have to do anything
         return os.path.join(root, basename)
 
-    def save(self, cursor):
+    def save_to_sql(self, cursor):
         """
         Add this entry to a DB2.0 database.
         """
@@ -148,6 +155,92 @@ class SequenceFile(object):
 
         return cursor.execute(sql, sql_values)
 
+    def save_to_model(self, model, base_url=None):
+        def add_lit(model, s, p, o):
+            if o is not None:
+                model.add_statement(RDF.Statement(s, p, toTypedNode(o)))
+        def add(model, s, p, o):
+            model.add_statement(RDF.Statement(s,p,o))
+        # a bit unreliable... assumes filesystem is encoded in utf-8
+        path = os.path.abspath(self.path.encode('utf-8'))
+        fileNode = RDF.Node(RDF.Uri('file://' + path))
+        add(model, fileNode, rdfNS['type'], libNS['IlluminaResult'])
+        add_lit(model, fileNode, libNS['flowcell_id'], self.flowcell)
+        add_lit(model, fileNode, libNS['lane_number'], self.lane)
+        if self.read is not None:
+            add_lit(model, fileNode, libNS['read'], self.read)
+        else:
+            add_lit(model, fileNode, libNS['read'], '')
+
+        add_lit(model, fileNode, libNS['library_id'], self.project)
+        add_lit(model, fileNode, libNS['multiplex_index'], self.index)
+        add_lit(model, fileNode, libNS['split_id'], self.split)
+        add_lit(model, fileNode, libNS['cycle'], self.cycle)
+        add_lit(model, fileNode, libNS['passed_filter'], self.pf)
+        add(model, fileNode, libNS['file_type'], libNS[self.filetype])
+
+        if base_url is not None:
+            flowcell = RDF.Node(RDF.Uri("{base}/flowcell/{flowcell}/".format(
+                base=base_url,
+                flowcell=self.flowcell)))
+            add(model, fileNode, libNS['flowcell'], flowcell)
+            if self.project is not None:
+                library = RDF.Node(RDF.Uri("{base}/library/{library}".format(
+                    base=base_url,
+                    library=self.project)))
+                add(model, fileNode, libNS['library'], library)
+
+
+    @classmethod
+    def load_from_model(cls, model, seq_id):
+        def get(s, p):
+            values = []
+            stmts = model.find_statements(RDF.Statement(s, p, None))
+            for s in stmts:
+                obj = s.object
+                if not obj.is_resource():
+                    values.append(fromTypedNode(obj))
+                else:
+                    values.append(obj)
+            return values
+        def get_one(s, p):
+            values = get(s, p)
+            if len(values) > 1:
+                errmsg = u"To many values for %s %s"
+                raise ValueError(errmsg % (unicode(s), unicode(p)))
+            elif len(values) == 1:
+                return values[0]
+            else:
+                return None
+
+        if not isinstance(seq_id, RDF.Node):
+            seq_id = RDF.Node(RDF.Uri(seq_id))
+        result_statement = RDF.Statement(seq_id,
+                                         rdfNS['type'],
+                                         libNS['IlluminaResult'])
+        if not model.contains_statement(result_statement):
+            raise KeyError(u"%s not found" % (unicode(seq_id),))
+
+        seq_type_node = model.get_target(seq_id, libNS['file_type'])
+        seq_type = stripNamespace(libNS, seq_type_node)
+
+        path = urlparse(str(seq_id.uri)).path
+        flowcellNode = get_one(seq_id, libNS['flowcell'])
+        flowcell = get_one(seq_id, libNS['flowcell_id'])
+        lane = get_one(seq_id, libNS['lane_number'])
+        read = get_one(seq_id, libNS['read'])
+
+        obj = cls(seq_type, path, flowcell, lane)
+        obj.read = read if read != '' else None
+        obj.project = get_one(seq_id, libNS['library_id'])
+        obj.index = get_one(seq_id, libNS['multiplex_index'])
+        obj.split = get_one(seq_id, libNS['split_id'])
+        obj.cycle = get_one(seq_id, libNS['cycle'] )
+        obj.pf = get_one(seq_id, libNS['passed_filter'])
+        obj.libraryNode = get_one(seq_id, libNS['library'])
+        return obj
+
+
 def get_flowcell_cycle(path):
     """
     Extract flowcell, cycle from pathname
@@ -182,7 +275,7 @@ def parse_srf(path, filename):
     basename, ext = os.path.splitext(filename)
     records = basename.split('_')
     flowcell = records[4]
-    lane = int(records[5][0])
+    lane = records[5][0]
     fullpath = os.path.join(path, filename)
 
     if flowcell_dir != flowcell:
@@ -197,7 +290,7 @@ def parse_qseq(path, filename):
     records = basename.split('_')
     fullpath = os.path.join(path, filename)
     flowcell = records[4]
-    lane = int(records[5][1])
+    lane = records[5][1]
     read = int(records[6][1])
 
     if flowcell_dir != flowcell:
@@ -216,7 +309,7 @@ def parse_fastq(path, filename):
     if project is not None:
         # demultiplexed sample!
         flowcell = flowcell_dir
-        lane = int(records[2][-1])
+        lane = records[2][-1]
         read = int(records[3][-1])
         pf = True # as I understand it hiseq runs toss the ones that fail filter
         index = records[1]
@@ -225,7 +318,7 @@ def parse_fastq(path, filename):
         sequence_type = 'split_fastq'
     else:
         flowcell = records[4]
-        lane = int(records[5][1])
+        lane = records[5][1]
         read = int(records[6][1])
         pf = parse_fastq_pf_flag(records)
         index = None
@@ -269,7 +362,7 @@ def parse_eland(path, filename, eland_match=None):
     fullpath = os.path.join(path, filename)
     flowcell, start, stop, project = get_flowcell_cycle(path)
     if eland_match.group('lane'):
-        lane = int(eland_match.group('lane'))
+        lane = eland_match.group('lane')
     else:
         lane = None
     if eland_match.group('read'):
@@ -316,3 +409,103 @@ def scan_for_sequences(dirs):
                     LOGGER.debug("Found sequence at %s" % (f,))
 
     return sequences
+
+
+def update_model_sequence_library(model, base_url):
+    """Find sequence objects and add library information if its missing
+    """
+    file_body = """
+    prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
+    select ?filenode ?flowcell_id ?lane_id ?library_id ?flowcell ?library
+    where {
+       ?filenode a libns:IlluminaResult ;
+                 libns:flowcell_id ?flowcell_id ;
+                 libns:lane_number ?lane_id .
+       OPTIONAL { ?filenode libns:flowcell ?flowcell . }
+       OPTIONAL { ?filenode libns:library ?library .}
+       OPTIONAL { ?filenode libns:library_id ?library_id .}
+    }
+    """
+    LOGGER.debug("update_model_sequence_library query %s", file_body)
+    file_query = RDF.SPARQLQuery(file_body)
+    files = file_query.execute(model)
+
+    libraryNS = RDF.NS(urljoin(base_url, 'library/'))
+    flowcellNS = RDF.NS(urljoin(base_url, 'flowcell/'))
+    for f in files:
+        filenode = f['filenode']
+        LOGGER.debug("Updating file node %s", str(filenode))
+        lane_id = fromTypedNode(f['lane_id'])
+        if f['flowcell'] is None:
+            flowcell = flowcellNS[str(f['flowcell_id'])+'/']
+            LOGGER.debug("Adding file (%s) to flowcell (%s) link",
+                         str(filenode),
+                         str(flowcell))
+            model.add_statement(
+                RDF.Statement(filenode, libNS['flowcell'], flowcell))
+        else:
+            flowcell = f['flowcell']
+
+        if f['library'] is None:
+            if f['library_id'] is not None:
+                library = libraryNS[str(f['library_id']) + '/']
+            else:
+                library = guess_library_from_model(model, base_url,
+                                                   flowcell,
+                                                   lane_id)
+                if library is None:
+                    LOGGER.error("Unable to decypher: %s %s",
+                                 str(flowcell), str(lane_id))
+                    continue
+                library_id = toTypedNode(simplify_uri(library))
+                LOGGER.debug("Adding file (%s) to library (%s) link",
+                             str(filenode),
+                             str(library))
+                model.add_statement(
+                    RDF.Statement(filenode, libNS['library_id'], library_id))
+            if library is not None:
+                model.add_statement(
+                    RDF.Statement(filenode, libNS['library'], library))
+
+
+def guess_library_from_model(model, base_url, flowcell, lane_id):
+    """Attempt to find library URI
+    """
+    flowcellNode = RDF.Node(flowcell)
+    flowcell = str(flowcell.uri)
+    lane_body = """
+    prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
+    prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
+    prefix xsd: <http://www.w3.org/2001/XMLSchema#>
+
+    select ?library ?lane
+    where {{
+      <{flowcell}> libns:has_lane ?lane ;
+                   a libns:IlluminaFlowcell .
+      ?lane libns:lane_number ?lane_id ;
+            libns:library ?library .
+      FILTER(str(?lane_id) = "{lane_id}")
+    }}
+    """
+    lane_body = lane_body.format(flowcell=flowcell, lane_id=lane_id)
+    LOGGER.debug("guess_library_from_model: %s", lane_body)
+    lanes = []
+    tries = 3
+    while len(lanes) == 0 and tries > 0:
+        tries -= 1
+        lane_query = RDF.SPARQLQuery(lane_body)
+        lanes = [ l for l in lane_query.execute(model)]
+        if len(lanes) > 1:
+            # CONFUSED!
+            errmsg = "Too many libraries for flowcell {flowcell} "\
+                     "lane {lane} = {count}"
+            LOGGER.error(errmsg.format(flowcell=flowcell,
+                                       lane=lane_id,
+                                       count=len(lanes)))
+            return None
+        elif len(lanes) == 1:
+            # success
+            return lanes[0]['library']
+        else:
+            # try grabbing data
+            model.load(flowcellNode.uri, name="rdfa")