1 """Common submission elements
9 from htsworkflow.util.rdfhelp import \
21 from htsworkflow.util.hashfile import make_md5sum
23 from htsworkflow.submission.daf import \
24 MetadataLookupException, \
# Module-level logger, named after this module via __name__.
27 LOGGER = logging.getLogger(__name__)
29 class Submission(object):
# Set up the URIs and namespaces used by one submission.
#   name  -- read back as self.name to build the submission-set URI; the
#            assignment itself is not visible in this excerpt.
#   model -- not referenced in the visible lines; the other methods use
#            self.model, so it is presumably stored here -- TODO confirm.
#   host  -- prefix of the library namespace, "<host>/library/".
30 def __init__(self, name, model, host):
34 self.submissionSet = get_submission_uri(self.name)
# Namespace of fragments ("#...") under the submission-set URI.
35 self.submissionSetNS = RDF.NS(str(self.submissionSet) + '#')
36 self.libraryNS = RDF.NS('{0}/library/'.format(host))
# Filename-pattern -> view map; filled on first use by find_best_match().
38 self.__view_map = None
def scan_submission_dirs(self, result_map):
    """Import every analysis directory listed in result_map.

    Args:
        result_map: mapping of library id to the result directory
            holding that library's files.

    A library whose import raises MetadataLookupException is logged
    as an error and skipped, so the remaining libraries are still
    processed.
    """
    for lib_id, result_dir in result_map.items():
        LOGGER.info("Importing %s from %s", lib_id, result_dir)
        try:
            self.import_analysis_dir(result_dir, lib_id)
        except MetadataLookupException as e:
            # "except X as e" parses on Python 2.6+ and on Python 3;
            # the older "except X, e" form is a SyntaxError on Python 3.
            LOGGER.error("Skipping %s: %s", lib_id, str(e))
def import_analysis_dir(self, analysis_dir, library_id):
    """Import one submission directory, updating the model as needed.

    Looks up the library node for library_id, loads that library's
    details into the model, then records attributes for every entry
    that os.listdir reports in analysis_dir.
    """
    library_node = self.libraryNS[library_id + "/"]
    self._add_library_details_to_model(library_node)

    for entry in os.listdir(analysis_dir):
        entry_path = os.path.abspath(os.path.join(analysis_dir, entry))
        self.construct_file_attributes(analysis_dir, library_node, entry_path)
# Record the RDF statements that describe one file of an analysis directory.
# Visible steps: find the view whose filename pattern matches
# (find_best_match), derive the submission name and node from analysis_dir,
# look up the class of the file type, then add submission-level,
# track-level and file-level statements to self.model.
# NOTE(review): several lines of this method are not visible in this
# excerpt (the guards around the early log messages and some statement
# arguments); the notes below describe only the lines shown.
# NOTE(review): the docstring speaks of choosing the longest extension
# match, while the visible find_best_match code raises when a filename
# matches multiple views -- confirm which description is current.
63 def construct_file_attributes(self, analysis_dir, libNode, pathname):
64 """Looking for the best extension
65 The 'best' is the longest match
68 filename (str): the filename whose extention we are about to examine
70 path, filename = os.path.split(pathname)
72 LOGGER.debug("Searching for view")
73 file_type = self.find_best_match(filename)
# NOTE(review): Logger.warn is a deprecated alias of Logger.warning, which
# this method already uses further down.
75 LOGGER.warn("Unrecognized file: {0}".format(pathname))
# The 'ignore' view is detected by comparing string forms of the nodes;
# the body of this branch is not visible here.
77 if str(file_type) == str(libraryOntology['ignore']):
# Submission name and node are both derived from the directory path.
80 an_analysis_name = self.make_submission_name(analysis_dir)
81 an_analysis = self.get_submission_node(analysis_dir)
82 an_analysis_uri = str(an_analysis.uri)
83 file_classification = self.model.get_target(file_type,
85 if file_classification is None:
86 errmsg = 'Could not find class for {0}'
87 LOGGER.warning(errmsg.format(str(file_type)))
# Submission-level statements: membership in the submission set, name,
# the 'submission' term and the library link.
90 self.model.add_statement(
91 RDF.Statement(self.submissionSetNS[''],
92 submissionOntology['has_submission'],
94 self.model.add_statement(RDF.Statement(an_analysis,
95 submissionOntology['name'],
96 toTypedNode(an_analysis_name)))
97 self.model.add_statement(
98 RDF.Statement(an_analysis,
100 submissionOntology['submission']))
101 self.model.add_statement(RDF.Statement(an_analysis,
102 submissionOntology['library'],
105 LOGGER.debug("Adding statements to {0}".format(str(an_analysis)))
106 # add track specific information
107 self.model.add_statement(
108 RDF.Statement(an_analysis,
109 dafTermOntology['paired'],
110 toTypedNode(self._is_paired(libNode))))
111 self.model.add_statement(
112 RDF.Statement(an_analysis,
113 dafTermOntology['submission'],
116 # add file specific information
117 fileNode = self.link_file_to_classes(pathname,
121 self.add_md5s(filename, fileNode, analysis_dir)
122 self.model.add_statement(
123 RDF.Statement(fileNode,
126 LOGGER.debug("Done.")
# Link a file to its submission node in the model.
# Builds a node with a file:// URI from the absolute pathname, then adds
# a dafTermOntology['has_file'] statement on submissionNode and a
# dafTermOntology['filename'] statement on the file node.
# construct_file_attributes assigns the result to fileNode, so the file
# node is presumably returned -- the return line is not visible here.
# NOTE(review): submission_uri and analysis_dir are not referenced in the
# visible lines.
128 def link_file_to_classes(self, pathname, submissionNode, submission_uri, analysis_dir):
129 # add file specific information
130 path, filename = os.path.split(pathname)
131 fileNode = RDF.Node(RDF.Uri('file://'+ os.path.abspath(pathname)))
132 self.model.add_statement(
133 RDF.Statement(submissionNode,
134 dafTermOntology['has_file'],
136 self.model.add_statement(
137 RDF.Statement(fileNode,
138 dafTermOntology['filename'],
# Compute the md5sum of analysis_dir/filename and record it on fileNode.
# The visible lines log a warning ("Unable to produce md5sum ...") and add
# a dafTermOntology['md5sum'] statement to the model.
# NOTE(review): the condition that chooses between the warning and the
# statement is not visible in this excerpt -- presumably a check on the
# value returned by make_md5sum; confirm.
142 def add_md5s(self, filename, fileNode, analysis_dir):
143 LOGGER.debug("Updating file md5sum")
144 submission_pathname = os.path.join(analysis_dir, filename)
145 md5 = make_md5sum(submission_pathname)
147 errmsg = "Unable to produce md5sum for {0}"
148 LOGGER.warning(errmsg.format(submission_pathname))
150 self.model.add_statement(
151 RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))
# Parse the RDFa found at the library node's URI and merge it into the
# model, then import the details of the library's lanes.
# Predicates listed in set_attributes can hold several values. For other
# predicates the model is first queried for existing targets so that
# values already present are not overridden (see the inline comments).
# NOTE(review): the lines that actually add statements to the model are
# not visible in this excerpt.
153 def _add_library_details_to_model(self, libNode):
154 # attributes that can have multiple values
155 set_attributes = set((libraryOntology['has_lane'],
156 libraryOntology['has_mappings'],
157 dafTermOntology['has_file']))
158 parser = RDF.Parser(name='rdfa')
159 new_statements = parser.parse_as_stream(libNode.uri)
161 for s in new_statements:
162 # always add "collections"
163 if s.predicate in set_attributes:
166 # don't override things we already have in the model
167 targets = list(self.model.get_targets(s.subject, s.predicate))
168 if len(targets) == 0:
174 self._add_lane_details(libNode)
# Import RDFa details for the lanes attached to a library.
# Collects the objects of the library's libraryOntology['has_lane']
# statements, then parses each lane URI into self.model with an RDFa
# parser, logging an error when RDF.RedlandError is raised.
# NOTE(review): "except X, e" is Python-2-only syntax; "except X as e"
# parses on Python 2.6+ and on Python 3.
# NOTE(review): the loop header over the collected lanes and whatever
# follows the error log (re-raise or continue) are not visible here.
176 def _add_lane_details(self, libNode):
177 """Import lane details
179 query = RDF.Statement(libNode, libraryOntology['has_lane'], None)
181 for lane_stmt in self.model.find_statements(query):
182 lanes.append(lane_stmt.object)
184 parser = RDF.Parser(name='rdfa')
186 LOGGER.debug("Importing %s" % (lane.uri,))
188 parser.parse_into_model(self.model, lane.uri)
189 except RDF.RedlandError, e:
190 LOGGER.error("Error accessing %s" % (lane.uri,))
# Find the view whose filename pattern matches `filename`.
# The pattern map is loaded from the model on first use and cached in
# self.__view_map. Patterns are applied with re.match, which anchors the
# match at the start of the filename. The visible raise reports a
# filename that matched multiple views as a ModelException.
# NOTE(review): the collection of matches and the values returned for
# zero or one match are not visible in this excerpt.
194 def find_best_match(self, filename):
195 """Search through potential filename matching patterns
197 if self.__view_map is None:
198 self.__view_map = self._get_filename_view_map()
201 for pattern, view in self.__view_map.items():
202 if re.match(pattern, filename):
206 msg = "%s matched multiple views %s" % (
208 [str(x) for x in results])
209 raise ModelException(msg)
210 elif len(results) == 1:
# Build the filename-pattern lookup from the model's
# dafTermOntology['filename_re'] statements: the statement subject is the
# view and the object's literal string is the pattern.
# NOTE(review): each pattern is compiled into filename_re, but the visible
# code stores the view under the pattern *string* (literal_re), so the
# keys are not the compiled expressions the docstring mentions. The
# compile call presumably only validates the pattern -- confirm.
# NOTE(review): the dict initialisation, the try/except around the
# compile, and the return are not visible in this excerpt.
215 def _get_filename_view_map(self):
216 """Query our model for filename patterns
218 return a dictionary of compiled regular expressions to view names
220 filename_query = RDF.Statement(
221 None, dafTermOntology['filename_re'], None)
224 for s in self.model.find_statements(filename_query):
225 view_name = s.subject
226 literal_re = s.object.literal_value['string']
227 LOGGER.debug("Found: %s" % (literal_re,))
229 filename_re = re.compile(literal_re)
231 LOGGER.error("Unable to compile: %s" % (literal_re,))
232 patterns[literal_re] = view_name
# Derive the submission name from analysis_dir: the last path component
# after os.path.normpath, so a trailing separator does not produce an
# empty name.
# NOTE(review): an empty name is reported with the "Submission dir name
# too short" message; the statement that raises it (and therefore the
# exception type) is not visible in this excerpt.
235 def make_submission_name(self, analysis_dir):
236 analysis_dir = os.path.normpath(analysis_dir)
237 analysis_dir_name = os.path.split(analysis_dir)[1]
238 if len(analysis_dir_name) == 0:
240 "Submission dir name too short: {0}".format(analysis_dir))
241 return analysis_dir_name
def get_submission_node(self, analysis_dir):
    """Return the submission-set namespace node for analysis_dir.

    The node is keyed by the name that make_submission_name derives
    from the directory path.
    """
    return self.submissionSetNS[self.make_submission_name(analysis_dir)]
# Look up one attribute of a library in the model.
# `attribute` may be an RDF.Node, or a name that is resolved through
# libraryOntology. The model is queried first; the visible fallback
# reloads the library details with _add_library_details_to_model and
# queries again. Results are shaped by _format_library_attribute.
# NOTE(review): the conditions guarding each return, and the value
# returned when nothing is found, are not visible in this excerpt.
249 def _get_library_attribute(self, libNode, attribute):
250 if not isinstance(attribute, RDF.Node):
251 attribute = libraryOntology[attribute]
253 targets = list(self.model.get_targets(libNode, attribute))
255 return self._format_library_attribute(targets)
259 #targets = self._search_same_as(libNode, attribute)
260 #if targets is not None:
261 # return self._format_library_attribute(targets)
263 # we don't know anything about this attribute
264 self._add_library_details_to_model(libNode)
266 targets = list(self.model.get_targets(libNode, attribute))
268 return self._format_library_attribute(targets)
# Convert a list of target nodes into Python values:
#   exactly one target   -> fromTypedNode(target)
#   more than one target -> list of fromTypedNode(target) values
# NOTE(review): the result for an empty list is not visible in this
# excerpt.
272 def _format_library_attribute(self, targets):
273 if len(targets) == 0:
275 elif len(targets) == 1:
276 return fromTypedNode(targets[0])
277 elif len(targets) > 1:
278 return [fromTypedNode(t) for t in targets]
# Classify a library as single or paired end from its 'library_type'
# attribute, using the two literal lists below.
# Raises ModelException when the library has no library type. The
# visible MetadataLookupException reports a type as "Unrecognized",
# presumably when it is in neither list -- the else line is not visible.
# NOTE(review): some entries of the `paired` list and the value returned
# by each branch are not visible in this excerpt.
280 def _is_paired(self, libNode):
281 """Determine if a library is paired end"""
282 library_type = self._get_library_attribute(libNode, 'library_type')
283 if library_type is None:
284 errmsg = "%s doesn't have a library type"
285 raise ModelException(errmsg % (str(libNode),))
287 single = ['CSHL (lacking last nt)',
288 'Single End (non-multiplexed)',
289 'Small RNA (non-multiplexed)',]
290 paired = ['Barcoded Illumina',
293 'Paired End (non-multiplexed)',]
294 if library_type in single:
296 elif library_type in paired:
299 raise MetadataLookupException(
300 "Unrecognized library type %s for %s" % \
301 (library_type, str(libNode)))
303 def execute_query(self, template, context):
304 """Execute the query, returning the results
306 formatted_query = template.render(context)
307 LOGGER.debug(formatted_query)
308 query = RDF.SPARQLQuery(str(formatted_query))
309 rdfstream = query.execute(self.model)