Use isinstance(object, (types)) pattern instead of type(object) == types.Type
[htsworkflow.git] / htsworkflow / submission / condorfastq.py
1 """Convert srf and qseq archive files to fastqs
2 """
3 import logging
4 import os
5 from pprint import pformat,pprint
6 import sys
7 import types
8 import six
9 from six.moves.urllib.parse import urljoin, urlparse
10
11 from htsworkflow.pipelines.sequences import scan_for_sequences, \
12      update_model_sequence_library
13 from htsworkflow.pipelines.samplekey import SampleKey
14 from htsworkflow.pipelines import qseq2fastq
15 from htsworkflow.pipelines import srf2fastq
16 from htsworkflow.pipelines import desplit_fastq
17 from htsworkflow.submission.fastqname import FastqName
18 from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \
19      fromTypedNode, \
20      strip_namespace
21 from htsworkflow.util.rdfns import *
22 from htsworkflow.util.conversion import parse_flowcell_id
23
24 from django.conf import settings
25 from django.template import Context, loader
26 from django.utils.encoding import smart_str
27
28 import RDF
29
30 LOGGER = logging.getLogger(__name__)
31
32 COMPRESSION_EXTENSIONS = {
33     None: '',
34     'gzip': '.gz'
35 }
36
37 class CondorFastqExtract(object):
38     def __init__(self, host, sequences_path,
39                  log_path='log',
40                  model=None,
41                  compression=None,
42                  force=False):
43         """Extract fastqs from results archive
44
45         Args:
46           host (str): root of the htsworkflow api server
47           apidata (dict): id & key to post to the server
48           sequences_path (str): root of the directory tree to scan for files
49           log_path (str): where to put condor log files
50           compression (str): one of 'gzip', 'bzip2'
51           force (bool): do we force overwriting current files?
52         """
53         self.host = host
54         self.model = get_model(model)
55         self.sequences_path = sequences_path
56         self.log_path = log_path
57         self.compression=compression
58         self.force = force
59         LOGGER.info("CondorFastq host={0}".format(self.host))
60         LOGGER.info("CondorFastq sequences_path={0}".format(self.sequences_path))
61         LOGGER.info("CondorFastq log_path={0}".format(self.log_path))
62         LOGGER.info("Compression {0}".format(self.compression))
63
64     def create_scripts(self, result_map ):
65         """
66         Generate condor scripts to build any needed fastq files
67
68         Args:
69           result_map: htsworkflow.submission.results.ResultMap()
70         """
71         template_map = {'srf': 'srf.condor',
72                         'qseq': 'qseq.condor',
73                         'split_fastq': 'split_fastq.condor',
74                         }
75
76         env = None
77         pythonpath = os.environ.get('PYTHONPATH', None)
78         if pythonpath is not None:
79             env = "PYTHONPATH=%s" % (pythonpath,)
80         condor_entries = self.build_condor_arguments(result_map)
81         for script_type in template_map.keys():
82             template = loader.get_template(template_map[script_type])
83             variables = {'python': sys.executable,
84                          'logdir': self.log_path,
85                          'env': env,
86                          'args': condor_entries[script_type],
87                          'root_url': self.host,
88                          }
89             context = Context(variables)
90
91             with open(script_type + '.condor','w+') as outstream:
92                 outstream.write(template.render(context))
93
94     def build_condor_arguments(self, result_map):
95         condor_entries = {'srf': [],
96                           'qseq': [],
97                           'split_fastq': []}
98
99         conversion_funcs = {'srf': self.condor_srf_to_fastq,
100                             'qseq': self.condor_qseq_to_fastq,
101                             'split_fastq': self.condor_desplit_fastq
102                             }
103         sequences = self.find_archive_sequence_files(result_map)
104         needed_targets = self.update_fastq_targets(result_map, sequences)
105
106         for target_pathname, available_sources in needed_targets.items():
107             LOGGER.debug(' target : %s' % (target_pathname,))
108             LOGGER.debug(' candidate sources: %s' % (available_sources,))
109             for condor_type in available_sources.keys():
110                 conversion = conversion_funcs.get(condor_type, None)
111                 if conversion is None:
112                     errmsg = "Unrecognized type: {0} for {1}"
113                     LOGGER.error(errmsg.format(condor_type,
114                                         pformat(available_sources)))
115                     continue
116                 sources = available_sources.get(condor_type, None)
117
118                 if sources is not None:
119                     condor_entries.setdefault(condor_type, []).append(
120                         conversion(sources, target_pathname))
121             else:
122                 LOGGER.warn(" need file %s", target_pathname)
123
124         return condor_entries
125
126     def find_archive_sequence_files(self,  result_map):
127         """
128         Find archived sequence files associated with our results.
129         """
130         self.import_libraries(result_map)
131         flowcell_ids = self.find_relevant_flowcell_ids()
132         self.import_sequences(flowcell_ids)
133
134         query_text = """
135         prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
136         prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
137         prefix xsd: <http://www.w3.org/2001/XMLSchema#>
138
139         select ?filenode ?filetype ?cycle ?lane_number ?read
140                ?library  ?library_id
141                ?flowcell ?flowcell_id ?read_length
142                ?flowcell_type ?flowcell_status
143         where {
144             ?filenode libns:cycle ?cycle ;
145                       libns:lane_number ?lane_number ;
146                       libns:read ?read ;
147                       libns:flowcell ?flowcell ;
148                       libns:flowcell_id ?flowcell_id ;
149                       libns:library ?library ;
150                       libns:library_id ?library_id ;
151                       libns:file_type ?filetype ;
152                       a libns:IlluminaResult .
153             ?flowcell libns:read_length ?read_length ;
154                       libns:flowcell_type ?flowcell_type .
155             OPTIONAL { ?flowcell libns:flowcell_status ?flowcell_status }
156             FILTER(?filetype != libns:sequencer_result)
157         }
158         """
159         LOGGER.debug("find_archive_sequence_files query: %s",
160                      query_text)
161         query = RDF.SPARQLQuery(query_text)
162         results = []
163         for r in query.execute(self.model):
164             library_id = fromTypedNode(r['library_id'])
165             if library_id in result_map:
166                 seq = SequenceResult(r)
167                 LOGGER.debug("Creating sequence result for library %s: %s",
168                              library_id,
169                              repr(seq))
170                 results.append(seq)
171         return results
172
173     def import_libraries(self, result_map):
174         for lib_id in result_map.keys():
175             lib_id_encoded = lib_id.encode('utf-8')
176             liburl = urljoin(self.host, 'library/%s/' % (lib_id_encoded,))
177             library = RDF.Node(RDF.Uri(liburl))
178             self.import_library(library)
179
180     def import_library(self, library):
181         """Import library data into our model if we don't have it already
182         """
183         q = RDF.Statement(library, rdfNS['type'], libraryOntology['Library'])
184         present = False
185         if not self.model.contains_statement(q):
186             present = True
187             load_into_model(self.model, 'rdfa', library)
188         LOGGER.debug("Did we import %s: %s", library.uri, present)
189
190     def find_relevant_flowcell_ids(self):
191         """Generate set of flowcell ids that had samples of interest on them
192         """
193         flowcell_query = RDF.SPARQLQuery("""
194 prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
195
196 select distinct ?flowcell ?flowcell_id
197 WHERE {
198   ?library a libns:Library ;
199            libns:has_lane ?lane .
200   ?lane libns:flowcell ?flowcell .
201   ?flowcell libns:flowcell_id ?flowcell_id .
202 }""")
203         flowcell_ids = set()
204         for r in flowcell_query.execute(self.model):
205             flowcell_ids.add( fromTypedNode(r['flowcell_id']) )
206             imported = False
207             a_lane = self.model.get_target(r['flowcell'],
208                                            libraryOntology['has_lane'])
209             if a_lane is None:
210                 imported = True
211                 # we lack information about which lanes were on this flowcell
212                 load_into_model(self.model, 'rdfa', r['flowcell'])
213             LOGGER.debug("Did we imported %s: %s" % (r['flowcell'].uri,
214                                                      imported))
215
216         return flowcell_ids
217
218     def import_sequences(self, flowcell_ids):
219         seq_dirs = []
220         for f in flowcell_ids:
221             seq_dirs.append(os.path.join(self.sequences_path, str(f)))
222         sequences = scan_for_sequences(seq_dirs)
223         for seq in sequences:
224             seq.save_to_model(self.model, self.host)
225         update_model_sequence_library(self.model, self.host)
226
227     def update_fastq_targets(self, result_map, raw_files):
228         """Return list of fastq files that need to be built.
229
230         Also update model with link between illumina result files
231         and our target fastq file.
232         """
233         # find what targets we're missing
234         needed_targets = {}
235         for seq in raw_files:
236             if not seq.isgood:
237                 continue
238             filename_attributes = {
239                 'flowcell': seq.flowcell_id,
240                 'lib_id': seq.library_id,
241                 'lane': seq.lane_number,
242                 'read': seq.read,
243                 'cycle': seq.cycle,
244                 'compression_extension': COMPRESSION_EXTENSIONS[self.compression],
245                 'is_paired': seq.ispaired
246             }
247
248             fqName = FastqName(**filename_attributes)
249
250             result_dir = result_map[seq.library_id]
251             target_pathname = os.path.join(result_dir, fqName.filename)
252             if self.force or not os.path.exists(target_pathname):
253                 t = needed_targets.setdefault(target_pathname, {})
254                 t.setdefault(seq.filetype, []).append(seq)
255             self.add_target_source_links(target_pathname, seq)
256         return needed_targets
257
258     def add_target_source_links(self, target, seq):
259         """Add link between target pathname and the 'lane' that produced it
260         (note lane objects are now post demultiplexing.)
261         """
262         target_uri = 'file://' + smart_str(target)
263         target_node = RDF.Node(RDF.Uri(target_uri))
264         source_stmt = RDF.Statement(target_node, dcNS['source'], seq.filenode)
265         self.model.add_statement(source_stmt)
266
267     def condor_srf_to_fastq(self, sources, target_pathname):
268         if len(sources) > 1:
269             raise ValueError("srf to fastq can only handle one file")
270
271         mid_point = None
272         if sources[0].flowcell_id == '30DY0AAXX':
273             mid_point = 76
274
275         return {
276             'sources': [sources[0].path],
277             'pyscript': srf2fastq.__file__,
278             'flowcell': sources[0].flowcell_id,
279             'ispaired': sources[0].ispaired,
280             'target': target_pathname,
281             'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
282             'mid': mid_point,
283             'force': self.force,
284         }
285
286     def condor_qseq_to_fastq(self, sources, target_pathname):
287         paths = []
288         for source in sources:
289             paths.append(source.path)
290         paths.sort()
291         return {
292             'pyscript': qseq2fastq.__file__,
293             'flowcell': sources[0].flowcell_id,
294             'target': target_pathname,
295             'sources': paths,
296             'ispaired': sources[0].ispaired,
297             'istar': len(sources) == 1,
298         }
299
300     def condor_desplit_fastq(self, sources, target_pathname):
301         paths = []
302         for source in sources:
303             paths.append(source.path)
304         paths.sort()
305         compression_argument = ''
306         if self.compression:
307             compression_argument = '--'+self.compression
308
309         return {
310             'pyscript': desplit_fastq.__file__,
311             'target': target_pathname,
312             'compression': compression_argument,
313             'sources': paths,
314             'ispaired': sources[0].ispaired,
315         }
316
317
318 def make_lane_dict(lib_db, lib_id):
319     """
320     Convert the lane_set in a lib_db to a dictionary
321     indexed by flowcell ID
322     """
323     result = []
324     for lane in lib_db[lib_id]['lane_set']:
325         flowcell_id, status = parse_flowcell_id(lane['flowcell'])
326         lane['flowcell'] = flowcell_id
327         result.append((lane['flowcell'], lane))
328     return dict(result)
329
330 class SequenceResult(object):
331     """Convert the sparql query result from find_archive_sequence_files
332     """
333     def __init__(self, result):
334         self.filenode = result['filenode']
335         self._filetype = result['filetype']
336         self.cycle = fromTypedNode(result['cycle'])
337         self.lane_number = fromTypedNode(result['lane_number'])
338         self.read = fromTypedNode(result['read'])
339         if isinstance(self.read, six.string_types):
340             self.read = 1
341         self.library = result['library']
342         self.library_id = fromTypedNode(result['library_id'])
343         self.flowcell = result['flowcell']
344         self.flowcell_id = fromTypedNode(result['flowcell_id'])
345         self.flowcell_type = fromTypedNode(result['flowcell_type'])
346         self.flowcell_status = fromTypedNode(result['flowcell_status'])
347
348     def _is_good(self):
349         """is this sequence / flowcell 'good enough'"""
350         if self.flowcell_status is not None and \
351            self.flowcell_status.lower() == "failed":
352             return False
353         return True
354     isgood = property(_is_good)
355
356     def _get_ispaired(self):
357         if self.flowcell_type.lower() == "paired":
358             return True
359         else:
360             return False
361     ispaired = property(_get_ispaired)
362
363     def _get_filetype(self):
364         return strip_namespace(libraryOntology, self._filetype)
365     filetype = property(_get_filetype)
366
367     def _get_path(self):
368         url = urlparse(str(self.filenode.uri))
369         if url.scheme == 'file':
370             return url.path
371         else:
372             errmsg = u"Unsupported scheme {0} for {1}"
373             raise ValueError(errmsg.format(url.scheme, unicode(url)))
374     path = property(_get_path)
375
376     def __repr__(self):
377         return "SequenceResult({0},{1},{2})".format(
378             str(self.filenode),
379             str(self.library_id),
380             str(self.flowcell_id))