7 from StringIO import StringIO
12 from htsworkflow.util.rdfhelp import \
logger = logging.getLogger(__name__)


class ModelException(RuntimeError):
    """Error raised when the RDF model cannot answer a DAF query cleanly.

    For example, a filename matching several views, or a library with no
    library type. It subclasses RuntimeError, so existing
    ``except RuntimeError`` handlers still catch it.
    """
def parse_into_model(model, submission_name, filename):
    """Read the DAF file at *filename* into the RDF *model*.

    :param submission_name: short submission name; it is used to build the
        submission and view URIs for the generated statements
    """
    daf_attributes = parse(filename)
    add_to_model(model, daf_attributes, submission_name)
def fromstream_into_model(model, submission_name, daf_stream):
    """Read an already-open DAF stream into the RDF *model*.

    *daf_stream* is anything parse_stream can iterate line by line;
    *submission_name* is the short name used to build submission URIs.
    """
    parsed = parse_stream(daf_stream)
    add_to_model(model, parsed, submission_name)
def fromstring_into_model(model, submission_name, daf_string):
    """Read DAF text held in *daf_string* into the RDF *model*.

    :param submission_name: short submission name; it is used to build the
        submission and view URIs for the generated statements
    """
    parsed = fromstring(daf_string)
    add_to_model(model, parsed, submission_name)
def parse(filename):
    """Parse the DAF file at *filename* and return its attribute dictionary.

    See parse_stream for the structure of the result. The file is opened
    with a ``with`` block, so it is closed even if parsing raises.
    """
    with open(filename, 'r') as stream:
        return parse_stream(stream)
def fromstring(daf_string):
    """Parse DAF text held in a string; see parse_stream for the result."""
    return parse_stream(StringIO(daf_string))
63 def parse_stream(stream):
64 comment_re = re.compile("#.*$")
67 attributes = {'views': {}}
72 line = comment_re.sub("", line)
73 nstop = _extract_name_index(line)
75 sstop = _consume_whitespace(line, start=nstop)
76 vstop = _extract_value_index(line, start=sstop)
77 value = line[sstop:vstop]
79 if value.lower() in ('yes',):
81 elif value.lower() in ('no',):
85 if view_name is not None:
86 attributes['views'][view_name] = view_attributes
90 elif state == DAF_HEADER and name == 'variables':
91 attributes[name] = [ x.strip() for x in value.split(',')]
92 elif state == DAF_HEADER and name == 'view':
94 view_attributes['view'] = value
96 elif state == DAF_HEADER:
97 attributes[name] = value
98 elif state == DAF_VIEW:
99 view_attributes[name] = value
102 if view_name is not None:
103 attributes['views'][view_name] = view_attributes
107 def _consume_whitespace(line, start=0):
108 for i in xrange(start, len(line)):
109 if line[i] not in string.whitespace:
114 def _extract_name_index(line, start=0):
115 for i in xrange(start, len(line)):
116 if line[i] in string.whitespace:
121 def _extract_value_index(line, start=0):
122 shortline = line.rstrip()
123 return len(shortline)
def convert_to_rdf_statements(attributes, name):
    """Translate parsed DAF *attributes* into a list of RDF statements.

    - Header attributes become statements about the submission node for
      *name*, using ``dafTermOntology[key]`` as the predicate.
    - Each entry of ``variables`` gets its own statement.
    - The ``views`` dictionary is expanded by _views_to_statements.
    """
    subject = RDF.Node(get_submission_uri(name))

    statements = []
    for daf_key in attributes:
        predicate = dafTermOntology[daf_key]
        if daf_key == 'views':
            statements.extend(
                _views_to_statements(name, dafTermOntology, attributes[daf_key]))
        elif daf_key == 'variables':
            statements.extend(
                RDF.Statement(subject, predicate, toTypedNode(var))
                for var in attributes.get('variables', []))
        else:
            literal = toTypedNode(attributes[daf_key])
            statements.append(RDF.Statement(subject, predicate, literal))
    return statements
