1 """Common submission elements
9 from htsworkflow.util.rdfhelp import \
16 from htsworkflow.util.rdfns import *
17 from htsworkflow.util.hashfile import make_md5sum
18 from htsworkflow.submission.fastqname import FastqName
19 from htsworkflow.submission.daf import \
20 MetadataLookupException, \
24 LOGGER = logging.getLogger(__name__)
class Submission(object):
    """Record a submission of library result files as RDF statements."""

    def __init__(self, name, model, host):
        # NOTE(review): assignments of self.name / self.model from the
        # name/model parameters are elided in this excerpt; self.name is
        # read on the next line — confirm against the full source.
        self.submissionSet = get_submission_uri(self.name)
        # namespace for nodes belonging to this submission set
        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '#')
        # namespace for library pages served from `host`
        self.libraryNS = RDF.NS('{0}/library/'.format(host))
        # lazily-built filename-pattern -> view cache (see find_best_match)
        self.__view_map = None
37 def scan_submission_dirs(self, result_map):
38 """Examine files in our result directory
40 for lib_id, result_dir in result_map.items():
41 LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
43 self.import_analysis_dir(result_dir, lib_id)
44 except MetadataLookupException, e:
45 LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))
47 def import_analysis_dir(self, analysis_dir, library_id):
48 """Import a submission directories and update our model as needed
50 #attributes = get_filename_attribute_map(paired)
51 libNode = self.libraryNS[library_id + "/"]
53 self._add_library_details_to_model(libNode)
55 submission_files = os.listdir(analysis_dir)
56 for filename in submission_files:
57 pathname = os.path.abspath(os.path.join(analysis_dir, filename))
58 self.construct_file_attributes(analysis_dir, libNode, pathname)
    def analysis_nodes(self, result_map):
        """Return an iterable of analysis nodes
        """
        # NOTE(review): presumably each an_analysis is yielded (or
        # collected); that line is elided in this excerpt — confirm
        # against the full source.
        for result_dir in result_map.values():
            an_analysis = self.get_submission_node(result_dir)
    def construct_file_attributes(self, analysis_dir, libNode, pathname):
        """Looking for the best extension
        The 'best' is the longest match

        :Parameters:
          filename (str): the filename whose extention we are about to examine
        """
        # NOTE(review): this excerpt elides a number of continuation and
        # guard lines (marked below); the visible code is left untouched.
        path, filename = os.path.split(pathname)

        LOGGER.debug("Searching for view")
        file_type = self.find_best_match(filename)
        # presumably inside an elided `if file_type is None:` guard
            LOGGER.warn("Unrecognized file: {0}".format(pathname))
        # skip files whose view is classified as 'ignore'
        if str(file_type) == str(libraryOntology['ignore']):

        an_analysis_name = self.make_submission_name(analysis_dir)
        an_analysis = self.get_submission_node(analysis_dir)
        an_analysis_uri = str(an_analysis.uri)
        # classify the matched view (second argument elided in excerpt)
        file_classification = self.model.get_target(file_type,
        if file_classification is None:
            errmsg = 'Could not find class for {0}'
            LOGGER.warning(errmsg.format(str(file_type)))

        # attach this analysis to the submission set
        # (statement object arguments elided in excerpt)
        self.model.add_statement(
            RDF.Statement(self.submissionSetNS[''],
                          submissionOntology['has_submission'],
        self.model.add_statement(RDF.Statement(an_analysis,
                                               submissionOntology['name'],
                                               toTypedNode(an_analysis_name)))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          submissionOntology['submission']))
        self.model.add_statement(RDF.Statement(an_analysis,
                                               submissionOntology['library'],

        LOGGER.debug("Adding statements to {0}".format(str(an_analysis)))
        # add track specific information
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          dafTermOntology['paired'],
                          toTypedNode(self._is_paired(libNode))))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          dafTermOntology['submission'],

        # add file specific information
        fileNode = self.make_file_node(pathname, an_analysis)
        self.add_md5s(filename, fileNode, analysis_dir)
        self.add_fastq_metadata(filename, fileNode)
        self.model.add_statement(
            RDF.Statement(fileNode,
        self.model.add_statement(
            RDF.Statement(fileNode,
                          libraryOntology['library'],

        LOGGER.debug("Done.")
    def make_file_node(self, pathname, submissionNode):
        """Create file node and attach it to its submission.
        """
        # NOTE(review): the object arguments of the first two statements
        # (and presumably a final `return fileNode`) are elided in this
        # excerpt — confirm against the full source.
        # add file specific information
        path, filename = os.path.split(pathname)
        pathname = os.path.abspath(pathname)
        # the file itself is identified by a file:// URI node
        fileNode = RDF.Node(RDF.Uri('file://'+ pathname))
        self.model.add_statement(
            RDF.Statement(submissionNode,
                          dafTermOntology['has_file'],
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['filename'],
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['relative_path'],
                          os.path.relpath(pathname)))
156 def add_md5s(self, filename, fileNode, analysis_dir):
157 LOGGER.debug("Updating file md5sum")
158 submission_pathname = os.path.join(analysis_dir, filename)
159 md5 = make_md5sum(submission_pathname)
161 errmsg = "Unable to produce md5sum for {0}"
162 LOGGER.warning(errmsg.format(submission_pathname))
164 self.model.add_statement(
165 RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))
    def add_fastq_metadata(self, filename, fileNode):
        """Attach flowcell/lane/read metadata parsed from a fastq filename."""
        # How should I detect if this is actually a fastq file?
        fqname = FastqName(filename=filename)
        # currently we just ignore it if the fastq name parser fails
        # (the guard lines are elided in this excerpt)
        terms = [('flowcell', libraryOntology['flowcell_id']),
                 ('lib_id', libraryOntology['library_id']),
                 ('lane', libraryOntology['lane_number']),
                 ('read', libraryOntology['read']),
                 ('cycle', libraryOntology['read_length'])]
        for file_term, model_term in terms:
            value = fqname.get(file_term)
            if value is not None:
                # NOTE(review): the statement built here is presumably
                # appended to self.model on an elided line — confirm.
                s = RDF.Statement(fileNode, model_term, toTypedNode(value))
    def _add_library_details_to_model(self, libNode):
        """Pull the RDFa description of libNode into our model.

        Collection-style attributes are always added; for other
        predicates the visible comment says existing model values are
        not overridden (the guard bodies are elided here).
        """
        # attributes that can have multiple values
        set_attributes = set((libraryOntology['has_lane'],
                              libraryOntology['has_mappings'],
                              dafTermOntology['has_file']))
        parser = RDF.Parser(name='rdfa')
        # NOTE(review): the try framing around the parse, the handler
        # body, and the model.append calls are elided in this excerpt —
        # confirm against the full source.
        new_statements = parser.parse_as_stream(libNode.uri)
        except RDF.RedlandError as e:
        LOGGER.debug("Scanning %s", str(libNode.uri))
        for s in new_statements:
            # always add "collections"
            if s.predicate in set_attributes:
                # don't override things we already have in the model
                targets = list(self.model.get_targets(s.subject, s.predicate))
                if len(targets) == 0:
        self._add_lane_details(libNode)
214 def _add_lane_details(self, libNode):
215 """Import lane details
217 query = RDF.Statement(libNode, libraryOntology['has_lane'], None)
219 for lane_stmt in self.model.find_statements(query):
220 lanes.append(lane_stmt.object)
222 parser = RDF.Parser(name='rdfa')
224 LOGGER.debug("Importing %s" % (lane.uri,))
226 parser.parse_into_model(self.model, lane.uri)
227 except RDF.RedlandError, e:
228 LOGGER.error("Error accessing %s" % (lane.uri,))
232 def find_best_match(self, filename):
233 """Search through potential filename matching patterns
235 if self.__view_map is None:
236 self.__view_map = self._get_filename_view_map()
239 for pattern, view in self.__view_map.items():
240 if re.match(pattern, filename):
244 msg = "%s matched multiple views %s" % (
246 [str(x) for x in results])
247 raise ModelException(msg)
248 elif len(results) == 1:
253 def _get_filename_view_map(self):
254 """Query our model for filename patterns
256 return a dictionary of compiled regular expressions to view names
258 filename_query = RDF.Statement(
259 None, dafTermOntology['filename_re'], None)
262 for s in self.model.find_statements(filename_query):
263 view_name = s.subject
264 literal_re = s.object.literal_value['string']
265 LOGGER.debug("Found: %s" % (literal_re,))
267 filename_re = re.compile(literal_re)
269 LOGGER.error("Unable to compile: %s" % (literal_re,))
270 patterns[literal_re] = view_name
273 def make_submission_name(self, analysis_dir):
274 analysis_dir = os.path.normpath(analysis_dir)
275 analysis_dir_name = os.path.split(analysis_dir)[1]
276 if len(analysis_dir_name) == 0:
278 "Submission dir name too short: {0}".format(analysis_dir))
279 return analysis_dir_name
281 def get_submission_node(self, analysis_dir):
282 """Convert a submission directory name to a submission node
284 submission_name = self.make_submission_name(analysis_dir)
285 return self.submissionSetNS[submission_name]
    def _get_library_attribute(self, libNode, attribute):
        """Look up attribute on libNode, loading library details on a miss.

        attribute may be an RDF.Node or a string naming a term in
        libraryOntology.
        """
        if not isinstance(attribute, RDF.Node):
            attribute = libraryOntology[attribute]

        # try with what is already in the model first
        targets = list(self.model.get_targets(libNode, attribute))
        # NOTE(review): the `if len(targets) > 0:` guards around these
        # returns (and a final `return None`) are elided in this excerpt
        # — confirm against the full source.
        return self._format_library_attribute(targets)

        #targets = self._search_same_as(libNode, attribute)
        #if targets is not None:
        #    return self._format_library_attribute(targets)

        # we don't know anything about this attribute
        self._add_library_details_to_model(libNode)

        targets = list(self.model.get_targets(libNode, attribute))
        return self._format_library_attribute(targets)
310 def _format_library_attribute(self, targets):
311 if len(targets) == 0:
313 elif len(targets) == 1:
314 return fromTypedNode(targets[0])
315 elif len(targets) > 1:
316 return [fromTypedNode(t) for t in targets]
    def _is_paired(self, libNode):
        """Determine if a library is paired end"""
        # human-readable library type string from the library page
        library_type = self._get_library_attribute(libNode, 'library_type')
        if library_type is None:
            errmsg = "%s doesn't have a library type"
            raise ModelException(errmsg % (str(libNode),))

        # known single-ended library types
        single = ['CSHL (lacking last nt)',
                  'Single End (non-multiplexed)',
                  'Small RNA (non-multiplexed)',]
        # known paired-ended library types
        # NOTE(review): two entries of this list and the return
        # statements of the branches below are elided in this excerpt —
        # confirm against the full source.
        paired = ['Barcoded Illumina',
                  'Paired End (non-multiplexed)',
                  'Dual Index Illumina',]
        if library_type in single:
        elif library_type in paired:
        raise MetadataLookupException(
            "Unrecognized library type %s for %s" % \
            (library_type, str(libNode)))
342 def execute_query(self, template, context):
343 """Execute the query, returning the results
345 formatted_query = template.render(context)
346 LOGGER.debug(formatted_query)
347 query = RDF.SPARQLQuery(str(formatted_query))
348 rdfstream = query.execute(self.model)
350 for record in rdfstream:
352 for key, value in record.items():
353 d[key] = fromTypedNode(value)
358 def list_submissions(model):
359 """Return generator of submissions in this model.
362 PREFIX subns: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
364 select distinct ?submission
365 where { ?submission subns:has_submission ?library_dir }
367 query = RDF.SPARQLQuery(query_body)
368 rdfstream = query.execute(model)
369 for row in rdfstream:
370 s = stripNamespace(submissionLog, row['submission'])
371 if s[-1] in ['#', '/', '?']: