2 Utilities to work with the various eras of sequence archive files
10 from urlparse import urljoin, urlparse
13 from htsworkflow.util.rdfhelp import libraryOntology as libNS
14 from htsworkflow.util.rdfhelp import toTypedNode, fromTypedNode, rdfNS, \
15 stripNamespace, dump_model, simplify_uri
17 LOGGER = logging.getLogger(__name__)
19 eland_re = re.compile('s_(?P<lane>\d)(_(?P<read>\d))?_eland_')
20 raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+')
21 qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+_l[\d]_r[\d].tar.bz2')
23 SEQUENCE_TABLE_NAME = "sequences"
24 def create_sequence_table(cursor):
26 Create a SQL table to hold SequenceFile entries
29 CREATE TABLE %(table)s (
38 """ %( {'table': SEQUENCE_TABLE_NAME} )
39 return cursor.execute(sql)
41 FlowcellPath = collections.namedtuple('FlowcellPath',
42 'flowcell start stop project')
44 class SequenceFile(object):
46 Simple container class that holds the path to a sequence archive
47 and basic descriptive information.
49 def __init__(self, filetype, path, flowcell, lane,
56 """Store various fields used in our sequence files
58 filetype is one of 'qseq', 'srf', 'fastq'
59 path = location of file
60 flowcell = files flowcell id
62 read = which sequencer read, usually 1 or 2
63 pf = did it pass filter
64 cycle = cycle dir name e.g. C1-202
65 project = projed name from HiSeq, probably library ID
66 index = HiSeq barcode index sequence
67 split = file fragment from HiSeq (Since one file is split into many)
69 self.filetype = filetype
71 self.flowcell = flowcell
76 self.project = project
81 return hash(self.key())
84 return (self.flowcell, self.lane, self.read, self.project, self.split)
86 def __unicode__(self):
87 return unicode(self.path)
89 def __eq__(self, other):
91 Equality is defined if everything but the path matches
93 attributes = ['filetype',
103 if getattr(self, a) != getattr(other, a):
108 def __ne__(self, other):
109 return not self == other
112 return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
114 def make_target_name(self, root):
116 Create target name for where we need to link this sequence too
118 path, basename = os.path.split(self.path)
119 # Because the names aren't unque we include the flowcel name
120 # because there were different eland files for different length
121 # analyses, we include the cycle length in the name.
122 if self.filetype == 'eland':
123 template = "%(flowcell)s_%(cycle)s_%(eland)s"
124 basename = template % { 'flowcell': self.flowcell,
128 # all the other file types have names that include flowcell/lane
129 # information and thus are unique so we don't have to do anything
130 return os.path.join(root, basename)
132 def save_to_sql(self, cursor):
134 Add this entry to a DB2.0 database.
136 #FIXME: NEEDS SQL ESCAPING
137 header_macro = {'table': SEQUENCE_TABLE_NAME }
138 sql_header = "insert into %(table)s (" % header_macro
139 sql_columns = ['filetype','path','flowcell','lane']
140 sql_middle = ") values ("
141 sql_values = [self.filetype, self.path, self.flowcell, self.lane]
143 for name in ['read', 'pf', 'cycle']:
144 value = getattr(self, name)
145 if value is not None:
146 sql_columns.append(name)
147 sql_values.append(value)
149 sql = " ".join([sql_header,
150 ", ".join(sql_columns),
152 # note the following makes a string like ?,?,?
153 ",".join(["?"] * len(sql_values)),
156 return cursor.execute(sql, sql_values)
158 def save_to_model(self, model, base_url=None):
159 def add_lit(model, s, p, o):
161 model.add_statement(RDF.Statement(s, p, toTypedNode(o)))
162 def add(model, s, p, o):
163 model.add_statement(RDF.Statement(s,p,o))
164 # a bit unreliable... assumes filesystem is encoded in utf-8
165 path = os.path.abspath(self.path.encode('utf-8'))
166 fileNode = RDF.Node(RDF.Uri('file://' + path))
167 add(model, fileNode, rdfNS['type'], libNS['raw_file'])
168 add_lit(model, fileNode, libNS['flowcell_id'], self.flowcell)
169 add_lit(model, fileNode, libNS['lane_number'], self.lane)
170 if self.read is not None:
171 add_lit(model, fileNode, libNS['read'], self.read)
173 add_lit(model, fileNode, libNS['read'], '')
175 add_lit(model, fileNode, libNS['library_id'], self.project)
176 add_lit(model, fileNode, libNS['multiplex_index'], self.index)
177 add_lit(model, fileNode, libNS['split_id'], self.split)
178 add_lit(model, fileNode, libNS['cycle'], self.cycle)
179 add_lit(model, fileNode, libNS['passed_filter'], self.pf)
180 add(model, fileNode, rdfNS['type'], libNS[self.filetype])
182 if base_url is not None:
183 flowcell = RDF.Node(RDF.Uri("{base}/flowcell/{flowcell}/".format(
185 flowcell=self.flowcell)))
186 add(model, fileNode, libNS['flowcell'], flowcell)
187 if self.project is not None:
188 library = RDF.Node(RDF.Uri("{base}/library/{library}".format(
190 library=self.project)))
191 add(model, fileNode, libNS['library'], library)
195 def load_from_model(cls, model, seq_id):
198 stmts = model.find_statements(RDF.Statement(s, p, None))
201 if not obj.is_resource():
202 values.append(fromTypedNode(obj))
209 errmsg = u"To many values for %s %s"
210 raise ValueError(errmsg % (unicode(s), unicode(p)))
211 elif len(values) == 1:
216 if not isinstance(seq_id, RDF.Node):
217 seq_id = RDF.Node(RDF.Uri(seq_id))
218 seqTypesStmt = RDF.Statement(seq_id, rdfNS['type'], None)
219 seqTypes = model.find_statements(seqTypesStmt)
220 isSequenceFile = False
222 if s.object == libNS['raw_file']:
223 isSequenceFile = True
225 seq_type = stripNamespace(libNS, s.object)
227 if not isSequenceFile:
228 raise KeyError(u"%s not found" % (unicode(seq_id),))
230 path = urlparse(str(seq_id.uri)).path
231 flowcellNode = get_one(seq_id, libNS['flowcell'])
232 flowcell = get_one(seq_id, libNS['flowcell_id'])
233 lane = get_one(seq_id, libNS['lane_number'])
234 read = get_one(seq_id, libNS['read'])
236 obj = cls(seq_type, path, flowcell, lane)
237 obj.read = read if read != '' else None
238 obj.project = get_one(seq_id, libNS['library_id'])
239 obj.index = get_one(seq_id, libNS['multiplex_index'])
240 obj.split = get_one(seq_id, libNS['split_id'])
241 obj.cycle = get_one(seq_id, libNS['cycle'] )
242 obj.pf = get_one(seq_id, libNS['passed_filter'])
243 obj.libraryNode = get_one(seq_id, libNS['library'])
247 def get_flowcell_cycle(path):
249 Extract flowcell, cycle from pathname
251 path = os.path.normpath(path)
253 rest, tail = os.path.split(path)
254 if tail.startswith('Project_'):
255 # we're in a multiplexed sample
257 rest, cycle = os.path.split(rest)
261 rest, flowcell = os.path.split(rest)
262 cycle_match = re.match("C(?P<start>[0-9]+)-(?P<stop>[0-9]+)", cycle)
263 if cycle_match is None:
265 "Expected .../flowcell/cycle/ directory structure in %s" % \
267 start = cycle_match.group('start')
268 if start is not None:
270 stop = cycle_match.group('stop')
274 return FlowcellPath(flowcell, start, stop, project)
276 def parse_srf(path, filename):
277 flowcell_dir, start, stop, project = get_flowcell_cycle(path)
278 basename, ext = os.path.splitext(filename)
279 records = basename.split('_')
280 flowcell = records[4]
281 lane = int(records[5][0])
282 fullpath = os.path.join(path, filename)
284 if flowcell_dir != flowcell:
285 LOGGER.warn("flowcell %s found in wrong directory %s" % \
288 return SequenceFile('srf', fullpath, flowcell, lane, cycle=stop)
290 def parse_qseq(path, filename):
291 flowcell_dir, start, stop, project = get_flowcell_cycle(path)
292 basename, ext = os.path.splitext(filename)
293 records = basename.split('_')
294 fullpath = os.path.join(path, filename)
295 flowcell = records[4]
296 lane = int(records[5][1])
297 read = int(records[6][1])
299 if flowcell_dir != flowcell:
300 LOGGER.warn("flowcell %s found in wrong directory %s" % \
303 return SequenceFile('qseq', fullpath, flowcell, lane, read, cycle=stop)
305 def parse_fastq(path, filename):
308 flowcell_dir, start, stop, project = get_flowcell_cycle(path)
309 basename = re.sub('\.fastq(\.gz|\.bz2)?$', '', filename)
310 records = basename.split('_')
311 fullpath = os.path.join(path, filename)
312 if project is not None:
313 # demultiplexed sample!
314 flowcell = flowcell_dir
315 lane = int(records[2][-1])
316 read = int(records[3][-1])
317 pf = True # as I understand it hiseq runs toss the ones that fail filter
319 project_id = records[0]
321 sequence_type = 'split_fastq'
323 flowcell = records[4]
324 lane = int(records[5][1])
325 read = int(records[6][1])
326 pf = parse_fastq_pf_flag(records)
330 sequence_type = 'fastq'
332 if flowcell_dir != flowcell:
333 LOGGER.warn("flowcell %s found in wrong directory %s" % \
336 return SequenceFile(sequence_type, fullpath, flowcell, lane, read,
343 def parse_fastq_pf_flag(records):
344 """Take a fastq filename split on _ and look for the pass-filter flag
349 fastq_type = records[-1].lower()
350 if fastq_type.startswith('pass'):
352 elif fastq_type.startswith('nopass'):
354 elif fastq_type.startswith('all'):
357 raise ValueError("Unrecognized fastq name: %s" % (
362 def parse_eland(path, filename, eland_match=None):
363 if eland_match is None:
364 eland_match = eland_re.match(filename)
365 fullpath = os.path.join(path, filename)
366 flowcell, start, stop, project = get_flowcell_cycle(path)
367 if eland_match.group('lane'):
368 lane = int(eland_match.group('lane'))
371 if eland_match.group('read'):
372 read = int(eland_match.group('read'))
375 return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=stop)
377 def scan_for_sequences(dirs):
379 Scan through a list of directories for sequence like files
382 if type(dirs) in types.StringTypes:
383 raise ValueError("You probably want a list or set, not a string")
386 LOGGER.info("Scanning %s for sequences" % (d,))
387 if not os.path.exists(d):
388 LOGGER.warn("Flowcell directory %s does not exist" % (d,))
391 for path, dirname, filenames in os.walk(d):
394 # find sequence files
395 if f.endswith('.md5'):
397 elif f.endswith('.srf') or f.endswith('.srf.bz2'):
398 seq = parse_srf(path, f)
399 elif qseq_re.match(f):
400 seq = parse_qseq(path, f)
401 elif f.endswith('.fastq') or \
402 f.endswith('.fastq.bz2') or \
403 f.endswith('.fastq.gz'):
404 seq = parse_fastq(path, f)
405 eland_match = eland_re.match(f)
407 if f.endswith('.md5'):
409 seq = parse_eland(path, f, eland_match)
411 sequences.append(seq)
412 LOGGER.debug("Found sequence at %s" % (f,))
417 def update_model_sequence_library(model, base_url):
418 """Find sequence objects and add library information if its missing
421 prefix libNS: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
422 select ?filenode ?flowcell_id ?lane_id ?library_id ?flowcell ?library
424 ?filenode a libNS:raw_file ;
425 libNS:flowcell_id ?flowcell_id ;
426 libNS:lane_number ?lane_id .
427 OPTIONAL { ?filenode libNS:flowcell ?flowcell . }
428 OPTIONAL { ?filenode libNS:library ?library .}
429 OPTIONAL { ?filenode libNS:library_id ?library_id .}
432 file_query = RDF.SPARQLQuery(file_body)
433 files = file_query.execute(model)
435 libraryNS = RDF.NS(urljoin(base_url, 'library/'))
436 flowcellNS = RDF.NS(urljoin(base_url, 'flowcell/'))
438 filenode = f['filenode']
439 lane_id = fromTypedNode(f['lane_id'])
440 if f['flowcell'] is None:
441 flowcell = flowcellNS[str(f['flowcell_id'])+'/']
443 RDF.Statement(filenode, libNS['flowcell'], flowcell))
445 flowcell = f['flowcell']
447 if f['library'] is None:
448 if f['library_id'] is not None:
449 library = libraryNS[str(f['library_id']) + '/']
451 library = guess_library_from_model(model, base_url,
454 library_id = toTypedNode(simplify_uri(library))
456 RDF.Statement(filenode, libNS['library_id'], library_id))
457 if library is not None:
459 RDF.Statement(filenode, libNS['library'], library))
462 def guess_library_from_model(model, base_url, flowcell, lane_id):
463 """Attempt to find library URI
465 flowcellNode = RDF.Node(flowcell)
466 flowcell = str(flowcell.uri)
468 prefix libNS: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
469 prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
470 prefix xsd: <http://www.w3.org/2001/XMLSchema#>
472 select ?library ?lane
474 <{flowcell}> libNS:has_lane ?lane ;
475 a libNS:illumina_flowcell .
476 ?lane libNS:lane_number {lane_id} ;
477 libNS:library ?library .
480 lane_body = lane_body.format(flowcell=flowcell, lane_id=lane_id)
483 while len(lanes) == 0 and tries > 0:
485 lane_query = RDF.SPARQLQuery(lane_body)
486 lanes = [ l for l in lane_query.execute(model)]
489 errmsg = "Too many libraries for flowcell {flowcell} "\
490 "lane {lane} = {count}"
491 LOGGER.error(errmsg.format(flowcell=flowcell,
495 elif len(lanes) == 1:
497 return lanes[0]['library']
500 model.load(flowcellNode.uri, name="rdfa")