7 from StringIO import StringIO
12 from htsworkflow.util.rdfhelp import \
23 from htsworkflow.util.hashfile import make_md5sum
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)

# DAF header keys whose values are comma-separated lists of variable names.
DAF_VARIABLE_NAMES = ("variables", "extraVariables")
# Ontology term under which every variable list is stored in the model.
VARIABLES_TERM_NAME = 'variables'
# Names that get_daf_variables() places before / after the DAF's own variables.
DAF_PRE_VARIABLES = ['files', 'view']
DAF_POST_VARIABLES = ['labExpId', 'md5sum']
class ModelException(RuntimeError):
    """Raised when an assumption about the RDF model does not hold."""
class MetadataLookupException(RuntimeError):
    """Raised when metadata cannot be accessed."""
def parse_into_model(model, subject, filename):
    """Parse the DAF file at *filename* and store it in an RDF model.

    The parsed attributes are added to *model* as statements attached to
    the *subject* node.
    """
    add_to_model(model, parse(filename), subject)
def fromstream_into_model(model, subject, daf_stream):
    """Parse a DAF from *daf_stream* and store it in an RDF model.

    The parsed attributes are added to *model* as statements attached to
    the *subject* node.
    """
    add_to_model(model, parse_stream(daf_stream), subject)
def fromstring_into_model(model, subject, daf_string):
    """Parse a DAF held in *daf_string* and store it in an RDF model.

    The parsed attributes are added to *model* as statements attached to
    the *subject* node.
    """
    add_to_model(model, fromstring(daf_string), subject)
# Opens the DAF at `filename` for reading and hands the open stream to
# parse_stream().
# NOTE(review): the `def` line and everything after original line 77 are not
# part of this listing.  Confirm that the stream is closed on every path;
# wrapping the open() in a `with` block would guarantee it even when
# parse_stream() raises.
74 """Parse daf from a file
76 stream = open(filename, 'r')
77 attributes = parse_stream(stream)
def fromstring(daf_string):
    """Parse a UCSC DAF held entirely in the string *daf_string*.

    The string is wrapped in a StringIO and the result of parse_stream()
    on that stream is returned.
    """
    return parse_stream(StringIO(daf_string))
# Line-oriented DAF parser: each line is split into a leading name and the
# value that follows it.  Header entries are stored directly in `attributes`;
# entries belonging to a view are stored in `attributes['views'][<view name>]`.
# NOTE(review): this listing omits several original lines (91-92, 94-97, 100,
# 104, 106, 108-110, 113-115, 119, 121, 126-127, 130-131), including the loop
# header and the assignments to `name`, `state`, `view_name` and
# `view_attributes`.  Confirm those against the full source before relying on
# the comments below.
88 def parse_stream(stream):
89 """Parse UCSC dat stored in a stream"""
# Matches a '#' and everything after it so comments can be stripped.
90 comment_re = re.compile("#.*$")
# Result dictionary; every parsed view is collected under the 'views' key.
93 attributes = {'views': {}}
98 line = comment_re.sub("", line)
# nstop: end of the leading name; sstop: first non-blank character after it;
# vstop: length of the line once trailing whitespace is removed.
99 nstop = _extract_name_index(line)
101 sstop = _consume_whitespace(line, start=nstop)
102 vstop = _extract_value_index(line, start=sstop)
103 value = line[sstop:vstop]
# 'yes' / 'no' (any case) are special-cased; presumably converted to booleans
# on the omitted lines 106 and 108 -- TODO confirm.
105 if value.lower() in ('yes',):
107 elif value.lower() in ('no',):
# Store the view collected so far under its name.
111 if view_name is not None:
112 attributes['views'][view_name] = view_attributes
# Header variable lists are comma separated: split and strip each entry.
116 elif state == DAF_HEADER and name in DAF_VARIABLE_NAMES:
117 attributes[name] = [x.strip() for x in value.split(',')]
# A 'view' entry seen in the header starts a view block; the view's own name
# is also recorded among its attributes.
118 elif state == DAF_HEADER and name == 'view':
120 view_attributes['view'] = value
122 elif state == DAF_HEADER:
123 attributes[name] = value
124 elif state == DAF_VIEW:
125 view_attributes[name] = value
# Presumably runs after the loop to store the final view block -- TODO confirm.
128 if view_name is not None:
129 attributes['views'][view_name] = view_attributes
134 def _consume_whitespace(line, start=0):
135 """return index of next non whitespace character
137 returns length of string if it can't find anything
139 for i in xrange(start, len(line)):
140 if line[i] not in string.whitespace:
146 def _extract_name_index(line, start=0):
147 """Used to find end of word by looking for a whitespace character
149 returns length of string if nothing matches
151 for i in xrange(start, len(line)):
152 if line[i] in string.whitespace:
158 def _extract_value_index(line, start=0):
159 """Returns position of last non-whitespace character
161 shortline = line.rstrip()
162 return len(shortline)
def convert_to_rdf_statements(attributes, subject):
    """Build the RDF statements that describe a parsed DAF.

    *attributes* is the dictionary of DAF attributes.  Views are expanded
    through _views_to_statements(), variable lists are stored one
    statement per variable under the variables term, and every other
    entry becomes a single statement on *subject*.  Returns the list of
    statements.
    """
    variables_term = dafTermOntology[VARIABLES_TERM_NAME]
    collected = []
    for key, value in attributes.items():
        predicate = dafTermOntology[key]
        if key == 'views':
            # views become their own nodes hanging off the subject
            collected.extend(
                _views_to_statements(subject, dafTermOntology, value))
            continue
        if key in DAF_VARIABLE_NAMES:
            # every variable list shares the one variables term
            for variable in value:
                node = toTypedNode(variable)
                collected.append(
                    RDF.Statement(subject, variables_term, node))
            continue
        collected.append(
            RDF.Statement(subject, predicate, toTypedNode(value)))

    return collected
