Add function to list the names for submissions from the RDF model.
[htsworkflow.git] / htsworkflow / submission / submission.py
1 """Common submission elements
2 """
3 import logging
4 import os
5 import re
6
7 import RDF
8
9 from htsworkflow.util.rdfhelp import \
10      blankOrUri, \
11      dump_model, \
12      fromTypedNode, \
13      get_model, \
14      stripNamespace, \
15      toTypedNode
16 from htsworkflow.util.rdfns import *
17 from htsworkflow.util.hashfile import make_md5sum
18 from htsworkflow.submission.fastqname import FastqName
19 from htsworkflow.submission.daf import \
20      MetadataLookupException, \
21      ModelException, \
22      get_submission_uri
23
24 LOGGER = logging.getLogger(__name__)
25
26 class Submission(object):
27     def __init__(self, name, model, host):
28         self.name = name
29         self.model = model
30
31         self.submissionSet = get_submission_uri(self.name)
32         self.submissionSetNS = RDF.NS(str(self.submissionSet) + '#')
33         self.libraryNS = RDF.NS('{0}/library/'.format(host))
34
35         self.__view_map = None
36
37     def scan_submission_dirs(self, result_map):
38         """Examine files in our result directory
39         """
40         for lib_id, result_dir in result_map.items():
41             LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
42             try:
43                 self.import_analysis_dir(result_dir, lib_id)
44             except MetadataLookupException, e:
45                 LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))
46
47     def import_analysis_dir(self, analysis_dir, library_id):
48         """Import a submission directories and update our model as needed
49         """
50         #attributes = get_filename_attribute_map(paired)
51         libNode = self.libraryNS[library_id + "/"]
52
53         self._add_library_details_to_model(libNode)
54
55         submission_files = os.listdir(analysis_dir)
56         for filename in submission_files:
57             pathname = os.path.abspath(os.path.join(analysis_dir, filename))
58             self.construct_file_attributes(analysis_dir, libNode, pathname)
59
60     def construct_file_attributes(self, analysis_dir, libNode, pathname):
61         """Looking for the best extension
62         The 'best' is the longest match
63
64         :Args:
65         filename (str): the filename whose extention we are about to examine
66         """
67         path, filename = os.path.split(pathname)
68
69         LOGGER.debug("Searching for view")
70         file_type = self.find_best_match(filename)
71         if file_type is None:
72             LOGGER.warn("Unrecognized file: {0}".format(pathname))
73             return None
74         if str(file_type) == str(libraryOntology['ignore']):
75             return None
76
77         an_analysis_name = self.make_submission_name(analysis_dir)
78         an_analysis = self.get_submission_node(analysis_dir)
79         an_analysis_uri = str(an_analysis.uri)
80         file_classification = self.model.get_target(file_type,
81                                                     rdfNS['type'])
82         if file_classification is None:
83             errmsg = 'Could not find class for {0}'
84             LOGGER.warning(errmsg.format(str(file_type)))
85             return
86
87         self.model.add_statement(
88             RDF.Statement(self.submissionSetNS[''],
89                           submissionOntology['has_submission'],
90                           an_analysis))
91         self.model.add_statement(RDF.Statement(an_analysis,
92                                                submissionOntology['name'],
93                                                toTypedNode(an_analysis_name)))
94         self.model.add_statement(
95             RDF.Statement(an_analysis,
96                           rdfNS['type'],
97                           submissionOntology['submission']))
98         self.model.add_statement(RDF.Statement(an_analysis,
99                                                submissionOntology['library'],
100                                                libNode))
101
102         LOGGER.debug("Adding statements to {0}".format(str(an_analysis)))
103         # add track specific information
104         self.model.add_statement(
105             RDF.Statement(an_analysis,
106                           dafTermOntology['paired'],
107                           toTypedNode(self._is_paired(libNode))))
108         self.model.add_statement(
109             RDF.Statement(an_analysis,
110                           dafTermOntology['submission'],
111                           an_analysis))
112
113         # add file specific information
114         fileNode = self.make_file_node(pathname, an_analysis)
115         self.add_md5s(filename, fileNode, analysis_dir)
116         self.add_fastq_metadata(filename, fileNode)
117         self.model.add_statement(
118             RDF.Statement(fileNode,
119                           rdfNS['type'],
120                           file_type))
121         self.model.add_statement(
122             RDF.Statement(fileNode,
123                           libraryOntology['library'],
124                           libNode))
125                           
126         LOGGER.debug("Done.")
127
128     def make_file_node(self, pathname, submissionNode):
129         """Create file node and attach it to its submission.
130         """
131         # add file specific information
132         path, filename = os.path.split(pathname)
133         pathname = os.path.abspath(pathname)
134         fileNode = RDF.Node(RDF.Uri('file://'+ pathname))
135         self.model.add_statement(
136             RDF.Statement(submissionNode,
137                           dafTermOntology['has_file'],
138                           fileNode))
139         self.model.add_statement(
140             RDF.Statement(fileNode,
141                           dafTermOntology['filename'],
142                           filename))
143         self.model.add_statement(
144             RDF.Statement(fileNode,
145                           dafTermOntology['relative_path'],
146                           os.path.relpath(pathname)))
147         return fileNode
148
149     def add_md5s(self, filename, fileNode, analysis_dir):
150         LOGGER.debug("Updating file md5sum")
151         submission_pathname = os.path.join(analysis_dir, filename)
152         md5 = make_md5sum(submission_pathname)
153         if md5 is None:
154             errmsg = "Unable to produce md5sum for {0}"
155             LOGGER.warning(errmsg.format(submission_pathname))
156         else:
157             self.model.add_statement(
158                 RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))
159
160     def add_fastq_metadata(self, filename, fileNode):
161         # How should I detect if this is actually a fastq file?
162         try:
163             fqname = FastqName(filename=filename)
164         except ValueError:
165             # currently its just ignore it if the fastq name parser fails
166             return
167         
168         terms = [('flowcell', libraryOntology['flowcell_id']),
169                  ('lib_id', libraryOntology['library_id']),
170                  ('lane', libraryOntology['lane_number']),
171                  ('read', libraryOntology['read']),
172                  ('cycle', libraryOntology['read_length'])]
173         for file_term, model_term in terms:
174             value = fqname.get(file_term)
175             if value is not None:
176                 s = RDF.Statement(fileNode, model_term, toTypedNode(value))
177                 self.model.append(s)
178
179     def _add_library_details_to_model(self, libNode):
180         # attributes that can have multiple values
181         set_attributes = set((libraryOntology['has_lane'],
182                               libraryOntology['has_mappings'],
183                               dafTermOntology['has_file']))
184         parser = RDF.Parser(name='rdfa')
185         try:
186             new_statements = parser.parse_as_stream(libNode.uri)
187         except RDF.RedlandError as e:
188             LOGGER.error(e)
189             return
190         LOGGER.debug("Scanning %s", str(libNode.uri))
191         toadd = []
192         for s in new_statements:
193             # always add "collections"
194             if s.predicate in set_attributes:
195                 toadd.append(s)
196                 continue
197             # don't override things we already have in the model
198             targets = list(self.model.get_targets(s.subject, s.predicate))
199             if len(targets) == 0:
200                 toadd.append(s)
201
202         for s in toadd:
203             self.model.append(s)
204
205         self._add_lane_details(libNode)
206
207     def _add_lane_details(self, libNode):
208         """Import lane details
209         """
210         query = RDF.Statement(libNode, libraryOntology['has_lane'], None)
211         lanes = []
212         for lane_stmt in self.model.find_statements(query):
213             lanes.append(lane_stmt.object)
214
215         parser = RDF.Parser(name='rdfa')
216         for lane in lanes:
217             LOGGER.debug("Importing %s" % (lane.uri,))
218             try:
219                 parser.parse_into_model(self.model, lane.uri)
220             except RDF.RedlandError, e:
221                 LOGGER.error("Error accessing %s" % (lane.uri,))
222                 raise e
223
224
225     def find_best_match(self, filename):
226         """Search through potential filename matching patterns
227         """
228         if self.__view_map is None:
229             self.__view_map = self._get_filename_view_map()
230
231         results = []
232         for pattern, view in self.__view_map.items():
233             if re.match(pattern, filename):
234                 results.append(view)
235
236         if len(results) > 1:
237             msg = "%s matched multiple views %s" % (
238                 filename,
239                 [str(x) for x in results])
240             raise ModelException(msg)
241         elif len(results) == 1:
242             return results[0]
243         else:
244             return None
245
246     def _get_filename_view_map(self):
247         """Query our model for filename patterns
248
249         return a dictionary of compiled regular expressions to view names
250         """
251         filename_query = RDF.Statement(
252             None, dafTermOntology['filename_re'], None)
253
254         patterns = {}
255         for s in self.model.find_statements(filename_query):
256             view_name = s.subject
257             literal_re = s.object.literal_value['string']
258             LOGGER.debug("Found: %s" % (literal_re,))
259             try:
260                 filename_re = re.compile(literal_re)
261             except re.error, e:
262                 LOGGER.error("Unable to compile: %s" % (literal_re,))
263             patterns[literal_re] = view_name
264         return patterns
265
266     def make_submission_name(self, analysis_dir):
267         analysis_dir = os.path.normpath(analysis_dir)
268         analysis_dir_name = os.path.split(analysis_dir)[1]
269         if len(analysis_dir_name) == 0:
270             raise RuntimeError(
271                 "Submission dir name too short: {0}".format(analysis_dir))
272         return analysis_dir_name
273
274     def get_submission_node(self, analysis_dir):
275         """Convert a submission directory name to a submission node
276         """
277         submission_name = self.make_submission_name(analysis_dir)
278         return self.submissionSetNS[submission_name]
279
280     def _get_library_attribute(self, libNode, attribute):
281         if not isinstance(attribute, RDF.Node):
282             attribute = libraryOntology[attribute]
283
284         targets = list(self.model.get_targets(libNode, attribute))
285         if len(targets) > 0:
286             return self._format_library_attribute(targets)
287         else:
288             return None
289
290         #targets = self._search_same_as(libNode, attribute)
291         #if targets is not None:
292         #    return self._format_library_attribute(targets)
293
294         # we don't know anything about this attribute
295         self._add_library_details_to_model(libNode)
296
297         targets = list(self.model.get_targets(libNode, attribute))
298         if len(targets) > 0:
299             return self._format_library_attribute(targets)
300
301         return None
302
303     def _format_library_attribute(self, targets):
304         if len(targets) == 0:
305             return None
306         elif len(targets) == 1:
307             return fromTypedNode(targets[0])
308         elif len(targets) > 1:
309             return [fromTypedNode(t) for t in targets]
310
311     def _is_paired(self, libNode):
312         """Determine if a library is paired end"""
313         library_type = self._get_library_attribute(libNode, 'library_type')
314         if library_type is None:
315             errmsg = "%s doesn't have a library type"
316             raise ModelException(errmsg % (str(libNode),))
317
318         single = ['CSHL (lacking last nt)',
319                   'Single End (non-multiplexed)',
320                   'Small RNA (non-multiplexed)',]
321         paired = ['Barcoded Illumina',
322                   'Multiplexing',
323                   'Nextera',
324                   'Paired End (non-multiplexed)',
325                   'Dual Index Illumina',]
326         if library_type in single:
327             return False
328         elif library_type in paired:
329             return True
330         else:
331             raise MetadataLookupException(
332                 "Unrecognized library type %s for %s" % \
333                 (library_type, str(libNode)))
334
335     def execute_query(self, template, context):
336         """Execute the query, returning the results
337         """
338         formatted_query = template.render(context)
339         LOGGER.debug(formatted_query)
340         query = RDF.SPARQLQuery(str(formatted_query))
341         rdfstream = query.execute(self.model)
342         results = []
343         for record in rdfstream:
344             d = {}
345             for key, value in record.items():
346                 d[key] = fromTypedNode(value)
347             results.append(d)
348         return results
349
350
351 def list_submissions(model):
352     """Return generator of submissions in this model.
353     """
354     query_body = """
355       PREFIX subns: <http://jumpgate.caltech.edu/wiki/UcscSubmissionOntology#>
356
357       select distinct ?submission
358       where { ?submission subns:has_submission ?library_dir }
359     """
360     query = RDF.SPARQLQuery(query_body)
361     rdfstream = query.execute(model)
362     for row in rdfstream:
363         s = stripNamespace(submissionLog, row['submission'])
364         if s[-1] in ['#', '/', '?']:
365             s = s[:-1]
366         yield s