7 from StringIO import StringIO
12 from htsworkflow.util.rdfhelp import \
# Module-level logger, registered under this module's import name.
logger = logging.getLogger(name=__name__)
class ModelException(RuntimeError):
    """Raised when the RDF model holds missing or conflicting data for a lookup.

    Used, for example, when a filename matches several DAF views or when a
    library has no recorded library type.
    """
class MetadataLookupException(RuntimeError):
    """Problem accessing metadata.

    Raised, for example, when a library's type is not one of the known
    single-end or paired-end categories.
    """
def parse_into_model(model, submission_name, filename):
    """Read the DAF file at *filename* into an RDF model.

    Args:
        model: RDF model that receives the generated statements.
        submission_name (str): short submission name; it is used to build
            the subject URIs of the generated statements.
        filename (str): path of the DAF file to parse.
    """
    add_to_model(model, parse(filename), submission_name)
def fromstream_into_model(model, submission_name, daf_stream):
    """Parse DAF text from an already-open stream into an RDF model.

    Args:
        model: RDF model that receives the generated statements.
        submission_name (str): short submission name used for subject URIs.
        daf_stream: open stream handed to parse_stream(); it is not closed
            here.
    """
    add_to_model(model, parse_stream(daf_stream), submission_name)
def fromstring_into_model(model, submission_name, daf_string):
    """Read a string containing a DAF into an RDF model.

    Args:
        model: RDF model that receives the generated statements.
        submission_name (str): short submission name used for subject URIs.
        daf_string (str): complete DAF text.
    """
    add_to_model(model, fromstring(daf_string), submission_name)
# Fragment, presumably the body of parse(filename), which parse_into_model
# calls. The `def` line and the end of this function are missing from this
# listing (the embedded line numbers have gaps).
# Visible behaviour: open `filename` for reading and hand the stream to
# parse_stream().
# NOTE(review): closing `stream` and returning `attributes` are not visible;
# confirm the handle is closed (a `with open(...)` block would guarantee it).
57 stream = open(filename,'r')
58 attributes = parse_stream(stream)
def fromstring(daf_string):
    """Parse DAF text held in a string.

    The text is wrapped in an in-memory file object so that the same
    parse_stream() code path used for files and streams is reused. The result
    is whatever parse_stream() returns: the attribute mapping that
    add_to_model() consumes.
    """
    return parse_stream(StringIO(daf_string))
