Initial port to python3
[htsworkflow.git] / htsworkflow / submission / daf.py
1 """Parse UCSC DAF File
2 """
3 import logging
4 import os
5 from pprint import pformat
6 import re
7 import string
8 from io import StringIO
9 import types
10 import urllib.parse
11
12 import RDF
13 from htsworkflow.util.rdfhelp import \
14      blankOrUri, \
15      dafTermOntology, \
16      dump_model, \
17      get_model, \
18      libraryOntology, \
19      owlNS, \
20      rdfNS, \
21      submissionLog, \
22      submissionOntology, \
23      toTypedNode, \
24      fromTypedNode
25 from htsworkflow.util.hashfile import make_md5sum
26
# Module-level logger named after this module's import path
LOGGER = logging.getLogger(__name__)

# DAF keys whose values are comma-separated lists of variable names
DAF_VARIABLE_NAMES = ("variables", "extraVariables")
# Predicate local-name used when attaching variable names to a submission node
VARIABLES_TERM_NAME = 'variables'
# Variable names forced to the front / back of the ddf variable list
DAF_PRE_VARIABLES = ['files', 'view']
DAF_POST_VARIABLES = [ 'labExpId', 'md5sum']
33
34
class ModelException(RuntimeError):
    """Assumptions about the RDF model failed."""
38
39
class MetadataLookupException(RuntimeError):
    """Problem accessing metadata."""
43
44
# Parser states for parse_stream(): reading the global header vs. a view block
DAF_HEADER = 1
DAF_VIEW = 2
48
49
def parse_into_model(model, subject, filename):
    """Read a DAF into RDF Model

    requires a subject node to attach statements to

    :Args:
      model: RDF model that receives the parsed statements
      subject: RDF node the DAF attributes are attached to
      filename (str): path of the DAF file to parse
    """
    attributes = parse(filename)
    add_to_model(model, attributes, subject)
57
58
def fromstream_into_model(model, subject, daf_stream):
    """Load daf stream into model attached to node subject

    :Args:
      model: RDF model that receives the parsed statements
      subject: RDF node the DAF attributes are attached to
      daf_stream: file-like object yielding DAF text lines
    """
    attributes = parse_stream(daf_stream)
    add_to_model(model, attributes, subject)
64
65
def fromstring_into_model(model, subject, daf_string):
    """Read a string containing a DAF into RDF Model

    requires a short submission name

    :Args:
      model: RDF model that receives the parsed statements
      subject: RDF node the DAF attributes are attached to
      daf_string (str): complete DAF document as a single string
    """
    attributes = fromstring(daf_string)
    add_to_model(model, attributes, subject)
73
74
def parse(filename):
    """Parse daf from a file

    :Args:
      filename (str): path of the DAF file to read

    :Returns:
      dictionary of DAF attributes (see parse_stream)
    """
    # with-statement guarantees the handle is closed even if
    # parse_stream raises (the old explicit close leaked on error)
    with open(filename, 'r') as stream:
        return parse_stream(stream)
82
83
def fromstring(daf_string):
    """Parse UCSC daf from a provided string

    :Returns:
      dictionary of DAF attributes (see parse_stream)
    """
    # wrap the string so parse_stream can iterate it line by line
    stream = StringIO(daf_string)
    return parse_stream(stream)
88
89
def parse_stream(stream):
    """Parse UCSC dat stored in a stream

    :Returns:
      dict of DAF attributes.  The 'views' key maps view names to
      per-view attribute dicts; DAF_VARIABLE_NAMES keys hold lists;
      other keys hold the raw value (str, or bool for yes/no values).
    """
    # strip everything from '#' to end of line before parsing
    comment_re = re.compile("#.*$")

    state = DAF_HEADER
    attributes = {'views': {}}
    view_name = None
    view_attributes = {}
    for line in stream:
        #remove comments
        line = comment_re.sub("", line)
        # a line is "name <whitespace> value"; find each region by index
        nstop = _extract_name_index(line)
        name = line[0:nstop]
        sstop = _consume_whitespace(line, start=nstop)
        vstop = _extract_value_index(line, start=sstop)
        value = line[sstop:vstop]

        # yes/no values become booleans (e.g. hasReplicates, required)
        if value.lower() in ('yes',):
            value = True
        elif value.lower() in ('no',):
            value = False

        if len(name) == 0:
            # blank line ends the current view block (if any) and
            # returns the parser to header state
            if view_name is not None:
                attributes['views'][view_name] = view_attributes
                view_name = None
                view_attributes = {}
            state = DAF_HEADER
        elif state == DAF_HEADER and name in DAF_VARIABLE_NAMES:
            # comma separated variable lists, e.g. "variables cell,antibody"
            attributes[name] = [x.strip() for x in value.split(',')]
        elif state == DAF_HEADER and name == 'view':
            # start of a view block; subsequent lines go to view_attributes
            view_name = value
            view_attributes['view'] = value
            state = DAF_VIEW
        elif state == DAF_HEADER:
            attributes[name] = value
        elif state == DAF_VIEW:
            view_attributes[name] = value

    # save last block
    if view_name is not None:
        attributes['views'][view_name] = view_attributes

    LOGGER.debug("DAF Attributes" + pformat(attributes))
    return attributes
