Finish new RDF based ddf construction.
authorDiane Trout <diane@caltech.edu>
Tue, 21 Jun 2011 23:15:09 +0000 (16:15 -0700)
committerDiane Trout <diane@caltech.edu>
Tue, 21 Jun 2011 23:15:09 +0000 (16:15 -0700)
The previous patch hadn't implemented actually writing the ddf to
a file.

extra/ucsc_encode_submission/ucsc_gather.py
htsworkflow/submission/daf.py
htsworkflow/submission/test/test_daf.py
htsworkflow/util/rdfhelp.py

index 2b9ba85b4edf202fb951c76024711eccd5d76b52..3877379c1de838e3762f4b6ccedd2c76cd0d25f4 100755 (executable)
@@ -5,7 +5,7 @@ from glob import glob
 import json
 import logging
 import netrc
-from optparse import OptionParser
+from optparse import OptionParser, OptionGroup
 import os
 from pprint import pprint, pformat
 import shlex
@@ -27,7 +27,10 @@ from htsworkflow.util.rdfhelp import \
      get_serializer, \
      load_into_model, \
      submissionOntology 
-from htsworkflow.submission.daf import DAFMapper, get_submission_uri
+from htsworkflow.submission.daf import \
+     DAFMapper, \
+     MetadataLookupException, \
+     get_submission_uri
 from htsworkflow.submission.condorfastq import CondorFastqExtract
 
 logger = logging.getLogger('ucsc_gather')
@@ -48,10 +51,14 @@ def main(cmdline=None):
     model = get_model(opts.load_model)
     mapper = DAFMapper(opts.name, opts.daf,  model)
     submission_uri = get_submission_uri(opts.name)
+
+    if opts.library_url is not None:
+        mapper.library_url = opts.library_url
+        
     if opts.load_rdf is not None:
         load_into_model(model, 'turtle', opts.load_rdf, submission_uri)
 
-    if opts.makeddf and opts.daf is None:
+    if opts.make_ddf and opts.daf is None:
         parser.error("Please specify your daf when making ddf files")
 
     if len(args) == 0:
@@ -64,8 +71,8 @@ def main(cmdline=None):
     if opts.make_tree_from is not None:
         make_tree_from(opts.make_tree_from, library_result_map)
             
-    #if opts.daf is not None:
-    #    link_daf(opts.daf, library_result_map)
+    if opts.link_daf:
+        link_daf(opts.daf, library_result_map)
 
     if opts.fastq:
         extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
@@ -75,8 +82,8 @@ def main(cmdline=None):
     if opts.scan_submission:
         scan_submission_dirs(mapper, library_result_map)
 
-    if opts.makeddf:
-        make_all_ddfs(mapper, library_result_map, force=opts.force)
+    if opts.make_ddf:
+        make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)
 
     if opts.print_rdf:
         writer = get_serializer()
@@ -86,31 +93,35 @@ def main(cmdline=None):
 def make_parser():
     parser = OptionParser()
 
