# htsworkflow/submission/condorfastq.py
1 """Convert srf and qseq archive files to fastqs
2 """
import logging
import os
from pprint import pformat
import sys
import six
from six.moves.urllib.parse import urljoin, urlparse

from htsworkflow.pipelines.sequences import scan_for_sequences, \
     update_model_sequence_library
from htsworkflow.pipelines import qseq2fastq
from htsworkflow.pipelines import srf2fastq
from htsworkflow.pipelines import desplit_fastq
from htsworkflow.submission.fastqname import FastqName
from htsworkflow.util.rdfhelp import get_model, load_into_model, \
     fromTypedNode, \
     strip_namespace
from htsworkflow.util.rdfns import *
from htsworkflow.util.conversion import parse_flowcell_id

from django.template import Context, loader
from django.utils.encoding import smart_str

import RDF

LOGGER = logging.getLogger(__name__)

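# filename extension appended for each supported compression setting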
COMPRESSION_EXTENSIONS = {
    None: '',
    'gzip': '.gz',
    'bzip2': '.bz2',
}

class CondorFastqExtract(object):
    def __init__(self, host, sequences_path,
                 log_path='log',
                 model=None,
                 compression=None,
                 force=False):
        """Extract fastqs from results archive

        Args:
          host (str): root of the htsworkflow api server
          sequences_path (str): root of the directory tree to scan for files
          log_path (str): where to put condor log files
          model: existing RDF model to reuse, or None to create a new one
          compression (str): one of 'gzip', 'bzip2'
          force (bool): do we force overwriting current files?
        """
        self.host = host
        self.model = get_model(model)
        self.sequences_path = sequences_path
        self.log_path = log_path
        self.compression = compression
        self.force = force
        LOGGER.info("CondorFastq host={0}".format(self.host))
        LOGGER.info("CondorFastq sequences_path={0}".format(self.sequences_path))
        LOGGER.info("CondorFastq log_path={0}".format(self.log_path))
        LOGGER.info("Compression {0}".format(self.compression))

    def create_scripts(self, result_map):
        """
        Generate condor scripts to build any needed fastq files

        Args:
          result_map: htsworkflow.submission.results.ResultMap()
        """
        template_map = {'srf': 'srf.condor',
                        'qseq': 'qseq.condor',
                        'split_fastq': 'split_fastq.condor',
                        }

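        # if PYTHONPATH is set, pass it along so the condor jobs can
        # import the htsworkflow modules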
        env = None
        pythonpath = os.environ.get('PYTHONPATH', None)
        if pythonpath is not None:
            env = "PYTHONPATH=%s" % (pythonpath,)
        condor_entries = self.build_condor_arguments(result_map)
        for script_type in template_map.keys():
            template = loader.get_template(template_map[script_type])
            variables = {'python': sys.executable,
                         'logdir': self.log_path,
                         'env': env,
                         'args': condor_entries[script_type],
                         'root_url': self.host,
                         }
            context = Context(variables)

            with open(script_type + '.condor', 'w+') as outstream:
                outstream.write(template.render(context))

    def build_condor_arguments(self, result_map):
        condor_entries = {'srf': [],
                          'qseq': [],
                          'split_fastq': []}

        conversion_funcs = {'srf': self.condor_srf_to_fastq,
                            'qseq': self.condor_qseq_to_fastq,
                            'split_fastq': self.condor_desplit_fastq
                            }
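        # scan the archive for source sequence files, then work out which
        # fastq targets still need to be built from them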
        sequences = self.find_archive_sequence_files(result_map)
        needed_targets = self.update_fastq_targets(result_map, sequences)

        for target_pathname, available_sources in needed_targets.items():
            LOGGER.debug(' target : %s' % (target_pathname,))
            LOGGER.debug(' candidate sources: %s' % (available_sources,))
            for condor_type in available_sources.keys():
                conversion = conversion_funcs.get(condor_type, None)
                if conversion is None:
                    errmsg = "Unrecognized type: {0} for {1}"
                    LOGGER.error(errmsg.format(condor_type,
                                               pformat(available_sources)))
                    continue
                sources = available_sources.get(condor_type, None)

                if sources is not None:
                    condor_entries.setdefault(condor_type, []).append(
                        conversion(sources, target_pathname))
                else:
                    LOGGER.warning(" need file %s", target_pathname)

        return condor_entries

    def find_archive_sequence_files(self, result_map):
        """
        Find archived sequence files associated with our results.
        """
        self.import_libraries(result_map)
        flowcell_ids = self.find_relevant_flowcell_ids()
        self.import_sequences(flowcell_ids)

        query_text = """
        prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
        prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        prefix xsd: <http://www.w3.org/2001/XMLSchema#>

        select ?filenode ?filetype ?cycle ?lane_number ?read
               ?library ?library_id
               ?flowcell ?flowcell_id ?read_length
               ?flowcell_type ?flowcell_status
        where {
            ?filenode libns:cycle ?cycle ;
                      libns:lane_number ?lane_number ;
                      libns:read ?read ;
                      libns:flowcell ?flowcell ;
                      libns:flowcell_id ?flowcell_id ;
                      libns:library ?library ;
                      libns:library_id ?library_id ;
                      libns:file_type ?filetype ;
                      a libns:IlluminaResult .
            ?flowcell libns:read_length ?read_length ;
                      libns:flowcell_type ?flowcell_type .
            OPTIONAL { ?flowcell libns:flowcell_status ?flowcell_status }
            FILTER(?filetype != libns:sequencer_result)
        }
        """
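        # the FILTER clause excludes raw libns:sequencer_result files,
        # which are not used as fastq conversion sources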
        LOGGER.debug("find_archive_sequence_files query: %s",
                     query_text)
        query = RDF.SPARQLQuery(query_text)
        results = []
        for r in query.execute(self.model):
            library_id = fromTypedNode(r['library_id'])
            if library_id in result_map:
                seq = SequenceResult(r)
                LOGGER.debug("Creating sequence result for library %s: %s",
                             library_id,
                             repr(seq))
                results.append(seq)
        return results

    def import_libraries(self, result_map):
        for lib_id in result_map.keys():
            liburl = urljoin(self.host, 'library/%s/' % (lib_id,))
            library = RDF.Node(RDF.Uri(liburl))
            self.import_library(library)

    def import_library(self, library):
        """Import library data into our model if we don't have it already
        """
        q = RDF.Statement(library, rdfNS['type'], libraryOntology['Library'])
        imported = False
        if not self.model.contains_statement(q):
            imported = True
            load_into_model(self.model, 'rdfa', library)
        LOGGER.debug("Did we import %s: %s", library.uri, imported)

    def find_relevant_flowcell_ids(self):
        """Generate set of flowcell ids that had samples of interest on them
        """
        flowcell_query = RDF.SPARQLQuery("""
prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>

select distinct ?flowcell ?flowcell_id
WHERE {
  ?library a libns:Library ;
           libns:has_lane ?lane .
  ?lane libns:flowcell ?flowcell .
  ?flowcell libns:flowcell_id ?flowcell_id .
}""")
        flowcell_ids = set()
        for r in flowcell_query.execute(self.model):
            flowcell_ids.add(fromTypedNode(r['flowcell_id']))
            imported = False
            a_lane = self.model.get_target(r['flowcell'],
                                           libraryOntology['has_lane'])
            if a_lane is None:
                imported = True
                # we lack information about which lanes were on this flowcell
                load_into_model(self.model, 'rdfa', r['flowcell'])
            LOGGER.debug("Did we import %s: %s" % (r['flowcell'].uri,
                                                   imported))

        return flowcell_ids

    def import_sequences(self, flowcell_ids):
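        """Scan each flowcell's sequence directory and record any files
        found in our RDF model.
        """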
        seq_dirs = []
        for f in flowcell_ids:
            seq_dirs.append(os.path.join(self.sequences_path, str(f)))
        sequences = scan_for_sequences(seq_dirs)
        for seq in sequences:
            seq.save_to_model(self.model, self.host)
        update_model_sequence_library(self.model, self.host)

    def update_fastq_targets(self, result_map, raw_files):
        """Return a dict of target fastq files that need to be built.

        Maps each target pathname to its candidate source files,
        grouped by file type. Also updates the model with a link
        between each illumina result file and our target fastq file.
        """
        # find what targets we're missing
        needed_targets = {}
        for seq in raw_files:
            if not seq.isgood:
                continue
            filename_attributes = {
                'flowcell': seq.flowcell_id,
                'lib_id': seq.library_id,
                'lane': seq.lane_number,
                'read': seq.read,
                'cycle': seq.cycle,
                'compression_extension': COMPRESSION_EXTENSIONS[self.compression],
                'is_paired': seq.ispaired
            }

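            # FastqName assembles the canonical fastq filename from
            # these attributes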
            fqName = FastqName(**filename_attributes)

            result_dir = result_map[seq.library_id]
            target_pathname = os.path.join(result_dir, fqName.filename)
            if self.force or not os.path.exists(target_pathname):
                t = needed_targets.setdefault(target_pathname, {})
                t.setdefault(seq.filetype, []).append(seq)
            self.add_target_source_links(target_pathname, seq)
        return needed_targets

    def add_target_source_links(self, target, seq):
        """Add link between target pathname and the 'lane' that produced it
        (note lane objects are now post demultiplexing.)
        """
        target_uri = 'file://' + smart_str(target)
        target_node = RDF.Node(RDF.Uri(target_uri))
        source_stmt = RDF.Statement(target_node, dcNS['source'], seq.filenode)
        self.model.add_statement(source_stmt)

    def condor_srf_to_fastq(self, sources, target_pathname):
        if len(sources) > 1:
            raise ValueError("srf to fastq can only handle one file")

        mid_point = None
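        # (assumption) flowcell 30DY0AAXX packed both reads of a pair into
        # one srf record, so the converter must split it at cycle 76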
        if sources[0].flowcell_id == '30DY0AAXX':
            mid_point = 76

        return {
            'sources': [sources[0].path],
            'pyscript': srf2fastq.__file__,
            'flowcell': sources[0].flowcell_id,
            'ispaired': sources[0].ispaired,
            'target': target_pathname,
            'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
            'mid': mid_point,
            'force': self.force,
        }

    def condor_qseq_to_fastq(self, sources, target_pathname):
        paths = sorted(source.path for source in sources)
        return {
            'pyscript': qseq2fastq.__file__,
            'flowcell': sources[0].flowcell_id,
            'target': target_pathname,
            'sources': paths,
            'ispaired': sources[0].ispaired,
            'istar': len(sources) == 1,
        }

    def condor_desplit_fastq(self, sources, target_pathname):
        paths = sorted(source.path for source in sources)
        compression_argument = ''
        if self.compression:
            compression_argument = '--' + self.compression

        return {
            'pyscript': desplit_fastq.__file__,
            'target': target_pathname,
            'compression': compression_argument,
            'sources': paths,
            'ispaired': sources[0].ispaired,
        }

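# A minimal usage sketch (hypothetical values; assumes result_map is an
# already-populated htsworkflow.submission.results.ResultMap):
#
#   extractor = CondorFastqExtract('https://jumpgate.caltech.edu',
#                                  '/archive/sequences',
#                                  compression='gzip')
#   extractor.create_scripts(result_map)
#
# create_scripts() writes srf.condor, qseq.condor, and split_fastq.condor
# into the current working directory.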

def make_lane_dict(lib_db, lib_id):
    """
    Convert the lane_set in a lib_db to a dictionary
    indexed by flowcell ID
    """
    result = []
    for lane in lib_db[lib_id]['lane_set']:
        flowcell_id, status = parse_flowcell_id(lane['flowcell'])
        lane['flowcell'] = flowcell_id
        result.append((lane['flowcell'], lane))
    return dict(result)

class SequenceResult(object):
    """Convert the sparql query result from find_archive_sequence_files
    """
    def __init__(self, result):
        self.filenode = result['filenode']
        self._filetype = result['filetype']
        self.cycle = fromTypedNode(result['cycle'])
        self.lane_number = fromTypedNode(result['lane_number'])
        self.read = fromTypedNode(result['read'])
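        # (assumption) a non-numeric read value means the read was
        # unlabeled, so treat it as read 1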
        if isinstance(self.read, six.string_types):
            self.read = 1
        self.library = result['library']
        self.library_id = fromTypedNode(result['library_id'])
        self.flowcell = result['flowcell']
        self.flowcell_id = fromTypedNode(result['flowcell_id'])
        self.flowcell_type = fromTypedNode(result['flowcell_type'])
        self.flowcell_status = fromTypedNode(result['flowcell_status'])

    def _is_good(self):
        """is this sequence / flowcell 'good enough'"""
        if self.flowcell_status is not None and \
           self.flowcell_status.lower() == "failed":
            return False
        return True
    isgood = property(_is_good)

    def _get_ispaired(self):
        return self.flowcell_type.lower() == "paired"
    ispaired = property(_get_ispaired)

    def _get_filetype(self):
        return strip_namespace(libraryOntology, self._filetype)
    filetype = property(_get_filetype)

    def _get_path(self):
        url = urlparse(str(self.filenode.uri))
        if url.scheme == 'file':
            return url.path
        else:
            errmsg = u"Unsupported scheme {0} for {1}"
            raise ValueError(errmsg.format(url.scheme, url.geturl()))
    path = property(_get_path)

    def __repr__(self):
        return "SequenceResult({0},{1},{2})".format(
            str(self.filenode),
            str(self.library_id),
            str(self.flowcell_id))