Rename stripNamespace strip_namespace
[htsworkflow.git] / htsworkflow / pipelines / sequences.py
1 """
2 Utilities to work with the various eras of sequence archive files
3 """
4 import collections
5 import logging
6 import os
7 import types
8 import re
9 import sys
10 from urlparse import urljoin, urlparse
11
12 import RDF
13 from htsworkflow.util.rdfhelp import libraryOntology as libNS
14 from htsworkflow.util.rdfhelp import toTypedNode, fromTypedNode, rdfNS, \
15      strip_namespace, dump_model, simplify_uri
16
17 LOGGER = logging.getLogger(__name__)
18
19 eland_re = re.compile('s_(?P<lane>\d)(_(?P<read>\d))?_eland_')
20 raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+')
21 qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+_l[\d]_r[\d].tar.bz2')
22
23 SEQUENCE_TABLE_NAME = "sequences"
24 def create_sequence_table(cursor):
25     """
26     Create a SQL table to hold  SequenceFile entries
27     """
28     sql = """
29 CREATE TABLE %(table)s (
30   filetype   CHAR(8),
31   path       TEXT,
32   flowcell   CHAR(8),
33   lane       INTEGER,
34   read       INTEGER,
35   pf         BOOLEAN,
36   cycle      CHAR(8)
37 );
38 """ %( {'table': SEQUENCE_TABLE_NAME} )
39     return cursor.execute(sql)
40
41 FlowcellPath = collections.namedtuple('FlowcellPath',
42                                       'flowcell start stop project')
43
44 class SequenceFile(object):
45     """
46     Simple container class that holds the path to a sequence archive
47     and basic descriptive information.
48     """
49     def __init__(self, filetype, path, flowcell, lane,
50                  read=None,
51                  pf=None,
52                  cycle=None,
53                  project=None,
54                  index=None,
55                  split=None):
56         """Store various fields used in our sequence files
57
58         filetype is one of 'qseq', 'srf', 'fastq'
59         path = location of file
60         flowcell = files flowcell id
61         lane = which lane
62         read = which sequencer read, usually 1 or 2
63         pf = did it pass filter
64         cycle = cycle dir name e.g. C1-202
65         project = projed name from HiSeq, probably library ID
66         index = HiSeq barcode index sequence
67         split = file fragment from HiSeq (Since one file is split into many)
68         """
69         self.filetype = filetype
70         self.path = path
71         self.flowcell = flowcell
72         self.lane = lane
73         self.read = read
74         self.pf = pf
75         self.cycle = cycle
76         self.project = project
77         self.index = index
78         self.split = split
79
80     def __hash__(self):
81         return hash(self.key())
82
83     def key(self):
84         return (self.flowcell, self.lane, self.read, self.project, self.split)
85
86     def __unicode__(self):
87         return unicode(self.path)
88
89     def __eq__(self, other):
90         """
91         Equality is defined if everything but the path matches
92         """
93         attributes = ['filetype',
94                       'flowcell',
95                       'lane',
96                       'read',
97                       'pf',
98                       'cycle',
99                       'project',
100                       'index',
101                       'split']
102         for a in attributes:
103             if getattr(self, a) != getattr(other, a):
104                 return False
105
106         return True
107
108     def __ne__(self, other):
109         return not self == other
110
111     def __repr__(self):
112         return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
113
114     def make_target_name(self, root):
115         """
116         Create target name for where we need to link this sequence too
117         """
118         path, basename = os.path.split(self.path)
119         # Because the names aren't unque we include the flowcel name
120         # because there were different eland files for different length
121         # analyses, we include the cycle length in the name.
122         if self.filetype == 'eland':
123             template = "%(flowcell)s_%(cycle)s_%(eland)s"
124             basename = template % { 'flowcell': self.flowcell,
125                                     'cycle': self.cycle,
126                                     'eland': basename }
127         # else:
128         # all the other file types have names that include flowcell/lane
129         # information and thus are unique so we don't have to do anything
130         return os.path.join(root, basename)
131
132     def save_to_sql(self, cursor):
133         """
134         Add this entry to a DB2.0 database.
135         """
136         #FIXME: NEEDS SQL ESCAPING
137         header_macro = {'table': SEQUENCE_TABLE_NAME }
138         sql_header = "insert into %(table)s (" % header_macro
139         sql_columns = ['filetype','path','flowcell','lane']
140         sql_middle = ") values ("
141         sql_values = [self.filetype, self.path, self.flowcell, self.lane]
142         sql_footer = ");"
143         for name in ['read', 'pf', 'cycle']:
144             value = getattr(self, name)
145             if value is not None:
146                 sql_columns.append(name)
147                 sql_values.append(value)
148
149         sql = " ".join([sql_header,
150                         ", ".join(sql_columns),
151                         sql_middle,
152                         # note the following makes a string like ?,?,?
153                         ",".join(["?"] * len(sql_values)),
154                         sql_footer])
155
156         return cursor.execute(sql, sql_values)
157
158     def save_to_model(self, model, base_url=None):
159         def add_lit(model, s, p, o):
160             if o is not None:
161                 model.add_statement(RDF.Statement(s, p, toTypedNode(o)))
162         def add(model, s, p, o):
163             model.add_statement(RDF.Statement(s,p,o))
164         # a bit unreliable... assumes filesystem is encoded in utf-8
165         path = os.path.abspath(self.path.encode('utf-8'))
166         fileNode = RDF.Node(RDF.Uri('file://' + path))
167         add(model, fileNode, rdfNS['type'], libNS['IlluminaResult'])
168         add_lit(model, fileNode, libNS['flowcell_id'], self.flowcell)
169         add_lit(model, fileNode, libNS['lane_number'], self.lane)
170         if self.read is not None:
171             add_lit(model, fileNode, libNS['read'], self.read)
172         else:
173             add_lit(model, fileNode, libNS['read'], '')
174
175         add_lit(model, fileNode, libNS['library_id'], self.project)
176         add_lit(model, fileNode, libNS['multiplex_index'], self.index)
177         add_lit(model, fileNode, libNS['split_id'], self.split)
178         add_lit(model, fileNode, libNS['cycle'], self.cycle)
179         add_lit(model, fileNode, libNS['passed_filter'], self.pf)
180         add(model, fileNode, libNS['file_type'], libNS[self.filetype])
181
182         if base_url is not None:
183             flowcell = RDF.Node(RDF.Uri("{base}/flowcell/{flowcell}/".format(
184                 base=base_url,
185                 flowcell=self.flowcell)))
186             add(model, fileNode, libNS['flowcell'], flowcell)
187             if self.project is not None:
188                 library = RDF.Node(RDF.Uri("{base}/library/{library}".format(
189                     base=base_url,
190                     library=self.project)))
191                 add(model, fileNode, libNS['library'], library)
192
193
194     @classmethod
195     def load_from_model(cls, model, seq_id):
196         def get(s, p):
197             values = []
198             stmts = model.find_statements(RDF.Statement(s, p, None))
199             for s in stmts:
200                 obj = s.object
201                 if not obj.is_resource():
202                     values.append(fromTypedNode(obj))
203                 else:
204                     values.append(obj)
205             return values
206         def get_one(s, p):
207             values = get(s, p)
208             if len(values) > 1:
209                 errmsg = u"To many values for %s %s"
210                 raise ValueError(errmsg % (unicode(s), unicode(p)))
211             elif len(values) == 1:
212                 return values[0]
213             else:
214                 return None
215
216         if not isinstance(seq_id, RDF.Node):
217             seq_id = RDF.Node(RDF.Uri(seq_id))
218         result_statement = RDF.Statement(seq_id,
219                                          rdfNS['type'],
220                                          libNS['IlluminaResult'])
221         if not model.contains_statement(result_statement):
222             raise KeyError(u"%s not found" % (unicode(seq_id),))
223
224         seq_type_node = model.get_target(seq_id, libNS['file_type'])
225         seq_type = strip_namespace(libNS, seq_type_node)
226
227         path = urlparse(str(seq_id.uri)).path
228         flowcellNode = get_one(seq_id, libNS['flowcell'])
229         flowcell = get_one(seq_id, libNS['flowcell_id'])
230         lane = get_one(seq_id, libNS['lane_number'])
231         read = get_one(seq_id, libNS['read'])
232
233         obj = cls(seq_type, path, flowcell, lane)
234         obj.read = read if read != '' else None
235         obj.project = get_one(seq_id, libNS['library_id'])
236         obj.index = get_one(seq_id, libNS['multiplex_index'])
237         obj.split = get_one(seq_id, libNS['split_id'])
238         obj.cycle = get_one(seq_id, libNS['cycle'] )
239         obj.pf = get_one(seq_id, libNS['passed_filter'])
240         obj.libraryNode = get_one(seq_id, libNS['library'])
241         return obj
242
243
244 def get_flowcell_cycle(path):
245     """
246     Extract flowcell, cycle from pathname
247     """
248     path = os.path.normpath(path)
249     project = None
250     rest, tail = os.path.split(path)
251     if tail.startswith('Project_'):
252         # we're in a multiplexed sample
253         project = tail
254         rest, cycle = os.path.split(rest)
255     else:
256         cycle = tail
257
258     rest, flowcell = os.path.split(rest)
259     cycle_match = re.match("C(?P<start>[0-9]+)-(?P<stop>[0-9]+)", cycle)
260     if cycle_match is None:
261         raise ValueError(
262             "Expected .../flowcell/cycle/ directory structure in %s" % \
263             (path,))
264     start = cycle_match.group('start')
265     if start is not None:
266         start = int(start)
267     stop = cycle_match.group('stop')
268     if stop is not None:
269         stop = int(stop)
270
271     return FlowcellPath(flowcell, start, stop, project)
272
273 def parse_srf(path, filename):
274     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
275     basename, ext = os.path.splitext(filename)
276     records = basename.split('_')
277     flowcell = records[4]
278     lane = records[5][0]
279     fullpath = os.path.join(path, filename)
280
281     if flowcell_dir != flowcell:
282         LOGGER.warn("flowcell %s found in wrong directory %s" % \
283                          (flowcell, path))
284
285     return SequenceFile('srf', fullpath, flowcell, lane, cycle=stop)
286
287 def parse_qseq(path, filename):
288     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
289     basename, ext = os.path.splitext(filename)
290     records = basename.split('_')
291     fullpath = os.path.join(path, filename)
292     flowcell = records[4]
293     lane = records[5][1]
294     read = int(records[6][1])
295
296     if flowcell_dir != flowcell:
297         LOGGER.warn("flowcell %s found in wrong directory %s" % \
298                          (flowcell, path))
299
300     return SequenceFile('qseq', fullpath, flowcell, lane, read, cycle=stop)
301
302 def parse_fastq(path, filename):
303     """Parse fastq names
304     """
305     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
306     basename = re.sub('\.fastq(\.gz|\.bz2)?$', '', filename)
307     records = basename.split('_')
308     fullpath = os.path.join(path, filename)
309     if project is not None:
310         # demultiplexed sample!
311         flowcell = flowcell_dir
312         lane = records[2][-1]
313         read = int(records[3][-1])
314         pf = True # as I understand it hiseq runs toss the ones that fail filter
315         index = records[1]
316         project_id = records[0]
317         split = records[4]
318         sequence_type = 'split_fastq'
319     else:
320         flowcell = records[4]
321         lane = records[5][1]
322         read = int(records[6][1])
323         pf = parse_fastq_pf_flag(records)
324         index = None
325         project_id = None
326         split = None
327         sequence_type = 'fastq'
328
329     if flowcell_dir != flowcell:
330         LOGGER.warn("flowcell %s found in wrong directory %s" % \
331                          (flowcell, path))
332
333     return SequenceFile(sequence_type, fullpath, flowcell, lane, read,
334                         pf=pf,
335                         cycle=stop,
336                         project=project_id,
337                         index=index,
338                         split=split)
339
340 def parse_fastq_pf_flag(records):
341     """Take a fastq filename split on _ and look for the pass-filter flag
342     """
343     if len(records) < 8:
344         pf = None
345     else:
346         fastq_type = records[-1].lower()
347         if fastq_type.startswith('pass'):
348             pf = True
349         elif fastq_type.startswith('nopass'):
350             pf = False
351         elif fastq_type.startswith('all'):
352             pf = None
353         else:
354             raise ValueError("Unrecognized fastq name: %s" % (
355                 "_".join(records),))
356
357     return pf
358
359 def parse_eland(path, filename, eland_match=None):
360     if eland_match is None:
361         eland_match = eland_re.match(filename)
362     fullpath = os.path.join(path, filename)
363     flowcell, start, stop, project = get_flowcell_cycle(path)
364     if eland_match.group('lane'):
365         lane = eland_match.group('lane')
366     else:
367         lane = None
368     if eland_match.group('read'):
369         read = int(eland_match.group('read'))
370     else:
371         read = None
372     return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=stop)
373
374 def scan_for_sequences(dirs):
375     """
376     Scan through a list of directories for sequence like files
377     """
378     sequences = []
379     if type(dirs) in types.StringTypes:
380         raise ValueError("You probably want a list or set, not a string")
381
382     for d in dirs:
383         LOGGER.info("Scanning %s for sequences" % (d,))
384         if not os.path.exists(d):
385             LOGGER.warn("Flowcell directory %s does not exist" % (d,))
386             continue
387
388         for path, dirname, filenames in os.walk(d):
389             for f in filenames:
390                 seq = None
391                 # find sequence files
392                 if f.endswith('.md5'):
393                     continue
394                 elif f.endswith('.srf') or f.endswith('.srf.bz2'):
395                     seq = parse_srf(path, f)
396                 elif qseq_re.match(f):
397                     seq = parse_qseq(path, f)
398                 elif f.endswith('.fastq') or \
399                      f.endswith('.fastq.bz2') or \
400                      f.endswith('.fastq.gz'):
401                     seq = parse_fastq(path, f)
402                 eland_match = eland_re.match(f)
403                 if eland_match:
404                     if f.endswith('.md5'):
405                         continue
406                     seq = parse_eland(path, f, eland_match)
407                 if seq:
408                     sequences.append(seq)
409                     LOGGER.debug("Found sequence at %s" % (f,))
410
411     return sequences
412
413
414 def update_model_sequence_library(model, base_url):
415     """Find sequence objects and add library information if its missing
416     """
417     file_body = """
418     prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
419     select ?filenode ?flowcell_id ?lane_id ?library_id ?flowcell ?library
420     where {
421        ?filenode a libns:IlluminaResult ;
422                  libns:flowcell_id ?flowcell_id ;
423                  libns:lane_number ?lane_id .
424        OPTIONAL { ?filenode libns:flowcell ?flowcell . }
425        OPTIONAL { ?filenode libns:library ?library .}
426        OPTIONAL { ?filenode libns:library_id ?library_id .}
427     }
428     """
429     LOGGER.debug("update_model_sequence_library query %s", file_body)
430     file_query = RDF.SPARQLQuery(file_body)
431     files = file_query.execute(model)
432
433     libraryNS = RDF.NS(urljoin(base_url, 'library/'))
434     flowcellNS = RDF.NS(urljoin(base_url, 'flowcell/'))
435     for f in files:
436         filenode = f['filenode']
437         LOGGER.debug("Updating file node %s", str(filenode))
438         lane_id = fromTypedNode(f['lane_id'])
439         if f['flowcell'] is None:
440             flowcell = flowcellNS[str(f['flowcell_id'])+'/']
441             LOGGER.debug("Adding file (%s) to flowcell (%s) link",
442                          str(filenode),
443                          str(flowcell))
444             model.add_statement(
445                 RDF.Statement(filenode, libNS['flowcell'], flowcell))
446         else:
447             flowcell = f['flowcell']
448
449         if f['library'] is None:
450             if f['library_id'] is not None:
451                 library = libraryNS[str(f['library_id']) + '/']
452             else:
453                 library = guess_library_from_model(model, base_url,
454                                                    flowcell,
455                                                    lane_id)
456                 if library is None:
457                     LOGGER.error("Unable to decypher: %s %s",
458                                  str(flowcell), str(lane_id))
459                     continue
460                 library_id = toTypedNode(simplify_uri(library))
461                 LOGGER.debug("Adding file (%s) to library (%s) link",
462                              str(filenode),
463                              str(library))
464                 model.add_statement(
465                     RDF.Statement(filenode, libNS['library_id'], library_id))
466             if library is not None:
467                 model.add_statement(
468                     RDF.Statement(filenode, libNS['library'], library))
469
470
471 def guess_library_from_model(model, base_url, flowcell, lane_id):
472     """Attempt to find library URI
473     """
474     flowcellNode = RDF.Node(flowcell)
475     flowcell = str(flowcell.uri)
476     lane_body = """
477     prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
478     prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
479     prefix xsd: <http://www.w3.org/2001/XMLSchema#>
480
481     select ?library ?lane
482     where {{
483       <{flowcell}> libns:has_lane ?lane ;
484                    a libns:IlluminaFlowcell .
485       ?lane libns:lane_number ?lane_id ;
486             libns:library ?library .
487       FILTER(str(?lane_id) = "{lane_id}")
488     }}
489     """
490     lane_body = lane_body.format(flowcell=flowcell, lane_id=lane_id)
491     LOGGER.debug("guess_library_from_model: %s", lane_body)
492     lanes = []
493     tries = 3
494     while len(lanes) == 0 and tries > 0:
495         tries -= 1
496         lane_query = RDF.SPARQLQuery(lane_body)
497         lanes = [ l for l in lane_query.execute(model)]
498         if len(lanes) > 1:
499             # CONFUSED!
500             errmsg = "Too many libraries for flowcell {flowcell} "\
501                      "lane {lane} = {count}"
502             LOGGER.error(errmsg.format(flowcell=flowcell,
503                                        lane=lane_id,
504                                        count=len(lanes)))
505             return None
506         elif len(lanes) == 1:
507             # success
508             return lanes[0]['library']
509         else:
510             # try grabbing data
511             model.load(flowcellNode.uri, name="rdfa")