Attempt to download DAF data for an encodesubmit submission
[htsworkflow.git] / htsworkflow / submission / daf.py
1 """Parse UCSC DAF File
2 """
3 import logging
4 import os
5 import re
6 import string
7 from StringIO import StringIO
8 import types
9 import urlparse
10
11 import RDF
12 from htsworkflow.util.rdfhelp import \
13      blankOrUri, \
14      dafTermOntology, \
15      get_model, \
16      libraryOntology, \
17      owlNS, \
18      rdfNS, \
19      submissionLog, \
20      submissionOntology, \
21      toTypedNode, \
22      fromTypedNode
23 from htsworkflow.util.hashfile import make_md5sum
24
# Module-level logger named after this module.
logger = logging.getLogger(name=__name__)
26
27
class ModelException(RuntimeError):
    """Raised when an assumption about the contents of the RDF model fails."""
31
32
class MetadataLookupException(RuntimeError):
    """Raised when metadata cannot be found or interpreted."""
36
37
# Parser states used while reading a DAF stream
DAF_HEADER = 1  # reading top-level (header) attributes
DAF_VIEW = 2    # reading the attributes of a view stanza
41
42
def parse_into_model(model, subject, filename):
    """Load the DAF file at *filename* into *model*.

    Every generated statement is attached to the *subject* node.
    """
    add_to_model(model, parse(filename), subject)
50
51
def fromstream_into_model(model, subject, daf_stream):
    """Parse the DAF lines in *daf_stream* and add them to *model*.

    Every generated statement is attached to the *subject* node.
    """
    add_to_model(model, parse_stream(daf_stream), subject)
57
58
def fromstring_into_model(model, subject, daf_string):
    """Read a string containing a DAF into an RDF model.

    :Args:
      model: RDF model that receives the statements
      subject: node that every generated statement is attached to
      daf_string (str): full text of the DAF
    """
    attributes = fromstring(daf_string)
    add_to_model(model, attributes, subject)
66
67
def parse(filename):
    """Parse a UCSC DAF from the file at *filename*.

    Returns the attribute dictionary produced by parse_stream. The file is
    closed even if parsing raises (the previous version leaked the handle
    in that case).
    """
    with open(filename, 'r') as stream:
        return parse_stream(stream)
75
76
def fromstring(daf_string):
    """Parse a UCSC DAF held in memory as a string.

    Returns the attribute dictionary produced by parse_stream.
    """
    return parse_stream(StringIO(daf_string))
81
82
def parse_stream(stream):
    """Parse a UCSC DAF stored in a stream.

    Each meaningful line is "name value". Lines outside a view stanza are
    header attributes. A ``view`` line opens a view stanza, which collects
    attributes until the next blank line (or the next ``view`` line).
    Values ``yes``/``no`` (any case) become True/False, and the header
    ``variables`` value is split on commas.

    Full-line comments (first non-blank character is ``#``) are skipped
    entirely, so a comment inside a view stanza no longer ends that stanza.
    Trailing ``#`` comments are stripped from other lines.

    :Args:
      stream: iterable of text lines
    :Returns:
      dict of header attributes, plus 'views': a dict mapping each view
      name to its attribute dict (which includes 'view').
    """
    comment_re = re.compile("#.*$")

    state = DAF_HEADER
    attributes = {'views': {}}
    view_name = None
    view_attributes = {}
    for line in stream:
        # A comment-only line would otherwise reduce to an empty line and be
        # mistaken for the blank line that terminates a view stanza.
        if line.lstrip().startswith('#'):
            continue
        # remove trailing comments
        line = comment_re.sub("", line)
        nstop = _extract_name_index(line)
        name = line[0:nstop]
        sstop = _consume_whitespace(line, start=nstop)
        vstop = _extract_value_index(line, start=sstop)
        value = line[sstop:vstop]

        if value.lower() in ('yes',):
            value = True
        elif value.lower() in ('no',):
            value = False

        if len(name) == 0:
            # blank line: close any open view stanza
            if view_name is not None:
                attributes['views'][view_name] = view_attributes
                view_name = None
                view_attributes = {}
            state = DAF_HEADER
        elif name == 'view':
            # a view line always starts a new stanza; save any open one so
            # stanzas that are not blank-separated are not merged together
            if view_name is not None:
                attributes['views'][view_name] = view_attributes
            view_name = value
            view_attributes = {'view': value}
            state = DAF_VIEW
        elif state == DAF_VIEW:
            view_attributes[name] = value
        elif name == 'variables':
            attributes[name] = [x.strip() for x in value.split(',')]
        else:
            attributes[name] = value

    # save last block
    if view_name is not None:
        attributes['views'][view_name] = view_attributes

    return attributes
