1 """Convert srf and qseq archive files to fastqs
2 """
3 import logging
4 import os
5 from pprint import pformat
6 import sys
7 import types
8
9 from htsworkflow.pipelines.sequences import scan_for_sequences
10 from htsworkflow.pipelines import qseq2fastq
11 from htsworkflow.pipelines import srf2fastq
12 from htsworkflow.pipelines import desplit_fastq
13 from htsworkflow.util.api import HtswApi
14 from htsworkflow.util.conversion import parse_flowcell_id
15
16 LOGGER = logging.getLogger(__name__)
17
18 class CondorFastqExtract(object):
19     def __init__(self, host, apidata, sequences_path,
20                  log_path='log',
21                  force=False):
22         """Extract fastqs from results archive
23
24         Args:
25           host (str): root of the htsworkflow api server
26           apidata (dict): id & key to post to the server
27           sequences_path (str): root of the directory tree to scan for files
28           log_path (str): where to put condor log files
29           force (bool): do we force overwriting current files?
30         """
31         self.api = HtswApi(host, apidata)
32         self.sequences_path = sequences_path
33         self.log_path = log_path
34         self.force = force
35
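    # A minimal usage sketch; the host, credentials, and paths below
    # are hypothetical:
    #
    #   extractor = CondorFastqExtract('http://localhost:8000',
    #                                  {'apiid': 0, 'apikey': 'sekret'},
    #                                  '/archive/sequences')
    #   extractor.create_scripts([('11011', '/results/11011')])
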
    def create_scripts(self, library_result_map):
        """
        Generate condor scripts to build any needed fastq files

        Args:
          library_result_map (list): [(library_id, destination directory), ...]
        """
        headers = {'srf': self.get_srf_condor_header(),
                   'qseq': self.get_qseq_condor_header(),
                   'split_fastq': self.get_split_fastq_condor_header(),
                  }

        condor_entries = self.build_condor_arguments(library_result_map)

        for script_type in headers.keys():
            make_submit_script('{0}.condor'.format(script_type),
                               headers[script_type],
                               condor_entries[script_type])

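    # create_scripts() leaves three submit files in the current working
    # directory, one per conversion type: srf.condor, qseq.condor, and
    # split_fastq.condor.
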
    def build_condor_arguments(self, library_result_map):
        condor_entries = {'srf': [],
                          'qseq': [],
                          'split_fastq': []}
        conversion_funcs = {'srf': self.condor_srf_to_fastq,
                            'qseq': self.condor_qseq_to_fastq,
                            'split_fastq': self.condor_desplit_fastq
                            }
        lib_db = self.find_archive_sequence_files(library_result_map)

        needed_targets = self.find_missing_targets(library_result_map, lib_db)

        for target_pathname, available_sources in needed_targets.items():
            LOGGER.debug(' target : %s', target_pathname)
            LOGGER.debug(' candidate sources: %s', available_sources)
            if not available_sources:
                # nothing we know of can generate this fastq
                LOGGER.warning("need file %s", target_pathname)
                continue
            for condor_type in available_sources.keys():
                conversion = conversion_funcs.get(condor_type, None)
                if conversion is None:
                    errmsg = "Unrecognized type: {0} for {1}"
                    LOGGER.error(errmsg.format(condor_type,
                                               pformat(available_sources)))
                    continue
                sources = available_sources.get(condor_type, None)
                if sources is not None:
                    condor_entries.setdefault(condor_type, []).append(
                        conversion(sources, target_pathname)
                    )

        return condor_entries

    def get_split_fastq_condor_header(self):
        return """Universe=vanilla
executable=%(exe)s
error=%(log)s/fastq.$(process).out
output=%(log)s/fastq.$(process).out
log=%(log)s/fastq.log

""" % {'exe': sys.executable,
       'log': self.log_path }

    def get_qseq_condor_header(self):
        return """Universe=vanilla
executable=%(exe)s
error=%(log)s/qseq2fastq.$(process).out
output=%(log)s/qseq2fastq.$(process).out
log=%(log)s/qseq2fastq.log

""" % {'exe': sys.executable,
       'log': self.log_path }

    def get_srf_condor_header(self):
        return """Universe=vanilla
executable=%(exe)s
output=%(log)s/srf_pair_fastq.$(process).out
error=%(log)s/srf_pair_fastq.$(process).out
log=%(log)s/srf_pair_fastq.log
environment="PYTHONPATH=%(env)s"

""" % {'exe': sys.executable,
       'log': self.log_path,
       'env': os.environ.get('PYTHONPATH', '')
      }

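    # For reference, stitching one of these headers together with the
    # per-job "arguments=.../queue" blocks generated below yields a
    # submit file shaped roughly like this (paths illustrative):
    #
    #   Universe=vanilla
    #   executable=/usr/bin/python
    #   output=log/srf_pair_fastq.$(process).out
    #   error=log/srf_pair_fastq.$(process).out
    #   log=log/srf_pair_fastq.log
    #   environment="PYTHONPATH=..."
    #
    #   arguments="srf2fastq.py input.srf --verbose --single target.fastq"
    #   queue
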
    def find_archive_sequence_files(self, library_result_map):
        """
        Find archived sequence files associated with our results.
        """
        LOGGER.debug("Searching for sequence files in: %s", self.sequences_path)

        lib_db = {}
        seq_dirs = set()
        candidate_lanes = {}
        for lib_id, result_dir in library_result_map:
            lib_info = self.api.get_library(lib_id)
            lib_info['lanes'] = {}
            lib_db[lib_id] = lib_info

            for lane in lib_info['lane_set']:
                lane_key = (lane['flowcell'], lane['lane_number'])
                candidate_lanes[lane_key] = lib_id
                seq_dirs.add(os.path.join(self.sequences_path,
                                          'flowcells',
                                          lane['flowcell']))
        LOGGER.debug("Seq_dirs = %s", unicode(seq_dirs))
        candidate_seq_list = scan_for_sequences(seq_dirs)

        # at this point we have too many sequences, as scan_for_sequences
        # returns all the sequences in a flowcell directory,
        # so filter out the extras
        for seq in candidate_seq_list:
            lane_key = (seq.flowcell, seq.lane)
            lib_id = candidate_lanes.get(lane_key, None)
            if lib_id is not None:
                lib_info = lib_db[lib_id]
                lib_info['lanes'].setdefault(lane_key, set()).add(seq)

        return lib_db

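    # The returned lib_db is shaped roughly like this (the library id,
    # flowcell, and lane number are illustrative; the other fields come
    # from the htsworkflow api):
    #
    #   lib_db['11011'] = {
    #       'lane_set': [...],
    #       'lanes': {('30DY0AAXX', 3): set([<sequence>, ...])},
    #       ...other library fields from the api...
    #   }
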
    def find_missing_targets(self, library_result_map, lib_db):
        """
        Check if the sequence file exists.
        This requires computing what the sequence name is and checking
        to see if it can be found in the sequence location.

        Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
        """
        fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
        fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
        # find what targets we're missing
        needed_targets = {}
        for lib_id, result_dir in library_result_map:
            lib = lib_db[lib_id]
            lane_dict = make_lane_dict(lib_db, lib_id)

            for lane_key, sequences in lib['lanes'].items():
                for seq in sequences:
                    seq.paired = lane_dict[seq.flowcell]['paired_end']
                    lane_status = lane_dict[seq.flowcell]['status']

                    if seq.paired and seq.read is None:
                        seq.read = 1
                    filename_attributes = {
                        'flowcell': seq.flowcell,
                        'lib_id': lib_id,
                        'lane': seq.lane,
                        'read': seq.read,
                        'cycle': seq.cycle
                        }
                    # skip bad runs
                    if lane_status == 'Failed':
                        continue
                    if seq.flowcell == '30DY0AAXX':
                        # 30DY0 only ran for 151 bases instead of 152
                        # it is actually 76 1st read, 75 2nd read
                        seq.mid_point = 76

                    # end filters
                    if seq.paired:
                        target_name = fastq_paired_template % \
                                      filename_attributes
                    else:
                        target_name = fastq_single_template % \
                                      filename_attributes

                    target_pathname = os.path.join(result_dir, target_name)
                    if self.force or not os.path.exists(target_pathname):
                        t = needed_targets.setdefault(target_pathname, {})
                        t.setdefault(seq.filetype, []).append(seq)

        return needed_targets

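    # find_missing_targets() fills in the templates above, producing
    # names like these (library id, flowcell, cycle, and lane values
    # illustrative):
    #
    #   paired: 11011_30DY0AAXX_c152_l3_r1.fastq
    #   single: 11011_30DY0AAXX_c152_l3.fastq
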
    def condor_srf_to_fastq(self, sources, target_pathname):
        if len(sources) > 1:
            raise ValueError("srf to fastq can only handle one file")
        source = sources[0]
        py = srf2fastq.__file__
        flowcell = source.flowcell
        mid = getattr(source, 'mid_point', None)
        args = [py, source.path, '--verbose']
        if source.paired:
            args.extend(['--left', target_pathname])
            # This is ugly. The target names were pregenerated before
            # working out which sources could produce them, and since
            # everything up to this point had been one-to-one, there was
            # no clean way to pair the r1/r2 target names.
            # With this substitution the command at least runs correctly,
            # but it will break if the default target names change, and
            # the r2 file is probably generated twice.
            args.extend(['--right',
                         target_pathname.replace('_r1.fastq', '_r2.fastq')])
        else:
            args.extend(['--single', target_pathname])

        if flowcell is not None:
            args.extend(['--flowcell', flowcell])

        if mid is not None:
            args.extend(['-m', str(mid)])

        if self.force:
            args.extend(['--force'])

        script = """arguments="%s"
queue
""" % (" ".join(args),)

        return script

    def condor_qseq_to_fastq(self, sources, target_pathname):
        flowcell = sources[0].flowcell
        py = qseq2fastq.__file__

        args = [py, '-o', target_pathname]
        if flowcell is not None:
            args.extend(['-f', flowcell])
        if len(sources) == 1:
            args.extend(['-i', sources[0].path])
        else:
            for source in sources:
                # append the path itself, not its individual characters
                args.append(source.path)
        script = """arguments="%s"
queue
""" % (" ".join(args),)

        return script

    def condor_desplit_fastq(self, sources, target_pathname):
        py = desplit_fastq.__file__
        args = [py, '-o', target_pathname]
        paths = []
        for source in sources:
            paths.append(source.path)
        paths.sort()
        args += paths
        script = 'arguments="%s"\nqueue\n' % (' '.join(args))
        return script

def make_submit_script(target, header, body_list):
    """
    write out a text file

    this was intended for condor submit scripts

    Args:
      target (str or stream):
        if target is a string, we will open and close the file
        if target is a stream, the caller is responsible.

      header (str):
        header to write at the beginning of the file
      body_list (list of strs):
        a list of blocks to add to the file.
    """
    if isinstance(target, types.StringTypes):
        f = open(target, "w")
    else:
        f = target
    f.write(header)
    for entry in body_list:
        f.write(entry)
    if isinstance(target, types.StringTypes):
        f.close()

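# A minimal usage sketch; the header and body strings are illustrative:
#
#   make_submit_script('srf.condor',
#                      'Universe=vanilla\nexecutable=/usr/bin/python\n\n',
#                      ['arguments="srf2fastq.py input.srf"\nqueue\n'])
#
# create_scripts() drives this with the real condor headers and the
# per-target argument blocks built above.
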
def make_lane_dict(lib_db, lib_id):
    """
    Convert the lane_set in a lib_db to a dictionary
    indexed by flowcell ID
    """
    result = []
    for lane in lib_db[lib_id]['lane_set']:
        flowcell_id, status = parse_flowcell_id(lane['flowcell'])
        lane['flowcell'] = flowcell_id
        result.append((lane['flowcell'], lane))
    return dict(result)

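# Shape sketch for make_lane_dict(); field values are illustrative, and
# 'paired_end' and 'status' are the fields find_missing_targets() reads:
#
#   {'30DY0AAXX': {'flowcell': '30DY0AAXX',
#                  'lane_number': 3,
#                  'paired_end': True,
#                  'status': 'Good',
#                  ...}}
#
# Note that parse_flowcell_id() separates the flowcell name from any
# trailing status annotation before the name is used as the key.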