Add module to convert multiple fastq files into a single file.
authorDiane Trout <diane@caltech.edu>
Tue, 6 Mar 2012 21:32:28 +0000 (13:32 -0800)
committerDiane Trout <diane@caltech.edu>
Tue, 6 Mar 2012 21:32:28 +0000 (13:32 -0800)
The HiSeq likes to produce N seperate fastq files to simplify
parallel processing.

Also I decided to split out the parse_slice code into a
util module since it was being shared between the desplit_fastq
and qseq2fastq.

Finally just for fun I ran these two modules through pep8.py
and pylint.py and cleaned up those errors

htsworkflow/pipelines/desplit_fastq.py [new file with mode: 0644]
htsworkflow/pipelines/qseq2fastq.py
htsworkflow/pipelines/test/test_qseq2fastq.py [deleted file]
htsworkflow/util/conversion.py
htsworkflow/util/test/test_conversion.py [new file with mode: 0644]

diff --git a/htsworkflow/pipelines/desplit_fastq.py b/htsworkflow/pipelines/desplit_fastq.py
new file mode 100644 (file)
index 0000000..85ad586
--- /dev/null
@@ -0,0 +1,101 @@
+#!/usr/bin/env python
+"""Write fastq data from multiple compressed files into a single file
+"""
+
+from glob import glob
+import os
+from optparse import OptionParser
+import sys
+
+from htsworkflow.version import version
+from htsworkflow.util.opener import autoopen
+from htsworkflow.util.conversion import parse_slice
+
+SEQ_HEADER = 0
+SEQUENCE = 1
+QUAL_HEADER = 2
+QUALITY = 3
+INVALID = -1
+
+
+def main(cmdline=None):
+    """Command line driver: [None, 'option', '*.fastq.bz2']
+    """
+    parser = make_parser()
+    opts, args = parser.parse_args(cmdline)
+
+    if opts.version:
+        print (version())
+        return 0
+
+    if opts.output is None:
+        output = open(opts.output, 'w')
+    else:
+        output = sys.stdout
+
+    desplitter = DesplitFastq(file_generator(args), output)
+    desplitter.trim = parse_slice(opts.slice)
+    desplitter.run()
+
+    return 0
+
+
+def make_parser():
+    """Generate an option parser for above main function"""
+
+    usage = '%prog: [options] *.fastq.gz'
+    parser = OptionParser(usage)
+
+    parser.add_option('-o', '--output', default=None,
+                      help='output fastq file')
+    parser.add_option('-s', '--slice',
+                      help="specify python slice, e.g. 0:75, 0:-1",
+                      default=None)
+    parser.add_option("--version", default=False, action="store_true",
+                      help="report software version")
+    return parser
+
+
+def file_generator(pattern_list):
+    """Given a list of glob patterns return decompressed streams
+    """
+    for pattern in pattern_list:
+        for filename in glob(pattern):
+            yield autoopen(filename, 'r')
+
+
+class DesplitFastq(object):
+    """Merge multiple fastq files into a single file"""
+    def __init__(self, sources, destination):
+        self.sources = sources
+        self.destination = destination
+
+        self.making_fastq = True
+        self.trim = slice(None)
+
+    def run(self):
+        """Do the conversion
+
+        This is here so we can run via threading/multiprocessing APIs
+        """
+        state = SEQ_HEADER
+        for stream in self.sources:
+            for line in stream:
+                line = line.rstrip()
+                if state == SEQ_HEADER:
+                    self.destination.write(line)
+                    state = SEQUENCE
+                elif state == SEQUENCE:
+                    self.destination.write(line[self.trim])
+                    state = QUAL_HEADER
+                elif state == QUAL_HEADER:
+                    self.destination.write(line)
+                    state = QUALITY
+                elif state == QUALITY:
+                    self.destination.write(line[self.trim])
+                    state = SEQ_HEADER
+                self.destination.write(os.linesep)
+
+
+if __name__ == "__main__":
+    main()
index ac44c1edde131f198e5d02f9d64a644bcdb00f68..2f017eb2dc0f4817448b4bae14eebd5b2657772f 100644 (file)
@@ -1,4 +1,6 @@
 #!/usr/bin/env python
 #!/usr/bin/env python
+"""Convert a collection of qseq or a tar file of qseq files to a fastq file
+"""
 from glob import glob
 import os
 from optparse import OptionParser
 from glob import glob
 import os
 from optparse import OptionParser
@@ -7,22 +9,26 @@ import sys
 import tarfile
 
 from htsworkflow.version import version
 import tarfile
 
 from htsworkflow.version import version
