Use the HTS workflow API to figure out the library tree.
authorDiane Trout <diane@caltech.edu>
Fri, 5 Mar 2010 22:41:13 +0000 (22:41 +0000)
committerDiane Trout <diane@caltech.edu>
Fri, 5 Mar 2010 22:41:13 +0000 (22:41 +0000)
This also needed to search flow flowcell id by the starting name
because we still have the status of a flowcell being part of the
name in a few places.

htsworkflow/frontend/experiments/experiments.py
scripts/make-library-tree

index 83b541437603197923c741550e5ca7da44dddf88..a156d7bdbe0198876f504b3f4b4276e115835787 100755 (executable)
@@ -23,7 +23,7 @@ def flowcell_information(flowcell_id):
     Return a dictionary describing a flowcell
     """
     try:
     Return a dictionary describing a flowcell
     """
     try:
-        fc = FlowCell.objects.get(flowcell_id=flowcell_id)
+        fc = FlowCell.objects.get(flowcell_id__startswith=flowcell_id)
     except FlowCell.DoesNotExist, e:
         return None
 
     except FlowCell.DoesNotExist, e:
         return None
 
index 50ce7a932ef263300c70d2e9483f18431ced6f5d..f4e854a0fe4cd742a7481513f67455b70291b2e0 100644 (file)
-#!/usr/bin/python
-"""
-Make a tree of symlinks organized by library id.
-"""
+#!/usr/bin/env python
+
 from ConfigParser import SafeConfigParser
 from ConfigParser import SafeConfigParser
-from glob import glob
-import logging
-from optparse import OptionParser
+import json
 import logging
 import os
 import logging
 import os
+from optparse import OptionParser
 import stat
 import stat
-import sys
-
-from htsworkflow.util import fctracker
-
-def find_lanes(flowcell_dir, flowcell_id, lane):
-    lane_name = "s_%s_eland_*" %(lane)
-    pattern = os.path.join(flowcell_dir, flowcell_id, "*", lane_name)
-    lanes = glob(pattern)
-    return lanes
-
-def make_long_lane_name(flowcell_dir, lane_pathname):
-    """
-    make a name from the eland result file name
-    """
-    if flowcell_dir == lane_pathname[0:len(flowcell_dir)]:
-        subpath = lane_pathname[len(flowcell_dir):]
-        long_name = subpath.replace(os.path.sep, "_")
-        return long_name
+import re
+import shelve
+import urllib
+import urllib2
+import urlparse
+
+eland_re = re.compile('s_(?P<lane>\d)(?P<read>_\d)?_eland_')
+raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Z]+')
+qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Z]+_l[\d]_r[\d].tar.bz2')
+
+class SequenceFile(object):
+    def __init__(self, filetype, path, flowcell, lane, read=None, pf=None, cycle=None):
+        self.filetype = filetype
+        self.path = path
+        self.flowcell = flowcell
+        self.lane = lane
+        self.read = read
+        self.pf = pf
+        self.cycle = cycle
+
+    def __hash__(self):
+        return hash(self.key())
+
+    def key(self):
+        return (self.flowcell, self.lane)
+
+    def unicode(self):
+        return unicode(self.path)
+
+    def __repr__(self):
+        return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
+
+    def make_target_name(self, root):
+        """
+        Create target name for where we need to link this sequence too
+        """
+        path, basename = os.path.split(self.path)
+        # Because the names aren't unque we include the flowcel name
+        # because there were different eland files for different length
+        # analyses, we include the cycle length in the name.
+        if self.filetype == 'eland':
+            template = "%(flowcell)s_%(cycle)s_%(eland)s"
+            basename = template % { 'flowcell': self.flowcell,
+                                    'cycle': self.cycle,
+                                    'eland': basename }
+        # else:
+        # all the other file types have names that include flowcell/lane
+        # information and thus are unique so we don't have to do anything
+        return os.path.join(root, basename)
+        
+def parse_srf(path, filename):
+    basename, ext = os.path.splitext(filename)
+    records = basename.split('_')
+    flowcell = records[4]
+    lane = int(records[5][0])
+    fullpath = os.path.join(path, filename)
+    return SequenceFile('srf', fullpath, flowcell, lane)
+
+def parse_qseq(path, filename):
+    basename, ext = os.path.splitext(filename)
+    records = basename.split('_')
+    fullpath = os.path.join(path, filename)
+    flowcell = records[4]
+    lane = int(records[5][1])
+    read = int(records[6][1])
+    return SequenceFile('qseq', fullpath, flowcell, lane, read)
+
+def parse_fastq(path, filename):
+    basename, ext = os.path.splitext(filename)
+    records = basename.split('_')
+    fullpath = os.path.join(path, filename)
+    flowcell = records[4]
+    lane = int(records[5][1])
+    read = int(records[6][1])
+    if records[-1] == 'pass':
+        pf = True
     else:
     else:
