Use a logger initialized to the module name much more consistently.
[htsworkflow.git] / htsworkflow / submission / daf.py
1 """Parse UCSC DAF File
2 """
3 import logging
4 import os
5 import re
6 import string
7 from StringIO import StringIO
8 import types
9 import urlparse
10
11 import RDF
12 from htsworkflow.util.rdfhelp import \
13      blankOrUri, \
14      dafTermOntology, \
15      get_model, \
16      libraryOntology, \
17      owlNS, \
18      rdfNS, \
19      submissionLog, \
20      submissionOntology, \
21      toTypedNode, \
22      fromTypedNode
23 from htsworkflow.util.hashfile import make_md5sum
24
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# DAF header keys whose values are comma separated lists (parse_stream
# splits them into python lists).
DAF_VARIABLE_NAMES = ("variables", "extraVariables")
# dafTermOntology term used for each entry of those lists in the RDF model.
VARIABLES_TERM_NAME = 'variables'
# Variable names DAFMapper.get_daf_variables places before / after the
# variables declared in the DAF itself.
DAF_PRE_VARIABLES = ['files', 'view']
DAF_POST_VARIABLES = [ 'labExpId', 'md5sum']
31
32
class ModelException(RuntimeError):
    """Raised when an assumption about the contents of the RDF model fails."""
36
37
class MetadataLookupException(RuntimeError):
    """Raised when metadata cannot be accessed or is not understood."""
41
42
# Parser states used by parse_stream: reading header attributes, or
# reading the attributes of a view block.
DAF_HEADER, DAF_VIEW = 1, 2
46
47
def parse_into_model(model, subject, filename):
    """Load the DAF file at *filename* into *model*.

    Every statement produced from the DAF is attached to the caller-supplied
    *subject* node.
    """
    daf_attributes = parse(filename)
    add_to_model(model, daf_attributes, subject)
55
56
def fromstream_into_model(model, subject, daf_stream):
    """Parse the DAF read from *daf_stream* and add it to *model*.

    The resulting statements are attached to the *subject* node.
    """
    parsed = parse_stream(daf_stream)
    add_to_model(model, parsed, subject)
62
63
def fromstring_into_model(model, subject, daf_string):
    """Read a string containing a DAF into RDF Model

    :Args:
      model: RDF model that receives the statements
      subject: RDF node the DAF attribute statements are attached to
      daf_string (str): the text of a DAF
    """
    attributes = fromstring(daf_string)
    add_to_model(model, attributes, subject)
71
72
def parse(filename):
    """Parse a UCSC DAF file.

    :Args:
      filename (str): path of the DAF file to read

    :Returns:
      the attribute dictionary produced by parse_stream
    """
    # ``with`` guarantees the file is closed even if parse_stream raises.
    with open(filename, 'r') as stream:
        return parse_stream(stream)
80
81
def fromstring(daf_string):
    """Parse a UCSC DAF held in a string and return its attribute dict."""
    return parse_stream(StringIO(daf_string))
86
87
def parse_stream(stream):
    """Parse a UCSC DAF stored in a stream.

    Each line holds a ``name value`` pair; anything from ``#`` to the end of
    the line is a comment.  Pairs before the first ``view`` line are header
    attributes.  A ``view`` line starts a view block, which collects the
    following pairs until a line without a name (empty, whitespace-only or
    starting with whitespace) or until the next ``view`` line.

    :Args:
      stream: an iterable of text lines, e.g. an open file or StringIO

    :Returns:
      dict of header attributes.  The ``'views'`` key maps each view name to
      a dict of that view's attributes (including ``'view'`` itself).
      Values spelled ``yes``/``no`` (any case) become True/False, and header
      keys listed in DAF_VARIABLE_NAMES become lists of comma separated,
      stripped strings.
    """
    comment_re = re.compile("#.*$")

    state = DAF_HEADER
    attributes = {'views': {}}
    view_name = None
    view_attributes = {}
    for line in stream:
        # remove comments
        line = comment_re.sub("", line)
        nstop = _extract_name_index(line)
        name = line[0:nstop]
        sstop = _consume_whitespace(line, start=nstop)
        vstop = _extract_value_index(line, start=sstop)
        value = line[sstop:vstop]

        if value.lower() in ('yes',):
            value = True
        elif value.lower() in ('no',):
            value = False

        if len(name) == 0:
            # a line without a name closes the current view block
            if view_name is not None:
                attributes['views'][view_name] = view_attributes
                view_name = None
                view_attributes = {}
            state = DAF_HEADER
        elif state == DAF_HEADER and name in DAF_VARIABLE_NAMES:
            attributes[name] = [x.strip() for x in value.split(',')]
        elif name == 'view':
            # A view line always starts a new block.  If a view is still
            # open (no blank line between views) save it first rather than
            # merging the new view's attributes into it.
            if view_name is not None:
                attributes['views'][view_name] = view_attributes
            view_name = value
            view_attributes = {'view': value}
            state = DAF_VIEW
        elif state == DAF_HEADER:
            attributes[name] = value
        elif state == DAF_VIEW:
            view_attributes[name] = value

    # save last block
    if view_name is not None:
        attributes['views'][view_name] = view_attributes

    return attributes
