1 """Common submission elements
9 from htsworkflow.util.rdfhelp import \
21 from htsworkflow.util.hashfile import make_md5sum
22 from htsworkflow.submission.fastqname import FastqName
23 from htsworkflow.submission.daf import \
24 MetadataLookupException, \
28 LOGGER = logging.getLogger(__name__)
30 class Submission(object):
def __init__(self, name, model, host):
    """Initialize a submission tracker.

    Args:
        name (str): short name identifying this submission set
        model: RDF model that statements are read from and written to
        host (str): base URL of the server hosting the library pages
    """
    # NOTE(review): these two assignments were lost from the excerpt but
    # are required — self.name is read on the next line and self.model is
    # used throughout the class.
    self.name = name
    self.model = model
    self.submissionSet = get_submission_uri(self.name)
    # namespace for nodes belonging to this submission set
    self.submissionSetNS = RDF.NS(str(self.submissionSet) + '#')
    # namespace for library pages served by host
    self.libraryNS = RDF.NS('{0}/library/'.format(host))
    # lazy cache of filename-pattern -> view, built by find_best_match
    self.__view_map = None
def scan_submission_dirs(self, result_map):
    """Examine files in our result directory

    Args:
        result_map: mapping of library id -> result directory; each
            directory is imported via import_analysis_dir.
    """
    for lib_id, result_dir in result_map.items():
        LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
        # a library whose metadata can't be resolved is skipped, not fatal
        try:
            self.import_analysis_dir(result_dir, lib_id)
        # 'except X as e' (PEP 3110) for consistency with the rest of the
        # file (see the RedlandError handler) and Python 3 compatibility.
        except MetadataLookupException as e:
            LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))
def import_analysis_dir(self, analysis_dir, library_id):
    """Import a submission directories and update our model as needed

    Walks every file in analysis_dir and records its attributes under
    the library node derived from library_id.
    """
    #attributes = get_filename_attribute_map(paired)
    libNode = self.libraryNS[library_id + "/"]

    # make sure the library's metadata is present before describing files
    self._add_library_details_to_model(libNode)

    submission_files = os.listdir(analysis_dir)
    for filename in submission_files:
        pathname = os.path.abspath(os.path.join(analysis_dir, filename))
        self.construct_file_attributes(analysis_dir, libNode, pathname)
def construct_file_attributes(self, analysis_dir, libNode, pathname):
    """Looking for the best extension
    The 'best' is the longest match

    filename (str): the filename whose extention we are about to examine
    """
    path, filename = os.path.split(pathname)

    LOGGER.debug("Searching for view")
    file_type = self.find_best_match(filename)
    # NOTE(review): an "if file_type is None:" guard appears to have been
    # lost from this excerpt — the warn below reads like its body, and an
    # early return likely followed.  Confirm against the full source.
    LOGGER.warn("Unrecognized file: {0}".format(pathname))

    # files typed "ignore" in the ontology are deliberately skipped
    if str(file_type) == str(libraryOntology['ignore']):
        # NOTE(review): the branch body (likely "return None") was lost
        # from this excerpt.

    an_analysis_name = self.make_submission_name(analysis_dir)
    an_analysis = self.get_submission_node(analysis_dir)
    an_analysis_uri = str(an_analysis.uri)
    # NOTE(review): the second argument to get_target() (the predicate)
    # was lost from this excerpt.
    file_classification = self.model.get_target(file_type,
    if file_classification is None:
        errmsg = 'Could not find class for {0}'
        LOGGER.warning(errmsg.format(str(file_type)))
        # NOTE(review): an early return probably followed here — confirm.

    # attach this analysis to the submission set
    # NOTE(review): the object argument of this statement was lost.
    self.model.add_statement(
        RDF.Statement(self.submissionSetNS[''],
                      submissionOntology['has_submission'],
    self.model.add_statement(RDF.Statement(an_analysis,
                                           submissionOntology['name'],
                                           toTypedNode(an_analysis_name)))
    # NOTE(review): the predicate of this statement (likely rdf:type) was
    # lost from this excerpt.
    self.model.add_statement(
        RDF.Statement(an_analysis,
                      submissionOntology['submission']))
    # NOTE(review): the object argument (likely libNode) was lost here.
    self.model.add_statement(RDF.Statement(an_analysis,
                                           submissionOntology['library'],
    LOGGER.debug("Adding statements to {0}".format(str(an_analysis)))
    # add track specific information
    self.model.add_statement(
        RDF.Statement(an_analysis,
                      dafTermOntology['paired'],
                      toTypedNode(self._is_paired(libNode))))
    # NOTE(review): the object argument (likely the analysis URI) was lost.
    self.model.add_statement(
        RDF.Statement(an_analysis,
                      dafTermOntology['submission'],
    # add file specific information
    fileNode = self.make_file_node(pathname, an_analysis)
    self.add_md5s(filename, fileNode, analysis_dir)
    self.add_fastq_metadata(filename, fileNode)
    # NOTE(review): the predicate/object of this statement were lost.
    self.model.add_statement(
        RDF.Statement(fileNode,
    # NOTE(review): the object argument (likely libNode) was lost here.
    self.model.add_statement(
        RDF.Statement(fileNode,
                      libraryOntology['library'],
    LOGGER.debug("Done.")
def make_file_node(self, pathname, submissionNode):
    """Create file node and attach it to its submission.

    Returns:
        the new file node, so callers can hang more metadata off it
        (construct_file_attributes assigns the result).
    """
    # add file specific information
    path, filename = os.path.split(pathname)
    pathname = os.path.abspath(pathname)
    fileNode = RDF.Node(RDF.Uri('file://'+ pathname))
    # submission --has_file--> file
    # NOTE(review): the object arguments of the first two statements were
    # lost from the excerpt; fileNode / filename are the only names in
    # scope that fit — confirm against the full source.
    self.model.add_statement(
        RDF.Statement(submissionNode,
                      dafTermOntology['has_file'],
                      fileNode))
    self.model.add_statement(
        RDF.Statement(fileNode,
                      dafTermOntology['filename'],
                      filename))
    self.model.add_statement(
        RDF.Statement(fileNode,
                      dafTermOntology['relative_path'],
                      os.path.relpath(pathname)))
    # NOTE(review): restored — the caller assigns this method's result.
    return fileNode
def add_md5s(self, filename, fileNode, analysis_dir):
    """Attach an md5sum statement to fileNode.

    Best-effort: if the checksum can't be computed we warn and continue
    without adding a statement.
    """
    LOGGER.debug("Updating file md5sum")
    submission_pathname = os.path.join(analysis_dir, filename)
    md5 = make_md5sum(submission_pathname)
    # NOTE(review): the "if md5 is None:" / "else:" structure was lost
    # from the excerpt; the orphaned warning body implies it — restored.
    if md5 is None:
        errmsg = "Unable to produce md5sum for {0}"
        LOGGER.warning(errmsg.format(submission_pathname))
    else:
        self.model.add_statement(
            RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))
def add_fastq_metadata(self, filename, fileNode):
    # How should I detect if this is actually a fastq file?
    # NOTE(review): a guard (and possibly a type statement) appears to be
    # missing from this excerpt before the parse below — confirm.
    fqname = FastqName(filename=filename)
    # currently its just ignore it if the fastq name parser fails
    # NOTE(review): the validity check / early return implied by the
    # comment above was lost from this excerpt.
    # map FastqName fields -> ontology terms recorded on the file node
    terms = [('flowcell', libraryOntology['flowcell_id']),
             ('lib_id', libraryOntology['library_id']),
             ('lane', libraryOntology['lane_number']),
             ('read', libraryOntology['read']),
             ('cycle', libraryOntology['read_length'])]
    for file_term, model_term in terms:
        value = fqname.get(file_term)
        if value is not None:
            s = RDF.Statement(fileNode, model_term, toTypedNode(value))
            # NOTE(review): the self.model.add_statement(s) call appears
            # to have been lost from this excerpt.
def _add_library_details_to_model(self, libNode):
    """Parse the library's RDFa page into our model, then add lane details.
    """
    # attributes that can have multiple values
    set_attributes = set((libraryOntology['has_lane'],
                          libraryOntology['has_mappings'],
                          dafTermOntology['has_file']))
    parser = RDF.Parser(name='rdfa')
    # NOTE(review): a "try:" line appears to be missing here — the except
    # below has no visible try in this excerpt.
    new_statements = parser.parse_as_stream(libNode.uri)
    except RDF.RedlandError as e:
        # NOTE(review): the handler body was lost from this excerpt
        # (likely logging the error and returning early).
    LOGGER.debug("Scanning %s", str(libNode.uri))
    for s in new_statements:
        # always add "collections"
        if s.predicate in set_attributes:
            # NOTE(review): the add-statement branch body was lost from
            # this excerpt.
        # don't override things we already have in the model
        targets = list(self.model.get_targets(s.subject, s.predicate))
        if len(targets) == 0:
            # NOTE(review): the add-statement branch body was lost from
            # this excerpt.

    self._add_lane_details(libNode)
def _add_lane_details(self, libNode):
    """Import lane details

    Parses the RDFa page of every lane referenced by libNode via
    has_lane into our model.
    """
    # collect lane nodes first so we aren't iterating the model while
    # the parser below adds statements to it
    lanes = []
    query = RDF.Statement(libNode, libraryOntology['has_lane'], None)
    for lane_stmt in self.model.find_statements(query):
        lanes.append(lane_stmt.object)

    parser = RDF.Parser(name='rdfa')
    for lane in lanes:
        LOGGER.debug("Importing %s" % (lane.uri,))
        try:
            parser.parse_into_model(self.model, lane.uri)
        # 'as e' form for Python 3 compatibility, consistent with the
        # RedlandError handler elsewhere in this class.
        except RDF.RedlandError as e:
            LOGGER.error("Error accessing %s" % (lane.uri,))
            # NOTE(review): the original post-log handling was lost from
            # the excerpt; re-raising preserves failure — confirm.
            raise e
def find_best_match(self, filename):
    """Search through potential filename matching patterns

    Returns the single view whose pattern matches filename, None when
    nothing matches, and raises ModelException on multiple matches.
    """
    # build the pattern -> view cache lazily on first use
    if self.__view_map is None:
        self.__view_map = self._get_filename_view_map()

    results = []
    for pattern, view in self.__view_map.items():
        if re.match(pattern, filename):
            results.append(view)

    if len(results) > 1:
        msg = "%s matched multiple views %s" % (
            filename,
            [str(x) for x in results])
        raise ModelException(msg)
    elif len(results) == 1:
        return results[0]
    else:
        # callers treat None as "unrecognized file" and warn
        return None
def _get_filename_view_map(self):
    """Query our model for filename patterns

    return a dictionary of compiled regular expressions to view names

    NOTE(review): despite the docstring, the keys stored are the pattern
    *strings*; compiling below only validates them.  find_best_match
    passes the strings to re.match, which is equivalent.
    """
    filename_query = RDF.Statement(
        None, dafTermOntology['filename_re'], None)

    patterns = {}
    for s in self.model.find_statements(filename_query):
        view_name = s.subject
        literal_re = s.object.literal_value['string']
        LOGGER.debug("Found: %s" % (literal_re,))
        # validate the pattern; a bad one is logged but still stored
        try:
            filename_re = re.compile(literal_re)
        except re.error:
            LOGGER.error("Unable to compile: %s" % (literal_re,))
        patterns[literal_re] = view_name
    return patterns
def make_submission_name(self, analysis_dir):
    """Return the submission name derived from an analysis directory.

    The name is the final component of the normalized path.

    Raises:
        RuntimeError: when the path has no final component (e.g. '/').
    """
    analysis_dir = os.path.normpath(analysis_dir)
    analysis_dir_name = os.path.split(analysis_dir)[1]
    if len(analysis_dir_name) == 0:
        # NOTE(review): the raise line was lost from the excerpt; the
        # orphaned message below implies it — restored.
        raise RuntimeError(
            "Submission dir name too short: {0}".format(analysis_dir))
    return analysis_dir_name
def get_submission_node(self, analysis_dir):
    """Convert a submission directory name to a submission node
    """
    submission_name = self.make_submission_name(analysis_dir)
    # nodes for a submission live in this submission set's namespace
    return self.submissionSetNS[submission_name]
def _get_library_attribute(self, libNode, attribute):
    """Look up an attribute of a library, loading details on a miss.

    Args:
        libNode: RDF node for the library
        attribute: RDF.Node predicate, or a string key into libraryOntology

    Returns:
        a single value, a list of values, or None when nothing is known
        (see _format_library_attribute).
    """
    if not isinstance(attribute, RDF.Node):
        attribute = libraryOntology[attribute]

    targets = list(self.model.get_targets(libNode, attribute))
    if len(targets) > 0:
        return self._format_library_attribute(targets)

    #targets = self._search_same_as(libNode, attribute)
    #if targets is not None:
    #    return self._format_library_attribute(targets)

    # we don't know anything about this attribute
    # pull in the library page and retry once
    self._add_library_details_to_model(libNode)

    targets = list(self.model.get_targets(libNode, attribute))
    if len(targets) > 0:
        return self._format_library_attribute(targets)

    return None
307 def _format_library_attribute(self, targets):
308 if len(targets) == 0:
310 elif len(targets) == 1:
311 return fromTypedNode(targets[0])
312 elif len(targets) > 1:
313 return [fromTypedNode(t) for t in targets]
315 def _is_paired(self, libNode):
316 """Determine if a library is paired end"""
317 library_type = self._get_library_attribute(libNode, 'library_type')
318 if library_type is None:
319 errmsg = "%s doesn't have a library type"
320 raise ModelException(errmsg % (str(libNode),))
322 single = ['CSHL (lacking last nt)',
323 'Single End (non-multiplexed)',
324 'Small RNA (non-multiplexed)',]
325 paired = ['Barcoded Illumina',
328 'Paired End (non-multiplexed)',
329 'Dual Index Illumina',]
330 if library_type in single:
332 elif library_type in paired:
335 raise MetadataLookupException(
336 "Unrecognized library type %s for %s" % \
337 (library_type, str(libNode)))
def execute_query(self, template, context):
    """Execute the query, returning the results

    template has a render() method (e.g. a jinja2 template); context is
    the mapping of values rendered into it to form a SPARQL query.
    """
    formatted_query = template.render(context)
    LOGGER.debug(formatted_query)
    query = RDF.SPARQLQuery(str(formatted_query))
    rdfstream = query.execute(self.model)
    for record in rdfstream:
        # NOTE(review): the initialization of 'd' (likely "d = {}") was
        # lost from this excerpt.
        for key, value in record.items():
            d[key] = fromTypedNode(value)
        # NOTE(review): the method continues past the end of this excerpt
        # (likely collecting 'd' into a results list that is returned).