1 """Common submission elements
9 from htsworkflow.util.rdfhelp import \
16 from htsworkflow.util.rdfns import *
17 from htsworkflow.util.hashfile import make_md5sum
18 from htsworkflow.submission.fastqname import FastqName
19 from htsworkflow.submission.daf import \
20 MetadataLookupException, \
23 from htsworkflow.util import opener
25 from django.conf import settings
26 from django.template import Context, Template, loader
28 LOGGER = logging.getLogger(__name__)
class Submission(object):
    """Collect RDF statements describing a submission.

    Wraps an RDF model plus namespace helpers rooted at `host`; subclasses
    scan result directories and attach file/analysis metadata to the model.
    """
    def __init__(self, name, model, host):
        """Initialize submission state.

        :Args:
          name (str): submission set name, used to build the submission URI
          model: RDF model that all statements are added to
          host (str): base URL used to build library/flowcell namespaces
        """
        # name and model are read throughout the class (e.g. get_submission_uri
        # below, self.model in construct_file_attributes), so store them first.
        self.name = name
        self.model = model

        self.submissionSet = get_submission_uri(self.name)
        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '#')
        self.libraryNS = RDF.NS('{0}/library/'.format(host))
        self.flowcellNS = RDF.NS('{0}/flowcell/'.format(host))

        # lazy cache of filename-pattern -> view mapping (see find_best_match)
        self.__view_map = None
42 def scan_submission_dirs(self, result_map):
43 """Examine files in our result directory
45 for lib_id, result_dir in result_map.items():
46 LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
48 self.import_analysis_dir(result_dir, lib_id)
49 except MetadataLookupException as e:
50 LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))
52 def import_analysis_dir(self, analysis_dir, library_id):
53 """Import a submission directories and update our model as needed
55 #attributes = get_filename_attribute_map(paired)
56 libNode = self.libraryNS[library_id + "/"]
58 self._add_library_details_to_model(libNode)
60 submission_files = os.listdir(analysis_dir)
61 for filename in submission_files:
62 pathname = os.path.abspath(os.path.join(analysis_dir, filename))
63 self.construct_file_attributes(analysis_dir, libNode, pathname)
65 def analysis_nodes(self, result_map):
66 """Return an iterable of analysis nodes
68 for result_dir in result_map.values():
69 an_analysis = self.get_submission_node(result_dir)
72 def construct_file_attributes(self, analysis_dir, libNode, pathname):
73 """Looking for the best extension
74 The 'best' is the longest match
77 filename (str): the filename whose extention we are about to examine
79 path, filename = os.path.split(pathname)
81 LOGGER.debug("Searching for view")
82 file_type = self.find_best_match(filename)
84 LOGGER.warn("Unrecognized file: {0}".format(pathname))
86 if str(file_type) == str(libraryOntology['ignore']):
89 an_analysis_name = self.make_submission_name(analysis_dir)
90 an_analysis = self.get_submission_node(analysis_dir)
91 an_analysis_uri = str(an_analysis.uri)
92 file_classification = self.model.get_target(file_type,
94 if file_classification is None:
95 errmsg = 'Could not find class for {0}'
96 LOGGER.warning(errmsg.format(str(file_type)))
99 self.model.add_statement(
100 RDF.Statement(self.submissionSetNS[''],
101 submissionOntology['has_submission'],
103 self.model.add_statement(RDF.Statement(an_analysis,
104 submissionOntology['name'],
105 toTypedNode(an_analysis_name)))
106 self.model.add_statement(
107 RDF.Statement(an_analysis,
109 submissionOntology['submission']))
110 self.model.add_statement(RDF.Statement(an_analysis,
111 submissionOntology['library'],
114 LOGGER.debug("Adding statements to {0}".format(str(an_analysis)))
115 # add track specific information
116 self.model.add_statement(
117 RDF.Statement(an_analysis,
118 dafTermOntology['paired'],
119 toTypedNode(self._is_paired(libNode))))
120 self.model.add_statement(
121 RDF.Statement(an_analysis,
122 dafTermOntology['submission'],
125 # add file specific information
126 fileNode = self.make_file_node(pathname, an_analysis)
127 self.add_md5s(filename, fileNode, analysis_dir)
128 self.add_file_size(filename, fileNode, analysis_dir)
129 self.add_read_length(filename, fileNode, analysis_dir)
130 self.add_fastq_metadata(filename, fileNode)
131 self.add_label(file_type, fileNode, libNode)
132 self.model.add_statement(
133 RDF.Statement(fileNode,
136 self.model.add_statement(
137 RDF.Statement(fileNode,
138 libraryOntology['library'],
141 LOGGER.debug("Done.")
143 def make_file_node(self, pathname, submissionNode):
144 """Create file node and attach it to its submission.
146 # add file specific information
147 path, filename = os.path.split(pathname)
148 pathname = os.path.abspath(pathname)
149 fileNode = RDF.Node(RDF.Uri('file://'+ pathname))
150 self.model.add_statement(
151 RDF.Statement(submissionNode,
152 dafTermOntology['has_file'],
154 self.model.add_statement(
155 RDF.Statement(fileNode,
156 dafTermOntology['filename'],
158 self.model.add_statement(
159 RDF.Statement(fileNode,
160 dafTermOntology['relative_path'],
161 os.path.relpath(pathname)))
164 def add_md5s(self, filename, fileNode, analysis_dir):
165 LOGGER.debug("Updating file md5sum")
166 submission_pathname = os.path.join(analysis_dir, filename)
167 md5 = make_md5sum(submission_pathname)
169 errmsg = "Unable to produce md5sum for {0}"
170 LOGGER.warning(errmsg.format(submission_pathname))
172 self.model.add_statement(
173 RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))
175 def add_file_size(self, filename, fileNode, analysis_dir):
176 submission_pathname = os.path.join(analysis_dir, filename)
177 file_size = os.stat(submission_pathname).st_size
178 self.model.add_statement(
179 RDF.Statement(fileNode, dafTermOntology['file_size'], toTypedNode(file_size)))
180 LOGGER.debug("Updating file size: %d", file_size)
182 def add_read_length(self, filename, fileNode, analysis_dir):
183 submission_pathname = os.path.join(analysis_dir, filename)
184 stream = opener.autoopen(submission_pathname, 'rt')
185 header = stream.readline().strip()
186 sequence = stream.readline().strip()
187 read_length = len(sequence)
188 self.model.add_statement(
189 RDF.Statement(fileNode,
190 libraryOntology['read_length'],
191 toTypedNode(read_length))
193 LOGGER.debug("Updating read length: %d", read_length)
195 def add_fastq_metadata(self, filename, fileNode):
196 # How should I detect if this is actually a fastq file?
198 fqname = FastqName(filename=filename)
200 # currently its just ignore it if the fastq name parser fails
203 terms = [('flowcell', libraryOntology['flowcell_id']),
204 ('lib_id', libraryOntology['library_id']),
205 ('lane', libraryOntology['lane_number']),
206 ('read', libraryOntology['read']),
208 for file_term, model_term in terms:
209 value = fqname.get(file_term)
210 if value is not None:
211 s = RDF.Statement(fileNode, model_term, toTypedNode(value))
214 if 'flowcell' in fqname:
215 value = self.flowcellNS[fqname['flowcell'] + '/']
216 s = RDF.Statement(fileNode, libraryOntology['flowcell'], value)
219 def add_label(self, file_type, file_node, lib_node):
220 """Add rdfs:label to a file node
222 #template_term = libraryOntology['label_template']
223 template_term = libraryOntology['label_template']
224 label_template = self.model.get_target(file_type, template_term)
226 template = loader.get_template('submission_view_rdfs_label_metadata.sparql')
228 'library': str(lib_node.uri),
230 for r in self.execute_query(template, context):
232 label = Template(label_template).render(context)
233 s = RDF.Statement(file_node, rdfsNS['label'], unicode(label))
236 def _add_library_details_to_model(self, libNode):
237 # attributes that can have multiple values
238 set_attributes = set((libraryOntology['has_lane'],
239 libraryOntology['has_mappings'],
240 dafTermOntology['has_file']))
241 parser = RDF.Parser(name='rdfa')
243 new_statements = parser.parse_as_stream(libNode.uri)
244 except RDF.RedlandError as e:
247 LOGGER.debug("Scanning %s", str(libNode.uri))
249 for s in new_statements:
250 # always add "collections"
251 if s.predicate in set_attributes:
254 # don't override things we already have in the model
255 targets = list(self.model.get_targets(s.subject, s.predicate))
256 if len(targets) == 0:
262 self._add_lane_details(libNode)
263 self._add_flowcell_details()
265 def _add_lane_details(self, libNode):
266 """Import lane details
268 query = RDF.Statement(libNode, libraryOntology['has_lane'], None)
270 for lane_stmt in self.model.find_statements(query):
271 lanes.append(lane_stmt.object)
273 parser = RDF.Parser(name='rdfa')
275 LOGGER.debug("Importing %s" % (lane.uri,))
277 parser.parse_into_model(self.model, lane.uri)
278 except RDF.RedlandError as e:
279 LOGGER.error("Error accessing %s" % (lane.uri,))
283 def _add_flowcell_details(self):
284 template = loader.get_template('aws_flowcell.sparql')
285 results = self.execute_query(template, Context())
287 parser = RDF.Parser(name='rdfa')
288 for r in self.execute_query(template, Context()):
289 flowcell = r['flowcell']
291 parser.parse_into_model(self.model, flowcell.uri)
292 except RDF.RedlandError as e:
293 LOGGER.error("Error accessing %s" % (str(flowcell)))
297 def find_best_match(self, filename):
298 """Search through potential filename matching patterns
300 if self.__view_map is None:
301 self.__view_map = self._get_filename_view_map()
304 for pattern, view in self.__view_map.items():
305 if re.match(pattern, filename):
309 msg = "%s matched multiple views %s" % (
311 [str(x) for x in results])
312 raise ModelException(msg)
313 elif len(results) == 1:
318 def _get_filename_view_map(self):
319 """Query our model for filename patterns
321 return a dictionary of compiled regular expressions to view names
323 filename_query = RDF.Statement(
324 None, dafTermOntology['filename_re'], None)
327 for s in self.model.find_statements(filename_query):
328 view_name = s.subject
329 literal_re = s.object.literal_value['string']
330 LOGGER.debug("Found: %s" % (literal_re,))
332 filename_re = re.compile(literal_re)
333 except re.error as e:
334 LOGGER.error("Unable to compile: %s" % (literal_re,))
335 patterns[literal_re] = view_name
338 def make_submission_name(self, analysis_dir):
339 analysis_dir = os.path.normpath(analysis_dir)
340 analysis_dir_name = os.path.split(analysis_dir)[1]
341 if len(analysis_dir_name) == 0:
343 "Submission dir name too short: {0}".format(analysis_dir))
344 return analysis_dir_name
346 def get_submission_node(self, analysis_dir):
347 """Convert a submission directory name to a submission node
349 submission_name = self.make_submission_name(analysis_dir)
350 return self.submissionSetNS[submission_name]
352 def _get_library_attribute(self, libNode, attribute):
353 if not isinstance(attribute, RDF.Node):
354 attribute = libraryOntology[attribute]
356 targets = list(self.model.get_targets(libNode, attribute))
358 return self._format_library_attribute(targets)
362 #targets = self._search_same_as(libNode, attribute)
363 #if targets is not None:
364 # return self._format_library_attribute(targets)
366 # we don't know anything about this attribute
367 self._add_library_details_to_model(libNode)
369 targets = list(self.model.get_targets(libNode, attribute))
371 return self._format_library_attribute(targets)
375 def _format_library_attribute(self, targets):
376 if len(targets) == 0:
378 elif len(targets) == 1:
379 return fromTypedNode(targets[0])
380 elif len(targets) > 1:
381 return [fromTypedNode(t) for t in targets]
383 def _is_paired(self, libNode):
384 """Determine if a library is paired end"""
385 library_type = self._get_library_attribute(libNode, 'library_type')
386 if library_type is None:
387 errmsg = "%s doesn't have a library type"
388 raise ModelException(errmsg % (str(libNode),))
390 single = ['CSHL (lacking last nt)',
391 'Single End (non-multiplexed)',
392 'Small RNA (non-multiplexed)',]
393 paired = ['Barcoded Illumina',
395 'NEBNext Multiplexed',
398 'Paired End (non-multiplexed)',
399 'Dual Index Illumina',]
400 if library_type in single:
402 elif library_type in paired:
405 raise MetadataLookupException(
406 "Unrecognized library type %s for %s" % \
407 (library_type, str(libNode)))
409 def execute_query(self, template, context):
410 """Execute the query, returning the results
412 formatted_query = template.render(context)
413 LOGGER.debug(formatted_query)
414 query = RDF.SPARQLQuery(str(formatted_query))
415 rdfstream = query.execute(self.model)
417 for record in rdfstream:
419 for key, value in record.items():
420 d[key] = fromTypedNode(value)
425 def list_submissions(model):
426 """Return generator of submissions in this model.
429 PREFIX subns: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
431 select distinct ?submission
432 where { ?submission subns:has_submission ?library_dir }
434 query = RDF.SPARQLQuery(query_body)
435 rdfstream = query.execute(model)
436 for row in rdfstream:
437 s = strip_namespace(submissionLog, row['submission'])
438 if s[-1] in ['#', '/', '?']: