split the library script into a reusable database/reporting layer
author Diane Trout <diane@caltech.edu>
Mon, 21 Apr 2008 23:34:51 +0000 (23:34 +0000)
committer Diane Trout <diane@caltech.edu>
Mon, 21 Apr 2008 23:34:51 +0000 (23:34 +0000)
and command line script.

gaworkflow/util/fctracker.py [new file with mode: 0644]
scripts/library.py

diff --git a/gaworkflow/util/fctracker.py b/gaworkflow/util/fctracker.py
new file mode 100644 (file)
index 0000000..17fefb6
--- /dev/null
@@ -0,0 +1,154 @@
"""
Provide some quick and dirty access and reporting for the fctracker database.

The advantage to this code is that it doesn't depend on django being
installed, so it can run on machines other than the webserver.
"""
import datetime
import os
import sys
import time

# Compare version components as a tuple rather than with float arithmetic
# (major + minor * 0.1): the float form misorders minor versions >= 10 and
# depends on floating-point rounding for the comparison.
if sys.version_info[:2] >= (2, 5):
    # Python 2.5+ ships sqlite3 in the standard library.
    import sqlite3
else:
    # Older interpreters need the external pysqlite2 package.
    import pysqlite2.dbapi2 as sqlite3
+
+
class fctracker:
    """
    provide a simple way to interact with the flowcell data in fctracker.db
    """
    def __init__(self, database):
        """
        Connect to the fctracker sqlite database and preload lookup tables.

        database -- path to the sqlite database file; when None, search a
                    couple of obvious locations for fctracker.db.
        """
        # default to the current directory
        if database is None:
            self.database = self._guess_fctracker_path()
        else:
            self.database = database
        self.conn = sqlite3.connect(self.database)
        # preload the library and species lookup tables so flowcell rows
        # can be decorated with them later
        self._get_library()
        self._get_species()

    def _guess_fctracker_path(self):
        """
        Guess a few obvious places for the database

        Checks the current directory first, then the user's home
        directory.  Raises RuntimeError if neither has fctracker.db.
        """
        fctracker = 'fctracker.db'
        name = fctracker
        # is it in the current dir?
        if os.path.exists(name):
            return name
        # is it in the home directory?
        name = os.path.expanduser(os.path.join('~', fctracker))
        if os.path.exists(name):
            return name
        raise RuntimeError("Can't find fctracker")

    def _make_dict_from_table(self, table_name, pkey_name):
        """
        Convert a django table into a dictionary indexed by the primary key.
        Yes, it really does just load everything into memory, hopefully
        we stay under a few tens of thousands of runs for a while.

        table_name -- name of the table to select from
        pkey_name  -- column whose value becomes the dictionary key

        Returns a dict mapping primary key -> {column name: value}.
        """
        table = {}
        c = self.conn.cursor()
        c.execute('select * from %s;' % (table_name))
        # extract just the field name
        description = [ f[0] for f in c.description ]
        for row in c:
            row_dict = dict(zip(description, row))
            table[row_dict[pkey_name]] = row_dict
        c.close()
        return table

    def _get_library(self):
        """
        attach the library dictionary to the instance
        """
        self.library = self._make_dict_from_table(
                         'fctracker_library',
                         'library_id')

    def _get_species(self):
        """
        attach the species dictionary to the instance
        """
        self.species = self._make_dict_from_table(
                         'fctracker_species',
                         'id'
                       )

    def _get_flowcells(self, where=None):
        """
        attach the flowcell dictionary to the instance and return it

        where is a sql where clause. (eg "where run_date > '2008-1-1'")
        that can be used to limit what flowcells we select
        FIXME: please add sanitization code -- the clause is interpolated
        directly into the SQL statement.
        """
        if where is None:
            where = ""
        self.flowcells = {}
        c = self.conn.cursor()
        c.execute('select * from fctracker_flowcell %s;' % (where))
        # extract just the field name
        description = [ f[0] for f in c.description ]
        for row in c:
            row_dict = dict(zip(description, row))
            # replace each lane's library id with the full library record,
            # itself decorated with its species record
            for lane in [ 'lane_%d_library' % (i) for i in range(1,9) ]:
                lane_library = self.library[row_dict[lane+"_id"]]
                species_id = lane_library['library_species_id']
                lane_library['library_species'] = self.species[species_id]
                row_dict[lane] = lane_library
            # some useful parsing: turn the text timestamp into a datetime
            run_date = time.strptime(row_dict['run_date'],  '%Y-%m-%d %H:%M:%S')
            run_date = datetime.datetime(*run_date[:6])
            row_dict['run_date'] = run_date
            self.flowcells[row_dict['flowcell_id']] = row_dict
        # close the cursor, matching _make_dict_from_table's handling
        # (the original leaked it)
        c.close()
        return self.flowcells
