b3e2778ca490b8bbf4a111c1e2c0543f23910b00
[htsworkflow.git] / htsworkflow / submission / submission.py
1 """Common submission elements
2 """
3 import logging
4 import os
5 import re
6
7 import RDF
8
9 from htsworkflow.util.rdfhelp import \
10      blankOrUri, \
11      dafTermOntology, \
12      dump_model, \
13      get_model, \
14      libraryOntology, \
15      owlNS, \
16      rdfNS, \
17      submissionLog, \
18      submissionOntology, \
19      toTypedNode, \
20      fromTypedNode
21 from htsworkflow.util.hashfile import make_md5sum
22 from htsworkflow.submission.fastqname import FastqName
23 from htsworkflow.submission.daf import \
24      MetadataLookupException, \
25      get_submission_uri
26
27 LOGGER = logging.getLogger(__name__)
28
class Submission(object):
    """Accumulate RDF metadata describing a submission.

    Scans analysis result directories, matches files against the
    filename patterns declared in the model (dafTerm:filename_re),
    and records submission / library / file statements into the
    supplied RDF model.
    """
    def __init__(self, name, model, host):
        """Initialize submission state.

        :Args:
        name (str): name of this submission set
        model: RDF model that accumulates submission statements
        host (str): base URL of the server providing library pages
        """
        self.name = name
        self.model = model

        self.submissionSet = get_submission_uri(self.name)
        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '#')
        self.libraryNS = RDF.NS('{0}/library/'.format(host))

        # lazily built {filename regex -> view node} map; see find_best_match
        self.__view_map = None

    def scan_submission_dirs(self, result_map):
        """Examine files in our result directory

        :Args:
        result_map: mapping of library id -> result directory path
        """
        for lib_id, result_dir in result_map.items():
            LOGGER.info("Importing %s from %s", lib_id, result_dir)
            try:
                self.import_analysis_dir(result_dir, lib_id)
            except MetadataLookupException as e:
                # a library we can't classify is skipped, not fatal
                LOGGER.error("Skipping %s: %s", lib_id, str(e))

    def import_analysis_dir(self, analysis_dir, library_id):
        """Import a submission directory and update our model as needed

        :Args:
        analysis_dir (str): directory containing this library's result files
        library_id (str): identifier used to build the library URI
        """
        libNode = self.libraryNS[library_id + "/"]

        self._add_library_details_to_model(libNode)

        for filename in os.listdir(analysis_dir):
            pathname = os.path.abspath(os.path.join(analysis_dir, filename))
            self.construct_file_attributes(analysis_dir, libNode, pathname)

    def construct_file_attributes(self, analysis_dir, libNode, pathname):
        """Looking for the best extension
        The 'best' is the longest match

        :Args:
        analysis_dir (str): submission directory the file came from
        libNode: RDF node for the library this file belongs to
        pathname (str): the filename whose extension we are about to examine
        """
        filename = os.path.basename(pathname)

        LOGGER.debug("Searching for view")
        file_type = self.find_best_match(filename)
        if file_type is None:
            LOGGER.warning("Unrecognized file: {0}".format(pathname))
            return None
        if str(file_type) == str(libraryOntology['ignore']):
            # explicitly marked as uninteresting
            return None

        an_analysis_name = self.make_submission_name(analysis_dir)
        an_analysis = self.get_submission_node(analysis_dir)
        file_classification = self.model.get_target(file_type,
                                                    rdfNS['type'])
        if file_classification is None:
            errmsg = 'Could not find class for {0}'
            LOGGER.warning(errmsg.format(str(file_type)))
            return

        # attach this analysis to the submission set
        self.model.add_statement(
            RDF.Statement(self.submissionSetNS[''],
                          submissionOntology['has_submission'],
                          an_analysis))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          submissionOntology['name'],
                          toTypedNode(an_analysis_name)))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          rdfNS['type'],
                          submissionOntology['submission']))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          submissionOntology['library'],
                          libNode))

        LOGGER.debug("Adding statements to {0}".format(str(an_analysis)))
        # add track specific information
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          dafTermOntology['paired'],
                          toTypedNode(self._is_paired(libNode))))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          dafTermOntology['submission'],
                          an_analysis))

        # add file specific information
        fileNode = self.make_file_node(pathname, an_analysis)
        self.add_md5s(filename, fileNode, analysis_dir)
        self.add_fastq_metadata(filename, fileNode)
        self.model.add_statement(
            RDF.Statement(fileNode,
                          rdfNS['type'],
                          file_type))
        LOGGER.debug("Done.")

    def make_file_node(self, pathname, submissionNode):
        """Create file node and attach it to its submission.

        Returns the new file node.
        """
        filename = os.path.basename(pathname)
        fileNode = RDF.Node(RDF.Uri('file://' + os.path.abspath(pathname)))
        self.model.add_statement(
            RDF.Statement(submissionNode,
                          dafTermOntology['has_file'],
                          fileNode))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['filename'],
                          filename))
        return fileNode

    def add_md5s(self, filename, fileNode, analysis_dir):
        """Attach an md5sum statement to fileNode, if one can be computed."""
        LOGGER.debug("Updating file md5sum")
        submission_pathname = os.path.join(analysis_dir, filename)
        md5 = make_md5sum(submission_pathname)
        if md5 is None:
            # best effort: log and continue without the checksum
            errmsg = "Unable to produce md5sum for {0}"
            LOGGER.warning(errmsg.format(submission_pathname))
        else:
            self.model.add_statement(
                RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))

    def add_fastq_metadata(self, filename, fileNode):
        """Record flowcell/lane/read attributes parsed from a fastq filename."""
        # How should I detect if this is actually a fastq file?
        try:
            fqname = FastqName(filename=filename)
        except ValueError:
            # currently its just ignore it if the fastq name parser fails
            return

        terms = [('flowcell', libraryOntology['flowcell_id']),
                 ('lib_id', libraryOntology['library_id']),
                 ('lane', libraryOntology['lane_number']),
                 ('read', libraryOntology['read']),
                 ('cycle', libraryOntology['read_length'])]
        for file_term, model_term in terms:
            value = fqname.get(file_term)
            if value is not None:
                s = RDF.Statement(fileNode, model_term, toTypedNode(value))
                self.model.append(s)

    def _add_library_details_to_model(self, libNode):
        """Import RDFa statements from the library page into our model."""
        # attributes that can have multiple values
        set_attributes = set((libraryOntology['has_lane'],
                              libraryOntology['has_mappings'],
                              dafTermOntology['has_file']))
        parser = RDF.Parser(name='rdfa')
        new_statements = parser.parse_as_stream(libNode.uri)
        toadd = []
        for s in new_statements:
            # always add "collections"
            if s.predicate in set_attributes:
                toadd.append(s)
                continue
            # don't override things we already have in the model
            targets = list(self.model.get_targets(s.subject, s.predicate))
            if len(targets) == 0:
                toadd.append(s)

        for s in toadd:
            self.model.append(s)

        self._add_lane_details(libNode)

    def _add_lane_details(self, libNode):
        """Import lane details
        """
        query = RDF.Statement(libNode, libraryOntology['has_lane'], None)
        lanes = []
        for lane_stmt in self.model.find_statements(query):
            lanes.append(lane_stmt.object)

        parser = RDF.Parser(name='rdfa')
        for lane in lanes:
            LOGGER.debug("Importing %s", lane.uri)
            try:
                parser.parse_into_model(self.model, lane.uri)
            except RDF.RedlandError:
                LOGGER.error("Error accessing %s", lane.uri)
                # bare raise preserves the original traceback
                raise

    def find_best_match(self, filename):
        """Search through potential filename matching patterns

        Returns the matching view node, or None when nothing matches.
        Raises when the filename is ambiguous (matches several views).
        """
        if self.__view_map is None:
            self.__view_map = self._get_filename_view_map()

        results = []
        for pattern, view in self.__view_map.items():
            if re.match(pattern, filename):
                results.append(view)

        if len(results) > 1:
            msg = "%s matched multiple views %s" % (
                filename,
                [str(x) for x in results])
            # NOTE(review): ModelException is neither defined nor imported in
            # this module, so raising it will NameError — confirm its source.
            raise ModelException(msg)
        elif len(results) == 1:
            return results[0]
        else:
            return None

    def _get_filename_view_map(self):
        """Query our model for filename patterns

        return a dictionary of regular expression strings to view names
        """
        filename_query = RDF.Statement(
            None, dafTermOntology['filename_re'], None)

        patterns = {}
        for s in self.model.find_statements(filename_query):
            view_name = s.subject
            literal_re = s.object.literal_value['string']
            LOGGER.debug("Found: %s", literal_re)
            try:
                re.compile(literal_re)
            except re.error:
                # skip invalid patterns; previously they were still added
                # and would blow up later inside find_best_match
                LOGGER.error("Unable to compile: %s", literal_re)
                continue
            patterns[literal_re] = view_name
        return patterns

    def make_submission_name(self, analysis_dir):
        """Derive a submission name from the last component of analysis_dir.

        :Raises:
        RuntimeError if the directory path has no final name component.
        """
        analysis_dir = os.path.normpath(analysis_dir)
        analysis_dir_name = os.path.split(analysis_dir)[1]
        if len(analysis_dir_name) == 0:
            raise RuntimeError(
                "Submission dir name too short: {0}".format(analysis_dir))
        return analysis_dir_name

    def get_submission_node(self, analysis_dir):
        """Convert a submission directory name to a submission node
        """
        submission_name = self.make_submission_name(analysis_dir)
        return self.submissionSetNS[submission_name]

    def _get_library_attribute(self, libNode, attribute):
        """Look up attribute values for a library node.

        attribute may be an RDF node or a string naming a libraryOntology
        term.  Returns a single value, a list of values, or None.
        """
        if not isinstance(attribute, RDF.Node):
            attribute = libraryOntology[attribute]

        targets = list(self.model.get_targets(libNode, attribute))
        if len(targets) > 0:
            return self._format_library_attribute(targets)
        # (a fallback that re-imported library details lived here but was
        # unreachable dead code; removed without changing behavior)
        return None

    def _format_library_attribute(self, targets):
        """Unpack 0, 1, or many target nodes into python values."""
        if len(targets) == 0:
            return None
        elif len(targets) == 1:
            return fromTypedNode(targets[0])
        elif len(targets) > 1:
            return [fromTypedNode(t) for t in targets]

    def _is_paired(self, libNode):
        """Determine if a library is paired end"""
        library_type = self._get_library_attribute(libNode, 'library_type')
        if library_type is None:
            errmsg = "%s doesn't have a library type"
            # NOTE(review): ModelException is not imported here; see
            # find_best_match.
            raise ModelException(errmsg % (str(libNode),))

        single = ['CSHL (lacking last nt)',
                  'Single End (non-multiplexed)',
                  'Small RNA (non-multiplexed)',]
        paired = ['Barcoded Illumina',
                  'Multiplexing',
                  'Nextera',
                  'Paired End (non-multiplexed)',]
        if library_type in single:
            return False
        elif library_type in paired:
            return True
        else:
            raise MetadataLookupException(
                "Unrecognized library type %s for %s" % \
                (library_type, str(libNode)))

    def execute_query(self, template, context):
        """Execute the query, returning the results

        :Args:
        template: a template object with a render(context) method that
                  produces a SPARQL query
        context: mapping of template variables
        """
        formatted_query = template.render(context)
        LOGGER.debug(formatted_query)
        query = RDF.SPARQLQuery(str(formatted_query))
        rdfstream = query.execute(self.model)
        results = []
        for record in rdfstream:
            d = {}
            for key, value in record.items():
                d[key] = fromTypedNode(value)
            results.append(d)
        return results