Start developing GEO SOFT submission tool.
[htsworkflow.git] / htsworkflow / submission / submission.py
diff --git a/htsworkflow/submission/submission.py b/htsworkflow/submission/submission.py
new file mode 100644 (file)
index 0000000..98c25d5
--- /dev/null
@@ -0,0 +1,256 @@
+"""Common submission elements
+"""
+import logging
+import os
+import re
+
+import RDF
+
+from htsworkflow.util.rdfhelp import \
+     blankOrUri, \
+     dafTermOntology, \
+     dump_model, \
+     get_model, \
+     libraryOntology, \
+     owlNS, \
+     rdfNS, \
+     submissionLog, \
+     submissionOntology, \
+     toTypedNode, \
+     fromTypedNode
+from htsworkflow.util.hashfile import make_md5sum
+
+from htsworkflow.submission.daf import \
+     MetadataLookupException, \
+     get_submission_uri
+
+logger = logging.getLogger(__name__)
+
+class Submission(object):
+    def __init__(self, name, model):
+        """Bind a named submission to the model that describes it
+
+        :Args:
+        name (str): submission name, handed to get_submission_uri to
+                    obtain the submission set
+        model: the RDF model statements are read from and added to
+        """
+        self.name = name
+        self.model = model
+
+        # nodes for individual submissions are looked up in a namespace
+        # rooted at the submission set, hence the trailing slash
+        submission_set = get_submission_uri(name)
+        self.submissionSet = submission_set
+        self.submissionSetNS = RDF.NS(str(submission_set) + '/')
+        self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')
+
+        # filename pattern -> view map, filled in by find_best_match
+        self.__view_map = None
+
+    def scan_submission_dirs(self, result_map):
+        """Examine files in our result directories
+
+        :Args:
+        result_map (dict): maps a library id to the directory holding
+                           that library's results
+
+        A library whose import raises MetadataLookupException is logged
+        and skipped; any other exception propagates to the caller.
+        """
+        for lib_id, result_dir in result_map.items():
+            logger.info("Importing %s from %s" % (lib_id, result_dir))
+            try:
+                self.import_analysis_dir(result_dir, lib_id)
+            # "except X as e" is accepted by Python 2.6+ and Python 3;
+            # the older "except X, e" form is a syntax error on Python 3
+            except MetadataLookupException as e:
+                logger.error("Skipping %s: %s" % (lib_id, str(e)))
+
+    def import_analysis_dir(self, analysis_dir, library_id):
+        """Import one analysis directory and update our model as needed
+
+        :Args:
+        analysis_dir (str): directory whose entries are examined
+        library_id (str): id used to look up the library node in libraryNS
+        """
+        library_node = self.libraryNS[library_id + "/"]
+
+        # load the library's own description before looking at any file
+        self._add_library_details_to_model(library_node)
+
+        for entry in os.listdir(analysis_dir):
+            self.construct_file_attributes(analysis_dir, library_node, entry)
+
+    def construct_file_attributes(self, analysis_dir, libNode, pathname):
+        """Describe one file of an analysis directory in the model
+
+        The file is classified by matching its name with find_best_match.
+        Unrecognized files (a warning is logged) and files classified as
+        libraryOntology['ignore'] add nothing to the model. For every
+        other file, statements describing the submission (name, type,
+        library, paired flag) and the file itself (has_file, filename and,
+        when it can be computed, md5sum) are added.
+
+        :Args:
+        analysis_dir (str): directory containing the file
+        libNode: node of the library the analysis belongs to
+        pathname (str): name of the file; only its final path component
+                        is used for matching and for the file node
+
+        :Returns: None
+
+        Exceptions raised by find_best_match and _is_paired are not
+        caught here.
+        """
+        # only the final path component takes part in matching and naming
+        filename = os.path.basename(pathname)
+
+        logger.debug("Searching for view")
+        file_classification = self.find_best_match(filename)
+        if file_classification is None:
+            logger.warning("Unrecognized file: {0}".format(pathname))
+            return None
+        if str(file_classification) == str(libraryOntology['ignore']):
+            return None
+
+        an_analysis_name = self.make_submission_name(analysis_dir)
+        an_analysis = self.get_submission_node(analysis_dir)
+        an_analysis_uri = str(an_analysis.uri)
+
+        self.model.add_statement(RDF.Statement(an_analysis,
+                                               submissionOntology['name'],
+                                               toTypedNode(an_analysis_name)))
+        self.model.add_statement(
+            RDF.Statement(an_analysis,
+                          rdfNS['type'],
+                          submissionOntology['submission']))
+        self.model.add_statement(RDF.Statement(an_analysis,
+                                               submissionOntology['library'],
+                                               libNode))
+
+        logger.debug("Adding statements to {0}".format(str(an_analysis)))
+        # add track specific information
+        self.model.add_statement(
+            RDF.Statement(an_analysis,
+                          dafTermOntology['paired'],
+                          toTypedNode(self._is_paired(libNode))))
+        self.model.add_statement(
+            RDF.Statement(an_analysis,
+                          dafTermOntology['submission'],
+                          an_analysis))
+
+        # add file specific information
+        fileNode = self.link_file_to_classes(filename,
+                                             an_analysis,
+                                             an_analysis_uri,
+                                             analysis_dir)
+        self.add_md5s(filename, fileNode, analysis_dir)
+
+        logger.debug("Done.")
+
+    def link_file_to_classes(self, filename, submissionNode, submission_uri, analysis_dir):
+        """Attach a file node to its submission node
+
+        The file node's URI is submission_uri + '/' + filename. Two
+        statements are added to the model: the submission has_file the
+        file node, and the file node has the given filename.
+
+        analysis_dir is accepted but not used by this method.
+
+        :Returns: the file node that was created
+        """
+        file_uri = RDF.Uri(submission_uri + '/' + filename)
+        fileNode = RDF.Node(file_uri)
+
+        has_file = RDF.Statement(
+            submissionNode, dafTermOntology['has_file'], fileNode)
+        self.model.add_statement(has_file)
+
+        file_name = RDF.Statement(
+            fileNode, dafTermOntology['filename'], filename)
+        self.model.add_statement(file_name)
+
+        return fileNode
+
+    def add_md5s(self, filename, fileNode, analysis_dir):
+        """Record the md5sum of a file on its file node
+
+        The checksum comes from make_md5sum on analysis_dir/filename.
+        When make_md5sum returns None a warning is logged and the model
+        is left untouched.
+        """
+        logger.debug("Updating file md5sum")
+        target = os.path.join(analysis_dir, filename)
+        checksum = make_md5sum(target)
+        if checksum is None:
+            logger.warning("Unable to produce md5sum for {0}".format(target))
+            return
+
+        md5_statement = RDF.Statement(
+            fileNode, dafTermOntology['md5sum'], checksum)
+        self.model.add_statement(md5_statement)
+
+    def _add_library_details_to_model(self, libNode):
+        """Merge the RDFa statements found at a library's URI into the model
+
+        A parsed statement is appended only when the model has no target
+        yet for its subject and predicate. Because the check runs against
+        the model as it grows, only the first parsed value for a given
+        subject/predicate pair is kept.
+        """
+        rdfa_parser = RDF.Parser(name='rdfa')
+        for statement in rdfa_parser.parse_as_stream(libNode.uri):
+            existing = list(
+                self.model.get_targets(statement.subject, statement.predicate))
+            # don't override things we already have in the model
+            if not existing:
+                self.model.append(statement)
+
+
+    def find_best_match(self, filename):
+        """Search through potential filename matching patterns
+
+        The pattern map is built from the model on first use and kept
+        for later calls.
+
+        :Args:
+        filename (str): file name matched (with re.match) against every
+                        known pattern
+
+        :Returns: the view of the single matching pattern, or None when
+                  no pattern matches
+
+        :Raises:
+        ModelException: more than one pattern matches filename
+        """
+        if self.__view_map is None:
+            self.__view_map = self._get_filename_view_map()
+
+        results = [view for pattern, view in self.__view_map.items()
+                   if re.match(pattern, filename)]
+
+        if len(results) > 1:
+            # ModelException is not imported at module level, so raising
+            # it by bare name would fail with NameError.
+            # NOTE(review): assumed to be defined next to
+            # MetadataLookupException in htsworkflow.submission.daf -
+            # confirm, then move this to the module imports.
+            from htsworkflow.submission.daf import ModelException
+            msg = "%s matched multiple views %s" % (
+                filename,
+                [str(x) for x in results])
+            raise ModelException(msg)
+        elif len(results) == 1:
+            return results[0]
+        else:
+            return None
+
+    def _get_filename_view_map(self):
+        """Query our model for filename patterns
+
+        Every statement with the dafTermOntology['filename_re'] predicate
+        contributes one entry. A pattern that does not compile is logged
+        and left out of the result; keeping it would make every later
+        re.match in find_best_match raise re.error.
+
+        return a dictionary mapping regular expression source strings to
+        the subject (the view) of the statement they came from
+        """
+        filename_query = RDF.Statement(
+            None, dafTermOntology['filename_re'], None)
+
+        patterns = {}
+        for s in self.model.find_statements(filename_query):
+            literal_re = s.object.literal_value['string']
+            logger.debug("Found: %s" % (literal_re,))
+            try:
+                # compiled only to validate; the key stays the source string
+                re.compile(literal_re)
+            except re.error:
+                logger.error("Unable to compile: %s" % (literal_re,))
+                continue
+            patterns[literal_re] = s.subject
+        return patterns
+
+    def make_submission_name(self, analysis_dir):
+        """Derive a submission name from an analysis directory
+
+        The name is the last component of the normalized path, so a
+        trailing separator does not matter.
+
+        :Raises:
+        RuntimeError: the normalized path has no final component
+        """
+        normalized = os.path.normpath(analysis_dir)
+        name = os.path.basename(normalized)
+        if not name:
+            raise RuntimeError(
+                "Submission dir name too short: {0}".format(normalized))
+        return name
+
+    def get_submission_node(self, analysis_dir):
+        """Look up the submission node for an analysis directory
+
+        The directory is reduced to a name by make_submission_name and
+        that name is used as the key into submissionSetNS.
+        """
+        return self.submissionSetNS[self.make_submission_name(analysis_dir)]
+
+    def _get_library_attribute(self, libNode, attribute):
+        """Return the value(s) the model holds for a library attribute
+
+        :Args:
+        libNode: node of the library to query
+        attribute: an RDF.Node predicate, or a term name that is looked
+                   up in libraryOntology
+
+        :Returns: None when the model has no value, otherwise whatever
+                  _format_library_attribute produces for the targets
+
+        Only the model as it currently stands is consulted. An earlier
+        fallback that re-read the library description and retried sat
+        after an if/else whose branches both return, so it never ran;
+        that unreachable code has been removed.
+        """
+        if not isinstance(attribute, RDF.Node):
+            attribute = libraryOntology[attribute]
+
+        targets = list(self.model.get_targets(libNode, attribute))
+        if len(targets) > 0:
+            return self._format_library_attribute(targets)
+
+        # we don't know anything about this attribute
+        return None
+
+    def _format_library_attribute(self, targets):
+        """Convert a list of target nodes to plain values
+
+        :Returns: None for an empty list, the single converted value for
+                  a one element list, and a list of converted values
+                  otherwise. Conversion is done by fromTypedNode.
+        """
+        if not targets:
+            return None
+
+        values = [fromTypedNode(node) for node in targets]
+        if len(values) == 1:
+            return values[0]
+        return values
+
+    def _is_paired(self, libNode):
+        """Determine if a library is paired end
+
+        The decision is made by comparing the library's library_type
+        attribute with two fixed lists of known type names.
+
+        :Returns: True for a paired end type, False for a single end type
+
+        :Raises:
+        ModelException: the model has no library_type for libNode
+        MetadataLookupException: the library type is in neither list
+        """
+        library_type = self._get_library_attribute(libNode, 'library_type')
+        if library_type is None:
+            # ModelException is not imported at module level, so raising
+            # it by bare name would fail with NameError.
+            # NOTE(review): assumed to be defined next to
+            # MetadataLookupException in htsworkflow.submission.daf -
+            # confirm, then move this to the module imports.
+            from htsworkflow.submission.daf import ModelException
+            errmsg = "%s doesn't have a library type"
+            raise ModelException(errmsg % (str(libNode),))
+
+        single = ['CSHL (lacking last nt)',
+                  'Single End (non-multiplexed)',
+                  'Small RNA (non-multiplexed)',]
+        paired = ['Barcoded Illumina',
+                  'Multiplexing',
+                  'Nextera',
+                  'Paired End (non-multiplexed)',]
+        if library_type in single:
+            return False
+        elif library_type in paired:
+            return True
+        else:
+            raise MetadataLookupException(
+                "Unrecognized library type %s for %s" % \
+                (library_type, str(libNode)))
+