84bafbfb4af38d95aa68e25083650747d0c73df4
[htsworkflow.git] / htsworkflow / pipelines / sequences.py
1 """
2 Utilities to work with the various eras of sequence archive files
3 """
4 import logging
5 import os
6 import re
7
8 eland_re = re.compile('s_(?P<lane>\d)(_(?P<read>\d))?_eland_')
9 raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+')
10 qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+_l[\d]_r[\d].tar.bz2')
11
12 SEQUENCE_TABLE_NAME = "sequences"
13 def create_sequence_table(cursor):
14     """
15     Create a SQL table to hold  SequenceFile entries
16     """
17     sql = """
18 CREATE TABLE %(table)s (
19   filetype   CHAR(8),
20   path       TEXT,
21   flowcell   CHAR(8),
22   lane       INTEGER,
23   read       INTEGER,
24   pf         BOOLEAN,
25   cycle      CHAR(8)
26 );
27 """ %( {'table': SEQUENCE_TABLE_NAME} )
28     return cursor.execute(sql)
29
30 class SequenceFile(object):
31     """
32     Simple container class that holds the path to a sequence archive
33     and basic descriptive information.     
34     """
35     def __init__(self, filetype, path, flowcell, lane, read=None, pf=None, cycle=None):
36         self.filetype = filetype
37         self.path = path
38         self.flowcell = flowcell
39         self.lane = lane
40         self.read = read
41         self.pf = pf
42         self.cycle = cycle
43
44     def __hash__(self):
45         return hash(self.key())
46
47     def key(self):
48         return (self.flowcell, self.lane)
49
50     def unicode(self):
51         return unicode(self.path)
52
53     def __eq__(self, other):
54         """
55         Equality is defined if everything but the path matches
56         """
57         attributes = ['filetype','flowcell', 'lane', 'read', 'pf', 'cycle']
58         for a in attributes:
59             if getattr(self, a) != getattr(other, a):
60                 return False
61
62         return True
63
64     def __repr__(self):
65         return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
66
67     def make_target_name(self, root):
68         """
69         Create target name for where we need to link this sequence too
70         """
71         path, basename = os.path.split(self.path)
72         # Because the names aren't unque we include the flowcel name
73         # because there were different eland files for different length
74         # analyses, we include the cycle length in the name.
75         if self.filetype == 'eland':
76             template = "%(flowcell)s_%(cycle)s_%(eland)s"
77             basename = template % { 'flowcell': self.flowcell,
78                                     'cycle': self.cycle,
79                                     'eland': basename }
80         # else:
81         # all the other file types have names that include flowcell/lane
82         # information and thus are unique so we don't have to do anything
83         return os.path.join(root, basename)
84
85     def save(self, cursor):
86         """
87         Add this entry to a DB2.0 database.
88         """
89         #FIXME: NEEDS SQL ESCAPING
90         header_macro = {'table': SEQUENCE_TABLE_NAME } 
91         sql_header = "insert into %(table)s (" % header_macro
92         sql_columns = ['filetype','path','flowcell','lane']
93         sql_middle = ") values ("
94         sql_values = [self.filetype, self.path, self.flowcell, self.lane]
95         sql_footer = ");"
96         for name in ['read', 'pf', 'cycle']:
97             value = getattr(self, name)
98             if value is not None:
99                 sql_columns.append(name)
100                 sql_values.append(value)
101
102         sql = " ".join([sql_header,
103                         ", ".join(sql_columns),
104                         sql_middle,
105                         # note the following makes a string like ?,?,?
106                         ",".join(["?"] * len(sql_values)),
107                         sql_footer])
108
109         return cursor.execute(sql, sql_values)
110
111 def get_flowcell_cycle(path):
112     """
113     Extract flowcell, cycle from pathname
114     """
115     rest, cycle = os.path.split(path)
116     rest, flowcell = os.path.split(rest)
117     cycle_match = re.match("C(?P<start>[0-9]+)-(?P<stop>[0-9]+)", cycle)
118     if cycle_match is None:
119         raise ValueError(
120             "Expected .../flowcell/cycle/ directory structure in %s" % \
121             (path,))
122     start = cycle_match.group('start')
123     if start is not None:
124         start = int(start)
125     stop = cycle_match.group('stop')
126     if stop is not None:
127         stop = int(stop)
128     
129     return flowcell, start, stop
130         
131 def parse_srf(path, filename):
132     flowcell_dir, start, stop = get_flowcell_cycle(path)
133     basename, ext = os.path.splitext(filename)
134     records = basename.split('_')
135     flowcell = records[4]
136     lane = int(records[5][0])
137     fullpath = os.path.join(path, filename)
138
139     if flowcell_dir != flowcell:
140         logging.warn("flowcell %s found in wrong directory %s" % \
141                          (flowcell, path))
142
143     return SequenceFile('srf', fullpath, flowcell, lane, cycle=stop)
144
145 def parse_qseq(path, filename):
146     flowcell_dir, start, stop = get_flowcell_cycle(path)
147     basename, ext = os.path.splitext(filename)
148     records = basename.split('_')
149     fullpath = os.path.join(path, filename)
150     flowcell = records[4]
151     lane = int(records[5][1])
152     read = int(records[6][1])
153
154     if flowcell_dir != flowcell:
155         logging.warn("flowcell %s found in wrong directory %s" % \
156                          (flowcell, path))
157
158     return SequenceFile('qseq', fullpath, flowcell, lane, read, cycle=stop)
159
160 def parse_fastq(path, filename):
161     """Parse fastq names
162     """
163     flowcell_dir, start, stop = get_flowcell_cycle(path)
164     basename, ext = os.path.splitext(filename)
165     records = basename.split('_')
166     fullpath = os.path.join(path, filename)
167     flowcell = records[4]
168     lane = int(records[5][1])
169     read = int(records[6][1])
170     pf = parse_fastq_pf_flag(records)
171     
172     if flowcell_dir != flowcell:
173         logging.warn("flowcell %s found in wrong directory %s" % \
174                          (flowcell, path))
175
176     return SequenceFile('fastq', fullpath, flowcell, lane, read, pf=pf, cycle=stop)
177
178 def parse_fastq_pf_flag(records):
179     """Take a fastq filename split on _ and look for the pass-filter flag
180     """
181     if len(records) < 8:
182         pf = None
183     else:
184         fastq_type = records[-1].lower()
185         if fastq_type.startswith('pass'):
186             pf = True
187         elif fastq_type.startswith('nopass'):
188             pf = False
189         elif fastq_type.startswith('all'):
190             pf = None
191         else:
192             raise ValueError("Unrecognized fastq name %s at %s" % \
193                              (records[-1], os.path.join(path,filename)))
194
195     return pf
196     
197 def parse_eland(path, filename, eland_match=None):
198     if eland_match is None:
199         eland_match = eland_re.match(filename)
200     fullpath = os.path.join(path, filename)
201     flowcell, start, stop = get_flowcell_cycle(path)
202     if eland_match.group('lane'):
203         lane = int(eland_match.group('lane'))
204     else:
205         lane = None
206     if eland_match.group('read'):
207         read = int(eland_match.group('read'))
208     else:
209         read = None
210     return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=stop)
211     
212 def scan_for_sequences(dirs):
213     """
214     Scan through a list of directories for sequence like files
215     """
216     sequences = []
217     for d in dirs:
218         logging.info("Scanning %s for sequences" % (d,))
219         for path, dirname, filenames in os.walk(d):
220             for f in filenames:
221                 seq = None
222                 # find sequence files
223                 if raw_seq_re.match(f):
224                     if f.endswith('.md5'):
225                         continue
226                     elif f.endswith('.srf') or f.endswith('.srf.bz2'):
227                         seq = parse_srf(path, f)
228                     elif qseq_re.match(f):
229                         seq = parse_qseq(path, f)
230                     elif f.endswith('fastq') or f.endswith('.fastq.bz2'):
231                         seq = parse_fastq(path, f)
232                 eland_match = eland_re.match(f)
233                 if eland_match:
234                     if f.endswith('.md5'):
235                         continue
236                     seq = parse_eland(path, f, eland_match)
237                 if seq:
238                     sequences.append(seq)
239                     logging.debug("Found sequence at %s" % (f,))
240                     
241     return sequences