2 Utilities to work with the various eras of sequence archive files
10 from urlparse import urljoin, urlparse
13 from htsworkflow.util.rdfhelp import libraryOntology as libNS
14 from htsworkflow.util.rdfhelp import toTypedNode, fromTypedNode, rdfNS, \
15 stripNamespace, dump_model, simplify_uri
17 LOGGER = logging.getLogger(__name__)
19 eland_re = re.compile('s_(?P<lane>\d)(_(?P<read>\d))?_eland_')
20 raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+')
21 qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+_l[\d]_r[\d].tar.bz2')
23 SEQUENCE_TABLE_NAME = "sequences"
24 def create_sequence_table(cursor):
26 Create a SQL table to hold SequenceFile entries
29 CREATE TABLE %(table)s (
38 """ %( {'table': SEQUENCE_TABLE_NAME} )
39 return cursor.execute(sql)
41 FlowcellPath = collections.namedtuple('FlowcellPath',
42 'flowcell start stop project')
44 class SequenceFile(object):
46 Simple container class that holds the path to a sequence archive
47 and basic descriptive information.
49 def __init__(self, filetype, path, flowcell, lane,
56 """Store various fields used in our sequence files
58 filetype is one of 'qseq', 'srf', 'fastq'
59 path = location of file
60 flowcell = files flowcell id
62 read = which sequencer read, usually 1 or 2
63 pf = did it pass filter
64 cycle = cycle dir name e.g. C1-202
65 project = projed name from HiSeq, probably library ID
66 index = HiSeq barcode index sequence
67 split = file fragment from HiSeq (Since one file is split into many)
69 self.filetype = filetype
71 self.flowcell = flowcell
76 self.project = project
81 return hash(self.key())
84 return (self.flowcell, self.lane, self.read, self.project, self.split)
86 def __unicode__(self):
87 return unicode(self.path)
89 def __eq__(self, other):
91 Equality is defined if everything but the path matches
93 attributes = ['filetype',
103 if getattr(self, a) != getattr(other, a):
108 def __ne__(self, other):
109 return not self == other
112 return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
114 def make_target_name(self, root):
116 Create target name for where we need to link this sequence too
118 path, basename = os.path.split(self.path)
119 # Because the names aren't unque we include the flowcel name
120 # because there were different eland files for different length
121 # analyses, we include the cycle length in the name.
122 if self.filetype == 'eland':
123 template = "%(flowcell)s_%(cycle)s_%(eland)s"
124 basename = template % { 'flowcell': self.flowcell,
128 # all the other file types have names that include flowcell/lane
129 # information and thus are unique so we don't have to do anything
130 return os.path.join(root, basename)
132 def save_to_sql(self, cursor):
134 Add this entry to a DB2.0 database.
136 #FIXME: NEEDS SQL ESCAPING
137 header_macro = {'table': SEQUENCE_TABLE_NAME }
138 sql_header = "insert into %(table)s (" % header_macro
139 sql_columns = ['filetype','path','flowcell','lane']
140 sql_middle = ") values ("
141 sql_values = [self.filetype, self.path, self.flowcell, self.lane]
143 for name in ['read', 'pf', 'cycle']:
144 value = getattr(self, name)
145 if value is not None:
146 sql_columns.append(name)
147 sql_values.append(value)
149 sql = " ".join([sql_header,
150 ", ".join(sql_columns),
152 # note the following makes a string like ?,?,?
153 ",".join(["?"] * len(sql_values)),
156 return cursor.execute(sql, sql_values)
158 def save_to_model(self, model, base_url=None):
159 def add_lit(model, s, p, o):
161 model.add_statement(RDF.Statement(s, p, toTypedNode(o)))
162 def add(model, s, p, o):
163 model.add_statement(RDF.Statement(s,p,o))
164 # a bit unreliable... assumes filesystem is encoded in utf-8
165 path = os.path.abspath(self.path.encode('utf-8'))
166 fileNode = RDF.Node(RDF.Uri('file://' + path))
167 add(model, fileNode, rdfNS['type'], libNS['illumina_result'])
168 add_lit(model, fileNode, libNS['flowcell_id'], self.flowcell)
169 add_lit(model, fileNode, libNS['lane_number'], self.lane)
170 if self.read is not None:
171 add_lit(model, fileNode, libNS['read'], self.read)
173 add_lit(model, fileNode, libNS['read'], '')
175 add_lit(model, fileNode, libNS['library_id'], self.project)
176 add_lit(model, fileNode, libNS['multiplex_index'], self.index)
177 add_lit(model, fileNode, libNS['split_id'], self.split)
178 add_lit(model, fileNode, libNS['cycle'], self.cycle)
179 add_lit(model, fileNode, libNS['passed_filter'], self.pf)
180 add(model, fileNode, libNS['file_type'], libNS[self.filetype])
182 if base_url is not None:
183 flowcell = RDF.Node(RDF.Uri("{base}/flowcell/{flowcell}/".format(
185 flowcell=self.flowcell)))
186 add(model, fileNode, libNS['flowcell'], flowcell)
187 if self.project is not None:
188 library = RDF.Node(RDF.Uri("{base}/library/{library}".format(
190 library=self.project)))
191 add(model, fileNode, libNS['library'], library)
195 def load_from_model(cls, model, seq_id):
198 stmts = model.find_statements(RDF.Statement(s, p, None))
201 if not obj.is_resource():
202 values.append(fromTypedNode(obj))
209 errmsg = u"To many values for %s %s"
210 raise ValueError(errmsg % (unicode(s), unicode(p)))
211 elif len(values) == 1:
216 if not isinstance(seq_id, RDF.Node):
217 seq_id = RDF.Node(RDF.Uri(seq_id))
218 result_statement = RDF.Statement(seq_id,
220 libNS['illumina_result'])
221 if not model.contains_statement(result_statement):
222 raise KeyError(u"%s not found" % (unicode(seq_id),))
224 seq_type_node = model.get_target(seq_id, libNS['file_type'])
225 seq_type = stripNamespace(libNS, seq_type_node)
227 path = urlparse(str(seq_id.uri)).path
228 flowcellNode = get_one(seq_id, libNS['flowcell'])
229 flowcell = get_one(seq_id, libNS['flowcell_id'])
230 lane = get_one(seq_id, libNS['lane_number'])
231 read = get_one(seq_id, libNS['read'])
233 obj = cls(seq_type, path, flowcell, lane)
234 obj.read = read if read != '' else None
235 obj.project = get_one(seq_id, libNS['library_id'])
236 obj.index = get_one(seq_id, libNS['multiplex_index'])
237 obj.split = get_one(seq_id, libNS['split_id'])
238 obj.cycle = get_one(seq_id, libNS['cycle'] )
239 obj.pf = get_one(seq_id, libNS['passed_filter'])
240 obj.libraryNode = get_one(seq_id, libNS['library'])
244 def get_flowcell_cycle(path):
246 Extract flowcell, cycle from pathname
248 path = os.path.normpath(path)
250 rest, tail = os.path.split(path)
251 if tail.startswith('Project_'):
252 # we're in a multiplexed sample
254 rest, cycle = os.path.split(rest)
258 rest, flowcell = os.path.split(rest)
259 cycle_match = re.match("C(?P<start>[0-9]+)-(?P<stop>[0-9]+)", cycle)
260 if cycle_match is None:
262 "Expected .../flowcell/cycle/ directory structure in %s" % \
264 start = cycle_match.group('start')
265 if start is not None:
267 stop = cycle_match.group('stop')
271 return FlowcellPath(flowcell, start, stop, project)
273 def parse_srf(path, filename):
274 flowcell_dir, start, stop, project = get_flowcell_cycle(path)
275 basename, ext = os.path.splitext(filename)
276 records = basename.split('_')
277 flowcell = records[4]
278 lane = int(records[5][0])
279 fullpath = os.path.join(path, filename)
281 if flowcell_dir != flowcell:
282 LOGGER.warn("flowcell %s found in wrong directory %s" % \
285 return SequenceFile('srf', fullpath, flowcell, lane, cycle=stop)
287 def parse_qseq(path, filename):
288 flowcell_dir, start, stop, project = get_flowcell_cycle(path)
289 basename, ext = os.path.splitext(filename)
290 records = basename.split('_')
291 fullpath = os.path.join(path, filename)
292 flowcell = records[4]
293 lane = int(records[5][1])
294 read = int(records[6][1])
296 if flowcell_dir != flowcell:
297 LOGGER.warn("flowcell %s found in wrong directory %s" % \
300 return SequenceFile('qseq', fullpath, flowcell, lane, read, cycle=stop)
302 def parse_fastq(path, filename):
305 flowcell_dir, start, stop, project = get_flowcell_cycle(path)
306 basename = re.sub('\.fastq(\.gz|\.bz2)?$', '', filename)
307 records = basename.split('_')
308 fullpath = os.path.join(path, filename)
309 if project is not None:
310 # demultiplexed sample!
311 flowcell = flowcell_dir
312 lane = int(records[2][-1])
313 read = int(records[3][-1])
314 pf = True # as I understand it hiseq runs toss the ones that fail filter
316 project_id = records[0]
318 sequence_type = 'split_fastq'
320 flowcell = records[4]
321 lane = int(records[5][1])
322 read = int(records[6][1])
323 pf = parse_fastq_pf_flag(records)
327 sequence_type = 'fastq'
329 if flowcell_dir != flowcell:
330 LOGGER.warn("flowcell %s found in wrong directory %s" % \
333 return SequenceFile(sequence_type, fullpath, flowcell, lane, read,
340 def parse_fastq_pf_flag(records):
341 """Take a fastq filename split on _ and look for the pass-filter flag
346 fastq_type = records[-1].lower()
347 if fastq_type.startswith('pass'):
349 elif fastq_type.startswith('nopass'):
351 elif fastq_type.startswith('all'):
354 raise ValueError("Unrecognized fastq name: %s" % (
359 def parse_eland(path, filename, eland_match=None):
360 if eland_match is None:
361 eland_match = eland_re.match(filename)
362 fullpath = os.path.join(path, filename)
363 flowcell, start, stop, project = get_flowcell_cycle(path)
364 if eland_match.group('lane'):
365 lane = int(eland_match.group('lane'))
368 if eland_match.group('read'):
369 read = int(eland_match.group('read'))
372 return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=stop)
374 def scan_for_sequences(dirs):
376 Scan through a list of directories for sequence like files
379 if type(dirs) in types.StringTypes:
380 raise ValueError("You probably want a list or set, not a string")
383 LOGGER.info("Scanning %s for sequences" % (d,))
384 if not os.path.exists(d):
385 LOGGER.warn("Flowcell directory %s does not exist" % (d,))
388 for path, dirname, filenames in os.walk(d):
391 # find sequence files
392 if f.endswith('.md5'):
394 elif f.endswith('.srf') or f.endswith('.srf.bz2'):
395 seq = parse_srf(path, f)
396 elif qseq_re.match(f):
397 seq = parse_qseq(path, f)
398 elif f.endswith('.fastq') or \
399 f.endswith('.fastq.bz2') or \
400 f.endswith('.fastq.gz'):
401 seq = parse_fastq(path, f)
402 eland_match = eland_re.match(f)
404 if f.endswith('.md5'):
406 seq = parse_eland(path, f, eland_match)
408 sequences.append(seq)
409 LOGGER.debug("Found sequence at %s" % (f,))
414 def update_model_sequence_library(model, base_url):
415 """Find sequence objects and add library information if its missing
418 prefix libNS: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
419 select ?filenode ?flowcell_id ?lane_id ?library_id ?flowcell ?library
421 ?filenode a libNS:illumina_result ;
422 libNS:flowcell_id ?flowcell_id ;
423 libNS:lane_number ?lane_id .
424 OPTIONAL { ?filenode libNS:flowcell ?flowcell . }
425 OPTIONAL { ?filenode libNS:library ?library .}
426 OPTIONAL { ?filenode libNS:library_id ?library_id .}
429 LOGGER.debug("update_model_sequence_library query %s", file_body)
430 file_query = RDF.SPARQLQuery(file_body)
431 files = file_query.execute(model)
433 libraryNS = RDF.NS(urljoin(base_url, 'library/'))
434 flowcellNS = RDF.NS(urljoin(base_url, 'flowcell/'))
436 filenode = f['filenode']
437 LOGGER.debug("Updating file node %s", str(filenode))
438 lane_id = fromTypedNode(f['lane_id'])
439 if f['flowcell'] is None:
440 flowcell = flowcellNS[str(f['flowcell_id'])+'/']
441 LOGGER.debug("Adding file (%s) to flowcell (%s) link",
445 RDF.Statement(filenode, libNS['flowcell'], flowcell))
447 flowcell = f['flowcell']
449 if f['library'] is None:
450 if f['library_id'] is not None:
451 library = libraryNS[str(f['library_id']) + '/']
453 library = guess_library_from_model(model, base_url,
456 library_id = toTypedNode(simplify_uri(library))
457 LOGGER.debug("Adding file (%s) to library (%s) link",
461 RDF.Statement(filenode, libNS['library_id'], library_id))
462 if library is not None:
464 RDF.Statement(filenode, libNS['library'], library))
467 def guess_library_from_model(model, base_url, flowcell, lane_id):
468 """Attempt to find library URI
470 flowcellNode = RDF.Node(flowcell)
471 flowcell = str(flowcell.uri)
473 prefix libNS: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
474 prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
475 prefix xsd: <http://www.w3.org/2001/XMLSchema#>
477 select ?library ?lane
479 <{flowcell}> libNS:has_lane ?lane ;
480 a libNS:IlluminaFlowcell .
481 ?lane libNS:lane_number {lane_id} ;
482 libNS:library ?library .
485 lane_body = lane_body.format(flowcell=flowcell, lane_id=lane_id)
488 while len(lanes) == 0 and tries > 0:
490 lane_query = RDF.SPARQLQuery(lane_body)
491 lanes = [ l for l in lane_query.execute(model)]
494 errmsg = "Too many libraries for flowcell {flowcell} "\
495 "lane {lane} = {count}"
496 LOGGER.error(errmsg.format(flowcell=flowcell,
500 elif len(lanes) == 1:
502 return lanes[0]['library']
505 model.load(flowcellNode.uri, name="rdfa")