--- /dev/null
+"""
+Utilities to work with the various eras of sequence archive files
+"""
+import logging
+import os
+import re
+
+eland_re = re.compile('s_(?P<lane>\d)(_(?P<read>\d))?_eland_')
+raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Z]+')
+qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Z]+_l[\d]_r[\d].tar.bz2')
+
+class SequenceFile(object):
+ """
+ Simple container class that holds the path to a sequence archive
+ and basic descriptive information.
+ """
+ def __init__(self, filetype, path, flowcell, lane, read=None, pf=None, cycle=None):
+ self.filetype = filetype
+ self.path = path
+ self.flowcell = flowcell
+ self.lane = lane
+ self.read = read
+ self.pf = pf
+ self.cycle = cycle
+
+ def __hash__(self):
+ return hash(self.key())
+
+ def key(self):
+ return (self.flowcell, self.lane)
+
+ def unicode(self):
+ return unicode(self.path)
+
+ def __eq__(self, other):
+ """
+ Equality is defined if everything but the path matches
+ """
+ attributes = ['filetype','flowcell', 'lane', 'read', 'pf', 'cycle']
+ for a in attributes:
+ if getattr(self, a) != getattr(other, a):
+ return False
+
+ return True
+
+ def __repr__(self):
+ return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
+
+ def make_target_name(self, root):
+ """
+ Create target name for where we need to link this sequence too
+ """
+ path, basename = os.path.split(self.path)
+ # Because the names aren't unque we include the flowcel name
+ # because there were different eland files for different length
+ # analyses, we include the cycle length in the name.
+ if self.filetype == 'eland':
+ template = "%(flowcell)s_%(cycle)s_%(eland)s"
+ basename = template % { 'flowcell': self.flowcell,
+ 'cycle': self.cycle,
+ 'eland': basename }
+ # else:
+ # all the other file types have names that include flowcell/lane
+ # information and thus are unique so we don't have to do anything
+ return os.path.join(root, basename)
+
+def parse_srf(path, filename):
+ basename, ext = os.path.splitext(filename)
+ records = basename.split('_')
+ flowcell = records[4]
+ lane = int(records[5][0])
+ fullpath = os.path.join(path, filename)
+ return SequenceFile('srf', fullpath, flowcell, lane)
+
+def parse_qseq(path, filename):
+ basename, ext = os.path.splitext(filename)
+ records = basename.split('_')
+ fullpath = os.path.join(path, filename)
+ flowcell = records[4]
+ lane = int(records[5][1])
+ read = int(records[6][1])
+ return SequenceFile('qseq', fullpath, flowcell, lane, read)
+
+def parse_fastq(path, filename):
+ basename, ext = os.path.splitext(filename)
+ records = basename.split('_')
+ fullpath = os.path.join(path, filename)
+ flowcell = records[4]
+ lane = int(records[5][1])
+ read = int(records[6][1])
+ if records[-1].startswith('pass'):
+ pf = True
+ elif records[-1].startswith('nopass'):
+ pf = False
+ else:
+ raise ValueError("Unrecognized fastq name")
+
+ return SequenceFile('fastq', fullpath, flowcell, lane, read, pf=pf)
+
+def parse_eland(path, filename, eland_match=None):
+ if eland_match is None:
+ eland_match = eland_re.match(filename)
+ fullpath = os.path.join(path, filename)
+ rest, cycle = os.path.split(path)
+ rest, flowcell = os.path.split(rest)
+ if eland_match.group('lane'):
+ lane = int(eland_match.group('lane'))
+ else:
+ lane = None
+ if eland_match.group('read'):
+ read = int(eland_match.group('read'))
+ else:
+ read = None
+ return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=cycle)
+
+def scan_for_sequences(dirs):
+ """
+ Scan through a list of directories for sequence like files
+ """
+ # be forgiving if someone just gives us a string
+ if type(dirs) != type([]):
+ dirs = [dirs]
+
+ sequences = []
+ for d in dirs:
+ logging.info("Scanning %s for sequences" % (d,))
+ for path, dirname, filenames in os.walk(d):
+ for f in filenames:
+ seq = None
+ # find sequence files
+ if raw_seq_re.match(f):
+ if f.endswith('.md5'):
+ continue
+ elif f.endswith('.srf') or f.endswith('.srf.bz2'):
+ seq = parse_srf(path, f)
+ elif qseq_re.match(f):
+ seq = parse_qseq(path, f)
+ elif f.endswith('fastq') or f.endswith('.fastq.bz2'):
+ seq = parse_fastq(path, f)
+ eland_match = eland_re.match(f)
+ if eland_match:
+ if f.endswith('.md5'):
+ continue
+ seq = parse_eland(path, f, eland_match)
+ if seq:
+ sequences.append(seq)
+ logging.debug("Found sequence at %s" % (f,))
+
+ return sequences
--- /dev/null
+#!/usr/bin/env python
+import os
+import unittest
+
+from htsworkflow.pipelines import sequences
+
+class SequenceFileTests(unittest.TestCase):
+ """
+ Make sure the sequence archive class works
+ """
+ def test_srf(self):
+ path = '/root/42BW9AAXX/C1-38'
+ name = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_4.srf'
+ pathname = os.path.join(path,name)
+ f = sequences.parse_srf(path, name)
+
+ self.failUnlessEqual(f.filetype, 'srf')
+ self.failUnlessEqual(f.path, pathname)
+ self.failUnlessEqual(f.flowcell, '42BW9AAXX')
+ self.failUnlessEqual(f.lane, 4)
+ self.failUnlessEqual(f.read, None)
+ self.failUnlessEqual(f.pf, None)
+ self.failUnlessEqual(f.cycle, None)
+
+ def test_qseq(self):
+ path = '/root/42BW9AAXX/C1-36'
+ name = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l4_r1.tar.bz2'
+ pathname = os.path.join(path,name)
+ f = sequences.parse_qseq(path, name)
+
+ self.failUnlessEqual(f.filetype, 'qseq')
+ self.failUnlessEqual(f.path, pathname)
+ self.failUnlessEqual(f.flowcell, '42BW9AAXX')
+ self.failUnlessEqual(f.lane, 4)
+ self.failUnlessEqual(f.read, 1)
+ self.failUnlessEqual(f.pf, None)
+ self.failUnlessEqual(f.cycle, None)
+
+ def test_fastq(self):
+ path = '/root/42BW9AAXX/C1-38'
+ name = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l4_r1_pass.fastq.bz2'
+ pathname = os.path.join(path,name)
+ f = sequences.parse_fastq(path, name)
+
+ self.failUnlessEqual(f.filetype, 'fastq')
+ self.failUnlessEqual(f.path, pathname)
+ self.failUnlessEqual(f.flowcell, '42BW9AAXX')
+ self.failUnlessEqual(f.lane, 4)
+ self.failUnlessEqual(f.read, 1)
+ self.failUnlessEqual(f.pf, True)
+ self.failUnlessEqual(f.cycle, None)
+
+ name = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l4_r2_nopass.fastq.bz2'
+ pathname = os.path.join(path,name)
+ f = sequences.parse_fastq(path, name)
+
+ self.failUnlessEqual(f.filetype, 'fastq')
+ self.failUnlessEqual(f.path, pathname)
+ self.failUnlessEqual(f.flowcell, '42BW9AAXX')
+ self.failUnlessEqual(f.lane, 4)
+ self.failUnlessEqual(f.read, 2)
+ self.failUnlessEqual(f.pf, False)
+ self.failUnlessEqual(f.cycle, None)
+
+ def test_eland(self):
+ path = '/root/42BW9AAXX/C1-38'
+ name = 's_4_eland_extended.txt.bz2'
+ pathname = os.path.join(path,name)
+ f = sequences.parse_eland(path, name)
+
+ self.failUnlessEqual(f.filetype, 'eland')
+ self.failUnlessEqual(f.path, pathname)
+ self.failUnlessEqual(f.flowcell, '42BW9AAXX')
+ self.failUnlessEqual(f.lane, 4)
+ self.failUnlessEqual(f.read, None)
+ self.failUnlessEqual(f.pf, None)
+ self.failUnlessEqual(f.cycle, 'C1-38')
+
+ path = '/root/42BW9AAXX/C1-152'
+ name = 's_4_1_eland_extended.txt.bz2'
+ pathname = os.path.join(path,name)
+ f = sequences.parse_eland(path, name)
+
+ self.failUnlessEqual(f.filetype, 'eland')
+ self.failUnlessEqual(f.path, pathname)
+ self.failUnlessEqual(f.flowcell, '42BW9AAXX')
+ self.failUnlessEqual(f.lane, 4)
+ self.failUnlessEqual(f.read, 1)
+ self.failUnlessEqual(f.pf, None)
+ self.failUnlessEqual(f.cycle, 'C1-152')
+
+ def test_sequence_file_equality(self):
+ path = '/root/42BW9AAXX/C1-38'
+ name = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l4_r1.tar.bz2'
+
+ f1_qseq = sequences.parse_qseq(path, name)
+ f2_qseq = sequences.parse_qseq(path, name)
+
+ self.failUnlessEqual(f1_qseq, f2_qseq)
+
+def suite():
+ return unittest.makeSuite(SequenceFileTests,'test')
+
+if __name__ == "__main__":
+ unittest.main(defaultTest="suite")
import os
from optparse import OptionParser
import stat
-import re
import shelve
from htsworkflow.util import api
-
-eland_re = re.compile('s_(?P<lane>\d)(?P<read>_\d)?_eland_')
-raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Z]+')
-qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Z]+_l[\d]_r[\d].tar.bz2')
-
-class SequenceFile(object):
- def __init__(self, filetype, path, flowcell, lane, read=None, pf=None, cycle=None):
- self.filetype = filetype
- self.path = path
- self.flowcell = flowcell
- self.lane = lane
- self.read = read
- self.pf = pf
- self.cycle = cycle
-
- def __hash__(self):
- return hash(self.key())
-
- def key(self):
- return (self.flowcell, self.lane)
-
- def unicode(self):
- return unicode(self.path)
-
- def __repr__(self):
- return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
-
- def make_target_name(self, root):
- """
- Create target name for where we need to link this sequence too
- """
- path, basename = os.path.split(self.path)
- # Because the names aren't unque we include the flowcel name
- # because there were different eland files for different length
- # analyses, we include the cycle length in the name.
- if self.filetype == 'eland':
- template = "%(flowcell)s_%(cycle)s_%(eland)s"
- basename = template % { 'flowcell': self.flowcell,
- 'cycle': self.cycle,
- 'eland': basename }
- # else:
- # all the other file types have names that include flowcell/lane
- # information and thus are unique so we don't have to do anything
- return os.path.join(root, basename)
-
-def parse_srf(path, filename):
- basename, ext = os.path.splitext(filename)
- records = basename.split('_')
- flowcell = records[4]
- lane = int(records[5][0])
- fullpath = os.path.join(path, filename)
- return SequenceFile('srf', fullpath, flowcell, lane)
-
-def parse_qseq(path, filename):
- basename, ext = os.path.splitext(filename)
- records = basename.split('_')
- fullpath = os.path.join(path, filename)
- flowcell = records[4]
- lane = int(records[5][1])
- read = int(records[6][1])
- return SequenceFile('qseq', fullpath, flowcell, lane, read)
-
-def parse_fastq(path, filename):
- basename, ext = os.path.splitext(filename)
- records = basename.split('_')
- fullpath = os.path.join(path, filename)
- flowcell = records[4]
- lane = int(records[5][1])
- read = int(records[6][1])
- if records[-1] == 'pass':
- pf = True
- else:
- pf = False
- return SequenceFile('fastq', fullpath, flowcell, lane, read, pf=pf)
-
-def parse_eland(path, filename, eland_match):
- fullpath = os.path.join(path, filename)
- rest, cycle = os.path.split(path)
- rest, flowcell = os.path.split(rest)
- lane = eland_match.group('lane')
- read = eland_match.group('read')
- return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=cycle)
-
-def scan_for_sequences(dirs):
- sequences = []
- for d in dirs:
- logging.info("Scanning %s for sequences" % (d,))
- for path, dirname, filenames in os.walk(d):
- for f in filenames:
- seq = None
- # find sequence files
- if raw_seq_re.match(f):
- if f.endswith('.md5'):
- continue
- elif f.endswith('.srf') or f.endswith('.srf.bz2'):
- seq = parse_srf(path, f)
- elif qseq_re.match(f):
- seq = parse_qseq(path, f)
- elif f.endswith('fastq') or f.endswith('.fastq.bz2'):
- seq = parse_fastq(path, f)
- eland_match = eland_re.match(f)
- if eland_match:
- if f.endswith('.md5'):
- continue
- seq = parse_eland(path, f, eland_match)
- if seq:
- sequences.append(seq)
- logging.debug("Found sequence at %s" % (f,))
-
- return sequences
-
+from htsworkflow.pipelines.sequences import scan_for_sequences
def build_flowcell_db(fcdb_filename, sequences, baseurl, apiid, apikey):
"""