Merge branch 'django1.7' of mus.cacr.caltech.edu:htsworkflow into django1.7
[htsworkflow.git] / htsworkflow / submission / submission.py
1 """Common submission elements
2 """
3 import logging
4 import os
5 import re
6
7 import RDF
8
9 from htsworkflow.util.rdfhelp import \
10      blankOrUri, \
11      dump_model, \
12      fromTypedNode, \
13      get_model, \
14      strip_namespace, \
15      toTypedNode
16 from htsworkflow.util.rdfns import *
17 from htsworkflow.util.hashfile import make_md5sum
18 from htsworkflow.submission.fastqname import FastqName
19 from htsworkflow.submission.daf import \
20      MetadataLookupException, \
21      ModelException, \
22      get_submission_uri
23
24 from django.conf import settings
25 from django.template import Context, Template, loader
26
27 LOGGER = logging.getLogger(__name__)
28
class Submission(object):
    """Collect metadata about submission result files into an RDF model.

    Each analysis directory is represented by a submission node in the
    namespace of the submission set, and every recognized file found in
    that directory becomes a file node attached to its submission node.
    """

    # Library types that _is_paired treats as single ended.
    _SINGLE_END_TYPES = (
        'CSHL (lacking last nt)',
        'Single End (non-multiplexed)',
        'Small RNA (non-multiplexed)',
    )
    # Library types that _is_paired treats as paired end.
    _PAIRED_END_TYPES = (
        'Barcoded Illumina',
        'Multiplexing',
        'NEBNext Multiplexed',
        'NEBNext Small RNA',
        'Nextera',
        'Paired End (non-multiplexed)',
        'Dual Index Illumina',
    )

    def __init__(self, name, model, host):
        """Set up the namespaces used while populating ``model``.

        :Args:
          name (str): submission set name, passed to get_submission_uri
          model: RDF model that statements are read from and added to
          host (str): base URL placed in front of the ``/library/`` and
                      ``/flowcell/`` namespaces
        """
        self.name = name
        self.model = model

        self.submissionSet = get_submission_uri(self.name)
        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '#')
        self.libraryNS = RDF.NS('{0}/library/'.format(host))
        self.flowcellNS = RDF.NS('{0}/flowcell/'.format(host))

        # lazily filled cache of {filename pattern: view node},
        # see find_best_match
        self.__view_map = None

    def scan_submission_dirs(self, result_map):
        """Examine files in our result directories.

        :Args:
          result_map: mapping of library id to result directory

        Libraries that raise MetadataLookupException are logged and
        skipped; any other exception propagates.
        """
        for lib_id, result_dir in result_map.items():
            LOGGER.info("Importing %s from %s", lib_id, result_dir)
            try:
                self.import_analysis_dir(result_dir, lib_id)
            except MetadataLookupException as e:
                LOGGER.error("Skipping %s: %s", lib_id, str(e))

    def import_analysis_dir(self, analysis_dir, library_id):
        """Import a submission directory and update our model as needed.

        :Args:
          analysis_dir (str): directory whose entries should be examined
          library_id (str): id used to build the library node
        """
        libNode = self.libraryNS[library_id + "/"]

        # load the library metadata first, construct_file_attributes
        # reads the library type out of the model
        self._add_library_details_to_model(libNode)

        for filename in os.listdir(analysis_dir):
            pathname = os.path.abspath(os.path.join(analysis_dir, filename))
            self.construct_file_attributes(analysis_dir, libNode, pathname)

    def analysis_nodes(self, result_map):
        """Return an iterable of analysis nodes.

        One submission node is yielded for each result directory in
        ``result_map.values()``.
        """
        for result_dir in result_map.values():
            yield self.get_submission_node(result_dir)

    def construct_file_attributes(self, analysis_dir, libNode, pathname):
        """Add the statements describing one submission file to the model.

        The file is classified by matching its name against the filename
        patterns stored in the model. Unrecognized files, files whose view
        is libraryOntology['ignore'], and views without an rdf:type in the
        model are skipped without adding any statements.

        :Args:
          analysis_dir (str): directory that defines the submission node
          libNode: library node the file belongs to
          pathname (str): path of the file being described

        :Raises:
          ModelException: if the filename matches several views or the
                          library has no library type
          MetadataLookupException: if the library type is not recognized
        """
        filename = os.path.split(pathname)[1]

        LOGGER.debug("Searching for view")
        file_type = self.find_best_match(filename)
        if file_type is None:
            LOGGER.warning("Unrecognized file: {0}".format(pathname))
            return None
        if str(file_type) == str(libraryOntology['ignore']):
            return None

        an_analysis_name = self.make_submission_name(analysis_dir)
        an_analysis = self.get_submission_node(analysis_dir)
        file_classification = self.model.get_target(file_type,
                                                    rdfNS['type'])
        if file_classification is None:
            errmsg = 'Could not find class for {0}'
            LOGGER.warning(errmsg.format(str(file_type)))
            return None

        # describe the submission this file belongs to
        self.model.add_statement(
            RDF.Statement(self.submissionSetNS[''],
                          submissionOntology['has_submission'],
                          an_analysis))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          submissionOntology['name'],
                          toTypedNode(an_analysis_name)))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          rdfNS['type'],
                          submissionOntology['submission']))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          submissionOntology['library'],
                          libNode))

        LOGGER.debug("Adding statements to {0}".format(str(an_analysis)))
        # add track specific information
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          dafTermOntology['paired'],
                          toTypedNode(self._is_paired(libNode))))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          dafTermOntology['submission'],
                          an_analysis))

        # add file specific information
        fileNode = self.make_file_node(pathname, an_analysis)
        self.add_md5s(filename, fileNode, analysis_dir)
        self.add_file_size(filename, fileNode, analysis_dir)
        self.add_fastq_metadata(filename, fileNode)
        self.add_label(file_type, fileNode, libNode)
        self.model.add_statement(
            RDF.Statement(fileNode,
                          rdfNS['type'],
                          file_type))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          libraryOntology['library'],
                          libNode))

        LOGGER.debug("Done.")
        return None

    def make_file_node(self, pathname, submissionNode):
        """Create file node and attach it to its submission.

        The node is a ``file://`` URI built from the absolute path. Its
        filename and its path relative to the current directory are
        recorded as well.

        :Returns: the newly created file node
        """
        # the filename is taken from the path as given, before it is
        # made absolute
        filename = os.path.split(pathname)[1]
        pathname = os.path.abspath(pathname)
        fileNode = RDF.Node(RDF.Uri('file://' + pathname))
        self.model.add_statement(
            RDF.Statement(submissionNode,
                          dafTermOntology['has_file'],
                          fileNode))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['filename'],
                          filename))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['relative_path'],
                          os.path.relpath(pathname)))
        return fileNode

    def add_md5s(self, filename, fileNode, analysis_dir):
        """Attach the md5sum of ``analysis_dir/filename`` to ``fileNode``.

        A warning is logged and nothing is added when make_md5sum
        returns None.
        """
        LOGGER.debug("Updating file md5sum")
        submission_pathname = os.path.join(analysis_dir, filename)
        md5 = make_md5sum(submission_pathname)
        if md5 is None:
            errmsg = "Unable to produce md5sum for {0}"
            LOGGER.warning(errmsg.format(submission_pathname))
            return
        self.model.add_statement(
            RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))

    def add_file_size(self, filename, fileNode, analysis_dir):
        """Attach the size in bytes of ``analysis_dir/filename``.

        The size is read with os.stat, so a missing file raises OSError.
        """
        LOGGER.debug("Updating file size")
        submission_pathname = os.path.join(analysis_dir, filename)
        file_size = os.stat(submission_pathname).st_size
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['file_size'],
                          toTypedNode(file_size)))

    def add_fastq_metadata(self, filename, fileNode):
        """Attach attributes parsed from a fastq filename to ``fileNode``.

        Filenames that FastqName rejects with ValueError are silently
        left alone.
        """
        # How should I detect if this is actually a fastq file?
        try:
            fqname = FastqName(filename=filename)
        except ValueError:
            # currently we just ignore it if the fastq name parser fails
            return

        terms = [('flowcell', libraryOntology['flowcell_id']),
                 ('lib_id', libraryOntology['library_id']),
                 ('lane', libraryOntology['lane_number']),
                 ('read', libraryOntology['read']),
                 ('cycle', libraryOntology['read_length'])]
        for file_term, model_term in terms:
            value = fqname.get(file_term)
            if value is not None:
                s = RDF.Statement(fileNode, model_term, toTypedNode(value))
                self.model.append(s)

        # also link the file to the flowcell node itself
        if 'flowcell' in fqname:
            value = self.flowcellNS[fqname['flowcell'] + '/']
            s = RDF.Statement(fileNode, libraryOntology['flowcell'], value)
            self.model.append(s)

    def add_label(self, file_type, file_node, lib_node):
        """Add rdfs:label to a file node.

        Nothing is added unless the model holds a label_template for
        ``file_type``. The template is rendered once for each row returned
        by the label metadata query for the library.
        """
        template_term = libraryOntology['label_template']
        label_template = self.model.get_target(file_type, template_term)
        if not label_template:
            return

        template = loader.get_template(
            'submission_view_rdfs_label_metadata.sparql')
        context = Context({
            'library': str(lib_node.uri),
            })
        for r in self.execute_query(template, context):
            label = Template(label_template).render(Context(r))
            # NOTE(review): unicode() only exists on Python 2 - revisit
            # when porting this module to Python 3
            s = RDF.Statement(file_node, rdfsNS['label'], unicode(label))
            self.model.append(s)

    def _add_library_details_to_model(self, libNode):
        """Merge the RDFa found at the library URI into the model.

        Statements for the multi-valued predicates (has_lane,
        has_mappings, has_file) are always added. Any other statement is
        only added when the model has no value yet for its subject and
        predicate. Lane and flowcell details are imported afterwards.

        If the parser raises RDF.RedlandError while opening the stream the
        error is logged and nothing is imported.
        """
        # attributes that can have multiple values
        set_attributes = set((libraryOntology['has_lane'],
                              libraryOntology['has_mappings'],
                              dafTermOntology['has_file']))
        parser = RDF.Parser(name='rdfa')
        try:
            new_statements = parser.parse_as_stream(libNode.uri)
        except RDF.RedlandError as e:
            LOGGER.error(e)
            return
        LOGGER.debug("Scanning %s", str(libNode.uri))

        # statements are collected first and appended after the stream
        # has been consumed
        toadd = []
        for s in new_statements:
            # always add "collections"
            if s.predicate in set_attributes:
                toadd.append(s)
                continue
            # don't override things we already have in the model
            targets = list(self.model.get_targets(s.subject, s.predicate))
            if not targets:
                toadd.append(s)

        for s in toadd:
            self.model.append(s)

        self._add_lane_details(libNode)
        self._add_flowcell_details()

    def _add_lane_details(self, libNode):
        """Import lane details.

        Parses the RDFa at every lane URI attached to ``libNode`` through
        has_lane into the model.

        :Raises:
          RDF.RedlandError: re-raised after logging the failing lane
        """
        query = RDF.Statement(libNode, libraryOntology['has_lane'], None)
        # materialize the lanes before parsing, parsing adds statements
        # to the same model that is being searched
        lanes = [stmt.object for stmt in self.model.find_statements(query)]

        parser = RDF.Parser(name='rdfa')
        for lane in lanes:
            LOGGER.debug("Importing %s", lane.uri)
            try:
                parser.parse_into_model(self.model, lane.uri)
            except RDF.RedlandError:
                LOGGER.error("Error accessing %s", lane.uri)
                # bare raise keeps the original traceback
                raise

    def _add_flowcell_details(self):
        """Import flowcell details.

        Parses the RDFa at every flowcell URI returned by the
        ``aws_flowcell.sparql`` query into the model.

        :Raises:
          RDF.RedlandError: re-raised after logging the failing flowcell
        """
        template = loader.get_template('aws_flowcell.sparql')
        parser = RDF.Parser(name='rdfa')
        # execute_query returns a list, so the model can be modified
        # while looping over the results
        for r in self.execute_query(template, Context()):
            flowcell = r['flowcell']
            try:
                parser.parse_into_model(self.model, flowcell.uri)
            except RDF.RedlandError:
                LOGGER.error("Error accessing %s", str(flowcell))
                # bare raise keeps the original traceback
                raise

    def find_best_match(self, filename):
        """Search through potential filename matching patterns.

        The pattern map is read from the model on first use and cached.

        :Returns: the view whose pattern matches the start of
                  ``filename`` (re.match), or None if no pattern matches
        :Raises:
          ModelException: if more than one pattern matches
        """
        if self.__view_map is None:
            self.__view_map = self._get_filename_view_map()

        results = [view for pattern, view in self.__view_map.items()
                   if re.match(pattern, filename)]

        if len(results) > 1:
            msg = "%s matched multiple views %s" % (
                filename,
                [str(x) for x in results])
            raise ModelException(msg)
        if results:
            return results[0]
        return None

    def _get_filename_view_map(self):
        """Query our model for filename patterns.

        :Returns: dictionary mapping regular expression strings to view
                  nodes. Patterns that fail to compile are logged and
                  left out, so they cannot break later matching.
        """
        filename_query = RDF.Statement(
            None, dafTermOntology['filename_re'], None)

        patterns = {}
        for s in self.model.find_statements(filename_query):
            view_name = s.subject
            literal_re = s.object.literal_value['string']
            LOGGER.debug("Found: %s", literal_re)
            try:
                # compiled only to validate the pattern
                re.compile(literal_re)
            except re.error:
                LOGGER.error("Unable to compile: %s", literal_re)
                continue
            patterns[literal_re] = view_name
        return patterns

    def make_submission_name(self, analysis_dir):
        """Return the last path component of ``analysis_dir``.

        The path is normalized first, so a trailing separator is ignored.

        :Raises:
          RuntimeError: if the normalized path has no final component
        """
        analysis_dir = os.path.normpath(analysis_dir)
        analysis_dir_name = os.path.split(analysis_dir)[1]
        if not analysis_dir_name:
            raise RuntimeError(
                "Submission dir name too short: {0}".format(analysis_dir))
        return analysis_dir_name

    def get_submission_node(self, analysis_dir):
        """Convert a submission directory name to a submission node.
        """
        submission_name = self.make_submission_name(analysis_dir)
        return self.submissionSetNS[submission_name]

    def _get_library_attribute(self, libNode, attribute):
        """Look up ``attribute`` of ``libNode`` in the model.

        ``attribute`` may be an RDF.Node or a term name, which is resolved
        through libraryOntology. The model is only read here, nothing is
        fetched; import_analysis_dir loads the library details before any
        file is examined.

        :Returns: None when the model has no value, the converted value
                  when there is exactly one, and a list of converted
                  values otherwise
        """
        if not isinstance(attribute, RDF.Node):
            attribute = libraryOntology[attribute]

        targets = list(self.model.get_targets(libNode, attribute))
        if targets:
            return self._format_library_attribute(targets)
        return None

    def _format_library_attribute(self, targets):
        """Convert a list of target nodes into plain python values.

        :Returns: None for an empty list, a single value for one target,
                  and a list of values for several targets
        """
        if not targets:
            return None
        if len(targets) == 1:
            return fromTypedNode(targets[0])
        return [fromTypedNode(t) for t in targets]

    def _is_paired(self, libNode):
        """Determine if a library is paired end.

        :Raises:
          ModelException: if the library has no library type
          MetadataLookupException: if the library type is not one of the
                                   known single or paired end types
        """
        library_type = self._get_library_attribute(libNode, 'library_type')
        if library_type is None:
            errmsg = "%s doesn't have a library type"
            raise ModelException(errmsg % (str(libNode),))

        if library_type in self._SINGLE_END_TYPES:
            return False
        if library_type in self._PAIRED_END_TYPES:
            return True
        raise MetadataLookupException(
            "Unrecognized library type %s for %s" %
            (library_type, str(libNode)))

    def execute_query(self, template, context):
        """Execute the query, returning the results.

        ``template`` is rendered with ``context`` and run as a SPARQL
        query against the model.

        :Returns: list with one dictionary per result row, mapping each
                  key of the row to its value converted by fromTypedNode
        """
        formatted_query = template.render(context)
        LOGGER.debug(formatted_query)
        query = RDF.SPARQLQuery(str(formatted_query))
        rdfstream = query.execute(self.model)
        return [
            dict((key, fromTypedNode(value))
                 for key, value in record.items())
            for record in rdfstream
        ]
408
409
def list_submissions(model):
    """Return generator of submissions in this model.

    Every distinct subject of a ``has_submission`` statement is reduced to
    its name with strip_namespace (relative to submissionLog), and one
    trailing ``#``, ``/`` or ``?`` is dropped from that name.

    :Args:
      model: RDF model to run the SPARQL query against
    """
    query_body = """
      PREFIX subns: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>

      select distinct ?submission
      where { ?submission subns:has_submission ?library_dir }
    """
    query = RDF.SPARQLQuery(query_body)
    for row in query.execute(model):
        s = strip_namespace(submissionLog, row['submission'])
        # endswith() copes with an empty name, indexing s[-1] would
        # raise IndexError
        if s.endswith(('#', '/', '?')):
            s = s[:-1]
        yield s