From af0aacb07181f8126cfb404d9f50371e5a0045f6 Mon Sep 17 00:00:00 2001 From: Diane Trout Date: Fri, 5 Mar 2010 22:41:13 +0000 Subject: [PATCH] Use the HTS workflow API to figure out the library tree. This also needed to search flowcell id by the starting name because we still have the status of a flowcell being part of the name in a few places. --- .../frontend/experiments/experiments.py | 2 +- scripts/make-library-tree | 441 +++++++++++------- 2 files changed, 275 insertions(+), 168 deletions(-) diff --git a/htsworkflow/frontend/experiments/experiments.py b/htsworkflow/frontend/experiments/experiments.py index 83b5414..a156d7b 100755 --- a/htsworkflow/frontend/experiments/experiments.py +++ b/htsworkflow/frontend/experiments/experiments.py @@ -23,7 +23,7 @@ def flowcell_information(flowcell_id): Return a dictionary describing a flowcell """ try: - fc = FlowCell.objects.get(flowcell_id=flowcell_id) + fc = FlowCell.objects.get(flowcell_id__startswith=flowcell_id) except FlowCell.DoesNotExist, e: return None diff --git a/scripts/make-library-tree b/scripts/make-library-tree index 50ce7a9..f4e854a 100644 --- a/scripts/make-library-tree +++ b/scripts/make-library-tree @@ -1,62 +1,175 @@ -#!/usr/bin/python -""" -Make a tree of symlinks organized by library id. 
-""" +#!/usr/bin/env python + from ConfigParser import SafeConfigParser -from glob import glob -import logging -from optparse import OptionParser +import json import logging import os +from optparse import OptionParser import stat -import sys - -from htsworkflow.util import fctracker - -def find_lanes(flowcell_dir, flowcell_id, lane): - lane_name = "s_%s_eland_*" %(lane) - pattern = os.path.join(flowcell_dir, flowcell_id, "*", lane_name) - lanes = glob(pattern) - return lanes - -def make_long_lane_name(flowcell_dir, lane_pathname): - """ - make a name from the eland result file name - """ - if flowcell_dir == lane_pathname[0:len(flowcell_dir)]: - subpath = lane_pathname[len(flowcell_dir):] - long_name = subpath.replace(os.path.sep, "_") - return long_name +import re +import shelve +import urllib +import urllib2 +import urlparse + +eland_re = re.compile('s_(?P\d)(?P_\d)?_eland_') +raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Z]+') +qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Z]+_l[\d]_r[\d].tar.bz2') + +class SequenceFile(object): + def __init__(self, filetype, path, flowcell, lane, read=None, pf=None, cycle=None): + self.filetype = filetype + self.path = path + self.flowcell = flowcell + self.lane = lane + self.read = read + self.pf = pf + self.cycle = cycle + + def __hash__(self): + return hash(self.key()) + + def key(self): + return (self.flowcell, self.lane) + + def unicode(self): + return unicode(self.path) + + def __repr__(self): + return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path) + + def make_target_name(self, root): + """ + Create target name for where we need to link this sequence too + """ + path, basename = os.path.split(self.path) + # Because the names aren't unque we include the flowcel name + # because there were different eland files for different length + # analyses, we include the cycle length in the name. 
+ if self.filetype == 'eland': + template = "%(flowcell)s_%(cycle)s_%(eland)s" + basename = template % { 'flowcell': self.flowcell, + 'cycle': self.cycle, + 'eland': basename } + # else: + # all the other file types have names that include flowcell/lane + # information and thus are unique so we don't have to do anything + return os.path.join(root, basename) + +def parse_srf(path, filename): + basename, ext = os.path.splitext(filename) + records = basename.split('_') + flowcell = records[4] + lane = int(records[5][0]) + fullpath = os.path.join(path, filename) + return SequenceFile('srf', fullpath, flowcell, lane) + +def parse_qseq(path, filename): + basename, ext = os.path.splitext(filename) + records = basename.split('_') + fullpath = os.path.join(path, filename) + flowcell = records[4] + lane = int(records[5][1]) + read = int(records[6][1]) + return SequenceFile('qseq', fullpath, flowcell, lane, read) + +def parse_fastq(path, filename): + basename, ext = os.path.splitext(filename) + records = basename.split('_') + fullpath = os.path.join(path, filename) + flowcell = records[4] + lane = int(records[5][1]) + read = int(records[6][1]) + if records[-1] == 'pass': + pf = True else: - return None + pf = False + return SequenceFile('fastq', fullpath, flowcell, lane, read, pf=pf) + +def parse_eland(path, filename, eland_match): + fullpath = os.path.join(path, filename) + rest, cycle = os.path.split(path) + rest, flowcell = os.path.split(rest) + lane = eland_match.group('lane') + read = eland_match.group('read') + return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=cycle) -def parse_srf_directory(srf_dir): +def scan_for_sequences(dirs): + sequences = [] + for d in dirs: + logging.info("Scanning %s for sequences" % (d,)) + for path, dirname, filenames in os.walk(d): + for f in filenames: + seq = None + # find sequence files + if raw_seq_re.match(f): + if f.endswith('.md5'): + continue + elif f.endswith('.srf') or f.endswith('.srf.bz2'): + seq = 
parse_srf(path, f) + elif qseq_re.match(f): + seq = parse_qseq(path, f) + elif f.endswith('fastq') or f.endswith('.fastq.bz2'): + seq = parse_fastq(path, f) + eland_match = eland_re.match(f) + if eland_match: + if f.endswith('.md5'): + continue + seq = parse_eland(path, f, eland_match) + if seq: + sequences.append(seq) + logging.debug("Found sequence at %s" % (f,)) + + return sequences + +def retrieve_info(url, apidata): """ - search srf_dir for *.srf files + Return a dictionary from the HTSworkflow API + """ + try: + apipayload = urllib.urlencode(apidata) + web = urllib2.urlopen(url, apipayload) + except urllib2.URLError, e: + if e.code == 404: + logging.info("%s was not found" % (url,)) + return None + else: + errmsg = 'URLError: %d %s' % (e.code, e.msg) + raise IOError(errmsg) + + contents = web.read() + headers = web.info() - builds a dictionary indexed by flowcell name. + return json.loads(contents) + +def build_flowcell_db(fcdb_filename, sequences, baseurl, apiid, apikey): + """ + compare our flowcell database with our list of sequences and return + a fully populated database """ - flowcells = {} - srfs = glob(os.path.join(srf_dir,'*.srf')) - for pathname in srfs: - path, filename = os.path.split(pathname) - basename, ext = os.path.splitext(filename) - record = basename.split('_') - if len(record) != 6: - logging.error("Unrecognized srf file: %s expected 6 fields got %d" % (pathname,len(record))) - continue - - site = record[0] - date = record[1] - machine = record[2] - runid = record[3] - flowcellid = record[4] - laneid = record[5] - - desc = "_".join([site,date,machine,runid,flowcellid]) - flowcells[flowcellid] = desc - return flowcells + fcdb = shelve.open(fcdb_filename) + libdb = {} + apidata = {'apiid': apiid, 'apikey': apikey} + for seq in sequences: + flowcell = seq.flowcell + flowcell_info = None + + # get info about flowcell from server or shelf + if not fcdb.has_key(flowcell): + url = urlparse.urljoin(baseurl, 'experiments/config/%s/json' % 
(flowcell,)) + flowcell_info = retrieve_info(url, apidata) + if flowcell_info is not None: + fcdb[flowcell] = flowcell_info + else: + flowcell_info = fcdb[flowcell] + # make library id db + if flowcell_info is not None: + seq_library_id = flowcell_info['lane_set'][unicode(seq.lane)]['library_id'] + libdb.setdefault(seq_library_id, []).append(seq) + + fcdb.sync() + return fcdb, libdb def carefully_make_hardlink(source, destination, dry_run=False): """ @@ -65,88 +178,111 @@ def carefully_make_hardlink(source, destination, dry_run=False): Checking to see if the link already exists and is the same as the link we want to make. If the link already exists and is different, throw an error. + + If we didn't update anything return 0, if we did update + return 1. """ logging.debug("CHECKING: %s -> %s", source, destination) if not os.path.exists(source): logging.warning("%s doesn't exist", source) - return + return 0 if os.path.exists(destination): if os.path.samefile(source, destination): logging.debug('SAME: %s -> %s' % (source, destination)) - return + return 0 else: raise IOError('%s and %s are different files' % \ (source, destination)) - logging.info('Linking: %s -> %s' % (source, destination)) + logging.debug('Linking: %s -> %s' % (source, destination)) - if dry_run: return + # we would do something by this part + if dry_run: return 1 os.link(source, destination) os.chmod(destination, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH ) - -def link_all_eland_lanes(library_path, flowcell_dir, flowcell_id, lane, dry_run): - """ - find eland files at different alignment lengths - and put each of those in the file - """ - lanes = find_lanes(flowcell_dir, flowcell_id, lane) - for lane_pathname in lanes: - long_name = make_long_lane_name(flowcell_dir, - lane_pathname) - long_pathname = os.path.join(library_path, long_name) - carefully_make_hardlink(lane_pathname, - long_pathname, - dry_run) - -def link_srf_lanes(srf_names, library_path, srf_dir, flowcell_id, lane, dry_run): + return 
1 + +def make_library_links(root, library_db, dry_run=False): """ - Link srf files into our library directories. + Make a tree of sequencer roots organized by library id - the srf files must be named: - _____.srf + Root is the root of the library tree + library_db is a dictionary of SequenceFiles organized by library id """ - srf_basename = srf_names.get(flowcell_id, None) - if srf_basename is None: - logging.info("srf file for %s was not found", flowcell_id) - else: - srf_filename = "%s_%s.srf" % (srf_basename, lane) - source = os.path.join(srf_dir, srf_filename) - destination = os.path.join(library_path, srf_filename) - carefully_make_hardlink(source, destination, dry_run) + count = 0 + root = os.path.abspath(root) + for lib_id, sequences in library_db.items(): + target_dir = os.path.join(root, lib_id) + if not os.path.exists(target_dir): + logging.info("mkdir %s" % (target_dir,)) + if not dry_run: + os.mkdir(target_dir) + + for s in sequences: + count += carefully_make_hardlink(s.path, + s.make_target_name(target_dir), + dry_run=dry_run) + return count + +def configure_logging(opts): + # setup logging + level = logging.WARN + if opts.verbose: + level = logging.INFO + if opts.debug: + level = logging.DEBUG + logging.basicConfig(level=level) -def make_library_tree(fcdb, library_dir, flowcell_dir, srfs_dir, - dry_run=False): +def configure_opts(opts): """ - Iterate over the library + Load in options from config file """ - library_dir = os.path.normpath(library_dir) + os.path.sep - flowcell_dir = os.path.normpath(flowcell_dir) + os.path.sep - srfs_dir = os.path.normpath(srfs_dir) + os.path.sep - - srf_names = parse_srf_directory(srfs_dir) - - for lib_id, lib in fcdb.library.items(): - library_path = os.path.join(library_dir, str(lib_id)) - if not os.path.exists(library_path): - os.mkdir(library_path) - - for flowcell_id, lane in lib.get('lanes', []): - link_all_eland_lanes(library_path, - flowcell_dir, - flowcell_id, - lane, - dry_run) - - link_srf_lanes(srf_names, 
- library_path, - srfs_dir, - flowcell_id, - lane, - dry_run) + SECTION_NAME = 'sequence_archive' + ARCHIVE_OPT = 'sequence_archive' + CACHE_OPT = 'cache' + HOST_OPT = 'host' + APIID_OPT = 'apiid' + APIKEY_OPT = 'apikey' + + # figure out what config file to read + config_path = [os.path.expanduser('~/.htsworkflow.ini'), + '/etc/htsworkflow.ini'] + if opts.config is not None: + config_path = [opts.config] + # parse options from config file + config_file = SafeConfigParser() + config_file.read(config_path) + + # load defaults from config file if not overriden by the command line + if opts.cache is None: + if config_file.has_option(SECTION_NAME, CACHE_OPT): + opts.cache = config_file.get(FRONTEND_NAME, CACHE_OPT) + else: + opts.cache = os.path.expanduser('~/.flowcelldb.shelve') + + if opts.sequence_archive is None and \ + config_file.has_option(SECTION_NAME, ARCHIVE_OPT): + opts.sequence_archive = config_file.get(SECTION_NAME, ARCHIVE_OPT) + + opts.sequence_archive = os.path.abspath(opts.sequence_archive) + opts.library_tree = os.path.join(opts.sequence_archive, 'libraries') + opts.flowcells = os.path.join(opts.sequence_archive, 'flowcells') + opts.srfs = os.path.join(opts.sequence_archive, 'srfs') + + if opts.host is None and config_file.has_option(SECTION_NAME, HOST_OPT): + opts.host = config_file.get(SECTION_NAME, HOST_OPT) + + if opts.apiid is None and config_file.has_option(SECTION_NAME, APIID_OPT): + opts.apiid = config_file.get(SECTION_NAME, APIID_OPT) + + if opts.apikey is None and config_file.has_option(SECTION_NAME, APIKEY_OPT): + opts.apikey = config_file.get(SECTION_NAME, APIKEY_OPT) + + return opts def make_parser(): """ @@ -156,15 +292,18 @@ def make_parser(): parser.add_option('-c', '--config', default=None, help='path to a configuration file containing a ' 'sequence archive section') - - parser.add_option("--database", dest="database", - help="path to the fctracker.db", - default=None) + parser.add_option('--cache', default=None, + help="default 
flowcell cache") + + parser.add_option('--host', default=None, + help="specify http://host for quering flowcell information") + parser.add_option('--apiid', default=None, + help="API ID to use when retriving information") + parser.add_option("--apikey", default=None, + help="API Key for when retriving information") + parser.add_option('-a', '--sequence-archive', default=None, help='path to where the sequence archive lives') - parser.add_option("-w", "--where", dest="where", - help="add a where clause", - default=None) parser.add_option('-v', '--verbose', action='store_true', default=False, help='be more verbose') @@ -176,66 +315,34 @@ def make_parser(): help="Don't modify the filesystem") return parser -def main(argv=None): - FRONTEND_NAME = 'frontend' - SECTION_NAME = 'sequence_archive' - DATABASE_OPT = 'database_name' - ARCHIVE_OPT = 'archive_path' - - if argv is None: - argv = [] +def main(cmdline=None): parser = make_parser() + opts, args = parser.parse_args(cmdline) - # parse command line arguments - opt, args = parser.parse_args(argv) - - # setup logging - level = logging.WARN - if opt.verbose: - level = logging.INFO - if opt.debug: - level = logging.DEBUG - logging.basicConfig(level=level) - - # figure out what config file to read - config_path = [os.path.expanduser('~/.htsworkflow.ini'), - '/etc/htsworkflow.ini'] - if opt.config is not None: - config_path = [opt.config] - - # parse options from config file - config_file = SafeConfigParser() - config_file.read(config_path) - - # load defaults from config file if not overriden by the command line - print opt.database - if opt.database is None and \ - config_file.has_option(FRONTEND_NAME, DATABASE_OPT): - opt.database = config_file.get(FRONTEND_NAME, DATABASE_OPT) - - if opt.sequence_archive is None and \ - config_file.has_option(SECTION_NAME, ARCHIVE_OPT): - opt.sequence_archive = config_file.get(SECTION_NAME, ARCHIVE_OPT) + configure_logging(opts) + opts = configure_opts(opts) # complain if critical things 
are missing - if opt.database is None: + if opts.cache is None: parser.error('Need location of htsworkflow frontend database') - if opt.sequence_archive is None: + if opts.sequence_archive is None: parser.error('Need the root path for the sequence archive') - fcdb = fctracker.fctracker(opt.database) - cells = fcdb._get_flowcells(opt.where) - - library_dir = os.path.join(opt.sequence_archive, 'libraries') - flowcell_dir = os.path.join(opt.sequence_archive, 'flowcells') - srfs_dir = os.path.join(opt.sequence_archive, 'srfs') - make_library_tree(fcdb, - library_dir, flowcell_dir, srfs_dir, - opt.dry_run) - + seq_dirs = [ opts.flowcells, opts.srfs ] + if len(args) > 0: + seq_dirs = [os.path.abspath(f) for f in args] + + seqs = scan_for_sequences(seq_dirs) + fcdb, libdb = build_flowcell_db(opts.cache, seqs, opts.host, opts.apiid, opts.apikey) + updates = make_library_links(opts.library_tree, libdb, dry_run=opts.dry_run) + + logging.warn("%s flowcells in database" % (len(fcdb),)) + logging.warn("found %s sequence files" % (len(seqs),)) + logging.warn("%s libraries being checked" % (len(libdb),)) + logging.warn("%s sequence files were linked" % (updates,)) + return 0 - + if __name__ == "__main__": - rv = main(sys.argv[1:]) - # sys.exit(rv) + main() -- 2.30.2