library = guess_library_from_model(model, base_url,
flowcell,
lane_id)
+ if library is None:
+ LOGGER.error("Unable to decypher: %s %s",
+ str(flowcell), str(lane_id))
+ continue
library_id = toTypedNode(simplify_uri(library))
LOGGER.debug("Adding file (%s) to library (%s) link",
str(filenode),
where {{
<{flowcell}> libns:has_lane ?lane ;
a libns:IlluminaFlowcell .
- ?lane libns:lane_number "{lane_id}" ;
+ ?lane libns:lane_number ?lane_id ;
libns:library ?library .
+ FILTER(str(?lane_id) = "{lane_id}")
}}
"""
lane_body = lane_body.format(flowcell=flowcell, lane_id=lane_id)
+ LOGGER.debug("guess_library_from_model: %s", lane_body)
lanes = []
tries = 3
while len(lanes) == 0 and tries > 0:
else:
# try grabbing data
model.load(flowcellNode.uri, name="rdfa")
-
-
from htsworkflow.util.rdfhelp import SCHEMAS_URL
INFER_URL='http://jumpgate.caltech.edu/phony/infer'
+LOGGER = logging.getLogger(__name__)
class Infer(object):
"""Provide some simple inference.
for method_name in dir(self):
if method_name.startswith('_rule_'):
+ LOGGER.info("Running: %s", method_name)
method = getattr(self, method_name)
method()
if self.model.size() == starting_size:
"""
for method_name in dir(self):
if method_name.startswith('_validate_'):
+ LOGGER.info("Running: %s", method_name)
method = getattr(self, method_name)
for msg in method():
yield msg
query = RDF.SPARQLQuery(body)
errmsg = "Missing type for: {0}"
for r in query.execute(self.model):
- yield errmsg.format(str(r['subject'].uri))
+ yield errmsg.format(str(r['subject']))
def _validate_undefined_properties(self):
"""Find properties that aren't defined.
e.g. is a subject (node) the domain (space) of this property
and is the object (node) the range of of this property.
"""
+ resource_error = "Expected resource for {0} in range {1}"
+ type_error = "Type of {0} was {1} not {2}"
# check domain
query = RDF.SPARQLQuery(property_template.format(
predicate=predicate.uri,
space=space))
- seen = []
+ seen = set()
for r in query.execute(self.model):
+ # Make sure we have a resource if we're expecting one
if r['type'] == rdfsNS['Resource']:
+ if not node.is_resource():
+ return resource_error.format(str(node), space)
continue
- seen.append(str(r['type'].uri))
+ seen.add(str(r['type'].uri))
+ if node.is_literal():
+ # literal is a generic type.
+ nodetype = node.literal_value['datatype']
+ if nodetype is None:
+ # lets default to string
+ nodetype = xsdNS['string'].uri
+ if r['type'] == rdfsNS['Literal']:
+ pass
+ elif nodetype != r['type'].uri:
+ return type_error.format(
+ str(node), nodetype, r['type'])
+ # check that node is the expected class type
check = RDF.Statement(node, rdfNS['type'], r['type'])
if self.model.contains_statement(check):
return
wrong_range_type.format(str(s)))
if msg is not None: yield msg
return
-
-
dc:title "HTS-Workflow ontology" ;
a owl:Ontology .
-htswlib:Class a rdfs:Class .
+htswlib:Class rdfs:subClassOf rdfs:Class ;
+ a rdfs:Class .
+rdfs:Resource a rdfs:Class.
htswlib:IlluminaFlowcell
- a rdfs:Class, htswlib:Class ;
+ a rdfs:Class, htswlib:Class;
rdfs:comment "information about a illumina flowcell" ;
rdfs:label "Flowcell" .
rdfs:label "made on" ;
rdfs:domain htswlib:IlluminaFlowcell ;
rdfs:domain htswlib:Library ;
- rdfs:range rdfs:Literal .
+ rdfs:range xsd:dateTime .
htswlib:total_unique_locations
a rdf:Property ;
rdfs:label "Unique locations" ;
rdfs:domain htswlib:Library ;
rdfs:domain htswlib:IlluminaLane ;
- rdfs:range rdfs:Literal .
+ rdfs:range xsd:integer .
htswlib:has_mappings
a rdf:Property ;
rdfs:comment "The estimated fragment sizes cut from gel";
rdfs:label "Gel Cut" ;
rdfs:domain htswlib:Library ;
- rdfs:range rdfs:Literal .
+ rdfs:range xsd:decimal .
htswlib:made_by
a rdf:Property ;
rdfs:comment "Which lane were we run in" ;
rdfs:label "lane id" ;
rdfs:domain htswlib:IlluminaLane ;
- rdfs:range rdfs:Literal .
+ rdfs:range xsd:string .
# FIXME: should this be note?
htswlib:comment
--- /dev/null
+from argparse import ArgumentParser
+import logging
+from htsworkflow.util import rdfhelp, rdfinfer
+
+def main(cmdline=None):
+    """Parse the command line and validate the URLs given on it.
+
+    cmdline is the list of argument strings handed to
+    ArgumentParser.parse_args(); when it is None argparse falls back
+    to the process arguments.
+    """
+    parser = make_parser()
+    args = parser.parse_args(cmdline)
+
+    # Show log records at INFO level and above.
+    logging.basicConfig(level=logging.INFO)
+
+    validate_urls(args.urls)
+
+def make_parser():
+    """Build the command line parser for this script.
+
+    The parser takes zero or more positional arguments, collected
+    into the list attribute 'urls'.
+
+    Returns the configured ArgumentParser.
+    """
+    parser = ArgumentParser()
+    parser.add_argument('urls',nargs='*')
+    return parser
+
+def validate_urls(urls):
+    """Load every URL into one model and run the validator over it.
+
+    urls is an iterable; each item is passed in turn to
+    rdfhelp.load_into_model(). All items are loaded into the same
+    model, after the default schemas have been added to it.
+    Nothing is returned.
+    """
+    model = rdfhelp.get_model()
+    rdfhelp.add_default_schemas(model)
+
+    for u in urls:
+        # NOTE(review): the second argument is None; presumably that
+        # lets load_into_model choose the parser itself -- confirm.
+        rdfhelp.load_into_model(model, None, u)
+
+    engine = rdfinfer.Infer(model)
+    # think() is left commented out, so only validate() is invoked.
+    #engine.think()
+    # NOTE(review): the return value is discarded here. If validate()
+    # only yields messages instead of reporting them itself, nothing
+    # would be shown -- confirm against rdfinfer.Infer.
+    engine.validate()
+
+# Run main() only when executed as a script, not when imported.
+if __name__ == "__main__":
+    main()