Use the HTS workflow API to figure out the library tree.
Author:     Diane Trout <diane@caltech.edu>
AuthorDate: Fri, 5 Mar 2010 22:41:13 +0000 (22:41 +0000)
Commit:     Diane Trout <diane@caltech.edu>
CommitDate: Fri, 5 Mar 2010 22:41:13 +0000 (22:41 +0000)
This also needed to search for the flowcell id by its starting name
because we still have the status of a flowcell being part of the
name in a few places.

htsworkflow/frontend/experiments/experiments.py
scripts/make-library-tree

index 83b541437603197923c741550e5ca7da44dddf88..a156d7bdbe0198876f504b3f4b4276e115835787 100755 (executable)
@@ -23,7 +23,7 @@ def flowcell_information(flowcell_id):
     Return a dictionary describing a flowcell
     """
     try:
-        fc = FlowCell.objects.get(flowcell_id=flowcell_id)
+        fc = FlowCell.objects.get(flowcell_id__startswith=flowcell_id)
     except FlowCell.DoesNotExist, e:
         return None
 
index 50ce7a932ef263300c70d2e9483f18431ced6f5d..f4e854a0fe4cd742a7481513f67455b70291b2e0 100644 (file)
-#!/usr/bin/python
-"""
-Make a tree of symlinks organized by library id.
-"""
+#!/usr/bin/env python
+
 from ConfigParser import SafeConfigParser
-from glob import glob
-import logging
-from optparse import OptionParser
+import json
 import logging
 import os
+from optparse import OptionParser
 import stat
-import sys
-
-from htsworkflow.util import fctracker
-
-def find_lanes(flowcell_dir, flowcell_id, lane):
-    lane_name = "s_%s_eland_*" %(lane)
-    pattern = os.path.join(flowcell_dir, flowcell_id, "*", lane_name)
-    lanes = glob(pattern)
-    return lanes
-
-def make_long_lane_name(flowcell_dir, lane_pathname):
-    """
-    make a name from the eland result file name
-    """
-    if flowcell_dir == lane_pathname[0:len(flowcell_dir)]:
-        subpath = lane_pathname[len(flowcell_dir):]
-        long_name = subpath.replace(os.path.sep, "_")
-        return long_name
+import re
+import shelve
+import urllib
+import urllib2
+import urlparse
+
+eland_re = re.compile('s_(?P<lane>\d)(?P<read>_\d)?_eland_')
+raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Z]+')
+qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Z]+_l[\d]_r[\d].tar.bz2')
+
+class SequenceFile(object):
+    def __init__(self, filetype, path, flowcell, lane, read=None, pf=None, cycle=None):
+        self.filetype = filetype
+        self.path = path
+        self.flowcell = flowcell
+        self.lane = lane
+        self.read = read
+        self.pf = pf
+        self.cycle = cycle
+
+    def __hash__(self):
+        return hash(self.key())
+
+    def key(self):
+        return (self.flowcell, self.lane)
+
+    def unicode(self):
+        return unicode(self.path)
+
+    def __repr__(self):
+        return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
+
+    def make_target_name(self, root):
+        """
+        Create target name for where we need to link this sequence to
+        """
+        path, basename = os.path.split(self.path)
+        # Because the names aren't unique we include the flowcell name;
+        # because there were different eland files for different length
+        # analyses, we also include the cycle length in the name.
+        if self.filetype == 'eland':
+            template = "%(flowcell)s_%(cycle)s_%(eland)s"
+            basename = template % { 'flowcell': self.flowcell,
+                                    'cycle': self.cycle,
+                                    'eland': basename }
+        # else:
+        # all the other file types have names that include flowcell/lane
+        # information and thus are unique so we don't have to do anything
+        return os.path.join(root, basename)
+        
+def parse_srf(path, filename):
+    basename, ext = os.path.splitext(filename)
+    records = basename.split('_')
+    flowcell = records[4]
+    lane = int(records[5][0])
+    fullpath = os.path.join(path, filename)
+    return SequenceFile('srf', fullpath, flowcell, lane)
+
+def parse_qseq(path, filename):
+    basename, ext = os.path.splitext(filename)
+    records = basename.split('_')
+    fullpath = os.path.join(path, filename)
+    flowcell = records[4]
+    lane = int(records[5][1])
+    read = int(records[6][1])
+    return SequenceFile('qseq', fullpath, flowcell, lane, read)
+
+def parse_fastq(path, filename):
+    basename, ext = os.path.splitext(filename)
+    records = basename.split('_')
+    fullpath = os.path.join(path, filename)
+    flowcell = records[4]
+    lane = int(records[5][1])
+    read = int(records[6][1])
+    if records[-1] == 'pass':
+        pf = True
     else:
-        return None
+        pf = False
+    return SequenceFile('fastq', fullpath, flowcell, lane, read, pf=pf)
+
+def parse_eland(path, filename, eland_match):
+    fullpath = os.path.join(path, filename)
+    rest, cycle = os.path.split(path)
+    rest, flowcell = os.path.split(rest)
+    lane = eland_match.group('lane')
+    read = eland_match.group('read')
+    return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=cycle)
     
-def parse_srf_directory(srf_dir):
+def scan_for_sequences(dirs):
+    sequences = []
+    for d in dirs:
+        logging.info("Scanning %s for sequences" % (d,))
+        for path, dirname, filenames in os.walk(d):
+            for f in filenames:
+                seq = None
+                # find sequence files
+                if raw_seq_re.match(f):
+                    if f.endswith('.md5'):
+                        continue
+                    elif f.endswith('.srf') or f.endswith('.srf.bz2'):
+                        seq = parse_srf(path, f)
+                    elif qseq_re.match(f):
+                        seq = parse_qseq(path, f)
+                    elif f.endswith('fastq') or f.endswith('.fastq.bz2'):
+                        seq = parse_fastq(path, f)
+                eland_match = eland_re.match(f)
+                if eland_match:
+                    if f.endswith('.md5'):
+                        continue
+                    seq = parse_eland(path, f, eland_match)
+                if seq:
+                    sequences.append(seq)
+                    logging.debug("Found sequence at %s" % (f,))
+                    
+    return sequences
+
+def retrieve_info(url, apidata):
     """
-    search srf_dir for *.srf files
+    Return a dictionary from the HTSworkflow API
+    """
+    try:
+        apipayload = urllib.urlencode(apidata)
+        web = urllib2.urlopen(url, apipayload)
+    except urllib2.HTTPError, e:
+        if e.code == 404:
+            logging.info("%s was not found" % (url,))
+            return None
+        else:
+            errmsg = 'URLError: %d %s' % (e.code, e.msg)
+            raise IOError(errmsg)
+    
+    contents = web.read()
+    headers = web.info()
 
-    builds a dictionary indexed by flowcell name.
+    return json.loads(contents)
+
+def build_flowcell_db(fcdb_filename, sequences, baseurl, apiid, apikey):
+    """
+    compare our flowcell database with our list of sequences and return
+    a fully populated database
     """
-    flowcells = {}
-    srfs = glob(os.path.join(srf_dir,'*.srf'))
-    for pathname in srfs:
-        path, filename = os.path.split(pathname)
-        basename, ext = os.path.splitext(filename)
-        record = basename.split('_')
-        if len(record) != 6:
-            logging.error("Unrecognized srf file: %s expected 6 fields got %d" % (pathname,len(record)))
-            continue
-
-        site = record[0]
-        date = record[1]
-        machine = record[2]
-        runid = record[3]
-        flowcellid = record[4]
-        laneid = record[5]
-
-        desc = "_".join([site,date,machine,runid,flowcellid])
-        flowcells[flowcellid] = desc
-    return flowcells
+    fcdb = shelve.open(fcdb_filename)
+    libdb = {}
+    apidata = {'apiid': apiid, 'apikey': apikey}
+    for seq in sequences:
+        flowcell = seq.flowcell
+        flowcell_info = None
+
+        # get info about flowcell from server or shelf
+        if not fcdb.has_key(flowcell):
+            url = urlparse.urljoin(baseurl, 'experiments/config/%s/json' % (flowcell,))
+            flowcell_info = retrieve_info(url, apidata)
+            if flowcell_info is not None:
+                fcdb[flowcell] = flowcell_info
+        else:
+            flowcell_info = fcdb[flowcell]
 
+        # make library id db
+        if flowcell_info is not None:
+            seq_library_id = flowcell_info['lane_set'][unicode(seq.lane)]['library_id']
+            libdb.setdefault(seq_library_id, []).append(seq)
+           
+    fcdb.sync()
+    return fcdb, libdb
 
 def carefully_make_hardlink(source, destination, dry_run=False):
     """
