From 422aae6869d8fd6e81db031c4d2f698bcbae451c Mon Sep 17 00:00:00 2001
From: Diane Trout
Date: Fri, 2 Sep 2011 16:28:39 -0700
Subject: [PATCH] Attempt to download DAF data for an encodesubmit submission

This required making htsworkflow/submission/daf.py more flexible, so that
it no longer assumes it will be given a "submission log name".

There are also some changes to make pep8/pylint happier.
---
 extra/ucsc_encode_submission/encode_find.py | 304 ++++++++++++--------
 htsworkflow/submission/daf.py               | 243 ++++++++++------
 htsworkflow/submission/test/test_daf.py     |  61 ++--
 3 files changed, 377 insertions(+), 231 deletions(-)

diff --git a/extra/ucsc_encode_submission/encode_find.py b/extra/ucsc_encode_submission/encode_find.py
index 15acea6..f116281 100644
--- a/extra/ucsc_encode_submission/encode_find.py
+++ b/extra/ucsc_encode_submission/encode_find.py
@@ -1,4 +1,7 @@
 #!/usr/bin/env python
+"""
+Gather information about our submissions into a single RDF store
+"""
 from BeautifulSoup import BeautifulSoup
 from datetime import datetime
@@ -11,13 +14,16 @@ import logging
 import os
 import re
 # redland rdf lib
-import RDF
+import RDF
 import sys
 import urllib
 import urlparse

+from htsworkflow.submission import daf
+
 from htsworkflow.util import api
 from htsworkflow.util.rdfhelp import \
+     dafTermOntology, \
      dublinCoreNS, \
      get_model, \
      get_serializer, \
@@ -28,18 +34,23 @@ from htsworkflow.util.rdfhelp import \
      rdfNS, \
      rdfsNS, \
      xsdNS
+TYPE_N = rdfNS['type']

 # URL mappings
-libraryNS = RDF.NS("http://jumpgate.caltech.edu/library/")
+LIBRARY_NS = RDF.NS("http://jumpgate.caltech.edu/library/")
+
+from htsworkflow.submission.ucsc import \
+     daf_download_url, \
+     ddf_download_url, \
+     submission_view_url, \
+     UCSCEncodePipeline
+DOWNLOAD_DDF = UCSCEncodePipeline + "download_ddf#"
+DDF_NS = RDF.NS(DOWNLOAD_DDF)

-from htsworkflow.submission.ucsc import submission_view_url, UCSCEncodePipeline
-download_ddf = UCSCEncodePipeline+"download_ddf#"
-ddfNS = RDF.NS(download_ddf)
-
 DBDIR = os.path.expanduser("~diane/proj/submission")

-logger = logging.getLogger("encode_find")
+LOGGER = logging.getLogger("encode_find")

 LOGIN_URL = 'http://encodesubmit.ucsc.edu/account/login'
 USER_URL = 'http://encodesubmit.ucsc.edu/pipeline/show_user'
@@ -47,7 +58,14 @@ USER_URL = 'http://encodesubmit.ucsc.edu/pipeline/show_user'
 USERNAME = 'detrout'
 CHARSET = 'utf-8'

+
 def main(cmdline=None):
+    """
+    Parse command line arguments
+
+    Takes a list of arguments (assuming arg[0] is the program name) or None
+    If None, it looks at sys.argv
+    """
     parser = make_parser()
     opts, args = parser.parse_args(cmdline)

@@ -58,31 +76,38 @@ def main(cmdline=None):
     htsw_authdata = api.make_auth_from_opts(opts, parser)
     htswapi = api.HtswApi(opts.host, htsw_authdata)
-
+
     cookie = None
     model = get_model(opts.load_model, DBDIR)
-
+
     if opts.load_rdf is not None:
         ns_uri = submissionOntology[''].uri
         load_into_model(model, opts.rdf_parser_name, opts.load_rdf, ns_uri)
-
+
+    if len(args) == 0:
+        limit = None
+    else:
+        limit = args
+
     if opts.update:
         cookie = login(cookie=cookie)
-        load_my_submissions(model, cookie=cookie)
+        load_my_submissions(model, limit=limit, cookie=cookie)
         load_encode_libraries(model, htswapi)

     if opts.sparql is not None:
         sparql_query(model, opts.sparql)

     if opts.find_submission_with_no_library:
-        missing = find_submissions_with_no_library(model)
-
+        find_submissions_with_no_library(model)
+
     if opts.print_rdf:
         serializer = get_serializer(name=opts.rdf_parser_name)
         print serializer.serialize_model_to_string(model)


 def make_parser():
+    
"""Construct option parser + """ parser = OptionParser() commands = OptionGroup(parser, "Commands") commands.add_option('--load-model', default=None, @@ -98,15 +123,16 @@ def make_parser(): #commands.add_option('--update-ddfs', action="store_true", default=False, # help="download ddf information for known submission") #commands.add_option('--update-library', default=None, - # help="download library info from htsw, requires filename for extra rules") + # help="download library info from htsw, "\ + # "requires filename for extra rules") parser.add_option_group(commands) - + queries = OptionGroup(parser, "Queries") queries.add_option('--sparql', default=None, help="execute arbitrary sparql query") queries.add_option('--find-submission-with-no-library', default=False, action="store_true", - help="find submissions with no library ID") + help="find submissions with no library ID") parser.add_option_group(queries) options = OptionGroup(parser, "Options") @@ -115,86 +141,91 @@ def make_parser(): options.add_option("-v", "--verbose", action="store_true", default=False) options.add_option("--debug", action="store_true", default=False) parser.add_option_group(options) - + api.add_auth_options(parser) return parser -def load_my_submissions(model, cookie=None): + +def load_my_submissions(model, limit=None, cookie=None): + """Parse all the submissions from UCSC into model + It will look at the global USER_URL to figure out who to scrape + cookie contains the session cookie, if none, will attempt to login + """ if cookie is None: cookie = login() - + soup = get_url_as_soup(USER_URL, 'GET', cookie) - p = soup.find('table', attrs={'id':'projects'}) - tr = p.findNext('tr') + projects = soup.find('table', attrs={'id': 'projects'}) + table_row = projects.findNext('tr') # first record is header - tr = tr.findNext() - TypeN = rdfsNS['type'] - NameN = submissionOntology['name'] - SpeciesN = submissionOntology['species'] - LibraryURN = submissionOntology['library_urn'] - - while tr is not None: - td = tr.findAll('td') - if td is not None and len(td) > 1: - subUrnText = td[0].contents[0].contents[0].encode(CHARSET) - subUrn = RDF.Uri(submission_view_url(subUrnText)) - - add_stmt(model, subUrn, TypeN, submissionOntology['Submission']) - - name = get_contents(td[4]) - add_stmt(model, subUrn, NameN, name) - - species = get_contents(td[2]) - if species is not None: - add_stmt(model, subUrn, SpeciesN, species) - - library_id = get_library_id(name) - if library_id is not None: - add_submission_to_library_urn(model, - subUrn, - LibraryURN, - library_id) - - add_submission_creation_date(model, subUrn, cookie) - - # grab changing atttributes - status = get_contents(td[6]).strip() - last_mod_datetime = get_date_contents(td[8]) - last_mod = last_mod_datetime.isoformat() - - update_submission_detail(model, subUrn, status, last_mod, cookie=cookie) - - logging.info("Processed {0}".format( subUrn)) - - tr = tr.findNext('tr') + table_row = table_row.findNext() + name_n = submissionOntology['name'] + species_n = submissionOntology['species'] + library_urn = submissionOntology['library_urn'] + + while table_row is not None: + cell = table_row.findAll('td') + if cell is not None and len(cell) > 1: + submission_id = cell[0].contents[0].contents[0].encode(CHARSET) + if limit is None or submission_id in limit: + subUrn = RDF.Uri(submission_view_url(submission_id)) + + add_stmt(model, subUrn, TYPE_N, submissionOntology['Submission']) + + name = get_contents(cell[4]) + add_stmt(model, subUrn, name_n, name) + + species = 
get_contents(cell[2]) + if species is not None: + add_stmt(model, subUrn, species_n, species) + + library_id = get_library_id(name) + if library_id is not None: + add_submission_to_library_urn(model, + subUrn, + library_urn, + library_id) + + add_submission_creation_date(model, subUrn, cookie) + + # grab changing atttributes + status = get_contents(cell[6]).strip() + last_mod_datetime = get_date_contents(cell[8]) + last_mod = last_mod_datetime.isoformat() + + update_submission_detail(model, subUrn, status, last_mod, + cookie=cookie) + + logging.info("Processed {0}".format(subUrn)) + + table_row = table_row.findNext('tr') def add_submission_to_library_urn(model, submissionUrn, predicate, library_id): """Add a link from a UCSC submission to woldlab library if needed """ - libraryUrn = libraryNS[library_id+'/'] + libraryUrn = LIBRARY_NS[library_id + '/'] query = RDF.Statement(submissionUrn, predicate, libraryUrn) if not model.contains_statement(query): link = RDF.Statement(submissionUrn, predicate, libraryUrn) - logger.info("Adding Sub -> Lib link: {0}".format(link)) + LOGGER.info("Adding Sub -> Lib link: {0}".format(link)) model.add_statement(link) else: - logger.debug("Found: {0}".format(str(query))) + LOGGER.debug("Found: {0}".format(str(query))) + - def find_submissions_with_no_library(model): missing_lib_query = RDF.SPARQLQuery(""" PREFIX submissionOntology:<{submissionOntology}> -SELECT +SELECT ?subid ?name WHERE {{ ?subid submissionOntology:name ?name OPTIONAL {{ ?subid submissionOntology:library_urn ?libid }} FILTER (!bound(?libid)) -}}""".format(submissionOntology=submissionOntology[''].uri) -) +}}""".format(submissionOntology=submissionOntology[''].uri)) results = missing_lib_query.execute(model) for row in results: @@ -202,9 +233,10 @@ WHERE {{ name = row['name'] print "# {0}".format(name) print "<{0}>".format(subid.uri) - print " encodeSubmit:library_urn ." + print " encodeSubmit:library_urn"\ + " ." print "" - + def add_submission_creation_date(model, subUrn, cookie): # in theory the submission page might have more information on it. 
@@ -213,7 +245,7 @@ def add_submission_creation_date(model, subUrn, cookie): query = RDF.Statement(subUrn, creationDateN, None) creation_dates = list(model.find_statements(query)) if len(creation_dates) == 0: - logger.info("Getting creation date for: {0}".format(str(subUrn))) + LOGGER.info("Getting creation date for: {0}".format(str(subUrn))) soup = get_url_as_soup(str(subUrn), 'GET', cookie) created_label = soup.find(text="Created: ") if created_label: @@ -222,7 +254,8 @@ def add_submission_creation_date(model, subUrn, cookie): datatype=dateTimeType.uri) add_stmt(model, subUrn, creationDateN, created_date_node) else: - logger.debug("Found creation date for: {0}".format(str(subUrn))) + LOGGER.debug("Found creation date for: {0}".format(str(subUrn))) + def update_submission_detail(model, subUrn, status, recent_update, cookie): HasStatusN = submissionOntology['has_status'] @@ -235,33 +268,46 @@ def update_submission_detail(model, subUrn, status, recent_update, cookie): if len(status_nodes) == 0: # has no status node, add one logging.info("Adding status node to {0}".format(subUrn)) - status_blank = RDF.Node() - add_stmt(model, subUrn, HasStatusN, status_blank) - add_stmt(model, status_blank, rdfsNS['type'], StatusN) - add_stmt(model, status_blank, StatusN, status) - add_stmt(model, status_blank, LastModifyN, recent_update) - update_ddf(model, subUrn, status_blank, cookie=cookie) + status_node = create_status_node(subUrn, recent_update) + add_stmt(model, subUrn, HasStatusN, status_node) + add_stmt(model, status_node, rdfsNS['type'], StatusN) + add_stmt(model, status_node, StatusN, status) + add_stmt(model, status_node, LastModifyN, recent_update) + update_ddf(model, subUrn, status_node, cookie=cookie) + update_daf(model, subUrn, status_node, cookie=cookie) else: logging.info("Found {0} status blanks".format(len(status_nodes))) for status_statement in status_nodes: - status_blank = status_statement.object - last_modified_query = RDF.Statement(status_blank, LastModifyN, None) + status_node = status_statement.object + last_modified_query = RDF.Statement(status_node, + LastModifyN, + None) last_mod_nodes = model.find_statements(last_modified_query) for last_mod_statement in last_mod_nodes: last_mod_date = str(last_mod_statement.object) if recent_update == str(last_mod_date): - update_ddf(model, subUrn, status_blank, cookie=cookie) + update_ddf(model, subUrn, status_node, cookie=cookie) + update_daf(model, subUrn, status_node, cookie=cookie) break - +def update_daf(model, submission_url, status_node, cookie): + download_daf_uri = str(submission_url).replace('show', 'download_daf') + daf_uri = RDF.Uri(download_daf_uri) + + status_is_daf = RDF.Statement(status_node, TYPE_N, dafTermOntology['']) + if not model.contains_statement(status_is_daf): + logging.info('Adding daf to {0}, {1}'.format(submission_url, + status_node)) + daf_text = get_url_as_text(download_daf_uri, 'GET', cookie) + daf.fromstring_into_model(model, status_node, daf_text) + + def update_ddf(model, subUrn, statusNode, cookie): - TypeN = rdfsNS['type'] - download_ddf_url = str(subUrn).replace('show', 'download_ddf') ddfUrn = RDF.Uri(download_ddf_url) - - status_is_ddf = RDF.Statement(statusNode, TypeN, ddfNS['']) + + status_is_ddf = RDF.Statement(statusNode, TYPE_N, DDF_NS['']) if not model.contains_statement(status_is_ddf): logging.info('Adding ddf to {0}, {1}'.format(subUrn, statusNode)) ddf_text = get_url_as_text(download_ddf_url, 'GET', cookie) @@ -275,8 +321,7 @@ def add_ddf_statements(model, statusNode, ddf_string): ddf_lines = 
ddf_string.split('\n') # first line is header header = ddf_lines[0].split() - attributes = [ ddfNS[x] for x in header ] - statements = [] + attributes = [DDF_NS[x] for x in header] for ddf_line in ddf_lines[1:]: ddf_line = ddf_line.strip() @@ -284,18 +329,21 @@ def add_ddf_statements(model, statusNode, ddf_string): continue if ddf_line.startswith("#"): continue - + ddf_record = ddf_line.split('\t') files = ddf_record[0].split(',') file_attributes = ddf_record[1:] for f in files: fileNode = RDF.Node() - add_stmt(model, statusNode, submissionOntology['has_file'], fileNode) - add_stmt(model, fileNode, rdfsNS['type'], ddfNS['file']) - add_stmt(model, fileNode, ddfNS['filename'], f) - - for predicate, object in zip( attributes[1:], file_attributes): + add_stmt(model, + statusNode, + submissionOntology['has_file'], + fileNode) + add_stmt(model, fileNode, rdfsNS['type'], DDF_NS['file']) + add_stmt(model, fileNode, DDF_NS['filename'], f) + + for predicate, object in zip(attributes[1:], file_attributes): add_stmt(model, fileNode, predicate, object) @@ -303,19 +351,19 @@ def load_encode_libraries(model, htswapi): """Get libraries associated with encode. """ encodeFilters = ["/library/?affiliations__id__exact=44", - "/library/?affiliations__id__exact=80",] + "/library/?affiliations__id__exact=80", + ] - encodeUrls = [os.path.join(htswapi.root_url + u) for u in encodeFilters] rdfaParser = RDF.Parser(name='rdfa') for encodeUrl in encodeUrls: - logger.info("Scanning library url {0}".format(encodeUrl)) + LOGGER.info("Scanning library url {0}".format(encodeUrl)) rdfaParser.parse_into_model(model, encodeUrl) query = RDF.Statement(None, libraryOntology['library_id'], None) libraries = model.find_statements(query) for statement in libraries: libraryUrn = statement.subject - logger.info("Scanning {0}".format(str(libraryUrn))) + LOGGER.info("Scanning {0}".format(str(libraryUrn))) load_library_detail(model, libraryUrn) @@ -325,15 +373,17 @@ def load_library_detail(model, libraryUrn): rdfaParser = RDF.Parser(name='rdfa') query = RDF.Statement(libraryUrn, libraryOntology['date'], None) results = list(model.find_statements(query)) - logger.debug("Found {0} statements for {1}".format(len(results), libraryUrn)) + log_message = "Found {0} statements for {1}" + LOGGER.debug(log_message.format(len(results), libraryUrn)) if len(results) == 0: - logger.info("Loading {0}".format(str(libraryUrn))) + LOGGER.info("Loading {0}".format(str(libraryUrn))) rdfaParser.parse_into_model(model, libraryUrn.uri) elif len(results) == 1: - pass # Assuming that a loaded dataset has one record + pass # Assuming that a loaded dataset has one record else: logging.warning("Many dates for {0}".format(libraryUrn)) - + + def get_library_id(name): """Guess library ID from library name @@ -360,8 +410,13 @@ def get_contents(element): return a.contents[0].encode(CHARSET) return element.contents[0].encode(CHARSET) - - + + +def create_status_node(submission_uri, timestamp): + submission_uri = daf.submission_uri_to_string(submission_uri) + status_uri = urlparse.urljoin(submission_uri, timestamp) + return RDF.Node(RDF.Uri(status_uri)) + def get_date_contents(element): data = get_contents(element) if data: @@ -369,13 +424,12 @@ def get_date_contents(element): else: return None - -def add_stmt(model, subject, predicate, object): + +def add_stmt(model, subject, predicate, rdf_object): """Convienence create RDF Statement and add to a model """ return model.add_statement( - RDF.Statement(subject, predicate, object) - ) + RDF.Statement(subject, predicate, 
rdf_object)) def login(cookie=None): @@ -383,7 +437,7 @@ def login(cookie=None): """ if cookie is not None: return cookie - + keys = keyring.get_keyring() password = keys.get_password(LOGIN_URL, USERNAME) credentials = {'login': USERNAME, @@ -396,13 +450,13 @@ def login(cookie=None): body=urllib.urlencode(credentials)) logging.debug("Login to {0}, status {1}".format(LOGIN_URL, response['status'])) - + cookie = response.get('set-cookie', None) if cookie is None: raise RuntimeError("Wasn't able to log into: {0}".format(LOGIN_URL)) return cookie - + def get_url_as_soup(url, method, cookie=None): http = httplib2.Http() headers = {} @@ -411,15 +465,15 @@ def get_url_as_soup(url, method, cookie=None): response, content = http.request(url, method, headers=headers) if response['status'] == '200': soup = BeautifulSoup(content, - fromEncoding="utf-8", # should read from header - convertEntities=BeautifulSoup.HTML_ENTITIES - ) + fromEncoding="utf-8", # should read from header + convertEntities=BeautifulSoup.HTML_ENTITIES) return soup else: msg = "error accessing {0}, status {1}" msg = msg.format(url, response['status']) e = httplib2.HttpLib2ErrorWithResponse(msg, response, content) + def get_url_as_text(url, method, cookie=None): http = httplib2.Http() headers = {} @@ -432,7 +486,7 @@ def get_url_as_text(url, method, cookie=None): msg = "error accessing {0}, status {1}" msg = msg.format(url, response['status']) e = httplib2.HttpLib2ErrorWithResponse(msg, response, content) - + ################ # old stuff SUBMISSIONS_LACKING_LIBID = [ @@ -452,18 +506,18 @@ SUBMISSIONS_LACKING_LIBID = [ ] - def select_by_library_id(submission_list): - subl = [ (x.library_id, x) for x in submission_list if x.library_id ] + subl = [(x.library_id, x) for x in submission_list if x.library_id] libraries = {} for lib_id, subobj in subl: libraries.setdefault(lib_id, []).append(subobj) for submission in libraries.values(): submission.sort(key=attrgetter('date'), reverse=True) - + return libraries + def library_to_freeze(selected_libraries): freezes = ['2010-Jan', '2010-Jul', '2011-Jan'] lib_ids = sorted(selected_libraries.keys()) @@ -486,7 +540,7 @@ def library_to_freeze(selected_libraries): report.append('') for lib_id in lib_ids: report.append('') - lib_url = libraryNS[lib_id].uri + lib_url = LIBRARY_NS[lib_id].uri report.append('{1}'.format(lib_url, lib_id)) submissions = selected_libraries[lib_id] report.append('{0}'.format(submissions[0].name)) @@ -494,7 +548,6 @@ def library_to_freeze(selected_libraries): for sub in submissions: date = date_to_freeze(sub.date) batched.setdefault(date, []).append(sub) - print lib_id, batched for d in freezes: report.append('') for s in batched.get(d, []): @@ -509,12 +562,12 @@ def library_to_freeze(selected_libraries): report.append("") return "\n".join(report) - + def date_to_freeze(d): - freezes = [ (datetime(2010, 1, 30), '2010-Jan'), - (datetime(2010, 7, 30), '2010-Jul'), - (datetime(2011, 1, 30), '2011-Jan'), - ] + freezes = [(datetime(2010, 1, 30), '2010-Jan'), + (datetime(2010, 7, 30), '2010-Jul'), + (datetime(2011, 1, 30), '2011-Jan'), + ] for end, name in freezes: if d < end: return name @@ -523,4 +576,3 @@ def date_to_freeze(d): if __name__ == "__main__": main() - diff --git a/htsworkflow/submission/daf.py b/htsworkflow/submission/daf.py index b8a4177..1be882b 100644 --- a/htsworkflow/submission/daf.py +++ b/htsworkflow/submission/daf.py @@ -24,47 +24,64 @@ from htsworkflow.util.hashfile import make_md5sum logger = logging.getLogger(__name__) -# -class 
ModelException(RuntimeError): pass + +class ModelException(RuntimeError): + """Assumptions about the RDF model failed""" + pass + + class MetadataLookupException(RuntimeError): """Problem accessing metadata""" pass + # STATES DAF_HEADER = 1 DAF_VIEW = 2 -def parse_into_model(model, submission_name, filename): + +def parse_into_model(model, subject, filename): """Read a DAF into RDF Model - requires a short submission name + requires a subject node to attach statements to """ attributes = parse(filename) - add_to_model(model, attributes, submission_name) + add_to_model(model, attributes, subject) -def fromstream_into_model(model, submission_name, daf_stream): + +def fromstream_into_model(model, subject, daf_stream): + """Load daf stream into model attached to node subject + """ attributes = parse_stream(daf_stream) - add_to_model(model, attributes, submission_name) - -def fromstring_into_model(model, submission_name, daf_string): + add_to_model(model, attributes, subject) + + +def fromstring_into_model(model, subject, daf_string): """Read a string containing a DAF into RDF Model requires a short submission name """ attributes = fromstring(daf_string) - add_to_model(model, attributes, submission_name) - + add_to_model(model, attributes, subject) + + def parse(filename): - stream = open(filename,'r') - attributes = parse_stream(stream) + """Parse daf from a file + """ + stream = open(filename, 'r') + attributes = parse_stream(stream) stream.close() return attributes + def fromstring(daf_string): + """Parse UCSC daf from a provided string""" stream = StringIO(daf_string) return parse_stream(stream) + def parse_stream(stream): + """Parse UCSC dat stored in a stream""" comment_re = re.compile("#.*$") state = DAF_HEADER @@ -84,7 +101,7 @@ def parse_stream(stream): value = True elif value.lower() in ('no',): value = False - + if len(name) == 0: if view_name is not None: attributes['views'][view_name] = view_attributes @@ -92,7 +109,7 @@ def parse_stream(stream): view_attributes = {} state = DAF_HEADER elif state == DAF_HEADER and name == 'variables': - attributes[name] = [ x.strip() for x in value.split(',')] + attributes[name] = [x.strip() for x in value.split(',')] elif state == DAF_HEADER and name == 'view': view_name = value view_attributes['view'] = value @@ -105,36 +122,51 @@ def parse_stream(stream): # save last block if view_name is not None: attributes['views'][view_name] = view_attributes - + return attributes + def _consume_whitespace(line, start=0): + """return index of next non whitespace character + + returns length of string if it can't find anything + """ for i in xrange(start, len(line)): if line[i] not in string.whitespace: return i - + return len(line) + def _extract_name_index(line, start=0): + """Used to find end of word by looking for a whitespace character + + returns length of string if nothing matches + """ for i in xrange(start, len(line)): if line[i] in string.whitespace: return i - + return len(line) + def _extract_value_index(line, start=0): + """Returns position of last non-whitespace character + """ shortline = line.rstrip() return len(shortline) -def convert_to_rdf_statements(attributes, name): - submission_uri = get_submission_uri(name) - subject = RDF.Node(submission_uri) +def convert_to_rdf_statements(attributes, subject): + """Convert dictionary of DAF attributes into rdf statements + + The statements are attached to the provided subject node + """ statements = [] for daf_key in attributes: predicate = dafTermOntology[daf_key] if daf_key == 'views': - 
statements.extend(_views_to_statements(name, + statements.extend(_views_to_statements(subject, dafTermOntology, attributes[daf_key])) elif daf_key == 'variables': @@ -145,13 +177,15 @@ def convert_to_rdf_statements(attributes, name): else: value = attributes[daf_key] obj = toTypedNode(value) - statements.append(RDF.Statement(subject,predicate,obj)) + statements.append(RDF.Statement(subject, predicate, obj)) return statements -def _views_to_statements(name, dafNS, views): - subject = RDF.Node(get_submission_uri(name)) - viewNS = get_view_namespace(name) + +def _views_to_statements(subject, dafNS, views): + """Attach view attributes to new view nodes atached to provided subject + """ + viewNS = get_view_namespace(subject) statements = [] for view_name in views: @@ -163,23 +197,38 @@ def _views_to_statements(name, dafNS, views): for view_attribute_name in view_attributes: predicate = dafNS[view_attribute_name] obj = toTypedNode(view_attributes[view_attribute_name]) - statements.append(RDF.Statement(viewSubject, predicate, obj)) - + statements.append(RDF.Statement(viewSubject, predicate, obj)) + #statements.extend(convert_to_rdf_statements(view, viewNode)) return statements -def add_to_model(model, attributes, name): - for statement in convert_to_rdf_statements(attributes, name): + +def add_to_model(model, attributes, subject): + for statement in convert_to_rdf_statements(attributes, subject): model.add_statement(statement) - + + def get_submission_uri(name): - return submissionLog[name].uri + return submissionLog[name].uri + + +def submission_uri_to_string(submission_uri): + if isinstance(submission_uri, RDF.Node): + submission_uri = str(submission_uri.uri) + elif isinstance(submission_uri, RDF.Uri): + submission_uri = str(submission_uri) + if submission_uri[-1] != '/': + submission_uri += '/' + return submission_uri -def get_view_namespace(name): - submission_uri = get_submission_uri(name) - viewNS = RDF.NS(str(submission_uri) + '/view/') + +def get_view_namespace(submission_uri): + submission_uri = submission_uri_to_string(submission_uri) + view_uri = urlparse.urljoin(submission_uri, 'view/') + viewNS = RDF.NS(view_uri) return viewNS + class DAFMapper(object): """Convert filenames to views in the UCSC Daf """ @@ -200,6 +249,9 @@ class DAFMapper(object): logger.error("We need a DAF or Model containing a DAF to work") self.name = name + self.submissionSet = get_submission_uri(self.name) + self.viewNS = get_view_namespace(self.submissionSet) + if model is not None: self.model = model else: @@ -207,46 +259,40 @@ class DAFMapper(object): if hasattr(daf_file, 'next'): # its some kind of stream - fromstream_into_model(self.model, name, daf_file) + fromstream_into_model(self.model, self.submissionSet, daf_file) else: # file - parse_into_model(self.model, name, daf_file) + parse_into_model(self.model, self.submissionSet, daf_file) self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/') - self.submissionSet = get_submission_uri(self.name) - self.submissionSetNS = RDF.NS(str(self.submissionSet)+'/') + self.submissionSetNS = RDF.NS(str(self.submissionSet) + '/') self.__view_map = None - def add_pattern(self, view_name, filename_pattern): """Map a filename regular expression to a view name """ - viewNS = get_view_namespace(self.name) - obj = toTypedNode(filename_pattern) self.model.add_statement( - RDF.Statement(viewNS[view_name], + RDF.Statement(self.viewNS[view_name], dafTermOntology['filename_re'], obj)) - def import_submission_dir(self, submission_dir, library_id): """Import a submission 
directories and update our model as needed """ #attributes = get_filename_attribute_map(paired) libNode = self.libraryNS[library_id + "/"] - + self._add_library_details_to_model(libNode) - + submission_files = os.listdir(submission_dir) - for f in submission_files: - self.construct_file_attributes(submission_dir, libNode, f) + for filename in submission_files: + self.construct_track_attributes(submission_dir, libNode, filename) - - def construct_file_attributes(self, submission_dir, libNode, pathname): + def construct_track_attributes(self, submission_dir, libNode, pathname): """Looking for the best extension The 'best' is the longest match - + :Args: filename (str): the filename whose extention we are about to examine """ @@ -255,7 +301,7 @@ class DAFMapper(object): logger.debug("Searching for view") view = self.find_view(filename) if view is None: - logger.warn("Unrecognized file: %s" % (pathname,)) + logger.warn("Unrecognized file: {0}".format(pathname)) return None if str(view) == str(libraryOntology['ignore']): return None @@ -263,55 +309,81 @@ class DAFMapper(object): submission_name = self.make_submission_name(submission_dir) submissionNode = self.get_submission_node(submission_dir) submission_uri = str(submissionNode.uri) - view_name = fromTypedNode(self.model.get_target(view, dafTermOntology['name'])) + view_name = fromTypedNode(self.model.get_target(view, + dafTermOntology['name'])) if view_name is None: - logging.warning('Could not find view name for {0}'.format(str(view))) + errmsg = 'Could not find view name for {0}' + logging.warning(errmsg.format(str(view))) return - + view_name = str(view_name) submissionView = RDF.Node(RDF.Uri(submission_uri + '/' + view_name)) self.model.add_statement( - RDF.Statement(self.submissionSet, dafTermOntology['has_submission'], submissionNode)) + RDF.Statement(self.submissionSet, + dafTermOntology['has_submission'], + submissionNode)) logger.debug("Adding statements to {0}".format(str(submissionNode))) - self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['has_view'], submissionView)) - self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['name'], toTypedNode(submission_name))) - self.model.add_statement(RDF.Statement(submissionNode, rdfNS['type'], submissionOntology['submission'])) - self.model.add_statement(RDF.Statement(submissionNode, submissionOntology['library'], libNode)) + self.model.add_statement(RDF.Statement(submissionNode, + submissionOntology['has_view'], + submissionView)) + self.model.add_statement(RDF.Statement(submissionNode, + submissionOntology['name'], + toTypedNode(submission_name))) + self.model.add_statement( + RDF.Statement(submissionNode, + rdfNS['type'], + submissionOntology['submission'])) + self.model.add_statement(RDF.Statement(submissionNode, + submissionOntology['library'], + libNode)) logger.debug("Adding statements to {0}".format(str(submissionView))) - # add trac specific information + # add track specific information self.model.add_statement( RDF.Statement(submissionView, dafTermOntology['view'], view)) self.model.add_statement( - RDF.Statement(submissionView, dafTermOntology['paired'], toTypedNode(self._is_paired(libNode)))) + RDF.Statement(submissionView, + dafTermOntology['paired'], + toTypedNode(self._is_paired(libNode)))) self.model.add_statement( - RDF.Statement(submissionView, dafTermOntology['submission'], submissionNode)) + RDF.Statement(submissionView, + dafTermOntology['submission'], + submissionNode)) - # extra information + # extra information terms = 
[dafTermOntology['type'], dafTermOntology['filename_re'], ] terms.extend((dafTermOntology[v] for v in self.get_daf_variables())) + # add file specific information + self.create_file_attributes(filename, submissionView, submission_uri, submission_dir) + + logger.debug("Done.") + + def create_file_attributes(self, filename, submissionView, submission_uri, submission_dir): # add file specific information logger.debug("Updating file md5sum") fileNode = RDF.Node(RDF.Uri(submission_uri + '/' + filename)) submission_pathname = os.path.join(submission_dir, filename) - md5 = make_md5sum(submission_pathname) self.model.add_statement( - RDF.Statement(submissionView, dafTermOntology['has_file'], fileNode)) + RDF.Statement(submissionView, + dafTermOntology['has_file'], + fileNode)) self.model.add_statement( - RDF.Statement(fileNode, dafTermOntology['filename'], filename)) + RDF.Statement(fileNode, + dafTermOntology['filename'], + filename)) + md5 = make_md5sum(submission_pathname) if md5 is None: - logging.warning("Unable to produce md5sum for %s" % ( submission_pathname)) + errmsg = "Unable to produce md5sum for {0}" + logging.warning(errmsg.format(submission_pathname)) else: self.model.add_statement( RDF.Statement(fileNode, dafTermOntology['md5sum'], md5)) - logger.debug("Done.") - def _add_library_details_to_model(self, libNode): parser = RDF.Parser(name='rdfa') new_statements = parser.parse_as_stream(libNode.uri) @@ -328,7 +400,7 @@ class DAFMapper(object): results = ['view'] if self.need_replicate(): results.append('replicate') - + for obj in self.model.get_targets(self.submissionSet, variableTerm): value = str(fromTypedNode(obj)) results.append(value) @@ -340,15 +412,15 @@ class DAFMapper(object): submission_dir_name = os.path.split(submission_dir)[1] if len(submission_dir_name) == 0: raise RuntimeError( - "Submission dir name too short: %s" %(submission_dir,)) + "Submission dir name too short: {0}".format(submission_dir)) return submission_dir_name - + def get_submission_node(self, submission_dir): """Convert a submission directory name to a submission node """ submission_name = self.make_submission_name(submission_dir) return self.submissionSetNS[submission_name] - + def _get_library_attribute(self, libNode, attribute): if not isinstance(attribute, RDF.Node): attribute = libraryOntology[attribute] @@ -362,7 +434,7 @@ class DAFMapper(object): #targets = self._search_same_as(libNode, attribute) #if targets is not None: # return self._format_library_attribute(targets) - + # we don't know anything about this attribute self._add_library_details_to_model(libNode) @@ -371,7 +443,7 @@ class DAFMapper(object): return self._format_library_attribute(targets) return None - + def _format_library_attribute(self, targets): if len(targets) == 0: return None @@ -388,7 +460,7 @@ class DAFMapper(object): if len(targets) > 0: return targets return None - + def find_view(self, filename): """Search through potential DAF filename patterns """ @@ -409,9 +481,10 @@ class DAFMapper(object): return results[0] else: return None - + def get_view_name(self, view): - names = list(self.model.get_targets(view, submissionOntology['view_name'])) + view_term = submissionOntology['view_name'] + names = list(self.model.get_targets(view, view_term)) if len(names) == 1: return fromTypedNode(names[0]) else: @@ -419,8 +492,7 @@ class DAFMapper(object): msg = msg.format(str(view), len(names)) logger.error(msg) raise RuntimeError(msg) - - + def _get_filename_view_map(self): """Query our model for filename patterns @@ -443,16 +515,19 @@ 
class DAFMapper(object): def _get_library_url(self): return str(self.libraryNS[''].uri) + def _set_library_url(self, value): self.libraryNS = RDF.NS(str(value)) + library_url = property(_get_library_url, _set_library_url) def _is_paired(self, libNode): """Determine if a library is paired end""" library_type = self._get_library_attribute(libNode, 'library_type') if library_type is None: - raise ModelException("%s doesn't have a library type" % (str(libNode),)) - + errmsg = "%s doesn't have a library type" + raise ModelException(errmsg % (str(libNode),)) + #single = (1,3,6) single = ['Single End', 'Small RNA', 'CSHL (lacking last nt)'] paired = ['Paired End', 'Multiplexing', 'Barcoded'] @@ -475,5 +550,5 @@ class DAFMapper(object): replicate = self.model.get_target(view, replicateTerm) if fromTypedNode(replicate): return True - + return False diff --git a/htsworkflow/submission/test/test_daf.py b/htsworkflow/submission/test/test_daf.py index 4f77799..3d31c95 100644 --- a/htsworkflow/submission/test/test_daf.py +++ b/htsworkflow/submission/test/test_daf.py @@ -20,7 +20,7 @@ import RDF test_daf = """# Lab and general info grant Hardison lab Caltech-m -dataType ChipSeq +dataType ChipSeq variables cell, antibody,sex,age,strain,control compositeSuffix CaltechHistone assembly mm9 @@ -44,7 +44,7 @@ required no test_daf_no_rep = """# Lab and general info grant Hardison lab Caltech-m -dataType ChipSeq +dataType ChipSeq variables cell, antibody,sex,age,strain,control compositeSuffix CaltechHistone assembly mm9 @@ -63,7 +63,7 @@ class TestDAF(unittest.TestCase): def test_parse(self): parsed = daf.fromstring(test_daf) - + self.failUnlessEqual(parsed['assembly'], 'mm9') self.failUnlessEqual(parsed['grant'], 'Hardison') self.failUnlessEqual(len(parsed['variables']), 6) @@ -85,7 +85,7 @@ class TestDAF(unittest.TestCase): name = 'cursub' subNS = RDF.NS(str(submissionLog[name].uri)) - daf.add_to_model(model, parsed, name) + daf.add_to_model(model, parsed, submissionLog[name].uri) signal_view_node = RDF.Node(subNS['/view/Signal'].uri) @@ -101,18 +101,38 @@ class TestDAF(unittest.TestCase): name = model.get_target(signal_view_node, dafTermOntology['name']) self.failUnlessEqual(fromTypedNode(name), u'Signal') + def test_get_view_namespace_from_string(self): + url = "http://jumpgate.caltech.edu/wiki/SubmissionLog/cursub/" + target = RDF.NS(url + 'view/') + view_namespace = daf.get_view_namespace(url) + self.assertEqual(view_namespace[''], target['']) + + def test_get_view_namespace_from_string_no_trailing_slash(self): + url = "http://jumpgate.caltech.edu/wiki/SubmissionLog/cursub" + target = RDF.NS(url + '/view/') + view_namespace = daf.get_view_namespace(url) + self.assertEqual(view_namespace[''], target['']) + + def test_get_view_namespace_from_uri_node(self): + url = "http://jumpgate.caltech.edu/wiki/SubmissionLog/cursub/" + node = RDF.Node(RDF.Uri(url)) + target = RDF.NS(url + 'view/') + view_namespace = daf.get_view_namespace(node) + self.assertEqual(view_namespace[''], target['']) + + def load_daf_mapper(name, extra_statements=None, ns=None, test_daf=test_daf): """Load test model in """ model = get_model() if ns is None: ns="http://extra" - + if extra_statements is not None: parser = RDF.Parser(name='turtle') parser.parse_string_into_model(model, extra_statements, ns) - + test_daf_stream = StringIO(test_daf) mapper = daf.DAFMapper(name, daf_file = test_daf_stream, model=model) return mapper @@ -121,7 +141,7 @@ def dump_model(model): writer = get_serializer() turtle = writer.serialize_model_to_string(model) 
print turtle - + class TestDAFMapper(unittest.TestCase): def test_create_mapper_add_pattern(self): name = 'testsub' @@ -129,7 +149,7 @@ class TestDAFMapper(unittest.TestCase): pattern = '.bam\Z(?ms)' mapper.add_pattern('Signal', pattern) - s = RDF.Statement(daf.get_view_namespace(name)['Signal'], + s = RDF.Statement(mapper.viewNS['Signal'], dafTermOntology['filename_re'], None) search = list(mapper.model.find_statements(s)) @@ -140,7 +160,7 @@ class TestDAFMapper(unittest.TestCase): str(dafTermOntology['filename_re'])) #self.failUnlessEqual(search[0].object.literal_value['string'], pattern) - + def test_find_one_view(self): name='testfind' extra = '''@prefix dafTerm: . @@ -152,7 +172,7 @@ thisView:FastqRd1 dafTerm:filename_re ".*_r1\\\\.fastq" . daf_mapper = load_daf_mapper(name, extra_statements = extra) view = daf_mapper.find_view('filename_r1.fastq') - + # dump_model(daf_mapper.model) view_root = 'http://jumpgate.caltech.edu/wiki/SubmissionsLog/{0}/view/' view_root = view_root.format(name) @@ -185,7 +205,7 @@ thisView:Signal dafTerm:filename_re ".*\\\\.bam" ; submissionOntology:view_name "Signal" . thisView:FastqRd1 dafTerm:filename_re ".*\\\\.fastq" ; submissionOntology:view_name "FastqRd1" . -<%(libUrl)s> <%(libraryOntology)sgel_cut> "100"^^xsd:decimal . +<%(libUrl)s> <%(libraryOntology)sgel_cut> "100"^^xsd:decimal . ''' % {'libraryOntology': 'http://jumpgate.caltech.edu/wiki/LibraryOntology#', 'libUrl': lib_url} @@ -203,13 +223,12 @@ thisView:FastqRd1 dafTerm:filename_re ".*\\\\.fastq" ; with mktempdir('analysis') as analysis_dir: path, analysis_name = os.path.split(analysis_dir) with mktempfile('.bam', dir=analysis_dir) as filename: - print 'dir', os.listdir(analysis_dir) - daf_mapper.construct_file_attributes(analysis_dir, - libNode, - filename) - + daf_mapper.construct_track_attributes(analysis_dir, + libNode, + filename) + #dump_model(daf_mapper.model) - + sub_root = "http://jumpgate.caltech.edu/wiki/SubmissionsLog/testfind/" submission_name = sub_root + analysis_name source = daf_mapper.model.get_source(rdfNS['type'], submissionOntology['submission']) @@ -219,7 +238,7 @@ thisView:FastqRd1 dafTerm:filename_re ".*\\\\.fastq" ; view = daf_mapper.model.get_target(source, submissionOntology['has_view']) self.failUnlessEqual(str(view.uri), view_name) - + def test_library_url(self): daf_mapper = load_daf_mapper('urltest') @@ -232,12 +251,12 @@ thisView:FastqRd1 dafTerm:filename_re ".*\\\\.fastq" ; daf_mapper = load_daf_mapper('test_rep') self.failUnlessEqual(daf_mapper.need_replicate(), True) self.failUnless('replicate' in daf_mapper.get_daf_variables()) - + def test_daf_without_replicate(self): daf_mapper = load_daf_mapper('test_rep',test_daf=test_daf_no_rep) self.failUnlessEqual(daf_mapper.need_replicate(), False) self.failUnless('replicate' not in daf_mapper.get_daf_variables()) - + @contextmanager def mktempdir(prefix='tmp'): d = tempfile.mkdtemp(prefix=prefix) @@ -255,7 +274,7 @@ def mktempfile(suffix='', prefix='tmp', dir=None): os.unlink(pathname) print "unmade", pathname - + def suite(): suite = unittest.makeSuite(TestDAF, 'test') suite.addTest(unittest.makeSuite(TestDAFMapper, 'test')) -- 2.30.2
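
The core API change above is that htsworkflow/submission/daf.py now attaches
DAF metadata to an arbitrary RDF subject node instead of deriving one from a
wiki "submission log name". A minimal sketch of the reworked entry points
follows; the DAF text and submission URL are made-up examples, and calling
get_model() with no arguments yields an in-memory model, as in test_daf.py:

    import RDF
    from htsworkflow.submission import daf
    from htsworkflow.util.rdfhelp import get_model

    # A tiny DAF with a single view; real DAFs come from the UCSC pipeline.
    daf_text = ("grant Example\n"
                "dataType ChipSeq\n"
                "variables cell, antibody\n"
                "\n"
                "view Signal\n"
                "type wig\n"
                "required no\n")

    model = get_model()
    # Any URI node can serve as the subject, e.g. a UCSC submission page.
    subject = RDF.Node(RDF.Uri("http://encodesubmit.ucsc.edu/pipeline/show/1234/"))
    daf.fromstring_into_model(model, subject, daf_text)

    # View nodes hang off a view/ namespace derived from the subject's URI.
    view_ns = daf.get_view_namespace(subject)
    print view_ns['Signal']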
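In encode_find.py, update_submission_detail used to hang status information
off a blank node; create_status_node now mints a stable URI by joining the
submission URI with the last-modified timestamp. submission_uri_to_string()
guarantees the trailing slash that makes urljoin extend the path rather than
replace its final segment. Roughly, with a hypothetical submission id:

    import urlparse

    submission = "http://encodesubmit.ucsc.edu/pipeline/show/1234"
    timestamp = "2011-09-02T16:28:39"

    # Without the trailing slash urljoin would drop the "1234" segment.
    base = submission if submission.endswith('/') else submission + '/'
    print urlparse.urljoin(base, timestamp)
    # http://encodesubmit.ucsc.edu/pipeline/show/1234/2011-09-02T16:28:39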
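The new update_daf() follows the same pattern as update_ddf(): rewrite the
submission page URL into a download URL, fetch it with the logged-in session
cookie, and hand the text to daf.fromstring_into_model() with the status node
as the subject. Stripped of the model bookkeeping, the fetch amounts to
roughly the following (the URL and cookie value are placeholders):

    import httplib2

    submission_url = "http://encodesubmit.ucsc.edu/pipeline/show/1234"
    download_daf_uri = submission_url.replace('show', 'download_daf')

    http = httplib2.Http()
    headers = {'Cookie': 'placeholder-session-cookie'}  # from login()
    response, content = http.request(download_daf_uri, 'GET', headers=headers)
    if response['status'] == '200':
        daf_text = content  # passed on to daf.fromstring_into_model()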