Port pipelines.sequences to rdflib
[htsworkflow.git] / htsworkflow / pipelines / sequences.py
1 """
2 Utilities to work with the various eras of sequence archive files
3 """
4 import collections
5 import logging
6 import os
7 import types
8 import re
9 import sys
10 import six
11 from six.moves.urllib.parse import urljoin, urlparse
12
13 from rdflib import BNode, Literal, Namespace, URIRef
14 from htsworkflow.util.rdfhelp import (
15     dump_model,
16     libraryOntology as libNS,
17     RDF,
18     simplify_uri,
19     strip_namespace,
20 )
21
22
23 LOGGER = logging.getLogger(__name__)
24
25 eland_re = re.compile('s_(?P<lane>\d)(_(?P<read>\d))?_eland_')
26 raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+')
27 qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+_l[\d]_r[\d].tar.bz2')
28
29 SEQUENCE_TABLE_NAME = "sequences"
30 def create_sequence_table(cursor):
31     """
32     Create a SQL table to hold  SequenceFile entries
33     """
34     sql = """
35 CREATE TABLE %(table)s (
36   filetype   CHAR(8),
37   path       TEXT,
38   flowcell   CHAR(8),
39   lane       INTEGER,
40   read       INTEGER,
41   pf         BOOLEAN,
42   cycle      CHAR(8)
43 );
44 """ %( {'table': SEQUENCE_TABLE_NAME} )
45     return cursor.execute(sql)
46
47 FlowcellPath = collections.namedtuple('FlowcellPath',
48                                       'flowcell start stop project')
49
50 class SequenceFile(object):
51     """
52     Simple container class that holds the path to a sequence archive
53     and basic descriptive information.
54     """
55     def __init__(self, filetype, path, flowcell, lane,
56                  read=None,
57                  pf=None,
58                  cycle=None,
59                  project=None,
60                  index=None,
61                  split=None):
62         """Store various fields used in our sequence files
63
64         filetype is one of 'qseq', 'srf', 'fastq'
65         path = location of file
66         flowcell = files flowcell id
67         lane = which lane
68         read = which sequencer read, usually 1 or 2
69         pf = did it pass filter
70         cycle = cycle dir name e.g. C1-202
71         project = projed name from HiSeq, probably library ID
72         index = HiSeq barcode index sequence
73         split = file fragment from HiSeq (Since one file is split into many)
74         """
75         self.filetype = filetype
76         self.path = path
77         self.flowcell = flowcell
78         self.lane = lane
79         self.read = read
80         self.pf = pf
81         self.cycle = cycle
82         self.project = project
83         self.index = index
84         self.split = split
85
86     def __hash__(self):
87         return hash(self.key())
88
89     def key(self):
90         return (self.flowcell, self.lane, self.read, self.project, self.split)
91
92     def __str__(self):
93         return str(self.path)
94
95     def __eq__(self, other):
96         """
97         Equality is defined if everything but the path matches
98         """
99         attributes = ['filetype',
100                       'flowcell',
101                       'lane',
102                       'read',
103                       'pf',
104                       'cycle',
105                       'project',
106                       'index',
107                       'split']
108         for a in attributes:
109             if getattr(self, a) != getattr(other, a):
110                 return False
111
112         return True
113
114     def __ne__(self, other):
115         return not self == other
116
117     def __repr__(self):
118         return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
119
120     def make_target_name(self, root):
121         """
122         Create target name for where we need to link this sequence too
123         """
124         path, basename = os.path.split(self.path)
125         # Because the names aren't unque we include the flowcel name
126         # because there were different eland files for different length
127         # analyses, we include the cycle length in the name.
128         if self.filetype == 'eland':
129             template = "%(flowcell)s_%(cycle)s_%(eland)s"
130             basename = template % { 'flowcell': self.flowcell,
131                                     'cycle': self.cycle,
132                                     'eland': basename }
133         # else:
134         # all the other file types have names that include flowcell/lane
135         # information and thus are unique so we don't have to do anything
136         return os.path.join(root, basename)
137
138     def save_to_sql(self, cursor):
139         """
140         Add this entry to a DB2.0 database.
141         """
142         #FIXME: NEEDS SQL ESCAPING
143         header_macro = {'table': SEQUENCE_TABLE_NAME }
144         sql_header = "insert into %(table)s (" % header_macro
145         sql_columns = ['filetype','path','flowcell','lane']
146         sql_middle = ") values ("
147         sql_values = [self.filetype, self.path, self.flowcell, self.lane]
148         sql_footer = ");"
149         for name in ['read', 'pf', 'cycle']:
150             value = getattr(self, name)
151             if value is not None:
152                 sql_columns.append(name)
153                 sql_values.append(value)
154
155         sql = " ".join([sql_header,
156                         ", ".join(sql_columns),
157                         sql_middle,
158                         # note the following makes a string like ?,?,?
159                         ",".join(["?"] * len(sql_values)),
160                         sql_footer])
161
162         return cursor.execute(sql, sql_values)
163
164     def save_to_model(self, model, base_url=None):
165         def add_lit(model, s, p, o):
166             if o is not None:
167                 model.add((s, p, Literal(o)))
168         def add(model, s, p, o):
169             model.add((s, p, o))
170         # a bit unreliable... assumes filesystem is encoded in utf-8
171         path = os.path.abspath(self.path)
172         fileNode = URIRef('file://' + path)
173         add(model, fileNode, RDF['type'], libNS['IlluminaResult'])
174         add_lit(model, fileNode, libNS['flowcell_id'], self.flowcell)
175         add_lit(model, fileNode, libNS['lane_number'], self.lane)
176         if self.read is not None:
177             add_lit(model, fileNode, libNS['read'], self.read)
178         else:
179             add_lit(model, fileNode, libNS['read'], '')
180
181         add_lit(model, fileNode, libNS['library_id'], self.project)
182         add_lit(model, fileNode, libNS['multiplex_index'], self.index)
183         add_lit(model, fileNode, libNS['split_id'], self.split)
184         add_lit(model, fileNode, libNS['cycle'], self.cycle)
185         add_lit(model, fileNode, libNS['passed_filter'], self.pf)
186         add(model, fileNode, libNS['file_type'], libNS[self.filetype])
187
188         if base_url is not None:
189             flowcell = URIRef("{base}/flowcell/{flowcell}/".format(
190                 base=base_url,
191                 flowcell=self.flowcell))
192             add(model, fileNode, libNS['flowcell'], flowcell)
193             if self.project is not None:
194                 library = URIRef("{base}/library/{library}".format(
195                     base=base_url,
196                     library=self.project))
197                 add(model, fileNode, libNS['library'], library)
198
199
200     @classmethod
201     def load_from_model(cls, model, seq_id):
202         def get(s, p):
203             values = []
204             stmts = model.triples((s, p, None))
205             for s in stmts:
206                 obj = s[2]
207                 if not isinstance(obj, URIRef):
208                     values.append(obj.toPython())
209                 else:
210                     values.append(obj)
211             return values
212         def get_one(s, p):
213             values = get(s, p)
214             if len(values) > 1:
215                 errmsg = u"To many values for %s %s"
216                 raise ValueError(errmsg % (unicode(s), unicode(p)))
217             elif len(values) == 1:
218                 return values[0]
219             else:
220                 return None
221
222         if not isinstance(seq_id, URIRef):
223             seq_id = URIRef(seq_id)
224         result_statement = (seq_id, RDF['type'], libNS['IlluminaResult'])
225         if not result_statement in model:
226             raise KeyError(u"%s not found" % (unicode(seq_id),))
227
228         seq_type_node = list(model.objects(seq_id, libNS['file_type']))[0]
229         seq_type = strip_namespace(libNS, seq_type_node)
230
231         path = urlparse(str(seq_id)).path
232         flowcellNode = get_one(seq_id, libNS['flowcell'])
233         flowcell = get_one(seq_id, libNS['flowcell_id'])
234         lane = get_one(seq_id, libNS['lane_number'])
235         read = get_one(seq_id, libNS['read'])
236
237         obj = cls(seq_type, path, flowcell, lane)
238         obj.read = read if read != '' else None
239         obj.project = get_one(seq_id, libNS['library_id'])
240         obj.index = get_one(seq_id, libNS['multiplex_index'])
241         obj.split = get_one(seq_id, libNS['split_id'])
242         obj.cycle = get_one(seq_id, libNS['cycle'] )
243         obj.pf = get_one(seq_id, libNS['passed_filter'])
244         obj.libraryNode = get_one(seq_id, libNS['library'])
245         return obj
246
247
248 def get_flowcell_cycle(path):
249     """
250     Extract flowcell, cycle from pathname
251     """
252     path = os.path.normpath(path)
253     project = None
254     rest, tail = os.path.split(path)
255     if tail.startswith('Project_'):
256         # we're in a multiplexed sample
257         project = tail
258         rest, cycle = os.path.split(rest)
259     else:
260         cycle = tail
261
262     rest, flowcell = os.path.split(rest)
263     cycle_match = re.match("C(?P<start>[0-9]+)-(?P<stop>[0-9]+)", cycle)
264     if cycle_match is None:
265         raise ValueError(
266             "Expected .../flowcell/cycle/ directory structure in %s" % \
267             (path,))
268     start = cycle_match.group('start')
269     if start is not None:
270         start = int(start)
271     stop = cycle_match.group('stop')
272     if stop is not None:
273         stop = int(stop)
274
275     return FlowcellPath(flowcell, start, stop, project)
276
277 def parse_srf(path, filename):
278     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
279     basename, ext = os.path.splitext(filename)
280     records = basename.split('_')
281     flowcell = records[4]
282     lane = records[5][0]
283     fullpath = os.path.join(path, filename)
284
285     if flowcell_dir != flowcell:
286         LOGGER.warn("flowcell %s found in wrong directory %s" % \
287                          (flowcell, path))
288
289     return SequenceFile('srf', fullpath, flowcell, lane, cycle=stop)
290
291 def parse_qseq(path, filename):
292     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
293     basename, ext = os.path.splitext(filename)
294     records = basename.split('_')
295     fullpath = os.path.join(path, filename)
296     flowcell = records[4]
297     lane = records[5][1]
298     read = int(records[6][1])
299
300     if flowcell_dir != flowcell:
301         LOGGER.warn("flowcell %s found in wrong directory %s" % \
302                          (flowcell, path))
303
304     return SequenceFile('qseq', fullpath, flowcell, lane, read, cycle=stop)
305
306 def parse_fastq(path, filename):
307     """Parse fastq names
308     """
309     flowcell_dir, start, stop, project = get_flowcell_cycle(path)
310     basename = re.sub('\.fastq(\.gz|\.bz2)?$', '', filename)
311     records = basename.split('_')
312     fullpath = os.path.join(path, filename)
313     if project is not None:
314         # demultiplexed sample!
315         flowcell = flowcell_dir
316         lane = records[2][-1]
317         read = int(records[3][-1])
318         pf = True # as I understand it hiseq runs toss the ones that fail filter
319         index = records[1]
320         project_id = records[0]
321         split = records[4]
322         sequence_type = 'split_fastq'
323     else:
324         flowcell = records[4]
325         lane = records[5][1]
326         read = int(records[6][1])
327         pf = parse_fastq_pf_flag(records)
328         index = None
329         project_id = None
330         split = None
331         sequence_type = 'fastq'
332
333     if flowcell_dir != flowcell:
334         LOGGER.warn("flowcell %s found in wrong directory %s" % \
335                          (flowcell, path))
336
337     return SequenceFile(sequence_type, fullpath, flowcell, lane, read,
338                         pf=pf,
339                         cycle=stop,
340                         project=project_id,
341                         index=index,
342                         split=split)
343
344 def parse_fastq_pf_flag(records):
345     """Take a fastq filename split on _ and look for the pass-filter flag
346     """
347     if len(records) < 8:
348         pf = None
349     else:
350         fastq_type = records[-1].lower()
351         if fastq_type.startswith('pass'):
352             pf = True
353         elif fastq_type.startswith('nopass'):
354             pf = False
355         elif fastq_type.startswith('all'):
356             pf = None
357         else:
358             raise ValueError("Unrecognized fastq name: %s" % (
359                 "_".join(records),))
360
361     return pf
362
363 def parse_eland(path, filename, eland_match=None):
364     if eland_match is None:
365         eland_match = eland_re.match(filename)
366     fullpath = os.path.join(path, filename)
367     flowcell, start, stop, project = get_flowcell_cycle(path)
368     if eland_match.group('lane'):
369         lane = eland_match.group('lane')
370     else:
371         lane = None
372     if eland_match.group('read'):
373         read = int(eland_match.group('read'))
374     else:
375         read = None
376     return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=stop)
377
378 def scan_for_sequences(dirs):
379     """
380     Scan through a list of directories for sequence like files
381     """
382     sequences = []
383     if isinstance(dirs, six.string_types):
384         raise ValueError("You probably want a list or set, not a string")
385
386     for d in dirs:
387         LOGGER.info("Scanning %s for sequences" % (d,))
388         if not os.path.exists(d):
389             LOGGER.warn("Flowcell directory %s does not exist" % (d,))
390             continue
391
392         for path, dirname, filenames in os.walk(d):
393             for f in filenames:
394                 seq = None
395                 # find sequence files
396                 if f.endswith('.md5'):
397                     continue
398                 elif f.endswith('.srf') or f.endswith('.srf.bz2'):
399                     seq = parse_srf(path, f)
400                 elif qseq_re.match(f):
401                     seq = parse_qseq(path, f)
402                 elif f.endswith('.fastq') or \
403                      f.endswith('.fastq.bz2') or \
404                      f.endswith('.fastq.gz'):
405                     seq = parse_fastq(path, f)
406                 eland_match = eland_re.match(f)
407                 if eland_match:
408                     if f.endswith('.md5'):
409                         continue
410                     seq = parse_eland(path, f, eland_match)
411                 if seq:
412                     sequences.append(seq)
413                     LOGGER.debug("Found sequence at %s" % (f,))
414
415     return sequences
416
417
418 def update_model_sequence_library(model, base_url):
419     """Find sequence objects and add library information if its missing
420     """
421     file_body = """
422     prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
423     select ?filenode ?flowcell_id ?lane_id ?library_id ?flowcell ?library
424     where {
425        ?filenode a libns:IlluminaResult ;
426                  libns:flowcell_id ?flowcell_id ;
427                  libns:lane_number ?lane_id .
428        OPTIONAL { ?filenode libns:flowcell ?flowcell . }
429        OPTIONAL { ?filenode libns:library ?library .}
430        OPTIONAL { ?filenode libns:library_id ?library_id .}
431     }
432     """
433     LOGGER.debug("update_model_sequence_library query %s", file_body)
434     files = model.query(file_body)
435
436     libraryNS = Namespace(urljoin(base_url, 'library/'))
437     flowcellNS = Namespace(urljoin(base_url, 'flowcell/'))
438     for f in files:
439         filenode = f['filenode']
440         LOGGER.debug("Updating file node %s", str(filenode))
441         lane_id = f['lane_id'].toPython()
442         if f['flowcell'] is None:
443             flowcell = flowcellNS[str(f['flowcell_id'])+'/']
444             LOGGER.debug("Adding file (%s) to flowcell (%s) link",
445                          str(filenode),
446                          str(flowcell))
447             model.add((filenode, libNS['flowcell'], flowcell))
448         else:
449             flowcell = f['flowcell']
450
451         if f['library'] is None:
452             if f['library_id'] is not None:
453                 library = libraryNS[str(f['library_id']) + '/']
454             else:
455                 library = guess_library_from_model(model, base_url,
456                                                    flowcell,
457                                                    lane_id)
458                 if library is None:
459                     LOGGER.error("Unable to decypher: %s %s",
460                                  str(flowcell), str(lane_id))
461                     continue
462                 library_id = Literal(simplify_uri(library))
463                 LOGGER.debug("Adding file (%s) to library (%s) link",
464                              str(filenode),
465                              str(library))
466                 model.add((filenode, libNS['library_id'], library_id))
467             if library is not None:
468                 model.add((filenode, libNS['library'], library))
469
470
471 def guess_library_from_model(model, base_url, flowcell, lane_id):
472     """Attempt to find library URI
473     """
474     flowcellNode = URIRef(flowcell)
475     flowcell = str(flowcell)
476     lane_body = """
477     prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
478     prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
479     prefix xsd: <http://www.w3.org/2001/XMLSchema#>
480
481     select ?library ?lane
482     where {{
483       <{flowcell}> libns:has_lane ?lane ;
484                    a libns:IlluminaFlowcell .
485       ?lane libns:lane_number ?lane_id ;
486             libns:library ?library .
487       FILTER(str(?lane_id) = "{lane_id}")
488     }}
489     """
490     lane_body = lane_body.format(flowcell=flowcell, lane_id=lane_id)
491     LOGGER.debug("guess_library_from_model: %s", lane_body)
492     lanes = []
493     tries = 3
494     while len(lanes) == 0 and tries > 0:
495         tries -= 1
496         lanes = [ l for l in model.query(lane_body)]
497         if len(lanes) > 1:
498             # CONFUSED!
499             errmsg = "Too many libraries for flowcell {flowcell} "\
500                      "lane {lane} = {count}"
501             LOGGER.error(errmsg.format(flowcell=flowcell,
502                                        lane=lane_id,
503                                        count=len(lanes)))
504             return None
505         elif len(lanes) == 1:
506             # success
507             return lanes[0]['library']
508         else:
509             # try grabbing data
510             model.parse(source=flowcellNode, format='rdfa')