Further clean up ddf generation.
author      Diane Trout <diane@caltech.edu>
Wed, 29 Jun 2011 06:11:04 +0000 (23:11 -0700)
committer   Diane Trout <diane@caltech.edu>
Wed, 29 Jun 2011 06:11:04 +0000 (23:11 -0700)
This still needs work, as I ended up hard-coding a sparql query to
support the submission I'm currently working on -- which is unfortunate,
since the whole point of the push to RDF was to reduce hard coding.
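
The query lives in make_ddf() in ucsc_gather.py (see the diff below);
it gets the submission node interpolated into the template and is run
through Redland's SPARQL bindings, roughly:

    formatted_query = query_template % {'submission': str(submissionNode)}
    query = RDF.SPARQLQuery(formatted_query)
    results = query.execute(view_map.model)
    for row in results:
        # each row carries the ddf columns: filename, md5sum, view, ...
        print row['filename'], row['md5sum']

(The loop body is illustrative; the real code groups rows by view before
writing the ddf.)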

However, it did simplify collecting information for make_ddf.

Using the query would also mean that the term copying I was doing
earlier -- moving library attributes into each specific submission view --
would be unnecessary, since I can now easily query the graphs.
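
For example, with the sparql_query helper that this change moves into
rdfhelp.py, a library attribute can be read straight out of the loaded
graphs from a query file.  A minimal sketch (the query file name and the
species predicate are illustrative, not part of this change):

    from htsworkflow.util.rdfhelp import get_model, sparql_query

    model = get_model()
    # ... load the submission and library graphs into the model ...
    # library_species.sparql would contain something like:
    #   PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
    #   SELECT ?library ?species
    #   WHERE { ?library libraryOntology:species ?species }
    sparql_query(model, 'library_species.sparql')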

Probably what I need to do after the submission is to reduce the term
copying when importing a submission directory, and add some way of tying
a sparql query to a specific imported daf.
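
One possible shape for that -- purely a sketch, nothing below exists yet --
is a naming convention where an imported daf carries a query file next to it:

    import os

    def query_for_daf(daf_pathname):
        """Hypothetical: look for a sparql query stored alongside an
        imported daf, e.g. submission.daf -> submission.sparql."""
        query_pathname = os.path.splitext(daf_pathname)[0] + '.sparql'
        if os.path.exists(query_pathname):
            return open(query_pathname, 'r').read()
        return None  # fall back to the hard-coded template for now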

Though I need to deal with the upcoming submission deadlines first.

extra/ucsc_encode_submission/encode_find.py
extra/ucsc_encode_submission/ucsc_gather.py
htsworkflow/submission/daf.py
htsworkflow/submission/test/test_daf.py
htsworkflow/util/hashfile.py [new file with mode: 0644]
htsworkflow/util/rdfhelp.py

diff --git a/extra/ucsc_encode_submission/encode_find.py b/extra/ucsc_encode_submission/encode_find.py
index 25cacb782647855d64fb30c9ca7200776f8e4ada..2a1890afd4a44ba426f7befe895727b4f4309139 100644 (file)
@@ -21,6 +21,7 @@ from htsworkflow.util.rdfhelp import \
      dublinCoreNS, \
      get_model, \
      get_serializer, \
+     sparql_query, \
      submitOntology, \
      libraryOntology, \
      load_into_model, \
@@ -353,18 +354,6 @@ def get_date_contents(element):
     else:
         return None
 
-def sparql_query(model, query_filename):
-    """Execute sparql query from file
-    """
-    query_body = open(query_filename,'r').read()
-    query = RDF.SPARQLQuery(query_body)
-    results = query.execute(model)
-    for row in results:
-        output = []
-        for k,v in row.items()[::-1]:
-            print "{0}: {1}".format(k,v)
-        print 
-
         
 def load_into_model(model, parser_name, filename):
     if not os.path.exists(filename):
diff --git a/extra/ucsc_encode_submission/ucsc_gather.py b/extra/ucsc_encode_submission/ucsc_gather.py
index ddd1fad63512c79f0ea9394931f0a381ad099579..d52b11bc122cf523a06253091e461848ce33a6de 100755 (executable)
@@ -11,7 +11,6 @@ from pprint import pprint, pformat
 import shlex
 from StringIO import StringIO
 import stat
-from subprocess import Popen, PIPE
 import sys
 import time
 import types
@@ -19,6 +18,8 @@ import urllib
 import urllib2
 import urlparse
 
+import RDF
+
 from htsworkflow.util import api
 from htsworkflow.util.rdfhelp import \
      dafTermOntology, \
@@ -26,6 +27,7 @@ from htsworkflow.util.rdfhelp import \
      get_model, \
      get_serializer, \
      load_into_model, \
+     sparql_query, \
      submissionOntology 
 from htsworkflow.submission.daf import \
      DAFMapper, \
@@ -62,9 +64,6 @@ def main(cmdline=None):
     if opts.make_ddf and opts.daf is None:
         parser.error("Please specify your daf when making ddf files")
 
-    if len(args) == 0:
-        parser.error("I need at least one library submission-dir input file")
-        
     library_result_map = []
     for a in args:
         library_result_map.extend(read_library_result_map(a))
@@ -86,6 +85,9 @@ def main(cmdline=None):
     if opts.make_ddf:
         make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)
 
+    if opts.sparql:
+        sparql_query(model, opts.sparql)
+        
     if opts.print_rdf:
         writer = get_serializer()
         print writer.serialize_model_to_string(model)
@@ -100,6 +102,7 @@ def make_parser():
       help="Load model database")
     model.add_option('--load-rdf', default=None,
       help="load rdf statements into model")
+    model.add_option('--sparql', default=None, help="execute sparql query")
     model.add_option('--print-rdf', action="store_true", default=False,
       help="print ending model state")
     parser.add_option_group(model)
@@ -203,6 +206,31 @@ def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None)
     """
     Make ddf files, and bonus condor file
     """
+    query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
+PREFIX submissionOntology: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
+PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>
+
+select ?submitView  ?filename ?md5sum ?view ?cell ?antibody ?sex ?control ?controlId ?labExpId ?labVersion ?treatment ?protocol
+WHERE {
+  ?file ucscDaf:filename ?filename ;
+        ucscDaf:md5sum ?md5sum .
+  ?submitView ucscDaf:has_file ?file ;
+              ucscDaf:view ?dafView ;
+              ucscDaf:submission %(submission)s .
+  ?dafView ucscDaf:name ?view .
+  %(submission)s submissionOntology:library ?library .
+
+  OPTIONAL { ?submitView ucscDaf:antibody ?antibody }
+  OPTIONAL { ?submitView ucscDaf:cell ?cell }
+  OPTIONAL { ?submitView ucscDaf:control ?control }
+  OPTIONAL { ?library ucscDaf:controlId ?controlId }
+  OPTIONAL { ?submitView ucscDaf:sex ?sex }
+  OPTIONAL { ?submitView ucscDaf:labVersion ?labExpId }
+  OPTIONAL { ?submitView ucscDaf:labVersion ?labVersion }
+  OPTIONAL { ?library ucscDaf:treatment ?treatment }
+  OPTIONAL { ?submitView ucscDaf:protocol ?protocol }
+}
+ORDER BY  ?submitView""" 
     dag_fragments = []
 
     name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
@@ -216,51 +244,51 @@ def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None)
         output = open(outfile,'w')
     else:
         output = sys.stdout
-    
-    # filename goes first
+
+    formatted_query = query_template % {'submission': str(submissionNode)}
+
+    query = RDF.SPARQLQuery(formatted_query)
+    results = query.execute(view_map.model)
+
     variables = ['filename']
+    # filename goes first
     variables.extend(view_map.get_daf_variables())
+    variables += ['controlId', 'labExpId', 'md5sum']
     output.write('\t'.join(variables))
     output.write(os.linesep)
     
-    nameTerm = dafTermOntology['name']
-
-    submission_views = view_map.model.get_targets(submissionNode, submissionOntology['has_view'])
-    file_list = []
-    for viewNode in submission_views:
-        record = []
+    all_views = {}
+    all_files = []
+    for row in results:
+        viewname = fromTypedNode(row['view'])
+        current = all_views.setdefault(viewname, {})
         for variable_name in variables:
-            varNode = dafTermOntology[variable_name]
-            values = list(view_map.model.get_targets(viewNode, varNode))
-            
-            if variable_name == 'view':
-                nameNode = view_map.model.get_target(values[0], nameTerm)
-                values = [fromTypedNode(nameNode)]
+            value = str(fromTypedNode(row[variable_name]))
+            if variable_name in ('filename', 'md5sum'):
+                current.setdefault(variable_name,[]).append(value)
             else:
-                values = [ fromTypedNode(v) for v in values ]
-                if variable_name == 'filename':
-                    file_list.extend(values)
-                              
-            if len(values) == 0:
-                attribute = "#None#"
-            elif len(values) == 1:
-                attribute = values[0]
+                current[variable_name] = value
+
+    for view in all_views.keys():
+        line = []
+        for variable_name in variables:
+            if variable_name in ('filename', 'md5sum'):
+                line.append(','.join(all_views[view][variable_name]))
             else:
-                attribute = ",".join(values)
-            record.append(attribute)
-        output.write('\t'.join(record))
+                line.append(all_views[view][variable_name])
+        output.write("\t".join(line))
         output.write(os.linesep)
-            
+        all_files.extend(all_views[view]['filename'])
+        
     logging.info(
         "Examined {0}, found files: {1}".format(
-            str(submissionNode), ", ".join(file_list)))
+            str(submissionNode), ", ".join(all_files)))
+
+    all_files.append(daf_name)
+    all_files.append(ddf_name)
 
-    file_list.append(daf_name)
-    file_list.append(ddf_name)
-    
     if make_condor:
-        print name, file_list
-        archive_condor = make_condor_archive_script(name, file_list, outdir)
+        archive_condor = make_condor_archive_script(name, all_files, outdir)
         upload_condor = make_condor_upload_script(name, outdir)
         
         dag_fragments.extend( 
@@ -295,7 +323,7 @@ def make_condor_archive_script(name, files, outdir=None):
 Executable = /bin/tar
 arguments = czvhf ../%(archivename)s %(filelist)s
 
-Error = compress.err.$(Process).log
+Error = compress.out.$(Process).log
 Output = compress.out.$(Process).log
 Log = /tmp/submission-compress-%(user)s.log
 initialdir = %(initialdir)s
@@ -313,7 +341,7 @@ queue
 
     context = {'archivename': make_submission_name(name),
                'filelist': " ".join(files),
-               'initialdir': os.getcwd(),
+               'initialdir': os.path.abspath(outdir), 
                'user': os.getlogin()}
 
     condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
@@ -329,7 +357,7 @@ def make_condor_upload_script(name, outdir=None):
 Executable = /usr/bin/lftp
 arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s
 
-Error = upload.err.$(Process).log
+Error = upload.out.$(Process).log
 Output = upload.out.$(Process).log
 Log = /tmp/submission-upload-%(user)s.log
 initialdir = %(initialdir)s
@@ -345,7 +373,7 @@ queue
     ftpuser = auth.hosts[encodeftp][0]
     ftppassword = auth.hosts[encodeftp][2]
     context = {'archivename': make_submission_name(name),
-               'initialdir': outdir,
+               'initialdir': os.path.abspath(outdir),
                'user': os.getlogin(),
                'ftpuser': ftpuser,
                'ftppassword': ftppassword,
@@ -432,46 +460,5 @@ def validate_filelist(files):
         if not os.path.exists(f):
             raise RuntimeError("%s does not exist" % (f,))
 
-def make_md5sum(filename):
-    """Quickly find the md5sum of a file
-    """
-    md5_cache = os.path.join(filename+".md5")
-    print md5_cache
-    if os.path.exists(md5_cache):
-        logging.debug("Found md5sum in {0}".format(md5_cache))
-        stream = open(md5_cache,'r')
-        lines = stream.readlines()
-        md5sum = parse_md5sum_line(lines, filename)
-    else:
-        md5sum = make_md5sum_unix(filename, md5_cache)
-    return md5sum
-    
-def make_md5sum_unix(filename, md5_cache):
-    cmd = ["md5sum", filename]
-    logging.debug("Running {0}".format(" ".join(cmd)))
-    p = Popen(cmd, stdout=PIPE)
-    stdin, stdout = p.communicate()
-    retcode = p.wait()
-    logging.debug("Finished {0} retcode {1}".format(" ".join(cmd), retcode))
-    if retcode != 0:
-        logging.error("Trouble with md5sum for {0}".format(filename))
-        return None
-    lines = stdin.split(os.linesep)
-    md5sum = parse_md5sum_line(lines, filename)
-    if md5sum is not None:
-        logging.debug("Caching sum in {0}".format(md5_cache))
-        stream = open(md5_cache, "w")
-        stream.write(stdin)
-        stream.close()
-    return md5sum
-
-def parse_md5sum_line(lines, filename):
-    md5sum, md5sum_filename = lines[0].split()
-    if md5sum_filename != filename:
-        errmsg = "MD5sum and I disagre about filename. {0} != {1}"
-        logging.error(errmsg.format(filename, md5sum_filename))
-        return None
-    return md5sum
-
 if __name__ == "__main__":
     main()
diff --git a/htsworkflow/submission/daf.py b/htsworkflow/submission/daf.py
index 9429a3818bf93fb4e2093add094a0848db97d948..28f75999ce80974198a8dad9858524c06b896d71 100644 (file)
@@ -20,6 +20,7 @@ from htsworkflow.util.rdfhelp import \
      submissionOntology, \
      toTypedNode, \
      fromTypedNode
+from htsworkflow.util.hashfile import make_md5sum
 
 logger = logging.getLogger(__name__)
 
@@ -239,18 +240,6 @@ class DAFMapper(object):
         for f in submission_files:
             self.construct_file_attributes(submission_dir, libNode, f)
 
-            #attributes['md5sum'] = "None"
-            #
-            #ext = attributes["filename_re"]
-            #if attributes.get("type", None) == 'fastq':
-            #    fastqs.setdefault(ext, set()).add(f)
-            #    fastq_attributes[ext] = attributes
-            #else:
-            #    md5sum = make_md5sum(os.path.join(result_dir,f))
-            #    if md5sum is not None:
-            #        attributes['md5sum']=md5sum
-            #print attributes
-            
         
     def construct_file_attributes(self, submission_dir, libNode, pathname):
         """Looking for the best extension
@@ -280,17 +269,16 @@ class DAFMapper(object):
         self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['has_view'], submissionView))
         self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['name'], toTypedNode(submission_name)))
         self.model.add_statement(RDF.Statement(submissionNode, rdfNS['type'], submissionOntology['submission']))
-
-
-        self.model.add_statement(
-            RDF.Statement(submissionView, dafTermOntology['filename'], toTypedNode(filename)))
+        self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['library'], libNode))
+        
+        # add trac specific information
         self.model.add_statement(
             RDF.Statement(submissionView, dafTermOntology['view'], view))
         self.model.add_statement(
             RDF.Statement(submissionView, dafTermOntology['paired'], toTypedNode(self._is_paired(libNode))))
         self.model.add_statement(
             RDF.Statement(submissionView, dafTermOntology['submission'], submissionNode))
-            
+
         # extra information 
         terms = [dafTermOntology['type'],
                  dafTermOntology['filename_re'],
@@ -303,6 +291,21 @@ class DAFMapper(object):
             if value is not None:
                 self.model.add_statement(RDF.Statement(submissionView, term, value))
 
+        # add file specific information
+        fileNode = RDF.Node(RDF.Uri(submission_uri + '/' + filename))
+        submission_pathname = os.path.join(submission_dir, filename)
+        md5 = make_md5sum(submission_pathname)
+        self.model.add_statement(
+            RDF.Statement(submissionView, dafTermOntology['has_file'], fileNode))
+        self.model.add_statement(
+            RDF.Statement(fileNode, dafTermOntology['filename'], filename))
+
+        if md5 is None:
+            logging.warning("Unable to produce md5sum for %s" % ( submission_pathname))
+        else:
+            self.model.add_statement(
+                RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))
+
             
     def _add_library_details_to_model(self, libNode):
         parser = RDF.Parser(name='rdfa')
diff --git a/htsworkflow/submission/test/test_daf.py b/htsworkflow/submission/test/test_daf.py
index 5da4dc2faff3217fdf7386849add99c089ccc2d8..5d647f6c72c6e254e34acb652cb3ed36fd26ab4f 100644 (file)
@@ -1,4 +1,8 @@
+from contextlib import contextmanager
+import os
 from StringIO import StringIO
+import shutil
+import tempfile
 import unittest
 
 from htsworkflow.submission import daf
@@ -168,12 +172,24 @@ class TestDAFMapper(unittest.TestCase):
         
         species = daf_mapper._get_library_attribute(libNode, 'species')
         self.failUnlessEqual(species, "Homo sapiens")
-        
-        daf_mapper.construct_file_attributes('/tmp/analysis1', libNode, 'filename.bam')
+
+        with mktempdir('analysis') as analysis_dir:
+            path, analysis_name = os.path.split(analysis_dir)
+            with mktempfile('.bam', dir=analysis_dir) as filename:
+                print 'dir', os.listdir(analysis_dir)
+                daf_mapper.construct_file_attributes(analysis_dir,
+                                                     libNode,
+                                                     filename)
+            
+        sub_root = "http://jumpgate.caltech.edu/wiki/SubmissionsLog/testfind/"
+        submission_name = sub_root + analysis_name
         source = daf_mapper.model.get_source(rdfNS['type'], submissionOntology['submission'])
-        self.failUnlessEqual(str(source.uri), "http://jumpgate.caltech.edu/wiki/SubmissionsLog/testfind/analysis1")
+
+        self.failUnlessEqual(str(source.uri), submission_name)
+
+        view_name = submission_name + '/Signal'
         view = daf_mapper.model.get_target(source, submissionOntology['has_view'])
-        self.failUnlessEqual(str(view.uri), "http://jumpgate.caltech.edu/wiki/SubmissionsLog/testfind/analysis1/Signal")
+        self.failUnlessEqual(str(view.uri), view_name)
 
     def test_library_url(self):
         daf_mapper = load_daf_mapper('urltest')
@@ -182,7 +198,24 @@ class TestDAFMapper(unittest.TestCase):
                              'http://jumpgate.caltech.edu/library/')
         daf_mapper.library_url = 'http://google.com'
         self.failUnlessEqual(daf_mapper.library_url, 'http://google.com' )
-        
+
+@contextmanager
+def mktempdir(prefix='tmp'):
+    d = tempfile.mkdtemp(prefix=prefix)
+    print "made", d
+    yield d
+    shutil.rmtree(d)
+    print "unmade", d
+
+@contextmanager
+def mktempfile(suffix='', prefix='tmp', dir=None):
+    fd, pathname = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir)
+    yield pathname
+    print "made", pathname
+    os.close(fd)
+    os.unlink(pathname)
+    print "unmade", pathname
+    
 def suite():
     suite = unittest.makeSuite(TestDAF, 'test')
     suite.addTest(unittest.makeSuite(TestDAFMapper, 'test'))
diff --git a/htsworkflow/util/hashfile.py b/htsworkflow/util/hashfile.py
new file mode 100644 (file)
index 0000000..f246410
--- /dev/null
@@ -0,0 +1,49 @@
+"""Utility to make md5sums of a file caching as a parallel file
+"""
+import logging
+import os
+from subprocess import Popen, PIPE
+
+logger = logging.getLogger(__name__)
+
+def make_md5sum(filename):
+    """Quickly find the md5sum of a file
+    """
+    md5_cache = os.path.join(filename+".md5")
+    print md5_cache
+    if os.path.exists(md5_cache):
+        logger.debug("Found md5sum in {0}".format(md5_cache))
+        stream = open(md5_cache,'r')
+        lines = stream.readlines()
+        md5sum = parse_md5sum_line(lines, filename)
+    else:
+        md5sum = make_md5sum_unix(filename, md5_cache)
+    return md5sum
+    
+def make_md5sum_unix(filename, md5_cache):
+    cmd = ["md5sum", filename]
+    logger.debug("Running {0}".format(" ".join(cmd)))
+    p = Popen(cmd, stdout=PIPE)
+    stdin, stdout = p.communicate()
+    retcode = p.wait()
+    logger.debug("Finished {0} retcode {1}".format(" ".join(cmd), retcode))
+    if retcode != 0:
+        logger.error("Trouble with md5sum for {0}".format(filename))
+        return None
+    lines = stdin.split(os.linesep)
+    md5sum = parse_md5sum_line(lines, filename)
+    if md5sum is not None:
+        logger.debug("Caching sum in {0}".format(md5_cache))
+        stream = open(md5_cache, "w")
+        stream.write(stdin)
+        stream.close()
+    return md5sum
+
+def parse_md5sum_line(lines, filename):
+    md5sum, md5sum_filename = lines[0].split()
+    if md5sum_filename != filename:
+        errmsg = "MD5sum and I disagre about filename. {0} != {1}"
+        logger.error(errmsg.format(filename, md5sum_filename))
+        return None
+    return md5sum
+
diff --git a/htsworkflow/util/rdfhelp.py b/htsworkflow/util/rdfhelp.py
index 91414145af9f3955726b36602a6dd585979eb388..7b7870363cce0888d41bd047f521b127e37c05ff 100644 (file)
@@ -18,6 +18,19 @@ dafTermOntology = RDF.NS("http://jumpgate.caltech.edu/wiki/UcscDaf#")
 libraryOntology = RDF.NS("http://jumpgate.caltech.edu/wiki/LibraryOntology#")
 submissionLog = RDF.NS("http://jumpgate.caltech.edu/wiki/SubmissionsLog/")
 
+def sparql_query(model, query_filename):
+    """Execute sparql query from file
+    """
+    query_body = open(query_filename,'r').read()
+    query = RDF.SPARQLQuery(query_body)
+    results = query.execute(model)
+    for row in results:
+        output = []
+        for k,v in row.items()[::-1]:
+            print "{0}: {1}".format(k,v)
+        print 
+
+
 def blankOrUri(value=None):
     node = None
     if value is None: