6 from pprint import pformat
9 from six.moves import StringIO
11 from six.moves import urllib
14 from htsworkflow.util.rdfhelp import \
26 from htsworkflow.util.hashfile import make_md5sum
LOGGER = logging.getLogger(__name__)

# DAF header keys whose values are comma-separated variable lists.
DAF_VARIABLE_NAMES = ("variables", "extraVariables")
# Single RDF term name under which all DAF variables are recorded.
VARIABLES_TERM_NAME = 'variables'
# Variables that always lead / trail the DDF variable list
# (see get_daf_variables).
DAF_PRE_VARIABLES = ['files', 'view']
DAF_POST_VARIABLES = [ 'labExpId', 'md5sum']
class ModelException(RuntimeError):
    """Assumptions about the RDF model failed.

    Raised when the model holds unexpected or conflicting statements,
    e.g. when a filename matches multiple views in find_view.
    """
class MetadataLookupException(RuntimeError):
    """Problem accessing metadata.

    Raised when library metadata needed for a submission cannot be
    found or understood, e.g. an unrecognized library type.
    """
def parse_into_model(model, subject, filename):
    """Read a DAF into RDF Model

    requires a subject node to attach statements to
    """
    attributes = parse(filename)
    add_to_model(model, attributes, subject)
def fromstream_into_model(model, subject, daf_stream):
    """Load daf stream into model attached to node subject"""
    attributes = parse_stream(daf_stream)
    add_to_model(model, attributes, subject)
def fromstring_into_model(model, subject, daf_string):
    """Read a string containing a DAF into RDF Model

    requires a short submission name
    """
    attributes = fromstring(daf_string)
    add_to_model(model, attributes, subject)
def parse(filename):
    """Parse daf from a file

    :param filename: path to a UCSC DAF file
    :return: dictionary of DAF attributes (see parse_stream)
    """
    # Use a context manager so the handle is closed even if
    # parse_stream raises; the visible original left the file open.
    with open(filename, 'r') as stream:
        return parse_stream(stream)
def fromstring(daf_string):
    """Parse UCSC daf from a provided string"""
    return parse_stream(StringIO(daf_string))
def parse_stream(stream):
    """Parse UCSC dat stored in a stream

    Builds a dictionary of top-level DAF attributes plus a 'views'
    sub-dictionary mapping view names to per-view attribute dicts.

    NOTE(review): several lines are missing from this view of the file
    (loop header, name extraction, yes/no assignments, state
    transitions, and the return); the commented gaps mark where they
    fell -- verify against the full source.
    """
    comment_re = re.compile("#.*$")

    attributes = {'views': {}}
    # (elided: state initialization and the `for line in stream:` header)
        # strip trailing comments before tokenizing
        line = comment_re.sub("", line)
        nstop = _extract_name_index(line)
        # (elided: presumably name = line[:nstop])
        sstop = _consume_whitespace(line, start=nstop)
        vstop = _extract_value_index(line, start=sstop)
        value = line[sstop:vstop]

        # normalize yes/no values -- the assignment lines are elided here
        if value.lower() in ('yes',):
        elif value.lower() in ('no',):
        # (elided: handling for the start of a new view block)
            if view_name is not None:
                # flush the previous view's attributes
                attributes['views'][view_name] = view_attributes
        elif state == DAF_HEADER and name in DAF_VARIABLE_NAMES:
            # variable lists are comma separated
            attributes[name] = [x.strip() for x in value.split(',')]
        elif state == DAF_HEADER and name == 'view':
            view_attributes['view'] = value
        elif state == DAF_HEADER:
            attributes[name] = value
        elif state == DAF_VIEW:
            view_attributes[name] = value

    # flush the final view once the stream is exhausted
    if view_name is not None:
        attributes['views'][view_name] = view_attributes
    LOGGER.debug("DAF Attributes" + pformat(attributes))
    # (elided: return attributes)