# parse_stream(stream): parse UCSC DAF text into a nested attribute dict that
# starts out as {'views': {}}.
#
# NOTE(review): this listing is incomplete (the embedded line numbers have
# gaps). The loop over the input, the assignment of `name`, the results of the
# yes/no special case, the view_name/state transitions and the return
# statement are not visible. The result is presumably `attributes`; confirm.
#
# What the visible lines do for each input line:
#   * remove everything from the first '#' to the end of the line
#     (comment_re). That also truncates values that contain '#'.
#   * find the end of the name token (_extract_name_index), skip the
#     whitespace after it (_consume_whitespace), and take the value up to the
#     last non-whitespace character (_extract_value_index).
#   * special-case the values 'yes' and 'no' (case-insensitive).
#   * in the header state, 'variables' becomes a list of comma-separated,
#     stripped items, 'view' is recorded as view_attributes['view'], and any
#     other name is stored directly in `attributes`.
#   * in the view state, name/value pairs are stored in `view_attributes`.
#   * a pending view is filed as attributes['views'][view_name]. This happens
#     in two places below.
66 def parse_stream(stream):
67 comment_re = re.compile("#.*$")
70 attributes = {'views': {}}
75 line = comment_re.sub("", line)
76 nstop = _extract_name_index(line)
78 sstop = _consume_whitespace(line, start=nstop)
79 vstop = _extract_value_index(line, start=sstop)
80 value = line[sstop:vstop]
82 if value.lower() in ('yes',):
84 elif value.lower() in ('no',):
88 if view_name is not None:
89 attributes['views'][view_name] = view_attributes
93 elif state == DAF_HEADER and name == 'variables':
94 attributes[name] = [ x.strip() for x in value.split(',')]
95 elif state == DAF_HEADER and name == 'view':
97 view_attributes['view'] = value
99 elif state == DAF_HEADER:
100 attributes[name] = value
101 elif state == DAF_VIEW:
102 view_attributes[name] = value
105 if view_name is not None:
106 attributes['views'][view_name] = view_attributes
# _consume_whitespace(line, start=0): scan `line` from index `start` for the
# first character that is not in string.whitespace. parse_stream uses the
# result as the start index of a DAF value.
# Uses xrange, so this code is Python 2 only.
# NOTE(review): the return statements are missing from this listing (the
# embedded line numbers have gaps). It presumably returns the index of that
# first non-whitespace character, with some fallback when there is none;
# confirm.
110 def _consume_whitespace(line, start=0):
111 for i in xrange(start, len(line)):
112 if line[i] not in string.whitespace:
# _extract_name_index(line, start=0): scan `line` from index `start` for the
# first character that is in string.whitespace. parse_stream uses the result
# as the end of the name token and as the starting point for
# _consume_whitespace.
# Uses xrange, so this code is Python 2 only.
# NOTE(review): the return statements are missing from this listing. It
# presumably returns the index of that whitespace character, with a fallback
# when the line contains none; confirm.
117 def _extract_name_index(line, start=0):
118 for i in xrange(start, len(line)):
119 if line[i] in string.whitespace:
124 def _extract_value_index(line, start=0):
125 shortline = line.rstrip()
126 return len(shortline)
# convert_to_rdf_statements(attributes, name): turn a parsed DAF attribute
# mapping into RDF statements whose subject is the submission URI for `name`.
#   * every key is mapped to a predicate via dafTermOntology[daf_key].
#   * 'views' is delegated to _views_to_statements().
#   * 'variables' contributes one statement per variable, each converted with
#     toTypedNode().
#   * presumably every other key contributes one statement holding its value.
#     The `else:` line is not in this listing; confirm.
# NOTE(review): the embedded line numbers have gaps. The initialisation of
# `statements`, the middle argument of the _views_to_statements() call and
# the return statement are not visible. add_to_model iterates the result, so
# it presumably returns the `statements` list; confirm.
128 def convert_to_rdf_statements(attributes, name):
129 submission_uri = get_submission_uri(name)
130 subject = RDF.Node(submission_uri)
133 for daf_key in attributes:
134 predicate = dafTermOntology[daf_key]
135 if daf_key == 'views':
136 statements.extend(_views_to_statements(name,
138 attributes[daf_key]))
139 elif daf_key == 'variables':
140 #predicate = ddfNS['variables']
141 for var in attributes.get('variables', []):
142 obj = toTypedNode(var)
143 statements.append(RDF.Statement(subject, predicate, obj))
145 value = attributes[daf_key]
146 obj = toTypedNode(value)
147 statements.append(RDF.Statement(subject,predicate,obj))
# _views_to_statements(name, dafNS, views): build RDF statements describing
# each DAF view of submission `name`. `views` maps view names to attribute
# dicts.
# For every view this function:
#   * links the submission node to the view node (predicate dafNS['views']).
#     View nodes come from get_view_namespace(name).
#   * records the view's name (dafNS['name']).
#   * adds one statement per attribute, using dafNS[attribute_name] as the
#     predicate.
# The trailing commented-out call is inactive.
# NOTE(review): the initialisation of `statements`, the line that opens the
# append for the view-name statement, and the return statement are missing
# from this listing (the embedded line numbers have gaps); confirm.
151 def _views_to_statements(name, dafNS, views):
152 subject = RDF.Node(get_submission_uri(name))
153 viewNS = get_view_namespace(name)
156 for view_name in views:
157 view_attributes = views[view_name]
158 viewSubject = viewNS[view_name]
159 statements.append(RDF.Statement(subject, dafNS['views'], viewSubject))
161 RDF.Statement(viewSubject, dafNS['name'], toTypedNode(view_name)))
162 for view_attribute_name in view_attributes:
163 predicate = dafNS[view_attribute_name]
164 obj = toTypedNode(view_attributes[view_attribute_name])
165 statements.append(RDF.Statement(viewSubject, predicate, obj))
167 #statements.extend(convert_to_rdf_statements(view, viewNode))
def add_to_model(model, attributes, name):
    """Add the RDF statements generated for a parsed DAF to *model*.

    Args:
        model: RDF model; each statement is added with model.add_statement().
        attributes: parsed DAF attributes, as produced by the parse helpers.
        name (str): short submission name used to build the subject URIs.
    """
    generated = convert_to_rdf_statements(attributes, name)
    for stmt in generated:
        model.add_statement(stmt)
def get_submission_uri(name):
    """Look up *name* in submissionLog and return the resulting node's URI."""
    submission_node = submissionLog[name]
    return submission_node.uri
# get_view_namespace(name): build an RDF namespace for the views of
# submission `name` by appending '/view/' to its submission URI.
# NOTE(review): no return statement is visible, because the embedded line
# numbers skip the line after the RDF.NS() call. Callers (add_pattern,
# _views_to_statements) index the result like a namespace, so it presumably
# returns viewNS; confirm.
177 def get_view_namespace(name):
178 submission_uri = get_submission_uri(name)
179 viewNS = RDF.NS(str(submission_uri) + '/view/')
# DAFMapper: maps submission filenames onto the views declared in a UCSC DAF,
# keeping all of its data in an RDF model.
#
# __init__(name, daf_file=None, model=None):
#   * logs an error when both daf_file and model are None.
#   * model: a visible branch checks for a supplied model; otherwise
#     self.model is built with get_model().
#   * daf_file: an object with a `next` attribute is treated as a stream and
#     parsed with fromstream_into_model(). Otherwise it appears to be passed
#     to parse_into_model() as a filename.
#   * sets libraryNS to http://jumpgate.caltech.edu/library/, derives
#     submissionSet and submissionSetNS from get_submission_uri(self.name),
#     and clears the filename->view cache that find_view() fills lazily.
# NOTE(review): several lines are missing from this listing (the embedded line
# numbers have gaps). The following are not visible:
#   * what happens after the "We need a DAF ..." error;
#   * the assignments of self.name and of a supplied model;
#   * the else/elif lines around the daf_file branches.
# Confirm these against the full file.
# NOTE(review): hasattr(daf_file, 'next') is a Python 2 iterator test; Python 3
# iterators define __next__ instead.
182 class DAFMapper(object):
183 """Convert filenames to views in the UCSC Daf
185 def __init__(self, name, daf_file=None, model=None):
186 """Construct a RDF backed model of a UCSC DAF
189 name (str): the name of this submission (used to construct DAF url)
190 daf_file (str, stream, or None):
191 if str, use as filename
192 if stream, parse as stream
193 if none, don't attempt to load the DAF into our model
194 model (RDF.Model or None):
195 if None, construct a memory backed model
196 otherwise specifies model to use
198 if daf_file is None and model is None:
199 logger.error("We need a DAF or Model containing a DAF to work")
202 if model is not None:
205 self.model = get_model()
207 if hasattr(daf_file, 'next'):
208 # its some kind of stream
209 fromstream_into_model(self.model, name, daf_file)
212 parse_into_model(self.model, name, daf_file)
214 self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')
215 self.submissionSet = get_submission_uri(self.name)
216 self.submissionSetNS = RDF.NS(str(self.submissionSet)+'/')
217 self.__view_map = None
# add_pattern(view_name, filename_pattern): record in the model that files
# matching the regular expression `filename_pattern` belong to view
# `view_name`. The statement's subject is the view node from
# get_view_namespace(self.name), and its predicate is
# dafTermOntology['filename_re'].
# NOTE(review): the end of the add_statement(...) call is not in this listing;
# `obj` is presumably the statement's object.
# NOTE(review): find_view() caches the pattern map in self.__view_map on first
# use, and no reset of that cache is visible here. Patterns added after the
# first find_view() call may therefore be ignored; confirm.
220 def add_pattern(self, view_name, filename_pattern):
221 """Map a filename regular expression to a view name
223 viewNS = get_view_namespace(self.name)
225 obj = toTypedNode(filename_pattern)
226 self.model.add_statement(
227 RDF.Statement(viewNS[view_name],
228 dafTermOntology['filename_re'],
# import_submission_dir(submission_dir, library_id): register every entry that
# os.listdir() returns for `submission_dir` against library `library_id`.
# The library node is self.libraryNS[library_id + "/"], and each entry is
# passed to construct_file_attributes(). Subdirectories are not filtered out
# in the visible code.
# The commented-out lines below are inactive.
233 and the end of this method are missing from this listing; the
# NOTE(review): the docstring closer and the end of this method are missing
# from this listing.
232 def import_submission_dir(self, submission_dir, library_id):
233 """Import a submission directory and update our model as needed
235 #attributes = get_filename_attribute_map(paired)
236 libNode = self.libraryNS[library_id + "/"]
238 submission_files = os.listdir(submission_dir)
239 for f in submission_files:
240 self.construct_file_attributes(submission_dir, libNode, f)
242 #attributes['md5sum'] = "None"
244 #ext = attributes["filename_re"]
245 #if attributes.get("type", None) == 'fastq':
246 # fastqs.setdefault(ext, set()).add(f)
247 # fastq_attributes[ext] = attributes
249 # md5sum = make_md5sum(os.path.join(result_dir,f))
250 # if md5sum is not None:
251 # attributes['md5sum']=md5sum
# construct_file_attributes(submission_dir, libNode, pathname): add RDF
# statements describing one file of a submission directory.
#   * find_view() picks the DAF view whose filename pattern matches the file's
#     basename. "Unrecognized file: ..." is logged in one branch, and views
#     equal to libraryOntology['ignore'] get special handling. Those branch
#     bodies are not visible; presumably they return early. Confirm.
#   * the submission node, named after the directory's basename, is linked to
#     the submission set. It is given has_view, name and rdfNS['type']
#     statements.
#   * a per-file view node <submission_uri>/<view name> gets filename, view,
#     paired (from _is_paired) and submission statements.
#   * library values for the 'type' and 'filename_re' terms and for the DAF
#     variable terms are looked up with _get_library_attribute(). Values that
#     are not None are copied onto the view node. The loop header over
#     `terms` is not in this listing.
# NOTE(review): _get_library_attribute returns plain Python values, or a list
# when there are several targets, not RDF nodes. Confirm that RDF.Statement
# accepts those as objects.
# NOTE(review): the `print` statement is Python 2 syntax and writes debug
# output to stdout; logger.debug would match the rest of the module.
255 def construct_file_attributes(self, submission_dir, libNode, pathname):
256 """Looking for the best extension
257 The 'best' is the longest match
260 pathname (str): the path whose filename we are about to examine
262 path, filename = os.path.split(pathname)
264 view = self.find_view(filename)
266 logger.warn("Unrecognized file: %s" % (pathname,))
268 if str(view) == str(libraryOntology['ignore']):
271 submission_name = self.make_submission_name(submission_dir)
272 submissionNode = self.get_submission_node(submission_dir)
273 submission_uri = submissionNode.uri
274 print "submission:", str(submission_name), str(submissionNode), str(submission_uri)
276 view_name = fromTypedNode(self.model.get_target(view, dafTermOntology['name']))
277 submissionView = RDF.Node(RDF.Uri(str(submission_uri) + '/' + view_name))
279 self.model.add_statement(
280 RDF.Statement(self.submissionSet, dafTermOntology['has_submission'], submissionNode))
282 self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['has_view'], submissionView))
283 self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['name'], toTypedNode(submission_name)))
284 self.model.add_statement(RDF.Statement(submissionNode, rdfNS['type'], submissionOntology['submission']))
287 self.model.add_statement(
288 RDF.Statement(submissionView, dafTermOntology['filename'], toTypedNode(filename)))
289 self.model.add_statement(
290 RDF.Statement(submissionView, dafTermOntology['view'], view))
291 self.model.add_statement(
292 RDF.Statement(submissionView, dafTermOntology['paired'], toTypedNode(self._is_paired(libNode))))
293 self.model.add_statement(
294 RDF.Statement(submissionView, dafTermOntology['submission'], submissionNode))
297 terms = [dafTermOntology['type'],
298 dafTermOntology['filename_re'],
300 terms.extend((dafTermOntology[v] for v in self.get_daf_variables()))
302 # Add everything I can find
304 value = self._get_library_attribute(libNode, term)
305 if value is not None:
306 self.model.add_statement(RDF.Statement(submissionView, term, value))
# _add_library_details_to_model(libNode): parse the document at libNode.uri as
# RDFa, presumably by fetching it over the network. Each parsed statement is
# considered only when the model has no statement with the same subject and
# predicate, so existing values are not overridden. A warning is logged when
# the final query over `q` finds nothing.
# NOTE(review): the add_statement call and any reassignment of `q` before the
# final check are missing from this listing. As visible, the final
# find_statements reuses the last per-statement query. `q` would be unbound if
# the parser yields no statements. Confirm.
309 def _add_library_details_to_model(self, libNode):
310 parser = RDF.Parser(name='rdfa')
311 new_statements = parser.parse_as_stream(libNode.uri)
312 for s in new_statements:
313 # don't override things we already have in the model
314 q = RDF.Statement(s.subject, s.predicate, None)
315 if len(list(self.model.find_statements(q))) == 0:
318 statements = list(self.model.find_statements(q))
319 if len(statements) == 0:
320 logger.warning("Nothing known about %s" % (str(libNode),))
# get_daf_variables(): collect the DAF 'variables' recorded for this
# submission set. These are the targets of dafTermOntology['variables'],
# converted with fromTypedNode and then str. 'labVersion' is always appended.
# NOTE(review): the initialisation of `results` and the return statement are
# missing from this listing; presumably `results` is returned.
322 def get_daf_variables(self):
323 """Returns simple variable names to include in the ddf
325 variableTerm = dafTermOntology['variables']
327 for obj in self.model.get_targets(self.submissionSet, variableTerm):
328 value = str(fromTypedNode(obj))
329 results.append(value)
330 results.append('labVersion')
# make_submission_name(submission_dir): normalise the path and return its last
# component, e.g. "a/b/sub1/" -> "sub1". An empty last component (e.g. for
# "/") is treated as an error with the message "Submission dir name too
# short". The line that raises it is missing from this listing, so the
# exception type is not visible.
333 def make_submission_name(self, submission_dir):
334 submission_dir = os.path.normpath(submission_dir)
335 submission_dir_name = os.path.split(submission_dir)[1]
336 if len(submission_dir_name) == 0:
338 "Submission dir name too short: %s" %(submission_dir,))
339 return submission_dir_name
# get_submission_node(submission_dir): return the node for a submission
# directory inside the submission-set namespace, i.e.
# self.submissionSetNS[<basename>]. The basename comes from
# make_submission_name(), so the same validation applies.
341 def get_submission_node(self, submission_dir):
342 """Convert a submission directory name to a submission node
344 submission_name = self.make_submission_name(submission_dir)
345 return self.submissionSetNS[submission_name]
# _get_library_attribute(libNode, attribute): look up `attribute` for a
# library. `attribute` may be an RDF.Node or a name resolved through
# libraryOntology.
# Visible lookup order:
#   1. direct targets in self.model;
#   2. targets found through owl:sameAs alternative predicates
#      (_search_same_as).
# Both results are formatted with _format_library_attribute. When neither
# path succeeds, library details are pulled into the model with
# _add_library_details_to_model.
# NOTE(review): the comment below says the model is searched twice, but the
# loop/retry structure, the guard before the first return, and the final
# return are missing from this listing. Callers test the result for None;
# confirm that is what "not found" yields.
347 def _get_library_attribute(self, libNode, attribute):
348 if not isinstance(attribute, RDF.Node):
349 attribute = libraryOntology[attribute]
351 # search through the model twice (adding in data from website)
353 targets = list(self.model.get_targets(libNode, attribute))
355 return self._format_library_attribute(targets)
357 targets = self._search_same_as(libNode, attribute)
358 if targets is not None:
359 return self._format_library_attribute(targets)
361 # we don't know anything about this attribute
362 self._add_library_details_to_model(libNode)
# _format_library_attribute(targets): collapse a list of RDF nodes into a
# Python value. One target gives its fromTypedNode() value; several targets
# give a list of those values.
# NOTE(review): the result for an empty list is missing from this listing.
# It is presumably None, since callers of _get_library_attribute test for
# None; confirm.
366 def _format_library_attribute(self, targets):
367 if len(targets) == 0:
369 elif len(targets) == 1:
370 return fromTypedNode(targets[0])
371 elif len(targets) > 1:
372 return [fromTypedNode(t) for t in targets]
# _search_same_as(subject, predicate): try each predicate that the model
# declares owl:sameAs `predicate`, and collect `subject`'s targets for it.
# _get_library_attribute treats a None result as "not found".
# NOTE(review): the rest of this method is missing from this listing, so what
# it returns (e.g. the first non-empty list, otherwise None) is not visible;
# confirm.
374 def _search_same_as(self, subject, predicate):
375 # look for alternate names
376 other_predicates = self.model.get_targets(predicate, owlNS['sameAs'])
377 for other in other_predicates:
378 targets = list(self.model.get_targets(subject, other))
# find_view(filename): find the view whose filename regular expression
# matches `filename`. re.match is used, so patterns are anchored only at the
# start of the name unless they end with '$'.
# The pattern->view map is built lazily by _get_filename_view_map() and cached
# in self.__view_map. Matching more than one view raises ModelException.
# NOTE(review): the initialisation of `results`, the append and the
# single-match/no-match returns are missing from this listing. Confirm that
# "no match" yields None, which construct_file_attributes appears to handle.
383 def find_view(self, filename):
384 """Search through potential DAF filename patterns
386 if self.__view_map is None:
387 self.__view_map = self._get_filename_view_map()
390 for pattern, view in self.__view_map.items():
391 if re.match(pattern, filename):
395 msg = "%s matched multiple views %s" % (
397 [str(x) for x in results])
398 raise ModelException(msg)
399 elif len(results) == 1:
# _get_filename_view_map(): collect every dafTermOntology['filename_re']
# statement in the model. The pattern string is mapped to the statement's
# subject (the view node). Each pattern is compiled only as a check; failures
# are logged with "Unable to compile".
# NOTE(review): the compiled regex is not stored, because the dict is keyed by
# the raw string. The try/except lines, the `patterns` initialisation and the
# return are missing from this listing. Depending on the lost indentation, a
# pattern that failed to compile may still be stored, and find_view() would
# then hit the same re.error later. Confirm.
405 def _get_filename_view_map(self):
406 """Query our model for filename patterns
408 return a dictionary mapping regular expression strings to view nodes
410 filename_query = RDF.Statement(
411 None, dafTermOntology['filename_re'], None)
414 for s in self.model.find_statements(filename_query):
415 view_name = s.subject
416 literal_re = s.object.literal_value['string']
417 logger.debug("Found: %s" % (literal_re,))
419 filename_re = re.compile(literal_re)
421 logger.error("Unable to compile: %s" % (literal_re,))
422 patterns[literal_re] = view_name
# _is_paired(libNode): decide from the library's 'library_type' attribute
# whether the library is paired end. Exact string comparison is used against
# the two lists below.
# Raises ModelException when no library type is known, and
# MetadataLookupException for a type that is in neither list.
# NOTE(review): the returned values are missing from this listing. They are
# presumably False for `single` types and True for `paired` types; confirm.
425 def _is_paired(self, libNode):
426 """Determine if a library is paired end"""
427 library_type = self._get_library_attribute(libNode, 'library_type')
428 if library_type is None:
429 raise ModelException("%s doesn't have a library type" % (str(libNode),))
432 single = ['Single End', 'Small RNA', 'CSHL (lacking last nt)']
433 paired = ['Paired End', 'Multiplexing', 'Barcoded']
434 if library_type in single:
436 elif library_type in paired:
439 raise MetadataLookupException(
440 "Unrecognized library type %s for %s" % \
441 (library_type, str(libNode)))
# library_url: read/write property for the base URL of self.libraryNS.
# Reading returns the string URI of libraryNS['']. Writing replaces libraryNS
# with a new RDF.NS built from str(value).
443 def _get_library_url(self):
444 return str(self.libraryNS[''].uri)
445 def _set_library_url(self, value):
446 self.libraryNS = RDF.NS(str(value))
447 library_url = property(_get_library_url, _set_library_url)