If a quality score started with an @ sign it was treated as a header
authorDiane Trout <diane@caltech.edu>
Wed, 7 Jul 2010 00:19:37 +0000 (00:19 +0000)
committerDiane Trout <diane@caltech.edu>
Wed, 7 Jul 2010 00:19:37 +0000 (00:19 +0000)
which created an invalid fastq file.

This patch fixes that, and introduces some test cases for srf2named_fastq.py

scripts/srf2named_fastq.py
test/test_srf2fastq.py [new file with mode: 0644]

index 22edc12743b52f1f6b9141cdf566bc1fe28e4f19..ebf33232a1c126a0ff1f7d0e7e3ce3d3bb1179fa 100755 (executable)
@@ -7,6 +7,12 @@ import sys
 
 from htsworkflow.util.opener import autoopen
 
+# constants for our fastq finite state machine
+FASTQ_HEADER = 0
+FASTQ_SEQUENCE = 1
+FASTQ_SEQUENCE_HEADER = 2
+FASTQ_QUALITY = 3
+
 
 def main(cmdline=None):
     parser = make_parser()
@@ -85,55 +91,90 @@ def srf_open(filename, cnf1=False):
     
 
 def convert_single_to_fastq(instream, target1, header=''):
+
+    state = FASTQ_HEADER
     for line in instream:
+        line = line.strip()
         # sequence header
-        if line[0] == '@':
-            line = line.strip()
-            target1.write('@')
-            target1.write(header)
-            target1.write(line[1:])
-            target1.write(os.linesep)
-
+        if state == FASTQ_HEADER:
+            write_header(target1, header, line)
+            state = FASTQ_SEQUENCE
+        # sequence
+        elif state == FASTQ_SEQUENCE:
+            write_sequence(target1, line)
+            state = FASTQ_SEQUENCE_HEADER
         # quality header
-        elif line[0] == '+':
-            target1.write(line)
+        elif state == FASTQ_SEQUENCE_HEADER:
+            # the sequence header isn't really sequence, but 
+            # we're just passing it through
+            write_sequence(target1, line)
+            state = FASTQ_QUALITY
         # sequence or quality data
+        elif state == FASTQ_QUALITY:
+            write_sequence(target1, line)
+            state = FASTQ_HEADER
         else:
-            target1.write(line)
+            raise RuntimeError("Unrecognized STATE in fastq split")
+
+
         
 def convert_single_to_two_fastq(instream, target1, target2, mid=None, header=''):
+    """
+    read a fastq file where two paired ends have been run together into 
+    two halves.
+
+    instream is the source stream
+    target1 and target2 are the destination streams
+    """
     if mid is not None:
         mid = int(mid)
 
+    state = FASTQ_HEADER
     for line in instream:
+        line = line.strip()
         # sequence header
-        if line[0] == '@':
-            line = line.strip()
-            target1.write('@')
-            target1.write(header)
-            target1.write(line[1:])
-            target1.write("/1")
-            target1.write(os.linesep)
-
-            target2.write('@')
-            target2.write(header)
-            target2.write(line[1:])
-            target2.write("/2")
-            target2.write(os.linesep)
-
+        if state == FASTQ_HEADER:
+            write_header(target1, header, line, "/1")
+            write_header(target2, header, line, "/2")
+            state = FASTQ_SEQUENCE
+        # sequence
+        elif state == FASTQ_SEQUENCE:
+            if mid is None:
+                mid = len(line)/2
+            write_split_sequence(target1, target2, line, mid)
+            state = FASTQ_SEQUENCE_HEADER
         # quality header
-        elif line[0] == '+':
-            target1.write(line)
-            target2.write(line)
+        elif state == FASTQ_SEQUENCE_HEADER:
+            # the sequence header isn't really sequence, but 
+            # we're just passing it through
+            write_sequence(target1, line)
+            write_sequence(target2, line)
+
+            state = FASTQ_QUALITY
         # sequence or quality data
+        elif state == FASTQ_QUALITY:
+            write_split_sequence(target1, target2, line, mid)
+            state = FASTQ_HEADER
         else:
-            line = line.strip()
-            if mid is None:
-                mid = len(line)/2
-            target1.write(line[:mid])
-            target1.write(os.linesep)
-            target2.write(line[mid:])
-            target2.write(os.linesep)
+            raise RuntimeError("Unrecognized STATE in fastq split")
+
+def write_header(target, prefix, line, suffix=''):
+    target.write('@')
+    target.write(prefix)
+    target.write(line[1:])
+    target.write(suffix)
+    target.write(os.linesep)
+
+def write_sequence(target, line):
+    target.write(line)
+    target.write(os.linesep)
+
+def write_split_sequence(target1, target2, line, mid):
+    target1.write(line[:mid])
+    target1.write(os.linesep)
+
+    target2.write(line[mid:])
+    target2.write(os.linesep)
 
 def is_srf(filename):
     """
diff --git a/test/test_srf2fastq.py b/test/test_srf2fastq.py
new file mode 100644 (file)
index 0000000..f29cc70
--- /dev/null
@@ -0,0 +1,119 @@
+import os
+from StringIO import StringIO
+import sys
+import unittest
+
+_module_path, _module_name = os.path.split(__file__)
+sys.path.append(os.path.join(_module_path, '..', 'scripts'))
+
+import srf2named_fastq
+
+class testSrf2Fastq(unittest.TestCase):
+    def test_split_good(self):
+        source = StringIO("""@header
+AGCTTTTT
++
+IIIIB+++
+""")
+        target1 = StringIO()
+        target2 = StringIO()
+
+        srf2named_fastq.convert_single_to_two_fastq(source, target1, target2)
+
+        target1.seek(0)
+        lines1 = target1.readlines()
+        self.failUnlessEqual(len(lines1),4)
+        self.failUnlessEqual(lines1[0].rstrip(), '@header/1')
+        self.failUnlessEqual(lines1[1].rstrip(), 'AGCT')
+        self.failUnlessEqual(lines1[2].rstrip(), '+')
+        self.failUnlessEqual(lines1[3].rstrip(), 'IIII')
+
+        target2.seek(0)
+        lines2 = target2.readlines()
+        self.failUnlessEqual(len(lines2),4)
+        self.failUnlessEqual(lines2[0].rstrip(), '@header/2')
+        self.failUnlessEqual(lines2[1].rstrip(), 'TTTT')
+        self.failUnlessEqual(lines2[2].rstrip(), '+')
+        self.failUnlessEqual(lines2[3].rstrip(), 'B+++')
+
+    def test_split_at_with_header(self):
+        source = StringIO("""@header1
+AGCTTTTT
++
+@IIIB+++
+@header2
+AGCTTTTT
++
+IIIIB+++
+""")
+        target1 = StringIO()
+        target2 = StringIO()
+
+        srf2named_fastq.convert_single_to_two_fastq(source, target1, target2, header="foo_")
+
+        target1.seek(0)
+        lines1 = target1.readlines()
+        self.failUnlessEqual(len(lines1),8)
+        self.failUnlessEqual(lines1[0].rstrip(), '@foo_header1/1')
+        self.failUnlessEqual(lines1[1].rstrip(), 'AGCT')
+        self.failUnlessEqual(lines1[2].rstrip(), '+')
+        self.failUnlessEqual(lines1[3].rstrip(), '@III')
+
+        target2.seek(0)
+        lines2 = target2.readlines()
+        self.failUnlessEqual(len(lines2),8)
+        self.failUnlessEqual(lines2[0].rstrip(), '@foo_header1/2')
+        self.failUnlessEqual(lines2[1].rstrip(), 'TTTT')
+        self.failUnlessEqual(lines2[2].rstrip(), '+')
+        self.failUnlessEqual(lines2[3].rstrip(), 'B+++')
+
+    def test_single_at(self):
+        source = StringIO("""@header1
+AGCTTTTT
++
+@IIIB+++
+@header2
+AGCTTTTT
++
+IIIIB+++
+""")
+        target1 = StringIO()
+
+        srf2named_fastq.convert_single_to_fastq(source, target1)
+
+        target1.seek(0)
+        lines1 = target1.readlines()
+        self.failUnlessEqual(len(lines1),8)
+        self.failUnlessEqual(lines1[0].rstrip(), '@header1')
+        self.failUnlessEqual(lines1[1].rstrip(), 'AGCTTTTT')
+        self.failUnlessEqual(lines1[2].rstrip(), '+')
+        self.failUnlessEqual(lines1[3].rstrip(), '@IIIB+++')
+
+    def test_single_at_with_header(self):
+        source = StringIO("""@header1
+AGCTTTTT
++
+@IIIB+++
+@header2
+AGCTTTTT
++
+IIIIB+++
+""")
+        target1 = StringIO()
+
+        srf2named_fastq.convert_single_to_fastq(source, target1, header="foo_")
+
+        target1.seek(0)
+        lines1 = target1.readlines()
+        self.failUnlessEqual(len(lines1),8)
+        self.failUnlessEqual(lines1[0].rstrip(), '@foo_header1')
+        self.failUnlessEqual(lines1[1].rstrip(), 'AGCTTTTT')
+        self.failUnlessEqual(lines1[2].rstrip(), '+')
+        self.failUnlessEqual(lines1[3].rstrip(), '@IIIB+++')
+
+
+def suite():
+    return unittest.makeSuite(testSrf2Fastq,'test')
+
+if __name__ == "__main__":
+    unittest.main(defaultTest="suite")