5 from pprint import pformat
8 from io import StringIO
13 from htsworkflow.util.rdfhelp import \
25 from htsworkflow.util.hashfile import make_md5sum
LOGGER = logging.getLogger(__name__)

# DAF keys whose values are comma-separated lists rather than scalars
# (see parse_stream, which splits them on ',')
DAF_VARIABLE_NAMES = ("variables", "extraVariables")
# RDF term name used when emitting entries of the lists above
VARIABLES_TERM_NAME = 'variables'
# variables forced to the front / back of the DDF variable list
# (see get_daf_variables)
DAF_PRE_VARIABLES = ['files', 'view']
DAF_POST_VARIABLES = [ 'labExpId', 'md5sum']
class ModelException(RuntimeError):
    """Assumptions about the RDF model failed

    e.g. a filename matching multiple views in find_view.
    """
class MetadataLookupException(RuntimeError):
    """Problem accessing metadata

    Caught by scan_submission_dirs so one bad library record
    (e.g. an unrecognized library type) doesn't abort the scan.
    """
def parse_into_model(model, subject, filename):
    """Read a DAF into RDF Model

    requires a subject node to attach statements to
    """
    add_to_model(model, parse(filename), subject)
def fromstream_into_model(model, subject, daf_stream):
    """Load daf stream into model attached to node subject"""
    add_to_model(model, parse_stream(daf_stream), subject)
def fromstring_into_model(model, subject, daf_string):
    """Read a string containing a DAF into RDF Model

    requires a short submission name
    """
    add_to_model(model, fromstring(daf_string), subject)
def parse(filename):
    """Parse daf from a file

    Uses a context manager so the file handle is closed promptly even
    if parsing raises (the original left the handle for the GC).
    """
    with open(filename, 'r') as stream:
        return parse_stream(stream)
def fromstring(daf_string):
    """Parse UCSC daf from a provided string"""
    return parse_stream(StringIO(daf_string))
def parse_stream(stream):
    """Parse UCSC dat stored in a stream"""
    # strips trailing '#' comments before tokenizing each line
    comment_re = re.compile("#.*$")
    # parsed result; 'views' maps view name -> that view's attribute dict
    attributes = {'views': {}}
        # (per-line loop body; the loop header and parser-state
        # initialization fall outside the visible excerpt)
        line = comment_re.sub("", line)
        # a line is "<name><whitespace><value>"; find the token boundaries
        nstop = _extract_name_index(line)
        sstop = _consume_whitespace(line, start=nstop)
        vstop = _extract_value_index(line, start=sstop)
        value = line[sstop:vstop]
        # coerce yes/no flag values to booleans
        if value.lower() in ('yes',):
        elif value.lower() in ('no',):
            # end of a view block: record its accumulated attributes
            if view_name is not None:
                attributes['views'][view_name] = view_attributes
        elif state == DAF_HEADER and name in DAF_VARIABLE_NAMES:
            # variables/extraVariables hold comma separated lists
            attributes[name] = [x.strip() for x in value.split(',')]
        elif state == DAF_HEADER and name == 'view':
            # start of a new view block
            view_attributes['view'] = value
        elif state == DAF_HEADER:
            # plain header attribute
            attributes[name] = value
        elif state == DAF_VIEW:
            view_attributes[name] = value
    # flush the final view block (no terminating blank line follows it)
    if view_name is not None:
        attributes['views'][view_name] = view_attributes
    LOGGER.debug("DAF Attributes" + pformat(attributes))
