1 """Convert srf and qseq archive files to fastqs
2 """
3 import logging
4 import os
5 from pprint import pformat,pprint
6 import sys
7 import types
8 from urlparse import urljoin, urlparse
9
10 from htsworkflow.pipelines.sequences import scan_for_sequences, \
11      update_model_sequence_library
12 from htsworkflow.pipelines.samplekey import SampleKey
13 from htsworkflow.pipelines import qseq2fastq
14 from htsworkflow.pipelines import srf2fastq
15 from htsworkflow.pipelines import desplit_fastq
16 from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \
17      fromTypedNode, \
18      stripNamespace
19 from htsworkflow.util.rdfns import *
20 from htsworkflow.util.conversion import parse_flowcell_id
21
22 from django.conf import settings
23 from django.template import Context, loader
24
25 import RDF
26
27 LOGGER = logging.getLogger(__name__)
28
29
30 class CondorFastqExtract(object):
31     def __init__(self, host, sequences_path,
32                  log_path='log',
33                  model=None,
34                  force=False):
        """Extract fastqs from results archive

        Args:
          host (str): root of the htsworkflow api server
          sequences_path (str): root of the directory tree to scan for files
          log_path (str): where to put condor log files
          model: name of the RDF model to use (passed to get_model)
          force (bool): do we force overwriting current files?
        """
        self.host = host
        self.model = get_model(model)
        self.sequences_path = sequences_path
        self.log_path = log_path
        self.force = force
        LOGGER.info("CondorFastq host={0}".format(self.host))
        LOGGER.info("CondorFastq sequences_path={0}".format(self.sequences_path))
        LOGGER.info("CondorFastq log_path={0}".format(self.log_path))

    def create_scripts(self, result_map):
        """
        Generate condor scripts to build any needed fastq files

        Args:
          result_map: htsworkflow.submission.results.ResultMap()
        """
        template_map = {'srf': 'srf.condor',
                        'qseq': 'qseq.condor',
                        'split_fastq': 'split_fastq.condor',
                        }

        env = None
        pythonpath = os.environ.get('PYTHONPATH', None)
        if pythonpath is not None:
            env = "PYTHONPATH=%s" % (pythonpath,)
        condor_entries = self.build_condor_arguments(result_map)
        for script_type in template_map.keys():
            template = loader.get_template(template_map[script_type])
            variables = {'python': sys.executable,
                         'logdir': self.log_path,
                         'env': env,
                         'args': condor_entries[script_type],
                         'root_url': self.host,
                         }
            context = Context(variables)

            with open(script_type + '.condor', 'w+') as outstream:
                outstream.write(template.render(context))

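    # Rendering the three templates above writes srf.condor, qseq.condor,
    # and split_fastq.condor into the current working directory.  The
    # templates are resolved through Django's template loader, so the
    # active Django settings are expected to know where they live.
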
    def build_condor_arguments(self, result_map):
        condor_entries = {'srf': [],
                          'qseq': [],
                          'split_fastq': []}

        conversion_funcs = {'srf': self.condor_srf_to_fastq,
                            'qseq': self.condor_qseq_to_fastq,
                            'split_fastq': self.condor_desplit_fastq
                            }
        sequences = self.find_archive_sequence_files(result_map)
        needed_targets = self.update_fastq_targets(result_map, sequences)

        for target_pathname, available_sources in needed_targets.items():
            LOGGER.debug(' target : %s' % (target_pathname,))
            LOGGER.debug(' candidate sources: %s' % (available_sources,))
            queued = False
            for condor_type in available_sources.keys():
                conversion = conversion_funcs.get(condor_type, None)
                if conversion is None:
                    errmsg = "Unrecognized type: {0} for {1}"
                    print errmsg.format(condor_type,
                                        pformat(available_sources))
                    continue
                sources = available_sources.get(condor_type, None)

                if sources is not None:
                    condor_entries.setdefault(condor_type, []).append(
                        conversion(sources, target_pathname))
                    queued = True
            if not queued:
                # warn when no usable source was found for this target
                print " need file", target_pathname

        return condor_entries

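    # The dict returned above holds one argument dict per queued conversion,
    # roughly this shape (illustrative, not captured from a real run):
    #   {'srf':         [{'sources': [...], 'target': ..., ...}],
    #    'qseq':        [...],
    #    'split_fastq': [...]}
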
    def find_archive_sequence_files(self, result_map):
        """
        Find archived sequence files associated with our results.
        """
        self.import_libraries(result_map)
        flowcell_ids = self.find_relevant_flowcell_ids()
        self.import_sequences(flowcell_ids)

        query_text = """
        prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
        prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        prefix xsd: <http://www.w3.org/2001/XMLSchema#>

        select ?filenode ?filetype ?cycle ?lane_number ?read
               ?library ?library_id
               ?flowcell ?flowcell_id ?read_length
               ?flowcell_type ?flowcell_status
        where {
            ?filenode libns:cycle ?cycle ;
                      libns:lane_number ?lane_number ;
                      libns:read ?read ;
                      libns:flowcell ?flowcell ;
                      libns:flowcell_id ?flowcell_id ;
                      libns:library ?library ;
                      libns:library_id ?library_id ;
                      libns:file_type ?filetype ;
                      a libns:illumina_result .
            ?flowcell libns:read_length ?read_length ;
                      libns:flowcell_type ?flowcell_type .
            OPTIONAL { ?flowcell libns:flowcell_status ?flowcell_status }
            FILTER(?filetype != libns:sequencer_result)
        }
        """
        LOGGER.debug("find_archive_sequence_files query: %s",
                     query_text)
        query = RDF.SPARQLQuery(query_text)
        results = []
        for r in query.execute(self.model):
            library_id = fromTypedNode(r['library_id'])
            if library_id in result_map:
                seq = SequenceResult(r)
                LOGGER.debug("Creating sequence result for library %s: %s",
                             library_id,
                             repr(seq))
                results.append(seq)
        return results

    def import_libraries(self, result_map):
        for lib_id in result_map.keys():
            lib_id_encoded = lib_id.encode('utf-8')
            liburl = urljoin(self.host, 'library/%s/' % (lib_id_encoded,))
            library = RDF.Node(RDF.Uri(liburl))
            self.import_library(library)

    def import_library(self, library):
        """Import library data into our model if we don't have it already
        """
        q = RDF.Statement(library, rdfNS['type'], libraryOntology['Library'])
        imported = False
        if not self.model.contains_statement(q):
            imported = True
            # pull the library's RDFa metadata from its web page
            load_into_model(self.model, 'rdfa', library)
        LOGGER.debug("Did we import %s: %s", library, imported)

    def find_relevant_flowcell_ids(self):
        """Generate set of flowcell ids that had samples of interest on them
        """
        flowcell_query = RDF.SPARQLQuery("""
prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>

select distinct ?flowcell ?flowcell_id
WHERE {
  ?library a libns:Library ;
           libns:has_lane ?lane .
  ?lane libns:flowcell ?flowcell .
  ?flowcell libns:flowcell_id ?flowcell_id .
}""")
        flowcell_ids = set()
        for r in flowcell_query.execute(self.model):
            flowcell_ids.add(fromTypedNode(r['flowcell_id']))
            flowcell_test = RDF.Statement(r['flowcell'],
                                          rdfNS['type'],
                                          libraryOntology['IlluminaFlowcell'])
            if not self.model.contains_statement(flowcell_test):
                # we probably lack full information about the flowcell.
                load_into_model(self.model, 'rdfa', r['flowcell'])
        LOGGER.debug("Flowcells = %s" % (unicode(flowcell_ids)))
        return flowcell_ids

    def import_sequences(self, flowcell_ids):
        seq_dirs = []
        for f in flowcell_ids:
            seq_dirs.append(os.path.join(self.sequences_path, str(f)))
        sequences = scan_for_sequences(seq_dirs)
        for seq in sequences:
            seq.save_to_model(self.model, self.host)
        update_model_sequence_library(self.model, self.host)

    def update_fastq_targets(self, result_map, raw_files):
        """Return a dict mapping needed fastq targets to candidate sources.

        Also update the model with a link between illumina result files
        and our target fastq file.
        """
        fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
        fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
        # find what targets we're missing
        needed_targets = {}
        for seq in raw_files:
            if not seq.isgood:
                continue
            filename_attributes = {
                'flowcell': seq.flowcell_id,
                'lib_id': seq.library_id,
                'lane': seq.lane_number,
                'read': seq.read,
                'cycle': seq.cycle
            }

            if seq.ispaired:
                target_name = fastq_paired_template % \
                              filename_attributes
            else:
                target_name = fastq_single_template % \
                              filename_attributes

            result_dir = result_map[seq.library_id]
            target_pathname = os.path.join(result_dir, target_name)
            if self.force or not os.path.exists(target_pathname):
                t = needed_targets.setdefault(target_pathname, {})
                t.setdefault(seq.filetype, []).append(seq)
            self.add_target_source_links(target_pathname, seq)
        return needed_targets

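    # As a worked example of the templates above: a paired-end sequence for
    # library 11111 (a made-up id) on flowcell 30DY0AAXX at cycle 76,
    # lane 3, read 1 yields the target name 11111_30DY0AAXX_c76_l3_r1.fastq.
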
    def add_target_source_links(self, target, seq):
        """Add link between target pathname and the 'lane' that produced it
        (note lane objects are now post demultiplexing.)
        """
        target_uri = 'file://' + target
        target_node = RDF.Node(RDF.Uri(target_uri))
        source_stmt = RDF.Statement(target_node, dcNS['source'], seq.filenode)
        self.model.add_statement(source_stmt)

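    # The statement above records, with illustrative URIs, something like
    #   <file:///results/11111_30DY0AAXX_c76_l3_r1.fastq> dc:source <filenode>
    # so a generated fastq can be traced back to its archive file.
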
    def condor_srf_to_fastq(self, sources, target_pathname):
        if len(sources) > 1:
            raise ValueError("srf to fastq can only handle one file")

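        # 30DY0AAXX appears to be a special case whose srf files carry both
        # reads end to end; 'mid' tells srf2fastq where to split them.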
        mid_point = None
        if sources[0].flowcell_id == '30DY0AAXX':
            mid_point = 76

        return {
            'sources': [sources[0].path],
            'pyscript': srf2fastq.__file__,
            'flowcell': sources[0].flowcell_id,
            'ispaired': sources[0].ispaired,
            'target': target_pathname,
            'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
            'mid': mid_point,
            'force': self.force,
        }

    def condor_qseq_to_fastq(self, sources, target_pathname):
        paths = []
        for source in sources:
            paths.append(source.path)
        paths.sort()
        return {
            'pyscript': qseq2fastq.__file__,
            'flowcell': sources[0].flowcell_id,
            'target': target_pathname,
            'sources': paths,
            'ispaired': sources[0].ispaired,
            'istar': len(sources) == 1,
        }

    def condor_desplit_fastq(self, sources, target_pathname):
        paths = []
        for source in sources:
            paths.append(source.path)
        paths.sort()
        return {
            'pyscript': desplit_fastq.__file__,
            'target': target_pathname,
            'sources': paths,
            'ispaired': sources[0].ispaired,
        }


def make_lane_dict(lib_db, lib_id):
    """
    Convert the lane_set in a lib_db to a dictionary
    indexed by flowcell ID
    """
    result = []
    for lane in lib_db[lib_id]['lane_set']:
        flowcell_id, status = parse_flowcell_id(lane['flowcell'])
        lane['flowcell'] = flowcell_id
        result.append((lane['flowcell'], lane))
    return dict(result)

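# As an illustration (the lib_db entry is hypothetical): a library with
# lanes on flowcells '30DY0AAXX (failed)' and '42JTNAAXX' would return
# {'30DY0AAXX': lane1, '42JTNAAXX': lane2}, since parse_flowcell_id
# splits off any status suffix before the id is used as the key.
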
class SequenceResult(object):
    """Convert the sparql query result from find_archive_sequence_files
    """
    def __init__(self, result):
        self.filenode = result['filenode']
        self._filetype = result['filetype']
        self.cycle = fromTypedNode(result['cycle'])
        self.lane_number = fromTypedNode(result['lane_number'])
        self.read = fromTypedNode(result['read'])
        if isinstance(self.read, types.StringTypes):
            self.read = 1
        self.library = result['library']
        self.library_id = fromTypedNode(result['library_id'])
        self.flowcell = result['flowcell']
        self.flowcell_id = fromTypedNode(result['flowcell_id'])
        self.flowcell_type = fromTypedNode(result['flowcell_type'])
        self.flowcell_status = fromTypedNode(result['flowcell_status'])

    def _is_good(self):
        """is this sequence / flowcell 'good enough'"""
        if self.flowcell_status is not None and \
           self.flowcell_status.lower() == "failed":
            return False
        return True
    isgood = property(_is_good)

    def _get_ispaired(self):
        return self.flowcell_type.lower() == "paired"
    ispaired = property(_get_ispaired)

    def _get_filetype(self):
        return stripNamespace(libraryOntology, self._filetype)
    filetype = property(_get_filetype)

    def _get_path(self):
        url = urlparse(str(self.filenode.uri))
        if url.scheme == 'file':
            return url.path
        else:
            errmsg = u"Unsupported scheme {0} for {1}"
            raise ValueError(errmsg.format(url.scheme, unicode(url)))
    path = property(_get_path)

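    # e.g. a filenode URI of file:///archive/30DY0AAXX/woldlab.srf (an
    # illustrative path) yields the path /archive/30DY0AAXX/woldlab.srf
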
    def __repr__(self):
        return "SequenceResult({0},{1},{2})".format(
            str(self.filenode),
            str(self.library_id),
            str(self.flowcell_id))
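

if __name__ == '__main__':
    # Minimal usage sketch, not part of the original module: the host URL
    # and sequences_path are placeholder values, and populating a ResultMap
    # is elided because its API lives in htsworkflow.submission.results.
    logging.basicConfig(level=logging.INFO)
    extract = CondorFastqExtract('http://jumpgate.caltech.edu',
                                 sequences_path='/archive/sequences',
                                 log_path='log',
                                 force=False)
    # result_map = ...  # an htsworkflow.submission.results.ResultMap
    # extract.create_scripts(result_map)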