1 """Common submission elements
9 from htsworkflow.util.rdfhelp import \
16 from htsworkflow.util.rdfns import *
17 from htsworkflow.util.hashfile import make_md5sum
18 from htsworkflow.submission.fastqname import FastqName
19 from htsworkflow.submission.daf import \
20 MetadataLookupException, \
24 from django.conf import settings
25 from django.template import Context, Template, loader
27 LOGGER = logging.getLogger(__name__)
29 class Submission(object):
    def __init__(self, name, model, host):
        """Initialize a submission tracker.

        name -- short submission name, used to build the submission-set URI
        model -- RDF model that statements will be added to
        host -- base URL used to construct per-library URIs
        """
        # NOTE(review): self.name is read here; presumably assigned from the
        # name argument earlier in this constructor -- confirm in full file.
        self.submissionSet = get_submission_uri(self.name)
        # Namespace for nodes that belong to this submission set.
        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '#')
        # Namespace under which library nodes live on the given host.
        self.libraryNS = RDF.NS('{0}/library/'.format(host))
        # Lazily-built map of filename regexes to views; populated on first
        # use by find_best_match / _get_filename_view_map.
        self.__view_map = None
    def scan_submission_dirs(self, result_map):
        """Examine files in our result directory.

        result_map maps library ids to result directories; each directory
        is imported via import_analysis_dir. Libraries whose metadata
        cannot be resolved are logged and skipped rather than aborting.
        """
        for lib_id, result_dir in result_map.items():
            LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
                self.import_analysis_dir(result_dir, lib_id)
            # Skip this library but keep scanning the rest.
            except MetadataLookupException, e:
                LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))
    def import_analysis_dir(self, analysis_dir, library_id):
        """Import a submission directories and update our model as needed.

        analysis_dir -- directory containing this submission's files
        library_id -- id used to look the library node up in libraryNS
        """
        #attributes = get_filename_attribute_map(paired)
        libNode = self.libraryNS[library_id + "/"]
        # Pull the library's RDFa metadata into the model before attaching
        # file statements to it.
        self._add_library_details_to_model(libNode)
        submission_files = os.listdir(analysis_dir)
        for filename in submission_files:
            pathname = os.path.abspath(os.path.join(analysis_dir, filename))
            self.construct_file_attributes(analysis_dir, libNode, pathname)
    def analysis_nodes(self, result_map):
        """Return an iterable of analysis nodes.

        One submission node is produced per result directory in result_map.
        """
        for result_dir in result_map.values():
            # NOTE(review): the node is presumably yielded after this lookup;
            # confirm against the unelided file.
            an_analysis = self.get_submission_node(result_dir)
    def construct_file_attributes(self, analysis_dir, libNode, pathname):
        """Looking for the best extension
        The 'best' is the longest match

        filename (str): the filename whose extention we are about to examine
        """
        path, filename = os.path.split(pathname)

        LOGGER.debug("Searching for view")
        # Map the filename to a file type via the filename_re patterns.
        file_type = self.find_best_match(filename)
            LOGGER.warn("Unrecognized file: {0}".format(pathname))
        # Files explicitly marked 'ignore' get no statements at all.
        if str(file_type) == str(libraryOntology['ignore']):

        an_analysis_name = self.make_submission_name(analysis_dir)
        an_analysis = self.get_submission_node(analysis_dir)
        an_analysis_uri = str(an_analysis.uri)
        file_classification = self.model.get_target(file_type,
        if file_classification is None:
            errmsg = 'Could not find class for {0}'
            LOGGER.warning(errmsg.format(str(file_type)))

        # Link this analysis into the submission set and describe it.
        self.model.add_statement(
            RDF.Statement(self.submissionSetNS[''],
                          submissionOntology['has_submission'],
        self.model.add_statement(RDF.Statement(an_analysis,
                                               submissionOntology['name'],
                                               toTypedNode(an_analysis_name)))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          submissionOntology['submission']))
        self.model.add_statement(RDF.Statement(an_analysis,
                                               submissionOntology['library'],

        LOGGER.debug("Adding statements to {0}".format(str(an_analysis)))
        # add track specific information
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          dafTermOntology['paired'],
                          toTypedNode(self._is_paired(libNode))))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          dafTermOntology['submission'],

        # add file specific information
        fileNode = self.make_file_node(pathname, an_analysis)
        self.add_md5s(filename, fileNode, analysis_dir)
        self.add_file_size(filename, fileNode, analysis_dir)
        self.add_fastq_metadata(filename, fileNode)
        self.add_label(file_type, fileNode, libNode)
        self.model.add_statement(
            RDF.Statement(fileNode,
        self.model.add_statement(
            RDF.Statement(fileNode,
                          libraryOntology['library'],

        LOGGER.debug("Done.")
    def make_file_node(self, pathname, submissionNode):
        """Create file node and attach it to its submission.

        pathname -- path to the file; converted to an absolute file:// URI
        submissionNode -- submission node the file node is linked under
        """
        # add file specific information
        path, filename = os.path.split(pathname)
        pathname = os.path.abspath(pathname)
        fileNode = RDF.Node(RDF.Uri('file://'+ pathname))
        # submission --has_file--> file
        self.model.add_statement(
            RDF.Statement(submissionNode,
                          dafTermOntology['has_file'],
        # Record the bare filename and the path relative to the current
        # working directory on the file node.
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['filename'],
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['relative_path'],
                          os.path.relpath(pathname)))
    def add_md5s(self, filename, fileNode, analysis_dir):
        """Attach an md5sum statement to fileNode for a submission file."""
        LOGGER.debug("Updating file md5sum")
        submission_pathname = os.path.join(analysis_dir, filename)
        md5 = make_md5sum(submission_pathname)
            # Checksum could not be computed: warn, leave the node untouched.
            errmsg = "Unable to produce md5sum for {0}"
            LOGGER.warning(errmsg.format(submission_pathname))
            self.model.add_statement(
                RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))
172 def add_file_size(self, filename, fileNode, analysis_dir):
173 LOGGER.debug("Updating file size")
174 submission_pathname = os.path.join(analysis_dir, filename)
175 file_size = os.stat(submission_pathname).st_size
176 self.model.add_statement(
177 RDF.Statement(fileNode, dafTermOntology['file_size'], toTypedNode(file_size)))
    def add_fastq_metadata(self, filename, fileNode):
        """Attach flowcell/lane/read metadata parsed from a fastq filename."""
        # How should I detect if this is actually a fastq file?
        fqname = FastqName(filename=filename)
        # Currently we just ignore the file if the fastq name parser fails.
        # Map fields recognized by FastqName onto libraryOntology terms.
        terms = [('flowcell', libraryOntology['flowcell_id']),
                 ('lib_id', libraryOntology['library_id']),
                 ('lane', libraryOntology['lane_number']),
                 ('read', libraryOntology['read']),
                 ('cycle', libraryOntology['read_length'])]
        for file_term, model_term in terms:
            value = fqname.get(file_term)
            if value is not None:
                # Only assert terms the parser actually extracted.
                s = RDF.Statement(fileNode, model_term, toTypedNode(value))
    def add_label(self, file_type, file_node, lib_node):
        """Add rdfs:label to a file node.

        The label text comes from a label_template attached to the file
        type, rendered with metadata queried for the library node.
        """
        template_term = libraryOntology['label_template']
        label_template = self.model.get_target(file_type, template_term)
            # Query library metadata used as the template context.
            template = loader.get_template('submission_view_rdfs_label_metadata.sparql')
                'library': str(lib_node.uri),
            for r in self.execute_query(template, context):
                # Render the per-file label from the query result row.
                label = Template(label_template).render(context)
                s = RDF.Statement(file_node, rdfsNS['label'], unicode(label))
    def _add_library_details_to_model(self, libNode):
        """Parse the library page (RDFa) and merge its statements.

        Collection-style predicates are always added; other predicates are
        only added when the model has no existing value for them.
        """
        # attributes that can have multiple values
        set_attributes = set((libraryOntology['has_lane'],
                              libraryOntology['has_mappings'],
                              dafTermOntology['has_file']))
        parser = RDF.Parser(name='rdfa')
            new_statements = parser.parse_as_stream(libNode.uri)
        except RDF.RedlandError as e:
        LOGGER.debug("Scanning %s", str(libNode.uri))
        for s in new_statements:
            # always add "collections"
            if s.predicate in set_attributes:
            # don't override things we already have in the model
            targets = list(self.model.get_targets(s.subject, s.predicate))
            if len(targets) == 0:
        # Also pull in detail pages for each lane linked from the library.
        self._add_lane_details(libNode)
    def _add_lane_details(self, libNode):
        """Import lane details.

        Finds every lane linked from libNode via has_lane and parses each
        lane's page (RDFa) into the model; fetch errors are logged.
        """
        query = RDF.Statement(libNode, libraryOntology['has_lane'], None)
        for lane_stmt in self.model.find_statements(query):
            lanes.append(lane_stmt.object)
        parser = RDF.Parser(name='rdfa')
            LOGGER.debug("Importing %s" % (lane.uri,))
                parser.parse_into_model(self.model, lane.uri)
            # Keep going if one lane page cannot be read.
            except RDF.RedlandError, e:
                LOGGER.error("Error accessing %s" % (lane.uri,))
    def find_best_match(self, filename):
        """Search through potential filename matching patterns.

        Raises ModelException if more than one view pattern matches.
        """
        # Build the pattern->view map once and cache it on the instance.
        if self.__view_map is None:
            self.__view_map = self._get_filename_view_map()
        for pattern, view in self.__view_map.items():
            if re.match(pattern, filename):
            # Ambiguous filenames are an error, not a silent pick.
            msg = "%s matched multiple views %s" % (
                [str(x) for x in results])
            raise ModelException(msg)
        elif len(results) == 1:
    def _get_filename_view_map(self):
        """Query our model for filename patterns

        return a dictionary of compiled regular expressions to view names
        """
        # Every statement with a filename_re predicate defines one view.
        filename_query = RDF.Statement(
            None, dafTermOntology['filename_re'], None)
        for s in self.model.find_statements(filename_query):
            view_name = s.subject
            literal_re = s.object.literal_value['string']
            LOGGER.debug("Found: %s" % (literal_re,))
                filename_re = re.compile(literal_re)
                # Bad pattern: log it, but still record the raw string below.
                LOGGER.error("Unable to compile: %s" % (literal_re,))
            patterns[literal_re] = view_name
    def make_submission_name(self, analysis_dir):
        """Return the submission name derived from a directory's basename."""
        # normpath strips a trailing slash so split() yields the last
        # component rather than an empty string.
        analysis_dir = os.path.normpath(analysis_dir)
        analysis_dir_name = os.path.split(analysis_dir)[1]
        if len(analysis_dir_name) == 0:
                "Submission dir name too short: {0}".format(analysis_dir))
        return analysis_dir_name
