-#!/usr/bin/python
-"""
-Make a tree of symlinks organized by library id.
-"""
+#!/usr/bin/env python
+
from ConfigParser import SafeConfigParser
-from glob import glob
-import logging
-from optparse import OptionParser
+import json
import logging
import os
+from optparse import OptionParser
import stat
-import sys
-
-from htsworkflow.util import fctracker
-
-def find_lanes(flowcell_dir, flowcell_id, lane):
- lane_name = "s_%s_eland_*" %(lane)
- pattern = os.path.join(flowcell_dir, flowcell_id, "*", lane_name)
- lanes = glob(pattern)
- return lanes
-
-def make_long_lane_name(flowcell_dir, lane_pathname):
- """
- make a name from the eland result file name
- """
- if flowcell_dir == lane_pathname[0:len(flowcell_dir)]:
- subpath = lane_pathname[len(flowcell_dir):]
- long_name = subpath.replace(os.path.sep, "_")
- return long_name
+import re
+import shelve
+import urllib
+import urllib2
+import urlparse
+
# Filename classifiers for the different kinds of sequence files.
# Raw strings so the \d escapes are unambiguous, and the literal dots
# in the qseq suffix are escaped (previously '.' matched any character).
# eland result files: s_<lane>[_<read>]_eland_...
eland_re = re.compile(r's_(?P<lane>\d)(?P<read>_\d)?_eland_')
# raw sequencer output: woldlab_<YYMMDD>_<machine>_<run>_<flowcell>...
raw_seq_re = re.compile(r'woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Z]+')
# qseq tarballs: woldlab_..._l<lane>_r<read>.tar.bz2
qseq_re = re.compile(r'woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Z]+_l[\d]_r[\d]\.tar\.bz2')
+
class SequenceFile(object):
    """
    Description of a single sequence file found in the archive.

    filetype is one of 'srf', 'qseq', 'fastq', or 'eland'; flowcell and
    lane say which sequencer run produced the file.  read, pf (passed
    filter), and cycle are optional and only meaningful for some types.
    """
    def __init__(self, filetype, path, flowcell, lane, read=None, pf=None, cycle=None):
        self.filetype = filetype
        self.path = path
        self.flowcell = flowcell
        self.lane = lane
        self.read = read
        self.pf = pf
        self.cycle = cycle

    def key(self):
        # (flowcell, lane) identifies the run this file came from
        return (self.flowcell, self.lane)

    def __hash__(self):
        return hash(self.key())

    def __eq__(self, other):
        # Objects that hash alike must compare alike: previously only
        # __hash__ was defined, so equality fell back to identity and
        # broke the hash/eq contract implied by key().
        if isinstance(other, SequenceFile):
            return self.key() == other.key()
        return NotImplemented

    def __ne__(self, other):
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def unicode(self):
        return unicode(self.path)
    # wire the method up so unicode(seq) works under Python 2 as well
    # as the explicit seq.unicode() call
    __unicode__ = unicode

    def __repr__(self):
        return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)

    def make_target_name(self, root):
        """
        Return the path under root that this sequence should be linked to.
        """
        path, basename = os.path.split(self.path)
        # Because eland names aren't unique we include the flowcell name,
        # and because there were different eland files for different
        # length analyses, we include the cycle length in the name.
        if self.filetype == 'eland':
            template = "%(flowcell)s_%(cycle)s_%(eland)s"
            basename = template % {'flowcell': self.flowcell,
                                   'cycle': self.cycle,
                                   'eland': basename}
        # all the other file types have names that include flowcell/lane
        # information and thus are already unique
        return os.path.join(root, basename)
+
def parse_srf(path, filename):
    """
    Build a SequenceFile record for an srf file.

    Filenames look like <site>_<date>_<machine>_<run>_<flowcell>_<lane>.srf
    """
    name, _ext = os.path.splitext(filename)
    fields = name.split('_')
    return SequenceFile('srf',
                        os.path.join(path, filename),
                        fields[4],
                        int(fields[5][0]))