@@ -65,88 +178,111 @@ def carefully_make_hardlink(source, destination, dry_run=False):
     Checking to see if the link already exists and is
     the same as the link we want to make.
     If the link already exists and is different, throw an error.
+
+    If we didn't update anything return 0, if we did update
+    return 1.
     """
     logging.debug("CHECKING: %s -> %s", source, destination)
 
     if not os.path.exists(source):
         logging.warning("%s doesn't exist", source)
-        return
+        return 0
 
     if os.path.exists(destination):
         if os.path.samefile(source, destination):
             logging.debug('SAME: %s -> %s' % (source, destination))
-            return
+            return 0
         else:
             raise IOError('%s and %s are different files' % \
                            (source, destination))
-    logging.info('Linking: %s -> %s' % (source, destination))
+    logging.debug('Linking: %s -> %s' % (source, destination))
 
-    if dry_run: return 
+    # past this point we would actually modify the filesystem
+    if dry_run: return 1
 
     os.link(source, destination)
     os.chmod(destination,
              stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH )
-
-def link_all_eland_lanes(library_path, flowcell_dir, flowcell_id, lane, dry_run):
-    """
-    find eland files at different alignment lengths
-    and put each of those in the file 
-    """
-    lanes = find_lanes(flowcell_dir, flowcell_id, lane)
-    for lane_pathname in lanes:
-        long_name = make_long_lane_name(flowcell_dir, 
-                                        lane_pathname)
-        long_pathname = os.path.join(library_path, long_name)
-        carefully_make_hardlink(lane_pathname,
-                                long_pathname,
-                                dry_run)
-
-def link_srf_lanes(srf_names, library_path, srf_dir, flowcell_id, lane, dry_run):
+    return 1
+    
+def make_library_links(root, library_db, dry_run=False):
     """
-    Link srf files into our library directories.
+    Make a tree of sequencer roots organized by library id
 
-    the srf files must be named:
-    <site>_<date>_<machine>_<run>_<flowcellid>_<lane>.srf
+    Root is the root of the library tree
+    library_db is a dictionary of SequenceFiles organized by library id
     """
-    srf_basename = srf_names.get(flowcell_id, None)
-    if srf_basename is None:
-        logging.info("srf file for %s was not found", flowcell_id)
-    else:
-        srf_filename = "%s_%s.srf" % (srf_basename, lane)
-        source = os.path.join(srf_dir, srf_filename)
-        destination = os.path.join(library_path, srf_filename)
-        carefully_make_hardlink(source, destination, dry_run)
+    count = 0
+    root = os.path.abspath(root)
+    for lib_id, sequences in library_db.items():
+        target_dir = os.path.join(root, lib_id)
+        if not os.path.exists(target_dir):
+            logging.info("mkdir %s" % (target_dir,))
+            if not dry_run:
+                os.mkdir(target_dir)
+            
+        for s in sequences:
+            count += carefully_make_hardlink(s.path,
+                                             s.make_target_name(target_dir),
+                                             dry_run=dry_run)
+    return count
+
+def configure_logging(opts):
+    # setup logging
+    level = logging.WARN
+    if opts.verbose:
+        level = logging.INFO
+    if opts.debug:
+        level = logging.DEBUG
+    logging.basicConfig(level=level)
     
 
-def make_library_tree(fcdb, library_dir, flowcell_dir, srfs_dir,
-                      dry_run=False):
+def configure_opts(opts):
     """
-    Iterate over the library 
+    Load in options from config file
     """
-    library_dir = os.path.normpath(library_dir) + os.path.sep
-    flowcell_dir = os.path.normpath(flowcell_dir) + os.path.sep
-    srfs_dir = os.path.normpath(srfs_dir) + os.path.sep
-
-    srf_names = parse_srf_directory(srfs_dir)
-
-    for lib_id, lib in fcdb.library.items():
-        library_path = os.path.join(library_dir, str(lib_id))
-        if not os.path.exists(library_path):
-            os.mkdir(library_path)
-
-        for flowcell_id, lane in lib.get('lanes', []):
-            link_all_eland_lanes(library_path, 
-                                 flowcell_dir, 
-                                 flowcell_id, 
-                                 lane, 
-                                 dry_run)
-
-            link_srf_lanes(srf_names, 
-                           library_path, 
-                           srfs_dir,
-                           flowcell_id,
-                           lane,
-                           dry_run)
+    SECTION_NAME = 'sequence_archive'
+    ARCHIVE_OPT = 'sequence_archive'
+    CACHE_OPT = 'cache'
+    HOST_OPT = 'host'
+    APIID_OPT = 'apiid'
+    APIKEY_OPT = 'apikey'
+
+    # figure out what config file to read
+    config_path = [os.path.expanduser('~/.htsworkflow.ini'),
+                   '/etc/htsworkflow.ini']
+    if opts.config is not None:
+        config_path = [opts.config]
+    # parse options from config file
+    config_file = SafeConfigParser()
+    config_file.read(config_path)
+
+    # load defaults from config file if not overridden by the command line
+    if opts.cache is None:
+        if config_file.has_option(SECTION_NAME, CACHE_OPT):
+            opts.cache = config_file.get(SECTION_NAME, CACHE_OPT)
+        else:
+            opts.cache = os.path.expanduser('~/.flowcelldb.shelve')
+
+    if opts.sequence_archive is None and \
+       config_file.has_option(SECTION_NAME, ARCHIVE_OPT):
+        opts.sequence_archive = config_file.get(SECTION_NAME, ARCHIVE_OPT)
+
+    opts.sequence_archive = os.path.abspath(opts.sequence_archive)
+    opts.library_tree = os.path.join(opts.sequence_archive, 'libraries')
+    opts.flowcells = os.path.join(opts.sequence_archive, 'flowcells')
+    opts.srfs = os.path.join(opts.sequence_archive, 'srfs')
+
+    if opts.host is None and config_file.has_option(SECTION_NAME, HOST_OPT):
+        opts.host = config_file.get(SECTION_NAME, HOST_OPT)
+
+    if opts.apiid is None and config_file.has_option(SECTION_NAME, APIID_OPT):
+        opts.apiid = config_file.get(SECTION_NAME, APIID_OPT)
+
+    if opts.apikey is None and config_file.has_option(SECTION_NAME, APIKEY_OPT):
+        opts.apikey = config_file.get(SECTION_NAME, APIKEY_OPT)
+      
+    return opts
 
 def make_parser():
     """
@@ -156,15 +292,18 @@ def make_parser():
     parser.add_option('-c', '--config', default=None,
                       help='path to a configuration file containing a '
                            'sequence archive section')
-                      
-    parser.add_option("--database", dest="database",
-                      help="path to the fctracker.db",
-                      default=None)
+    parser.add_option('--cache', default=None,
+                      help="default flowcell cache")
+    
+    parser.add_option('--host', default=None,
+                      help="specify http://host for querying flowcell information")
+    parser.add_option('--apiid', default=None,
+                      help="API ID to use when retrieving information")
+    parser.add_option("--apikey", default=None,
+                      help="API Key to use when retrieving information")
+    
     parser.add_option('-a', '--sequence-archive', default=None,
                       help='path to where the sequence archive lives')
-    parser.add_option("-w", "--where", dest="where",
-                      help="add a where clause",
-                      default=None)
 
     parser.add_option('-v', '--verbose', action='store_true', default=False,
                       help='be more verbose')
@@ -176,66 +315,34 @@ def make_parser():
                       help="Don't modify the filesystem")
     return parser
 
-def main(argv=None):
-    FRONTEND_NAME = 'frontend'
-    SECTION_NAME = 'sequence_archive'
-    DATABASE_OPT = 'database_name'
-    ARCHIVE_OPT = 'archive_path'
-
-    if argv is None:
-        argv = []
+def main(cmdline=None):
     parser = make_parser()
+    opts, args = parser.parse_args(cmdline)
 
-    # parse command line arguments
-    opt, args = parser.parse_args(argv)
-
-    # setup logging
-    level = logging.WARN
-    if opt.verbose:
-        level = logging.INFO
-    if opt.debug:
-        level = logging.DEBUG
-    logging.basicConfig(level=level)
-
-    # figure out what config file to read
-    config_path = [os.path.expanduser('~/.htsworkflow.ini'),
-                   '/etc/htsworkflow.ini']
-    if opt.config is not None:
-        config_path = [opt.config]
-    
-    # parse options from config file
-    config_file = SafeConfigParser()
-    config_file.read(config_path)
-
-    # load defaults from config file if not overriden by the command line
-    print opt.database
-    if opt.database is None and \
-       config_file.has_option(FRONTEND_NAME, DATABASE_OPT):
-        opt.database = config_file.get(FRONTEND_NAME, DATABASE_OPT)
-
-    if opt.sequence_archive is None and \
-       config_file.has_option(SECTION_NAME, ARCHIVE_OPT):
-        opt.sequence_archive = config_file.get(SECTION_NAME, ARCHIVE_OPT)
+    configure_logging(opts)
+    opts = configure_opts(opts)
   
     # complain if critical things are missing
-    if opt.database is None:
+    if opts.cache is None:
        parser.error('Need location of htsworkflow frontend database')
 
-    if opt.sequence_archive is None:
+    if opts.sequence_archive is None:
        parser.error('Need the root path for the sequence archive')
 
-    fcdb = fctracker.fctracker(opt.database)
-    cells = fcdb._get_flowcells(opt.where)
-
-    library_dir = os.path.join(opt.sequence_archive, 'libraries')
-    flowcell_dir = os.path.join(opt.sequence_archive, 'flowcells')
-    srfs_dir = os.path.join(opt.sequence_archive, 'srfs')
-    make_library_tree(fcdb, 
-                      library_dir, flowcell_dir, srfs_dir, 
-                      opt.dry_run)
-
+    seq_dirs = [ opts.flowcells, opts.srfs ]
+    if len(args) > 0:
+        seq_dirs = [os.path.abspath(f) for f in args]
+    
+    seqs = scan_for_sequences(seq_dirs)
+    fcdb, libdb = build_flowcell_db(opts.cache, seqs, opts.host, opts.apiid, opts.apikey)
+    updates = make_library_links(opts.library_tree, libdb, dry_run=opts.dry_run)
+    
+    logging.warn("%s flowcells in database" % (len(fcdb),))
+    logging.warn("found %s sequence files" % (len(seqs),))
+    logging.warn("%s libraries being checked" % (len(libdb),))
+    logging.warn("%s sequence files were linked" % (updates,))
+    
     return 0
-
+    
 if __name__ == "__main__":
-    rv = main(sys.argv[1:])
-    # sys.exit(rv)
+    main()