b8a4177fd46fd164ea9f1940c1ecc99ce8c92eab
[htsworkflow.git] / htsworkflow / submission / daf.py
1 """Parse UCSC DAF File
2 """
3 import logging
4 import os
5 import re
6 import string
7 from StringIO import StringIO
8 import types
9 import urlparse
10
11 import RDF
12 from htsworkflow.util.rdfhelp import \
13      blankOrUri, \
14      dafTermOntology, \
15      get_model, \
16      libraryOntology, \
17      owlNS, \
18      rdfNS, \
19      submissionLog, \
20      submissionOntology, \
21      toTypedNode, \
22      fromTypedNode
23 from htsworkflow.util.hashfile import make_md5sum
24
logger = logging.getLogger(__name__)


class ModelException(RuntimeError):
    """The RDF model lacks required data or holds ambiguous data."""


class MetadataLookupException(RuntimeError):
    """Problem accessing metadata"""


# Parser states used by parse_stream
DAF_HEADER = 1  # reading submission-wide (header) attributes
DAF_VIEW = 2    # reading the attributes of a view stanza
36
def parse_into_model(model, submission_name, filename):
    """Load the DAF file at ``filename`` into ``model``.

    :Args:
      model (RDF.Model): model that receives the generated statements
      submission_name (str): short submission name used to build URIs
      filename (str): path of the DAF file to read
    """
    add_to_model(model, parse(filename), submission_name)
44
def fromstream_into_model(model, submission_name, daf_stream):
    """Load a DAF read from an iterable of lines into ``model``.

    :Args:
      model (RDF.Model): model that receives the generated statements
      submission_name (str): short submission name used to build URIs
      daf_stream: iterable of DAF text lines (e.g. an open file)
    """
    add_to_model(model, parse_stream(daf_stream), submission_name)
48     
def fromstring_into_model(model, submission_name, daf_string):
    """Load DAF text held in a string into ``model``.

    :Args:
      model (RDF.Model): model that receives the generated statements
      submission_name (str): short submission name used to build URIs
      daf_string (str): complete DAF contents
    """
    add_to_model(model, fromstring(daf_string), submission_name)
56     
def parse(filename):
    """Parse the DAF file at ``filename``.

    The file is closed even when parsing raises.

    :Args:
      filename (str): path of the DAF file
    :Returns:
      the attribute dictionary produced by parse_stream
    """
    with open(filename, 'r') as stream:
        return parse_stream(stream)
62
def fromstring(daf_string):
    """Parse DAF text held in a string.

    :Returns:
      the attribute dictionary produced by parse_stream
    """
    return parse_stream(StringIO(daf_string))
66
def parse_stream(stream):
    """Parse a UCSC DAF from an iterable of text lines.

    Each line is a ``name value`` pair. Text after a ``#`` is discarded.
    A value of ``yes``/``no`` (any case) is converted to ``True``/``False``.
    Blank lines separate stanzas; this includes lines that held only a
    comment.

    Lines outside a view stanza are stored directly in the returned
    dictionary. The exception is ``variables``, which is split on commas
    into a list of stripped names. A ``view`` line starts a view stanza.
    That stanza's lines, including ``view`` itself, are collected in
    ``attributes['views'][<view value>]``.

    :Args:
      stream: iterable of lines, such as an open file or StringIO
    :Returns:
      dict of header attributes plus a ``'views'`` dict mapping each view
      name to its attribute dict
    """
    comment_re = re.compile("#.*$")

    state = DAF_HEADER
    attributes = {'views': {}}
    view_name = None
    view_attributes = {}
    for line in stream:
        # remove comments
        line = comment_re.sub("", line)
        nstop = _extract_name_index(line)
        name = line[0:nstop]
        sstop = _consume_whitespace(line, start=nstop)
        vstop = _extract_value_index(line, start=sstop)
        value = line[sstop:vstop]

        lowered = value.lower()
        if lowered == 'yes':
            value = True
        elif lowered == 'no':
            value = False

        if len(name) == 0:
            # blank line: close the current view stanza, if any
            if view_name is not None:
                attributes['views'][view_name] = view_attributes
                view_name = None
                view_attributes = {}
            state = DAF_HEADER
        elif name == 'view':
            # A view line always begins a new view stanza. If the previous
            # view was not closed by a blank line, save it first. Otherwise
            # its attributes would be merged into the new view and stored
            # under the old view's name.
            if view_name is not None:
                attributes['views'][view_name] = view_attributes
                view_attributes = {}
            view_name = value
            view_attributes['view'] = value
            state = DAF_VIEW
        elif state == DAF_HEADER and name == 'variables':
            attributes[name] = [x.strip() for x in value.split(',')]
        elif state == DAF_HEADER:
            attributes[name] = value
        elif state == DAF_VIEW:
            view_attributes[name] = value

    # save last block
    if view_name is not None:
        attributes['views'][view_name] = view_attributes

    return attributes
