2 Utilities to work with the various eras of sequence archive files
10 from urlparse import urljoin, urlparse
13 from htsworkflow.util.rdfhelp import libraryOntology as libNS
14 from htsworkflow.util.rdfhelp import toTypedNode, fromTypedNode, rdfNS, \
15 stripNamespace, dump_model, simplify_uri
17 LOGGER = logging.getLogger(__name__)
19 eland_re = re.compile('s_(?P<lane>\d)(_(?P<read>\d))?_eland_')
20 raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+')
21 qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Za-z]+_l[\d]_r[\d].tar.bz2')
23 SEQUENCE_TABLE_NAME = "sequences"
24 def create_sequence_table(cursor):
26 Create a SQL table to hold SequenceFile entries
29 CREATE TABLE %(table)s (
38 """ %( {'table': SEQUENCE_TABLE_NAME} )
39 return cursor.execute(sql)
41 FlowcellPath = collections.namedtuple('FlowcellPath',
42 'flowcell start stop project')
44 class SequenceFile(object):
46 Simple container class that holds the path to a sequence archive
47 and basic descriptive information.
49 def __init__(self, filetype, path, flowcell, lane,
56 """Store various fields used in our sequence files
58 filetype is one of 'qseq', 'srf', 'fastq'
59 path = location of file
60 flowcell = files flowcell id
62 read = which sequencer read, usually 1 or 2
63 pf = did it pass filter
64 cycle = cycle dir name e.g. C1-202
65 project = projed name from HiSeq, probably library ID
66 index = HiSeq barcode index sequence
67 split = file fragment from HiSeq (Since one file is split into many)
69 self.filetype = filetype
71 self.flowcell = flowcell
76 self.project = project
81 return hash(self.key())
84 return (self.flowcell, self.lane, self.read, self.project, self.split)
86 def __unicode__(self):
87 return unicode(self.path)
89 def __eq__(self, other):
91 Equality is defined if everything but the path matches
93 attributes = ['filetype',
103 if getattr(self, a) != getattr(other, a):
108 def __ne__(self, other):
109 return not self == other
112 return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
114 def make_target_name(self, root):
116 Create target name for where we need to link this sequence too
118 path, basename = os.path.split(self.path)
119 # Because the names aren't unque we include the flowcel name
120 # because there were different eland files for different length
121 # analyses, we include the cycle length in the name.
122 if self.filetype == 'eland':
123 template = "%(flowcell)s_%(cycle)s_%(eland)s"
124 basename = template % { 'flowcell': self.flowcell,
128 # all the other file types have names that include flowcell/lane
129 # information and thus are unique so we don't have to do anything
130 return os.path.join(root, basename)
132 def save_to_sql(self, cursor):
134 Add this entry to a DB2.0 database.
136 #FIXME: NEEDS SQL ESCAPING
137 header_macro = {'table': SEQUENCE_TABLE_NAME }
138 sql_header = "insert into %(table)s (" % header_macro
139 sql_columns = ['filetype','path','flowcell','lane']
140 sql_middle = ") values ("
141 sql_values = [self.filetype, self.path, self.flowcell, self.lane]
143 for name in ['read', 'pf', 'cycle']:
144 value = getattr(self, name)
145 if value is not None:
146 sql_columns.append(name)
147 sql_values.append(value)
149 sql = " ".join([sql_header,
150 ", ".join(sql_columns),
152 # note the following makes a string like ?,?,?
153 ",".join(["?"] * len(sql_values)),
156 return cursor.execute(sql, sql_values)
158 def save_to_model(self, model, base_url=None):
159 def add_lit(model, s, p, o):
161 model.add_statement(RDF.Statement(s, p, toTypedNode(o)))
162 def add(model, s, p, o):
163 model.add_statement(RDF.Statement(s,p,o))
164 fileNode = RDF.Node(RDF.Uri('file://' + os.path.abspath(self.path)))
165 add(model, fileNode, rdfNS['type'], libNS['raw_file'])
166 add_lit(model, fileNode, libNS['flowcell_id'], self.flowcell)
167 add_lit(model, fileNode, libNS['lane_number'], self.lane)
168 if self.read is not None:
169 add_lit(model, fileNode, libNS['read'], self.read)
171 add_lit(model, fileNode, libNS['read'], '')
173 add_lit(model, fileNode, libNS['library_id'], self.project)
174 add_lit(model, fileNode, libNS['multiplex_index'], self.index)
175 add_lit(model, fileNode, libNS['split_id'], self.split)
176 add_lit(model, fileNode, libNS['cycle'], self.cycle)
177 add_lit(model, fileNode, libNS['passed_filter'], self.pf)
178 add(model, fileNode, rdfNS['type'], libNS[self.filetype])
180 if base_url is not None:
181 flowcell = RDF.Node(RDF.Uri("{base}/flowcell/{flowcell}/".format(
183 flowcell=self.flowcell)))
184 add(model, fileNode, libNS['flowcell'], flowcell)
185 if self.project is not None:
186 library = RDF.Node(RDF.Uri("{base}/library/{library}".format(
188 library=self.project)))
189 add(model, fileNode, libNS['library'], library)
193 def load_from_model(cls, model, seq_id):
196 stmts = model.find_statements(RDF.Statement(s, p, None))
199 if not obj.is_resource():
200 values.append(fromTypedNode(obj))
207 errmsg = u"To many values for %s %s"
208 raise ValueError(errmsg % (unicode(s), unicode(p)))
209 elif len(values) == 1:
214 if not isinstance(seq_id, RDF.Node):
215 seq_id = RDF.Node(RDF.Uri(seq_id))
216 seqTypesStmt = RDF.Statement(seq_id, rdfNS['type'], None)
217 seqTypes = model.find_statements(seqTypesStmt)
218 isSequenceFile = False
220 if s.object == libNS['raw_file']:
221 isSequenceFile = True
223 seq_type = stripNamespace(libNS, s.object)
225 if not isSequenceFile:
226 raise KeyError(u"%s not found" % (unicode(seq_id),))
228 path = urlparse(str(seq_id.uri)).path
229 flowcellNode = get_one(seq_id, libNS['flowcell'])
230 flowcell = get_one(seq_id, libNS['flowcell_id'])
231 lane = get_one(seq_id, libNS['lane_number'])
232 read = get_one(seq_id, libNS['read'])
234 obj = cls(seq_type, path, flowcell, lane)
235 obj.read = read if read != '' else None
236 obj.project = get_one(seq_id, libNS['library_id'])
237 obj.index = get_one(seq_id, libNS['multiplex_index'])
238 obj.split = get_one(seq_id, libNS['split_id'])
239 obj.cycle = get_one(seq_id, libNS['cycle'] )
240 obj.pf = get_one(seq_id, libNS['passed_filter'])
241 obj.libraryNode = get_one(seq_id, libNS['library'])
245 def get_flowcell_cycle(path):
247 Extract flowcell, cycle from pathname
249 path = os.path.normpath(path)
251 rest, tail = os.path.split(path)
252 if tail.startswith('Project_'):
253 # we're in a multiplexed sample
255 rest, cycle = os.path.split(rest)
259 rest, flowcell = os.path.split(rest)
260 cycle_match = re.match("C(?P<start>[0-9]+)-(?P<stop>[0-9]+)", cycle)
261 if cycle_match is None:
263 "Expected .../flowcell/cycle/ directory structure in %s" % \
265 start = cycle_match.group('start')
266 if start is not None:
268 stop = cycle_match.group('stop')
272 return FlowcellPath(flowcell, start, stop, project)
274 def parse_srf(path, filename):
275 flowcell_dir, start, stop, project = get_flowcell_cycle(path)
276 basename, ext = os.path.splitext(filename)
277 records = basename.split('_')
278 flowcell = records[4]
279 lane = int(records[5][0])
280 fullpath = os.path.join(path, filename)
282 if flowcell_dir != flowcell:
283 LOGGER.warn("flowcell %s found in wrong directory %s" % \
286 return SequenceFile('srf', fullpath, flowcell, lane, cycle=stop)
288 def parse_qseq(path, filename):
289 flowcell_dir, start, stop, project = get_flowcell_cycle(path)
290 basename, ext = os.path.splitext(filename)
291 records = basename.split('_')
292 fullpath = os.path.join(path, filename)
293 flowcell = records[4]
294 lane = int(records[5][1])
295 read = int(records[6][1])
297 if flowcell_dir != flowcell:
298 LOGGER.warn("flowcell %s found in wrong directory %s" % \
301 return SequenceFile('qseq', fullpath, flowcell, lane, read, cycle=stop)
303 def parse_fastq(path, filename):
306 flowcell_dir, start, stop, project = get_flowcell_cycle(path)
307 basename = re.sub('\.fastq(\.gz|\.bz2)?$', '', filename)
308 records = basename.split('_')
309 fullpath = os.path.join(path, filename)
310 if project is not None:
311 # demultiplexed sample!
312 flowcell = flowcell_dir
313 lane = int(records[2][-1])
314 read = int(records[3][-1])
315 pf = True # as I understand it hiseq runs toss the ones that fail filter
317 project_id = records[0]
319 sequence_type = 'split_fastq'
321 flowcell = records[4]
322 lane = int(records[5][1])
323 read = int(records[6][1])
324 pf = parse_fastq_pf_flag(records)
328 sequence_type = 'fastq'
330 if flowcell_dir != flowcell:
331 LOGGER.warn("flowcell %s found in wrong directory %s" % \
334 return SequenceFile(sequence_type, fullpath, flowcell, lane, read,
341 def parse_fastq_pf_flag(records):
342 """Take a fastq filename split on _ and look for the pass-filter flag
347 fastq_type = records[-1].lower()
348 if fastq_type.startswith('pass'):
350 elif fastq_type.startswith('nopass'):
352 elif fastq_type.startswith('all'):
355 raise ValueError("Unrecognized fastq name: %s" % (
360 def parse_eland(path, filename, eland_match=None):
361 if eland_match is None:
362 eland_match = eland_re.match(filename)
363 fullpath = os.path.join(path, filename)
364 flowcell, start, stop, project = get_flowcell_cycle(path)
365 if eland_match.group('lane'):
366 lane = int(eland_match.group('lane'))
369 if eland_match.group('read'):
370 read = int(eland_match.group('read'))
373 return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=stop)
375 def scan_for_sequences(dirs):
377 Scan through a list of directories for sequence like files
380 if type(dirs) in types.StringTypes:
381 raise ValueError("You probably want a list or set, not a string")
384 LOGGER.info("Scanning %s for sequences" % (d,))
385 if not os.path.exists(d):
386 LOGGER.warn("Flowcell directory %s does not exist" % (d,))
389 for path, dirname, filenames in os.walk(d):
392 # find sequence files
393 if f.endswith('.md5'):
395 elif f.endswith('.srf') or f.endswith('.srf.bz2'):
396 seq = parse_srf(path, f)
397 elif qseq_re.match(f):
398 seq = parse_qseq(path, f)
399 elif f.endswith('.fastq') or \
400 f.endswith('.fastq.bz2') or \
401 f.endswith('.fastq.gz'):
402 seq = parse_fastq(path, f)
403 eland_match = eland_re.match(f)
405 if f.endswith('.md5'):
407 seq = parse_eland(path, f, eland_match)
409 sequences.append(seq)
410 LOGGER.debug("Found sequence at %s" % (f,))
415 def update_model_sequence_library(model, base_url):
416 """Find sequence objects and add library information if its missing
419 prefix libNS: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
420 select ?filenode ?flowcell_id ?lane_id ?library_id ?flowcell ?library
422 ?filenode a libNS:raw_file ;
423 libNS:flowcell_id ?flowcell_id ;
424 libNS:lane_number ?lane_id .
425 OPTIONAL { ?filenode libNS:flowcell ?flowcell . }
426 OPTIONAL { ?filenode libNS:library ?library .}
427 OPTIONAL { ?filenode libNS:library_id ?library_id .}
430 file_query = RDF.SPARQLQuery(file_body)
431 files = file_query.execute(model)
433 libraryNS = RDF.NS(urljoin(base_url, 'library/'))
434 flowcellNS = RDF.NS(urljoin(base_url, 'flowcell/'))
436 filenode = f['filenode']
437 lane_id = fromTypedNode(f['lane_id'])
438 if f['flowcell'] is None:
439 flowcell = flowcellNS[str(f['flowcell_id'])+'/']
441 RDF.Statement(filenode, libNS['flowcell'], flowcell))
443 flowcell = f['flowcell']
445 if f['library'] is None:
446 if f['library_id'] is not None:
447 library = libraryNS[str(f['library_id']) + '/']
449 library = guess_library_from_model(model, base_url,
452 library_id = toTypedNode(simplify_uri(library))
454 RDF.Statement(filenode, libNS['library_id'], library_id))
455 if library is not None:
457 RDF.Statement(filenode, libNS['library'], library))
460 def guess_library_from_model(model, base_url, flowcell, lane_id):
461 """Attempt to find library URI
463 flowcellNode = RDF.Node(flowcell)
464 flowcell = str(flowcell.uri)
466 prefix libNS: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
467 prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
468 prefix xsd: <http://www.w3.org/2001/XMLSchema#>
470 select ?library ?lane
472 <{flowcell}> libNS:has_lane ?lane ;
473 a libNS:illumina_flowcell .
474 ?lane libNS:lane_number {lane_id} ;
475 libNS:library ?library .
478 lane_body = lane_body.format(flowcell=flowcell, lane_id=lane_id)
481 while len(lanes) == 0 and tries > 0:
483 lane_query = RDF.SPARQLQuery(lane_body)
484 lanes = [ l for l in lane_query.execute(model)]
487 errmsg = "Too many libraries for flowcell {flowcell} "\
488 "lane {lane} = {count}"
489 LOGGER.error(errmsg.format(flowcell=flowcell,
493 elif len(lanes) == 1:
495 return lanes[0]['library']
498 model.load(flowcellNode.uri, name="rdfa")