Initial port to python3
[htsworkflow.git] / htsworkflow / util / validate.py
1 #!/usr/bin/env python
2
3 from optparse import OptionParser
4 import os
5 import re
6 import sys
7
8 def main(cmdline=None):
9     parser = make_parser()
10     opts, args = parser.parse_args(cmdline)
11
12     error_happened = False
13     for filename in args[1:]:
14         stream = open(filename, 'r')
15         
16         if opts.fastq:
17             errors = validate_fastq(stream,
18                                     opts.format,
19                                     opts.uniform_lengths,
20                                     opts.max_errors)
21             if errors > 0:
22                 print("%s failed validation" % (filename,))
23                 error_happened = True
24
25         stream.close()
26
27     if error_happened:
28         return 1
29     
30     return 0
31
32 def make_parser():
33     parser = OptionParser()
34     parser.add_option("--fastq", action="store_true", default=False,
35                       help="verify arguments are valid fastq file")
36     parser.add_option("--uniform-lengths", action="store_true", default=False,
37                       help="require all reads to be of the same length")
38     parser.add_option("--max-errors", type="int", default=None)
39     encodings=['phred33', 'phred64']
40     parser.add_option("--format", type="choice",
41                       choices=encodings,
42                       default='phred64',
43                       help="choose quality encoding one of: %s" % (", ".join(encodings)))
44                       
45     return parser
46
47
48 def validate_fastq(stream, format='phred33', uniform_length=False, max_errors=None):
49     """Validate that a fastq file isn't corrupted
50
51     uniform_length - requires that all sequence & qualities must be
52                      the same lengths.
53
54     returns number of errors found
55     """
56     FQ_NONE = 0
57     FQ_H1 = 1
58     FQ_SEQ = 2
59     FQ_H2 = 3
60     FQ_QUAL = 4
61     h1_re = re.compile("^@[\s\w:-]*$")
62     seq_re = re.compile("^[AGCT.N]+$", re.IGNORECASE)
63     h2_re = re.compile("^\+[\s\w:-]*$")
64     phred33 = re.compile("^[!\"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJ]+$")
65     phred64 = re.compile("^[@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefgh]+$")
66
67     if format == 'phred33':
68         quality_re = phred33
69     elif format == 'phred64':
70         quality_re = phred64
71     else:
72         raise ValueError("Unrecognized quality format name")
73
74     state = FQ_H1
75     length = None
76     line_number = 1
77     errors = 0
78     for line in stream:
79         line = line.rstrip()
80         len_errors = 0
81         if state == FQ_H1:
82             # reset length at start of new record for non-uniform check
83             if not uniform_length:
84                 length = None
85             # start of record checks
86             errors += validate_re(h1_re, line, line_number, "FAIL H1")
87             state = FQ_SEQ
88         elif state == FQ_SEQ:
89             errors += validate_re(seq_re, line, line_number, "FAIL SEQ")
90             length, len_errors = validate_length(line, length, line_number,
91                                                  "FAIL SEQ LEN")
92             errors += len_errors
93             state = FQ_H2
94         elif state == FQ_H2:
95             errors += validate_re(h2_re, line, line_number, "FAIL H2")
96             state = FQ_QUAL
97         elif state == FQ_QUAL:
98             errors += validate_re(quality_re, line, line_number, "FAIL QUAL")
99             length, len_errors = validate_length(line, length, line_number,
100                                                  "FAIL QUAL LEN")
101             errors += len_errors
102             state = FQ_H1
103         else:
104             raise RuntimeError("Invalid state: %d" % (state,))
105         line_number += 1
106         if max_errors is not None and errors > max_errors:
107             break
108         
109     return errors
110
111 def validate_re(pattern, line, line_number, errmsg):
112     if pattern.match(line) is None:
113         print(errmsg, "[%d]: %s" % (line_number, line))
114         return 1
115     else:
116         return 0
117
118 def validate_length(line, line_length, line_number, errmsg):
119     """
120     if line_length is None, sets it
121     """
122     error_count = 0
123     if line_length is None:
124         line_length = len(line)
125     elif len(line) != line_length:
126         print(errmsg, "%d: %s" %(line_number, line))
127         error_count = 1
128     return line_length, error_count
129