Add module to convert multiple fastq files into a single file.
authorDiane Trout <diane@caltech.edu>
Tue, 6 Mar 2012 21:32:28 +0000 (13:32 -0800)
committerDiane Trout <diane@caltech.edu>
Tue, 6 Mar 2012 21:32:28 +0000 (13:32 -0800)
The HiSeq likes to produce N seperate fastq files to simplify
parallel processing.

Also I decided to split out the parse_slice code into a
util module since it was being shared between the desplit_fastq
and qseq2fastq.

Finally just for fun I ran these two modules through pep8.py
and pylint.py and cleaned up those errors

htsworkflow/pipelines/desplit_fastq.py [new file with mode: 0644]
htsworkflow/pipelines/qseq2fastq.py
htsworkflow/pipelines/test/test_qseq2fastq.py [deleted file]
htsworkflow/util/conversion.py
htsworkflow/util/test/test_conversion.py [new file with mode: 0644]

diff --git a/htsworkflow/pipelines/desplit_fastq.py b/htsworkflow/pipelines/desplit_fastq.py
new file mode 100644 (file)
index 0000000..85ad586
--- /dev/null
@@ -0,0 +1,101 @@
+#!/usr/bin/env python
+"""Write fastq data from multiple compressed files into a single file
+"""
+
+from glob import glob
+import os
+from optparse import OptionParser
+import sys
+
+from htsworkflow.version import version
+from htsworkflow.util.opener import autoopen
+from htsworkflow.util.conversion import parse_slice
+
+SEQ_HEADER = 0
+SEQUENCE = 1
+QUAL_HEADER = 2
+QUALITY = 3
+INVALID = -1
+
+
+def main(cmdline=None):
+    """Command line driver: [None, 'option', '*.fastq.bz2']
+    """
+    parser = make_parser()
+    opts, args = parser.parse_args(cmdline)
+
+    if opts.version:
+        print (version())
+        return 0
+
+    if opts.output is None:
+        output = open(opts.output, 'w')
+    else:
+        output = sys.stdout
+
+    desplitter = DesplitFastq(file_generator(args), output)
+    desplitter.trim = parse_slice(opts.slice)
+    desplitter.run()
+
+    return 0
+
+
+def make_parser():
+    """Generate an option parser for above main function"""
+
+    usage = '%prog: [options] *.fastq.gz'
+    parser = OptionParser(usage)
+
+    parser.add_option('-o', '--output', default=None,
+                      help='output fastq file')
+    parser.add_option('-s', '--slice',
+                      help="specify python slice, e.g. 0:75, 0:-1",
+                      default=None)
+    parser.add_option("--version", default=False, action="store_true",
+                      help="report software version")
+    return parser
+
+
+def file_generator(pattern_list):
+    """Given a list of glob patterns return decompressed streams
+    """
+    for pattern in pattern_list:
+        for filename in glob(pattern):
+            yield autoopen(filename, 'r')
+
+
+class DesplitFastq(object):
+    """Merge multiple fastq files into a single file"""
+    def __init__(self, sources, destination):
+        self.sources = sources
+        self.destination = destination
+
+        self.making_fastq = True
+        self.trim = slice(None)
+
+    def run(self):
+        """Do the conversion
+
+        This is here so we can run via threading/multiprocessing APIs
+        """
+        state = SEQ_HEADER
+        for stream in self.sources:
+            for line in stream:
+                line = line.rstrip()
+                if state == SEQ_HEADER:
+                    self.destination.write(line)
+                    state = SEQUENCE
+                elif state == SEQUENCE:
+                    self.destination.write(line[self.trim])
+                    state = QUAL_HEADER
+                elif state == QUAL_HEADER:
+                    self.destination.write(line)
+                    state = QUALITY
+                elif state == QUALITY:
+                    self.destination.write(line[self.trim])
+                    state = SEQ_HEADER
+                self.destination.write(os.linesep)
+
+
+if __name__ == "__main__":
+    main()
index ac44c1edde131f198e5d02f9d64a644bcdb00f68..2f017eb2dc0f4817448b4bae14eebd5b2657772f 100644 (file)
@@ -1,4 +1,6 @@
 #!/usr/bin/env python
+"""Convert a collection of qseq or a tar file of qseq files to a fastq file
+"""
 from glob import glob
 import os
 from optparse import OptionParser
@@ -7,22 +9,26 @@ import sys
 import tarfile
 
 from htsworkflow.version import version
+from htsworkflow.util.conversion import parse_slice
+
 
 def main(cmdline=None):
+    """Command line driver: [None, '-i', 'tarfile', '-o', 'target.fastq']
+    """
     parser = make_parser()
     opts, args = parser.parse_args(cmdline)
 
     if opts.version:
         print version()
         return 0
-    
+
     if opts.infile is not None:
         qseq_generator = tarfile_generator(opts.infile)
     elif len(args) > 0:
         qseq_generator = file_generator(args)
     else:
         qseq_generator = [sys.stdin]
