Progress using rdf model to link fastqs with flowcell/lib metadata.
[htsworkflow.git] / htsworkflow / submission / daf.py
index 7b17f6931a6a291b15173aa732806f9561ccfdfd..f04ac8fe5328e738a012738a5d256b9307e11504 100644 (file)
@@ -2,6 +2,7 @@
 """
 import logging
 import os
+from pprint import pformat
 import re
 import string
 from StringIO import StringIO
@@ -12,6 +13,7 @@ import RDF
 from htsworkflow.util.rdfhelp import \
      blankOrUri, \
      dafTermOntology, \
+     dump_model, \
      get_model, \
      libraryOntology, \
      owlNS, \
@@ -22,10 +24,13 @@ from htsworkflow.util.rdfhelp import \
      fromTypedNode
 from htsworkflow.util.hashfile import make_md5sum
 
-logger = logging.getLogger(__name__)
+LOGGER = logging.getLogger(__name__)
 
 DAF_VARIABLE_NAMES = ("variables", "extraVariables")
 VARIABLES_TERM_NAME = 'variables'
+DAF_PRE_VARIABLES = ['files', 'view']
+DAF_POST_VARIABLES = [ 'labExpId', 'md5sum']
+
 
 class ModelException(RuntimeError):
     """Assumptions about the RDF model failed"""
@@ -125,6 +130,7 @@ def parse_stream(stream):
     if view_name is not None:
         attributes['views'][view_name] = view_attributes
 
+    LOGGER.debug("DAF Attributes" + pformat(attributes))
     return attributes
 
 
@@ -231,8 +237,8 @@ def get_view_namespace(submission_uri):
     return viewNS
 
 
-class DAFMapper(object):
-    """Convert filenames to views in the UCSC Daf
+class UCSCSubmission(object):
+    """Build a submission by examining the DAF for what we need to submit
     """
     def __init__(self, name, daf_file=None, model=None):
         """Construct a RDF backed model of a UCSC DAF
@@ -248,7 +254,7 @@ class DAFMapper(object):
              otherwise specifies model to use
         """
         if daf_file is None and model is None:
-            logger.error("We need a DAF or Model containing a DAF to work")
+            LOGGER.error("We need a DAF or Model containing a DAF to work")
 
         self.name = name
         self.submissionSet = get_submission_uri(self.name)
@@ -261,15 +267,23 @@ class DAFMapper(object):
 
         if hasattr(daf_file, 'next'):
             # its some kind of stream
-            fromstream_into_model(self.model, self.submissionSet, daf_file)
+            self.daf = daf_file.read()
         else:
             # file
-            parse_into_model(self.model, self.submissionSet, daf_file)
+            stream = open(daf_file, 'r')
+            self.daf = stream.read()
+            stream.close()
+
+        fromstring_into_model(self.model, self.submissionSet, self.daf)
 
         self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')
         self.submissionSetNS = RDF.NS(str(self.submissionSet) + '/')
         self.__view_map = None
 
+    def _get_daf_name(self):
+        return self.name + '.daf'
+    daf_name = property(_get_daf_name,doc="construct name for DAF file")
+
     def add_pattern(self, view_name, filename_pattern):
         """Map a filename regular expression to a view name
         """
@@ -279,6 +293,16 @@ class DAFMapper(object):
                           dafTermOntology['filename_re'],
                           obj))
 
+    def scan_submission_dirs(self, result_map):
+        """Examine files in our result directory
+        """
+        for lib_id, result_dir in result_map.items():
+            LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
+            try:
+                self.import_submission_dir(result_dir, lib_id)
+            except MetadataLookupException, e:
+                LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))
+
     def import_submission_dir(self, submission_dir, library_id):
         """Import a submission directories and update our model as needed
         """
@@ -300,10 +324,10 @@ class DAFMapper(object):
         """
         path, filename = os.path.split(pathname)
 
-        logger.debug("Searching for view")
+        LOGGER.debug("Searching for view")
         view = self.find_view(filename)
         if view is None:
-            logger.warn("Unrecognized file: {0}".format(pathname))
+            LOGGER.warn("Unrecognized file: {0}".format(pathname))
             return None
         if str(view) == str(libraryOntology['ignore']):
             return None
@@ -315,7 +339,7 @@ class DAFMapper(object):
                                        dafTermOntology['name']))
         if view_name is None:
             errmsg = 'Could not find view name for {0}'
-            logging.warning(errmsg.format(str(view)))
+            LOGGER.warning(errmsg.format(str(view)))
             return
 
         view_name = str(view_name)
@@ -325,7 +349,7 @@ class DAFMapper(object):
             RDF.Statement(self.submissionSet,
                           dafTermOntology['has_submission'],
                           submissionNode))
-        logger.debug("Adding statements to {0}".format(str(submissionNode)))
+        LOGGER.debug("Adding statements to {0}".format(str(submissionNode)))
         self.model.add_statement(RDF.Statement(submissionNode,
                                                submissionOntology['has_view'],
                                                submissionView))
@@ -337,10 +361,10 @@ class DAFMapper(object):
                           rdfNS['type'],
                           submissionOntology['submission']))
         self.model.add_statement(RDF.Statement(submissionNode,
-                                               submissionOntology['library'],
+                                               libraryOntology['library'],
                                                libNode))
 
-        logger.debug("Adding statements to {0}".format(str(submissionView)))
+        LOGGER.debug("Adding statements to {0}".format(str(submissionView)))
         # add track specific information
         self.model.add_statement(
             RDF.Statement(submissionView, dafTermOntology['view'], view))
@@ -353,22 +377,16 @@ class DAFMapper(object):
                           dafTermOntology['submission'],
                           submissionNode))
 
-        # extra information
-        terms = [dafTermOntology['type'],
-                 dafTermOntology['filename_re'],
-                 ]
-        terms.extend((dafTermOntology[v] for v in self.get_daf_variables()))
-
         # add file specific information
         self.create_file_attributes(filename, submissionView, submission_uri, submission_dir)
 
-        logger.debug("Done.")
+        LOGGER.debug("Done.")
 
     def create_file_attributes(self, filename, submissionView, submission_uri, submission_dir):
         # add file specific information
-        logger.debug("Updating file md5sum")
-        fileNode = RDF.Node(RDF.Uri(submission_uri + '/' + filename))
+        LOGGER.debug("Updating file md5sum")
         submission_pathname = os.path.join(submission_dir, filename)
+        fileNode = RDF.Node(RDF.Uri("file://" + submission_pathname))
         self.model.add_statement(
             RDF.Statement(submissionView,
                           dafTermOntology['has_file'],
@@ -381,7 +399,7 @@ class DAFMapper(object):
         md5 = make_md5sum(submission_pathname)
         if md5 is None:
             errmsg = "Unable to produce md5sum for {0}"
-            logging.warning(errmsg.format(submission_pathname))
+            LOGGER.warning(errmsg.format(submission_pathname))
         else:
             self.model.add_statement(
                 RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))
@@ -399,14 +417,17 @@ class DAFMapper(object):
         """Returns simple variables names that to include in the ddf
         """
         variables_term = dafTermOntology[VARIABLES_TERM_NAME]
-        results = ['view']
-        if self.need_replicate():
+        results = []
+        results.extend([v for v in DAF_PRE_VARIABLES if v not in results])
+        results = DAF_PRE_VARIABLES[:]
+        if self.need_replicate() and 'replicate' not in results:
             results.append('replicate')
 
         for obj in self.model.get_targets(self.submissionSet, variables_term):
             value = str(fromTypedNode(obj))
-            results.append(value)
-        results.append('labVersion')
+            if value not in results:
+                results.append(value)
+        results.extend([v for v in DAF_POST_VARIABLES if v not in results])
         return results
 
     def make_submission_name(self, submission_dir):
@@ -492,7 +513,7 @@ class DAFMapper(object):
         else:
             msg = "Found wrong number of view names for {0} len = {1}"
             msg = msg.format(str(view), len(names))
-            logger.error(msg)
+            LOGGER.error(msg)
             raise RuntimeError(msg)
 
     def _get_filename_view_map(self):
@@ -507,11 +528,11 @@ class DAFMapper(object):
         for s in self.model.find_statements(filename_query):
             view_name = s.subject
             literal_re = s.object.literal_value['string']
-            logger.debug("Found: %s" % (literal_re,))
+            LOGGER.debug("Found: %s" % (literal_re,))
             try:
                 filename_re = re.compile(literal_re)
             except re.error, e:
-                logger.error("Unable to compile: %s" % (literal_re,))
+                LOGGER.error("Unable to compile: %s" % (literal_re,))
             patterns[literal_re] = view_name
         return patterns
 
@@ -530,9 +551,13 @@ class DAFMapper(object):
             errmsg = "%s doesn't have a library type"
             raise ModelException(errmsg % (str(libNode),))
 
-        #single = (1,3,6)
-        single = ['Single End', 'Small RNA', 'CSHL (lacking last nt)']
-        paired = ['Paired End', 'Multiplexing', 'Barcoded']
+        single = ['CSHL (lacking last nt)',
+                  'Single End (non-multiplexed)',
+                  'Small RNA (non-multiplexed)',]
+        paired = ['Barcoded Illumina',
+                  'Multiplexing',
+                  'Nextera',
+                  'Paired End (non-multiplexed)',]
         if library_type in single:
             return False
         elif library_type in paired:
@@ -554,3 +579,57 @@ class DAFMapper(object):
                 return True
 
         return False
+
+
+    def link_daf(self, result_map):
+        if self.daf is None or len(self.daf) == 0:
+            raise RuntimeError(
+                "DAF data does not exist, how can I link to it?")
+
+        base_daf = self.daf_name
+
+        for result_dir in result_map.values():
+            if not os.path.exists(result_dir):
+                raise RuntimeError(
+                    "Couldn't find target directory %s" %(result_dir,))
+            submission_daf = os.path.join(result_dir, base_daf)
+            if os.path.exists(submission_daf):
+                previous_daf = open(submission_daf, 'r').read()
+                if self.daf != previous_daf:
+                    LOGGER.info("Old daf is different, overwriting it.")
+            stream = open(submission_daf, 'w')
+            stream.write(self.daf)
+            stream.close()
+
+
+if __name__ == "__main__":
+    example_daf = """# Lab and general info
+grant             Hardison
+lab               Caltech-m
+dataType          ChipSeq
+variables         cell, antibody,sex,age,strain,control
+compositeSuffix   CaltechHistone
+assembly          mm9
+dafVersion        2.0
+validationSettings validateFiles.bam:mismatches=2,bamPercent=99.9;validateFiles.fastq:quick=1000
+
+# Track/view definition
+view             FastqRd1
+longLabelPrefix  Caltech Fastq Read 1
+type             fastq
+hasReplicates    yes
+required         no
+
+view             Signal
+longLabelPrefix  Caltech Histone Signal
+type             bigWig
+hasReplicates    yes
+required         no
+"""
+    model = get_model()
+    example_daf_stream = StringIO(example_daf)
+    name = "test_rep"
+    mapper = DAFMapper(name, daf_file = example_daf_stream, model=model)
+    dump_model(model)
+
+