Implement writing to compressed files for qseq2fastq
[htsworkflow.git] / htsworkflow / pipelines / desplit_fastq.py
1 #!/usr/bin/env python
2 """Write fastq data from multiple compressed files into a single file
3 """
4 import bz2
5 import gzip
6 from glob import glob
7 import os
8 from optparse import OptionParser
9 import sys
10
11 from htsworkflow.util.version import version
12 from htsworkflow.util.opener import autoopen
13 from htsworkflow.util.conversion import parse_slice
14
# Positions within a four-line fastq record, used as parser states by
# DesplitFastq.run().
SEQ_HEADER, SEQUENCE, QUAL_HEADER, QUALITY = range(4)
# Not referenced elsewhere in this module; presumably a sentinel for an
# unrecognised state -- confirm against external users before removing.
INVALID = -1
20
21
def main(cmdline=None):
    """Command line driver: [None, 'option', '*.fastq.bz2']

    Parses ``cmdline`` (``None`` lets optparse fall back to ``sys.argv``),
    then merges every file matching the positional glob patterns into the
    file named by ``--output``, or into ``sys.stdout`` when no output file
    was requested.

    Returns 0 on success. Exceptions raised while merging (for example the
    RuntimeError from DesplitFastq.run() when no input file matched)
    propagate to the caller.
    """
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)

    if opts.version:
        print(version())
        return 0

    if opts.output is not None:
        output = open_output(opts.output, opts)
    else:
        output = sys.stdout

    try:
        desplitter = DesplitFastq(file_generator(args), output)
        # parse_slice is a project helper; presumably it turns the
        # "start:stop" text into a slice object -- confirm its handling of
        # the default None.
        desplitter.trim = parse_slice(opts.slice)
        desplitter.run()
    finally:
        # Close the file we opened so buffered (and compressed) data is
        # flushed even when the merge fails; stdout is not ours to close.
        if output is not sys.stdout:
            output.close()

    return 0
42
43
def make_parser():
    """Build the OptionParser consumed by main().

    Options, in help order: -o/--output, -s/--slice, --gzip, --bzip and
    --version. The two value options default to None; the three flags
    default to False.
    """
    parser = OptionParser('%prog: [options] *.fastq.gz')

    # Options that take a value.
    parser.add_option('-o', '--output',
                      help='output fastq file',
                      default=None)
    parser.add_option('-s', '--slice',
                      default=None,
                      help="specify python slice, e.g. 0:75, 0:-1")

    # Simple on/off switches share the same shape, so register them in a loop.
    switches = (
        ('--gzip', 'gzip output'),
        ('--bzip', 'bzip output'),
        ('--version', 'report software version'),
    )
    for flag, description in switches:
        parser.add_option(flag, action='store_true', default=False,
                          help=description)

    return parser
62
63
def open_output(output, opts):
    """Return a writable text stream for ``output``.

    ``opts.bzip`` selects bzip2 compression and wins over ``opts.gzip``,
    which selects gzip; with neither flag set the file is opened as plain
    text.
    """
    if opts.bzip:
        opener, mode = bz2.open, 'wt'
    elif opts.gzip:
        opener, mode = gzip.open, 'wt'
    else:
        opener, mode = open, 'w'
    return opener(output, mode)
73
74
def file_generator(pattern_list):
    """Given a list of glob patterns yield decompressed streams

    Patterns are processed in the order given. Within one pattern the
    matching filenames are opened in sorted order, because glob() returns
    matches in arbitrary order and the merged output should not depend on
    how the filesystem happens to list the directory.

    Each file is opened lazily, in text mode, via the project's autoopen()
    helper (presumably it selects the decompressor -- confirm in
    htsworkflow.util.opener). A pattern that matches nothing yields nothing.
    The caller owns the yielded streams; they are not closed here.
    """
    for pattern in pattern_list:
        for filename in sorted(glob(pattern)):
            yield autoopen(filename, 'rt')
81
82
class DesplitFastq(object):
    """Merge multiple fastq files into a single file

    ``sources`` is an iterable of line-iterable text streams and
    ``destination`` is any object with a ``write(str)`` method. Assign a
    slice to ``trim`` before calling run() to shorten every sequence and
    quality line; header lines are never trimmed.
    """
    def __init__(self, sources, destination):
        """Remember the input streams and the output stream; reads nothing."""
        self.sources = sources
        self.destination = destination

        # Not read anywhere in this class; kept because it is part of the
        # instance's public attributes.
        self.making_fastq = True
        # Default slice keeps sequence/quality lines whole.
        self.trim = slice(None)

    def run(self):
        """Do the conversion

        This is here so we can run via threading/multiprocessing APIs

        Every input line has trailing whitespace stripped and is written
        followed by a single newline. The position within the four-line
        record is tracked across streams and is NOT reset when a new stream
        starts, so a stream that ends mid-record shifts the interpretation
        of the next stream's lines.

        Raises RuntimeError if ``sources`` yielded no streams at all.
        """
        state = SEQ_HEADER
        files_read = 0
        for stream in self.sources:
            files_read += 1
            for line in stream:
                line = line.rstrip()
                if state == SEQ_HEADER:
                    state = SEQUENCE
                elif state == SEQUENCE:
                    line = line[self.trim]
                    state = QUAL_HEADER
                elif state == QUAL_HEADER:
                    state = QUALITY
                elif state == QUALITY:
                    line = line[self.trim]
                    state = SEQ_HEADER
                # Write '\n', not os.linesep: the destinations are text-mode
                # streams that already translate '\n' to the platform line
                # ending, so os.linesep would come out as '\r\r\n' on Windows.
                self.destination.write(line + '\n')

        if files_read == 0:
            raise RuntimeError("No files processed")
119
if __name__ == "__main__":
    # Hand main()'s integer status to the shell instead of discarding it.
    sys.exit(main())