127
128
129 def _consume_whitespace(line, start=0):
130     """return index of next non whitespace character
131
132     returns length of string if it can't find anything
133     """
134     for i in xrange(start, len(line)):
135         if line[i] not in string.whitespace:
136             return i
137
138     return len(line)
139
140
141 def _extract_name_index(line, start=0):
142     """Used to find end of word by looking for a whitespace character
143
144     returns length of string if nothing matches
145     """
146     for i in xrange(start, len(line)):
147         if line[i] in string.whitespace:
148             return i
149
150     return len(line)
151
152
153 def _extract_value_index(line, start=0):
154     """Returns position of last non-whitespace character
155     """
156     shortline = line.rstrip()
157     return len(shortline)
158
159
def convert_to_rdf_statements(attributes, subject):
    """Turn a parsed DAF attribute dictionary into a list of RDF statements.

    The 'views' entry is expanded by _views_to_statements. 'variables'
    contributes one statement per variable name. Every other key yields a
    single statement whose object is the typed value. All statements hang
    off *subject*.
    """
    statements = []
    for daf_key, value in attributes.items():
        if daf_key == 'views':
            statements.extend(
                _views_to_statements(subject, dafTermOntology, value))
            continue

        predicate = dafTermOntology[daf_key]
        if daf_key == 'variables':
            statements.extend(
                RDF.Statement(subject, predicate, toTypedNode(var))
                for var in value)
        else:
            statements.append(
                RDF.Statement(subject, predicate, toTypedNode(value)))

    return statements
183
184
def _views_to_statements(subject, dafNS, views):
    """Build statements describing each view and attaching it to *subject*.

    Each view gets a node in the view namespace derived from *subject*.
    That node is linked from *subject* through dafNS['views'], gets a
    dafNS['name'] literal, and gets one statement per view attribute.
    """
    viewNS = get_view_namespace(subject)

    statements = []
    for view_name, view_attributes in views.items():
        view_node = viewNS[view_name]
        statements.append(RDF.Statement(subject, dafNS['views'], view_node))
        statements.append(
            RDF.Statement(view_node, dafNS['name'], toTypedNode(view_name)))
        statements.extend(
            RDF.Statement(view_node, dafNS[attr_name], toTypedNode(attr_value))
            for attr_name, attr_value in view_attributes.items())
    return statements
204
205
def add_to_model(model, attributes, subject):
    """Add the RDF form of DAF *attributes*, attached to *subject*, to
    *model*."""
    new_statements = convert_to_rdf_statements(attributes, subject)
    for stmt in new_statements:
        model.add_statement(stmt)
209
210
def get_submission_uri(name):
    """Return the URI of the submissionLog entry named *name*."""
    entry = submissionLog[name]
    return entry.uri
213
214
def submission_uri_to_string(submission_uri):
    """Return *submission_uri* as a string ending in '/'.

    Accepts an RDF.Node (its URI is used), an RDF.Uri, or a plain string.
    """
    if isinstance(submission_uri, RDF.Node):
        submission_uri = str(submission_uri.uri)
    elif isinstance(submission_uri, RDF.Uri):
        submission_uri = str(submission_uri)
    # endswith() also copes with an empty string, where the previous
    # submission_uri[-1] check raised IndexError
    if not submission_uri.endswith('/'):
        submission_uri += '/'
    return submission_uri
