1 """Common submission elements
9 from htsworkflow.util.rdfhelp import \
14 from htsworkflow.util.rdfns import (
22 from htsworkflow.util.hashfile import make_md5sum
23 from htsworkflow.submission.fastqname import FastqName
24 from htsworkflow.submission.daf import \
25 MetadataLookupException, \
28 from htsworkflow.util import opener
30 from django.template import Context, Template, loader
32 LOGGER = logging.getLogger(__name__)
# NOTE(review): this listing is corrupted. Each line carries a stray embedded
# source line number (34, 35, 39, ...) and the gaps in that numbering show that
# many lines are missing: `try:` openers whose `except` clauses are still
# visible, `return`/`yield` statements, list initializers, and docstring
# terminators. Code below is kept byte-identical; only review comments are
# added. Restore the file from version control before editing any logic.
34 class Submission(object):
# Purpose (from visible code): build an RDF model describing a submission by
# scanning result directories, classifying files against DAF "view" filename
# patterns, and adding statements via the Redland `RDF` bindings.
35 def __init__(self, name, model, host):
# Derives namespace helpers from the submission name and host URL.
# Assignments of self.name/self.model are presumably on the missing
# lines 36-38 — both are read later in this class. TODO confirm.
39 self.submissionSet = get_submission_uri(self.name)
40 self.submissionSetNS = RDF.NS(str(self.submissionSet) + '#')
41 self.libraryNS = RDF.NS('{0}/library/'.format(host))
42 self.flowcellNS = RDF.NS('{0}/flowcell/'.format(host))
# Lazy cache for the pattern -> view map built by
# _get_filename_view_map(); filled on first find_best_match() call.
44 self.__view_map = None
46 def scan_submission_dirs(self, result_map):
47 """Examine files in our result directory
# Imports each (library id, result dir) pair. The bare `except` clause
# below implies a `try:` around import_analysis_dir on a missing line,
# so a metadata lookup failure skips that library instead of aborting.
49 for lib_id, result_dir in result_map.items():
50 LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
52 self.import_analysis_dir(result_dir, lib_id)
53 except MetadataLookupException as e:
54 LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))
56 def import_analysis_dir(self, analysis_dir, library_id):
57 """Import a submission directories and update our model as needed
59 #attributes = get_filename_attribute_map(paired)
# Library node URI is <host>/library/<id>/ (trailing slash included).
60 libNode = self.libraryNS[library_id + "/"]
# Pull the library's published details into the model before scanning.
62 self._add_library_details_to_model(libNode)
# Non-recursive scan: only direct children of analysis_dir are seen.
64 submission_files = os.listdir(analysis_dir)
65 for filename in submission_files:
66 pathname = os.path.abspath(os.path.join(analysis_dir, filename))
67 self.construct_file_attributes(analysis_dir, libNode, pathname)
69 def analysis_nodes(self, result_map):
70 """Return an iterable of analysis nodes
# Presumably a generator: a `yield an_analysis` is likely on the
# missing line after this loop body. TODO confirm from VCS.
72 for result_dir in result_map.values():
73 an_analysis = self.get_submission_node(result_dir)
76 def construct_file_attributes(self, analysis_dir, libNode, pathname):
77 """Looking for the best extension
78 The 'best' is the longest match
81 filename (str): the filename whose extention we are about to examine
83 path, filename = os.path.split(pathname)
85 LOGGER.debug("Searching for view")
# Classify the file against the DAF view patterns; unrecognized and
# 'ignore'-typed files are skipped (early returns are presumably on
# the missing lines after each of the next two checks).
86 file_type = self.find_best_match(filename)
88 LOGGER.warn("Unrecognized file: {0}".format(pathname))
90 if str(file_type) == str(libraryOntology['ignore']):
93 an_analysis_name = self.make_submission_name(analysis_dir)
94 an_analysis = self.get_submission_node(analysis_dir)
# get_target presumably looks up the rdf class of the view; the
# predicate argument is on a missing continuation line.
95 file_classification = self.model.get_target(file_type,
97 if file_classification is None:
98 errmsg = 'Could not find class for {0}'
99 LOGGER.warning(errmsg.format(str(file_type)))
# Attach the analysis node to the submission set with name/type/library.
102 self.model.add_statement(
103 RDF.Statement(self.submissionSetNS[''],
104 submissionOntology['has_submission'],
106 self.model.add_statement(RDF.Statement(an_analysis,
107 submissionOntology['name'],
108 toTypedNode(an_analysis_name)))
109 self.model.add_statement(
110 RDF.Statement(an_analysis,
112 submissionOntology['submission']))
113 self.model.add_statement(RDF.Statement(an_analysis,
114 submissionOntology['library'],
117 LOGGER.debug("Adding statements to {0}".format(str(an_analysis)))
118 # add track specific information
119 self.model.add_statement(
120 RDF.Statement(an_analysis,
121 dafTermOntology['paired'],
122 toTypedNode(self._is_paired(libNode))))
123 self.model.add_statement(
124 RDF.Statement(an_analysis,
125 dafTermOntology['submission'],
128 # add file specific information
129 fileNode = self.make_file_node(pathname, an_analysis)
130 self.add_md5s(filename, fileNode, analysis_dir)
131 self.add_file_size(filename, fileNode, analysis_dir)
132 self.add_read_length(filename, fileNode, analysis_dir)
133 self.add_fastq_metadata(filename, fileNode)
134 self.add_label(file_type, fileNode, libNode)
135 self.model.add_statement(
136 RDF.Statement(fileNode,
139 self.model.add_statement(
140 RDF.Statement(fileNode,
141 libraryOntology['library'],
144 LOGGER.debug("Done.")
146 def make_file_node(self, pathname, submissionNode):
147 """Create file node and attach it to its submission.
149 # add file specific information
150 path, filename = os.path.split(pathname)
151 pathname = os.path.abspath(pathname)
# NOTE(review): 'file://' + abspath is not a percent-encoded URI;
# spaces or non-ASCII in pathname would produce an invalid URI. Worth
# checking whether callers guarantee safe paths.
152 fileNode = RDF.Node(RDF.Uri('file://'+ pathname))
153 self.model.add_statement(
154 RDF.Statement(submissionNode,
155 dafTermOntology['has_file'],
157 self.model.add_statement(
158 RDF.Statement(fileNode,
159 dafTermOntology['filename'],
161 self.model.add_statement(
162 RDF.Statement(fileNode,
# relative_path is relative to the current working directory (no
# `start=` argument), not to analysis_dir — TODO confirm intended.
163 dafTermOntology['relative_path'],
164 os.path.relpath(pathname)))
167 def add_md5s(self, filename, fileNode, analysis_dir):
168 LOGGER.debug("Updating file md5sum")
169 submission_pathname = os.path.join(analysis_dir, filename)
170 md5 = make_md5sum(submission_pathname)
# The warning below is presumably inside an `if md5 is None:` branch
# on a missing line; the add_statement is the success path.
172 errmsg = "Unable to produce md5sum for {0}"
173 LOGGER.warning(errmsg.format(submission_pathname))
175 self.model.add_statement(
176 RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))
178 def add_file_size(self, filename, fileNode, analysis_dir):
# Records the on-disk byte size as a typed literal.
179 submission_pathname = os.path.join(analysis_dir, filename)
180 file_size = os.stat(submission_pathname).st_size
181 self.model.add_statement(
182 RDF.Statement(fileNode, dafTermOntology['file_size'], toTypedNode(file_size)))
183 LOGGER.debug("Updating file size: %d", file_size)
185 def add_read_length(self, filename, fileNode, analysis_dir):
# Takes the length of the file's second line as the read length —
# assumes a fastq/fasta-like layout (header line, then sequence).
186 submission_pathname = os.path.join(analysis_dir, filename)
# opener.autoopen presumably transparently decompresses; TODO confirm.
187 stream = opener.autoopen(submission_pathname, 'rt')
188 header = stream.readline().strip()
189 sequence = stream.readline().strip()
# NOTE(review): no stream.close() is visible in this listing — may be
# on a missing line; otherwise a `with` block would be safer.
190 read_length = len(sequence)
191 self.model.add_statement(
192 RDF.Statement(fileNode,
193 libraryOntology['read_length'],
194 toTypedNode(read_length))
196 LOGGER.debug("Updating read length: %d", read_length)
198 def add_fastq_metadata(self, filename, fileNode):
199 # How should I detect if this is actually a fastq file?
201 fqname = FastqName(filename=filename)
203 # currently its just ignore it if the fastq name parser fails
# Map fields parsed from the fastq filename onto ontology terms;
# only fields the parser actually found are added to the model.
206 terms = [('flowcell', libraryOntology['flowcell_id']),
207 ('lib_id', libraryOntology['library_id']),
208 ('lane', libraryOntology['lane_number']),
209 ('read', libraryOntology['read']),
211 for file_term, model_term in terms:
212 value = fqname.get(file_term)
213 if value is not None:
# The add_statement(s) call is presumably on the missing next line.
214 s = RDF.Statement(fileNode, model_term, toTypedNode(value))
# Also link the file to the flowcell resource node (not just the id).
217 if 'flowcell' in fqname:
218 value = self.flowcellNS[fqname['flowcell'] + '/']
219 s = RDF.Statement(fileNode, libraryOntology['flowcell'], value)
222 def add_label(self, file_type, file_node, lib_node):
223 """Add rdfs:label to a file node
225 #template_term = libraryOntology['label_template']
226 template_term = libraryOntology['label_template']
# The label template string is stored in the model per file type.
227 label_template = self.model.get_target(file_type, template_term)
# Queries the model for metadata, then renders the label template as
# a Django template against each result row.
229 template = loader.get_template('submission_view_rdfs_label_metadata.sparql')
231 'library': str(lib_node.uri),
233 for r in self.execute_query(template, context):
235 label = Template(label_template).render(context)
# NOTE(review): `unicode` is a Python 2 builtin — this file predates a
# Python 3 port (see `LOGGER.warn` above as well).
236 s = RDF.Statement(file_node, rdfsNS['label'], unicode(label))
239 def _add_library_details_to_model(self, libNode):
240 # attributes that can have multiple values
241 set_attributes = set((libraryOntology['has_lane'],
242 libraryOntology['has_mappings'],
243 dafTermOntology['has_file']))
# Scrape RDFa from the library's web page into statements.
244 parser = RDF.Parser(name='rdfa')
246 new_statements = parser.parse_as_stream(libNode.uri)
# Implies a `try:` on a missing line around parse_as_stream; parse
# failures are presumably logged/skipped in the elided handler body.
247 except RDF.RedlandError as e:
250 LOGGER.debug("Scanning %s", str(libNode.uri))
252 for s in new_statements:
253 # always add "collections"
254 if s.predicate in set_attributes:
257 # don't override things we already have in the model
258 targets = list(self.model.get_targets(s.subject, s.predicate))
259 if len(targets) == 0:
# After merging library statements, chase lane and flowcell details.
265 self._add_lane_details(libNode)
266 self._add_flowcell_details()
268 def _add_lane_details(self, libNode):
269 """Import lane details
# Collect lane nodes already linked from the library, then parse each
# lane's RDFa page into the model (`lanes` list init is on a missing
# line; the per-lane loop header is also elided).
271 query = RDF.Statement(libNode, libraryOntology['has_lane'], None)
273 for lane_stmt in self.model.find_statements(query):
274 lanes.append(lane_stmt.object)
276 parser = RDF.Parser(name='rdfa')
278 LOGGER.debug("Importing %s" % (lane.uri,))
280 parser.parse_into_model(self.model, lane.uri)
281 except RDF.RedlandError as e:
282 LOGGER.error("Error accessing %s" % (lane.uri,))
286 def _add_flowcell_details(self):
# Find flowcells via a SPARQL template, then import each flowcell's
# RDFa page; access errors are logged, not raised.
287 template = loader.get_template('aws_flowcell.sparql')
289 parser = RDF.Parser(name='rdfa')
290 for r in self.execute_query(template, Context()):
291 flowcell = r['flowcell']
293 parser.parse_into_model(self.model, flowcell.uri)
294 except RDF.RedlandError as e:
295 LOGGER.error("Error accessing %s" % (str(flowcell)))
299 def find_best_match(self, filename):
300 """Search through potential filename matching patterns
302 if self.__view_map is None:
303 self.__view_map = self._get_filename_view_map()
# Collect every view whose pattern matches (the `results` list init
# and the append are on missing lines); ambiguity is a hard error,
# exactly one match is presumably returned on the elided line.
306 for pattern, view in self.__view_map.items():
307 if re.match(pattern, filename):
311 msg = "%s matched multiple views %s" % (
313 [str(x) for x in results])
314 raise ModelException(msg)
315 elif len(results) == 1:
320 def _get_filename_view_map(self):
321 """Query our model for filename patterns
323 return a dictionary of compiled regular expressions to view names
325 filename_query = RDF.Statement(
326 None, dafTermOntology['filename_re'], None)
329 for s in self.model.find_statements(filename_query):
330 view_name = s.subject
331 literal_re = s.object.literal_value['string']
332 LOGGER.debug("Found: %s" % (literal_re,))
# NOTE(review): despite the docstring, the dict visible here is keyed
# by the raw pattern STRING; re.compile appears to be validation only
# (`filename_re` is not used below in this listing).
334 filename_re = re.compile(literal_re)
335 except re.error as e:
336 LOGGER.error("Unable to compile: %s" % (literal_re,))
337 patterns[literal_re] = view_name
340 def make_submission_name(self, analysis_dir):
# The submission name is the final path component of the directory;
# an empty basename raises (the raise/exception type is on the elided
# line before the message literal).
341 analysis_dir = os.path.normpath(analysis_dir)
342 analysis_dir_name = os.path.split(analysis_dir)[1]
343 if len(analysis_dir_name) == 0:
345 "Submission dir name too short: {0}".format(analysis_dir))
346 return analysis_dir_name
348 def get_submission_node(self, analysis_dir):
349 """Convert a submission directory name to a submission node
# Node lives in the per-submission namespace: <submissionSet>#<name>.
351 submission_name = self.make_submission_name(analysis_dir)
352 return self.submissionSetNS[submission_name]
354 def _get_library_attribute(self, libNode, attribute):
# Accepts either an RDF.Node predicate or a bare string that is looked
# up in libraryOntology.
355 if not isinstance(attribute, RDF.Node):
356 attribute = libraryOntology[attribute]
# First try what is already in the model...
358 targets = list(self.model.get_targets(libNode, attribute))
360 return self._format_library_attribute(targets)
364 #targets = self._search_same_as(libNode, attribute)
365 #if targets is not None:
366 # return self._format_library_attribute(targets)
368 # we don't know anything about this attribute
# ...otherwise re-fetch the library's details and try once more.
369 self._add_library_details_to_model(libNode)
371 targets = list(self.model.get_targets(libNode, attribute))
373 return self._format_library_attribute(targets)
377 def _format_library_attribute(self, targets):
# 0 targets -> presumably None (elided line), 1 -> scalar, many -> list.
378 if len(targets) == 0:
380 elif len(targets) == 1:
381 return fromTypedNode(targets[0])
382 elif len(targets) > 1:
383 return [fromTypedNode(t) for t in targets]
385 def _is_paired(self, libNode):
386 """Determine if a library is paired end"""
387 library_type = self._get_library_attribute(libNode, 'library_type')
388 if library_type is None:
389 errmsg = "%s doesn't have a library type"
390 raise ModelException(errmsg % (str(libNode),))
# Classification by exact library-type string; the lists below are
# partial in this listing (entries elided between the numbered gaps).
392 single = ['CSHL (lacking last nt)',
393 'Single End (non-multiplexed)',
394 'Small RNA (non-multiplexed)',]
395 paired = ['Barcoded Illumina',
397 'NEBNext Multiplexed',
400 'Paired End (non-multiplexed)',
401 'Dual Index Illumina',]
# Presumably `return False` / `return True` on the elided lines after
# each branch; unknown types raise. TODO confirm.
402 if library_type in single:
404 elif library_type in paired:
407 raise MetadataLookupException(
408 "Unrecognized library type %s for %s" % \
409 (library_type, str(libNode)))
411 def execute_query(self, template, context):
412 """Execute the query, returning the results
# Renders the Django template into SPARQL, runs it against the model,
# and converts each result row to a dict of plain Python values
# (the `d = {}` init and the `yield d` are on elided lines).
414 formatted_query = template.render(context)
415 LOGGER.debug(formatted_query)
416 query = RDF.SPARQLQuery(str(formatted_query))
417 rdfstream = query.execute(self.model)
419 for record in rdfstream:
421 for key, value in record.items():
422 d[key] = fromTypedNode(value)
# NOTE(review): corrupted listing — embedded line numbers (427, 428, 431, ...)
# jump, so several lines are missing here (the `submissions` initializer, the
# body of the trailing-character check, the docstring terminator). Code is
# byte-identical; comments only.
427 def list_submissions(model):
428 """Return generator of submissions in this model.
431 PREFIX subns: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
433 select distinct ?submission
434 where { ?submission subns:has_submission ?library_dir }
# The SPARQL sketch above (part of the docstring) mirrors the
# find_statements query actually issued below.
436 q = RDF.Statement(None, submissionOntology['has_submission'], None)
438 for statement in model.find_statements(q):
# Strip the submissionLog namespace prefix from each subject URI.
439 s = strip_namespace(submissionLog, statement.subject)
# Trailing '#', '/' or '?' separator characters are presumably trimmed
# in the elided branch body before collecting into `submissions`.
440 if s[-1] in ['#', '/', '?']:
# NOTE(review): returns a list even though the docstring says generator.
444 return list(submissions)