# Initial port to python3
# htsworkflow/submission/submission.py
1 """Common submission elements
2 """
3 import logging
4 import os
5 import re
6
7 import RDF
8
9 from htsworkflow.util.rdfhelp import \
10      blankOrUri, \
11      dump_model, \
12      fromTypedNode, \
13      get_model, \
14      stripNamespace, \
15      toTypedNode
16 from htsworkflow.util.rdfns import *
17 from htsworkflow.util.hashfile import make_md5sum
18 from htsworkflow.submission.fastqname import FastqName
19 from htsworkflow.submission.daf import \
20      MetadataLookupException, \
21      ModelException, \
22      get_submission_uri
23
24 LOGGER = logging.getLogger(__name__)
25
class Submission(object):
    """Common logic for building and querying a submission RDF model.

    Scans result directories, records file/library metadata as RDF
    statements in ``self.model``, and provides SPARQL query helpers.
    """
    def __init__(self, name, model, host):
        """Initialize submission state.

        :Args:
        name (str): submission name, used to build the submission URI
        model: Redland RDF model that statements are added to
        host (str): base URL used to construct library URIs
        """
        self.name = name
        self.model = model

        self.submissionSet = get_submission_uri(self.name)
        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '#')
        self.libraryNS = RDF.NS('{0}/library/'.format(host))

        # lazily populated by find_best_match
        self.__view_map = None

    def scan_submission_dirs(self, result_map):
        """Examine files in our result directory

        Libraries whose metadata cannot be looked up are skipped
        with an error instead of aborting the whole scan.
        """
        for lib_id, result_dir in result_map.items():
            LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
            try:
                self.import_analysis_dir(result_dir, lib_id)
            except MetadataLookupException as e:
                LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))

    def import_analysis_dir(self, analysis_dir, library_id):
        """Import a submission directories and update our model as needed
        """
        #attributes = get_filename_attribute_map(paired)
        libNode = self.libraryNS[library_id + "/"]

        self._add_library_details_to_model(libNode)

        submission_files = os.listdir(analysis_dir)
        for filename in submission_files:
            pathname = os.path.abspath(os.path.join(analysis_dir, filename))
            self.construct_file_attributes(analysis_dir, libNode, pathname)

    def analysis_nodes(self, result_map):
        """Return an iterable of analysis nodes
        """
        for result_dir in result_map.values():
            yield self.get_submission_node(result_dir)

    def construct_file_attributes(self, analysis_dir, libNode, pathname):
        """Looking for the best extension
        The 'best' is the longest match

        :Args:
        filename (str): the filename whose extention we are about to examine
        """
        path, filename = os.path.split(pathname)

        LOGGER.debug("Searching for view")
        file_type = self.find_best_match(filename)
        if file_type is None:
            # LOGGER.warn is a deprecated alias; use warning
            LOGGER.warning("Unrecognized file: {0}".format(pathname))
            return None
        if str(file_type) == str(libraryOntology['ignore']):
            return None

        an_analysis_name = self.make_submission_name(analysis_dir)
        an_analysis = self.get_submission_node(analysis_dir)
        an_analysis_uri = str(an_analysis.uri)
        file_classification = self.model.get_target(file_type,
                                                    rdfNS['type'])
        if file_classification is None:
            errmsg = 'Could not find class for {0}'
            LOGGER.warning(errmsg.format(str(file_type)))
            return

        # attach this analysis to the submission set
        self.model.add_statement(
            RDF.Statement(self.submissionSetNS[''],
                          submissionOntology['has_submission'],
                          an_analysis))
        self.model.add_statement(RDF.Statement(an_analysis,
                                               submissionOntology['name'],
                                               toTypedNode(an_analysis_name)))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          rdfNS['type'],
                          submissionOntology['submission']))
        self.model.add_statement(RDF.Statement(an_analysis,
                                               submissionOntology['library'],
                                               libNode))

        LOGGER.debug("Adding statements to {0}".format(str(an_analysis)))
        # add track specific information
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          dafTermOntology['paired'],
                          toTypedNode(self._is_paired(libNode))))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          dafTermOntology['submission'],
                          an_analysis))

        # add file specific information
        fileNode = self.make_file_node(pathname, an_analysis)
        self.add_md5s(filename, fileNode, analysis_dir)
        self.add_fastq_metadata(filename, fileNode)
        self.model.add_statement(
            RDF.Statement(fileNode,
                          rdfNS['type'],
                          file_type))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          libraryOntology['library'],
                          libNode))

        LOGGER.debug("Done.")

    def make_file_node(self, pathname, submissionNode):
        """Create file node and attach it to its submission.

        Returns the new file node.
        """
        # add file specific information
        path, filename = os.path.split(pathname)
        pathname = os.path.abspath(pathname)
        fileNode = RDF.Node(RDF.Uri('file://'+ pathname))
        self.model.add_statement(
            RDF.Statement(submissionNode,
                          dafTermOntology['has_file'],
                          fileNode))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['filename'],
                          filename))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['relative_path'],
                          os.path.relpath(pathname)))
        return fileNode

    def add_md5s(self, filename, fileNode, analysis_dir):
        """Attach an md5sum statement to fileNode, logging on failure."""
        LOGGER.debug("Updating file md5sum")
        submission_pathname = os.path.join(analysis_dir, filename)
        md5 = make_md5sum(submission_pathname)
        if md5 is None:
            errmsg = "Unable to produce md5sum for {0}"
            LOGGER.warning(errmsg.format(submission_pathname))
        else:
            self.model.add_statement(
                RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))

    def add_fastq_metadata(self, filename, fileNode):
        """Record flowcell/library/lane/read metadata parsed from a
        fastq filename; silently skips files that don't parse.
        """
        # How should I detect if this is actually a fastq file?
        try:
            fqname = FastqName(filename=filename)
        except ValueError:
            # currently its just ignore it if the fastq name parser fails
            return

        terms = [('flowcell', libraryOntology['flowcell_id']),
                 ('lib_id', libraryOntology['library_id']),
                 ('lane', libraryOntology['lane_number']),
                 ('read', libraryOntology['read']),
                 ('cycle', libraryOntology['read_length'])]
        for file_term, model_term in terms:
            value = fqname.get(file_term)
            if value is not None:
                s = RDF.Statement(fileNode, model_term, toTypedNode(value))
                self.model.append(s)

    def _add_library_details_to_model(self, libNode):
        """Import RDFa statements about a library into our model.

        Collection-style attributes are always added; other statements
        are only added when the model has no value for them yet.
        """
        # attributes that can have multiple values
        set_attributes = set((libraryOntology['has_lane'],
                              libraryOntology['has_mappings'],
                              dafTermOntology['has_file']))
        parser = RDF.Parser(name='rdfa')
        try:
            new_statements = parser.parse_as_stream(libNode.uri)
        except RDF.RedlandError as e:
            LOGGER.error(e)
            return
        LOGGER.debug("Scanning %s", str(libNode.uri))
        toadd = []
        for s in new_statements:
            # always add "collections"
            if s.predicate in set_attributes:
                toadd.append(s)
                continue
            # don't override things we already have in the model
            targets = list(self.model.get_targets(s.subject, s.predicate))
            if len(targets) == 0:
                toadd.append(s)

        for s in toadd:
            self.model.append(s)

        self._add_lane_details(libNode)

    def _add_lane_details(self, libNode):
        """Import lane details
        """
        query = RDF.Statement(libNode, libraryOntology['has_lane'], None)
        lanes = []
        for lane_stmt in self.model.find_statements(query):
            lanes.append(lane_stmt.object)

        parser = RDF.Parser(name='rdfa')
        for lane in lanes:
            LOGGER.debug("Importing %s" % (lane.uri,))
            try:
                parser.parse_into_model(self.model, lane.uri)
            except RDF.RedlandError as e:
                LOGGER.error("Error accessing %s" % (lane.uri,))
                raise e

    def find_best_match(self, filename):
        """Search through potential filename matching patterns

        Returns the single matching view, None when nothing matches,
        and raises ModelException when several views match.
        """
        if self.__view_map is None:
            self.__view_map = self._get_filename_view_map()

        results = []
        # keys are pre-compiled regular expressions
        for pattern, view in self.__view_map.items():
            if pattern.match(filename):
                results.append(view)

        if len(results) > 1:
            msg = "%s matched multiple views %s" % (
                filename,
                [str(x) for x in results])
            raise ModelException(msg)
        elif len(results) == 1:
            return results[0]
        else:
            return None

    def _get_filename_view_map(self):
        """Query our model for filename patterns

        return a dictionary of compiled regular expressions to view names
        """
        filename_query = RDF.Statement(
            None, dafTermOntology['filename_re'], None)

        patterns = {}
        for s in self.model.find_statements(filename_query):
            view_name = s.subject
            literal_re = s.object.literal_value['string']
            LOGGER.debug("Found: %s" % (literal_re,))
            try:
                filename_re = re.compile(literal_re)
            except re.error:
                # skip patterns that won't compile instead of storing
                # an unusable entry
                LOGGER.error("Unable to compile: %s" % (literal_re,))
                continue
            patterns[filename_re] = view_name
        return patterns

    def make_submission_name(self, analysis_dir):
        """Return the directory basename as the submission name.

        Raises RuntimeError for degenerate paths (e.g. '/').
        """
        analysis_dir = os.path.normpath(analysis_dir)
        analysis_dir_name = os.path.split(analysis_dir)[1]
        if len(analysis_dir_name) == 0:
            raise RuntimeError(
                "Submission dir name too short: {0}".format(analysis_dir))
        return analysis_dir_name

    def get_submission_node(self, analysis_dir):
        """Convert a submission directory name to a submission node
        """
        submission_name = self.make_submission_name(analysis_dir)
        return self.submissionSetNS[submission_name]

    def _get_library_attribute(self, libNode, attribute):
        """Look up an attribute value for a library node.

        ``attribute`` may be an RDF.Node or a string naming a term in
        libraryOntology. Returns a single value, a list of values, or
        None when the model has nothing for the attribute.
        """
        if not isinstance(attribute, RDF.Node):
            attribute = libraryOntology[attribute]

        targets = list(self.model.get_targets(libNode, attribute))
        if len(targets) > 0:
            return self._format_library_attribute(targets)
        # nothing known about this attribute; library details were
        # already imported by import_analysis_dir, so give up here.
        # (Previously there was an unreachable retry block after an
        # early return; that dead code has been removed.)
        return None

    def _format_library_attribute(self, targets):
        """Collapse a target list: None, a scalar, or a list of values."""
        if len(targets) == 0:
            return None
        elif len(targets) == 1:
            return fromTypedNode(targets[0])
        elif len(targets) > 1:
            return [fromTypedNode(t) for t in targets]

    def _is_paired(self, libNode):
        """Determine if a library is paired end"""
        library_type = self._get_library_attribute(libNode, 'library_type')
        if library_type is None:
            errmsg = "%s doesn't have a library type"
            raise ModelException(errmsg % (str(libNode),))

        single = ['CSHL (lacking last nt)',
                  'Single End (non-multiplexed)',
                  'Small RNA (non-multiplexed)',]
        paired = ['Barcoded Illumina',
                  'Multiplexing',
                  'Nextera',
                  'Paired End (non-multiplexed)',
                  'Dual Index Illumina',]
        if library_type in single:
            return False
        elif library_type in paired:
            return True
        else:
            raise MetadataLookupException(
                "Unrecognized library type %s for %s" % \
                (library_type, str(libNode)))

    def execute_query(self, template, context):
        """Execute the query, returning the results

        ``template`` is a jinja-style template rendered with ``context``
        to produce SPARQL; each result row becomes a dict of typed
        Python values.
        """
        formatted_query = template.render(context)
        LOGGER.debug(formatted_query)
        query = RDF.SPARQLQuery(str(formatted_query))
        rdfstream = query.execute(self.model)
        results = []
        for record in rdfstream:
            results.append({key: fromTypedNode(value)
                            for key, value in record.items()})
        return results
356
357
def list_submissions(model):
    """Return generator of submissions in this model.

    Yields the submission names with the submissionLog namespace and any
    trailing separator ('#', '/', '?') stripped off.
    """
    query_body = """
      PREFIX subns: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>

      select distinct ?submission
      where { ?submission subns:has_submission ?library_dir }
    """
    query = RDF.SPARQLQuery(query_body)
    rdfstream = query.execute(model)
    for row in rdfstream:
        s = stripNamespace(submissionLog, row['submission'])
        # guard against an empty result before inspecting the last
        # character; s[-1] on '' would raise IndexError
        if s and s[-1] in ['#', '/', '?']:
            s = s[:-1]
        yield s