3 from optparse import OptionParser
# Command-line entry point.  Only part of this function is visible in this
# excerpt, so the notes below cover just the statements shown.
#
# Visible behaviour: parse `cmdline` with `parser`, loop over filenames taken
# from the positional arguments, open each one for reading and pass the
# stream to validate_fastq().  A "<filename> failed validation" message is
# printed on a path whose condition is not visible here.  Where `parser` is
# bound, and how `error_happened` is used after the loop, is also not visible.
8 def main(cmdline=None):
10 opts, args = parser.parse_args(cmdline)
12 error_happened = False
# NOTE(review): optparse's parse_args() falls back to sys.argv[1:] when its
# argument is None, so the returned positional list then contains no program
# name and args[1:] would skip the first filename.  Confirm what callers
# pass as `cmdline` before changing this slice.
13 for filename in args[1:]:
# NOTE(review): no close() or `with` block for this stream is visible in the
# excerpt -- confirm the handle is released on every path.
14 stream = open(filename, 'r')
17 errors = validate_fastq(stream,
# Python 2 print statement; this line is a syntax error under Python 3.
22 print "%s failed validation" % (filename,)
# Builds the optparse parser.  The enclosing `def` line (if any) and any
# return statement are not visible in this excerpt.
33 parser = OptionParser()
# --fastq: boolean flag, off by default.
34 parser.add_option("--fastq", action="store_true", default=False,
35 help="verify arguments are valid fastq file")
# --uniform-lengths: boolean flag, off by default.
36 parser.add_option("--uniform-lengths", action="store_true", default=False,
37 help="require all reads to be of the same length")
# --max-errors: integer option; None when not given on the command line.
38 parser.add_option("--max-errors", type="int", default=None)
# Quality-encoding names, interpolated into the --format help text below.
39 encodings=['phred33', 'phred64']
# --format: a "choice" option.  The lines supplying its remaining keyword
# arguments are not visible here -- presumably choices=encodings and a
# default; verify against the full file.
40 parser.add_option("--format", type="choice",
43 help="choose quality encoding one of: %s" % (", ".join(encodings)))
# Review notes for validate_fastq.  Many of its original lines are not
# visible in this excerpt; the notes describe only the statements shown and
# are grouped here, above the definition.
#
# * Patterns compiled per call:
#     h1_re   - a line starting with "@" followed by whitespace, word
#               characters, ":" or "-" only.
#     seq_re  - one or more of A, G, C, T, "." or N, case-insensitive.
#     h2_re   - a line starting with "+" followed by the same character set
#               as h1_re.
#     phred33 - one or more characters from "!" through "J".  Inside the
#               class, ",-." is read as a range from "," to ".", which
#               covers the same three characters that are written out.
#     phred64 - one or more characters from "@" through "h".
# * `format` selects between the 'phred33' and 'phred64' branches; another
#   branch raises ValueError("Unrecognized quality format name").  The
#   visible calls suggest the chosen pattern is bound to `quality_re`, but
#   that assignment is not visible -- confirm.
# * Each line is checked according to `state`: validate_re() results are
#   added to `errors`, and validate_length() is called for the SEQ and QUAL
#   lines.  An unexpected state raises RuntimeError.
# * The max_errors test is a strict `errors > max_errors`, and is skipped
#   when max_errors is None.  What happens when it is true is not visible.
#
# NOTE(review): the regex literals are ordinary (non-raw) strings containing
# "\s", "\w", "\+" and "\]".  They work because Python leaves unknown escapes
# in place, but newer Python 3 releases warn about invalid escape sequences;
# raw strings (r"...") would avoid that.
# NOTE(review): the parameter name `format` shadows the builtin of the same
# name inside this function.
# NOTE(review): `re` is used here but its import is not visible in this
# excerpt -- confirm it exists at the top of the file.
48 def validate_fastq(stream, format='phred33', uniform_length=False, max_errors=None):
49 """Validate that a fastq file isn't corrupted
51 uniform_length - requires that all sequence & qualities must be
54 returns number of errors found
61 h1_re = re.compile("^@[\s\w:-]*$")
62 seq_re = re.compile("^[AGCT.N]+$", re.IGNORECASE)
63 h2_re = re.compile("^\+[\s\w:-]*$")
64 phred33 = re.compile("^[!\"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJ]+$")
65 phred64 = re.compile("^[@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefgh]+$")
67 if format == 'phred33':
69 elif format == 'phred64':
72 raise ValueError("Unrecognized quality format name")
82 # reset length at start of new record for non-uniform check
83 if not uniform_length:
85 # start of record checks
86 errors += validate_re(h1_re, line, line_number, "FAIL H1")
89 errors += validate_re(seq_re, line, line_number, "FAIL SEQ")
90 length, len_errors = validate_length(line, length, line_number,
95 errors += validate_re(h2_re, line, line_number, "FAIL H2")
97 elif state == FQ_QUAL:
98 errors += validate_re(quality_re, line, line_number, "FAIL QUAL")
99 length, len_errors = validate_length(line, length, line_number,
104 raise RuntimeError("Invalid state: %d" % (state,))
106 if max_errors is not None and errors > max_errors:
# Check one line against a compiled pattern.
#
# pattern     - object with a match() method (callers pass compiled regexes)
# line        - the text to test
# line_number - used only in the printed message
# errmsg      - message prefix printed ahead of "[line_number]: line"
#
# When pattern.match(line) returns None the failure is printed.  The return
# statement is not visible in this excerpt; callers add the result to an
# error counter, so it presumably returns an integer count -- confirm.
111 def validate_re(pattern, line, line_number, errmsg):
112 if pattern.match(line) is None:
# Python 2 print statement: writes errmsg, a space, then "[N]: <line>".
113 print errmsg, "[%d]: %s" % (line_number, line)
# Compare len(line) with an expected length, adopting it when none is set.
#
# line        - the text whose length is checked
# line_length - expected length, or None when no length is recorded yet
# line_number - used only in the printed message
# errmsg      - message prefix printed ahead of "line_number: line"
#
# Returns a (line_length, error_count) tuple.  The statements that assign
# error_count are not visible in this excerpt.
118 def validate_length(line, line_length, line_number, errmsg):
120 if line_length is None, sets it
# First line seen: record its length as the expected length.
123 if line_length is None:
124 line_length = len(line)
# Any other length is reported as a mismatch.
125 elif len(line) != line_length:
# NOTE(review): the line number is printed here as "%d:" whereas
# validate_re prints "[%d]:" -- confirm whether the difference is intended.
126 print errmsg, "%d: %s" %(line_number, line)
128 return line_length, error_count