110
111 def _consume_whitespace(line, start=0):
112     for i in xrange(start, len(line)):
113         if line[i] not in string.whitespace:
114             return i
115         
116     return len(line)
117
118 def _extract_name_index(line, start=0):
119     for i in xrange(start, len(line)):
120         if line[i] in string.whitespace:
121             return i
122         
123     return len(line)
124
125 def _extract_value_index(line, start=0):
126     shortline = line.rstrip()
127     return len(shortline)
128
def convert_to_rdf_statements(attributes, name):
    """Translate parsed DAF attributes into RDF statements.

    Every top-level attribute becomes a statement whose subject is the
    submission node for ``name`` and whose predicate is the matching
    dafTermOntology term. The ``variables`` list yields one statement per
    variable. The ``views`` dictionary is expanded by _views_to_statements.

    :Args:
      attributes (dict): result of parse_stream / parse / fromstring
      name (str): short submission name
    :Returns:
      list of RDF.Statement
    """
    subject = RDF.Node(get_submission_uri(name))

    statements = []
    for daf_key in attributes:
        if daf_key == 'views':
            statements.extend(
                _views_to_statements(name, dafTermOntology, attributes[daf_key]))
            continue

        predicate = dafTermOntology[daf_key]
        if daf_key == 'variables':
            objects = attributes.get('variables', [])
        else:
            objects = [attributes[daf_key]]
        statements.extend(
            RDF.Statement(subject, predicate, toTypedNode(item))
            for item in objects)

    return statements
151
def _views_to_statements(name, dafNS, views):
    """Build RDF statements describing each view of submission ``name``.

    For every view this emits:

    - submission --dafNS['views']--> view node
    - view node --dafNS['name']--> view name
    - one statement per view attribute, using dafNS[attribute] as predicate

    :Args:
      name (str): short submission name
      dafNS (RDF.NS): namespace supplying the predicates
      views (dict): view name -> dict of view attributes
    :Returns:
      list of RDF.Statement
    """
    submission_node = RDF.Node(get_submission_uri(name))
    view_namespace = get_view_namespace(name)

    statements = []
    for view_name in views:
        attribute_map = views[view_name]
        view_node = view_namespace[view_name]
        statements.append(
            RDF.Statement(submission_node, dafNS['views'], view_node))
        statements.append(
            RDF.Statement(view_node, dafNS['name'], toTypedNode(view_name)))
        statements.extend(
            RDF.Statement(view_node, dafNS[key], toTypedNode(attribute_map[key]))
            for key in attribute_map)
    return statements
170
def add_to_model(model, attributes, name):
    """Add the RDF statements describing parsed DAF ``attributes`` to ``model``.

    :Args:
      model (RDF.Model): destination model
      attributes (dict): result of parse_stream / parse / fromstring
      name (str): short submission name
    """
    statements = convert_to_rdf_statements(attributes, name)
    for item in statements:
        model.add_statement(item)
174             
def get_submission_uri(name):
    """Return the URI of submission ``name`` within the submissionLog namespace."""
    submission_node = submissionLog[name]
    return submission_node.uri
177
def get_view_namespace(name):
    """Return the RDF namespace holding the views of submission ``name``.

    View URIs take the form ``<submission uri>/view/<view name>``.
    """
    base = str(get_submission_uri(name))
    return RDF.NS(base + '/view/')
182
class DAFMapper(object):
    """Convert filenames to views in the UCSC Daf

    Holds an RDF model containing a parsed DAF. It matches submission file
    names against the DAF's ``filename_re`` patterns to find their view,
    and records submission, view and file statements in the same model.
    """
    def __init__(self, name, daf_file=None, model=None):
        """Construct a RDF backed model of a UCSC DAF

        :args:
          name (str): the name of this submission (used to construct DAF url)
          daf_file (str, stream, or None):
             if str, use as filename
             if stream, parse as stream
             if none, don't attempt to load the DAF into our model
          model (RDF.Model or None):
             if None, construct a memory backed model
             otherwise specifies model to use
        """
        if daf_file is None and model is None:
            logger.error("We need a DAF or Model containing a DAF to work")

        self.name = name
        if model is not None:
            self.model = model
        else:
            self.model = get_model()

        if daf_file is None:
            # Documented contract: nothing to load. The supplied model is
            # expected to already describe the DAF.
            pass
        elif hasattr(daf_file, 'next') or hasattr(daf_file, '__next__'):
            # its some kind of stream (Python 2 or Python 3 iterator)
            fromstream_into_model(self.model, name, daf_file)
        else:
            # anything else is treated as a filename
            parse_into_model(self.model, name, daf_file)

        self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')
        self.submissionSet = get_submission_uri(self.name)
        self.submissionSetNS = RDF.NS(str(self.submissionSet)+'/')
        # lazily built {filename regex string: view node} cache; see find_view
        self.__view_map = None

    def add_pattern(self, view_name, filename_pattern):
        """Map a filename regular expression to a view name

        The cached pattern map used by find_view is discarded, so the new
        pattern takes effect on the next lookup.
        """
        viewNS = get_view_namespace(self.name)

        obj = toTypedNode(filename_pattern)
        self.model.add_statement(
            RDF.Statement(viewNS[view_name],
                          dafTermOntology['filename_re'],
                          obj))
        # find_view caches the patterns on first use; force a rebuild
        self.__view_map = None

    def import_submission_dir(self, submission_dir, library_id):
        """Import a submission directory and update our model as needed

        Library details for ``library_id`` are loaded first (via
        _add_library_details_to_model). Then every entry listed in
        ``submission_dir`` is passed to construct_file_attributes.
        """
        libNode = self.libraryNS[library_id + "/"]

        self._add_library_details_to_model(libNode)

        submission_files = os.listdir(submission_dir)
        for f in submission_files:
            self.construct_file_attributes(submission_dir, libNode, f)

    def construct_file_attributes(self, submission_dir, libNode, pathname):
        """Add RDF statements describing one submission file to the model.

        The file name is matched against the DAF filename patterns to find
        its view:

        - Files with no matching view are logged and skipped.
        - Files whose view is ``libraryOntology['ignore']`` are skipped
          silently.
        - Otherwise, statements are added that link the submission set,
          the submission (named after ``submission_dir``), the submission
          view, the library and the file. The file's md5sum is recorded
          when one can be computed.

        :Args:
          submission_dir (str): directory containing the submission files
          libNode (RDF.Node): node of the library the files belong to
          pathname (str): file name; only its final path component is used
        :Returns:
          None
        """
        filename = os.path.basename(pathname)

        logger.debug("Searching for view")
        view = self.find_view(filename)
        if view is None:
            logger.warning("Unrecognized file: %s" % (pathname,))
            return None
        if str(view) == str(libraryOntology['ignore']):
            return None

        submission_name = self.make_submission_name(submission_dir)
        submissionNode = self.get_submission_node(submission_dir)
        submission_uri = str(submissionNode.uri)
        view_name = fromTypedNode(self.model.get_target(view, dafTermOntology['name']))
        if view_name is None:
            logger.warning('Could not find view name for {0}'.format(str(view)))
            return

        view_name = str(view_name)
        submissionView = RDF.Node(RDF.Uri(submission_uri + '/' + view_name))

        self.model.add_statement(
            RDF.Statement(self.submissionSet, dafTermOntology['has_submission'], submissionNode))
        logger.debug("Adding statements to {0}".format(str(submissionNode)))
        self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['has_view'], submissionView))
        self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['name'], toTypedNode(submission_name)))
        self.model.add_statement(RDF.Statement(submissionNode, rdfNS['type'], submissionOntology['submission']))
        self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['library'], libNode))

        logger.debug("Adding statements to {0}".format(str(submissionView)))
        # add trac specific information
        self.model.add_statement(
            RDF.Statement(submissionView, dafTermOntology['view'], view))
        self.model.add_statement(
            RDF.Statement(submissionView, dafTermOntology['paired'], toTypedNode(self._is_paired(libNode))))
        self.model.add_statement(
            RDF.Statement(submissionView, dafTermOntology['submission'], submissionNode))

        # add file specific information
        logger.debug("Updating file md5sum")
        fileNode = RDF.Node(RDF.Uri(submission_uri + '/' + filename))
        submission_pathname = os.path.join(submission_dir, filename)
        md5 = make_md5sum(submission_pathname)
        self.model.add_statement(
            RDF.Statement(submissionView, dafTermOntology['has_file'], fileNode))
        self.model.add_statement(
            RDF.Statement(fileNode, dafTermOntology['filename'], filename))

        if md5 is None:
            logger.warning("Unable to produce md5sum for %s" % (submission_pathname,))
        else:
            self.model.add_statement(
                RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))

        logger.debug("Done.")

    def _add_library_details_to_model(self, libNode):
        """Parse RDFa from ``libNode``'s URI and merge it into the model.

        A parsed statement is added only when the model has no existing
        target for its (subject, predicate) pair.
        """
        parser = RDF.Parser(name='rdfa')
        new_statements = parser.parse_as_stream(libNode.uri)
        for s in new_statements:
            # don't override things we already have in the model
            targets = list(self.model.get_targets(s.subject, s.predicate))
            if len(targets) == 0:
                self.model.append(s)

    def get_daf_variables(self):
        """Returns simple variables names that to include in the ddf

        Order: 'view', then 'replicate' when any view has replicates, then
        the DAF's declared variables, then 'labVersion'.
        """
        variableTerm = dafTermOntology['variables']
        results = ['view']
        if self.need_replicate():
            results.append('replicate')

        for obj in self.model.get_targets(self.submissionSet, variableTerm):
            value = str(fromTypedNode(obj))
            results.append(value)
        results.append('labVersion')
        return results

    def make_submission_name(self, submission_dir):
        """Return the last path component of ``submission_dir``.

        :Raises:
          RuntimeError: if that component is empty (e.g. for '/')
        """
        submission_dir = os.path.normpath(submission_dir)
        submission_dir_name = os.path.split(submission_dir)[1]
        if len(submission_dir_name) == 0:
            raise RuntimeError(
                "Submission dir name too short: %s" %(submission_dir,))
        return submission_dir_name

    def get_submission_node(self, submission_dir):
        """Convert a submission directory name to a submission node
        """
        submission_name = self.make_submission_name(submission_dir)
        return self.submissionSetNS[submission_name]

    def _get_library_attribute(self, libNode, attribute):
        """Look up ``attribute`` of ``libNode`` in the local model.

        :Args:
          libNode (RDF.Node): library to query
          attribute (RDF.Node or str): predicate node, or a term name in
            libraryOntology
        :Returns:
          None when absent, a single python value when there is one target,
          otherwise a list of values (see _format_library_attribute)

        Only the model is consulted. import_submission_dir loads library
        details via _add_library_details_to_model before it processes files.
        """
        if not isinstance(attribute, RDF.Node):
            attribute = libraryOntology[attribute]

        targets = list(self.model.get_targets(libNode, attribute))
        return self._format_library_attribute(targets)

    def _format_library_attribute(self, targets):
        """Collapse a list of RDF nodes into None, one value, or a list of values."""
        if len(targets) == 0:
            return None
        elif len(targets) == 1:
            return fromTypedNode(targets[0])
        else:
            return [fromTypedNode(t) for t in targets]

    def _search_same_as(self, subject, predicate):
        """Return targets of ``subject`` under an owl:sameAs alias of ``predicate``.

        Returns the first non-empty target list found, or None.
        """
        # look for alternate names
        other_predicates = self.model.get_targets(predicate, owlNS['sameAs'])
        for other in other_predicates:
            targets = list(self.model.get_targets(subject, other))
            if len(targets) > 0:
                return targets
        return None

    def find_view(self, filename):
        """Search through potential DAF filename patterns

        Patterns are tried with re.match, so they are anchored at the start
        of ``filename``.

        :Returns:
          the matching view node, or None when no pattern matches
        :Raises:
          ModelException: if more than one pattern matches
        """
        if self.__view_map is None:
            self.__view_map = self._get_filename_view_map()

        results = []
        for pattern, view in self.__view_map.items():
            if re.match(pattern, filename):
                results.append(view)

        if len(results) > 1:
            msg = "%s matched multiple views %s" % (
                filename,
                [str(x) for x in results])
            raise ModelException(msg)
        elif len(results) == 1:
            return results[0]
        else:
            return None

    def get_view_name(self, view):
        """Return the submissionOntology view_name recorded for ``view``.

        :Raises:
          RuntimeError: unless exactly one view name is recorded
        """
        names = list(self.model.get_targets(view, submissionOntology['view_name']))
        if len(names) == 1:
            return fromTypedNode(names[0])
        else:
            msg = "Found wrong number of view names for {0} len = {1}"
            msg = msg.format(str(view), len(names))
            logger.error(msg)
            raise RuntimeError(msg)

    def _get_filename_view_map(self):
        """Query our model for filename patterns

        return a dictionary mapping each filename regular expression (its
        source string) to the view node it identifies. Patterns that fail
        to compile are logged and left out, so find_view never trips over
        them with re.error.
        """
        filename_query = RDF.Statement(
            None, dafTermOntology['filename_re'], None)

        patterns = {}
        for s in self.model.find_statements(filename_query):
            view_name = s.subject
            literal_re = s.object.literal_value['string']
            logger.debug("Found: %s" % (literal_re,))
            try:
                re.compile(literal_re)
            except re.error:
                logger.error("Unable to compile: %s" % (literal_re,))
                continue
            patterns[literal_re] = view_name
        return patterns

    def _get_library_url(self):
        return str(self.libraryNS[''].uri)
    def _set_library_url(self, value):
        self.libraryNS = RDF.NS(str(value))
    library_url = property(_get_library_url, _set_library_url)

    def _is_paired(self, libNode):
        """Determine if a library is paired end

        :Raises:
          ModelException: if no library type is recorded
          MetadataLookupException: if the library type is not recognized
        """
        library_type = self._get_library_attribute(libNode, 'library_type')
        if library_type is None:
            raise ModelException("%s doesn't have a library type" % (str(libNode),))

        #single = (1,3,6)
        single = ['Single End', 'Small RNA', 'CSHL (lacking last nt)']
        paired = ['Paired End', 'Multiplexing', 'Barcoded']
        if library_type in single:
            return False
        elif library_type in paired:
            return True
        else:
            raise MetadataLookupException(
                "Unrecognized library type %s for %s" % \
                (library_type, str(libNode)))

    def need_replicate(self):
        """Return True if any view of this submission has a truthy hasReplicates."""
        viewTerm = dafTermOntology['views']
        replicateTerm = dafTermOntology['hasReplicates']

        views = self.model.get_targets(self.submissionSet, viewTerm)

        for view in views:
            replicate = self.model.get_target(view, replicateTerm)
            if fromTypedNode(replicate):
                return True

        return False
478             
479         return False