Initial port to python3
[htsworkflow.git] / htsworkflow / submission / daf.py
index cdc93129b5900ed1596af0953ece31465231c4be..51595931ff2fa7699109d6644214ebdd89d531d3 100644 (file)
@@ -1,31 +1,94 @@
 """Parse UCSC DAF File
 """
 import logging
+import os
+from pprint import pformat
 import re
 import string
-from StringIO import StringIO
+from io import StringIO
 import types
+import urllib.parse
 
-from htsworkflow.util.rdfhelp import blankOrUri, toTypedNode
+import RDF
+from htsworkflow.util.rdfhelp import \
+     blankOrUri, \
+     dafTermOntology, \
+     dump_model, \
+     get_model, \
+     libraryOntology, \
+     owlNS, \
+     rdfNS, \
+     submissionLog, \
+     submissionOntology, \
+     toTypedNode, \
+     fromTypedNode
+from htsworkflow.util.hashfile import make_md5sum
+
+LOGGER = logging.getLogger(__name__)
+
+DAF_VARIABLE_NAMES = ("variables", "extraVariables")
+VARIABLES_TERM_NAME = 'variables'
+DAF_PRE_VARIABLES = ['files', 'view']
+DAF_POST_VARIABLES = [ 'labExpId', 'md5sum']
+
+
+class ModelException(RuntimeError):
+    """Assumptions about the RDF model failed"""
+    pass
+
+
+class MetadataLookupException(RuntimeError):
+    """Problem accessing metadata"""
+    pass
 
-logger = logging.getLogger(__name__)
 
 # STATES
 DAF_HEADER = 1
 DAF_VIEW = 2
 
 
+def parse_into_model(model, subject, filename):
+    """Read a DAF into RDF Model
+
+    requires a subject node to attach statements to
+    """
+    attributes = parse(filename)
+    add_to_model(model, attributes, subject)
+
+
+def fromstream_into_model(model, subject, daf_stream):
+    """Load daf stream into model attached to node subject
+    """
+    attributes = parse_stream(daf_stream)
+    add_to_model(model, attributes, subject)
+
+
+def fromstring_into_model(model, subject, daf_string):
+    """Read a string containing a DAF into RDF Model
+
+    requires a short submission name
+    """
+    attributes = fromstring(daf_string)
+    add_to_model(model, attributes, subject)
+
+
 def parse(filename):
-    stream = open(filename,'r')
-    attributes =  parse_stream(stream)
+    """Parse daf from a file
+    """
+    stream = open(filename, 'r')
+    attributes = parse_stream(stream)
     stream.close()
-    return stream
+    return attributes
+
 
 def fromstring(daf_string):
+    """Parse UCSC daf from a provided string"""
     stream = StringIO(daf_string)
     return parse_stream(stream)
 
+
 def parse_stream(stream):
+    """Parse UCSC dat stored in a stream"""
     comment_re = re.compile("#.*$")
 
     state = DAF_HEADER
@@ -45,15 +108,15 @@ def parse_stream(stream):
             value = True
         elif value.lower() in ('no',):
             value = False
-            
+
         if len(name) == 0:
             if view_name is not None:
                 attributes['views'][view_name] = view_attributes
                 view_name = None
                 view_attributes = {}
             state = DAF_HEADER
-        elif state == DAF_HEADER and name == 'variables':
-            attributes[name] = [ x.strip() for x in value.split(',')]
+        elif state == DAF_HEADER and name in DAF_VARIABLE_NAMES:
+            attributes[name] = [x.strip() for x in value.split(',')]
         elif state == DAF_HEADER and name == 'view':
             view_name = value
             view_attributes['view'] = value
@@ -66,64 +129,507 @@ def parse_stream(stream):
     # save last block
     if view_name is not None:
         attributes['views'][view_name] = view_attributes
-        
+
+    LOGGER.debug("DAF Attributes" + pformat(attributes))
     return attributes
 
+
 def _consume_whitespace(line, start=0):
-    for i in xrange(start, len(line)):
+    """return index of next non whitespace character
+
+    returns length of string if it can't find anything
+    """
+    for i in range(start, len(line)):
         if line[i] not in string.whitespace:
             return i
-        
+
     return len(line)
 
+
 def _extract_name_index(line, start=0):
-    for i in xrange(start, len(line)):
+    """Used to find end of word by looking for a whitespace character
+
+    returns length of string if nothing matches
+    """
+    for i in range(start, len(line)):
         if line[i] in string.whitespace:
             return i
