70ad32ecf09486a395ebc8c0927368ccc9ea057f
[htsworkflow.git] / htsworkflow / submission / condorfastq.py
1 """Convert srf and qseq archive files to fastqs
2 """
3 import logging
4 import os
5 from pprint import pformat
6 import sys
7 import types
8
9 from htsworkflow.pipelines.sequences import scan_for_sequences
10 from htsworkflow.pipelines import qseq2fastq
11 from htsworkflow.pipelines import srf2fastq
12 from htsworkflow.pipelines import desplit_fastq
13 from htsworkflow.util.api import HtswApi
14 from htsworkflow.util.conversion import parse_flowcell_id
15
16 from django.conf import settings
17 from django.template import Context, loader
18
# Module-level logger, named after this module, shared by the code below.
LOGGER = logging.getLogger(__name__)
20
class CondorFastqExtract(object):
    """Generate condor scripts that convert archived sequence files to fastq.

    Library metadata is fetched through the htsworkflow API, candidate
    source files are found by scanning ``sequences_path``, and one condor
    script per source type (srf, qseq, split_fastq) is rendered from a
    Django template.
    """

    def __init__(self, host, apidata, sequences_path,
                 log_path='log',
                 force=False):
        """Extract fastqs from results archive

        Args:
          host (str): root of the htsworkflow api server
          apidata (dict): id & key to post to the server
          sequences_path (str): root of the directory tree to scan for files
          log_path (str): where to put condor log files
          force (bool): do we force overwriting current files?
        """
        self.api = HtswApi(host, apidata)
        self.sequences_path = sequences_path
        self.log_path = log_path
        self.force = force

    def create_scripts(self, library_result_map):
        """
        Generate condor scripts to build any needed fastq files

        One ``<type>.condor`` file per source type is written, using a
        path relative to the current working directory.

        Args:
          library_result_map (list):  [(library_id, destination directory), ...]
        """
        template_map = {'srf': 'srf.condor',
                        'qseq': 'qseq.condor',
                        'split_fastq': 'split_fastq.condor'}

        condor_entries = self.build_condor_arguments(library_result_map)
        for script_type, template_name in template_map.items():
            template = loader.get_template(template_name)
            variables = {'python': sys.executable,
                         'logdir': self.log_path,
                         'env': os.environ.get('PYTHONPATH'),
                         'args': condor_entries[script_type],
                         }

            context = Context(variables)

            with open(script_type + '.condor', 'w') as outstream:
                outstream.write(template.render(context))

    def build_condor_arguments(self, library_result_map):
        """
        Build the per-type argument lists used to render the condor scripts

        Args:
          library_result_map (list):  [(library_id, destination directory), ...]

        Returns:
          dict: source type ('srf', 'qseq', 'split_fastq') -> list of
          argument dictionaries, one per fastq target that needs building.
        """
        condor_entries = {'srf': [],
                          'qseq': [],
                          'split_fastq': []}
        conversion_funcs = {'srf': self.condor_srf_to_fastq,
                            'qseq': self.condor_qseq_to_fastq,
                            'split_fastq': self.condor_desplit_fastq
                            }
        lib_db = self.find_archive_sequence_files(library_result_map)

        needed_targets = self.find_missing_targets(library_result_map, lib_db)

        for target_pathname, available_sources in needed_targets.items():
            LOGGER.debug(' target : %s', target_pathname)
            LOGGER.debug(' candidate sources: %s', available_sources)

            # Track whether anything was scheduled for this target.  The
            # "need file" notice used to hang off a for/else with no break,
            # so it fired for every target, including satisfied ones.
            scheduled = False
            for condor_type, sources in available_sources.items():
                conversion = conversion_funcs.get(condor_type)
                if conversion is None:
                    errmsg = "Unrecognized type: {0} for {1}"
                    print(errmsg.format(condor_type,
                                        pformat(available_sources)))
                    continue

                if sources is not None:
                    condor_entries.setdefault(condor_type, []).append(
                        conversion(sources, target_pathname))
                    scheduled = True

            if not scheduled:
                print(" need file %s" % (target_pathname,))

        return condor_entries

    def find_archive_sequence_files(self, library_result_map):
        """
        Find archived sequence files associated with our results.

        Args:
          library_result_map (list):  [(library_id, destination directory), ...]

        Returns:
          dict: library_id -> library record from the API, extended with a
          'lanes' entry mapping (flowcell, lane) -> set of sequence objects
          found for that lane.
        """
        LOGGER.debug("Searching for sequence files in: %s",
                     self.sequences_path)

        lib_db = {}
        seq_dirs = set()
        candidate_lanes = {}
        for lib_id, result_dir in library_result_map:
            lib_info = self.api.get_library(lib_id)
            lib_info['lanes'] = {}
            lib_db[lib_id] = lib_info

            for lane in lib_info['lane_set']:
                lane_key = (lane['flowcell'], lane['lane_number'])
                # a lane key maps to one library; a later library using the
                # same flowcell/lane replaces the earlier entry
                candidate_lanes[lane_key] = lib_id
                seq_dirs.add(os.path.join(self.sequences_path,
                                          'flowcells',
                                          lane['flowcell']))
        LOGGER.debug("Seq_dirs = %s", seq_dirs)
        candidate_seq_list = scan_for_sequences(seq_dirs)

        # at this point we have too many sequences as scan_for_sequences
        # returns all the sequences in a flowcell directory
        # so lets filter out the extras

        for seq in candidate_seq_list:
            lane_key = (seq.flowcell, seq.lane)
            lib_id = candidate_lanes.get(lane_key)
            if lib_id is not None:
                lib_db[lib_id]['lanes'].setdefault(lane_key, set()).add(seq)

        return lib_db

    def find_missing_targets(self, library_result_map, lib_db):
        """
        Check if the sequence file exists.
        This requires computing what the sequence name is and checking
        to see if it can be found in the sequence location.

        Adds seq.paired flag to sequences listed in lib_db[*]['lanes']

        Args:
          library_result_map (list):  [(library_id, destination directory), ...]
          lib_db (dict): result of find_archive_sequence_files

        Returns:
          dict: target fastq pathname -> {filetype: [sequence, ...]} for
          every target that does not exist yet (or every target when
          self.force is set).
        """
        fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
        fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
        # find what targets we're missing
        needed_targets = {}
        for lib_id, result_dir in library_result_map:
            lib = lib_db[lib_id]
            lane_dict = make_lane_dict(lib_db, lib_id)

            for sequences in lib['lanes'].values():
                for seq in sequences:
                    lane = lane_dict[seq.flowcell]
                    seq.paired = lane['paired_end']
                    lane_status = lane['status']

                    # a paired run with no read number is treated as read 1
                    if seq.paired and seq.read is None:
                        seq.read = 1

                    # skip bad runs
                    if lane_status == 'Failed':
                        continue
                    if seq.flowcell == '30DY0AAXX':
                        # 30DY0 only ran for 151 bases instead of 152
                        # it is actually 76 1st read, 75 2nd read
                        seq.mid_point = 76
                    # end filters

                    filename_attributes = {
                        'flowcell': seq.flowcell,
                        'lib_id': lib_id,
                        'lane': seq.lane,
                        'read': seq.read,
                        'cycle': seq.cycle
                        }
                    if seq.paired:
                        target_name = fastq_paired_template % \
                                      filename_attributes
                    else:
                        target_name = fastq_single_template % \
                                      filename_attributes

                    target_pathname = os.path.join(result_dir, target_name)
                    if self.force or not os.path.exists(target_pathname):
                        t = needed_targets.setdefault(target_pathname, {})
                        t.setdefault(seq.filetype, []).append(seq)

        return needed_targets

    def condor_srf_to_fastq(self, sources, target_pathname):
        """
        Build the condor argument dictionary for an srf -> fastq conversion

        Args:
          sources (list): exactly one srf sequence object
          target_pathname (str): fastq file to create

        Returns:
          dict: values for the srf condor template

        Raises:
          ValueError: if sources does not contain exactly one entry
        """
        # an empty list used to fall through to an IndexError below;
        # reject it with the same error as the too-many case
        if len(sources) != 1:
            raise ValueError("srf to fastq can only handle one file")

        source = sources[0]
        return {
            'sources': [source.path],
            'pyscript': srf2fastq.__file__,
            'flowcell': source.flowcell,
            'ispaired': source.paired,
            'target': target_pathname,
            'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
            'mid': getattr(source, 'mid_point', None),
            'force': self.force,
        }

    def condor_qseq_to_fastq(self, sources, target_pathname):
        """
        Build the condor argument dictionary for a qseq -> fastq conversion

        Args:
          sources (list): qseq sequence objects feeding this target
          target_pathname (str): fastq file to create

        Returns:
          dict: values for the qseq condor template; 'istar' is true when
          there is a single source.
        """
        paths = sorted(source.path for source in sources)
        return {
            'pyscript': qseq2fastq.__file__,
            'flowcell': sources[0].flowcell,
            'target': target_pathname,
            'sources': paths,
            'ispaired': sources[0].paired,
            'istar': len(sources) == 1,
        }

    def condor_desplit_fastq(self, sources, target_pathname):
        """
        Build the condor argument dictionary for merging split fastq files

        Args:
          sources (list): split fastq sequence objects feeding this target
          target_pathname (str): fastq file to create

        Returns:
          dict: values for the split_fastq condor template
        """
        paths = sorted(source.path for source in sources)
        return {
            'pyscript': desplit_fastq.__file__,
            'target': target_pathname,
            'sources': paths,
            'ispaired': sources[0].paired,
        }
226
def make_lane_dict(lib_db, lib_id):
    """
    Build a {flowcell ID: lane record} dictionary from the lane_set
    of the library lib_id in lib_db.

    Each lane record's 'flowcell' value is replaced in place by the
    flowcell ID that parse_flowcell_id extracts from it. When several
    lanes share a flowcell ID the last one in lane_set is kept.
    """
    lanes_by_flowcell = {}
    for lane_record in lib_db[lib_id]['lane_set']:
        # parse_flowcell_id yields an (id, status) pair; only the id is used
        parsed_id, _status = parse_flowcell_id(lane_record['flowcell'])
        lane_record['flowcell'] = parsed_id
        lanes_by_flowcell[parsed_id] = lane_record
    return lanes_by_flowcell
238