7 from StringIO import StringIO
12 from htsworkflow.util.rdfhelp import \
24 logger = logging.getLogger(__name__)
class ModelException(RuntimeError):
    """Error raised for problems with the contents of the RDF model.

    Used in this module when, for example, a filename matches more than one
    view or a library has no library type recorded.
    """
class MetadataLookupException(RuntimeError):
    """Signals that required metadata could not be accessed."""
def parse_into_model(model, submission_name, filename):
    """Load the DAF stored in the file *filename* into an RDF model.

    The file is parsed with parse() and the resulting attributes are added
    to *model* under *submission_name*, which must be a short submission
    name.
    """
    add_to_model(model, parse(filename), submission_name)
def fromstream_into_model(model, submission_name, daf_stream):
    """Load a DAF from an already opened stream into an RDF model.

    *daf_stream* is handed to parse_stream() and the parsed attributes are
    added to *model* under *submission_name*.
    """
    parsed = parse_stream(daf_stream)
    add_to_model(model, parsed, submission_name)
def fromstring_into_model(model, submission_name, daf_string):
    """Load DAF text held in a string into an RDF model.

    The text is parsed with fromstring() and the resulting attributes are
    added to *model* under *submission_name*, which must be a short
    submission name.
    """
    add_to_model(model, fromstring(daf_string), submission_name)
# Body of the filename-based loader: open the DAF file for reading and hand
# the open stream to parse_stream().
# NOTE(review): the enclosing `def` line is not visible here; presumably this
# is parse(filename), which parse_into_model() calls -- confirm.
57 stream = open(filename,'r')
# NOTE(review): no close() for `stream` is visible here; confirm the handle
# is released even when parse_stream() raises (e.g. try/finally).
58 attributes = parse_stream(stream)
def fromstring(daf_string):
    """Parse DAF text supplied as a string.

    The text is wrapped in a StringIO so it can be read like a file, and the
    result of parse_stream() on that wrapper is returned unchanged.
    """
    return parse_stream(StringIO(daf_string))