-        return None
+        pf = False
+    return SequenceFile('fastq', fullpath, flowcell, lane, read, pf=pf)
+
+def parse_eland(path, filename, eland_match):
+    fullpath = os.path.join(path, filename)
+    rest, cycle = os.path.split(path)
+    rest, flowcell = os.path.split(rest)
+    lane = eland_match.group('lane')
+    read = eland_match.group('read')
+    return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=cycle)
     
     
-def parse_srf_directory(srf_dir):
+def scan_for_sequences(dirs):
+    sequences = []
+    for d in dirs:
+        logging.info("Scanning %s for sequences" % (d,))
+        for path, dirname, filenames in os.walk(d):
+            for f in filenames:
+                seq = None
+                # find sequence files
+                if raw_seq_re.match(f):
+                    if f.endswith('.md5'):
+                        continue
+                    elif f.endswith('.srf') or f.endswith('.srf.bz2'):
+                        seq = parse_srf(path, f)
+                    elif qseq_re.match(f):
+                        seq = parse_qseq(path, f)
+                    elif f.endswith('fastq') or f.endswith('.fastq.bz2'):
+                        seq = parse_fastq(path, f)
+                eland_match = eland_re.match(f)
+                if eland_match:
+                    if f.endswith('.md5'):
+                        continue
+                    seq = parse_eland(path, f, eland_match)
+                if seq:
+                    sequences.append(seq)
+                    logging.debug("Found sequence at %s" % (f,))
+                    
+    return sequences
+
+def retrieve_info(url, apidata):
     """
     """
-    search srf_dir for *.srf files
+    Return a dictionary from the HTSworkflow API
+    """
+    try:
+        apipayload = urllib.urlencode(apidata)
+        web = urllib2.urlopen(url, apipayload)
+    except urllib2.URLError, e:
+        if e.code == 404:
+            logging.info("%s was not found" % (url,))
+            return None
+        else:
+            errmsg = 'URLError: %d %s' % (e.code, e.msg)
+            raise IOError(errmsg)
+    
+    contents = web.read()
+    headers = web.info()
 
 
-    builds a dictionary indexed by flowcell name.
+    return json.loads(contents)
+
+def build_flowcell_db(fcdb_filename, sequences, baseurl, apiid, apikey):
+    """
+    compare our flowcell database with our list of sequences and return
+    a fully populated database
     """
     """
-    flowcells = {}
-    srfs = glob(os.path.join(srf_dir,'*.srf'))
-    for pathname in srfs:
-        path, filename = os.path.split(pathname)
-        basename, ext = os.path.splitext(filename)
-        record = basename.split('_')
-        if len(record) != 6:
-            logging.error("Unrecognized srf file: %s expected 6 fields got %d" % (pathname,len(record)))
-            continue
-
-        site = record[0]
-        date = record[1]
-        machine = record[2]
-        runid = record[3]
-        flowcellid = record[4]
-        laneid = record[5]
-
-        desc = "_".join([site,date,machine,runid,flowcellid])
-        flowcells[flowcellid] = desc
-    return flowcells
+    fcdb = shelve.open(fcdb_filename)
+    libdb = {}
+    apidata = {'apiid': apiid, 'apikey': apikey}
+    for seq in sequences:
+        flowcell = seq.flowcell
+        flowcell_info = None
+
+        # get info about flowcell from server or shelf
+        if not fcdb.has_key(flowcell):
+            url = urlparse.urljoin(baseurl, 'experiments/config/%s/json' % (flowcell,))
+            flowcell_info = retrieve_info(url, apidata)
+            if flowcell_info is not None:
+                fcdb[flowcell] = flowcell_info
+        else:
+            flowcell_info = fcdb[flowcell]
 
 
+        # make library id db
+        if flowcell_info is not None:
+            seq_library_id = flowcell_info['lane_set'][unicode(seq.lane)]['library_id']
+            libdb.setdefault(seq_library_id, []).append(seq)
+           
+    fcdb.sync()
+    return fcdb, libdb
 
 def carefully_make_hardlink(source, destination, dry_run=False):
     """
 
 def carefully_make_hardlink(source, destination, dry_run=False):
     """
