Move code out of ucsc_gather and into the htsworkflow tree.
[htsworkflow.git] / htsworkflow / submission / condorfastq.py
1 """Convert srf and qseq archive files to fastqs
2 """
3 import logging
4 import os
5 from pprint import pformat
6 import sys
7 import types
8
9 from htsworkflow.pipelines.sequences import scan_for_sequences
10 from htsworkflow.pipelines import qseq2fastq
11 from htsworkflow.pipelines import srf2fastq
12 from htsworkflow.pipelines import desplit_fastq
13 from htsworkflow.util.api import HtswApi
14 from htsworkflow.util.conversion import parse_flowcell_id
15
16 from django.conf import settings
17 from django.template import Context, loader
18
19 LOGGER = logging.getLogger(__name__)
20
21 class CondorFastqExtract(object):
22     def __init__(self, host, apidata, sequences_path,
23                  log_path='log',
24                  force=False):
25         """Extract fastqs from results archive
26
27         Args:
28           host (str): root of the htsworkflow api server
29           apidata (dict): id & key to post to the server
30           sequences_path (str): root of the directory tree to scan for files
31           log_path (str): where to put condor log files
32           force (bool): do we force overwriting current files?
33         """
34         self.api = HtswApi(host, apidata)
35         self.sequences_path = sequences_path
36         self.log_path = log_path
37         self.force = force
38
39     def create_scripts(self, result_map ):
40         """
41         Generate condor scripts to build any needed fastq files
42
43         Args:
44           result_map: htsworkflow.submission.results.ResultMap()
45         """
46         template_map = {'srf': 'srf.condor',
47                         'qseq': 'qseq.condor',
48                         'split_fastq': 'split_fastq.condor'}
49
50         condor_entries = self.build_condor_arguments(result_map)
51         for script_type in template_map.keys():
52             template = loader.get_template(template_map[script_type])
53             variables = {'python': sys.executable,
54                          'logdir': self.log_path,
55                          'env': os.environ.get('PYTHONPATH', None),
56                          'args': condor_entries[script_type],
57                          }
58
59             context = Context(variables)
60
61             with open(script_type + '.condor','w+') as outstream:
62                 outstream.write(template.render(context))
63
64     def build_condor_arguments(self, result_map):
65         condor_entries = {'srf': [],
66                           'qseq': [],
67                           'split_fastq': []}
68         conversion_funcs = {'srf': self.condor_srf_to_fastq,
69                             'qseq': self.condor_qseq_to_fastq,
70                             'split_fastq': self.condor_desplit_fastq
71                             }
72
73         lib_db = self.find_archive_sequence_files(result_map)
74         needed_targets = self.find_missing_targets(result_map, lib_db)
75
76         for target_pathname, available_sources in needed_targets.items():
77             LOGGER.debug(' target : %s' % (target_pathname,))
78             LOGGER.debug(' candidate sources: %s' % (available_sources,))
79             for condor_type in available_sources.keys():
80                 conversion = conversion_funcs.get(condor_type, None)
81                 if conversion is None:
82                     errmsg = "Unrecognized type: {0} for {1}"
83                     print errmsg.format(condor_type,
84                                         pformat(available_sources))
85                     continue
86                 sources = available_sources.get(condor_type, None)
87
88                 if sources is not None:
89                     condor_entries.setdefault(condor_type, []).append(
90                         conversion(sources, target_pathname))
91             else:
92                 print " need file", target_pathname
93
94         return condor_entries
95
96     def find_archive_sequence_files(self,  result_map):
97         """
98         Find archived sequence files associated with our results.
99         """
100         LOGGER.debug("Searching for sequence files in: %s" %(self.sequences_path,))
101
102         lib_db = {}
103         seq_dirs = set()
104         candidate_lanes = {}
105         for lib_id in result_map.keys():
106             lib_info = self.api.get_library(lib_id)
107             lib_info['lanes'] = {}
108             lib_db[lib_id] = lib_info
109
110             for lane in lib_info['lane_set']:
111                 lane_key = (lane['flowcell'], lane['lane_number'])
112                 candidate_lanes[lane_key] = lib_id
113                 seq_dirs.add(os.path.join(self.sequences_path,
114                                              'flowcells',
115                                              lane['flowcell']))
116         LOGGER.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
117         candidate_seq_list = scan_for_sequences(seq_dirs)
118
119         # at this point we have too many sequences as scan_for_sequences
120         # returns all the sequences in a flowcell directory
121         # so lets filter out the extras
122
123         for seq in candidate_seq_list:
124             lane_key = (seq.flowcell, seq.lane)
125             lib_id = candidate_lanes.get(lane_key, None)
126             if lib_id is not None:
127                 lib_info = lib_db[lib_id]
128                 lib_info['lanes'].setdefault(lane_key, set()).add(seq)
129
130         return lib_db
131
132     def find_missing_targets(self, result_map, lib_db):
133         """
134         Check if the sequence file exists.
135         This requires computing what the sequence name is and checking
136         to see if it can be found in the sequence location.
137
138         Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
139         """
140         fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
141         fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
142         # find what targets we're missing
143         needed_targets = {}
144         for lib_id in result_map.keys():
145             result_dir = result_map[lib_id]
146             lib = lib_db[lib_id]
147             lane_dict = make_lane_dict(lib_db, lib_id)
148
149             for lane_key, sequences in lib['lanes'].items():
150                 for seq in sequences:
151                     seq.paired = lane_dict[seq.flowcell]['paired_end']
152                     lane_status = lane_dict[seq.flowcell]['status']
153
154                     if seq.paired and seq.read is None:
155                         seq.read = 1
156                     filename_attributes = {
157                         'flowcell': seq.flowcell,
158                         'lib_id': lib_id,
159                         'lane': seq.lane,
160                         'read': seq.read,
161                         'cycle': seq.cycle
162                         }
163                     # skip bad runs
164                     if lane_status == 'Failed':
165                         continue
166                     if seq.flowcell == '30DY0AAXX':
167                         # 30DY0 only ran for 151 bases instead of 152
168                         # it is actually 76 1st read, 75 2nd read
169                         seq.mid_point = 76
170
171                     # end filters
172                     if seq.paired:
173                         target_name = fastq_paired_template % \
174                                       filename_attributes
175                     else:
176                         target_name = fastq_single_template % \
177                                       filename_attributes
178
179                     target_pathname = os.path.join(result_dir, target_name)
180                     if self.force or not os.path.exists(target_pathname):
181                         t = needed_targets.setdefault(target_pathname, {})
182                         t.setdefault(seq.filetype, []).append(seq)
183
184         return needed_targets
185
186
187     def condor_srf_to_fastq(self, sources, target_pathname):
188         if len(sources) > 1:
189             raise ValueError("srf to fastq can only handle one file")
190
191         return {
192             'sources': [os.path.abspath(sources[0].path)],
193             'pyscript': srf2fastq.__file__,
194             'flowcell': sources[0].flowcell,
195             'ispaired': sources[0].paired,
196             'target': target_pathname,
197             'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
198             'mid': getattr(sources[0], 'mid_point', None),
199             'force': self.force,
200         }
201
202     def condor_qseq_to_fastq(self, sources, target_pathname):
203         paths = []
204         for source in sources:
205             paths.append(source.path)
206         paths.sort()
207         return {
208             'pyscript': qseq2fastq.__file__,
209             'flowcell': sources[0].flowcell,
210             'target': target_pathname,
211             'sources': paths,
212             'ispaired': sources[0].paired,
213             'istar': len(sources) == 1,
214         }
215
216     def condor_desplit_fastq(self, sources, target_pathname):
217         paths = []
218         for source in sources:
219             paths.append(source.path)
220         paths.sort()
221         return {
222             'pyscript': desplit_fastq.__file__,
223             'target': target_pathname,
224             'sources': paths,
225             'ispaired': sources[0].paired,
226         }
227
def make_lane_dict(lib_db, lib_id):
    """Index a library's lane_set by flowcell ID.

    Normalizes each lane's 'flowcell' field via parse_flowcell_id (the
    status component is discarded) and returns a mapping from the
    normalized flowcell id to the lane record.
    """
    by_flowcell = {}
    for lane in lib_db[lib_id]['lane_set']:
        flowcell_id, status = parse_flowcell_id(lane['flowcell'])
        lane['flowcell'] = flowcell_id
        by_flowcell[flowcell_id] = lane
    return by_flowcell
239