import re
eland_re = re.compile('s_(?P<lane>\d)(_(?P<read>\d))?_eland_')
-raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Z]+')
-qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Z]+_l[\d]_r[\d].tar.bz2')
+raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+')
+qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+_l[\d]_r[\d].tar.bz2')
+
+SEQUENCE_TABLE_NAME = "sequences"
+def create_sequence_table(cursor):
+ """
+ Create a SQL table to hold SequenceFile entries
+ """
+ sql = """
+CREATE TABLE %(table)s (
+ filetype CHAR(8),
+ path TEXT,
+ flowcell CHAR(8),
+ lane INTEGER,
+ read INTEGER,
+ pf BOOLEAN,
+ cycle CHAR(8)
+);
+""" %( {'table': SEQUENCE_TABLE_NAME} )
+ return cursor.execute(sql)
class SequenceFile(object):
"""
# all the other file types have names that include flowcell/lane
# information and thus are unique so we don't have to do anything
return os.path.join(root, basename)
+
+ def save(self, cursor):
+ """
+ Add this entry to a DB2.0 database.
+ """
+ #FIXME: NEEDS SQL ESCAPING
+ header_macro = {'table': SEQUENCE_TABLE_NAME }
+ sql_header = "insert into %(table)s (" % header_macro
+ sql_columns = ['filetype','path','flowcell','lane']
+ sql_middle = ") values ("
+ sql_values = [self.filetype, self.path, self.flowcell, self.lane]
+ sql_footer = ");"
+ for name in ['read', 'pf', 'cycle']:
+ value = getattr(self, name)
+ if value is not None:
+ sql_columns.append(name)
+ sql_values.append(value)
+
+ sql = " ".join([sql_header,
+ ", ".join(sql_columns),
+ sql_middle,
+ # note the following makes a string like ?,?,?
+ ",".join(["?"] * len(sql_values)),
+ sql_footer])
+
+ return cursor.execute(sql, sql_values)
+
+def get_flowcell_cycle(path):
+ """
+ Extract flowcell, cycle from pathname
+ """
+ rest, cycle = os.path.split(path)
+ rest, flowcell = os.path.split(rest)
+ cycle_match = re.match("C(?P<start>[0-9]+)-(?P<stop>[0-9]+)", cycle)
+ if cycle_match is None:
+ raise ValueError("Expected .../flowcell/cycle/ directory structure")
+ start = cycle_match.group('start')
+ if start is not None:
+ start = int(start)
+ stop = cycle_match.group('stop')
+ if stop is not None:
+ stop = int(stop)
+
+ return flowcell, start, stop
def parse_srf(path, filename):
+ flowcell_dir, start, stop = get_flowcell_cycle(path)
basename, ext = os.path.splitext(filename)
records = basename.split('_')
flowcell = records[4]
lane = int(records[5][0])
fullpath = os.path.join(path, filename)
- return SequenceFile('srf', fullpath, flowcell, lane)
+
+ if flowcell_dir != flowcell:
+ logging.warn("flowcell %s found in wrong directory %s" % \
+ (flowcell, path))
+
+ return SequenceFile('srf', fullpath, flowcell, lane, cycle=stop)
def parse_qseq(path, filename):
+ flowcell_dir, start, stop = get_flowcell_cycle(path)
basename, ext = os.path.splitext(filename)
records = basename.split('_')
fullpath = os.path.join(path, filename)
flowcell = records[4]
lane = int(records[5][1])
read = int(records[6][1])
- return SequenceFile('qseq', fullpath, flowcell, lane, read)
+
+ if flowcell_dir != flowcell:
+ logging.warn("flowcell %s found in wrong directory %s" % \
+ (flowcell, path))
+
+ return SequenceFile('qseq', fullpath, flowcell, lane, read, cycle=stop)
def parse_fastq(path, filename):
+ flowcell_dir, start, stop = get_flowcell_cycle(path)
basename, ext = os.path.splitext(filename)
records = basename.split('_')
fullpath = os.path.join(path, filename)
else:
raise ValueError("Unrecognized fastq name")
- return SequenceFile('fastq', fullpath, flowcell, lane, read, pf=pf)
+ if flowcell_dir != flowcell:
+ logging.warn("flowcell %s found in wrong directory %s" % \
+ (flowcell, path))
+
+ return SequenceFile('fastq', fullpath, flowcell, lane, read, pf=pf, cycle=stop)
def parse_eland(path, filename, eland_match=None):
if eland_match is None:
eland_match = eland_re.match(filename)
fullpath = os.path.join(path, filename)
- rest, cycle = os.path.split(path)
- rest, flowcell = os.path.split(rest)
+ flowcell, start, stop = get_flowcell_cycle(path)
if eland_match.group('lane'):
lane = int(eland_match.group('lane'))
else:
read = int(eland_match.group('read'))
else:
read = None
- return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=cycle)
+ return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=stop)
def scan_for_sequences(dirs):
"""
Scan through a list of directories for sequence like files
"""
- # be forgiving if someone just gives us a string
- if type(dirs) != type([]):
- dirs = [dirs]
-
sequences = []
for d in dirs:
logging.info("Scanning %s for sequences" % (d,))
"""
Make sure the sequence archive class works
"""
+ def test_flowcell_cycle(self):
+ """
+ Make sure code to parse directory heirarchy works
+ """
+ path = '/root/42BW9AAXX/C1-152'
+ flowcell, start, stop = sequences.get_flowcell_cycle(path)
+
+ self.failUnlessEqual(flowcell, '42BW9AAXX')
+ self.failUnlessEqual(start, 1)
+ self.failUnlessEqual(stop, 152)
+
+ path = '/root/42BW9AAXX/other'
+ self.failUnlessRaises(ValueError, sequences.get_flowcell_cycle, path)
+
+
def test_srf(self):
path = '/root/42BW9AAXX/C1-38'
name = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_4.srf'
self.failUnlessEqual(f.lane, 4)
self.failUnlessEqual(f.read, None)
self.failUnlessEqual(f.pf, None)
- self.failUnlessEqual(f.cycle, None)
+ self.failUnlessEqual(f.cycle, 38)
def test_qseq(self):
path = '/root/42BW9AAXX/C1-36'
self.failUnlessEqual(f.lane, 4)
self.failUnlessEqual(f.read, 1)
self.failUnlessEqual(f.pf, None)
- self.failUnlessEqual(f.cycle, None)
+ self.failUnlessEqual(f.cycle, 36)
+
+
+ path = '/root/ilmn200901/C1-202'
+ name = 'woldlab_090125_HWI-EAS_0000_ilmn200901_l1_r1.tar.bz2'
+ pathname = os.path.join(path, name)
+ f = sequences.parse_qseq(path, name)
+
+ self.failUnlessEqual(f.filetype, 'qseq')
+ self.failUnlessEqual(f.path, pathname)
+ self.failUnlessEqual(f.lane, 1)
+ self.failUnlessEqual(f.read, 1)
+ self.failUnlessEqual(f.pf, None)
+ self.failUnlessEqual(f.cycle, 202)
def test_fastq(self):
path = '/root/42BW9AAXX/C1-38'
self.failUnlessEqual(f.lane, 4)
self.failUnlessEqual(f.read, 1)
self.failUnlessEqual(f.pf, True)
- self.failUnlessEqual(f.cycle, None)
+ self.failUnlessEqual(f.cycle, 38)
name = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l4_r2_nopass.fastq.bz2'
pathname = os.path.join(path,name)
self.failUnlessEqual(f.lane, 4)
self.failUnlessEqual(f.read, 2)
self.failUnlessEqual(f.pf, False)
- self.failUnlessEqual(f.cycle, None)
+ self.failUnlessEqual(f.cycle, 38)
def test_eland(self):
path = '/root/42BW9AAXX/C1-38'
self.failUnlessEqual(f.lane, 4)
self.failUnlessEqual(f.read, None)
self.failUnlessEqual(f.pf, None)
- self.failUnlessEqual(f.cycle, 'C1-38')
+ self.failUnlessEqual(f.cycle, 38)
path = '/root/42BW9AAXX/C1-152'
name = 's_4_1_eland_extended.txt.bz2'
self.failUnlessEqual(f.lane, 4)
self.failUnlessEqual(f.read, 1)
self.failUnlessEqual(f.pf, None)
- self.failUnlessEqual(f.cycle, 'C1-152')
+ self.failUnlessEqual(f.cycle, 152)
def test_sequence_file_equality(self):
path = '/root/42BW9AAXX/C1-38'
self.failUnlessEqual(f1_qseq, f2_qseq)
+ def test_sql(self):
+ """
+ Make sure that the quick and dirty sql interface in sequences works
+ """
+ import sqlite3
+ db = sqlite3.connect(":memory:")
+ c = db.cursor()
+ sequences.create_sequence_table(c)
+
+ data = [('/root/42BW9AAXX/C1-152',
+ 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r1.tar.bz2'),
+ ('/root/42BW9AAXX/C1-152',
+ 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r2.tar.bz2'),
+ ('/root/42BW9AAXX/C1-152',
+ 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l2_r1.tar.bz2'),
+ ('/root/42BW9AAXX/C1-152',
+ 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l2_r21.tar.bz2'),]
+
+ for path, name in data:
+ seq = sequences.parse_qseq(path, name)
+ seq.save(c)
+
+ count = c.execute("select count(*) from sequences")
+ row = count.fetchone()
+ self.failUnlessEqual(row[0], 4)
+
+
def suite():
return unittest.makeSuite(SequenceFileTests,'test')