Attempt to download DAF data for an encodesubmit submission
[htsworkflow.git] / htsworkflow / submission / daf.py
index b8a4177fd46fd164ea9f1940c1ecc99ce8c92eab..1be882b051eba71ffe274c8b2b905500c70e67ca 100644
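
Besides the PEP8 reflow, the daf.py parse/add helpers now take an RDF subject node instead of a short submission name. A minimal sketch of driving the reworked entry points (assuming Redland's Python bindings with in-memory storage; 'test-sub' and 'encode.daf' are placeholder names, not part of this change):

    import RDF
    from htsworkflow.submission import daf

    model = RDF.Model(RDF.MemoryStorage())     # assumed in-memory Redland store
    subject = RDF.Node(daf.get_submission_uri('test-sub'))
    daf.parse_into_model(model, subject, 'encode.daf')
    view_ns = daf.get_view_namespace(subject)  # e.g. <.../test-sub/view/>
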
@@ -24,47 +24,64 @@ from htsworkflow.util.hashfile import make_md5sum
 
 logger = logging.getLogger(__name__)
 
-#
-class ModelException(RuntimeError): pass
+
+class ModelException(RuntimeError):
+    """Assumptions about the RDF model failed"""
+    pass
+
+
 class MetadataLookupException(RuntimeError):
     """Problem accessing metadata"""
     pass
 
+
 # STATES
 DAF_HEADER = 1
 DAF_VIEW = 2
 
-def parse_into_model(model, submission_name, filename):
+
+def parse_into_model(model, subject, filename):
     """Read a DAF into RDF Model
 
-    requires a short submission name
+    requires a subject node to attach statements to
     """
     attributes = parse(filename)
-    add_to_model(model, attributes, submission_name)
+    add_to_model(model, attributes, subject)
 
-def fromstream_into_model(model, submission_name, daf_stream):
+
+def fromstream_into_model(model, subject, daf_stream):
+    """Load daf stream into model attached to node subject
+    """
     attributes = parse_stream(daf_stream)
-    add_to_model(model, attributes, submission_name)
-    
-def fromstring_into_model(model, submission_name, daf_string):
+    add_to_model(model, attributes, subject)
+
+
+def fromstring_into_model(model, subject, daf_string):
     """Read a string containing a DAF into RDF Model
 
-    requires a short submission name
+    requires a subject node to attach statements to
     """
     attributes = fromstring(daf_string)
-    add_to_model(model, attributes, submission_name)
-    
+    add_to_model(model, attributes, subject)
+
+
 def parse(filename):
-    stream = open(filename,'r')
-    attributes =  parse_stream(stream)
+    """Parse daf from a file
+    """
+    stream = open(filename, 'r')
+    attributes = parse_stream(stream)
     stream.close()
     return attributes
 
+
 def fromstring(daf_string):
+    """Parse UCSC daf from a provided string"""
     stream = StringIO(daf_string)
     return parse_stream(stream)
 
+
 def parse_stream(stream):
+    """Parse UCSC dat stored in a stream"""
     comment_re = re.compile("#.*$")
 
     state = DAF_HEADER
@@ -84,7 +101,7 @@ def parse_stream(stream):
             value = True
         elif value.lower() in ('no',):
             value = False
-            
+
         if len(name) == 0:
             if view_name is not None:
                 attributes['views'][view_name] = view_attributes
@@ -92,7 +109,7 @@ def parse_stream(stream):
                 view_attributes = {}
             state = DAF_HEADER
         elif state == DAF_HEADER and name == 'variables':
-            attributes[name] = [ x.strip() for x in value.split(',')]
+            attributes[name] = [x.strip() for x in value.split(',')]
         elif state == DAF_HEADER and name == 'view':
             view_name = value
             view_attributes['view'] = value
@@ -105,36 +122,51 @@ def parse_stream(stream):
     # save last block
     if view_name is not None:
         attributes['views'][view_name] = view_attributes
-        
+
     return attributes
 
+
 def _consume_whitespace(line, start=0):
+    """return index of next non whitespace character
+
+    returns length of string if it can't find anything
+    """
     for i in xrange(start, len(line)):
         if line[i] not in string.whitespace:
             return i
-        
+
     return len(line)
 
+
 def _extract_name_index(line, start=0):
+    """Used to find end of word by looking for a whitespace character
+
+    returns length of string if nothing matches
+    """
     for i in xrange(start, len(line)):
         if line[i] in string.whitespace:
             return i
-        
+
     return len(line)
 
+
 def _extract_value_index(line, start=0):
+    """Returns position of last non-whitespace character
+    """
     shortline = line.rstrip()
     return len(shortline)
 
-def convert_to_rdf_statements(attributes, name):
-    submission_uri = get_submission_uri(name)
-    subject = RDF.Node(submission_uri)
 
+def convert_to_rdf_statements(attributes, subject):
+    """Convert dictionary of DAF attributes into rdf statements
+
+    The statements are attached to the provided subject node
+    """
     statements = []
     for daf_key in attributes:
         predicate = dafTermOntology[daf_key]
         if daf_key == 'views':
-            statements.extend(_views_to_statements(name,
+            statements.extend(_views_to_statements(subject,
                                                    dafTermOntology,
                                                    attributes[daf_key]))
         elif daf_key == 'variables':
@@ -145,13 +177,15 @@ def convert_to_rdf_statements(attributes, name):
         else:
             value = attributes[daf_key]
             obj = toTypedNode(value)
-            statements.append(RDF.Statement(subject,predicate,obj))
+            statements.append(RDF.Statement(subject, predicate, obj))
 
     return statements
 
-def _views_to_statements(name, dafNS, views):
-    subject = RDF.Node(get_submission_uri(name))
-    viewNS = get_view_namespace(name)
+
+def _views_to_statements(subject, dafNS, views):
+    """Attach view attributes to new view nodes atached to provided subject
+    """
+    viewNS = get_view_namespace(subject)
 
     statements = []
     for view_name in views:
@@ -163,23 +197,38 @@ def _views_to_statements(name, dafNS, views):
         for view_attribute_name in view_attributes:
             predicate = dafNS[view_attribute_name]
             obj = toTypedNode(view_attributes[view_attribute_name])
-            statements.append(RDF.Statement(viewSubject, predicate, obj)) 
-            
+            statements.append(RDF.Statement(viewSubject, predicate, obj))
+
         #statements.extend(convert_to_rdf_statements(view, viewNode))
     return statements
 
-def add_to_model(model, attributes, name):
-    for statement in convert_to_rdf_statements(attributes, name):
+
+def add_to_model(model, attributes, subject):
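+    """Add RDF statements describing the DAF attributes to the model"""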
+    for statement in convert_to_rdf_statements(attributes, subject):
         model.add_statement(statement)
-            
+
+
 def get_submission_uri(name):
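+    """Return the submission URI for a short submission name"""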
-    return  submissionLog[name].uri
+    return submissionLog[name].uri
+
+
+def submission_uri_to_string(submission_uri):
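+    """Normalize a submission URI (Node or Uri) to a string ending in '/'"""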
+    if isinstance(submission_uri, RDF.Node):
+        submission_uri = str(submission_uri.uri)
+    elif isinstance(submission_uri, RDF.Uri):
+        submission_uri = str(submission_uri)
+    if submission_uri[-1] != '/':
+        submission_uri += '/'
+    return submission_uri
 
-def get_view_namespace(name):
-    submission_uri = get_submission_uri(name)
-    viewNS = RDF.NS(str(submission_uri) + '/view/')
+
+def get_view_namespace(submission_uri):
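+    """Return an RDF namespace for views under the submission URI"""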
+    submission_uri = submission_uri_to_string(submission_uri)
+    view_uri = urlparse.urljoin(submission_uri, 'view/')
+    viewNS = RDF.NS(view_uri)
     return viewNS
 
+
 class DAFMapper(object):
     """Convert filenames to views in the UCSC Daf
     """
@@ -200,6 +249,9 @@ class DAFMapper(object):
             logger.error("We need a DAF or Model containing a DAF to work")
 
         self.name = name
+        self.submissionSet = get_submission_uri(self.name)
+        self.viewNS = get_view_namespace(self.submissionSet)
+
         if model is not None:
             self.model = model
         else:
@@ -207,46 +259,40 @@ class DAFMapper(object):
 
         if hasattr(daf_file, 'next'):
             # its some kind of stream
-            fromstream_into_model(self.model, name, daf_file)
+            fromstream_into_model(self.model, self.submissionSet, daf_file)
         else:
             # file
-            parse_into_model(self.model, name, daf_file)
+            parse_into_model(self.model, self.submissionSet, daf_file)
 
         self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')
-        self.submissionSet = get_submission_uri(self.name)
-        self.submissionSetNS = RDF.NS(str(self.submissionSet)+'/')
+        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '/')
         self.__view_map = None
 
-
     def add_pattern(self, view_name, filename_pattern):
         """Map a filename regular expression to a view name
         """
-        viewNS = get_view_namespace(self.name)
-
         obj = toTypedNode(filename_pattern)
         self.model.add_statement(
-            RDF.Statement(viewNS[view_name],
+            RDF.Statement(self.viewNS[view_name],
                           dafTermOntology['filename_re'],
                           obj))
 
-
     def import_submission_dir(self, submission_dir, library_id):
         """Import a submission directories and update our model as needed
         """
         #attributes = get_filename_attribute_map(paired)
         libNode = self.libraryNS[library_id + "/"]
-        
+
         self._add_library_details_to_model(libNode)
-        
+
         submission_files = os.listdir(submission_dir)
-        for f in submission_files:
-            self.construct_file_attributes(submission_dir, libNode, f)
+        for filename in submission_files:
+            self.construct_track_attributes(submission_dir, libNode, filename)
 
-        
-    def construct_file_attributes(self, submission_dir, libNode, pathname):
+    def construct_track_attributes(self, submission_dir, libNode, pathname):
         """Looking for the best extension
         The 'best' is the longest match
-        
+
         :Args:
-        filename (str): the filename whose extention we are about to examine
+        filename (str): the filename whose extension we are about to examine
         """
@@ -255,7 +301,7 @@ class DAFMapper(object):
         logger.debug("Searching for view")
         view = self.find_view(filename)
         if view is None:
-            logger.warn("Unrecognized file: %s" % (pathname,))
+            logger.warn("Unrecognized file: {0}".format(pathname))
             return None
         if str(view) == str(libraryOntology['ignore']):
             return None
@@ -263,55 +309,81 @@ class DAFMapper(object):
         submission_name = self.make_submission_name(submission_dir)
         submissionNode = self.get_submission_node(submission_dir)
         submission_uri = str(submissionNode.uri)
-        view_name = fromTypedNode(self.model.get_target(view, dafTermOntology['name']))
+        view_name = fromTypedNode(self.model.get_target(view,
+                                       dafTermOntology['name']))
         if view_name is None:
-            logging.warning('Could not find view name for {0}'.format(str(view)))
+            errmsg = 'Could not find view name for {0}'
+            logging.warning(errmsg.format(str(view)))
             return
-        
+
         view_name = str(view_name)
         submissionView = RDF.Node(RDF.Uri(submission_uri + '/' + view_name))
 
         self.model.add_statement(
-            RDF.Statement(self.submissionSet, dafTermOntology['has_submission'], submissionNode))
+            RDF.Statement(self.submissionSet,
+                          dafTermOntology['has_submission'],
+                          submissionNode))
         logger.debug("Adding statements to {0}".format(str(submissionNode)))
-        self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['has_view'], submissionView))
-        self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['name'], toTypedNode(submission_name)))
-        self.model.add_statement(RDF.Statement(submissionNode, rdfNS['type'], submissionOntology['submission']))
-        self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['library'], libNode))
+        self.model.add_statement(RDF.Statement(submissionNode,
+                                               submissionOntology['has_view'],
+                                               submissionView))
+        self.model.add_statement(RDF.Statement(submissionNode,
+                                               submissionOntology['name'],
+                                               toTypedNode(submission_name)))
+        self.model.add_statement(
+            RDF.Statement(submissionNode,
+                          rdfNS['type'],
+                          submissionOntology['submission']))
+        self.model.add_statement(RDF.Statement(submissionNode,
+                                               submissionOntology['library'],
+                                               libNode))
 
         logger.debug("Adding statements to {0}".format(str(submissionView)))
-        # add trac specific information
+        # add track specific information
         self.model.add_statement(
             RDF.Statement(submissionView, dafTermOntology['view'], view))
         self.model.add_statement(
-            RDF.Statement(submissionView, dafTermOntology['paired'], toTypedNode(self._is_paired(libNode))))
+            RDF.Statement(submissionView,
+                          dafTermOntology['paired'],
+                          toTypedNode(self._is_paired(libNode))))
         self.model.add_statement(
-            RDF.Statement(submissionView, dafTermOntology['submission'], submissionNode))
+            RDF.Statement(submissionView,
+                          dafTermOntology['submission'],
+                          submissionNode))
 
-        # extra information 
+        # extra information
         terms = [dafTermOntology['type'],
                  dafTermOntology['filename_re'],
                  ]
         terms.extend((dafTermOntology[v] for v in self.get_daf_variables()))
 
+        # add file specific information
+        self.create_file_attributes(filename, submissionView,
+                                    submission_uri, submission_dir)
+
+        logger.debug("Done.")
+
+    def create_file_attributes(self, filename, submissionView,
+                               submission_uri, submission_dir):
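+        """Attach file node, filename, and md5sum for a submitted file"""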
         # add file specific information
         logger.debug("Updating file md5sum")
         fileNode = RDF.Node(RDF.Uri(submission_uri + '/' + filename))
         submission_pathname = os.path.join(submission_dir, filename)
-        md5 = make_md5sum(submission_pathname)
         self.model.add_statement(
-            RDF.Statement(submissionView, dafTermOntology['has_file'], fileNode))
+            RDF.Statement(submissionView,
+                          dafTermOntology['has_file'],
+                          fileNode))
         self.model.add_statement(
-            RDF.Statement(fileNode, dafTermOntology['filename'], filename))
+            RDF.Statement(fileNode,
+                          dafTermOntology['filename'],
+                          filename))
 
+        md5 = make_md5sum(submission_pathname)
         if md5 is None:
-            logging.warning("Unable to produce md5sum for %s" % ( submission_pathname))
+            errmsg = "Unable to produce md5sum for {0}"
+            logging.warning(errmsg.format(submission_pathname))
         else:
             self.model.add_statement(
                 RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))
 
-        logger.debug("Done.")
-        
     def _add_library_details_to_model(self, libNode):
         parser = RDF.Parser(name='rdfa')
         new_statements = parser.parse_as_stream(libNode.uri)
@@ -328,7 +400,7 @@ class DAFMapper(object):
         results = ['view']
         if self.need_replicate():
             results.append('replicate')
-            
+
         for obj in self.model.get_targets(self.submissionSet, variableTerm):
             value = str(fromTypedNode(obj))
             results.append(value)
@@ -340,15 +412,15 @@ class DAFMapper(object):
         submission_dir_name = os.path.split(submission_dir)[1]
         if len(submission_dir_name) == 0:
             raise RuntimeError(
-                "Submission dir name too short: %s" %(submission_dir,))
+                "Submission dir name too short: {0}".format(submission_dir))
         return submission_dir_name
-        
+
     def get_submission_node(self, submission_dir):
         """Convert a submission directory name to a submission node
         """
         submission_name = self.make_submission_name(submission_dir)
         return self.submissionSetNS[submission_name]
-        
+
     def _get_library_attribute(self, libNode, attribute):
         if not isinstance(attribute, RDF.Node):
             attribute = libraryOntology[attribute]
@@ -362,7 +434,7 @@ class DAFMapper(object):
         #targets = self._search_same_as(libNode, attribute)
         #if targets is not None:
         #    return self._format_library_attribute(targets)
-        
+
         # we don't know anything about this attribute
         self._add_library_details_to_model(libNode)
 
@@ -371,7 +443,7 @@ class DAFMapper(object):
             return self._format_library_attribute(targets)
 
         return None
-    
+
     def _format_library_attribute(self, targets):
         if len(targets) == 0:
             return None
@@ -388,7 +460,7 @@ class DAFMapper(object):
             if len(targets) > 0:
                 return targets
         return None
-        
+
     def find_view(self, filename):
         """Search through potential DAF filename patterns
         """
@@ -409,9 +481,10 @@ class DAFMapper(object):
             return results[0]
         else:
             return None
-        
+
     def get_view_name(self, view):
-        names = list(self.model.get_targets(view, submissionOntology['view_name']))
+        view_term = submissionOntology['view_name']
+        names = list(self.model.get_targets(view, view_term))
         if len(names) == 1:
             return fromTypedNode(names[0])
         else:
@@ -419,8 +492,7 @@ class DAFMapper(object):
             msg = msg.format(str(view), len(names))
             logger.error(msg)
             raise RuntimeError(msg)
-        
-            
+
     def _get_filename_view_map(self):
         """Query our model for filename patterns
 
@@ -443,16 +515,19 @@ class DAFMapper(object):
 
     def _get_library_url(self):
         return str(self.libraryNS[''].uri)
+
     def _set_library_url(self, value):
         self.libraryNS = RDF.NS(str(value))
+
     library_url = property(_get_library_url, _set_library_url)
 
     def _is_paired(self, libNode):
         """Determine if a library is paired end"""
         library_type = self._get_library_attribute(libNode, 'library_type')
         if library_type is None:
-            raise ModelException("%s doesn't have a library type" % (str(libNode),))
-        
+            errmsg = "%s doesn't have a library type"
+            raise ModelException(errmsg % (str(libNode),))
+
         #single = (1,3,6)
         single = ['Single End', 'Small RNA', 'CSHL (lacking last nt)']
         paired = ['Paired End', 'Multiplexing', 'Barcoded']
@@ -475,5 +550,5 @@ class DAFMapper(object):
             replicate = self.model.get_target(view, replicateTerm)
             if fromTypedNode(replicate):
                 return True
-            
+
         return False