+
def recoverable_drive_report(flowcells):
    """
    Attempt to report what flowcells are still on a hard drive
    """
    def _drive_is_gone(cell):
        """
        Use a variety of heuristics to determine if the flowcell drive
        has been deleted.
        """
        fc_name = cell['flowcell_id']
        return ('failed' in fc_name or
                'deleted' in fc_name or
                'not run' in fc_name)

    # one output line per lane, filled in from the fields dict below
    template = ("%(date)s %(id)s %(lane)s %(library_name)s (%(library_id)s) "
                "%(species)s")

    # walk the flowcells in run-date order
    ordered = sorted((cell['run_date'], key)
                     for key, cell in flowcells.items())

    rows = []
    for run_date, fc_id in ordered:
        cell = flowcells[fc_id]
        if _drive_is_gone(cell):
            continue
        for lane_number in range(1, 9):
            lane_prefix = 'lane_%d' % (lane_number)
            lane_library = cell[lane_prefix + '_library']
            rows.append(template % {
                'date': cell['run_date'].strftime('%y-%b-%d'),
                'id': cell['flowcell_id'],
                'lane': lane_number,
                'library_name': lane_library['library_name'],
                'library_id': cell[lane_prefix + '_library_id'],
                'species': lane_library['library_species']['scientific_name'],
            })
    return os.linesep.join(rows)
index 6263c9ae0916e28a5047b7c192a78586014aa353..eeeed90e3f47e9f7b2fedafcfaef20a0a5d4f77d 100644 (file)
@@ -1,97 +1,22 @@
-import datetime
+"""
+Provide some quick and dirty access and reporting for the fctracker database.
+
+The advantage to this code is that it doesn't depend on django being
+installed, so it can run on machines other than the webserver.
+"""
 from optparse import OptionParser
-from pprint import pprint
-import sqlite3
 import sys
-import time
-
-class fctracker:
-    def __init__(self):
-        self.conn = sqlite3.connect('fctracker.db')
-        self._get_library()
-        self._get_species()
-
-    def _make_dict_from_table(self, table_name, pkey_name):
-        table = {}
-        c = self.conn.cursor()
-        c.execute('select * from %s;' % (table_name))
-        # extract just the field name
-        description = [ f[0] for f in c.description]
-        for row in c:
-            row_dict = dict(zip(description, row))
-            table[row_dict[pkey_name]] = row_dict
-        c.close()
-        return table
-
-    def _get_library(self):
-        self.library = self._make_dict_from_table(
-                         'fctracker_library', 
-                         'library_id')
-                                                  
-        
-    def _get_species(self):
-        self.species = self._make_dict_from_table(
-                         'fctracker_species',
-                         'id'
-                       )
-        
-    def _get_flowcells(self, where=None):
-        if where is None:
-            where = ""
-        self.flowcells = {}
-        c = self.conn.cursor()
-        c.execute('select * from fctracker_flowcell %s;' % (where))
-        # extract just the field name
-        description = [ f[0] for f in c.description ]
-        for row in c:
-            row_dict = dict(zip(description, row))
-            for lane in [ 'lane_%d_library' % (i) for i in range(1,9) ]:
-                lane_library = self.library[row_dict[lane+"_id"]]
-                species_id = lane_library['library_species_id']
-                lane_library['library_species'] = self.species[species_id]
-                row_dict[lane] = lane_library
-            # some useful parsing
-            run_date = time.strptime(row_dict['run_date'],  '%Y-%m-%d %H:%M:%S')
-            run_date = datetime.datetime(*run_date[:6])
-            row_dict['run_date'] = run_date
-            self.flowcells[row_dict['flowcell_id']] = row_dict
-        return self.flowcells
 
-
-def flowcell_gone(name):
-    if 'failed' in name:
-        return True
-    if 'deleted' in name:
-        return True
-    if 'not run' in name:
-        return True
-    return False
-
-def report(flowcells):
-    flowcell_list = []
-    for key, cell in flowcells.items():
-        flowcell_list.append( (cell['run_date'], key) )
-    flowcell_list.sort()
-    for run_date, flowcell_id in flowcell_list:
-        cell = flowcells[flowcell_id]
-        if flowcell_gone(cell['flowcell_id']):
-            continue
-        #pprint(cell)
-        #print cell['flowcell_id'], cell['run_date']
-        for l in range(1,9):
-            lane = 'lane_%d' % (l)
-            print cell['run_date'].strftime('%y-%b-%d'),
-            print cell['flowcell_id'],
-            #print "  ",
-            print l,cell['%s_library'%(lane)]['library_name'],
-            print '(%s)' % (cell['%s_library_id'%(lane)]),
-            print cell['%s_library'%(lane)]['library_species']['scientific_name']
+from gaworkflow.util import fctracker
 
 def make_parser():
     """
     Make parser
     """
     parser = OptionParser()
+    parser.add_option("-d", "--database", dest="database",
+                      help="path to the fctracker.db",
+                      default=None)
     parser.add_option("-w", "--where", dest="where",
                       help="add a where clause",
                       default=None)
@@ -104,10 +29,10 @@ def main(argv=None):
 
     opt, args = parser.parse_args(argv)
     
-    fc = fctracker()
+    fc = fctracker.fctracker(opt.database)
     cells = fc._get_flowcells(opt.where)
 
-    report(cells)
+    print fctracker.recoverable_drive_report(cells)
     return 0
 
 if __name__ == "__main__":