+from htsworkflow.util.conversion import parse_slice
+
 
 def main(cmdline=None):
 
 def main(cmdline=None):
+    """Command line driver: [None, '-i', 'tarfile', '-o', 'target.fastq']
+    """
     parser = make_parser()
     opts, args = parser.parse_args(cmdline)
 
     if opts.version:
         print version()
         return 0
     parser = make_parser()
     opts, args = parser.parse_args(cmdline)
 
     if opts.version:
         print version()
         return 0
-    
+
     if opts.infile is not None:
         qseq_generator = tarfile_generator(opts.infile)
     elif len(args) > 0:
         qseq_generator = file_generator(args)
     else:
         qseq_generator = [sys.stdin]
     if opts.infile is not None:
         qseq_generator = tarfile_generator(opts.infile)
     elif len(args) > 0:
         qseq_generator = file_generator(args)
     else:
         qseq_generator = [sys.stdin]
-            
+
     if opts.output is not None:
         output = open(opts.output, 'w')
     else:
     if opts.output is not None:
         output = open(opts.output, 'w')
     else:
@@ -43,19 +49,21 @@ def main(cmdline=None):
 
 
 def make_parser():
 
 
 def make_parser():
+    """Return option parser"""
     usage = "%prog: [options] *_qseq.txt\nProduces Phred33 files by default"
     parser = OptionParser(usage)
     parser.add_option("-a", "--fasta", default=False, action="store_true",
                       help="produce fasta files instead of fastq files")
     usage = "%prog: [options] *_qseq.txt\nProduces Phred33 files by default"
     parser = OptionParser(usage)
     parser.add_option("-a", "--fasta", default=False, action="store_true",
                       help="produce fasta files instead of fastq files")
-    parser.add_option("-f", "--flowcell", default=None, 
+    parser.add_option("-f", "--flowcell", default=None,
                       help="Set flowcell ID for output file")
     parser.add_option("-i", "--infile", default=None,
                       help="Set flowcell ID for output file")
     parser.add_option("-i", "--infile", default=None,
-      help='source tar file (if reading from an archive instead of a directory)')
+                      help='source tar file (if reading from an archive '\
+                           'instead of a directory)')
     parser.add_option("-o", "--output", default=None,
                       help="output fastq file")
     parser.add_option("-n", "--nopass-output", default=None,
     parser.add_option("-o", "--output", default=None,
                       help="output fastq file")
     parser.add_option("-n", "--nopass-output", default=None,
-                      help="if provided send files that failed illumina filter "\
-                           "to a differentfile")
+                      help="if provided send files that failed "\
+                           "illumina filter to a differentfile")
     parser.add_option("-s", "--slice",
                       help="specify python slice, e.g. 0:75, 0:-1",
                       default=None)
     parser.add_option("-s", "--slice",
                       help="specify python slice, e.g. 0:75, 0:-1",
                       default=None)
@@ -63,64 +71,24 @@ def make_parser():
                       action="store_true")
     parser.add_option("--version", default=False, action="store_true",
                       help="report software version")
                       action="store_true")
     parser.add_option("--version", default=False, action="store_true",
                       help="report software version")
-    
-    return parser
-
-            
-def file_generator(pattern_list):
-    for pattern in pattern_list:
-        for filename in glob(pattern):
-            yield open(filename,"r")
-
-
-def tarfile_generator(tarfilename):
-    archive = tarfile.open(tarfilename,'r|*')
-    for tarinfo in archive:
-        yield archive.extractfile(tarinfo)
 
 
+    return parser
 
 
-def parse_slice(slice_text):
-    if slice_text is None or len(slice_text) == 0:
-        return slice(None)
-        
-    slice_data = []
-    for element in slice_text.split(':'):
-        if len(element) == 0:
-            element = None
-        else:
-            element = int(element)
-        slice_data.append(element)
-
-    return slice(*slice_data)
 
 
-            
 def file_generator(pattern_list):
 def file_generator(pattern_list):
+    """Given a list of glob patterns yield open streams for matching files"""
     for pattern in pattern_list:
         for filename in glob(pattern):
     for pattern in pattern_list:
         for filename in glob(pattern):
-            yield open(filename,"r")
+            yield open(filename, "r")
 
 
 def tarfile_generator(tarfilename):
 
 
 def tarfile_generator(tarfilename):
-    archive = tarfile.open(tarfilename,'r|*')
+    """Yield open streams for files inside a tarfile"""
+    archive = tarfile.open(tarfilename, 'r|*')
     for tarinfo in archive:
         yield archive.extractfile(tarinfo)
 
 
     for tarinfo in archive:
         yield archive.extractfile(tarinfo)
 
 