-            
+
     if opts.output is not None:
         output = open(opts.output, 'w')
     else:
@@ -43,19 +49,21 @@ def main(cmdline=None):
 
 
 def make_parser():
+    """Return option parser"""
     usage = "%prog: [options] *_qseq.txt\nProduces Phred33 files by default"
     parser = OptionParser(usage)
     parser.add_option("-a", "--fasta", default=False, action="store_true",
                       help="produce fasta files instead of fastq files")
-    parser.add_option("-f", "--flowcell", default=None, 
+    parser.add_option("-f", "--flowcell", default=None,
                       help="Set flowcell ID for output file")
     parser.add_option("-i", "--infile", default=None,
-      help='source tar file (if reading from an archive instead of a directory)')
+                      help='source tar file (if reading from an archive '\
+                           'instead of a directory)')
     parser.add_option("-o", "--output", default=None,
                       help="output fastq file")
     parser.add_option("-n", "--nopass-output", default=None,
-                      help="if provided send files that failed illumina filter "\
-                           "to a differentfile")
+                      help="if provided send files that failed "\
+                           "illumina filter to a differentfile")
     parser.add_option("-s", "--slice",
                       help="specify python slice, e.g. 0:75, 0:-1",
                       default=None)
@@ -63,64 +71,24 @@ def make_parser():
                       action="store_true")
     parser.add_option("--version", default=False, action="store_true",
                       help="report software version")
-    
-    return parser
-
-            
-def file_generator(pattern_list):
-    for pattern in pattern_list:
-        for filename in glob(pattern):
-            yield open(filename,"r")
-
-
-def tarfile_generator(tarfilename):
-    archive = tarfile.open(tarfilename,'r|*')
-    for tarinfo in archive:
-        yield archive.extractfile(tarinfo)
 
+    return parser
 
-def parse_slice(slice_text):
-    if slice_text is None or len(slice_text) == 0:
-        return slice(None)
-        
-    slice_data = []
-    for element in slice_text.split(':'):
-        if len(element) == 0:
-            element = None
-        else:
-            element = int(element)
-        slice_data.append(element)
-
-    return slice(*slice_data)
 
-            
 def file_generator(pattern_list):
+    """Given a list of glob patterns yield open streams for matching files"""
     for pattern in pattern_list:
         for filename in glob(pattern):
-            yield open(filename,"r")
+            yield open(filename, "r")
 
 
 def tarfile_generator(tarfilename):
-    archive = tarfile.open(tarfilename,'r|*')
+    """Yield open streams for files inside a tarfile"""
+    archive = tarfile.open(tarfilename, 'r|*')
     for tarinfo in archive:
         yield archive.extractfile(tarinfo)
 
 
