993bcc94979cf5c3c6fe12eaa3c4b4b062a28581
[htsworkflow.git] / htsworkflow / pipelines / sequences.py
1 """
2 Utilities to work with the various eras of sequence archive files
3 """
4 import logging
5 import os
6 import re
7
8 LOGGER = logging.getLogger(__name__)
9
10 eland_re = re.compile('s_(?P<lane>\d)(_(?P<read>\d))?_eland_')
11 raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+')
12 qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+_l[\d]_r[\d].tar.bz2')
13
14 SEQUENCE_TABLE_NAME = "sequences"
15 def create_sequence_table(cursor):
16     """
17     Create a SQL table to hold  SequenceFile entries
18     """
19     sql = """
20 CREATE TABLE %(table)s (
21   filetype   CHAR(8),
22   path       TEXT,
23   flowcell   CHAR(8),
24   lane       INTEGER,
25   read       INTEGER,
26   pf         BOOLEAN,
27   cycle      CHAR(8)
28 );
29 """ %( {'table': SEQUENCE_TABLE_NAME} )
30     return cursor.execute(sql)
31
32 class SequenceFile(object):
33     """
34     Simple container class that holds the path to a sequence archive
35     and basic descriptive information.
36     """
37     def __init__(self, filetype, path, flowcell, lane, read=None, pf=None, cycle=None):
38         self.filetype = filetype
39         self.path = path
40         self.flowcell = flowcell
41         self.lane = lane
42         self.read = read
43         self.pf = pf
44         self.cycle = cycle
45
46     def __hash__(self):
47         return hash(self.key())
48
49     def key(self):
50         return (self.flowcell, self.lane)
51
52     def unicode(self):
53         return unicode(self.path)
54
55     def __eq__(self, other):
56         """
57         Equality is defined if everything but the path matches
58         """
59         attributes = ['filetype','flowcell', 'lane', 'read', 'pf', 'cycle']
60         for a in attributes:
61             if getattr(self, a) != getattr(other, a):
62                 return False
63
64         return True
65
66     def __repr__(self):
67         return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
68
69     def make_target_name(self, root):
70         """
71         Create target name for where we need to link this sequence too
72         """
73         path, basename = os.path.split(self.path)
74         # Because the names aren't unque we include the flowcel name
75         # because there were different eland files for different length
76         # analyses, we include the cycle length in the name.
77         if self.filetype == 'eland':
78             template = "%(flowcell)s_%(cycle)s_%(eland)s"
79             basename = template % { 'flowcell': self.flowcell,
80                                     'cycle': self.cycle,
81                                     'eland': basename }
82         # else:
83         # all the other file types have names that include flowcell/lane
84         # information and thus are unique so we don't have to do anything
85         return os.path.join(root, basename)
86
87     def save(self, cursor):
88         """
89         Add this entry to a DB2.0 database.
90         """
91         #FIXME: NEEDS SQL ESCAPING
92         header_macro = {'table': SEQUENCE_TABLE_NAME }
93         sql_header = "insert into %(table)s (" % header_macro
94         sql_columns = ['filetype','path','flowcell','lane']
95         sql_middle = ") values ("
96         sql_values = [self.filetype, self.path, self.flowcell, self.lane]
97         sql_footer = ");"
98         for name in ['read', 'pf', 'cycle']:
99             value = getattr(self, name)
100             if value is not None:
101                 sql_columns.append(name)
102                 sql_values.append(value)
103
104         sql = " ".join([sql_header,
105                         ", ".join(sql_columns),
106                         sql_middle,
107                         # note the following makes a string like ?,?,?
108                         ",".join(["?"] * len(sql_values)),
109                         sql_footer])
110
111         return cursor.execute(sql, sql_values)
112
113 def get_flowcell_cycle(path):
114     """
115     Extract flowcell, cycle from pathname
116     """
117     rest, cycle = os.path.split(path)
118     rest, flowcell = os.path.split(rest)
119     cycle_match = re.match("C(?P<start>[0-9]+)-(?P<stop>[0-9]+)", cycle)
120     if cycle_match is None:
121         raise ValueError(
122             "Expected .../flowcell/cycle/ directory structure in %s" % \
123             (path,))
124     start = cycle_match.group('start')
125     if start is not None:
126         start = int(start)
127     stop = cycle_match.group('stop')
128     if stop is not None:
129         stop = int(stop)
130
131     return flowcell, start, stop
132
133 def parse_srf(path, filename):
134     flowcell_dir, start, stop = get_flowcell_cycle(path)
135     basename, ext = os.path.splitext(filename)
136     records = basename.split('_')
137     flowcell = records[4]
138     lane = int(records[5][0])
139     fullpath = os.path.join(path, filename)
140
141     if flowcell_dir != flowcell:
142         LOGGER.warn("flowcell %s found in wrong directory %s" % \
143                          (flowcell, path))
144
145     return SequenceFile('srf', fullpath, flowcell, lane, cycle=stop)
146
147 def parse_qseq(path, filename):
148     flowcell_dir, start, stop = get_flowcell_cycle(path)
149     basename, ext = os.path.splitext(filename)
150     records = basename.split('_')
151     fullpath = os.path.join(path, filename)
152     flowcell = records[4]
153     lane = int(records[5][1])
154     read = int(records[6][1])
155
156     if flowcell_dir != flowcell:
157         LOGGER.warn("flowcell %s found in wrong directory %s" % \
158                          (flowcell, path))
159
160     return SequenceFile('qseq', fullpath, flowcell, lane, read, cycle=stop)
161
162 def parse_fastq(path, filename):
163     """Parse fastq names
164     """
165     flowcell_dir, start, stop = get_flowcell_cycle(path)
166     basename, ext = os.path.splitext(filename)
167     records = basename.split('_')
168     fullpath = os.path.join(path, filename)
169     flowcell = records[4]
170     lane = int(records[5][1])
171     read = int(records[6][1])
172     pf = parse_fastq_pf_flag(records)
173
174     if flowcell_dir != flowcell:
175         LOGGER.warn("flowcell %s found in wrong directory %s" % \
176                          (flowcell, path))
177
178     return SequenceFile('fastq', fullpath, flowcell, lane, read, pf=pf, cycle=stop)
179
180 def parse_fastq_pf_flag(records):
181     """Take a fastq filename split on _ and look for the pass-filter flag
182     """
183     if len(records) < 8:
184         pf = None
185     else:
186         fastq_type = records[-1].lower()
187         if fastq_type.startswith('pass'):
188             pf = True
189         elif fastq_type.startswith('nopass'):
190             pf = False
191         elif fastq_type.startswith('all'):
192             pf = None
193         else:
194             raise ValueError("Unrecognized fastq name %s at %s" % \
195                              (records[-1], os.path.join(path,filename)))
196
197     return pf
198
199 def parse_eland(path, filename, eland_match=None):
200     if eland_match is None:
201         eland_match = eland_re.match(filename)
202     fullpath = os.path.join(path, filename)
203     flowcell, start, stop = get_flowcell_cycle(path)
204     if eland_match.group('lane'):
205         lane = int(eland_match.group('lane'))
206     else:
207         lane = None
208     if eland_match.group('read'):
209         read = int(eland_match.group('read'))
210     else:
211         read = None
212     return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=stop)
213
214 def scan_for_sequences(dirs):
215     """
216     Scan through a list of directories for sequence like files
217     """
218     sequences = []
219     for d in dirs:
220         LOGGER.info("Scanning %s for sequences" % (d,))
221         if not os.path.exists(d):
222             LOGGER.warn("Flowcell directory %s does not exist" % (d,))
223             continue
224
225         for path, dirname, filenames in os.walk(d):
226             for f in filenames:
227                 seq = None
228                 # find sequence files
229                 if raw_seq_re.match(f):
230                     if f.endswith('.md5'):
231                         continue
232                     elif f.endswith('.srf') or f.endswith('.srf.bz2'):
233                         seq = parse_srf(path, f)
234                     elif qseq_re.match(f):
235                         seq = parse_qseq(path, f)
236                     elif f.endswith('fastq') or f.endswith('.fastq.bz2'):
237                         seq = parse_fastq(path, f)
238                 eland_match = eland_re.match(f)
239                 if eland_match:
240                     if f.endswith('.md5'):
241                         continue
242                     seq = parse_eland(path, f, eland_match)
243                 if seq:
244                     sequences.append(seq)
245                     LOGGER.debug("Found sequence at %s" % (f,))
246
247     return sequences