"""Convert srf and qseq archive files to fastqs
"""
import logging
import os
from pprint import pformat
import sys

import six
from six.moves.urllib.parse import urljoin, urlparse

from htsworkflow.pipelines.sequences import scan_for_sequences, \
     update_model_sequence_library
from htsworkflow.pipelines.samplekey import SampleKey
from htsworkflow.pipelines import qseq2fastq
from htsworkflow.pipelines import srf2fastq
from htsworkflow.pipelines import desplit_fastq
from htsworkflow.submission.fastqname import FastqName
from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \
     fromTypedNode, \
     strip_namespace
from htsworkflow.util.rdfns import *
from htsworkflow.util.conversion import parse_flowcell_id

from django.conf import settings
from django.template import Context, loader
from django.utils.encoding import smart_str

import RDF

LOGGER = logging.getLogger(__name__)

COMPRESSION_EXTENSIONS = {
    None: '',
    'gzip': '.gz',
    'bzip2': '.bz2',
}

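# Typical driver (a sketch; the host and path below are illustrative, and
# result_map is an htsworkflow.submission.results.ResultMap mapping library
# ids to output directories):
#
#   extract = CondorFastqExtract('http://jumpgate.caltech.edu',
#                                '/archive/sequences',
#                                log_path='log',
#                                compression='gzip')
#   extract.create_scripts(result_map)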
class CondorFastqExtract(object):
    def __init__(self, host, sequences_path,
                 log_path='log',
                 model=None,
                 compression=None,
                 force=False):
        """Extract fastqs from results archive

        Args:
          host (str): root of the htsworkflow api server
          sequences_path (str): root of the directory tree to scan for files
          log_path (str): where to put condor log files
          model: passed to htsworkflow.util.rdfhelp.get_model() to obtain
            the RDF model to work with
          compression (str): one of 'gzip', 'bzip2'
          force (bool): do we force overwriting current files?
        """
        self.host = host
        self.model = get_model(model)
        self.sequences_path = sequences_path
        self.log_path = log_path
        self.compression = compression
        self.force = force
        LOGGER.info("CondorFastq host={0}".format(self.host))
        LOGGER.info("CondorFastq sequences_path={0}".format(self.sequences_path))
        LOGGER.info("CondorFastq log_path={0}".format(self.log_path))
        LOGGER.info("Compression {0}".format(self.compression))

    def create_scripts(self, result_map):
        """
        Generate condor scripts to build any needed fastq files

        Args:
          result_map: htsworkflow.submission.results.ResultMap()
        """
        template_map = {'srf': 'srf.condor',
                        'qseq': 'qseq.condor',
                        'split_fastq': 'split_fastq.condor',
                        }

        env = None
        pythonpath = os.environ.get('PYTHONPATH', None)
        if pythonpath is not None:
            env = "PYTHONPATH=%s" % (pythonpath,)
        condor_entries = self.build_condor_arguments(result_map)
        for script_type in template_map.keys():
            template = loader.get_template(template_map[script_type])
            variables = {'python': sys.executable,
                         'logdir': self.log_path,
                         'env': env,
                         'args': condor_entries[script_type],
                         'root_url': self.host,
                         }
            context = Context(variables)

            with open(script_type + '.condor', 'w+') as outstream:
                outstream.write(template.render(context))

    def build_condor_arguments(self, result_map):
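        """Collect the argument dictionaries for every fastq target that
        still needs to be built, grouped by condor script type
        ('srf', 'qseq', or 'split_fastq').
        """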
        condor_entries = {'srf': [],
                          'qseq': [],
                          'split_fastq': []}

        conversion_funcs = {'srf': self.condor_srf_to_fastq,
                            'qseq': self.condor_qseq_to_fastq,
                            'split_fastq': self.condor_desplit_fastq
                            }
        sequences = self.find_archive_sequence_files(result_map)
        needed_targets = self.update_fastq_targets(result_map, sequences)

        for target_pathname, available_sources in needed_targets.items():
            LOGGER.debug(' target : %s', target_pathname)
            LOGGER.debug(' candidate sources: %s', available_sources)
            queued = False
            for condor_type, sources in available_sources.items():
                conversion = conversion_funcs.get(condor_type, None)
                if conversion is None:
                    errmsg = "Unrecognized type: {0} for {1}"
                    LOGGER.error(errmsg.format(condor_type,
                                               pformat(available_sources)))
                    continue
                if sources:
                    condor_entries.setdefault(condor_type, []).append(
                        conversion(sources, target_pathname))
                    queued = True
            if not queued:
                LOGGER.warning(" need file %s", target_pathname)

        return condor_entries

    def find_archive_sequence_files(self, result_map):
        """
        Find archived sequence files associated with our results.
        """
        self.import_libraries(result_map)
        flowcell_ids = self.find_relevant_flowcell_ids()
        self.import_sequences(flowcell_ids)

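        # Pull every per-lane result file recorded in the model, along with
        # the flowcell metadata needed to decide whether it is usable
        # (raw sequencer_result nodes are filtered out).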
        query_text = """
        prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
        prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        prefix xsd: <http://www.w3.org/2001/XMLSchema#>

        select ?filenode ?filetype ?cycle ?lane_number ?read
               ?library ?library_id
               ?flowcell ?flowcell_id ?read_length
               ?flowcell_type ?flowcell_status
        where {
            ?filenode libns:cycle ?cycle ;
                      libns:lane_number ?lane_number ;
                      libns:read ?read ;
                      libns:flowcell ?flowcell ;
                      libns:flowcell_id ?flowcell_id ;
                      libns:library ?library ;
                      libns:library_id ?library_id ;
                      libns:file_type ?filetype ;
                      a libns:IlluminaResult .
            ?flowcell libns:read_length ?read_length ;
                      libns:flowcell_type ?flowcell_type .
            OPTIONAL { ?flowcell libns:flowcell_status ?flowcell_status }
            FILTER(?filetype != libns:sequencer_result)
        }
        """
        LOGGER.debug("find_archive_sequence_files query: %s",
                     query_text)
        query = RDF.SPARQLQuery(query_text)
        results = []
        for r in query.execute(self.model):
            library_id = fromTypedNode(r['library_id'])
            if library_id in result_map:
                seq = SequenceResult(r)
                LOGGER.debug("Creating sequence result for library %s: %s",
                             library_id,
                             repr(seq))
                results.append(seq)
        return results

    def import_libraries(self, result_map):
        for lib_id in result_map.keys():
            liburl = urljoin(self.host, 'library/%s/' % (smart_str(lib_id),))
            library = RDF.Node(RDF.Uri(liburl))
            self.import_library(library)

    def import_library(self, library):
        """Import library data into our model if we don't have it already
        """
        q = RDF.Statement(library, rdfNS['type'], libraryOntology['Library'])
        imported = False
        if not self.model.contains_statement(q):
            imported = True
            load_into_model(self.model, 'rdfa', library)
        LOGGER.debug("Did we import %s: %s", library.uri, imported)

    def find_relevant_flowcell_ids(self):
        """Generate set of flowcell ids that had samples of interest on them
        """
        flowcell_query = RDF.SPARQLQuery("""
prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>

select distinct ?flowcell ?flowcell_id
WHERE {
  ?library a libns:Library ;
           libns:has_lane ?lane .
  ?lane libns:flowcell ?flowcell .
  ?flowcell libns:flowcell_id ?flowcell_id .
}""")
        flowcell_ids = set()
        for r in flowcell_query.execute(self.model):
            flowcell_ids.add(fromTypedNode(r['flowcell_id']))
            imported = False
            a_lane = self.model.get_target(r['flowcell'],
                                           libraryOntology['has_lane'])
            if a_lane is None:
                imported = True
                # we lack information about which lanes were on this flowcell
                load_into_model(self.model, 'rdfa', r['flowcell'])
            LOGGER.debug("Did we import %s: %s", r['flowcell'].uri, imported)

        return flowcell_ids

    def import_sequences(self, flowcell_ids):
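        """Scan the archive directories for the given flowcell ids and
        record any sequence files found in the RDF model.
        """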
        seq_dirs = []
        for f in flowcell_ids:
            seq_dirs.append(os.path.join(self.sequences_path, str(f)))
        sequences = scan_for_sequences(seq_dirs)
        for seq in sequences:
            seq.save_to_model(self.model, self.host)
        update_model_sequence_library(self.model, self.host)

    def update_fastq_targets(self, result_map, raw_files):
        """Return a dict mapping fastq target pathnames that need to be
        built to their available source files, grouped by file type.

        Also update model with link between illumina result files
        and our target fastq file.
        """
        # find what targets we're missing
        needed_targets = {}
        for seq in raw_files:
            if not seq.isgood:
                continue
            filename_attributes = {
                'flowcell': seq.flowcell_id,
                'lib_id': seq.library_id,
                'lane': seq.lane_number,
                'read': seq.read,
                'cycle': seq.cycle,
                'compression_extension': COMPRESSION_EXTENSIONS[self.compression],
                'is_paired': seq.ispaired
            }

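            # FastqName renders these attributes into the target filename;
            # roughly (illustrative, the real layout is defined in
            # htsworkflow.submission.fastqname):
            #   <lib_id>_<flowcell>_c<cycle>_l<lane>_r<read>.fastq<ext>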
            fqName = FastqName(**filename_attributes)

            result_dir = result_map[seq.library_id]
            target_pathname = os.path.join(result_dir, fqName.filename)
            if self.force or not os.path.exists(target_pathname):
                t = needed_targets.setdefault(target_pathname, {})
                t.setdefault(seq.filetype, []).append(seq)
            self.add_target_source_links(target_pathname, seq)
        return needed_targets

    def add_target_source_links(self, target, seq):
        """Add link between target pathname and the 'lane' that produced it
        (note lane objects are now post demultiplexing.)
        """
        target_uri = 'file://' + smart_str(target)
        target_node = RDF.Node(RDF.Uri(target_uri))
        source_stmt = RDF.Statement(target_node, dcNS['source'], seq.filenode)
        self.model.add_statement(source_stmt)

    def condor_srf_to_fastq(self, sources, target_pathname):
        if len(sources) > 1:
            raise ValueError("srf to fastq can only handle one file")

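        # Hard-coded special case for one historical flowcell: 'mid' marks
        # where srf2fastq should split the combined paired-end read.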
        mid_point = None
        if sources[0].flowcell_id == '30DY0AAXX':
            mid_point = 76

        return {
            'sources': [sources[0].path],
            'pyscript': srf2fastq.__file__,
            'flowcell': sources[0].flowcell_id,
            'ispaired': sources[0].ispaired,
            'target': target_pathname,
            'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
            'mid': mid_point,
            'force': self.force,
        }

    def condor_qseq_to_fastq(self, sources, target_pathname):
        paths = []
        for source in sources:
            paths.append(source.path)
        paths.sort()
        return {
            'pyscript': qseq2fastq.__file__,
            'flowcell': sources[0].flowcell_id,
            'target': target_pathname,
            'sources': paths,
            'ispaired': sources[0].ispaired,
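            # a single source file is assumed to be a tarball of qseq files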
            'istar': len(sources) == 1,
        }

    def condor_desplit_fastq(self, sources, target_pathname):
        paths = []
        for source in sources:
            paths.append(source.path)
        paths.sort()
        compression_argument = ''
        if self.compression:
            compression_argument = '--' + self.compression

        return {
            'pyscript': desplit_fastq.__file__,
            'target': target_pathname,
            'compression': compression_argument,
            'sources': paths,
            'ispaired': sources[0].ispaired,
        }


def make_lane_dict(lib_db, lib_id):
    """
    Convert the lane_set in a lib_db to a dictionary
    indexed by flowcell ID
    """
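    # e.g. (illustrative): make_lane_dict(lib_db, '11111') ->
    #   {'30DY0AAXX': {...lane attributes...}, ...}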
    result = {}
    for lane in lib_db[lib_id]['lane_set']:
        flowcell_id, status = parse_flowcell_id(lane['flowcell'])
        lane['flowcell'] = flowcell_id
        result[flowcell_id] = lane
    return result

class SequenceResult(object):
    """Convert the sparql query result from find_archive_sequence_files
    """
    def __init__(self, result):
        self.filenode = result['filenode']
        self._filetype = result['filetype']
        self.cycle = fromTypedNode(result['cycle'])
        self.lane_number = fromTypedNode(result['lane_number'])
        self.read = fromTypedNode(result['read'])
        if isinstance(self.read, six.string_types):
            # a non-numeric read value defaults to read 1
            self.read = 1
        self.library = result['library']
        self.library_id = fromTypedNode(result['library_id'])
        self.flowcell = result['flowcell']
        self.flowcell_id = fromTypedNode(result['flowcell_id'])
        self.flowcell_type = fromTypedNode(result['flowcell_type'])
        self.flowcell_status = fromTypedNode(result['flowcell_status'])

    def _is_good(self):
        """is this sequence / flowcell 'good enough'"""
        if self.flowcell_status is not None and \
           self.flowcell_status.lower() == "failed":
            return False
        return True
    isgood = property(_is_good)

    def _get_ispaired(self):
        return self.flowcell_type.lower() == "paired"
    ispaired = property(_get_ispaired)

    def _get_filetype(self):
        return strip_namespace(libraryOntology, self._filetype)
    filetype = property(_get_filetype)

    def _get_path(self):
        url = urlparse(str(self.filenode.uri))
        if url.scheme == 'file':
            return url.path
        else:
            errmsg = u"Unsupported scheme {0} for {1}"
            raise ValueError(errmsg.format(url.scheme, str(url)))
    path = property(_get_path)

    def __repr__(self):
        return "SequenceResult({0},{1},{2})".format(
            str(self.filenode),
            str(self.library_id),
            str(self.flowcell_id))