# Generate manifest files for ENCODE3
# [htsworkflow.git] / htsworkflow / submission / submission.py
"""Common submission elements
"""
import logging
import os
import re

import RDF

from htsworkflow.util.rdfhelp import \
     blankOrUri, \
     dafTermOntology, \
     dump_model, \
     get_model, \
     libraryOntology, \
     owlNS, \
     rdfNS, \
     submissionLog, \
     submissionOntology, \
     toTypedNode, \
     fromTypedNode
from htsworkflow.util.hashfile import make_md5sum
from htsworkflow.submission.fastqname import FastqName
from htsworkflow.submission.daf import \
     MetadataLookupException, \
     ModelException, \
     get_submission_uri
# Module-level logger; handlers/levels are configured by the importing
# application, not here.
LOGGER = logging.getLogger(__name__)

class Submission(object):
    """Common machinery for building an ENCODE submission RDF model.

    A Submission owns an RDF model plus namespaces derived from the
    submission name and host, and provides helpers to scan analysis
    directories, register their files in the model, and look up
    library metadata.
    """

    def __init__(self, name, model, host):
        """Initialize submission state.

        :Args:
          name (str): submission set name
          model: RDF model the submission statements are added to
          host (str): base URL of the host serving library pages
        """
        self.name = name
        self.model = model

        self.submissionSet = get_submission_uri(self.name)
        self.submissionSetNS = RDF.NS(str(self.submissionSet) + '#')
        self.libraryNS = RDF.NS('{0}/library/'.format(host))

        # filename-pattern -> view cache, built lazily by find_best_match
        self.__view_map = None

    def scan_submission_dirs(self, result_map):
        """Examine files in our result directories.

        :Args:
          result_map: mapping of library id -> result directory path
        """
        for lib_id, result_dir in result_map.items():
            LOGGER.info("Importing %s from %s" % (lib_id, result_dir))
            try:
                self.import_analysis_dir(result_dir, lib_id)
            except MetadataLookupException as e:
                # One unresolvable library shouldn't abort the whole scan.
                LOGGER.error("Skipping %s: %s" % (lib_id, str(e)))

    def import_analysis_dir(self, analysis_dir, library_id):
        """Import one submission directory and update our model as needed.

        :Args:
          analysis_dir (str): directory containing the submission files
          library_id (str): id used to build the library node URI
        """
        libNode = self.libraryNS[library_id + "/"]

        self._add_library_details_to_model(libNode)

        submission_files = os.listdir(analysis_dir)
        for filename in submission_files:
            pathname = os.path.abspath(os.path.join(analysis_dir, filename))
            self.construct_file_attributes(analysis_dir, libNode, pathname)

    def construct_file_attributes(self, analysis_dir, libNode, pathname):
        """Add statements describing one submission file to the model.

        Files whose name matches no view pattern, or whose view is
        classified as 'ignore', are skipped.

        :Args:
          analysis_dir (str): directory the submission file lives in
          libNode: RDF node for the library the file belongs to
          pathname (str): path of the file whose extension we examine
        """
        path, filename = os.path.split(pathname)

        LOGGER.debug("Searching for view")
        file_type = self.find_best_match(filename)
        if file_type is None:
            LOGGER.warning("Unrecognized file: {0}".format(pathname))
            return None
        if str(file_type) == str(libraryOntology['ignore']):
            return None

        an_analysis_name = self.make_submission_name(analysis_dir)
        an_analysis = self.get_submission_node(analysis_dir)
        file_classification = self.model.get_target(file_type,
                                                    rdfNS['type'])
        if file_classification is None:
            errmsg = 'Could not find class for {0}'
            LOGGER.warning(errmsg.format(str(file_type)))
            return

        # attach this analysis to the submission set
        self.model.add_statement(
            RDF.Statement(self.submissionSetNS[''],
                          submissionOntology['has_submission'],
                          an_analysis))
        self.model.add_statement(RDF.Statement(an_analysis,
                                               submissionOntology['name'],
                                               toTypedNode(an_analysis_name)))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          rdfNS['type'],
                          submissionOntology['submission']))
        self.model.add_statement(RDF.Statement(an_analysis,
                                               submissionOntology['library'],
                                               libNode))

        LOGGER.debug("Adding statements to {0}".format(str(an_analysis)))
        # add track specific information
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          dafTermOntology['paired'],
                          toTypedNode(self._is_paired(libNode))))
        self.model.add_statement(
            RDF.Statement(an_analysis,
                          dafTermOntology['submission'],
                          an_analysis))

        # add file specific information
        fileNode = self.make_file_node(pathname, an_analysis)
        self.add_md5s(filename, fileNode, analysis_dir)
        self.add_fastq_metadata(filename, fileNode)
        self.model.add_statement(
            RDF.Statement(fileNode,
                          rdfNS['type'],
                          file_type))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          libraryOntology['library'],
                          libNode))

        LOGGER.debug("Done.")

    def make_file_node(self, pathname, submissionNode):
        """Create a file node and attach it to its submission.

        :Args:
          pathname (str): path to the submission file
          submissionNode: analysis node the file belongs to

        Returns the new file:// URI node.
        """
        path, filename = os.path.split(pathname)
        pathname = os.path.abspath(pathname)
        fileNode = RDF.Node(RDF.Uri('file://' + pathname))
        self.model.add_statement(
            RDF.Statement(submissionNode,
                          dafTermOntology['has_file'],
                          fileNode))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['filename'],
                          filename))
        self.model.add_statement(
            RDF.Statement(fileNode,
                          dafTermOntology['relative_path'],
                          os.path.relpath(pathname)))
        return fileNode

    def add_md5s(self, filename, fileNode, analysis_dir):
        """Attach an md5sum statement to fileNode, if one can be computed."""
        LOGGER.debug("Updating file md5sum")
        submission_pathname = os.path.join(analysis_dir, filename)
        md5 = make_md5sum(submission_pathname)
        if md5 is None:
            # best-effort: log and continue without an md5sum statement
            errmsg = "Unable to produce md5sum for {0}"
            LOGGER.warning(errmsg.format(submission_pathname))
        else:
            self.model.add_statement(
                RDF.Statement(fileNode, dafTermOntology['md5sum'], md5))

    def add_fastq_metadata(self, filename, fileNode):
        """Attach flowcell/lane/read metadata parsed from a fastq filename.

        Silently does nothing when the filename doesn't parse as a
        fastq name.
        """
        # How should I detect if this is actually a fastq file?
        try:
            fqname = FastqName(filename=filename)
        except ValueError:
            # currently just ignore it if the fastq name parser fails
            return

        terms = [('flowcell', libraryOntology['flowcell_id']),
                 ('lib_id', libraryOntology['library_id']),
                 ('lane', libraryOntology['lane_number']),
                 ('read', libraryOntology['read']),
                 ('cycle', libraryOntology['read_length'])]
        for file_term, model_term in terms:
            value = fqname.get(file_term)
            if value is not None:
                s = RDF.Statement(fileNode, model_term, toTypedNode(value))
                self.model.append(s)

    def _add_library_details_to_model(self, libNode):
        """Load the library's RDFa page into the model.

        Collection-style predicates are always added; other statements
        are only added when the model doesn't already have a value for
        them.
        """
        # attributes that can have multiple values
        set_attributes = set((libraryOntology['has_lane'],
                              libraryOntology['has_mappings'],
                              dafTermOntology['has_file']))
        parser = RDF.Parser(name='rdfa')
        try:
            new_statements = parser.parse_as_stream(libNode.uri)
        except RDF.RedlandError as e:
            LOGGER.error(e)
            return
        LOGGER.debug("Scanning %s", str(libNode.uri))
        toadd = []
        for s in new_statements:
            # always add "collections"
            if s.predicate in set_attributes:
                toadd.append(s)
                continue
            # don't override things we already have in the model
            targets = list(self.model.get_targets(s.subject, s.predicate))
            if len(targets) == 0:
                toadd.append(s)

        for s in toadd:
            self.model.append(s)

        self._add_lane_details(libNode)

    def _add_lane_details(self, libNode):
        """Import lane details for every lane attached to libNode."""
        query = RDF.Statement(libNode, libraryOntology['has_lane'], None)
        lanes = []
        for lane_stmt in self.model.find_statements(query):
            lanes.append(lane_stmt.object)

        parser = RDF.Parser(name='rdfa')
        for lane in lanes:
            LOGGER.debug("Importing %s" % (lane.uri,))
            try:
                parser.parse_into_model(self.model, lane.uri)
            except RDF.RedlandError:
                LOGGER.error("Error accessing %s" % (lane.uri,))
                # bare raise preserves the original traceback
                raise

    def find_best_match(self, filename):
        """Search through potential filename matching patterns.

        Returns the single matching view, None when nothing matches,
        and raises ModelException when several views match.
        """
        if self.__view_map is None:
            self.__view_map = self._get_filename_view_map()

        results = []
        for pattern, view in self.__view_map.items():
            if re.match(pattern, filename):
                results.append(view)

        if len(results) > 1:
            msg = "%s matched multiple views %s" % (
                filename,
                [str(x) for x in results])
            raise ModelException(msg)
        elif len(results) == 1:
            return results[0]
        else:
            return None

    def _get_filename_view_map(self):
        """Query our model for filename patterns.

        return a dictionary of regular expression patterns to view names
        """
        filename_query = RDF.Statement(
            None, dafTermOntology['filename_re'], None)

        patterns = {}
        for s in self.model.find_statements(filename_query):
            view_name = s.subject
            literal_re = s.object.literal_value['string']
            LOGGER.debug("Found: %s" % (literal_re,))
            try:
                re.compile(literal_re)
            except re.error:
                # skip patterns that won't compile instead of
                # registering one that can never match
                LOGGER.error("Unable to compile: %s" % (literal_re,))
                continue
            patterns[literal_re] = view_name
        return patterns

    def make_submission_name(self, analysis_dir):
        """Derive a submission name from the analysis directory's basename.

        Raises RuntimeError when the directory name is empty.
        """
        analysis_dir = os.path.normpath(analysis_dir)
        analysis_dir_name = os.path.split(analysis_dir)[1]
        if len(analysis_dir_name) == 0:
            raise RuntimeError(
                "Submission dir name too short: {0}".format(analysis_dir))
        return analysis_dir_name

    def get_submission_node(self, analysis_dir):
        """Convert a submission directory name to a submission node.
        """
        submission_name = self.make_submission_name(analysis_dir)
        return self.submissionSetNS[submission_name]

    def _get_library_attribute(self, libNode, attribute):
        """Look up an attribute of a library, loading details on a miss.

        :Args:
          libNode: library node to query
          attribute: RDF predicate node, or a string naming a term in
                     libraryOntology

        Returns a single value, a list of values, or None.
        """
        if not isinstance(attribute, RDF.Node):
            attribute = libraryOntology[attribute]

        targets = list(self.model.get_targets(libNode, attribute))
        if len(targets) > 0:
            return self._format_library_attribute(targets)

        # we don't know anything about this attribute yet; pull the
        # library's details into the model and try once more
        self._add_library_details_to_model(libNode)

        targets = list(self.model.get_targets(libNode, attribute))
        if len(targets) > 0:
            return self._format_library_attribute(targets)

        return None

    def _format_library_attribute(self, targets):
        """Convert a target list to None, a scalar, or a list of values."""
        if len(targets) == 0:
            return None
        elif len(targets) == 1:
            return fromTypedNode(targets[0])
        elif len(targets) > 1:
            return [fromTypedNode(t) for t in targets]

    def _is_paired(self, libNode):
        """Determine if a library is paired end.

        Raises ModelException when the library has no type, and
        MetadataLookupException when the type is unrecognized.
        """
        library_type = self._get_library_attribute(libNode, 'library_type')
        if library_type is None:
            errmsg = "%s doesn't have a library type"
            raise ModelException(errmsg % (str(libNode),))

        single = ['CSHL (lacking last nt)',
                  'Single End (non-multiplexed)',
                  'Small RNA (non-multiplexed)',]
        paired = ['Barcoded Illumina',
                  'Multiplexing',
                  'Nextera',
                  'Paired End (non-multiplexed)',
                  'Dual Index Illumina',]
        if library_type in single:
            return False
        elif library_type in paired:
            return True
        else:
            raise MetadataLookupException(
                "Unrecognized library type %s for %s" % \
                (library_type, str(libNode)))

    def execute_query(self, template, context):
        """Render a SPARQL template and execute it against our model.

        Returns a list of dictionaries mapping result variable names
        to Python values.
        """
        formatted_query = template.render(context)
        LOGGER.debug(formatted_query)
        query = RDF.SPARQLQuery(str(formatted_query))
        rdfstream = query.execute(self.model)
        results = []
        for record in rdfstream:
            d = {}
            for key, value in record.items():
                d[key] = fromTypedNode(value)
            results.append(d)
        return results