Use named tuple for sequence flowcell/start/stop/project dir extractor
[htsworkflow.git] / htsworkflow / pipelines / sequences.py
1 """
2 Utilities to work with the various eras of sequence archive files
3 """
4 import collections
5 import logging
6 import os
7 import types
8 import re
9
10 LOGGER = logging.getLogger(__name__)
11
12 eland_re = re.compile('s_(?P<lane>\d)(_(?P<read>\d))?_eland_')
13 raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+')
14 qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+_l[\d]_r[\d].tar.bz2')
15
16 SEQUENCE_TABLE_NAME = "sequences"
17 def create_sequence_table(cursor):
18     """
19     Create a SQL table to hold  SequenceFile entries
20     """
21     sql = """
22 CREATE TABLE %(table)s (
23   filetype   CHAR(8),
24   path       TEXT,
25   flowcell   CHAR(8),
26   lane       INTEGER,
27   read       INTEGER,
28   pf         BOOLEAN,
29   cycle      CHAR(8)
30 );
31 """ %( {'table': SEQUENCE_TABLE_NAME} )
32     return cursor.execute(sql)
33
34 FlowcellPath = collections.namedtuple('FlowcellPath',
35                                       'flowcell start stop project')
36
37 class SequenceFile(object):
38     """
39     Simple container class that holds the path to a sequence archive
40     and basic descriptive information.
41     """
42     def __init__(self, filetype, path, flowcell, lane, read=None, pf=None, cycle=None,
43                  project=None,
44                  index=None):
45         self.filetype = filetype
46         self.path = path
47         self.flowcell = flowcell
48         self.lane = lane
49         self.read = read
50         self.pf = pf
51         self.cycle = cycle
52         self.project = project
53         self.index = index
54
55     def __hash__(self):
56         return hash(self.key())
57
58     def key(self):
59         return (self.flowcell, self.lane)
60
61     def unicode(self):
62         return unicode(self.path)
63
64     def __eq__(self, other):
65         """
66         Equality is defined if everything but the path matches
67         """
68         attributes = ['filetype','flowcell', 'lane', 'read', 'pf', 'cycle', 'project', 'index']
69         for a in attributes:
70             if getattr(self, a) != getattr(other, a):
71                 return False
72
73         return True
74
75     def __repr__(self):
76         return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
77
78     def make_target_name(self, root):
79         """
80         Create target name for where we need to link this sequence too
81         """
82         path, basename = os.path.split(self.path)
83         # Because the names aren't unque we include the flowcel name
84         # because there were different eland files for different length
85         # analyses, we include the cycle length in the name.
86         if self.filetype == 'eland':
87             template = "%(flowcell)s_%(cycle)s_%(eland)s"
88             basename = template % { 'flowcell': self.flowcell,
89                                     'cycle': self.cycle,
90                                     'eland': basename }
91         # else:
92         # all the other file types have names that include flowcell/lane
93         # information and thus are unique so we don't have to do anything
94         return os.path.join(root, basename)
95
96     def save(self, cursor):
97         """
98         Add this entry to a DB2.0 database.
99         """
100         #FIXME: NEEDS SQL ESCAPING
101         header_macro = {'table': SEQUENCE_TABLE_NAME }
102         sql_header = "insert into %(table)s (" % header_macro
103         sql_columns = ['filetype','path','flowcell','lane']
104         sql_middle = ") values ("
105         sql_values = [self.filetype, self.path, self.flowcell, self.lane]
106         sql_footer = ");"
107         for name in ['read', 'pf', 'cycle']:
108             value = getattr(self, name)
109             if value is not None:
110                 sql_columns.append(name)
111                 sql_values.append(value)
112
113         sql = " ".join([sql_header,
114                         ", ".join(sql_columns),
115                         sql_middle,
116                         # note the following makes a string like ?,?,?
117                         ",".join(["?"] * len(sql_values)),
118                         sql_footer])
119
120         return cursor.execute(sql, sql_values)
121
122 def get_flowcell_cycle(path):
123     """
124     Extract flowcell, cycle from pathname
125     """
126     path = os.path.normpath(path)
127     project = None
128     rest, tail = os.path.split(path)
129     if tail.startswith('Project_'):
130         # we're in a multiplexed sample
131         project = tail
132         rest, cycle = os.path.split(rest)
133     else:
134         cycle = tail
135
136     rest, flowcell = os.path.split(rest)
137     cycle_match = re.match("C(?P<start>[0-9]+)-(?P<stop>[0-9]+)", cycle)
138     if cycle_match is None:
139         raise ValueError(
140             "Expected .../flowcell/cycle/ directory structure in %s" % \
141             (path,))
142     start = cycle_match.group('start')
143     if start is not None:
144         start = int(start)
145     stop = cycle_match.group('stop')
146     if stop is not None:
147         stop = int(stop)
148
149     return FlowcellPath(flowcell, start, stop, project)
150
151 def parse_srf(path, filename):
152     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
153     basename, ext = os.path.splitext(filename)
154     records = basename.split('_')
155     flowcell = records[4]
156     lane = int(records[5][0])
157     fullpath = os.path.join(path, filename)
158
159     if flowcell_dir != flowcell:
160         LOGGER.warn("flowcell %s found in wrong directory %s" % \
161                          (flowcell, path))
162
163     return SequenceFile('srf', fullpath, flowcell, lane, cycle=stop)
164
165 def parse_qseq(path, filename):
166     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
167     basename, ext = os.path.splitext(filename)
168     records = basename.split('_')
169     fullpath = os.path.join(path, filename)
170     flowcell = records[4]
171     lane = int(records[5][1])
172     read = int(records[6][1])
173
174     if flowcell_dir != flowcell:
175         LOGGER.warn("flowcell %s found in wrong directory %s" % \
176                          (flowcell, path))
177
178     return SequenceFile('qseq', fullpath, flowcell, lane, read, cycle=stop)
179
180 def parse_fastq(path, filename):
181     """Parse fastq names
182     """
183     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
184     basename, ext = os.path.splitext(filename)
185     records = basename.split('_')
186     fullpath = os.path.join(path, filename)
187     if project is not None:
188         # demultiplexed sample!
189         flowcell = flowcell_dir
190         lane = int(records[2][-1])
191         read = int(records[3][-1])
192         pf = True # as I understand it hiseq runs toss the ones that fail filter
193         index = records[1]
194         project_id = records[0]
195     else:
196         flowcell = records[4]
197         lane = int(records[5][1])
198         read = int(records[6][1])
199         pf = parse_fastq_pf_flag(records)
200         index = None
201         project_id = None
202
203     if flowcell_dir != flowcell:
204         LOGGER.warn("flowcell %s found in wrong directory %s" % \
205                          (flowcell, path))
206
207     return SequenceFile('fastq', fullpath, flowcell, lane, read,
208                         pf=pf,
209                         cycle=stop,
210                         project=project_id,
211                         index=index)
212
213 def parse_fastq_pf_flag(records):
214     """Take a fastq filename split on _ and look for the pass-filter flag
215     """
216     if len(records) < 8:
217         pf = None
218     else:
219         fastq_type = records[-1].lower()
220         if fastq_type.startswith('pass'):
221             pf = True
222         elif fastq_type.startswith('nopass'):
223             pf = False
224         elif fastq_type.startswith('all'):
225             pf = None
226         else:
227             raise ValueError("Unrecognized fastq name %s at %s" % \
228                              (records[-1], os.path.join(path,filename)))
229
230     return pf
231
232 def parse_eland(path, filename, eland_match=None):
233     if eland_match is None:
234         eland_match = eland_re.match(filename)
235     fullpath = os.path.join(path, filename)
236     flowcell, start, stop, project = get_flowcell_cycle(path)
237     if eland_match.group('lane'):
238         lane = int(eland_match.group('lane'))
239     else:
240         lane = None
241     if eland_match.group('read'):
242         read = int(eland_match.group('read'))
243     else:
244         read = None
245     return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=stop)
246
247 def scan_for_sequences(dirs):
248     """
249     Scan through a list of directories for sequence like files
250     """
251     sequences = []
252     if type(dirs) in types.StringTypes:
253         raise ValueError("You probably want a list or set, not a string")
254
255     for d in dirs:
256         LOGGER.info("Scanning %s for sequences" % (d,))
257         if not os.path.exists(d):
258             LOGGER.warn("Flowcell directory %s does not exist" % (d,))
259             continue
260
261         for path, dirname, filenames in os.walk(d):
262             for f in filenames:
263                 seq = None
264                 # find sequence files
265                 if raw_seq_re.match(f):
266                     if f.endswith('.md5'):
267                         continue
268                     elif f.endswith('.srf') or f.endswith('.srf.bz2'):
269                         seq = parse_srf(path, f)
270                     elif qseq_re.match(f):
271                         seq = parse_qseq(path, f)
272                     elif f.endswith('.fastq') or \
273                          f.endswith('.fastq.bz2') or \
274                          f.endswith('.fastq.gz'):
275                         seq = parse_fastq(path, f)
276                 eland_match = eland_re.match(f)
277                 if eland_match:
278                     if f.endswith('.md5'):
279                         continue
280                     seq = parse_eland(path, f, eland_match)
281                 if seq:
282                     sequences.append(seq)
283                     LOGGER.debug("Found sequence at %s" % (f,))
284
285     return sequences