Rework ucsc gather to use RDF models for gathering and storing track metadata.
htsworkflow/submission/daf.py
1 """Parse UCSC DAF File
2 """
import logging
import os
import re
import string
from StringIO import StringIO
import types
import urlparse

import RDF
from htsworkflow.util.rdfhelp import \
     blankOrUri, \
     dafTermOntology, \
     get_model, \
     libraryOntology, \
     owlNS, \
     rdfNS, \
     submissionLog, \
     submissionOntology, \
     toTypedNode, \
     fromTypedNode

logger = logging.getLogger(__name__)

class ModelException(RuntimeError):
    pass

# STATES
DAF_HEADER = 1
DAF_VIEW = 2

def parse_into_model(model, submission_name, filename):
    """Read a DAF file into an RDF model

    requires a short submission name
    """
    attributes = parse(filename)
    add_to_model(model, attributes, submission_name)

def fromstream_into_model(model, submission_name, daf_stream):
    """Read a DAF from a stream into an RDF model

    requires a short submission name
    """
    attributes = parse_stream(daf_stream)
    add_to_model(model, attributes, submission_name)

def fromstring_into_model(model, submission_name, daf_string):
    """Read a DAF contained in a string into an RDF model

    requires a short submission name
    """
    attributes = fromstring(daf_string)
    add_to_model(model, attributes, submission_name)

def parse(filename):
    """Parse a DAF file by filename"""
    stream = open(filename, 'r')
    attributes = parse_stream(stream)
    stream.close()
    return attributes

def fromstring(daf_string):
    """Parse a DAF contained in a string"""
    stream = StringIO(daf_string)
    return parse_stream(stream)

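# A DAF is a line-oriented "name value" format.  The fragment below is
# illustrative only (the field names and values are made up for this example;
# real ENCODE DAFs define more fields):
#
#   dafVersion        1.0
#   variables         cell, antibody
#
#   view              Signal
#   longLabelPrefix   Example Signal
#   type              wig
#   required          yes
#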
def parse_stream(stream):
    """Parse a stream of DAF text into a dictionary of attributes

    View blocks are collected under the 'views' key, indexed by view name.
    """
    comment_re = re.compile("#.*$")

    state = DAF_HEADER
    attributes = {'views': {}}
    view_name = None
    view_attributes = {}
    for line in stream:
        # remove comments
        line = comment_re.sub("", line)
        nstop = _extract_name_index(line)
        name = line[0:nstop]
        sstop = _consume_whitespace(line, start=nstop)
        vstop = _extract_value_index(line, start=sstop)
        value = line[sstop:vstop]

        if value.lower() in ('yes',):
            value = True
        elif value.lower() in ('no',):
            value = False

        if len(name) == 0:
            # a blank line ends the current view block
            if view_name is not None:
                attributes['views'][view_name] = view_attributes
                view_name = None
                view_attributes = {}
            state = DAF_HEADER
        elif state == DAF_HEADER and name == 'variables':
            attributes[name] = [x.strip() for x in value.split(',')]
        elif state == DAF_HEADER and name == 'view':
            view_name = value
            view_attributes['view'] = value
            state = DAF_VIEW
        elif state == DAF_HEADER:
            attributes[name] = value
        elif state == DAF_VIEW:
            view_attributes[name] = value

    # save the last view block
    if view_name is not None:
        attributes['views'][view_name] = view_attributes

    return attributes

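# A sketch of what parse_stream returns for the example DAF fragment above
# (values stay strings, except yes/no which become booleans):
#
#   {'dafVersion': '1.0',
#    'variables': ['cell', 'antibody'],
#    'views': {'Signal': {'view': 'Signal',
#                         'longLabelPrefix': 'Example Signal',
#                         'type': 'wig',
#                         'required': True}}}
#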
def _consume_whitespace(line, start=0):
    """Return the index of the first non-whitespace character after start"""
    for i in xrange(start, len(line)):
        if line[i] not in string.whitespace:
            return i

    return len(line)

def _extract_name_index(line, start=0):
    """Return the index of the first whitespace character after start"""
    for i in xrange(start, len(line)):
        if line[i] in string.whitespace:
            return i

    return len(line)

def _extract_value_index(line, start=0):
    """Return the index just past the value, ignoring trailing whitespace"""
    shortline = line.rstrip()
    return len(shortline)

def convert_to_rdf_statements(attributes, name):
    """Convert a parsed DAF attribute dictionary into a list of RDF statements"""
    submission_uri = get_submission_uri(name)
    subject = RDF.Node(submission_uri)

    statements = []
    for daf_key in attributes:
        predicate = dafTermOntology[daf_key]
        if daf_key == 'views':
            statements.extend(_views_to_statements(name,
                                                   dafTermOntology,
                                                   attributes[daf_key]))
        elif daf_key == 'variables':
            for var in attributes.get('variables', []):
                obj = toTypedNode(var)
                statements.append(RDF.Statement(subject, predicate, obj))
        else:
            value = attributes[daf_key]
            obj = toTypedNode(value)
            statements.append(RDF.Statement(subject, predicate, obj))

    return statements

def _views_to_statements(name, dafNS, views):
    """Convert the parsed view blocks into RDF statements"""
    subject = RDF.Node(get_submission_uri(name))
    viewNS = get_view_namespace(name)

    statements = []
    for view_name in views:
        view_attributes = views[view_name]
        viewSubject = viewNS[view_name]
        statements.append(RDF.Statement(subject, dafNS['views'], viewSubject))
        for view_attribute_name in view_attributes:
            predicate = dafNS[view_attribute_name]
            obj = toTypedNode(view_attributes[view_attribute_name])
            statements.append(RDF.Statement(viewSubject, predicate, obj))

    return statements

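# A sketch of the statements these two functions would emit for the example
# DAF above, given the submission name 'mysub' (URIs abbreviated; the actual
# prefixes come from submissionLog and dafTermOntology in rdfhelp):
#
#   <.../mysub>             daf:dafVersion  "1.0"
#   <.../mysub>             daf:variables   "cell", "antibody"
#   <.../mysub>             daf:views       <.../mysub/view/Signal>
#   <.../mysub/view/Signal> daf:type        "wig"
#   <.../mysub/view/Signal> daf:required    true
#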
def add_to_model(model, attributes, name):
    for statement in convert_to_rdf_statements(attributes, name):
        model.add_statement(statement)

def get_submission_uri(name):
    return submissionLog[name].uri

def get_view_namespace(name):
    submission_uri = get_submission_uri(name)
    viewNS = RDF.NS(str(submission_uri) + '/view/')
    return viewNS

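# A hedged usage sketch of the mapper below; the submission name, DAF
# filename, pattern, and library id are made up for illustration:
#
#   mapper = DAFMapper('mysub', daf_file='mysub.daf')
#   mapper.add_pattern('Signal', r'.*\.wig$')
#   mapper.import_submission_dir('/tmp/mysub_run1', library_id='11111')
#   print mapper.get_daf_variables()
#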
class DAFMapper(object):
    """Map filenames to views in a UCSC DAF
    """
    def __init__(self, name, daf_file=None, model=None):
        """Construct an RDF-backed model of a UCSC DAF

        :args:
          name (str): the name of this submission (used to construct DAF url)
          daf_file (str, stream, or None):
             if str, use as filename
             if stream, parse as stream
             if None, don't attempt to load the DAF into our model
          model (RDF.Model or None):
             if None, construct a memory backed model
             otherwise specifies model to use
        """
        if daf_file is None and model is None:
            logger.error("We need a DAF or a model containing a DAF to work")

        self.name = name
        if model is not None:
            self.model = model
        else:
            self.model = get_model()

        if daf_file is not None:
            if hasattr(daf_file, 'next'):
                # it's some kind of stream
                fromstream_into_model(self.model, name, daf_file)
            else:
                # treat it as a filename
                parse_into_model(self.model, name, daf_file)

        self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')
        self.submissionSet = get_submission_uri(self.name)
        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '/')
        self.__view_map = None

    def add_pattern(self, view_name, filename_pattern):
        """Map a filename regular expression to a view name
        """
        viewNS = get_view_namespace(self.name)

        obj = toTypedNode(filename_pattern)
        self.model.add_statement(
            RDF.Statement(viewNS[view_name],
                          dafTermOntology['filename_re'],
                          obj))

    def import_submission_dir(self, submission_dir, library_id):
        """Import a submission directory and update our model as needed
        """
        libNode = self.libraryNS[library_id + "/"]

        submission_files = os.listdir(submission_dir)
        for f in submission_files:
            self.construct_file_attributes(submission_dir, libNode, f)

    def construct_file_attributes(self, submission_dir, libNode, pathname):
        """Add statements describing a submitted file to our model

        :Args:
          submission_dir (str): the directory being submitted
          libNode (RDF.Node): the library this file belongs to
          pathname (str): the filename whose view we are trying to determine
        """
        path, filename = os.path.split(pathname)

        view = self.find_view(filename)
        if view is None:
            logger.warn("Unrecognized file: %s" % (pathname,))
            return None
        if str(view) == str(libraryOntology['ignore']):
            return None

        submissionName = toTypedNode(self.make_submission_name(submission_dir))
        submissionNode = self.get_submission_node(submission_dir)
        self.model.add_statement(
            RDF.Statement(self.submissionSet, dafTermOntology['has_submission'], submissionNode))

        fileNode = RDF.Node(RDF.Uri(str(submissionNode.uri) + '/' + filename))
        self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['has_view'], view))
        self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['name'], submissionName))
        self.model.add_statement(RDF.Statement(submissionNode, rdfNS['type'], submissionOntology['submission']))

        self.model.add_statement(
            RDF.Statement(view, dafTermOntology['filename'], toTypedNode(filename)))
        self.model.add_statement(
            RDF.Statement(view, dafTermOntology['paired'], toTypedNode(self._is_paired(libNode))))
        self.model.add_statement(
            RDF.Statement(view, dafTermOntology['submission'], submissionNode))

        # extra information
        terms = [dafTermOntology['type'],
                 dafTermOntology['filename_re'],
                 ]
        terms.extend((dafTermOntology[v] for v in self.get_daf_variables()))

        # Add everything I can find in the library metadata
        for term in terms:
            value = self._get_library_attribute(libNode, term)
            if value is not None:
                self.model.add_statement(RDF.Statement(view, term, value))

    def _add_library_details_to_model(self, libNode):
        """Load library metadata (RDFa from the library page) into our model"""
        parser = RDF.Parser(name='rdfa')
        new_statements = parser.parse_as_stream(libNode.uri)
        for s in new_statements:
            # don't override things we already have in the model
            q = RDF.Statement(s.subject, s.predicate, None)
            if len(list(self.model.find_statements(q))) == 0:
                self.model.append(s)

        q = RDF.Statement(libNode, None, None)
        statements = list(self.model.find_statements(q))
        if len(statements) == 0:
            logger.warning("Nothing known about %s" % (str(libNode),))

    def get_daf_variables(self):
        """Return the variable names to include in the DDF
        """
        variableTerm = dafTermOntology['variables']
        results = ['view']
        for obj in self.model.get_targets(self.submissionSet, variableTerm):
            value = str(fromTypedNode(obj))
            results.append(value)
        results.append('labVersion')
        return results

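    # For the example DAF above this would return something like
    # ['view', 'cell', 'antibody', 'labVersion'] (a sketch; the middle
    # entries come from the DAF's "variables" line).
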
    def make_submission_name(self, submission_dir):
        submission_dir = os.path.normpath(submission_dir)
        submission_dir_name = os.path.split(submission_dir)[1]
        if len(submission_dir_name) == 0:
            raise RuntimeError(
                "Submission dir name too short: %s" % (submission_dir,))
        return submission_dir_name

    def get_submission_node(self, submission_dir):
        """Convert a submission directory name to a submission node
        """
        submission_name = self.make_submission_name(submission_dir)
        return self.submissionSetNS[submission_name]

    def _get_library_attribute(self, libNode, attribute):
        if not isinstance(attribute, RDF.Node):
            attribute = libraryOntology[attribute]

        # search through the model twice (adding in data from website)
        for i in xrange(2):
            targets = list(self.model.get_targets(libNode, attribute))
            if len(targets) > 0:
                return self._format_library_attribute(targets)

            targets = self._search_same_as(libNode, attribute)
            if targets is not None:
                return self._format_library_attribute(targets)

            # we don't know anything about this attribute
            self._add_library_details_to_model(libNode)

        return None

    def _format_library_attribute(self, targets):
        if len(targets) == 0:
            return None
        elif len(targets) == 1:
            return fromTypedNode(targets[0])
        elif len(targets) > 1:
            return [fromTypedNode(t) for t in targets]

    def _search_same_as(self, subject, predicate):
        # look for alternate names for the predicate via owl:sameAs
        other_predicates = self.model.get_targets(predicate, owlNS['sameAs'])
        for other in other_predicates:
            targets = list(self.model.get_targets(subject, other))
            if len(targets) > 0:
                return targets
        return None

    def find_view(self, filename):
        """Search through potential DAF filename patterns
        """
        if self.__view_map is None:
            self.__view_map = self._get_filename_view_map()

        results = []
        for pattern, view in self.__view_map.items():
            if re.match(pattern, filename):
                results.append(view)

        if len(results) > 1:
            msg = "%s matched multiple views %s" % (
                filename,
                [str(x) for x in results])
            raise ModelException(msg)
        elif len(results) == 1:
            return results[0]
        else:
            return None

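    # Hedged example: with the add_pattern('Signal', r'.*\.wig$') call from
    # the sketch above, find_view('rep1.wig') returns the Signal view node,
    # find_view('readme.txt') returns None, and a filename matching the
    # patterns of two different views raises ModelException.
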
    def _get_filename_view_map(self):
        """Query our model for filename patterns

        return a dictionary mapping filename pattern strings to view names
        """
        filename_query = RDF.Statement(
            None, dafTermOntology['filename_re'], None)

        patterns = {}
        for s in self.model.find_statements(filename_query):
            view_name = s.subject
            literal_re = s.object.literal_value['string']
            logger.debug("Found: %s" % (literal_re,))
            try:
                filename_re = re.compile(literal_re)
            except re.error, e:
                logger.error("Unable to compile: %s" % (literal_re,))
                continue
            patterns[literal_re] = view_name
        return patterns

    def _is_paired(self, libNode):
        """Determine if a library is paired end"""
        library_type = self._get_library_attribute(libNode, 'library_type')
        if library_type is None:
            raise ModelException("%s doesn't have a library type" % (str(libNode),))

        single = ['Single End', 'Small RNA', 'CSHL (lacking last nt)']
        paired = ['Paired End', 'Multiplexing', 'Barcoded']
        if library_type in single:
            return False
        elif library_type in paired:
            return True
        else:
            raise RuntimeError("Unrecognized library type %s" % (library_type,))