2 Utilities to work with the various eras of sequence archive files
11 from six.moves.urllib.parse import urljoin, urlparse
13 from rdflib import BNode, Literal, Namespace, URIRef
14 from htsworkflow.util.rdfhelp import (
16 libraryOntology as libNS,
23 LOGGER = logging.getLogger(__name__)
25 eland_re = re.compile('s_(?P<lane>\d)(_(?P<read>\d))?_eland_')
26 raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+')
27 qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+_l[\d]_r[\d].tar.bz2')
29 SEQUENCE_TABLE_NAME = "sequences"
30 def create_sequence_table(cursor):
32 Create a SQL table to hold SequenceFile entries
35 CREATE TABLE %(table)s (
44 """ %( {'table': SEQUENCE_TABLE_NAME} )
45 return cursor.execute(sql)
47 FlowcellPath = collections.namedtuple('FlowcellPath',
48 'flowcell start stop project')
50 class SequenceFile(object):
52 Simple container class that holds the path to a sequence archive
53 and basic descriptive information.
55 def __init__(self, filetype, path, flowcell, lane,
62 """Store various fields used in our sequence files
64 filetype is one of 'qseq', 'srf', 'fastq'
65 path = location of file
66 flowcell = files flowcell id
68 read = which sequencer read, usually 1 or 2
69 pf = did it pass filter
70 cycle = cycle dir name e.g. C1-202
71 project = projed name from HiSeq, probably library ID
72 index = HiSeq barcode index sequence
73 split = file fragment from HiSeq (Since one file is split into many)
75 self.filetype = filetype
77 self.flowcell = flowcell
82 self.project = project
87 return hash(self.key())
90 return (self.flowcell, self.lane, self.read, self.project, self.split)
95 def __eq__(self, other):
97 Equality is defined if everything but the path matches
99 attributes = ['filetype',
109 if getattr(self, a) != getattr(other, a):
114 def __ne__(self, other):
115 return not self == other
118 return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
120 def make_target_name(self, root):
122 Create target name for where we need to link this sequence too
124 path, basename = os.path.split(self.path)
125 # Because the names aren't unque we include the flowcel name
126 # because there were different eland files for different length
127 # analyses, we include the cycle length in the name.
128 if self.filetype == 'eland':
129 template = "%(flowcell)s_%(cycle)s_%(eland)s"
130 basename = template % { 'flowcell': self.flowcell,
134 # all the other file types have names that include flowcell/lane
135 # information and thus are unique so we don't have to do anything
136 return os.path.join(root, basename)
138 def save_to_sql(self, cursor):
140 Add this entry to a DB2.0 database.
142 #FIXME: NEEDS SQL ESCAPING
143 header_macro = {'table': SEQUENCE_TABLE_NAME }
144 sql_header = "insert into %(table)s (" % header_macro
145 sql_columns = ['filetype','path','flowcell','lane']
146 sql_middle = ") values ("
147 sql_values = [self.filetype, self.path, self.flowcell, self.lane]
149 for name in ['read', 'pf', 'cycle']:
150 value = getattr(self, name)
151 if value is not None:
152 sql_columns.append(name)
153 sql_values.append(value)
155 sql = " ".join([sql_header,
156 ", ".join(sql_columns),
158 # note the following makes a string like ?,?,?
159 ",".join(["?"] * len(sql_values)),
162 return cursor.execute(sql, sql_values)
164 def save_to_model(self, model, base_url=None):
165 def add_lit(model, s, p, o):
167 model.add((s, p, Literal(o)))
168 def add(model, s, p, o):
170 # a bit unreliable... assumes filesystem is encoded in utf-8
171 path = os.path.abspath(self.path)
172 fileNode = URIRef('file://' + path)
173 add(model, fileNode, RDF['type'], libNS['IlluminaResult'])
174 add_lit(model, fileNode, libNS['flowcell_id'], self.flowcell)
175 add_lit(model, fileNode, libNS['lane_number'], self.lane)
176 if self.read is not None:
177 add_lit(model, fileNode, libNS['read'], self.read)
179 add_lit(model, fileNode, libNS['read'], '')
181 add_lit(model, fileNode, libNS['library_id'], self.project)
182 add_lit(model, fileNode, libNS['multiplex_index'], self.index)
183 add_lit(model, fileNode, libNS['split_id'], self.split)
184 add_lit(model, fileNode, libNS['cycle'], self.cycle)
185 add_lit(model, fileNode, libNS['passed_filter'], self.pf)
186 add(model, fileNode, libNS['file_type'], libNS[self.filetype])
188 if base_url is not None:
189 flowcell = URIRef("{base}/flowcell/{flowcell}/".format(
191 flowcell=self.flowcell))
192 add(model, fileNode, libNS['flowcell'], flowcell)
193 if self.project is not None:
194 library = URIRef("{base}/library/{library}".format(
196 library=self.project))
197 add(model, fileNode, libNS['library'], library)
201 def load_from_model(cls, model, seq_id):
204 stmts = model.triples((s, p, None))
207 if not isinstance(obj, URIRef):
208 values.append(obj.toPython())
215 errmsg = u"To many values for %s %s"
216 raise ValueError(errmsg % (unicode(s), unicode(p)))
217 elif len(values) == 1:
222 if not isinstance(seq_id, URIRef):
223 seq_id = URIRef(seq_id)
224 result_statement = (seq_id, RDF['type'], libNS['IlluminaResult'])
225 if not result_statement in model:
226 raise KeyError(u"%s not found" % (unicode(seq_id),))
228 seq_type_node = list(model.objects(seq_id, libNS['file_type']))[0]
229 seq_type = strip_namespace(libNS, seq_type_node)
231 path = urlparse(str(seq_id)).path
232 flowcellNode = get_one(seq_id, libNS['flowcell'])
233 flowcell = get_one(seq_id, libNS['flowcell_id'])
234 lane = get_one(seq_id, libNS['lane_number'])
235 read = get_one(seq_id, libNS['read'])
237 obj = cls(seq_type, path, flowcell, lane)
238 obj.read = read if read != '' else None
239 obj.project = get_one(seq_id, libNS['library_id'])
240 obj.index = get_one(seq_id, libNS['multiplex_index'])
241 obj.split = get_one(seq_id, libNS['split_id'])
242 obj.cycle = get_one(seq_id, libNS['cycle'] )
243 obj.pf = get_one(seq_id, libNS['passed_filter'])
244 obj.libraryNode = get_one(seq_id, libNS['library'])
248 def get_flowcell_cycle(path):
250 Extract flowcell, cycle from pathname
252 path = os.path.normpath(path)
254 rest, tail = os.path.split(path)
255 if tail.startswith('Project_'):
256 # we're in a multiplexed sample
258 rest, cycle = os.path.split(rest)
262 rest, flowcell = os.path.split(rest)
263 cycle_match = re.match("C(?P<start>[0-9]+)-(?P<stop>[0-9]+)", cycle)
264 if cycle_match is None:
266 "Expected .../flowcell/cycle/ directory structure in %s" % \
268 start = cycle_match.group('start')
269 if start is not None:
271 stop = cycle_match.group('stop')
275 return FlowcellPath(flowcell, start, stop, project)
277 def parse_srf(path, filename):
278 flowcell_dir, start, stop, project = get_flowcell_cycle(path)
279 basename, ext = os.path.splitext(filename)
280 records = basename.split('_')
281 flowcell = records[4]
283 fullpath = os.path.join(path, filename)
285 if flowcell_dir != flowcell:
286 LOGGER.warn("flowcell %s found in wrong directory %s" % \
289 return SequenceFile('srf', fullpath, flowcell, lane, cycle=stop)
291 def parse_qseq(path, filename):
292 flowcell_dir, start, stop, project = get_flowcell_cycle(path)
293 basename, ext = os.path.splitext(filename)
294 records = basename.split('_')
295 fullpath = os.path.join(path, filename)
296 flowcell = records[4]
298 read = int(records[6][1])
300 if flowcell_dir != flowcell:
301 LOGGER.warn("flowcell %s found in wrong directory %s" % \
304 return SequenceFile('qseq', fullpath, flowcell, lane, read, cycle=stop)
306 def parse_fastq(path, filename):
309 flowcell_dir, start, stop, project = get_flowcell_cycle(path)
310 basename = re.sub('\.fastq(\.gz|\.bz2)?$', '', filename)
311 records = basename.split('_')
312 fullpath = os.path.join(path, filename)
313 if project is not None:
314 # demultiplexed sample!
315 flowcell = flowcell_dir
316 lane = records[2][-1]
317 read = int(records[3][-1])
318 pf = True # as I understand it hiseq runs toss the ones that fail filter
320 project_id = records[0]
322 sequence_type = 'split_fastq'
324 flowcell = records[4]
326 read = int(records[6][1])
327 pf = parse_fastq_pf_flag(records)
331 sequence_type = 'fastq'
333 if flowcell_dir != flowcell:
334 LOGGER.warn("flowcell %s found in wrong directory %s" % \
337 return SequenceFile(sequence_type, fullpath, flowcell, lane, read,
344 def parse_fastq_pf_flag(records):
345 """Take a fastq filename split on _ and look for the pass-filter flag
350 fastq_type = records[-1].lower()
351 if fastq_type.startswith('pass'):
353 elif fastq_type.startswith('nopass'):
355 elif fastq_type.startswith('all'):
358 raise ValueError("Unrecognized fastq name: %s" % (
363 def parse_eland(path, filename, eland_match=None):
364 if eland_match is None:
365 eland_match = eland_re.match(filename)
366 fullpath = os.path.join(path, filename)
367 flowcell, start, stop, project = get_flowcell_cycle(path)
368 if eland_match.group('lane'):
369 lane = eland_match.group('lane')
372 if eland_match.group('read'):
373 read = int(eland_match.group('read'))
376 return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=stop)
378 def scan_for_sequences(dirs):
380 Scan through a list of directories for sequence like files
383 if isinstance(dirs, six.string_types):
384 raise ValueError("You probably want a list or set, not a string")
387 LOGGER.info("Scanning %s for sequences" % (d,))
388 if not os.path.exists(d):
389 LOGGER.warn("Flowcell directory %s does not exist" % (d,))
392 for path, dirname, filenames in os.walk(d):
395 # find sequence files
396 if f.endswith('.md5'):
398 elif f.endswith('.srf') or f.endswith('.srf.bz2'):
399 seq = parse_srf(path, f)
400 elif qseq_re.match(f):
401 seq = parse_qseq(path, f)
402 elif f.endswith('.fastq') or \
403 f.endswith('.fastq.bz2') or \
404 f.endswith('.fastq.gz'):
405 seq = parse_fastq(path, f)
406 eland_match = eland_re.match(f)
408 if f.endswith('.md5'):
410 seq = parse_eland(path, f, eland_match)
412 sequences.append(seq)
413 LOGGER.debug("Found sequence at %s" % (f,))
418 def update_model_sequence_library(model, base_url):
419 """Find sequence objects and add library information if its missing
422 prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
423 select ?filenode ?flowcell_id ?lane_id ?library_id ?flowcell ?library
425 ?filenode a libns:IlluminaResult ;
426 libns:flowcell_id ?flowcell_id ;
427 libns:lane_number ?lane_id .
428 OPTIONAL { ?filenode libns:flowcell ?flowcell . }
429 OPTIONAL { ?filenode libns:library ?library .}
430 OPTIONAL { ?filenode libns:library_id ?library_id .}
433 LOGGER.debug("update_model_sequence_library query %s", file_body)
434 files = model.query(file_body)
436 libraryNS = Namespace(urljoin(base_url, 'library/'))
437 flowcellNS = Namespace(urljoin(base_url, 'flowcell/'))
439 filenode = f['filenode']
440 LOGGER.debug("Updating file node %s", str(filenode))
441 lane_id = f['lane_id'].toPython()
442 if f['flowcell'] is None:
443 flowcell = flowcellNS[str(f['flowcell_id'])+'/']
444 LOGGER.debug("Adding file (%s) to flowcell (%s) link",
447 model.add((filenode, libNS['flowcell'], flowcell))
449 flowcell = f['flowcell']
451 if f['library'] is None:
452 if f['library_id'] is not None:
453 library = libraryNS[str(f['library_id']) + '/']
455 library = guess_library_from_model(model, base_url,
459 LOGGER.error("Unable to decypher: %s %s",
460 str(flowcell), str(lane_id))
462 library_id = Literal(simplify_uri(library))
463 LOGGER.debug("Adding file (%s) to library (%s) link",
466 model.add((filenode, libNS['library_id'], library_id))
467 if library is not None:
468 model.add((filenode, libNS['library'], library))
471 def guess_library_from_model(model, base_url, flowcell, lane_id):
472 """Attempt to find library URI
474 flowcellNode = URIRef(flowcell)
475 flowcell = str(flowcell)
477 prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
478 prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
479 prefix xsd: <http://www.w3.org/2001/XMLSchema#>
481 select ?library ?lane
483 <{flowcell}> libns:has_lane ?lane ;
484 a libns:IlluminaFlowcell .
485 ?lane libns:lane_number ?lane_id ;
486 libns:library ?library .
487 FILTER(str(?lane_id) = "{lane_id}")
490 lane_body = lane_body.format(flowcell=flowcell, lane_id=lane_id)
491 LOGGER.debug("guess_library_from_model: %s", lane_body)
494 while len(lanes) == 0 and tries > 0:
496 lanes = [ l for l in model.query(lane_body)]
499 errmsg = "Too many libraries for flowcell {flowcell} "\
500 "lane {lane} = {count}"
501 LOGGER.error(errmsg.format(flowcell=flowcell,
505 elif len(lanes) == 1:
507 return lanes[0]['library']
510 model.parse(source=flowcellNode, format='rdfa')