@@ -65,88 +178,111 @@ def carefully_make_hardlink(source, destination, dry_run=False):
     Checking to see if the link already exists and is
     the same as the link we want to make.
     If the link already exists and is different, throw an error.
     Checking to see if the link already exists and is
     the same as the link we want to make.
     If the link already exists and is different, throw an error.
+
+    If we didn't update anything return 0, if we did update
+    return 1.
     """
     logging.debug("CHECKING: %s -> %s", source, destination)
 
     if not os.path.exists(source):
         logging.warning("%s doesn't exist", source)
     """
     logging.debug("CHECKING: %s -> %s", source, destination)
 
     if not os.path.exists(source):
         logging.warning("%s doesn't exist", source)
-        return
+        return 0
 
     if os.path.exists(destination):
         if os.path.samefile(source, destination):
             logging.debug('SAME: %s -> %s' % (source, destination))
 
     if os.path.exists(destination):
         if os.path.samefile(source, destination):
             logging.debug('SAME: %s -> %s' % (source, destination))
-            return
+            return 0
         else:
             raise IOError('%s and %s are different files' % \
                            (source, destination))
         else:
             raise IOError('%s and %s are different files' % \
                            (source, destination))
-    logging.info('Linking: %s -> %s' % (source, destination))
+    logging.debug('Linking: %s -> %s' % (source, destination))
 
 
-    if dry_run: return 
+    # we would do something by this part
+    if dry_run: return 1
 
     os.link(source, destination)
     os.chmod(destination,
              stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH )
 
     os.link(source, destination)
     os.chmod(destination,
              stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH )
-
-def link_all_eland_lanes(library_path, flowcell_dir, flowcell_id, lane, dry_run):
-    """
-    find eland files at different alignment lengths
-    and put each of those in the file 
-    """
-    lanes = find_lanes(flowcell_dir, flowcell_id, lane)
-    for lane_pathname in lanes:
-        long_name = make_long_lane_name(flowcell_dir, 
-                                        lane_pathname)
-        long_pathname = os.path.join(library_path, long_name)
-        carefully_make_hardlink(lane_pathname,
-                                long_pathname,
-                                dry_run)
-
-def link_srf_lanes(srf_names, library_path, srf_dir, flowcell_id, lane, dry_run):
+    return 1
+    
+def make_library_links(root, library_db, dry_run=False):
     """
     """
-    Link srf files into our library directories.
+    Make a tree of sequencer roots organized by library id
 
 
-    the srf files must be named:
-    <site>_<date>_<machine>_<run>_<flowcellid>_<lane>.srf
+    Root is the root of the library tree
+    library_db is a dictionary of SequenceFiles organized by library id
     """
     """
-    srf_basename = srf_names.get(flowcell_id, None)
-    if srf_basename is None:
-        logging.info("srf file for %s was not found", flowcell_id)
-    else:
-        srf_filename = "%s_%s.srf" % (srf_basename, lane)
-        source = os.path.join(srf_dir, srf_filename)
-        destination = os.path.join(library_path, srf_filename)
-        carefully_make_hardlink(source, destination, dry_run)
+    count = 0
+    root = os.path.abspath(root)
+    for lib_id, sequences in library_db.items():
+        target_dir = os.path.join(root, lib_id)
+        if not os.path.exists(target_dir):
+            logging.info("mkdir %s" % (target_dir,))
+            if not dry_run:
+                os.mkdir(target_dir)
+            
+        for s in sequences:
+            count += carefully_make_hardlink(s.path,
+                                             s.make_target_name(target_dir),
+                                             dry_run=dry_run)
+    return count
+
+def configure_logging(opts):
+    # setup logging
+    level = logging.WARN
+    if opts.verbose:
+        level = logging.INFO
+    if opts.debug:
+        level = logging.DEBUG
+    logging.basicConfig(level=level)
     
 
     
 
-def make_library_tree(fcdb, library_dir, flowcell_dir, srfs_dir,
-                      dry_run=False):
+def configure_opts(opts):
     """
     """
