Validate fastq files in both phred33 & phred64 versions
[htsworkflow.git] / htsworkflow / util / validate.py
index f7b821269881c637efcddf24ee428057e2cc5d22..959acc281db0b951d0d0dc2dec17023eddc9be47 100644 (file)
@@ -9,11 +9,24 @@ def main(cmdline=None):
     parser = make_parser()
     opts, args = parser.parse_args(cmdline)
 
+    error_happened = False
     for filename in args[1:]:
         stream = open(filename, 'r')
+        
         if opts.fastq:
-            validate_fastq(f, opts.uniform_lengths)
+            errors = validate_fastq(stream,
+                                    opts.format,
+                                    opts.uniform_lengths,
+                                    opts.max_errors)
+            if errors > 0:
+                print "%s failed validation" % (filename,)
+                error_happened = True
+
         stream.close()
+
+    if error_happened:
+        return 1
+    
     return 0
 
 def make_parser():
@@ -22,11 +35,17 @@ def make_parser():
                       help="verify arguments are valid fastq file")
     parser.add_option("--uniform-lengths", action="store_true", default=False,
                       help="require all reads to be of the same length")
+    parser.add_option("--max-errors", type="int", default=None)
+    encodings=['phred33', 'phred64']
+    parser.add_option("--format", type="choice",
+                      choices=encodings,
+                      default='phred64',
+                      help="choose quality encoding one of: %s" % (", ".join(encodings)))
                       
     return parser
 
 
-def validate_fastq(stream, uniform_length=False):
+def validate_fastq(stream, format='phred33', uniform_length=False, max_errors=None):
     """Validate that a fastq file isn't corrupted
 
     uniform_length - requires that all sequence & qualities must be
@@ -39,61 +58,72 @@ def validate_fastq(stream, uniform_length=False):
     FQ_SEQ = 2
     FQ_H2 = 3
     FQ_QUAL = 4
-    h1_re = re.compile("^>[ \t\w]*$")
+    h1_re = re.compile("^@[\s\w:-]*$")
     seq_re = re.compile("^[AGCT.N]+$", re.IGNORECASE)
-    h2_re = re.compile("^@[ \t\w]*$")
+    h2_re = re.compile("^\+[\s\w:-]*$")
     phred33 = re.compile("^[!\"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJ]+$")
     phred64 = re.compile("^[@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefgh]+$")
 
+    if format == 'phred33':
+        quality_re = phred33
+    elif format == 'phred64':
+        quality_re = phred64
+    else:
+        raise ValueError("Unrecognized quality format name")
+
     state = FQ_H1
     length = None
     line_number = 1
     errors = 0
     for line in stream:
         line = line.rstrip()
+        len_errors = 0
         if state == FQ_H1:
             # reset length at start of new record for non-uniform check
             if not uniform_length:
                 length = None
             # start of record checks
-            errors = validate_re(h1_re, line, line_number, errors,
-                                 "FAIL H1")
+            errors += validate_re(h1_re, line, line_number, "FAIL H1")
             state = FQ_SEQ
         elif state == FQ_SEQ:
-            errors = validate_re(seq_re, line, line_number, errors,
-                                 "FAIL SEQ")
-            length, errors = validate_length(line, length, line_number,
-                                             errors,
-                                             "FAIL SEQ LEN")
+            errors += validate_re(seq_re, line, line_number, "FAIL SEQ")
+            length, len_errors = validate_length(line, length, line_number,
+                                                 "FAIL SEQ LEN")
+            errors += len_errors
             state = FQ_H2
         elif state == FQ_H2:
-            errors = validate_re(h2_re, line, line_number, errors, "FAIL H2")
+            errors += validate_re(h2_re, line, line_number, "FAIL H2")
             state = FQ_QUAL
         elif state == FQ_QUAL:
-            errors = validate_re(phred64, line, line_number, errors,
-                                 "FAIL QUAL")
-            length, errors = validate_length(line, length, line_number, errors,
-                                            "FAIL QUAL LEN")
+            errors += validate_re(quality_re, line, line_number, "FAIL QUAL")
+            length, len_errors = validate_length(line, length, line_number,
+                                                 "FAIL QUAL LEN")
+            errors += len_errors
             state = FQ_H1
         else:
             raise RuntimeError("Invalid state: %d" % (state,))
         line_number += 1
+        if max_errors is not None and errors > max_errors:
+            break
+        
     return errors
 
-def validate_re(pattern, line, line_number, error_count, errmsg):
+def validate_re(pattern, line, line_number, errmsg):
     if pattern.match(line) is None:
         print errmsg, "[%d]: %s" % (line_number, line)
-        error_count += 1
-    return error_count
+        return 1
+    else:
+        return 0
 
-def validate_length(line, line_length, line_number, error_count, errmsg):
+def validate_length(line, line_length, line_number, errmsg):
     """
     if line_length is None, sets it
     """
+    error_count = 0
     if line_length is None:
         line_length = len(line)
     elif len(line) != line_length:
         print errmsg, "%d: %s" %(line_number, line)
-        error_count += 1
+        error_count = 1
     return line_length, error_count