64eb6a1892a93ab086adb377552db4a828a2a974
[htsworkflow.git] / htsworkflow / submission / condorfastq.py
1 """Convert srf and qseq archive files to fastqs
2 """
3 import logging
4 import os
5 from pprint import pformat
6 import sys
7 import types
8
9 from htsworkflow.pipelines.sequences import scan_for_sequences
10 from htsworkflow.pipelines.samplekey import SampleKey
11 from htsworkflow.pipelines import qseq2fastq
12 from htsworkflow.pipelines import srf2fastq
13 from htsworkflow.pipelines import desplit_fastq
14 from htsworkflow.util.api import HtswApi
15 from htsworkflow.util.conversion import parse_flowcell_id
16
17 from django.conf import settings
18 from django.template import Context, loader
19
20 LOGGER = logging.getLogger(__name__)
21
22
class CondorFastqExtract(object):
    """Build condor submit scripts that convert archived srf/qseq/fastq
    files into the per-library fastq files needed for a submission.
    """
    def __init__(self, host, apidata, sequences_path,
                 log_path='log',
                 force=False):
        """Extract fastqs from results archive

        Args:
          host (str): root of the htsworkflow api server
          apidata (dict): id & key to post to the server
          sequences_path (str): root of the directory tree to scan for files
          log_path (str): where to put condor log files
          force (bool): do we force overwriting current files?
        """
        self.api = HtswApi(host, apidata)
        self.sequences_path = sequences_path
        self.log_path = log_path
        self.force = force

    def create_scripts(self, result_map):
        """
        Generate condor scripts to build any needed fastq files

        Args:
          result_map: htsworkflow.submission.results.ResultMap()
        """
        template_map = {'srf': 'srf.condor',
                        'qseq': 'qseq.condor',
                        'split_fastq': 'split_fastq.condor',
                        'by_sample': 'lane_to_fastq.turtle',
                        }

        # forward our PYTHONPATH to the condor jobs so they can import
        # the same modules we can
        env = None
        pythonpath = os.environ.get('PYTHONPATH', None)
        if pythonpath is not None:
            env = "PYTHONPATH=%s" % (pythonpath,)
        condor_entries = self.build_condor_arguments(result_map)
        for script_type, template_name in template_map.items():
            template = loader.get_template(template_name)
            variables = {'python': sys.executable,
                         'logdir': self.log_path,
                         'env': env,
                         'args': condor_entries[script_type],
                         'root_url': self.api.root_url,
                         }
            context = Context(variables)

            # one submit script per template type, in the current directory
            with open(script_type + '.condor', 'w+') as outstream:
                outstream.write(template.render(context))

    def build_condor_arguments(self, result_map):
        """Compute the per-template argument entries for create_scripts.

        Args:
          result_map: htsworkflow.submission.results.ResultMap()

        Returns:
          dict mapping each template type ('srf', 'qseq', 'split_fastq',
          'by_sample') to the argument entries for that template.
        """
        condor_entries = {'srf': [],
                          'qseq': [],
                          'split_fastq': []}

        # dispatch table: source file type -> condor entry builder
        conversion_funcs = {'srf': self.condor_srf_to_fastq,
                            'qseq': self.condor_qseq_to_fastq,
                            'split_fastq': self.condor_desplit_fastq
                            }
        by_sample = {}
        lib_db = self.find_archive_sequence_files(result_map)
        needed_targets = self.find_missing_targets(result_map, lib_db)

        for target_pathname, available_sources in needed_targets.items():
            LOGGER.debug(' target : %s', target_pathname)
            LOGGER.debug(' candidate sources: %s', available_sources)
            for condor_type in available_sources.keys():
                conversion = conversion_funcs.get(condor_type, None)
                if conversion is None:
                    LOGGER.error("Unrecognized type: {0} for {1}".format(
                        condor_type, pformat(available_sources)))
                    continue
                sources = available_sources.get(condor_type, None)

                if sources is not None:
                    condor_entries.setdefault(condor_type, []).append(
                        conversion(sources, target_pathname))
                    for s in sources:
                        by_sample.setdefault(s.lane_id, []).append(
                            target_pathname)
                else:
                    # Bug fix: this used to be a for/else clause, so the
                    # "need file" message fired after every pass of the
                    # loop (which never breaks) instead of only when no
                    # sources exist for this conversion type.
                    LOGGER.warning("need file %s", target_pathname)

        condor_entries['by_sample'] = by_sample
        return condor_entries

    def find_archive_sequence_files(self, result_map):
        """
        Find archived sequence files associated with our results.

        Args:
          result_map: htsworkflow.submission.results.ResultMap()

        Returns:
          dict mapping library id to the library info from the api,
          augmented with a 'lanes' dict mapping
          (flowcell, lane_number) to the set of sequence files found.
        """
        LOGGER.debug("Searching for sequence files in: %s",
                     self.sequences_path)

        lib_db = {}
        seq_dirs = set()
        # (flowcell, lane_number) -> (library id, lane id) for the lanes
        # we actually care about
        candidate_lanes = {}
        for lib_id in result_map.keys():
            lib_info = self.api.get_library(lib_id)
            lib_info['lanes'] = {}
            lib_db[lib_id] = lib_info

            for lane in lib_info['lane_set']:
                lane_key = (lane['flowcell'], lane['lane_number'])
                candidate_lanes[lane_key] = (lib_id, lane['lane_id'])
                seq_dirs.add(os.path.join(self.sequences_path,
                                          'flowcells',
                                          lane['flowcell']))
        LOGGER.debug("Seq_dirs = %s", seq_dirs)
        candidate_seq_list = scan_for_sequences(seq_dirs)

        # at this point we have too many sequences as scan_for_sequences
        # returns all the sequences in a flowcell directory
        # so lets filter out the extras
        for seq in candidate_seq_list:
            lane_key = (seq.flowcell, seq.lane)
            candidate_key = candidate_lanes.get(lane_key, None)
            if candidate_key is not None:
                lib_id, lane_id = candidate_key
                seq.lane_id = lane_id
                lib_info = lib_db[lib_id]
                lib_info['lanes'].setdefault(lane_key, set()).add(seq)

        return lib_db

    def find_missing_targets(self, result_map, lib_db):
        """
        Check if the sequence file exists.
        This requires computing what the sequence name is and checking
        to see if it can be found in the sequence location.

        Adds seq.paired flag to sequences listed in lib_db[*]['lanes']

        Args:
          result_map: htsworkflow.submission.results.ResultMap()
          lib_db: library info dict from find_archive_sequence_files()

        Returns:
          dict mapping each missing fastq pathname to a dict of
          {filetype: [sequence files that can produce it]}
        """
        fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
        fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
        # find what targets we're missing
        needed_targets = {}
        for lib_id in result_map.keys():
            result_dir = result_map[lib_id]
            lib = lib_db[lib_id]
            lane_dict = make_lane_dict(lib_db, lib_id)

            for lane_key, sequences in lib['lanes'].items():
                for seq in sequences:
                    seq.paired = lane_dict[seq.flowcell]['paired_end']
                    lane_status = lane_dict[seq.flowcell]['status']

                    # a paired-end sequence with no read number is
                    # treated as the first read
                    if seq.paired and seq.read is None:
                        seq.read = 1
                    filename_attributes = {
                        'flowcell': seq.flowcell,
                        'lib_id': lib_id,
                        'lane': seq.lane,
                        'read': seq.read,
                        'cycle': seq.cycle
                        }
                    # skip bad runs
                    if lane_status == 'Failed':
                        continue
                    if seq.flowcell == '30DY0AAXX':
                        # 30DY0 only ran for 151 bases instead of 152
                        # it is actually 76 1st read, 75 2nd read
                        seq.mid_point = 76

                    # end filters
                    if seq.paired:
                        target_name = fastq_paired_template % \
                                      filename_attributes
                    else:
                        target_name = fastq_single_template % \
                                      filename_attributes

                    target_pathname = os.path.join(result_dir, target_name)
                    # only rebuild targets that are missing, unless forced
                    if self.force or not os.path.exists(target_pathname):
                        t = needed_targets.setdefault(target_pathname, {})
                        t.setdefault(seq.filetype, []).append(seq)

        return needed_targets


    def condor_srf_to_fastq(self, sources, target_pathname):
        """Build the srf.condor job entry converting one srf file.

        Args:
          sources: list containing exactly one srf sequence file
          target_pathname (str): the fastq file to produce

        Raises:
          ValueError: if anything other than exactly one source is given
            (previously an empty list slipped past and raised IndexError)
        """
        if len(sources) != 1:
            raise ValueError("srf to fastq can only handle one file")

        source = sources[0]
        return {
            'sources': [os.path.abspath(source.path)],
            'pyscript': srf2fastq.__file__,
            'flowcell': source.flowcell,
            'ispaired': source.paired,
            'target': target_pathname,
            # for paired reads, the second read lands beside the first
            'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
            # mid_point is only set for special-cased flowcells
            'mid': getattr(source, 'mid_point', None),
            'force': self.force,
        }

    def condor_qseq_to_fastq(self, sources, target_pathname):
        """Build the qseq.condor job entry merging qseq files.

        Args:
          sources: list of qseq sequence files to convert
          target_pathname (str): the fastq file to produce
        """
        paths = sorted(source.path for source in sources)
        return {
            'pyscript': qseq2fastq.__file__,
            'flowcell': sources[0].flowcell,
            'target': target_pathname,
            'sources': paths,
            'ispaired': sources[0].paired,
            # a single source is treated as a tar of qseq files
            'istar': len(sources) == 1,
        }

    def condor_desplit_fastq(self, sources, target_pathname):
        """Build the split_fastq.condor job entry merging partial fastqs.

        Args:
          sources: list of split fastq sequence files to merge
          target_pathname (str): the fastq file to produce
        """
        paths = sorted(source.path for source in sources)
        return {
            'pyscript': desplit_fastq.__file__,
            'target': target_pathname,
            'sources': paths,
            'ispaired': sources[0].paired,
        }

    def lane_rdf(self, sources, target_pathname):
        # placeholder: per-lane RDF generation is not implemented yet
        pass
245
def make_lane_dict(lib_db, lib_id):
    """
    Convert the lane_set in a lib_db to a dictionary
    indexed by flowcell ID

    Note: each lane's 'flowcell' value is rewritten in place with the
    status suffix stripped off.
    """
    flowcell_map = {}
    for lane in lib_db[lib_id]['lane_set']:
        stripped_id, status = parse_flowcell_id(lane['flowcell'])
        lane['flowcell'] = stripped_id
        # later lanes on the same flowcell overwrite earlier ones,
        # matching the behavior of dict() over an association list
        flowcell_map[stripped_id] = lane
    return flowcell_map
257