-        
+
     return len(line)
 
+
 def _extract_value_index(line, start=0):
+    """Returns position of last non-whitespace character
+    """
     shortline = line.rstrip()
     return len(shortline)
 
-try:
-    import RDF
-    def convert_to_rdf_statements(attributes, source=None):
-        ddfNS = RDF.NS("http://encodesubmit.ucsc.edu/pipeline/download_ddf#")
-    
-        subject = blankOrUri(source)
-        
-        statements = []
-        for name in attributes:
-            predicate = ddfNS[name]
-            if name == 'views':
-                predicate = ddfNS['views']
-                for view_name in attributes.get('views', []):
-                    view = attributes['views'][view_name]
-                    viewNode = RDF.Node()
-                    statements.append(RDF.Statement(subject, predicate, viewNode))
-                    statements.extend(convert_to_rdf_statements(view, viewNode))
-            elif name == 'variables':
-                predicate = ddfNS['variables']
-                for var in attributes.get('variables', []):
-                    obj = toTypedNode(var)
-                    statements.append(RDF.Statement(subject, predicate, obj))
-            else:
-                value = attributes[name]
-                obj = toTypedNode(value)
-                statements.append(RDF.Statement(subject,predicate,obj))
-    
-        return statements
-    
-    
-    def add_to_model(model, attributes, source=None):
-        for statement in convert_to_rdf_statements(attributes, source):
-            model.add_statement(statement)
-            
-except ImportError, e:
-    def convert_to_rdf_statements(attributes, source=None):
-        raise NotImplementedError("librdf not installed")
-    def add_to_model(model, attributes, source=None):
-        raise NotImplementedError("librdf not installed")
+
+def convert_to_rdf_statements(attributes, subject):
+    """Convert dictionary of DAF attributes into rdf statements
+
+    The statements are attached to the provided subject node
+    """
+    variables_term = dafTermOntology[VARIABLES_TERM_NAME]
+    statements = []
+    for daf_key in attributes:
+        predicate = dafTermOntology[daf_key]
+        if daf_key == 'views':
+            statements.extend(_views_to_statements(subject,
+                                                   dafTermOntology,
+                                                   attributes[daf_key]))
+        elif daf_key in DAF_VARIABLE_NAMES:
+            for var in attributes.get(daf_key, []):
+                obj = toTypedNode(var)
+                statements.append(RDF.Statement(subject, variables_term, obj))
+        else:
+            value = attributes[daf_key]
+            obj = toTypedNode(value)
+            statements.append(RDF.Statement(subject, predicate, obj))
+
+    return statements
+
+
+def _views_to_statements(subject, dafNS, views):
+    """Attach view attributes to new view nodes atached to provided subject
+    """
+    viewNS = get_view_namespace(subject)
+
+    statements = []
+    for view_name in views:
+        view_attributes = views[view_name]
+        viewSubject = viewNS[view_name]
+        statements.append(RDF.Statement(subject, dafNS['views'], viewSubject))
+        statements.append(
+            RDF.Statement(viewSubject, dafNS['name'], toTypedNode(view_name)))
+        for view_attribute_name in view_attributes:
+            predicate = dafNS[view_attribute_name]
+            obj = toTypedNode(view_attributes[view_attribute_name])
+            statements.append(RDF.Statement(viewSubject, predicate, obj))
+
+        #statements.extend(convert_to_rdf_statements(view, viewNode))
+    return statements
+
+
+def add_to_model(model, attributes, subject):
+    for statement in convert_to_rdf_statements(attributes, subject):
+        model.add_statement(statement)
+
+
+def get_submission_uri(name):
+    return submissionLog[name].uri
+
+
+def submission_uri_to_string(submission_uri):
+    if isinstance(submission_uri, RDF.Node):
+        submission_uri = str(submission_uri.uri)
+    elif isinstance(submission_uri, RDF.Uri):
+        submission_uri = str(submission_uri)
+    if submission_uri[-1] != '/':
+        submission_uri += '/'
+    return submission_uri
+
+
+def get_view_namespace(submission_uri):
+    submission_uri = submission_uri_to_string(submission_uri)
+    view_uri = urllib.parse.urljoin(submission_uri, 'view/')
+    viewNS = RDF.NS(view_uri)
+    return viewNS
+
+
+class UCSCSubmission(object):
+    """Build a submission by examining the DAF for what we need to submit
+    """
+    def __init__(self, name, daf_file=None, model=None):
+        """Construct a RDF backed model of a UCSC DAF
+
+        :args:
+          name (str): the name of this submission (used to construct DAF url)
+          daf_file (str, stream, or None):
+             if str, use as filename
+             if stream, parse as stream
+             if none, don't attempt to load the DAF into our model
+          model (RDF.Model or None):
+             if None, construct a memory backed model
+             otherwise specifies model to use
+        """
+        if daf_file is None and model is None:
+            LOGGER.error("We need a DAF or Model containing a DAF to work")
+
+        self.name = name
+        self.submissionSet = get_submission_uri(self.name)
+        self.viewNS = get_view_namespace(self.submissionSet)
+
+        if model is not None:
+            self.model = model
+        else:
+            self.model = get_model()
+
+        if hasattr(daf_file, 'next'):
+            # its some kind of stream
+            self.daf = daf_file.read()
+        else:
+            # file
+            stream = open(daf_file, 'r')
+            self.daf = stream.read()
+            stream.close()
+
+        fromstring_into_model(self.model, self.submissionSet, self.daf)
+
+        self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')
+        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '/')
+        self.__view_map = None
+
+    def _get_daf_name(self):
+        return self.name + '.daf'
+    daf_name = property(_get_daf_name,doc="construct name for DAF file")
+
+    def add_pattern(self, view_name, filename_pattern):
+        """Map a filename regular expression to a view name
+        """
+        obj = toTypedNode(filename_pattern)
+        self.model.add_statement(
+            RDF.Statement(self.viewNS[view_name],
+                          dafTermOntology['filename_re'],
+                          obj))
+
+    def scan_submission_dirs(self, result_map):
+        """Examine files in our result directory
+        """
+        for lib_id, result_dir in list(result_map.items()):
+            LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
+            try:
+                self.import_submission_dir(result_dir, lib_id)
+            except MetadataLookupException as e:
+                LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))
+
+    def import_submission_dir(self, submission_dir, library_id):
+        """Import a submission directories and update our model as needed
+        """
+        #attributes = get_filename_attribute_map(paired)
+        libNode = self.libraryNS[library_id + "/"]
+
+        self._add_library_details_to_model(libNode)
+
+        submission_files = os.listdir(submission_dir)
+        for filename in submission_files:
+            self.construct_track_attributes(submission_dir, libNode, filename)
+
+    def construct_track_attributes(self, submission_dir, libNode, pathname):
+        """Looking for the best extension
+        The 'best' is the longest match
+
+        :Args:
+        filename (str): the filename whose extention we are about to examine
+        """
+        path, filename = os.path.split(pathname)
+
+        LOGGER.debug("Searching for view")
+        view = self.find_view(filename)
+        if view is None:
+            LOGGER.warn("Unrecognized file: {0}".format(pathname))
+            return None
+        if str(view) == str(libraryOntology['ignore']):
+            return None
+
+        submission_name = self.make_submission_name(submission_dir)
+        submissionNode = self.get_submission_node(submission_dir)
+        submission_uri = str(submissionNode.uri)
+        view_name = fromTypedNode(self.model.get_target(view,
+                                       dafTermOntology['name']))
+        if view_name is None:
+            errmsg = 'Could not find view name for {0}'
+            LOGGER.warning(errmsg.format(str(view)))
+            return
+
+        view_name = str(view_name)
+        submissionView = RDF.Node(RDF.Uri(submission_uri + '/' + view_name))
+
+        self.model.add_statement(
+            RDF.Statement(self.submissionSet,
+                          dafTermOntology['has_submission'],
+                          submissionNode))
+        LOGGER.debug("Adding statements to {0}".format(str(submissionNode)))
+        self.model.add_statement(RDF.Statement(submissionNode,
+                                               submissionOntology['has_view'],
+                                               submissionView))
+        self.model.add_statement(RDF.Statement(submissionNode,
+                                               submissionOntology['name'],
+                                               toTypedNode(submission_name)))
+        self.model.add_statement(
+            RDF.Statement(submissionNode,
+                          rdfNS['type'],
+                          submissionOntology['submission']))
+        self.model.add_statement(RDF.Statement(submissionNode,
+                                               libraryOntology['library'],
+                                               libNode))
+
+        LOGGER.debug("Adding statements to {0}".format(str(submissionView)))
+        # add track specific information
+        self.model.add_statement(
+            RDF.Statement(submissionView, dafTermOntology['view'], view))
+        self.model.add_statement(
+            RDF.Statement(submissionView,
+                          dafTermOntology['paired'],
+                          toTypedNode(self._is_paired(libNode))))
+        self.model.add_statement(
+            RDF.Statement(submissionView,
+                          dafTermOntology['submission'],
+                          submissionNode))
+
+        # add file specific information
+        self.create_file_attributes(filename, submissionView, submission_uri, submission_dir)
+
+        LOGGER.debug("Done.")
+
+    def create_file_attributes(self, filename, submissionView, submission_uri, submission_dir):
+        # add file specific information
+        LOGGER.debug("Updating file md5sum")
+        submission_pathname = os.path.join(submission_dir, filename)
+        fileNode = RDF.Node(RDF.Uri("file://" + submission_pathname))
+        self.model.add_statement(
+            RDF.Statement(submissionView,
+                          dafTermOntology['has_file'],
+                          fileNode))
+        self.model.add_statement(
+            RDF.Statement(fileNode,
+                          dafTermOntology['filename'],
+                          filename))
+
+        md5 = make_md5sum(submission_pathname)
+        if md5 is None:
+            errmsg = "Unable to produce md5sum for {0}"
+            LOGGER.warning(errmsg.format(submission_pathname))
+        else:
+            self.model.add_statement(
+                RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))
+
+    def _add_library_details_to_model(self, libNode):
+        parser = RDF.Parser(name='rdfa')
+        new_statements = parser.parse_as_stream(libNode.uri)
+        for s in new_statements:
+            # don't override things we already have in the model
+            targets = list(self.model.get_targets(s.subject, s.predicate))
+            if len(targets) == 0:
+                self.model.append(s)
+
+    def get_daf_variables(self):
+        """Returns simple variables names that to include in the ddf
+        """
+        variables_term = dafTermOntology[VARIABLES_TERM_NAME]
+        results = []
+        results.extend([v for v in DAF_PRE_VARIABLES if v not in results])
+        results = DAF_PRE_VARIABLES[:]
+        if self.need_replicate() and 'replicate' not in results:
+            results.append('replicate')
+
+        for obj in self.model.get_targets(self.submissionSet, variables_term):
+            value = str(fromTypedNode(obj))
+            if value not in results:
+                results.append(value)
+        results.extend([v for v in DAF_POST_VARIABLES if v not in results])
+        return results
+
+    def make_submission_name(self, submission_dir):
+        submission_dir = os.path.normpath(submission_dir)
+        submission_dir_name = os.path.split(submission_dir)[1]
+        if len(submission_dir_name) == 0:
+            raise RuntimeError(
+                "Submission dir name too short: {0}".format(submission_dir))
+        return submission_dir_name
+
+    def get_submission_node(self, submission_dir):
+        """Convert a submission directory name to a submission node
+        """
+        submission_name = self.make_submission_name(submission_dir)
+        return self.submissionSetNS[submission_name]
+
+    def _get_library_attribute(self, libNode, attribute):
+        if not isinstance(attribute, RDF.Node):
+            attribute = libraryOntology[attribute]
+
+        targets = list(self.model.get_targets(libNode, attribute))
+        if len(targets) > 0:
+            return self._format_library_attribute(targets)
+        else:
+            return None
+
+        #targets = self._search_same_as(libNode, attribute)
+        #if targets is not None:
+        #    return self._format_library_attribute(targets)
+
+        # we don't know anything about this attribute
+        self._add_library_details_to_model(libNode)
+
+        targets = list(self.model.get_targets(libNode, attribute))
+        if len(targets) > 0:
+            return self._format_library_attribute(targets)
+
+        return None
+
+    def _format_library_attribute(self, targets):
+        if len(targets) == 0:
+            return None
+        elif len(targets) == 1:
+            return fromTypedNode(targets[0])
+        elif len(targets) > 1:
+            return [fromTypedNode(t) for t in targets]
+
+    def _search_same_as(self, subject, predicate):
+        # look for alternate names
+        other_predicates = self.model.get_targets(predicate, owlNS['sameAs'])
+        for other in other_predicates:
+            targets = list(self.model.get_targets(subject, other))
+            if len(targets) > 0:
+                return targets
+        return None
+
+    def find_view(self, filename):
+        """Search through potential DAF filename patterns
+        """
+        if self.__view_map is None:
+            self.__view_map = self._get_filename_view_map()
+
+        results = []
+        for pattern, view in list(self.__view_map.items()):
+            if re.match(pattern, filename):
+                results.append(view)
+
+        if len(results) > 1:
+            msg = "%s matched multiple views %s" % (
+                filename,
+                [str(x) for x in results])
+            raise ModelException(msg)
+        elif len(results) == 1:
+            return results[0]
+        else:
+            return None
+
+    def get_view_name(self, view):
+        view_term = submissionOntology['view_name']
+        names = list(self.model.get_targets(view, view_term))
+        if len(names) == 1:
+            return fromTypedNode(names[0])
+        else:
+            msg = "Found wrong number of view names for {0} len = {1}"
+            msg = msg.format(str(view), len(names))
+            LOGGER.error(msg)
+            raise RuntimeError(msg)
+
+    def _get_filename_view_map(self):
+        """Query our model for filename patterns
+
+        return a dictionary of compiled regular expressions to view names
+        """
+        filename_query = RDF.Statement(
+            None, dafTermOntology['filename_re'], None)
+
+        patterns = {}
+        for s in self.model.find_statements(filename_query):
+            view_name = s.subject
+            literal_re = s.object.literal_value['string']
+            LOGGER.debug("Found: %s" % (literal_re,))
+            try:
+                filename_re = re.compile(literal_re)
+            except re.error as e:
+                LOGGER.error("Unable to compile: %s" % (literal_re,))
+            patterns[literal_re] = view_name
+        return patterns
+
+    def _get_library_url(self):
+        return str(self.libraryNS[''].uri)
+
+    def _set_library_url(self, value):
+        self.libraryNS = RDF.NS(str(value))
+
+    library_url = property(_get_library_url, _set_library_url)
+
+    def _is_paired(self, libNode):
+        """Determine if a library is paired end"""
+        library_type = self._get_library_attribute(libNode, 'library_type')
+        if library_type is None:
+            errmsg = "%s doesn't have a library type"
+            raise ModelException(errmsg % (str(libNode),))
+
+        single = ['CSHL (lacking last nt)',
+                  'Single End (non-multiplexed)',
+                  'Small RNA (non-multiplexed)',]
+        paired = ['Barcoded Illumina',
+                  'Multiplexing',
+                  'Nextera',
+                  'Paired End (non-multiplexed)',]
+        if library_type in single:
+            return False
+        elif library_type in paired:
+            return True
+        else:
+            raise MetadataLookupException(
+                "Unrecognized library type %s for %s" % \
+                (library_type, str(libNode)))
+
+    def need_replicate(self):
+        viewTerm = dafTermOntology['views']
+        replicateTerm = dafTermOntology['hasReplicates']
+
+        views = self.model.get_targets(self.submissionSet, viewTerm)
+
+        for view in views:
+            replicate = self.model.get_target(view, replicateTerm)
+            if fromTypedNode(replicate):
+                return True
+
+        return False
+
+
+    def link_daf(self, result_map):
+        if self.daf is None or len(self.daf) == 0:
+            raise RuntimeError(
+                "DAF data does not exist, how can I link to it?")
+
+        base_daf = self.daf_name
+
+        for result_dir in list(result_map.values()):
+            if not os.path.exists(result_dir):
+                raise RuntimeError(
+                    "Couldn't find target directory %s" %(result_dir,))
+            submission_daf = os.path.join(result_dir, base_daf)
+            if os.path.exists(submission_daf):
+                previous_daf = open(submission_daf, 'r').read()
+                if self.daf != previous_daf:
+                    LOGGER.info("Old daf is different, overwriting it.")
+            stream = open(submission_daf, 'w')
+            stream.write(self.daf)
+            stream.close()
+
+
+if __name__ == "__main__":
+    example_daf = """# Lab and general info
+grant             Hardison
+lab               Caltech-m
+dataType          ChipSeq
+variables         cell, antibody,sex,age,strain,control
+compositeSuffix   CaltechHistone
+assembly          mm9
+dafVersion        2.0
+validationSettings validateFiles.bam:mismatches=2,bamPercent=99.9;validateFiles.fastq:quick=1000
+
+# Track/view definition
+view             FastqRd1
+longLabelPrefix  Caltech Fastq Read 1
+type             fastq
+hasReplicates    yes
+required         no
+
+view             Signal
+longLabelPrefix  Caltech Histone Signal
+type             bigWig
+hasReplicates    yes
+required         no
+"""
+    model = get_model()
+    example_daf_stream = StringIO(example_daf)
+    name = "test_rep"
+    mapper = DAFMapper(name, daf_file = example_daf_stream, model=model)
+    dump_model(model)
+