From: Diane Trout
Date: Wed, 29 Jun 2011 06:11:04 +0000 (-0700)
Subject: Further clean up ddf generation.
X-Git-Tag: 0.5.2~8
X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=htsworkflow.git;a=commitdiff_plain;h=24a0824ccb5924c8dfa8153b8b05efd233fa0b3f

Further clean up ddf generation.

This still needs work, as I ended up hard coding a sparql query to
support the submission I'm currently working on -- which is unfortunate,
as the whole point of the push to RDF was to reduce hard coding.
However, it did simplify collecting information for make_ddf.

Using the query would also mean that the term copying I was doing
earlier, moving library attributes into each specific submission view,
would be unnecessary, since I can now easily query the graphs.

Probably what I need to do after the submission is to reduce the term
copying when importing a submission directory, and add some way of
tying a sparql query to a specific imported daf, though I need to deal
with the upcoming submission deadlines first.
---

diff --git a/extra/ucsc_encode_submission/encode_find.py b/extra/ucsc_encode_submission/encode_find.py
index 25cacb7..2a1890a 100644
--- a/extra/ucsc_encode_submission/encode_find.py
+++ b/extra/ucsc_encode_submission/encode_find.py
@@ -21,6 +21,7 @@ from htsworkflow.util.rdfhelp import \
      dublinCoreNS, \
      get_model, \
      get_serializer, \
+     sparql_query, \
      submitOntology, \
      libraryOntology, \
      load_into_model, \
@@ -353,18 +354,6 @@ def get_date_contents(element):
     else:
         return None
 
-def sparql_query(model, query_filename):
-    """Execute sparql query from file
-    """
-    query_body = open(query_filename,'r').read()
-    query = RDF.SPARQLQuery(query_body)
-    results = query.execute(model)
-    for row in results:
-        output = []
-        for k,v in row.items()[::-1]:
-            print "{0}: {1}".format(k,v)
-        print
-
 
 def load_into_model(model, parser_name, filename):
     if not os.path.exists(filename):
diff --git a/extra/ucsc_encode_submission/ucsc_gather.py b/extra/ucsc_encode_submission/ucsc_gather.py
index ddd1fad..d52b11b 100755
--- a/extra/ucsc_encode_submission/ucsc_gather.py
+++ b/extra/ucsc_encode_submission/ucsc_gather.py
@@ -11,7 +11,6 @@ from pprint import pprint, pformat
 import shlex
 from StringIO import StringIO
 import stat
-from subprocess import Popen, PIPE
 import sys
 import time
 import types
@@ -19,6 +18,8 @@ import urllib
 import urllib2
 import urlparse
 
+import RDF
+
 from htsworkflow.util import api
 from htsworkflow.util.rdfhelp import \
      dafTermOntology, \
@@ -26,6 +27,7 @@ from htsworkflow.util.rdfhelp import \
      get_model, \
      get_serializer, \
      load_into_model, \
+     sparql_query, \
      submissionOntology
 from htsworkflow.submission.daf import \
      DAFMapper, \
@@ -62,9 +64,6 @@ def main(cmdline=None):
     if opts.make_ddf and opts.daf is None:
         parser.error("Please specify your daf when making ddf files")
 
-    if len(args) == 0:
-        parser.error("I need at least one library submission-dir input file")
-
     library_result_map = []
     for a in args:
         library_result_map.extend(read_library_result_map(a))
@@ -86,6 +85,9 @@ def main(cmdline=None):
     if opts.make_ddf:
         make_all_ddfs(mapper, library_result_map, opts.daf, force=opts.force)
 
+    if opts.sparql:
+        sparql_query(model, opts.sparql)
+
     if opts.print_rdf:
         writer = get_serializer()
         print writer.serialize_model_to_string(model)
@@ -100,6 +102,7 @@ def make_parser():
                      help="Load model database")
     model.add_option('--load-rdf', default=None,
                      help="load rdf statements into model")
+    model.add_option('--sparql', default=None, help="execute sparql query")
     model.add_option('--print-rdf', action="store_true",
                      default=False, help="print ending model state")
     parser.add_option_group(model)
@@ -203,6 +206,31 @@ def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None)
     """
     Make ddf files, and bonus condor file
     """
+    query_template = """PREFIX libraryOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
+PREFIX submissionOntology:
+PREFIX ucscDaf: <http://jumpgate.caltech.edu/wiki/UcscDaf#>
+
+select ?submitView ?filename ?md5sum ?view ?cell ?antibody ?sex ?control ?controlId ?labExpId ?labVersion ?treatment ?protocol
+WHERE {
+  ?file ucscDaf:filename ?filename ;
+        ucscDaf:md5sum ?md5sum .
+  ?submitView ucscDaf:has_file ?file ;
+              ucscDaf:view ?dafView ;
+              ucscDaf:submission %(submission)s .
+  ?dafView ucscDaf:name ?view .
+  %(submission)s submissionOntology:library ?library .
+
+  OPTIONAL { ?submitView ucscDaf:antibody ?antibody }
+  OPTIONAL { ?submitView ucscDaf:cell ?cell }
+  OPTIONAL { ?submitView ucscDaf:control ?control }
+  OPTIONAL { ?library ucscDaf:controlId ?controlId }
+  OPTIONAL { ?submitView ucscDaf:sex ?sex }
+  OPTIONAL { ?submitView ucscDaf:labVersion ?labExpId }
+  OPTIONAL { ?submitView ucscDaf:labVersion ?labVersion }
+  OPTIONAL { ?library ucscDaf:treatment ?treatment }
+  OPTIONAL { ?submitView ucscDaf:protocol ?protocol }
+}
+ORDER BY ?submitView"""
 
     dag_fragments = []
     name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
@@ -216,51 +244,51 @@ def make_ddf(view_map, submissionNode, daf_name, make_condor=False, outdir=None)
         output = open(outfile,'w')
     else:
         output = sys.stdout
-
-    # filename goes first
+
+    formatted_query = query_template % {'submission': str(submissionNode)}
+
+    query = RDF.SPARQLQuery(formatted_query)
+    results = query.execute(view_map.model)
+
     variables = ['filename']
+    # filename goes first
     variables.extend(view_map.get_daf_variables())
+    variables += ['controlId', 'labExpId', 'md5sum']
     output.write('\t'.join(variables))
     output.write(os.linesep)
 
-    nameTerm = dafTermOntology['name']
-
-    submission_views = view_map.model.get_targets(submissionNode, submissionOntology['has_view'])
-    file_list = []
-    for viewNode in submission_views:
-        record = []
+    all_views = {}
+    all_files = []
+    for row in results:
+        viewname = fromTypedNode(row['view'])
+        current = all_views.setdefault(viewname, {})
         for variable_name in variables:
-            varNode = dafTermOntology[variable_name]
-            values = list(view_map.model.get_targets(viewNode, varNode))
-
-            if variable_name == 'view':
-                nameNode = view_map.model.get_target(values[0], nameTerm)
-                values = [fromTypedNode(nameNode)]
+            value = str(fromTypedNode(row[variable_name]))
+            if variable_name in ('filename', 'md5sum'):
+                current.setdefault(variable_name,[]).append(value)
             else:
-                values = [ fromTypedNode(v) for v in values ]
-                if variable_name == 'filename':
-                    file_list.extend(values)
-
-            if len(values) == 0:
-                attribute = "#None#"
-            elif len(values) == 1:
-                attribute = values[0]
+                current[variable_name] = value
+
+    for view in all_views.keys():
+        line = []
+        for variable_name in variables:
+            if variable_name in ('filename', 'md5sum'):
+                line.append(','.join(all_views[view][variable_name]))
             else:
-                attribute = ",".join(values)
-            record.append(attribute)
-        output.write('\t'.join(record))
+                line.append(all_views[view][variable_name])
+        output.write("\t".join(line))
         output.write(os.linesep)
-
+        all_files.extend(all_views[view]['filename'])
+
     logging.info(
         "Examined {0}, found files: {1}".format(
-            str(submissionNode), ", ".join(file_list)))
+            str(submissionNode), ", ".join(all_files)))
+
+    all_files.append(daf_name)
+    all_files.append(ddf_name)
 
-    file_list.append(daf_name)
-    file_list.append(ddf_name)
-
     if make_condor:
         print name, file_list
-        archive_condor = make_condor_archive_script(name, file_list, outdir)
+        archive_condor = make_condor_archive_script(name, all_files, outdir)
         upload_condor = make_condor_upload_script(name, outdir)
 
         dag_fragments.extend(
@@ -295,7 +323,7 @@ def make_condor_archive_script(name, files, outdir=None):
 Executable = /bin/tar
 arguments = czvhf ../%(archivename)s %(filelist)s
 
-Error = compress.err.$(Process).log
+Error = compress.out.$(Process).log
 Output = compress.out.$(Process).log
 Log = /tmp/submission-compress-%(user)s.log
 initialdir = %(initialdir)s
@@ -313,7 +341,7 @@ queue
 """
     context = {'archivename': make_submission_name(name),
                'filelist': " ".join(files),
-               'initialdir': os.getcwd(),
+               'initialdir': os.path.abspath(outdir),
                'user': os.getlogin()}
 
     condor_script = os.path.join(outdir, make_condor_name(name, 'archive'))
@@ -329,7 +357,7 @@ def make_condor_upload_script(name, outdir=None):
 Executable = /usr/bin/lftp
 arguments = -c put ../%(archivename)s -o ftp://%(ftpuser)s:%(ftppassword)s@%(ftphost)s/%(archivename)s
 
-Error = upload.err.$(Process).log
+Error = upload.out.$(Process).log
 Output = upload.out.$(Process).log
 Log = /tmp/submission-upload-%(user)s.log
 initialdir = %(initialdir)s
@@ -345,7 +373,7 @@ queue
     ftpuser = auth.hosts[encodeftp][0]
     ftppassword = auth.hosts[encodeftp][2]
     context = {'archivename': make_submission_name(name),
-               'initialdir': outdir,
+               'initialdir': os.path.abspath(outdir),
                'user': os.getlogin(),
                'ftpuser': ftpuser,
                'ftppassword': ftppassword,
@@ -432,46 +460,5 @@ def validate_filelist(files):
         if not os.path.exists(f):
             raise RuntimeError("%s does not exist" % (f,))
 
-def make_md5sum(filename):
-    """Quickly find the md5sum of a file
-    """
-    md5_cache = os.path.join(filename+".md5")
-    print md5_cache
-    if os.path.exists(md5_cache):
-        logging.debug("Found md5sum in {0}".format(md5_cache))
-        stream = open(md5_cache,'r')
-        lines = stream.readlines()
-        md5sum = parse_md5sum_line(lines, filename)
-    else:
-        md5sum = make_md5sum_unix(filename, md5_cache)
-    return md5sum
-
-def make_md5sum_unix(filename, md5_cache):
-    cmd = ["md5sum", filename]
-    logging.debug("Running {0}".format(" ".join(cmd)))
-    p = Popen(cmd, stdout=PIPE)
-    stdin, stdout = p.communicate()
-    retcode = p.wait()
-    logging.debug("Finished {0} retcode {1}".format(" ".join(cmd), retcode))
-    if retcode != 0:
-        logging.error("Trouble with md5sum for {0}".format(filename))
-        return None
-    lines = stdin.split(os.linesep)
-    md5sum = parse_md5sum_line(lines, filename)
-    if md5sum is not None:
-        logging.debug("Caching sum in {0}".format(md5_cache))
-        stream = open(md5_cache, "w")
-        stream.write(stdin)
-        stream.close()
-    return md5sum
-
-def parse_md5sum_line(lines, filename):
-    md5sum, md5sum_filename = lines[0].split()
-    if md5sum_filename != filename:
-        errmsg = "MD5sum and I disagre about filename. {0} != {1}"
-        logging.error(errmsg.format(filename, md5sum_filename))
-        return None
-    return md5sum
-
 if __name__ == "__main__":
     main()
diff --git a/htsworkflow/submission/daf.py b/htsworkflow/submission/daf.py
index 9429a38..28f7599 100644
--- a/htsworkflow/submission/daf.py
+++ b/htsworkflow/submission/daf.py
@@ -20,6 +20,7 @@ from htsworkflow.util.rdfhelp import \
      submissionOntology, \
      toTypedNode, \
      fromTypedNode
+from htsworkflow.util.hashfile import make_md5sum
 
 logger = logging.getLogger(__name__)
 
@@ -239,18 +240,6 @@ class DAFMapper(object):
         for f in submission_files:
             self.construct_file_attributes(submission_dir, libNode, f)
 
-            #attributes['md5sum'] = "None"
-            #
-            #ext = attributes["filename_re"]
-            #if attributes.get("type", None) == 'fastq':
-            #    fastqs.setdefault(ext, set()).add(f)
-            #    fastq_attributes[ext] = attributes
-            #else:
-            #    md5sum = make_md5sum(os.path.join(result_dir,f))
-            #    if md5sum is not None:
-            #        attributes['md5sum']=md5sum
-            #print attributes
-
     def construct_file_attributes(self, submission_dir, libNode, pathname):
         """Looking for the best extension
@@ -280,17 +269,16 @@ class DAFMapper(object):
         self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['has_view'], submissionView))
         self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['name'], toTypedNode(submission_name)))
         self.model.add_statement(RDF.Statement(submissionNode, rdfNS['type'], submissionOntology['submission']))
-
-
-        self.model.add_statement(
-            RDF.Statement(submissionView, dafTermOntology['filename'], toTypedNode(filename)))
+        self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['library'], libNode))
+
+        # add trac specific information
         self.model.add_statement(
             RDF.Statement(submissionView, dafTermOntology['view'], view))
         self.model.add_statement(
             RDF.Statement(submissionView, dafTermOntology['paired'], toTypedNode(self._is_paired(libNode))))
         self.model.add_statement(
             RDF.Statement(submissionView, dafTermOntology['submission'], submissionNode))
-
+
         # extra information
         terms = [dafTermOntology['type'],
                  dafTermOntology['filename_re'],
@@ -303,6 +291,21 @@ class DAFMapper(object):
             if value is not None:
                 self.model.add_statement(RDF.Statement(submissionView, term, value))
 
+        # add file specific information
+        fileNode = RDF.Node(RDF.Uri(submission_uri + '/' + filename))
+        submission_pathname = os.path.join(submission_dir, filename)
+        md5 = make_md5sum(submission_pathname)
+        self.model.add_statement(
+            RDF.Statement(submissionView, dafTermOntology['has_file'], fileNode))
+        self.model.add_statement(
+            RDF.Statement(fileNode, dafTermOntology['filename'], filename))
+
+        if md5 is None:
+            logging.warning("Unable to produce md5sum for %s" % ( submission_pathname))
+        else:
+            self.model.add_statement(
+                RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))
+
     def _add_library_details_to_model(self, libNode):
         parser = RDF.Parser(name='rdfa')
diff --git a/htsworkflow/submission/test/test_daf.py b/htsworkflow/submission/test/test_daf.py
index 5da4dc2..5d647f6 100644
--- a/htsworkflow/submission/test/test_daf.py
+++ b/htsworkflow/submission/test/test_daf.py
@@ -1,4 +1,8 @@
+from contextlib import contextmanager
+import os
 from StringIO import StringIO
+import shutil
+import tempfile
 import unittest
 
 from htsworkflow.submission import daf
@@ -168,12 +172,24 @@ class TestDAFMapper(unittest.TestCase):
         species = daf_mapper._get_library_attribute(libNode, 'species')
         self.failUnlessEqual(species, "Homo sapiens")
-
-        daf_mapper.construct_file_attributes('/tmp/analysis1', libNode, 'filename.bam')
+
+        with mktempdir('analysis') as analysis_dir:
+            path, analysis_name = os.path.split(analysis_dir)
+            with mktempfile('.bam', dir=analysis_dir) as filename:
+                print 'dir', os.listdir(analysis_dir)
+                daf_mapper.construct_file_attributes(analysis_dir,
+                                                     libNode,
+                                                     filename)
+
+        sub_root = "http://jumpgate.caltech.edu/wiki/SubmissionsLog/testfind/"
+        submission_name = sub_root + analysis_name
         source = daf_mapper.model.get_source(rdfNS['type'], submissionOntology['submission'])
-        self.failUnlessEqual(str(source.uri), "http://jumpgate.caltech.edu/wiki/SubmissionsLog/testfind/analysis1")
+
+        self.failUnlessEqual(str(source.uri), submission_name)
+
+        view_name = submission_name + '/Signal'
         view = daf_mapper.model.get_target(source, submissionOntology['has_view'])
-        self.failUnlessEqual(str(view.uri), "http://jumpgate.caltech.edu/wiki/SubmissionsLog/testfind/analysis1/Signal")
+        self.failUnlessEqual(str(view.uri), view_name)
 
     def test_library_url(self):
         daf_mapper = load_daf_mapper('urltest')
@@ -182,7 +198,24 @@ class TestDAFMapper(unittest.TestCase):
                                'http://jumpgate.caltech.edu/library/')
         daf_mapper.library_url = 'http://google.com'
         self.failUnlessEqual(daf_mapper.library_url, 'http://google.com' )
-
+
+@contextmanager
+def mktempdir(prefix='tmp'):
+    d = tempfile.mkdtemp(prefix=prefix)
+    print "made", d
+    yield d
+    shutil.rmtree(d)
+    print "unmade", d
+
+@contextmanager
+def mktempfile(suffix='', prefix='tmp', dir=None):
+    fd, pathname = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir)
+    yield pathname
+    print "made", pathname
+    os.close(fd)
+    os.unlink(pathname)
+    print "unmade", pathname
+
 def suite():
     suite = unittest.makeSuite(TestDAF, 'test')
     suite.addTest(unittest.makeSuite(TestDAFMapper, 'test'))
diff --git a/htsworkflow/util/hashfile.py b/htsworkflow/util/hashfile.py
new file mode 100644
index 0000000..f246410
--- /dev/null
+++ b/htsworkflow/util/hashfile.py
@@ -0,0 +1,49 @@
+"""Utility to make md5sums of a file caching as a parallel file
+"""
+import logging
+import os
+from subprocess import Popen, PIPE
+
+logger = logging.getLogger(__name__)
+
+def make_md5sum(filename):
+    """Quickly find the md5sum of a file
+    """
+    md5_cache = os.path.join(filename+".md5")
+    print md5_cache
+    if os.path.exists(md5_cache):
+        logger.debug("Found md5sum in {0}".format(md5_cache))
+        stream = open(md5_cache,'r')
+        lines = stream.readlines()
+        md5sum = parse_md5sum_line(lines, filename)
+    else:
+        md5sum = make_md5sum_unix(filename, md5_cache)
+    return md5sum
+
+def make_md5sum_unix(filename, md5_cache):
+    cmd = ["md5sum", filename]
+    logger.debug("Running {0}".format(" ".join(cmd)))
+    p = Popen(cmd, stdout=PIPE)
+    stdin, stdout = p.communicate()
+    retcode = p.wait()
+    logger.debug("Finished {0} retcode {1}".format(" ".join(cmd), retcode))
+    if retcode != 0:
+        logger.error("Trouble with md5sum for {0}".format(filename))
+        return None
+    lines = stdin.split(os.linesep)
+    md5sum = parse_md5sum_line(lines, filename)
+    if md5sum is not None:
+        logger.debug("Caching sum in {0}".format(md5_cache))
+        stream = open(md5_cache, "w")
+        stream.write(stdin)
+        stream.close()
+    return md5sum
+
+def parse_md5sum_line(lines, filename):
+    md5sum, md5sum_filename = lines[0].split()
+    if md5sum_filename != filename:
+        errmsg = "MD5sum and I disagre about filename. {0} != {1}"
+        logger.error(errmsg.format(filename, md5sum_filename))
+        return None
+    return md5sum
+
diff --git a/htsworkflow/util/rdfhelp.py b/htsworkflow/util/rdfhelp.py
index 9141414..7b78703 100644
--- a/htsworkflow/util/rdfhelp.py
+++ b/htsworkflow/util/rdfhelp.py
@@ -18,6 +18,19 @@ dafTermOntology = RDF.NS("http://jumpgate.caltech.edu/wiki/UcscDaf#")
 libraryOntology = RDF.NS("http://jumpgate.caltech.edu/wiki/LibraryOntology#")
 submissionLog = RDF.NS("http://jumpgate.caltech.edu/wiki/SubmissionsLog/")
 
+def sparql_query(model, query_filename):
+    """Execute sparql query from file
+    """
+    query_body = open(query_filename,'r').read()
+    query = RDF.SPARQLQuery(query_body)
+    results = query.execute(model)
+    for row in results:
+        output = []
+        for k,v in row.items()[::-1]:
+            print "{0}: {1}".format(k,v)
+        print
+
+
 def blankOrUri(value=None):
     node = None
     if value is None:
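
For reference, a minimal sketch of how the relocated sparql_query() helper might be
driven outside of ucsc_gather.py; this is not part of the commit, and the
no-argument get_model() call, the 'turtle' parser name, and the file names are
assumptions for illustration only:

    import RDF
    from htsworkflow.util.rdfhelp import get_model, load_into_model, sparql_query

    # Sketch only: build a model, load some previously serialized submission
    # statements into it, then run a query file against it.  sparql_query()
    # reads the file, executes it, and prints each result row (see rdfhelp.py).
    model = get_model()                                  # assumption: callable with no arguments
    load_into_model(model, 'turtle', 'submission.ttl')   # assumption: parser name and input file
    sparql_query(model, 'report.sparql')                 # hypothetical query file

This is the same helper the new --sparql option in ucsc_gather.py calls after the
model has been loaded.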