-    parser.add_option('--name', help="Set submission name")
-    parser.add_option('--load-model', default=None,
+    model = OptionGroup(parser, 'model')
+    model.add_option('--name', help="Set submission name")
+    model.add_option('--load-model', default=None,
       help="Load model database")
-    parser.add_option('--load-rdf', default=None,
+    model.add_option('--load-rdf', default=None,
       help="load rdf statements into model")
-    parser.add_option('--print-rdf', action="store_true", default=False,
+    model.add_option('--print-rdf', action="store_true", default=False,
       help="print ending model state")
-
+    parser.add_option_group(model)
     # commands
-    parser.add_option('--make-tree-from',
+    commands = OptionGroup(parser, 'commands')
+    commands.add_option('--make-tree-from',
                       help="create directories & link data files",
                       default=None)
-    parser.add_option('--fastq', help="generate scripts for making fastq files",
-                      default=False, action="store_true")
-
-    parser.add_option('--scan-submission', default=False, action="store_true",
+    commands.add_option('--fastq', default=False, action="store_true",
+                        help="generate scripts for making fastq files")
+    commands.add_option('--scan-submission', default=False, action="store_true",
                       help="Import metadata for submission into our model")
-    
-    parser.add_option('--makeddf', help='make the ddfs', default=False,
+    commands.add_option('--link-daf', default=False, action="store_true",
+                        help="link daf into submission directories")
+    commands.add_option('--make-ddf', help='make the ddfs', default=False,
                       action="store_true")
+    parser.add_option_group(commands)
     
-    parser.add_option('--daf', default=None, help='specify daf name')
     parser.add_option('--force', default=False, action="store_true",
                       help="Force regenerating fastqs")
-
+    parser.add_option('--daf', default=None, help='specify daf name')
+    parser.add_option('--library-url', default=None,
+                      help="specify an alternate source for library information")
     # debugging
     parser.add_option('--verbose', default=False, action="store_true",
                       help='verbose logging')
@@ -128,7 +139,7 @@ def make_tree_from(source_path, library_result_map):
         if not os.path.exists(lib_path):
             logging.info("Making dir {0}".format(lib_path))
             os.mkdir(lib_path)
-        source_lib_dir = os.path.join(source_path, lib_path)
+        source_lib_dir = os.path.abspath(os.path.join(source_path, lib_path))
         if os.path.exists(source_lib_dir):
             pass
         for filename in os.listdir(source_lib_dir):
@@ -162,14 +173,18 @@ def scan_submission_dirs(view_map, library_result_map):
     """Look through our submission directories and collect needed information
     """
     for lib_id, result_dir in library_result_map:
-        view_map.import_submission_dir(result_dir, lib_id)
+        logging.info("Importing %s from %s" % (lib_id, result_dir))
+        try:
+            view_map.import_submission_dir(result_dir, lib_id)
+        except MetadataLookupException, e:
+            logging.error("Skipping %s: %s" % (lib_id, str(e)))
         
-def make_all_ddfs(view_map, library_result_map, make_condor=True, force=False):
+def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
     dag_fragment = []
     for lib_id, result_dir in library_result_map:
         submissionNode = view_map.get_submission_node(result_dir)
         dag_fragment.extend(
-            make_ddf(view_map, submissionNode, make_condor, result_dir)
+            make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)
         )
 
     if make_condor and len(dag_fragment) > 0:
@@ -183,40 +198,48 @@ def make_all_ddfs(view_map, library_result_map, make_condor=True, force=False):
             f.close()
             
 
-def make_ddf(view_map, submissionNode, make_condor=False, outdir=None):
+def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
     """
     Make ddf files, and bonus condor file
     """
     dag_fragments = []
-    curdir = os.getcwd()
-    if outdir is not None:
-        os.chdir(outdir)
-    output = sys.stdout
 
     name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
     if name is None:
         logging.error("Need name for %s" % (str(submissionNode)))
         return []
-    
-    ddf_name = name + '.ddf'
-    output = sys.stdout
-    # output = open(ddf_name,'w')
 
+    ddf_name = name + '.ddf'
+    if outdir is not None:
+        outfile = os.path.join(outdir, ddf_name)
+        output = open(outfile,'w')
+    else:
+        output = sys.stdout
+    
     # filename goes first
     variables = ['filename']
     variables.extend(view_map.get_daf_variables())
     output.write('\t'.join(variables))
     output.write(os.linesep)
     
+    nameTerm = dafTermOntology['name']
+
     submission_views = view_map.model.get_targets(submissionNode, submissionOntology['has_view'])
     file_list = []
     for viewNode in submission_views:
         record = []
         for variable_name in variables:
             varNode = dafTermOntology[variable_name]
-            values = [fromTypedNode(v) for v in list(view_map.model.get_targets(viewNode, varNode))]
-            if variable_name == 'filename':
-                file_list.extend(values)
+            values = list(view_map.model.get_targets(viewNode, varNode))
+            
+            if variable_name == 'view':
+                nameNode = view_map.model.get_target(values[0], nameTerm)
+                values = [fromTypedNode(nameNode)]
+            else:
+                values = [ fromTypedNode(v) for v in values ]
+                if variable_name == 'filename':
+                    file_list.extend(values)
+                              
             if len(values) == 0:
                 attribute = "#None#"
             elif len(values) == 1:
@@ -231,20 +254,18 @@ def make_ddf(view_map, submissionNode, make_condor=False, outdir=None):
         "Examined {0}, found files: {1}".format(
             str(submissionNode), ", ".join(file_list)))
 
-    #file_list.append(daf_name)
-    #if ddf_name is not None:
-    #    file_list.append(ddf_name)
-    #
-    #if make_condor:
-    #    archive_condor = make_condor_archive_script(ininame, file_list)
-    #    upload_condor = make_condor_upload_script(ininame)
-    #    
-    #    dag_fragments.extend( 
-    #        make_dag_fragment(ininame, archive_condor, upload_condor)
-    #    ) 
-        
-    os.chdir(curdir)
+    file_list.append(daf_name)
+    file_list.append(ddf_name)
     
+    if make_condor:
+        print name, file_list
+        archive_condor = make_condor_archive_script(name, file_list, outdir)
+        upload_condor = make_condor_upload_script(name, outdir)
+        
+        dag_fragments.extend( 
+            make_dag_fragment(name, archive_condor, upload_condor)
+        ) 
+        
     return dag_fragments
 
 
@@ -267,7 +288,7 @@ def read_library_result_map(filename):
     return results
 
 
-def make_condor_archive_script(ininame, files):
+def make_condor_archive_script(name, files, outdir=None):
     script = """Universe = vanilla
 
 Executable = /bin/tar
@@ -282,23 +303,26 @@ request_memory = 20
 
 queue 
 """
+    if outdir is None:
+        outdir = os.getcwd()
     for f in files:
-        if not os.path.exists(f):
+        pathname = os.path.join(outdir, f)
+        if not os.path.exists(pathname):
             raise RuntimeError("Missing %s" % (f,))
 
-    context = {'archivename': make_submission_name(ininame),
+    context = {'archivename': make_submission_name(name),
                'filelist': " ".join(files),
                'initialdir': os.getcwd(),
                'user': os.getlogin()}
 
-    condor_script = make_condor_name(ininame, 'archive')
+    condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
     condor_stream = open(condor_script,'w')
     condor_stream.write(script % context)
     condor_stream.close()
     return condor_script
 
 
-def make_condor_upload_script(ininame):
+def make_condor_upload_script(name, outdir=None):
     script = """Universe = vanilla
 
 Executable = /usr/bin/lftp
@@ -311,19 +335,22 @@ initialdir = %(initialdir)s
 
 queue 
 """
+    if outdir is None:
+        outdir = os.getcwd()
+        
     auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))
     
     encodeftp = 'encodeftp.cse.ucsc.edu'
     ftpuser = auth.hosts[encodeftp][0]
     ftppassword = auth.hosts[encodeftp][2]
-    context = {'archivename': make_submission_name(ininame),
-               'initialdir': os.getcwd(),
+    context = {'archivename': make_submission_name(name),
+               'initialdir': outdir,
                'user': os.getlogin(),
                'ftpuser': ftpuser,
                'ftppassword': ftppassword,
                'ftphost': encodeftp}
 
-    condor_script = make_condor_name(ininame, 'upload')
+    condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
     condor_stream = open(condor_script,'w')
     condor_stream.write(script % context)
     condor_stream.close()
index e1a9b1f113927dd431fcc43512918ea46f20f104..a3e3c3547f7dc552f45bc98542da8ba3ad901568 100644 (file)
@@ -25,7 +25,10 @@ logger = logging.getLogger(__name__)
 
 #
 class ModelException(RuntimeError): pass
-    
+class MetadataLookupException(RuntimeError):
+    """Problem accessing metadata"""
+    pass
+
 # STATES
 DAF_HEADER = 1
 DAF_VIEW = 2
@@ -154,6 +157,8 @@ def _views_to_statements(name, dafNS, views):
         view_attributes = views[view_name]
         viewSubject = viewNS[view_name]
         statements.append(RDF.Statement(subject, dafNS['views'], viewSubject))
+        statements.append(
+            RDF.Statement(viewSubject, dafNS['name'], toTypedNode(view_name)))
         for view_attribute_name in view_attributes:
             predicate = dafNS[view_attribute_name]
             obj = toTypedNode(view_attributes[view_attribute_name])
@@ -263,22 +268,30 @@ class DAFMapper(object):
         if str(view) == str(libraryOntology['ignore']):
             return None
 
-        submissionName = toTypedNode(self.make_submission_name(submission_dir))
+        submission_name = self.make_submission_name(submission_dir)
         submissionNode = self.get_submission_node(submission_dir)
+        submission_uri = submissionNode.uri
+        print "submission:", str(submission_name), str(submissionNode), str(submission_uri)
+
+        view_name = fromTypedNode(self.model.get_target(view, dafTermOntology['name']))
+        submissionView = RDF.Node(RDF.Uri(str(submission_uri) + '/' + view_name))
+
         self.model.add_statement(
             RDF.Statement(self.submissionSet, dafTermOntology['has_submission'], submissionNode))
 
-        fileNode = RDF.Node(RDF.Uri(str(submissionNode.uri) + '/' +filename))
-        self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['has_view'], view))
-        self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['name'], submissionName))
+        self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['has_view'], submissionView))
+        self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['name'], toTypedNode(submission_name)))
         self.model.add_statement(RDF.Statement(submissionNode, rdfNS['type'], submissionOntology['submission']))
 
