Test more of the sequences class.
[htsworkflow.git] / htsworkflow / pipelines / sequences.py
1 """
2 Utilities to work with the various eras of sequence archive files
3 """
4 import collections
5 import logging
6 import os
7 import types
8 import re
9
10 LOGGER = logging.getLogger(__name__)
11
12 eland_re = re.compile('s_(?P<lane>\d)(_(?P<read>\d))?_eland_')
13 raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+')
14 qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+_l[\d]_r[\d].tar.bz2')
15
16 SEQUENCE_TABLE_NAME = "sequences"
17 def create_sequence_table(cursor):
18     """
19     Create a SQL table to hold  SequenceFile entries
20     """
21     sql = """
22 CREATE TABLE %(table)s (
23   filetype   CHAR(8),
24   path       TEXT,
25   flowcell   CHAR(8),
26   lane       INTEGER,
27   read       INTEGER,
28   pf         BOOLEAN,
29   cycle      CHAR(8)
30 );
31 """ %( {'table': SEQUENCE_TABLE_NAME} )
32     return cursor.execute(sql)
33
34 FlowcellPath = collections.namedtuple('FlowcellPath',
35                                       'flowcell start stop project')
36
37 class SequenceFile(object):
38     """
39     Simple container class that holds the path to a sequence archive
40     and basic descriptive information.
41     """
42     def __init__(self, filetype, path, flowcell, lane,
43                  read=None,
44                  pf=None,
45                  cycle=None,
46                  project=None,
47                  index=None,
48                  split=None):
49         """Store various fields used in our sequence files
50
51         filetype is one of 'qseq', 'srf', 'fastq'
52         path = location of file
53         flowcell = files flowcell id
54         lane = which lane
55         read = which sequencer read, usually 1 or 2
56         pf = did it pass filter
57         cycle = cycle dir name e.g. C1-202
58         project = projed name from HiSeq, probably library ID
59         index = HiSeq barcode index sequence
60         split = file fragment from HiSeq (Since one file is split into many)
61         """
62         self.filetype = filetype
63         self.path = path
64         self.flowcell = flowcell
65         self.lane = lane
66         self.read = read
67         self.pf = pf
68         self.cycle = cycle
69         self.project = project
70         self.index = index
71         self.split = split
72
73     def __hash__(self):
74         return hash(self.key())
75
76     def key(self):
77         return (self.flowcell, self.lane, self.read, self.project, self.split)
78
79     def __unicode__(self):
80         return unicode(self.path)
81
82     def __eq__(self, other):
83         """
84         Equality is defined if everything but the path matches
85         """
86         attributes = ['filetype',
87                       'flowcell',
88                       'lane',
89                       'read',
90                       'pf',
91                       'cycle',
92                       'project',
93                       'index',
94                       'split']
95         for a in attributes:
96             if getattr(self, a) != getattr(other, a):
97                 return False
98
99         return True
100
101     def __ne__(self, other):
102         return not self == other
103
104     def __repr__(self):
105         return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
106
107     def make_target_name(self, root):
108         """
109         Create target name for where we need to link this sequence too
110         """
111         path, basename = os.path.split(self.path)
112         # Because the names aren't unque we include the flowcel name
113         # because there were different eland files for different length
114         # analyses, we include the cycle length in the name.
115         if self.filetype == 'eland':
116             template = "%(flowcell)s_%(cycle)s_%(eland)s"
117             basename = template % { 'flowcell': self.flowcell,
118                                     'cycle': self.cycle,
119                                     'eland': basename }
120         # else:
121         # all the other file types have names that include flowcell/lane
122         # information and thus are unique so we don't have to do anything
123         return os.path.join(root, basename)
124
125     def save(self, cursor):
126         """
127         Add this entry to a DB2.0 database.
128         """
129         #FIXME: NEEDS SQL ESCAPING
130         header_macro = {'table': SEQUENCE_TABLE_NAME }
131         sql_header = "insert into %(table)s (" % header_macro
132         sql_columns = ['filetype','path','flowcell','lane']
133         sql_middle = ") values ("
134         sql_values = [self.filetype, self.path, self.flowcell, self.lane]
135         sql_footer = ");"
136         for name in ['read', 'pf', 'cycle']:
137             value = getattr(self, name)
138             if value is not None:
139                 sql_columns.append(name)
140                 sql_values.append(value)
141
142         sql = " ".join([sql_header,
143                         ", ".join(sql_columns),
144                         sql_middle,
145                         # note the following makes a string like ?,?,?
146                         ",".join(["?"] * len(sql_values)),
147                         sql_footer])
148
149         return cursor.execute(sql, sql_values)
150
151 def get_flowcell_cycle(path):
152     """
153     Extract flowcell, cycle from pathname
154     """
155     path = os.path.normpath(path)
156     project = None
157     rest, tail = os.path.split(path)
158     if tail.startswith('Project_'):
159         # we're in a multiplexed sample
160         project = tail
161         rest, cycle = os.path.split(rest)
162     else:
163         cycle = tail
164
165     rest, flowcell = os.path.split(rest)
166     cycle_match = re.match("C(?P<start>[0-9]+)-(?P<stop>[0-9]+)", cycle)
167     if cycle_match is None:
168         raise ValueError(
169             "Expected .../flowcell/cycle/ directory structure in %s" % \
170             (path,))
171     start = cycle_match.group('start')
172     if start is not None:
173         start = int(start)
174     stop = cycle_match.group('stop')
175     if stop is not None:
176         stop = int(stop)
177
178     return FlowcellPath(flowcell, start, stop, project)
179
180 def parse_srf(path, filename):
181     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
182     basename, ext = os.path.splitext(filename)
183     records = basename.split('_')
184     flowcell = records[4]
185     lane = int(records[5][0])
186     fullpath = os.path.join(path, filename)
187
188     if flowcell_dir != flowcell:
189         LOGGER.warn("flowcell %s found in wrong directory %s" % \
190                          (flowcell, path))
191
192     return SequenceFile('srf', fullpath, flowcell, lane, cycle=stop)
193
194 def parse_qseq(path, filename):
195     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
196     basename, ext = os.path.splitext(filename)
197     records = basename.split('_')
198     fullpath = os.path.join(path, filename)
199     flowcell = records[4]
200     lane = int(records[5][1])
201     read = int(records[6][1])
202
203     if flowcell_dir != flowcell:
204         LOGGER.warn("flowcell %s found in wrong directory %s" % \
205                          (flowcell, path))
206
207     return SequenceFile('qseq', fullpath, flowcell, lane, read, cycle=stop)
208
209 def parse_fastq(path, filename):
210     """Parse fastq names
211     """
212     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
213     basename = re.sub('\.fastq(\.gz|\.bz2)?$', '', filename)
214     records = basename.split('_')
215     fullpath = os.path.join(path, filename)
216     if project is not None:
217         # demultiplexed sample!
218         flowcell = flowcell_dir
219         lane = int(records[2][-1])
220         read = int(records[3][-1])
221         pf = True # as I understand it hiseq runs toss the ones that fail filter
222         index = records[1]
223         project_id = records[0]
224         split = records[4]
225         sequence_type = 'split_fastq'
226     else:
227         flowcell = records[4]
228         lane = int(records[5][1])
229         read = int(records[6][1])
230         pf = parse_fastq_pf_flag(records)
231         index = None
232         project_id = None
233         split = None
234         sequence_type = 'fastq'
235
236     if flowcell_dir != flowcell:
237         LOGGER.warn("flowcell %s found in wrong directory %s" % \
238                          (flowcell, path))
239
240     return SequenceFile(sequence_type, fullpath, flowcell, lane, read,
241                         pf=pf,
242                         cycle=stop,
243                         project=project_id,
244                         index=index,
245                         split=split)
246
247 def parse_fastq_pf_flag(records):
248     """Take a fastq filename split on _ and look for the pass-filter flag
249     """
250     if len(records) < 8:
251         pf = None
252     else:
253         fastq_type = records[-1].lower()
254         if fastq_type.startswith('pass'):
255             pf = True
256         elif fastq_type.startswith('nopass'):
257             pf = False
258         elif fastq_type.startswith('all'):
259             pf = None
260         else:
261             raise ValueError("Unrecognized fastq name: %s" % (
262                 "_".join(records),))
263
264     return pf
265
266 def parse_eland(path, filename, eland_match=None):
267     if eland_match is None:
268         eland_match = eland_re.match(filename)
269     fullpath = os.path.join(path, filename)
270     flowcell, start, stop, project = get_flowcell_cycle(path)
271     if eland_match.group('lane'):
272         lane = int(eland_match.group('lane'))
273     else:
274         lane = None
275     if eland_match.group('read'):
276         read = int(eland_match.group('read'))
277     else:
278         read = None
279     return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=stop)
280
281 def scan_for_sequences(dirs):
282     """
283     Scan through a list of directories for sequence like files
284     """
285     sequences = []
286     if type(dirs) in types.StringTypes:
287         raise ValueError("You probably want a list or set, not a string")
288
289     for d in dirs:
290         LOGGER.info("Scanning %s for sequences" % (d,))
291         if not os.path.exists(d):
292             LOGGER.warn("Flowcell directory %s does not exist" % (d,))
293             continue
294
295         for path, dirname, filenames in os.walk(d):
296             for f in filenames:
297                 seq = None
298                 # find sequence files
299                 if f.endswith('.md5'):
300                     continue
301                 elif f.endswith('.srf') or f.endswith('.srf.bz2'):
302                     seq = parse_srf(path, f)
303                 elif qseq_re.match(f):
304                     seq = parse_qseq(path, f)
305                 elif f.endswith('.fastq') or \
306                      f.endswith('.fastq.bz2') or \
307                      f.endswith('.fastq.gz'):
308                     seq = parse_fastq(path, f)
309                 eland_match = eland_re.match(f)
310                 if eland_match:
311                     if f.endswith('.md5'):
312                         continue
313                     seq = parse_eland(path, f, eland_match)
314                 if seq:
315                     sequences.append(seq)
316                     LOGGER.debug("Found sequence at %s" % (f,))
317
318     return sequences