Merge ssh://jumpgate.caltech.edu/var/htsworkflow/htsworkflow
[htsworkflow.git] / htsworkflow / submission / submission.py
1 """Common submission elements
2 """
3 import logging
4 import os
5 import re
6
7 import RDF
8
9 from htsworkflow.util.rdfhelp import \
10      blankOrUri, \
11      dafTermOntology, \
12      dump_model, \
13      get_model, \
14      libraryOntology, \
15      owlNS, \
16      rdfNS, \
17      submissionLog, \
18      submissionOntology, \
19      toTypedNode, \
20      fromTypedNode
21 from htsworkflow.util.hashfile import make_md5sum
22
23 from htsworkflow.submission.daf import \
24      MetadataLookupException, \
25      get_submission_uri
26
27 LOGGER = logging.getLogger(__name__)
28
class Submission(object):
    """Accumulate RDF statements describing a submission of library results.

    Walks result directories, classifies each file against the
    ``filename_re`` patterns declared in the model, and records
    submission / file / library statements into ``self.model``.
    """
    def __init__(self, name, model):
        """Initialize a submission.

        :Args:
        name (str): submission set name, used to build the submission URI
        model: RDF model that statements are read from and written to
        """
        self.name = name
        self.model = model

        self.submissionSet = get_submission_uri(self.name)
        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '#')
        self.libraryNS = RDF.NS('http://jumpgate.caltech.edu/library/')

        # lazily-built cache of {filename regex string: view node},
        # populated on first call to find_best_match()
        self.__view_map = None

    def scan_submission_dirs(self, result_map):
        """Examine files in our result directory

        :Args:
        result_map: iterable mapping of library id -> result directory
        """
        for lib_id, result_dir in result_map.items():
            LOGGER.info("Importing %s from %s", lib_id, result_dir)
            try:
                self.import_analysis_dir(result_dir, lib_id)
            except MetadataLookupException as e:
                # skip libraries we can't classify instead of aborting the scan
                LOGGER.error("Skipping %s: %s", lib_id, str(e))

    def import_analysis_dir(self, analysis_dir, library_id):
        """Import a submission directory and update our model as needed

        :Args:
        analysis_dir (str): directory containing this library's result files
        library_id (str): library identifier used to build the library URI
        """
        libNode = self.libraryNS[library_id + "/"]

        self._add_library_details_to_model(libNode)

        for filename in os.listdir(analysis_dir):
            self.construct_file_attributes(analysis_dir, libNode, filename)

    def construct_file_attributes(self, analysis_dir, libNode, pathname):
        """Add statements describing one submission file to the model.

        The file is classified by the longest-matching filename pattern;
        unrecognized or explicitly ignored files are skipped.

        :Args:
        analysis_dir (str): directory this submission came from
        libNode: RDF node for the library the file belongs to
        pathname (str): the filename whose extension we are about to examine
        """
        path, filename = os.path.split(pathname)

        LOGGER.debug("Searching for view")
        file_type = self.find_best_match(filename)
        if file_type is None:
            LOGGER.warning("Unrecognized file: {0}".format(pathname))
            return None
        if str(file_type) == str(libraryOntology['ignore']):
            # file matched a pattern that is explicitly marked "ignore"
            return None

        an_analysis_name = self.make_submission_name(analysis_dir)
        an_analysis = self.get_submission_node(analysis_dir)
        an_analysis_uri = str(an_analysis.uri)
        file_classification = self.model.get_target(file_type,
                                                    rdfNS['type'])
        if file_classification is None:
            # was a NameError: used undefined lowercase `logger`
            errmsg = 'Could not find class for {0}'
            LOGGER.warning(errmsg.format(str(file_type)))
            return

        # tie this analysis into the submission set
        self.model.add_statement(
            RDF.Statement(self.submissionSetNS[''],
                          submissionOntology['has_submission'],
                          an_analysis))
        self.model.add_statement(RDF.Statement(an_analysis,
                                               submissionOntology['name'],
                                               toTypedNode(an_analysis_name)))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          rdfNS['type'],
                          submissionOntology['submission']))
        self.model.add_statement(RDF.Statement(an_analysis,
                                               submissionOntology['library'],
                                               libNode))

        LOGGER.debug("Adding statements to {0}".format(str(an_analysis)))
        # add track specific information
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          dafTermOntology['paired'],
                          toTypedNode(self._is_paired(libNode))))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          dafTermOntology['submission'],
                          an_analysis))

        # add file specific information
        fileNode = self.link_file_to_classes(filename,
                                             an_analysis,
                                             an_analysis_uri,
                                             analysis_dir)
        self.add_md5s(filename, fileNode, analysis_dir)
        self.model.add_statement(
            RDF.Statement(fileNode,
                          rdfNS['type'],
                          file_type))
        LOGGER.debug("Done.")

    def link_file_to_classes(self, filename, submissionNode, submission_uri, analysis_dir):
        """Create a file node under the submission and attach its filename.

        :Returns:
        the newly created RDF file node
        """
        fileNode = RDF.Node(RDF.Uri(submission_uri + '/' + filename))
        self.model.add_statement(
            RDF.Statement(submissionNode,
                          dafTermOntology['has_file'],
                          fileNode))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['filename'],
                          filename))
        return fileNode

    def add_md5s(self, filename, fileNode, analysis_dir):
        """Compute the file's md5sum and attach it to fileNode.

        Logs a warning (without raising) if the checksum can't be produced.
        """
        LOGGER.debug("Updating file md5sum")
        submission_pathname = os.path.join(analysis_dir, filename)
        md5 = make_md5sum(submission_pathname)
        if md5 is None:
            errmsg = "Unable to produce md5sum for {0}"
            LOGGER.warning(errmsg.format(submission_pathname))
        else:
            self.model.add_statement(
                RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))

    def _add_library_details_to_model(self, libNode):
        """Parse the library's RDFa page and merge new facts into the model.

        NOTE(review): this fetches libNode.uri over the network.
        """
        parser = RDF.Parser(name='rdfa')
        for s in parser.parse_as_stream(libNode.uri):
            # don't override things we already have in the model
            targets = list(self.model.get_targets(s.subject, s.predicate))
            if len(targets) == 0:
                self.model.append(s)

    def find_best_match(self, filename):
        """Search through potential filename matching patterns

        :Returns:
        the single view node whose pattern matches filename, or None

        :Raises:
        ModelException if more than one pattern matches.
        NOTE(review): ModelException is not imported in this module --
        a multiple-match would currently surface as a NameError; confirm
        where ModelException lives and import it.
        """
        if self.__view_map is None:
            self.__view_map = self._get_filename_view_map()

        results = []
        for pattern, view in self.__view_map.items():
            if re.match(pattern, filename):
                results.append(view)

        if len(results) > 1:
            msg = "%s matched multiple views %s" % (
                filename,
                [str(x) for x in results])
            raise ModelException(msg)
        elif len(results) == 1:
            return results[0]
        else:
            return None

    def _get_filename_view_map(self):
        """Query our model for filename patterns

        :Returns:
        dictionary mapping filename regular expression *strings* to view
        nodes (find_best_match passes the strings to re.match directly)
        """
        filename_query = RDF.Statement(
            None, dafTermOntology['filename_re'], None)

        patterns = {}
        for s in self.model.find_statements(filename_query):
            view_name = s.subject
            literal_re = s.object.literal_value['string']
            LOGGER.debug("Found: %s", literal_re)
            try:
                # compile purely to validate the pattern
                re.compile(literal_re)
            except re.error:
                LOGGER.error("Unable to compile: %s", literal_re)
                # previously the broken pattern was still registered;
                # skip it so find_best_match never re-raises on it
                continue
            patterns[literal_re] = view_name
        return patterns

    def make_submission_name(self, analysis_dir):
        """Derive the submission name from the last path component.

        :Raises:
        RuntimeError if the directory name is empty
        """
        analysis_dir = os.path.normpath(analysis_dir)
        analysis_dir_name = os.path.split(analysis_dir)[1]
        if len(analysis_dir_name) == 0:
            raise RuntimeError(
                "Submission dir name too short: {0}".format(analysis_dir))
        return analysis_dir_name

    def get_submission_node(self, analysis_dir):
        """Convert a submission directory name to a submission node
        """
        submission_name = self.make_submission_name(analysis_dir)
        return self.submissionSetNS[submission_name]

    def _get_library_attribute(self, libNode, attribute):
        """Look up an attribute value for a library node.

        attribute may be an RDF node or a short name resolved through
        libraryOntology.

        :Returns:
        a single value, a list of values, or None if unknown
        """
        if not isinstance(attribute, RDF.Node):
            attribute = libraryOntology[attribute]

        targets = list(self.model.get_targets(libNode, attribute))
        if len(targets) > 0:
            return self._format_library_attribute(targets)

        # we don't know anything about this attribute yet; pull the
        # library's details into the model and retry once.
        # (this fallback was unreachable dead code behind an early
        # `return None` in the previous revision)
        self._add_library_details_to_model(libNode)

        targets = list(self.model.get_targets(libNode, attribute))
        if len(targets) > 0:
            return self._format_library_attribute(targets)

        return None

    def _format_library_attribute(self, targets):
        """Convert attribute target nodes to python values.

        Returns None / a single value / a list, mirroring how many
        targets were found.
        """
        if len(targets) == 0:
            return None
        elif len(targets) == 1:
            return fromTypedNode(targets[0])
        elif len(targets) > 1:
            return [fromTypedNode(t) for t in targets]

    def _is_paired(self, libNode):
        """Determine if a library is paired end"""
        library_type = self._get_library_attribute(libNode, 'library_type')
        if library_type is None:
            errmsg = "%s doesn't have a library type"
            # NOTE(review): ModelException is not imported in this module;
            # this raise would currently be a NameError -- confirm import.
            raise ModelException(errmsg % (str(libNode),))

        # known single-ended library types
        single = ['CSHL (lacking last nt)',
                  'Single End (non-multiplexed)',
                  'Small RNA (non-multiplexed)',]
        # known paired-ended library types
        paired = ['Barcoded Illumina',
                  'Multiplexing',
                  'Nextera',
                  'Paired End (non-multiplexed)',]
        if library_type in single:
            return False
        elif library_type in paired:
            return True
        else:
            raise MetadataLookupException(
                "Unrecognized library type %s for %s" % \
                (library_type, str(libNode)))

    def execute_query(self, template, context):
        """Render a SPARQL template and execute it against our model.

        :Args:
        template: a template object with a render(context) method
        context: variables substituted into the template

        :Returns:
        list of query result rows
        """
        formatted_query = template.render(context)
        LOGGER.debug(formatted_query)
        query = RDF.SPARQLQuery(str(formatted_query))
        return [r for r in query.execute(self.model)]