From: Diane Trout Date: Sat, 18 Jun 2011 20:31:03 +0000 (-0700) Subject: Merge branch 'master' of mus.cacr.caltech.edu:htsworkflow X-Git-Tag: 0.5.2~15 X-Git-Url: http://woldlab.caltech.edu/gitweb/?a=commitdiff_plain;h=1011742ef95a437d53e4cea23b63e21c62c06a32;hp=d10f1bff171674bbc817fed09b83842a377cd799;p=htsworkflow.git Merge branch 'master' of mus.cacr.caltech.edu:htsworkflow --- diff --git a/extra/ucsc_encode_submission/encode_find.py b/extra/ucsc_encode_submission/encode_find.py index 3ac4f04..cd52c22 100644 --- a/extra/ucsc_encode_submission/encode_find.py +++ b/extra/ucsc_encode_submission/encode_find.py @@ -16,22 +16,25 @@ import sys import urllib from htsworkflow.util import api +from htsworkflow.util.rdfhelp import \ + dublinCoreNS, \ + submitOntology, \ + libraryOntology, \ + rdfNS, \ + rdfsNS, \ + xsdNS + +# URL mappings +libraryNS = RDF.NS("http://jumpgate.caltech.edu/library/") + +from htsworkflow.submission.ucsc import submission_view_url, UCSCEncodePipeline +ddfNS = RDF.NS(RDF.Uri(UCSCEncodePipeline + "/download_ddf#")) + DBDIR = os.path.expanduser("~diane/proj/submission") logger = logging.getLogger("encode_find") -libraryNS = RDF.NS("http://jumpgate.caltech.edu/library/") -submissionNS = RDF.NS("http://encodesubmit.ucsc.edu/pipeline/show/") -submitOntologyNS = RDF.NS("http://jumpgate.caltech.edu/wiki/UCSCSubmissionOntology#") -ddfNS = RDF.NS("http://encodesubmit.ucsc.edu/pipeline/download_ddf#") -libOntNS = RDF.NS("http://jumpgate.caltech.edu/wiki/LibraryOntology#") - -dublinCoreNS = RDF.NS("http://purl.org/dc/elements/1.1/") -rdfNS = RDF.NS("http://www.w3.org/1999/02/22-rdf-syntax-ns#") -rdfsNS= RDF.NS("http://www.w3.org/2000/01/rdf-schema#") -xsdNS = RDF.NS("http://www.w3.org/2001/XMLSchema#") - LOGIN_URL = 'http://encodesubmit.ucsc.edu/account/login' USER_URL = 'http://encodesubmit.ucsc.edu/pipeline/show_user' @@ -126,17 +129,17 @@ def load_my_submissions(model, cookie=None): # first record is header tr = tr.findNext() TypeN = rdfsNS['type'] - NameN = submitOntologyNS['name'] - SpeciesN = submitOntologyNS['species'] - LibraryURN = submitOntologyNS['library_urn'] + NameN = submitOntology['name'] + SpeciesN = submitOntology['species'] + LibraryURN = submitOntology['library_urn'] while tr is not None: td = tr.findAll('td') if td is not None and len(td) > 1: subUrnText = td[0].contents[0].contents[0].encode(CHARSET) - subUrn = submissionNS[subUrnText] + subUrn = RDF.Uri(submission_view_url(subUrnText)) - add_stmt(model, subUrn, TypeN, submitOntologyNS['Submission']) + add_stmt(model, subUrn, TypeN, submitOntology['Submission']) name = get_contents(td[4]) add_stmt(model, subUrn, NameN, name) @@ -189,7 +192,7 @@ WHERE {{ ?subid submissionOntology:name ?name OPTIONAL {{ ?subid submissionOntology:library_urn ?libid }} FILTER (!bound(?libid)) -}}""".format(submissionOntology=submitOntologyNS[''].uri) +}}""".format(submissionOntology=submitOntology[''].uri) ) results = missing_lib_query.execute(model) @@ -204,7 +207,7 @@ WHERE {{ def add_submission_creation_date(model, subUrn, cookie): # in theory the submission page might have more information on it. 
- creationDateN = libOntNS['date'] + creationDateN = libraryOntology['date'] dateTimeType = xsdNS['dateTime'] query = RDF.Statement(subUrn, creationDateN, None) creation_dates = list(model.find_statements(query)) @@ -221,9 +224,9 @@ def add_submission_creation_date(model, subUrn, cookie): logger.debug("Found creation date for: {0}".format(str(subUrn))) def update_submission_detail(model, subUrn, status, recent_update, cookie): - HasStatusN = submitOntologyNS['has_status'] - StatusN = submitOntologyNS['status'] - LastModifyN = submitOntologyNS['last_modify_date'] + HasStatusN = submitOntology['has_status'] + StatusN = submitOntology['status'] + LastModifyN = submitOntology['last_modify_date'] status_nodes_query = RDF.Statement(subUrn, HasStatusN, None) status_nodes = list(model.find_statements(status_nodes_query)) @@ -287,7 +290,7 @@ def add_ddf_statements(model, statusNode, ddf_string): for f in files: fileNode = RDF.Node() - add_stmt(model, statusNode, submitOntologyNS['has_file'], fileNode) + add_stmt(model, statusNode, submitOntology['has_file'], fileNode) add_stmt(model, fileNode, rdfsNS['type'], ddfNS['file']) add_stmt(model, fileNode, ddfNS['filename'], f) @@ -302,7 +305,7 @@ def load_encode_libraries(model, htswapi): rdfaParser = RDF.Parser(name='rdfa') print encodeUrl rdfaParser.parse_into_model(model, encodeUrl) - query = RDF.Statement(None, libOntNS['library_id'], None) + query = RDF.Statement(None, libraryOntology['library_id'], None) libraries = model.find_statements(query) for statement in libraries: libraryUrn = statement.subject @@ -313,7 +316,7 @@ def load_library_detail(model, libraryUrn): """Grab detail information from library page """ rdfaParser = RDF.Parser(name='rdfa') - query = RDF.Statement(libraryUrn, libOntNS['date'], None) + query = RDF.Statement(libraryUrn, libraryOntology['date'], None) results = list(model.find_statements(query)) if len(results) == 0: logger.info("Loading {0}".format(str(libraryUrn))) @@ -372,7 +375,7 @@ def load_into_model(model, parser_name, filename): data = open(filename, 'r').read() rdf_parser = RDF.Parser(name=parser_name) - ns_uri = submitOntologyNS[''].uri + ns_uri = submitOntology[''].uri rdf_parser.parse_string_into_model(model, data, ns_uri) def add_stmt(model, subject, predicate, object): @@ -502,7 +505,7 @@ def library_to_freeze(selected_libraries): for d in freezes: report.append('') for s in batched.get(d, []): - show_url = submissionNS[s.subid].uri + show_url = submission_view_url(s.subid) subid = '{1}'.format(show_url, s.subid) report.append("{0}:{1}".format(subid, s.status)) report.append('') diff --git a/extra/ucsc_encode_submission/ucsc_gather.py b/extra/ucsc_encode_submission/ucsc_gather.py index 38d8dee..c65feec 100755 --- a/extra/ucsc_encode_submission/ucsc_gather.py +++ b/extra/ucsc_encode_submission/ucsc_gather.py @@ -20,11 +20,7 @@ import urllib2 import urlparse from htsworkflow.util import api -from htsworkflow.pipelines.sequences import \ - create_sequence_table, \ - scan_for_sequences -from htsworkflow.pipelines import qseq2fastq -from htsworkflow.pipelines import srf2fastq +from htsworkflow.submission.condorfastq import CondorFastqExtract def main(cmdline=None): parser = make_parser() @@ -56,11 +52,9 @@ def main(cmdline=None): link_daf(opts.daf, library_result_map) if opts.fastq: - build_fastqs(opts.host, - apidata, - opts.sequence, - library_result_map, - force=opts.force) + extractor = CondorFastqExtract(opts.host, apidata, opts.sequence, + force=opts.force) + extractor.build_fastqs(library_result_map) if 
opts.ini: make_submission_ini(opts.host, apidata, library_result_map) @@ -120,130 +114,6 @@ def make_tree_from(source_path, library_result_map): logging.info( 'LINK {0} to {1}'.format(source_pathname, target_pathname)) -def build_fastqs(host, apidata, sequences_path, library_result_map, - force=False ): - """ - Generate condor scripts to build any needed fastq files - - Args: - host (str): root of the htsworkflow api server - apidata (dict): id & key to post to the server - sequences_path (str): root of the directory tree to scan for files - library_result_map (list): [(library_id, destination directory), ...] - """ - qseq_condor_header = """ -Universe=vanilla -executable=%(exe)s -error=log/qseq2fastq.err.$(process).log -output=log/qseq2fastq.out.$(process).log -log=log/qseq2fastq.log - -""" % {'exe': sys.executable } - qseq_condor_entries = [] - srf_condor_header = """ -Universe=vanilla -executable=%(exe)s -output=log/srf_pair_fastq.out.$(process).log -error=log/srf_pair_fastq.err.$(process).log -log=log/srf_pair_fastq.log -environment="PYTHONPATH=/home/diane/lib/python2.6/site-packages:/home/diane/proj/solexa/gaworkflow PATH=/woldlab/rattus/lvol0/mus/home/diane/bin:/usr/bin:/bin" - -""" % {'exe': sys.executable } - srf_condor_entries = [] - lib_db = find_archive_sequence_files(host, - apidata, - sequences_path, - library_result_map) - - needed_targets = find_missing_targets(library_result_map, lib_db, force) - - for target_pathname, available_sources in needed_targets.items(): - logging.debug(' target : %s' % (target_pathname,)) - logging.debug(' candidate sources: %s' % (available_sources,)) - if available_sources.has_key('qseq'): - source = available_sources['qseq'] - qseq_condor_entries.append( - condor_qseq_to_fastq(source.path, - target_pathname, - source.flowcell, - force=force) - ) - elif available_sources.has_key('srf'): - source = available_sources['srf'] - mid = getattr(source, 'mid_point', None) - srf_condor_entries.append( - condor_srf_to_fastq(source.path, - target_pathname, - source.paired, - source.flowcell, - mid, - force=force) - ) - else: - print " need file", target_pathname - - if len(srf_condor_entries) > 0: - make_submit_script('srf.fastq.condor', - srf_condor_header, - srf_condor_entries) - - if len(qseq_condor_entries) > 0: - make_submit_script('qseq.fastq.condor', - qseq_condor_header, - qseq_condor_entries) - - -def find_missing_targets(library_result_map, lib_db, force=False): - """ - Check if the sequence file exists. - This requires computing what the sequence name is and checking - to see if it can be found in the sequence location. 
- - Adds seq.paired flag to sequences listed in lib_db[*]['lanes'] - """ - fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq' - fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq' - # find what targets we're missing - needed_targets = {} - for lib_id, result_dir in library_result_map: - lib = lib_db[lib_id] - lane_dict = make_lane_dict(lib_db, lib_id) - - for lane_key, sequences in lib['lanes'].items(): - for seq in sequences: - seq.paired = lane_dict[seq.flowcell]['paired_end'] - lane_status = lane_dict[seq.flowcell]['status'] - - if seq.paired and seq.read is None: - seq.read = 1 - filename_attributes = { - 'flowcell': seq.flowcell, - 'lib_id': lib_id, - 'lane': seq.lane, - 'read': seq.read, - 'cycle': seq.cycle - } - # skip bad runs - if lane_status == 'Failed': - continue - if seq.flowcell == '30DY0AAXX': - # 30DY0 only ran for 151 bases instead of 152 - # it is actually 76 1st read, 75 2nd read - seq.mid_point = 76 - - # end filters - if seq.paired: - target_name = fastq_paired_template % filename_attributes - else: - target_name = fastq_single_template % filename_attributes - - target_pathname = os.path.join(result_dir, target_name) - if force or not os.path.exists(target_pathname): - t = needed_targets.setdefault(target_pathname, {}) - t[seq.filetype] = seq - - return needed_targets - def link_daf(daf_path, library_result_map): if not os.path.exists(daf_path): @@ -319,17 +189,6 @@ def make_submission_ini(host, apidata, library_result_map, paired=True): f.write(os.linesep.join(inifile)) -def make_lane_dict(lib_db, lib_id): - """ - Convert the lane_set in a lib_db to a dictionary - indexed by flowcell ID - """ - result = [] - for lane in lib_db[lib_id]['lane_set']: - result.append((lane['flowcell'], lane)) - return dict(result) - - def make_all_ddfs(library_result_map, daf_name, make_condor=True, force=False): dag_fragment = [] for lib_id, result_dir in library_result_map: @@ -525,93 +384,6 @@ def get_library_info(host, apidata, library_id): return contents -def condor_srf_to_fastq(srf_file, target_pathname, paired, flowcell=None, - mid=None, force=False): - py = srf2fastq.__file__ - args = [ py, srf_file, ] - if paired: - args.extend(['--left', target_pathname]) - # this is ugly. I did it because I was pregenerating the target - # names before I tried to figure out what sources could generate - # those targets, and everything up to this point had been - # one-to-one. So I couldn't figure out how to pair the - # target names. - # With this at least the command will run correctly. - # however if we rename the default targets, this'll break - # also I think it'll generate it twice. - args.extend(['--right', - target_pathname.replace('_r1.fastq', '_r2.fastq')]) - else: - args.extend(['--single', target_pathname ]) - if flowcell is not None: - args.extend(['--flowcell', flowcell]) - - if mid is not None: - args.extend(['-m', str(mid)]) - - if force: - args.extend(['--force']) - - script = """ -arguments="%s" -queue -""" % (" ".join(args),) - - return script - - -def condor_qseq_to_fastq(qseq_file, target_pathname, flowcell=None, force=False): - py = qseq2fastq.__file__ - args = [py, '-i', qseq_file, '-o', target_pathname ] - if flowcell is not None: - args.extend(['-f', flowcell]) - script = """ -arguments="%s" -queue -""" % (" ".join(args)) - - return script - -def find_archive_sequence_files(host, apidata, sequences_path, - library_result_map): - """ - Find all the archive sequence files possibly associated with our results. 
- - """ - logging.debug("Searching for sequence files in: %s" %(sequences_path,)) - - lib_db = {} - seq_dirs = set() - #seq_dirs = set(os.path.join(sequences_path, 'srfs')) - candidate_lanes = {} - for lib_id, result_dir in library_result_map: - lib_info = get_library_info(host, apidata, lib_id) - lib_info['lanes'] = {} - lib_db[lib_id] = lib_info - - for lane in lib_info['lane_set']: - lane_key = (lane['flowcell'], lane['lane_number']) - candidate_lanes[lane_key] = lib_id - seq_dirs.add(os.path.join(sequences_path, - 'flowcells', - lane['flowcell'])) - logging.debug("Seq_dirs = %s" %(unicode(seq_dirs))) - candidate_seq_list = scan_for_sequences(seq_dirs) - - # at this point we have too many sequences as scan_for_sequences - # returns all the sequences in a flowcell directory - # so lets filter out the extras - - for seq in candidate_seq_list: - lane_key = (seq.flowcell, seq.lane) - lib_id = candidate_lanes.get(lane_key, None) - if lib_id is not None: - lib_info = lib_db[lib_id] - lib_info['lanes'].setdefault(lane_key, set()).add(seq) - - return lib_db - - class NameToViewMap(object): """Determine view attributes for a given submission file name """ @@ -855,32 +627,6 @@ def make_condor_name(pathname, run_type=None): return ".".join(elements) -def make_submit_script(target, header, body_list): - """ - write out a text file - - this was intended for condor submit scripts - - Args: - target (str or stream): - if target is a string, we will open and close the file - if target is a stream, the caller is responsible. - - header (str); - header to write at the beginning of the file - body_list (list of strs): - a list of blocks to add to the file. - """ - if type(target) in types.StringTypes: - f = open(target,"w") - else: - f = target - f.write(header) - for entry in body_list: - f.write(entry) - if type(target) in types.StringTypes: - f.close() - def parse_filelist(file_string): return file_string.split(",") diff --git a/htsworkflow/frontend/templates/samples/library_detail.html b/htsworkflow/frontend/templates/samples/library_detail.html index df7af9e..0cff58b 100644 --- a/htsworkflow/frontend/templates/samples/library_detail.html +++ b/htsworkflow/frontend/templates/samples/library_detail.html @@ -126,7 +126,7 @@ {% for lane in lib.lane_set.all %} {{ lane.flowcell.flowcell_id }} + >{{ lane.flowcell.flowcell_id }} {{ lane.lane_number }} diff --git a/htsworkflow/submission/__init__.py b/htsworkflow/submission/__init__.py new file mode 100644 index 0000000..bbbe4df --- /dev/null +++ b/htsworkflow/submission/__init__.py @@ -0,0 +1 @@ +"""Utilities to help with submitting results to public repositories""" diff --git a/htsworkflow/submission/condorfastq.py b/htsworkflow/submission/condorfastq.py new file mode 100644 index 0000000..3833dc5 --- /dev/null +++ b/htsworkflow/submission/condorfastq.py @@ -0,0 +1,280 @@ +"""Convert srf and qseq archive files to fastqs +""" +import logging +import os +import sys +import types + +from htsworkflow.frontend.samples.results import parse_flowcell_id +from htsworkflow.pipelines.sequences import scan_for_sequences +from htsworkflow.pipelines import qseq2fastq +from htsworkflow.pipelines import srf2fastq +from htsworkflow.util.api import HtswApi + +logger = logging.getLogger(__name__) + +class CondorFastqExtract(object): + def __init__(self, host, apidata, sequences_path, + log_path='log', + force=False): + """Extract fastqs from results archive + + Args: + host (str): root of the htsworkflow api server + apidata (dict): id & key to post to the server + 
sequences_path (str): root of the directory tree to scan for files + log_path (str): where to put condor log files + force (bool): do we force overwriting current files? + """ + self.api = HtswApi(host, apidata) + self.sequences_path = sequences_path + self.log_path = log_path + self.force = force + + def build_fastqs(self, library_result_map ): + """ + Generate condor scripts to build any needed fastq files + + Args: + library_result_map (list): [(library_id, destination directory), ...] + """ + qseq_condor_header = self.get_qseq_condor_header() + qseq_condor_entries = [] + srf_condor_header = self.get_srf_condor_header() + srf_condor_entries = [] + lib_db = self.find_archive_sequence_files(library_result_map) + + needed_targets = self.find_missing_targets(library_result_map, lib_db) + + for target_pathname, available_sources in needed_targets.items(): + logger.debug(' target : %s' % (target_pathname,)) + logger.debug(' candidate sources: %s' % (available_sources,)) + if available_sources.has_key('qseq'): + source = available_sources['qseq'] + qseq_condor_entries.append( + self.condor_qseq_to_fastq(source.path, + target_pathname, + source.flowcell) + ) + elif available_sources.has_key('srf'): + source = available_sources['srf'] + mid = getattr(source, 'mid_point', None) + srf_condor_entries.append( + self.condor_srf_to_fastq(source.path, + target_pathname, + source.paired, + source.flowcell, + mid) + ) + else: + print " need file", target_pathname + + if len(srf_condor_entries) > 0: + make_submit_script('srf.fastq.condor', + srf_condor_header, + srf_condor_entries) + + if len(qseq_condor_entries) > 0: + make_submit_script('qseq.fastq.condor', + qseq_condor_header, + qseq_condor_entries) + + + def get_qseq_condor_header(self): + return """Universe=vanilla +executable=%(exe)s +error=%(log)s/qseq2fastq.err.$(process).log +output=%(log)s/qseq2fastq.out.$(process).log +log=%(log)s/qseq2fastq.log + +""" % {'exe': sys.executable, + 'log': self.log_path } + + def get_srf_condor_header(self): + return """Universe=vanilla +executable=%(exe)s +output=%(log)s/srf_pair_fastq.out.$(process).log +error=%(log)s/srf_pair_fastq.err.$(process).log +log=%(log)s/srf_pair_fastq.log +environment="%(env)s" + +""" % {'exe': sys.executable, + 'log': self.log_path, + 'env': os.environ.get('PYTHONPATH', '') + } + + def find_archive_sequence_files(self, library_result_map): + """ + Find archived sequence files associated with our results. 
+ """ + logger.debug("Searching for sequence files in: %s" %(self.sequences_path,)) + + lib_db = {} + seq_dirs = set() + candidate_lanes = {} + for lib_id, result_dir in library_result_map: + lib_info = self.api.get_library(lib_id) + lib_info['lanes'] = {} + lib_db[lib_id] = lib_info + + for lane in lib_info['lane_set']: + lane_key = (lane['flowcell'], lane['lane_number']) + candidate_lanes[lane_key] = lib_id + seq_dirs.add(os.path.join(self.sequences_path, + 'flowcells', + lane['flowcell'])) + logger.debug("Seq_dirs = %s" %(unicode(seq_dirs))) + candidate_seq_list = scan_for_sequences(seq_dirs) + + # at this point we have too many sequences as scan_for_sequences + # returns all the sequences in a flowcell directory + # so lets filter out the extras + + for seq in candidate_seq_list: + lane_key = (seq.flowcell, seq.lane) + lib_id = candidate_lanes.get(lane_key, None) + if lib_id is not None: + lib_info = lib_db[lib_id] + lib_info['lanes'].setdefault(lane_key, set()).add(seq) + + return lib_db + + def find_missing_targets(self, library_result_map, lib_db): + """ + Check if the sequence file exists. + This requires computing what the sequence name is and checking + to see if it can be found in the sequence location. + + Adds seq.paired flag to sequences listed in lib_db[*]['lanes'] + """ + fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq' + fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq' + # find what targets we're missing + needed_targets = {} + for lib_id, result_dir in library_result_map: + lib = lib_db[lib_id] + lane_dict = make_lane_dict(lib_db, lib_id) + + for lane_key, sequences in lib['lanes'].items(): + for seq in sequences: + seq.paired = lane_dict[seq.flowcell]['paired_end'] + lane_status = lane_dict[seq.flowcell]['status'] + + if seq.paired and seq.read is None: + seq.read = 1 + filename_attributes = { + 'flowcell': seq.flowcell, + 'lib_id': lib_id, + 'lane': seq.lane, + 'read': seq.read, + 'cycle': seq.cycle + } + # skip bad runs + if lane_status == 'Failed': + continue + if seq.flowcell == '30DY0AAXX': + # 30DY0 only ran for 151 bases instead of 152 + # it is actually 76 1st read, 75 2nd read + seq.mid_point = 76 + + # end filters + if seq.paired: + target_name = fastq_paired_template % filename_attributes + else: + target_name = fastq_single_template % filename_attributes + + target_pathname = os.path.join(result_dir, target_name) + if self.force or not os.path.exists(target_pathname): + t = needed_targets.setdefault(target_pathname, {}) + t[seq.filetype] = seq + + return needed_targets + + + def condor_srf_to_fastq(self, + srf_file, + target_pathname, + paired, + flowcell=None, + mid=None): + py = srf2fastq.__file__ + args = [ py, srf_file, ] + if paired: + args.extend(['--left', target_pathname]) + # this is ugly. I did it because I was pregenerating the target + # names before I tried to figure out what sources could generate + # those targets, and everything up to this point had been + # one-to-one. So I couldn't figure out how to pair the + # target names. + # With this at least the command will run correctly. + # however if we rename the default targets, this'll break + # also I think it'll generate it twice. 
+ args.extend(['--right', + target_pathname.replace('_r1.fastq', '_r2.fastq')]) + else: + args.extend(['--single', target_pathname ]) + if flowcell is not None: + args.extend(['--flowcell', flowcell]) + + if mid is not None: + args.extend(['-m', str(mid)]) + + if self.force: + args.extend(['--force']) + + script = """arguments="%s" +queue +""" % (" ".join(args),) + + return script + + + def condor_qseq_to_fastq(self, qseq_file, target_pathname, flowcell=None): + py = qseq2fastq.__file__ + args = [py, '-i', qseq_file, '-o', target_pathname ] + if flowcell is not None: + args.extend(['-f', flowcell]) + script = """arguments="%s" +queue +""" % (" ".join(args)) + + return script + +def make_submit_script(target, header, body_list): + """ + write out a text file + + this was intended for condor submit scripts + + Args: + target (str or stream): + if target is a string, we will open and close the file + if target is a stream, the caller is responsible. + + header (str); + header to write at the beginning of the file + body_list (list of strs): + a list of blocks to add to the file. + """ + if type(target) in types.StringTypes: + f = open(target,"w") + else: + f = target + f.write(header) + for entry in body_list: + f.write(entry) + if type(target) in types.StringTypes: + f.close() + +def make_lane_dict(lib_db, lib_id): + """ + Convert the lane_set in a lib_db to a dictionary + indexed by flowcell ID + """ + result = [] + for lane in lib_db[lib_id]['lane_set']: + flowcell_id, status = parse_flowcell_id(lane['flowcell']) + lane['flowcell'] = flowcell_id + result.append((lane['flowcell'], lane)) + return dict(result) + diff --git a/htsworkflow/submission/daf.py b/htsworkflow/submission/daf.py new file mode 100644 index 0000000..cdc9312 --- /dev/null +++ b/htsworkflow/submission/daf.py @@ -0,0 +1,129 @@ +"""Parse UCSC DAF File +""" +import logging +import re +import string +from StringIO import StringIO +import types + +from htsworkflow.util.rdfhelp import blankOrUri, toTypedNode + +logger = logging.getLogger(__name__) + +# STATES +DAF_HEADER = 1 +DAF_VIEW = 2 + + +def parse(filename): + stream = open(filename,'r') + attributes = parse_stream(stream) + stream.close() + return stream + +def fromstring(daf_string): + stream = StringIO(daf_string) + return parse_stream(stream) + +def parse_stream(stream): + comment_re = re.compile("#.*$") + + state = DAF_HEADER + attributes = {'views': {}} + view_name = None + view_attributes = {} + for line in stream: + #remove comments + line = comment_re.sub("", line) + nstop = _extract_name_index(line) + name = line[0:nstop] + sstop = _consume_whitespace(line, start=nstop) + vstop = _extract_value_index(line, start=sstop) + value = line[sstop:vstop] + + if value.lower() in ('yes',): + value = True + elif value.lower() in ('no',): + value = False + + if len(name) == 0: + if view_name is not None: + attributes['views'][view_name] = view_attributes + view_name = None + view_attributes = {} + state = DAF_HEADER + elif state == DAF_HEADER and name == 'variables': + attributes[name] = [ x.strip() for x in value.split(',')] + elif state == DAF_HEADER and name == 'view': + view_name = value + view_attributes['view'] = value + state = DAF_VIEW + elif state == DAF_HEADER: + attributes[name] = value + elif state == DAF_VIEW: + view_attributes[name] = value + + # save last block + if view_name is not None: + attributes['views'][view_name] = view_attributes + + return attributes + +def _consume_whitespace(line, start=0): + for i in xrange(start, len(line)): + if line[i] not in 
string.whitespace: + return i + + return len(line) + +def _extract_name_index(line, start=0): + for i in xrange(start, len(line)): + if line[i] in string.whitespace: + return i + + return len(line) + +def _extract_value_index(line, start=0): + shortline = line.rstrip() + return len(shortline) + +try: + import RDF + def convert_to_rdf_statements(attributes, source=None): + ddfNS = RDF.NS("http://encodesubmit.ucsc.edu/pipeline/download_ddf#") + + subject = blankOrUri(source) + + statements = [] + for name in attributes: + predicate = ddfNS[name] + if name == 'views': + predicate = ddfNS['views'] + for view_name in attributes.get('views', []): + view = attributes['views'][view_name] + viewNode = RDF.Node() + statements.append(RDF.Statement(subject, predicate, viewNode)) + statements.extend(convert_to_rdf_statements(view, viewNode)) + elif name == 'variables': + predicate = ddfNS['variables'] + for var in attributes.get('variables', []): + obj = toTypedNode(var) + statements.append(RDF.Statement(subject, predicate, obj)) + else: + value = attributes[name] + obj = toTypedNode(value) + statements.append(RDF.Statement(subject,predicate,obj)) + + return statements + + + def add_to_model(model, attributes, source=None): + for statement in convert_to_rdf_statements(attributes, source): + model.add_statement(statement) + +except ImportError, e: + def convert_to_rdf_statements(attributes, source=None): + raise NotImplementedError("librdf not installed") + def add_to_model(model, attributes, source=None): + raise NotImplementedError("librdf not installed") + diff --git a/htsworkflow/submission/test/test_daf.py b/htsworkflow/submission/test/test_daf.py new file mode 100644 index 0000000..3749446 --- /dev/null +++ b/htsworkflow/submission/test/test_daf.py @@ -0,0 +1,67 @@ +import unittest + +from htsworkflow.submission import daf + +test_daf = """# Lab and general info +grant Hardison +lab Caltech-m +dataType ChipSeq +variables cell, antibody,sex,age,strain,control +compositeSuffix CaltechHistone +assembly mm9 +dafVersion 2.0 +validationSettings validateFiles.bam:mismatches=2,bamPercent=99.9;validateFiles.fastq:quick=1000 + +# Track/view definition +view Peaks +longLabelPrefix Caltech Histone Peaks +type narrowPeak +hasReplicates yes +required no + +view Signal +longLabelPrefix Caltech Histone Signal +type bigWig +hasReplicates yes +required no +""" + +class TestDAF(unittest.TestCase): + def test_parse(self): + + parsed = daf.fromstring(test_daf) + + self.failUnlessEqual(parsed['assembly'], 'mm9') + self.failUnlessEqual(parsed['grant'], 'Hardison') + self.failUnlessEqual(len(parsed['variables']), 6) + self.failUnlessEqual(len(parsed['views']), 2) + self.failUnlessEqual(len(parsed['views']['Peaks']), 5) + self.failUnlessEqual(len(parsed['views']['Signal']), 5) + signal = parsed['views']['Signal'] + self.failUnlessEqual(signal['required'], False) + self.failUnlessEqual(signal['longLabelPrefix'], + 'Caltech Histone Signal') + + def test_rdf(self): + try: + import RDF + + parsed = daf.fromstring(test_daf) + #mem = RDF.Storage(storage_name='hashes', + # options_string='hash-type="memory"'), + mem = RDF.MemoryStorage() + model = RDF.Model(mem) + + daf.add_to_model(model, parsed) + + writer = RDF.Serializer(name='turtle') + print writer.serialize_model_to_string(model) + + except ImportError, e: + print "Skipped test_rdf" + +def suite(): + return unittest.makeSuite(TestDAF, 'test') + +if __name__ == "__main__": + unittest.main(defaultTest='suite') diff --git a/htsworkflow/submission/ucsc.py 
b/htsworkflow/submission/ucsc.py new file mode 100644 index 0000000..f80629a --- /dev/null +++ b/htsworkflow/submission/ucsc.py @@ -0,0 +1,30 @@ +import urlparse + +UCSCEncodePipeline = "http://encodesubmit.ucsc.edu/pipeline/" + +def ddf_download_url(submission_id): + """Return url to download a DDF for a submission + + >>> ddf_download_url(1234) + 'http://encodesubmit.ucsc.edu/pipeline/download_ddf/1234' + """ + fragment = 'download_ddf/%s' % (submission_id,) + return urlparse.urljoin(UCSCEncodePipeline, fragment) + +def daf_download_url(submission_id): + """Return url to download a DAF for a submission + + >>> daf_download_url(1234) + 'http://encodesubmit.ucsc.edu/pipeline/download_daf/1234' + """ + fragment = 'download_daf/%s' % (submission_id,) + return urlparse.urljoin(UCSCEncodePipeline, fragment) + +def submission_view_url(submission_id): + """Return url to download a DAF for a submission + + >>> submission_view_url(1234) + 'http://encodesubmit.ucsc.edu/pipeline/show/1234' + """ + fragment = 'show/%s' % (submission_id,) + return urlparse.urljoin(UCSCEncodePipeline, fragment) diff --git a/htsworkflow/util/rdfhelp.py b/htsworkflow/util/rdfhelp.py new file mode 100644 index 0000000..7e9deda --- /dev/null +++ b/htsworkflow/util/rdfhelp.py @@ -0,0 +1,42 @@ +"""Helper features for working with librdf +""" +import RDF +import types + +# standard ontology namespaces +dublinCoreNS = RDF.NS("http://purl.org/dc/elements/1.1/") +rdfNS = RDF.NS("http://www.w3.org/1999/02/22-rdf-syntax-ns#") +rdfsNS= RDF.NS("http://www.w3.org/2000/01/rdf-schema#") +xsdNS = RDF.NS("http://www.w3.org/2001/XMLSchema#") + +# internal ontologies +submitOntology = RDF.NS("http://jumpgate.caltech.edu/wiki/UCSCSubmissionOntology#") +libraryOntology = RDF.NS("http://jumpgate.caltech.edu/wiki/LibraryOntology#") + +def blankOrUri(value=None): + node = None + if value is None: + node = RDF.Node() + elif type(value) in types.StringTypes: + node = RDF.Node(uri_string=value) + elif isinstance(value, RDF.Node): + node = value + + return node + + +def toTypedNode(value): + if type(value) == types.BooleanType: + value_type = xsdNS['boolean'].uri + if value: + value = u'1' + else: + value = u'0' + elif type(value) in types.StringTypes: + value_type = xsdNS['string'].uri + else: + value_type = None + value = unicode(value) + + return RDF.Node(literal=value, datatype=value_type) + diff --git a/htsworkflow/util/test/test_alphanum.py b/htsworkflow/util/test/test_alphanum.py index bfb2eda..f7b488d 100644 --- a/htsworkflow/util/test/test_alphanum.py +++ b/htsworkflow/util/test/test_alphanum.py @@ -33,7 +33,3 @@ def suite(): if __name__ == "__main__": unittest.main(defaultTest='suite') - - - - diff --git a/htsworkflow/util/test/test_rdfhelp.py b/htsworkflow/util/test/test_rdfhelp.py new file mode 100644 index 0000000..0364d78 --- /dev/null +++ b/htsworkflow/util/test/test_rdfhelp.py @@ -0,0 +1,45 @@ +import unittest + +from htsworkflow.util.rdfhelp import toTypedNode, blankOrUri +try: + import RDF + + class TestRDFHelp(unittest.TestCase): + def test_typed_node_boolean(self): + node = toTypedNode(True) + self.failUnlessEqual(node.literal_value['string'], u'1') + self.failUnlessEqual(str(node.literal_value['datatype']), + 'http://www.w3.org/2001/XMLSchema#boolean') + + def test_typed_node_string(self): + node = toTypedNode('hello') + self.failUnlessEqual(node.literal_value['string'], u'hello') + self.failUnlessEqual(str(node.literal_value['datatype']), + 'http://www.w3.org/2001/XMLSchema#string') + + def test_blank_or_uri_blank(self): + 
node = blankOrUri() + self.failUnlessEqual(node.is_blank(), True) + + def test_blank_or_uri_url(self): + s = 'http://google.com' + node = blankOrUri(s) + self.failUnlessEqual(node.is_resource(), True) + self.failUnlessEqual(str(node.uri), s) + + def test_blank_or_uri_node(self): + s = RDF.Node(RDF.Uri('http://google.com')) + node = blankOrUri(s) + self.failUnlessEqual(node.is_resource(), True) + self.failUnlessEqual(node, s) + + def suite(): + return unittest.makeSuite(TestRDFHelp, 'test') +except ImportError, e: + print "Unable to test rdfhelp" + + def suite(): + return None + +if __name__ == "__main__": + unittest.main(defaultTest='suite')
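
A minimal usage sketch of the helpers this change introduces (not itself part of the commit): encode_find.py now builds submission URIs from htsworkflow.submission.ucsc.submission_view_url() and takes its namespaces from htsworkflow.util.rdfhelp instead of defining them locally. The sketch assumes librdf's Python bindings ("RDF") are installed and htsworkflow is on PYTHONPATH; the submission id 1234 and the name string are purely illustrative.

import RDF

from htsworkflow.submission.ucsc import submission_view_url
from htsworkflow.util.rdfhelp import submitOntology, rdfsNS, toTypedNode

# In-memory RDF model, as in test_daf.py above.
model = RDF.Model(RDF.MemoryStorage())

# Submission URIs come from the shared helper rather than the old
# locally defined submissionNS namespace (1234 is a made-up id).
subUrn = RDF.Uri(submission_view_url(1234))

model.add_statement(
    RDF.Statement(subUrn, rdfsNS['type'], submitOntology['Submission']))
model.add_statement(
    RDF.Statement(subUrn, submitOntology['name'],
                  toTypedNode('example submission')))

writer = RDF.Serializer(name='turtle')
print writer.serialize_model_to_string(model)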