Finish new RDF-based DDF construction.
[htsworkflow.git] / htsworkflow / submission / daf.py
1 """Parse UCSC DAF File
2 """
3 import logging
4 import os
5 import re
6 import string
7 from StringIO import StringIO
8 import types
9 import urlparse
10
11 import RDF
12 from htsworkflow.util.rdfhelp import \
13      blankOrUri, \
14      dafTermOntology, \
15      get_model, \
16      libraryOntology, \
17      owlNS, \
18      rdfNS, \
19      submissionLog, \
20      submissionOntology, \
21      toTypedNode, \
22      fromTypedNode
23
logger = logging.getLogger(__name__)


class ModelException(RuntimeError):
    """The RDF model is missing required data or holds conflicting data."""


class MetadataLookupException(RuntimeError):
    """Problem accessing metadata (e.g. an unrecognized library type)."""


# Parser states used by parse_stream: reading header attributes, or
# reading the attributes of one view stanza.
DAF_HEADER = 1
DAF_VIEW = 2
35
def parse_into_model(model, submission_name, filename):
    """Parse the DAF file at *filename* and add its statements to *model*.

    *submission_name* is the short submission name used to build the
    submission URI.
    """
    add_to_model(model, parse(filename), submission_name)
43
def fromstream_into_model(model, submission_name, daf_stream):
    """Parse DAF text read from *daf_stream* and add it to *model*.

    *submission_name* is the short submission name used to build the
    submission URI.
    """
    add_to_model(model, parse_stream(daf_stream), submission_name)
47     
def fromstring_into_model(model, submission_name, daf_string):
    """Parse a DAF held in the string *daf_string* and add it to *model*.

    *submission_name* is the short submission name used to build the
    submission URI.
    """
    add_to_model(model, fromstring(daf_string), submission_name)
55     
def parse(filename):
    """Parse the DAF file at *filename* and return its attribute dict.

    The file is closed even if parsing raises.
    """
    with open(filename, 'r') as stream:
        return parse_stream(stream)
61
def fromstring(daf_string):
    """Parse a DAF held in the string *daf_string*; return its attribute dict."""
    return parse_stream(StringIO(daf_string))
65
def parse_stream(stream):
    """Parse a UCSC DAF from an iterable of lines.

    Each non-blank line is ``name value``; anything from ``#`` onward is
    treated as a comment.  The values ``yes``/``no`` (any case) become
    True/False.  Header lines are stored directly in the result, except
    ``variables`` whose comma separated value becomes a list of stripped
    names.  A ``view`` line opens a view stanza; the following lines
    belong to that view until a blank line or the next ``view`` line.

    :returns: dict of header attributes plus ``'views'``, a dict mapping
              each view name to the dict of that view's attributes
              (including ``'view'`` itself).
    """
    comment_re = re.compile("#.*$")

    state = DAF_HEADER
    attributes = {'views': {}}
    view_name = None
    view_attributes = {}
    for line in stream:
        # remove comments; also drop surrounding whitespace so an indented
        # attribute line is not mistaken for a blank stanza separator
        line = comment_re.sub("", line).strip()
        nstop = _extract_name_index(line)
        name = line[0:nstop]
        sstop = _consume_whitespace(line, start=nstop)
        vstop = _extract_value_index(line, start=sstop)
        value = line[sstop:vstop]

        if value.lower() in ('yes',):
            value = True
        elif value.lower() in ('no',):
            value = False

        if len(name) == 0:
            # a blank line closes any open view stanza
            if view_name is not None:
                attributes['views'][view_name] = view_attributes
                view_name = None
                view_attributes = {}
            state = DAF_HEADER
        elif name == 'view':
            # start a new view stanza, saving the previous one if it was
            # not terminated by a blank line
            if view_name is not None:
                attributes['views'][view_name] = view_attributes
            view_name = value
            view_attributes = {'view': value}
            state = DAF_VIEW
        elif state == DAF_HEADER and name == 'variables':
            attributes[name] = [x.strip() for x in value.split(',')]
        elif state == DAF_HEADER:
            attributes[name] = value
        elif state == DAF_VIEW:
            view_attributes[name] = value

    # save last block
    if view_name is not None:
        attributes['views'][view_name] = view_attributes

    return attributes
