2 Utilities to work with the various eras of sequence archive files
11 from six.moves.urllib.parse import urljoin, urlparse
14 from htsworkflow.util.rdfhelp import libraryOntology as libNS
15 from htsworkflow.util.rdfhelp import toTypedNode, fromTypedNode, rdfNS, \
16 strip_namespace, dump_model, simplify_uri
18 LOGGER = logging.getLogger(__name__)
20 eland_re = re.compile('s_(?P<lane>\d)(_(?P<read>\d))?_eland_')
21 raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+')
22 qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+_l[\d]_r[\d].tar.bz2')
24 SEQUENCE_TABLE_NAME = "sequences"
25 def create_sequence_table(cursor):
27 Create a SQL table to hold SequenceFile entries
30 CREATE TABLE %(table)s (
39 """ %( {'table': SEQUENCE_TABLE_NAME} )
40 return cursor.execute(sql)
42 FlowcellPath = collections.namedtuple('FlowcellPath',
43 'flowcell start stop project')
45 class SequenceFile(object):
47 Simple container class that holds the path to a sequence archive
48 and basic descriptive information.
50 def __init__(self, filetype, path, flowcell, lane,
57 """Store various fields used in our sequence files
59 filetype is one of 'qseq', 'srf', 'fastq'
60 path = location of file
61 flowcell = files flowcell id
63 read = which sequencer read, usually 1 or 2
64 pf = did it pass filter
65 cycle = cycle dir name e.g. C1-202
66 project = projed name from HiSeq, probably library ID
67 index = HiSeq barcode index sequence
68 split = file fragment from HiSeq (Since one file is split into many)
70 self.filetype = filetype
72 self.flowcell = flowcell
77 self.project = project
82 return hash(self.key())
85 return (self.flowcell, self.lane, self.read, self.project, self.split)
90 def __eq__(self, other):
92 Equality is defined if everything but the path matches
94 attributes = ['filetype',
104 if getattr(self, a) != getattr(other, a):
109 def __ne__(self, other):
110 return not self == other
113 return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
115 def make_target_name(self, root):
117 Create target name for where we need to link this sequence too
119 path, basename = os.path.split(self.path)
120 # Because the names aren't unque we include the flowcel name
121 # because there were different eland files for different length
122 # analyses, we include the cycle length in the name.
123 if self.filetype == 'eland':
124 template = "%(flowcell)s_%(cycle)s_%(eland)s"
125 basename = template % { 'flowcell': self.flowcell,
129 # all the other file types have names that include flowcell/lane
130 # information and thus are unique so we don't have to do anything
131 return os.path.join(root, basename)
133 def save_to_sql(self, cursor):
135 Add this entry to a DB2.0 database.
137 #FIXME: NEEDS SQL ESCAPING
138 header_macro = {'table': SEQUENCE_TABLE_NAME }
139 sql_header = "insert into %(table)s (" % header_macro
140 sql_columns = ['filetype','path','flowcell','lane']
141 sql_middle = ") values ("
142 sql_values = [self.filetype, self.path, self.flowcell, self.lane]
144 for name in ['read', 'pf', 'cycle']:
145 value = getattr(self, name)
146 if value is not None:
147 sql_columns.append(name)
148 sql_values.append(value)
150 sql = " ".join([sql_header,
151 ", ".join(sql_columns),
153 # note the following makes a string like ?,?,?
154 ",".join(["?"] * len(sql_values)),
157 return cursor.execute(sql, sql_values)
159 def save_to_model(self, model, base_url=None):
160 def add_lit(model, s, p, o):
162 model.add_statement(RDF.Statement(s, p, toTypedNode(o)))
163 def add(model, s, p, o):
164 model.add_statement(RDF.Statement(s,p,o))
165 # a bit unreliable... assumes filesystem is encoded in utf-8
166 path = os.path.abspath(self.path)
167 fileNode = RDF.Node(RDF.Uri('file://' + path))
168 add(model, fileNode, rdfNS['type'], libNS['IlluminaResult'])
169 add_lit(model, fileNode, libNS['flowcell_id'], self.flowcell)
170 add_lit(model, fileNode, libNS['lane_number'], self.lane)
171 if self.read is not None:
172 add_lit(model, fileNode, libNS['read'], self.read)
174 add_lit(model, fileNode, libNS['read'], '')
176 add_lit(model, fileNode, libNS['library_id'], self.project)
177 add_lit(model, fileNode, libNS['multiplex_index'], self.index)
178 add_lit(model, fileNode, libNS['split_id'], self.split)
179 add_lit(model, fileNode, libNS['cycle'], self.cycle)
180 add_lit(model, fileNode, libNS['passed_filter'], self.pf)
181 add(model, fileNode, libNS['file_type'], libNS[self.filetype])
183 if base_url is not None:
184 flowcell = RDF.Node(RDF.Uri("{base}/flowcell/{flowcell}/".format(
186 flowcell=self.flowcell)))
187 add(model, fileNode, libNS['flowcell'], flowcell)
188 if self.project is not None:
189 library = RDF.Node(RDF.Uri("{base}/library/{library}".format(
191 library=self.project)))
192 add(model, fileNode, libNS['library'], library)
196 def load_from_model(cls, model, seq_id):
199 stmts = model.find_statements(RDF.Statement(s, p, None))
202 if not obj.is_resource():
203 values.append(fromTypedNode(obj))
210 errmsg = u"To many values for %s %s"
211 raise ValueError(errmsg % (unicode(s), unicode(p)))
212 elif len(values) == 1:
217 if not isinstance(seq_id, RDF.Node):
218 seq_id = RDF.Node(RDF.Uri(seq_id))
219 result_statement = RDF.Statement(seq_id,
221 libNS['IlluminaResult'])
222 if not model.contains_statement(result_statement):
223 raise KeyError(u"%s not found" % (unicode(seq_id),))
225 seq_type_node = model.get_target(seq_id, libNS['file_type'])
226 seq_type = strip_namespace(libNS, seq_type_node)
228 path = urlparse(str(seq_id.uri)).path
229 flowcellNode = get_one(seq_id, libNS['flowcell'])
230 flowcell = get_one(seq_id, libNS['flowcell_id'])
231 lane = get_one(seq_id, libNS['lane_number'])
232 read = get_one(seq_id, libNS['read'])
234 obj = cls(seq_type, path, flowcell, lane)
235 obj.read = read if read != '' else None
236 obj.project = get_one(seq_id, libNS['library_id'])
237 obj.index = get_one(seq_id, libNS['multiplex_index'])
238 obj.split = get_one(seq_id, libNS['split_id'])
239 obj.cycle = get_one(seq_id, libNS['cycle'] )
240 obj.pf = get_one(seq_id, libNS['passed_filter'])
241 obj.libraryNode = get_one(seq_id, libNS['library'])
245 def get_flowcell_cycle(path):
247 Extract flowcell, cycle from pathname
249 path = os.path.normpath(path)
251 rest, tail = os.path.split(path)
252 if tail.startswith('Project_'):
253 # we're in a multiplexed sample
255 rest, cycle = os.path.split(rest)
259 rest, flowcell = os.path.split(rest)
260 cycle_match = re.match("C(?P<start>[0-9]+)-(?P<stop>[0-9]+)", cycle)
261 if cycle_match is None:
263 "Expected .../flowcell/cycle/ directory structure in %s" % \
265 start = cycle_match.group('start')
266 if start is not None:
268 stop = cycle_match.group('stop')
272 return FlowcellPath(flowcell, start, stop, project)
274 def parse_srf(path, filename):
275 flowcell_dir, start, stop, project = get_flowcell_cycle(path)
276 basename, ext = os.path.splitext(filename)
277 records = basename.split('_')
278 flowcell = records[4]
280 fullpath = os.path.join(path, filename)
282 if flowcell_dir != flowcell:
283 LOGGER.warn("flowcell %s found in wrong directory %s" % \
286 return SequenceFile('srf', fullpath, flowcell, lane, cycle=stop)
288 def parse_qseq(path, filename):
289 flowcell_dir, start, stop, project = get_flowcell_cycle(path)
290 basename, ext = os.path.splitext(filename)
291 records = basename.split('_')
292 fullpath = os.path.join(path, filename)
293 flowcell = records[4]
295 read = int(records[6][1])
297 if flowcell_dir != flowcell:
298 LOGGER.warn("flowcell %s found in wrong directory %s" % \
301 return SequenceFile('qseq', fullpath, flowcell, lane, read, cycle=stop)
303 def parse_fastq(path, filename):
306 flowcell_dir, start, stop, project = get_flowcell_cycle(path)
307 basename = re.sub('\.fastq(\.gz|\.bz2)?$', '', filename)
308 records = basename.split('_')
309 fullpath = os.path.join(path, filename)
310 if project is not None:
311 # demultiplexed sample!
312 flowcell = flowcell_dir
313 lane = records[2][-1]
314 read = int(records[3][-1])
315 pf = True # as I understand it hiseq runs toss the ones that fail filter
317 project_id = records[0]
319 sequence_type = 'split_fastq'
321 flowcell = records[4]
323 read = int(records[6][1])
324 pf = parse_fastq_pf_flag(records)
328 sequence_type = 'fastq'
330 if flowcell_dir != flowcell:
331 LOGGER.warn("flowcell %s found in wrong directory %s" % \
334 return SequenceFile(sequence_type, fullpath, flowcell, lane, read,
341 def parse_fastq_pf_flag(records):
342 """Take a fastq filename split on _ and look for the pass-filter flag
347 fastq_type = records[-1].lower()
348 if fastq_type.startswith('pass'):
350 elif fastq_type.startswith('nopass'):
352 elif fastq_type.startswith('all'):
355 raise ValueError("Unrecognized fastq name: %s" % (
360 def parse_eland(path, filename, eland_match=None):
361 if eland_match is None:
362 eland_match = eland_re.match(filename)
363 fullpath = os.path.join(path, filename)
364 flowcell, start, stop, project = get_flowcell_cycle(path)
365 if eland_match.group('lane'):
366 lane = eland_match.group('lane')
369 if eland_match.group('read'):
370 read = int(eland_match.group('read'))
373 return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=stop)
375 def scan_for_sequences(dirs):
377 Scan through a list of directories for sequence like files
380 if isinstance(dirs, six.string_types):
381 raise ValueError("You probably want a list or set, not a string")
384 LOGGER.info("Scanning %s for sequences" % (d,))
385 if not os.path.exists(d):
386 LOGGER.warn("Flowcell directory %s does not exist" % (d,))
389 for path, dirname, filenames in os.walk(d):
392 # find sequence files
393 if f.endswith('.md5'):
395 elif f.endswith('.srf') or f.endswith('.srf.bz2'):
396 seq = parse_srf(path, f)
397 elif qseq_re.match(f):
398 seq = parse_qseq(path, f)
399 elif f.endswith('.fastq') or \
400 f.endswith('.fastq.bz2') or \
401 f.endswith('.fastq.gz'):
402 seq = parse_fastq(path, f)
403 eland_match = eland_re.match(f)
405 if f.endswith('.md5'):
407 seq = parse_eland(path, f, eland_match)
409 sequences.append(seq)
410 LOGGER.debug("Found sequence at %s" % (f,))
415 def update_model_sequence_library(model, base_url):
416 """Find sequence objects and add library information if its missing
419 prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
420 select ?filenode ?flowcell_id ?lane_id ?library_id ?flowcell ?library
422 ?filenode a libns:IlluminaResult ;
423 libns:flowcell_id ?flowcell_id ;
424 libns:lane_number ?lane_id .
425 OPTIONAL { ?filenode libns:flowcell ?flowcell . }
426 OPTIONAL { ?filenode libns:library ?library .}
427 OPTIONAL { ?filenode libns:library_id ?library_id .}
430 LOGGER.debug("update_model_sequence_library query %s", file_body)
431 file_query = RDF.SPARQLQuery(file_body)
432 files = file_query.execute(model)
434 libraryNS = RDF.NS(urljoin(base_url, 'library/'))
435 flowcellNS = RDF.NS(urljoin(base_url, 'flowcell/'))
437 filenode = f['filenode']
438 LOGGER.debug("Updating file node %s", str(filenode))
439 lane_id = fromTypedNode(f['lane_id'])
440 if f['flowcell'] is None:
441 flowcell = flowcellNS[str(f['flowcell_id'])+'/']
442 LOGGER.debug("Adding file (%s) to flowcell (%s) link",
446 RDF.Statement(filenode, libNS['flowcell'], flowcell))
448 flowcell = f['flowcell']
450 if f['library'] is None:
451 if f['library_id'] is not None:
452 library = libraryNS[str(f['library_id']) + '/']
454 library = guess_library_from_model(model, base_url,
458 LOGGER.error("Unable to decypher: %s %s",
459 str(flowcell), str(lane_id))
461 library_id = toTypedNode(simplify_uri(library))
462 LOGGER.debug("Adding file (%s) to library (%s) link",
466 RDF.Statement(filenode, libNS['library_id'], library_id))
467 if library is not None:
469 RDF.Statement(filenode, libNS['library'], library))
472 def guess_library_from_model(model, base_url, flowcell, lane_id):
473 """Attempt to find library URI
475 flowcellNode = RDF.Node(flowcell)
476 flowcell = str(flowcell.uri)
478 prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
479 prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
480 prefix xsd: <http://www.w3.org/2001/XMLSchema#>
482 select ?library ?lane
484 <{flowcell}> libns:has_lane ?lane ;
485 a libns:IlluminaFlowcell .
486 ?lane libns:lane_number ?lane_id ;
487 libns:library ?library .
488 FILTER(str(?lane_id) = "{lane_id}")
491 lane_body = lane_body.format(flowcell=flowcell, lane_id=lane_id)
492 LOGGER.debug("guess_library_from_model: %s", lane_body)
495 while len(lanes) == 0 and tries > 0:
497 lane_query = RDF.SPARQLQuery(lane_body)
498 lanes = [ l for l in lane_query.execute(model)]
501 errmsg = "Too many libraries for flowcell {flowcell} "\
502 "lane {lane} = {count}"
503 LOGGER.error(errmsg.format(flowcell=flowcell,
507 elif len(lanes) == 1:
509 return lanes[0]['library']
512 model.load(flowcellNode.uri, name="rdfa")