def _views_to_statements(name, dafNS, views):
    """Build the RDF statements describing every view of submission *name*.

    For each view, a ``dafNS['views']`` statement links the submission node
    to the view node. It is followed by one statement per view attribute,
    with the attribute value wrapped by toTypedNode.
    """
    submission = RDF.Node(get_submission_uri(name))
    view_ns = get_view_namespace(name)

    statements = []
    for view_name, view_attributes in views.items():
        view_node = view_ns[view_name]
        statements.append(RDF.Statement(submission, dafNS['views'], view_node))
        for attribute_name, attribute_value in view_attributes.items():
            statements.append(RDF.Statement(view_node,
                                            dafNS[attribute_name],
                                            toTypedNode(attribute_value)))
    return statements
def add_to_model(model, attributes, name):
    """Add the RDF statements for parsed DAF *attributes* to *model*."""
    new_statements = convert_to_rdf_statements(attributes, name)
    for stmt in new_statements:
        model.add_statement(stmt)
def get_submission_uri(name):
    """Look up *name* in ``submissionLog`` and return that node's ``uri``."""
    submission_node = submissionLog[name]
    return submission_node.uri
def get_view_namespace(name):
    """Return an RDF namespace for the views of submission *name*.

    The namespace prefix is the submission URI followed by ``/view/``.
    """
    view_prefix = str(get_submission_uri(name)) + '/view/'
    return RDF.NS(view_prefix)