+
def parse_qseq(path, filename):
    """
    Build a SequenceFile record for a qseq tarball.
    """
    name, _ext = os.path.splitext(filename)
    fields = name.split('_')
    flowcell = fields[4]
    lane = int(fields[5][1])   # field looks like 'l1'
    read = int(fields[6][1])   # field looks like 'r2'
    return SequenceFile('qseq', os.path.join(path, filename),
                        flowcell, lane, read)
+
def parse_fastq(path, filename):
    """
    Build a SequenceFile record for a fastq file, noting whether the
    reads passed filtering.
    """
    name, _ext = os.path.splitext(filename)
    fields = name.split('_')
    flowcell = fields[4]
    lane = int(fields[5][1])   # field looks like 'l1'
    read = int(fields[6][1])   # field looks like 'r1'
    # NOTE(review): for '.fastq.bz2' names splitext only strips '.bz2',
    # leaving the last field as 'pass.fastq', so pf comes out False for
    # compressed files -- confirm that is intended.
    pf = fields[-1] == 'pass'
    return SequenceFile('fastq', os.path.join(path, filename),
                        flowcell, lane, read, pf=pf)
+
def parse_eland(path, filename, eland_match):
    """
    Build a SequenceFile record for an eland result file.

    eland_match is the eland_re match for filename.  The flowcell and
    cycle-length are recovered from the last two directory components
    of path (.../<flowcell>/<cycle>/<filename>).
    """
    fullpath = os.path.join(path, filename)
    rest, cycle = os.path.split(path)
    rest, flowcell = os.path.split(rest)
    # Normalize lane/read to ints for consistency with the other parse_*
    # functions; previously lane was the string '1' and read the string
    # '_2', so key() tuples never matched those of srf/qseq records.
    lane = int(eland_match.group('lane'))
    read = eland_match.group('read')
    if read is not None:
        read = int(read[1:])
    return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=cycle)
-def parse_srf_directory(srf_dir):
def scan_for_sequences(dirs):
    """
    Walk each directory in dirs and return a list of SequenceFile
    records for every sequence file found.
    """
    sequences = []
    for d in dirs:
        logging.info("Scanning %s for sequences" % (d,))
        for path, dirname, filenames in os.walk(d):
            for f in filenames:
                seq = None
                # find sequence files
                if raw_seq_re.match(f):
                    if f.endswith('.md5'):
                        continue
                    elif f.endswith('.srf') or f.endswith('.srf.bz2'):
                        seq = parse_srf(path, f)
                    elif qseq_re.match(f):
                        seq = parse_qseq(path, f)
                    # was f.endswith('fastq'), which also matched names
                    # merely ending in 'fastq'; require the extension dot
                    elif f.endswith('.fastq') or f.endswith('.fastq.bz2'):
                        seq = parse_fastq(path, f)
                # eland result files follow a different naming scheme
                eland_match = eland_re.match(f)
                if eland_match:
                    if f.endswith('.md5'):
                        continue
                    seq = parse_eland(path, f, eland_match)
                if seq is not None:
                    sequences.append(seq)
                    logging.debug("Found sequence at %s" % (f,))

    return sequences
+
def retrieve_info(url, apidata):
    """
    POST apidata to the HTSworkflow API at url and return the decoded
    JSON response.

    Returns None if the server responded 404; raises IOError for any
    other failure.
    """
    try:
        apipayload = urllib.urlencode(apidata)
        web = urllib2.urlopen(url, apipayload)
    except urllib2.URLError as e:
        # Only HTTPError has .code/.msg; a plain URLError (e.g. refused
        # connection) carries .reason and previously raised an
        # AttributeError here instead of the intended IOError.
        code = getattr(e, 'code', None)
        if code == 404:
            logging.info("%s was not found" % (url,))
            return None
        else:
            errmsg = 'URLError: %s %s' % (
                code, getattr(e, 'msg', getattr(e, 'reason', '')))
            raise IOError(errmsg)

    contents = web.read()
    return json.loads(contents)