132
133
134 def _consume_whitespace(line, start=0):
135     """return index of next non whitespace character
136
137     returns length of string if it can't find anything
138     """
139     for i in xrange(start, len(line)):
140         if line[i] not in string.whitespace:
141             return i
142
143     return len(line)
144
145
146 def _extract_name_index(line, start=0):
147     """Used to find end of word by looking for a whitespace character
148
149     returns length of string if nothing matches
150     """
151     for i in xrange(start, len(line)):
152         if line[i] in string.whitespace:
153             return i
154
155     return len(line)
156
157
158 def _extract_value_index(line, start=0):
159     """Returns position of last non-whitespace character
160     """
161     shortline = line.rstrip()
162     return len(shortline)
163
164
def convert_to_rdf_statements(attributes, subject):
    """Turn a parsed DAF attribute dictionary into a list of RDF statements.

    ``views`` is expanded by _views_to_statements.  Each entry of a list
    stored under one of DAF_VARIABLE_NAMES becomes its own statement using
    the ``variables`` term.  Any other attribute becomes a single statement
    whose predicate is the attribute name in dafTermOntology.  Every
    statement has *subject* as its subject.
    """
    variables_predicate = dafTermOntology[VARIABLES_TERM_NAME]
    result = []
    for key, entry in attributes.items():
        if key == 'views':
            result.extend(_views_to_statements(subject, dafTermOntology, entry))
            continue
        if key in DAF_VARIABLE_NAMES:
            for variable in entry:
                result.append(RDF.Statement(subject,
                                            variables_predicate,
                                            toTypedNode(variable)))
            continue
        result.append(RDF.Statement(subject,
                                    dafTermOntology[key],
                                    toTypedNode(entry)))
    return result
188
189
def _views_to_statements(subject, dafNS, views):
    """Build statements describing each view and link them to *subject*.

    For every view a node is created in the view namespace derived from
    *subject*.  Statements record ``subject views node``, the view's
    ``name`` and each of the view's attributes.
    """
    view_namespace = get_view_namespace(subject)

    result = []
    for name, view_attrs in views.items():
        node = view_namespace[name]
        result.append(RDF.Statement(subject, dafNS['views'], node))
        result.append(RDF.Statement(node, dafNS['name'], toTypedNode(name)))
        for attr_name, attr_value in view_attrs.items():
            result.append(RDF.Statement(node,
                                        dafNS[attr_name],
                                        toTypedNode(attr_value)))
    return result
209
210
def add_to_model(model, attributes, subject):
    """Add the RDF statements for a parsed DAF *attributes* dict to *model*.

    The statements are attached to *subject* (see convert_to_rdf_statements).
    """
    statements = convert_to_rdf_statements(attributes, subject)
    for stmt in statements:
        model.add_statement(stmt)
214
215
def get_submission_uri(name):
    """Return the URI of submission *name* in the submissionLog namespace."""
    submission_node = submissionLog[name]
    return submission_node.uri
218
219
def submission_uri_to_string(submission_uri):
    """Return *submission_uri* as a string that ends with ``/``.

    Accepts an RDF.Node (its uri is used), an RDF.Uri, or a string.
    """
    if isinstance(submission_uri, RDF.Node):
        text = str(submission_uri.uri)
    elif isinstance(submission_uri, RDF.Uri):
        text = str(submission_uri)
    else:
        text = submission_uri

    if text[-1] == '/':
        return text
    return text + '/'
228
229
def get_view_namespace(submission_uri):
    """Return an RDF namespace for the views of a submission.

    The namespace is ``view/`` resolved against *submission_uri*, which may
    be an RDF.Node, an RDF.Uri or a string.
    """
    base = submission_uri_to_string(submission_uri)
    return RDF.NS(urlparse.urljoin(base, 'view/'))
235
236
class DAFMapper(object):
    """Convert filenames to views in the UCSC Daf

    Holds an RDF model containing a parsed DAF.  Files in a submission
    directory are matched against each view's ``filename_re`` patterns, and
    statements linking the submission, its views and its files are added to
    the model.
    """
    def __init__(self, name, daf_file=None, model=None):
        """Construct a RDF backed model of a UCSC DAF

        :args:
          name (str): the name of this submission (used to construct DAF url)
          daf_file (str, stream, or None):
             if str, use as filename
             if stream, parse as stream
             if none, don't attempt to load the DAF into our model
          model (RDF.Model or None):
             if None, construct a memory backed model
             otherwise specifies model to use
        """
        if daf_file is None and model is None:
            logger.error("We need a DAF or Model containing a DAF to work")

        self.name = name
        self.submissionSet = get_submission_uri(self.name)
        self.viewNS = get_view_namespace(self.submissionSet)

        if model is not None:
            self.model = model
        else:
            self.model = get_model()

        # Honour the documented "None means don't load": handing None to
        # parse_into_model would fail when it tries to open(None).
        if daf_file is not None:
            if hasattr(daf_file, 'next') or hasattr(daf_file, '__next__'):
                # its some kind of stream (python 2 iterators define next,
                # python 3 iterators define __next__)
                fromstream_into_model(self.model, self.submissionSet, daf_file)
            else:
                # file
                parse_into_model(self.model, self.submissionSet, daf_file)

        self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')
        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '/')
        # filename pattern -> view node map, built lazily by find_view
        self.__view_map = None

    def add_pattern(self, view_name, filename_pattern):
        """Map a filename regular expression to a view name

        Adds a ``filename_re`` statement to the view's node in viewNS.
        """
        obj = toTypedNode(filename_pattern)
        self.model.add_statement(
            RDF.Statement(self.viewNS[view_name],
                          dafTermOntology['filename_re'],
                          obj))
        # The cached pattern map no longer reflects the model; rebuild it on
        # the next find_view call so this pattern is seen.
        self.__view_map = None

    def import_submission_dir(self, submission_dir, library_id):
        """Import a submission directory and update our model as needed

        Merges the library's details into the model, then calls
        construct_track_attributes for every entry of submission_dir.

        :Args:
          submission_dir (str): directory holding the submission's files
          library_id (str): library identifier, resolved in libraryNS
        """
        libNode = self.libraryNS[library_id + "/"]

        self._add_library_details_to_model(libNode)

        for filename in os.listdir(submission_dir):
            self.construct_track_attributes(submission_dir, libNode, filename)

    def construct_track_attributes(self, submission_dir, libNode, pathname):
        """Add the statements describing one submission file to the model.

        The file's basename is matched against the view filename patterns
        (see find_view).  Files matching no view are logged and skipped, as
        are files matching the ``ignore`` view.  Otherwise the submission
        node, a per-submission view node and the file's own statements are
        added.

        :Args:
          submission_dir (str): directory holding the submission files; its
            last path component is used as the submission name
          libNode (RDF.Node): library the submission belongs to
          pathname (str): the file to examine
        """
        filename = os.path.basename(pathname)

        logger.debug("Searching for view")
        view = self.find_view(filename)
        if view is None:
            logger.warning("Unrecognized file: %s", pathname)
            return None
        if str(view) == str(libraryOntology['ignore']):
            return None

        submission_name = self.make_submission_name(submission_dir)
        submissionNode = self.get_submission_node(submission_dir)
        submission_uri = str(submissionNode.uri)
        view_name = fromTypedNode(self.model.get_target(view,
                                       dafTermOntology['name']))
        if view_name is None:
            logger.warning('Could not find view name for %s', view)
            return

        view_name = str(view_name)
        submissionView = RDF.Node(RDF.Uri(submission_uri + '/' + view_name))

        self.model.add_statement(
            RDF.Statement(self.submissionSet,
                          dafTermOntology['has_submission'],
                          submissionNode))
        logger.debug("Adding statements to %s", submissionNode)
        self.model.add_statement(RDF.Statement(submissionNode,
                                               submissionOntology['has_view'],
                                               submissionView))
        self.model.add_statement(RDF.Statement(submissionNode,
                                               submissionOntology['name'],
                                               toTypedNode(submission_name)))
        self.model.add_statement(
            RDF.Statement(submissionNode,
                          rdfNS['type'],
                          submissionOntology['submission']))
        self.model.add_statement(RDF.Statement(submissionNode,
                                               submissionOntology['library'],
                                               libNode))

        logger.debug("Adding statements to %s", submissionView)
        # add track specific information
        self.model.add_statement(
            RDF.Statement(submissionView, dafTermOntology['view'], view))
        self.model.add_statement(
            RDF.Statement(submissionView,
                          dafTermOntology['paired'],
                          toTypedNode(self._is_paired(libNode))))
        self.model.add_statement(
            RDF.Statement(submissionView,
                          dafTermOntology['submission'],
                          submissionNode))

        # add file specific information
        self.create_file_attributes(filename, submissionView, submission_uri, submission_dir)

        logger.debug("Done.")

    def create_file_attributes(self, filename, submissionView, submission_uri, submission_dir):
        """Add the statements for one file of a submission view.

        Links submissionView to a file node (submission_uri + '/' +
        filename), records the filename, and records the md5sum of the file
        in submission_dir when make_md5sum can produce one.
        """
        logger.debug("Updating file md5sum")
        fileNode = RDF.Node(RDF.Uri(submission_uri + '/' + filename))
        submission_pathname = os.path.join(submission_dir, filename)
        self.model.add_statement(
            RDF.Statement(submissionView,
                          dafTermOntology['has_file'],
                          fileNode))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['filename'],
                          filename))

        md5 = make_md5sum(submission_pathname)
        if md5 is None:
            logger.warning("Unable to produce md5sum for %s",
                           submission_pathname)
        else:
            self.model.add_statement(
                RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))

    def _add_library_details_to_model(self, libNode):
        """Merge RDFa statements parsed from libNode's URI into the model.

        A parsed statement is only added when the model has no target yet
        for its (subject, predicate) pair, so existing values are kept.
        """
        parser = RDF.Parser(name='rdfa')
        new_statements = parser.parse_as_stream(libNode.uri)
        for s in new_statements:
            # don't override things we already have in the model
            targets = list(self.model.get_targets(s.subject, s.predicate))
            if len(targets) == 0:
                self.model.append(s)

    def get_daf_variables(self):
        """Return the variable names to include in the ddf, in order.

        The order is DAF_PRE_VARIABLES, then 'replicate' if any view has
        replicates, then the variables declared in the DAF, then
        DAF_POST_VARIABLES.  Each name appears only once (first wins).
        """
        variables_term = dafTermOntology[VARIABLES_TERM_NAME]
        results = DAF_PRE_VARIABLES[:]
        if self.need_replicate() and 'replicate' not in results:
            results.append('replicate')

        for obj in self.model.get_targets(self.submissionSet, variables_term):
            value = str(fromTypedNode(obj))
            if value not in results:
                results.append(value)
        results.extend([v for v in DAF_POST_VARIABLES if v not in results])
        return results

    def make_submission_name(self, submission_dir):
        """Return the last path component of submission_dir.

        :Raises:
          RuntimeError: if the normalized path has no final component
        """
        submission_dir = os.path.normpath(submission_dir)
        submission_dir_name = os.path.split(submission_dir)[1]
        if len(submission_dir_name) == 0:
            raise RuntimeError(
                "Submission dir name too short: {0}".format(submission_dir))
        return submission_dir_name

    def get_submission_node(self, submission_dir):
        """Convert a submission directory name to a submission node
        """
        submission_name = self.make_submission_name(submission_dir)
        return self.submissionSetNS[submission_name]

    def _get_library_attribute(self, libNode, attribute):
        """Return the value(s) of attribute for libNode, or None.

        attribute may be an RDF.Node or a term name in libraryOntology.
        Only the current model is consulted; import_submission_dir loads
        library details via _add_library_details_to_model beforehand.
        """
        if not isinstance(attribute, RDF.Node):
            attribute = libraryOntology[attribute]

        targets = list(self.model.get_targets(libNode, attribute))
        return self._format_library_attribute(targets)

    def _format_library_attribute(self, targets):
        """Convert a list of RDF target nodes to python values.

        Returns None for an empty list, the converted value for a single
        target, and a list of converted values otherwise.
        """
        if not targets:
            return None
        if len(targets) == 1:
            return fromTypedNode(targets[0])
        return [fromTypedNode(t) for t in targets]

    def _search_same_as(self, subject, predicate):
        """Return subject's targets via an owl:sameAs alias of predicate.

        Returns the first non-empty target list found, or None.
        """
        # look for alternate names
        other_predicates = self.model.get_targets(predicate, owlNS['sameAs'])
        for other in other_predicates:
            targets = list(self.model.get_targets(subject, other))
            if len(targets) > 0:
                return targets
        return None

    def find_view(self, filename):
        """Search through potential DAF filename patterns

        Returns the view node whose pattern matches (re.match) filename, or
        None when nothing matches.

        :Raises:
          ModelException: if more than one view matches
        """
        if self.__view_map is None:
            self.__view_map = self._get_filename_view_map()

        results = []
        for pattern, view in self.__view_map.items():
            if re.match(pattern, filename):
                results.append(view)

        if len(results) > 1:
            msg = "%s matched multiple views %s" % (
                filename,
                [str(x) for x in results])
            raise ModelException(msg)
        elif len(results) == 1:
            return results[0]
        else:
            return None

    def get_view_name(self, view):
        """Return the single view_name of view.

        :Raises:
          RuntimeError: if the view does not have exactly one name
        """
        view_term = submissionOntology['view_name']
        names = list(self.model.get_targets(view, view_term))
        if len(names) == 1:
            return fromTypedNode(names[0])
        else:
            msg = "Found wrong number of view names for {0} len = {1}"
            msg = msg.format(str(view), len(names))
            logger.error(msg)
            raise RuntimeError(msg)

    def _get_filename_view_map(self):
        """Query our model for filename patterns

        Return a dictionary mapping filename regular expression strings to
        the view nodes they select.  Patterns that do not compile are logged
        and skipped so that find_view does not raise re.error on them.
        """
        filename_query = RDF.Statement(
            None, dafTermOntology['filename_re'], None)

        patterns = {}
        for s in self.model.find_statements(filename_query):
            view_name = s.subject
            literal_re = s.object.literal_value['string']
            logger.debug("Found: %s", literal_re)
            try:
                re.compile(literal_re)
            except re.error:
                logger.error("Unable to compile: %s", literal_re)
                continue
            patterns[literal_re] = view_name
        return patterns

    def _get_library_url(self):
        return str(self.libraryNS[''].uri)

    def _set_library_url(self, value):
        self.libraryNS = RDF.NS(str(value))

    library_url = property(_get_library_url, _set_library_url)

    def _is_paired(self, libNode):
        """Determine if a library is paired end

        :Raises:
          ModelException: if the library has no library_type
          MetadataLookupException: if the library_type is not recognized
        """
        library_type = self._get_library_attribute(libNode, 'library_type')
        if library_type is None:
            errmsg = "%s doesn't have a library type"
            raise ModelException(errmsg % (str(libNode),))

        single = ['CSHL (lacking last nt)',
                  'Single End (non-multiplexed)',
                  'Small RNA (non-multiplexed)',]
        paired = ['Barcoded Illumina',
                  'Multiplexing',
                  'Nextera',
                  'Paired End (non-multiplexed)',]
        if library_type in single:
            return False
        elif library_type in paired:
            return True
        raise MetadataLookupException(
            "Unrecognized library type %s for %s" %
            (library_type, str(libNode)))

    def need_replicate(self):
        """Return True if any view of the submission set has hasReplicates."""
        viewTerm = dafTermOntology['views']
        replicateTerm = dafTermOntology['hasReplicates']

        views = self.model.get_targets(self.submissionSet, viewTerm)

        for view in views:
            replicate = self.model.get_target(view, replicateTerm)
            if fromTypedNode(replicate):
                return True

        return False