Port submission.py to rdflib
[htsworkflow.git] / htsworkflow / submission / submission.py
1 """Common submission elements
2 """
3 import logging
4 import os
5 import re
6
7 from six.moves.urllib.error import HTTPError
8
9 from rdflib import Graph, Literal, Namespace, URIRef
10 from rdflib.namespace import RDF, RDFS
11
12 from htsworkflow.util.rdfhelp import (
13      dump_model,
14      strip_namespace,
15 )
16 from htsworkflow.util.rdfns import (
17     dafTermOntology,
18     libraryOntology,
19     submissionLog,
20     submissionOntology,
21 )
22 from htsworkflow.util.hashfile import make_md5sum
23 from htsworkflow.submission.fastqname import FastqName
24 from htsworkflow.submission.daf import \
25      MetadataLookupException, \
26      ModelException, \
27      get_submission_uri
28 from htsworkflow.util import opener
29
30 from django.template import Context, Template, loader
31
32 LOGGER = logging.getLogger(__name__)
33
class Submission(object):
    """Common submission elements.

    Holds an rdflib Graph describing one submission set and the
    namespaces used to mint library/flowcell/submission URIs.
    """
    def __init__(self, name, model, host):
        # Short name of this submission; also keys the submission URI.
        self.name = name
        # rdflib Graph that accumulates submission statements.
        self.model = model

        # URI for this submission set, plus a '#'-suffixed namespace
        # for minting nodes inside it.
        self.submissionSet = get_submission_uri(self.name)
        self.submissionSetNS = Namespace(str(self.submissionSet) + '#')
        # Per-host namespaces for library and flowcell pages.
        self.libraryNS = Namespace('{0}/library/'.format(host))
        self.flowcellNS = Namespace('{0}/flowcell/'.format(host))

        # Lazy cache: filename-pattern -> view node, built on first
        # use by find_best_match().
        self.__view_map = None
45
46     def scan_submission_dirs(self, result_map):
47         """Examine files in our result directory
48         """
49         for lib_id, result_dir in result_map.items():
50             LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
51             try:
52                 self.import_analysis_dir(result_dir, lib_id)
53             except MetadataLookupException as e:
54                 LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))
55
56     def import_analysis_dir(self, analysis_dir, library_id):
57         """Import a submission directories and update our model as needed
58         """
59         #attributes = get_filename_attribute_map(paired)
60         libNode = self.libraryNS[library_id + "/"]
61
62         self._add_library_details_to_model(libNode)
63
64         submission_files = os.listdir(analysis_dir)
65         for filename in submission_files:
66             pathname = os.path.abspath(os.path.join(analysis_dir, filename))
67             self.construct_file_attributes(analysis_dir, libNode, pathname)
68
69     def analysis_nodes(self, result_map):
70         """Return an iterable of analysis nodes
71         """
72         for result_dir in result_map.values():
73             an_analysis = self.get_submission_node(result_dir)
74             yield an_analysis
75
76     def construct_file_attributes(self, analysis_dir, libNode, pathname):
77         """Looking for the best extension
78         The 'best' is the longest match
79
80         :Args:
81         filename (str): the filename whose extention we are about to examine
82         """
83         path, filename = os.path.split(pathname)
84
85         LOGGER.debug("Searching for view")
86         file_type = self.find_best_match(filename)
87         if file_type is None:
88             LOGGER.warn("Unrecognized file: {0}".format(pathname))
89             return None
90         if str(file_type) == str(libraryOntology['ignore']):
91             return None
92
93         an_analysis_name = self.make_submission_name(analysis_dir)
94         an_analysis = self.get_submission_node(analysis_dir)
95         file_classifications = list(self.model.objects(file_type, RDF['type']))
96         if len(file_classifications) == 0:
97             errmsg = 'Could not find class for {0}'
98             LOGGER.warning(errmsg.format(str(file_type)))
99             return
100         file_classification = file_classifications[0]
101
102         self.model.add((self.submissionSetNS[''],
103                         submissionOntology['has_submission'],
104                         an_analysis))
105         self.model.add((an_analysis,
106                         submissionOntology['name'],
107                         Literal(an_analysis_name)))
108         self.model.add((an_analysis,
109                         RDF['type'],
110                         submissionOntology['submission']))
111         self.model.add((an_analysis,
112                         submissionOntology['library'],
113                         libNode))
114
115         LOGGER.debug("Adding statements to {0}".format(str(an_analysis)))
116         # add track specific information
117         self.model.add((an_analysis,
118                         dafTermOntology['paired'],
119                         Literal(self._is_paired(libNode))))
120         self.model.add((an_analysis,
121                         dafTermOntology['submission'],
122                         an_analysis))
123
124         # add file specific information
125         fileNode = self.make_file_node(pathname, an_analysis)
126         self.add_md5s(filename, fileNode, analysis_dir)
127         self.add_file_size(filename, fileNode, analysis_dir)
128         self.add_read_length(filename, fileNode, analysis_dir)
129         self.add_fastq_metadata(filename, fileNode)
130         self.add_label(file_type, fileNode, libNode)
131         self.model.add((fileNode,
132                         RDF['type'],
133                         file_type))
134         self.model.add((fileNode,
135                         libraryOntology['library'],
136                         libNode))
137
138         LOGGER.debug("Done.")
139
140     def make_file_node(self, pathname, submissionNode):
141         """Create file node and attach it to its submission.
142         """
143         # add file specific information
144         path, filename = os.path.split(pathname)
145         pathname = os.path.abspath(pathname)
146         fileNode = URIRef('file://'+ pathname)
147         self.model.add((submissionNode,
148                         dafTermOntology['has_file'],
149                         fileNode))
150         self.model.add((fileNode,
151                         dafTermOntology['filename'],
152                         Literal(filename)))
153         self.model.add((fileNode,
154                         dafTermOntology['relative_path'],
155                         Literal(os.path.relpath(pathname))))
156         return fileNode
157
158     def add_md5s(self, filename, fileNode, analysis_dir):
159         LOGGER.debug("Updating file md5sum")
160         submission_pathname = os.path.join(analysis_dir, filename)
161         md5 = make_md5sum(submission_pathname)
162         if md5 is None:
163             errmsg = "Unable to produce md5sum for {0}"
164             LOGGER.warning(errmsg.format(submission_pathname))
165         else:
166             self.model.add((fileNode, dafTermOntology['md5sum'], Literal(md5)))
167
168     def add_file_size(self, filename, fileNode, analysis_dir):
169         submission_pathname = os.path.join(analysis_dir, filename)
170         file_size = os.stat(submission_pathname).st_size
171         self.model.add((fileNode, dafTermOntology['file_size'], Literal(file_size)))
172         LOGGER.debug("Updating file size: %d", file_size)
173
174     def add_read_length(self, filename, fileNode, analysis_dir):
175         submission_pathname = os.path.join(analysis_dir, filename)
176         stream = opener.autoopen(submission_pathname, 'rt')
177         header = stream.readline().strip()
178         sequence = stream.readline().strip()
179         read_length = len(sequence)
180         self.model.add((fileNode,
181                         libraryOntology['read_length'],
182                         Literal(read_length)))
183         LOGGER.debug("Updating read length: %d", read_length)
184
185     def add_fastq_metadata(self, filename, fileNode):
186         # How should I detect if this is actually a fastq file?
187         try:
188             fqname = FastqName(filename=filename)
189         except ValueError:
190             # currently its just ignore it if the fastq name parser fails
191             return
192
193         terms = [('flowcell', libraryOntology['flowcell_id']),
194                  ('lib_id', libraryOntology['library_id']),
195                  ('lane', libraryOntology['lane_number']),
196                  ('read', libraryOntology['read']),
197         ]
198         for file_term, model_term in terms:
199             value = fqname.get(file_term)
200             if value is not None:
201                 s = (fileNode, model_term, value.toPython())
202                 self.model.add(s)
203
204         if 'flowcell' in fqname:
205             value = self.flowcellNS[fqname['flowcell'] + '/']
206             s = (fileNode, libraryOntology['flowcell'], value)
207             self.model.add(s)
208
209     def add_label(self, file_type, file_node, lib_node):
210         """Add rdfs:label to a file node
211         """
212         #template_term = libraryOntology['label_template']
213         template_term = libraryOntology['label_template']
214         label_templates = list(self.model.objects(file_type, template_term))
215         if len(label_templates) > 0:
216             label_template = label_templates[0]
217             template = loader.get_template('submission_view_rdfs_label_metadata.sparql')
218             context = Context({
219                 'library': str(lib_node.uri),
220                 })
221             for r in self.execute_query(template, context):
222                 context = Context(r)
223                 label = Template(label_template).render(context)
224                 s = (file_node, rdfsNS['label'], unicode(label))
225                 self.model.add(s)
226
227     def _add_library_details_to_model(self, libNode):
228         # attributes that can have multiple values
229         set_attributes = set((libraryOntology['has_lane'],
230                               libraryOntology['has_mappings'],
231                               dafTermOntology['has_file']))
232         tmpmodel = Graph()
233         try:
234             tmpmodel.parse(source=libNode, format='rdfa')
235         except HTTPError as e:
236             LOGGER.error(e)
237             return
238
239         LOGGER.debug("Scanning %s", str(libNode))
240         toadd = []
241         for stmt in tmpmodel:
242             s, p, o = stmt
243             # always add "collections"
244             if p in set_attributes:
245                 toadd.append(stmt)
246                 continue
247             # don't override things we already have in the model
248             targets = list(self.model.get_targets(s, p))
249             if len(targets) == 0:
250                 toadd.append(stmt)
251
252         for stmt in toadd:
253             self.model.add(stmt)
254
255         self._add_lane_details(libNode)
256         self._add_flowcell_details()
257
258     def _add_lane_details(self, libNode):
259         """Import lane details
260         """
261         query = (libNode, libraryOntology['has_lane'], None)
262         lanes = []
263         for lane_stmt in self.model.triples(query):
264             lanes.append(lane_stmt[2])
265
266         for lane in lanes:
267             LOGGER.debug("Importing %s" % (lane,))
268             self.model.parse(source=lane, format='rdfa')
269
270
271     def _add_flowcell_details(self):
272         template = loader.get_template('aws_flowcell.sparql')
273
274         for r in self.execute_query(template, Context()):
275             flowcell = r['flowcell']
276             self.model.parse(source=flowcell, format='rdfa')
277
278
279     def find_best_match(self, filename):
280         """Search through potential filename matching patterns
281         """
282         if self.__view_map is None:
283             self.__view_map = self._get_filename_view_map()
284
285         results = []
286         for pattern, view in self.__view_map.items():
287             if re.match(pattern, filename):
288                 results.append(view)
289
290         if len(results) > 1:
291             msg = "%s matched multiple views %s" % (
292                 filename,
293                 [str(x) for x in results])
294             raise ModelException(msg)
295         elif len(results) == 1:
296             return results[0]
297         else:
298             return None
299
300     def _get_filename_view_map(self):
301         """Query our model for filename patterns
302
303         return a dictionary of compiled regular expressions to view names
304         """
305         filename_query = (None, dafTermOntology['filename_re'], None)
306
307         patterns = {}
308         for s in self.model.triples(filename_query):
309             view_name = s[0]
310             literal_re = s[2].value
311             LOGGER.debug("Found: %s" % (literal_re,))
312             try:
313                 filename_re = re.compile(literal_re)
314             except re.error as e:
315                 LOGGER.error("Unable to compile: %s" % (literal_re,))
316             patterns[literal_re] = view_name
317         return patterns
318
319     def make_submission_name(self, analysis_dir):
320         analysis_dir = os.path.normpath(analysis_dir)
321         analysis_dir_name = os.path.split(analysis_dir)[1]
322         if len(analysis_dir_name) == 0:
323             raise RuntimeError(
324                 "Submission dir name too short: {0}".format(analysis_dir))
325         return analysis_dir_name
326
327     def get_submission_node(self, analysis_dir):
328         """Convert a submission directory name to a submission node
329         """
330         submission_name = self.make_submission_name(analysis_dir)
331         return self.submissionSetNS[submission_name]
332
333     def _get_library_attribute(self, libNode, attribute):
334         if not isinstance(libNode, URIRef):
335             raise ValueError("libNode must be a URIRef")
336         if not isinstance(attribute, URIRef):
337             attribute = libraryOntology[attribute]
338
339         targets = list(self.model.objects(libNode, attribute))
340         if len(targets) > 0:
341             return self._format_library_attribute(targets)
342         else:
343             return None
344
345         #targets = self._search_same_as(libNode, attribute)
346         #if targets is not None:
347         #    return self._format_library_attribute(targets)
348
349         # we don't know anything about this attribute
350         self._add_library_details_to_model(libNode)
351
352         targets = list(self.model.objects(libNode, attribute))
353         if len(targets) > 0:
354             return self._format_library_attribute(targets)
355
356         return None
357
358     def _format_library_attribute(self, targets):
359         if len(targets) == 0:
360             return None
361         elif len(targets) == 1:
362             return targets[0].toPython()
363         elif len(targets) > 1:
364             return [t.toPython() for t in targets]
365
366     def _is_paired(self, libNode):
367         """Determine if a library is paired end"""
368         library_type = self._get_library_attribute(libNode, 'library_type')
369         if library_type is None:
370             errmsg = "%s doesn't have a library type"
371             raise ModelException(errmsg % (str(libNode),))
372
373         single = ['CSHL (lacking last nt)',
374                   'Single End (non-multiplexed)',
375                   'Small RNA (non-multiplexed)',]
376         paired = ['Barcoded Illumina',
377                   'Multiplexing',
378                   'NEBNext Multiplexed',
379                   'NEBNext Small RNA',
380                   'Nextera',
381                   'Paired End (non-multiplexed)',
382                   'Dual Index Illumina',]
383         if library_type in single:
384             return False
385         elif library_type in paired:
386             return True
387         else:
388             raise MetadataLookupException(
389                 "Unrecognized library type %s for %s" % \
390                 (library_type, str(libNode)))
391
392     def execute_query(self, template, context):
393         """Execute the query, returning the results
394         """
395         formatted_query = template.render(context)
396         LOGGER.debug(formatted_query)
397         rdfstream = self.model.query(str(formatted_query))
398         results = []
399         for record in rdfstream:
400             d = {}
401             for key, value in record.items():
402                 d[key] = value.toPython()
403             results.append(d)
404         return results
405
406
def list_submissions(model):
    """Return a list of submission names found in this model.

    (Docstring said "generator", but a list has always been
    returned; an unused inline SPARQL string was also removed.)
    """
    q = (None, submissionOntology['has_submission'], None)
    submissions = set()
    for statement in model.triples(q):
        # strip the submission-log namespace, then any trailing
        # separator, leaving the bare submission name
        s = strip_namespace(submissionLog, statement[0])
        if s[-1] in ['#', '/', '?']:
            s = s[:-1]
        submissions.add(s)

    return list(submissions)