2 from optparse import OptionParser
# Module-level logger named after this module; main() and the validate_*
# helpers report failures through it.
8 LOGGER = logging.getLogger(__name__)
# Command-line entry point: builds the option parser, parses `cmdline`, and
# runs validate_fastq() on each named file, logging the files that fail.
# NOTE(review): only part of this function is visible in this view (several
# intermediate lines are absent), so the notes below cover just the
# statements shown.
10 def main(cmdline=None):
11 parser = make_parser()
12 opts, args = parser.parse_args(cmdline)
14 error_happened = False
# NOTE(review): args[1:] drops the first positional argument. That is correct
# only when the caller passes a full argv whose first element is the program
# name; with cmdline=None optparse parses sys.argv[1:], so the first filename
# would be skipped -- confirm against the call site.
15 for filename in args[1:]:
# NOTE(review): no close() or `with` block is visible for this handle --
# confirm the stream is closed on every path.
16 stream = open(filename, 'r')
19 errors = validate_fastq(stream,
24 LOGGER.error("%s failed validation", filename)
# Builds the optparse parser for the command line. Presumably this is the
# body of make_parser(), which main() calls; the `def` line itself is not
# visible in this view -- TODO confirm.
35 parser = OptionParser()
36 parser.add_option("--fastq", action="store_true", default=False,
37 help="verify arguments are valid fastq file")
38 parser.add_option("--uniform-lengths", action="store_true", default=False,
39 help="require all reads to be of the same length")
# --max-errors: integer option that is None unless given on the command line.
40 parser.add_option("--max-errors", type="int", default=None)
# Quality encoding names listed in the --format help text (presumably also
# passed as its `choices`; that argument is not visible here).
41 encodings=['phred33', 'phred64']
42 parser.add_option("--format", type="choice",
45 help="choose quality encoding one of: %s" % (", ".join(encodings)))
# NOTE(review): only part of this function is visible in this view (the loop
# header, the state transitions and the FQ_* constants are absent), so the
# notes below describe just the statements shown.
50 def validate_fastq(stream, format='phred33', uniform_length=False, max_errors=None):
51 """Validate that a fastq file isn't corrupted
53 uniform_length - requires that all sequence & qualities must be
56 returns number of errors found
# Per-line patterns: "@" header (h1), sequence, "+" header (h2), and one
# quality alphabet per supported encoding.
# NOTE(review): these literals are not raw strings, so "\s", "\w", "\+" and
# "\]" are invalid string escapes that recent Python versions warn about;
# r"..." literals would compile to equivalent patterns.
63 h1_re = re.compile("^@[\s\w:-]*$")
64 seq_re = re.compile("^[AGCT.N]+$", re.IGNORECASE)
65 h2_re = re.compile("^\+[\s\w:-]*$")
66 phred33 = re.compile("^[!\"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJ]+$")
67 phred64 = re.compile("^[@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefgh]+$")
# Choose behaviour by encoding name (presumably binding quality_re, which is
# used below); any other name is rejected with ValueError.
69 if format == 'phred33':
71 elif format == 'phred64':
74 raise ValueError("Unrecognized quality format name")
84 # reset length at start of new record for non-uniform check
85 if not uniform_length:
87 # start of record checks
88 errors += validate_re(h1_re, line, line_number, "FAIL H1")
# Sequence line: check the alphabet, then the length.
91 errors += validate_re(seq_re, line, line_number, "FAIL SEQ")
92 length, len_errors = validate_length(line, length, line_number,
97 errors += validate_re(h2_re, line, line_number, "FAIL H2")
# Quality line: check the selected quality alphabet, then the length.
99 elif state == FQ_QUAL:
100 errors += validate_re(quality_re, line, line_number, "FAIL QUAL")
101 length, len_errors = validate_length(line, length, line_number,
# Any state value not handled above is treated as a programming error.
106 raise RuntimeError("Invalid state: %d" % (state,))
# Limit check: applies only when max_errors was given, and triggers once the
# running count exceeds it (the action taken is not visible here).
108 if max_errors is not None and errors > max_errors:
def validate_re(pattern, line, line_number, errmsg):
    """Check one line against a compiled pattern and log a mismatch

    pattern - compiled regular expression, applied with pattern.match()
    line - text of the line being checked
    line_number - position of the line in the input, used in the log message
    errmsg - short label logged in front of the line number

    returns 1 if the line does not match (after logging it), otherwise 0,
    so callers can add the result straight onto a running error count
    """
    if pattern.match(line) is None:
        LOGGER.error("%s [%d]: %s", errmsg, line_number, line)
        return 1
    # Always return a number: callers do `errors += validate_re(...)`.
    return 0
def validate_length(line, line_length, line_number, errmsg):
    """Check that a line has the expected length

    if line_length is None, sets it to the length of this line; otherwise
    the line must be exactly line_length characters long

    line - text of the line being checked
    line_length - expected length, or None if no length is known yet
    line_number - position of the line in the input, used in the log message
    errmsg - short label logged in front of the line number

    returns (line_length, error_count) where line_length is the length to
    expect from now on and error_count is 1 on a mismatch, otherwise 0
    """
    # Start from "no error" so every path returns a defined count.
    error_count = 0
    if line_length is None:
        line_length = len(line)
    elif len(line) != line_length:
        LOGGER.error("%s %d: %s", errmsg, line_number, line)
        error_count = 1
    return line_length, error_count