From 144a9cb83097965e05ac5e0718791b0855700697 Mon Sep 17 00:00:00 2001 From: Diane Trout Date: Tue, 6 Mar 2012 13:32:28 -0800 Subject: [PATCH] Add module to convert multiple fastq files into a single file. The HiSeq likes to produce N seperate fastq files to simplify parallel processing. Also I decided to split out the parse_slice code into a util module since it was being shared between the desplit_fastq and qseq2fastq. Finally just for fun I ran these two modules through pep8.py and pylint.py and cleaned up those errors --- htsworkflow/pipelines/desplit_fastq.py | 101 +++++++++++++ htsworkflow/pipelines/qseq2fastq.py | 138 +++++++----------- htsworkflow/util/conversion.py | 19 ++- .../test/test_conversion.py} | 12 +- 4 files changed, 179 insertions(+), 91 deletions(-) create mode 100644 htsworkflow/pipelines/desplit_fastq.py rename htsworkflow/{pipelines/test/test_qseq2fastq.py => util/test/test_conversion.py} (57%) diff --git a/htsworkflow/pipelines/desplit_fastq.py b/htsworkflow/pipelines/desplit_fastq.py new file mode 100644 index 0000000..85ad586 --- /dev/null +++ b/htsworkflow/pipelines/desplit_fastq.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python +"""Write fastq data from multiple compressed files into a single file +""" + +from glob import glob +import os +from optparse import OptionParser +import sys + +from htsworkflow.version import version +from htsworkflow.util.opener import autoopen +from htsworkflow.util.conversion import parse_slice + +SEQ_HEADER = 0 +SEQUENCE = 1 +QUAL_HEADER = 2 +QUALITY = 3 +INVALID = -1 + + +def main(cmdline=None): + """Command line driver: [None, 'option', '*.fastq.bz2'] + """ + parser = make_parser() + opts, args = parser.parse_args(cmdline) + + if opts.version: + print (version()) + return 0 + + if opts.output is None: + output = open(opts.output, 'w') + else: + output = sys.stdout + + desplitter = DesplitFastq(file_generator(args), output) + desplitter.trim = parse_slice(opts.slice) + desplitter.run() + + return 0 + + +def make_parser(): + """Generate an option parser for above main function""" + + usage = '%prog: [options] *.fastq.gz' + parser = OptionParser(usage) + + parser.add_option('-o', '--output', default=None, + help='output fastq file') + parser.add_option('-s', '--slice', + help="specify python slice, e.g. 0:75, 0:-1", + default=None) + parser.add_option("--version", default=False, action="store_true", + help="report software version") + return parser + + +def file_generator(pattern_list): + """Given a list of glob patterns return decompressed streams + """ + for pattern in pattern_list: + for filename in glob(pattern): + yield autoopen(filename, 'r') + + +class DesplitFastq(object): + """Merge multiple fastq files into a single file""" + def __init__(self, sources, destination): + self.sources = sources + self.destination = destination + + self.making_fastq = True + self.trim = slice(None) + + def run(self): + """Do the conversion + + This is here so we can run via threading/multiprocessing APIs + """ + state = SEQ_HEADER + for stream in self.sources: + for line in stream: + line = line.rstrip() + if state == SEQ_HEADER: + self.destination.write(line) + state = SEQUENCE + elif state == SEQUENCE: + self.destination.write(line[self.trim]) + state = QUAL_HEADER + elif state == QUAL_HEADER: + self.destination.write(line) + state = QUALITY + elif state == QUALITY: + self.destination.write(line[self.trim]) + state = SEQ_HEADER + self.destination.write(os.linesep) + + +if __name__ == "__main__": + main() diff --git a/htsworkflow/pipelines/qseq2fastq.py b/htsworkflow/pipelines/qseq2fastq.py index ac44c1e..2f017eb 100644 --- a/htsworkflow/pipelines/qseq2fastq.py +++ b/htsworkflow/pipelines/qseq2fastq.py @@ -1,4 +1,6 @@ #!/usr/bin/env python +"""Convert a collection of qseq or a tar file of qseq files to a fastq file +""" from glob import glob import os from optparse import OptionParser @@ -7,22 +9,26 @@ import sys import tarfile from htsworkflow.version import version +from htsworkflow.util.conversion import parse_slice + def main(cmdline=None): + """Command line driver: [None, '-i', 'tarfile', '-o', 'target.fastq'] + """ parser = make_parser() opts, args = parser.parse_args(cmdline) if opts.version: print version() return 0 - + if opts.infile is not None: qseq_generator = tarfile_generator(opts.infile) elif len(args) > 0: qseq_generator = file_generator(args) else: qseq_generator = [sys.stdin] - + if opts.output is not None: output = open(opts.output, 'w') else: @@ -43,19 +49,21 @@ def main(cmdline=None): def make_parser(): + """Return option parser""" usage = "%prog: [options] *_qseq.txt\nProduces Phred33 files by default" parser = OptionParser(usage) parser.add_option("-a", "--fasta", default=False, action="store_true", help="produce fasta files instead of fastq files") - parser.add_option("-f", "--flowcell", default=None, + parser.add_option("-f", "--flowcell", default=None, help="Set flowcell ID for output file") parser.add_option("-i", "--infile", default=None, - help='source tar file (if reading from an archive instead of a directory)') + help='source tar file (if reading from an archive '\ + 'instead of a directory)') parser.add_option("-o", "--output", default=None, help="output fastq file") parser.add_option("-n", "--nopass-output", default=None, - help="if provided send files that failed illumina filter "\ - "to a differentfile") + help="if provided send files that failed "\ + "illumina filter to a differentfile") parser.add_option("-s", "--slice", help="specify python slice, e.g. 0:75, 0:-1", default=None) @@ -63,64 +71,24 @@ def make_parser(): action="store_true") parser.add_option("--version", default=False, action="store_true", help="report software version") - - return parser - - -def file_generator(pattern_list): - for pattern in pattern_list: - for filename in glob(pattern): - yield open(filename,"r") - - -def tarfile_generator(tarfilename): - archive = tarfile.open(tarfilename,'r|*') - for tarinfo in archive: - yield archive.extractfile(tarinfo) + return parser -def parse_slice(slice_text): - if slice_text is None or len(slice_text) == 0: - return slice(None) - - slice_data = [] - for element in slice_text.split(':'): - if len(element) == 0: - element = None - else: - element = int(element) - slice_data.append(element) - - return slice(*slice_data) - def file_generator(pattern_list): + """Given a list of glob patterns yield open streams for matching files""" for pattern in pattern_list: for filename in glob(pattern): - yield open(filename,"r") + yield open(filename, "r") def tarfile_generator(tarfilename): - archive = tarfile.open(tarfilename,'r|*') + """Yield open streams for files inside a tarfile""" + archive = tarfile.open(tarfilename, 'r|*') for tarinfo in archive: yield archive.extractfile(tarinfo) -def parse_slice(slice_text): - if slice_text is None or len(slice_text) == 0: - return slice(None) - - slice_data = [] - for element in slice_text.split(':'): - if len(element) == 0: - element = None - else: - element = int(element) - slice_data.append(element) - - return slice(*slice_data) - - class Qseq2Fastq(object): """ Convert qseq files to fastq (or fasta) files. @@ -132,47 +100,33 @@ class Qseq2Fastq(object): self.nopass_destination = nopass_destination else: self.nopass_destination = pass_destination - + self.fastq = True self.flowcell_id = None self.trim = slice(None) - self.reportFilter = False + self.report_filter = False def _format_flowcell_id(self): """ Return formatted flowcell ID """ if self.flowcell_id is not None: - return self.flowcell_id+"_" + return self.flowcell_id + "_" else: return "" - - def _convert_illumina_quality(self, illumina_quality): - """ - Convert an Illumina ASCII encoded quality score to a Phred ASCII quality score. - """ - # Illumina scores are Phred + 64 - # Fastq scores are Phread + 33 - # the following code grabs the string, converts to short ints and - # subtracts 31 (64-33) to convert between the two score formats. - # The numpy solution is twice as fast as some of my other - # ideas for the conversion. - # sorry about the uglyness in changing from character, to 8-bit int - # and back to a character array - quality = numpy.asarray(illumina_quality,'c') - quality.dtype = numpy.uint8 - quality -= 31 - quality.dtype = '|S1' # I'd like to know what the real numpy char type is - return quality - + def run(self): + """Run conversion + (Used to match threading/multiprocessing API) + """ if self.fastq: - header_template = '@' + header_template = '@' else: # fasta case header_template = '>' - header_template += self._format_flowcell_id() + '%s_%s:%s:%s:%s:%s/%s%s%s' - + header_template += self._format_flowcell_id() + \ + '%s_%s:%s:%s:%s:%s/%s%s%s' + for qstream in self.sources: for line in qstream: # parse line @@ -183,19 +137,19 @@ class Qseq2Fastq(object): tile = record[3] x = record[4] y = record[5] - index = record[6] + #index = record[6] read = record[7] - sequence = record[8].replace('.','N') - quality = self._convert_illumina_quality(record[9]) + sequence = record[8].replace('.', 'N') + quality = convert_illumina_quality(record[9]) # add pass qc filter if we want it pass_qc = int(record[10]) - if self.reportFilter: + if self.report_filter: pass_qc_msg = " pf=%s" % (pass_qc) else: pass_qc_msg = "" - header = header_template % ( \ + header = header_template % ( \ machine_name, run_number, lane_number, @@ -206,13 +160,12 @@ class Qseq2Fastq(object): pass_qc_msg, os.linesep) - # if we passed the filter write to the "good" file if pass_qc: destination = self.pass_destination else: destination = self.nopass_destination - + destination.write(header) destination.write(sequence[self.trim]) destination.write(os.linesep) @@ -222,5 +175,24 @@ class Qseq2Fastq(object): destination.write(quality[self.trim].tostring()) destination.write(os.linesep) +def convert_illumina_quality(illumina_quality): + """Convert an Illumina quality score to a Phred ASCII quality score. + """ + # Illumina scores are Phred + 64 + # Fastq scores are Phread + 33 + # the following code grabs the string, converts to short ints and + # subtracts 31 (64-33) to convert between the two score formats. + # The numpy solution is twice as fast as some of my other + # ideas for the conversion. + # sorry about the uglyness in changing from character, to 8-bit int + # and back to a character array + quality = numpy.asarray(illumina_quality, 'c') + quality.dtype = numpy.uint8 + quality -= 31 + # I'd like to know what the real numpy char type is + quality.dtype = '|S1' + return quality + + if __name__ == "__main__": main() diff --git a/htsworkflow/util/conversion.py b/htsworkflow/util/conversion.py index 18cabef..d3eb4f6 100644 --- a/htsworkflow/util/conversion.py +++ b/htsworkflow/util/conversion.py @@ -14,7 +14,7 @@ def unicode_or_none(value): def parse_flowcell_id(flowcell_id): """ Return flowcell id and any status encoded in the id - + We stored the status information in the flowcell id name. this was dumb, but database schemas are hard to update. """ @@ -26,4 +26,19 @@ def parse_flowcell_id(flowcell_id): if len(fields) > 1: status = fields[1] return fcid, status - + +def parse_slice(slice_text): + if slice_text is None or len(slice_text) == 0: + return slice(None) + + slice_data = [] + for element in slice_text.split(':'): + if len(element) == 0: + element = None + else: + element = int(element) + slice_data.append(element) + + return slice(*slice_data) + + diff --git a/htsworkflow/pipelines/test/test_qseq2fastq.py b/htsworkflow/util/test/test_conversion.py similarity index 57% rename from htsworkflow/pipelines/test/test_qseq2fastq.py rename to htsworkflow/util/test/test_conversion.py index 1c32924..9100625 100644 --- a/htsworkflow/pipelines/test/test_qseq2fastq.py +++ b/htsworkflow/util/test/test_conversion.py @@ -2,21 +2,21 @@ import unittest -import htsworkflow.pipelines.qseq2fastq as qseq2fastq +from htsworkflow.util import conversion -class TestQseq2Fastq(unittest.TestCase): +class TestConversion(unittest.TestCase): def test_parse_slice(self): - s = qseq2fastq.parse_slice("1:") + s = conversion.parse_slice("1:") self.failUnlessEqual(s.start, 1) self.failUnlessEqual(s.stop, None) - s = qseq2fastq.parse_slice("0:2") + s = conversion.parse_slice("0:2") self.failUnlessEqual(s.start, 0) self.failUnlessEqual(s.stop, 2) def suite(): - return unittest.makeSuite(TestQseq2Fastq, 'test') + return unittest.makeSuite(TestConversion, 'test') if __name__ == "__main__": unittest.main(defaultTest="suite") - + -- 2.30.2