Allow option to save/restore a sequence class to a RDF model.
[htsworkflow.git] / htsworkflow / pipelines / sequences.py
1 """
2 Utilities to work with the various eras of sequence archive files
3 """
4 import collections
5 import logging
6 import os
7 import types
8 import re
9 import sys
10 from urlparse import urljoin, urlparse
11
12 import RDF
13 from htsworkflow.util.rdfhelp import libraryOntology as libNS
14 from htsworkflow.util.rdfhelp import toTypedNode, fromTypedNode, rdfNS, \
15      stripNamespace, dump_model, simplify_uri
16
17 LOGGER = logging.getLogger(__name__)
18
19 eland_re = re.compile('s_(?P<lane>\d)(_(?P<read>\d))?_eland_')
20 raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+')
21 qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+_l[\d]_r[\d].tar.bz2')
22
23 SEQUENCE_TABLE_NAME = "sequences"
24 def create_sequence_table(cursor):
25     """
26     Create a SQL table to hold  SequenceFile entries
27     """
28     sql = """
29 CREATE TABLE %(table)s (
30   filetype   CHAR(8),
31   path       TEXT,
32   flowcell   CHAR(8),
33   lane       INTEGER,
34   read       INTEGER,
35   pf         BOOLEAN,
36   cycle      CHAR(8)
37 );
38 """ %( {'table': SEQUENCE_TABLE_NAME} )
39     return cursor.execute(sql)
40
41 FlowcellPath = collections.namedtuple('FlowcellPath',
42                                       'flowcell start stop project')
43
44 class SequenceFile(object):
45     """
46     Simple container class that holds the path to a sequence archive
47     and basic descriptive information.
48     """
49     def __init__(self, filetype, path, flowcell, lane,
50                  read=None,
51                  pf=None,
52                  cycle=None,
53                  project=None,
54                  index=None,
55                  split=None):
56         """Store various fields used in our sequence files
57
58         filetype is one of 'qseq', 'srf', 'fastq'
59         path = location of file
60         flowcell = files flowcell id
61         lane = which lane
62         read = which sequencer read, usually 1 or 2
63         pf = did it pass filter
64         cycle = cycle dir name e.g. C1-202
65         project = projed name from HiSeq, probably library ID
66         index = HiSeq barcode index sequence
67         split = file fragment from HiSeq (Since one file is split into many)
68         """
69         self.filetype = filetype
70         self.path = path
71         self.flowcell = flowcell
72         self.lane = lane
73         self.read = read
74         self.pf = pf
75         self.cycle = cycle
76         self.project = project
77         self.index = index
78         self.split = split
79
80     def __hash__(self):
81         return hash(self.key())
82
83     def key(self):
84         return (self.flowcell, self.lane, self.read, self.project, self.split)
85
86     def __unicode__(self):
87         return unicode(self.path)
88
89     def __eq__(self, other):
90         """
91         Equality is defined if everything but the path matches
92         """
93         attributes = ['filetype',
94                       'flowcell',
95                       'lane',
96                       'read',
97                       'pf',
98                       'cycle',
99                       'project',
100                       'index',
101                       'split']
102         for a in attributes:
103             if getattr(self, a) != getattr(other, a):
104                 return False
105
106         return True
107
108     def __ne__(self, other):
109         return not self == other
110
111     def __repr__(self):
112         return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
113
114     def make_target_name(self, root):
115         """
116         Create target name for where we need to link this sequence too
117         """
118         path, basename = os.path.split(self.path)
119         # Because the names aren't unque we include the flowcel name
120         # because there were different eland files for different length
121         # analyses, we include the cycle length in the name.
122         if self.filetype == 'eland':
123             template = "%(flowcell)s_%(cycle)s_%(eland)s"
124             basename = template % { 'flowcell': self.flowcell,
125                                     'cycle': self.cycle,
126                                     'eland': basename }
127         # else:
128         # all the other file types have names that include flowcell/lane
129         # information and thus are unique so we don't have to do anything
130         return os.path.join(root, basename)
131
132     def save_to_sql(self, cursor):
133         """
134         Add this entry to a DB2.0 database.
135         """
136         #FIXME: NEEDS SQL ESCAPING
137         header_macro = {'table': SEQUENCE_TABLE_NAME }
138         sql_header = "insert into %(table)s (" % header_macro
139         sql_columns = ['filetype','path','flowcell','lane']
140         sql_middle = ") values ("
141         sql_values = [self.filetype, self.path, self.flowcell, self.lane]
142         sql_footer = ");"
143         for name in ['read', 'pf', 'cycle']:
144             value = getattr(self, name)
145             if value is not None:
146                 sql_columns.append(name)
147                 sql_values.append(value)
148
149         sql = " ".join([sql_header,
150                         ", ".join(sql_columns),
151                         sql_middle,
152                         # note the following makes a string like ?,?,?
153                         ",".join(["?"] * len(sql_values)),
154                         sql_footer])
155
156         return cursor.execute(sql, sql_values)
157
158     def save_to_model(self, model, base_url=None):
159         def add_lit(model, s, p, o):
160             if o is not None:
161                 model.add_statement(RDF.Statement(s, p, toTypedNode(o)))
162         def add(model, s, p, o):
163             model.add_statement(RDF.Statement(s,p,o))
164         fileNode = RDF.Node(RDF.Uri('file://' + os.path.abspath(self.path)))
165         add(model, fileNode, rdfNS['type'], libNS['raw_file'])
166         add_lit(model, fileNode, libNS['flowcell_id'], self.flowcell)
167         add_lit(model, fileNode, libNS['lane_number'], self.lane)
168         if self.read is not None:
169             add_lit(model, fileNode, libNS['read'], self.read)
170         else:
171             add_lit(model, fileNode, libNS['read'], '')
172
173         add_lit(model, fileNode, libNS['library_id'], self.project)
174         add_lit(model, fileNode, libNS['multiplex_index'], self.index)
175         add_lit(model, fileNode, libNS['split_id'], self.split)
176         add_lit(model, fileNode, libNS['cycle'], self.cycle)
177         add_lit(model, fileNode, libNS['passed_filter'], self.pf)
178         add(model, fileNode, rdfNS['type'], libNS[self.filetype])
179
180         if base_url is not None:
181             flowcell = RDF.Node(RDF.Uri("{base}/flowcell/{flowcell}/".format(
182                 base=base_url,
183                 flowcell=self.flowcell)))
184             add(model, fileNode, libNS['flowcell'], flowcell)
185             if self.project is not None:
186                 library = RDF.Node(RDF.Uri("{base}/library/{library}".format(
187                     base=base_url,
188                     library=self.project)))
189                 add(model, fileNode, libNS['library'], library)
190
191
192     @classmethod
193     def load_from_model(cls, model, seq_id):
194         def get(s, p):
195             values = []
196             stmts = model.find_statements(RDF.Statement(s, p, None))
197             for s in stmts:
198                 obj = s.object
199                 if not obj.is_resource():
200                     values.append(fromTypedNode(obj))
201                 else:
202                     values.append(obj)
203             return values
204         def get_one(s, p):
205             values = get(s, p)
206             if len(values) > 1:
207                 errmsg = u"To many values for %s %s"
208                 raise ValueError(errmsg % (unicode(s), unicode(p)))
209             elif len(values) == 1:
210                 return values[0]
211             else:
212                 return None
213
214         if not isinstance(seq_id, RDF.Node):
215             seq_id = RDF.Node(RDF.Uri(seq_id))
216         seqTypesStmt = RDF.Statement(seq_id, rdfNS['type'], None)
217         seqTypes = model.find_statements(seqTypesStmt)
218         isSequenceFile = False
219         for s in seqTypes:
220             if s.object == libNS['raw_file']:
221                 isSequenceFile = True
222             else:
223                 seq_type = stripNamespace(libNS, s.object)
224
225         if not isSequenceFile:
226             raise KeyError(u"%s not found" % (unicode(seq_id),))
227
228         path = urlparse(str(seq_id.uri)).path
229         flowcellNode = get_one(seq_id, libNS['flowcell'])
230         flowcell = get_one(seq_id, libNS['flowcell_id'])
231         lane = get_one(seq_id, libNS['lane_number'])
232         read = get_one(seq_id, libNS['read'])
233
234         obj = cls(seq_type, path, flowcell, lane)
235         obj.read = read if read != '' else None
236         obj.project = get_one(seq_id, libNS['library_id'])
237         obj.index = get_one(seq_id, libNS['multiplex_index'])
238         obj.split = get_one(seq_id, libNS['split_id'])
239         obj.cycle = get_one(seq_id, libNS['cycle'] )
240         obj.pf = get_one(seq_id, libNS['passed_filter'])
241         obj.libraryNode = get_one(seq_id, libNS['library'])
242         return obj
243
244
245 def get_flowcell_cycle(path):
246     """
247     Extract flowcell, cycle from pathname
248     """
249     path = os.path.normpath(path)
250     project = None
251     rest, tail = os.path.split(path)
252     if tail.startswith('Project_'):
253         # we're in a multiplexed sample
254         project = tail
255         rest, cycle = os.path.split(rest)
256     else:
257         cycle = tail
258
259     rest, flowcell = os.path.split(rest)
260     cycle_match = re.match("C(?P<start>[0-9]+)-(?P<stop>[0-9]+)", cycle)
261     if cycle_match is None:
262         raise ValueError(
263             "Expected .../flowcell/cycle/ directory structure in %s" % \
264             (path,))
265     start = cycle_match.group('start')
266     if start is not None:
267         start = int(start)
268     stop = cycle_match.group('stop')
269     if stop is not None:
270         stop = int(stop)
271
272     return FlowcellPath(flowcell, start, stop, project)
273
274 def parse_srf(path, filename):
275     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
276     basename, ext = os.path.splitext(filename)
277     records = basename.split('_')
278     flowcell = records[4]
279     lane = int(records[5][0])
280     fullpath = os.path.join(path, filename)
281
282     if flowcell_dir != flowcell:
283         LOGGER.warn("flowcell %s found in wrong directory %s" % \
284                          (flowcell, path))
285
286     return SequenceFile('srf', fullpath, flowcell, lane, cycle=stop)
287
288 def parse_qseq(path, filename):
289     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
290     basename, ext = os.path.splitext(filename)
291     records = basename.split('_')
292     fullpath = os.path.join(path, filename)
293     flowcell = records[4]
294     lane = int(records[5][1])
295     read = int(records[6][1])
296
297     if flowcell_dir != flowcell:
298         LOGGER.warn("flowcell %s found in wrong directory %s" % \
299                          (flowcell, path))
300
301     return SequenceFile('qseq', fullpath, flowcell, lane, read, cycle=stop)
302
303 def parse_fastq(path, filename):
304     """Parse fastq names
305     """
306     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
307     basename = re.sub('\.fastq(\.gz|\.bz2)?$', '', filename)
308     records = basename.split('_')
309     fullpath = os.path.join(path, filename)
310     if project is not None:
311         # demultiplexed sample!
312         flowcell = flowcell_dir
313         lane = int(records[2][-1])
314         read = int(records[3][-1])
315         pf = True # as I understand it hiseq runs toss the ones that fail filter
316         index = records[1]
317         project_id = records[0]
318         split = records[4]
319         sequence_type = 'split_fastq'
320     else:
321         flowcell = records[4]
322         lane = int(records[5][1])
323         read = int(records[6][1])
324         pf = parse_fastq_pf_flag(records)
325         index = None
326         project_id = None
327         split = None
328         sequence_type = 'fastq'
329
330     if flowcell_dir != flowcell:
331         LOGGER.warn("flowcell %s found in wrong directory %s" % \
332                          (flowcell, path))
333
334     return SequenceFile(sequence_type, fullpath, flowcell, lane, read,
335                         pf=pf,
336                         cycle=stop,
337                         project=project_id,
338                         index=index,
339                         split=split)
340
341 def parse_fastq_pf_flag(records):
342     """Take a fastq filename split on _ and look for the pass-filter flag
343     """
344     if len(records) < 8:
345         pf = None
346     else:
347         fastq_type = records[-1].lower()
348         if fastq_type.startswith('pass'):
349             pf = True
350         elif fastq_type.startswith('nopass'):
351             pf = False
352         elif fastq_type.startswith('all'):
353             pf = None
354         else:
355             raise ValueError("Unrecognized fastq name: %s" % (
356                 "_".join(records),))
357
358     return pf
359
360 def parse_eland(path, filename, eland_match=None):
361     if eland_match is None:
362         eland_match = eland_re.match(filename)
363     fullpath = os.path.join(path, filename)
364     flowcell, start, stop, project = get_flowcell_cycle(path)
365     if eland_match.group('lane'):
366         lane = int(eland_match.group('lane'))
367     else:
368         lane = None
369     if eland_match.group('read'):
370         read = int(eland_match.group('read'))
371     else:
372         read = None
373     return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=stop)
374
375 def scan_for_sequences(dirs):
376     """
377     Scan through a list of directories for sequence like files
378     """
379     sequences = []
380     if type(dirs) in types.StringTypes:
381         raise ValueError("You probably want a list or set, not a string")
382
383     for d in dirs:
384         LOGGER.info("Scanning %s for sequences" % (d,))
385         if not os.path.exists(d):
386             LOGGER.warn("Flowcell directory %s does not exist" % (d,))
387             continue
388
389         for path, dirname, filenames in os.walk(d):
390             for f in filenames:
391                 seq = None
392                 # find sequence files
393                 if f.endswith('.md5'):
394                     continue
395                 elif f.endswith('.srf') or f.endswith('.srf.bz2'):
396                     seq = parse_srf(path, f)
397                 elif qseq_re.match(f):
398                     seq = parse_qseq(path, f)
399                 elif f.endswith('.fastq') or \
400                      f.endswith('.fastq.bz2') or \
401                      f.endswith('.fastq.gz'):
402                     seq = parse_fastq(path, f)
403                 eland_match = eland_re.match(f)
404                 if eland_match:
405                     if f.endswith('.md5'):
406                         continue
407                     seq = parse_eland(path, f, eland_match)
408                 if seq:
409                     sequences.append(seq)
410                     LOGGER.debug("Found sequence at %s" % (f,))
411
412     return sequences
413
414
415 def update_model_sequence_library(model, base_url):
416     """Find sequence objects and add library information if its missing
417     """
418     file_body = """
419     prefix libNS: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
420     select ?filenode ?flowcell_id ?lane_id ?library_id ?flowcell ?library
421     where {
422        ?filenode a libNS:raw_file ;
423                  libNS:flowcell_id ?flowcell_id ;
424                  libNS:lane_number ?lane_id .
425        OPTIONAL { ?filenode libNS:flowcell ?flowcell . }
426        OPTIONAL { ?filenode libNS:library ?library .}
427        OPTIONAL { ?filenode libNS:library_id ?library_id .}
428     }
429     """
430     file_query = RDF.SPARQLQuery(file_body)
431     files = file_query.execute(model)
432
433     libraryNS = RDF.NS(urljoin(base_url, 'library/'))
434     flowcellNS = RDF.NS(urljoin(base_url, 'flowcell/'))
435     for f in files:
436         filenode = f['filenode']
437         lane_id = fromTypedNode(f['lane_id'])
438         if f['flowcell'] is None:
439             flowcell = flowcellNS[str(f['flowcell_id'])+'/']
440             model.add_statement(
441                 RDF.Statement(filenode, libNS['flowcell'], flowcell))
442         else:
443             flowcell = f['flowcell']
444
445         if f['library'] is None:
446             if f['library_id'] is not None:
447                 library = libraryNS[str(f['library_id']) + '/']
448             else:
449                 library = guess_library_from_model(model, base_url,
450                                                    flowcell,
451                                                    lane_id)
452                 library_id = toTypedNode(simplify_uri(library))
453                 model.add_statement(
454                     RDF.Statement(filenode, libNS['library_id'], library_id))
455             if library is not None:
456                 model.add_statement(
457                     RDF.Statement(filenode, libNS['library'], library))
458
459
460 def guess_library_from_model(model, base_url, flowcell, lane_id):
461     """Attempt to find library URI
462     """
463     flowcellNode = RDF.Node(flowcell)
464     flowcell = str(flowcell.uri)
465     lane_body = """
466     prefix libNS: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
467     prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
468     prefix xsd: <http://www.w3.org/2001/XMLSchema#>
469
470     select ?library ?lane
471     where {{
472       <{flowcell}> libNS:has_lane ?lane ;
473                    a libNS:illumina_flowcell .
474       ?lane libNS:lane_number {lane_id} ;
475             libNS:library ?library .
476     }}
477     """
478     lane_body = lane_body.format(flowcell=flowcell, lane_id=lane_id)
479     lanes = []
480     tries = 3
481     while len(lanes) == 0 and tries > 0:
482         tries -= 1
483         lane_query = RDF.SPARQLQuery(lane_body)
484         lanes = [ l for l in lane_query.execute(model)]
485         if len(lanes) > 1:
486             # CONFUSED!
487             errmsg = "Too many libraries for flowcell {flowcell} "\
488                      "lane {lane} = {count}"
489             LOGGER.error(errmsg.format(flowcell=flowcell,
490                                        lane=lane_id,
491                                        count=len(lanes)))
492             return None
493         elif len(lanes) == 1:
494             # success
495             return lanes[0]['library']
496         else:
497             # try grabbing data
498             model.load(flowcellNode.uri, name="rdfa")
499
500