htsworkflow/submission/daf.py
1 """Parse UCSC DAF File
2 """
3 import logging
4 import os
5 from pprint import pformat
6 import re
7 import string
8 from StringIO import StringIO
9 import types
10 import urlparse
11
12 import RDF
13 from htsworkflow.util.rdfhelp import \
14      blankOrUri, \
15      dafTermOntology, \
16      dump_model, \
17      get_model, \
18      libraryOntology, \
19      owlNS, \
20      rdfNS, \
21      submissionLog, \
22      submissionOntology, \
23      toTypedNode, \
24      fromTypedNode
25 from htsworkflow.util.hashfile import make_md5sum
26
27 logger = logging.getLogger(__name__)
28
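# DAF_VARIABLE_NAMES are the DAF keys whose values are comma-separated lists
# of variable names; DAF_PRE_VARIABLES and DAF_POST_VARIABLES are the fixed
# DDF columns that get_daf_variables() places before and after them.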
DAF_VARIABLE_NAMES = ("variables", "extraVariables")
VARIABLES_TERM_NAME = 'variables'
DAF_PRE_VARIABLES = ['files', 'view']
DAF_POST_VARIABLES = ['labExpId', 'md5sum']


class ModelException(RuntimeError):
    """Assumptions about the RDF model failed"""
    pass


class MetadataLookupException(RuntimeError):
    """Problem accessing metadata"""
    pass


# STATES
DAF_HEADER = 1
DAF_VIEW = 2


def parse_into_model(model, subject, filename):
    """Read a DAF into RDF Model

    requires a subject node to attach statements to
    """
    attributes = parse(filename)
    add_to_model(model, attributes, subject)


def fromstream_into_model(model, subject, daf_stream):
    """Load daf stream into model attached to node subject
    """
    attributes = parse_stream(daf_stream)
    add_to_model(model, attributes, subject)


def fromstring_into_model(model, subject, daf_string):
    """Read a string containing a DAF into RDF Model

    requires a subject node to attach statements to
    """
    attributes = fromstring(daf_string)
    add_to_model(model, attributes, subject)


def parse(filename):
    """Parse daf from a file
    """
    stream = open(filename, 'r')
    attributes = parse_stream(stream)
    stream.close()
    return attributes


def fromstring(daf_string):
    """Parse UCSC daf from a provided string"""
    stream = StringIO(daf_string)
    return parse_stream(stream)


def parse_stream(stream):
    """Parse UCSC daf stored in a stream"""
    comment_re = re.compile("#.*$")

    state = DAF_HEADER
    attributes = {'views': {}}
    view_name = None
    view_attributes = {}
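    # Two-state line parser: DAF_HEADER collects top level attributes until a
    # "view" line switches to DAF_VIEW, which collects that view's attributes
    # until a blank line closes the block and returns to DAF_HEADER.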
    for line in stream:
        # remove comments
        line = comment_re.sub("", line)
        nstop = _extract_name_index(line)
        name = line[0:nstop]
        sstop = _consume_whitespace(line, start=nstop)
        vstop = _extract_value_index(line, start=sstop)
        value = line[sstop:vstop]

        if value.lower() in ('yes',):
            value = True
        elif value.lower() in ('no',):
            value = False

        if len(name) == 0:
            if view_name is not None:
                attributes['views'][view_name] = view_attributes
                view_name = None
                view_attributes = {}
            state = DAF_HEADER
        elif state == DAF_HEADER and name in DAF_VARIABLE_NAMES:
            attributes[name] = [x.strip() for x in value.split(',')]
        elif state == DAF_HEADER and name == 'view':
            view_name = value
            view_attributes['view'] = value
            state = DAF_VIEW
        elif state == DAF_HEADER:
            attributes[name] = value
        elif state == DAF_VIEW:
            view_attributes[name] = value

    # save last block
    if view_name is not None:
        attributes['views'][view_name] = view_attributes

    logger.debug("DAF Attributes: " + pformat(attributes))
    return attributes


def _consume_whitespace(line, start=0):
    """return index of next non-whitespace character

    returns length of string if it can't find anything
    """
    for i in xrange(start, len(line)):
        if line[i] not in string.whitespace:
            return i

    return len(line)


def _extract_name_index(line, start=0):
    """Used to find end of word by looking for a whitespace character

    returns length of string if nothing matches
    """
    for i in xrange(start, len(line)):
        if line[i] in string.whitespace:
            return i

    return len(line)


def _extract_value_index(line, start=0):
    """Return the index just past the last non-whitespace character
    """
    shortline = line.rstrip()
    return len(shortline)


def convert_to_rdf_statements(attributes, subject):
    """Convert dictionary of DAF attributes into rdf statements

    The statements are attached to the provided subject node
    """
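    # Three cases: the nested 'views' dictionary becomes per-view nodes,
    # entries named in DAF_VARIABLE_NAMES are flattened onto the single
    # 'variables' term, and every other key becomes one typed-literal statement.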
    variables_term = dafTermOntology[VARIABLES_TERM_NAME]
    statements = []
    for daf_key in attributes:
        predicate = dafTermOntology[daf_key]
        if daf_key == 'views':
            statements.extend(_views_to_statements(subject,
                                                   dafTermOntology,
                                                   attributes[daf_key]))
        elif daf_key in DAF_VARIABLE_NAMES:
            for var in attributes.get(daf_key, []):
                obj = toTypedNode(var)
                statements.append(RDF.Statement(subject, variables_term, obj))
        else:
            value = attributes[daf_key]
            obj = toTypedNode(value)
            statements.append(RDF.Statement(subject, predicate, obj))

    return statements


def _views_to_statements(subject, dafNS, views):
    """Attach view attributes to new view nodes attached to the provided subject
    """
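    # Each view becomes its own node under <subject>/view/, roughly:
    #   <subject> daf:views <subject>/view/Signal
    #   <subject>/view/Signal daf:name "Signal"
    #   <subject>/view/Signal daf:type "bigWig"  (one statement per attribute)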
    viewNS = get_view_namespace(subject)

    statements = []
    for view_name in views:
        view_attributes = views[view_name]
        viewSubject = viewNS[view_name]
        statements.append(RDF.Statement(subject, dafNS['views'], viewSubject))
        statements.append(
            RDF.Statement(viewSubject, dafNS['name'], toTypedNode(view_name)))
        for view_attribute_name in view_attributes:
            predicate = dafNS[view_attribute_name]
            obj = toTypedNode(view_attributes[view_attribute_name])
            statements.append(RDF.Statement(viewSubject, predicate, obj))

        #statements.extend(convert_to_rdf_statements(view, viewNode))
    return statements


def add_to_model(model, attributes, subject):
    for statement in convert_to_rdf_statements(attributes, subject):
        model.add_statement(statement)


def get_submission_uri(name):
    return submissionLog[name].uri


def submission_uri_to_string(submission_uri):
    if isinstance(submission_uri, RDF.Node):
        submission_uri = str(submission_uri.uri)
    elif isinstance(submission_uri, RDF.Uri):
        submission_uri = str(submission_uri)
    if submission_uri[-1] != '/':
        submission_uri += '/'
    return submission_uri


def get_view_namespace(submission_uri):
    submission_uri = submission_uri_to_string(submission_uri)
    view_uri = urlparse.urljoin(submission_uri, 'view/')
    viewNS = RDF.NS(view_uri)
    return viewNS


class DAFMapper(object):
    """Convert filenames to views in the UCSC Daf
    """
    def __init__(self, name, daf_file=None, model=None):
        """Construct an RDF-backed model of a UCSC DAF

        :args:
          name (str): the name of this submission (used to construct DAF url)
          daf_file (str, stream, or None):
             if str, use as filename
             if stream, parse as stream
             if None, don't attempt to load the DAF into our model
          model (RDF.Model or None):
             if None, construct a memory backed model
             otherwise specifies model to use
        """
        if daf_file is None and model is None:
            logger.error("We need a DAF or Model containing a DAF to work")

        self.name = name
        self.submissionSet = get_submission_uri(self.name)
        self.viewNS = get_view_namespace(self.submissionSet)

        if model is not None:
            self.model = model
        else:
            self.model = get_model()

        if daf_file is None:
            # assume the provided model already contains the DAF statements
            pass
        elif hasattr(daf_file, 'next'):
            # it's some kind of stream
            fromstream_into_model(self.model, self.submissionSet, daf_file)
        else:
            # treat it as a filename
            parse_into_model(self.model, self.submissionSet, daf_file)

        self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')
        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '/')
        self.__view_map = None

    def add_pattern(self, view_name, filename_pattern):
        """Map a filename regular expression to a view name
        """
        obj = toTypedNode(filename_pattern)
        self.model.add_statement(
            RDF.Statement(self.viewNS[view_name],
                          dafTermOntology['filename_re'],
                          obj))

    def import_submission_dir(self, submission_dir, library_id):
        """Import a submission directory and update our model as needed
        """
        #attributes = get_filename_attribute_map(paired)
        libNode = self.libraryNS[library_id + "/"]

        self._add_library_details_to_model(libNode)

        submission_files = os.listdir(submission_dir)
        for filename in submission_files:
            self.construct_track_attributes(submission_dir, libNode, filename)

    def construct_track_attributes(self, submission_dir, libNode, pathname):
        """Add track attributes for one submitted file to the model

        :Args:
          submission_dir (str): directory containing the submission
          libNode (RDF.Node): node for the library this file came from
          pathname (str): the file whose view we are trying to determine
        """
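        # The statements added below tie the submission graph together:
        #   submissionSet --has_submission--> submission node
        #   submission node --has_view--> submission/<view name> node
        #   submission/<view name> node --has_file--> file node
        # (file statements are added by create_file_attributes)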
        path, filename = os.path.split(pathname)

        logger.debug("Searching for view")
        view = self.find_view(filename)
        if view is None:
            logger.warning("Unrecognized file: {0}".format(pathname))
            return None
        if str(view) == str(libraryOntology['ignore']):
            return None

        submission_name = self.make_submission_name(submission_dir)
        submissionNode = self.get_submission_node(submission_dir)
        submission_uri = str(submissionNode.uri)
        view_name = fromTypedNode(self.model.get_target(view,
                                                        dafTermOntology['name']))
        if view_name is None:
            errmsg = 'Could not find view name for {0}'
            logger.warning(errmsg.format(str(view)))
            return

        view_name = str(view_name)
        submissionView = RDF.Node(RDF.Uri(submission_uri + '/' + view_name))

        self.model.add_statement(
            RDF.Statement(self.submissionSet,
                          dafTermOntology['has_submission'],
                          submissionNode))
        logger.debug("Adding statements to {0}".format(str(submissionNode)))
        self.model.add_statement(RDF.Statement(submissionNode,
                                               submissionOntology['has_view'],
                                               submissionView))
        self.model.add_statement(RDF.Statement(submissionNode,
                                               submissionOntology['name'],
                                               toTypedNode(submission_name)))
        self.model.add_statement(
            RDF.Statement(submissionNode,
                          rdfNS['type'],
                          submissionOntology['submission']))
        self.model.add_statement(RDF.Statement(submissionNode,
                                               submissionOntology['library'],
                                               libNode))

        logger.debug("Adding statements to {0}".format(str(submissionView)))
        # add track specific information
        self.model.add_statement(
            RDF.Statement(submissionView, dafTermOntology['view'], view))
        self.model.add_statement(
            RDF.Statement(submissionView,
                          dafTermOntology['paired'],
                          toTypedNode(self._is_paired(libNode))))
        self.model.add_statement(
            RDF.Statement(submissionView,
                          dafTermOntology['submission'],
                          submissionNode))

        # add file specific information
        self.create_file_attributes(filename, submissionView, submission_uri, submission_dir)

        logger.debug("Done.")

    def create_file_attributes(self, filename, submissionView, submission_uri, submission_dir):
        # add file specific information
        logger.debug("Updating file md5sum")
        fileNode = RDF.Node(RDF.Uri(submission_uri + '/' + filename))
        submission_pathname = os.path.join(submission_dir, filename)
        self.model.add_statement(
            RDF.Statement(submissionView,
                          dafTermOntology['has_file'],
                          fileNode))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['filename'],
                          filename))

        md5 = make_md5sum(submission_pathname)
        if md5 is None:
            errmsg = "Unable to produce md5sum for {0}"
            logger.warning(errmsg.format(submission_pathname))
        else:
            self.model.add_statement(
                RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))

    def _add_library_details_to_model(self, libNode):
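        """Load the RDFa metadata published at the library node's URL,
        keeping any statements already in our model over the new ones.
        """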
        parser = RDF.Parser(name='rdfa')
        new_statements = parser.parse_as_stream(libNode.uri)
        for s in new_statements:
            # don't override things we already have in the model
            targets = list(self.model.get_targets(s.subject, s.predicate))
            if len(targets) == 0:
                self.model.append(s)

    def get_daf_variables(self):
        """Return the variable names to include in the DDF
        """
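        # Column order: fixed leading fields, an optional replicate column,
        # any variables declared in the DAF, then fixed trailing fields.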
        variables_term = dafTermOntology[VARIABLES_TERM_NAME]
        results = DAF_PRE_VARIABLES[:]
        if self.need_replicate() and 'replicate' not in results:
            results.append('replicate')

        for obj in self.model.get_targets(self.submissionSet, variables_term):
            value = str(fromTypedNode(obj))
            if value not in results:
                results.append(value)
        results.extend([v for v in DAF_POST_VARIABLES if v not in results])
        return results

    def make_submission_name(self, submission_dir):
        submission_dir = os.path.normpath(submission_dir)
        submission_dir_name = os.path.split(submission_dir)[1]
        if len(submission_dir_name) == 0:
            raise RuntimeError(
                "Submission dir name too short: {0}".format(submission_dir))
        return submission_dir_name

    def get_submission_node(self, submission_dir):
        """Convert a submission directory name to a submission node
        """
        submission_name = self.make_submission_name(submission_dir)
        return self.submissionSetNS[submission_name]

    def _get_library_attribute(self, libNode, attribute):
        if not isinstance(attribute, RDF.Node):
            attribute = libraryOntology[attribute]

        targets = list(self.model.get_targets(libNode, attribute))
        if len(targets) > 0:
            return self._format_library_attribute(targets)

        #targets = self._search_same_as(libNode, attribute)
        #if targets is not None:
        #    return self._format_library_attribute(targets)

        # we don't know anything about this attribute, refresh the
        # library metadata and look again
        self._add_library_details_to_model(libNode)

        targets = list(self.model.get_targets(libNode, attribute))
        if len(targets) > 0:
            return self._format_library_attribute(targets)

        return None

    def _format_library_attribute(self, targets):
        if len(targets) == 0:
            return None
        elif len(targets) == 1:
            return fromTypedNode(targets[0])
        elif len(targets) > 1:
            return [fromTypedNode(t) for t in targets]

    def _search_same_as(self, subject, predicate):
        # look for alternate names
        other_predicates = self.model.get_targets(predicate, owlNS['sameAs'])
        for other in other_predicates:
            targets = list(self.model.get_targets(subject, other))
            if len(targets) > 0:
                return targets
        return None

    def find_view(self, filename):
        """Search through potential DAF filename patterns
        """
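        # The patterns come from daf:filename_re statements (see add_pattern);
        # they are cached the first time through as a pattern -> view node map.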
        if self.__view_map is None:
            self.__view_map = self._get_filename_view_map()

        results = []
        for pattern, view in self.__view_map.items():
            if re.match(pattern, filename):
                results.append(view)

        if len(results) > 1:
            msg = "%s matched multiple views %s" % (
                filename,
                [str(x) for x in results])
            raise ModelException(msg)
        elif len(results) == 1:
            return results[0]
        else:
            return None

    def get_view_name(self, view):
        view_term = submissionOntology['view_name']
        names = list(self.model.get_targets(view, view_term))
        if len(names) == 1:
            return fromTypedNode(names[0])
        else:
            msg = "Found wrong number of view names for {0} len = {1}"
            msg = msg.format(str(view), len(names))
            logger.error(msg)
            raise RuntimeError(msg)

    def _get_filename_view_map(self):
        """Query our model for filename patterns

        return a dictionary mapping pattern strings to view nodes;
        patterns that fail to compile are logged and skipped
        """
        filename_query = RDF.Statement(
            None, dafTermOntology['filename_re'], None)

        patterns = {}
        for s in self.model.find_statements(filename_query):
            view_name = s.subject
            literal_re = s.object.literal_value['string']
            logger.debug("Found: %s" % (literal_re,))
            try:
                re.compile(literal_re)
            except re.error:
                logger.error("Unable to compile: %s" % (literal_re,))
                continue
            patterns[literal_re] = view_name
        return patterns

    def _get_library_url(self):
        return str(self.libraryNS[''].uri)

    def _set_library_url(self, value):
        self.libraryNS = RDF.NS(str(value))

    library_url = property(_get_library_url, _set_library_url)

    def _is_paired(self, libNode):
        """Determine if a library is paired end"""
        library_type = self._get_library_attribute(libNode, 'library_type')
        if library_type is None:
            errmsg = "%s doesn't have a library type"
            raise ModelException(errmsg % (str(libNode),))

        single = ['CSHL (lacking last nt)',
                  'Single End (non-multiplexed)',
                  'Small RNA (non-multiplexed)']
        paired = ['Barcoded Illumina',
                  'Multiplexing',
                  'Nextera',
                  'Paired End (non-multiplexed)']
        if library_type in single:
            return False
        elif library_type in paired:
            return True
        else:
            raise MetadataLookupException(
                "Unrecognized library type %s for %s" %
                (library_type, str(libNode)))

    def need_replicate(self):
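        """Return True if any view in the DAF declares hasReplicates"""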
        viewTerm = dafTermOntology['views']
        replicateTerm = dafTermOntology['hasReplicates']

        views = self.model.get_targets(self.submissionSet, viewTerm)

        for view in views:
            replicate = self.model.get_target(view, replicateTerm)
            if fromTypedNode(replicate):
                return True

        return False

if __name__ == "__main__":
    example_daf = """# Lab and general info
grant             Hardison
lab               Caltech-m
dataType          ChipSeq
variables         cell, antibody,sex,age,strain,control
compositeSuffix   CaltechHistone
assembly          mm9
dafVersion        2.0
validationSettings validateFiles.bam:mismatches=2,bamPercent=99.9;validateFiles.fastq:quick=1000

# Track/view definition
view             FastqRd1
longLabelPrefix  Caltech Fastq Read 1
type             fastq
hasReplicates    yes
required         no

view             Signal
longLabelPrefix  Caltech Histone Signal
type             bigWig
hasReplicates    yes
required         no
"""
    model = get_model()
    example_daf_stream = StringIO(example_daf)
    name = "test_rep"
    mapper = DAFMapper(name, daf_file=example_daf_stream, model=model)
    dump_model(model)
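
    # A minimal usage sketch beyond loading the DAF; the pattern and filename
    # below are illustrative, not taken from the example DAF above.
    mapper.add_pattern('FastqRd1', r'.*\.fastq$')
    print "12345_demo.fastq maps to view:", mapper.find_view('12345_demo.fastq')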