109
110 def _consume_whitespace(line, start=0):
111     for i in xrange(start, len(line)):
112         if line[i] not in string.whitespace:
113             return i
114         
115     return len(line)
116
117 def _extract_name_index(line, start=0):
118     for i in xrange(start, len(line)):
119         if line[i] in string.whitespace:
120             return i
121         
122     return len(line)
123
124 def _extract_value_index(line, start=0):
125     shortline = line.rstrip()
126     return len(shortline)
127
def convert_to_rdf_statements(attributes, name):
    """Translate parsed DAF *attributes* into a list of RDF statements.

    Header attributes become statements about the submission node for
    *name*.  Each entry of ``variables`` becomes its own statement, and
    ``views`` is expanded by _views_to_statements.
    """
    subject = RDF.Node(get_submission_uri(name))

    statements = []
    for daf_key in attributes:
        predicate = dafTermOntology[daf_key]
        if daf_key == 'views':
            statements.extend(
                _views_to_statements(name, dafTermOntology, attributes[daf_key]))
            continue

        if daf_key == 'variables':
            values = attributes[daf_key]
        else:
            values = [attributes[daf_key]]
        for value in values:
            statements.append(
                RDF.Statement(subject, predicate, toTypedNode(value)))

    return statements
150
def _views_to_statements(name, dafNS, views):
    """Build RDF statements describing each view of submission *name*.

    Each view becomes a node in the submission's view namespace.  It is
    linked from the submission via ``dafNS['views']`` and named via
    ``dafNS['name']``, with one further statement per view attribute.
    """
    submission = RDF.Node(get_submission_uri(name))
    viewNS = get_view_namespace(name)

    statements = []
    for view_name, view_attributes in views.items():
        view_node = viewNS[view_name]
        statements.append(RDF.Statement(submission, dafNS['views'], view_node))
        statements.append(
            RDF.Statement(view_node, dafNS['name'], toTypedNode(view_name)))
        for attribute_name, attribute_value in view_attributes.items():
            statements.append(
                RDF.Statement(view_node,
                              dafNS[attribute_name],
                              toTypedNode(attribute_value)))
    return statements
169
def add_to_model(model, attributes, name):
    """Add the RDF statements for parsed DAF *attributes* to *model*."""
    statements = convert_to_rdf_statements(attributes, name)
    for statement in statements:
        model.add_statement(statement)
173             
def get_submission_uri(name):
    """Return the URI of submission *name* in the submissionLog namespace."""
    submission_node = submissionLog[name]
    return submission_node.uri
176
def get_view_namespace(name):
    """Return the RDF namespace holding submission *name*'s views.

    The namespace is the submission URI followed by ``/view/``.
    """
    return RDF.NS(str(get_submission_uri(name)) + '/view/')