223
224
def get_view_namespace(submission_uri):
    """Return an RDF.NS rooted at the 'view/' child of *submission_uri*."""
    base = submission_uri_to_string(submission_uri)
    return RDF.NS(urlparse.urljoin(base, 'view/'))
230
231
class DAFMapper(object):
    """Convert filenames to views in the UCSC Daf

    The DAF, the filename patterns for each view, library details and the
    per-file submission statements are all stored in self.model.
    """
    def __init__(self, name, daf_file=None, model=None):
        """Construct a RDF backed model of a UCSC DAF

        :args:
          name (str): the name of this submission (used to construct DAF url)
          daf_file (str, stream, or None):
             if str, use as filename
             if stream, parse as stream
             if none, don't attempt to load the DAF into our model
          model (RDF.Model or None):
             if None, construct a memory backed model
             otherwise specifies model to use
        """
        if daf_file is None and model is None:
            logger.error("We need a DAF or Model containing a DAF to work")

        self.name = name
        self.submissionSet = get_submission_uri(self.name)
        self.viewNS = get_view_namespace(self.submissionSet)

        if model is not None:
            self.model = model
        else:
            self.model = get_model()

        if hasattr(daf_file, 'next'):
            # its some kind of stream
            fromstream_into_model(self.model, self.submissionSet, daf_file)
        elif daf_file is not None:
            # file name. When daf_file is None there is nothing to load; the
            # old code passed None on to open() and crashed.
            parse_into_model(self.model, self.submissionSet, daf_file)

        self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')
        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '/')
        self.__view_map = None

    def add_pattern(self, view_name, filename_pattern):
        """Map a filename regular expression to a view name

        Adds a filename_re statement to the view node in self.viewNS.
        """
        obj = toTypedNode(filename_pattern)
        self.model.add_statement(
            RDF.Statement(self.viewNS[view_name],
                          dafTermOntology['filename_re'],
                          obj))

    def import_submission_dir(self, submission_dir, library_id):
        """Import a submission directory and update our model as needed

        Library details for library_id are merged into the model first. Then
        every entry of submission_dir is passed to construct_track_attributes.
        """
        libNode = self.libraryNS[library_id + "/"]

        self._add_library_details_to_model(libNode)

        for filename in os.listdir(submission_dir):
            self.construct_track_attributes(submission_dir, libNode, filename)

    def construct_track_attributes(self, submission_dir, libNode, pathname):
        """Add the statements describing one submitted file to the model

        Finds the DAF view whose filename pattern matches the file. Then
        links the submission, a per-view submission node, the library and
        the file itself. Files matching no view are logged and skipped.
        Files whose view is libraryOntology['ignore'] are skipped silently.

        :Args:
        submission_dir (str): directory holding the submission's files
        libNode (RDF.Node): library node the submission belongs to
        pathname (str): file name; only its final path component is used
        """
        filename = os.path.basename(pathname)

        logger.debug("Searching for view")
        view = self.find_view(filename)
        if view is None:
            logger.warning("Unrecognized file: {0}".format(pathname))
            return None
        if str(view) == str(libraryOntology['ignore']):
            return None

        submission_name = self.make_submission_name(submission_dir)
        submissionNode = self.get_submission_node(submission_dir)
        submission_uri = str(submissionNode.uri)
        view_name = fromTypedNode(self.model.get_target(view,
                                       dafTermOntology['name']))
        if view_name is None:
            errmsg = 'Could not find view name for {0}'
            # module logger, not the root logger
            logger.warning(errmsg.format(str(view)))
            return None

        view_name = str(view_name)
        submissionView = RDF.Node(RDF.Uri(submission_uri + '/' + view_name))

        self.model.add_statement(
            RDF.Statement(self.submissionSet,
                          dafTermOntology['has_submission'],
                          submissionNode))
        logger.debug("Adding statements to {0}".format(str(submissionNode)))
        self.model.add_statement(RDF.Statement(submissionNode,
                                               submissionOntology['has_view'],
                                               submissionView))
        self.model.add_statement(RDF.Statement(submissionNode,
                                               submissionOntology['name'],
                                               toTypedNode(submission_name)))
        self.model.add_statement(
            RDF.Statement(submissionNode,
                          rdfNS['type'],
                          submissionOntology['submission']))
        self.model.add_statement(RDF.Statement(submissionNode,
                                               submissionOntology['library'],
                                               libNode))

        logger.debug("Adding statements to {0}".format(str(submissionView)))
        # add track specific information
        self.model.add_statement(
            RDF.Statement(submissionView, dafTermOntology['view'], view))
        self.model.add_statement(
            RDF.Statement(submissionView,
                          dafTermOntology['paired'],
                          toTypedNode(self._is_paired(libNode))))
        self.model.add_statement(
            RDF.Statement(submissionView,
                          dafTermOntology['submission'],
                          submissionNode))

        # add file specific information
        self.create_file_attributes(filename, submissionView, submission_uri,
                                    submission_dir)

        logger.debug("Done.")
        return None

    def create_file_attributes(self, filename, submissionView, submission_uri, submission_dir):
        """Attach a file node (name and md5sum) to submissionView

        The md5sum statement is only added when make_md5sum returns a value;
        otherwise a warning is logged.
        """
        logger.debug("Updating file md5sum")
        fileNode = RDF.Node(RDF.Uri(submission_uri + '/' + filename))
        submission_pathname = os.path.join(submission_dir, filename)
        self.model.add_statement(
            RDF.Statement(submissionView,
                          dafTermOntology['has_file'],
                          fileNode))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['filename'],
                          filename))

        md5 = make_md5sum(submission_pathname)
        if md5 is None:
            errmsg = "Unable to produce md5sum for {0}"
            # module logger, not the root logger
            logger.warning(errmsg.format(submission_pathname))
        else:
            self.model.add_statement(
                RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))

    def _add_library_details_to_model(self, libNode):
        """Parse RDFa from libNode's URI and merge it into the model

        A (subject, predicate) pair that already had a value in the model
        before this call is left untouched. Every value delivered by this
        fetch is kept, so multi-valued properties are no longer truncated to
        their first value.
        """
        parser = RDF.Parser(name='rdfa')
        new_statements = parser.parse_as_stream(libNode.uri)
        # (subject, predicate) pairs first populated by this fetch
        added = set()
        for s in new_statements:
            key = (str(s.subject), str(s.predicate))
            if key in added:
                self.model.append(s)
                continue
            # don't override things we already have in the model
            targets = list(self.model.get_targets(s.subject, s.predicate))
            if len(targets) == 0:
                self.model.append(s)
                added.add(key)

    def get_daf_variables(self):
        """Return the variable names to include in the DDF

        Always starts with 'view', adds 'replicate' when any view has
        replicates, then the DAF 'variables', and ends with 'labVersion'.
        """
        variableTerm = dafTermOntology['variables']
        results = ['view']
        if self.need_replicate():
            results.append('replicate')

        for obj in self.model.get_targets(self.submissionSet, variableTerm):
            value = str(fromTypedNode(obj))
            results.append(value)
        results.append('labVersion')
        return results

    def make_submission_name(self, submission_dir):
        """Return the last path component of submission_dir

        Raises RuntimeError if that component is empty (e.g. for '/').
        """
        submission_dir = os.path.normpath(submission_dir)
        submission_dir_name = os.path.split(submission_dir)[1]
        if len(submission_dir_name) == 0:
            raise RuntimeError(
                "Submission dir name too short: {0}".format(submission_dir))
        return submission_dir_name

    def get_submission_node(self, submission_dir):
        """Convert a submission directory name to a submission node
        """
        submission_name = self.make_submission_name(submission_dir)
        return self.submissionSetNS[submission_name]

    def _get_library_attribute(self, libNode, attribute):
        """Return the value(s) of attribute for libNode from the model

        attribute may be an RDF.Node or a term name in libraryOntology.
        Returns None, a single value, or a list (see
        _format_library_attribute). The model is not refreshed here; the
        old fallback to _add_library_details_to_model sat after an
        unconditional return and never ran.
        """
        if not isinstance(attribute, RDF.Node):
            attribute = libraryOntology[attribute]

        targets = list(self.model.get_targets(libNode, attribute))
        return self._format_library_attribute(targets)

    def _format_library_attribute(self, targets):
        """None for no targets, a value for one, a list of values for many"""
        if len(targets) == 0:
            return None
        elif len(targets) == 1:
            return fromTypedNode(targets[0])
        return [fromTypedNode(t) for t in targets]

    def _search_same_as(self, subject, predicate):
        """Look up subject's targets through predicates owl:sameAs predicate

        Returns the first non-empty target list, or None.
        """
        other_predicates = self.model.get_targets(predicate, owlNS['sameAs'])
        for other in other_predicates:
            targets = list(self.model.get_targets(subject, other))
            if len(targets) > 0:
                return targets
        return None

    def find_view(self, filename):
        """Search through potential DAF filename patterns

        Returns the single view whose pattern matches filename (re.match,
        anchored at the start), or None. Raises ModelException when more
        than one view matches.
        """
        if self.__view_map is None:
            self.__view_map = self._get_filename_view_map()

        results = []
        for pattern, view in self.__view_map.items():
            if re.match(pattern, filename):
                results.append(view)

        if len(results) > 1:
            msg = "%s matched multiple views %s" % (
                filename,
                [str(x) for x in results])
            raise ModelException(msg)
        elif len(results) == 1:
            return results[0]
        else:
            return None

    def get_view_name(self, view):
        """Return the single submissionOntology view_name of view

        Raises RuntimeError if there is not exactly one.
        """
        view_term = submissionOntology['view_name']
        names = list(self.model.get_targets(view, view_term))
        if len(names) == 1:
            return fromTypedNode(names[0])
        else:
            msg = "Found wrong number of view names for {0} len = {1}"
            msg = msg.format(str(view), len(names))
            logger.error(msg)
            raise RuntimeError(msg)

    def _get_filename_view_map(self):
        """Query our model for filename patterns

        Return a dictionary mapping regular expression source strings to
        the view nodes they select. Patterns that fail to compile are logged
        and left out; previously they were kept and made find_view raise
        re.error.
        """
        filename_query = RDF.Statement(
            None, dafTermOntology['filename_re'], None)

        patterns = {}
        for s in self.model.find_statements(filename_query):
            view_name = s.subject
            literal_re = s.object.literal_value['string']
            logger.debug("Found: %s" % (literal_re,))
            try:
                re.compile(literal_re)
            except re.error as e:
                logger.error("Unable to compile: %s (%s)" % (literal_re, e))
                continue
            patterns[literal_re] = view_name
        return patterns

    def _get_library_url(self):
        return str(self.libraryNS[''].uri)

    def _set_library_url(self, value):
        self.libraryNS = RDF.NS(str(value))

    # base URL of the library namespace used to build library nodes
    library_url = property(_get_library_url, _set_library_url)

    def _is_paired(self, libNode):
        """Determine if a library is paired end

        Raises ModelException when the library has no library_type, and
        MetadataLookupException for an unrecognized library type.
        """
        library_type = self._get_library_attribute(libNode, 'library_type')
        if library_type is None:
            errmsg = "%s doesn't have a library type"
            raise ModelException(errmsg % (str(libNode),))

        single = ['Single End', 'Small RNA', 'CSHL (lacking last nt)']
        paired = ['Paired End', 'Multiplexing', 'Barcoded']
        if library_type in single:
            return False
        elif library_type in paired:
            return True
        else:
            raise MetadataLookupException(
                "Unrecognized library type %s for %s" % \
                (library_type, str(libNode)))

    def need_replicate(self):
        """True if any view of this submission set has a true hasReplicates"""
        viewTerm = dafTermOntology['views']
        replicateTerm = dafTermOntology['hasReplicates']

        views = self.model.get_targets(self.submissionSet, viewTerm)

        for view in views:
            replicate = self.model.get_target(view, replicateTerm)
            if fromTypedNode(replicate):
                return True

        return False
553
554         return False