htsworkflow/submission/submission.py
"""Common submission elements
"""
import logging
import os
import re

import RDF

from htsworkflow.util.rdfhelp import \
     blankOrUri, \
     dump_model, \
     fromTypedNode, \
     get_model, \
     strip_namespace, \
     toTypedNode
from htsworkflow.util.rdfns import *
from htsworkflow.util.hashfile import make_md5sum
from htsworkflow.submission.fastqname import FastqName
from htsworkflow.submission.daf import \
     MetadataLookupException, \
     ModelException, \
     get_submission_uri

from django.conf import settings
from django.template import Context, Template, loader

LOGGER = logging.getLogger(__name__)

class Submission(object):
    def __init__(self, name, model, host):
        self.name = name
        self.model = model

        self.submissionSet = get_submission_uri(self.name)
        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '#')
        self.libraryNS = RDF.NS('{0}/library/'.format(host))

        self.__view_map = None

    def scan_submission_dirs(self, result_map):
        """Examine files in our result directory
        """
        for lib_id, result_dir in result_map.items():
            LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
            try:
                self.import_analysis_dir(result_dir, lib_id)
            except MetadataLookupException as e:
                LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))

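    # Illustrative driver for the class (a sketch, not part of the original
    # module): the model name, host URL, and result_map contents below are
    # assumptions chosen only to show how scan_submission_dirs() is invoked.
    #
    #   model = get_model('submission-example')
    #   submission = Submission('example-submission', model,
    #                           'http://jumpgate.caltech.edu')
    #   submission.scan_submission_dirs({'11111': '/data/results/11111'})
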
    def import_analysis_dir(self, analysis_dir, library_id):
        """Import a submission directory and update our model as needed
        """
        #attributes = get_filename_attribute_map(paired)
        libNode = self.libraryNS[library_id + "/"]

        self._add_library_details_to_model(libNode)

        submission_files = os.listdir(analysis_dir)
        for filename in submission_files:
            pathname = os.path.abspath(os.path.join(analysis_dir, filename))
            self.construct_file_attributes(analysis_dir, libNode, pathname)

    def analysis_nodes(self, result_map):
        """Return an iterable of analysis nodes
        """
        for result_dir in result_map.values():
            an_analysis = self.get_submission_node(result_dir)
            yield an_analysis

    def construct_file_attributes(self, analysis_dir, libNode, pathname):
        """Add submission and file metadata for one result file to our model

        :Args:
        analysis_dir (str): directory containing the submission results
        libNode (RDF.Node): node representing the library being submitted
        pathname (str): path to the result file to classify and describe
        """
        path, filename = os.path.split(pathname)

        LOGGER.debug("Searching for view")
        file_type = self.find_best_match(filename)
        if file_type is None:
            LOGGER.warning("Unrecognized file: {0}".format(pathname))
            return None
        if str(file_type) == str(libraryOntology['ignore']):
            return None

        an_analysis_name = self.make_submission_name(analysis_dir)
        an_analysis = self.get_submission_node(analysis_dir)
        an_analysis_uri = str(an_analysis.uri)
        file_classification = self.model.get_target(file_type,
                                                    rdfNS['type'])
        if file_classification is None:
            errmsg = 'Could not find class for {0}'
            LOGGER.warning(errmsg.format(str(file_type)))
            return
        self.model.add_statement(
            RDF.Statement(self.submissionSetNS[''],
                          submissionOntology['has_submission'],
                          an_analysis))
        self.model.add_statement(RDF.Statement(an_analysis,
                                               submissionOntology['name'],
                                               toTypedNode(an_analysis_name)))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          rdfNS['type'],
                          submissionOntology['submission']))
        self.model.add_statement(RDF.Statement(an_analysis,
                                               submissionOntology['library'],
                                               libNode))

        LOGGER.debug("Adding statements to {0}".format(str(an_analysis)))
        # add track specific information
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          dafTermOntology['paired'],
                          toTypedNode(self._is_paired(libNode))))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          dafTermOntology['submission'],
                          an_analysis))

        # add file specific information
        fileNode = self.make_file_node(pathname, an_analysis)
        self.add_md5s(filename, fileNode, analysis_dir)
        self.add_fastq_metadata(filename, fileNode)
        self.add_label(file_type, fileNode, libNode)
        self.model.add_statement(
            RDF.Statement(fileNode,
                          rdfNS['type'],
                          file_type))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          libraryOntology['library'],
                          libNode))

        LOGGER.debug("Done.")

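    # Summary sketch of the statements construct_file_attributes() adds for a
    # recognized file, written subject / predicate / object using the names
    # from the code above (an_analysis is submissionSetNS[<submission name>],
    # fileNode is the file:// node built by make_file_node()):
    #
    #   submissionSetNS['']  submissionOntology['has_submission']  an_analysis
    #   an_analysis          submissionOntology['name']            <submission name>
    #   an_analysis          rdfNS['type']                         submissionOntology['submission']
    #   an_analysis          submissionOntology['library']         libNode
    #   an_analysis          dafTermOntology['paired']             true/false
    #   an_analysis          dafTermOntology['submission']         an_analysis
    #   fileNode             rdfNS['type']                         file_type
    #   fileNode             libraryOntology['library']            libNode
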
    def make_file_node(self, pathname, submissionNode):
        """Create file node and attach it to its submission.
        """
        # add file specific information
        path, filename = os.path.split(pathname)
        pathname = os.path.abspath(pathname)
        fileNode = RDF.Node(RDF.Uri('file://' + pathname))
        self.model.add_statement(
            RDF.Statement(submissionNode,
                          dafTermOntology['has_file'],
                          fileNode))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['filename'],
                          filename))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['relative_path'],
                          os.path.relpath(pathname)))
        return fileNode

    def add_md5s(self, filename, fileNode, analysis_dir):
        LOGGER.debug("Updating file md5sum")
        submission_pathname = os.path.join(analysis_dir, filename)
        md5 = make_md5sum(submission_pathname)
        if md5 is None:
            errmsg = "Unable to produce md5sum for {0}"
            LOGGER.warning(errmsg.format(submission_pathname))
        else:
            self.model.add_statement(
                RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))

    def add_fastq_metadata(self, filename, fileNode):
        # How should I detect if this is actually a fastq file?
        try:
            fqname = FastqName(filename=filename)
        except ValueError:
            # currently we just ignore it if the fastq name parser fails
            return

        terms = [('flowcell', libraryOntology['flowcell_id']),
                 ('lib_id', libraryOntology['library_id']),
                 ('lane', libraryOntology['lane_number']),
                 ('read', libraryOntology['read']),
                 ('cycle', libraryOntology['read_length'])]
        for file_term, model_term in terms:
            value = fqname.get(file_term)
            if value is not None:
                s = RDF.Statement(fileNode, model_term, toTypedNode(value))
                self.model.append(s)

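    # Effect sketch for add_fastq_metadata(): for a filename that FastqName
    # can parse, each recognized field in the `terms` table above becomes one
    # triple on the file node.  The literal values shown here are made up:
    #
    #   fileNode  libraryOntology['flowcell_id']  "AAAAAAXX"
    #   fileNode  libraryOntology['library_id']   "11111"
    #   fileNode  libraryOntology['lane_number']  3
    #   fileNode  libraryOntology['read']         1
    #   fileNode  libraryOntology['read_length']  100
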
    def add_label(self, file_type, file_node, lib_node):
        """Add rdfs:label to a file node
        """
        template_term = libraryOntology['label_template']
        label_template = self.model.get_target(file_type, template_term)
        if label_template:
            template = loader.get_template('submission_view_rdfs_label_metadata.sparql')
            context = Context({
                'library': str(lib_node.uri),
                })
            for r in self.execute_query(template, context):
                context = Context(r)
                label = Template(label_template).render(context)
                s = RDF.Statement(file_node, rdfsNS['label'], unicode(label))
                self.model.append(s)

    def _add_library_details_to_model(self, libNode):
        # attributes that can have multiple values
        set_attributes = set((libraryOntology['has_lane'],
                              libraryOntology['has_mappings'],
                              dafTermOntology['has_file']))
        parser = RDF.Parser(name='rdfa')
        try:
            new_statements = parser.parse_as_stream(libNode.uri)
        except RDF.RedlandError as e:
            LOGGER.error(e)
            return
        LOGGER.debug("Scanning %s", str(libNode.uri))
        toadd = []
        for s in new_statements:
            # always add "collections"
            if s.predicate in set_attributes:
                toadd.append(s)
                continue
            # don't override things we already have in the model
            targets = list(self.model.get_targets(s.subject, s.predicate))
            if len(targets) == 0:
                toadd.append(s)

        for s in toadd:
            self.model.append(s)

        self._add_lane_details(libNode)

    def _add_lane_details(self, libNode):
        """Import lane details
        """
        query = RDF.Statement(libNode, libraryOntology['has_lane'], None)
        lanes = []
        for lane_stmt in self.model.find_statements(query):
            lanes.append(lane_stmt.object)

        parser = RDF.Parser(name='rdfa')
        for lane in lanes:
            LOGGER.debug("Importing %s" % (lane.uri,))
            try:
                parser.parse_into_model(self.model, lane.uri)
            except RDF.RedlandError as e:
                LOGGER.error("Error accessing %s" % (lane.uri,))
                raise e


    def find_best_match(self, filename):
        """Search through potential filename matching patterns

        Return the single view whose pattern matches filename, or None if
        nothing matches; raise ModelException if more than one view matches.
        """
        if self.__view_map is None:
            self.__view_map = self._get_filename_view_map()

        results = []
        for pattern, view in self.__view_map.items():
            if re.match(pattern, filename):
                results.append(view)

        if len(results) > 1:
            msg = "%s matched multiple views %s" % (
                filename,
                [str(x) for x in results])
            raise ModelException(msg)
        elif len(results) == 1:
            return results[0]
        else:
            return None

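    # Behavior sketch for find_best_match(), assuming a hypothetical view map
    # such as {r'.*\.bam$': <bam view>, r'.*\.fastq$': <fastq view>} (the
    # actual patterns come from the DAF model at runtime, not from this file):
    #
    #   find_best_match('foo.bam')   -> <bam view>
    #   find_best_match('foo.txt')   -> None
    #   a filename matching two patterns raises ModelException
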
    def _get_filename_view_map(self):
        """Query our model for filename patterns

        return a dictionary mapping regular expression strings to view nodes
        """
        filename_query = RDF.Statement(
            None, dafTermOntology['filename_re'], None)

        patterns = {}
        for s in self.model.find_statements(filename_query):
            view_name = s.subject
            literal_re = s.object.literal_value['string']
            LOGGER.debug("Found: %s" % (literal_re,))
            try:
                filename_re = re.compile(literal_re)
            except re.error as e:
                LOGGER.error("Unable to compile: %s" % (literal_re,))
                # skip patterns that don't compile instead of registering them
                continue
            patterns[literal_re] = view_name
        return patterns

    def make_submission_name(self, analysis_dir):
        analysis_dir = os.path.normpath(analysis_dir)
        analysis_dir_name = os.path.split(analysis_dir)[1]
        if len(analysis_dir_name) == 0:
            raise RuntimeError(
                "Submission dir name too short: {0}".format(analysis_dir))
        return analysis_dir_name

    def get_submission_node(self, analysis_dir):
        """Convert a submission directory name to a submission node
        """
        submission_name = self.make_submission_name(analysis_dir)
        return self.submissionSetNS[submission_name]

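    # Example of the directory-to-node mapping (the path is hypothetical):
    # make_submission_name() keeps only the final path component, and
    # get_submission_node() turns it into a node in the submission set
    # namespace.
    #
    #   make_submission_name('/data/results/run_1/')  -> 'run_1'
    #   get_submission_node('/data/results/run_1/')   -> submissionSetNS['run_1']
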
    def _get_library_attribute(self, libNode, attribute):
        if not isinstance(attribute, RDF.Node):
            attribute = libraryOntology[attribute]

        targets = list(self.model.get_targets(libNode, attribute))
        if len(targets) > 0:
            return self._format_library_attribute(targets)
        else:
            return None

        # NOTE: the early return above short-circuits the fallback below,
        # which would re-import the library details and retry the lookup.
        #targets = self._search_same_as(libNode, attribute)
        #if targets is not None:
        #    return self._format_library_attribute(targets)

        # we don't know anything about this attribute
        self._add_library_details_to_model(libNode)

        targets = list(self.model.get_targets(libNode, attribute))
        if len(targets) > 0:
            return self._format_library_attribute(targets)

        return None

    def _format_library_attribute(self, targets):
        if len(targets) == 0:
            return None
        elif len(targets) == 1:
            return fromTypedNode(targets[0])
        elif len(targets) > 1:
            return [fromTypedNode(t) for t in targets]

    def _is_paired(self, libNode):
        """Determine if a library is paired end"""
        library_type = self._get_library_attribute(libNode, 'library_type')
        if library_type is None:
            errmsg = "%s doesn't have a library type"
            raise ModelException(errmsg % (str(libNode),))

        single = ['CSHL (lacking last nt)',
                  'Single End (non-multiplexed)',
                  'Small RNA (non-multiplexed)',]
        paired = ['Barcoded Illumina',
                  'Multiplexing',
                  'NEBNext Multiplexed',
                  'NEBNext Small RNA',
                  'Nextera',
                  'Paired End (non-multiplexed)',
                  'Dual Index Illumina',]
        if library_type in single:
            return False
        elif library_type in paired:
            return True
        else:
            raise MetadataLookupException(
                "Unrecognized library type %s for %s" % \
                (library_type, str(libNode)))

    def execute_query(self, template, context):
        """Execute the query, returning the results
        """
        formatted_query = template.render(context)
        LOGGER.debug(formatted_query)
        query = RDF.SPARQLQuery(str(formatted_query))
        rdfstream = query.execute(self.model)
        results = []
        for record in rdfstream:
            d = {}
            for key, value in record.items():
                d[key] = fromTypedNode(value)
            results.append(d)
        return results
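
    # Usage sketch for execute_query(), mirroring add_label() above (the
    # template name and the single 'library' context variable are the ones
    # used there; nothing else is assumed):
    #
    #   template = loader.get_template('submission_view_rdfs_label_metadata.sparql')
    #   context = Context({'library': str(lib_node.uri)})
    #   for row in self.execute_query(template, context):
    #       ...  # each row is a dict of plain Python values via fromTypedNode()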


def list_submissions(model):
    """Return generator of submissions in this model.
    """
    query_body = """
      PREFIX subns: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>

      select distinct ?submission
      where { ?submission subns:has_submission ?library_dir }
    """
    query = RDF.SPARQLQuery(query_body)
    rdfstream = query.execute(model)
    for row in rdfstream:
        s = strip_namespace(submissionLog, row['submission'])
        if s[-1] in ['#', '/', '?']:
            s = s[:-1]
        yield s
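
# Usage sketch for list_submissions() (a sketch only; the get_model() call and
# its argument are assumptions, and the model must already contain
# has_submission statements for anything to be yielded):
#
#   model = get_model('submission-example')
#   for name in list_submissions(model):
#       print(name)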