From: Diane Trout Date: Wed, 7 Jul 2010 00:19:37 +0000 (+0000) Subject: If a quality score started with an @ sign it was treated as a header X-Git-Tag: 0.4.4~11 X-Git-Url: http://woldlab.caltech.edu/gitweb/?p=htsworkflow.git;a=commitdiff_plain;h=d15ee402e05feb4e8782e6347d661128d5f6e0e4 If a quality score started with an @ sign it was treated as a header which created an invalid fastq file. This patch fixes that, and introduces some test cases for srf2named_fastq.py --- diff --git a/scripts/srf2named_fastq.py b/scripts/srf2named_fastq.py index 22edc12..ebf3323 100755 --- a/scripts/srf2named_fastq.py +++ b/scripts/srf2named_fastq.py @@ -7,6 +7,12 @@ import sys from htsworkflow.util.opener import autoopen +# constants for our fastq finite state machine +FASTQ_HEADER = 0 +FASTQ_SEQUENCE = 1 +FASTQ_SEQUENCE_HEADER = 2 +FASTQ_QUALITY = 3 + def main(cmdline=None): parser = make_parser() @@ -85,55 +91,90 @@ def srf_open(filename, cnf1=False): def convert_single_to_fastq(instream, target1, header=''): + + state = FASTQ_HEADER for line in instream: + line = line.strip() # sequence header - if line[0] == '@': - line = line.strip() - target1.write('@') - target1.write(header) - target1.write(line[1:]) - target1.write(os.linesep) - + if state == FASTQ_HEADER: + write_header(target1, header, line) + state = FASTQ_SEQUENCE + # sequence + elif state == FASTQ_SEQUENCE: + write_sequence(target1, line) + state = FASTQ_SEQUENCE_HEADER # quality header - elif line[0] == '+': - target1.write(line) + elif state == FASTQ_SEQUENCE_HEADER: + # the sequence header isn't really sequence, but + # we're just passing it through + write_sequence(target1, line) + state = FASTQ_QUALITY # sequence or quality data + elif state == FASTQ_QUALITY: + write_sequence(target1, line) + state = FASTQ_HEADER else: - target1.write(line) + raise RuntimeError("Unrecognized STATE in fastq split") + + def convert_single_to_two_fastq(instream, target1, target2, mid=None, header=''): + """ + read a fastq file where two paired ends have been run together into + two halves. + + instream is the source stream + target1 and target2 are the destination streams + """ if mid is not None: mid = int(mid) + state = FASTQ_HEADER for line in instream: + line = line.strip() # sequence header - if line[0] == '@': - line = line.strip() - target1.write('@') - target1.write(header) - target1.write(line[1:]) - target1.write("/1") - target1.write(os.linesep) - - target2.write('@') - target2.write(header) - target2.write(line[1:]) - target2.write("/2") - target2.write(os.linesep) - + if state == FASTQ_HEADER: + write_header(target1, header, line, "/1") + write_header(target2, header, line, "/2") + state = FASTQ_SEQUENCE + # sequence + elif state == FASTQ_SEQUENCE: + if mid is None: + mid = len(line)/2 + write_split_sequence(target1, target2, line, mid) + state = FASTQ_SEQUENCE_HEADER # quality header - elif line[0] == '+': - target1.write(line) - target2.write(line) + elif state == FASTQ_SEQUENCE_HEADER: + # the sequence header isn't really sequence, but + # we're just passing it through + write_sequence(target1, line) + write_sequence(target2, line) + + state = FASTQ_QUALITY # sequence or quality data + elif state == FASTQ_QUALITY: + write_split_sequence(target1, target2, line, mid) + state = FASTQ_HEADER else: - line = line.strip() - if mid is None: - mid = len(line)/2 - target1.write(line[:mid]) - target1.write(os.linesep) - target2.write(line[mid:]) - target2.write(os.linesep) + raise RuntimeError("Unrecognized STATE in fastq split") + +def write_header(target, prefix, line, suffix=''): + target.write('@') + target.write(prefix) + target.write(line[1:]) + target.write(suffix) + target.write(os.linesep) + +def write_sequence(target, line): + target.write(line) + target.write(os.linesep) + +def write_split_sequence(target1, target2, line, mid): + target1.write(line[:mid]) + target1.write(os.linesep) + + target2.write(line[mid:]) + target2.write(os.linesep) def is_srf(filename): """ diff --git a/test/test_srf2fastq.py b/test/test_srf2fastq.py new file mode 100644 index 0000000..f29cc70 --- /dev/null +++ b/test/test_srf2fastq.py @@ -0,0 +1,119 @@ +import os +from StringIO import StringIO +import sys +import unittest + +_module_path, _module_name = os.path.split(__file__) +sys.path.append(os.path.join(_module_path, '..', 'scripts')) + +import srf2named_fastq + +class testSrf2Fastq(unittest.TestCase): + def test_split_good(self): + source = StringIO("""@header +AGCTTTTT ++ +IIIIB+++ +""") + target1 = StringIO() + target2 = StringIO() + + srf2named_fastq.convert_single_to_two_fastq(source, target1, target2) + + target1.seek(0) + lines1 = target1.readlines() + self.failUnlessEqual(len(lines1),4) + self.failUnlessEqual(lines1[0].rstrip(), '@header/1') + self.failUnlessEqual(lines1[1].rstrip(), 'AGCT') + self.failUnlessEqual(lines1[2].rstrip(), '+') + self.failUnlessEqual(lines1[3].rstrip(), 'IIII') + + target2.seek(0) + lines2 = target2.readlines() + self.failUnlessEqual(len(lines2),4) + self.failUnlessEqual(lines2[0].rstrip(), '@header/2') + self.failUnlessEqual(lines2[1].rstrip(), 'TTTT') + self.failUnlessEqual(lines2[2].rstrip(), '+') + self.failUnlessEqual(lines2[3].rstrip(), 'B+++') + + def test_split_at_with_header(self): + source = StringIO("""@header1 +AGCTTTTT ++ +@IIIB+++ +@header2 +AGCTTTTT ++ +IIIIB+++ +""") + target1 = StringIO() + target2 = StringIO() + + srf2named_fastq.convert_single_to_two_fastq(source, target1, target2, header="foo_") + + target1.seek(0) + lines1 = target1.readlines() + self.failUnlessEqual(len(lines1),8) + self.failUnlessEqual(lines1[0].rstrip(), '@foo_header1/1') + self.failUnlessEqual(lines1[1].rstrip(), 'AGCT') + self.failUnlessEqual(lines1[2].rstrip(), '+') + self.failUnlessEqual(lines1[3].rstrip(), '@III') + + target2.seek(0) + lines2 = target2.readlines() + self.failUnlessEqual(len(lines2),8) + self.failUnlessEqual(lines2[0].rstrip(), '@foo_header1/2') + self.failUnlessEqual(lines2[1].rstrip(), 'TTTT') + self.failUnlessEqual(lines2[2].rstrip(), '+') + self.failUnlessEqual(lines2[3].rstrip(), 'B+++') + + def test_single_at(self): + source = StringIO("""@header1 +AGCTTTTT ++ +@IIIB+++ +@header2 +AGCTTTTT ++ +IIIIB+++ +""") + target1 = StringIO() + + srf2named_fastq.convert_single_to_fastq(source, target1) + + target1.seek(0) + lines1 = target1.readlines() + self.failUnlessEqual(len(lines1),8) + self.failUnlessEqual(lines1[0].rstrip(), '@header1') + self.failUnlessEqual(lines1[1].rstrip(), 'AGCTTTTT') + self.failUnlessEqual(lines1[2].rstrip(), '+') + self.failUnlessEqual(lines1[3].rstrip(), '@IIIB+++') + + def test_single_at_with_header(self): + source = StringIO("""@header1 +AGCTTTTT ++ +@IIIB+++ +@header2 +AGCTTTTT ++ +IIIIB+++ +""") + target1 = StringIO() + + srf2named_fastq.convert_single_to_fastq(source, target1, header="foo_") + + target1.seek(0) + lines1 = target1.readlines() + self.failUnlessEqual(len(lines1),8) + self.failUnlessEqual(lines1[0].rstrip(), '@foo_header1') + self.failUnlessEqual(lines1[1].rstrip(), 'AGCTTTTT') + self.failUnlessEqual(lines1[2].rstrip(), '+') + self.failUnlessEqual(lines1[3].rstrip(), '@IIIB+++') + + +def suite(): + return unittest.makeSuite(testSrf2Fastq,'test') + +if __name__ == "__main__": + unittest.main(defaultTest="suite")