Directly generate compressed fastq files from HiSeq split fastqs.
htsworkflow/submission/condorfastq.py
1 """Convert srf and qseq archive files to fastqs
2 """
3 import logging
4 import os
from pprint import pformat
import sys
import types
from urlparse import urljoin, urlparse

from htsworkflow.pipelines.sequences import scan_for_sequences, \
     update_model_sequence_library
from htsworkflow.pipelines.samplekey import SampleKey
from htsworkflow.pipelines import qseq2fastq
from htsworkflow.pipelines import srf2fastq
from htsworkflow.pipelines import desplit_fastq
from htsworkflow.submission.fastqname import FastqName
from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \
     fromTypedNode, \
     strip_namespace
from htsworkflow.util.rdfns import *
from htsworkflow.util.conversion import parse_flowcell_id

from django.conf import settings
from django.template import Context, loader

import RDF

LOGGER = logging.getLogger(__name__)

COMPRESSION_EXTENSIONS = {
    None: '',
    'gzip': '.gz'
}

class CondorFastqExtract(object):
    def __init__(self, host, sequences_path,
                 log_path='log',
                 model=None,
                 compression=None,
                 force=False):
        """Extract fastqs from results archive

        Args:
          host (str): root of the htsworkflow api server
          sequences_path (str): root of the directory tree to scan for files
          log_path (str): where to put condor log files
          model (str): name of the RDF model storage passed to get_model()
          compression (str): compression to apply to the fastqs; only
            'gzip' (or None for uncompressed) is currently supported
          force (bool): do we force overwriting current files?
        """
        self.host = host
        self.model = get_model(model)
        self.sequences_path = sequences_path
        self.log_path = log_path
        self.compression = compression
        self.force = force
        LOGGER.info("CondorFastq host={0}".format(self.host))
        LOGGER.info("CondorFastq sequences_path={0}".format(self.sequences_path))
        LOGGER.info("CondorFastq log_path={0}".format(self.log_path))
        LOGGER.info("Compression {0}".format(self.compression))

    def create_scripts(self, result_map):
        """
        Generate condor scripts to build any needed fastq files

        Args:
          result_map: htsworkflow.submission.results.ResultMap()
        """
        template_map = {'srf': 'srf.condor',
                        'qseq': 'qseq.condor',
                        'split_fastq': 'split_fastq.condor',
                        }

        env = None
        pythonpath = os.environ.get('PYTHONPATH', None)
        if pythonpath is not None:
            env = "PYTHONPATH=%s" % (pythonpath,)
        condor_entries = self.build_condor_arguments(result_map)
        for script_type in template_map.keys():
            template = loader.get_template(template_map[script_type])
            variables = {'python': sys.executable,
                         'logdir': self.log_path,
                         'env': env,
                         'args': condor_entries[script_type],
                         'root_url': self.host,
                         }
            context = Context(variables)

            with open(script_type + '.condor', 'w+') as outstream:
                outstream.write(template.render(context))

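    # A minimal usage sketch (paths and library id below are hypothetical;
    # the real caller passes a htsworkflow.submission.results.ResultMap,
    # but any dict-like mapping of library id to result directory has the
    # interface this class uses):
    #
    #   extractor = CondorFastqExtract('http://jumpgate.caltech.edu',
    #                                  '/archive/sequences',
    #                                  compression='gzip')
    #   extractor.create_scripts({'12345': '/tmp/results/12345'})
    #
    # create_scripts() then writes srf.condor, qseq.condor, and
    # split_fastq.condor into the current working directory.
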
    def build_condor_arguments(self, result_map):
        condor_entries = {'srf': [],
                          'qseq': [],
                          'split_fastq': []}

        conversion_funcs = {'srf': self.condor_srf_to_fastq,
                            'qseq': self.condor_qseq_to_fastq,
                            'split_fastq': self.condor_desplit_fastq
                            }
        sequences = self.find_archive_sequence_files(result_map)
        needed_targets = self.update_fastq_targets(result_map, sequences)

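        # needed_targets maps each missing fastq pathname to its candidate
        # sources grouped by file type, e.g. (illustrative values only):
        #   {'/tmp/results/12345/12345_C02F9ACXX_c202_l3_r1.fastq.gz':
        #        {'split_fastq': [<SequenceResult ...>]}}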
        for target_pathname, available_sources in needed_targets.items():
            LOGGER.debug(' target : %s' % (target_pathname,))
            LOGGER.debug(' candidate sources: %s' % (available_sources,))
            found_source = False
            for condor_type in available_sources.keys():
                conversion = conversion_funcs.get(condor_type, None)
                if conversion is None:
                    errmsg = "Unrecognized type: {0} for {1}"
                    LOGGER.error(errmsg.format(condor_type,
                                               pformat(available_sources)))
                    continue
                sources = available_sources.get(condor_type, None)
                if sources is not None:
                    condor_entries.setdefault(condor_type, []).append(
                        conversion(sources, target_pathname))
                    found_source = True
            if not found_source:
                # no usable source files were found for this target
                LOGGER.warn(" need file %s", target_pathname)

        return condor_entries

    def find_archive_sequence_files(self, result_map):
        """
        Find archived sequence files associated with our results.
        """
        self.import_libraries(result_map)
        flowcell_ids = self.find_relevant_flowcell_ids()
        self.import_sequences(flowcell_ids)

        query_text = """
        prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
        prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        prefix xsd: <http://www.w3.org/2001/XMLSchema#>

        select ?filenode ?filetype ?cycle ?lane_number ?read
               ?library  ?library_id
               ?flowcell ?flowcell_id ?read_length
               ?flowcell_type ?flowcell_status
        where {
            ?filenode libns:cycle ?cycle ;
                      libns:lane_number ?lane_number ;
                      libns:read ?read ;
                      libns:flowcell ?flowcell ;
                      libns:flowcell_id ?flowcell_id ;
                      libns:library ?library ;
                      libns:library_id ?library_id ;
                      libns:file_type ?filetype ;
                      a libns:IlluminaResult .
            ?flowcell libns:read_length ?read_length ;
                      libns:flowcell_type ?flowcell_type .
            OPTIONAL { ?flowcell libns:flowcell_status ?flowcell_status }
            FILTER(?filetype != libns:sequencer_result)
        }
        """
        LOGGER.debug("find_archive_sequence_files query: %s",
                     query_text)
        query = RDF.SPARQLQuery(query_text)
        results = []
        for r in query.execute(self.model):
            library_id = fromTypedNode(r['library_id'])
            if library_id in result_map:
                seq = SequenceResult(r)
                LOGGER.debug("Creating sequence result for library %s: %s",
                             library_id,
                             repr(seq))
                results.append(seq)
        return results

    def import_libraries(self, result_map):
        for lib_id in result_map.keys():
            lib_id_encoded = lib_id.encode('utf-8')
            liburl = urljoin(self.host, 'library/%s/' % (lib_id_encoded,))
            library = RDF.Node(RDF.Uri(liburl))
            self.import_library(library)

    def import_library(self, library):
        """Import library data into our model if we don't have it already
        """
        q = RDF.Statement(library, rdfNS['type'], libraryOntology['Library'])
        imported = False
        if not self.model.contains_statement(q):
            imported = True
            load_into_model(self.model, 'rdfa', library)
        LOGGER.debug("Did we import %s: %s", library.uri, imported)

    def find_relevant_flowcell_ids(self):
        """Generate set of flowcell ids that had samples of interest on them
        """
        flowcell_query = RDF.SPARQLQuery("""
prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>

select distinct ?flowcell ?flowcell_id
WHERE {
  ?library a libns:Library ;
           libns:has_lane ?lane .
  ?lane libns:flowcell ?flowcell .
  ?flowcell libns:flowcell_id ?flowcell_id .
}""")
        flowcell_ids = set()
        for r in flowcell_query.execute(self.model):
            flowcell_ids.add(fromTypedNode(r['flowcell_id']))
            imported = False
            a_lane = self.model.get_target(r['flowcell'],
                                           libraryOntology['has_lane'])
            if a_lane is None:
                imported = True
                # we lack information about which lanes were on this flowcell
                load_into_model(self.model, 'rdfa', r['flowcell'])
            LOGGER.debug("Did we import %s: %s", r['flowcell'].uri, imported)

        return flowcell_ids

    def import_sequences(self, flowcell_ids):
        seq_dirs = []
        for f in flowcell_ids:
            seq_dirs.append(os.path.join(self.sequences_path, str(f)))
        sequences = scan_for_sequences(seq_dirs)
        for seq in sequences:
            seq.save_to_model(self.model, self.host)
        update_model_sequence_library(self.model, self.host)

    def update_fastq_targets(self, result_map, raw_files):
        """Return a dict of fastq target pathnames that need to be built.

        Also update the model with links between illumina result files
        and our target fastq files.
        """
        # find what targets we're missing
        needed_targets = {}
        for seq in raw_files:
            if not seq.isgood:
                continue
            filename_attributes = {
                'flowcell': seq.flowcell_id,
                'lib_id': seq.library_id,
                'lane': seq.lane_number,
                'read': seq.read,
                'cycle': seq.cycle,
                'compression_extension': COMPRESSION_EXTENSIONS[self.compression],
                'is_paired': seq.ispaired
            }

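            # FastqName renders these attributes into the target filename;
            # for a paired, gzip-compressed run the result looks something
            # like 12345_C02F9ACXX_c202_l3_r1.fastq.gz (see
            # htsworkflow.submission.fastqname for the exact template).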
            fqName = FastqName(**filename_attributes)

            result_dir = result_map[seq.library_id]
            target_pathname = os.path.join(result_dir, fqName.filename)
            if self.force or not os.path.exists(target_pathname):
                t = needed_targets.setdefault(target_pathname, {})
                t.setdefault(seq.filetype, []).append(seq)
            self.add_target_source_links(target_pathname, seq)
        return needed_targets

    def add_target_source_links(self, target, seq):
        """Add link between target pathname and the 'lane' that produced it
        (note lane objects are now post demultiplexing.)
        """
        target_uri = 'file://' + target.encode('utf-8')
        target_node = RDF.Node(RDF.Uri(target_uri))
        source_stmt = RDF.Statement(target_node, dcNS['source'], seq.filenode)
        self.model.add_statement(source_stmt)

    def condor_srf_to_fastq(self, sources, target_pathname):
        if len(sources) > 1:
            raise ValueError("srf to fastq can only handle one file")

        mid_point = None
        if sources[0].flowcell_id == '30DY0AAXX':
            # special case: reads from this flowcell are split at cycle 76
            mid_point = 76

        return {
            'sources': [sources[0].path],
            'pyscript': srf2fastq.__file__,
            'flowcell': sources[0].flowcell_id,
            'ispaired': sources[0].ispaired,
            'target': target_pathname,
            'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
            'mid': mid_point,
            'force': self.force,
        }

    def condor_qseq_to_fastq(self, sources, target_pathname):
        paths = sorted(source.path for source in sources)
        return {
            'pyscript': qseq2fastq.__file__,
            'flowcell': sources[0].flowcell_id,
            'target': target_pathname,
            'sources': paths,
            'ispaired': sources[0].ispaired,
            'istar': len(sources) == 1,
        }

    def condor_desplit_fastq(self, sources, target_pathname):
        paths = sorted(source.path for source in sources)
        compression_argument = ''
        if self.compression:
            compression_argument = '--' + self.compression

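        # When rendered through split_fastq.condor these values become a
        # desplit_fastq command line, roughly (a sketch only; the condor
        # template is the authoritative form):
        #   python .../desplit_fastq.py --gzip ... source1.fastq source2.fastq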
        return {
            'pyscript': desplit_fastq.__file__,
            'target': target_pathname,
            'compression': compression_argument,
            'sources': paths,
            'ispaired': sources[0].ispaired,
        }


def make_lane_dict(lib_db, lib_id):
    """
    Convert the lane_set in a lib_db to a dictionary
    indexed by flowcell ID
    """
    result = []
    for lane in lib_db[lib_id]['lane_set']:
        flowcell_id, status = parse_flowcell_id(lane['flowcell'])
        lane['flowcell'] = flowcell_id
        result.append((lane['flowcell'], lane))
    return dict(result)

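# Illustrative use of make_lane_dict, assuming a lib_db entry shaped like
# the htsworkflow library API response (all values hypothetical):
#
#   lib_db = {'12345': {'lane_set': [{'flowcell': 'C02F9ACXX (failed)',
#                                     'lane_number': 3}]}}
#   lanes = make_lane_dict(lib_db, '12345')
#   # lanes == {'C02F9ACXX': {'flowcell': 'C02F9ACXX', 'lane_number': 3}}
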
class SequenceResult(object):
    """Convert the sparql query result from find_archive_sequence_files
    """
    def __init__(self, result):
        self.filenode = result['filenode']
        self._filetype = result['filetype']
        self.cycle = fromTypedNode(result['cycle'])
        self.lane_number = fromTypedNode(result['lane_number'])
        self.read = fromTypedNode(result['read'])
        if isinstance(self.read, types.StringTypes):
            # records that stored read as a string are treated as read 1
            self.read = 1
        self.library = result['library']
        self.library_id = fromTypedNode(result['library_id'])
        self.flowcell = result['flowcell']
        self.flowcell_id = fromTypedNode(result['flowcell_id'])
        self.flowcell_type = fromTypedNode(result['flowcell_type'])
        self.flowcell_status = fromTypedNode(result['flowcell_status'])

    def _is_good(self):
        """is this sequence / flowcell 'good enough'"""
        if self.flowcell_status is not None and \
           self.flowcell_status.lower() == "failed":
            return False
        return True
    isgood = property(_is_good)

    def _get_ispaired(self):
        return self.flowcell_type.lower() == "paired"
    ispaired = property(_get_ispaired)

    def _get_filetype(self):
        return strip_namespace(libraryOntology, self._filetype)
    filetype = property(_get_filetype)

    def _get_path(self):
        url = urlparse(str(self.filenode.uri))
        if url.scheme == 'file':
            return url.path
        else:
            errmsg = u"Unsupported scheme {0} for {1}"
            raise ValueError(errmsg.format(url.scheme, unicode(url)))
    path = property(_get_path)

    def __repr__(self):
        return "SequenceResult({0},{1},{2})".format(
            str(self.filenode),
            str(self.library_id),
            str(self.flowcell_id))