from htsworkflow.util.opener import autoopen
+# constants for our fastq finite state machine
+FASTQ_HEADER = 0
+FASTQ_SEQUENCE = 1
+FASTQ_SEQUENCE_HEADER = 2
+FASTQ_QUALITY = 3
+
def main(cmdline=None):
parser = make_parser()
def convert_single_to_fastq(instream, target1, header=''):
+
+ state = FASTQ_HEADER
for line in instream:
+ line = line.strip()
# sequence header
- if line[0] == '@':
- line = line.strip()
- target1.write('@')
- target1.write(header)
- target1.write(line[1:])
- target1.write(os.linesep)
-
+ if state == FASTQ_HEADER:
+ write_header(target1, header, line)
+ state = FASTQ_SEQUENCE
+ # sequence
+ elif state == FASTQ_SEQUENCE:
+ write_sequence(target1, line)
+ state = FASTQ_SEQUENCE_HEADER
# quality header
- elif line[0] == '+':
- target1.write(line)
+ elif state == FASTQ_SEQUENCE_HEADER:
+ # the sequence header isn't really sequence, but
+ # we're just passing it through
+ write_sequence(target1, line)
+ state = FASTQ_QUALITY
# sequence or quality data
+ elif state == FASTQ_QUALITY:
+ write_sequence(target1, line)
+ state = FASTQ_HEADER
else:
- target1.write(line)
+ raise RuntimeError("Unrecognized STATE in fastq split")
+
+
def convert_single_to_two_fastq(instream, target1, target2, mid=None, header=''):
+ """
+ read a fastq file where two paired ends have been run together into
+ two halves.
+
+ instream is the source stream
+ target1 and target2 are the destination streams
+ """
if mid is not None:
mid = int(mid)
+ state = FASTQ_HEADER
for line in instream:
+ line = line.strip()
# sequence header
- if line[0] == '@':
- line = line.strip()
- target1.write('@')
- target1.write(header)
- target1.write(line[1:])
- target1.write("/1")
- target1.write(os.linesep)
-
- target2.write('@')
- target2.write(header)
- target2.write(line[1:])
- target2.write("/2")
- target2.write(os.linesep)
-
+ if state == FASTQ_HEADER:
+ write_header(target1, header, line, "/1")
+ write_header(target2, header, line, "/2")
+ state = FASTQ_SEQUENCE
+ # sequence
+ elif state == FASTQ_SEQUENCE:
+ if mid is None:
+ mid = len(line)/2
+ write_split_sequence(target1, target2, line, mid)
+ state = FASTQ_SEQUENCE_HEADER
# quality header
- elif line[0] == '+':
- target1.write(line)
- target2.write(line)
+ elif state == FASTQ_SEQUENCE_HEADER:
+ # the sequence header isn't really sequence, but
+ # we're just passing it through
+ write_sequence(target1, line)
+ write_sequence(target2, line)
+
+ state = FASTQ_QUALITY
# sequence or quality data
+ elif state == FASTQ_QUALITY:
+ write_split_sequence(target1, target2, line, mid)
+ state = FASTQ_HEADER
else:
- line = line.strip()
- if mid is None:
- mid = len(line)/2
- target1.write(line[:mid])
- target1.write(os.linesep)
- target2.write(line[mid:])
- target2.write(os.linesep)
+ raise RuntimeError("Unrecognized STATE in fastq split")
+
+def write_header(target, prefix, line, suffix=''):
+ target.write('@')
+ target.write(prefix)
+ target.write(line[1:])
+ target.write(suffix)
+ target.write(os.linesep)
+
+def write_sequence(target, line):
+ target.write(line)
+ target.write(os.linesep)
+
+def write_split_sequence(target1, target2, line, mid):
+ target1.write(line[:mid])
+ target1.write(os.linesep)
+
+ target2.write(line[mid:])
+ target2.write(os.linesep)
def is_srf(filename):
"""
--- /dev/null
+import os
+from StringIO import StringIO
+import sys
+import unittest
+
+_module_path, _module_name = os.path.split(__file__)
+sys.path.append(os.path.join(_module_path, '..', 'scripts'))
+
+import srf2named_fastq
+
+class testSrf2Fastq(unittest.TestCase):
+ def test_split_good(self):
+ source = StringIO("""@header
+AGCTTTTT
++
+IIIIB+++
+""")
+ target1 = StringIO()
+ target2 = StringIO()
+
+ srf2named_fastq.convert_single_to_two_fastq(source, target1, target2)
+
+ target1.seek(0)
+ lines1 = target1.readlines()
+ self.failUnlessEqual(len(lines1),4)
+ self.failUnlessEqual(lines1[0].rstrip(), '@header/1')
+ self.failUnlessEqual(lines1[1].rstrip(), 'AGCT')
+ self.failUnlessEqual(lines1[2].rstrip(), '+')
+ self.failUnlessEqual(lines1[3].rstrip(), 'IIII')
+
+ target2.seek(0)
+ lines2 = target2.readlines()
+ self.failUnlessEqual(len(lines2),4)
+ self.failUnlessEqual(lines2[0].rstrip(), '@header/2')
+ self.failUnlessEqual(lines2[1].rstrip(), 'TTTT')
+ self.failUnlessEqual(lines2[2].rstrip(), '+')
+ self.failUnlessEqual(lines2[3].rstrip(), 'B+++')
+
+ def test_split_at_with_header(self):
+ source = StringIO("""@header1
+AGCTTTTT
++
+@IIIB+++
+@header2
+AGCTTTTT
++
+IIIIB+++
+""")
+ target1 = StringIO()
+ target2 = StringIO()
+
+ srf2named_fastq.convert_single_to_two_fastq(source, target1, target2, header="foo_")
+
+ target1.seek(0)
+ lines1 = target1.readlines()
+ self.failUnlessEqual(len(lines1),8)
+ self.failUnlessEqual(lines1[0].rstrip(), '@foo_header1/1')
+ self.failUnlessEqual(lines1[1].rstrip(), 'AGCT')
+ self.failUnlessEqual(lines1[2].rstrip(), '+')
+ self.failUnlessEqual(lines1[3].rstrip(), '@III')
+
+ target2.seek(0)
+ lines2 = target2.readlines()
+ self.failUnlessEqual(len(lines2),8)
+ self.failUnlessEqual(lines2[0].rstrip(), '@foo_header1/2')
+ self.failUnlessEqual(lines2[1].rstrip(), 'TTTT')
+ self.failUnlessEqual(lines2[2].rstrip(), '+')
+ self.failUnlessEqual(lines2[3].rstrip(), 'B+++')
+
+ def test_single_at(self):
+ source = StringIO("""@header1
+AGCTTTTT
++
+@IIIB+++
+@header2
+AGCTTTTT
++
+IIIIB+++
+""")
+ target1 = StringIO()
+
+ srf2named_fastq.convert_single_to_fastq(source, target1)
+
+ target1.seek(0)
+ lines1 = target1.readlines()
+ self.failUnlessEqual(len(lines1),8)
+ self.failUnlessEqual(lines1[0].rstrip(), '@header1')
+ self.failUnlessEqual(lines1[1].rstrip(), 'AGCTTTTT')
+ self.failUnlessEqual(lines1[2].rstrip(), '+')
+ self.failUnlessEqual(lines1[3].rstrip(), '@IIIB+++')
+
+ def test_single_at_with_header(self):
+ source = StringIO("""@header1
+AGCTTTTT
++
+@IIIB+++
+@header2
+AGCTTTTT
++
+IIIIB+++
+""")
+ target1 = StringIO()
+
+ srf2named_fastq.convert_single_to_fastq(source, target1, header="foo_")
+
+ target1.seek(0)
+ lines1 = target1.readlines()
+ self.failUnlessEqual(len(lines1),8)
+ self.failUnlessEqual(lines1[0].rstrip(), '@foo_header1')
+ self.failUnlessEqual(lines1[1].rstrip(), 'AGCTTTTT')
+ self.failUnlessEqual(lines1[2].rstrip(), '+')
+ self.failUnlessEqual(lines1[3].rstrip(), '@IIIB+++')
+
+
+def suite():
+ return unittest.makeSuite(testSrf2Fastq,'test')
+
+if __name__ == "__main__":
+ unittest.main(defaultTest="suite")