From dd850bd4f1cfd68a30fcb1b02170c44cc8d25062 Mon Sep 17 00:00:00 2001
From: Diane Trout
Date: Sat, 18 Jun 2011 14:03:36 -0700
Subject: [PATCH] Rework ucsc gather to use RDF models for gathering and
 storing track metadata.

I moved all of my RDF namespace definitions into rdfhelp.py to comply
with don't-repeat-yourself.

The major step forward is that I can now specify additional attributes
for a submission that aren't found in the htsworkflow database. Also,
the DAF model attribute lookup function implements a brute-force
owl:sameAs search: if it can't find the term you asked for, it looks
for owl:sameAs aliases of that term and searches for those instead.

The TestDAFMapper unit test is a bit slow, as one of the test cases
needs to contact the htsworkflow server. I should come up with a way
to preload the required information into the test case.
---
 extra/ucsc_encode_submission/encode_find.py |  24 +-
 extra/ucsc_encode_submission/ucsc_gather.py | 423 +++++---------------
 htsworkflow/submission/daf.py               | 373 +++++++++++++++--
 htsworkflow/submission/test/test_daf.py     | 163 +++++++-
 htsworkflow/util/rdfhelp.py                 |  82 +++-
 5 files changed, 661 insertions(+), 404 deletions(-)

diff --git a/extra/ucsc_encode_submission/encode_find.py b/extra/ucsc_encode_submission/encode_find.py
index cd52c22..25cacb7 100644
--- a/extra/ucsc_encode_submission/encode_find.py
+++ b/extra/ucsc_encode_submission/encode_find.py
@@ -14,12 +14,16 @@ import re
 import RDF
 import sys
 import urllib
+import urlparse
 
 from htsworkflow.util import api
 from htsworkflow.util.rdfhelp import \
      dublinCoreNS, \
+     get_model, \
+     get_serializer, \
      submitOntology, \
      libraryOntology, \
+     load_into_model, \
      rdfNS, \
      rdfsNS, \
      xsdNS
@@ -29,7 +33,8 @@ libraryNS = RDF.NS("http://jumpgate.caltech.edu/library/")
 
 from htsworkflow.submission.ucsc import submission_view_url, UCSCEncodePipeline
 
-ddfNS = RDF.NS(RDF.Uri(UCSCEncodePipeline + "/download_ddf#"))
+download_ddf = urlparse.urljoin(UCSCEncodePipeline, "download_ddf#", allow_fragments=True)
+ddfNS = RDF.NS(download_ddf)
 
 DBDIR = os.path.expanduser("~diane/proj/submission")
@@ -52,10 +57,11 @@ def main(cmdline=None):
     htswapi = api.HtswApi(opts.host, htsw_authdata)
 
     cookie = None
-    model = get_model(opts.load_model)
+    model = get_model(opts.load_model, DBDIR)
 
     if opts.load_rdf is not None:
-        load_into_model(model, opts.rdf_parser_name, opts.load_rdf)
+        ns_uri = submitOntology[''].uri
+        load_into_model(model, opts.rdf_parser_name, opts.load_rdf, ns_uri)
 
     if opts.update:
         cookie = login(cookie=cookie)
@@ -69,7 +75,7 @@ def main(cmdline=None):
         missing = find_submissions_with_no_library(model)
 
     if opts.print_rdf:
-        serializer = RDF.Serializer(name=opts.rdf_parser_name)
+        serializer = get_serializer(name=opts.rdf_parser_name)
         print serializer.serialize_model_to_string(model)
 
@@ -110,15 +116,6 @@ def make_parser():
 
     return parser
 
-def get_model(model_name=None):
-    if model_name is None:
-        storage = RDF.MemoryStorage()
-    else:
-        storage = RDF.HashStorage(model_name,
-                                  options="hash-type='bdb',dir='{0}'".format(DBDIR))
-    model = RDF.Model(storage)
-    return model
-
 def load_my_submissions(model, cookie=None):
     if cookie is None:
         cookie = login()
@@ -375,7 +372,6 @@
     data = open(filename, 'r').read()
     rdf_parser = RDF.Parser(name=parser_name)
 
-    ns_uri = submitOntology[''].uri
     rdf_parser.parse_string_into_model(model, data, ns_uri)
 
 def add_stmt(model, subject, predicate, object):
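For reference, the helpers that replaced the local get_model() are meant
to be used roughly like this (a sketch, not code from this patch; the
model name, directory, and turtle filename are made-up examples):

    from htsworkflow.util.rdfhelp import get_model, get_serializer, \
         load_into_model

    # with no arguments get_model() returns a memory-backed model;
    # a name plus directory selects the persistent bdb hash store
    # that used to be hard-coded in encode_find.py
    model = get_model('encode', '/tmp')
    load_into_model(model, 'turtle', 'submissions.turtle',
                    'http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#')
    print get_serializer().serialize_model_to_string(model)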
diff --git a/extra/ucsc_encode_submission/ucsc_gather.py b/extra/ucsc_encode_submission/ucsc_gather.py
index c65feec..2b9ba85 100755
--- a/extra/ucsc_encode_submission/ucsc_gather.py
+++ b/extra/ucsc_encode_submission/ucsc_gather.py
@@ -20,8 +20,18 @@ import urllib2
 import urlparse
 
 from htsworkflow.util import api
+from htsworkflow.util.rdfhelp import \
+     dafTermOntology, \
+     fromTypedNode, \
+     get_model, \
+     get_serializer, \
+     load_into_model, \
+     submissionOntology
+from htsworkflow.submission.daf import DAFMapper, get_submission_uri
 from htsworkflow.submission.condorfastq import CondorFastqExtract
 
+logger = logging.getLogger('ucsc_gather')
+
 def main(cmdline=None):
     parser = make_parser()
     opts, args = parser.parse_args(cmdline)
@@ -35,6 +45,12 @@ def main(cmdline=None):
 
     apidata = api.make_auth_from_opts(opts, parser)
 
+    model = get_model(opts.load_model)
+    mapper = DAFMapper(opts.name, opts.daf, model)
+    submission_uri = get_submission_uri(opts.name)
+    if opts.load_rdf is not None:
+        load_into_model(model, 'turtle', opts.load_rdf, submission_uri)
+
     if opts.makeddf and opts.daf is None:
         parser.error("Please specify your daf when making ddf files")
@@ -48,24 +64,36 @@
     if opts.make_tree_from is not None:
         make_tree_from(opts.make_tree_from, library_result_map)
 
-    if opts.daf is not None:
-        link_daf(opts.daf, library_result_map)
+    #if opts.daf is not None:
+    #    link_daf(opts.daf, library_result_map)
 
     if opts.fastq:
         extractor = CondorFastqExtract(opts.host, apidata, opts.sequence,
                                        force=opts.force)
         extractor.build_fastqs(library_result_map)
 
-    if opts.ini:
-        make_submission_ini(opts.host, apidata, library_result_map)
+    if opts.scan_submission:
+        scan_submission_dirs(mapper, library_result_map)
 
     if opts.makeddf:
-        make_all_ddfs(library_result_map, opts.daf, force=opts.force)
+        make_all_ddfs(mapper, library_result_map, force=opts.force)
+
+    if opts.print_rdf:
+        writer = get_serializer()
+        print writer.serialize_model_to_string(model)
+
 
 def make_parser():
     parser = OptionParser()
 
+    parser.add_option('--name', help="Set submission name")
+    parser.add_option('--load-model', default=None,
+                      help="Load model database")
+    parser.add_option('--load-rdf', default=None,
+                      help="load rdf statements into model")
+    parser.add_option('--print-rdf', action="store_true", default=False,
+                      help="print ending model state")
+
     # commands
     parser.add_option('--make-tree-from',
                       help="create directories & link data files",
@@ -73,9 +101,9 @@ def make_parser():
     parser.add_option('--fastq', help="generate scripts for making fastq files",
                       default=False, action="store_true")
 
-    parser.add_option('--ini', help="generate submission ini file", default=False,
-                      action="store_true")
-
+    parser.add_option('--scan-submission', default=False, action="store_true",
+                      help="Import metadata for submission into our model")
+
     parser.add_option('--makeddf', help='make the ddfs', default=False,
                       action="store_true")
 
@@ -93,7 +121,6 @@ def make_parser():
 
     return parser
 
-
 def make_tree_from(source_path, library_result_map):
     """Create a tree using data files from source path.
     """
@@ -131,73 +158,19 @@ def link_daf(daf_path, library_result_map):
         os.link(daf_path, submission_daf)
 
 
-def make_submission_ini(host, apidata, library_result_map, paired=True):
-    #attributes = get_filename_attribute_map(paired)
-    view_map = NameToViewMap(host, apidata)
-
-    candidate_fastq_src = {}
-
+def scan_submission_dirs(view_map, library_result_map):
+    """Look through our submission directories and collect needed information
+    """
     for lib_id, result_dir in library_result_map:
-        order_by = ['order_by=files', 'view', 'replicate', 'cell',
-                    'readType', 'mapAlgorithm', 'insertLength', 'md5sum' ]
-        inifile =  ['[config]']
-        inifile += [" ".join(order_by)]
-        inifile += ['']
-        line_counter = 1
-        result_ini = os.path.join(result_dir, result_dir+'.ini')
-
-        # write other lines
-        submission_files = os.listdir(result_dir)
-        fastqs = {}
-        fastq_attributes = {}
-        for f in submission_files:
-            attributes = view_map.find_attributes(f, lib_id)
-            if attributes is None:
-                raise ValueError("Unrecognized file: %s" % (f,))
-            attributes['md5sum'] = "None"
-
-            ext = attributes["extension"]
-            if attributes['view'] is None:
-                continue
-            elif attributes.get("type", None) == 'fastq':
-                fastqs.setdefault(ext, set()).add(f)
-                fastq_attributes[ext] = attributes
-            else:
-                md5sum = make_md5sum(os.path.join(result_dir,f))
-                if md5sum is not None:
-                    attributes['md5sum']=md5sum
-            inifile.extend(
-                make_submission_section(line_counter,
-                                        [f],
-                                        attributes
-                                        )
-                )
-            inifile += ['']
-            line_counter += 1
-        # add in fastqs on a single line.
-
-        for extension, fastq_files in fastqs.items():
-            inifile.extend(
-                make_submission_section(line_counter,
-                                        fastq_files,
-                                        fastq_attributes[extension])
-                )
-            inifile += ['']
-            line_counter += 1
-
-        f = open(result_ini,'w')
-        f.write(os.linesep.join(inifile))
-
+        view_map.import_submission_dir(result_dir, lib_id)
 
-def make_all_ddfs(library_result_map, daf_name, make_condor=True, force=False):
+def make_all_ddfs(view_map, library_result_map, make_condor=True, force=False):
     dag_fragment = []
     for lib_id, result_dir in library_result_map:
-        ininame = result_dir+'.ini'
-        inipathname = os.path.join(result_dir, ininame)
-        if os.path.exists(inipathname):
-            dag_fragment.extend(
-                make_ddf(ininame, daf_name, True, make_condor, result_dir)
-                )
+        submissionNode = view_map.get_submission_node(result_dir)
+        dag_fragment.extend(
+            make_ddf(view_map, submissionNode, make_condor, result_dir)
+            )
 
     if make_condor and len(dag_fragment) > 0:
         dag_filename = 'submission.dagman'
@@ -210,7 +183,7 @@
         f.close()
 
 
-def make_ddf(ininame,  daf_name, guess_ddf=False, make_condor=False, outdir=None):
+def make_ddf(view_map, submissionNode, make_condor=False, outdir=None):
     """
     Make ddf files, and bonus condor file
     """
@@ -219,64 +192,62 @@
     if outdir is not None:
         os.chdir(outdir)
     output = sys.stdout
-    ddf_name = None
-    if guess_ddf:
-        ddf_name = make_ddf_name(ininame)
-        print ddf_name
-        output = open(ddf_name,'w')
-
-    file_list = read_ddf_ini(ininame, output)
-    logging.info(
-        "Read config {0}, found files: {1}".format(
-            ininame, ", ".join(file_list)))
-    file_list.append(daf_name)
-    if ddf_name is not None:
-        file_list.append(ddf_name)
+    name = fromTypedNode(view_map.model.get_target(submissionNode, submissionOntology['name']))
+    if name is None:
+        logging.error("Need name for %s" % (str(submissionNode)))
+        return []
+
+    ddf_name = name + '.ddf'
+    output = sys.stdout
+    # output = open(ddf_name,'w')
 
-    if make_condor:
-        archive_condor = make_condor_archive_script(ininame, file_list)
-        upload_condor = make_condor_upload_script(ininame)
-
-        dag_fragments.extend(
-            make_dag_fragment(ininame, archive_condor, upload_condor)
-            )
+    # filename goes first
+    variables = ['filename']
+    variables.extend(view_map.get_daf_variables())
+    output.write('\t'.join(variables))
+    output.write(os.linesep)
+
+    submission_views = view_map.model.get_targets(submissionNode, submissionOntology['has_view'])
+    file_list = []
+    for viewNode in submission_views:
+        record = []
+        for variable_name in variables:
+            varNode = dafTermOntology[variable_name]
+            values = [fromTypedNode(v) for v in list(view_map.model.get_targets(viewNode, varNode))]
+            if variable_name == 'filename':
+                file_list.extend(values)
+            if len(values) == 0:
+                attribute = "#None#"
+            elif len(values) == 1:
+                attribute = values[0]
+            else:
+                attribute = ",".join(values)
+            record.append(attribute)
+        output.write('\t'.join(record))
+        output.write(os.linesep)
+
+    logging.info(
+        "Examined {0}, found files: {1}".format(
            str(submissionNode), ", ".join(file_list)))
+
+    #file_list.append(daf_name)
+    #if ddf_name is not None:
+    #    file_list.append(ddf_name)
+    #
+    #if make_condor:
+    #    archive_condor = make_condor_archive_script(ininame, file_list)
+    #    upload_condor = make_condor_upload_script(ininame)
+    #
+    #    dag_fragments.extend(
+    #        make_dag_fragment(ininame, archive_condor, upload_condor)
+    #        )
 
     os.chdir(curdir)
 
     return dag_fragments
 
-def read_ddf_ini(filename, output=sys.stdout):
-    """
-    Read a ini file and dump out a tab delmited text file
-    """
-    file_list = []
-    config = SafeConfigParser()
-    config.read(filename)
-
-    order_by = shlex.split(config.get("config", "order_by"))
-
-    output.write("\t".join(order_by))
-    output.write(os.linesep)
-    sections = config.sections()
-    sections.sort()
-    for section in sections:
-        if section == "config":
-            # skip the config block
-            continue
-        values = []
-        for key in order_by:
-            v = config.get(section, key)
-            values.append(v)
-            if key == 'files':
-                file_list.extend(parse_filelist(v))
-
-        output.write("\t".join(values))
-        output.write(os.linesep)
-    return file_list
-
-
 def read_library_result_map(filename):
     """
     Read a file that maps library id to result directory.
@@ -384,212 +355,6 @@ def get_library_info(host, apidata, library_id):
     return contents
 
 
-class NameToViewMap(object):
-    """Determine view attributes for a given submission file name
-    """
-    def __init__(self, root_url, apidata):
-        self.root_url = root_url
-        self.apidata = apidata
-
-        self.lib_cache = {}
-        self.lib_paired = {}
-        # ma is "map algorithm"
-        ma = 'TH1014'
-
-        self.patterns = [
-            # for 2011 Feb 18 elements submission
-            ('final_Cufflinks_genes_*gtf', 'GeneDeNovo'),
-            ('final_Cufflinks_transcripts_*gtf', 'TranscriptDeNovo'),
-            ('final_exonFPKM-Cufflinks-0.9.3-GENCODE-v3c-*.gtf',
-             'ExonsGencV3c'),
-            ('final_GENCODE-v3-Cufflinks-0.9.3.genes-*gtf',
-             'GeneGencV3c'),
-            ('final_GENCODE-v3-Cufflinks-0.9.3.transcripts-*gtf',
-             'TranscriptGencV3c'),
-            ('final_TSS-Cufflinks-0.9.3-GENCODE-v3c-*.gtf', 'TSS'),
-            ('final_junctions-*.bed6+3', 'Junctions'),
-
-            ('*.bai', None),
-            ('*.splices.bam', 'Splices'),
-            ('*.bam', self._guess_bam_view),
-            ('junctions.bed', 'Junctions'),
-            ('*.jnct', 'Junctions'),
-            ('*unique.bigwig', None),
-            ('*plus.bigwig', 'PlusSignal'),
-            ('*minus.bigwig', 'MinusSignal'),
-            ('*.bigwig', 'Signal'),
-            ('*.tar.bz2', None),
-            ('*.condor', None),
-            ('*.daf', None),
-            ('*.ddf', None),
-
-            ('*ufflinks?0.9.3.genes.gtf', 'GeneDeNovo'),
-            ('*ufflinks?0.9.3.transcripts.gtf', 'TranscriptDeNovo'),
-            ('*GENCODE-v3c.exonFPKM.gtf', 'ExonsGencV3c'),
-            ('*GENCODE-v3c.genes.gtf', 'GeneGencV3c'),
-            ('*GENCODE-v3c.transcripts.gtf', 'TranscriptGencV3c'),
-            ('*GENCODE-v3c.TSS.gtf', 'TSS'),
-            ('*.junctions.bed6+3', 'Junctions'),
-
-            ('*.?ufflinks-0.9.0?genes.expr', 'GeneDeNovo'),
-            ('*.?ufflinks-0.9.0?transcripts.expr', 'TranscriptDeNovo'),
-            ('*.?ufflinks-0.9.0?transcripts.gtf', 'GeneModel'),
-
-            ('*.GENCODE-v3c?genes.expr', 'GeneGCV3c'),
-            ('*.GENCODE-v3c?transcript*.expr', 'TranscriptGCV3c'),
-            ('*.GENCODE-v3c?transcript*.gtf', 'TranscriptGencV3c'),
-            ('*.GENCODE-v4?genes.expr', None), #'GeneGCV4'),
-            ('*.GENCODE-v4?transcript*.expr', None), #'TranscriptGCV4'),
-            ('*.GENCODE-v4?transcript*.gtf', None), #'TranscriptGencV4'),
-            ('*_1.75mers.fastq', 'FastqRd1'),
-            ('*_2.75mers.fastq', 'FastqRd2'),
-            ('*_r1.fastq', 'FastqRd1'),
-            ('*_r2.fastq', 'FastqRd2'),
-            ('*.fastq', 'Fastq'),
-            ('*.gtf', 'GeneModel'),
-            ('*.ini', None),
-            ('*.log', None),
-            ('*.md5', None),
-            ('paired-end-distribution*', 'InsLength'),
-            ('*.stats.txt', 'InsLength'),
-            ('*.srf', None),
-            ('*.wig', None),
-            ('*.zip', None),
-            ('transfer_log', None),
-            ]
-
-        self.views = {
-            None: {"MapAlgorithm": "NA"},
-            "Paired": {"MapAlgorithm": ma},
-            "Aligns": {"MapAlgorithm": ma},
-            "Single": {"MapAlgorithm": ma},
-            "Splices": {"MapAlgorithm": ma},
-            "Junctions": {"MapAlgorithm": ma},
-            "PlusSignal": {"MapAlgorithm": ma},
-            "MinusSignal": {"MapAlgorithm": ma},
-            "Signal": {"MapAlgorithm": ma},
-            "GeneModel": {"MapAlgorithm": ma},
-            "GeneDeNovo": {"MapAlgorithm": ma},
-            "TranscriptDeNovo": {"MapAlgorithm": ma},
-            "ExonsGencV3c": {"MapAlgorithm": ma},
-            "GeneGencV3c": {"MapAlgorithm": ma},
-            "TSS": {"MapAlgorithm": ma},
-            "GeneGCV3c": {"MapAlgorithm": ma},
-            "TranscriptGCV3c": {"MapAlgorithm": ma},
-            "TranscriptGencV3c": {"MapAlgorithm": ma},
-            "GeneGCV4": {"MapAlgorithm": ma},
-            "TranscriptGCV4": {"MapAlgorithm": ma},
-            "FastqRd1": {"MapAlgorithm": "NA", "type": "fastq"},
-            "FastqRd2": {"MapAlgorithm": "NA", "type": "fastq"},
-            "Fastq": {"MapAlgorithm": "NA", "type": "fastq" },
-            "InsLength": {"MapAlgorithm": ma},
-            }
-        # view name is one of the attributes
-        for v in self.views.keys():
-            self.views[v]['view'] = v
-
-    def find_attributes(self, pathname, lib_id):
-        """Looking for the best extension
-        The 'best' is the longest match
-
-        :Args:
-          filename (str): the filename whose extention we are about to examine
-        """
-        path, filename = os.path.splitext(pathname)
-        if not self.lib_cache.has_key(lib_id):
-            self.lib_cache[lib_id] = get_library_info(self.root_url,
-                                                      self.apidata, lib_id)
-
-        lib_info = self.lib_cache[lib_id]
-        if lib_info['cell_line'].lower() == 'unknown':
-            logging.warn("Library %s missing cell_line" % (lib_id,))
-        attributes = {
-            'cell': lib_info['cell_line'],
-            'replicate': lib_info['replicate'],
-            }
-        is_paired = self._is_paired(lib_id, lib_info)
-
-        if is_paired:
-            attributes.update(self.get_paired_attributes(lib_info))
-        else:
-            attributes.update(self.get_single_attributes(lib_info))
-
-        for pattern, view in self.patterns:
-            if fnmatch.fnmatch(pathname, pattern):
-                if callable(view):
-                    view = view(is_paired=is_paired)
-
-                attributes.update(self.views[view])
-                attributes["extension"] = pattern
-                return attributes
-
-
-    def _guess_bam_view(self, is_paired=True):
-        """Guess a view name based on library attributes
-        """
-        if is_paired:
-            return "Paired"
-        else:
-            return "Aligns"
-
-
-    def _is_paired(self, lib_id, lib_info):
-        """Determine if a library is paired end"""
-        # TODO: encode this information in the library type page.
-        single = (1,3,6)
-        if len(lib_info["lane_set"]) == 0:
-            # we haven't sequenced anything so guess based on library type
-            if lib_info['library_type_id'] in single:
-                return False
-            else:
-                return True
-
-        if not self.lib_paired.has_key(lib_id):
-            is_paired = 0
-            isnot_paired = 0
-            failed = 0
-            # check to see if all the flowcells are the same.
-            # otherwise we might need to do something complicated
-            for flowcell in lib_info["lane_set"]:
-                # yes there's also a status code, but this comparison
-                # is easier to read
-                if flowcell["status"].lower() == "failed":
-                    # ignore failed flowcell
-                    failed += 1
-                    pass
-                elif flowcell["paired_end"]:
-                    is_paired += 1
-                else:
-                    isnot_paired += 1
-
-            logging.debug("Library %s: %d paired, %d single, %d failed" % \
-                          (lib_info["library_id"], is_paired, isnot_paired, failed))
-
-            if is_paired > isnot_paired:
-                self.lib_paired[lib_id] = True
-            elif is_paired < isnot_paired:
-                self.lib_paired[lib_id] = False
-            else:
-                raise RuntimeError("Equal number of paired & unpaired lanes."\
-                                   "Can't guess library paired status")
-
-        return self.lib_paired[lib_id]
-
-    def get_paired_attributes(self, lib_info):
-        if lib_info['insert_size'] is None:
-            errmsg = "Library %s is missing insert_size, assuming 200"
-            logging.warn(errmsg % (lib_info["library_id"],))
-            insert_size = 200
-        else:
-            insert_size = lib_info['insert_size']
-        return {'insertLength': insert_size,
-                'readType': '2x75'}
-
-    def get_single_attributes(self, lib_info):
-        return {'insertLength':'ilNA',
-                'readType': '1x75D'
-                }
-
 def make_submission_section(line_counter, files, attributes):
     """
     Create a section in the submission ini file
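With those changes, the gather flow can be driven from Python roughly
like this (a sketch: the submission name, DAF filename, result
directory, and library id below are made-up examples):

    from htsworkflow.util.rdfhelp import get_model
    from htsworkflow.submission.daf import DAFMapper

    model = get_model()  # in-memory model
    mapper = DAFMapper('mysub', 'mysub.daf', model)
    # every file in the directory that matches a DAF filename_re
    # pattern gets linked to its view and submission node in the model
    mapper.import_submission_dir('results/11204', '11204')
    print mapper.get_daf_variables()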
diff --git a/htsworkflow/submission/daf.py b/htsworkflow/submission/daf.py
index cdc9312..e1a9b1f 100644
--- a/htsworkflow/submission/daf.py
+++ b/htsworkflow/submission/daf.py
@@ -1,25 +1,60 @@
 """Parse UCSC DAF File
 """
 import logging
+import os
 import re
 import string
 from StringIO import StringIO
 import types
+import urlparse
 
-from htsworkflow.util.rdfhelp import blankOrUri, toTypedNode
+import RDF
+from htsworkflow.util.rdfhelp import \
+     blankOrUri, \
+     dafTermOntology, \
+     get_model, \
+     libraryOntology, \
+     owlNS, \
+     rdfNS, \
+     submissionLog, \
+     submissionOntology, \
+     toTypedNode, \
+     fromTypedNode
 
 logger = logging.getLogger(__name__)
 
+#
+class ModelException(RuntimeError): pass
+
 # STATES
 DAF_HEADER = 1
 DAF_VIEW = 2
 
+def parse_into_model(model, submission_name, filename):
+    """Read a DAF into RDF Model
+
+    requires a short submission name
+    """
+    attributes = parse(filename)
+    add_to_model(model, attributes, submission_name)
+
+def fromstream_into_model(model, submission_name, daf_stream):
+    attributes = parse_stream(daf_stream)
+    add_to_model(model, attributes, submission_name)
+
+def fromstring_into_model(model, submission_name, daf_string):
+    """Read a string containing a DAF into RDF Model
+
+    requires a short submission name
+    """
+    attributes = fromstring(daf_string)
+    add_to_model(model, attributes, submission_name)
+
 def parse(filename):
     stream = open(filename,'r')
     attributes = parse_stream(stream)
     stream.close()
-    return stream
+    return attributes
 
 def fromstring(daf_string):
     stream = StringIO(daf_string)
@@ -87,43 +122,305 @@ def _extract_value_index(line, start=0):
     shortline = line.rstrip()
     return len(shortline)
 
-try:
-    import RDF
-    def convert_to_rdf_statements(attributes, source=None):
-        ddfNS = RDF.NS("http://encodesubmit.ucsc.edu/pipeline/download_ddf#")
-
-        subject = blankOrUri(source)
+def convert_to_rdf_statements(attributes, name):
+    submission_uri = get_submission_uri(name)
+    subject = RDF.Node(submission_uri)
+
+    statements = []
+    for daf_key in attributes:
+        predicate = dafTermOntology[daf_key]
+        if daf_key == 'views':
+            statements.extend(_views_to_statements(name,
+                                                   dafTermOntology,
+                                                   attributes[daf_key]))
+        elif daf_key == 'variables':
+            #predicate = ddfNS['variables']
+            for var in attributes.get('variables', []):
+                obj = toTypedNode(var)
+                statements.append(RDF.Statement(subject, predicate, obj))
+        else:
+            value = attributes[daf_key]
+            obj = toTypedNode(value)
+            statements.append(RDF.Statement(subject, predicate, obj))
+
+    return statements
+
+def _views_to_statements(name, dafNS, views):
+    subject = RDF.Node(get_submission_uri(name))
+    viewNS = get_view_namespace(name)
+
+    statements = []
+    for view_name in views:
+        view_attributes = views[view_name]
+        viewSubject = viewNS[view_name]
+        statements.append(RDF.Statement(subject, dafNS['views'], viewSubject))
+        for view_attribute_name in view_attributes:
+            predicate = dafNS[view_attribute_name]
+            obj = toTypedNode(view_attributes[view_attribute_name])
+            statements.append(RDF.Statement(viewSubject, predicate, obj))
+
+    #statements.extend(convert_to_rdf_statements(view, viewNode))
+    return statements
+
+def add_to_model(model, attributes, name):
+    for statement in convert_to_rdf_statements(attributes, name):
+        model.add_statement(statement)
+
+def get_submission_uri(name):
+    return submissionLog[name].uri
+
+def get_view_namespace(name):
+    submission_uri = get_submission_uri(name)
+    viewNS = RDF.NS(str(submission_uri) + '/view/')
+    return viewNS
+
+class DAFMapper(object):
+    """Convert filenames to views in the UCSC Daf
+    """
+    def __init__(self, name, daf_file=None, model=None):
+        """Construct a RDF backed model of a UCSC DAF
+
+        :args:
+          name (str): the name of this submission (used to construct DAF url)
+          daf_file (str, stream, or None):
+             if str, use as filename
+             if stream, parse as stream
+             if none, don't attempt to load the DAF into our model
+          model (RDF.Model or None):
+             if None, construct a memory backed model
+             otherwise specifies model to use
+        """
+        if daf_file is None and model is None:
+            logger.error("We need a DAF or Model containing a DAF to work")
+
+        self.name = name
+        if model is not None:
+            self.model = model
+        else:
+            self.model = get_model()
+
+        if hasattr(daf_file, 'next'):
+            # it's some kind of stream
+            fromstream_into_model(self.model, name, daf_file)
+        else:
+            # file
+            parse_into_model(self.model, name, daf_file)
+
+        self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')
+        self.submissionSet = get_submission_uri(self.name)
+        self.submissionSetNS = RDF.NS(str(self.submissionSet)+'/')
+        self.__view_map = None
+
+    def add_pattern(self, view_name, filename_pattern):
+        """Map a filename regular expression to a view name
+        """
+        viewNS = get_view_namespace(self.name)
+
+        obj = toTypedNode(filename_pattern)
+        self.model.add_statement(
+            RDF.Statement(viewNS[view_name],
+                          dafTermOntology['filename_re'],
+                          obj))
+
+    def import_submission_dir(self, submission_dir, library_id):
+        """Import a submission directory and update our model as needed
+        """
+        #attributes = get_filename_attribute_map(paired)
+        libNode = self.libraryNS[library_id + "/"]
 
-        statements = []
-        for name in attributes:
-            predicate = ddfNS[name]
-            if name == 'views':
-                predicate = ddfNS['views']
-                for view_name in attributes.get('views', []):
-                    view = attributes['views'][view_name]
-                    viewNode = RDF.Node()
-                    statements.append(RDF.Statement(subject, predicate, viewNode))
-                    statements.extend(convert_to_rdf_statements(view, viewNode))
-            elif name == 'variables':
-                predicate = ddfNS['variables']
-                for var in attributes.get('variables', []):
-                    obj = toTypedNode(var)
-                    statements.append(RDF.Statement(subject, predicate, obj))
-            else:
-                value = attributes[name]
-                obj = toTypedNode(value)
-                statements.append(RDF.Statement(subject,predicate,obj))
-
-        return statements
-
+        submission_files = os.listdir(submission_dir)
+        for f in submission_files:
+            self.construct_file_attributes(submission_dir, libNode, f)
+
+        #attributes['md5sum'] = "None"
+        #
+        #ext = attributes["filename_re"]
+        #if attributes.get("type", None) == 'fastq':
+        #    fastqs.setdefault(ext, set()).add(f)
+        #    fastq_attributes[ext] = attributes
+        #else:
+        #    md5sum = make_md5sum(os.path.join(result_dir,f))
+        #    if md5sum is not None:
+        #        attributes['md5sum']=md5sum
+        #print attributes
+
+    def construct_file_attributes(self, submission_dir, libNode, pathname):
+        """Looking for the best extension
+        The 'best' is the longest match
+
+        :Args:
+          filename (str): the filename whose extension we are about to examine
+        """
+        path, filename = os.path.split(pathname)
+
+        view = self.find_view(filename)
+        if view is None:
+            logger.warn("Unrecognized file: %s" % (pathname,))
+            return None
+        if str(view) == str(libraryOntology['ignore']):
+            return None
+
+        submissionName = toTypedNode(self.make_submission_name(submission_dir))
+        submissionNode = self.get_submission_node(submission_dir)
+        self.model.add_statement(
+            RDF.Statement(self.submissionSet, dafTermOntology['has_submission'], submissionNode))
+
+        fileNode = RDF.Node(RDF.Uri(str(submissionNode.uri) + '/' + filename))
+        self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['has_view'], view))
+        self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['name'], submissionName))
+        self.model.add_statement(RDF.Statement(submissionNode, rdfNS['type'], submissionOntology['submission']))
+
+        self.model.add_statement(
+            RDF.Statement(view, dafTermOntology['filename'], toTypedNode(filename)))
+        self.model.add_statement(
+            RDF.Statement(view, dafTermOntology['paired'], toTypedNode(self._is_paired(libNode))))
+        self.model.add_statement(
+            RDF.Statement(view, dafTermOntology['submission'], submissionNode))
+
+        # extra information
+        terms = [dafTermOntology['type'],
+                 dafTermOntology['filename_re'],
+                 ]
+        terms.extend((dafTermOntology[v] for v in self.get_daf_variables()))
+
+        # Add everything I can find
+        for term in terms:
+            value = self._get_library_attribute(libNode, term)
+            if value is not None:
+                self.model.add_statement(RDF.Statement(view, term, value))
+
+    def _add_library_details_to_model(self, libNode):
+        parser = RDF.Parser(name='rdfa')
+        new_statements = parser.parse_as_stream(libNode.uri)
+        for s in new_statements:
+            # don't override things we already have in the model
+            q = RDF.Statement(s.subject, s.predicate, None)
+            if len(list(self.model.find_statements(q))) == 0:
+                self.model.append(s)
+
+        statements = list(self.model.find_statements(q))
+        if len(statements) == 0:
+            logger.warning("Nothing known about %s" % (str(libNode),))
+
+    def get_daf_variables(self):
+        """Returns simple variable names to include in the ddf
+        """
+        variableTerm = dafTermOntology['variables']
+        results = ['view']
+        for obj in self.model.get_targets(self.submissionSet, variableTerm):
+            value = str(fromTypedNode(obj))
+            results.append(value)
+        results.append('labVersion')
+        return results
+
+    def make_submission_name(self, submission_dir):
+        submission_dir = os.path.normpath(submission_dir)
+        submission_dir_name = os.path.split(submission_dir)[1]
+        if len(submission_dir_name) == 0:
+            raise RuntimeError(
+                "Submission dir name too short: %s" %(submission_dir,))
+        return submission_dir_name
 
-    def add_to_model(model, attributes, source=None):
-        for statement in convert_to_rdf_statements(attributes, source):
-            model.add_statement(statement)
+    def get_submission_node(self, submission_dir):
+        """Convert a submission directory name to a submission node
+        """
+        submission_name = self.make_submission_name(submission_dir)
+        return self.submissionSetNS[submission_name]
+
+    def _get_library_attribute(self, libNode, attribute):
+        if not isinstance(attribute, RDF.Node):
+            attribute = libraryOntology[attribute]
+
+        # search through the model twice (adding in data from website)
+        for i in xrange(2):
+            targets = list(self.model.get_targets(libNode, attribute))
+            if len(targets) > 0:
+                return self._format_library_attribute(targets)
 
+            targets = self._search_same_as(libNode, attribute)
+            if targets is not None:
+                return self._format_library_attribute(targets)
 
-except ImportError, e:
-    def convert_to_rdf_statements(attributes, source=None):
-        raise NotImplementedError("librdf not installed")
-
-    def add_to_model(model, attributes, source=None):
-        raise NotImplementedError("librdf not installed")
+            # we don't know anything about this attribute
+            self._add_library_details_to_model(libNode)
+
+        return None
+
+    def _format_library_attribute(self, targets):
+        if len(targets) == 0:
+            return None
+        elif len(targets) == 1:
+            return fromTypedNode(targets[0])
+        elif len(targets) > 1:
+            return [fromTypedNode(t) for t in targets]
+
+    def _search_same_as(self, subject, predicate):
+        # look for alternate names
+        other_predicates = self.model.get_targets(predicate, owlNS['sameAs'])
+        for other in other_predicates:
+            targets = list(self.model.get_targets(subject, other))
+            if len(targets) > 0:
+                return targets
+        return None
+
+    def find_view(self, filename):
+        """Search through potential DAF filename patterns
+        """
+        if self.__view_map is None:
+            self.__view_map = self._get_filename_view_map()
+
+        results = []
+        for pattern, view in self.__view_map.items():
+            if re.match(pattern, filename):
+                results.append(view)
+
+        if len(results) > 1:
+            msg = "%s matched multiple views %s" % (
+                filename,
+                [str(x) for x in results])
+            raise ModelException(msg)
+        elif len(results) == 1:
+            return results[0]
+        else:
+            return None
+
+    def _get_filename_view_map(self):
+        """Query our model for filename patterns
+
+        return a dictionary of regular expression patterns to view names
+        """
+        filename_query = RDF.Statement(
+            None, dafTermOntology['filename_re'], None)
+
+        patterns = {}
+        for s in self.model.find_statements(filename_query):
+            view_name = s.subject
+            literal_re = s.object.literal_value['string']
+            logger.debug("Found: %s" % (literal_re,))
+            try:
+                filename_re = re.compile(literal_re)
+            except re.error, e:
+                logger.error("Unable to compile: %s" % (literal_re,))
+                continue
+            patterns[literal_re] = view_name
+        return patterns
+
+    def _is_paired(self, libNode):
+        """Determine if a library is paired end"""
+        library_type = self._get_library_attribute(libNode, 'library_type')
+        if library_type is None:
+            raise ModelException("%s doesn't have a library type" % (str(libNode),))
+
+        #single = (1,3,6)
+        single = ['Single End', 'Small RNA', 'CSHL (lacking last nt)']
+        paired = ['Paired End', 'Multiplexing', 'Barcoded']
+        if library_type in single:
+            return False
+        elif library_type in paired:
+            return True
+        else:
+            raise RuntimeError("Unrecognized library type %s" % (library_type,))
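The owl:sameAs fallback I mentioned amounts to the following (a
standalone restatement of DAFMapper._search_same_as above; model,
subject, and predicate are any librdf model and nodes):

    import RDF
    from htsworkflow.util.rdfhelp import owlNS

    def search_same_as(model, subject, predicate):
        # if (subject, predicate, ?) had no matches, retry the lookup
        # with every predicate declared owl:sameAs the requested one
        for other in model.get_targets(predicate, owlNS['sameAs']):
            targets = list(model.get_targets(subject, other))
            if len(targets) > 0:
                return targets
        return None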
diff --git a/htsworkflow/submission/test/test_daf.py b/htsworkflow/submission/test/test_daf.py
index 3749446..b71e362 100644
--- a/htsworkflow/submission/test/test_daf.py
+++ b/htsworkflow/submission/test/test_daf.py
@@ -1,6 +1,16 @@
+from StringIO import StringIO
 import unittest
 
 from htsworkflow.submission import daf
+from htsworkflow.util.rdfhelp import \
+     dafTermOntology, \
+     rdfNS, \
+     submissionLog, \
+     submissionOntology, \
+     get_model, \
+     get_serializer
+
+import RDF
 
 test_daf = """# Lab and general info
 grant             Hardison
@@ -13,9 +23,9 @@ dafVersion        2.0
 validationSettings validateFiles.bam:mismatches=2,bamPercent=99.9;validateFiles.fastq:quick=1000
 
 # Track/view definition
-view             Peaks
-longLabelPrefix  Caltech Histone Peaks
-type             narrowPeak
+view             FastqRd1
+longLabelPrefix  Caltech Fastq Read 1
+type             fastq
 hasReplicates    yes
 required         no
@@ -35,7 +45,7 @@ class TestDAF(unittest.TestCase):
         self.failUnlessEqual(parsed['grant'], 'Hardison')
         self.failUnlessEqual(len(parsed['variables']), 6)
         self.failUnlessEqual(len(parsed['views']), 2)
-        self.failUnlessEqual(len(parsed['views']['Peaks']), 5)
+        self.failUnlessEqual(len(parsed['views']['FastqRd1']), 5)
         self.failUnlessEqual(len(parsed['views']['Signal']), 5)
         signal = parsed['views']['Signal']
         self.failUnlessEqual(signal['required'], False)
@@ -43,25 +53,136 @@ class TestDAF(unittest.TestCase):
                              'Caltech Histone Signal')
 
     def test_rdf(self):
-        try:
-            import RDF
-
-            parsed = daf.fromstring(test_daf)
-            #mem = RDF.Storage(storage_name='hashes',
-            #                  options_string='hash-type="memory"'),
-            mem = RDF.MemoryStorage()
-            model = RDF.Model(mem)
-
-            daf.add_to_model(model, parsed)
-
-            writer = RDF.Serializer(name='turtle')
-            print writer.serialize_model_to_string(model)
-
-        except ImportError, e:
-            print "Skipped test_rdf"
+
+        parsed = daf.fromstring(test_daf)
+        #mem = RDF.Storage(storage_name='hashes',
+        #                  options_string='hash-type="memory"'),
+        mem = RDF.MemoryStorage()
+        model = RDF.Model(mem)
+
+        name = 'cursub'
+        subNS = RDF.NS(str(submissionLog[name].uri))
+        daf.add_to_model(model, parsed, name)
+
+        signal_view_node = RDF.Node(subNS['/view/Signal'].uri)
+
+        writer = get_serializer()
+        turtle = writer.serialize_model_to_string(model)
+        #print turtle
+
+        self.failUnless(str(signal_view_node) in turtle)
+
+        statements = list(model.find_statements(
+            RDF.Statement(
+                signal_view_node, None, None)))
+        self.failUnlessEqual(len(statements), 5)
+
+
+def dump_model(model):
+    writer = get_serializer()
+    turtle = writer.serialize_model_to_string(model)
+    print turtle
+
+
+class TestDAFMapper(unittest.TestCase):
+    def test_create_mapper_add_pattern(self):
+        name = 'testsub'
+        test_daf_stream = StringIO(test_daf)
+        mapper = daf.DAFMapper(name, daf_file=test_daf_stream)
+        pattern = '.bam\Z(?ms)'
+        mapper.add_pattern('Signal', pattern)
+
+        s = RDF.Statement(daf.get_view_namespace(name)['Signal'],
+                          dafTermOntology['filename_re'],
+                          None)
+        search = list(mapper.model.find_statements(s))
+        self.failUnlessEqual(len(search), 1)
+        self.failUnlessEqual(str(search[0].subject),
+                             str(submissionLog['testsub/view/Signal']))
+        self.failUnlessEqual(str(search[0].predicate),
+                             str(dafTermOntology['filename_re']))
+        #self.failUnlessEqual(search[0].object.literal_value['string'], pattern)
+
+    def test_find_one_view(self):
+        model = get_model()
+
+        parser = RDF.Parser(name='turtle')
+        parser.parse_string_into_model(model, '''
+@prefix dafTerm: <http://jumpgate.caltech.edu/wiki/UcscDaf#> .
+
+<%(submissionLog)s/testfind/view/Signal> dafTerm:filename_re ".*\\\\.bam" .
+<%(submissionLog)s/testfind/view/FastqRd1> dafTerm:filename_re ".*_r1\\\\.fastq" .
+''' % {'submissionLog': 'http://jumpgate.caltech.edu/wiki/SubmissionsLog'},
+    'http://blank')
+        name = 'testfind'
+        test_stream = StringIO(test_daf)
+        daf_mapper = daf.DAFMapper(name, daf_file=test_stream, model=model)
+
+        view = daf_mapper.find_view('filename_r1.fastq')
+        self.failUnlessEqual(str(view),
+                             str(submissionLog['testfind/view/FastqRd1']))
+
+        #writer = get_serializer()
+        #turtle = writer.serialize_model_to_string(model)
+        #print turtle
+
+    def test_find_overlapping_view(self):
+        model = get_model()
+
+        parser = RDF.Parser(name='turtle')
+        parser.parse_string_into_model(model, '''
+@prefix dafTerm: <http://jumpgate.caltech.edu/wiki/UcscDaf#> .
+
+<%(submissionLog)s/testfind/view/fastq> dafTerm:filename_re ".*\\\\.fastq" .
+<%(submissionLog)s/testfind/view/FastqRd1> dafTerm:filename_re ".*_r1\\\\.fastq" .
+''' % {'submissionLog': 'http://jumpgate.caltech.edu/wiki/SubmissionsLog'},
+    'http://blank')
+        name = 'testfind'
+        test_stream = StringIO(test_daf)
+        daf_mapper = daf.DAFMapper(name, daf_file=test_stream, model=model)
+
+        self.failUnlessRaises(daf.ModelException,
+                              daf_mapper.find_view,
+                              'filename_r1.fastq')
+
+    def test_find_attributes(self):
+        lib_id = '11204'
+        lib_url = 'http://jumpgate.caltech.edu/library/%s' %(lib_id)
+        model = get_model()
+
+        parser = RDF.Parser(name='turtle')
+        parser.parse_string_into_model(model, '''
+@prefix dafTerm: <http://jumpgate.caltech.edu/wiki/UcscDaf#> .
+@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
+
+<%(submissionLog)s/testfind/view/Signal> dafTerm:filename_re ".*\\\\.bam" .
+<%(submissionLog)s/testfind/view/FastqRd1> dafTerm:filename_re ".*\\\\.fastq" .
+<%(libUrl)s> <%(libraryOntology)sgel_cut> "100"^^xsd:decimal .
+''' % {'submissionLog': 'http://jumpgate.caltech.edu/wiki/SubmissionsLog',
+       'libraryOntology': 'http://jumpgate.caltech.edu/wiki/LibraryOntology#',
+       'libUrl': lib_url},
+    'http://blank')
+        name = 'testfind'
+        test_stream = StringIO(test_daf)
+        daf_mapper = daf.DAFMapper(name, daf_file=test_stream, model=model)
+        libNode = RDF.Node(RDF.Uri(lib_url))
+        daf_mapper._add_library_details_to_model(libNode)
+        gel_cut = daf_mapper._get_library_attribute(libNode, 'gel_cut')
+        # make sure we can override attributes, the value in our
+        # server is 500 for this library
+        self.failUnlessEqual(gel_cut, 100)
+
+        species = daf_mapper._get_library_attribute(libNode, 'species')
+        self.failUnlessEqual(species, "Homo sapiens")
+
+        daf_mapper.construct_file_attributes('/tmp/analysis1', libNode, 'filename.bam')
+        source = daf_mapper.model.get_source(rdfNS['type'], submissionOntology['submission'])
+        self.failUnlessEqual(str(source), "<http://jumpgate.caltech.edu/wiki/SubmissionsLog/testfind/analysis1>")
+        view = daf_mapper.model.get_target(source, submissionOntology['has_view'])
+        self.failUnlessEqual(str(view), "<http://jumpgate.caltech.edu/wiki/SubmissionsLog/testfind/view/Signal>")
 
 def suite():
-    return unittest.makeSuite(TestDAF, 'test')
+    suite = unittest.makeSuite(TestDAF, 'test')
+    suite.addTest(unittest.makeSuite(TestDAFMapper, 'test'))
+    return suite
 
 if __name__ == "__main__":
     unittest.main(defaultTest='suite')
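The preloading I want for TestDAFMapper could reuse the trick
test_find_attributes already uses for gel_cut: parse the triples the
server would have served as RDFa straight into the model (a sketch; the
species value matches what the test expects, while the library_type
value is a made-up example):

    preload = '''
    @prefix libOntology: <http://jumpgate.caltech.edu/wiki/LibraryOntology#> .

    <http://jumpgate.caltech.edu/library/11204>
        libOntology:species "Homo sapiens" ;
        libOntology:library_type "Paired End" .
    '''
    parser = RDF.Parser(name='turtle')
    parser.parse_string_into_model(model, preload, 'http://blank')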
diff --git a/htsworkflow/util/rdfhelp.py b/htsworkflow/util/rdfhelp.py
index 7e9deda..8afa132 100644
--- a/htsworkflow/util/rdfhelp.py
+++ b/htsworkflow/util/rdfhelp.py
@@ -1,17 +1,22 @@
 """Helper features for working with librdf
 """
-import RDF
+import os
 import types
 
+import RDF
+
 # standard ontology namespaces
+owlNS = RDF.NS('http://www.w3.org/2002/07/owl#')
 dublinCoreNS = RDF.NS("http://purl.org/dc/elements/1.1/")
 rdfNS = RDF.NS("http://www.w3.org/1999/02/22-rdf-syntax-ns#")
 rdfsNS= RDF.NS("http://www.w3.org/2000/01/rdf-schema#")
 xsdNS = RDF.NS("http://www.w3.org/2001/XMLSchema#")
 
 # internal ontologies
-submitOntology = RDF.NS("http://jumpgate.caltech.edu/wiki/UCSCSubmissionOntology#")
+submissionOntology = RDF.NS("http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#")
+dafTermOntology = RDF.NS("http://jumpgate.caltech.edu/wiki/UcscDaf#")
 libraryOntology = RDF.NS("http://jumpgate.caltech.edu/wiki/LibraryOntology#")
+submissionLog = RDF.NS("http://jumpgate.caltech.edu/wiki/SubmissionsLog/")
 
 def blankOrUri(value=None):
     node = None
@@ -32,6 +37,12 @@ def toTypedNode(value):
             value = u'1'
         else:
             value = u'0'
+    elif type(value) in (types.IntType, types.LongType):
+        value_type = xsdNS['decimal'].uri
+        value = unicode(value)
+    elif type(value) == types.FloatType:
+        value_type = xsdNS['float'].uri
+        value = unicode(value)
     elif type(value) in types.StringTypes:
         value_type = xsdNS['string'].uri
     else:
@@ -39,4 +50,71 @@ def toTypedNode(value):
         value = unicode(value)
     return RDF.Node(literal=value, datatype=value_type)
 
+
+def fromTypedNode(node):
+    if node is None:
+        return None
+
+    value_type = str(node.literal_value['datatype'])
+    # chop off xml schema declaration
+    value_type = value_type.replace(str(xsdNS[''].uri), '')
+    literal = node.literal_value['string']
+    literal_lower = literal.lower()
+
+    if value_type == 'boolean':
+        if literal_lower in ('1', 'yes', 'true'):
+            return True
+        elif literal_lower in ('0', 'no', 'false'):
+            return False
+        else:
+            raise ValueError("Unrecognized boolean %s" % (literal,))
+    elif value_type == 'decimal' and literal.find('.') == -1:
+        return int(literal)
+    elif value_type in ('decimal', 'float', 'double'):
+        return float(literal)
+    elif value_type in ('string',):
+        return literal
+    elif value_type in ('dateTime',):
+        raise NotImplementedError('need to parse isoformat date-time')
+
+    return literal
+
+
+def get_model(model_name=None, directory=None):
+    if directory is None:
+        directory = os.getcwd()
+
+    if model_name is None:
+        storage = RDF.MemoryStorage()
+    else:
+        storage = RDF.HashStorage(model_name,
+                                  options="hash-type='bdb',dir='{0}'".format(directory))
+    model = RDF.Model(storage)
+    return model
+
+
+def load_into_model(model, parser_name, filename, ns=None):
+    if not os.path.exists(filename):
+        raise IOError("Can't find {0}".format(filename))
+
+    data = open(filename, 'r').read()
+    rdf_parser = RDF.Parser(name=parser_name)
+    rdf_parser.parse_string_into_model(model, data, ns)
+
+
+def get_serializer(name='turtle'):
+    """Return a serializer with our standard prefixes loaded
+    """
+    writer = RDF.Serializer(name=name)
+    # really standard stuff
+    writer.set_namespace('owl', owlNS._prefix)
+    writer.set_namespace('rdf', rdfNS._prefix)
+    writer.set_namespace('rdfs', rdfsNS._prefix)
+    writer.set_namespace('xsd', xsdNS._prefix)
+
+    # should these be here, kind of specific to an application
+    writer.set_namespace('libraryOntology', libraryOntology._prefix)
+    writer.set_namespace('ucscSubmission', submissionOntology._prefix)
+    writer.set_namespace('ucscDaf', dafTermOntology._prefix)
+    return writer
-- 
2.30.2