135
136
137 def _consume_whitespace(line, start=0):
138     """return index of next non whitespace character
139
140     returns length of string if it can't find anything
141     """
142     for i in range(start, len(line)):
143         if line[i] not in string.whitespace:
144             return i
145
146     return len(line)
147
148
149 def _extract_name_index(line, start=0):
150     """Used to find end of word by looking for a whitespace character
151
152     returns length of string if nothing matches
153     """
154     for i in range(start, len(line)):
155         if line[i] in string.whitespace:
156             return i
157
158     return len(line)
159
160
161 def _extract_value_index(line, start=0):
162     """Returns position of last non-whitespace character
163     """
164     shortline = line.rstrip()
165     return len(shortline)
166
167
def convert_to_rdf_statements(attributes, subject):
    """Convert dictionary of DAF attributes into rdf statements

    The statements are attached to the provided subject node
    """
    variables_term = dafTermOntology[VARIABLES_TERM_NAME]
    statements = []
    for daf_key, daf_value in attributes.items():
        if daf_key == 'views':
            view_statements = _views_to_statements(subject,
                                                   dafTermOntology,
                                                   daf_value)
            statements.extend(view_statements)
        elif daf_key in DAF_VARIABLE_NAMES:
            # each variable name becomes its own statement
            statements.extend(
                RDF.Statement(subject, variables_term, toTypedNode(var))
                for var in daf_value)
        else:
            statements.append(
                RDF.Statement(subject,
                              dafTermOntology[daf_key],
                              toTypedNode(daf_value)))

    return statements
191
192
def _views_to_statements(subject, dafNS, views):
    """Attach view attributes to new view nodes atached to provided subject
    """
    viewNS = get_view_namespace(subject)

    statements = []
    for view_name, view_attributes in views.items():
        viewSubject = viewNS[view_name]
        # link the submission to the view and record the view's name
        statements.append(RDF.Statement(subject, dafNS['views'], viewSubject))
        statements.append(
            RDF.Statement(viewSubject, dafNS['name'], toTypedNode(view_name)))
        # one statement per view attribute
        statements.extend(
            RDF.Statement(viewSubject, dafNS[attr_name], toTypedNode(attr_value))
            for attr_name, attr_value in view_attributes.items())

    return statements
212
213
def add_to_model(model, attributes, subject):
    """Add RDF statements derived from the DAF attributes to the model."""
    statements = convert_to_rdf_statements(attributes, subject)
    for stmt in statements:
        model.add_statement(stmt)
217
218
def get_submission_uri(name):
    """Return the submission-log URI for submission *name*."""
    return submissionLog[name].uri
221
222
def submission_uri_to_string(submission_uri):
    """Coerce an RDF node, RDF uri, or string into a '/'-terminated URI string."""
    if isinstance(submission_uri, RDF.Node):
        result = str(submission_uri.uri)
    elif isinstance(submission_uri, RDF.Uri):
        result = str(submission_uri)
    else:
        result = submission_uri
    # ensure a trailing slash so urljoin treats it as a directory
    if result[-1] != '/':
        result = result + '/'
    return result
231
232
def get_view_namespace(submission_uri):
    """Return an RDF namespace rooted at <submission_uri>/view/."""
    base_uri = submission_uri_to_string(submission_uri)
    view_uri = urllib.parse.urljoin(base_uri, 'view/')
    return RDF.NS(view_uri)