def _views_to_statements(subject, dafNS, views):
    """Create the statements for every view and link each view to *subject*.

    Each view gets a node in the subject's view namespace, a 'views' link
    from *subject*, a 'name' literal, and one statement per view
    attribute, with predicates taken from *dafNS*.  Returns the list of
    statements.
    """
    view_namespace = get_view_namespace(subject)

    result = []
    for name, attrs in views.items():
        node = view_namespace[name]
        result.append(RDF.Statement(subject, dafNS['views'], node))
        result.append(RDF.Statement(node, dafNS['name'], toTypedNode(name)))
        for attr_name, attr_value in attrs.items():
            attr_predicate = dafNS[attr_name]
            attr_node = toTypedNode(attr_value)
            result.append(RDF.Statement(node, attr_predicate, attr_node))

    return result
def add_to_model(model, attributes, subject):
    """Add the statements generated from *attributes* to *model*.

    The statements come from convert_to_rdf_statements() and are attached
    to *subject*.
    """
    statements = convert_to_rdf_statements(attributes, subject)
    for rdf_statement in statements:
        model.add_statement(rdf_statement)
def get_submission_uri(name):
    """Return the ``uri`` of the submissionLog entry for *name*."""
    submission_entry = submissionLog[name]
    return submission_entry.uri
def submission_uri_to_string(submission_uri):
    """Return *submission_uri* as a string that ends with '/'.

    *submission_uri* may be an RDF.Node (its ``uri`` is used), an RDF.Uri,
    or a value that is already a string.  An empty string yields '/'
    instead of raising IndexError.
    """
    if isinstance(submission_uri, RDF.Node):
        submission_uri = str(submission_uri.uri)
    elif isinstance(submission_uri, RDF.Uri):
        submission_uri = str(submission_uri)
    # endswith() is safe on an empty string, unlike indexing with [-1]
    if not submission_uri.endswith('/'):
        submission_uri += '/'
    return submission_uri