-def parse_slice(slice_text):
-    if slice_text is None or len(slice_text) == 0:
-        return slice(None)
-        
-    slice_data = []
-    for element in slice_text.split(':'):
-        if len(element) == 0:
-            element = None
-        else:
-            element = int(element)
-        slice_data.append(element)
-
-    return slice(*slice_data)
-
-
 class Qseq2Fastq(object):
     """
     Convert qseq files to fastq (or fasta) files.
 class Qseq2Fastq(object):
     """
     Convert qseq files to fastq (or fasta) files.
@@ -132,47 +100,33 @@ class Qseq2Fastq(object):
             self.nopass_destination = nopass_destination
         else:
             self.nopass_destination = pass_destination
             self.nopass_destination = nopass_destination
         else:
             self.nopass_destination = pass_destination
-        
+
         self.fastq = True
         self.flowcell_id = None
         self.trim = slice(None)
         self.fastq = True
         self.flowcell_id = None
         self.trim = slice(None)
-        self.reportFilter = False
+        self.report_filter = False
 
     def _format_flowcell_id(self):
         """
         Return formatted flowcell ID
         """
         if self.flowcell_id is not None:
 
     def _format_flowcell_id(self):
         """
         Return formatted flowcell ID
         """
         if self.flowcell_id is not None:
-            return self.flowcell_id+"_"
+            return self.flowcell_id + "_"
         else:
             return ""
         else:
             return ""
-    
-    def _convert_illumina_quality(self, illumina_quality):
-        """
-        Convert an Illumina ASCII encoded quality score to a Phred ASCII quality score.
-        """
-        # Illumina scores are Phred + 64
-        # Fastq scores are Phread + 33
-        # the following code grabs the string, converts to short ints and
-        # subtracts 31 (64-33) to convert between the two score formats.
-        # The numpy solution is twice as fast as some of my other
-        # ideas for the conversion.
-        # sorry about the uglyness in changing from character, to 8-bit int
-        # and back to a character array
-        quality = numpy.asarray(illumina_quality,'c')
-        quality.dtype = numpy.uint8
-        quality -= 31
-        quality.dtype = '|S1' # I'd like to know what the real numpy char type is
-        return quality
-        
+
     def run(self):
     def run(self):
+        """Run conversion
+        (Used to match threading/multiprocessing API)
+        """
         if self.fastq:
         if self.fastq:
-            header_template = '@' 
+            header_template = '@'
         else:
             # fasta case
             header_template = '>'
         else:
             # fasta case
             header_template = '>'
-        header_template += self._format_flowcell_id() + '%s_%s:%s:%s:%s:%s/%s%s%s'
-        
+        header_template += self._format_flowcell_id() + \
+                           '%s_%s:%s:%s:%s:%s/%s%s%s'
+
         for qstream in self.sources:
             for line in qstream:
                 # parse line
         for qstream in self.sources:
             for line in qstream:
                 # parse line
@@ -183,19 +137,19 @@ class Qseq2Fastq(object):
                 tile = record[3]
                 x = record[4]
                 y = record[5]
                 tile = record[3]
                 x = record[4]
                 y = record[5]
-                index = record[6]
+                #index = record[6]
                 read = record[7]
                 read = record[7]
-                sequence = record[8].replace('.','N')
-                quality = self._convert_illumina_quality(record[9])
+                sequence = record[8].replace('.', 'N')
+                quality = convert_illumina_quality(record[9])
 
                 # add pass qc filter if we want it
                 pass_qc = int(record[10])
 
                 # add pass qc filter if we want it
                 pass_qc = int(record[10])
-                if self.reportFilter:
+                if self.report_filter:
                     pass_qc_msg = " pf=%s" % (pass_qc)
                 else:
                     pass_qc_msg = ""
 
                     pass_qc_msg = " pf=%s" % (pass_qc)
                 else:
                     pass_qc_msg = ""
 
-                header = header_template  % ( \
+                header = header_template % ( \
                     machine_name,
                     run_number,
                     lane_number,
                     machine_name,
                     run_number,
                     lane_number,
@@ -206,13 +160,12 @@ class Qseq2Fastq(object):
                     pass_qc_msg,
                     os.linesep)
 
                     pass_qc_msg,
                     os.linesep)
 
