# Always include the python path when generating the condor scripts
# [htsworkflow.git] / htsworkflow / submission / condorfastq.py
"""Convert srf and qseq archive files to fastqs
"""
3 import logging
4 import os
5 from pprint import pformat
6 import sys
7 import types
8
9 from htsworkflow.pipelines.sequences import scan_for_sequences
10 from htsworkflow.pipelines import qseq2fastq
11 from htsworkflow.pipelines import srf2fastq
12 from htsworkflow.pipelines import desplit_fastq
13 from htsworkflow.util.api import HtswApi
14 from htsworkflow.util.conversion import parse_flowcell_id
15
16 LOGGER = logging.getLogger(__name__)
17
class CondorFastqExtract(object):
    """Generate condor submit scripts that convert srf/qseq archives to fastq.

    The workflow is: query the htsworkflow API for each library's lanes,
    scan the sequence archive for matching srf/qseq/fastq files, work out
    which fastq targets are missing, and emit one condor submit script per
    conversion type (srf, qseq, split_fastq).
    """
    def __init__(self, host, apidata, sequences_path,
                 log_path='log',
                 force=False):
        """Extract fastqs from results archive

        Args:
          host (str): root of the htsworkflow api server
          apidata (dict): id & key to post to the server
          sequences_path (str): root of the directory tree to scan for files
          log_path (str): where to put condor log files
          force (bool): do we force overwriting current files?
        """
        self.api = HtswApi(host, apidata)
        self.sequences_path = sequences_path
        self.log_path = log_path
        self.force = force

    def create_scripts(self, library_result_map):
        """Generate condor scripts to build any needed fastq files

        Args:
          library_result_map (list): [(library_id, destination directory), ...]
        """
        headers = {'srf': self.get_srf_condor_header(),
                   'qseq': self.get_qseq_condor_header(),
                   'split_fastq': self.get_split_fastq_condor_header(),
                   }

        condor_entries = self.build_condor_arguments(library_result_map)

        for script_type in headers:
            make_submit_script('{0}.condor'.format(script_type),
                               headers[script_type],
                               condor_entries[script_type])

    def build_condor_arguments(self, library_result_map):
        """Build per-job condor argument stanzas for every missing target.

        Args:
          library_result_map (list): [(library_id, destination directory), ...]

        Returns:
          dict: {'srf': [stanza, ...], 'qseq': [...], 'split_fastq': [...]}
        """
        condor_entries = {'srf': [],
                          'qseq': [],
                          'split_fastq': []}
        conversion_funcs = {'srf': self.condor_srf_to_fastq,
                            'qseq': self.condor_qseq_to_fastq,
                            'split_fastq': self.condor_desplit_fastq,
                            }
        lib_db = self.find_archive_sequence_files(library_result_map)

        needed_targets = self.find_missing_targets(library_result_map, lib_db)

        for target_pathname, available_sources in needed_targets.items():
            LOGGER.debug(' target : %s', target_pathname)
            LOGGER.debug(' candidate sources: %s', available_sources)
            # BUG FIX: the original attached an ``else`` clause to this for
            # loop; a for/else runs its else whenever the loop finishes
            # without ``break`` -- which was always -- so "need file" was
            # reported for every target.  Track explicitly whether any
            # conversion was scheduled instead.
            scheduled = False
            for condor_type, sources in available_sources.items():
                conversion = conversion_funcs.get(condor_type, None)
                if conversion is None:
                    LOGGER.error("Unrecognized type: %s for %s",
                                 condor_type, pformat(available_sources))
                    continue
                if sources:
                    condor_entries.setdefault(condor_type, []).append(
                        conversion(sources, target_pathname))
                    scheduled = True
            if not scheduled:
                LOGGER.warning("need file %s", target_pathname)

        return condor_entries

    def get_split_fastq_condor_header(self):
        """Return the condor submit-file header for desplit_fastq jobs."""
        return """Universe=vanilla
executable=%(exe)s
error=%(log)s/fastq.$(process).out
output=%(log)s/fastq.$(process).out
log=%(log)s/fastq.log
environment="PYTHONPATH=%(env)s"

""" % {'exe': sys.executable,
       'log': self.log_path,
       'env': os.environ.get('PYTHONPATH', '')
      }

    def get_qseq_condor_header(self):
        """Return the condor submit-file header for qseq2fastq jobs."""
        return """Universe=vanilla
executable=%(exe)s
error=%(log)s/qseq2fastq.$(process).out
output=%(log)s/qseq2fastq.$(process).out
log=%(log)s/qseq2fastq.log
environment="PYTHONPATH=%(env)s"

""" % {'exe': sys.executable,
       'log': self.log_path,
       'env': os.environ.get('PYTHONPATH', '')
      }

    def get_srf_condor_header(self):
        """Return the condor submit-file header for srf2fastq jobs."""
        return """Universe=vanilla
executable=%(exe)s
output=%(log)s/srf_pair_fastq.$(process).out
error=%(log)s/srf_pair_fastq.$(process).out
log=%(log)s/srf_pair_fastq.log
environment="PYTHONPATH=%(env)s"

""" % {'exe': sys.executable,
       'log': self.log_path,
       'env': os.environ.get('PYTHONPATH', '')
      }

    def find_archive_sequence_files(self, library_result_map):
        """
        Find archived sequence files associated with our results.

        Queries the API for each library's lane set, scans the flowcell
        directories those lanes refer to, and attaches the matching
        sequences to each library record.

        Returns:
          dict: {lib_id: lib_info} with lib_info['lanes'] mapping
          (flowcell, lane_number) -> set of sequence objects.
        """
        LOGGER.debug("Searching for sequence files in: %s",
                     self.sequences_path)

        lib_db = {}
        seq_dirs = set()
        candidate_lanes = {}
        for lib_id, result_dir in library_result_map:
            lib_info = self.api.get_library(lib_id)
            lib_info['lanes'] = {}
            lib_db[lib_id] = lib_info

            for lane in lib_info['lane_set']:
                lane_key = (lane['flowcell'], lane['lane_number'])
                candidate_lanes[lane_key] = lib_id
                seq_dirs.add(os.path.join(self.sequences_path,
                                          'flowcells',
                                          lane['flowcell']))
        LOGGER.debug("Seq_dirs = %s", seq_dirs)
        candidate_seq_list = scan_for_sequences(seq_dirs)

        # at this point we have too many sequences as scan_for_sequences
        # returns all the sequences in a flowcell directory
        # so lets filter out the extras
        for seq in candidate_seq_list:
            lane_key = (seq.flowcell, seq.lane)
            lib_id = candidate_lanes.get(lane_key, None)
            if lib_id is not None:
                lib_info = lib_db[lib_id]
                lib_info['lanes'].setdefault(lane_key, set()).add(seq)

        return lib_db

    def find_missing_targets(self, library_result_map, lib_db):
        """
        Check if the sequence file exists.
        This requires computing what the sequence name is and checking
        to see if it can be found in the sequence location.

        Adds seq.paired flag to sequences listed in lib_db[*]['lanes']

        Returns:
          dict: {target_pathname: {filetype: [seq, ...]}} for every fastq
          that still needs to be generated (or everything when self.force).
        """
        fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
        fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
        # find what targets we're missing
        needed_targets = {}
        for lib_id, result_dir in library_result_map:
            lib = lib_db[lib_id]
            lane_dict = make_lane_dict(lib_db, lib_id)

            for lane_key, sequences in lib['lanes'].items():
                for seq in sequences:
                    seq.paired = lane_dict[seq.flowcell]['paired_end']
                    lane_status = lane_dict[seq.flowcell]['status']

                    # paired-end sequences with no read number default to read 1
                    if seq.paired and seq.read is None:
                        seq.read = 1
                    filename_attributes = {
                        'flowcell': seq.flowcell,
                        'lib_id': lib_id,
                        'lane': seq.lane,
                        'read': seq.read,
                        'cycle': seq.cycle
                        }
                    # skip bad runs
                    if lane_status == 'Failed':
                        continue
                    if seq.flowcell == '30DY0AAXX':
                        # 30DY0 only ran for 151 bases instead of 152
                        # it is actually 76 1st read, 75 2nd read
                        seq.mid_point = 76

                    # end filters
                    if seq.paired:
                        target_name = fastq_paired_template % \
                                      filename_attributes
                    else:
                        target_name = fastq_single_template % \
                                      filename_attributes

                    target_pathname = os.path.join(result_dir, target_name)
                    if self.force or not os.path.exists(target_pathname):
                        t = needed_targets.setdefault(target_pathname, {})
                        t.setdefault(seq.filetype, []).append(seq)

        return needed_targets

    def condor_srf_to_fastq(self, sources, target_pathname):
        """Build the condor arguments stanza for one srf -> fastq job.

        Raises:
          ValueError: if more than one source srf file is supplied.
        """
        if len(sources) > 1:
            raise ValueError("srf to fastq can only handle one file")
        source = sources[0]
        py = srf2fastq.__file__
        flowcell = source.flowcell
        # mid_point is only set for known-odd flowcells (see
        # find_missing_targets); None means "use the default split"
        mid = getattr(source, 'mid_point', None)
        args = [py, source.path, '--verbose']
        if source.paired:
            args.extend(['--left', target_pathname])
            # this is ugly. I did it because I was pregenerating the target
            # names before I tried to figure out what sources could generate
            # those targets, and everything up to this point had been
            # one-to-one. So I couldn't figure out how to pair the
            # target names.
            # With this at least the command will run correctly.
            # however if we rename the default targets, this'll break
            # also I think it'll generate it twice.
            args.extend(['--right',
                         target_pathname.replace('_r1.fastq', '_r2.fastq')])
        else:
            args.extend(['--single', target_pathname])

        if flowcell is not None:
            args.extend(['--flowcell', flowcell])

        if mid is not None:
            args.extend(['-m', str(mid)])

        if self.force:
            args.extend(['--force'])

        script = """arguments="%s"
queue
""" % (" ".join(args),)

        return script

    def condor_qseq_to_fastq(self, sources, target_pathname):
        """Build the condor arguments stanza for a qseq -> fastq job."""
        flowcell = sources[0].flowcell
        py = qseq2fastq.__file__

        args = [py, '-o', target_pathname]
        if flowcell is not None:
            args.extend(['-f', flowcell])
        if len(sources) == 1:
            args.extend(['-i', sources[0].path])
        else:
            for source in sources:
                # BUG FIX: the original did ``args += source.path`` which
                # extends the list with the path's individual characters.
                args.append(source.path)
        script = """arguments="%s"
queue
""" % (" ".join(args))

        return script

    def condor_desplit_fastq(self, sources, target_pathname):
        """Build the condor arguments stanza merging split fastqs into one."""
        py = desplit_fastq.__file__
        args = [py, '-o', target_pathname]
        # sort for a deterministic, reproducible command line
        paths = sorted(source.path for source in sources)
        args += paths
        script = 'arguments="%s"\nqueue\n' % (' '.join(args))
        return script
