# htsworkflow/submission/condorfastq.py
1 """Convert srf and qseq archive files to fastqs
2 """
3 import logging
4 import os
5 from pprint import pformat,pprint
6 import sys
7 import types
8 import six
9 from six.moves.urllib.parse import urljoin, urlparse
10
11 from htsworkflow.pipelines.sequences import scan_for_sequences, \
12      update_model_sequence_library
13 from htsworkflow.pipelines.samplekey import SampleKey
14 from htsworkflow.pipelines import qseq2fastq
15 from htsworkflow.pipelines import srf2fastq
16 from htsworkflow.pipelines import desplit_fastq
17 from htsworkflow.submission.fastqname import FastqName
18 from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \
19      fromTypedNode, \
20      strip_namespace
21 from htsworkflow.util.rdfns import *
22 from htsworkflow.util.conversion import parse_flowcell_id
23
24 from django.conf import settings
25 from django.template import Context, loader
26 from django.utils.encoding import smart_str
27
28 import RDF
29
30 LOGGER = logging.getLogger(__name__)
31
32 COMPRESSION_EXTENSIONS = {
33     None: '',
34     'gzip': '.gz'
35 }
36
class CondorFastqExtract(object):
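    """Build condor submit scripts that extract fastq files from archives.

    A minimal usage sketch (hypothetical host and paths; result_map is an
    htsworkflow.submission.results.ResultMap):

        extractor = CondorFastqExtract('https://jumpgate.caltech.edu/',
                                       '/archive/sequences',
                                       log_path='log',
                                       compression='gzip')
        extractor.create_scripts(result_map)
    """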
    def __init__(self, host, sequences_path,
                 log_path='log',
                 model=None,
                 compression=None,
                 force=False):
        """Extract fastqs from results archive

        Args:
          host (str): root of the htsworkflow api server
          sequences_path (str): root of the directory tree to scan for files
          log_path (str): where to put condor log files
          model: name of the RDF model to open via get_model()
          compression (str): one of 'gzip', 'bzip2'
          force (bool): do we force overwriting current files?
        """
        self.host = host
        self.model = get_model(model)
        self.sequences_path = sequences_path
        self.log_path = log_path
        self.compression = compression
        self.force = force
        LOGGER.info("CondorFastq host=%s", self.host)
        LOGGER.info("CondorFastq sequences_path=%s", self.sequences_path)
        LOGGER.info("CondorFastq log_path=%s", self.log_path)
        LOGGER.info("Compression %s", self.compression)

    def create_scripts(self, result_map):
        """
        Generate condor scripts to build any needed fastq files

        Args:
          result_map: htsworkflow.submission.results.ResultMap()
        """
        template_map = {'srf': 'srf.condor',
                        'qseq': 'qseq.condor',
                        'split_fastq': 'split_fastq.condor',
                        }

        env = None
        pythonpath = os.environ.get('PYTHONPATH', None)
        if pythonpath is not None:
            env = "PYTHONPATH=%s" % (pythonpath,)
        condor_entries = self.build_condor_arguments(result_map)
        for script_type in template_map.keys():
            template = loader.get_template(template_map[script_type])
            variables = {'python': sys.executable,
                         'logdir': self.log_path,
                         'env': env,
                         'args': condor_entries[script_type],
                         'root_url': self.host,
                         }
            context = Context(variables)

            # writes srf.condor, qseq.condor, and split_fastq.condor
            # into the current working directory
            with open(script_type + '.condor', 'w+') as outstream:
                outstream.write(template.render(context))

    def build_condor_arguments(self, result_map):
        # map script type -> list of template-variable dicts, one per job
        condor_entries = {'srf': [],
                          'qseq': [],
                          'split_fastq': []}

        conversion_funcs = {'srf': self.condor_srf_to_fastq,
                            'qseq': self.condor_qseq_to_fastq,
                            'split_fastq': self.condor_desplit_fastq,
                            }
        sequences = self.find_archive_sequence_files(result_map)
        needed_targets = self.update_fastq_targets(result_map, sequences)

        for target_pathname, available_sources in needed_targets.items():
            LOGGER.debug(' target : %s', target_pathname)
            LOGGER.debug(' candidate sources: %s', available_sources)
            if not available_sources:
                # nothing we know how to convert into this fastq
                LOGGER.warning(" need file %s", target_pathname)
                continue
            for condor_type, sources in available_sources.items():
                conversion = conversion_funcs.get(condor_type, None)
                if conversion is None:
                    errmsg = "Unrecognized type: {0} for {1}"
                    LOGGER.error(errmsg.format(condor_type,
                                               pformat(available_sources)))
                    continue
                if sources:
                    condor_entries.setdefault(condor_type, []).append(
                        conversion(sources, target_pathname))

        return condor_entries

    def find_archive_sequence_files(self, result_map):
        """
        Find archived sequence files associated with our results.
        """
        self.import_libraries(result_map)
        flowcell_ids = self.find_relevant_flowcell_ids()
        self.import_sequences(flowcell_ids)

        query_text = """
        prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
        prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        prefix xsd: <http://www.w3.org/2001/XMLSchema#>

        select ?filenode ?filetype ?cycle ?lane_number ?read
               ?library  ?library_id
               ?flowcell ?flowcell_id ?read_length
               ?flowcell_type ?flowcell_status
        where {
            ?filenode libns:cycle ?cycle ;
                      libns:lane_number ?lane_number ;
                      libns:read ?read ;
                      libns:flowcell ?flowcell ;
                      libns:flowcell_id ?flowcell_id ;
                      libns:library ?library ;
                      libns:library_id ?library_id ;
                      libns:file_type ?filetype ;
                      a libns:IlluminaResult .
            ?flowcell libns:read_length ?read_length ;
                      libns:flowcell_type ?flowcell_type .
            OPTIONAL { ?flowcell libns:flowcell_status ?flowcell_status }
            FILTER(?filetype != libns:sequencer_result)
        }
        """
        LOGGER.debug("find_archive_sequence_files query: %s",
                     query_text)
        query = RDF.SPARQLQuery(query_text)
        results = []
        for r in query.execute(self.model):
            library_id = fromTypedNode(r['library_id'])
            if library_id in result_map:
                seq = SequenceResult(r)
                LOGGER.debug("Creating sequence result for library %s: %s",
                             library_id,
                             repr(seq))
                results.append(seq)
        return results

    def import_libraries(self, result_map):
        for lib_id in result_map.keys():
            liburl = urljoin(self.host, 'library/%s/' % (lib_id,))
            library = RDF.Node(RDF.Uri(liburl))
            self.import_library(library)

    def import_library(self, library):
        """Import library data into our model if we don't have it already
        """
        q = RDF.Statement(library, rdfNS['type'], libraryOntology['Library'])
        imported = False
        if not self.model.contains_statement(q):
            imported = True
            load_into_model(self.model, 'rdfa', library)
        LOGGER.debug("Did we import %s: %s", library.uri, imported)

    def find_relevant_flowcell_ids(self):
        """Generate set of flowcell ids that had samples of interest on them
        """
        flowcell_query = RDF.SPARQLQuery("""
prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>

select distinct ?flowcell ?flowcell_id
WHERE {
  ?library a libns:Library ;
           libns:has_lane ?lane .
  ?lane libns:flowcell ?flowcell .
  ?flowcell libns:flowcell_id ?flowcell_id .
}""")
        flowcell_ids = set()
        for r in flowcell_query.execute(self.model):
            flowcell_ids.add(fromTypedNode(r['flowcell_id']))
            imported = False
            a_lane = self.model.get_target(r['flowcell'],
                                           libraryOntology['has_lane'])
            if a_lane is None:
                imported = True
                # we lack information about which lanes were on this flowcell
                load_into_model(self.model, 'rdfa', r['flowcell'])
            LOGGER.debug("Did we import %s: %s", r['flowcell'].uri, imported)

        return flowcell_ids

    def import_sequences(self, flowcell_ids):
        seq_dirs = [os.path.join(self.sequences_path, str(f))
                    for f in flowcell_ids]
        sequences = scan_for_sequences(seq_dirs)
        for seq in sequences:
            seq.save_to_model(self.model, self.host)
        update_model_sequence_library(self.model, self.host)

    def update_fastq_targets(self, result_map, raw_files):
        """Return a dict mapping fastq target pathnames to available sources.

        Also update model with link between illumina result files
        and our target fastq file.
        """
        # find what targets we're missing
        needed_targets = {}
        for seq in raw_files:
            if not seq.isgood:
                continue
            filename_attributes = {
                'flowcell': seq.flowcell_id,
                'lib_id': seq.library_id,
                'lane': seq.lane_number,
                'read': seq.read,
                'cycle': seq.cycle,
                'compression_extension': COMPRESSION_EXTENSIONS[self.compression],
                'is_paired': seq.ispaired
            }

            fqName = FastqName(**filename_attributes)

            result_dir = result_map[seq.library_id]
            target_pathname = os.path.join(result_dir, fqName.filename)
            if self.force or not os.path.exists(target_pathname):
                t = needed_targets.setdefault(target_pathname, {})
                t.setdefault(seq.filetype, []).append(seq)
            self.add_target_source_links(target_pathname, seq)
        return needed_targets
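    # Illustrative sketch of the target construction above (hypothetical
    # values; the exact filename pattern is defined by FastqName in
    # htsworkflow.submission.fastqname):
    #   fq = FastqName(flowcell='30DY0AAXX', lib_id='11115', lane=1, read=1,
    #                  cycle=76, compression_extension='.gz', is_paired=True)
    #   target_pathname = os.path.join(result_map['11115'], fq.filename)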

    def add_target_source_links(self, target, seq):
        """Add link between target pathname and the 'lane' that produced it
        (note lane objects are now post demultiplexing.)
        """
        target_uri = 'file://' + smart_str(target)
        target_node = RDF.Node(RDF.Uri(target_uri))
        source_stmt = RDF.Statement(target_node, dcNS['source'], seq.filenode)
        self.model.add_statement(source_stmt)

    def condor_srf_to_fastq(self, sources, target_pathname):
        if len(sources) > 1:
            raise ValueError("srf to fastq can only handle one file")

        # hard-coded special case for flowcell 30DY0AAXX: pass a midpoint
        # of 76 cycles to srf2fastq
        mid_point = None
        if sources[0].flowcell_id == '30DY0AAXX':
            mid_point = 76

        return {
            'sources': [sources[0].path],
            'pyscript': srf2fastq.__file__,
            'flowcell': sources[0].flowcell_id,
            'ispaired': sources[0].ispaired,
            'target': target_pathname,
            'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
            'mid': mid_point,
            'force': self.force,
        }

    def condor_qseq_to_fastq(self, sources, target_pathname):
        paths = sorted(source.path for source in sources)
        compression_argument = self.format_compression_flag()

        return {
            'pyscript': qseq2fastq.__file__,
            'flowcell': sources[0].flowcell_id,
            'target': target_pathname,
            'compression': compression_argument,
            'sources': paths,
            'ispaired': sources[0].ispaired,
            'istar': len(sources) == 1,
        }

    def condor_desplit_fastq(self, sources, target_pathname):
        paths = sorted(source.path for source in sources)
        compression_argument = self.format_compression_flag()

        return {
            'pyscript': desplit_fastq.__file__,
            'target': target_pathname,
            'compression': compression_argument,
            'sources': paths,
            'ispaired': sources[0].ispaired,
        }

    def format_compression_flag(self):
        return '--' + self.compression if self.compression else ''
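    # e.g. format_compression_flag() returns '--gzip' when
    # self.compression == 'gzip', and '' when compression is None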


def make_lane_dict(lib_db, lib_id):
    """
    Convert the lane_set in a lib_db to a dictionary
    indexed by flowcell ID
    """
    result = []
    for lane in lib_db[lib_id]['lane_set']:
        flowcell_id, status = parse_flowcell_id(lane['flowcell'])
        lane['flowcell'] = flowcell_id
        result.append((flowcell_id, lane))
    return dict(result)
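# Hypothetical illustration (the lib_db shape is assumed from the loop above;
# parse_flowcell_id splits the raw value into a bare id and a status):
#   lib_db = {'11115': {'lane_set': [{'flowcell': '30DY0AAXX', ...}]}}
#   make_lane_dict(lib_db, '11115')  # -> {'30DY0AAXX': {...lane dict...}}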


class SequenceResult(object):
    """Wrap a row from the find_archive_sequence_files sparql query
    """
    def __init__(self, result):
        self.filenode = result['filenode']
        self._filetype = result['filetype']
        self.cycle = fromTypedNode(result['cycle'])
        self.lane_number = fromTypedNode(result['lane_number'])
        self.read = fromTypedNode(result['read'])
        if isinstance(self.read, six.string_types):
            # some records store the read as a string; treat those as read 1
            self.read = 1
        self.library = result['library']
        self.library_id = fromTypedNode(result['library_id'])
        self.flowcell = result['flowcell']
        self.flowcell_id = fromTypedNode(result['flowcell_id'])
        self.flowcell_type = fromTypedNode(result['flowcell_type'])
        self.flowcell_status = fromTypedNode(result['flowcell_status'])

    def _is_good(self):
        """is this sequence / flowcell 'good enough'"""
        if self.flowcell_status is not None and \
           self.flowcell_status.lower() == "failed":
            return False
        return True
    isgood = property(_is_good)

    def _get_ispaired(self):
        return self.flowcell_type.lower() == "paired"
    ispaired = property(_get_ispaired)

    def _get_filetype(self):
        return strip_namespace(libraryOntology, self._filetype)
    filetype = property(_get_filetype)

    def _get_path(self):
        url = urlparse(str(self.filenode.uri))
        if url.scheme == 'file':
            return url.path
        else:
            errmsg = u"Unsupported scheme {0} for {1}"
            raise ValueError(errmsg.format(url.scheme, str(url)))
    path = property(_get_path)

    def __repr__(self):
        return "SequenceResult({0},{1},{2})".format(
            str(self.filenode),
            str(self.library_id),
            str(self.flowcell_id))