310 def get_submission_node(self, analysis_dir):
311 """Convert a submission directory name to a submission node
313 submission_name = self.make_submission_name(analysis_dir)
314 return self.submissionSetNS[submission_name]
    def _get_library_attribute(self, libNode, attribute):
        """Look up an attribute value for a library node.

        attribute may be an RDF.Node or a string naming a libraryOntology
        term. If the model has no value, library details are (re)imported
        and the lookup is retried.
        """
        if not isinstance(attribute, RDF.Node):
            attribute = libraryOntology[attribute]
        targets = list(self.model.get_targets(libNode, attribute))
            return self._format_library_attribute(targets)
        #targets = self._search_same_as(libNode, attribute)
        #if targets is not None:
        #    return self._format_library_attribute(targets)
        # we don't know anything about this attribute
        self._add_library_details_to_model(libNode)
        targets = list(self.model.get_targets(libNode, attribute))
            return self._format_library_attribute(targets)
    def _format_library_attribute(self, targets):
        """Unwrap attribute targets: single value, list, or nothing.

        Returns the lone decoded value for one target, a list of decoded
        values for several, and falls through for an empty target list.
        """
        if len(targets) == 0:
        elif len(targets) == 1:
            return fromTypedNode(targets[0])
        elif len(targets) > 1:
            return [fromTypedNode(t) for t in targets]
    def _is_paired(self, libNode):
        """Determine if a library is paired end"""
        library_type = self._get_library_attribute(libNode, 'library_type')
        if library_type is None:
            errmsg = "%s doesn't have a library type"
            raise ModelException(errmsg % (str(libNode),))
        # Known single-end library type names.
        single = ['CSHL (lacking last nt)',
                  'Single End (non-multiplexed)',
                  'Small RNA (non-multiplexed)',]
        # Known paired-end library type names.
        paired = ['Barcoded Illumina',
                  'NEBNext Multiplexed',
                  'Paired End (non-multiplexed)',
                  'Dual Index Illumina',]
        if library_type in single:
        elif library_type in paired:
            # Anything not in either list is an error rather than a guess.
            raise MetadataLookupException(
                "Unrecognized library type %s for %s" % \
                (library_type, str(libNode)))
    def execute_query(self, template, context):
        """Execute the query, returning the results.

        template -- Django template producing SPARQL text
        context -- Django template context used to render it

        Each result row is decoded with fromTypedNode into a plain dict.
        """
        formatted_query = template.render(context)
        LOGGER.debug(formatted_query)
        query = RDF.SPARQLQuery(str(formatted_query))
        rdfstream = query.execute(self.model)
        for record in rdfstream:
            for key, value in record.items():
                d[key] = fromTypedNode(value)
389 def list_submissions(model):
390 """Return generator of submissions in this model.
393 PREFIX subns: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
395 select distinct ?submission
396 where { ?submission subns:has_submission ?library_dir }
398 query = RDF.SPARQLQuery(query_body)
399 rdfstream = query.execute(model)
400 for row in rdfstream:
401 s = strip_namespace(submissionLog, row['submission'])
402 if s[-1] in ['#', '/', '?']: