50e822877bee7dcdeccc89226ffc7707a7a902c3
[htsworkflow.git] / htsworkflow / submission / submission.py
1 """Common submission elements
2 """
3 import logging
4 import os
5 import re
6
7 import RDF
8
9 from htsworkflow.util.rdfhelp import \
10      dump_model, \
11      fromTypedNode, \
12      strip_namespace, \
13      toTypedNode
14 from htsworkflow.util.rdfns import (
15     dafTermOntology,
16     libraryOntology,
17     rdfNS,
18     rdfsNS,
19     submissionLog,
20     submissionOntology,
21 )
22 from htsworkflow.util.hashfile import make_md5sum
23 from htsworkflow.submission.fastqname import FastqName
24 from htsworkflow.submission.daf import \
25      MetadataLookupException, \
26      ModelException, \
27      get_submission_uri
28 from htsworkflow.util import opener
29
30 from django.template import Context, Template, loader
31
32 LOGGER = logging.getLogger(__name__)
33
class Submission(object):
    """Collect RDF metadata describing one submission's result files.

    Scans result directories, classifies each file against the filename
    patterns stored in the model, and records file, library, and flowcell
    metadata as RDF statements in ``self.model``.
    """
    def __init__(self, name, model, host):
        """Initialize submission namespaces.

        :Args:
        name (str): name of this submission set
        model (RDF.Model): RDF model read from and written to
        host (str): base URL of the server publishing library/flowcell pages
        """
        self.name = name
        self.model = model

        self.submissionSet = get_submission_uri(self.name)
        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '#')
        self.libraryNS = RDF.NS('{0}/library/'.format(host))
        self.flowcellNS = RDF.NS('{0}/flowcell/'.format(host))

        # lazily-built {filename regex string: view node} map,
        # populated on first use by _get_filename_view_map()
        self.__view_map = None

    def scan_submission_dirs(self, result_map):
        """Examine files in our result directory

        :Args:
        result_map: iterable mapping of library id -> result directory
        """
        for lib_id, result_dir in result_map.items():
            LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
            try:
                self.import_analysis_dir(result_dir, lib_id)
            except MetadataLookupException as e:
                # a library we can't look up shouldn't abort the whole scan
                LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))

    def import_analysis_dir(self, analysis_dir, library_id):
        """Import a submission directories and update our model as needed

        :Args:
        analysis_dir (str): directory containing result files
        library_id (str): library identifier used to build the library URI
        """
        #attributes = get_filename_attribute_map(paired)
        libNode = self.libraryNS[library_id + "/"]

        self._add_library_details_to_model(libNode)

        submission_files = os.listdir(analysis_dir)
        for filename in submission_files:
            pathname = os.path.abspath(os.path.join(analysis_dir, filename))
            self.construct_file_attributes(analysis_dir, libNode, pathname)

    def analysis_nodes(self, result_map):
        """Return an iterable of analysis nodes
        """
        for result_dir in result_map.values():
            an_analysis = self.get_submission_node(result_dir)
            yield an_analysis

    def construct_file_attributes(self, analysis_dir, libNode, pathname):
        """Classify one file and attach its metadata to the model.

        The file type is chosen by the longest filename-pattern match;
        unrecognized or 'ignore'-typed files are skipped.

        :Args:
        analysis_dir (str): directory the submission was found in
        libNode (RDF.Node): node for the library this file belongs to
        pathname (str): path of the file whose extension we examine
        """
        path, filename = os.path.split(pathname)

        LOGGER.debug("Searching for view")
        file_type = self.find_best_match(filename)
        if file_type is None:
            # LOGGER.warn is a deprecated alias of warning
            LOGGER.warning("Unrecognized file: {0}".format(pathname))
            return None
        if str(file_type) == str(libraryOntology['ignore']):
            return None

        an_analysis_name = self.make_submission_name(analysis_dir)
        an_analysis = self.get_submission_node(analysis_dir)
        file_classification = self.model.get_target(file_type,
                                                    rdfNS['type'])
        if file_classification is None:
            errmsg = 'Could not find class for {0}'
            LOGGER.warning(errmsg.format(str(file_type)))
            return

        self.model.add_statement(
            RDF.Statement(self.submissionSetNS[''],
                          submissionOntology['has_submission'],
                          an_analysis))
        self.model.add_statement(RDF.Statement(an_analysis,
                                               submissionOntology['name'],
                                               toTypedNode(an_analysis_name)))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          rdfNS['type'],
                          submissionOntology['submission']))
        self.model.add_statement(RDF.Statement(an_analysis,
                                               submissionOntology['library'],
                                               libNode))

        LOGGER.debug("Adding statements to {0}".format(str(an_analysis)))
        # add track specific information
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          dafTermOntology['paired'],
                          toTypedNode(self._is_paired(libNode))))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          dafTermOntology['submission'],
                          an_analysis))

        # add file specific information
        fileNode = self.make_file_node(pathname, an_analysis)
        self.add_md5s(filename, fileNode, analysis_dir)
        self.add_file_size(filename, fileNode, analysis_dir)
        self.add_read_length(filename, fileNode, analysis_dir)
        self.add_fastq_metadata(filename, fileNode)
        self.add_label(file_type, fileNode, libNode)
        self.model.add_statement(
            RDF.Statement(fileNode,
                          rdfNS['type'],
                          file_type))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          libraryOntology['library'],
                          libNode))

        LOGGER.debug("Done.")

    def make_file_node(self, pathname, submissionNode):
        """Create file node and attach it to its submission.

        Returns the new file node (a file:// URI node).
        """
        # add file specific information
        path, filename = os.path.split(pathname)
        pathname = os.path.abspath(pathname)
        fileNode = RDF.Node(RDF.Uri('file://'+ pathname))
        self.model.add_statement(
            RDF.Statement(submissionNode,
                          dafTermOntology['has_file'],
                          fileNode))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['filename'],
                          filename))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['relative_path'],
                          os.path.relpath(pathname)))
        return fileNode

    def add_md5s(self, filename, fileNode, analysis_dir):
        """Attach the file's md5sum to its node, if one can be computed."""
        LOGGER.debug("Updating file md5sum")
        submission_pathname = os.path.join(analysis_dir, filename)
        md5 = make_md5sum(submission_pathname)
        if md5 is None:
            # best effort: log and continue without the checksum
            errmsg = "Unable to produce md5sum for {0}"
            LOGGER.warning(errmsg.format(submission_pathname))
        else:
            self.model.add_statement(
                RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))

    def add_file_size(self, filename, fileNode, analysis_dir):
        """Attach the file's size in bytes to its node."""
        submission_pathname = os.path.join(analysis_dir, filename)
        file_size = os.stat(submission_pathname).st_size
        self.model.add_statement(
            RDF.Statement(fileNode, dafTermOntology['file_size'], toTypedNode(file_size)))
        LOGGER.debug("Updating file size: %d", file_size)

    def add_read_length(self, filename, fileNode, analysis_dir):
        """Record the read length, taken from the file's second line.

        Assumes a fastq-like layout where line 2 is the sequence.
        """
        submission_pathname = os.path.join(analysis_dir, filename)
        stream = opener.autoopen(submission_pathname, 'rt')
        try:
            stream.readline()  # skip the header line
            sequence = stream.readline().strip()
        finally:
            # original leaked the handle; make sure it is closed
            stream.close()
        read_length = len(sequence)
        self.model.add_statement(
            RDF.Statement(fileNode,
                          libraryOntology['read_length'],
                          toTypedNode(read_length))
        )
        LOGGER.debug("Updating read length: %d", read_length)

    def add_fastq_metadata(self, filename, fileNode):
        """Record flowcell/library/lane/read info parsed from a fastq name."""
        # How should I detect if this is actually a fastq file?
        try:
            fqname = FastqName(filename=filename)
        except ValueError:
            # currently its just ignore it if the fastq name parser fails
            return

        terms = [('flowcell', libraryOntology['flowcell_id']),
                 ('lib_id', libraryOntology['library_id']),
                 ('lane', libraryOntology['lane_number']),
                 ('read', libraryOntology['read']),
        ]
        for file_term, model_term in terms:
            value = fqname.get(file_term)
            if value is not None:
                s = RDF.Statement(fileNode, model_term, toTypedNode(value))
                self.model.append(s)

        if 'flowcell' in fqname:
            # also link to the flowcell resource itself
            value = self.flowcellNS[fqname['flowcell'] + '/']
            s = RDF.Statement(fileNode, libraryOntology['flowcell'], value)
            self.model.append(s)

    def add_label(self, file_type, file_node, lib_node):
        """Add rdfs:label to a file node

        Renders the file type's label_template with metadata queried
        for the library.
        """
        template_term = libraryOntology['label_template']
        label_template = self.model.get_target(file_type, template_term)
        if label_template:
            template = loader.get_template('submission_view_rdfs_label_metadata.sparql')
            context = Context({
                'library': str(lib_node.uri),
                })
            for r in self.execute_query(template, context):
                context = Context(r)
                label = Template(label_template).render(context)
                # NOTE(review): unicode() is Python 2 only; this module
                # would need porting before running under Python 3
                s = RDF.Statement(file_node, rdfsNS['label'], unicode(label))
                self.model.append(s)

    def _add_library_details_to_model(self, libNode):
        """Pull RDFa statements from the library page into our model."""
        # attributes that can have multiple values
        set_attributes = set((libraryOntology['has_lane'],
                              libraryOntology['has_mappings'],
                              dafTermOntology['has_file']))
        parser = RDF.Parser(name='rdfa')
        try:
            new_statements = parser.parse_as_stream(libNode.uri)
        except RDF.RedlandError as e:
            LOGGER.error(e)
            return
        LOGGER.debug("Scanning %s", str(libNode.uri))
        toadd = []
        for s in new_statements:
            # always add "collections"
            if s.predicate in set_attributes:
                toadd.append(s)
                continue
            # don't override things we already have in the model
            targets = list(self.model.get_targets(s.subject, s.predicate))
            if len(targets) == 0:
                toadd.append(s)

        for s in toadd:
            self.model.append(s)

        self._add_lane_details(libNode)
        self._add_flowcell_details()

    def _add_lane_details(self, libNode):
        """Import lane details
        """
        query = RDF.Statement(libNode, libraryOntology['has_lane'], None)
        lanes = []
        for lane_stmt in self.model.find_statements(query):
            lanes.append(lane_stmt.object)

        parser = RDF.Parser(name='rdfa')
        for lane in lanes:
            LOGGER.debug("Importing %s" % (lane.uri,))
            try:
                parser.parse_into_model(self.model, lane.uri)
            except RDF.RedlandError:
                LOGGER.error("Error accessing %s" % (lane.uri,))
                # bare raise preserves the original traceback
                raise

    def _add_flowcell_details(self):
        """Import flowcell details for every flowcell found in the model."""
        template = loader.get_template('aws_flowcell.sparql')

        parser = RDF.Parser(name='rdfa')
        for r in self.execute_query(template, Context()):
            flowcell = r['flowcell']
            try:
                parser.parse_into_model(self.model, flowcell.uri)
            except RDF.RedlandError:
                LOGGER.error("Error accessing %s" % (str(flowcell)))
                # bare raise preserves the original traceback
                raise

    def find_best_match(self, filename):
        """Search through potential filename matching patterns

        Returns the single matching view, None if nothing matched, and
        raises ModelException if more than one pattern matched.
        """
        if self.__view_map is None:
            self.__view_map = self._get_filename_view_map()

        results = []
        for pattern, view in self.__view_map.items():
            if re.match(pattern, filename):
                results.append(view)

        if len(results) > 1:
            msg = "%s matched multiple views %s" % (
                filename,
                [str(x) for x in results])
            raise ModelException(msg)
        elif len(results) == 1:
            return results[0]
        else:
            return None

    def _get_filename_view_map(self):
        """Query our model for filename patterns

        return a dictionary of regular expression strings to view names
        """
        filename_query = RDF.Statement(
            None, dafTermOntology['filename_re'], None)

        patterns = {}
        for s in self.model.find_statements(filename_query):
            view_name = s.subject
            literal_re = s.object.literal_value['string']
            LOGGER.debug("Found: %s" % (literal_re,))
            try:
                re.compile(literal_re)
            except re.error:
                LOGGER.error("Unable to compile: %s" % (literal_re,))
                # skip broken patterns instead of storing them;
                # they would crash find_best_match later
                continue
            patterns[literal_re] = view_name
        return patterns

    def make_submission_name(self, analysis_dir):
        """Derive a submission name from the last path component."""
        analysis_dir = os.path.normpath(analysis_dir)
        analysis_dir_name = os.path.split(analysis_dir)[1]
        if len(analysis_dir_name) == 0:
            raise RuntimeError(
                "Submission dir name too short: {0}".format(analysis_dir))
        return analysis_dir_name

    def get_submission_node(self, analysis_dir):
        """Convert a submission directory name to a submission node
        """
        submission_name = self.make_submission_name(analysis_dir)
        return self.submissionSetNS[submission_name]

    def _get_library_attribute(self, libNode, attribute):
        """Return the value(s) of a library attribute, or None if absent.

        attribute may be an RDF.Node or a string name in libraryOntology.
        """
        if not isinstance(attribute, RDF.Node):
            attribute = libraryOntology[attribute]

        targets = list(self.model.get_targets(libNode, attribute))
        if len(targets) > 0:
            return self._format_library_attribute(targets)
        # NOTE(review): the original contained unreachable code here that
        # re-fetched library details and retried the lookup; it was dead
        # and has been removed without changing behavior.
        return None

    def _format_library_attribute(self, targets):
        """Collapse a target list to None, a single value, or a list."""
        if len(targets) == 0:
            return None
        elif len(targets) == 1:
            return fromTypedNode(targets[0])
        elif len(targets) > 1:
            return [fromTypedNode(t) for t in targets]

    def _is_paired(self, libNode):
        """Determine if a library is paired end"""
        library_type = self._get_library_attribute(libNode, 'library_type')
        if library_type is None:
            errmsg = "%s doesn't have a library type"
            raise ModelException(errmsg % (str(libNode),))

        single = ['CSHL (lacking last nt)',
                  'Single End (non-multiplexed)',
                  'Small RNA (non-multiplexed)',]
        paired = ['Barcoded Illumina',
                  'Multiplexing',
                  'NEBNext Multiplexed',
                  'NEBNext Small RNA',
                  'Nextera',
                  'Paired End (non-multiplexed)',
                  'Dual Index Illumina',]
        if library_type in single:
            return False
        elif library_type in paired:
            return True
        else:
            raise MetadataLookupException(
                "Unrecognized library type %s for %s" % \
                (library_type, str(libNode)))

    def execute_query(self, template, context):
        """Execute the query, returning the results

        Renders the SPARQL template with context, runs it against our
        model, and returns a list of {binding: python value} dicts.
        """
        formatted_query = template.render(context)
        LOGGER.debug(formatted_query)
        query = RDF.SPARQLQuery(str(formatted_query))
        rdfstream = query.execute(self.model)
        results = []
        for record in rdfstream:
            d = {}
            for key, value in record.items():
                d[key] = fromTypedNode(value)
            results.append(d)
        return results
425
426
def list_submissions(model):
    """Return generator of submissions in this model.
    """
    query_body = """
      PREFIX subns: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>

      select distinct ?submission
      where { ?submission subns:has_submission ?library_dir }
    """
    # run the query and yield each submission name, stripping a
    # trailing URI separator when present
    for row in RDF.SPARQLQuery(query_body).execute(model):
        name = strip_namespace(submissionLog, row['submission'])
        if name[-1] in ('#', '/', '?'):
            name = name[:-1]
        yield name