137 def _consume_whitespace(line, start=0):
138 """return index of next non whitespace character
140 returns length of string if it can't find anything
142 for i in range(start, len(line)):
143 if line[i] not in string.whitespace:
149 def _extract_name_index(line, start=0):
150 """Used to find end of word by looking for a whitespace character
152 returns length of string if nothing matches
154 for i in range(start, len(line)):
155 if line[i] in string.whitespace:
161 def _extract_value_index(line, start=0):
162 """Returns position of last non-whitespace character
164 shortline = line.rstrip()
165 return len(shortline)
def convert_to_rdf_statements(attributes, subject):
    """Convert dictionary of DAF attributes into rdf statements

    The statements are attached to the provided subject node
    """
    variables_term = dafTermOntology[VARIABLES_TERM_NAME]
    for daf_key in attributes:
        predicate = dafTermOntology[daf_key]
        if daf_key == 'views':
            # views expand into per-view sub-nodes with their own statements
            statements.extend(_views_to_statements(subject,
                                                   attributes[daf_key]))
        elif daf_key in DAF_VARIABLE_NAMES:
            # each declared variable becomes its own 'variables' statement
            for var in attributes.get(daf_key, []):
                obj = toTypedNode(var)
                statements.append(RDF.Statement(subject, variables_term, obj))
            # default branch: plain scalar attribute -> one typed statement
            # (the guarding 'else:' falls outside the visible excerpt)
            value = attributes[daf_key]
            obj = toTypedNode(value)
            statements.append(RDF.Statement(subject, predicate, obj))
def _views_to_statements(subject, dafNS, views):
    """Attach view attributes to new view nodes attached to provided subject"""
    # view nodes live in a namespace under <subject>/view/
    viewNS = get_view_namespace(subject)
    for view_name in views:
        view_attributes = views[view_name]
        viewSubject = viewNS[view_name]
        # link the submission to this view, and record the view's name
        statements.append(RDF.Statement(subject, dafNS['views'], viewSubject))
            RDF.Statement(viewSubject, dafNS['name'], toTypedNode(view_name)))
        # then one typed statement per attribute of the view
        for view_attribute_name in view_attributes:
            predicate = dafNS[view_attribute_name]
            obj = toTypedNode(view_attributes[view_attribute_name])
            statements.append(RDF.Statement(viewSubject, predicate, obj))
    #statements.extend(convert_to_rdf_statements(view, viewNode))
def add_to_model(model, attributes, subject):
    """Convert *attributes* to RDF statements and add each to *model*."""
    statements = convert_to_rdf_statements(attributes, subject)
    for one_statement in statements:
        model.add_statement(one_statement)
def get_submission_uri(name):
    """Return the submission log URI for the named submission."""
    submission_node = submissionLog[name]
    return submission_node.uri
def submission_uri_to_string(submission_uri):
    """Normalize a submission URI to a string ending in '/'.

    Accepts an RDF.Node, RDF.Uri, or plain string.
    """
    if isinstance(submission_uri, RDF.Node):
        submission_uri = str(submission_uri.uri)
    elif isinstance(submission_uri, RDF.Uri):
        submission_uri = str(submission_uri)
    # endswith also copes with an empty string, where the original
    # submission_uri[-1] indexing raised IndexError
    if not submission_uri.endswith('/'):
        submission_uri += '/'
    return submission_uri
def get_view_namespace(submission_uri):
    """Return an RDF namespace rooted at <submission_uri>/view/.

    *submission_uri* may be an RDF.Node, RDF.Uri, or string; it is
    normalized by submission_uri_to_string first.
    """
    submission_uri = submission_uri_to_string(submission_uri)
    view_uri = urllib.parse.urljoin(submission_uri, 'view/')
    return RDF.NS(view_uri)
class UCSCSubmission(object):
    """Build a submission by examining the DAF for what we need to submit"""

    def __init__(self, name, daf_file=None, model=None):
        """Construct a RDF backed model of a UCSC DAF

        Args:
          name (str): the name of this submission (used to construct DAF url)
          daf_file (str, stream, or None):
              if str, use as filename
              if stream, parse as stream
              if none, don't attempt to load the DAF into our model
          model (RDF.Model or None):
              if None, construct a memory backed model
              otherwise specifies model to use
        """
        if daf_file is None and model is None:
            LOGGER.error("We need a DAF or Model containing a DAF to work")

        self.name = name
        self.submissionSet = get_submission_uri(self.name)
        self.viewNS = get_view_namespace(self.submissionSet)

        self.model = model if model is not None else get_model()

        if daf_file is None:
            # model-only construction: the DAF is expected to already be
            # present in the model, so there is nothing to load
            self.daf = None
        else:
            if hasattr(daf_file, 'read'):
                # its some kind of stream.  (testing for 'next' was the
                # Python 2 iterator protocol; py3 file objects don't have
                # it, which sent streams down the open() branch.)
                self.daf = daf_file.read()
            else:
                # treat as a filename; close the handle deterministically
                with open(daf_file, 'r') as stream:
                    self.daf = stream.read()
            fromstring_into_model(self.model, self.submissionSet, self.daf)

        self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')
        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '/')
        self.__view_map = None
    def _get_daf_name(self):
        """Return the DAF filename for this submission: '<name>.daf'."""
        return self.name + '.daf'
    daf_name = property(_get_daf_name,doc="construct name for DAF file")
287 def add_pattern(self, view_name, filename_pattern):
288 """Map a filename regular expression to a view name
290 obj = toTypedNode(filename_pattern)
291 self.model.add_statement(
292 RDF.Statement(self.viewNS[view_name],
293 dafTermOntology['filename_re'],
296 def scan_submission_dirs(self, result_map):
297 """Examine files in our result directory
299 for lib_id, result_dir in list(result_map.items()):
300 LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
302 self.import_submission_dir(result_dir, lib_id)
303 except MetadataLookupException as e:
304 LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))
306 def import_submission_dir(self, submission_dir, library_id):
307 """Import a submission directories and update our model as needed
309 #attributes = get_filename_attribute_map(paired)
310 libNode = self.libraryNS[library_id + "/"]
312 self._add_library_details_to_model(libNode)
314 submission_files = os.listdir(submission_dir)
315 for filename in submission_files:
316 self.construct_track_attributes(submission_dir, libNode, filename)
    def construct_track_attributes(self, submission_dir, libNode, pathname):
        """Looking for the best extension
        The 'best' is the longest match

        :Args:
          filename (str): the filename whose extention we are about to examine
        """
        path, filename = os.path.split(pathname)

        LOGGER.debug("Searching for view")
        view = self.find_view(filename)
            # no view pattern matched this filename
            # NOTE(review): LOGGER.warn is a deprecated alias of warning
            LOGGER.warn("Unrecognized file: {0}".format(pathname))
        # files mapped to the special 'ignore' view are skipped
        if str(view) == str(libraryOntology['ignore']):

        submission_name = self.make_submission_name(submission_dir)
        submissionNode = self.get_submission_node(submission_dir)
        submission_uri = str(submissionNode.uri)
        # look up the view's human readable name from the model
        view_name = fromTypedNode(self.model.get_target(view,
                                                        dafTermOntology['name']))
        if view_name is None:
            errmsg = 'Could not find view name for {0}'
            LOGGER.warning(errmsg.format(str(view)))

        view_name = str(view_name)
        # each view gets its own node under the submission's URI
        submissionView = RDF.Node(RDF.Uri(submission_uri + '/' + view_name))

        # submission level statements
        self.model.add_statement(
            RDF.Statement(self.submissionSet,
                          dafTermOntology['has_submission'],
        LOGGER.debug("Adding statements to {0}".format(str(submissionNode)))
        self.model.add_statement(RDF.Statement(submissionNode,
                                               submissionOntology['has_view'],
        self.model.add_statement(RDF.Statement(submissionNode,
                                               submissionOntology['name'],
                                               toTypedNode(submission_name)))
        self.model.add_statement(
            RDF.Statement(submissionNode,
                          submissionOntology['submission']))
        self.model.add_statement(RDF.Statement(submissionNode,
                                               libraryOntology['library'],

        LOGGER.debug("Adding statements to {0}".format(str(submissionView)))
        # add track specific information
        self.model.add_statement(
            RDF.Statement(submissionView, dafTermOntology['view'], view))
        self.model.add_statement(
            RDF.Statement(submissionView,
                          dafTermOntology['paired'],
                          toTypedNode(self._is_paired(libNode))))
        self.model.add_statement(
            RDF.Statement(submissionView,
                          dafTermOntology['submission'],

        # add file specific information
        self.create_file_attributes(filename, submissionView, submission_uri, submission_dir)

        LOGGER.debug("Done.")
385 def create_file_attributes(self, filename, submissionView, submission_uri, submission_dir):
386 # add file specific information
387 LOGGER.debug("Updating file md5sum")
388 submission_pathname = os.path.join(submission_dir, filename)
389 fileNode = RDF.Node(RDF.Uri("file://" + submission_pathname))
390 self.model.add_statement(
391 RDF.Statement(submissionView,
392 dafTermOntology['has_file'],
394 self.model.add_statement(
395 RDF.Statement(fileNode,
396 dafTermOntology['filename'],
399 md5 = make_md5sum(submission_pathname)
401 errmsg = "Unable to produce md5sum for {0}"
402 LOGGER.warning(errmsg.format(submission_pathname))
404 self.model.add_statement(
405 RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))
407 def _add_library_details_to_model(self, libNode):
408 parser = RDF.Parser(name='rdfa')
409 new_statements = parser.parse_as_stream(libNode.uri)
410 for s in new_statements:
411 # don't override things we already have in the model
412 targets = list(self.model.get_targets(s.subject, s.predicate))
413 if len(targets) == 0:
416 def get_daf_variables(self):
417 """Returns simple variables names that to include in the ddf
419 variables_term = dafTermOntology[VARIABLES_TERM_NAME]
421 results.extend([v for v in DAF_PRE_VARIABLES if v not in results])
422 results = DAF_PRE_VARIABLES[:]
423 if self.need_replicate() and 'replicate' not in results:
424 results.append('replicate')
426 for obj in self.model.get_targets(self.submissionSet, variables_term):
427 value = str(fromTypedNode(obj))
428 if value not in results:
429 results.append(value)
430 results.extend([v for v in DAF_POST_VARIABLES if v not in results])
433 def make_submission_name(self, submission_dir):
434 submission_dir = os.path.normpath(submission_dir)
435 submission_dir_name = os.path.split(submission_dir)[1]
436 if len(submission_dir_name) == 0:
438 "Submission dir name too short: {0}".format(submission_dir))
439 return submission_dir_name
441 def get_submission_node(self, submission_dir):
442 """Convert a submission directory name to a submission node
444 submission_name = self.make_submission_name(submission_dir)
445 return self.submissionSetNS[submission_name]
    def _get_library_attribute(self, libNode, attribute):
        """Look up one library attribute, fetching remote metadata on a miss."""
        # accept either a bare attribute name or a pre-built RDF node
        if not isinstance(attribute, RDF.Node):
            attribute = libraryOntology[attribute]

        # first try: whatever the model already knows
        targets = list(self.model.get_targets(libNode, attribute))
            return self._format_library_attribute(targets)

        #targets = self._search_same_as(libNode, attribute)
        #if targets is not None:
        #    return self._format_library_attribute(targets)

        # we don't know anything about this attribute
        self._add_library_details_to_model(libNode)

        # second try, after pulling in the library's metadata page
        targets = list(self.model.get_targets(libNode, attribute))
            return self._format_library_attribute(targets)
470 def _format_library_attribute(self, targets):
471 if len(targets) == 0:
473 elif len(targets) == 1:
474 return fromTypedNode(targets[0])
475 elif len(targets) > 1:
476 return [fromTypedNode(t) for t in targets]
    def _search_same_as(self, subject, predicate):
        # look for alternate names
        # walk owl:sameAs aliases of *predicate* and query each in turn
        other_predicates = self.model.get_targets(predicate, owlNS['sameAs'])
        for other in other_predicates:
            targets = list(self.model.get_targets(subject, other))
    def find_view(self, filename):
        """Search through potential DAF filename patterns

        Matches *filename* against every filename_re pattern in the DAF;
        raises ModelException when more than one view matches.
        """
        # lazily build and cache the pattern -> view lookup table
        if self.__view_map is None:
            self.__view_map = self._get_filename_view_map()

        for pattern, view in list(self.__view_map.items()):
            if re.match(pattern, filename):

            # ambiguous: a filename must map to exactly one view
            msg = "%s matched multiple views %s" % (
                [str(x) for x in results])
            raise ModelException(msg)
        elif len(results) == 1:
    def get_view_name(self, view):
        """Return the single view_name recorded for *view*; raise otherwise."""
        view_term = submissionOntology['view_name']
        names = list(self.model.get_targets(view, view_term))
            return fromTypedNode(names[0])
        # zero or multiple names is a hard error
        msg = "Found wrong number of view names for {0} len = {1}"
        msg = msg.format(str(view), len(names))
        raise RuntimeError(msg)
    def _get_filename_view_map(self):
        """Query our model for filename patterns

        return a dictionary of compiled regular expressions to view names
        """
        # match every (anything, filename_re, anything) statement
        filename_query = RDF.Statement(
            None, dafTermOntology['filename_re'], None)
        for s in self.model.find_statements(filename_query):
            view_name = s.subject
            literal_re = s.object.literal_value['string']
            LOGGER.debug("Found: %s" % (literal_re,))
                # compile as a sanity check on the pattern
                filename_re = re.compile(literal_re)
            except re.error as e:
                LOGGER.error("Unable to compile: %s" % (literal_re,))
            # NOTE(review): keyed by the pattern SOURCE string, not the
            # compiled regex, despite the docstring -- verify callers
            patterns[literal_re] = view_name
    def _get_library_url(self):
        # base URL that all library nodes hang off of
        return str(self.libraryNS[''].uri)

    def _set_library_url(self, value):
        # rebinding the namespace changes where future libNodes resolve
        self.libraryNS = RDF.NS(str(value))

    library_url = property(_get_library_url, _set_library_url)
    def _is_paired(self, libNode):
        """Determine if a library is paired end"""
        library_type = self._get_library_attribute(libNode, 'library_type')
        if library_type is None:
            errmsg = "%s doesn't have a library type"
            raise ModelException(errmsg % (str(libNode),))

        # known single-ended library types
        single = ['CSHL (lacking last nt)',
                  'Single End (non-multiplexed)',
                  'Small RNA (non-multiplexed)',]
        # known paired-ended library types
        # (additional entries fall outside the visible excerpt)
        paired = ['Barcoded Illumina',
                  'Paired End (non-multiplexed)',]
        if library_type in single:
        elif library_type in paired:
        # anything else is a metadata problem, not a model problem
        raise MetadataLookupException(
            "Unrecognized library type %s for %s" % \
            (library_type, str(libNode)))
570 def need_replicate(self):
571 viewTerm = dafTermOntology['views']
572 replicateTerm = dafTermOntology['hasReplicates']
574 views = self.model.get_targets(self.submissionSet, viewTerm)
577 replicate = self.model.get_target(view, replicateTerm)
578 if fromTypedNode(replicate):
584 def link_daf(self, result_map):
585 if self.daf is None or len(self.daf) == 0:
587 "DAF data does not exist, how can I link to it?")
589 base_daf = self.daf_name
591 for result_dir in list(result_map.values()):
592 if not os.path.exists(result_dir):
594 "Couldn't find target directory %s" %(result_dir,))
595 submission_daf = os.path.join(result_dir, base_daf)
596 if os.path.exists(submission_daf):
597 previous_daf = open(submission_daf, 'r').read()
598 if self.daf != previous_daf:
599 LOGGER.info("Old daf is different, overwriting it.")
600 stream = open(submission_daf, 'w')
601 stream.write(self.daf)
if __name__ == "__main__":
    # Ad-hoc demo: build a mapper from an inline example DAF.
    # (The closing triple-quote of example_daf falls outside the
    # visible excerpt, so no comments can be placed inside it.)
    example_daf = """# Lab and general info
variables cell, antibody,sex,age,strain,control
compositeSuffix CaltechHistone
validationSettings validateFiles.bam:mismatches=2,bamPercent=99.9;validateFiles.fastq:quick=1000
# Track/view definition
longLabelPrefix Caltech Fastq Read 1
longLabelPrefix Caltech Histone Signal
    example_daf_stream = StringIO(example_daf)
    mapper = DAFMapper(name, daf_file = example_daf_stream, model=model)