-
                 # if we passed the filter write to the "good" file
                 if pass_qc:
                     destination = self.pass_destination
                 else:
                     destination = self.nopass_destination
                 # if we passed the filter write to the "good" file
                 if pass_qc:
                     destination = self.pass_destination
                 else:
                     destination = self.nopass_destination
-                
+
                 destination.write(header)
                 destination.write(sequence[self.trim])
                 destination.write(os.linesep)
                 destination.write(header)
                 destination.write(sequence[self.trim])
                 destination.write(os.linesep)
@@ -222,5 +175,24 @@ class Qseq2Fastq(object):
                     destination.write(quality[self.trim].tostring())
                     destination.write(os.linesep)
 
                     destination.write(quality[self.trim].tostring())
                     destination.write(os.linesep)
 
+def convert_illumina_quality(illumina_quality):
+    """Convert an Illumina quality score to a Phred ASCII quality score.
+    """
+    # Illumina scores are Phred + 64
+    # Fastq scores are Phread + 33
+    # the following code grabs the string, converts to short ints and
+    # subtracts 31 (64-33) to convert between the two score formats.
+    # The numpy solution is twice as fast as some of my other
+    # ideas for the conversion.
+    # sorry about the uglyness in changing from character, to 8-bit int
+    # and back to a character array
+    quality = numpy.asarray(illumina_quality, 'c')
+    quality.dtype = numpy.uint8
+    quality -= 31
+     # I'd like to know what the real numpy char type is
+    quality.dtype = '|S1'
+    return quality
+
+
 if __name__ == "__main__":
     main()
 if __name__ == "__main__":
     main()
diff --git a/htsworkflow/pipelines/test/test_qseq2fastq.py b/htsworkflow/pipelines/test/test_qseq2fastq.py
deleted file mode 100644 (file)
index 1c32924..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-#!/usr/bin/env python
-
-import unittest
-
-import htsworkflow.pipelines.qseq2fastq as qseq2fastq
-
-class TestQseq2Fastq(unittest.TestCase):
-    def test_parse_slice(self):
-        s = qseq2fastq.parse_slice("1:")
-        self.failUnlessEqual(s.start, 1)
-        self.failUnlessEqual(s.stop, None)
-
-        s = qseq2fastq.parse_slice("0:2")
-        self.failUnlessEqual(s.start, 0)
-        self.failUnlessEqual(s.stop, 2)
-
-def suite():
-    return unittest.makeSuite(TestQseq2Fastq, 'test')
-
-if __name__ == "__main__":
-    unittest.main(defaultTest="suite")
-    
index 18cabef485aaedeafc7f0f1d16761c7c5937b6ea..d3eb4f6cc5dad43f619ca8db662ed6abbcc402a2 100644 (file)
@@ -14,7 +14,7 @@ def unicode_or_none(value):
 def parse_flowcell_id(flowcell_id):
     """
     Return flowcell id and any status encoded in the id
 def parse_flowcell_id(flowcell_id):
     """
     Return flowcell id and any status encoded in the id
-  
+
     We stored the status information in the flowcell id name.
     this was dumb, but database schemas are hard to update.
     """
     We stored the status information in the flowcell id name.
     this was dumb, but database schemas are hard to update.
     """
@@ -26,4 +26,19 @@ def parse_flowcell_id(flowcell_id):
     if len(fields) > 1:
         status = fields[1]
     return fcid, status
     if len(fields) > 1:
         status = fields[1]
     return fcid, status
-    
+
+def parse_slice(slice_text):
+    if slice_text is None or len(slice_text) == 0:
+        return slice(None)
+
+    slice_data = []
+    for element in slice_text.split(':'):
+        if len(element) == 0:
+            element = None
+        else:
+            element = int(element)
+        slice_data.append(element)
+
+    return slice(*slice_data)
+
+
diff --git a/htsworkflow/util/test/test_conversion.py b/htsworkflow/util/test/test_conversion.py
new file mode 100644 (file)
index 0000000..9100625
--- /dev/null
@@ -0,0 +1,22 @@
+#!/usr/bin/env python
+
+import unittest
+
+from htsworkflow.util import conversion
+
+class TestConversion(unittest.TestCase):
+    def test_parse_slice(self):
+        s = conversion.parse_slice("1:")
+        self.failUnlessEqual(s.start, 1)
+        self.failUnlessEqual(s.stop, None)
+
+        s = conversion.parse_slice("0:2")
+        self.failUnlessEqual(s.start, 0)
+        self.failUnlessEqual(s.stop, 2)
+
+def suite():
+    return unittest.makeSuite(TestConversion, 'test')
+
+if __name__ == "__main__":
+    unittest.main(defaultTest="suite")
+