6 from pprint import pformat
9 from six.moves import StringIO
11 from six.moves import urllib
13 from rdflib import Graph, Literal, Namespace, URIRef
14 from rdflib.namespace import OWL, RDF
16 from htsworkflow.util.rdfns import (
21 from htsworkflow.util.rdfhelp import dump_model
22 from htsworkflow.util.rdfns import dafTermOntology
23 from htsworkflow.util.hashfile import make_md5sum
LOGGER = logging.getLogger(__name__)

# DAF keys whose values are parsed as comma-separated lists (see parse_stream).
DAF_VARIABLE_NAMES = ("variables", "extraVariables")
# DAF term name used when attaching variable names to the RDF model.
VARIABLES_TERM_NAME = 'variables'
# ddf variable columns forced to the front / back of get_daf_variables' result.
DAF_PRE_VARIABLES = ['files', 'view']
DAF_POST_VARIABLES = [ 'labExpId', 'md5sum']
class ModelException(RuntimeError):
    """Assumptions about the RDF model failed

    Raised when a query against the submission model violates an
    invariant (e.g. a filename matching multiple views, or a library
    without a library type).
    """
class MetadataLookupException(RuntimeError):
    """Problem accessing metadata

    Raised when library metadata needed for a submission cannot be
    resolved (e.g. an unrecognized library type).
    """
def parse_into_model(model, subject, filename):
    """Read a DAF into RDF Model

    requires a subject node to attach statements to
    """
    attributes = parse(filename)
    add_to_model(model, attributes, subject)
def fromstream_into_model(model, subject, daf_stream):
    """Load daf stream into model attached to node subject"""
    attributes = parse_stream(daf_stream)
    add_to_model(model, attributes, subject)
def fromstring_into_model(model, subject, daf_string):
    """Read a string containing a DAF into RDF Model

    requires a short submission name
    """
    attributes = fromstring(daf_string)
    add_to_model(model, attributes, subject)
def parse(filename):
    """Parse daf from a file

    Returns the parsed DAF attribute dictionary.
    """
    # context manager closes the file even if parse_stream raises
    with open(filename, 'r') as stream:
        return parse_stream(stream)
def fromstring(daf_string):
    """Parse UCSC daf from a provided string"""
    return parse_stream(StringIO(daf_string))
def parse_stream(stream):
    """Parse UCSC dat stored in a stream

    Returns a dictionary of DAF attributes; per-view attribute blocks
    are collected under the 'views' key.

    NOTE(review): the loop scaffolding around the surviving branch
    bodies was reconstructed -- verify against upstream history.
    """
    comment_re = re.compile("#.*$")

    state = DAF_HEADER
    attributes = {'views': {}}
    view_name = None
    view_attributes = {}
    for line in stream:
        line = comment_re.sub("", line)
        nstop = _extract_name_index(line)
        name = line[0:nstop]
        sstop = _consume_whitespace(line, start=nstop)
        vstop = _extract_value_index(line, start=sstop)
        value = line[sstop:vstop]
        # DAF booleans are spelled yes/no
        if value.lower() in ('yes',):
            value = True
        elif value.lower() in ('no',):
            value = False

        if len(name) == 0:
            # a blank line terminates the current view block
            if view_name is not None:
                attributes['views'][view_name] = view_attributes
                view_name = None
                view_attributes = {}
            state = DAF_HEADER
        elif state == DAF_HEADER and name in DAF_VARIABLE_NAMES:
            attributes[name] = [x.strip() for x in value.split(',')]
        elif state == DAF_HEADER and name == 'view':
            view_name = value
            view_attributes['view'] = value
            state = DAF_VIEW
        elif state == DAF_HEADER:
            attributes[name] = value
        elif state == DAF_VIEW:
            view_attributes[name] = value

    # flush the final view block (stream may not end with a blank line)
    if view_name is not None:
        attributes['views'][view_name] = view_attributes

    LOGGER.debug("DAF Attributes" + pformat(attributes))
    return attributes
135 def _consume_whitespace(line, start=0):
136 """return index of next non whitespace character
138 returns length of string if it can't find anything
140 for i, c in enumerate(line[start:]):
141 if c not in string.whitespace:
147 def _extract_name_index(line, start=0):
148 """Used to find end of word by looking for a whitespace character
150 returns length of string if nothing matches
152 for i, c in enumerate(line[start:]):
153 if c in string.whitespace:
159 def _extract_value_index(line, start=0):
160 """Returns position of last non-whitespace character
162 shortline = line.rstrip()
163 return len(shortline)
def convert_to_rdf_statements(attributes, subject):
    """Convert dictionary of DAF attributes into rdf statements

    The statements are attached to the provided subject node.
    Returns a list of (subject, predicate, object) triples.
    """
    variables_term = dafTermOntology[VARIABLES_TERM_NAME]
    statements = []
    for daf_key in attributes:
        predicate = dafTermOntology[daf_key]
        if daf_key == 'views':
            statements.extend(_views_to_statements(subject,
                                                   dafTermOntology,
                                                   attributes[daf_key]))
        elif daf_key in DAF_VARIABLE_NAMES:
            # variable lists are flattened onto the shared variables term
            for var in attributes.get(daf_key, []):
                obj = Literal(var)
                statements.append((subject, variables_term, obj))
        else:
            value = attributes[daf_key]
            obj = Literal(value)
            statements.append((subject, predicate, obj))
    return statements
def _views_to_statements(subject, dafNS, views):
    """Attach view attributes to new view nodes attached to provided subject

    Returns a list of triples describing every view in *views*.
    """
    viewNS = get_view_namespace(subject)

    statements = []
    for view_name in views:
        view_attributes = views[view_name]
        viewSubject = viewNS[view_name]
        statements.append((subject, dafNS['views'], viewSubject))
        statements.append((viewSubject, dafNS['name'], Literal(view_name)))
        for view_attribute_name in view_attributes:
            predicate = dafNS[view_attribute_name]
            obj = Literal(view_attributes[view_attribute_name])
            statements.append((viewSubject, predicate, obj))

        #statements.extend(convert_to_rdf_statements(view, viewNode))
    return statements
def add_to_model(model, attributes, subject):
    """Add the DAF attribute dictionary to *model*, rooted at *subject*."""
    for statement in convert_to_rdf_statements(attributes, subject):
        model.add(statement)
def get_submission_uri(name):
    """Return the submissionLog namespace entry for submission *name*."""
    # submissionLog presumably comes from the truncated
    # htsworkflow.util.rdfns import above -- verify.
    return submissionLog[name]
def submission_uri_to_string(submission_uri):
    """Coerce a submission URI node to a string ending in '/'."""
    if isinstance(submission_uri, (Literal, URIRef)):
        submission_uri = str(submission_uri)
    # endswith handles the empty string, where [-1] would raise IndexError
    if not submission_uri.endswith('/'):
        submission_uri += '/'
    return submission_uri
def get_view_namespace(submission_uri):
    """Return a Namespace rooted at <submission_uri>/view/."""
    submission_uri = submission_uri_to_string(submission_uri)
    view_uri = urllib.parse.urljoin(submission_uri, 'view/')
    viewNS = Namespace(view_uri)
    return viewNS
class UCSCSubmission(object):
    """Build a submission by examining the DAF for what we need to submit"""

    def __init__(self, name, daf_file=None, model=None):
        """Construct a RDF backed model of a UCSC DAF

        :Parameters:
          name (str): the name of this submission (used to construct DAF url)
          daf_file (str, stream, or None):
             if str, use as filename
             if stream, parse as stream
             if none, don't attempt to load the DAF into our model
          model (RDF.Model or None):
             if None, construct a memory backed model
             otherwise specifies model to use
        """
        if daf_file is None and model is None:
            LOGGER.error("We need a DAF or Model containing a DAF to work")

        self.name = name
        self.submissionSet = get_submission_uri(self.name)
        self.viewNS = get_view_namespace(self.submissionSet)

        if model is not None:
            self.model = model
        else:
            self.model = get_model()

        self.daf = None
        if daf_file is not None:
            # check for a file-like object rather than Iterable: a filename
            # string is itself iterable, which made the old check useless
            if hasattr(daf_file, 'read'):
                # its some kind of stream
                self.daf = daf_file.read()
            else:
                # treat it as a filename
                with open(daf_file, 'rt') as stream:
                    self.daf = stream.read()
            fromstring_into_model(self.model, self.submissionSet, self.daf)

        self.libraryNS = Namespace('http://jumpgate.caltech.edu/library/')
        self.submissionSetNS = Namespace(str(self.submissionSet) + '/')
        self.__view_map = None
278 def _get_daf_name(self):
279 return self.name + '.daf'
280 daf_name = property(_get_daf_name,doc="construct name for DAF file")
282 def add_pattern(self, view_name, filename_pattern):
283 """Map a filename regular expression to a view name
285 obj = Literal(filename_pattern)
287 (self.viewNS[view_name],
288 dafTermOntology['filename_re'],
291 def scan_submission_dirs(self, result_map):
292 """Examine files in our result directory
294 for lib_id, result_dir in result_map.items():
295 LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
297 self.import_submission_dir(result_dir, lib_id)
298 except MetadataLookupException as e:
299 LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))
301 def import_submission_dir(self, submission_dir, library_id):
302 """Import a submission directories and update our model as needed
304 #attributes = get_filename_attribute_map(paired)
305 libNode = self.libraryNS[library_id + "/"]
307 self._add_library_details_to_model(libNode)
309 submission_files = os.listdir(submission_dir)
310 for filename in submission_files:
311 self.construct_track_attributes(submission_dir, libNode, filename)
313 def construct_track_attributes(self, submission_dir, libNode, pathname):
314 """Looking for the best extension
315 The 'best' is the longest match
318 filename (str): the filename whose extention we are about to examine
320 path, filename = os.path.split(pathname)
322 LOGGER.debug("Searching for view")
323 view = self.find_view(filename)
325 LOGGER.warn("Unrecognized file: {0}".format(pathname))
327 if str(view) == str(libraryOntology['ignore']):
330 submission_name = self.make_submission_name(submission_dir)
331 submissionNode = self.get_submission_node(submission_dir)
332 submission_uri = str(submissionNode)
333 view_name = list(self.model.objects(view, dafTermOntology['name']))
334 if len(view_name) == 0:
335 errmsg = 'Could not find view name for {0}'
336 LOGGER.warning(errmsg.format(str(view)))
339 view_name = str(view_name[0])
340 submissionView = URIRef(submission_uri + '/' + view_name)
342 self.model.add((self.submissionSet,
343 dafTermOntology['has_submission'],
345 LOGGER.debug("Adding statements to {0}".format(str(submissionNode)))
346 self.model.add((submissionNode,
347 submissionOntology['has_view'],
349 self.model.add((submissionNode,
350 submissionOntology['name'],
351 Literal(submission_name)))
352 self.model.add((submissionNode,
354 submissionOntology['submission']))
355 self.model.add((submissionNode,
356 libraryOntology['library'],
359 LOGGER.debug("Adding statements to {0}".format(str(submissionView)))
360 # add track specific information
361 self.model.add((submissionView, dafTermOntology['view'], view))
362 self.model.add((submissionView,
363 dafTermOntology['paired'],
364 Literal(self._is_paired(libNode))))
365 self.model.add((submissionView,
366 dafTermOntology['submission'],
369 # add file specific information
370 self.create_file_attributes(filename, submissionView, submission_uri, submission_dir)
372 LOGGER.debug("Done.")
374 def create_file_attributes(self, filename, submissionView, submission_uri, submission_dir):
375 # add file specific information
376 LOGGER.debug("Updating file md5sum")
377 submission_pathname = os.path.join(submission_dir, filename)
378 fileNode = URIRef("file://" + submission_pathname)
379 self.model.add((submissionView,
380 dafTermOntology['has_file'],
382 self.model.add((fileNode,
383 dafTermOntology['filename'],
386 md5 = make_md5sum(submission_pathname)
388 errmsg = "Unable to produce md5sum for {0}"
389 LOGGER.warning(errmsg.format(submission_pathname))
391 self.model.add((fileNode, dafTermOntology['md5sum'], Literal(md5)))
393 def _add_library_details_to_model(self, libNode):
395 tmpmodel.parse(source=libNode, format='rdfa')
397 # don't override things we already have in the model
398 targets = list(self.model.objects(s[0], s[1]))
399 if len(targets) == 0:
402 def get_daf_variables(self):
403 """Returns simple variables names that to include in the ddf
405 variables_term = dafTermOntology[VARIABLES_TERM_NAME]
407 results.extend([v for v in DAF_PRE_VARIABLES if v not in results])
408 results = DAF_PRE_VARIABLES[:]
409 if self.need_replicate() and 'replicate' not in results:
410 results.append('replicate')
412 for obj in self.model.objects(self.submissionSet, variables_term):
413 value = obj.toPython()
414 if value not in results:
415 results.append(value)
416 results.extend([v for v in DAF_POST_VARIABLES if v not in results])
419 def make_submission_name(self, submission_dir):
420 submission_dir = os.path.normpath(submission_dir)
421 submission_dir_name = os.path.split(submission_dir)[1]
422 if len(submission_dir_name) == 0:
424 "Submission dir name too short: {0}".format(submission_dir))
425 return submission_dir_name
427 def get_submission_node(self, submission_dir):
428 """Convert a submission directory name to a submission node
430 submission_name = self.make_submission_name(submission_dir)
431 return self.submissionSetNS[submission_name]
433 def _get_library_attribute(self, libNode, attribute):
434 if not isinstance(attribute, (Literal, URIRef)):
435 attribute = libraryOntology[attribute]
437 targets = list(self.model.objects(libNode, attribute))
439 return self._format_library_attribute(targets)
443 #targets = self._search_same_as(libNode, attribute)
444 #if targets is not None:
445 # return self._format_library_attribute(targets)
447 # we don't know anything about this attribute
448 self._add_library_details_to_model(libNode)
450 targets = list(self.model.objects(libNode, attribute))
452 return self._format_library_attribute(targets)
456 def _format_library_attribute(self, targets):
457 if len(targets) == 0:
459 elif len(targets) == 1:
460 return targets[0].toPython()
461 elif len(targets) > 1:
462 return [t.toPython() for t in targets]
464 def _search_same_as(self, subject, predicate):
465 # look for alternate names
466 other_predicates = self.model.objects(predicate, OWL['sameAs'])
467 for other in other_predicates:
468 targets = list(self.model.objects(subject, other))
473 def find_view(self, filename):
474 """Search through potential DAF filename patterns
476 if self.__view_map is None:
477 self.__view_map = self._get_filename_view_map()
480 for pattern, view in self.__view_map.items():
481 if re.match(pattern, filename):
485 msg = "%s matched multiple views %s" % (
487 [str(x) for x in results])
488 raise ModelException(msg)
489 elif len(results) == 1:
494 def get_view_name(self, view):
495 view_term = submissionOntology['view_name']
496 names = list(self.model.objects(view, view_term))
498 return names[0].toPython()
500 msg = "Found wrong number of view names for {0} len = {1}"
501 msg = msg.format(str(view), len(names))
503 raise RuntimeError(msg)
505 def _get_filename_view_map(self):
506 """Query our model for filename patterns
508 return a dictionary of compiled regular expressions to view names
510 filename_query = (None, dafTermOntology['filename_re'], None)
513 for s in self.model.triples(filename_query):
515 literal_re = s[2].value
516 LOGGER.debug("Found: %s" % (literal_re,))
518 filename_re = re.compile(literal_re)
519 except re.error as e:
520 LOGGER.error("Unable to compile: %s" % (literal_re,))
521 patterns[literal_re] = view_name
    def _get_library_url(self):
        """Return the current library base URL as a string."""
        return str(self.libraryNS[''])

    def _set_library_url(self, value):
        """Rebase the library namespace onto *value*."""
        self.libraryNS = Namespace(str(value))

    library_url = property(_get_library_url, _set_library_url)
532 def _is_paired(self, libNode):
533 """Determine if a library is paired end"""
534 library_type = self._get_library_attribute(libNode, 'library_type')
535 if library_type is None:
536 errmsg = "%s doesn't have a library type"
537 raise ModelException(errmsg % (str(libNode),))
539 single = ['CSHL (lacking last nt)',
540 'Single End (non-multiplexed)',
541 'Small RNA (non-multiplexed)',]
542 paired = ['Barcoded Illumina',
545 'Paired End (non-multiplexed)',]
546 if library_type in single:
548 elif library_type in paired:
551 raise MetadataLookupException(
552 "Unrecognized library type %s for %s" % \
553 (library_type, str(libNode)))
555 def need_replicate(self):
556 viewTerm = dafTermOntology['views']
557 replicateTerm = dafTermOntology['hasReplicates']
559 views = self.model.objects(self.submissionSet, viewTerm)
561 replicate = list(self.model.objects(view, replicateTerm))
562 if len(replicate) > 0 and replicate[0].toPython():
568 def link_daf(self, result_map):
569 if self.daf is None or len(self.daf) == 0:
571 "DAF data does not exist, how can I link to it?")
573 base_daf = self.daf_name
575 for result_dir in result_map.values():
576 if not os.path.exists(result_dir):
578 "Couldn't find target directory %s" %(result_dir,))
579 submission_daf = os.path.join(result_dir, base_daf)
580 if os.path.exists(submission_daf):
581 previous_daf = open(submission_daf, 'r').read()
582 if self.daf != previous_daf:
583 LOGGER.info("Old daf is different, overwriting it.")
584 stream = open(submission_daf, 'w')
585 stream.write(self.daf)
589 if __name__ == "__main__":
590 example_daf = """# Lab and general info
594 variables cell, antibody,sex,age,strain,control
595 compositeSuffix CaltechHistone
598 validationSettings validateFiles.bam:mismatches=2,bamPercent=99.9;validateFiles.fastq:quick=1000
600 # Track/view definition
602 longLabelPrefix Caltech Fastq Read 1
608 longLabelPrefix Caltech Histone Signal
614 example_daf_stream = StringIO(example_daf)
616 mapper = DAFMapper(name, daf_file = example_daf_stream, model=model)