1 """Common submission elements
7 from six.moves.urllib.error import HTTPError
9 from rdflib import Graph, Literal, Namespace, URIRef
10 from rdflib.namespace import RDF, RDFS
12 from htsworkflow.util.rdfhelp import (
16 from htsworkflow.util.rdfns import (
22 from htsworkflow.util.hashfile import make_md5sum
23 from htsworkflow.submission.fastqname import FastqName
24 from htsworkflow.submission.daf import \
25 MetadataLookupException, \
28 from htsworkflow.util import opener
30 from django.template import Context, Template, loader
32 LOGGER = logging.getLogger(__name__)
34 class Submission(object):
35 def __init__(self, name, model, host):
39 self.submissionSet = get_submission_uri(self.name)
40 self.submissionSetNS = Namespace(str(self.submissionSet) + '#')
41 self.libraryNS = Namespace('{0}/library/'.format(host))
42 self.flowcellNS = Namespace('{0}/flowcell/'.format(host))
44 self.__view_map = None
46 def scan_submission_dirs(self, result_map):
47 """Examine files in our result directory
49 for lib_id, result_dir in result_map.items():
50 LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
52 self.import_analysis_dir(result_dir, lib_id)
53 except MetadataLookupException as e:
54 LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))
56 def import_analysis_dir(self, analysis_dir, library_id):
57 """Import a submission directories and update our model as needed
59 #attributes = get_filename_attribute_map(paired)
60 libNode = self.libraryNS[library_id + "/"]
62 self._add_library_details_to_model(libNode)
64 submission_files = os.listdir(analysis_dir)
65 for filename in submission_files:
66 pathname = os.path.abspath(os.path.join(analysis_dir, filename))
67 self.construct_file_attributes(analysis_dir, libNode, pathname)
69 def analysis_nodes(self, result_map):
70 """Return an iterable of analysis nodes
72 for result_dir in result_map.values():
73 an_analysis = self.get_submission_node(result_dir)
76 def construct_file_attributes(self, analysis_dir, libNode, pathname):
77 """Looking for the best extension
78 The 'best' is the longest match
81 filename (str): the filename whose extention we are about to examine
83 path, filename = os.path.split(pathname)
85 LOGGER.debug("Searching for view")
86 file_type = self.find_best_match(filename)
88 LOGGER.warn("Unrecognized file: {0}".format(pathname))
90 if str(file_type) == str(libraryOntology['ignore']):
93 an_analysis_name = self.make_submission_name(analysis_dir)
94 an_analysis = self.get_submission_node(analysis_dir)
95 file_classifications = list(self.model.objects(file_type, RDF['type']))
96 if len(file_classifications) == 0:
97 errmsg = 'Could not find class for {0}'
98 LOGGER.warning(errmsg.format(str(file_type)))
100 file_classification = file_classifications[0]
102 self.model.add((self.submissionSetNS[''],
103 submissionOntology['has_submission'],
105 self.model.add((an_analysis,
106 submissionOntology['name'],
107 Literal(an_analysis_name)))
108 self.model.add((an_analysis,
110 submissionOntology['submission']))
111 self.model.add((an_analysis,
112 submissionOntology['library'],
115 LOGGER.debug("Adding statements to {0}".format(str(an_analysis)))
116 # add track specific information
117 self.model.add((an_analysis,
118 dafTermOntology['paired'],
119 Literal(self._is_paired(libNode))))
120 self.model.add((an_analysis,
121 dafTermOntology['submission'],
124 # add file specific information
125 fileNode = self.make_file_node(pathname, an_analysis)
126 self.add_md5s(filename, fileNode, analysis_dir)
127 self.add_file_size(filename, fileNode, analysis_dir)
128 self.add_read_length(filename, fileNode, analysis_dir)
129 self.add_fastq_metadata(filename, fileNode)
130 self.add_label(file_type, fileNode, libNode)
131 self.model.add((fileNode,
134 self.model.add((fileNode,
135 libraryOntology['library'],
138 LOGGER.debug("Done.")
140 def make_file_node(self, pathname, submissionNode):
141 """Create file node and attach it to its submission.
143 # add file specific information
144 path, filename = os.path.split(pathname)
145 pathname = os.path.abspath(pathname)
146 fileNode = URIRef('file://'+ pathname)
147 self.model.add((submissionNode,
148 dafTermOntology['has_file'],
150 self.model.add((fileNode,
151 dafTermOntology['filename'],
153 self.model.add((fileNode,
154 dafTermOntology['relative_path'],
155 Literal(os.path.relpath(pathname))))
158 def add_md5s(self, filename, fileNode, analysis_dir):
159 LOGGER.debug("Updating file md5sum")
160 submission_pathname = os.path.join(analysis_dir, filename)
161 md5 = make_md5sum(submission_pathname)
163 errmsg = "Unable to produce md5sum for {0}"
164 LOGGER.warning(errmsg.format(submission_pathname))
166 self.model.add((fileNode, dafTermOntology['md5sum'], Literal(md5)))
168 def add_file_size(self, filename, fileNode, analysis_dir):
169 submission_pathname = os.path.join(analysis_dir, filename)
170 file_size = os.stat(submission_pathname).st_size
171 self.model.add((fileNode, dafTermOntology['file_size'], Literal(file_size)))
172 LOGGER.debug("Updating file size: %d", file_size)
174 def add_read_length(self, filename, fileNode, analysis_dir):
175 submission_pathname = os.path.join(analysis_dir, filename)
176 stream = opener.autoopen(submission_pathname, 'rt')
177 header = stream.readline().strip()
178 sequence = stream.readline().strip()
179 read_length = len(sequence)
180 self.model.add((fileNode,
181 libraryOntology['read_length'],
182 Literal(read_length)))
183 LOGGER.debug("Updating read length: %d", read_length)
185 def add_fastq_metadata(self, filename, fileNode):
186 # How should I detect if this is actually a fastq file?
188 fqname = FastqName(filename=filename)
190 # currently its just ignore it if the fastq name parser fails
193 terms = [('flowcell', libraryOntology['flowcell_id']),
194 ('lib_id', libraryOntology['library_id']),
195 ('lane', libraryOntology['lane_number']),
196 ('read', libraryOntology['read']),
198 for file_term, model_term in terms:
199 value = fqname.get(file_term)
200 if value is not None:
201 s = (fileNode, model_term, value.toPython())
204 if 'flowcell' in fqname:
205 value = self.flowcellNS[fqname['flowcell'] + '/']
206 s = (fileNode, libraryOntology['flowcell'], value)
209 def add_label(self, file_type, file_node, lib_node):
210 """Add rdfs:label to a file node
212 #template_term = libraryOntology['label_template']
213 template_term = libraryOntology['label_template']
214 label_templates = list(self.model.objects(file_type, template_term))
215 if len(label_templates) > 0:
216 label_template = label_templates[0]
217 template = loader.get_template('submission_view_rdfs_label_metadata.sparql')
219 'library': str(lib_node.uri),
221 for r in self.execute_query(template, context):
223 label = Template(label_template).render(context)
224 s = (file_node, rdfsNS['label'], unicode(label))
227 def _add_library_details_to_model(self, libNode):
228 # attributes that can have multiple values
229 set_attributes = set((libraryOntology['has_lane'],
230 libraryOntology['has_mappings'],
231 dafTermOntology['has_file']))
234 tmpmodel.parse(source=libNode, format='rdfa')
235 except HTTPError as e:
239 LOGGER.debug("Scanning %s", str(libNode))
241 for stmt in tmpmodel:
243 # always add "collections"
244 if p in set_attributes:
247 # don't override things we already have in the model
248 targets = list(self.model.get_targets(s, p))
249 if len(targets) == 0:
255 self._add_lane_details(libNode)
256 self._add_flowcell_details()
258 def _add_lane_details(self, libNode):
259 """Import lane details
261 query = (libNode, libraryOntology['has_lane'], None)
263 for lane_stmt in self.model.triples(query):
264 lanes.append(lane_stmt[2])
267 LOGGER.debug("Importing %s" % (lane,))
268 self.model.parse(source=lane, format='rdfa')
271 def _add_flowcell_details(self):
272 template = loader.get_template('aws_flowcell.sparql')
274 for r in self.execute_query(template, Context()):
275 flowcell = r['flowcell']
276 self.model.parse(source=flowcell, format='rdfa')
279 def find_best_match(self, filename):
280 """Search through potential filename matching patterns
282 if self.__view_map is None:
283 self.__view_map = self._get_filename_view_map()
286 for pattern, view in self.__view_map.items():
287 if re.match(pattern, filename):
291 msg = "%s matched multiple views %s" % (
293 [str(x) for x in results])
294 raise ModelException(msg)
295 elif len(results) == 1:
300 def _get_filename_view_map(self):
301 """Query our model for filename patterns
303 return a dictionary of compiled regular expressions to view names
305 filename_query = (None, dafTermOntology['filename_re'], None)
308 for s in self.model.triples(filename_query):
310 literal_re = s[2].value
311 LOGGER.debug("Found: %s" % (literal_re,))
313 filename_re = re.compile(literal_re)
314 except re.error as e:
315 LOGGER.error("Unable to compile: %s" % (literal_re,))
316 patterns[literal_re] = view_name
319 def make_submission_name(self, analysis_dir):
320 analysis_dir = os.path.normpath(analysis_dir)
321 analysis_dir_name = os.path.split(analysis_dir)[1]
322 if len(analysis_dir_name) == 0:
324 "Submission dir name too short: {0}".format(analysis_dir))
325 return analysis_dir_name
327 def get_submission_node(self, analysis_dir):
328 """Convert a submission directory name to a submission node
330 submission_name = self.make_submission_name(analysis_dir)
331 return self.submissionSetNS[submission_name]
333 def _get_library_attribute(self, libNode, attribute):
334 if not isinstance(libNode, URIRef):
335 raise ValueError("libNode must be a URIRef")
336 if not isinstance(attribute, URIRef):
337 attribute = libraryOntology[attribute]
339 targets = list(self.model.objects(libNode, attribute))
341 return self._format_library_attribute(targets)
345 #targets = self._search_same_as(libNode, attribute)
346 #if targets is not None:
347 # return self._format_library_attribute(targets)
349 # we don't know anything about this attribute
350 self._add_library_details_to_model(libNode)
352 targets = list(self.model.objects(libNode, attribute))
354 return self._format_library_attribute(targets)
358 def _format_library_attribute(self, targets):
359 if len(targets) == 0:
361 elif len(targets) == 1:
362 return targets[0].toPython()
363 elif len(targets) > 1:
364 return [t.toPython() for t in targets]
366 def _is_paired(self, libNode):
367 """Determine if a library is paired end"""
368 library_type = self._get_library_attribute(libNode, 'library_type')
369 if library_type is None:
370 errmsg = "%s doesn't have a library type"
371 raise ModelException(errmsg % (str(libNode),))
373 single = ['CSHL (lacking last nt)',
374 'Single End (non-multiplexed)',
375 'Small RNA (non-multiplexed)',]
376 paired = ['Barcoded Illumina',
378 'NEBNext Multiplexed',
381 'Paired End (non-multiplexed)',
382 'Dual Index Illumina',]
383 if library_type in single:
385 elif library_type in paired:
388 raise MetadataLookupException(
389 "Unrecognized library type %s for %s" % \
390 (library_type, str(libNode)))
392 def execute_query(self, template, context):
393 """Execute the query, returning the results
395 formatted_query = template.render(context)
396 LOGGER.debug(formatted_query)
397 rdfstream = self.model.query(str(formatted_query))
399 for record in rdfstream:
401 for key, value in record.items():
402 d[key] = value.toPython()
def list_submissions(model):
    """Return list of submission names found in this model.

    Equivalent to the SPARQL query:

      PREFIX subns: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
      select distinct ?submission
      where { ?submission subns:has_submission ?library_dir }
    """
    q = (None, submissionOntology['has_submission'], None)
    submissions = set()
    for statement in model.triples(q):
        s = strip_namespace(submissionLog, statement[0])
        # normalize away a trailing separator so names compare cleanly
        if s[-1] in ['#', '/', '?']:
            s = s[:-1]
        submissions.add(s)
    return list(submissions)