From 68da71c8242b55ee1372093dfce4b23658023003 Mon Sep 17 00:00:00 2001
From: Diane Trout
Date: Tue, 21 Jun 2011 16:15:09 -0700
Subject: [PATCH] Finish new RDF-based ddf construction.

The previous patch hadn't actually implemented writing the ddf to a file.
---
 extra/ucsc_encode_submission/ucsc_gather.py | 143 ++++++++++++--------
 htsworkflow/submission/daf.py               |  43 ++++--
 htsworkflow/submission/test/test_daf.py     |  79 ++++++-----
 htsworkflow/util/rdfhelp.py                 |   2 +-
 4 files changed, 160 insertions(+), 107 deletions(-)

diff --git a/extra/ucsc_encode_submission/ucsc_gather.py b/extra/ucsc_encode_submission/ucsc_gather.py
index 2b9ba85..3877379 100755
--- a/extra/ucsc_encode_submission/ucsc_gather.py
+++ b/extra/ucsc_encode_submission/ucsc_gather.py
@@ -5,7 +5,7 @@ from glob import glob
 import json
 import logging
 import netrc
-from optparse import OptionParser
+from optparse import OptionParser, OptionGroup
 import os
 from pprint import pprint, pformat
 import shlex
@@ -27,7 +27,10 @@ from htsworkflow.util.rdfhelp import \
      get_serializer, \
      load_into_model, \
      submissionOntology
-from htsworkflow.submission.daf import DAFMapper, get_submission_uri
+from htsworkflow.submission.daf import \
+     DAFMapper, \
+     MetadataLookupException, \
+     get_submission_uri
 from htsworkflow.submission.condorfastq import CondorFastqExtract
 
 logger = logging.getLogger('ucsc_gather')
@@ -48,10 +51,14 @@ def main(cmdline=None):
     model = get_model(opts.load_model)
     mapper = DAFMapper(opts.name, opts.daf, model)
     submission_uri = get_submission_uri(opts.name)
+
+    if opts.library_url is not None:
+        mapper.library_url = opts.library_url
+
     if opts.load_rdf is not None:
         load_into_model(model, 'turtle', opts.load_rdf, submission_uri)
 
-    if opts.makeddf and opts.daf is None:
+    if opts.make_ddf and opts.daf is None:
         parser.error("Please specify your daf when making ddf files")
 
     if len(args) == 0:
@@ -64,8 +71,8 @@ def main(cmdline=None):
     if opts.make_tree_from is not None:
         make_tree_from(opts.make_tree_from, library_result_map)
 
-    #if opts.daf is not None:
-    #    link_daf(opts.daf, library_result_map)
+    if opts.link_daf:
+        link_daf(opts.daf, library_result_map)
 
     if opts.fastq:
         extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
@@ -75,8 +82,8 @@ def main(cmdline=None):
     if opts.scan_submission:
         scan_submission_dirs(mapper, library_result_map)
 
-    if opts.makeddf:
-        make_all_ddfs(mapper, library_result_map, force=opts.force)
+    if opts.make_ddf:
+        make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)
 
     if opts.print_rdf:
         writer = get_serializer()
@@ -86,31 +93,35 @@ def make_parser():
     parser = OptionParser()
 
-    parser.add_option('--name', help="Set submission name")
-    parser.add_option('--load-model', default=None,
+    model = OptionGroup(parser, 'model')
+    model.add_option('--name', help="Set submission name")
+    model.add_option('--load-model', default=None,
                       help="Load model database")
-    parser.add_option('--load-rdf', default=None,
+    model.add_option('--load-rdf', default=None,
                       help="load rdf statements into model")
-    parser.add_option('--print-rdf', action="store_true", default=False,
+    model.add_option('--print-rdf', action="store_true", default=False,
                       help="print ending model state")
-
+    parser.add_option_group(model)
     # commands
-    parser.add_option('--make-tree-from',
+    commands = OptionGroup(parser, 'commands')
+    commands.add_option('--make-tree-from',
                       help="create directories & link data files",
                       default=None)
-    parser.add_option('--fastq', help="generate scripts for making fastq files",
-                      default=False, action="store_true")
-
-    parser.add_option('--scan-submission', default=False, action="store_true",
+    commands.add_option('--fastq', default=False, action="store_true",
+                        help="generate scripts for making fastq files")
+    commands.add_option('--scan-submission', default=False, action="store_true",
                       help="Import metadata for submission into our model")
-
-    parser.add_option('--makeddf', help='make the ddfs', default=False,
+    commands.add_option('--link-daf', default=False, action="store_true",
+                        help="link daf into submission directories")
+    commands.add_option('--make-ddf', help='make the ddfs', default=False,
                       action="store_true")
+    parser.add_option_group(commands)
 
-    parser.add_option('--daf', default=None, help='specify daf name')
     parser.add_option('--force', default=False, action="store_true",
                       help="Force regenerating fastqs")
-
+    parser.add_option('--daf', default=None, help='specify daf name')
+    parser.add_option('--library-url', default=None,
+                      help="specify an alternate source for library information")
     # debugging
     parser.add_option('--verbose', default=False, action="store_true",
                       help='verbose logging')
 
@@ -128,7 +139,7 @@ def make_tree_from(source_path, library_result_map):
         if not os.path.exists(lib_path):
             logging.info("Making dir {0}".format(lib_path))
             os.mkdir(lib_path)
-        source_lib_dir = os.path.join(source_path, lib_path)
+        source_lib_dir = os.path.abspath(os.path.join(source_path, lib_path))
         if os.path.exists(source_lib_dir):
             pass
         for filename in os.listdir(source_lib_dir):
@@ -162,14 +173,18 @@ def scan_submission_dirs(view_map, library_result_map):
     """Look through our submission directories and collect needed information
     """
     for lib_id, result_dir in library_result_map:
-        view_map.import_submission_dir(result_dir, lib_id)
+        logging.info("Importing %s from %s" % (lib_id, result_dir))
+        try:
+            view_map.import_submission_dir(result_dir, lib_id)
+        except MetadataLookupException, e:
+            logging.error("Skipping %s: %s" % (lib_id, str(e)))
 
-def make_all_ddfs(view_map, library_result_map, make_condor=True, force=False):
+def make_all_ddfs(view_map, library_result_map, daf_name, make_condor=True, force=False):
     dag_fragment = []
     for lib_id, result_dir in library_result_map:
         submissionNode = view_map.get_submission_node(result_dir)
         dag_fragment.extend(
-            make_ddf(view_map, submissionNode, make_condor, result_dir)
+            make_ddf(view_map, submissionNode, daf_name, make_condor, result_dir)
         )
 
     if make_condor and len(dag_fragment) > 0:
@@ -183,40 +198,48 @@ def make_all_ddfs(view_map, library_result_map, make_condor=True, force=False):
         f.close()
 
 
-def make_ddf(view_map, submissionNode, make_condor=False, outdir=None):
+def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None):
     """
     Make ddf files, and bonus condor file
     """
    dag_fragments = []
-    curdir = os.getcwd()
-    if outdir is not None:
-        os.chdir(outdir)
-    output = sys.stdout
 
     name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
     if name is None:
         logging.error("Need name for %s" % (str(submissionNode)))
         return []
-
-    ddf_name = name + '.ddf'
-    output = sys.stdout
-    # output = open(ddf_name,'w')
+    ddf_name = name + '.ddf'
+    if outdir is not None:
+        outfile = os.path.join(outdir, ddf_name)
+        output = open(outfile,'w')
+    else:
+        output = sys.stdout
+
 
     # filename goes first
     variables = ['filename']
     variables.extend(view_map.get_daf_variables())
     output.write('\t'.join(variables))
     output.write(os.linesep)
 
+    nameTerm = dafTermOntology['name']
+
     submission_views = view_map.model.get_targets(submissionNode, submissionOntology['has_view'])
     file_list = []
     for viewNode in submission_views:
         record = []
         for variable_name in variables:
             varNode = dafTermOntology[variable_name]
-            values = [fromTypedNode(v) for v in list(view_map.model.get_targets(viewNode, varNode))]
-            if variable_name == 'filename':
-                file_list.extend(values)
+            values = list(view_map.model.get_targets(viewNode, varNode))
+
+            if variable_name == 'view':
+                nameNode = view_map.model.get_target(values[0], nameTerm)
+                values = [fromTypedNode(nameNode)]
+            else:
+                values = [ fromTypedNode(v) for v in values ]
+                if variable_name == 'filename':
+                    file_list.extend(values)
+
             if len(values) == 0:
                 attribute = "#None#"
             elif len(values) == 1:
@@ -231,20 +254,18 @@ def make_ddf(view_map, submissionNode, make_condor=False, outdir=None):
         "Examined {0}, found files: {1}".format(
             str(submissionNode), ", ".join(file_list)))
 
-    #file_list.append(daf_name)
-    #if ddf_name is not None:
-    #    file_list.append(ddf_name)
-    #
-    #if make_condor:
-    #    archive_condor = make_condor_archive_script(ininame, file_list)
-    #    upload_condor = make_condor_upload_script(ininame)
-    #
-    #    dag_fragments.extend(
-    #        make_dag_fragment(ininame, archive_condor, upload_condor)
-    #    )
-
-    os.chdir(curdir)
+    file_list.append(daf_name)
+    file_list.append(ddf_name)
+
+    if make_condor:
+        print name, file_list
+        archive_condor = make_condor_archive_script(name, file_list, outdir)
+        upload_condor = make_condor_upload_script(name, outdir)
+
+        dag_fragments.extend(
+            make_dag_fragment(name, archive_condor, upload_condor)
+        )
     return dag_fragments
 
 
@@ -267,7 +288,7 @@ def read_library_result_map(filename):
     return results
 
 
-def make_condor_archive_script(ininame, files):
+def make_condor_archive_script(name, files, outdir=None):
     script = """Universe = vanilla
 
 Executable = /bin/tar
@@ -282,23 +303,26 @@ request_memory = 20
 queue
 """
 
+    if outdir is None:
+        outdir = os.getcwd()
     for f in files:
-        if not os.path.exists(f):
+        pathname = os.path.join(outdir, f)
+        if not os.path.exists(pathname):
             raise RuntimeError("Missing %s" % (f,))
 
-    context = {'archivename': make_submission_name(ininame),
+    context = {'archivename': make_submission_name(name),
               'filelist': " ".join(files),
               'initialdir': os.getcwd(),
               'user': os.getlogin()}
 
-    condor_script = make_condor_name(ininame, 'archive')
+    condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
    condor_stream = open(condor_script,'w')
     condor_stream.write(script % context)
     condor_stream.close()
     return condor_script
 
 
-def make_condor_upload_script(ininame):
+def make_condor_upload_script(name, outdir=None):
     script = """Universe = vanilla
 
 Executable = /usr/bin/lftp
@@ -311,19 +335,22 @@ initialdir = %(initialdir)s
 queue
 """
 
+    if outdir is None:
+        outdir = os.getcwd()
+
     auth = netrc.netrc(os.path.expanduser("~diane/.netrc"))
     encodeftp = 'encodeftp.cse.ucsc.edu'
     ftpuser = auth.hosts[encodeftp][0]
     ftppassword = auth.hosts[encodeftp][2]
 
-    context = {'archivename': make_submission_name(ininame),
-               'initialdir': os.getcwd(),
+    context = {'archivename': make_submission_name(name),
+               'initialdir': outdir,
                'user': os.getlogin(),
                'ftpuser': ftpuser,
                'ftppassword': ftppassword,
                'ftphost': encodeftp}
 
-    condor_script = make_condor_name(ininame, 'upload')
+    condor_script = os.path.join(outdir, make_condor_name(name, 'upload'))
     condor_stream = open(condor_script,'w')
     condor_stream.write(script % context)
     condor_stream.close()
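Note: the rewritten make_ddf() above builds each DDF row by querying the model per view, and the 'view' column is special-cased: a submission view is now a resource, so its printable name is looked up through the dafTerm:name statement that daf.py starts emitting below, rather than serialized from the node itself. A minimal standalone sketch of that per-column lookup (the helper name column_values is hypothetical; it assumes the Redland RDF bindings plus the dafTermOntology and fromTypedNode helpers already in this tree):

    def column_values(model, viewNode, variable_name):
        # collect all objects of <viewNode> dafTerm:<variable_name>
        values = list(model.get_targets(viewNode, dafTermOntology[variable_name]))
        if variable_name == 'view':
            # views are resources; report their dafTerm:name, not the node itself
            nameNode = model.get_target(values[0], dafTermOntology['name'])
            return [fromTypedNode(nameNode)]
        return [fromTypedNode(v) for v in values]
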
diff --git a/htsworkflow/submission/daf.py b/htsworkflow/submission/daf.py
index e1a9b1f..a3e3c35 100644
--- a/htsworkflow/submission/daf.py
+++ b/htsworkflow/submission/daf.py
@@ -25,7 +25,10 @@ logger = logging.getLogger(__name__)
 
 #
 class ModelException(RuntimeError): pass
-
+class MetadataLookupException(RuntimeError):
+    """Problem accessing metadata"""
+    pass
+
 # STATES
 DAF_HEADER = 1
 DAF_VIEW = 2
@@ -154,6 +157,8 @@ def _views_to_statements(name, dafNS, views):
         view_attributes = views[view_name]
         viewSubject = viewNS[view_name]
         statements.append(RDF.Statement(subject, dafNS['views'], viewSubject))
+        statements.append(
+            RDF.Statement(viewSubject, dafNS['name'], toTypedNode(view_name)))
         for view_attribute_name in view_attributes:
             predicate = dafNS[view_attribute_name]
             obj = toTypedNode(view_attributes[view_attribute_name])
@@ -263,22 +268,30 @@ class DAFMapper(object):
         if str(view) == str(libraryOntology['ignore']):
             return None
 
-        submissionName = toTypedNode(self.make_submission_name(submission_dir))
+        submission_name = self.make_submission_name(submission_dir)
         submissionNode = self.get_submission_node(submission_dir)
+        submission_uri = submissionNode.uri
+        print "submission:", str(submission_name), str(submissionNode), str(submission_uri)
+
+        view_name = fromTypedNode(self.model.get_target(view, dafTermOntology['name']))
+        submissionView = RDF.Node(RDF.Uri(str(submission_uri) + '/' + view_name))
+
         self.model.add_statement(
             RDF.Statement(self.submissionSet, dafTermOntology['has_submission'], submissionNode))
-        fileNode = RDF.Node(RDF.Uri(str(submissionNode.uri) + '/' +filename))
 
-        self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['has_view'], view))
-        self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['name'], submissionName))
+        self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['has_view'], submissionView))
+        self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['name'], toTypedNode(submission_name)))
 
         self.model.add_statement(RDF.Statement(submissionNode, rdfNS['type'], submissionOntology['submission']))
+
+        self.model.add_statement(
+            RDF.Statement(submissionView, dafTermOntology['filename'], toTypedNode(filename)))
         self.model.add_statement(
-            RDF.Statement(view, dafTermOntology['filename'], toTypedNode(filename)))
+            RDF.Statement(submissionView, dafTermOntology['view'], view))
         self.model.add_statement(
-            RDF.Statement(view, dafTermOntology['paired'], toTypedNode(self._is_paired(libNode))))
+            RDF.Statement(submissionView, dafTermOntology['paired'], toTypedNode(self._is_paired(libNode))))
         self.model.add_statement(
-            RDF.Statement(view, dafTermOntology['submission'], submissionNode))
+            RDF.Statement(submissionView, dafTermOntology['submission'], submissionNode))
 
         # extra information
         terms = [dafTermOntology['type'],
@@ -290,7 +303,7 @@ class DAFMapper(object):
         for term in terms:
             value = self._get_library_attribute(libNode, term)
             if value is not None:
-                self.model.add_statement(RDF.Statement(view, term, value))
+                self.model.add_statement(RDF.Statement(submissionView, term, value))
 
 
     def _add_library_details_to_model(self, libNode):
@@ -324,7 +337,7 @@ class DAFMapper(object):
             raise RuntimeError(
                 "Submission dir name too short: %s" %(submission_dir,))
         return submission_dir_name
-    
+
     def get_submission_node(self, submission_dir):
         """Convert a submission directory name to a submission node
         """
@@ -423,4 +436,12 @@ class DAFMapper(object):
         elif library_type in paired:
             return True
         else:
-            raise RuntimeError("Unrecognized library type %s" % (library_type,))
+            raise MetadataLookupException(
+                "Unrecognized library type %s for %s" % \
+                (library_type, str(libNode)))
+
+    def _get_library_url(self):
+        return str(self.libraryNS[''].uri)
+    def _set_library_url(self, value):
+        self.libraryNS = RDF.NS(str(value))
+    library_url = property(_get_library_url, _set_library_url)
diff --git a/htsworkflow/submission/test/test_daf.py b/htsworkflow/submission/test/test_daf.py
index b71e362..a7515b0 100644
--- a/htsworkflow/submission/test/test_daf.py
+++ b/htsworkflow/submission/test/test_daf.py
@@ -4,6 +4,7 @@ import unittest
 from htsworkflow.submission import daf
 from htsworkflow.util.rdfhelp import \
      dafTermOntology, \
+     fromTypedNode, \
      rdfNS, \
      submissionLog, \
      submissionOntology, \
@@ -65,17 +66,30 @@ class TestDAF(unittest.TestCase):
         daf.add_to_model(model, parsed, name)
 
         signal_view_node = RDF.Node(subNS['/view/Signal'].uri)
+
         writer = get_serializer()
         turtle = writer.serialize_model_to_string(model)
-        #print turtle
-
         self.failUnless(str(signal_view_node) in turtle)
 
         statements = list(model.find_statements(
             RDF.Statement(
                 signal_view_node, None, None)))
-        self.failUnlessEqual(len(statements), 5)
-
+        self.failUnlessEqual(len(statements), 6)
+        name = model.get_target(signal_view_node, dafTermOntology['name'])
+        self.failUnlessEqual(fromTypedNode(name), u'Signal')
+
+def load_daf_mapper(name, extra_statements=None):
+    """Load test model in
+    """
+    model = get_model()
+    if extra_statements is not None:
+        parser = RDF.Parser(name='turtle')
+        parser.parse_string_into_model(model, extra_statements,
+                                       'http://extra.extra')
+
+    test_daf_stream = StringIO(test_daf)
+    mapper = daf.DAFMapper(name, daf_file = test_daf_stream, model=model)
+    return mapper
 
 def dump_model(model):
     writer = get_serializer()
@@ -85,8 +99,7 @@ def dump_model(model):
 class TestDAFMapper(unittest.TestCase):
     def test_create_mapper_add_pattern(self):
         name = 'testsub'
-        test_daf_stream = StringIO(test_daf)
-        mapper = daf.DAFMapper(name, daf_file=test_daf_stream)
+        mapper = load_daf_mapper(name)
 
         pattern = '.bam\Z(?ms)'
         mapper.add_pattern('Signal', pattern)
@@ -102,19 +115,13 @@ class TestDAFMapper(unittest.TestCase):
         #self.failUnlessEqual(search[0].object.literal_value['string'], pattern)
 
     def test_find_one_view(self):
-        model = get_model()
-
-        parser = RDF.Parser(name='turtle')
-        parser.parse_string_into_model(model, '''
-@prefix dafTerm: .
+        extra = '''@prefix dafTerm: .
 
 <%(submissionLog)s/testfind/view/Signal> dafTerm:filename_re ".*\\\\.bam" .
 <%(submissionLog)s/testfind/view/FastqRd1> dafTerm:filename_re ".*_r1\\\\.fastq" .
-''' % {'submissionLog': 'http://jumpgate.caltech.edu/wiki/SubmissionsLog'},
-            'http://blank')
-        name = 'testfind'
-        test_stream = StringIO(test_daf)
-        daf_mapper = daf.DAFMapper(name, daf_file=test_stream, model=model)
+''' % {'submissionLog': 'http://jumpgate.caltech.edu/wiki/SubmissionsLog'}
+
+        daf_mapper = load_daf_mapper('testfind', extra_statements = extra)
 
         view = daf_mapper.find_view('filename_r1.fastq')
         self.failUnlessEqual(str(view),
@@ -125,19 +132,13 @@ class TestDAFMapper(unittest.TestCase):
         #print turtle
 
     def test_find_overlapping_view(self):
-        model = get_model()
-
-        parser = RDF.Parser(name='turtle')
-        parser.parse_string_into_model(model, '''
-@prefix dafTerm: .
+        extra = '''@prefix dafTerm: .
 
 <%(submissionLog)s/testfind/view/fastq> dafTerm:filename_re ".*\\\\.fastq" .
 <%(submissionLog)s/testfind/view/FastqRd1> dafTerm:filename_re ".*_r1\\\\.fastq" .
-''' % {'submissionLog': 'http://jumpgate.caltech.edu/wiki/SubmissionsLog'},
-            'http://blank')
-        name = 'testfind'
-        test_stream = StringIO(test_daf)
-        daf_mapper = daf.DAFMapper(name, daf_file=test_stream, model=model)
+''' % {'submissionLog': 'http://jumpgate.caltech.edu/wiki/SubmissionsLog'}
+
+        daf_mapper = load_daf_mapper('testfind', extra_statements = extra)
 
         self.failUnlessRaises(daf.ModelException,
                               daf_mapper.find_view,
@@ -146,11 +147,7 @@ class TestDAFMapper(unittest.TestCase):
     def test_find_attributes(self):
         lib_id = '11204'
         lib_url = 'http://jumpgate.caltech.edu/library/%s' %(lib_id)
-        model = get_model()
-
-        parser = RDF.Parser(name='turtle')
-        parser.parse_string_into_model(model, '''
-@prefix dafTerm: .
+        extra = '''@prefix dafTerm: .
 @prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
 
 <%(submissionLog)s/testfind/view/Signal> dafTerm:filename_re ".*\\\\.bam" .
@@ -158,11 +155,9 @@ class TestDAFMapper(unittest.TestCase):
 <%(libUrl)s> <%(libraryOntology)sgel_cut> "100"^^xsd:decimal .
 ''' % {'submissionLog': 'http://jumpgate.caltech.edu/wiki/SubmissionsLog',
        'libraryOntology': 'http://jumpgate.caltech.edu/wiki/LibraryOntology#',
-       'libUrl': lib_url},
-            'http://blank')
-        name = 'testfind'
-        test_stream = StringIO(test_daf)
-        daf_mapper = daf.DAFMapper(name, daf_file=test_stream, model=model)
+       'libUrl': lib_url}
+
+        daf_mapper = load_daf_mapper('testfind', extra)
         libNode = RDF.Node(RDF.Uri(lib_url))
         daf_mapper._add_library_details_to_model(libNode)
         gel_cut = daf_mapper._get_library_attribute(libNode, 'gel_cut')
@@ -177,7 +172,17 @@ class TestDAFMapper(unittest.TestCase):
         source = daf_mapper.model.get_source(rdfNS['type'], submissionOntology['submission'])
         self.failUnlessEqual(str(source), "")
         view = daf_mapper.model.get_target(source, submissionOntology['has_view'])
-        self.failUnlessEqual(str(view), "")
+        self.failUnlessEqual(str(view), "")
+
+
+    def test_library_url(self):
+        daf_mapper = load_daf_mapper('urltest')
+
+        self.failUnlessEqual(daf_mapper.library_url,
+                             'http://jumpgate.caltech.edu/library/')
+        daf_mapper.library_url = 'http://google.com'
+        self.failUnlessEqual(daf_mapper.library_url, 'http://google.com' )
+
 
 def suite():
     suite = unittest.makeSuite(TestDAF, 'test')
diff --git a/htsworkflow/util/rdfhelp.py b/htsworkflow/util/rdfhelp.py
index 8afa132..9141414 100644
--- a/htsworkflow/util/rdfhelp.py
+++ b/htsworkflow/util/rdfhelp.py
@@ -54,7 +54,7 @@ def toTypedNode(value):
 def fromTypedNode(node):
     if node is None:
         return None
-    
+
     value_type = str(node.literal_value['datatype'])
     # chop off xml schema declaration
     value_type = value_type.replace(str(xsdNS[''].uri),'')
-- 
2.30.2
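
For reference, a plausible end-to-end invocation once this patch is applied (the submission name, daf filename, and map filename below are made up; the positional argument is a library result map as read by read_library_result_map()):

    python extra/ucsc_encode_submission/ucsc_gather.py \
        --name test-submission --daf test-submission.daf \
        --library-url http://jumpgate.caltech.edu/library/ \
        --scan-submission --make-ddf library_result_map.txt

--library-url simply rebinds DAFMapper.library_url, which is backed by an RDF.NS namespace, so library metadata lookups resolve against the alternate source; per test_library_url the default remains http://jumpgate.caltech.edu/library/.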