def get_view_namespace(submission_uri):
    """Return the RDF namespace that holds the views of a submission.

    The namespace is 'view/' resolved against the slash-terminated
    submission URI.
    """
    base_uri = submission_uri_to_string(submission_uri)
    return RDF.NS(urlparse.urljoin(base_uri, 'view/'))
237 class DAFMapper(object):
238 """Convert filenames to views in the UCSC Daf
# Sets up the submission-set URI, the view namespace and the RDF model for
# one submission, and loads the DAF into that model.
# NOTE(review): original lines 255-256, 261-262 and 268-269 are absent from
# this listing (presumably the `self.name` assignment and the `else:`
# branches) -- confirm against the full source.
240 def __init__(self, name, daf_file=None, model=None):
241 """Construct a RDF backed model of a UCSC DAF
244 name (str): the name of this submission (used to construct DAF url)
245 daf_file (str, stream, or None):
246 if str, use as filename
247 if stream, parse as stream
248 if none, don't attempt to load the DAF into our model
249 model (RDF.Model or None):
250 if None, construct a memory backed model
251 otherwise specifies model to use
# Having neither a DAF nor a model is logged as an error here.
# NOTE(review): whether an omitted line also raises is not visible -- confirm.
253 if daf_file is None and model is None:
254 logger.error("We need a DAF or Model containing a DAF to work")
257 self.submissionSet = get_submission_uri(self.name)
258 self.viewNS = get_view_namespace(self.submissionSet)
260 if model is not None:
263 self.model = get_model()
# An object with a `next` attribute is treated as an already-open stream.
# NOTE(review): `next` is the Python 2 iterator method; Python 3 streams
# expose `__next__` instead, so this test would not recognise them.
265 if hasattr(daf_file, 'next'):
266 # its some kind of stream
267 fromstream_into_model(self.model, self.submissionSet, daf_file)
270 parse_into_model(self.model, self.submissionSet, daf_file)
# Default library namespace; it can be replaced through `library_url`.
272 self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')
273 self.submissionSetNS = RDF.NS(str(self.submissionSet) + '/')
# Cache used by find_view(); filled on first use.
274 self.__view_map = None
def add_pattern(self, view_name, filename_pattern):
    """Register *filename_pattern* as a filename regular expression of a view.

    A 'filename_re' statement holding the pattern is added to the model on
    the node for *view_name* in this mapper's view namespace.
    """
    pattern_node = toTypedNode(filename_pattern)
    view_node = self.viewNS[view_name]
    statement = RDF.Statement(view_node,
                              dafTermOntology['filename_re'],
                              pattern_node)
    self.model.add_statement(statement)
def import_submission_dir(self, submission_dir, library_id):
    """Import every entry of a submission directory into the model.

    Library details for *library_id* are loaded first; track attributes
    are then constructed for each name that os.listdir() reports for
    *submission_dir*.
    """
    library_node = self.libraryNS[library_id + "/"]
    self._add_library_details_to_model(library_node)

    for entry in os.listdir(submission_dir):
        self.construct_track_attributes(submission_dir, library_node, entry)
# Records one submission file in the model: finds the view matching the file,
# then adds statements describing the submission, its view node and the file.
# The docstring below talks about extensions, but the visible code matches the
# filename through find_view().
# NOTE(review): this listing omits several original lines (300-301, 303, 308,
# 310, 312, 322-323 and the closing arguments on 330, 334, 340, 344, 357).
# Confirm the early-exit paths and the statement objects against the full
# source.
297 def construct_track_attributes(self, submission_dir, libNode, pathname):
298 """Looking for the best extension
299 The 'best' is the longest match
302 filename (str): the filename whose extention we are about to examine
# Only the final path component takes part in view matching.
304 path, filename = os.path.split(pathname)
306 logger.debug("Searching for view")
307 view = self.find_view(filename)
# NOTE(review): presumably guarded by a `view is None` test on the omitted
# line 308 -- confirm.
309 logger.warn("Unrecognized file: {0}".format(pathname))
# Files whose view is the library ontology's 'ignore' term are special-cased
# (presumably skipped; the branch body is not in this listing).
311 if str(view) == str(libraryOntology['ignore']):
314 submission_name = self.make_submission_name(submission_dir)
315 submissionNode = self.get_submission_node(submission_dir)
316 submission_uri = str(submissionNode.uri)
# The view's display name is read from the model via the DAF 'name' term.
317 view_name = fromTypedNode(self.model.get_target(view,
318 dafTermOntology['name']))
319 if view_name is None:
320 errmsg = 'Could not find view name for {0}'
# NOTE(review): this uses the root `logging` module rather than the
# module-level `logger` used elsewhere in this method.
321 logging.warning(errmsg.format(str(view)))
324 view_name = str(view_name)
# The submission view node lives at <submission uri>/<view name>.
325 submissionView = RDF.Node(RDF.Uri(submission_uri + '/' + view_name))
327 self.model.add_statement(
328 RDF.Statement(self.submissionSet,
329 dafTermOntology['has_submission'],
331 logger.debug("Adding statements to {0}".format(str(submissionNode)))
332 self.model.add_statement(RDF.Statement(submissionNode,
333 submissionOntology['has_view'],
335 self.model.add_statement(RDF.Statement(submissionNode,
336 submissionOntology['name'],
337 toTypedNode(submission_name)))
338 self.model.add_statement(
339 RDF.Statement(submissionNode,
341 submissionOntology['submission']))
342 self.model.add_statement(RDF.Statement(submissionNode,
343 submissionOntology['library'],
346 logger.debug("Adding statements to {0}".format(str(submissionView)))
347 # add track specific information
348 self.model.add_statement(
349 RDF.Statement(submissionView, dafTermOntology['view'], view))
350 self.model.add_statement(
351 RDF.Statement(submissionView,
352 dafTermOntology['paired'],
353 toTypedNode(self._is_paired(libNode))))
354 self.model.add_statement(
355 RDF.Statement(submissionView,
356 dafTermOntology['submission'],
359 # add file specific information
360 self.create_file_attributes(filename, submissionView, submission_uri, submission_dir)
362 logger.debug("Done.")
# Adds the file node for `filename` to the model: links it to the submission
# view, records its filename and adds an md5sum statement.
# NOTE(review): original lines 372, 376-377, 379 and 382 are missing from this
# listing (statement objects and, presumably, the try/except/else around
# make_md5sum) -- confirm against the full source.
364 def create_file_attributes(self, filename, submissionView, submission_uri, submission_dir):
365 # add file specific information
366 logger.debug("Updating file md5sum")
# The file node URI is the submission URI plus '/' plus the bare filename.
367 fileNode = RDF.Node(RDF.Uri(submission_uri + '/' + filename))
# Path on disk that is handed to make_md5sum().
368 submission_pathname = os.path.join(submission_dir, filename)
369 self.model.add_statement(
370 RDF.Statement(submissionView,
371 dafTermOntology['has_file'],
373 self.model.add_statement(
374 RDF.Statement(fileNode,
375 dafTermOntology['filename'],
378 md5 = make_md5sum(submission_pathname)
380 errmsg = "Unable to produce md5sum for {0}"
# NOTE(review): this uses the root `logging` module rather than the
# module-level `logger` used at the top of this method.
381 logging.warning(errmsg.format(submission_pathname))
383 self.model.add_statement(
384 RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))
# Parses the library node's URI as RDFa and, for each statement found, checks
# whether the model already holds a value for that subject/predicate pair.
# NOTE(review): the body of the `len(targets) == 0` branch (original line 393)
# is not in this listing; presumably it adds the statement to the model --
# confirm.
386 def _add_library_details_to_model(self, libNode):
387 parser = RDF.Parser(name='rdfa')
388 new_statements = parser.parse_as_stream(libNode.uri)
389 for s in new_statements:
390 # don't override things we already have in the model
391 targets = list(self.model.get_targets(s.subject, s.predicate))
392 if len(targets) == 0:
def get_daf_variables(self):
    """Return the ordered list of variable names to include in the ddf.

    The list starts with DAF_PRE_VARIABLES, adds 'replicate' when
    need_replicate() is true, then the variables recorded in the model for
    this submission set, and ends with DAF_POST_VARIABLES.  A name that is
    already in the list is not added again.
    """
    variables_term = dafTermOntology[VARIABLES_TERM_NAME]

    # Work on a copy so the module-level constant is never mutated.  (An
    # earlier extend() into a list that was immediately replaced by this
    # copy had no effect and has been removed.)
    results = DAF_PRE_VARIABLES[:]
    if self.need_replicate() and 'replicate' not in results:
        results.append('replicate')

    for obj in self.model.get_targets(self.submissionSet, variables_term):
        value = str(fromTypedNode(obj))
        if value not in results:
            results.append(value)

    results.extend([v for v in DAF_POST_VARIABLES if v not in results])
    return results
# Derives the submission name from the last component of the normalized
# submission directory path.
# NOTE(review): original line 416 is missing from this listing; it presumably
# raises with the message formatted on line 417 -- confirm the exception type.
412 def make_submission_name(self, submission_dir):
# normpath() drops a trailing separator, so split()[1] is the directory name
# rather than an empty string.
413 submission_dir = os.path.normpath(submission_dir)
414 submission_dir_name = os.path.split(submission_dir)[1]
415 if len(submission_dir_name) == 0:
417 "Submission dir name too short: {0}".format(submission_dir))
418 return submission_dir_name
def get_submission_node(self, submission_dir):
    """Return the submission node for a submission directory.

    The directory is reduced to a name by make_submission_name() and that
    name is looked up in this mapper's submission-set namespace.
    """
    name = self.make_submission_name(submission_dir)
    namespace = self.submissionSetNS
    return namespace[name]
# Looks up `attribute` for `libNode` in the model; it apparently loads the
# library's details into the model and looks again when the first lookup
# yields nothing.
# NOTE(review): the conditions guarding both returns (original lines 431 and
# 444) and the final fallback (446-447) are not in this listing -- confirm.
426 def _get_library_attribute(self, libNode, attribute):
# Plain attribute names are resolved through the library ontology.
427 if not isinstance(attribute, RDF.Node):
428 attribute = libraryOntology[attribute]
430 targets = list(self.model.get_targets(libNode, attribute))
432 return self._format_library_attribute(targets)
436 #targets = self._search_same_as(libNode, attribute)
437 #if targets is not None:
438 # return self._format_library_attribute(targets)
440 # we don't know anything about this attribute
441 self._add_library_details_to_model(libNode)
443 targets = list(self.model.get_targets(libNode, attribute))
445 return self._format_library_attribute(targets)
449 def _format_library_attribute(self, targets):
450 if len(targets) == 0:
452 elif len(targets) == 1:
453 return fromTypedNode(targets[0])
454 elif len(targets) > 1:
455 return [fromTypedNode(t) for t in targets]
# Looks for values of `subject` under each predicate that the model lists as
# an owl:sameAs target of `predicate`.
# NOTE(review): original lines 462-464, which decide what is returned, are not
# in this listing -- confirm against the full source.
457 def _search_same_as(self, subject, predicate):
458 # look for alternate names
459 other_predicates = self.model.get_targets(predicate, owlNS['sameAs'])
460 for other in other_predicates:
461 targets = list(self.model.get_targets(subject, other))
# Matches `filename` against the cached pattern -> view map.  Judging by the
# message below, a filename that matches several views raises ModelException.
# NOTE(review): original lines 471-472, 475-477, 479 and 483-485 are missing
# from this listing (initialisation of `results`, collection of matches, the
# multiple-match condition and the return values) -- confirm.
466 def find_view(self, filename):
467 """Search through potential DAF filename patterns
# Build the pattern map lazily on first use.
469 if self.__view_map is None:
470 self.__view_map = self._get_filename_view_map()
# re.match() anchors each pattern at the start of the filename.
473 for pattern, view in self.__view_map.items():
474 if re.match(pattern, filename):
478 msg = "%s matched multiple views %s" % (
480 [str(x) for x in results])
481 raise ModelException(msg)
482 elif len(results) == 1:
# Returns the 'view_name' value recorded in the model for `view`.
# NOTE(review): the conditions around the return and the raise (original lines
# 490, 492, 495) are missing from this listing; judging by the message, any
# count other than one presumably raises RuntimeError -- confirm.
487 def get_view_name(self, view):
488 view_term = submissionOntology['view_name']
489 names = list(self.model.get_targets(view, view_term))
491 return fromTypedNode(names[0])
493 msg = "Found wrong number of view names for {0} len = {1}"
494 msg = msg.format(str(view), len(names))
496 raise RuntimeError(msg)
# Collects every 'filename_re' statement in the model into `patterns`, mapping
# each pattern to the statement's subject (the view node).
# NOTE(review): the docstring mentions compiled regular expressions, but the
# visible code keys the dictionary by the pattern string (`literal_re`);
# `filename_re` appears to serve only as a compile check.
# NOTE(review): original lines 505-506, 511, 513 and 516 are missing from this
# listing (creation of `patterns`, the try/except lines and the return) --
# confirm against the full source.
498 def _get_filename_view_map(self):
499 """Query our model for filename patterns
501 return a dictionary of compiled regular expressions to view names
# Statement template with wildcard subject and object.
503 filename_query = RDF.Statement(
504 None, dafTermOntology['filename_re'], None)
507 for s in self.model.find_statements(filename_query):
508 view_name = s.subject
509 literal_re = s.object.literal_value['string']
510 logger.debug("Found: %s" % (literal_re,))
512 filename_re = re.compile(literal_re)
514 logger.error("Unable to compile: %s" % (literal_re,))
515 patterns[literal_re] = view_name
518 def _get_library_url(self):
519 return str(self.libraryNS[''].uri)
521 def _set_library_url(self, value):
522 self.libraryNS = RDF.NS(str(value))
524 library_url = property(_get_library_url, _set_library_url)
# Classifies a library as single or paired end from its 'library_type'
# attribute; a missing type raises ModelException and an unlisted type raises
# MetadataLookupException.
# NOTE(review): original lines 537-538 (further entries of `paired`) and
# 541-544 (the return values and the `else:`) are missing from this listing;
# presumably False is returned for single and True for paired -- confirm.
526 def _is_paired(self, libNode):
527 """Determine if a library is paired end"""
528 library_type = self._get_library_attribute(libNode, 'library_type')
529 if library_type is None:
530 errmsg = "%s doesn't have a library type"
531 raise ModelException(errmsg % (str(libNode),))
# Known library type names, grouped by read layout.
533 single = ['CSHL (lacking last nt)',
534 'Single End (non-multiplexed)',
535 'Small RNA (non-multiplexed)',]
536 paired = ['Barcoded Illumina',
539 'Paired End (non-multiplexed)',]
540 if library_type in single:
542 elif library_type in paired:
545 raise MetadataLookupException(
546 "Unrecognized library type %s for %s" % \
547 (library_type, str(libNode)))
549 def need_replicate(self):
550 viewTerm = dafTermOntology['views']
551 replicateTerm = dafTermOntology['hasReplicates']
553 views = self.model.get_targets(self.submissionSet, viewTerm)
556 replicate = self.model.get_target(view, replicateTerm)
557 if fromTypedNode(replicate):