-def parse_slice(slice_text):
-    if slice_text is None or len(slice_text) == 0:
-        return slice(None)
-        
-    slice_data = []
-    for element in slice_text.split(':'):
-        if len(element) == 0:
-            element = None
-        else:
-            element = int(element)
-        slice_data.append(element)
-
-    return slice(*slice_data)
-
-
 class Qseq2Fastq(object):
     """
     Convert qseq files to fastq (or fasta) files.
@@ -132,47 +100,33 @@ class Qseq2Fastq(object):
             self.nopass_destination = nopass_destination
         else:
             self.nopass_destination = pass_destination
-        
+
         self.fastq = True
         self.flowcell_id = None
         self.trim = slice(None)
-        self.reportFilter = False
+        self.report_filter = False
 
     def _format_flowcell_id(self):
         """
         Return formatted flowcell ID
         """
         if self.flowcell_id is not None:
-            return self.flowcell_id+"_"
+            return self.flowcell_id + "_"
         else:
             return ""
-    
-    def _convert_illumina_quality(self, illumina_quality):
-        """
-        Convert an Illumina ASCII encoded quality score to a Phred ASCII quality score.
-        """
-        # Illumina scores are Phred + 64
-        # Fastq scores are Phread + 33
-        # the following code grabs the string, converts to short ints and
-        # subtracts 31 (64-33) to convert between the two score formats.
-        # The numpy solution is twice as fast as some of my other
-        # ideas for the conversion.
-        # sorry about the uglyness in changing from character, to 8-bit int
-        # and back to a character array
-        quality = numpy.asarray(illumina_quality,'c')
-        quality.dtype = numpy.uint8
-        quality -= 31
-        quality.dtype = '|S1' # I'd like to know what the real numpy char type is
-        return quality
-        
+
     def run(self):
+        """Run conversion
+        (Used to match threading/multiprocessing API)
+        """
         if self.fastq:
-            header_template = '@' 
+            header_template = '@'
         else:
             # fasta case
             header_template = '>'
-        header_template += self._format_flowcell_id() + '%s_%s:%s:%s:%s:%s/%s%s%s'
-        
+        header_template += self._format_flowcell_id() + \
+                           '%s_%s:%s:%s:%s:%s/%s%s%s'
+
         for qstream in self.sources:
             for line in qstream:
                 # parse line
@@ -183,19 +137,19 @@ class Qseq2Fastq(object):
                 tile = record[3]
                 x = record[4]
                 y = record[5]
-                index = record[6]
+                #index = record[6]
                 read = record[7]
-                sequence = record[8].replace('.','N')
-                quality = self._convert_illumina_quality(record[9])
+                sequence = record[8].replace('.', 'N')
+                quality = convert_illumina_quality(record[9])
 
                 # add pass qc filter if we want it
                 pass_qc = int(record[10])
-                if self.reportFilter:
+                if self.report_filter:
                     pass_qc_msg = " pf=%s" % (pass_qc)
                 else:
                     pass_qc_msg = ""
 
-                header = header_template  % ( \
+                header = header_template % ( \
                     machine_name,
                     run_number,
                     lane_number,
@@ -206,13 +160,12 @@ class Qseq2Fastq(object):
                     pass_qc_msg,
                     os.linesep)
 
-
                 # if we passed the filter write to the "good" file
                 if pass_qc:
                     destination = self.pass_destination
                 else:
                     destination = self.nopass_destination
-                
+
                 destination.write(header)
                 destination.write(sequence[self.trim])
                 destination.write(os.linesep)
@@ -222,5 +175,24 @@ class Qseq2Fastq(object):
                     destination.write(quality[self.trim].tostring())
                     destination.write(os.linesep)
 
+def convert_illumina_quality(illumina_quality):
+    """Convert an Illumina quality score to a Phred ASCII quality score.
+    """
+    # Illumina scores are Phred + 64
+    # Fastq scores are Phread + 33
+    # the following code grabs the string, converts to short ints and
+    # subtracts 31 (64-33) to convert between the two score formats.
+    # The numpy solution is twice as fast as some of my other
+    # ideas for the conversion.
+    # sorry about the uglyness in changing from character, to 8-bit int
+    # and back to a character array
+    quality = numpy.asarray(illumina_quality, 'c')
+    quality.dtype = numpy.uint8
+    quality -= 31
+     # I'd like to know what the real numpy char type is
+    quality.dtype = '|S1'
+    return quality
+
+
 if __name__ == "__main__":
     main()
diff --git a/htsworkflow/pipelines/test/test_qseq2fastq.py b/htsworkflow/pipelines/test/test_qseq2fastq.py
deleted file mode 100644 (file)
index 1c32924..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-#!/usr/bin/env python
-
-import unittest
-
-import htsworkflow.pipelines.qseq2fastq as qseq2fastq
-
-class TestQseq2Fastq(unittest.TestCase):
-    def test_parse_slice(self):
-        s = qseq2fastq.parse_slice("1:")
-        self.failUnlessEqual(s.start, 1)
-        self.failUnlessEqual(s.stop, None)
-
-        s = qseq2fastq.parse_slice("0:2")
-        self.failUnlessEqual(s.start, 0)
-        self.failUnlessEqual(s.stop, 2)
-
-def suite():
-    return unittest.makeSuite(TestQseq2Fastq, 'test')
-
-if __name__ == "__main__":
-    unittest.main(defaultTest="suite")
-    
index 18cabef485aaedeafc7f0f1d16761c7c5937b6ea..d3eb4f6cc5dad43f619ca8db662ed6abbcc402a2 100644 (file)
@@ -14,7 +14,7 @@ def unicode_or_none(value):
 def parse_flowcell_id(flowcell_id):
     """
     Return flowcell id and any status encoded in the id
-  
+
     We stored the status information in the flowcell id name.
     this was dumb, but database schemas are hard to update.
     """
@@ -26,4 +26,19 @@ def parse_flowcell_id(flowcell_id):
     if len(fields) > 1:
         status = fields[1]
     return fcid, status
-    
+
+def parse_slice(slice_text):
+    if slice_text is None or len(slice_text) == 0:
+        return slice(None)
+
+    slice_data = []
+    for element in slice_text.split(':'):
+        if len(element) == 0:
+            element = None
+        else:
+            element = int(element)
+        slice_data.append(element)
+
+    return slice(*slice_data)
+
+
diff --git a/htsworkflow/util/test/test_conversion.py b/htsworkflow/util/test/test_conversion.py
new file mode 100644 (file)
index 0000000..9100625
--- /dev/null
@@ -0,0 +1,22 @@
+#!/usr/bin/env python
+
+import unittest
+
+from htsworkflow.util import conversion
+
+class TestConversion(unittest.TestCase):
+    def test_parse_slice(self):
+        s = conversion.parse_slice("1:")
+        self.failUnlessEqual(s.start, 1)
+        self.failUnlessEqual(s.stop, None)
+
+        s = conversion.parse_slice("0:2")
+        self.failUnlessEqual(s.start, 0)
+        self.failUnlessEqual(s.stop, 2)
+
+def suite():
+    return unittest.makeSuite(TestConversion, 'test')
+
+if __name__ == "__main__":
+    unittest.main(defaultTest="suite")
+