make a more informative log message
[htsworkflow.git] / htsworkflow / submission / submission.py
1 """Common submission elements
2 """
3 import logging
4 import os
5 import re
6
7 import RDF
8
9 from htsworkflow.util.rdfhelp import \
10      blankOrUri, \
11      dump_model, \
12      fromTypedNode, \
13      get_model, \
14      strip_namespace, \
15      toTypedNode
16 from htsworkflow.util.rdfns import *
17 from htsworkflow.util.hashfile import make_md5sum
18 from htsworkflow.submission.fastqname import FastqName
19 from htsworkflow.submission.daf import \
20      MetadataLookupException, \
21      ModelException, \
22      get_submission_uri
23 from htsworkflow.util import opener
24
25 from django.conf import settings
26 from django.template import Context, Template, loader
27
28 LOGGER = logging.getLogger(__name__)
29
class Submission(object):
    """Accumulate RDF statements describing one submission of results.

    Callers typically invoke scan_submission_dirs() with a mapping of
    library ids to result directories; metadata about each file found
    is added to the supplied RDF model.
    """
    def __init__(self, name, model, host):
        """Initialize submission state.

        :Args:
        name (str): submission name, used to build the submission set URI
        model: Redland RDF model that statements are accumulated into
        host (str): base URL of the htsworkflow server, used to build the
            library and flowcell namespaces
        """
        self.name = name
        self.model = model

        # namespace under which this submission's nodes are created
        self.submissionSet = get_submission_uri(self.name)
        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '#')
        # namespaces for library and flowcell pages on the htsworkflow host
        self.libraryNS = RDF.NS('{0}/library/'.format(host))
        self.flowcellNS = RDF.NS('{0}/flowcell/'.format(host))

        # lazily-built map of filename regex -> view term;
        # populated on first use by find_best_match()
        self.__view_map = None
42     def scan_submission_dirs(self, result_map):
43         """Examine files in our result directory
44         """
45         for lib_id, result_dir in result_map.items():
46             LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
47             try:
48                 self.import_analysis_dir(result_dir, lib_id)
49             except MetadataLookupException as e:
50                 LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))
51
52     def import_analysis_dir(self, analysis_dir, library_id):
53         """Import a submission directories and update our model as needed
54         """
55         #attributes = get_filename_attribute_map(paired)
56         libNode = self.libraryNS[library_id + "/"]
57
58         self._add_library_details_to_model(libNode)
59
60         submission_files = os.listdir(analysis_dir)
61         for filename in submission_files:
62             pathname = os.path.abspath(os.path.join(analysis_dir, filename))
63             self.construct_file_attributes(analysis_dir, libNode, pathname)
64
65     def analysis_nodes(self, result_map):
66         """Return an iterable of analysis nodes
67         """
68         for result_dir in result_map.values():
69             an_analysis = self.get_submission_node(result_dir)
70             yield an_analysis
71
    def construct_file_attributes(self, analysis_dir, libNode, pathname):
        """Add statements describing one submitted file to the model.

        The file's view (file type) is chosen by find_best_match();
        unrecognized files and files whose view is 'ignore' are skipped.

        :Args:
        analysis_dir (str): directory the file was found in
        libNode: RDF node for the library the file belongs to
        pathname (str): absolute path of the file being described
        """
        path, filename = os.path.split(pathname)

        LOGGER.debug("Searching for view")
        file_type = self.find_best_match(filename)
        if file_type is None:
            LOGGER.warn("Unrecognized file: {0}".format(pathname))
            return None
        # views marked 'ignore' are deliberately excluded from the submission
        if str(file_type) == str(libraryOntology['ignore']):
            return None

        an_analysis_name = self.make_submission_name(analysis_dir)
        an_analysis = self.get_submission_node(analysis_dir)
        # NOTE(review): an_analysis_uri is computed but never used below
        an_analysis_uri = str(an_analysis.uri)
        # the view term must carry an rdf:type in the model, otherwise we
        # don't know how to classify the file
        file_classification = self.model.get_target(file_type,
                                                    rdfNS['type'])
        if file_classification is None:
            errmsg = 'Could not find class for {0}'
            LOGGER.warning(errmsg.format(str(file_type)))
            return

        # link this analysis into the submission set and describe it
        self.model.add_statement(
            RDF.Statement(self.submissionSetNS[''],
                          submissionOntology['has_submission'],
                          an_analysis))
        self.model.add_statement(RDF.Statement(an_analysis,
                                               submissionOntology['name'],
                                               toTypedNode(an_analysis_name)))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          rdfNS['type'],
                          submissionOntology['submission']))
        self.model.add_statement(RDF.Statement(an_analysis,
                                               submissionOntology['library'],
                                               libNode))

        LOGGER.debug("Adding statements to {0}".format(str(an_analysis)))
        # add track specific information
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          dafTermOntology['paired'],
                          toTypedNode(self._is_paired(libNode))))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          dafTermOntology['submission'],
                          an_analysis))

        # add file specific information: checksum, size, read length,
        # fastq-derived metadata, label, type and owning library
        fileNode = self.make_file_node(pathname, an_analysis)
        self.add_md5s(filename, fileNode, analysis_dir)
        self.add_file_size(filename, fileNode, analysis_dir)
        self.add_read_length(filename, fileNode, analysis_dir)
        self.add_fastq_metadata(filename, fileNode)
        self.add_label(file_type, fileNode, libNode)
        self.model.add_statement(
            RDF.Statement(fileNode,
                          rdfNS['type'],
                          file_type))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          libraryOntology['library'],
                          libNode))

        LOGGER.debug("Done.")
142
143     def make_file_node(self, pathname, submissionNode):
144         """Create file node and attach it to its submission.
145         """
146         # add file specific information
147         path, filename = os.path.split(pathname)
148         pathname = os.path.abspath(pathname)
149         fileNode = RDF.Node(RDF.Uri('file://'+ pathname))
150         self.model.add_statement(
151             RDF.Statement(submissionNode,
152                           dafTermOntology['has_file'],
153                           fileNode))
154         self.model.add_statement(
155             RDF.Statement(fileNode,
156                           dafTermOntology['filename'],
157                           filename))
158         self.model.add_statement(
159             RDF.Statement(fileNode,
160                           dafTermOntology['relative_path'],
161                           os.path.relpath(pathname)))
162         return fileNode
163
164     def add_md5s(self, filename, fileNode, analysis_dir):
165         LOGGER.debug("Updating file md5sum")
166         submission_pathname = os.path.join(analysis_dir, filename)
167         md5 = make_md5sum(submission_pathname)
168         if md5 is None:
169             errmsg = "Unable to produce md5sum for {0}"
170             LOGGER.warning(errmsg.format(submission_pathname))
171         else:
172             self.model.add_statement(
173                 RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))
174
175     def add_file_size(self, filename, fileNode, analysis_dir):
176         submission_pathname = os.path.join(analysis_dir, filename)
177         file_size = os.stat(submission_pathname).st_size
178         self.model.add_statement(
179             RDF.Statement(fileNode, dafTermOntology['file_size'], toTypedNode(file_size)))
180         LOGGER.debug("Updating file size: %d", file_size)
181
182     def add_read_length(self, filename, fileNode, analysis_dir):
183         submission_pathname = os.path.join(analysis_dir, filename)
184         stream = opener.autoopen(submission_pathname, 'rt')
185         header = stream.readline().strip()
186         sequence = stream.readline().strip()
187         read_length = len(sequence)
188         self.model.add_statement(
189             RDF.Statement(fileNode,
190                           libraryOntology['read_length'],
191                           toTypedNode(read_length))
192         )
193         LOGGER.debug("Updating read length: %d", read_length)
194
    def add_fastq_metadata(self, filename, fileNode):
        """Attach metadata parsed from a fastq-style filename to fileNode.

        Records flowcell id, library id, lane number and read number when
        the filename parses; files whose names don't parse are skipped.
        """
        # How should I detect if this is actually a fastq file?
        try:
            fqname = FastqName(filename=filename)
        except ValueError:
            # currently its just ignore it if the fastq name parser fails
            return

        # map fastq filename fields to library ontology terms
        terms = [('flowcell', libraryOntology['flowcell_id']),
                 ('lib_id', libraryOntology['library_id']),
                 ('lane', libraryOntology['lane_number']),
                 ('read', libraryOntology['read']),
        ]
        for file_term, model_term in terms:
            value = fqname.get(file_term)
            if value is not None:
                s = RDF.Statement(fileNode, model_term, toTypedNode(value))
                self.model.append(s)

        # also link the file to the flowcell's resource node
        if 'flowcell' in fqname:
            value = self.flowcellNS[fqname['flowcell'] + '/']
            s = RDF.Statement(fileNode, libraryOntology['flowcell'], value)
            self.model.append(s)
218
    def add_label(self, file_type, file_node, lib_node):
        """Add rdfs:label to a file node

        If the file's view defines a label_template, render it with
        metadata looked up for the library and attach the result as the
        file node's rdfs:label.
        """
        template_term = libraryOntology['label_template']
        label_template = self.model.get_target(file_type, template_term)
        if label_template:
            # query the model for metadata about this library, then feed
            # each result row into the label template
            template = loader.get_template('submission_view_rdfs_label_metadata.sparql')
            context = Context({
                'library': str(lib_node.uri),
                })
            for r in self.execute_query(template, context):
                context = Context(r)
                label = Template(label_template).render(context)
                # NOTE(review): ``unicode`` implies this file targets Python 2
                s = RDF.Statement(file_node, rdfsNS['label'], unicode(label))
                self.model.append(s)
235
    def _add_library_details_to_model(self, libNode):
        """Parse the library's RDFa page into our model.

        Statements already present in the model are kept in preference to
        freshly parsed ones, except for "collection" predicates that may
        legitimately carry several values. Also pulls in lane and
        flowcell details afterwards.
        """
        # attributes that can have multiple values
        set_attributes = set((libraryOntology['has_lane'],
                              libraryOntology['has_mappings'],
                              dafTermOntology['has_file']))
        parser = RDF.Parser(name='rdfa')
        try:
            new_statements = parser.parse_as_stream(libNode.uri)
        except RDF.RedlandError as e:
            # library page unavailable or unparsable: leave model untouched
            LOGGER.error(e)
            return
        LOGGER.debug("Scanning %s", str(libNode.uri))
        toadd = []
        for s in new_statements:
            # always add "collections"
            if s.predicate in set_attributes:
                toadd.append(s)
                continue
            # don't override things we already have in the model
            targets = list(self.model.get_targets(s.subject, s.predicate))
            if len(targets) == 0:
                toadd.append(s)

        # append after the scan so the "already in model" check above
        # isn't affected by statements added during this pass
        for s in toadd:
            self.model.append(s)

        self._add_lane_details(libNode)
        self._add_flowcell_details()
264
265     def _add_lane_details(self, libNode):
266         """Import lane details
267         """
268         query = RDF.Statement(libNode, libraryOntology['has_lane'], None)
269         lanes = []
270         for lane_stmt in self.model.find_statements(query):
271             lanes.append(lane_stmt.object)
272
273         parser = RDF.Parser(name='rdfa')
274         for lane in lanes:
275             LOGGER.debug("Importing %s" % (lane.uri,))
276             try:
277                 parser.parse_into_model(self.model, lane.uri)
278             except RDF.RedlandError as e:
279                 LOGGER.error("Error accessing %s" % (lane.uri,))
280                 raise e
281
282
283     def _add_flowcell_details(self):
284         template = loader.get_template('aws_flowcell.sparql')
285         results = self.execute_query(template, Context())
286
287         parser = RDF.Parser(name='rdfa')
288         for r in self.execute_query(template, Context()):
289             flowcell = r['flowcell']
290             try:
291                 parser.parse_into_model(self.model, flowcell.uri)
292             except RDF.RedlandError as e:
293                 LOGGER.error("Error accessing %s" % (str(flowcell)))
294                 raise e
295
296
297     def find_best_match(self, filename):
298         """Search through potential filename matching patterns
299         """
300         if self.__view_map is None:
301             self.__view_map = self._get_filename_view_map()
302
303         results = []
304         for pattern, view in self.__view_map.items():
305             if re.match(pattern, filename):
306                 results.append(view)
307
308         if len(results) > 1:
309             msg = "%s matched multiple views %s" % (
310                 filename,
311                 [str(x) for x in results])
312             raise ModelException(msg)
313         elif len(results) == 1:
314             return results[0]
315         else:
316             return None
317
318     def _get_filename_view_map(self):
319         """Query our model for filename patterns
320
321         return a dictionary of compiled regular expressions to view names
322         """
323         filename_query = RDF.Statement(
324             None, dafTermOntology['filename_re'], None)
325
326         patterns = {}
327         for s in self.model.find_statements(filename_query):
328             view_name = s.subject
329             literal_re = s.object.literal_value['string']
330             LOGGER.debug("Found: %s" % (literal_re,))
331             try:
332                 filename_re = re.compile(literal_re)
333             except re.error as e:
334                 LOGGER.error("Unable to compile: %s" % (literal_re,))
335             patterns[literal_re] = view_name
336         return patterns
337
338     def make_submission_name(self, analysis_dir):
339         analysis_dir = os.path.normpath(analysis_dir)
340         analysis_dir_name = os.path.split(analysis_dir)[1]
341         if len(analysis_dir_name) == 0:
342             raise RuntimeError(
343                 "Submission dir name too short: {0}".format(analysis_dir))
344         return analysis_dir_name
345
346     def get_submission_node(self, analysis_dir):
347         """Convert a submission directory name to a submission node
348         """
349         submission_name = self.make_submission_name(analysis_dir)
350         return self.submissionSetNS[submission_name]
351
352     def _get_library_attribute(self, libNode, attribute):
353         if not isinstance(attribute, RDF.Node):
354             attribute = libraryOntology[attribute]
355
356         targets = list(self.model.get_targets(libNode, attribute))
357         if len(targets) > 0:
358             return self._format_library_attribute(targets)
359         else:
360             return None
361
362         #targets = self._search_same_as(libNode, attribute)
363         #if targets is not None:
364         #    return self._format_library_attribute(targets)
365
366         # we don't know anything about this attribute
367         self._add_library_details_to_model(libNode)
368
369         targets = list(self.model.get_targets(libNode, attribute))
370         if len(targets) > 0:
371             return self._format_library_attribute(targets)
372
373         return None
374
375     def _format_library_attribute(self, targets):
376         if len(targets) == 0:
377             return None
378         elif len(targets) == 1:
379             return fromTypedNode(targets[0])
380         elif len(targets) > 1:
381             return [fromTypedNode(t) for t in targets]
382
383     def _is_paired(self, libNode):
384         """Determine if a library is paired end"""
385         library_type = self._get_library_attribute(libNode, 'library_type')
386         if library_type is None:
387             errmsg = "%s doesn't have a library type"
388             raise ModelException(errmsg % (str(libNode),))
389
390         single = ['CSHL (lacking last nt)',
391                   'Single End (non-multiplexed)',
392                   'Small RNA (non-multiplexed)',]
393         paired = ['Barcoded Illumina',
394                   'Multiplexing',
395                   'NEBNext Multiplexed',
396                   'NEBNext Small RNA',
397                   'Nextera',
398                   'Paired End (non-multiplexed)',
399                   'Dual Index Illumina',]
400         if library_type in single:
401             return False
402         elif library_type in paired:
403             return True
404         else:
405             raise MetadataLookupException(
406                 "Unrecognized library type %s for %s" % \
407                 (library_type, str(libNode)))
408
409     def execute_query(self, template, context):
410         """Execute the query, returning the results
411         """
412         formatted_query = template.render(context)
413         LOGGER.debug(formatted_query)
414         query = RDF.SPARQLQuery(str(formatted_query))
415         rdfstream = query.execute(self.model)
416         results = []
417         for record in rdfstream:
418             d = {}
419             for key, value in record.items():
420                 d[key] = fromTypedNode(value)
421             results.append(d)
422         return results
423
424
def list_submissions(model):
    """Return generator of submissions in this model.
    """
    query_body = """
      PREFIX subns: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>

      select distinct ?submission
      where { ?submission subns:has_submission ?library_dir }
    """
    query = RDF.SPARQLQuery(query_body)
    for row in query.execute(model):
        name = strip_namespace(submissionLog, row['submission'])
        # drop a trailing fragment/path/query separator from the name
        if name[-1] in ['#', '/', '?']:
            name = name[:-1]
        yield name