2b04ff43a05ce6e70d6ccb7b20f7f17d54165699
[htsworkflow.git] / htsworkflow / submission / submission.py
1 """Common submission elements
2 """
3 import logging
4 import os
5 import re
6
7 import RDF
8
9 from htsworkflow.util.rdfhelp import \
10      blankOrUri, \
11      dafTermOntology, \
12      dump_model, \
13      get_model, \
14      libraryOntology, \
15      owlNS, \
16      rdfNS, \
17      submissionLog, \
18      submissionOntology, \
19      toTypedNode, \
20      fromTypedNode
21 from htsworkflow.util.hashfile import make_md5sum
22
23 from htsworkflow.submission.daf import \
24      MetadataLookupException, \
25      get_submission_uri
26
27 LOGGER = logging.getLogger(__name__)
28
29 class Submission(object):
30     def __init__(self, name, model):
31         self.name = name
32         self.model = model
33
34         self.submissionSet = get_submission_uri(self.name)
35         self.submissionSetNS = RDF.NS(str(self.submissionSet) + '#')
36         self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')
37
38         self.__view_map = None
39
40     def scan_submission_dirs(self, result_map):
41         """Examine files in our result directory
42         """
43         for lib_id, result_dir in result_map.items():
44             LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
45             try:
46                 self.import_analysis_dir(result_dir, lib_id)
47             except MetadataLookupException, e:
48                 LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))
49
50     def import_analysis_dir(self, analysis_dir, library_id):
51         """Import a submission directories and update our model as needed
52         """
53         #attributes = get_filename_attribute_map(paired)
54         libNode = self.libraryNS[library_id + "/"]
55
56         self._add_library_details_to_model(libNode)
57
58         submission_files = os.listdir(analysis_dir)
59         for filename in submission_files:
60             self.construct_file_attributes(analysis_dir, libNode, filename)
61
62     def construct_file_attributes(self, analysis_dir, libNode, pathname):
63         """Looking for the best extension
64         The 'best' is the longest match
65
66         :Args:
67         filename (str): the filename whose extention we are about to examine
68         """
69         path, filename = os.path.split(pathname)
70
71         LOGGER.debug("Searching for view")
72         file_type = self.find_best_match(filename)
73         if file_type is None:
74             LOGGER.warn("Unrecognized file: {0}".format(pathname))
75             return None
76         if str(file_type) == str(libraryOntology['ignore']):
77             return None
78
79         an_analysis_name = self.make_submission_name(analysis_dir)
80         an_analysis = self.get_submission_node(analysis_dir)
81         an_analysis_uri = str(an_analysis.uri)
82         file_classification = self.model.get_target(file_type,
83                                                     rdfNS['type'])
84         if file_classification is None:
85             errmsg = 'Could not find class for {0}'
86             LOGGER.warning(errmsg.format(str(file_type)))
87             return
88
89         self.model.add_statement(
90             RDF.Statement(self.submissionSetNS[''],
91                           submissionOntology['has_submission'],
92                           an_analysis))
93         self.model.add_statement(RDF.Statement(an_analysis,
94                                                submissionOntology['name'],
95                                                toTypedNode(an_analysis_name)))
96         self.model.add_statement(
97             RDF.Statement(an_analysis,
98                           rdfNS['type'],
99                           submissionOntology['submission']))
100         self.model.add_statement(RDF.Statement(an_analysis,
101                                                submissionOntology['library'],
102                                                libNode))
103
104         LOGGER.debug("Adding statements to {0}".format(str(an_analysis)))
105         # add track specific information
106         self.model.add_statement(
107             RDF.Statement(an_analysis,
108                           dafTermOntology['paired'],
109                           toTypedNode(self._is_paired(libNode))))
110         self.model.add_statement(
111             RDF.Statement(an_analysis,
112                           dafTermOntology['submission'],
113                           an_analysis))
114
115         # add file specific information
116         fileNode = self.link_file_to_classes(filename,
117                                              an_analysis,
118                                              an_analysis_uri,
119                                              analysis_dir)
120         self.add_md5s(filename, fileNode, analysis_dir)
121         self.model.add_statement(
122             RDF.Statement(fileNode,
123                           rdfNS['type'],
124                           file_type))
125         LOGGER.debug("Done.")
126
127     def link_file_to_classes(self, filename, submissionNode, submission_uri, analysis_dir):
128         # add file specific information
129         fileNode = RDF.Node(RDF.Uri('file://'+ os.path.abspath(filename)))
130         self.model.add_statement(
131             RDF.Statement(submissionNode,
132                           dafTermOntology['has_file'],
133                           fileNode))
134         self.model.add_statement(
135             RDF.Statement(fileNode,
136                           dafTermOntology['filename'],
137                           filename))
138         return fileNode
139
140     def add_md5s(self, filename, fileNode, analysis_dir):
141         LOGGER.debug("Updating file md5sum")
142         submission_pathname = os.path.join(analysis_dir, filename)
143         md5 = make_md5sum(submission_pathname)
144         if md5 is None:
145             errmsg = "Unable to produce md5sum for {0}"
146             LOGGER.warning(errmsg.format(submission_pathname))
147         else:
148             self.model.add_statement(
149                 RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))
150
151     def _add_library_details_to_model(self, libNode):
152         # attributes that can have multiple values
153         set_attributes = set((libraryOntology['has_lane'],
154                               libraryOntology['has_mappings'],
155                               dafTermOntology['has_file']))
156         parser = RDF.Parser(name='rdfa')
157         new_statements = parser.parse_as_stream(libNode.uri)
158         toadd = []
159         for s in new_statements:
160             # always add "collections"
161             if s.predicate in set_attributes:
162                 toadd.append(s)
163                 continue
164             # don't override things we already have in the model
165             targets = list(self.model.get_targets(s.subject, s.predicate))
166             if len(targets) == 0:
167                 toadd.append(s)
168
169         for s in toadd:
170             self.model.append(s)
171
172         self._add_lane_details(libNode)
173
174     def _add_lane_details(self, libNode):
175         """Import lane details
176         """
177         query = RDF.Statement(libNode, libraryOntology['has_lane'], None)
178         lanes = []
179         for lane_stmt in self.model.find_statements(query):
180             lanes.append(lane_stmt.object)
181
182         parser = RDF.Parser(name='rdfa')
183         for lane in lanes:
184             LOGGER.debug("Importing %s" % (lane.uri,))
185             try:
186                 parser.parse_into_model(self.model, lane.uri)
187             except RDF.RedlandError, e:
188                 LOGGER.error("Error accessing %s" % (lane.uri,))
189                 raise e
190
191
192     def find_best_match(self, filename):
193         """Search through potential filename matching patterns
194         """
195         if self.__view_map is None:
196             self.__view_map = self._get_filename_view_map()
197
198         results = []
199         for pattern, view in self.__view_map.items():
200             if re.match(pattern, filename):
201                 results.append(view)
202
203         if len(results) > 1:
204             msg = "%s matched multiple views %s" % (
205                 filename,
206                 [str(x) for x in results])
207             raise ModelException(msg)
208         elif len(results) == 1:
209             return results[0]
210         else:
211             return None
212
213     def _get_filename_view_map(self):
214         """Query our model for filename patterns
215
216         return a dictionary of compiled regular expressions to view names
217         """
218         filename_query = RDF.Statement(
219             None, dafTermOntology['filename_re'], None)
220
221         patterns = {}
222         for s in self.model.find_statements(filename_query):
223             view_name = s.subject
224             literal_re = s.object.literal_value['string']
225             LOGGER.debug("Found: %s" % (literal_re,))
226             try:
227                 filename_re = re.compile(literal_re)
228             except re.error, e:
229                 LOGGER.error("Unable to compile: %s" % (literal_re,))
230             patterns[literal_re] = view_name
231         return patterns
232
233     def make_submission_name(self, analysis_dir):
234         analysis_dir = os.path.normpath(analysis_dir)
235         analysis_dir_name = os.path.split(analysis_dir)[1]
236         if len(analysis_dir_name) == 0:
237             raise RuntimeError(
238                 "Submission dir name too short: {0}".format(analysis_dir))
239         return analysis_dir_name
240
241     def get_submission_node(self, analysis_dir):
242         """Convert a submission directory name to a submission node
243         """
244         submission_name = self.make_submission_name(analysis_dir)
245         return self.submissionSetNS[submission_name]
246
247     def _get_library_attribute(self, libNode, attribute):
248         if not isinstance(attribute, RDF.Node):
249             attribute = libraryOntology[attribute]
250
251         targets = list(self.model.get_targets(libNode, attribute))
252         if len(targets) > 0:
253             return self._format_library_attribute(targets)
254         else:
255             return None
256
257         #targets = self._search_same_as(libNode, attribute)
258         #if targets is not None:
259         #    return self._format_library_attribute(targets)
260
261         # we don't know anything about this attribute
262         self._add_library_details_to_model(libNode)
263
264         targets = list(self.model.get_targets(libNode, attribute))
265         if len(targets) > 0:
266             return self._format_library_attribute(targets)
267
268         return None
269
270     def _format_library_attribute(self, targets):
271         if len(targets) == 0:
272             return None
273         elif len(targets) == 1:
274             return fromTypedNode(targets[0])
275         elif len(targets) > 1:
276             return [fromTypedNode(t) for t in targets]
277
278     def _is_paired(self, libNode):
279         """Determine if a library is paired end"""
280         library_type = self._get_library_attribute(libNode, 'library_type')
281         if library_type is None:
282             errmsg = "%s doesn't have a library type"
283             raise ModelException(errmsg % (str(libNode),))
284
285         single = ['CSHL (lacking last nt)',
286                   'Single End (non-multiplexed)',
287                   'Small RNA (non-multiplexed)',]
288         paired = ['Barcoded Illumina',
289                   'Multiplexing',
290                   'Nextera',
291                   'Paired End (non-multiplexed)',]
292         if library_type in single:
293             return False
294         elif library_type in paired:
295             return True
296         else:
297             raise MetadataLookupException(
298                 "Unrecognized library type %s for %s" % \
299                 (library_type, str(libNode)))
300
301     def execute_query(self, template, context):
302         """Execute the query, returning the results
303         """
304         formatted_query = template.render(context)
305         LOGGER.debug(formatted_query)
306         query = RDF.SPARQLQuery(str(formatted_query))
307         rdfstream = query.execute(self.model)
308         results = []
309         for r in rdfstream:
310             results.append(r)
311         return results