Create a lane-to-filename turtle data file
[htsworkflow.git] / htsworkflow / submission / condorfastq.py
1 """Convert srf and qseq archive files to fastqs
2 """
3 import logging
4 import os
5 from pprint import pformat
6 import sys
7 import types
8
9 from htsworkflow.pipelines.sequences import scan_for_sequences
10 from htsworkflow.pipelines.samplekey import SampleKey
11 from htsworkflow.pipelines import qseq2fastq
12 from htsworkflow.pipelines import srf2fastq
13 from htsworkflow.pipelines import desplit_fastq
14 from htsworkflow.util.api import HtswApi
15 from htsworkflow.util.conversion import parse_flowcell_id
16
17 from django.conf import settings
18 from django.template import Context, loader
19
20 LOGGER = logging.getLogger(__name__)
21
22
23 class CondorFastqExtract(object):
24     def __init__(self, host, apidata, sequences_path,
25                  log_path='log',
26                  force=False):
27         """Extract fastqs from results archive
28
29         Args:
30           host (str): root of the htsworkflow api server
31           apidata (dict): id & key to post to the server
32           sequences_path (str): root of the directory tree to scan for files
33           log_path (str): where to put condor log files
34           force (bool): do we force overwriting current files?
35         """
36         self.api = HtswApi(host, apidata)
37         self.sequences_path = sequences_path
38         self.log_path = log_path
39         self.force = force
40
41     def create_scripts(self, result_map ):
42         """
43         Generate condor scripts to build any needed fastq files
44
45         Args:
46           result_map: htsworkflow.submission.results.ResultMap()
47         """
48         template_map = {'srf': 'srf.condor',
49                         'qseq': 'qseq.condor',
50                         'split_fastq': 'split_fastq.condor',
51                         'by_sample': 'lane_to_fastq.turtle',
52                         }
53
54         condor_entries = self.build_condor_arguments(result_map)
55         for script_type in template_map.keys():
56             template = loader.get_template(template_map[script_type])
57             variables = {'python': sys.executable,
58                          'logdir': self.log_path,
59                          'env': os.environ.get('PYTHONPATH', None),
60                          'args': condor_entries[script_type],
61                          'root_url': self.api.root_url,
62                          }
63             context = Context(variables)
64
65             with open(script_type + '.condor','w+') as outstream:
66                 outstream.write(template.render(context))
67
68     def build_condor_arguments(self, result_map):
69         condor_entries = {'srf': [],
70                           'qseq': [],
71                           'split_fastq': []}
72
73         conversion_funcs = {'srf': self.condor_srf_to_fastq,
74                             'qseq': self.condor_qseq_to_fastq,
75                             'split_fastq': self.condor_desplit_fastq
76                             }
77         by_sample = {}
78         lib_db = self.find_archive_sequence_files(result_map)
79         needed_targets = self.find_missing_targets(result_map, lib_db)
80
81         for target_pathname, available_sources in needed_targets.items():
82             LOGGER.debug(' target : %s' % (target_pathname,))
83             LOGGER.debug(' candidate sources: %s' % (available_sources,))
84             for condor_type in available_sources.keys():
85                 conversion = conversion_funcs.get(condor_type, None)
86                 if conversion is None:
87                     errmsg = "Unrecognized type: {0} for {1}"
88                     print errmsg.format(condor_type,
89                                         pformat(available_sources))
90                     continue
91                 sources = available_sources.get(condor_type, None)
92
93                 if sources is not None:
94                     condor_entries.setdefault(condor_type, []).append(
95                         conversion(sources, target_pathname))
96                     for s in sources:
97                         by_sample.setdefault(s.lane_id,[]).append(
98                             target_pathname)
99             else:
100                 print " need file", target_pathname
101
102         condor_entries['by_sample'] = by_sample
103         return condor_entries
104
105     def find_archive_sequence_files(self,  result_map):
106         """
107         Find archived sequence files associated with our results.
108         """
109         LOGGER.debug("Searching for sequence files in: %s" %(self.sequences_path,))
110
111         lib_db = {}
112         seq_dirs = set()
113         candidate_lanes = {}
114         for lib_id in result_map.keys():
115             lib_info = self.api.get_library(lib_id)
116             lib_info['lanes'] = {}
117             lib_db[lib_id] = lib_info
118
119             for lane in lib_info['lane_set']:
120                 lane_key = (lane['flowcell'], lane['lane_number'])
121                 candidate_lanes[lane_key] = (lib_id, lane['lane_id'])
122                 seq_dirs.add(os.path.join(self.sequences_path,
123                                              'flowcells',
124                                              lane['flowcell']))
125         LOGGER.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
126         candidate_seq_list = scan_for_sequences(seq_dirs)
127
128         # at this point we have too many sequences as scan_for_sequences
129         # returns all the sequences in a flowcell directory
130         # so lets filter out the extras
131
132         for seq in candidate_seq_list:
133             lane_key = (seq.flowcell, seq.lane)
134             candidate_key = candidate_lanes.get(lane_key, None)
135             if candidate_key is not None:
136                 lib_id, lane_id = candidate_key
137                 seq.lane_id = lane_id
138                 lib_info = lib_db[lib_id]
139                 lib_info['lanes'].setdefault(lane_key, set()).add(seq)
140
141         return lib_db
142
143     def find_missing_targets(self, result_map, lib_db):
144         """
145         Check if the sequence file exists.
146         This requires computing what the sequence name is and checking
147         to see if it can be found in the sequence location.
148
149         Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
150         """
151         fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
152         fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
153         # find what targets we're missing
154         needed_targets = {}
155         for lib_id in result_map.keys():
156             result_dir = result_map[lib_id]
157             lib = lib_db[lib_id]
158             lane_dict = make_lane_dict(lib_db, lib_id)
159
160             for lane_key, sequences in lib['lanes'].items():
161                 for seq in sequences:
162                     seq.paired = lane_dict[seq.flowcell]['paired_end']
163                     lane_status = lane_dict[seq.flowcell]['status']
164
165                     if seq.paired and seq.read is None:
166                         seq.read = 1
167                     filename_attributes = {
168                         'flowcell': seq.flowcell,
169                         'lib_id': lib_id,
170                         'lane': seq.lane,
171                         'read': seq.read,
172                         'cycle': seq.cycle
173                         }
174                     # skip bad runs
175                     if lane_status == 'Failed':
176                         continue
177                     if seq.flowcell == '30DY0AAXX':
178                         # 30DY0 only ran for 151 bases instead of 152
179                         # it is actually 76 1st read, 75 2nd read
180                         seq.mid_point = 76
181
182                     # end filters
183                     if seq.paired:
184                         target_name = fastq_paired_template % \
185                                       filename_attributes
186                     else:
187                         target_name = fastq_single_template % \
188                                       filename_attributes
189
190                     target_pathname = os.path.join(result_dir, target_name)
191                     if self.force or not os.path.exists(target_pathname):
192                         t = needed_targets.setdefault(target_pathname, {})
193                         t.setdefault(seq.filetype, []).append(seq)
194
195         return needed_targets
196
197
198     def condor_srf_to_fastq(self, sources, target_pathname):
199         if len(sources) > 1:
200             raise ValueError("srf to fastq can only handle one file")
201
202         return {
203             'sources': [os.path.abspath(sources[0].path)],
204             'pyscript': srf2fastq.__file__,
205             'flowcell': sources[0].flowcell,
206             'ispaired': sources[0].paired,
207             'target': target_pathname,
208             'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
209             'mid': getattr(sources[0], 'mid_point', None),
210             'force': self.force,
211         }
212
213     def condor_qseq_to_fastq(self, sources, target_pathname):
214         paths = []
215         for source in sources:
216             paths.append(source.path)
217         paths.sort()
218         return {
219             'pyscript': qseq2fastq.__file__,
220             'flowcell': sources[0].flowcell,
221             'target': target_pathname,
222             'sources': paths,
223             'ispaired': sources[0].paired,
224             'istar': len(sources) == 1,
225         }
226
227     def condor_desplit_fastq(self, sources, target_pathname):
228         paths = []
229         for source in sources:
230             paths.append(source.path)
231         paths.sort()
232         return {
233             'pyscript': desplit_fastq.__file__,
234             'target': target_pathname,
235             'sources': paths,
236             'ispaired': sources[0].paired,
237         }
238
239     def lane_rdf(self, sources, target_pathname):
240         pass
241
def make_lane_dict(lib_db, lib_id):
    """
    Convert the lane_set in a lib_db to a dictionary
    indexed by flowcell ID
    """
    by_flowcell = {}
    for lane in lib_db[lib_id]['lane_set']:
        # normalize the flowcell name; parse_flowcell_id strips any
        # status suffix and reports it separately
        flowcell_id, status = parse_flowcell_id(lane['flowcell'])
        lane['flowcell'] = flowcell_id
        by_flowcell[flowcell_id] = lane
    return by_flowcell
253