Port daf to rdflib
[htsworkflow.git] / htsworkflow / submission / daf.py
"""Parse UCSC DAF File
"""
import collections
import logging
import os
from pprint import pformat
import re
import string
from six.moves import StringIO
import types
from six.moves import urllib

from rdflib import Graph, Literal, Namespace, URIRef
from rdflib.namespace import OWL, RDF

from htsworkflow.util.rdfns import (
    libraryOntology,
    submissionLog,
    submissionOntology
)
from htsworkflow.util.rdfhelp import dump_model
from htsworkflow.util.rdfns import dafTermOntology
from htsworkflow.util.hashfile import make_md5sum

LOGGER = logging.getLogger(__name__)

DAF_VARIABLE_NAMES = ("variables", "extraVariables")
VARIABLES_TERM_NAME = 'variables'
DAF_PRE_VARIABLES = ['files', 'view']
DAF_POST_VARIABLES = ['labExpId', 'md5sum']


class ModelException(RuntimeError):
    """Assumptions about the RDF model failed"""
    pass


class MetadataLookupException(RuntimeError):
    """Problem accessing metadata"""
    pass


# STATES
DAF_HEADER = 1
DAF_VIEW = 2


def parse_into_model(model, subject, filename):
    """Read a DAF into RDF Model

    requires a subject node to attach statements to
    """
    attributes = parse(filename)
    add_to_model(model, attributes, subject)


def fromstream_into_model(model, subject, daf_stream):
    """Load daf stream into model attached to node subject
    """
    attributes = parse_stream(daf_stream)
    add_to_model(model, attributes, subject)


def fromstring_into_model(model, subject, daf_string):
    """Read a string containing a DAF into RDF Model

    requires a subject node to attach statements to
    """
    attributes = fromstring(daf_string)
    add_to_model(model, attributes, subject)


def parse(filename):
    """Parse daf from a file
    """
    with open(filename, 'r') as stream:
        return parse_stream(stream)


def fromstring(daf_string):
    """Parse UCSC daf from a provided string"""
    stream = StringIO(daf_string)
    return parse_stream(stream)


def parse_stream(stream):
    """Parse UCSC daf stored in a stream"""
    comment_re = re.compile("#.*$")

    state = DAF_HEADER
    attributes = {'views': {}}
    view_name = None
    view_attributes = {}
    for line in stream:
        # remove comments
        line = comment_re.sub("", line)
        nstop = _extract_name_index(line)
        name = line[0:nstop]
        sstop = _consume_whitespace(line, start=nstop)
        vstop = _extract_value_index(line, start=sstop)
        value = line[sstop:vstop]

        if value.lower() in ('yes',):
            value = True
        elif value.lower() in ('no',):
            value = False

        if len(name) == 0:
            if view_name is not None:
                attributes['views'][view_name] = view_attributes
                view_name = None
                view_attributes = {}
            state = DAF_HEADER
        elif state == DAF_HEADER and name in DAF_VARIABLE_NAMES:
            attributes[name] = [x.strip() for x in value.split(',')]
        elif state == DAF_HEADER and name == 'view':
            view_name = value
            view_attributes['view'] = value
            state = DAF_VIEW
        elif state == DAF_HEADER:
            attributes[name] = value
        elif state == DAF_VIEW:
            view_attributes[name] = value

    # save last block
    if view_name is not None:
        attributes['views'][view_name] = view_attributes

    LOGGER.debug("DAF Attributes: " + pformat(attributes))
    return attributes


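# Illustrative sketch (not part of the module): parse_stream() returns a plain
# dictionary.  For a hypothetical DAF fragment such as
#
#   assembly        mm9
#   variables       cell, antibody
#
#   view            Signal
#   type            bigWig
#
# the result would look roughly like
#
#   {'assembly': 'mm9',
#    'variables': ['cell', 'antibody'],
#    'views': {'Signal': {'view': 'Signal', 'type': 'bigWig'}}}
#
# Note that yes/no values are converted to Python booleans.
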
def _consume_whitespace(line, start=0):
    """return index of next non whitespace character

    returns length of string if it can't find anything
    """
    for i, c in enumerate(line[start:]):
        if c not in string.whitespace:
            return i+start

    return len(line)


def _extract_name_index(line, start=0):
    """Used to find end of word by looking for a whitespace character

    returns length of string if nothing matches
    """
    for i, c in enumerate(line[start:]):
        if c in string.whitespace:
            return i+start

    return len(line)


def _extract_value_index(line, start=0):
    """Returns the index just past the last non-whitespace character
    """
    shortline = line.rstrip()
    return len(shortline)


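# Illustrative sketch (not part of the module): how a single DAF line is split
# into a name/value pair by the helpers above.
#
#   line = "assembly mm9\n"
#   nstop = _extract_name_index(line)              # 8, end of "assembly"
#   sstop = _consume_whitespace(line, start=nstop)  # 9, start of the value
#   vstop = _extract_value_index(line, start=sstop) # 12, end of "mm9"
#   line[0:nstop], line[sstop:vstop]                # ("assembly", "mm9")
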
def convert_to_rdf_statements(attributes, subject):
    """Convert dictionary of DAF attributes into rdf statements

    The statements are attached to the provided subject node
    """
    variables_term = dafTermOntology[VARIABLES_TERM_NAME]
    statements = []
    for daf_key in attributes:
        predicate = dafTermOntology[daf_key]
        if daf_key == 'views':
            statements.extend(_views_to_statements(subject,
                                                   dafTermOntology,
                                                   attributes[daf_key]))
        elif daf_key in DAF_VARIABLE_NAMES:
            for var in attributes.get(daf_key, []):
                obj = Literal(var)
                statements.append((subject, variables_term, obj))
        else:
            value = attributes[daf_key]
            obj = Literal(value)
            statements.append((subject, predicate, obj))

    return statements


def _views_to_statements(subject, dafNS, views):
    """Attach view attributes to new view nodes attached to the provided subject
    """
    viewNS = get_view_namespace(subject)

    statements = []
    for view_name in views:
        view_attributes = views[view_name]
        viewSubject = viewNS[view_name]
        statements.append((subject, dafNS['views'], viewSubject))
        statements.append((viewSubject, dafNS['name'], Literal(view_name)))
        for view_attribute_name in view_attributes:
            predicate = dafNS[view_attribute_name]
            obj = Literal(view_attributes[view_attribute_name])
            statements.append((viewSubject, predicate, obj))

        #statements.extend(convert_to_rdf_statements(view, viewNode))
    return statements


def add_to_model(model, attributes, subject):
    for statement in convert_to_rdf_statements(attributes, subject):
        model.add(statement)


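# Illustrative sketch (not part of the module): for a parsed dictionary like the
# one sketched after parse_stream() above and subject = submissionLog['test_rep'],
# add_to_model() would add triples along the lines of
#
#   (submissionLog['test_rep'], dafTermOntology['assembly'],  Literal('mm9'))
#   (submissionLog['test_rep'], dafTermOntology['variables'], Literal('cell'))
#   (submissionLog['test_rep'], dafTermOntology['views'],     <.../test_rep/view/Signal>)
#   (<.../test_rep/view/Signal>, dafTermOntology['name'],     Literal('Signal'))
#   (<.../test_rep/view/Signal>, dafTermOntology['type'],     Literal('bigWig'))
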
def get_submission_uri(name):
    return submissionLog[name]


def submission_uri_to_string(submission_uri):
    if isinstance(submission_uri, (Literal, URIRef)):
        submission_uri = str(submission_uri)
    if submission_uri[-1] != '/':
        submission_uri += '/'
    return submission_uri


def get_view_namespace(submission_uri):
    submission_uri = submission_uri_to_string(submission_uri)
    view_uri = urllib.parse.urljoin(submission_uri, 'view/')
    viewNS = Namespace(view_uri)
    return viewNS


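# Illustrative sketch (not part of the module): view names hang off the
# submission URI, so views for a submission named "test_rep" live under
# <submission-uri>/view/, e.g.
#
#   subject = get_submission_uri('test_rep')  # submissionLog['test_rep']
#   viewNS = get_view_namespace(subject)      # namespace rooted at .../test_rep/view/
#   viewNS['Signal']                          # URIRef naming the Signal view
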
class UCSCSubmission(object):
    """Build a submission by examining the DAF for what we need to submit
    """
    def __init__(self, name, daf_file=None, model=None):
        """Construct an RDF-backed model of a UCSC DAF

        :args:
          name (str): the name of this submission (used to construct DAF url)
          daf_file (str, stream, or None):
             if str, use as filename
             if stream, parse as stream
             if none, don't attempt to load the DAF into our model
          model (rdflib.Graph or None):
             if None, construct a memory backed model
             otherwise specifies model to use
        """
        if daf_file is None and model is None:
            LOGGER.error("We need a DAF or Model containing a DAF to work")

        self.name = name
        self.submissionSet = get_submission_uri(self.name)
        self.viewNS = get_view_namespace(self.submissionSet)

        if model is not None:
            self.model = model
        else:
            self.model = Graph()

        self.daf = None
        if daf_file is not None:
            if isinstance(daf_file, str):
                # treat it as a filename
                with open(daf_file, 'rt') as stream:
                    self.daf = stream.read()
            else:
                # assume it's some kind of stream
                self.daf = daf_file.read()
            fromstring_into_model(self.model, self.submissionSet, self.daf)

        self.libraryNS = Namespace('http://jumpgate.caltech.edu/library/')
        self.submissionSetNS = Namespace(str(self.submissionSet) + '/')
        self.__view_map = None

    def _get_daf_name(self):
        return self.name + '.daf'
    daf_name = property(_get_daf_name, doc="construct name for DAF file")

    def add_pattern(self, view_name, filename_pattern):
        """Map a filename regular expression to a view name
        """
        obj = Literal(filename_pattern)
        self.model.add(
            (self.viewNS[view_name],
             dafTermOntology['filename_re'],
             obj))

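    # Illustrative sketch (not part of the module): callers register one
    # hypothetical regular expression per view before scanning result
    # directories, e.g.
    #
    #   submission.add_pattern('FastqRd1', r'.*_r1\.fastq$')
    #   submission.add_pattern('Signal', r'.*\.bigWig$')
    #
    # find_view() below then matches each submitted filename against these patterns.
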
    def scan_submission_dirs(self, result_map):
        """Examine files in our result directory
        """
        for lib_id, result_dir in result_map.items():
            LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
            try:
                self.import_submission_dir(result_dir, lib_id)
            except MetadataLookupException as e:
                LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))

    def import_submission_dir(self, submission_dir, library_id):
        """Import a submission directory and update our model as needed
        """
        #attributes = get_filename_attribute_map(paired)
        libNode = self.libraryNS[library_id + "/"]

        self._add_library_details_to_model(libNode)

        submission_files = os.listdir(submission_dir)
        for filename in submission_files:
            self.construct_track_attributes(submission_dir, libNode, filename)

    def construct_track_attributes(self, submission_dir, libNode, pathname):
        """Looking for the best extension
        The 'best' is the longest match

        :Args:
        pathname (str): the filename whose extension we are about to examine
        """
        path, filename = os.path.split(pathname)

        LOGGER.debug("Searching for view")
        view = self.find_view(filename)
        if view is None:
            LOGGER.warning("Unrecognized file: {0}".format(pathname))
            return None
        if str(view) == str(libraryOntology['ignore']):
            return None

        submission_name = self.make_submission_name(submission_dir)
        submissionNode = self.get_submission_node(submission_dir)
        submission_uri = str(submissionNode)
        view_name = list(self.model.objects(view, dafTermOntology['name']))
        if len(view_name) == 0:
            errmsg = 'Could not find view name for {0}'
            LOGGER.warning(errmsg.format(str(view)))
            return

        view_name = str(view_name[0])
        submissionView = URIRef(submission_uri + '/' + view_name)

        self.model.add((self.submissionSet,
                        dafTermOntology['has_submission'],
                        submissionNode))
        LOGGER.debug("Adding statements to {0}".format(str(submissionNode)))
        self.model.add((submissionNode,
                        submissionOntology['has_view'],
                        submissionView))
        self.model.add((submissionNode,
                        submissionOntology['name'],
                        Literal(submission_name)))
        self.model.add((submissionNode,
                        RDF['type'],
                        submissionOntology['submission']))
        self.model.add((submissionNode,
                        libraryOntology['library'],
                        libNode))

        LOGGER.debug("Adding statements to {0}".format(str(submissionView)))
        # add track specific information
        self.model.add((submissionView, dafTermOntology['view'], view))
        self.model.add((submissionView,
                        dafTermOntology['paired'],
                        Literal(self._is_paired(libNode))))
        self.model.add((submissionView,
                        dafTermOntology['submission'],
                        submissionNode))

        # add file specific information
        self.create_file_attributes(filename, submissionView, submission_uri, submission_dir)

        LOGGER.debug("Done.")

    def create_file_attributes(self, filename, submissionView, submission_uri, submission_dir):
        # add file specific information
        LOGGER.debug("Updating file md5sum")
        submission_pathname = os.path.join(submission_dir, filename)
        fileNode = URIRef("file://" + submission_pathname)
        self.model.add((submissionView,
                        dafTermOntology['has_file'],
                        fileNode))
        self.model.add((fileNode,
                        dafTermOntology['filename'],
                        Literal(filename)))

        md5 = make_md5sum(submission_pathname)
        if md5 is None:
            errmsg = "Unable to produce md5sum for {0}"
            LOGGER.warning(errmsg.format(submission_pathname))
        else:
            self.model.add((fileNode, dafTermOntology['md5sum'], Literal(md5)))

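    # Illustrative sketch (not part of the module): for a hypothetical file
    # named "12345_r1.fastq" in the submission directory,
    # create_file_attributes() adds roughly
    #
    #   (submissionView, dafTermOntology['has_file'], <file:///.../12345_r1.fastq>)
    #   (<file:///.../12345_r1.fastq>, dafTermOntology['filename'], Literal('12345_r1.fastq'))
    #   (<file:///.../12345_r1.fastq>, dafTermOntology['md5sum'],   Literal(<md5 hex digest>))
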
    def _add_library_details_to_model(self, libNode):
        tmpmodel = Graph()
        tmpmodel.parse(source=libNode, format='rdfa')
        for s in tmpmodel:
            # don't override things we already have in the model
            targets = list(self.model.objects(s[0], s[1]))
            if len(targets) == 0:
                self.model.add(s)

    def get_daf_variables(self):
        """Return the variable names to include in the ddf
        """
        variables_term = dafTermOntology[VARIABLES_TERM_NAME]
        results = DAF_PRE_VARIABLES[:]
        if self.need_replicate() and 'replicate' not in results:
            results.append('replicate')

        for obj in self.model.objects(self.submissionSet, variables_term):
            value = obj.toPython()
            if value not in results:
                results.append(value)
        results.extend([v for v in DAF_POST_VARIABLES if v not in results])
        return results

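    # Illustrative sketch (not part of the module): with the example DAF at the
    # bottom of this file (variables "cell, antibody,sex,age,strain,control" and
    # views declaring "hasReplicates yes"), get_daf_variables() would return
    # something like
    #
    #   ['files', 'view', 'replicate',
    #    'cell', 'antibody', 'sex', 'age', 'strain', 'control',
    #    'labExpId', 'md5sum']
    #
    # though the order of the DAF-declared variables depends on graph iteration order.
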
    def make_submission_name(self, submission_dir):
        submission_dir = os.path.normpath(submission_dir)
        submission_dir_name = os.path.split(submission_dir)[1]
        if len(submission_dir_name) == 0:
            raise RuntimeError(
                "Submission dir name too short: {0}".format(submission_dir))
        return submission_dir_name

    def get_submission_node(self, submission_dir):
        """Convert a submission directory name to a submission node
        """
        submission_name = self.make_submission_name(submission_dir)
        return self.submissionSetNS[submission_name]

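    # Illustrative sketch (not part of the module): the submission node is named
    # after the last component of the result directory, e.g. for a hypothetical
    # path
    #
    #   make_submission_name('/tmp/results/12345_hepg2/')  # -> '12345_hepg2'
    #   get_submission_node('/tmp/results/12345_hepg2/')   # -> submissionSetNS['12345_hepg2']
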
    def _get_library_attribute(self, libNode, attribute):
        if not isinstance(attribute, (Literal, URIRef)):
            attribute = libraryOntology[attribute]

        targets = list(self.model.objects(libNode, attribute))
        if len(targets) > 0:
            return self._format_library_attribute(targets)

        #targets = self._search_same_as(libNode, attribute)
        #if targets is not None:
        #    return self._format_library_attribute(targets)

        # we don't know anything about this attribute
        self._add_library_details_to_model(libNode)

        targets = list(self.model.objects(libNode, attribute))
        if len(targets) > 0:
            return self._format_library_attribute(targets)

        return None

    def _format_library_attribute(self, targets):
        if len(targets) == 0:
            return None
        elif len(targets) == 1:
            return targets[0].toPython()
        elif len(targets) > 1:
            return [t.toPython() for t in targets]

    def _search_same_as(self, subject, predicate):
        # look for alternate names
        other_predicates = self.model.objects(predicate, OWL['sameAs'])
        for other in other_predicates:
            targets = list(self.model.objects(subject, other))
            if len(targets) > 0:
                return targets
        return None

    def find_view(self, filename):
        """Search through potential DAF filename patterns
        """
        if self.__view_map is None:
            self.__view_map = self._get_filename_view_map()

        results = []
        for pattern, view in self.__view_map.items():
            if re.match(pattern, filename):
                results.append(view)

        if len(results) > 1:
            msg = "%s matched multiple views %s" % (
                filename,
                [str(x) for x in results])
            raise ModelException(msg)
        elif len(results) == 1:
            return results[0]
        else:
            return None

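    # Illustrative sketch (not part of the module): with the hypothetical
    # patterns registered via add_pattern() above,
    #
    #   submission.find_view('12345_r1.fastq')   # -> view node for 'FastqRd1'
    #   submission.find_view('12345.bigWig')     # -> view node for 'Signal'
    #   submission.find_view('notes.txt')        # -> None (no pattern matches)
    #
    # and a filename matching more than one pattern raises ModelException.
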
    def get_view_name(self, view):
        view_term = submissionOntology['view_name']
        names = list(self.model.objects(view, view_term))
        if len(names) == 1:
            return names[0].toPython()
        else:
            msg = "Found wrong number of view names for {0} len = {1}"
            msg = msg.format(str(view), len(names))
            LOGGER.error(msg)
            raise RuntimeError(msg)

    def _get_filename_view_map(self):
        """Query our model for filename patterns

        return a dictionary mapping filename pattern strings to view nodes
        """
        filename_query = (None, dafTermOntology['filename_re'], None)

        patterns = {}
        for s in self.model.triples(filename_query):
            view_name = s[0]
            literal_re = s[2].value
            LOGGER.debug("Found: %s" % (literal_re,))
            try:
                re.compile(literal_re)
            except re.error:
                LOGGER.error("Unable to compile: %s" % (literal_re,))
                continue
            patterns[literal_re] = view_name
        return patterns

    def _get_library_url(self):
        return str(self.libraryNS[''])

    def _set_library_url(self, value):
        self.libraryNS = Namespace(str(value))

    library_url = property(_get_library_url, _set_library_url)

    def _is_paired(self, libNode):
        """Determine if a library is paired end"""
        library_type = self._get_library_attribute(libNode, 'library_type')
        if library_type is None:
            errmsg = "%s doesn't have a library type"
            raise ModelException(errmsg % (str(libNode),))

        single = ['CSHL (lacking last nt)',
                  'Single End (non-multiplexed)',
                  'Small RNA (non-multiplexed)',]
        paired = ['Barcoded Illumina',
                  'Multiplexing',
                  'Nextera',
                  'Paired End (non-multiplexed)',]
        if library_type in single:
            return False
        elif library_type in paired:
            return True
        else:
            raise MetadataLookupException(
                "Unrecognized library type %s for %s" %
                (library_type, str(libNode)))

    def need_replicate(self):
        viewTerm = dafTermOntology['views']
        replicateTerm = dafTermOntology['hasReplicates']

        views = self.model.objects(self.submissionSet, viewTerm)
        for view in views:
            replicate = list(self.model.objects(view, replicateTerm))
            if len(replicate) > 0 and replicate[0].toPython():
                return True

        return False

    def link_daf(self, result_map):
        if self.daf is None or len(self.daf) == 0:
            raise RuntimeError(
                "DAF data does not exist, how can I link to it?")

        base_daf = self.daf_name

        for result_dir in result_map.values():
            if not os.path.exists(result_dir):
                raise RuntimeError(
                    "Couldn't find target directory %s" % (result_dir,))
            submission_daf = os.path.join(result_dir, base_daf)
            if os.path.exists(submission_daf):
                with open(submission_daf, 'r') as stream:
                    previous_daf = stream.read()
                if self.daf != previous_daf:
                    LOGGER.info("Old daf is different, overwriting it.")
            with open(submission_daf, 'w') as stream:
                stream.write(self.daf)


if __name__ == "__main__":
    example_daf = """# Lab and general info
grant             Hardison
lab               Caltech-m
dataType          ChipSeq
variables         cell, antibody,sex,age,strain,control
compositeSuffix   CaltechHistone
assembly          mm9
dafVersion        2.0
validationSettings validateFiles.bam:mismatches=2,bamPercent=99.9;validateFiles.fastq:quick=1000

# Track/view definition
view             FastqRd1
longLabelPrefix  Caltech Fastq Read 1
type             fastq
hasReplicates    yes
required         no

view             Signal
longLabelPrefix  Caltech Histone Signal
type             bigWig
hasReplicates    yes
required         no
"""
    model = Graph()
    example_daf_stream = StringIO(example_daf)
    name = "test_rep"
    mapper = UCSCSubmission(name, daf_file=example_daf_stream, model=model)
    dump_model(model)