5baf1b5022a948b1022925737bc79cd3e425aafd
[htsworkflow.git] / htsworkflow / pipelines / sequences.py
1 """
2 Utilities to work with the various eras of sequence archive files
3 """
4 import collections
5 import logging
6 import os
7 import types
8 import re
9 import sys
10 from urlparse import urljoin, urlparse
11
12 import RDF
13 from htsworkflow.util.rdfhelp import libraryOntology as libNS
14 from htsworkflow.util.rdfhelp import toTypedNode, fromTypedNode, rdfNS, \
15      stripNamespace, dump_model, simplify_uri
16
17 LOGGER = logging.getLogger(__name__)
18
19 eland_re = re.compile('s_(?P<lane>\d)(_(?P<read>\d))?_eland_')
20 raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+')
21 qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+_l[\d]_r[\d].tar.bz2')
22
23 SEQUENCE_TABLE_NAME = "sequences"
24 def create_sequence_table(cursor):
25     """
26     Create a SQL table to hold  SequenceFile entries
27     """
28     sql = """
29 CREATE TABLE %(table)s (
30   filetype   CHAR(8),
31   path       TEXT,
32   flowcell   CHAR(8),
33   lane       INTEGER,
34   read       INTEGER,
35   pf         BOOLEAN,
36   cycle      CHAR(8)
37 );
38 """ %( {'table': SEQUENCE_TABLE_NAME} )
39     return cursor.execute(sql)
40
41 FlowcellPath = collections.namedtuple('FlowcellPath',
42                                       'flowcell start stop project')
43
44 class SequenceFile(object):
45     """
46     Simple container class that holds the path to a sequence archive
47     and basic descriptive information.
48     """
49     def __init__(self, filetype, path, flowcell, lane,
50                  read=None,
51                  pf=None,
52                  cycle=None,
53                  project=None,
54                  index=None,
55                  split=None):
56         """Store various fields used in our sequence files
57
58         filetype is one of 'qseq', 'srf', 'fastq'
59         path = location of file
60         flowcell = files flowcell id
61         lane = which lane
62         read = which sequencer read, usually 1 or 2
63         pf = did it pass filter
64         cycle = cycle dir name e.g. C1-202
65         project = projed name from HiSeq, probably library ID
66         index = HiSeq barcode index sequence
67         split = file fragment from HiSeq (Since one file is split into many)
68         """
69         self.filetype = filetype
70         self.path = path
71         self.flowcell = flowcell
72         self.lane = lane
73         self.read = read
74         self.pf = pf
75         self.cycle = cycle
76         self.project = project
77         self.index = index
78         self.split = split
79
80     def __hash__(self):
81         return hash(self.key())
82
83     def key(self):
84         return (self.flowcell, self.lane, self.read, self.project, self.split)
85
86     def __unicode__(self):
87         return unicode(self.path)
88
89     def __eq__(self, other):
90         """
91         Equality is defined if everything but the path matches
92         """
93         attributes = ['filetype',
94                       'flowcell',
95                       'lane',
96                       'read',
97                       'pf',
98                       'cycle',
99                       'project',
100                       'index',
101                       'split']
102         for a in attributes:
103             if getattr(self, a) != getattr(other, a):
104                 return False
105
106         return True
107
108     def __ne__(self, other):
109         return not self == other
110
111     def __repr__(self):
112         return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
113
114     def make_target_name(self, root):
115         """
116         Create target name for where we need to link this sequence too
117         """
118         path, basename = os.path.split(self.path)
119         # Because the names aren't unque we include the flowcel name
120         # because there were different eland files for different length
121         # analyses, we include the cycle length in the name.
122         if self.filetype == 'eland':
123             template = "%(flowcell)s_%(cycle)s_%(eland)s"
124             basename = template % { 'flowcell': self.flowcell,
125                                     'cycle': self.cycle,
126                                     'eland': basename }
127         # else:
128         # all the other file types have names that include flowcell/lane
129         # information and thus are unique so we don't have to do anything
130         return os.path.join(root, basename)
131
132     def save_to_sql(self, cursor):
133         """
134         Add this entry to a DB2.0 database.
135         """
136         #FIXME: NEEDS SQL ESCAPING
137         header_macro = {'table': SEQUENCE_TABLE_NAME }
138         sql_header = "insert into %(table)s (" % header_macro
139         sql_columns = ['filetype','path','flowcell','lane']
140         sql_middle = ") values ("
141         sql_values = [self.filetype, self.path, self.flowcell, self.lane]
142         sql_footer = ");"
143         for name in ['read', 'pf', 'cycle']:
144             value = getattr(self, name)
145             if value is not None:
146                 sql_columns.append(name)
147                 sql_values.append(value)
148
149         sql = " ".join([sql_header,
150                         ", ".join(sql_columns),
151                         sql_middle,
152                         # note the following makes a string like ?,?,?
153                         ",".join(["?"] * len(sql_values)),
154                         sql_footer])
155
156         return cursor.execute(sql, sql_values)
157
158     def save_to_model(self, model, base_url=None):
159         def add_lit(model, s, p, o):
160             if o is not None:
161                 model.add_statement(RDF.Statement(s, p, toTypedNode(o)))
162         def add(model, s, p, o):
163             model.add_statement(RDF.Statement(s,p,o))
164         # a bit unreliable... assumes filesystem is encoded in utf-8
165         path = os.path.abspath(self.path.encode('utf-8'))
166         fileNode = RDF.Node(RDF.Uri('file://' + path))
167         add(model, fileNode, rdfNS['type'], libNS['raw_file'])
168         add_lit(model, fileNode, libNS['flowcell_id'], self.flowcell)
169         add_lit(model, fileNode, libNS['lane_number'], self.lane)
170         if self.read is not None:
171             add_lit(model, fileNode, libNS['read'], self.read)
172         else:
173             add_lit(model, fileNode, libNS['read'], '')
174
175         add_lit(model, fileNode, libNS['library_id'], self.project)
176         add_lit(model, fileNode, libNS['multiplex_index'], self.index)
177         add_lit(model, fileNode, libNS['split_id'], self.split)
178         add_lit(model, fileNode, libNS['cycle'], self.cycle)
179         add_lit(model, fileNode, libNS['passed_filter'], self.pf)
180         add(model, fileNode, rdfNS['type'], libNS[self.filetype])
181
182         if base_url is not None:
183             flowcell = RDF.Node(RDF.Uri("{base}/flowcell/{flowcell}/".format(
184                 base=base_url,
185                 flowcell=self.flowcell)))
186             add(model, fileNode, libNS['flowcell'], flowcell)
187             if self.project is not None:
188                 library = RDF.Node(RDF.Uri("{base}/library/{library}".format(
189                     base=base_url,
190                     library=self.project)))
191                 add(model, fileNode, libNS['library'], library)
192
193
194     @classmethod
195     def load_from_model(cls, model, seq_id):
196         def get(s, p):
197             values = []
198             stmts = model.find_statements(RDF.Statement(s, p, None))
199             for s in stmts:
200                 obj = s.object
201                 if not obj.is_resource():
202                     values.append(fromTypedNode(obj))
203                 else:
204                     values.append(obj)
205             return values
206         def get_one(s, p):
207             values = get(s, p)
208             if len(values) > 1:
209                 errmsg = u"To many values for %s %s"
210                 raise ValueError(errmsg % (unicode(s), unicode(p)))
211             elif len(values) == 1:
212                 return values[0]
213             else:
214                 return None
215
216         if not isinstance(seq_id, RDF.Node):
217             seq_id = RDF.Node(RDF.Uri(seq_id))
218         seqTypesStmt = RDF.Statement(seq_id, rdfNS['type'], None)
219         seqTypes = model.find_statements(seqTypesStmt)
220         isSequenceFile = False
221         for s in seqTypes:
222             if s.object == libNS['raw_file']:
223                 isSequenceFile = True
224             else:
225                 seq_type = stripNamespace(libNS, s.object)
226
227         if not isSequenceFile:
228             raise KeyError(u"%s not found" % (unicode(seq_id),))
229
230         path = urlparse(str(seq_id.uri)).path
231         flowcellNode = get_one(seq_id, libNS['flowcell'])
232         flowcell = get_one(seq_id, libNS['flowcell_id'])
233         lane = get_one(seq_id, libNS['lane_number'])
234         read = get_one(seq_id, libNS['read'])
235
236         obj = cls(seq_type, path, flowcell, lane)
237         obj.read = read if read != '' else None
238         obj.project = get_one(seq_id, libNS['library_id'])
239         obj.index = get_one(seq_id, libNS['multiplex_index'])
240         obj.split = get_one(seq_id, libNS['split_id'])
241         obj.cycle = get_one(seq_id, libNS['cycle'] )
242         obj.pf = get_one(seq_id, libNS['passed_filter'])
243         obj.libraryNode = get_one(seq_id, libNS['library'])
244         return obj
245
246
247 def get_flowcell_cycle(path):
248     """
249     Extract flowcell, cycle from pathname
250     """
251     path = os.path.normpath(path)
252     project = None
253     rest, tail = os.path.split(path)
254     if tail.startswith('Project_'):
255         # we're in a multiplexed sample
256         project = tail
257         rest, cycle = os.path.split(rest)
258     else:
259         cycle = tail
260
261     rest, flowcell = os.path.split(rest)
262     cycle_match = re.match("C(?P<start>[0-9]+)-(?P<stop>[0-9]+)", cycle)
263     if cycle_match is None:
264         raise ValueError(
265             "Expected .../flowcell/cycle/ directory structure in %s" % \
266             (path,))
267     start = cycle_match.group('start')
268     if start is not None:
269         start = int(start)
270     stop = cycle_match.group('stop')
271     if stop is not None:
272         stop = int(stop)
273
274     return FlowcellPath(flowcell, start, stop, project)
275
276 def parse_srf(path, filename):
277     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
278     basename, ext = os.path.splitext(filename)
279     records = basename.split('_')
280     flowcell = records[4]
281     lane = int(records[5][0])
282     fullpath = os.path.join(path, filename)
283
284     if flowcell_dir != flowcell:
285         LOGGER.warn("flowcell %s found in wrong directory %s" % \
286                          (flowcell, path))
287
288     return SequenceFile('srf', fullpath, flowcell, lane, cycle=stop)
289
290 def parse_qseq(path, filename):
291     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
292     basename, ext = os.path.splitext(filename)
293     records = basename.split('_')
294     fullpath = os.path.join(path, filename)
295     flowcell = records[4]
296     lane = int(records[5][1])
297     read = int(records[6][1])
298
299     if flowcell_dir != flowcell:
300         LOGGER.warn("flowcell %s found in wrong directory %s" % \
301                          (flowcell, path))
302
303     return SequenceFile('qseq', fullpath, flowcell, lane, read, cycle=stop)
304
305 def parse_fastq(path, filename):
306     """Parse fastq names
307     """
308     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
309     basename = re.sub('\.fastq(\.gz|\.bz2)?$', '', filename)
310     records = basename.split('_')
311     fullpath = os.path.join(path, filename)
312     if project is not None:
313         # demultiplexed sample!
314         flowcell = flowcell_dir
315         lane = int(records[2][-1])
316         read = int(records[3][-1])
317         pf = True # as I understand it hiseq runs toss the ones that fail filter
318         index = records[1]
319         project_id = records[0]
320         split = records[4]
321         sequence_type = 'split_fastq'
322     else:
323         flowcell = records[4]
324         lane = int(records[5][1])
325         read = int(records[6][1])
326         pf = parse_fastq_pf_flag(records)
327         index = None
328         project_id = None
329         split = None
330         sequence_type = 'fastq'
331
332     if flowcell_dir != flowcell:
333         LOGGER.warn("flowcell %s found in wrong directory %s" % \
334                          (flowcell, path))
335
336     return SequenceFile(sequence_type, fullpath, flowcell, lane, read,
337                         pf=pf,
338                         cycle=stop,
339                         project=project_id,
340                         index=index,
341                         split=split)
342
343 def parse_fastq_pf_flag(records):
344     """Take a fastq filename split on _ and look for the pass-filter flag
345     """
346     if len(records) < 8:
347         pf = None
348     else:
349         fastq_type = records[-1].lower()
350         if fastq_type.startswith('pass'):
351             pf = True
352         elif fastq_type.startswith('nopass'):
353             pf = False
354         elif fastq_type.startswith('all'):
355             pf = None
356         else:
357             raise ValueError("Unrecognized fastq name: %s" % (
358                 "_".join(records),))
359
360     return pf
361
362 def parse_eland(path, filename, eland_match=None):
363     if eland_match is None:
364         eland_match = eland_re.match(filename)
365     fullpath = os.path.join(path, filename)
366     flowcell, start, stop, project = get_flowcell_cycle(path)
367     if eland_match.group('lane'):
368         lane = int(eland_match.group('lane'))
369     else:
370         lane = None
371     if eland_match.group('read'):
372         read = int(eland_match.group('read'))
373     else:
374         read = None
375     return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=stop)
376
377 def scan_for_sequences(dirs):
378     """
379     Scan through a list of directories for sequence like files
380     """
381     sequences = []
382     if type(dirs) in types.StringTypes:
383         raise ValueError("You probably want a list or set, not a string")
384
385     for d in dirs:
386         LOGGER.info("Scanning %s for sequences" % (d,))
387         if not os.path.exists(d):
388             LOGGER.warn("Flowcell directory %s does not exist" % (d,))
389             continue
390
391         for path, dirname, filenames in os.walk(d):
392             for f in filenames:
393                 seq = None
394                 # find sequence files
395                 if f.endswith('.md5'):
396                     continue
397                 elif f.endswith('.srf') or f.endswith('.srf.bz2'):
398                     seq = parse_srf(path, f)
399                 elif qseq_re.match(f):
400                     seq = parse_qseq(path, f)
401                 elif f.endswith('.fastq') or \
402                      f.endswith('.fastq.bz2') or \
403                      f.endswith('.fastq.gz'):
404                     seq = parse_fastq(path, f)
405                 eland_match = eland_re.match(f)
406                 if eland_match:
407                     if f.endswith('.md5'):
408                         continue
409                     seq = parse_eland(path, f, eland_match)
410                 if seq:
411                     sequences.append(seq)
412                     LOGGER.debug("Found sequence at %s" % (f,))
413
414     return sequences
415
416
417 def update_model_sequence_library(model, base_url):
418     """Find sequence objects and add library information if its missing
419     """
420     file_body = """
421     prefix libNS: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
422     select ?filenode ?flowcell_id ?lane_id ?library_id ?flowcell ?library
423     where {
424        ?filenode a libNS:raw_file ;
425                  libNS:flowcell_id ?flowcell_id ;
426                  libNS:lane_number ?lane_id .
427        OPTIONAL { ?filenode libNS:flowcell ?flowcell . }
428        OPTIONAL { ?filenode libNS:library ?library .}
429        OPTIONAL { ?filenode libNS:library_id ?library_id .}
430     }
431     """
432     file_query = RDF.SPARQLQuery(file_body)
433     files = file_query.execute(model)
434
435     libraryNS = RDF.NS(urljoin(base_url, 'library/'))
436     flowcellNS = RDF.NS(urljoin(base_url, 'flowcell/'))
437     for f in files:
438         filenode = f['filenode']
439         lane_id = fromTypedNode(f['lane_id'])
440         if f['flowcell'] is None:
441             flowcell = flowcellNS[str(f['flowcell_id'])+'/']
442             model.add_statement(
443                 RDF.Statement(filenode, libNS['flowcell'], flowcell))
444         else:
445             flowcell = f['flowcell']
446
447         if f['library'] is None:
448             if f['library_id'] is not None:
449                 library = libraryNS[str(f['library_id']) + '/']
450             else:
451                 library = guess_library_from_model(model, base_url,
452                                                    flowcell,
453                                                    lane_id)
454                 library_id = toTypedNode(simplify_uri(library))
455                 model.add_statement(
456                     RDF.Statement(filenode, libNS['library_id'], library_id))
457             if library is not None:
458                 model.add_statement(
459                     RDF.Statement(filenode, libNS['library'], library))
460
461
462 def guess_library_from_model(model, base_url, flowcell, lane_id):
463     """Attempt to find library URI
464     """
465     flowcellNode = RDF.Node(flowcell)
466     flowcell = str(flowcell.uri)
467     lane_body = """
468     prefix libNS: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
469     prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
470     prefix xsd: <http://www.w3.org/2001/XMLSchema#>
471
472     select ?library ?lane
473     where {{
474       <{flowcell}> libNS:has_lane ?lane ;
475                    a libNS:illumina_flowcell .
476       ?lane libNS:lane_number {lane_id} ;
477             libNS:library ?library .
478     }}
479     """
480     lane_body = lane_body.format(flowcell=flowcell, lane_id=lane_id)
481     lanes = []
482     tries = 3
483     while len(lanes) == 0 and tries > 0:
484         tries -= 1
485         lane_query = RDF.SPARQLQuery(lane_body)
486         lanes = [ l for l in lane_query.execute(model)]
487         if len(lanes) > 1:
488             # CONFUSED!
489             errmsg = "Too many libraries for flowcell {flowcell} "\
490                      "lane {lane} = {count}"
491             LOGGER.error(errmsg.format(flowcell=flowcell,
492                                        lane=lane_id,
493                                        count=len(lanes)))
494             return None
495         elif len(lanes) == 1:
496             # success
497             return lanes[0]['library']
498         else:
499             # try grabbing data
500             model.load(flowcellNode.uri, name="rdfa")
501
502