# Parse DAF text from `stream` into an attributes dict; per-view settings are
# collected under attributes['views'][<view name>].
# NOTE(review): the loop over `stream`, the initial values of `state`,
# `view_name` and `view_attributes`, and the final return are not visible
# here -- confirm them before relying on the notes below.
66 def parse_stream(stream):
# a '#' starts a comment that runs to the end of the line
67 comment_re = re.compile("#.*$")
70 attributes = {'views': {}}
# Drop the comment part, then locate the pieces of the line: nstop comes from
# the name scan, sstop skips the whitespace after it, vstop is the
# right-stripped end of the line; the value is the text between sstop and vstop.
75 line = comment_re.sub("", line)
76 nstop = _extract_name_index(line)
78 sstop = _consume_whitespace(line, start=nstop)
79 vstop = _extract_value_index(line, start=sstop)
80 value = line[sstop:vstop]
# 'yes' / 'no' values are recognised case-insensitively.
# NOTE(review): what each one is replaced with is not visible here.
82 if value.lower() in ('yes',):
84 elif value.lower() in ('no',):
# A view that has been collected is filed under its name.
# NOTE(review): the condition guarding this branch is not visible here.
88 if view_name is not None:
89 attributes['views'][view_name] = view_attributes
# Header state: 'variables' is a comma separated list, stored as a list of
# whitespace-stripped names.
93 elif state == DAF_HEADER and name == 'variables':
94 attributes[name] = [ x.strip() for x in value.split(',')]
# Header state: a 'view' line records the view's own name inside its
# attribute dict.
95 elif state == DAF_HEADER and name == 'view':
97 view_attributes['view'] = value
# Header state: any other name is a plain top-level attribute.
99 elif state == DAF_HEADER:
100 attributes[name] = value
# View state: name/value pairs belong to the current view.
101 elif state == DAF_VIEW:
102 view_attributes[name] = value
# Same filing step as above for a view that is still pending.
105 if view_name is not None:
106 attributes['views'][view_name] = view_attributes
# Scan `line` from index `start` for the first character that is NOT
# whitespace, i.e. skip over a run of blanks.
# NOTE(review): the return statements are not visible here; presumably the
# index of that character is returned -- confirm, including the value used
# when only whitespace remains.
110 def _consume_whitespace(line, start=0):
111 for i in xrange(start, len(line)):
112 if line[i] not in string.whitespace:
# Scan `line` from index `start` for the first whitespace character, i.e.
# find where a leading name token ends.
# NOTE(review): the return statements are not visible here; presumably the
# index of that whitespace character is returned -- confirm, including the
# value used when the line contains no whitespace.
117 def _extract_name_index(line, start=0):
118 for i in xrange(start, len(line)):
119 if line[i] in string.whitespace:
124 def _extract_value_index(line, start=0):
125 shortline = line.rstrip()
126 return len(shortline)
# Turn a parsed DAF attribute dict into RDF statements about the submission
# called `name`; every statement uses the submission's URI node as subject.
# NOTE(review): the creation of `statements` and the final return are not
# visible here.
128 def convert_to_rdf_statements(attributes, name):
129 submission_uri = get_submission_uri(name)
130 subject = RDF.Node(submission_uri)
# each DAF key is looked up in dafTermOntology to obtain its predicate
133 for daf_key in attributes:
134 predicate = dafTermOntology[daf_key]
# 'views' is a nested dict and is expanded by _views_to_statements()
# NOTE(review): the middle argument of this call is not visible here.
135 if daf_key == 'views':
136 statements.extend(_views_to_statements(name,
138 attributes[daf_key]))
# 'variables' is a list: one statement per variable, all sharing the predicate
139 elif daf_key == 'variables':
140 #predicate = ddfNS['variables']
141 for var in attributes.get('variables', []):
142 obj = toTypedNode(var)
143 statements.append(RDF.Statement(subject, predicate, obj))
# any other key: a single statement whose object is the typed value
# NOTE(review): the branch keyword introducing this case is not visible here.
145 value = attributes[daf_key]
146 obj = toTypedNode(value)
147 statements.append(RDF.Statement(subject,predicate,obj))
# Expand the 'views' dict of a parsed DAF into RDF statements.
#   name  -- submission name, used for the subject URI and the view namespace
#   dafNS -- mapping from term name to predicate; indexed with 'views',
#            'name' and each view attribute name
#   views -- dict of view name -> dict of that view's attributes
# NOTE(review): the creation of `statements` and the final return are not
# visible here.
151 def _views_to_statements(name, dafNS, views):
152 subject = RDF.Node(get_submission_uri(name))
153 viewNS = get_view_namespace(name)
156 for view_name in views:
157 view_attributes = views[view_name]
# each view gets its own subject inside the submission's view namespace
158 viewSubject = viewNS[view_name]
# link the submission to the view
159 statements.append(RDF.Statement(subject, dafNS['views'], viewSubject))
# statement giving the view its name
# NOTE(review): the opening of the call that receives this statement is not
# visible here.
161 RDF.Statement(viewSubject, dafNS['name'], toTypedNode(view_name)))
# one statement per attribute of the view
162 for view_attribute_name in view_attributes:
163 predicate = dafNS[view_attribute_name]
164 obj = toTypedNode(view_attributes[view_attribute_name])
165 statements.append(RDF.Statement(viewSubject, predicate, obj))
167 #statements.extend(convert_to_rdf_statements(view, viewNode))
def add_to_model(model, attributes, name):
    """Add every RDF statement generated from *attributes* to *model*.

    The statements come from convert_to_rdf_statements(attributes, name) and
    are added one at a time with model.add_statement().
    """
    rdf_statements = convert_to_rdf_statements(attributes, name)
    for rdf_statement in rdf_statements:
        model.add_statement(rdf_statement)
def get_submission_uri(name):
    """Return the `uri` attribute of the submissionLog entry for *name*."""
    submission_entry = submissionLog[name]
    return submission_entry.uri
# Build the RDF namespace used for a submission's views: the submission URI
# with '/view/' appended.
# NOTE(review): no return is visible here, but callers index the result
# (viewNS[view_name]), so `viewNS` is presumably returned -- confirm.
177 def get_view_namespace(name):
178 submission_uri = get_submission_uri(name)
179 viewNS = RDF.NS(str(submission_uri) + '/view/')
# DAFMapper keeps an RDF model describing a UCSC DAF and uses it to map the
# files of a submission directory onto DAF views, recording per-file
# statements in the same model.
182 class DAFMapper(object):
183 """Convert filenames to views in the UCSC Daf
# Constructor: chooses the model, loads `daf_file` into it, and sets up the
# library and submission namespaces plus the (initially empty) view-map cache.
185 def __init__(self, name, daf_file=None, model=None):
186 """Construct a RDF backed model of a UCSC DAF
189 name (str): the name of this submission (used to construct DAF url)
190 daf_file (str, stream, or None):
191 if str, use as filename
192 if stream, parse as stream
193 if none, don't attempt to load the DAF into our model
194 model (RDF.Model or None):
195 if None, construct a memory backed model
196 otherwise specifies model to use
# With neither a DAF nor a model there is nothing to work from; this is
# logged as an error.
# NOTE(review): whether construction is aborted afterwards is not visible here.
198 if daf_file is None and model is None:
199 logger.error("We need a DAF or Model containing a DAF to work")
# NOTE(review): the assignments of self.name and of a caller-supplied model
# are not visible here; get_model() is the visible fallback.
202 if model is not None:
205 self.model = get_model()
# an object with a `next` attribute is treated as a stream of lines
207 if hasattr(daf_file, 'next'):
208 # its some kind of stream
209 fromstream_into_model(self.model, name, daf_file)
# otherwise daf_file is passed on as a filename
# NOTE(review): no check for daf_file being None is visible, although the
# docstring says None means "don't load" -- confirm None never reaches here.
212 parse_into_model(self.model, name, daf_file)
214 self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')
215 self.submissionSet = get_submission_uri(self.name)
216 self.submissionSetNS = RDF.NS(str(self.submissionSet)+'/')
# cache for the pattern -> view mapping, filled on first use by find_view()
217 self.__view_map = None
# Record a filename regular expression for a view by adding a
# dafTermOntology['filename_re'] statement on the view's node.
220 def add_pattern(self, view_name, filename_pattern):
221 """Map a filename regular expression to a view name
223 viewNS = get_view_namespace(self.name)
225 obj = toTypedNode(filename_pattern)
# NOTE(review): the end of this statement is not visible here; presumably
# `obj` is the statement's object -- confirm.
226 self.model.add_statement(
227 RDF.Statement(viewNS[view_name],
228 dafTermOntology['filename_re'],
# List `submission_dir` and run construct_file_attributes() on every entry,
# using the library node libraryNS[library_id + "/"].
232 def import_submission_dir(self, submission_dir, library_id):
233 """Import a submission directories and update our model as needed
235 #attributes = get_filename_attribute_map(paired)
236 libNode = self.libraryNS[library_id + "/"]
238 submission_files = os.listdir(submission_dir)
239 for f in submission_files:
240 self.construct_file_attributes(submission_dir, libNode, f)
242 #attributes['md5sum'] = "None"
244 #ext = attributes["filename_re"]
245 #if attributes.get("type", None) == 'fastq':
246 # fastqs.setdefault(ext, set()).add(f)
247 # fastq_attributes[ext] = attributes
249 # md5sum = make_md5sum(os.path.join(result_dir,f))
250 # if md5sum is not None:
251 # attributes['md5sum']=md5sum
# Work out which view a file belongs to and add the statements describing the
# submission, the submission view and the file to the model.
# (The docstring below calls the argument `filename`; the parameter is
# `pathname`, and `filename` is its last path component.)
255 def construct_file_attributes(self, submission_dir, libNode, pathname):
256 """Looking for the best extension
257 The 'best' is the longest match
260 filename (str): the filename whose extention we are about to examine
262 path, filename = os.path.split(pathname)
264 view = self.find_view(filename)
# NOTE(review): the condition guarding this warning and what follows it are
# not visible here; presumably files with no matching view are skipped.
266 logger.warn("Unrecognized file: %s" % (pathname,))
# views equal to libraryOntology['ignore'] get special handling
# NOTE(review): the action taken is not visible here.
268 if str(view) == str(libraryOntology['ignore']):
271 submission_name = self.make_submission_name(submission_dir)
272 submissionNode = self.get_submission_node(submission_dir)
273 submission_uri = str(submissionNode.uri)
# the view's name is read from the model; the submission view node is the
# submission URI followed by '/' and that name
274 view_name = fromTypedNode(self.model.get_target(view, dafTermOntology['name']))
275 submissionView = RDF.Node(RDF.Uri(submission_uri + '/' + view_name))
# statements about the submission itself
277 self.model.add_statement(
278 RDF.Statement(self.submissionSet, dafTermOntology['has_submission'], submissionNode))
280 self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['has_view'], submissionView))
281 self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['name'], toTypedNode(submission_name)))
282 self.model.add_statement(RDF.Statement(submissionNode, rdfNS['type'], submissionOntology['submission']))
# statements about this file's submission view
285 self.model.add_statement(
286 RDF.Statement(submissionView, dafTermOntology['filename'], toTypedNode(filename)))
287 self.model.add_statement(
288 RDF.Statement(submissionView, dafTermOntology['view'], view))
289 self.model.add_statement(
290 RDF.Statement(submissionView, dafTermOntology['paired'], toTypedNode(self._is_paired(libNode))))
291 self.model.add_statement(
292 RDF.Statement(submissionView, dafTermOntology['submission'], submissionNode))
# terms to copy onto the submission view: two fixed terms plus one per DAF
# variable
# NOTE(review): the end of the list literal and the loop header over `terms`
# are not visible here.
295 terms = [dafTermOntology['type'],
296 dafTermOntology['filename_re'],
298 terms.extend((dafTermOntology[v] for v in self.get_daf_variables()))
300 # Add everything I can find
302 value = self._get_library_attribute(libNode, term)
303 if value is not None:
304 self.model.add_statement(RDF.Statement(submissionView, term, value))
# Parse the RDFa found at the library node's URI and look at each statement
# it yields.
307 def _add_library_details_to_model(self, libNode):
308 parser = RDF.Parser(name='rdfa')
309 new_statements = parser.parse_as_stream(libNode.uri)
310 for s in new_statements:
311 # don't override things we already have in the model
312 q = RDF.Statement(s.subject, s.predicate, None)
# NOTE(review): the body of this test is not visible here; presumably the
# new statement is added when nothing matches.
313 if len(list(self.model.find_statements(q))) == 0:
# NOTE(review): if this lookup runs after the loop, `q` is whatever the last
# iteration built (and unbound when the parser yields nothing) rather than a
# query about libNode -- confirm the intended query.
316 statements = list(self.model.find_statements(q))
317 if len(statements) == 0:
318 logger.warning("Nothing known about %s" % (str(libNode),))
# Collect, as strings, the targets of dafTermOntology['variables'] on the
# submission set; 'labVersion' is also appended to the results.
# NOTE(review): the initial value of `results` and the return are not visible
# here.
320 def get_daf_variables(self):
321 """Returns simple variables names that to include in the ddf
323 variableTerm = dafTermOntology['variables']
325 for obj in self.model.get_targets(self.submissionSet, variableTerm):
326 value = str(fromTypedNode(obj))
327 results.append(value)
328 results.append('labVersion')
# The submission name is the last path component of the normalised
# submission directory.
331 def make_submission_name(self, submission_dir):
332 submission_dir = os.path.normpath(submission_dir)
333 submission_dir_name = os.path.split(submission_dir)[1]
# NOTE(review): how the empty-name case is reported (the call receiving the
# message below) is not visible here.
334 if len(submission_dir_name) == 0:
336 "Submission dir name too short: %s" %(submission_dir,))
337 return submission_dir_name
# The submission node is the submission name looked up in submissionSetNS.
339 def get_submission_node(self, submission_dir):
340 """Convert a submission directory name to a submission node
342 submission_name = self.make_submission_name(submission_dir)
343 return self.submissionSetNS[submission_name]
# Look up `attribute` for a library node: first directly, then through
# owl:sameAs aliases, then after pulling the library's details into the model.
# NOTE(review): the loop and branch structure and the final return are not
# visible here.
345 def _get_library_attribute(self, libNode, attribute):
# plain names are converted to libraryOntology terms
346 if not isinstance(attribute, RDF.Node):
347 attribute = libraryOntology[attribute]
349 # search through the model twice (adding in data from website)
351 targets = list(self.model.get_targets(libNode, attribute))
353 return self._format_library_attribute(targets)
355 targets = self._search_same_as(libNode, attribute)
356 if targets is not None:
357 return self._format_library_attribute(targets)
359 # we don't know anything about this attribute
360 self._add_library_details_to_model(libNode)
# Convert a list of target nodes to Python values: a single value for one
# target, a list of values for several.
# NOTE(review): the result for an empty list is not visible here.
364 def _format_library_attribute(self, targets):
365 if len(targets) == 0:
367 elif len(targets) == 1:
368 return fromTypedNode(targets[0])
369 elif len(targets) > 1:
370 return [fromTypedNode(t) for t in targets]
# Retry a lookup using every predicate declared owl:sameAs `predicate`.
# NOTE(review): what is returned is not visible here; the caller tests the
# result against None.
372 def _search_same_as(self, subject, predicate):
373 # look for alternate names
374 other_predicates = self.model.get_targets(predicate, owlNS['sameAs'])
375 for other in other_predicates:
376 targets = list(self.model.get_targets(subject, other))
# Match `filename` against the known filename patterns to find its view.
381 def find_view(self, filename):
382 """Search through potential DAF filename patterns
# build the pattern -> view map on first use
384 if self.__view_map is None:
385 self.__view_map = self._get_filename_view_map()
# re.match anchors each pattern at the start of the filename
# NOTE(review): how matches are collected into `results` is not visible here.
388 for pattern, view in self.__view_map.items():
389 if re.match(pattern, filename):
# more than one matching view is reported as a ModelException
# NOTE(review): the condition guarding this branch, and the results for one
# or zero matches, are not visible here.
393 msg = "%s matched multiple views %s" % (
395 [str(x) for x in results])
396 raise ModelException(msg)
397 elif len(results) == 1:
# Build the pattern -> view mapping from every statement in the model whose
# predicate is dafTermOntology['filename_re'].
403 def _get_filename_view_map(self):
404 """Query our model for filename patterns
406 return a dictionary of compiled regular expressions to view names
408 filename_query = RDF.Statement(
409 None, dafTermOntology['filename_re'], None)
412 for s in self.model.find_statements(filename_query):
413 view_name = s.subject
414 literal_re = s.object.literal_value['string']
415 logger.debug("Found: %s" % (literal_re,))
# NOTE(review): the visible code stores the pattern *string* (literal_re) as
# the key, not the compiled `filename_re`, unlike the docstring's wording;
# the handling around a failed compile is not visible here.
417 filename_re = re.compile(literal_re)
419 logger.error("Unable to compile: %s" % (literal_re,))
420 patterns[literal_re] = view_name
# Classify a library as paired or single end from its 'library_type'
# attribute.
423 def _is_paired(self, libNode):
424 """Determine if a library is paired end"""
425 library_type = self._get_library_attribute(libNode, 'library_type')
426 if library_type is None:
427 raise ModelException("%s doesn't have a library type" % (str(libNode),))
# library type names that count as single end and as paired end
430 single = ['Single End', 'Small RNA', 'CSHL (lacking last nt)']
431 paired = ['Paired End', 'Multiplexing', 'Barcoded']
# NOTE(review): the values returned for the two recognised groups are not
# visible here.
432 if library_type in single:
434 elif library_type in paired:
# any other library type is reported as a MetadataLookupException
437 raise MetadataLookupException(
438 "Unrecognized library type %s for %s" % \
439 (library_type, str(libNode)))
# library_url exposes the base URI of libraryNS as a string; assigning to it
# replaces libraryNS with a namespace built from the new value.
441 def _get_library_url(self):
442 return str(self.libraryNS[''].uri)
443 def _set_library_url(self, value):
444 self.libraryNS = RDF.NS(str(value))
445 library_url = property(_get_library_url, _set_library_url)