Core functions for saving and finding fastq files generated by a HiSeq.
[htsworkflow.git] / htsworkflow / pipelines / sequences.py
1 """
2 Utilities to work with the various eras of sequence archive files
3 """
4 import logging
5 import os
6 import re
7
8 LOGGER = logging.getLogger(__name__)
9
10 eland_re = re.compile('s_(?P<lane>\d)(_(?P<read>\d))?_eland_')
11 raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+')
12 qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+_l[\d]_r[\d].tar.bz2')
13
14 SEQUENCE_TABLE_NAME = "sequences"
15 def create_sequence_table(cursor):
16     """
17     Create a SQL table to hold  SequenceFile entries
18     """
19     sql = """
20 CREATE TABLE %(table)s (
21   filetype   CHAR(8),
22   path       TEXT,
23   flowcell   CHAR(8),
24   lane       INTEGER,
25   read       INTEGER,
26   pf         BOOLEAN,
27   cycle      CHAR(8)
28 );
29 """ %( {'table': SEQUENCE_TABLE_NAME} )
30     return cursor.execute(sql)
31
32 class SequenceFile(object):
33     """
34     Simple container class that holds the path to a sequence archive
35     and basic descriptive information.
36     """
37     def __init__(self, filetype, path, flowcell, lane, read=None, pf=None, cycle=None,
38                  project=None,
39                  index=None):
40         self.filetype = filetype
41         self.path = path
42         self.flowcell = flowcell
43         self.lane = lane
44         self.read = read
45         self.pf = pf
46         self.cycle = cycle
47         self.project = project
48         self.index = index
49
50     def __hash__(self):
51         return hash(self.key())
52
53     def key(self):
54         return (self.flowcell, self.lane)
55
56     def unicode(self):
57         return unicode(self.path)
58
59     def __eq__(self, other):
60         """
61         Equality is defined if everything but the path matches
62         """
63         attributes = ['filetype','flowcell', 'lane', 'read', 'pf', 'cycle', 'project', 'index']
64         for a in attributes:
65             if getattr(self, a) != getattr(other, a):
66                 return False
67
68         return True
69
70     def __repr__(self):
71         return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
72
73     def make_target_name(self, root):
74         """
75         Create target name for where we need to link this sequence too
76         """
77         path, basename = os.path.split(self.path)
78         # Because the names aren't unque we include the flowcel name
79         # because there were different eland files for different length
80         # analyses, we include the cycle length in the name.
81         if self.filetype == 'eland':
82             template = "%(flowcell)s_%(cycle)s_%(eland)s"
83             basename = template % { 'flowcell': self.flowcell,
84                                     'cycle': self.cycle,
85                                     'eland': basename }
86         # else:
87         # all the other file types have names that include flowcell/lane
88         # information and thus are unique so we don't have to do anything
89         return os.path.join(root, basename)
90
91     def save(self, cursor):
92         """
93         Add this entry to a DB2.0 database.
94         """
95         #FIXME: NEEDS SQL ESCAPING
96         header_macro = {'table': SEQUENCE_TABLE_NAME }
97         sql_header = "insert into %(table)s (" % header_macro
98         sql_columns = ['filetype','path','flowcell','lane']
99         sql_middle = ") values ("
100         sql_values = [self.filetype, self.path, self.flowcell, self.lane]
101         sql_footer = ");"
102         for name in ['read', 'pf', 'cycle']:
103             value = getattr(self, name)
104             if value is not None:
105                 sql_columns.append(name)
106                 sql_values.append(value)
107
108         sql = " ".join([sql_header,
109                         ", ".join(sql_columns),
110                         sql_middle,
111                         # note the following makes a string like ?,?,?
112                         ",".join(["?"] * len(sql_values)),
113                         sql_footer])
114
115         return cursor.execute(sql, sql_values)
116
117 def get_flowcell_cycle(path):
118     """
119     Extract flowcell, cycle from pathname
120     """
121     project = None
122     rest, tail = os.path.split(path)
123     if tail.startswith('Project_'):
124         # we're in a multiplexed sample
125         project = tail
126         rest, cycle = os.path.split(rest)
127     else:
128         cycle = tail
129
130     rest, flowcell = os.path.split(rest)
131     cycle_match = re.match("C(?P<start>[0-9]+)-(?P<stop>[0-9]+)", cycle)
132     if cycle_match is None:
133         raise ValueError(
134             "Expected .../flowcell/cycle/ directory structure in %s" % \
135             (path,))
136     start = cycle_match.group('start')
137     if start is not None:
138         start = int(start)
139     stop = cycle_match.group('stop')
140     if stop is not None:
141         stop = int(stop)
142
143     return flowcell, start, stop, project
144
145 def parse_srf(path, filename):
146     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
147     basename, ext = os.path.splitext(filename)
148     records = basename.split('_')
149     flowcell = records[4]
150     lane = int(records[5][0])
151     fullpath = os.path.join(path, filename)
152
153     if flowcell_dir != flowcell:
154         LOGGER.warn("flowcell %s found in wrong directory %s" % \
155                          (flowcell, path))
156
157     return SequenceFile('srf', fullpath, flowcell, lane, cycle=stop)
158
159 def parse_qseq(path, filename):
160     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
161     basename, ext = os.path.splitext(filename)
162     records = basename.split('_')
163     fullpath = os.path.join(path, filename)
164     flowcell = records[4]
165     lane = int(records[5][1])
166     read = int(records[6][1])
167
168     if flowcell_dir != flowcell:
169         LOGGER.warn("flowcell %s found in wrong directory %s" % \
170                          (flowcell, path))
171
172     return SequenceFile('qseq', fullpath, flowcell, lane, read, cycle=stop)
173
174 def parse_fastq(path, filename):
175     """Parse fastq names
176     """
177     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
178     basename, ext = os.path.splitext(filename)
179     records = basename.split('_')
180     fullpath = os.path.join(path, filename)
181     if project is not None:
182         # demultiplexed sample!
183         flowcell = flowcell_dir
184         lane = int(records[2][-1])
185         read = int(records[3][-1])
186         pf = True # as I understand it hiseq runs toss the ones that fail filter
187         index = records[1]
188         project_id = records[0]
189     else:
190         flowcell = records[4]
191         lane = int(records[5][1])
192         read = int(records[6][1])
193         pf = parse_fastq_pf_flag(records)
194         index = None
195         project_id = None
196
197     if flowcell_dir != flowcell:
198         LOGGER.warn("flowcell %s found in wrong directory %s" % \
199                          (flowcell, path))
200
201     return SequenceFile('fastq', fullpath, flowcell, lane, read,
202                         pf=pf,
203                         cycle=stop,
204                         project=project_id,
205                         index=index)
206
207 def parse_fastq_pf_flag(records):
208     """Take a fastq filename split on _ and look for the pass-filter flag
209     """
210     if len(records) < 8:
211         pf = None
212     else:
213         fastq_type = records[-1].lower()
214         if fastq_type.startswith('pass'):
215             pf = True
216         elif fastq_type.startswith('nopass'):
217             pf = False
218         elif fastq_type.startswith('all'):
219             pf = None
220         else:
221             raise ValueError("Unrecognized fastq name %s at %s" % \
222                              (records[-1], os.path.join(path,filename)))
223
224     return pf
225
226 def parse_eland(path, filename, eland_match=None):
227     if eland_match is None:
228         eland_match = eland_re.match(filename)
229     fullpath = os.path.join(path, filename)
230     flowcell, start, stop, project = get_flowcell_cycle(path)
231     if eland_match.group('lane'):
232         lane = int(eland_match.group('lane'))
233     else:
234         lane = None
235     if eland_match.group('read'):
236         read = int(eland_match.group('read'))
237     else:
238         read = None
239     return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=stop)
240
241 def scan_for_sequences(dirs):
242     """
243     Scan through a list of directories for sequence like files
244     """
245     sequences = []
246     for d in dirs:
247         LOGGER.info("Scanning %s for sequences" % (d,))
248         if not os.path.exists(d):
249             LOGGER.warn("Flowcell directory %s does not exist" % (d,))
250             continue
251
252         for path, dirname, filenames in os.walk(d):
253             for f in filenames:
254                 seq = None
255                 # find sequence files
256                 if raw_seq_re.match(f):
257                     if f.endswith('.md5'):
258                         continue
259                     elif f.endswith('.srf') or f.endswith('.srf.bz2'):
260                         seq = parse_srf(path, f)
261                     elif qseq_re.match(f):
262                         seq = parse_qseq(path, f)
263                     elif f.endswith('.fastq') or \
264                          f.endswith('.fastq.bz2') or \
265                          f.endswith('.fastq.gz'):
266                         seq = parse_fastq(path, f)
267                 eland_match = eland_re.match(f)
268                 if eland_match:
269                     if f.endswith('.md5'):
270                         continue
271                     seq = parse_eland(path, f, eland_match)
272                 if seq:
273                     sequences.append(seq)
274                     LOGGER.debug("Found sequence at %s" % (f,))
275
276     return sequences