class DAFMapper(object):
    """Convert filenames to views in the UCSC Daf
    """
    def __init__(self, name, daf_file=None, model=None):
        """Construct a RDF backed model of a UCSC DAF

        :args:
          name (str): the name of this submission (used to construct DAF url)
          daf_file (str, stream, or None):
             if str, use as filename
             if stream, parse as stream
             if none, don't attempt to load the DAF into our model
          model (RDF.Model or None):
             if None, construct a memory backed model
             otherwise specifies model to use

        :raises ValueError: if neither daf_file nor model is given
        """
        if daf_file is None and model is None:
            logger.error("We need a DAF or Model containing a DAF to work")
            raise ValueError("We need a DAF or Model containing a DAF to work")

        self.name = name
        if model is not None:
            self.model = model
        else:
            self.model = get_model()

        # Per the docstring, a None daf_file means the model already holds
        # the DAF, so nothing is parsed.
        if daf_file is not None:
            if hasattr(daf_file, 'next') or hasattr(daf_file, 'read'):
                # its some kind of stream (python 2 iterators define next,
                # file-like objects define read)
                fromstream_into_model(self.model, name, daf_file)
            else:
                # otherwise treat it as a filename
                parse_into_model(self.model, name, daf_file)

        self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')
        self.submissionSet = get_submission_uri(self.name)
        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '/')
        # lazily built by find_view: regex string -> view node
        self.__view_map = None

    def add_pattern(self, view_name, filename_pattern):
        """Map a filename regular expression to a view name

        The cached filename->view map is reset so that the next find_view
        call sees the new pattern.
        """
        viewNS = get_view_namespace(self.name)

        obj = toTypedNode(filename_pattern)
        self.model.add_statement(
            RDF.Statement(viewNS[view_name],
                          dafTermOntology['filename_re'],
                          obj))
        # find_view caches the patterns; drop the cache so it is rebuilt.
        self.__view_map = None

    def import_submission_dir(self, submission_dir, library_id):
        """Import a submission directory and update our model as needed

        Every entry of submission_dir is passed to construct_file_attributes
        together with the library node for library_id.
        """
        libNode = self.libraryNS[library_id + "/"]

        for entry in os.listdir(submission_dir):
            self.construct_file_attributes(submission_dir, libNode, entry)

    def construct_file_attributes(self, submission_dir, libNode, pathname):
        """Add RDF statements describing one submission file to the model

        Finds the DAF view whose filename pattern matches the file. Files
        matching no view, or the 'ignore' view, are skipped. Otherwise the
        method links the view to the submission node for submission_dir,
        records the filename and paired status, and copies the library's
        type, filename_re and DAF variable values onto the view.

        :Args:
          submission_dir (str): directory holding the file
          libNode (RDF.Node): library the file was derived from
          pathname (str): the file to examine; only its final path
              component is matched against the view patterns

        :raises ModelException: if the file matches several views or the
            library type cannot be determined
        """
        filename = os.path.basename(pathname)

        view = self.find_view(filename)
        if view is None:
            logger.warning("Unrecognized file: %s" % (pathname,))
            return None
        if str(view) == str(libraryOntology['ignore']):
            return None

        submissionName = toTypedNode(self.make_submission_name(submission_dir))
        submissionNode = self.get_submission_node(submission_dir)
        self.model.add_statement(
            RDF.Statement(self.submissionSet, dafTermOntology['has_submission'], submissionNode))

        self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['has_view'], view))
        self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['name'], submissionName))
        self.model.add_statement(RDF.Statement(submissionNode, rdfNS['type'], submissionOntology['submission']))

        self.model.add_statement(
            RDF.Statement(view, dafTermOntology['filename'], toTypedNode(filename)))
        self.model.add_statement(
            RDF.Statement(view, dafTermOntology['paired'], toTypedNode(self._is_paired(libNode))))
        self.model.add_statement(
            RDF.Statement(view, dafTermOntology['submission'], submissionNode))

        # extra information
        terms = [dafTermOntology['type'],
                 dafTermOntology['filename_re'],
                 ]
        terms.extend(dafTermOntology[v] for v in self.get_daf_variables())

        # Add everything I can find
        for term in terms:
            value = self._get_library_attribute(libNode, term)
            if value is not None:
                # NOTE(review): value comes from _format_library_attribute
                # (a fromTypedNode result or a list), not an RDF node --
                # confirm RDF.Statement accepts it as an object.
                self.model.add_statement(RDF.Statement(view, term, value))

    def _add_library_details_to_model(self, libNode):
        """Load RDFa metadata from libNode's URI into the model

        Statements whose (subject, predicate) pair already exists in the
        model are skipped, so locally known values are not overridden. A
        warning is logged if the model still knows nothing about libNode.
        """
        parser = RDF.Parser(name='rdfa')
        new_statements = parser.parse_as_stream(libNode.uri)
        for s in new_statements:
            # don't override things we already have in the model
            q = RDF.Statement(s.subject, s.predicate, None)
            if len(list(self.model.find_statements(q))) == 0:
                self.model.add_statement(s)

        # Query libNode itself rather than the last loop statement (which is
        # unbound when the page yields nothing).
        known = RDF.Statement(libNode, None, None)
        if len(list(self.model.find_statements(known))) == 0:
            logger.warning("Nothing known about %s" % (str(libNode),))

    def get_daf_variables(self):
        """Returns simple variables names that to include in the ddf

        These are the DAF 'variables' recorded for this submission set,
        followed by 'labVersion'.
        """
        variableTerm = dafTermOntology['variables']
        results = [str(fromTypedNode(obj))
                   for obj in self.model.get_targets(self.submissionSet, variableTerm)]
        results.append('labVersion')
        return results

    def make_submission_name(self, submission_dir):
        """Return the last path component of submission_dir

        :raises RuntimeError: if that component is empty
        """
        submission_dir = os.path.normpath(submission_dir)
        submission_dir_name = os.path.split(submission_dir)[1]
        if len(submission_dir_name) == 0:
            raise RuntimeError(
                "Submission dir name too short: %s" % (submission_dir,))
        return submission_dir_name

    def get_submission_node(self, submission_dir):
        """Convert a submission directory name to a submission node
        """
        submission_name = self.make_submission_name(submission_dir)
        return self.submissionSetNS[submission_name]

    def _get_library_attribute(self, libNode, attribute):
        """Look up attribute for libNode, fetching library details on a miss

        The model is searched directly and through owl:sameAs aliases of the
        attribute. If nothing is found, the library's details are loaded
        into the model once and the search is repeated.

        :returns: None, a single python value, or a list of values
        """
        if not isinstance(attribute, RDF.Node):
            attribute = libraryOntology[attribute]

        targets = self._find_library_targets(libNode, attribute)
        if targets is None:
            # we don't know anything about this attribute; pull in the
            # library details (at most once) and try again
            self._add_library_details_to_model(libNode)
            targets = self._find_library_targets(libNode, attribute)

        if targets is None:
            return None
        return self._format_library_attribute(targets)

    def _find_library_targets(self, libNode, attribute):
        """Return non-empty targets of attribute for libNode (direct or owl:sameAs), else None"""
        targets = list(self.model.get_targets(libNode, attribute))
        if len(targets) > 0:
            return targets
        return self._search_same_as(libNode, attribute)

    def _format_library_attribute(self, targets):
        """Convert RDF targets to python values

        Returns None for no targets, a single value for one target, or a
        list of values otherwise.
        """
        if len(targets) == 0:
            return None
        elif len(targets) == 1:
            return fromTypedNode(targets[0])
        return [fromTypedNode(t) for t in targets]

    def _search_same_as(self, subject, predicate):
        """Look for subject's values under owl:sameAs aliases of predicate

        Returns the first non-empty list of targets found, or None.
        """
        # look for alternate names
        other_predicates = self.model.get_targets(predicate, owlNS['sameAs'])
        for other in other_predicates:
            targets = list(self.model.get_targets(subject, other))
            if len(targets) > 0:
                return targets
        return None

    def find_view(self, filename):
        """Search through potential DAF filename patterns

        Patterns are applied with re.match, so they are anchored at the
        start of the filename.

        :returns: the matching view node, or None if no pattern matches
        :raises ModelException: if more than one view matches
        """
        if self.__view_map is None:
            self.__view_map = self._get_filename_view_map()

        results = [view for pattern, view in self.__view_map.items()
                   if re.match(pattern, filename)]

        if len(results) > 1:
            msg = "%s matched multiple views %s" % (
                filename,
                [str(x) for x in results])
            raise ModelException(msg)
        elif len(results) == 1:
            return results[0]
        return None

    def _get_filename_view_map(self):
        """Query our model for filename patterns

        Returns a dictionary mapping regular expression strings to view
        nodes. Patterns that fail to compile are logged and left out.
        """
        filename_query = RDF.Statement(
            None, dafTermOntology['filename_re'], None)

        patterns = {}
        for s in self.model.find_statements(filename_query):
            view_name = s.subject
            literal_re = s.object.literal_value['string']
            logger.debug("Found: %s" % (literal_re,))
            try:
                re.compile(literal_re)
            except re.error:
                # keeping a broken pattern would make every find_view
                # call raise re.error
                logger.error("Unable to compile: %s" % (literal_re,))
                continue
            patterns[literal_re] = view_name
        return patterns

    def _is_paired(self, libNode):
        """Determine if a library is paired end

        :raises ModelException: if the library type is missing or not
            recognized (ModelException is a RuntimeError subclass)
        """
        library_type = self._get_library_attribute(libNode, 'library_type')
        if library_type is None:
            raise ModelException("%s doesn't have a library type" % (str(libNode),))

        single = ('Single End', 'Small RNA', 'CSHL (lacking last nt)')
        paired = ('Paired End', 'Multiplexing', 'Barcoded')
        if library_type in single:
            return False
        elif library_type in paired:
            return True
        raise ModelException("Unrecognized library type %s" % (library_type,))