Forgot to add split attribute to __eq__ test.
[htsworkflow.git] / htsworkflow / pipelines / sequences.py
1 """
2 Utilities to work with the various eras of sequence archive files
3 """
4 import collections
5 import logging
6 import os
7 import types
8 import re
9
10 LOGGER = logging.getLogger(__name__)
11
12 eland_re = re.compile('s_(?P<lane>\d)(_(?P<read>\d))?_eland_')
13 raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+')
14 qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+_l[\d]_r[\d].tar.bz2')
15
16 SEQUENCE_TABLE_NAME = "sequences"
17 def create_sequence_table(cursor):
18     """
19     Create a SQL table to hold  SequenceFile entries
20     """
21     sql = """
22 CREATE TABLE %(table)s (
23   filetype   CHAR(8),
24   path       TEXT,
25   flowcell   CHAR(8),
26   lane       INTEGER,
27   read       INTEGER,
28   pf         BOOLEAN,
29   cycle      CHAR(8)
30 );
31 """ %( {'table': SEQUENCE_TABLE_NAME} )
32     return cursor.execute(sql)
33
34 FlowcellPath = collections.namedtuple('FlowcellPath',
35                                       'flowcell start stop project')
36
37 class SequenceFile(object):
38     """
39     Simple container class that holds the path to a sequence archive
40     and basic descriptive information.
41     """
42     def __init__(self, filetype, path, flowcell, lane,
43                  read=None,
44                  pf=None,
45                  cycle=None,
46                  project=None,
47                  index=None,
48                  split=None):
49         """Store various fields used in our sequence files
50
51         filetype is one of 'qseq', 'srf', 'fastq'
52         path = location of file
53         flowcell = files flowcell id
54         lane = which lane
55         read = which sequencer read, usually 1 or 2
56         pf = did it pass filter
57         cycle = cycle dir name e.g. C1-202
58         project = projed name from HiSeq, probably library ID
59         index = HiSeq barcode index sequence
60         split = file fragment from HiSeq (Since one file is split into many)
61         """
62         self.filetype = filetype
63         self.path = path
64         self.flowcell = flowcell
65         self.lane = lane
66         self.read = read
67         self.pf = pf
68         self.cycle = cycle
69         self.project = project
70         self.index = index
71         self.split = split
72
73     def __hash__(self):
74         return hash(self.key())
75
76     def key(self):
77         return (self.flowcell, self.lane, self.read, self.project, self.split)
78
79     def unicode(self):
80         return unicode(self.path)
81
82     def __eq__(self, other):
83         """
84         Equality is defined if everything but the path matches
85         """
86         attributes = ['filetype',
87                       'flowcell',
88                       'lane',
89                       'read',
90                       'pf',
91                       'cycle',
92                       'project',
93                       'index',
94                       'split']
95         for a in attributes:
96             if getattr(self, a) != getattr(other, a):
97                 return False
98
99         return True
100
101     def __repr__(self):
102         return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
103
104     def make_target_name(self, root):
105         """
106         Create target name for where we need to link this sequence too
107         """
108         path, basename = os.path.split(self.path)
109         # Because the names aren't unque we include the flowcel name
110         # because there were different eland files for different length
111         # analyses, we include the cycle length in the name.
112         if self.filetype == 'eland':
113             template = "%(flowcell)s_%(cycle)s_%(eland)s"
114             basename = template % { 'flowcell': self.flowcell,
115                                     'cycle': self.cycle,
116                                     'eland': basename }
117         # else:
118         # all the other file types have names that include flowcell/lane
119         # information and thus are unique so we don't have to do anything
120         return os.path.join(root, basename)
121
122     def save(self, cursor):
123         """
124         Add this entry to a DB2.0 database.
125         """
126         #FIXME: NEEDS SQL ESCAPING
127         header_macro = {'table': SEQUENCE_TABLE_NAME }
128         sql_header = "insert into %(table)s (" % header_macro
129         sql_columns = ['filetype','path','flowcell','lane']
130         sql_middle = ") values ("
131         sql_values = [self.filetype, self.path, self.flowcell, self.lane]
132         sql_footer = ");"
133         for name in ['read', 'pf', 'cycle']:
134             value = getattr(self, name)
135             if value is not None:
136                 sql_columns.append(name)
137                 sql_values.append(value)
138
139         sql = " ".join([sql_header,
140                         ", ".join(sql_columns),
141                         sql_middle,
142                         # note the following makes a string like ?,?,?
143                         ",".join(["?"] * len(sql_values)),
144                         sql_footer])
145
146         return cursor.execute(sql, sql_values)
147
148 def get_flowcell_cycle(path):
149     """
150     Extract flowcell, cycle from pathname
151     """
152     path = os.path.normpath(path)
153     project = None
154     rest, tail = os.path.split(path)
155     if tail.startswith('Project_'):
156         # we're in a multiplexed sample
157         project = tail
158         rest, cycle = os.path.split(rest)
159     else:
160         cycle = tail
161
162     rest, flowcell = os.path.split(rest)
163     cycle_match = re.match("C(?P<start>[0-9]+)-(?P<stop>[0-9]+)", cycle)
164     if cycle_match is None:
165         raise ValueError(
166             "Expected .../flowcell/cycle/ directory structure in %s" % \
167             (path,))
168     start = cycle_match.group('start')
169     if start is not None:
170         start = int(start)
171     stop = cycle_match.group('stop')
172     if stop is not None:
173         stop = int(stop)
174
175     return FlowcellPath(flowcell, start, stop, project)
176
177 def parse_srf(path, filename):
178     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
179     basename, ext = os.path.splitext(filename)
180     records = basename.split('_')
181     flowcell = records[4]
182     lane = int(records[5][0])
183     fullpath = os.path.join(path, filename)
184
185     if flowcell_dir != flowcell:
186         LOGGER.warn("flowcell %s found in wrong directory %s" % \
187                          (flowcell, path))
188
189     return SequenceFile('srf', fullpath, flowcell, lane, cycle=stop)
190
191 def parse_qseq(path, filename):
192     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
193     basename, ext = os.path.splitext(filename)
194     records = basename.split('_')
195     fullpath = os.path.join(path, filename)
196     flowcell = records[4]
197     lane = int(records[5][1])
198     read = int(records[6][1])
199
200     if flowcell_dir != flowcell:
201         LOGGER.warn("flowcell %s found in wrong directory %s" % \
202                          (flowcell, path))
203
204     return SequenceFile('qseq', fullpath, flowcell, lane, read, cycle=stop)
205
206 def parse_fastq(path, filename):
207     """Parse fastq names
208     """
209     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
210     basename = re.sub('\.fastq(\.gz|\.bz2)?$', '', filename)
211     records = basename.split('_')
212     fullpath = os.path.join(path, filename)
213     if project is not None:
214         # demultiplexed sample!
215         flowcell = flowcell_dir
216         lane = int(records[2][-1])
217         read = int(records[3][-1])
218         pf = True # as I understand it hiseq runs toss the ones that fail filter
219         index = records[1]
220         project_id = records[0]
221         split = records[4]
222         sequence_type = 'split_fastq'
223     else:
224         flowcell = records[4]
225         lane = int(records[5][1])
226         read = int(records[6][1])
227         pf = parse_fastq_pf_flag(records)
228         index = None
229         project_id = None
230         split = None
231         sequence_type = 'fastq'
232
233     if flowcell_dir != flowcell:
234         LOGGER.warn("flowcell %s found in wrong directory %s" % \
235                          (flowcell, path))
236
237     return SequenceFile(sequence_type, fullpath, flowcell, lane, read,
238                         pf=pf,
239                         cycle=stop,
240                         project=project_id,
241                         index=index,
242                         split=split)
243
244 def parse_fastq_pf_flag(records):
245     """Take a fastq filename split on _ and look for the pass-filter flag
246     """
247     if len(records) < 8:
248         pf = None
249     else:
250         fastq_type = records[-1].lower()
251         if fastq_type.startswith('pass'):
252             pf = True
253         elif fastq_type.startswith('nopass'):
254             pf = False
255         elif fastq_type.startswith('all'):
256             pf = None
257         else:
258             raise ValueError("Unrecognized fastq name %s at %s" % \
259                              (records[-1], os.path.join(path,filename)))
260
261     return pf
262
263 def parse_eland(path, filename, eland_match=None):
264     if eland_match is None:
265         eland_match = eland_re.match(filename)
266     fullpath = os.path.join(path, filename)
267     flowcell, start, stop, project = get_flowcell_cycle(path)
268     if eland_match.group('lane'):
269         lane = int(eland_match.group('lane'))
270     else:
271         lane = None
272     if eland_match.group('read'):
273         read = int(eland_match.group('read'))
274     else:
275         read = None
276     return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=stop)
277
278 def scan_for_sequences(dirs):
279     """
280     Scan through a list of directories for sequence like files
281     """
282     sequences = []
283     if type(dirs) in types.StringTypes:
284         raise ValueError("You probably want a list or set, not a string")
285
286     for d in dirs:
287         LOGGER.info("Scanning %s for sequences" % (d,))
288         if not os.path.exists(d):
289             LOGGER.warn("Flowcell directory %s does not exist" % (d,))
290             continue
291
292         for path, dirname, filenames in os.walk(d):
293             for f in filenames:
294                 seq = None
295                 # find sequence files
296                 if f.endswith('.md5'):
297                     continue
298                 elif f.endswith('.srf') or f.endswith('.srf.bz2'):
299                     seq = parse_srf(path, f)
300                 elif qseq_re.match(f):
301                     seq = parse_qseq(path, f)
302                 elif f.endswith('.fastq') or \
303                      f.endswith('.fastq.bz2') or \
304                      f.endswith('.fastq.gz'):
305                     seq = parse_fastq(path, f)
306                 eland_match = eland_re.match(f)
307                 if eland_match:
308                     if f.endswith('.md5'):
309                         continue
310                     seq = parse_eland(path, f, eland_match)
311                 if seq:
312                     sequences.append(seq)
313                     LOGGER.debug("Found sequence at %s" % (f,))
314
315     return sequences