2ad853c1fe2053f91ffaf19268264b159798e520
[htsworkflow.git] / htsworkflow / pipelines / desplit_fastq.py
1 #!/usr/bin/env python
2 """Write fastq data from multiple compressed files into a single file
3 """
4
5 from glob import glob
6 import os
7 from optparse import OptionParser
8 import sys
9
10 from htsworkflow.util.version import version
11 from htsworkflow.util.opener import autoopen
12 from htsworkflow.util.conversion import parse_slice
13
# Zero-based positions of the four lines that make up one fastq record:
# sequence header, sequence, quality header and quality string.
SEQ_HEADER = 0
SEQUENCE = 1
QUAL_HEADER = 2
QUALITY = 3
# NOTE(review): presumably a sentinel meaning "no valid position"; it is not
# referenced in the visible code of this module -- confirm intended use.
INVALID = -1
19
20
def main(cmdline=None):
    """Command line driver: [None, 'option', '*.fastq.bz2']

    Parses the command line, then merges every fastq file matched by the
    positional glob patterns into a single output stream.

    :param cmdline: list of argument strings handed to the option parser;
                    when None, optparse falls back to ``sys.argv[1:]``.
    :returns: 0, also when only the version was requested.
    """
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)

    if opts.version:
        print(version())
        return 0

    # Parse the slice before touching the output file, so any error raised
    # while parsing happens before an existing file is truncated.
    trim = parse_slice(opts.slice)

    # Only a file opened here is ours to close; sys.stdout must stay open.
    owns_output = opts.output is not None
    if owns_output:
        output = open(opts.output, 'w')
    else:
        output = sys.stdout

    try:
        desplitter = DesplitFastq(file_generator(args), output)
        desplitter.trim = trim
        desplitter.run()
    finally:
        # Closing flushes buffered records even if the merge failed midway.
        if owns_output:
            output.close()

    return 0
41
42
def make_parser():
    """Build the OptionParser consumed by the command line driver.

    The parser accepts an output filename, a python-style slice
    specification and a flag requesting the software version.

    :returns: a configured ``OptionParser`` instance.
    """
    option_parser = OptionParser('%prog: [options] *.fastq.gz')

    # (flags, settings) pairs, registered in the order shown by --help.
    option_table = (
        (('-o', '--output'),
         {'default': None,
          'help': 'output fastq file'}),
        (('-s', '--slice'),
         {'default': None,
          'help': "specify python slice, e.g. 0:75, 0:-1"}),
        (('--version',),
         {'default': False,
          'action': "store_true",
          'help': "report software version"}),
    )
    for flags, settings in option_table:
        option_parser.add_option(*flags, **settings)

    return option_parser
57
58
def file_generator(pattern_list):
    """Given a list of glob patterns yield decompressed streams

    Patterns are processed in the order given. Within one pattern the
    matching filenames are sorted, because glob() returns them in
    arbitrary (filesystem dependent) order and the merged output should
    be reproducible from run to run.

    Each file is opened lazily, only when the generator reaches it.
    Patterns that match nothing contribute no streams.

    :param pattern_list: iterable of glob pattern strings.
    :returns: generator of the objects returned by
              ``autoopen(filename, 'r')``.
    """
    for pattern in pattern_list:
        for filename in sorted(glob(pattern)):
            yield autoopen(filename, 'r')
65
66
class DesplitFastq(object):
    """Merge multiple fastq files into a single file

    Every line read from the sources is stripped of trailing whitespace
    and written to the destination followed by a newline. Sequence and
    quality lines are additionally cut down with the ``trim`` slice.

    Attributes:
      sources      iterable of streams, each iterable line by line
      destination  object with a ``write`` method receiving the output
      trim         slice applied to sequence and quality lines; the
                   default ``slice(None)`` keeps them whole
      making_fastq set to True on construction; not read by this class
    """
    # A fastq record is four lines: header, sequence, '+' header, quality.
    _RECORD_LINES = 4
    # Zero-based positions inside a record that carry per-base data
    # (sequence and quality) and therefore get sliced by ``trim``.
    _TRIMMED_POSITIONS = (1, 3)

    def __init__(self, sources, destination):
        """Remember where to read from and where to write to

        :param sources: iterable yielding line-iterable streams.
        :param destination: writable text stream.
        """
        self.sources = sources
        self.destination = destination

        self.making_fastq = True
        self.trim = slice(None)

    def run(self):
        """Do the conversion

        This is here so we can run via threading/multiprocessing APIs

        The position within the four-line record is tracked across all
        sources, so a stream that ends in the middle of a record shifts
        how the lines of the following streams are interpreted.
        """
        write = self.destination.write
        trim = self.trim
        position = 0
        for stream in self.sources:
            for line in stream:
                line = line.rstrip()
                if position in self._TRIMMED_POSITIONS:
                    line = line[trim]
                # Text-mode streams translate '\n' to the platform line
                # ending themselves; writing os.linesep here would give
                # '\r\r\n' on Windows.
                write(line + '\n')
                position = (position + 1) % self._RECORD_LINES
98
99
# Run the command line driver when executed as a script; main() is called
# without arguments, so the option parser reads sys.argv.
if __name__ == "__main__":
    main()