138 def _consume_whitespace(line, start=0):
139 """return index of next non whitespace character
141 returns length of string if it can't find anything
143 for i, c in enumerate(line[start:]):
144 if c not in string.whitespace:
150 def _extract_name_index(line, start=0):
151 """Used to find end of word by looking for a whitespace character
153 returns length of string if nothing matches
155 for i, c in enumerate(line[start:]):
156 if c in string.whitespace:
162 def _extract_value_index(line, start=0):
163 """Returns position of last non-whitespace character
165 shortline = line.rstrip()
166 return len(shortline)
def convert_to_rdf_statements(attributes, subject):
    """Convert dictionary of DAF attributes into rdf statements

    The statements are attached to the provided subject node

    NOTE(review): the statement-list initialization, the final else
    branch, and the return are missing from this view of the file.
    """
    variables_term = dafTermOntology[VARIABLES_TERM_NAME]
    # (elided: statements = [])
    for daf_key in attributes:
        predicate = dafTermOntology[daf_key]
        if daf_key == 'views':
            # view sub-dictionaries get their own subject nodes
            statements.extend(_views_to_statements(subject,
                                                   # (elided: dafNS argument)
                                                   attributes[daf_key]))
        elif daf_key in DAF_VARIABLE_NAMES:
            # both 'variables' and 'extraVariables' are recorded under
            # the single shared variables term
            for var in attributes.get(daf_key, []):
                obj = toTypedNode(var)
                statements.append(RDF.Statement(subject, variables_term, obj))
        # (elided: else:)
            value = attributes[daf_key]
            obj = toTypedNode(value)
            statements.append(RDF.Statement(subject, predicate, obj))
    # (elided: return statements)
def _views_to_statements(subject, dafNS, views):
    """Attach view attributes to new view nodes attached to provided subject

    NOTE(review): statement-list initialization, the opening line of one
    append call, and the return are missing from this view of the file.
    """
    viewNS = get_view_namespace(subject)

    # (elided: statements = [])
    for view_name in views:
        view_attributes = views[view_name]
        viewSubject = viewNS[view_name]
        # link the submission subject to this view's node
        statements.append(RDF.Statement(subject, dafNS['views'], viewSubject))
        # (elided: statements.append( opening line)
            RDF.Statement(viewSubject, dafNS['name'], toTypedNode(view_name)))
        for view_attribute_name in view_attributes:
            predicate = dafNS[view_attribute_name]
            obj = toTypedNode(view_attributes[view_attribute_name])
            statements.append(RDF.Statement(viewSubject, predicate, obj))

    #statements.extend(convert_to_rdf_statements(view, viewNode))
    # (elided: return statements)
def add_to_model(model, attributes, subject):
    """Convert DAF attributes to statements and load them into the model."""
    statements = convert_to_rdf_statements(attributes, subject)
    for stmt in statements:
        model.add_statement(stmt)
def get_submission_uri(name):
    """Return the URI of the submission log entry for *name*."""
    log_entry = submissionLog[name]
    return log_entry.uri
def submission_uri_to_string(submission_uri):
    """Coerce an RDF.Node, RDF.Uri, or string submission URI to a string.

    The result always ends with '/' (get_view_namespace builds view URIs
    under it with urljoin, which needs the trailing slash).
    """
    if isinstance(submission_uri, RDF.Node):
        uri_text = str(submission_uri.uri)
    elif isinstance(submission_uri, RDF.Uri):
        uri_text = str(submission_uri)
    else:
        uri_text = submission_uri
    if uri_text[-1] != '/':
        uri_text = uri_text + '/'
    return uri_text
def get_view_namespace(submission_uri):
    """Return an RDF namespace for views under *submission_uri*.

    :param submission_uri: RDF.Node, RDF.Uri, or string submission URI
    :return: RDF.NS rooted at <submission_uri>/view/
    """
    submission_uri = submission_uri_to_string(submission_uri)
    view_uri = urllib.parse.urljoin(submission_uri, 'view/')
    viewNS = RDF.NS(view_uri)
    # restore the return elided from this view of the file; callers
    # (e.g. _views_to_statements) use the result
    return viewNS
class UCSCSubmission(object):
    """Build a submission by examining the DAF for what we need to submit"""

    def __init__(self, name, daf_file=None, model=None):
        """Construct a RDF backed model of a UCSC DAF

        Args:
          name (str): the name of this submission (used to construct DAF url)
          daf_file (str, stream, or None):
              if str, use as filename
              if stream, parse as stream
              if none, don't attempt to load the DAF into our model
          model (RDF.Model or None):
              if None, construct a memory backed model
              otherwise specifies model to use
        """
        if daf_file is None and model is None:
            LOGGER.error("We need a DAF or Model containing a DAF to work")

        # (elided: presumably self.name = name -- the lookup below
        #  depends on it; verify against the full source)
        self.submissionSet = get_submission_uri(self.name)
        self.viewNS = get_view_namespace(self.submissionSet)

        if model is not None:
            # (elided: self.model = model / else:)
            self.model = get_model()

        # NOTE(review): collections.Iterable moved to collections.abc in
        # Python 3.3 and was removed from collections in 3.10 -- confirm
        # the supported interpreter versions.
        if isinstance(daf_file, collections.Iterable):
            # its some kind of stream
            self.daf = daf_file.read()
        # (elided: else branch treating daf_file as a filename)
            stream = open(daf_file, 'rt')
            self.daf = stream.read()
        fromstring_into_model(self.model, self.submissionSet, self.daf)

        self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')
        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '/')
        # lazy cache populated by _get_filename_view_map via find_view
        self.__view_map = None
284 def _get_daf_name(self):
285 return self.name + '.daf'
286 daf_name = property(_get_daf_name,doc="construct name for DAF file")
    def add_pattern(self, view_name, filename_pattern):
        """Map a filename regular expression to a view name

        Adds a dafTermOntology['filename_re'] statement for the view node.
        """
        obj = toTypedNode(filename_pattern)
        self.model.add_statement(
            RDF.Statement(self.viewNS[view_name],
                          dafTermOntology['filename_re'],
                          # (elided: `obj` argument and closing parens)
297 def scan_submission_dirs(self, result_map):
298 """Examine files in our result directory
300 for lib_id, result_dir in result_map.items():
301 LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
303 self.import_submission_dir(result_dir, lib_id)
304 except MetadataLookupException as e:
305 LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))
    def import_submission_dir(self, submission_dir, library_id):
        """Import a submission directory and update our model as needed

        :param submission_dir: directory containing files to submit
        :param library_id: id used to build the library node URI
        """
        #attributes = get_filename_attribute_map(paired)
        libNode = self.libraryNS[library_id + "/"]

        self._add_library_details_to_model(libNode)

        submission_files = os.listdir(submission_dir)
        for filename in submission_files:
            self.construct_track_attributes(submission_dir, libNode, filename)
    def construct_track_attributes(self, submission_dir, libNode, pathname):
        """Looking for the best extension

        The 'best' is the longest match

        :param submission_dir: directory being submitted
        :param libNode: RDF node for the library
        :param pathname: the filename whose extention we are about to examine

        NOTE(review): several early-return lines and closing call
        arguments are missing from this view of the file; the commented
        gaps mark where they fell.
        """
        path, filename = os.path.split(pathname)

        LOGGER.debug("Searching for view")
        view = self.find_view(filename)
        # (elided: guard for view is None)
            # NOTE(review): LOGGER.warn is deprecated; prefer LOGGER.warning
            LOGGER.warn("Unrecognized file: {0}".format(pathname))
            # (elided: return)
        if str(view) == str(libraryOntology['ignore']):
            # (elided: return -- files matching the 'ignore' view are skipped)

        submission_name = self.make_submission_name(submission_dir)
        submissionNode = self.get_submission_node(submission_dir)
        submission_uri = str(submissionNode.uri)
        view_name = fromTypedNode(self.model.get_target(view,
                                                        dafTermOntology['name']))
        if view_name is None:
            errmsg = 'Could not find view name for {0}'
            LOGGER.warning(errmsg.format(str(view)))
            # (elided: return)

        view_name = str(view_name)
        submissionView = RDF.Node(RDF.Uri(submission_uri + '/' + view_name))

        # attach this submission to the submission set
        self.model.add_statement(
            RDF.Statement(self.submissionSet,
                          dafTermOntology['has_submission'],
                          # (elided: submissionNode argument)

        LOGGER.debug("Adding statements to {0}".format(str(submissionNode)))
        self.model.add_statement(RDF.Statement(submissionNode,
                                               submissionOntology['has_view'],
                                               # (elided: submissionView argument)
        self.model.add_statement(RDF.Statement(submissionNode,
                                               submissionOntology['name'],
                                               toTypedNode(submission_name)))
        self.model.add_statement(
            RDF.Statement(submissionNode,
                          # (elided: type predicate)
                          submissionOntology['submission']))
        self.model.add_statement(RDF.Statement(submissionNode,
                                               libraryOntology['library'],
                                               # (elided: libNode argument)

        LOGGER.debug("Adding statements to {0}".format(str(submissionView)))
        # add track specific information
        self.model.add_statement(
            RDF.Statement(submissionView, dafTermOntology['view'], view))
        self.model.add_statement(
            RDF.Statement(submissionView,
                          dafTermOntology['paired'],
                          toTypedNode(self._is_paired(libNode))))
        self.model.add_statement(
            RDF.Statement(submissionView,
                          dafTermOntology['submission'],
                          # (elided: submissionNode argument)

        # add file specific information
        self.create_file_attributes(filename, submissionView, submission_uri, submission_dir)

        LOGGER.debug("Done.")
    def create_file_attributes(self, filename, submissionView, submission_uri, submission_dir):
        """Attach file-level statements (file node, filename, md5sum) to the view.

        NOTE(review): closing call arguments and the md5-None guard are
        missing from this view of the file.
        """
        # add file specific information
        LOGGER.debug("Updating file md5sum")
        submission_pathname = os.path.join(submission_dir, filename)
        fileNode = RDF.Node(RDF.Uri("file://" + submission_pathname))
        self.model.add_statement(
            RDF.Statement(submissionView,
                          dafTermOntology['has_file'],
                          # (elided: fileNode argument)
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['filename'],
                          # (elided: filename argument)

        md5 = make_md5sum(submission_pathname)
        # (elided: guard for md5 is None, presumably)
            errmsg = "Unable to produce md5sum for {0}"
            LOGGER.warning(errmsg.format(submission_pathname))
        # (elided: else:)
            self.model.add_statement(
                RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))
408 def _add_library_details_to_model(self, libNode):
409 parser = RDF.Parser(name='rdfa')
410 new_statements = parser.parse_as_stream(libNode.uri)
411 for s in new_statements:
412 # don't override things we already have in the model
413 targets = list(self.model.get_targets(s.subject, s.predicate))
414 if len(targets) == 0:
417 def get_daf_variables(self):
418 """Returns simple variables names that to include in the ddf
420 variables_term = dafTermOntology[VARIABLES_TERM_NAME]
422 results.extend([v for v in DAF_PRE_VARIABLES if v not in results])
423 results = DAF_PRE_VARIABLES[:]
424 if self.need_replicate() and 'replicate' not in results:
425 results.append('replicate')
427 for obj in self.model.get_targets(self.submissionSet, variables_term):
428 value = str(fromTypedNode(obj))
429 if value not in results:
430 results.append(value)
431 results.extend([v for v in DAF_POST_VARIABLES if v not in results])
434 def make_submission_name(self, submission_dir):
435 submission_dir = os.path.normpath(submission_dir)
436 submission_dir_name = os.path.split(submission_dir)[1]
437 if len(submission_dir_name) == 0:
439 "Submission dir name too short: {0}".format(submission_dir))
440 return submission_dir_name
    def get_submission_node(self, submission_dir):
        """Convert a submission directory name to a submission node"""
        submission_name = self.make_submission_name(submission_dir)
        return self.submissionSetNS[submission_name]
    def _get_library_attribute(self, libNode, attribute):
        """Look up a library attribute, loading library details on a miss.

        NOTE(review): the guards between the two lookups and the final
        return are missing from this view of the file.
        """
        if not isinstance(attribute, RDF.Node):
            attribute = libraryOntology[attribute]

        targets = list(self.model.get_targets(libNode, attribute))
        # (elided: guard checking whether any targets were found)
            return self._format_library_attribute(targets)

        #targets = self._search_same_as(libNode, attribute)
        #if targets is not None:
        #    return self._format_library_attribute(targets)

        # we don't know anything about this attribute
        self._add_library_details_to_model(libNode)

        targets = list(self.model.get_targets(libNode, attribute))
        # (elided: guard, then:)
            return self._format_library_attribute(targets)

        # (elided: fall-through return, presumably None)
471 def _format_library_attribute(self, targets):
472 if len(targets) == 0:
474 elif len(targets) == 1:
475 return fromTypedNode(targets[0])
476 elif len(targets) > 1:
477 return [fromTypedNode(t) for t in targets]
    def _search_same_as(self, subject, predicate):
        """Follow owl:sameAs links to find targets under alternate predicates.

        NOTE(review): the return statements are missing from this view
        of the file.
        """
        # look for alternate names
        other_predicates = self.model.get_targets(predicate, owlNS['sameAs'])
        for other in other_predicates:
            targets = list(self.model.get_targets(subject, other))
            # (elided: presumably return targets when non-empty, else None)
    def find_view(self, filename):
        """Search through potential DAF filename patterns

        Raises ModelException when a filename matches multiple views.

        NOTE(review): the result accumulator and the return statements
        are missing from this view of the file.
        """
        if self.__view_map is None:
            # build the pattern -> view cache lazily
            self.__view_map = self._get_filename_view_map()

        # (elided: results = [])
        for pattern, view in self.__view_map.items():
            if re.match(pattern, filename):
                # (elided: results.append(view))

        # (elided: guard len(results) > 1)
            msg = "%s matched multiple views %s" % (
                # (elided: first format argument, presumably filename)
                [str(x) for x in results])
            raise ModelException(msg)
        elif len(results) == 1:
            # (elided: return results[0])
        # (elided: fall-through return, presumably None)
    def get_view_name(self, view):
        """Return the name recorded for a view node.

        Raises RuntimeError when the model doesn't hold the expected
        number of names.

        NOTE(review): the guard between lookup and return is missing
        from this view of the file.
        """
        view_term = submissionOntology['view_name']
        names = list(self.model.get_targets(view, view_term))
        # (elided: guard, presumably len(names) == 1)
            return fromTypedNode(names[0])
        # (elided: else:)
            msg = "Found wrong number of view names for {0} len = {1}"
            msg = msg.format(str(view), len(names))
            # (elided: presumably LOGGER.error(msg))
            raise RuntimeError(msg)
    def _get_filename_view_map(self):
        """Query our model for filename patterns

        return a dictionary of compiled regular expressions to view names

        NOTE(review): as visible, the dict actually keys on the pattern
        *string* (compiled `filename_re` is unused after the compile
        check), and the pattern is recorded even when compilation fails.
        Dictionary initialization and the return are missing from this
        view of the file.
        """
        filename_query = RDF.Statement(
            None, dafTermOntology['filename_re'], None)
        # (elided: patterns = {})
        for s in self.model.find_statements(filename_query):
            view_name = s.subject
            literal_re = s.object.literal_value['string']
            LOGGER.debug("Found: %s" % (literal_re,))
            # (elided: try:)
                # compile check only; result is not stored
                filename_re = re.compile(literal_re)
            except re.error as e:
                LOGGER.error("Unable to compile: %s" % (literal_re,))
            patterns[literal_re] = view_name
        # (elided: return patterns)
540 def _get_library_url(self):
541 return str(self.libraryNS[''].uri)
543 def _set_library_url(self, value):
544 self.libraryNS = RDF.NS(str(value))
546 library_url = property(_get_library_url, _set_library_url)
    def _is_paired(self, libNode):
        """Determine if a library is paired end

        Raises ModelException when no library type is recorded and
        MetadataLookupException for an unrecognized library type.

        NOTE(review): the return statements and some list entries are
        missing from this view of the file.
        """
        library_type = self._get_library_attribute(libNode, 'library_type')
        if library_type is None:
            errmsg = "%s doesn't have a library type"
            raise ModelException(errmsg % (str(libNode),))

        # known library types partitioned by endedness
        single = ['CSHL (lacking last nt)',
                  'Single End (non-multiplexed)',
                  'Small RNA (non-multiplexed)',]
        paired = ['Barcoded Illumina',
                  # (elided: additional paired library types)
                  'Paired End (non-multiplexed)',]
        if library_type in single:
            # (elided: presumably return False)
        elif library_type in paired:
            # (elided: presumably return True)
        # (elided: else:)
            raise MetadataLookupException(
                "Unrecognized library type %s for %s" % \
                (library_type, str(libNode)))
    def need_replicate(self):
        """Check DAF views for a hasReplicates flag.

        Presumably returns True when any view sets it -- the loop header
        and return statements are missing from this view of the file.
        """
        viewTerm = dafTermOntology['views']
        replicateTerm = dafTermOntology['hasReplicates']

        views = self.model.get_targets(self.submissionSet, viewTerm)
        # (elided: for view in views:)
            replicate = self.model.get_target(view, replicateTerm)
            if fromTypedNode(replicate):
                # (elided: presumably return True)
        # (elided: presumably return False)
    def link_daf(self, result_map):
        """Write the DAF into each result directory.

        NOTE(review): the raise statements' opening lines and part of
        the write path are missing from this view of the file. Also the
        read below leaks its file handle (open(...).read() with no close).
        """
        if self.daf is None or len(self.daf) == 0:
            # (elided: raise RuntimeError( opening line)
                "DAF data does not exist, how can I link to it?")

        base_daf = self.daf_name

        for result_dir in result_map.values():
            if not os.path.exists(result_dir):
                # (elided: raise RuntimeError( opening line)
                    "Couldn't find target directory %s" %(result_dir,))
            submission_daf = os.path.join(result_dir, base_daf)
            if os.path.exists(submission_daf):
                previous_daf = open(submission_daf, 'r').read()
                if self.daf != previous_daf:
                    LOGGER.info("Old daf is different, overwriting it.")
            # (elided: control flow around the write; indentation uncertain)
            stream = open(submission_daf, 'w')
            stream.write(self.daf)
            # (elided: presumably stream.close())
if __name__ == "__main__":
    # Ad-hoc example: parse a sample DAF and build a mapper from it.
    # NOTE(review): the example DAF string is heavily truncated in this
    # view, and `name`, `model`, and `DAFMapper` are not defined here --
    # presumably defined in elided lines; verify before running.
    example_daf = """# Lab and general info
variables cell, antibody,sex,age,strain,control
compositeSuffix CaltechHistone
validationSettings validateFiles.bam:mismatches=2,bamPercent=99.9;validateFiles.fastq:quick=1000

# Track/view definition
longLabelPrefix Caltech Fastq Read 1
longLabelPrefix Caltech Histone Signal
    example_daf_stream = StringIO(example_daf)
    mapper = DAFMapper(name, daf_file = example_daf_stream, model=model)