Indicate when desplit_fastq didn't get any files to process
[htsworkflow.git] / htsworkflow / pipelines / desplit_fastq.py
1 #!/usr/bin/env python
2 """Write fastq data from multiple compressed files into a single file
3 """
4 import bz2
5 import gzip
6 from glob import glob
7 import os
8 from optparse import OptionParser
9 import sys
10
11 from htsworkflow.util.version import version
12 from htsworkflow.util.opener import autoopen
13 from htsworkflow.util.conversion import parse_slice
14
# States of the line-by-line parser in DesplitFastq.run(), numbered in the
# order run() steps through them: header, sequence, quality header, quality.
SEQ_HEADER, SEQUENCE, QUAL_HEADER, QUALITY = range(4)
# NOTE(review): INVALID is not referenced by any code visible in this file.
INVALID = -1
20
21
def main(cmdline=None):
    """Command line driver: [None, 'option', '*.fastq.bz2']

    Parses the command line, opens the requested destination and merges
    every fastq file matched by the positional glob patterns into it.

    :param cmdline: list of argument strings; None makes optparse fall
                    back to sys.argv[1:].
    :returns: 0, intended as the process exit status.
    :raises RuntimeError: propagated from DesplitFastq.run() when no input
                          file was processed.
    """
    parser = make_parser()
    opts, args = parser.parse_args(cmdline)

    if opts.version:
        print(version())
        return 0

    # --bzip takes precedence over --gzip when both are given; both are
    # ignored when no --output file is named.
    # NOTE(review): under Python 3 the bz2/gzip writers expect bytes --
    # confirm what autoopen() yields before relying on compressed output.
    if opts.output is None:
        output = sys.stdout
    elif opts.bzip:
        output = bz2.BZ2File(opts.output, 'w')
    elif opts.gzip:
        output = gzip.GzipFile(opts.output, 'w')
    else:
        output = open(opts.output, 'w')

    try:
        desplitter = DesplitFastq(file_generator(args), output)
        desplitter.trim = parse_slice(opts.slice)
        desplitter.run()
    finally:
        # Close files we opened ourselves so buffered data and compression
        # trailers are flushed even when run() raises; never close stdout.
        if output is not sys.stdout:
            output.close()

    return 0
47
48
def make_parser():
    """Build the OptionParser whose result main() consumes.

    Options: -o/--output and -s/--slice take a value (default None);
    --gzip, --bzip and --version are boolean switches (default False).
    """
    parser = OptionParser(usage='%prog: [options] *.fastq.gz')

    # Options that carry a value.
    parser.add_option('-o', '--output',
                      default=None,
                      help='output fastq file')
    parser.add_option('-s', '--slice',
                      default=None,
                      help="specify python slice, e.g. 0:75, 0:-1")

    # Plain on/off switches, registered in the order they appear in --help.
    switches = (
        ('--gzip', 'gzip output'),
        ('--bzip', 'bzip output'),
        ('--version', 'report software version'),
    )
    for flag, description in switches:
        parser.add_option(flag, action='store_true', default=False,
                          help=description)

    return parser
67
68
def file_generator(pattern_list):
    """Lazily yield an open stream for every file matching the patterns.

    Patterns are expanded one at a time with glob(), in the order given,
    and each matching path is opened with autoopen(path, 'r') only when
    the consumer asks for it.  Patterns that match nothing contribute no
    streams.
    (autoopen presumably decompresses transparently -- confirm in
    htsworkflow.util.opener.)
    """
    for glob_pattern in pattern_list:
        matched_paths = glob(glob_pattern)
        for path in matched_paths:
            yield autoopen(path, 'r')
75
76
class DesplitFastq(object):
    """Merge multiple fastq files into a single file

    Attributes:
      sources       iterable of streams; each stream is iterated line by line
      destination   object whose write() receives the merged output
      making_fastq  set to True by the constructor; not read by this class
      trim          slice applied to sequence and quality lines, defaults to
                    slice(None) which keeps them whole
    """
    def __init__(self, sources, destination):
        self.sources = sources
        self.destination = destination

        self.making_fastq = True
        self.trim = slice(None)

    def run(self):
        """Do the conversion

        This is here so we can run via threading/multiprocessing APIs

        Every input line is stripped of trailing whitespace and written
        followed by a single newline; sequence and quality lines are cut
        with self.trim first.

        :raises RuntimeError: if self.sources yielded no stream at all.
        """
        # Which state follows which while stepping through a record.
        following = {
            SEQ_HEADER: SEQUENCE,
            SEQUENCE: QUAL_HEADER,
            QUAL_HEADER: QUALITY,
            QUALITY: SEQ_HEADER,
        }
        trimmed_states = (SEQUENCE, QUALITY)

        # The state is not reset between streams, so a stream that ends in
        # the middle of a record shifts how the next stream is interpreted.
        state = SEQ_HEADER
        files_read = 0
        for stream in self.sources:
            files_read += 1
            for line in stream:
                line = line.rstrip()
                if state in trimmed_states:
                    line = line[self.trim]
                self.destination.write(line)
                # Always terminate with '\n': text-mode files translate it
                # to the platform convention themselves, whereas writing
                # os.linesep would produce '\r\r\n' on Windows.
                self.destination.write('\n')
                state = following[state]

        if files_read == 0:
            raise RuntimeError("No files processed")
113
if __name__ == "__main__":
    # main() returns a status code; hand it to the interpreter so the
    # shell sees it as the process exit status instead of discarding it.
    sys.exit(main())