+
def build_flowcell_db(fcdb_filename, sequences, baseurl, apiid, apikey):
    """
    Compare our flowcell database with our list of sequences and return
    a fully populated database.

    Returns (fcdb, libdb): the open flowcell shelf, and a dict mapping
    library ids to the lists of SequenceFile records run for them.
    """
    fcdb = shelve.open(fcdb_filename)
    libdb = {}
    apidata = {'apiid': apiid, 'apikey': apikey}
    for seq in sequences:
        flowcell = seq.flowcell

        # check the local shelf first, fall back to asking the server
        if flowcell in fcdb:
            flowcell_info = fcdb[flowcell]
        else:
            url = urlparse.urljoin(
                baseurl, 'experiments/config/%s/json' % (flowcell,))
            flowcell_info = retrieve_info(url, apidata)
            if flowcell_info is not None:
                fcdb[flowcell] = flowcell_info

        # file the sequence under the library its lane was run for
        if flowcell_info is not None:
            lane_info = flowcell_info['lane_set'][unicode(seq.lane)]
            libdb.setdefault(lane_info['library_id'], []).append(seq)

    fcdb.sync()
    return fcdb, libdb
def carefully_make_hardlink(source, destination, dry_run=False):
    """
    Hard link source to destination unless the link is already in place.

    Returns 1 if a link was made (or would have been, under dry_run),
    0 when there was nothing to do.  Raises IOError when destination
    already exists but is a different file than source.
    """
    logging.debug("CHECKING: %s -> %s", source, destination)
    if not os.path.exists(source):
        logging.warning("%s doesn't exist", source)
        return 0
    if os.path.exists(destination):
        if not os.path.samefile(source, destination):
            raise IOError('%s and %s are different files' % \
                (source, destination))
        logging.debug('SAME: %s -> %s' % (source, destination))
        return 0
    logging.debug('Linking: %s -> %s' % (source, destination))
    if dry_run:
        return 1
    os.link(source, destination)
    # make the new link read-only for owner, group, and other
    os.chmod(destination,
             stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
    return 1
+
def make_library_links(root, library_db, dry_run=False):
    """
    Make a tree of sequence files organized by library id.

    root is the top of the library tree; library_db maps library ids to
    lists of SequenceFile records.  Returns the number of links made.
    """
    root = os.path.abspath(root)
    count = 0
    for lib_id, sequences in library_db.items():
        target_dir = os.path.join(root, lib_id)
        if not os.path.exists(target_dir):
            logging.info("mkdir %s" % (target_dir,))
            if not dry_run:
                os.mkdir(target_dir)

        for seq in sequences:
            target = seq.make_target_name(target_dir)
            count += carefully_make_hardlink(seq.path, target,
                                             dry_run=dry_run)
    return count
+
def configure_logging(opts):
    """
    Set the root logger's level from the --verbose/--debug options;
    --debug wins when both are given.
    """
    if opts.debug:
        level = logging.DEBUG
    elif opts.verbose:
        level = logging.INFO
    else:
        level = logging.WARN
    logging.basicConfig(level=level)
-def make_library_tree(fcdb, library_dir, flowcell_dir, srfs_dir,
- dry_run=False):
def configure_opts(opts):
    """
    Fill in unset options from the configuration file.

    Reads the [sequence_archive] section of opts.config (defaulting to
    ~/.htsworkflow.ini then /etc/htsworkflow.ini) and fills in cache,
    sequence_archive, host, apiid, and apikey when they were not given
    on the command line.  Also derives the library_tree, flowcells, and
    srfs paths from the sequence archive root.  Returns opts.
    """
    SECTION_NAME = 'sequence_archive'
    ARCHIVE_OPT = 'sequence_archive'
    CACHE_OPT = 'cache'
    HOST_OPT = 'host'
    APIID_OPT = 'apiid'
    APIKEY_OPT = 'apikey'

    # figure out what config file to read
    config_path = [os.path.expanduser('~/.htsworkflow.ini'),
                   '/etc/htsworkflow.ini']
    if opts.config is not None:
        config_path = [opts.config]
    # parse options from config file
    config_file = SafeConfigParser()
    config_file.read(config_path)

    # load defaults from config file if not overridden by the command line
    if opts.cache is None:
        if config_file.has_option(SECTION_NAME, CACHE_OPT):
            # was config_file.get(FRONTEND_NAME, CACHE_OPT), but
            # FRONTEND_NAME is not defined here and raised a NameError
            opts.cache = config_file.get(SECTION_NAME, CACHE_OPT)
        else:
            opts.cache = os.path.expanduser('~/.flowcelldb.shelve')

    if opts.sequence_archive is None and \
       config_file.has_option(SECTION_NAME, ARCHIVE_OPT):
        opts.sequence_archive = config_file.get(SECTION_NAME, ARCHIVE_OPT)

    # only derive the sub-paths when we actually have an archive root;
    # main() reports the missing option, so don't crash on abspath(None)
    if opts.sequence_archive is not None:
        opts.sequence_archive = os.path.abspath(opts.sequence_archive)
        opts.library_tree = os.path.join(opts.sequence_archive, 'libraries')
        opts.flowcells = os.path.join(opts.sequence_archive, 'flowcells')
        opts.srfs = os.path.join(opts.sequence_archive, 'srfs')

    if opts.host is None and config_file.has_option(SECTION_NAME, HOST_OPT):
        opts.host = config_file.get(SECTION_NAME, HOST_OPT)

    if opts.apiid is None and config_file.has_option(SECTION_NAME, APIID_OPT):
        opts.apiid = config_file.get(SECTION_NAME, APIID_OPT)

    if opts.apikey is None and config_file.has_option(SECTION_NAME, APIKEY_OPT):
        opts.apikey = config_file.get(SECTION_NAME, APIKEY_OPT)

    return opts
def make_parser():
"""
parser.add_option('-c', '--config', default=None,
help='path to a configuration file containing a '
'sequence archive section')
-
- parser.add_option("--database", dest="database",
- help="path to the fctracker.db",
- default=None)
+ parser.add_option('--cache', default=None,
+ help="default flowcell cache")
+
+ parser.add_option('--host', default=None,
+ help="specify http://host for quering flowcell information")
+ parser.add_option('--apiid', default=None,
+ help="API ID to use when retriving information")
+ parser.add_option("--apikey", default=None,
+ help="API Key for when retriving information")
+
parser.add_option('-a', '--sequence-archive', default=None,
help='path to where the sequence archive lives')
- parser.add_option("-w", "--where", dest="where",
- help="add a where clause",
- default=None)
parser.add_option('-v', '--verbose', action='store_true', default=False,
help='be more verbose')
help="Don't modify the filesystem")
return parser
-def main(argv=None):
- FRONTEND_NAME = 'frontend'
- SECTION_NAME = 'sequence_archive'
- DATABASE_OPT = 'database_name'
- ARCHIVE_OPT = 'archive_path'
-
- if argv is None:
- argv = []
def main(cmdline=None):
    """
    Scan the sequence archive and hard link sequence files into a
    per-library tree.
    """
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)

    configure_logging(opts)
    opts = configure_opts(opts)

    # complain if critical things are missing
    if opts.cache is None:
        parser.error('Need location of htsworkflow frontend database')

    if opts.sequence_archive is None:
        parser.error('Need the root path for the sequence archive')

    # positional arguments override the default scan directories
    if args:
        seq_dirs = [os.path.abspath(f) for f in args]
    else:
        seq_dirs = [opts.flowcells, opts.srfs]

    seqs = scan_for_sequences(seq_dirs)
    fcdb, libdb = build_flowcell_db(opts.cache, seqs,
                                    opts.host, opts.apiid, opts.apikey)
    updates = make_library_links(opts.library_tree, libdb,
                                 dry_run=opts.dry_run)

    logging.warn("%s flowcells in database" % (len(fcdb),))
    logging.warn("found %s sequence files" % (len(seqs),))
    logging.warn("%s libraries being checked" % (len(libdb),))
    logging.warn("%s sequence files were linked" % (updates,))

    return 0
-
+
# run as a script; main() returns 0 on success
if __name__ == "__main__":
    main()