1 """Convert srf and qseq archive files to fastqs
2 """
3 import logging
4 import os
5 from pprint import pformat,pprint
6 import sys
7 import types
8 import six
9 from six.moves.urllib.parse import urljoin, urlparse
10
11 from rdflib import ConjunctiveGraph, URIRef
12
13 from htsworkflow.pipelines.sequences import scan_for_sequences, \
14      update_model_sequence_library
15 from htsworkflow.pipelines.samplekey import SampleKey
16 from htsworkflow.pipelines import qseq2fastq
17 from htsworkflow.pipelines import srf2fastq
18 from htsworkflow.pipelines import desplit_fastq
19 from htsworkflow.submission.fastqname import FastqName
20 from htsworkflow.util.rdfhelp import dump_model, strip_namespace
21 from htsworkflow.util.rdfns import *
22 from htsworkflow.util.conversion import parse_flowcell_id
23
24 from django.conf import settings
25 from django.template import Context, loader
26 from django.utils.encoding import smart_str
27
28 LOGGER = logging.getLogger(__name__)
29
30 COMPRESSION_EXTENSIONS = {
31     None: '',
32     'gzip': '.gz'
33 }
34
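# A minimal usage sketch (an illustration, not part of the original
# module).  create_scripts() only needs a mapping of library id to
# result directory, so a plain dict stands in here for the intended
# htsworkflow.submission.results.ResultMap; the host, paths, and
# library id below are hypothetical.
#
#     extractor = CondorFastqExtract('http://jumpgate.caltech.edu',
#                                    '/archive/sequences',
#                                    log_path='log',
#                                    compression='gzip')
#     result_map = {'11111': 'results/11111'}
#     extractor.create_scripts(result_map)
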
class CondorFastqExtract(object):
    def __init__(self, host, sequences_path,
                 log_path='log',
                 compression=None,
                 force=False):
        """Extract fastqs from results archive

        Args:
          host (str): root of the htsworkflow api server
          sequences_path (str): root of the directory tree to scan for files
          log_path (str): where to put condor log files
          compression (str): optional compression to apply; only 'gzip'
              is currently supported (see COMPRESSION_EXTENSIONS)
          force (bool): do we force overwriting current files?
        """
        self.host = host
        self.model = ConjunctiveGraph()
        self.sequences_path = sequences_path
        self.log_path = log_path
        self.compression = compression
        self.force = force
        LOGGER.info("CondorFastq host={0}".format(self.host))
        LOGGER.info("CondorFastq sequences_path={0}".format(self.sequences_path))
        LOGGER.info("CondorFastq log_path={0}".format(self.log_path))
        LOGGER.info("Compression {0}".format(self.compression))

    def create_scripts(self, result_map):
        """
        Generate condor scripts to build any needed fastq files

        Args:
          result_map: htsworkflow.submission.results.ResultMap()
        """
        template_map = {'srf': 'srf.condor',
                        'qseq': 'qseq.condor',
                        'split_fastq': 'split_fastq.condor',
                        }

        env = None
        pythonpath = os.environ.get('PYTHONPATH', None)
        if pythonpath is not None:
            env = "PYTHONPATH=%s" % (pythonpath,)
        condor_entries = self.build_condor_arguments(result_map)
        for script_type, template_name in template_map.items():
            template = loader.get_template(template_name)
            variables = {'python': sys.executable,
                         'logdir': self.log_path,
                         'env': env,
                         'args': condor_entries[script_type],
                         'root_url': self.host,
                         }
            context = Context(variables)

            with open(script_type + '.condor', 'w') as outstream:
                outstream.write(template.render(context))

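    # create_scripts() renders Django templates, so Django's settings
    # must be configured before it is called.  A hypothetical minimal
    # standalone setup (the template directory is invented):
    #
    #     from django.conf import settings
    #     if not settings.configured:
    #         settings.configure(TEMPLATE_DIRS=('path/to/templates',))
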
    def build_condor_arguments(self, result_map):
        condor_entries = {'srf': [],
                          'qseq': [],
                          'split_fastq': []}

        conversion_funcs = {'srf': self.condor_srf_to_fastq,
                            'qseq': self.condor_qseq_to_fastq,
                            'split_fastq': self.condor_desplit_fastq
                            }
        sequences = self.find_archive_sequence_files(result_map)
        needed_targets = self.update_fastq_targets(result_map, sequences)

        for target_pathname, available_sources in needed_targets.items():
            LOGGER.debug(' target : %s', target_pathname)
            LOGGER.debug(' candidate sources: %s', available_sources)
            if not available_sources:
                LOGGER.warning(" need file %s", target_pathname)
                continue
            for condor_type, sources in available_sources.items():
                conversion = conversion_funcs.get(condor_type, None)
                if conversion is None:
                    errmsg = "Unrecognized type: {0} for {1}"
                    LOGGER.error(errmsg.format(condor_type,
                                               pformat(available_sources)))
                    continue
                if sources:
                    condor_entries.setdefault(condor_type, []).append(
                        conversion(sources, target_pathname))

        return condor_entries

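    # The mapping returned by build_condor_arguments groups conversion
    # argument dicts by job type, ready to be handed to the condor
    # templates as 'args'.  A hypothetical entry (values invented for
    # illustration):
    #
    #     {'srf': [],
    #      'qseq': [{'pyscript': '/path/to/qseq2fastq.py',
    #                'flowcell': '30DY0AAXX',
    #                'target': 'results/11111/11111_r1.fastq.gz',
    #                'compression': '--gzip',
    #                'sources': ['/archive/.../s_4_1_0001_qseq.txt'],
    #                'ispaired': True,
    #                'istar': False}],
    #      'split_fastq': []}
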
    def find_archive_sequence_files(self, result_map):
        """
        Find archived sequence files associated with our results.
        """
        self.import_libraries(result_map)
        flowcell_ids = self.find_relevant_flowcell_ids()
        self.import_sequences(flowcell_ids)

        query_text = """
        prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
        prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        prefix xsd: <http://www.w3.org/2001/XMLSchema#>

        select ?filenode ?filetype ?cycle ?lane_number ?read
               ?library  ?library_id
               ?flowcell ?flowcell_id ?read_length
               ?flowcell_type ?flowcell_status
        where {
            ?filenode libns:cycle ?cycle ;
                      libns:lane_number ?lane_number ;
                      libns:read ?read ;
                      libns:flowcell ?flowcell ;
                      libns:flowcell_id ?flowcell_id ;
                      libns:library ?library ;
                      libns:library_id ?library_id ;
                      libns:file_type ?filetype ;
                      a libns:IlluminaResult .
            ?flowcell libns:read_length ?read_length ;
                      libns:flowcell_type ?flowcell_type .
            OPTIONAL { ?flowcell libns:flowcell_status ?flowcell_status }
            FILTER(?filetype != libns:sequencer_result)
        }
        """
        LOGGER.debug("find_archive_sequence_files query: %s",
                     query_text)
        results = []
        for r in self.model.query(query_text):
            library_id = r['library_id'].toPython()
            if library_id in result_map:
                seq = SequenceResult(r)
                LOGGER.debug("Creating sequence result for library %s: %s",
                             library_id,
                             repr(seq))
                results.append(seq)
        return results

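    # Note on the rdflib API used above: Graph.query() yields ResultRow
    # objects whose bindings are addressable by variable name, so each
    # row can be unpacked like this (sketch):
    #
    #     for row in model.query(query_text):
    #         print(row['library_id'].toPython(), row['filenode'])
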
    def import_libraries(self, result_map):
        for lib_id in result_map.keys():
            liburl = urljoin(self.host, 'library/%s/' % (lib_id,))
            library = URIRef(liburl)
            self.import_library(library)

    def import_library(self, library):
        """Import library data into our model if we don't have it already
        """
        q = (library, RDF['type'], libraryOntology['Library'])
        imported = False
        if q not in self.model:
            imported = True
            self.model.parse(source=library, format='rdfa')
        LOGGER.debug("Did we import %s: %s", str(library), imported)

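    # rdflib graphs support triple membership tests, so the
    # ``q not in self.model`` check above asks whether this library node
    # has already been typed as a libns:Library in the local graph.
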
    def find_relevant_flowcell_ids(self):
        """Generate set of flowcell ids that had samples of interest on them
        """
        flowcell_query = """
prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>

select distinct ?flowcell ?flowcell_id
WHERE {
  ?library a libns:Library ;
           libns:has_lane ?lane .
  ?lane libns:flowcell ?flowcell .
  ?flowcell libns:flowcell_id ?flowcell_id .
}"""
        flowcell_ids = set()
        for r in self.model.query(flowcell_query):
            flowcell_ids.add(r['flowcell_id'].toPython())
            imported = False
            a_lane = list(self.model.objects(r['flowcell'],
                                             libraryOntology['has_lane']))
            if len(a_lane) == 0:
                imported = True
                # we lack information about which lanes were on this flowcell
                self.model.parse(r['flowcell'], format='rdfa')
            LOGGER.debug("Did we import %s: %s", str(r['flowcell']),
                         imported)

        return flowcell_ids

    def import_sequences(self, flowcell_ids):
        seq_dirs = [os.path.join(self.sequences_path, str(f))
                    for f in flowcell_ids]
        sequences = scan_for_sequences(seq_dirs)
        for seq in sequences:
            seq.save_to_model(self.model, self.host)
        update_model_sequence_library(self.model, self.host)

    def update_fastq_targets(self, result_map, raw_files):
        """Return a dict mapping the fastq files that need to be built
        to their candidate source files, grouped by file type.

        Also updates the model with a link between each illumina result
        file and our target fastq file.
        """
        # find what targets we're missing
        needed_targets = {}
        for seq in raw_files:
            if not seq.isgood:
                continue
            filename_attributes = {
                'flowcell': seq.flowcell_id,
                'lib_id': seq.library_id,
                'lane': seq.lane_number,
                'read': seq.read,
                'cycle': seq.cycle,
                'compression_extension': COMPRESSION_EXTENSIONS[self.compression],
                'is_paired': seq.ispaired
            }

            fastq_name = FastqName(**filename_attributes)

            result_dir = result_map[seq.library_id]
            target_pathname = os.path.join(result_dir, fastq_name.filename)
            if self.force or not os.path.exists(target_pathname):
                t = needed_targets.setdefault(target_pathname, {})
                t.setdefault(seq.filetype, []).append(seq)
            self.add_target_source_links(target_pathname, seq)
        return needed_targets

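    # A hypothetical needed_targets entry (the filename format is
    # determined by FastqName; the values here are invented):
    #
    #     {'results/11111/11111_30DY0AAXX_c38_l4_r1.fastq.gz':
    #          {'qseq': [SequenceResult(file://...,11111,30DY0AAXX)]}}
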
    def add_target_source_links(self, target, seq):
        """Add link between target pathname and the 'lane' that produced it
        (note lane objects are now post demultiplexing.)
        """
        target_uri = 'file://' + smart_str(target)
        target_node = URIRef(target_uri)
        source_stmt = (target_node, DC['source'], seq.filenode)
        self.model.add(source_stmt)

    def condor_srf_to_fastq(self, sources, target_pathname):
        if len(sources) > 1:
            raise ValueError("srf to fastq can only handle one file")

        mid_point = None
        if sources[0].flowcell_id == '30DY0AAXX':
            # special case: pass srf2fastq a mid point so it can split
            # this flowcell's reads at cycle 76
            mid_point = 76

        return {
            'sources': [sources[0].path],
            'pyscript': srf2fastq.__file__,
            'flowcell': sources[0].flowcell_id,
            'ispaired': sources[0].ispaired,
            'target': target_pathname,
            'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
            'mid': mid_point,
            'force': self.force,
        }

    def condor_qseq_to_fastq(self, sources, target_pathname):
        paths = sorted(source.path for source in sources)
        compression_argument = self.format_compression_flag()

        return {
            'pyscript': qseq2fastq.__file__,
            'flowcell': sources[0].flowcell_id,
            'target': target_pathname,
            'compression': compression_argument,
            'sources': paths,
            'ispaired': sources[0].ispaired,
            'istar': len(sources) == 1,
        }

    def condor_desplit_fastq(self, sources, target_pathname):
        paths = sorted(source.path for source in sources)
        compression_argument = self.format_compression_flag()

        return {
            'pyscript': desplit_fastq.__file__,
            'target': target_pathname,
            'compression': compression_argument,
            'sources': paths,
            'ispaired': sources[0].ispaired,
        }

    def format_compression_flag(self):
        # e.g. compression='gzip' becomes '--gzip'; None becomes ''
        return '--' + self.compression if self.compression else ''


def make_lane_dict(lib_db, lib_id):
    """
    Convert the lane_set in a lib_db to a dictionary
    indexed by flowcell ID
    """
    result = []
    for lane in lib_db[lib_id]['lane_set']:
        flowcell_id, status = parse_flowcell_id(lane['flowcell'])
        lane['flowcell'] = flowcell_id
        result.append((lane['flowcell'], lane))
    return dict(result)

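# Hypothetical make_lane_dict input and output (field values invented;
# parse_flowcell_id separates any status suffix from the flowcell name):
#
#     lib_db = {'11111': {'lane_set': [{'flowcell': '30DY0AAXX (failed)',
#                                       'lane_number': 4}]}}
#     make_lane_dict(lib_db, '11111')
#     # -> {'30DY0AAXX': {'flowcell': '30DY0AAXX', 'lane_number': 4}}
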
class SequenceResult(object):
    """Convert the sparql query result from find_archive_sequence_files
    """
    def __init__(self, result):
        keys = {
            'cycle': 'cycle',
            'lane_number': 'lane_number',
            'read': 'read',
            'library': 'library',
            'library_id': 'library_id',
            'flowcell': 'flowcell',
            'flowcell_id': 'flowcell_id',
            'flowcell_type': 'flowcell_type',
            'flowcell_status': 'flowcell_status',
        }
        for result_name, attribute_name in keys.items():
            node = result[result_name]
            if node is not None:
                setattr(self, attribute_name, node.toPython())
            else:
                setattr(self, attribute_name, None)

        self.filenode = result['filenode']
        self._filetype = result['filetype']
        if isinstance(self.read, six.string_types):
            self.read = 1

    @property
    def isgood(self):
        """is this sequence / flowcell 'good enough'"""
        if self.flowcell_status is not None and \
           self.flowcell_status.lower() == "failed":
            return False
        return True

    @property
    def ispaired(self):
        return self.flowcell_type.lower() == "paired"

    @property
    def filetype(self):
        return strip_namespace(libraryOntology, self._filetype)

    @property
    def path(self):
        # e.g. a filenode of file:///archive/run/s_4_1.qseq.txt yields
        # the local path /archive/run/s_4_1.qseq.txt
        url = urlparse(str(self.filenode))
        if url.scheme == 'file':
            return url.path
        else:
            errmsg = u"Unsupported scheme {0} for {1}"
            raise ValueError(errmsg.format(url.scheme, six.text_type(url)))

    def __repr__(self):
        return "SequenceResult({0},{1},{2})".format(
            str(self.filenode),
            str(self.library_id),
            str(self.flowcell_id))