181
class DAFMapper(object):
    """Convert filenames to views in the UCSC Daf

    The DAF, and any filename patterns added with add_pattern, are kept
    in an RDF model.  Submission directories are imported by matching each
    file name against the views' ``filename_re`` patterns.
    """
    def __init__(self, name, daf_file=None, model=None):
        """Construct a RDF backed model of a UCSC DAF

        :args:
          name (str): the name of this submission (used to construct DAF url)
          daf_file (str, stream, or None):
             if str, use as filename
             if stream, parse as stream
             if none, don't attempt to load the DAF into our model
          model (RDF.Model or None):
             if None, construct a memory backed model
             otherwise specifies model to use
        """
        if daf_file is None and model is None:
            logger.error("We need a DAF or Model containing a DAF to work")

        self.name = name
        if model is not None:
            self.model = model
        else:
            self.model = get_model()

        if daf_file is None:
            # nothing to load; rely on whatever the model already holds
            pass
        elif hasattr(daf_file, 'next') or hasattr(daf_file, '__next__'):
            # its some kind of stream
            fromstream_into_model(self.model, name, daf_file)
        else:
            # file
            parse_into_model(self.model, name, daf_file)

        self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')
        self.submissionSet = get_submission_uri(self.name)
        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '/')
        # lazily built cache of filename pattern -> view node (see find_view)
        self.__view_map = None

    def add_pattern(self, view_name, filename_pattern):
        """Map a filename regular expression to a view name

        Adds a ``filename_re`` statement for the view and discards the
        cached pattern map so find_view will see the new pattern.
        """
        viewNS = get_view_namespace(self.name)

        obj = toTypedNode(filename_pattern)
        self.model.add_statement(
            RDF.Statement(viewNS[view_name],
                          dafTermOntology['filename_re'],
                          obj))
        # find_view caches the patterns; force it to re-read the model
        self.__view_map = None

    def import_submission_dir(self, submission_dir, library_id):
        """Import a submission directory and update our model as needed

        Every entry in *submission_dir* is handed to
        construct_file_attributes as a file of library *library_id*.
        """
        libNode = self.libraryNS[library_id + "/"]

        for f in os.listdir(submission_dir):
            self.construct_file_attributes(submission_dir, libNode, f)

    def construct_file_attributes(self, submission_dir, libNode, pathname):
        """Add RDF statements describing one file of a submission.

        Finds the DAF view whose filename pattern matches the file's base
        name, then records the submission, a submission view node for this
        file, and whatever library attributes can be found for the view's
        DAF terms.  Files with no matching view, or mapped to the
        ``ignore`` view, are skipped.

        :Args:
          submission_dir (str): directory holding the submission's files
          libNode (RDF.Node): node of the library the file belongs to
          pathname (str): the file to describe; only its base name is used
        :Returns: None
        """
        filename = os.path.basename(pathname)

        view = self.find_view(filename)
        if view is None:
            logger.warning("Unrecognized file: %s", pathname)
            return None
        if str(view) == str(libraryOntology['ignore']):
            return None

        submission_name = self.make_submission_name(submission_dir)
        submissionNode = self.get_submission_node(submission_dir)
        submission_uri = submissionNode.uri
        logger.debug("submission: %s %s %s",
                     submission_name, submissionNode, submission_uri)

        view_name = fromTypedNode(
            self.model.get_target(view, dafTermOntology['name']))
        submissionView = RDF.Node(
            RDF.Uri(str(submission_uri) + '/' + view_name))

        self.model.add_statement(
            RDF.Statement(self.submissionSet,
                          dafTermOntology['has_submission'],
                          submissionNode))

        self.model.add_statement(
            RDF.Statement(submissionNode,
                          submissionOntology['has_view'],
                          submissionView))
        self.model.add_statement(
            RDF.Statement(submissionNode,
                          submissionOntology['name'],
                          toTypedNode(submission_name)))
        self.model.add_statement(
            RDF.Statement(submissionNode,
                          rdfNS['type'],
                          submissionOntology['submission']))

        self.model.add_statement(
            RDF.Statement(submissionView, dafTermOntology['filename'],
                          toTypedNode(filename)))
        self.model.add_statement(
            RDF.Statement(submissionView, dafTermOntology['view'], view))
        self.model.add_statement(
            RDF.Statement(submissionView, dafTermOntology['paired'],
                          toTypedNode(self._is_paired(libNode))))
        self.model.add_statement(
            RDF.Statement(submissionView, dafTermOntology['submission'],
                          submissionNode))

        # extra information: type, filename pattern and every DAF variable
        terms = [dafTermOntology['type'],
                 dafTermOntology['filename_re'],
                 ]
        terms.extend(dafTermOntology[v] for v in self.get_daf_variables())

        # Add everything I can find
        for term in terms:
            value = self._get_library_attribute(libNode, term)
            if value is not None:
                # NOTE(review): value comes from fromTypedNode (possibly a
                # list); confirm RDF.Statement accepts it, or wrap it with
                # toTypedNode.
                self.model.add_statement(
                    RDF.Statement(submissionView, term, value))

    def _add_library_details_to_model(self, libNode):
        """Load RDFa statements found at libNode's URI into the model.

        A statement is skipped when the model already has a statement with
        the same subject and predicate, so existing data is not overridden.
        Logs a warning if the model still knows nothing about libNode.
        """
        parser = RDF.Parser(name='rdfa')
        new_statements = parser.parse_as_stream(libNode.uri)
        for s in new_statements:
            # don't override things we already have in the model
            q = RDF.Statement(s.subject, s.predicate, None)
            if len(list(self.model.find_statements(q))) == 0:
                self.model.append(s)

        # query the library itself; the loop variable may never be bound
        known = RDF.Statement(libNode, None, None)
        if len(list(self.model.find_statements(known))) == 0:
            logger.warning("Nothing known about %s" % (str(libNode),))

    def get_daf_variables(self):
        """Returns simple variables names that to include in the ddf

        The list is ``'view'``, then each variable recorded for the
        submission set, then ``'labVersion'``.
        """
        variableTerm = dafTermOntology['variables']
        results = ['view']
        for obj in self.model.get_targets(self.submissionSet, variableTerm):
            value = str(fromTypedNode(obj))
            results.append(value)
        results.append('labVersion')
        return results

    def make_submission_name(self, submission_dir):
        """Return the last path component of *submission_dir*.

        :raises RuntimeError: if that component is empty (e.g. ``/``).
        """
        submission_dir = os.path.normpath(submission_dir)
        submission_dir_name = os.path.split(submission_dir)[1]
        if len(submission_dir_name) == 0:
            raise RuntimeError(
                "Submission dir name too short: %s" % (submission_dir,))
        return submission_dir_name

    def get_submission_node(self, submission_dir):
        """Convert a submission directory name to a submission node
        """
        submission_name = self.make_submission_name(submission_dir)
        return self.submissionSetNS[submission_name]

    def _get_library_attribute(self, libNode, attribute):
        """Return the value(s) of *attribute* for *libNode*.

        *attribute* is an RDF.Node, or a name looked up in libraryOntology.
        Direct statements are checked first, then owl:sameAs alternatives.
        On a miss the library's details are loaded once and the search is
        repeated.  Returns a single value, a list of values, or None.
        """
        if not isinstance(attribute, RDF.Node):
            attribute = libraryOntology[attribute]

        for attempt in range(2):
            targets = list(self.model.get_targets(libNode, attribute))
            if len(targets) > 0:
                return self._format_library_attribute(targets)

            targets = self._search_same_as(libNode, attribute)
            if targets is not None:
                return self._format_library_attribute(targets)

            if attempt == 0:
                # we don't know anything about this attribute yet; pull in
                # the library's data (only once) before searching again
                self._add_library_details_to_model(libNode)

        return None

    def _format_library_attribute(self, targets):
        """Convert target nodes to Python values.

        Returns None for no targets, a single value for one target, or a
        list for several.
        """
        if len(targets) == 0:
            return None
        elif len(targets) == 1:
            return fromTypedNode(targets[0])
        elif len(targets) > 1:
            return [fromTypedNode(t) for t in targets]

    def _search_same_as(self, subject, predicate):
        """Look up *subject* through predicates declared owl:sameAs
        *predicate*; return the first non-empty target list, or None."""
        other_predicates = self.model.get_targets(predicate, owlNS['sameAs'])
        for other in other_predicates:
            targets = list(self.model.get_targets(subject, other))
            if len(targets) > 0:
                return targets
        return None

    def find_view(self, filename):
        """Search through potential DAF filename patterns

        Returns the view whose ``filename_re`` pattern matches the start of
        *filename*, or None if no pattern matches.

        :raises ModelException: if more than one view matches.
        """
        if self.__view_map is None:
            self.__view_map = self._get_filename_view_map()

        results = []
        for pattern, view in self.__view_map.items():
            if re.match(pattern, filename):
                results.append(view)

        if len(results) > 1:
            msg = "%s matched multiple views %s" % (
                filename,
                [str(x) for x in results])
            raise ModelException(msg)
        elif len(results) == 1:
            return results[0]
        else:
            return None

    def _get_filename_view_map(self):
        """Query our model for filename patterns

        Returns a dictionary mapping each regular expression string that
        compiles to the view node that declared it.  Patterns that fail to
        compile are logged and skipped, because matching with them would
        raise.
        """
        filename_query = RDF.Statement(
            None, dafTermOntology['filename_re'], None)

        patterns = {}
        for s in self.model.find_statements(filename_query):
            view_name = s.subject
            literal_re = s.object.literal_value['string']
            logger.debug("Found: %s" % (literal_re,))
            try:
                re.compile(literal_re)
            except re.error:
                logger.error("Unable to compile: %s" % (literal_re,))
                continue
            patterns[literal_re] = view_name
        return patterns

    def _is_paired(self, libNode):
        """Determine if a library is paired end

        :raises ModelException: if the library has no library type.
        :raises MetadataLookupException: if the library type is unknown.
        """
        library_type = self._get_library_attribute(libNode, 'library_type')
        if library_type is None:
            raise ModelException(
                "%s doesn't have a library type" % (str(libNode),))

        single = ['Single End', 'Small RNA', 'CSHL (lacking last nt)']
        paired = ['Paired End', 'Multiplexing', 'Barcoded']
        if library_type in single:
            return False
        elif library_type in paired:
            return True
        else:
            raise MetadataLookupException(
                "Unrecognized library type %s for %s" %
                (library_type, str(libNode)))

    def _get_library_url(self):
        return str(self.libraryNS[''].uri)

    def _set_library_url(self, value):
        self.libraryNS = RDF.NS(str(value))

    # base URL of the library namespace used to build library nodes
    library_url = property(_get_library_url, _set_library_url)