#!/usr/bin/env python
from ConfigParser import SafeConfigParser
import json
import logging
import os
from optparse import OptionParser
import re
import shelve
import stat
import urllib
import urllib2
import urlparse

# the named groups were lost to angle-bracket stripping; 'lane' and
# 'read' are restored from their uses in parse_eland() below
eland_re = re.compile('s_(?P<lane>\d)(?P<read>_\d)?_eland_')
raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Z]+')
qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Z]+_l[\d]_r[\d].tar.bz2')


class SequenceFile(object):
    def __init__(self, filetype, path, flowcell, lane,
                 read=None, pf=None, cycle=None):
        self.filetype = filetype
        self.path = path
        self.flowcell = flowcell
        self.lane = lane
        self.read = read
        self.pf = pf
        self.cycle = cycle

    def __hash__(self):
        return hash(self.key())

    def key(self):
        return (self.flowcell, self.lane)

    def __unicode__(self):
        return unicode(self.path)

    def __repr__(self):
        return u"<%s %s %s %s>" % (self.filetype, self.flowcell,
                                   self.lane, self.path)

    def make_target_name(self, root):
        """
        Create the target name where we need to link this sequence to
        """
        path, basename = os.path.split(self.path)

        # eland basenames aren't unique, so we include the flowcell name;
        # and because there were different eland files for different
        # length analyses, we include the cycle length as well.
        if self.filetype == 'eland':
            template = "%(flowcell)s_%(cycle)s_%(eland)s"
            basename = template % {'flowcell': self.flowcell,
                                   'cycle': self.cycle,
                                   'eland': basename}
        # else:
        # all the other file types have names that include flowcell/lane
        # information and thus are unique, so we don't have to do anything
        return os.path.join(root, basename)


def parse_srf(path, filename):
    basename, ext = os.path.splitext(filename)
    records = basename.split('_')
    fullpath = os.path.join(path, filename)
    flowcell = records[4]
    lane = int(records[5][0])
    return SequenceFile('srf', fullpath, flowcell, lane)


def parse_qseq(path, filename):
    basename, ext = os.path.splitext(filename)
    records = basename.split('_')
    fullpath = os.path.join(path, filename)
    flowcell = records[4]
    lane = int(records[5][1])
    read = int(records[6][1])
    return SequenceFile('qseq', fullpath, flowcell, lane, read)


def parse_fastq(path, filename):
    # strip every extension ('.fastq' or '.fastq.bz2') so the trailing
    # pass/fail flag is always the last underscore-delimited record;
    # a single splitext() would leave '.fastq' attached for .bz2 files
    basename = filename.split('.')[0]
    records = basename.split('_')
    fullpath = os.path.join(path, filename)
    flowcell = records[4]
    lane = int(records[5][1])
    read = int(records[6][1])
    if records[-1] == 'pass':
        pf = True
    else:
        pf = False
    return SequenceFile('fastq', fullpath, flowcell, lane, read, pf=pf)


def parse_eland(path, filename, eland_match):
    fullpath = os.path.join(path, filename)
    # eland filenames don't encode the flowcell, so recover it (and the
    # cycle length) from the directories the file lives in
    rest, cycle = os.path.split(path)
    rest, flowcell = os.path.split(rest)
    lane = eland_match.group('lane')
    read = eland_match.group('read')
    return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=cycle)


def scan_for_sequences(dirs):
    sequences = []
    for d in dirs:
        logging.info("Scanning %s for sequences" % (d,))
        for path, dirnames, filenames in os.walk(d):
            for f in filenames:
                seq = None
                # find sequence files
                if raw_seq_re.match(f):
                    if f.endswith('.md5'):
                        continue
                    elif f.endswith('.srf') or f.endswith('.srf.bz2'):
                        seq = parse_srf(path, f)
                    elif qseq_re.match(f):
                        seq = parse_qseq(path, f)
                    elif f.endswith('fastq') or f.endswith('.fastq.bz2'):
                        seq = parse_fastq(path, f)
                eland_match = eland_re.match(f)
                if eland_match:
                    if f.endswith('.md5'):
                        continue
                    seq = parse_eland(path, f, eland_match)
                if seq:
                    sequences.append(seq)
                    logging.debug("Found sequence at %s" % (f,))
    return sequences
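
# The parsers above slice fields out of underscore-delimited filenames.
# As a sketch of the layouts they assume (the concrete run names below
# are illustrative, not taken from a real flowcell):
#
#   srf:    woldlab_090101_HWI-EAS000_0001_ABCDEFGHI_1.srf
#           records[4] -> flowcell, records[5][0] -> lane
#   qseq:   woldlab_090101_HWI-EAS000_0001_ABCDEFGHI_l1_r1.tar.bz2
#           records[5][1] -> lane, records[6][1] -> read
#   fastq:  woldlab_090101_HWI-EAS000_0001_ABCDEFGHI_l1_r1_pass.fastq.bz2
#           a trailing 'pass' field marks reads that passed filtering
#   eland:  <flowcell>/<cycle>/s_1_1_eland_result.txt
#           lane/read come from eland_re; flowcell/cycle from the path
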
def retrieve_info(url, apidata):
    """
    Return a dictionary from the HTSworkflow API
    """
    try:
        apipayload = urllib.urlencode(apidata)
        web = urllib2.urlopen(url, apipayload)
    except urllib2.HTTPError, e:
        # only HTTPError carries a response code; a plain URLError
        # (e.g. connection refused) will propagate to the caller
        if e.code == 404:
            logging.info("%s was not found" % (url,))
            return None
        else:
            errmsg = 'HTTPError: %d %s' % (e.code, e.msg)
            raise IOError(errmsg)

    contents = web.read()
    headers = web.info()

    return json.loads(contents)


def build_flowcell_db(fcdb_filename, sequences, baseurl, apiid, apikey):
    """
    Compare our flowcell database with our list of sequences and return
    a fully populated database
    """
    fcdb = shelve.open(fcdb_filename)
    libdb = {}
    apidata = {'apiid': apiid, 'apikey': apikey}
    for seq in sequences:
        flowcell = seq.flowcell
        flowcell_info = None

        # get info about flowcell from server or shelf
        if not fcdb.has_key(flowcell):
            url = urlparse.urljoin(baseurl,
                                   'experiments/config/%s/json' % (flowcell,))
            flowcell_info = retrieve_info(url, apidata)
            if flowcell_info is not None:
                fcdb[flowcell] = flowcell_info
        else:
            flowcell_info = fcdb[flowcell]

        # make library id db
        if flowcell_info is not None:
            seq_library_id = \
                flowcell_info['lane_set'][unicode(seq.lane)]['library_id']
            libdb.setdefault(seq_library_id, []).append(seq)

    fcdb.sync()
    return fcdb, libdb
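
# build_flowcell_db() only dereferences two fields of the JSON that
# retrieve_info() fetches, so the response is assumed to look roughly
# like the sketch below; the lane-number keys and library-id values are
# hypothetical, and real responses may carry additional fields:
#
#   {
#     "lane_set": {
#       "1": {"library_id": "SL0001"},
#       "2": {"library_id": "SL0002"}
#     }
#   }
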
def carefully_make_hardlink(source, destination, dry_run=False):
    """
    Make a hard link, failing if a different link already exists

    Checks whether the link already exists and is the same as the link
    we want to make. If the link already exists and is different, throw
    an error. If we didn't update anything return 0, if we did update
    return 1.
    """
    logging.debug("CHECKING: %s -> %s", source, destination)

    if not os.path.exists(source):
        logging.warning("%s doesn't exist", source)
        return 0

    if os.path.exists(destination):
        if os.path.samefile(source, destination):
            logging.debug('SAME: %s -> %s' % (source, destination))
            return 0
        else:
            raise IOError('%s and %s are different files' %
                          (source, destination))
    logging.debug('Linking: %s -> %s' % (source, destination))

    # past this point we would start modifying the filesystem
    if dry_run:
        return 1

    os.link(source, destination)
    os.chmod(destination, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
    return 1


def make_library_links(root, library_db, dry_run=False):
    """
    Make a tree of sequencer roots organized by library id

    Root is the root of the library tree
    library_db is a dictionary of SequenceFiles organized by library id
    """
    count = 0
    root = os.path.abspath(root)
    for lib_id, sequences in library_db.items():
        target_dir = os.path.join(root, lib_id)
        if not os.path.exists(target_dir):
            logging.info("mkdir %s" % (target_dir,))
            if not dry_run:
                os.mkdir(target_dir)
        for s in sequences:
            count += carefully_make_hardlink(s.path,
                                             s.make_target_name(target_dir),
                                             dry_run=dry_run)
    return count


def configure_logging(opts):
    # setup logging
    level = logging.WARN
    if opts.verbose:
        level = logging.INFO
    if opts.debug:
        level = logging.DEBUG
    logging.basicConfig(level=level)


def configure_opts(opts):
    """
    Load in options from config file
    """
    SECTION_NAME = 'sequence_archive'
    ARCHIVE_OPT = 'sequence_archive'
    CACHE_OPT = 'cache'
    HOST_OPT = 'host'
    APIID_OPT = 'apiid'
    APIKEY_OPT = 'apikey'

    # figure out what config file to read
    config_path = [os.path.expanduser('~/.htsworkflow.ini'),
                   '/etc/htsworkflow.ini']
    if opts.config is not None:
        config_path = [opts.config]

    # parse options from config file
    config_file = SafeConfigParser()
    config_file.read(config_path)

    # load defaults from config file if not overridden by the command line
    if opts.cache is None:
        if config_file.has_option(SECTION_NAME, CACHE_OPT):
            opts.cache = config_file.get(SECTION_NAME, CACHE_OPT)
        else:
            opts.cache = os.path.expanduser('~/.flowcelldb.shelve')

    if opts.sequence_archive is None and \
       config_file.has_option(SECTION_NAME, ARCHIVE_OPT):
        opts.sequence_archive = config_file.get(SECTION_NAME, ARCHIVE_OPT)

    # derive the subtree paths whether the archive root came from the
    # config file or the command line; leave it None so main() can
    # report the error if it came from neither
    if opts.sequence_archive is not None:
        opts.sequence_archive = os.path.abspath(opts.sequence_archive)
        opts.library_tree = os.path.join(opts.sequence_archive, 'libraries')
        opts.flowcells = os.path.join(opts.sequence_archive, 'flowcells')
        opts.srfs = os.path.join(opts.sequence_archive, 'srfs')

    if opts.host is None and config_file.has_option(SECTION_NAME, HOST_OPT):
        opts.host = config_file.get(SECTION_NAME, HOST_OPT)

    if opts.apiid is None and config_file.has_option(SECTION_NAME, APIID_OPT):
        opts.apiid = config_file.get(SECTION_NAME, APIID_OPT)

    if opts.apikey is None and config_file.has_option(SECTION_NAME,
                                                      APIKEY_OPT):
        opts.apikey = config_file.get(SECTION_NAME, APIKEY_OPT)

    return opts


def make_parser():
    """
    Make parser
    """
    parser = OptionParser()
    parser.add_option('-c', '--config', default=None,
                      help='path to a configuration file containing a '
                           'sequence archive section')
    parser.add_option('--cache', default=None,
                      help="default flowcell cache")
    parser.add_option('--host', default=None,
                      help="specify http://host for querying flowcell "
                           "information")
    parser.add_option('--apiid', default=None,
                      help="API ID to use when retrieving information")
    parser.add_option("--apikey", default=None,
                      help="API key to use when retrieving information")
    parser.add_option('-a', '--sequence-archive', default=None,
                      help='path to where the sequence archive lives')
    parser.add_option('-v', '--verbose', action='store_true', default=False,
                      help='be more verbose')
    parser.add_option('-d', '--debug', action='store_true', default=False,
                      help='report everything')
    parser.add_option("--dry-run", dest="dry_run", action="store_true",
                      default=False,
                      help="Don't modify the filesystem")
    return parser


def main(cmdline=None):
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)

    configure_logging(opts)
    opts = configure_opts(opts)

    # complain if critical things are missing
    if opts.cache is None:
        parser.error('Need location of htsworkflow frontend database')

    if opts.sequence_archive is None:
        parser.error('Need the root path for the sequence archive')

    seq_dirs = [opts.flowcells, opts.srfs]
    if len(args) > 0:
        seq_dirs = [os.path.abspath(f) for f in args]

    seqs = scan_for_sequences(seq_dirs)
    fcdb, libdb = build_flowcell_db(opts.cache, seqs, opts.host,
                                    opts.apiid, opts.apikey)
    updates = make_library_links(opts.library_tree, libdb,
                                 dry_run=opts.dry_run)

    logging.warn("%s flowcells in database" % (len(fcdb),))
    logging.warn("found %s sequence files" % (len(seqs),))
    logging.warn("%s libraries being checked" % (len(libdb),))
    logging.warn("%s sequence files were linked" % (updates,))

    return 0


if __name__ == "__main__":
    main()
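
# configure_opts() reads a 'sequence_archive' section from
# ~/.htsworkflow.ini or /etc/htsworkflow.ini. A minimal sketch of such
# a config; the paths, host, and credentials are placeholders:
#
#   [sequence_archive]
#   sequence_archive = /archive/sequences
#   cache = /archive/flowcelldb.shelve
#   host = http://localhost/
#   apiid = 0
#   apikey = secret
#
# With that in place, a dry run that only reports what would be linked
# (the script filename here is a placeholder):
#
#   python make_library_tree.py --dry-run -v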