From: Diane Trout Date: Mon, 21 Apr 2008 23:34:51 +0000 (+0000) Subject: split the library script into a reusable database/reporting layer X-Git-Tag: stanford.caltech-merged-database-2009-jan-15~77 X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=htsworkflow.git;a=commitdiff_plain;h=b6a3910e73bbb83d7bc1aa04784021e312bfcabc split the library script into a reusable database/reporting layer and command line script. --- diff --git a/gaworkflow/util/fctracker.py b/gaworkflow/util/fctracker.py new file mode 100644 index 0000000..17fefb6 --- /dev/null +++ b/gaworkflow/util/fctracker.py @@ -0,0 +1,154 @@ +""" +Provide some quick and dirty access and reporting for the fctracker database. + +The advantage to this code is that it doesn't depend on django being +installed, so it can run on machines other than the webserver. +""" +import datetime +import os +import sys +import time + +if sys.version_info[0] + sys.version_info[1] * 0.1 >= 2.5: + # we're python 2.5 + import sqlite3 +else: + import pysqlite2.dbapi2 as sqlite3 + + +class fctracker: + """ + provide a simple way to interact with the flowcell data in fctracker.db + """ + def __init__(self, database): + # default to the current directory + if database is None: + self.database = self._guess_fctracker_path() + else: + self.database = database + self.conn = sqlite3.connect(self.database) + self._get_library() + self._get_species() + + def _guess_fctracker_path(self): + """ + Guess a few obvious places for the database + """ + fctracker = 'fctracker.db' + name = fctracker + # is it in the current dir? + if os.path.exists(name): + return name + name = os.path.expanduser(os.path.join('~', fctracker)) + if os.path.exists(name): + return name + raise RuntimeError("Can't find fctracker") + + def _make_dict_from_table(self, table_name, pkey_name): + """ + Convert a django table into a dictionary indexed by the primary key. + Yes, it really does just load everything into memory, hopefully + we stay under a few tens of thousands of runs for a while. + """ + table = {} + c = self.conn.cursor() + c.execute('select * from %s;' % (table_name)) + # extract just the field name + description = [ f[0] for f in c.description] + for row in c: + row_dict = dict(zip(description, row)) + table[row_dict[pkey_name]] = row_dict + c.close() + return table + + def _get_library(self): + """ + attach the library dictionary to the instance + """ + self.library = self._make_dict_from_table( + 'fctracker_library', + 'library_id') + + + def _get_species(self): + """ + attach the species dictionary to the instance + """ + self.species = self._make_dict_from_table( + 'fctracker_species', + 'id' + ) + + def _get_flowcells(self, where=None): + """ + attach the flowcell dictionary to the instance + + where is a sql where clause. (eg "where run_date > '2008-1-1'") + that can be used to limit what flowcells we select + FIXME: please add sanitization code + """ + if where is None: + where = "" + self.flowcells = {} + c = self.conn.cursor() + c.execute('select * from fctracker_flowcell %s;' % (where)) + # extract just the field name + description = [ f[0] for f in c.description ] + for row in c: + row_dict = dict(zip(description, row)) + for lane in [ 'lane_%d_library' % (i) for i in range(1,9) ]: + lane_library = self.library[row_dict[lane+"_id"]] + species_id = lane_library['library_species_id'] + lane_library['library_species'] = self.species[species_id] + row_dict[lane] = lane_library + # some useful parsing + run_date = time.strptime(row_dict['run_date'], '%Y-%m-%d %H:%M:%S') + run_date = datetime.datetime(*run_date[:6]) + row_dict['run_date'] = run_date + self.flowcells[row_dict['flowcell_id']] = row_dict + return self.flowcells + +def recoverable_drive_report(flowcells): + """ + Attempt to report what flowcells are still on a hard drive + """ + def flowcell_gone(cell): + """ + Use a variety of heuristics to determine if the flowcell drive + has been deleted. + """ + name = cell['flowcell_id'] + if 'failed' in name: + return True + if 'deleted' in name: + return True + if 'not run' in name: + return True + return False + + # sort flowcells by run date + flowcell_list = [] + for key, cell in flowcells.items(): + flowcell_list.append( (cell['run_date'], key) ) + flowcell_list.sort() + + report = [] + line = "%(date)s %(id)s %(lane)s %(library_name)s (%(library_id)s) " + line += "%(species)s" + for run_date, flowcell_id in flowcell_list: + cell = flowcells[flowcell_id] + if flowcell_gone(cell): + continue + for l in range(1,9): + lane = 'lane_%d' % (l) + cell_library = cell['%s_library'%(lane)] + fields = { + 'date': cell['run_date'].strftime('%y-%b-%d'), + 'id': cell['flowcell_id'], + 'lane': l, + 'library_name': cell_library['library_name'], + 'library_id': cell['%s_library_id'%(lane)], + 'species': cell_library['library_species']['scientific_name'], + } + report.append(line % (fields)) + return os.linesep.join(report) diff --git a/scripts/library.py b/scripts/library.py index 6263c9a..eeeed90 100644 --- a/scripts/library.py +++ b/scripts/library.py @@ -1,97 +1,22 @@ -import datetime +""" +Provide some quick and dirty access and reporting for the fctracker database. + +The advantage to this code is that it doesn't depend on django being +installed, so it can run on machines other than the webserver. +""" from optparse import OptionParser -from pprint import pprint -import sqlite3 import sys -import time - -class fctracker: - def __init__(self): - self.conn = sqlite3.connect('fctracker.db') - self._get_library() - self._get_species() - - def _make_dict_from_table(self, table_name, pkey_name): - table = {} - c = self.conn.cursor() - c.execute('select * from %s;' % (table_name)) - # extract just the field name - description = [ f[0] for f in c.description] - for row in c: - row_dict = dict(zip(description, row)) - table[row_dict[pkey_name]] = row_dict - c.close() - return table - - def _get_library(self): - self.library = self._make_dict_from_table( - 'fctracker_library', - 'library_id') - - - def _get_species(self): - self.species = self._make_dict_from_table( - 'fctracker_species', - 'id' - ) - - def _get_flowcells(self, where=None): - if where is None: - where = "" - self.flowcells = {} - c = self.conn.cursor() - c.execute('select * from fctracker_flowcell %s;' % (where)) - # extract just the field name - description = [ f[0] for f in c.description ] - for row in c: - row_dict = dict(zip(description, row)) - for lane in [ 'lane_%d_library' % (i) for i in range(1,9) ]: - lane_library = self.library[row_dict[lane+"_id"]] - species_id = lane_library['library_species_id'] - lane_library['library_species'] = self.species[species_id] - row_dict[lane] = lane_library - # some useful parsing - run_date = time.strptime(row_dict['run_date'], '%Y-%m-%d %H:%M:%S') - run_date = datetime.datetime(*run_date[:6]) - row_dict['run_date'] = run_date - self.flowcells[row_dict['flowcell_id']] = row_dict - return self.flowcells - -def flowcell_gone(name): - if 'failed' in name: - return True - if 'deleted' in name: - return True - if 'not run' in name: - return True - return False - -def report(flowcells): - flowcell_list = [] - for key, cell in flowcells.items(): - flowcell_list.append( (cell['run_date'], key) ) - flowcell_list.sort() - for run_date, flowcell_id in flowcell_list: - cell = flowcells[flowcell_id] - if flowcell_gone(cell['flowcell_id']): - continue - #pprint(cell) - #print cell['flowcell_id'], cell['run_date'] - for l in range(1,9): - lane = 'lane_%d' % (l) - print cell['run_date'].strftime('%y-%b-%d'), - print cell['flowcell_id'], - #print " ", - print l,cell['%s_library'%(lane)]['library_name'], - print '(%s)' % (cell['%s_library_id'%(lane)]), - print cell['%s_library'%(lane)]['library_species']['scientific_name'] +from gaworkflow.util import fctracker def make_parser(): """ Make parser """ parser = OptionParser() + parser.add_option("-d", "--database", dest="database", + help="path to the fctracker.db", + default=None) parser.add_option("-w", "--where", dest="where", help="add a where clause", default=None) @@ -104,10 +29,10 @@ def main(argv=None): opt, args = parser.parse_args(argv) - fc = fctracker() + fc = fctracker.fctracker(opt.database) cells = fc._get_flowcells(opt.where) - report(cells) + print fctracker.recoverable_drive_report(cells) return 0 if __name__ == "__main__":