-    Iterate over the library 
+    Load in options from config file
     """
     """
-    library_dir = os.path.normpath(library_dir) + os.path.sep
-    flowcell_dir = os.path.normpath(flowcell_dir) + os.path.sep
-    srfs_dir = os.path.normpath(srfs_dir) + os.path.sep
-
-    srf_names = parse_srf_directory(srfs_dir)
-
-    for lib_id, lib in fcdb.library.items():
-        library_path = os.path.join(library_dir, str(lib_id))
-        if not os.path.exists(library_path):
-            os.mkdir(library_path)
-
-        for flowcell_id, lane in lib.get('lanes', []):
-            link_all_eland_lanes(library_path, 
-                                 flowcell_dir, 
-                                 flowcell_id, 
-                                 lane, 
-                                 dry_run)
-
-            link_srf_lanes(srf_names, 
-                           library_path, 
-                           srfs_dir,
-                           flowcell_id,
-                           lane,
-                           dry_run)
+    SECTION_NAME = 'sequence_archive'
+    ARCHIVE_OPT = 'sequence_archive'
+    CACHE_OPT = 'cache'
+    HOST_OPT = 'host'
+    APIID_OPT = 'apiid'
+    APIKEY_OPT = 'apikey'
+
+    # figure out what config file to read
+    config_path = [os.path.expanduser('~/.htsworkflow.ini'),
+                   '/etc/htsworkflow.ini']
+    if opts.config is not None:
+        config_path = [opts.config]
+    # parse options from config file
+    config_file = SafeConfigParser()
+    config_file.read(config_path)
+
+    # load defaults from config file if not overriden by the command line
+    if opts.cache is None:
+        if config_file.has_option(SECTION_NAME, CACHE_OPT):
+            opts.cache = config_file.get(FRONTEND_NAME, CACHE_OPT)
+        else:
+            opts.cache = os.path.expanduser('~/.flowcelldb.shelve')
+
+    if opts.sequence_archive is None and \
+       config_file.has_option(SECTION_NAME, ARCHIVE_OPT):
+        opts.sequence_archive = config_file.get(SECTION_NAME, ARCHIVE_OPT)
+
+    opts.sequence_archive = os.path.abspath(opts.sequence_archive)
+    opts.library_tree = os.path.join(opts.sequence_archive, 'libraries')
+    opts.flowcells = os.path.join(opts.sequence_archive, 'flowcells')
+    opts.srfs = os.path.join(opts.sequence_archive, 'srfs')
+
+    if opts.host is None and config_file.has_option(SECTION_NAME, HOST_OPT):
+        opts.host = config_file.get(SECTION_NAME, HOST_OPT)
+
+    if opts.apiid is None and config_file.has_option(SECTION_NAME, APIID_OPT):
+        opts.apiid = config_file.get(SECTION_NAME, APIID_OPT)
+
+    if opts.apikey is None and config_file.has_option(SECTION_NAME, APIKEY_OPT):
+        opts.apikey = config_file.get(SECTION_NAME, APIKEY_OPT)
+      
+    return opts
 
 def make_parser():
     """
 
 def make_parser():
     """
@@ -156,15 +292,18 @@ def make_parser():
     parser.add_option('-c', '--config', default=None,
                       help='path to a configuration file containing a '
                            'sequence archive section')
     parser.add_option('-c', '--config', default=None,
                       help='path to a configuration file containing a '
                            'sequence archive section')
-                      
-    parser.add_option("--database", dest="database",
-                      help="path to the fctracker.db",
-                      default=None)
+    parser.add_option('--cache', default=None,
+                      help="default flowcell cache")
+    
+    parser.add_option('--host', default=None,
+                      help="specify http://host for quering flowcell information")
+    parser.add_option('--apiid', default=None,
+                      help="API ID to use when retriving information")
+    parser.add_option("--apikey", default=None,
+                      help="API Key for when retriving information")
+    
     parser.add_option('-a', '--sequence-archive', default=None,
                       help='path to where the sequence archive lives')
     parser.add_option('-a', '--sequence-archive', default=None,
                       help='path to where the sequence archive lives')
