1d746c9844f8a82a15a586f76b8baa65dd594b9a
[htsworkflow.git] / htsworkflow / pipelines / sequences.py
1 """
2 Utilities to work with the various eras of sequence archive files
3 """
4 import collections
5 import logging
6 import os
7 import types
8 import re
9 import sys
10 import six
11 from six.moves.urllib.parse import urljoin, urlparse
12
13 import RDF
14 from htsworkflow.util.rdfhelp import libraryOntology as libNS
15 from htsworkflow.util.rdfhelp import toTypedNode, fromTypedNode, rdfNS, \
16      strip_namespace, dump_model, simplify_uri
17
18 LOGGER = logging.getLogger(__name__)
19
20 eland_re = re.compile('s_(?P<lane>\d)(_(?P<read>\d))?_eland_')
21 raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+')
22 qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+_l[\d]_r[\d].tar.bz2')
23
24 SEQUENCE_TABLE_NAME = "sequences"
25 def create_sequence_table(cursor):
26     """
27     Create a SQL table to hold  SequenceFile entries
28     """
29     sql = """
30 CREATE TABLE %(table)s (
31   filetype   CHAR(8),
32   path       TEXT,
33   flowcell   CHAR(8),
34   lane       INTEGER,
35   read       INTEGER,
36   pf         BOOLEAN,
37   cycle      CHAR(8)
38 );
39 """ %( {'table': SEQUENCE_TABLE_NAME} )
40     return cursor.execute(sql)
41
42 FlowcellPath = collections.namedtuple('FlowcellPath',
43                                       'flowcell start stop project')
44
45 class SequenceFile(object):
46     """
47     Simple container class that holds the path to a sequence archive
48     and basic descriptive information.
49     """
50     def __init__(self, filetype, path, flowcell, lane,
51                  read=None,
52                  pf=None,
53                  cycle=None,
54                  project=None,
55                  index=None,
56                  split=None):
57         """Store various fields used in our sequence files
58
59         filetype is one of 'qseq', 'srf', 'fastq'
60         path = location of file
61         flowcell = files flowcell id
62         lane = which lane
63         read = which sequencer read, usually 1 or 2
64         pf = did it pass filter
65         cycle = cycle dir name e.g. C1-202
66         project = projed name from HiSeq, probably library ID
67         index = HiSeq barcode index sequence
68         split = file fragment from HiSeq (Since one file is split into many)
69         """
70         self.filetype = filetype
71         self.path = path
72         self.flowcell = flowcell
73         self.lane = lane
74         self.read = read
75         self.pf = pf
76         self.cycle = cycle
77         self.project = project
78         self.index = index
79         self.split = split
80
81     def __hash__(self):
82         return hash(self.key())
83
84     def key(self):
85         return (self.flowcell, self.lane, self.read, self.project, self.split)
86
87     def __str__(self):
88         return str(self.path)
89
90     def __eq__(self, other):
91         """
92         Equality is defined if everything but the path matches
93         """
94         attributes = ['filetype',
95                       'flowcell',
96                       'lane',
97                       'read',
98                       'pf',
99                       'cycle',
100                       'project',
101                       'index',
102                       'split']
103         for a in attributes:
104             if getattr(self, a) != getattr(other, a):
105                 return False
106
107         return True
108
109     def __ne__(self, other):
110         return not self == other
111
112     def __repr__(self):
113         return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
114
115     def make_target_name(self, root):
116         """
117         Create target name for where we need to link this sequence too
118         """
119         path, basename = os.path.split(self.path)
120         # Because the names aren't unque we include the flowcel name
121         # because there were different eland files for different length
122         # analyses, we include the cycle length in the name.
123         if self.filetype == 'eland':
124             template = "%(flowcell)s_%(cycle)s_%(eland)s"
125             basename = template % { 'flowcell': self.flowcell,
126                                     'cycle': self.cycle,
127                                     'eland': basename }
128         # else:
129         # all the other file types have names that include flowcell/lane
130         # information and thus are unique so we don't have to do anything
131         return os.path.join(root, basename)
132
133     def save_to_sql(self, cursor):
134         """
135         Add this entry to a DB2.0 database.
136         """
137         #FIXME: NEEDS SQL ESCAPING
138         header_macro = {'table': SEQUENCE_TABLE_NAME }
139         sql_header = "insert into %(table)s (" % header_macro
140         sql_columns = ['filetype','path','flowcell','lane']
141         sql_middle = ") values ("
142         sql_values = [self.filetype, self.path, self.flowcell, self.lane]
143         sql_footer = ");"
144         for name in ['read', 'pf', 'cycle']:
145             value = getattr(self, name)
146             if value is not None:
147                 sql_columns.append(name)
148                 sql_values.append(value)
149
150         sql = " ".join([sql_header,
151                         ", ".join(sql_columns),
152                         sql_middle,
153                         # note the following makes a string like ?,?,?
154                         ",".join(["?"] * len(sql_values)),
155                         sql_footer])
156
157         return cursor.execute(sql, sql_values)
158
159     def save_to_model(self, model, base_url=None):
160         def add_lit(model, s, p, o):
161             if o is not None:
162                 model.add_statement(RDF.Statement(s, p, toTypedNode(o)))
163         def add(model, s, p, o):
164             model.add_statement(RDF.Statement(s,p,o))
165         # a bit unreliable... assumes filesystem is encoded in utf-8
166         path = os.path.abspath(self.path)
167         fileNode = RDF.Node(RDF.Uri('file://' + path))
168         add(model, fileNode, rdfNS['type'], libNS['IlluminaResult'])
169         add_lit(model, fileNode, libNS['flowcell_id'], self.flowcell)
170         add_lit(model, fileNode, libNS['lane_number'], self.lane)
171         if self.read is not None:
172             add_lit(model, fileNode, libNS['read'], self.read)
173         else:
174             add_lit(model, fileNode, libNS['read'], '')
175
176         add_lit(model, fileNode, libNS['library_id'], self.project)
177         add_lit(model, fileNode, libNS['multiplex_index'], self.index)
178         add_lit(model, fileNode, libNS['split_id'], self.split)
179         add_lit(model, fileNode, libNS['cycle'], self.cycle)
180         add_lit(model, fileNode, libNS['passed_filter'], self.pf)
181         add(model, fileNode, libNS['file_type'], libNS[self.filetype])
182
183         if base_url is not None:
184             flowcell = RDF.Node(RDF.Uri("{base}/flowcell/{flowcell}/".format(
185                 base=base_url,
186                 flowcell=self.flowcell)))
187             add(model, fileNode, libNS['flowcell'], flowcell)
188             if self.project is not None:
189                 library = RDF.Node(RDF.Uri("{base}/library/{library}".format(
190                     base=base_url,
191                     library=self.project)))
192                 add(model, fileNode, libNS['library'], library)
193
194
195     @classmethod
196     def load_from_model(cls, model, seq_id):
197         def get(s, p):
198             values = []
199             stmts = model.find_statements(RDF.Statement(s, p, None))
200             for s in stmts:
201                 obj = s.object
202                 if not obj.is_resource():
203                     values.append(fromTypedNode(obj))
204                 else:
205                     values.append(obj)
206             return values
207         def get_one(s, p):
208             values = get(s, p)
209             if len(values) > 1:
210                 errmsg = u"To many values for %s %s"
211                 raise ValueError(errmsg % (unicode(s), unicode(p)))
212             elif len(values) == 1:
213                 return values[0]
214             else:
215                 return None
216
217         if not isinstance(seq_id, RDF.Node):
218             seq_id = RDF.Node(RDF.Uri(seq_id))
219         result_statement = RDF.Statement(seq_id,
220                                          rdfNS['type'],
221                                          libNS['IlluminaResult'])
222         if not model.contains_statement(result_statement):
223             raise KeyError(u"%s not found" % (unicode(seq_id),))
224
225         seq_type_node = model.get_target(seq_id, libNS['file_type'])
226         seq_type = strip_namespace(libNS, seq_type_node)
227
228         path = urlparse(str(seq_id.uri)).path
229         flowcellNode = get_one(seq_id, libNS['flowcell'])
230         flowcell = get_one(seq_id, libNS['flowcell_id'])
231         lane = get_one(seq_id, libNS['lane_number'])
232         read = get_one(seq_id, libNS['read'])
233
234         obj = cls(seq_type, path, flowcell, lane)
235         obj.read = read if read != '' else None
236         obj.project = get_one(seq_id, libNS['library_id'])
237         obj.index = get_one(seq_id, libNS['multiplex_index'])
238         obj.split = get_one(seq_id, libNS['split_id'])
239         obj.cycle = get_one(seq_id, libNS['cycle'] )
240         obj.pf = get_one(seq_id, libNS['passed_filter'])
241         obj.libraryNode = get_one(seq_id, libNS['library'])
242         return obj
243
244
245 def get_flowcell_cycle(path):
246     """
247     Extract flowcell, cycle from pathname
248     """
249     path = os.path.normpath(path)
250     project = None
251     rest, tail = os.path.split(path)
252     if tail.startswith('Project_'):
253         # we're in a multiplexed sample
254         project = tail
255         rest, cycle = os.path.split(rest)
256     else:
257         cycle = tail
258
259     rest, flowcell = os.path.split(rest)
260     cycle_match = re.match("C(?P<start>[0-9]+)-(?P<stop>[0-9]+)", cycle)
261     if cycle_match is None:
262         raise ValueError(
263             "Expected .../flowcell/cycle/ directory structure in %s" % \
264             (path,))
265     start = cycle_match.group('start')
266     if start is not None:
267         start = int(start)
268     stop = cycle_match.group('stop')
269     if stop is not None:
270         stop = int(stop)
271
272     return FlowcellPath(flowcell, start, stop, project)
273
274 def parse_srf(path, filename):
275     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
276     basename, ext = os.path.splitext(filename)
277     records = basename.split('_')
278     flowcell = records[4]
279     lane = records[5][0]
280     fullpath = os.path.join(path, filename)
281
282     if flowcell_dir != flowcell:
283         LOGGER.warn("flowcell %s found in wrong directory %s" % \
284                          (flowcell, path))
285
286     return SequenceFile('srf', fullpath, flowcell, lane, cycle=stop)
287
288 def parse_qseq(path, filename):
289     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
290     basename, ext = os.path.splitext(filename)
291     records = basename.split('_')
292     fullpath = os.path.join(path, filename)
293     flowcell = records[4]
294     lane = records[5][1]
295     read = int(records[6][1])
296
297     if flowcell_dir != flowcell:
298         LOGGER.warn("flowcell %s found in wrong directory %s" % \
299                          (flowcell, path))
300
301     return SequenceFile('qseq', fullpath, flowcell, lane, read, cycle=stop)
302
303 def parse_fastq(path, filename):
304     """Parse fastq names
305     """
306     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
307     basename = re.sub('\.fastq(\.gz|\.bz2)?$', '', filename)
308     records = basename.split('_')
309     fullpath = os.path.join(path, filename)
310     if project is not None:
311         # demultiplexed sample!
312         flowcell = flowcell_dir
313         lane = records[2][-1]
314         read = int(records[3][-1])
315         pf = True # as I understand it hiseq runs toss the ones that fail filter
316         index = records[1]
317         project_id = records[0]
318         split = records[4]
319         sequence_type = 'split_fastq'
320     else:
321         flowcell = records[4]
322         lane = records[5][1]
323         read = int(records[6][1])
324         pf = parse_fastq_pf_flag(records)
325         index = None
326         project_id = None
327         split = None
328         sequence_type = 'fastq'
329
330     if flowcell_dir != flowcell:
331         LOGGER.warn("flowcell %s found in wrong directory %s" % \
332                          (flowcell, path))
333
334     return SequenceFile(sequence_type, fullpath, flowcell, lane, read,
335                         pf=pf,
336                         cycle=stop,
337                         project=project_id,
338                         index=index,
339                         split=split)
340
341 def parse_fastq_pf_flag(records):
342     """Take a fastq filename split on _ and look for the pass-filter flag
343     """
344     if len(records) < 8:
345         pf = None
346     else:
347         fastq_type = records[-1].lower()
348         if fastq_type.startswith('pass'):
349             pf = True
350         elif fastq_type.startswith('nopass'):
351             pf = False
352         elif fastq_type.startswith('all'):
353             pf = None
354         else:
355             raise ValueError("Unrecognized fastq name: %s" % (
356                 "_".join(records),))
357
358     return pf
359
360 def parse_eland(path, filename, eland_match=None):
361     if eland_match is None:
362         eland_match = eland_re.match(filename)
363     fullpath = os.path.join(path, filename)
364     flowcell, start, stop, project = get_flowcell_cycle(path)
365     if eland_match.group('lane'):
366         lane = eland_match.group('lane')
367     else:
368         lane = None
369     if eland_match.group('read'):
370         read = int(eland_match.group('read'))
371     else:
372         read = None
373     return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=stop)
374
375 def scan_for_sequences(dirs):
376     """
377     Scan through a list of directories for sequence like files
378     """
379     sequences = []
380     if isinstance(dirs, six.string_types):
381         raise ValueError("You probably want a list or set, not a string")
382
383     for d in dirs:
384         LOGGER.info("Scanning %s for sequences" % (d,))
385         if not os.path.exists(d):
386             LOGGER.warn("Flowcell directory %s does not exist" % (d,))
387             continue
388
389         for path, dirname, filenames in os.walk(d):
390             for f in filenames:
391                 seq = None
392                 # find sequence files
393                 if f.endswith('.md5'):
394                     continue
395                 elif f.endswith('.srf') or f.endswith('.srf.bz2'):
396                     seq = parse_srf(path, f)
397                 elif qseq_re.match(f):
398                     seq = parse_qseq(path, f)
399                 elif f.endswith('.fastq') or \
400                      f.endswith('.fastq.bz2') or \
401                      f.endswith('.fastq.gz'):
402                     seq = parse_fastq(path, f)
403                 eland_match = eland_re.match(f)
404                 if eland_match:
405                     if f.endswith('.md5'):
406                         continue
407                     seq = parse_eland(path, f, eland_match)
408                 if seq:
409                     sequences.append(seq)
410                     LOGGER.debug("Found sequence at %s" % (f,))
411
412     return sequences
413
414
415 def update_model_sequence_library(model, base_url):
416     """Find sequence objects and add library information if its missing
417     """
418     file_body = """
419     prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
420     select ?filenode ?flowcell_id ?lane_id ?library_id ?flowcell ?library
421     where {
422        ?filenode a libns:IlluminaResult ;
423                  libns:flowcell_id ?flowcell_id ;
424                  libns:lane_number ?lane_id .
425        OPTIONAL { ?filenode libns:flowcell ?flowcell . }
426        OPTIONAL { ?filenode libns:library ?library .}
427        OPTIONAL { ?filenode libns:library_id ?library_id .}
428     }
429     """
430     LOGGER.debug("update_model_sequence_library query %s", file_body)
431     file_query = RDF.SPARQLQuery(file_body)
432     files = file_query.execute(model)
433
434     libraryNS = RDF.NS(urljoin(base_url, 'library/'))
435     flowcellNS = RDF.NS(urljoin(base_url, 'flowcell/'))
436     for f in files:
437         filenode = f['filenode']
438         LOGGER.debug("Updating file node %s", str(filenode))
439         lane_id = fromTypedNode(f['lane_id'])
440         if f['flowcell'] is None:
441             flowcell = flowcellNS[str(f['flowcell_id'])+'/']
442             LOGGER.debug("Adding file (%s) to flowcell (%s) link",
443                          str(filenode),
444                          str(flowcell))
445             model.add_statement(
446                 RDF.Statement(filenode, libNS['flowcell'], flowcell))
447         else:
448             flowcell = f['flowcell']
449
450         if f['library'] is None:
451             if f['library_id'] is not None:
452                 library = libraryNS[str(f['library_id']) + '/']
453             else:
454                 library = guess_library_from_model(model, base_url,
455                                                    flowcell,
456                                                    lane_id)
457                 if library is None:
458                     LOGGER.error("Unable to decypher: %s %s",
459                                  str(flowcell), str(lane_id))
460                     continue
461                 library_id = toTypedNode(simplify_uri(library))
462                 LOGGER.debug("Adding file (%s) to library (%s) link",
463                              str(filenode),
464                              str(library))
465                 model.add_statement(
466                     RDF.Statement(filenode, libNS['library_id'], library_id))
467             if library is not None:
468                 model.add_statement(
469                     RDF.Statement(filenode, libNS['library'], library))
470
471
472 def guess_library_from_model(model, base_url, flowcell, lane_id):
473     """Attempt to find library URI
474     """
475     flowcellNode = RDF.Node(flowcell)
476     flowcell = str(flowcell.uri)
477     lane_body = """
478     prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
479     prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
480     prefix xsd: <http://www.w3.org/2001/XMLSchema#>
481
482     select ?library ?lane
483     where {{
484       <{flowcell}> libns:has_lane ?lane ;
485                    a libns:IlluminaFlowcell .
486       ?lane libns:lane_number ?lane_id ;
487             libns:library ?library .
488       FILTER(str(?lane_id) = "{lane_id}")
489     }}
490     """
491     lane_body = lane_body.format(flowcell=flowcell, lane_id=lane_id)
492     LOGGER.debug("guess_library_from_model: %s", lane_body)
493     lanes = []
494     tries = 3
495     while len(lanes) == 0 and tries > 0:
496         tries -= 1
497         lane_query = RDF.SPARQLQuery(lane_body)
498         lanes = [ l for l in lane_query.execute(model)]
499         if len(lanes) > 1:
500             # CONFUSED!
501             errmsg = "Too many libraries for flowcell {flowcell} "\
502                      "lane {lane} = {count}"
503             LOGGER.error(errmsg.format(flowcell=flowcell,
504                                        lane=lane_id,
505                                        count=len(lanes)))
506             return None
507         elif len(lanes) == 1:
508             # success
509             return lanes[0]['library']
510         else:
511             # try grabbing data
512             model.load(flowcellNode.uri, name="rdfa")