238
239
class UCSCSubmission(object):
    """Build a submission by examining the DAF for what we need to submit
    """
    def __init__(self, name, daf_file=None, model=None):
        """Construct a RDF backed model of a UCSC DAF

        :args:
          name (str): the name of this submission (used to construct DAF url)
          daf_file (str, stream, or None):
             if str, use as filename
             if stream, parse as stream
             if none, don't attempt to load the DAF into our model
          model (RDF.Model or None):
             if None, construct a memory backed model
             otherwise specifies model to use
        """
        if daf_file is None and model is None:
            LOGGER.error("We need a DAF or Model containing a DAF to work")

        self.name = name
        self.submissionSet = get_submission_uri(self.name)
        self.viewNS = get_view_namespace(self.submissionSet)

        if model is not None:
            self.model = model
        else:
            self.model = get_model()

        if daf_file is None:
            # honor the documented contract: no DAF given, rely on the
            # caller-provided model (previously this crashed in open(None))
            self.daf = None
        else:
            # python 3 file objects implement __next__, not next, so the
            # old hasattr(daf_file, 'next') probe never detected streams;
            # probing for read() works for files and io.StringIO alike
            if hasattr(daf_file, 'read'):
                # its some kind of stream
                self.daf = daf_file.read()
            else:
                # treat it as a filename; with-block guarantees close
                with open(daf_file, 'r') as stream:
                    self.daf = stream.read()
            fromstring_into_model(self.model, self.submissionSet, self.daf)

        self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')
        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '/')
        self.__view_map = None

    def _get_daf_name(self):
        # name used when writing the DAF next to submission results
        return self.name + '.daf'
    daf_name = property(_get_daf_name, doc="construct name for DAF file")

    def add_pattern(self, view_name, filename_pattern):
        """Map a filename regular expression to a view name
        """
        obj = toTypedNode(filename_pattern)
        self.model.add_statement(
            RDF.Statement(self.viewNS[view_name],
                          dafTermOntology['filename_re'],
                          obj))

    def scan_submission_dirs(self, result_map):
        """Examine files in our result directory

        :Args:
          result_map: mapping of library ids to result directory paths
        """
        for lib_id, result_dir in result_map.items():
            LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
            try:
                self.import_submission_dir(result_dir, lib_id)
            except MetadataLookupException as e:
                # best effort: log and continue with the other libraries
                LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))

    def import_submission_dir(self, submission_dir, library_id):
        """Import a submission directories and update our model as needed
        """
        libNode = self.libraryNS[library_id + "/"]

        self._add_library_details_to_model(libNode)

        submission_files = os.listdir(submission_dir)
        for filename in submission_files:
            self.construct_track_attributes(submission_dir, libNode, filename)

    def construct_track_attributes(self, submission_dir, libNode, pathname):
        """Looking for the best extension
        The 'best' is the longest match

        :Args:
        filename (str): the filename whose extention we are about to examine
        """
        path, filename = os.path.split(pathname)

        LOGGER.debug("Searching for view")
        view = self.find_view(filename)
        if view is None:
            # Logger.warn is deprecated in python 3; warning is the
            # canonical spelling
            LOGGER.warning("Unrecognized file: {0}".format(pathname))
            return None
        if str(view) == str(libraryOntology['ignore']):
            return None

        submission_name = self.make_submission_name(submission_dir)
        submissionNode = self.get_submission_node(submission_dir)
        submission_uri = str(submissionNode.uri)
        view_name = fromTypedNode(self.model.get_target(view,
                                       dafTermOntology['name']))
        if view_name is None:
            errmsg = 'Could not find view name for {0}'
            LOGGER.warning(errmsg.format(str(view)))
            return

        view_name = str(view_name)
        submissionView = RDF.Node(RDF.Uri(submission_uri + '/' + view_name))

        # attach the submission to the submission set
        self.model.add_statement(
            RDF.Statement(self.submissionSet,
                          dafTermOntology['has_submission'],
                          submissionNode))
        LOGGER.debug("Adding statements to {0}".format(str(submissionNode)))
        self.model.add_statement(RDF.Statement(submissionNode,
                                               submissionOntology['has_view'],
                                               submissionView))
        self.model.add_statement(RDF.Statement(submissionNode,
                                               submissionOntology['name'],
                                               toTypedNode(submission_name)))
        self.model.add_statement(
            RDF.Statement(submissionNode,
                          rdfNS['type'],
                          submissionOntology['submission']))
        self.model.add_statement(RDF.Statement(submissionNode,
                                               libraryOntology['library'],
                                               libNode))

        LOGGER.debug("Adding statements to {0}".format(str(submissionView)))
        # add track specific information
        self.model.add_statement(
            RDF.Statement(submissionView, dafTermOntology['view'], view))
        self.model.add_statement(
            RDF.Statement(submissionView,
                          dafTermOntology['paired'],
                          toTypedNode(self._is_paired(libNode))))
        self.model.add_statement(
            RDF.Statement(submissionView,
                          dafTermOntology['submission'],
                          submissionNode))

        # add file specific information
        self.create_file_attributes(filename, submissionView, submission_uri, submission_dir)

        LOGGER.debug("Done.")

    def create_file_attributes(self, filename, submissionView, submission_uri, submission_dir):
        """Attach has_file / filename / md5sum statements for one file."""
        # add file specific information
        LOGGER.debug("Updating file md5sum")
        submission_pathname = os.path.join(submission_dir, filename)
        fileNode = RDF.Node(RDF.Uri("file://" + submission_pathname))
        self.model.add_statement(
            RDF.Statement(submissionView,
                          dafTermOntology['has_file'],
                          fileNode))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['filename'],
                          filename))

        md5 = make_md5sum(submission_pathname)
        if md5 is None:
            errmsg = "Unable to produce md5sum for {0}"
            LOGGER.warning(errmsg.format(submission_pathname))
        else:
            self.model.add_statement(
                RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))

    def _add_library_details_to_model(self, libNode):
        """Parse RDFa metadata from the library page into our model.

        NOTE(review): this fetches libNode.uri, presumably over the
        network — confirm availability before relying on it in tests.
        """
        parser = RDF.Parser(name='rdfa')
        new_statements = parser.parse_as_stream(libNode.uri)
        for s in new_statements:
            # don't override things we already have in the model
            targets = list(self.model.get_targets(s.subject, s.predicate))
            if len(targets) == 0:
                self.model.append(s)

    def get_daf_variables(self):
        """Returns simple variables names that to include in the ddf
        """
        variables_term = dafTermOntology[VARIABLES_TERM_NAME]
        # NOTE: a prior results.extend(...) here was dead code — it was
        # immediately overwritten by this assignment, so it was removed
        results = DAF_PRE_VARIABLES[:]
        if self.need_replicate() and 'replicate' not in results:
            results.append('replicate')

        for obj in self.model.get_targets(self.submissionSet, variables_term):
            value = str(fromTypedNode(obj))
            if value not in results:
                results.append(value)
        results.extend([v for v in DAF_POST_VARIABLES if v not in results])
        return results

    def make_submission_name(self, submission_dir):
        """Derive a submission name from the last path component."""
        submission_dir = os.path.normpath(submission_dir)
        submission_dir_name = os.path.split(submission_dir)[1]
        if len(submission_dir_name) == 0:
            raise RuntimeError(
                "Submission dir name too short: {0}".format(submission_dir))
        return submission_dir_name

    def get_submission_node(self, submission_dir):
        """Convert a submission directory name to a submission node
        """
        submission_name = self.make_submission_name(submission_dir)
        return self.submissionSetNS[submission_name]

    def _get_library_attribute(self, libNode, attribute):
        """Return formatted value(s) of a library attribute, or None.

        attribute may be an RDF.Node predicate or a string naming a
        term in libraryOntology.
        """
        if not isinstance(attribute, RDF.Node):
            attribute = libraryOntology[attribute]

        targets = list(self.model.get_targets(libNode, attribute))
        if len(targets) > 0:
            return self._format_library_attribute(targets)
        # NOTE: unreachable fallback code that re-queried the model after
        # _add_library_details_to_model was removed; it sat after an
        # unconditional return and could never execute
        return None

    def _format_library_attribute(self, targets):
        """Unwrap typed nodes: None for empty, scalar for one, list for many."""
        if len(targets) == 0:
            return None
        elif len(targets) == 1:
            return fromTypedNode(targets[0])
        elif len(targets) > 1:
            return [fromTypedNode(t) for t in targets]

    def _search_same_as(self, subject, predicate):
        # look for alternate names declared via owl:sameAs
        other_predicates = self.model.get_targets(predicate, owlNS['sameAs'])
        for other in other_predicates:
            targets = list(self.model.get_targets(subject, other))
            if len(targets) > 0:
                return targets
        return None

    def find_view(self, filename):
        """Search through potential DAF filename patterns
        """
        if self.__view_map is None:
            self.__view_map = self._get_filename_view_map()

        results = []
        for pattern, view in self.__view_map.items():
            if re.match(pattern, filename):
                results.append(view)

        if len(results) > 1:
            msg = "%s matched multiple views %s" % (
                filename,
                [str(x) for x in results])
            raise ModelException(msg)
        elif len(results) == 1:
            return results[0]
        else:
            return None

    def get_view_name(self, view):
        """Return the single name associated with a view node.

        :Raises:
          RuntimeError if the model holds zero or multiple names
        """
        view_term = submissionOntology['view_name']
        names = list(self.model.get_targets(view, view_term))
        if len(names) == 1:
            return fromTypedNode(names[0])
        else:
            msg = "Found wrong number of view names for {0} len = {1}"
            msg = msg.format(str(view), len(names))
            LOGGER.error(msg)
            raise RuntimeError(msg)

    def _get_filename_view_map(self):
        """Query our model for filename patterns

        return a dictionary of compiled regular expressions to view names
        """
        filename_query = RDF.Statement(
            None, dafTermOntology['filename_re'], None)

        patterns = {}
        for s in self.model.find_statements(filename_query):
            view_name = s.subject
            literal_re = s.object.literal_value['string']
            LOGGER.debug("Found: %s" % (literal_re,))
            try:
                re.compile(literal_re)
            except re.error:
                # skip patterns that don't compile; previously they were
                # stored anyway and made find_view() raise re.error later
                LOGGER.error("Unable to compile: %s" % (literal_re,))
                continue
            patterns[literal_re] = view_name
        return patterns

    def _get_library_url(self):
        return str(self.libraryNS[''].uri)

    def _set_library_url(self, value):
        self.libraryNS = RDF.NS(str(value))

    library_url = property(_get_library_url, _set_library_url)

    def _is_paired(self, libNode):
        """Determine if a library is paired end"""
        library_type = self._get_library_attribute(libNode, 'library_type')
        if library_type is None:
            errmsg = "%s doesn't have a library type"
            raise ModelException(errmsg % (str(libNode),))

        single = ['CSHL (lacking last nt)',
                  'Single End (non-multiplexed)',
                  'Small RNA (non-multiplexed)',]
        paired = ['Barcoded Illumina',
                  'Multiplexing',
                  'Nextera',
                  'Paired End (non-multiplexed)',]
        if library_type in single:
            return False
        elif library_type in paired:
            return True
        else:
            raise MetadataLookupException(
                "Unrecognized library type %s for %s" % \
                (library_type, str(libNode)))

    def need_replicate(self):
        """Return True if any view in the DAF declares hasReplicates."""
        viewTerm = dafTermOntology['views']
        replicateTerm = dafTermOntology['hasReplicates']

        views = self.model.get_targets(self.submissionSet, viewTerm)

        for view in views:
            replicate = self.model.get_target(view, replicateTerm)
            if fromTypedNode(replicate):
                return True

        return False

    def link_daf(self, result_map):
        """Copy our DAF into each result directory as <name>.daf."""
        if self.daf is None or len(self.daf) == 0:
            raise RuntimeError(
                "DAF data does not exist, how can I link to it?")

        base_daf = self.daf_name

        for result_dir in result_map.values():
            if not os.path.exists(result_dir):
                raise RuntimeError(
                    "Couldn't find target directory %s" %(result_dir,))
            submission_daf = os.path.join(result_dir, base_daf)
            if os.path.exists(submission_daf):
                # with-blocks close handles promptly (the old bare
                # open(...).read() leaked until garbage collection)
                with open(submission_daf, 'r') as stream:
                    previous_daf = stream.read()
                if self.daf != previous_daf:
                    LOGGER.info("Old daf is different, overwriting it.")
            with open(submission_daf, 'w') as stream:
                stream.write(self.daf)
603
604
if __name__ == "__main__":
    example_daf = """# Lab and general info
grant             Hardison
lab               Caltech-m
dataType          ChipSeq
variables         cell, antibody,sex,age,strain,control
compositeSuffix   CaltechHistone
assembly          mm9
dafVersion        2.0
validationSettings validateFiles.bam:mismatches=2,bamPercent=99.9;validateFiles.fastq:quick=1000

# Track/view definition
view             FastqRd1
longLabelPrefix  Caltech Fastq Read 1
type             fastq
hasReplicates    yes
required         no

view             Signal
longLabelPrefix  Caltech Histone Signal
type             bigWig
hasReplicates    yes
required         no
"""
    model = get_model()
    example_daf_stream = StringIO(example_daf)
    name = "test_rep"
    # bug fix: DAFMapper is not defined anywhere in this module;
    # the class defined above is UCSCSubmission
    mapper = UCSCSubmission(name, daf_file=example_daf_stream, model=model)
    dump_model(model)
634
635