-    parser.add_option("-w", "--where", dest="where",
-                      help="add a where clause",
-                      default=None)
 
     parser.add_option('-v', '--verbose', action='store_true', default=False,
                       help='be more verbose')
 
     parser.add_option('-v', '--verbose', action='store_true', default=False,
                       help='be more verbose')
@@ -176,66 +315,34 @@ def make_parser():
                       help="Don't modify the filesystem")
     return parser
 
                       help="Don't modify the filesystem")
     return parser
 
-def main(argv=None):
-    FRONTEND_NAME = 'frontend'
-    SECTION_NAME = 'sequence_archive'
-    DATABASE_OPT = 'database_name'
-    ARCHIVE_OPT = 'archive_path'
-
-    if argv is None:
-        argv = []
+def main(cmdline=None):
     parser = make_parser()
     parser = make_parser()
+    opts, args = parser.parse_args(cmdline)
 
 
-    # parse command line arguments
-    opt, args = parser.parse_args(argv)
-
-    # setup logging
-    level = logging.WARN
-    if opt.verbose:
-        level = logging.INFO
-    if opt.debug:
-        level = logging.DEBUG
-    logging.basicConfig(level=level)
-
-    # figure out what config file to read
-    config_path = [os.path.expanduser('~/.htsworkflow.ini'),
-                   '/etc/htsworkflow.ini']
-    if opt.config is not None:
-        config_path = [opt.config]
-    
-    # parse options from config file
-    config_file = SafeConfigParser()
-    config_file.read(config_path)
-
-    # load defaults from config file if not overriden by the command line
-    print opt.database
-    if opt.database is None and \
-       config_file.has_option(FRONTEND_NAME, DATABASE_OPT):
-        opt.database = config_file.get(FRONTEND_NAME, DATABASE_OPT)
-
-    if opt.sequence_archive is None and \
-       config_file.has_option(SECTION_NAME, ARCHIVE_OPT):
-        opt.sequence_archive = config_file.get(SECTION_NAME, ARCHIVE_OPT)
+    configure_logging(opts)
+    opts = configure_opts(opts)
   
     # complain if critical things are missing
   
     # complain if critical things are missing
-    if opt.database is None:
+    if opts.cache is None:
        parser.error('Need location of htsworkflow frontend database')
 
        parser.error('Need location of htsworkflow frontend database')
 
-    if opt.sequence_archive is None:
+    if opts.sequence_archive is None:
        parser.error('Need the root path for the sequence archive')
 
        parser.error('Need the root path for the sequence archive')
 
-    fcdb = fctracker.fctracker(opt.database)
-    cells = fcdb._get_flowcells(opt.where)
-
-    library_dir = os.path.join(opt.sequence_archive, 'libraries')
-    flowcell_dir = os.path.join(opt.sequence_archive, 'flowcells')
-    srfs_dir = os.path.join(opt.sequence_archive, 'srfs')
-    make_library_tree(fcdb, 
-                      library_dir, flowcell_dir, srfs_dir, 
-                      opt.dry_run)
-
+    seq_dirs = [ opts.flowcells, opts.srfs ]
+    if len(args) > 0:
+        seq_dirs = [os.path.abspath(f) for f in args]
+    
+    seqs = scan_for_sequences(seq_dirs)
+    fcdb, libdb = build_flowcell_db(opts.cache, seqs, opts.host, opts.apiid, opts.apikey)
+    updates = make_library_links(opts.library_tree, libdb, dry_run=opts.dry_run)
+    
+    logging.warn("%s flowcells in database" % (len(fcdb),))
+    logging.warn("found %s sequence files" % (len(seqs),))
+    logging.warn("%s libraries being checked" % (len(libdb),))
+    logging.warn("%s sequence files were linked" % (updates,))
+    
     return 0
     return 0
-
+    
 if __name__ == "__main__":
 if __name__ == "__main__":
-    rv = main(sys.argv[1:])
-    # sys.exit(rv)
+    main()