7 from StringIO import StringIO
12 from htsworkflow.util.rdfhelp import \
23 from htsworkflow.util.hashfile import make_md5sum
25 logger = logging.getLogger(__name__)
28 class ModelException(RuntimeError):
29 """Assumptions about the RDF model failed"""
33 class MetadataLookupException(RuntimeError):
34 """Problem accessing metadata"""
43 def parse_into_model(model, subject, filename):
44 """Read a DAF into RDF Model
46 requires a subject node to attach statements to
48 attributes = parse(filename)
49 add_to_model(model, attributes, subject)
52 def fromstream_into_model(model, subject, daf_stream):
53 """Load daf stream into model attached to node subject
55 attributes = parse_stream(daf_stream)
56 add_to_model(model, attributes, subject)
59 def fromstring_into_model(model, subject, daf_string):
60 """Read a string containing a DAF into RDF Model
62 requires a short submission name
64 attributes = fromstring(daf_string)
65 add_to_model(model, attributes, subject)
69 """Parse daf from a file
71 stream = open(filename, 'r')
72 attributes = parse_stream(stream)
77 def fromstring(daf_string):
78 """Parse UCSC daf from a provided string"""
79 stream = StringIO(daf_string)
80 return parse_stream(stream)
83 def parse_stream(stream):
84 """Parse UCSC dat stored in a stream"""
85 comment_re = re.compile("#.*$")
88 attributes = {'views': {}}
93 line = comment_re.sub("", line)
94 nstop = _extract_name_index(line)
96 sstop = _consume_whitespace(line, start=nstop)
97 vstop = _extract_value_index(line, start=sstop)
98 value = line[sstop:vstop]
100 if value.lower() in ('yes',):
102 elif value.lower() in ('no',):
106 if view_name is not None:
107 attributes['views'][view_name] = view_attributes
111 elif state == DAF_HEADER and name == 'variables':
112 attributes[name] = [x.strip() for x in value.split(',')]
113 elif state == DAF_HEADER and name == 'view':
115 view_attributes['view'] = value
117 elif state == DAF_HEADER:
118 attributes[name] = value
119 elif state == DAF_VIEW:
120 view_attributes[name] = value
123 if view_name is not None:
124 attributes['views'][view_name] = view_attributes
129 def _consume_whitespace(line, start=0):
130 """return index of next non whitespace character
132 returns length of string if it can't find anything
134 for i in xrange(start, len(line)):
135 if line[i] not in string.whitespace:
141 def _extract_name_index(line, start=0):
142 """Used to find end of word by looking for a whitespace character
144 returns length of string if nothing matches
146 for i in xrange(start, len(line)):
147 if line[i] in string.whitespace:
153 def _extract_value_index(line, start=0):
154 """Returns position of last non-whitespace character
156 shortline = line.rstrip()
157 return len(shortline)
160 def convert_to_rdf_statements(attributes, subject):
161 """Convert dictionary of DAF attributes into rdf statements
163 The statements are attached to the provided subject node
166 for daf_key in attributes:
167 predicate = dafTermOntology[daf_key]
168 if daf_key == 'views':
169 statements.extend(_views_to_statements(subject,
171 attributes[daf_key]))
172 elif daf_key == 'variables':
173 #predicate = ddfNS['variables']
174 for var in attributes.get('variables', []):
175 obj = toTypedNode(var)
176 statements.append(RDF.Statement(subject, predicate, obj))
178 value = attributes[daf_key]
179 obj = toTypedNode(value)
180 statements.append(RDF.Statement(subject, predicate, obj))
185 def _views_to_statements(subject, dafNS, views):
186 """Attach view attributes to new view nodes atached to provided subject
188 viewNS = get_view_namespace(subject)
191 for view_name in views:
192 view_attributes = views[view_name]
193 viewSubject = viewNS[view_name]
194 statements.append(RDF.Statement(subject, dafNS['views'], viewSubject))
196 RDF.Statement(viewSubject, dafNS['name'], toTypedNode(view_name)))
197 for view_attribute_name in view_attributes:
198 predicate = dafNS[view_attribute_name]
199 obj = toTypedNode(view_attributes[view_attribute_name])
200 statements.append(RDF.Statement(viewSubject, predicate, obj))
202 #statements.extend(convert_to_rdf_statements(view, viewNode))
206 def add_to_model(model, attributes, subject):
207 for statement in convert_to_rdf_statements(attributes, subject):
208 model.add_statement(statement)
211 def get_submission_uri(name):
212 return submissionLog[name].uri
215 def submission_uri_to_string(submission_uri):
216 if isinstance(submission_uri, RDF.Node):
217 submission_uri = str(submission_uri.uri)
218 elif isinstance(submission_uri, RDF.Uri):
219 submission_uri = str(submission_uri)
220 if submission_uri[-1] != '/':
221 submission_uri += '/'
222 return submission_uri
225 def get_view_namespace(submission_uri):
226 submission_uri = submission_uri_to_string(submission_uri)
227 view_uri = urlparse.urljoin(submission_uri, 'view/')
228 viewNS = RDF.NS(view_uri)
232 class DAFMapper(object):
233 """Convert filenames to views in the UCSC Daf
235 def __init__(self, name, daf_file=None, model=None):
236 """Construct a RDF backed model of a UCSC DAF
239 name (str): the name of this submission (used to construct DAF url)
240 daf_file (str, stream, or None):
241 if str, use as filename
242 if stream, parse as stream
243 if none, don't attempt to load the DAF into our model
244 model (RDF.Model or None):
245 if None, construct a memory backed model
246 otherwise specifies model to use
248 if daf_file is None and model is None:
249 logger.error("We need a DAF or Model containing a DAF to work")
252 self.submissionSet = get_submission_uri(self.name)
253 self.viewNS = get_view_namespace(self.submissionSet)
255 if model is not None:
258 self.model = get_model()
260 if hasattr(daf_file, 'next'):
261 # its some kind of stream
262 fromstream_into_model(self.model, self.submissionSet, daf_file)
265 parse_into_model(self.model, self.submissionSet, daf_file)
267 self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')
268 self.submissionSetNS = RDF.NS(str(self.submissionSet) + '/')
269 self.__view_map = None
271 def add_pattern(self, view_name, filename_pattern):
272 """Map a filename regular expression to a view name
274 obj = toTypedNode(filename_pattern)
275 self.model.add_statement(
276 RDF.Statement(self.viewNS[view_name],
277 dafTermOntology['filename_re'],
280 def import_submission_dir(self, submission_dir, library_id):
281 """Import a submission directories and update our model as needed
283 #attributes = get_filename_attribute_map(paired)
284 libNode = self.libraryNS[library_id + "/"]
286 self._add_library_details_to_model(libNode)
288 submission_files = os.listdir(submission_dir)
289 for filename in submission_files:
290 self.construct_track_attributes(submission_dir, libNode, filename)
292 def construct_track_attributes(self, submission_dir, libNode, pathname):
293 """Looking for the best extension
294 The 'best' is the longest match
297 filename (str): the filename whose extention we are about to examine
299 path, filename = os.path.split(pathname)
301 logger.debug("Searching for view")
302 view = self.find_view(filename)
304 logger.warn("Unrecognized file: {0}".format(pathname))
306 if str(view) == str(libraryOntology['ignore']):
309 submission_name = self.make_submission_name(submission_dir)
310 submissionNode = self.get_submission_node(submission_dir)
311 submission_uri = str(submissionNode.uri)
312 view_name = fromTypedNode(self.model.get_target(view,
313 dafTermOntology['name']))
314 if view_name is None:
315 errmsg = 'Could not find view name for {0}'
316 logging.warning(errmsg.format(str(view)))
319 view_name = str(view_name)
320 submissionView = RDF.Node(RDF.Uri(submission_uri + '/' + view_name))
322 self.model.add_statement(
323 RDF.Statement(self.submissionSet,
324 dafTermOntology['has_submission'],
326 logger.debug("Adding statements to {0}".format(str(submissionNode)))
327 self.model.add_statement(RDF.Statement(submissionNode,
328 submissionOntology['has_view'],
330 self.model.add_statement(RDF.Statement(submissionNode,
331 submissionOntology['name'],
332 toTypedNode(submission_name)))
333 self.model.add_statement(
334 RDF.Statement(submissionNode,
336 submissionOntology['submission']))
337 self.model.add_statement(RDF.Statement(submissionNode,
338 submissionOntology['library'],
341 logger.debug("Adding statements to {0}".format(str(submissionView)))
342 # add track specific information
343 self.model.add_statement(
344 RDF.Statement(submissionView, dafTermOntology['view'], view))
345 self.model.add_statement(
346 RDF.Statement(submissionView,
347 dafTermOntology['paired'],
348 toTypedNode(self._is_paired(libNode))))
349 self.model.add_statement(
350 RDF.Statement(submissionView,
351 dafTermOntology['submission'],
355 terms = [dafTermOntology['type'],
356 dafTermOntology['filename_re'],
358 terms.extend((dafTermOntology[v] for v in self.get_daf_variables()))
360 # add file specific information
361 self.create_file_attributes(filename, submissionView, submission_uri, submission_dir)
363 logger.debug("Done.")
365 def create_file_attributes(self, filename, submissionView, submission_uri, submission_dir):
366 # add file specific information
367 logger.debug("Updating file md5sum")
368 fileNode = RDF.Node(RDF.Uri(submission_uri + '/' + filename))
369 submission_pathname = os.path.join(submission_dir, filename)
370 self.model.add_statement(
371 RDF.Statement(submissionView,
372 dafTermOntology['has_file'],
374 self.model.add_statement(
375 RDF.Statement(fileNode,
376 dafTermOntology['filename'],
379 md5 = make_md5sum(submission_pathname)
381 errmsg = "Unable to produce md5sum for {0}"
382 logging.warning(errmsg.format(submission_pathname))
384 self.model.add_statement(
385 RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))
387 def _add_library_details_to_model(self, libNode):
388 parser = RDF.Parser(name='rdfa')
389 new_statements = parser.parse_as_stream(libNode.uri)
390 for s in new_statements:
391 # don't override things we already have in the model
392 targets = list(self.model.get_targets(s.subject, s.predicate))
393 if len(targets) == 0:
396 def get_daf_variables(self):
397 """Returns simple variables names that to include in the ddf
399 variableTerm = dafTermOntology['variables']
401 if self.need_replicate():
402 results.append('replicate')
404 for obj in self.model.get_targets(self.submissionSet, variableTerm):
405 value = str(fromTypedNode(obj))
406 results.append(value)
407 results.append('labVersion')
410 def make_submission_name(self, submission_dir):
411 submission_dir = os.path.normpath(submission_dir)
412 submission_dir_name = os.path.split(submission_dir)[1]
413 if len(submission_dir_name) == 0:
415 "Submission dir name too short: {0}".format(submission_dir))
416 return submission_dir_name
418 def get_submission_node(self, submission_dir):
419 """Convert a submission directory name to a submission node
421 submission_name = self.make_submission_name(submission_dir)
422 return self.submissionSetNS[submission_name]
424 def _get_library_attribute(self, libNode, attribute):
425 if not isinstance(attribute, RDF.Node):
426 attribute = libraryOntology[attribute]
428 targets = list(self.model.get_targets(libNode, attribute))
430 return self._format_library_attribute(targets)
434 #targets = self._search_same_as(libNode, attribute)
435 #if targets is not None:
436 # return self._format_library_attribute(targets)
438 # we don't know anything about this attribute
439 self._add_library_details_to_model(libNode)
441 targets = list(self.model.get_targets(libNode, attribute))
443 return self._format_library_attribute(targets)
447 def _format_library_attribute(self, targets):
448 if len(targets) == 0:
450 elif len(targets) == 1:
451 return fromTypedNode(targets[0])
452 elif len(targets) > 1:
453 return [fromTypedNode(t) for t in targets]
455 def _search_same_as(self, subject, predicate):
456 # look for alternate names
457 other_predicates = self.model.get_targets(predicate, owlNS['sameAs'])
458 for other in other_predicates:
459 targets = list(self.model.get_targets(subject, other))
464 def find_view(self, filename):
465 """Search through potential DAF filename patterns
467 if self.__view_map is None:
468 self.__view_map = self._get_filename_view_map()
471 for pattern, view in self.__view_map.items():
472 if re.match(pattern, filename):
476 msg = "%s matched multiple views %s" % (
478 [str(x) for x in results])
479 raise ModelException(msg)
480 elif len(results) == 1:
485 def get_view_name(self, view):
486 view_term = submissionOntology['view_name']
487 names = list(self.model.get_targets(view, view_term))
489 return fromTypedNode(names[0])
491 msg = "Found wrong number of view names for {0} len = {1}"
492 msg = msg.format(str(view), len(names))
494 raise RuntimeError(msg)
496 def _get_filename_view_map(self):
497 """Query our model for filename patterns
499 return a dictionary of compiled regular expressions to view names
501 filename_query = RDF.Statement(
502 None, dafTermOntology['filename_re'], None)
505 for s in self.model.find_statements(filename_query):
506 view_name = s.subject
507 literal_re = s.object.literal_value['string']
508 logger.debug("Found: %s" % (literal_re,))
510 filename_re = re.compile(literal_re)
512 logger.error("Unable to compile: %s" % (literal_re,))
513 patterns[literal_re] = view_name
516 def _get_library_url(self):
517 return str(self.libraryNS[''].uri)
519 def _set_library_url(self, value):
520 self.libraryNS = RDF.NS(str(value))
522 library_url = property(_get_library_url, _set_library_url)
524 def _is_paired(self, libNode):
525 """Determine if a library is paired end"""
526 library_type = self._get_library_attribute(libNode, 'library_type')
527 if library_type is None:
528 errmsg = "%s doesn't have a library type"
529 raise ModelException(errmsg % (str(libNode),))
532 single = ['Single End', 'Small RNA', 'CSHL (lacking last nt)']
533 paired = ['Paired End', 'Multiplexing', 'Barcoded']
534 if library_type in single:
536 elif library_type in paired:
539 raise MetadataLookupException(
540 "Unrecognized library type %s for %s" % \
541 (library_type, str(libNode)))
543 def need_replicate(self):
544 viewTerm = dafTermOntology['views']
545 replicateTerm = dafTermOntology['hasReplicates']
547 views = self.model.get_targets(self.submissionSet, viewTerm)
550 replicate = self.model.get_target(view, replicateTerm)
551 if fromTypedNode(replicate):