port condorfastq to rdflib
author Diane Trout <diane@testing.woldlab.caltech.edu>
Thu, 9 Mar 2017 23:26:28 +0000 (15:26 -0800)
committer Diane Trout <diane@testing.woldlab.caltech.edu>
Thu, 9 Mar 2017 23:26:28 +0000 (15:26 -0800)
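
Replace the Redland (python-librdf) bindings with rdflib in
CondorFastqExtract and SequenceResult: the model becomes a
ConjunctiveGraph, SPARQL runs through Graph.query(), typed nodes are
converted with .toPython() instead of fromTypedNode(), documents are
loaded with Graph.parse(), and statements become plain triple tuples
used with Graph.add() and "in" membership tests. SequenceResult's
accessors are also converted to @property methods.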
htsworkflow/submission/condorfastq.py
htsworkflow/submission/test/test_condorfastq.py

diff --git a/htsworkflow/submission/condorfastq.py b/htsworkflow/submission/condorfastq.py
index fdac4ba6fa45784d8392af2d5c7a9d95e7c042f0..cd2d61b7243bda74706e101af8df0b6350472e38 100644
--- a/htsworkflow/submission/condorfastq.py
+++ b/htsworkflow/submission/condorfastq.py
@@ -8,6 +8,8 @@ import types
 import six
 from six.moves.urllib.parse import urljoin, urlparse
 
+from rdflib import ConjunctiveGraph, URIRef
+
 from htsworkflow.pipelines.sequences import scan_for_sequences, \
      update_model_sequence_library
 from htsworkflow.pipelines.samplekey import SampleKey
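
The hunks below refer to RDF and DC namespace objects with no visible
import; presumably they arrive through the wildcard import from
htsworkflow.util.rdfns. If not, rdflib itself provides both
namespaces — a minimal sketch of that assumption:

    # Assumption: RDF and DC are not re-exported by htsworkflow.util.rdfns.
    # rdflib ships both, and these objects make RDF['type'] and DC['source']
    # (used in later hunks) resolve to the standard URIs.
    from rdflib.namespace import DC, RDF

    assert str(RDF['type']) == 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type'
    assert str(DC['source']) == 'http://purl.org/dc/elements/1.1/source'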
@@ -15,9 +17,7 @@ from htsworkflow.pipelines import qseq2fastq
 from htsworkflow.pipelines import srf2fastq
 from htsworkflow.pipelines import desplit_fastq
 from htsworkflow.submission.fastqname import FastqName
-from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \
-     fromTypedNode, \
-     strip_namespace
+from htsworkflow.util.rdfhelp import dump_model, strip_namespace
 from htsworkflow.util.rdfns import *
 from htsworkflow.util.conversion import parse_flowcell_id
 
@@ -25,8 +25,6 @@ from django.conf import settings
 from django.template import Context, loader
 from django.utils.encoding import smart_str
 
-import RDF
-
 LOGGER = logging.getLogger(__name__)
 
 COMPRESSION_EXTENSIONS = {
@@ -37,7 +35,6 @@ COMPRESSION_EXTENSIONS = {
 class CondorFastqExtract(object):
     def __init__(self, host, sequences_path,
                  log_path='log',
-                 model=None,
                  compression=None,
                  force=False):
         """Extract fastqs from results archive
@@ -51,7 +48,7 @@ class CondorFastqExtract(object):
           force (bool): do we force overwriting current files?
         """
         self.host = host
-        self.model = get_model(model)
+        self.model = ConjunctiveGraph()
         self.sequences_path = sequences_path
         self.log_path = log_path
         self.compression=compression
@@ -158,10 +155,9 @@ class CondorFastqExtract(object):
         """
         LOGGER.debug("find_archive_sequence_files query: %s",
                      query_text)
-        query = RDF.SPARQLQuery(query_text)
         results = []
-        for r in query.execute(self.model):
-            library_id = fromTypedNode(r['library_id'])
+        for r in self.model.query(query_text):
+            library_id = r['library_id'].toPython()
             if library_id in result_map:
                 seq = SequenceResult(r)
                 LOGGER.debug("Creating sequence result for library %s: %s",
@@ -173,23 +169,23 @@ class CondorFastqExtract(object):
     def import_libraries(self, result_map):
         for lib_id in result_map.keys():
             liburl = urljoin(self.host, 'library/%s/' % (lib_id,))
-            library = RDF.Node(RDF.Uri(liburl))
+            library = URIRef(liburl)
             self.import_library(library)
 
     def import_library(self, library):
         """Import library data into our model if we don't have it already
         """
-        q = RDF.Statement(library, rdfNS['type'], libraryOntology['Library'])
+        q = (library, RDF['type'], libraryOntology['Library'])
         present = False
-        if not self.model.contains_statement(q):
+        if q not in self.model:
             present = True
-            load_into_model(self.model, 'rdfa', library)
-        LOGGER.debug("Did we import %s: %s", library.uri, present)
+            self.model.parse(source=library, format='rdfa')
+        LOGGER.debug("Did we import %s: %s", str(library), present)
 
     def find_relevant_flowcell_ids(self):
         """Generate set of flowcell ids that had samples of interest on them
         """
-        flowcell_query = RDF.SPARQLQuery("""
+        flowcell_query = """
 prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
 
 select distinct ?flowcell ?flowcell_id
@@ -198,18 +194,18 @@ WHERE {
            libns:has_lane ?lane .
   ?lane libns:flowcell ?flowcell .
   ?flowcell libns:flowcell_id ?flowcell_id .
-}""")
+}"""
         flowcell_ids = set()
-        for r in flowcell_query.execute(self.model):
-            flowcell_ids.add( fromTypedNode(r['flowcell_id']) )
+        for r in self.model.query(flowcell_query):
+            flowcell_ids.add(r['flowcell_id'].toPython())
             imported = False
-            a_lane = self.model.get_target(r['flowcell'],
-                                           libraryOntology['has_lane'])
-            if a_lane is None:
+            a_lane = list(self.model.objects(r['flowcell'],
+                                             libraryOntology['has_lane']))
+            if len(a_lane) == 0:
                 imported = True
                 # we lack information about which lanes were on this flowcell
-                load_into_model(self.model, 'rdfa', r['flowcell'])
-            LOGGER.debug("Did we imported %s: %s" % (r['flowcell'].uri,
+                self.model.parse(r['flowcell'], format='rdfa')
+            LOGGER.debug("Did we import %s: %s" % (str(r['flowcell']),
                                                      imported))
 
         return flowcell_ids
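
The membership and loading idioms above, in isolation: a plain
(s, p, o) tuple with the "in" operator replaces
model.contains_statement(RDF.Statement(...)), Graph.parse() replaces
load_into_model(), and Graph.objects() replaces model.get_target()
(yielding every match rather than a single node, hence the list() and
length check). A sketch using the LibraryOntology namespace from the
query above and a made-up library URI:

    from rdflib import Graph, Namespace, URIRef
    from rdflib.namespace import RDF

    libraryOntology = Namespace('http://jumpgate.caltech.edu/wiki/LibraryOntology#')
    g = Graph()
    library = URIRef('http://example.org/library/11154/')  # illustrative URI

    if (library, RDF['type'], libraryOntology['Library']) not in g:
        # g.parse(source=library, format='rdfa') would fetch and parse the
        # library page here; not executed since the URI is made up.
        pass

    # objects() yields every matching object; an empty list means the model
    # has no lane information for this subject.
    lanes = list(g.objects(library, libraryOntology['has_lane']))
    assert lanes == []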
@@ -259,9 +255,9 @@ WHERE {
         (note lane objects are now post demultiplexing.)
         """
         target_uri = 'file://' + smart_str(target)
-        target_node = RDF.Node(RDF.Uri(target_uri))
-        source_stmt = RDF.Statement(target_node, dcNS['source'], seq.filenode)
-        self.model.add_statement(source_stmt)
+        target_node = URIRef(target_uri)
+        source_stmt = (target_node, DC['source'], seq.filenode)
+        self.model.add(source_stmt)
 
     def condor_srf_to_fastq(self, sources, target_pathname):
         if len(sources) > 1:
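
Likewise, Graph.add() accepts the same plain triple tuple where
Redland required constructing an RDF.Statement. A sketch with a
hypothetical fastq target and sequence-file node:

    from rdflib import Graph, URIRef
    from rdflib.namespace import DC

    g = Graph()
    target_node = URIRef('file:///tmp/11154_C02F9ACXX_c202_l3_r1.fastq')  # hypothetical
    filenode = URIRef('http://example.org/sequence/0001')                 # hypothetical
    g.add((target_node, DC['source'], filenode))
    assert len(g) == 1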
@@ -334,47 +330,56 @@ class SequenceResult(object):
     """Convert the sparql query result from find_archive_sequence_files
     """
     def __init__(self, result):
+        keys = [
+            'cycle',
+            'lane_number',
+            'read',
+            'library',
+            'library_id',
+            'flowcell',
+            'flowcell_id',
+            'flowcell_type',
+            'flowcell_status',
+        ]
+        for key in keys:
+            node = result[key]
+            setattr(self, key, node.toPython() if node is not None else None)
+
         self.filenode = result['filenode']
         self._filetype = result['filetype']
-        self.cycle = fromTypedNode(result['cycle'])
-        self.lane_number = fromTypedNode(result['lane_number'])
-        self.read = fromTypedNode(result['read'])
         if isinstance(self.read, six.string_types):
             self.read = 1
-        self.library = result['library']
-        self.library_id = fromTypedNode(result['library_id'])
-        self.flowcell = result['flowcell']
-        self.flowcell_id = fromTypedNode(result['flowcell_id'])
-        self.flowcell_type = fromTypedNode(result['flowcell_type'])
-        self.flowcell_status = fromTypedNode(result['flowcell_status'])
-
-    def _is_good(self):
+
+    @property
+    def isgood(self):
         """is this sequence / flowcell 'good enough'"""
         if self.flowcell_status is not None and \
            self.flowcell_status.lower() == "failed":
             return False
         return True
-    isgood = property(_is_good)
 
-    def _get_ispaired(self):
+    @property
+    def ispaired(self):
         if self.flowcell_type.lower() == "paired":
             return True
         else:
             return False
-    ispaired = property(_get_ispaired)
 
-    def _get_filetype(self):
+    @property
+    def filetype(self):
         return strip_namespace(libraryOntology, self._filetype)
-    filetype = property(_get_filetype)
 
-    def _get_path(self):
-        url = urlparse(str(self.filenode.uri))
+    @property
+    def path(self):
+        url = urlparse(str(self.filenode))
         if url.scheme == 'file':
             return url.path
         else:
             errmsg = u"Unsupported scheme {0} for {1}"
             raise ValueError(errmsg.format(url.scheme, unicode(url)))
-    path = property(_get_path)
 
     def __repr__(self):
         return "SequenceResult({0},{1},{2})".format(
diff --git a/htsworkflow/submission/test/test_condorfastq.py b/htsworkflow/submission/test/test_condorfastq.py
index 400b4db98d3ba13ec5b4fcff3db66267047be00b..3ed78c0714c1d93ac5a8fdca2f6b3b8e9487d3f3 100644
--- a/htsworkflow/submission/test/test_condorfastq.py
+++ b/htsworkflow/submission/test/test_condorfastq.py
@@ -16,7 +16,7 @@ from django.conf import settings
 from htsworkflow.submission.condorfastq import CondorFastqExtract
 from htsworkflow.submission.results import ResultMap
 from htsworkflow.util.rdfhelp import \
-     add_default_schemas, load_string_into_model, dump_model
+     add_default_schemas, dump_model
 from htsworkflow.util.rdfinfer import Infer
 
 FCDIRS = [
@@ -446,7 +446,7 @@ class TestCondorFastq(TestCase):
         self.extract = CondorFastqExtract(HOST,
                                           self.flowcelldir,
                                           self.logdir)
-        load_string_into_model(self.extract.model, 'turtle', lib_turtle)
+        self.extract.model.parse(data=lib_turtle, format='turtle')
         add_default_schemas(self.extract.model)
         inference = Infer(self.extract.model)
         errmsgs = list(inference.run_validation())
@@ -470,21 +470,21 @@ class TestCondorFastq(TestCase):
         seqs = self.extract.find_archive_sequence_files(self.result_map)
 
         expected = set([
-            (u'11154', u'42JUYAAXX', '5', 1, 76, True, 'qseq'),
-            (u'11154', u'42JUYAAXX', '5', 2, 76, True, 'qseq'),
-            (u'11154', u'61MJTAAXX', '6', 1, 76, False, 'qseq'),
-            (u'11154', u'C02F9ACXX', '3', 2, 202, True, 'split_fastq'),
-            (u'11154', u'C02F9ACXX', '3', 1, 202, True, 'split_fastq'),
-            (u'11154', u'C02F9ACXX', '3', 1, 202, True, 'split_fastq'),
-            (u'11154', u'C02F9ACXX', '3', 2, 202, True, 'split_fastq'),
-            (u'12345', u'C02F9ACXX', '3', 1, 202, True, 'split_fastq'),
-            (u'12345', u'C02F9ACXX', '3', 2, 202, True, 'split_fastq'),
-            (u'12345', u'C02F9ACXX', '3', 2, 202, True, 'split_fastq'),
-            (u'12345', u'C02F9ACXX', '3', 1, 202, True, 'split_fastq'),
-            (u'12345', u'C02F9ACXX', '3', 1, 202, True, 'split_fastq'),
-            (u'12345', u'C02F9ACXX', '3', 2, 202, True, 'split_fastq'),
-            (u'11154', u'30221AAXX', '4', 1, 33, False, 'srf'),
-            (u'11154', u'30DY0AAXX', '8', 1, 151, True, 'srf')
+            (u'11154', u'42JUYAAXX', u'5', 1, 76, True, 'qseq'),
+            (u'11154', u'42JUYAAXX', u'5', 2, 76, True, 'qseq'),
+            (u'11154', u'61MJTAAXX', u'6', 1, 76, False, 'qseq'),
+            (u'11154', u'C02F9ACXX', u'3', 2, 202, True, 'split_fastq'),
+            (u'11154', u'C02F9ACXX', u'3', 1, 202, True, 'split_fastq'),
+            (u'11154', u'C02F9ACXX', u'3', 1, 202, True, 'split_fastq'),
+            (u'11154', u'C02F9ACXX', u'3', 2, 202, True, 'split_fastq'),
+            (u'12345', u'C02F9ACXX', u'3', 1, 202, True, 'split_fastq'),
+            (u'12345', u'C02F9ACXX', u'3', 2, 202, True, 'split_fastq'),
+            (u'12345', u'C02F9ACXX', u'3', 2, 202, True, 'split_fastq'),
+            (u'12345', u'C02F9ACXX', u'3', 1, 202, True, 'split_fastq'),
+            (u'12345', u'C02F9ACXX', u'3', 1, 202, True, 'split_fastq'),
+            (u'12345', u'C02F9ACXX', u'3', 2, 202, True, 'split_fastq'),
+            (u'11154', u'30221AAXX', u'4', 1, 33, False, 'srf'),
+            (u'11154', u'30DY0AAXX', u'8', 1, 151, True, 'srf')
         ])
         found = set([(l.library_id, l.flowcell_id, l.lane_number, l.read, l.cycle, l.ispaired, l.filetype) for l in seqs])
         self.assertEqual(expected, found)
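
In the test, load_string_into_model(model, 'turtle', lib_turtle)
becomes a direct Graph.parse(data=..., format=...) call. In isolation,
with a two-line turtle document standing in for lib_turtle:

    from rdflib import ConjunctiveGraph

    g = ConjunctiveGraph()
    g.parse(data='''
    @prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#> .
    <http://example.org/library/11154/> a libns:Library .
    ''', format='turtle')
    assert len(g) == 1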