From ce48338a6b7813fcfcf9fdef5bd7ff92c1aa4ce1 Mon Sep 17 00:00:00 2001
From: Diane Trout
Date: Thu, 9 Mar 2017 15:26:28 -0800
Subject: [PATCH] port condorfastq to rdflib

---
 htsworkflow/submission/condorfastq.py           | 97 ++++++++++---------
 .../submission/test/test_condorfastq.py         | 34 +++----
 2 files changed, 68 insertions(+), 63 deletions(-)

diff --git a/htsworkflow/submission/condorfastq.py b/htsworkflow/submission/condorfastq.py
index fdac4ba..cd2d61b 100644
--- a/htsworkflow/submission/condorfastq.py
+++ b/htsworkflow/submission/condorfastq.py
@@ -8,6 +8,8 @@ import types
 import six
 from six.moves.urllib.parse import urljoin, urlparse
 
+from rdflib import ConjunctiveGraph, URIRef
+
 from htsworkflow.pipelines.sequences import scan_for_sequences, \
      update_model_sequence_library
 from htsworkflow.pipelines.samplekey import SampleKey
@@ -15,9 +17,7 @@ from htsworkflow.pipelines import qseq2fastq
 from htsworkflow.pipelines import srf2fastq
 from htsworkflow.pipelines import desplit_fastq
 from htsworkflow.submission.fastqname import FastqName
-from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \
-     fromTypedNode, \
-     strip_namespace
+from htsworkflow.util.rdfhelp import dump_model, strip_namespace
 from htsworkflow.util.rdfns import *
 from htsworkflow.util.conversion import parse_flowcell_id
 
@@ -25,8 +25,6 @@ from django.conf import settings
 from django.template import Context, loader
 from django.utils.encoding import smart_str
 
-import RDF
-
 LOGGER = logging.getLogger(__name__)
 
 COMPRESSION_EXTENSIONS = {
@@ -37,7 +35,6 @@ COMPRESSION_EXTENSIONS = {
 class CondorFastqExtract(object):
     def __init__(self, host, sequences_path,
                  log_path='log',
-                 model=None,
                  compression=None,
                  force=False):
         """Extract fastqs from results archive
@@ -51,7 +48,7 @@ class CondorFastqExtract(object):
             force (bool): do we force overwriting current files?
""" self.host = host - self.model = get_model(model) + self.model = ConjunctiveGraph() self.sequences_path = sequences_path self.log_path = log_path self.compression=compression @@ -158,10 +155,9 @@ class CondorFastqExtract(object): """ LOGGER.debug("find_archive_sequence_files query: %s", query_text) - query = RDF.SPARQLQuery(query_text) results = [] - for r in query.execute(self.model): - library_id = fromTypedNode(r['library_id']) + for r in self.model.query(query_text): + library_id = r['library_id'].toPython() if library_id in result_map: seq = SequenceResult(r) LOGGER.debug("Creating sequence result for library %s: %s", @@ -173,23 +169,23 @@ class CondorFastqExtract(object): def import_libraries(self, result_map): for lib_id in result_map.keys(): liburl = urljoin(self.host, 'library/%s/' % (lib_id,)) - library = RDF.Node(RDF.Uri(liburl)) + library = URIRef(liburl) self.import_library(library) def import_library(self, library): """Import library data into our model if we don't have it already """ - q = RDF.Statement(library, rdfNS['type'], libraryOntology['Library']) + q = (library, RDF['type'], libraryOntology['Library']) present = False - if not self.model.contains_statement(q): + if q not in self.model: present = True - load_into_model(self.model, 'rdfa', library) - LOGGER.debug("Did we import %s: %s", library.uri, present) + self.model.parse(source=library, format='rdfa') + LOGGER.debug("Did we import %s: %s", str(library), present) def find_relevant_flowcell_ids(self): """Generate set of flowcell ids that had samples of interest on them """ - flowcell_query = RDF.SPARQLQuery(""" + flowcell_query = """ prefix libns: select distinct ?flowcell ?flowcell_id @@ -198,18 +194,18 @@ WHERE { libns:has_lane ?lane . ?lane libns:flowcell ?flowcell . ?flowcell libns:flowcell_id ?flowcell_id . -}""") +}""" flowcell_ids = set() - for r in flowcell_query.execute(self.model): - flowcell_ids.add( fromTypedNode(r['flowcell_id']) ) + for r in self.model.query(flowcell_query): + flowcell_ids.add(r['flowcell_id'].toPython()) imported = False - a_lane = self.model.get_target(r['flowcell'], - libraryOntology['has_lane']) - if a_lane is None: + a_lane = list(self.model.objects(r['flowcell'], + libraryOntology['has_lane'])) + if len(a_lane) == 0: imported = True # we lack information about which lanes were on this flowcell - load_into_model(self.model, 'rdfa', r['flowcell']) - LOGGER.debug("Did we imported %s: %s" % (r['flowcell'].uri, + self.model.parse(r['flowcell'], format='rdfa') + LOGGER.debug("Did we imported %s: %s" % (str(r['flowcell']), imported)) return flowcell_ids @@ -259,9 +255,9 @@ WHERE { (note lane objects are now post demultiplexing.) 
""" target_uri = 'file://' + smart_str(target) - target_node = RDF.Node(RDF.Uri(target_uri)) - source_stmt = RDF.Statement(target_node, dcNS['source'], seq.filenode) - self.model.add_statement(source_stmt) + target_node = URIRef(target_uri) + source_stmt = (target_node, DC['source'], seq.filenode) + self.model.add(source_stmt) def condor_srf_to_fastq(self, sources, target_pathname): if len(sources) > 1: @@ -334,47 +330,56 @@ class SequenceResult(object): """Convert the sparql query result from find_archive_sequence_files """ def __init__(self, result): + keys = { + 'cycle': 'cycle', + 'lane_number': 'lane_number', + 'read': 'read', + 'library': 'library', + 'library_id': 'library_id', + 'flowcell': 'flowcell', + 'flowcell_id': 'flowcell_id', + 'flowcell_type': 'flowcell_type', + 'flowcell_status': 'flowcell_status', + } + for result_name, attribute_name in keys.items(): + node = result[result_name] + if node is not None: + setattr(self, attribute_name, node.toPython()) + else: + setattr(self, attribute_name, None) + self.filenode = result['filenode'] self._filetype = result['filetype'] - self.cycle = fromTypedNode(result['cycle']) - self.lane_number = fromTypedNode(result['lane_number']) - self.read = fromTypedNode(result['read']) if isinstance(self.read, six.string_types): self.read = 1 - self.library = result['library'] - self.library_id = fromTypedNode(result['library_id']) - self.flowcell = result['flowcell'] - self.flowcell_id = fromTypedNode(result['flowcell_id']) - self.flowcell_type = fromTypedNode(result['flowcell_type']) - self.flowcell_status = fromTypedNode(result['flowcell_status']) - - def _is_good(self): + + @property + def isgood(self): """is this sequence / flowcell 'good enough'""" if self.flowcell_status is not None and \ self.flowcell_status.lower() == "failed": return False return True - isgood = property(_is_good) - def _get_ispaired(self): + @property + def ispaired(self): if self.flowcell_type.lower() == "paired": return True else: return False - ispaired = property(_get_ispaired) - def _get_filetype(self): + @property + def filetype(self): return strip_namespace(libraryOntology, self._filetype) - filetype = property(_get_filetype) - def _get_path(self): - url = urlparse(str(self.filenode.uri)) + @property + def path(self): + url = urlparse(str(self.filenode)) if url.scheme == 'file': return url.path else: errmsg = u"Unsupported scheme {0} for {1}" raise ValueError(errmsg.format(url.scheme, unicode(url))) - path = property(_get_path) def __repr__(self): return "SequenceResult({0},{1},{2})".format( diff --git a/htsworkflow/submission/test/test_condorfastq.py b/htsworkflow/submission/test/test_condorfastq.py index 400b4db..3ed78c0 100644 --- a/htsworkflow/submission/test/test_condorfastq.py +++ b/htsworkflow/submission/test/test_condorfastq.py @@ -16,7 +16,7 @@ from django.conf import settings from htsworkflow.submission.condorfastq import CondorFastqExtract from htsworkflow.submission.results import ResultMap from htsworkflow.util.rdfhelp import \ - add_default_schemas, load_string_into_model, dump_model + add_default_schemas, dump_model from htsworkflow.util.rdfinfer import Infer FCDIRS = [ @@ -446,7 +446,7 @@ class TestCondorFastq(TestCase): self.extract = CondorFastqExtract(HOST, self.flowcelldir, self.logdir) - load_string_into_model(self.extract.model, 'turtle', lib_turtle) + self.extract.model.parse(data=lib_turtle, format='turtle') add_default_schemas(self.extract.model) inference = Infer(self.extract.model) errmsgs = list(inference.run_validation()) @@ 
         seqs = self.extract.find_archive_sequence_files(self.result_map)
 
         expected = set([
-            (u'11154', u'42JUYAAXX', '5', 1, 76, True, 'qseq'),
-            (u'11154', u'42JUYAAXX', '5', 2, 76, True, 'qseq'),
-            (u'11154', u'61MJTAAXX', '6', 1, 76, False, 'qseq'),
-            (u'11154', u'C02F9ACXX', '3', 2, 202, True, 'split_fastq'),
-            (u'11154', u'C02F9ACXX', '3', 1, 202, True, 'split_fastq'),
-            (u'11154', u'C02F9ACXX', '3', 1, 202, True, 'split_fastq'),
-            (u'11154', u'C02F9ACXX', '3', 2, 202, True, 'split_fastq'),
-            (u'12345', u'C02F9ACXX', '3', 1, 202, True, 'split_fastq'),
-            (u'12345', u'C02F9ACXX', '3', 2, 202, True, 'split_fastq'),
-            (u'12345', u'C02F9ACXX', '3', 2, 202, True, 'split_fastq'),
-            (u'12345', u'C02F9ACXX', '3', 1, 202, True, 'split_fastq'),
-            (u'12345', u'C02F9ACXX', '3', 1, 202, True, 'split_fastq'),
-            (u'12345', u'C02F9ACXX', '3', 2, 202, True, 'split_fastq'),
-            (u'11154', u'30221AAXX', '4', 1, 33, False, 'srf'),
-            (u'11154', u'30DY0AAXX', '8', 1, 151, True, 'srf')
+            (u'11154', u'42JUYAAXX', u'5', 1, 76, True, 'qseq'),
+            (u'11154', u'42JUYAAXX', u'5', 2, 76, True, 'qseq'),
+            (u'11154', u'61MJTAAXX', u'6', 1, 76, False, 'qseq'),
+            (u'11154', u'C02F9ACXX', u'3', 2, 202, True, 'split_fastq'),
+            (u'11154', u'C02F9ACXX', u'3', 1, 202, True, 'split_fastq'),
+            (u'11154', u'C02F9ACXX', u'3', 1, 202, True, 'split_fastq'),
+            (u'11154', u'C02F9ACXX', u'3', 2, 202, True, 'split_fastq'),
+            (u'12345', u'C02F9ACXX', u'3', 1, 202, True, 'split_fastq'),
+            (u'12345', u'C02F9ACXX', u'3', 2, 202, True, 'split_fastq'),
+            (u'12345', u'C02F9ACXX', u'3', 2, 202, True, 'split_fastq'),
+            (u'12345', u'C02F9ACXX', u'3', 1, 202, True, 'split_fastq'),
+            (u'12345', u'C02F9ACXX', u'3', 1, 202, True, 'split_fastq'),
+            (u'12345', u'C02F9ACXX', u'3', 2, 202, True, 'split_fastq'),
+            (u'11154', u'30221AAXX', u'4', 1, 33, False, 'srf'),
+            (u'11154', u'30DY0AAXX', u'8', 1, 151, True, 'srf')
         ])
         found = set([(l.library_id, l.flowcell_id, l.lane_number,
                       l.read, l.cycle, l.ispaired, l.filetype)
                      for l in seqs])
         self.assertEqual(expected, found)
-- 
2.30.2
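
Note (after the signature, so `git am` ignores it): the port replaces Redland (librdf) idioms with rdflib equivalents throughout. For reviewers unfamiliar with the two APIs, here is a minimal sketch of the main substitutions the patch makes: building a graph, running a SPARQL query, converting typed nodes to native Python values, and testing triple membership. The sample triples and the library URI are invented for illustration; only the `libns` namespace matches the queries in the patch.

```python
# Hedged sketch of the Redland -> rdflib idioms used in this patch.
# The sample data is made up; only the libns namespace is real.
from rdflib import ConjunctiveGraph, Literal, Namespace, URIRef
from rdflib.namespace import RDF

LIBNS = Namespace('http://jumpgate.caltech.edu/wiki/LibraryOntology#')

graph = ConjunctiveGraph()  # replaces get_model() / RDF.Model(...)
library = URIRef('http://localhost/library/11154/')  # hypothetical URI
graph.add((library, RDF['type'], LIBNS['Library']))
graph.add((library, LIBNS['library_id'], Literal('11154')))

# Redland: rows = RDF.SPARQLQuery(text).execute(model)
# rdflib:  the graph object runs the query itself.
rows = graph.query("""
prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
select ?library_id where { ?library libns:library_id ?library_id . }
""")
for row in rows:
    # Redland: fromTypedNode(row['library_id']) unwrapped typed literals.
    # rdflib:  .toPython() converts a Literal/URIRef to a native value.
    print(row['library_id'].toPython())

# Redland: model.contains_statement(RDF.Statement(s, p, o))
# rdflib:  plain tuple membership on the graph.
assert (library, RDF['type'], LIBNS['Library']) in graph
```

The same mapping covers the remaining calls: `model.get_target(s, p)` becomes iteration over `graph.objects(s, p)`, and `load_into_model(model, 'rdfa', uri)` becomes `graph.parse(source=uri, format='rdfa')` (which relies on rdflib's RDFa parser support being available).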