+
+        self.model.add_statement(
+            RDF.Statement(submissionView, dafTermOntology['filename'], toTypedNode(filename)))
         self.model.add_statement(
-            RDF.Statement(view, dafTermOntology['filename'], toTypedNode(filename)))
+            RDF.Statement(submissionView, dafTermOntology['view'], view))
         self.model.add_statement(
-            RDF.Statement(view, dafTermOntology['paired'], toTypedNode(self._is_paired(libNode))))
+            RDF.Statement(submissionView, dafTermOntology['paired'], toTypedNode(self._is_paired(libNode))))
         self.model.add_statement(
-            RDF.Statement(view, dafTermOntology['submission'], submissionNode))
+            RDF.Statement(submissionView, dafTermOntology['submission'], submissionNode))
             
         # extra information 
         terms = [dafTermOntology['type'],
@@ -290,7 +303,7 @@ class DAFMapper(object):
         for term in terms:
             value = self._get_library_attribute(libNode, term)
             if value is not None:
-                self.model.add_statement(RDF.Statement(view, term, value))
+                self.model.add_statement(RDF.Statement(submissionView, term, value))
 
             
     def _add_library_details_to_model(self, libNode):
@@ -324,7 +337,7 @@ class DAFMapper(object):
             raise RuntimeError(
                 "Submission dir name too short: %s" %(submission_dir,))
         return submission_dir_name
-    
+        
     def get_submission_node(self, submission_dir):
         """Convert a submission directory name to a submission node
         """
@@ -423,4 +436,12 @@ class DAFMapper(object):
         elif library_type in paired:
             return True
         else:
-            raise RuntimeError("Unrecognized library type %s" % (library_type,))
+            raise MetadataLookupException(
+                "Unrecognized library type %s for %s" % \
+                (library_type, str(libNode)))
+
+    def _get_library_url(self):
+        return str(self.libraryNS[''].uri)
+    def _set_library_url(self, value):
+        self.libraryNS = RDF.NS(str(value))
+    library_url = property(_get_library_url, _set_library_url)
index b71e36272083625cf291096ee227ad5292272d9d..a7515b05374ac59a87cf6d845b2cfa073d26dc9d 100644 (file)
@@ -4,6 +4,7 @@ import unittest
 from htsworkflow.submission import daf
 from htsworkflow.util.rdfhelp import \
      dafTermOntology, \
+     fromTypedNode, \
      rdfNS, \
      submissionLog, \
      submissionOntology, \
@@ -65,17 +66,30 @@ class TestDAF(unittest.TestCase):
         daf.add_to_model(model, parsed, name)
 
         signal_view_node = RDF.Node(subNS['/view/Signal'].uri)
+
         writer = get_serializer()
         turtle =  writer.serialize_model_to_string(model)
-        #print turtle
-        
         self.failUnless(str(signal_view_node) in turtle)
 
         statements = list(model.find_statements(
             RDF.Statement(
                 signal_view_node, None, None)))
-        self.failUnlessEqual(len(statements), 5)
-
+        self.failUnlessEqual(len(statements), 6)
+        name = model.get_target(signal_view_node, dafTermOntology['name'])
+        self.failUnlessEqual(fromTypedNode(name), u'Signal')
+
+def load_daf_mapper(name, extra_statements=None):
+    """Load test model in
+    """
+    model = get_model()
+    if extra_statements is not None:
+        parser = RDF.Parser(name='turtle')
+        parser.parse_string_into_model(model, extra_statements,
+                                       'http://extra.extra')
+        
+    test_daf_stream = StringIO(test_daf)
+    mapper = daf.DAFMapper(name, daf_file = test_daf_stream, model=model)
+    return mapper
 
 def dump_model(model):
     writer = get_serializer()
@@ -85,8 +99,7 @@ def dump_model(model):
 class TestDAFMapper(unittest.TestCase):
     def test_create_mapper_add_pattern(self):
         name = 'testsub'
-        test_daf_stream = StringIO(test_daf)
-        mapper = daf.DAFMapper(name, daf_file=test_daf_stream)
+        mapper = load_daf_mapper(name)
         pattern = '.bam\Z(?ms)'
         mapper.add_pattern('Signal', pattern)
 
@@ -102,19 +115,13 @@ class TestDAFMapper(unittest.TestCase):
         #self.failUnlessEqual(search[0].object.literal_value['string'], pattern)
 
     def test_find_one_view(self):
-        model = get_model()
-
-        parser = RDF.Parser(name='turtle')
-        parser.parse_string_into_model(model, '''
-@prefix dafTerm:<http://jumpgate.caltech.edu/wiki/UcscDaf#> .
+        extra = '''@prefix dafTerm:<http://jumpgate.caltech.edu/wiki/UcscDaf#> .
 
 <%(submissionLog)s/testfind/view/Signal> dafTerm:filename_re ".*\\\\.bam" .
 <%(submissionLog)s/testfind/view/FastqRd1> dafTerm:filename_re ".*_r1\\\\.fastq" .
-''' % {'submissionLog': 'http://jumpgate.caltech.edu/wiki/SubmissionsLog'},
-        'http://blank')
-        name = 'testfind'
-        test_stream = StringIO(test_daf)
-        daf_mapper = daf.DAFMapper(name, daf_file=test_stream, model=model)
+''' % {'submissionLog': 'http://jumpgate.caltech.edu/wiki/SubmissionsLog'}
+
+        daf_mapper = load_daf_mapper('testfind', extra_statements = extra)
 
         view = daf_mapper.find_view('filename_r1.fastq')
         self.failUnlessEqual(str(view),
@@ -125,19 +132,13 @@ class TestDAFMapper(unittest.TestCase):
         #print turtle
 
     def test_find_overlapping_view(self):
-        model = get_model()
-
-        parser = RDF.Parser(name='turtle')
-        parser.parse_string_into_model(model, '''
-@prefix dafTerm:<http://jumpgate.caltech.edu/wiki/UcscDaf#> .
+        extra = '''@prefix dafTerm:<http://jumpgate.caltech.edu/wiki/UcscDaf#> .
 
 <%(submissionLog)s/testfind/view/fastq> dafTerm:filename_re ".*\\\\.fastq" .
 <%(submissionLog)s/testfind/view/FastqRd1> dafTerm:filename_re ".*_r1\\\\.fastq" .
-''' % {'submissionLog': 'http://jumpgate.caltech.edu/wiki/SubmissionsLog'},
-        'http://blank')
-        name = 'testfind'
-        test_stream = StringIO(test_daf)
-        daf_mapper = daf.DAFMapper(name, daf_file=test_stream, model=model)
+''' % {'submissionLog': 'http://jumpgate.caltech.edu/wiki/SubmissionsLog'}
+
+        daf_mapper = load_daf_mapper('testfind', extra_statements = extra)
 
         self.failUnlessRaises(daf.ModelException,
                               daf_mapper.find_view,
@@ -146,11 +147,7 @@ class TestDAFMapper(unittest.TestCase):
     def test_find_attributes(self):
         lib_id = '11204'
         lib_url = 'http://jumpgate.caltech.edu/library/%s' %(lib_id)
-        model = get_model()
-
-        parser = RDF.Parser(name='turtle')
-        parser.parse_string_into_model(model, '''
-@prefix dafTerm: <http://jumpgate.caltech.edu/wiki/UcscDaf#> .
+        extra = '''@prefix dafTerm: <http://jumpgate.caltech.edu/wiki/UcscDaf#> .
 @prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
 
 <%(submissionLog)s/testfind/view/Signal> dafTerm:filename_re ".*\\\\.bam" .
@@ -158,11 +155,9 @@ class TestDAFMapper(unittest.TestCase):
 <%(libUrl)s> <%(libraryOntology)sgel_cut> "100"^^xsd:decimal . 
 ''' % {'submissionLog': 'http://jumpgate.caltech.edu/wiki/SubmissionsLog',
        'libraryOntology': 'http://jumpgate.caltech.edu/wiki/LibraryOntology#',
-       'libUrl': lib_url},
-       'http://blank')
-        name = 'testfind'
-        test_stream = StringIO(test_daf)
-        daf_mapper = daf.DAFMapper(name, daf_file=test_stream, model=model)
+       'libUrl': lib_url}
+
+        daf_mapper = load_daf_mapper('testfind', extra)
         libNode = RDF.Node(RDF.Uri(lib_url))
         daf_mapper._add_library_details_to_model(libNode)
         gel_cut = daf_mapper._get_library_attribute(libNode, 'gel_cut')
@@ -177,7 +172,17 @@ class TestDAFMapper(unittest.TestCase):
         source = daf_mapper.model.get_source(rdfNS['type'], submissionOntology['submission'])
         self.failUnlessEqual(str(source), "<http://jumpgate.caltech.edu/wiki/SubmissionsLog/testfind/analysis1>")
         view = daf_mapper.model.get_target(source, submissionOntology['has_view'])
-        self.failUnlessEqual(str(view), "<http://jumpgate.caltech.edu/wiki/SubmissionsLog/testfind/view/Signal>")
+        self.failUnlessEqual(str(view), "<http://jumpgate.caltech.edu/wiki/SubmissionsLog/testfind/analysis1/Signal>")
+
+
+    def test_library_url(self):
+        daf_mapper = load_daf_mapper('urltest')
+
+        self.failUnlessEqual(daf_mapper.library_url,
+                             'http://jumpgate.caltech.edu/library/')
+        daf_mapper.library_url = 'http://google.com'
+        self.failUnlessEqual(daf_mapper.library_url, 'http://google.com' )
+        
 
 def suite():
     suite = unittest.makeSuite(TestDAF, 'test')
index 8afa13255d0ae5d6d775681778796357db965dc7..91414145af9f3955726b36602a6dd585979eb388 100644 (file)
@@ -54,7 +54,7 @@ def toTypedNode(value):
 def fromTypedNode(node):
     if node is None:
         return None
-    
+
     value_type = str(node.literal_value['datatype'])
     # chop off xml schema declaration
     value_type = value_type.replace(str(xsdNS[''].uri),'')