rest, flowcell = os.path.split(rest)
cycle_match = re.match("C(?P<start>[0-9]+)-(?P<stop>[0-9]+)", cycle)
if cycle_match is None:
- raise ValueError("Expected .../flowcell/cycle/ directory structure")
+ raise ValueError(
+ "Expected .../flowcell/cycle/ directory structure in %s" % \
+ (path,))
start = cycle_match.group('start')
if start is not None:
start = int(start)
return SequenceFile('qseq', fullpath, flowcell, lane, read, cycle=stop)
def parse_fastq(path, filename):
+ """Parse fastq names
+ """
flowcell_dir, start, stop = get_flowcell_cycle(path)
basename, ext = os.path.splitext(filename)
records = basename.split('_')
flowcell = records[4]
lane = int(records[5][1])
read = int(records[6][1])
- if records[-1].startswith('pass'):
- pf = True
- elif records[-1].startswith('nopass'):
- pf = False
- else:
- raise ValueError("Unrecognized fastq name")
-
+ pf = parse_fastq_pf_flag(records)
+
if flowcell_dir != flowcell:
logging.warn("flowcell %s found in wrong directory %s" % \
(flowcell, path))
return SequenceFile('fastq', fullpath, flowcell, lane, read, pf=pf, cycle=stop)
+def parse_fastq_pf_flag(records):
+ """Take a fastq filename split on _ and look for the pass-filter flag
+ """
+ if len(records) < 8:
+ pf = None
+ else:
+ fastq_type = records[-1].lower()
+ if fastq_type.startswith('pass'):
+ pf = True
+ elif fastq_type.startswith('nopass'):
+ pf = False
+ elif fastq_type.startswith('all'):
+ pf = None
+ else:
+ raise ValueError("Unrecognized fastq name %s at %s" % \
+ (records[-1], os.path.join(path,filename)))
+
+ return pf
+
def parse_eland(path, filename, eland_match=None):
if eland_match is None:
eland_match = eland_re.match(filename)