283
def make_submit_script(target, header, body_list):
    """
    write out a text file

    this was intended for condor submit scripts

    Args:
      target (str or stream):
        if target is a string, we will open and close the file
        if target is a stream, the caller is responsible.

      header (str):
        header to write at the beginning of the file
      body_list (list of strs):
        a list of blocks to add to the file.
    """
    # BUG FIX: ``types.StringTypes`` only exists on Python 2.  Duck-type on
    # the stream interface instead, which also works for unicode paths and
    # pathlib-like objects passed as strings.
    if hasattr(target, 'write'):
        f = target
        opened = False
    else:
        f = open(target, "w")
        opened = True
    try:
        f.write(header)
        f.writelines(body_list)
    finally:
        # only close handles we opened ourselves; streams belong to the caller
        if opened:
            f.close()
309
def make_lane_dict(lib_db, lib_id):
    """
    Convert the lane_set in a lib_db to a dictionary
    indexed by flowcell ID

    Note: each lane dictionary is updated in place so its 'flowcell'
    entry holds the parsed (status-stripped) flowcell ID.
    """
    by_flowcell = {}
    for lane in lib_db[lib_id]['lane_set']:
        stripped_id, _status = parse_flowcell_id(lane['flowcell'])
        lane['flowcell'] = stripped_id
        by_flowcell[stripped_id] = lane
    return by_flowcell
321