Add validation script
[htsworkflow.git] / htsworkflow / util / validate.py
1 #!/usr/bin/env python
2
3 from optparse import OptionParser
4 import os
5 import re
6 import sys
7
8 def main(cmdline=None):
9     parser = make_parser()
10     opts, args = parser.parse_args(cmdline)
11
12     for filename in args[1:]:
13         stream = open(filename, 'r')
14         if opts.fastq:
15             validate_fastq(f, opts.uniform_lengths)
16         stream.close()
17     return 0
18
19 def make_parser():
20     parser = OptionParser()
21     parser.add_option("--fastq", action="store_true", default=False,
22                       help="verify arguments are valid fastq file")
23     parser.add_option("--uniform-lengths", action="store_true", default=False,
24                       help="require all reads to be of the same length")
25                       
26     return parser
27
28
29 def validate_fastq(stream, uniform_length=False):
30     """Validate that a fastq file isn't corrupted
31
32     uniform_length - requires that all sequence & qualities must be
33                      the same lengths.
34
35     returns number of errors found
36     """
37     FQ_NONE = 0
38     FQ_H1 = 1
39     FQ_SEQ = 2
40     FQ_H2 = 3
41     FQ_QUAL = 4
42     h1_re = re.compile("^>[ \t\w]*$")
43     seq_re = re.compile("^[AGCT.N]+$", re.IGNORECASE)
44     h2_re = re.compile("^@[ \t\w]*$")
45     phred33 = re.compile("^[!\"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJ]+$")
46     phred64 = re.compile("^[@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefgh]+$")
47
48     state = FQ_H1
49     length = None
50     line_number = 1
51     errors = 0
52     for line in stream:
53         line = line.rstrip()
54         if state == FQ_H1:
55             # reset length at start of new record for non-uniform check
56             if not uniform_length:
57                 length = None
58             # start of record checks
59             errors = validate_re(h1_re, line, line_number, errors,
60                                  "FAIL H1")
61             state = FQ_SEQ
62         elif state == FQ_SEQ:
63             errors = validate_re(seq_re, line, line_number, errors,
64                                  "FAIL SEQ")
65             length, errors = validate_length(line, length, line_number,
66                                              errors,
67                                              "FAIL SEQ LEN")
68             state = FQ_H2
69         elif state == FQ_H2:
70             errors = validate_re(h2_re, line, line_number, errors, "FAIL H2")
71             state = FQ_QUAL
72         elif state == FQ_QUAL:
73             errors = validate_re(phred64, line, line_number, errors,
74                                  "FAIL QUAL")
75             length, errors = validate_length(line, length, line_number, errors,
76                                             "FAIL QUAL LEN")
77             state = FQ_H1
78         else:
79             raise RuntimeError("Invalid state: %d" % (state,))
80         line_number += 1
81     return errors
82
83 def validate_re(pattern, line, line_number, error_count, errmsg):
84     if pattern.match(line) is None:
85         print errmsg, "[%d]: %s" % (line_number, line)
86         error_count += 1
87     return error_count
88
89 def validate_length(line, line_length, line_number, error_count, errmsg):
90     """
91     if line_length is None, sets it
92     """
93     if line_length is None:
94         line_length = len(line)
95     elif len(line) != line_length:
96         print errmsg, "%d: %s" %(line_number, line)
97         error_count += 1
98     return line_length, error_count
99