Move the code that generates srf- and qseq-to-fastq condor scripts into its own module
htsworkflow/submission/condorfastq.py
1 """Convert srf and qseq archive files to fastqs
2 """
3 import logging
4 import os
5 import sys
6 import types
7
8 from htsworkflow.frontend.samples.results import parse_flowcell_id
9 from htsworkflow.pipelines.sequences import scan_for_sequences
10 from htsworkflow.pipelines import qseq2fastq
11 from htsworkflow.pipelines import srf2fastq
12 from htsworkflow.util.api import HtswApi
13
14 logger = logging.getLogger(__name__)
15
class CondorFastqExtract(object):
    def __init__(self, host, apidata, sequences_path,
                 log_path='log',
                 force=False):
        """Extract fastqs from results archive

        Args:
          host (str): root of the htsworkflow api server
          apidata (dict): id & key to post to the server
          sequences_path (str): root of the directory tree to scan for files
          log_path (str): where to put condor log files
          force (bool): do we force overwriting current files?
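
        Example (a sketch only; the host URL, apidata contents, and
        sequences_path below are hypothetical)::

            extractor = CondorFastqExtract(
                'http://htsworkflow.example.org',
                {'apiid': '0', 'apikey': 'not-a-real-key'},
                '/archive/sequences',
                log_path='condor-logs',
                force=False)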
        """
        self.api = HtswApi(host, apidata)
        self.sequences_path = sequences_path
        self.log_path = log_path
        self.force = force

    def build_fastqs(self, library_result_map):
        """
        Generate condor scripts to build any needed fastq files

        Args:
          library_result_map (list): [(library_id, destination directory), ...]
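
        A sketch of the expected input (library ids and result paths are
        hypothetical)::

            [('11011', '/results/11011'),
             ('11013', '/results/11013')]

        The generated submit scripts are written as srf.fastq.condor and
        qseq.fastq.condor in the current directory.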
        """
        qseq_condor_header = self.get_qseq_condor_header()
        qseq_condor_entries = []
        srf_condor_header = self.get_srf_condor_header()
        srf_condor_entries = []
        lib_db = self.find_archive_sequence_files(library_result_map)

        needed_targets = self.find_missing_targets(library_result_map, lib_db)

        for target_pathname, available_sources in needed_targets.items():
            logger.debug(' target : %s' % (target_pathname,))
            logger.debug(' candidate sources: %s' % (available_sources,))
            if 'qseq' in available_sources:
                source = available_sources['qseq']
                qseq_condor_entries.append(
                    self.condor_qseq_to_fastq(source.path,
                                              target_pathname,
                                              source.flowcell)
                )
            elif 'srf' in available_sources:
                source = available_sources['srf']
                mid = getattr(source, 'mid_point', None)
                srf_condor_entries.append(
                    self.condor_srf_to_fastq(source.path,
                                             target_pathname,
                                             source.paired,
                                             source.flowcell,
                                             mid)
                )
            else:
                print " need file", target_pathname

        if len(srf_condor_entries) > 0:
            make_submit_script('srf.fastq.condor',
                               srf_condor_header,
                               srf_condor_entries)

        if len(qseq_condor_entries) > 0:
            make_submit_script('qseq.fastq.condor',
                               qseq_condor_header,
                               qseq_condor_entries)

    def get_qseq_condor_header(self):
        return """Universe=vanilla
executable=%(exe)s
error=%(log)s/qseq2fastq.err.$(process).log
output=%(log)s/qseq2fastq.out.$(process).log
log=%(log)s/qseq2fastq.log

""" % {'exe': sys.executable,
       'log': self.log_path}
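
    # For illustration only: with log_path='log' and /usr/bin/python as the
    # interpreter (both hypothetical), the rendered header would read
    #
    #   Universe=vanilla
    #   executable=/usr/bin/python
    #   error=log/qseq2fastq.err.$(process).log
    #   output=log/qseq2fastq.out.$(process).log
    #   log=log/qseq2fastq.log
    #
    # build_fastqs() appends one 'arguments="..."'/'queue' block per missing
    # fastq (see condor_qseq_to_fastq) before writing qseq.fastq.condor.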

    def get_srf_condor_header(self):
        return """Universe=vanilla
executable=%(exe)s
output=%(log)s/srf_pair_fastq.out.$(process).log
error=%(log)s/srf_pair_fastq.err.$(process).log
log=%(log)s/srf_pair_fastq.log
environment="%(env)s"

""" % {'exe': sys.executable,
       'log': self.log_path,
       'env': os.environ.get('PYTHONPATH', '')}

    def find_archive_sequence_files(self, library_result_map):
        """
        Find archived sequence files associated with our results.
        """
        logger.debug("Searching for sequence files in: %s" % (self.sequences_path,))

        lib_db = {}
        seq_dirs = set()
        candidate_lanes = {}
        for lib_id, result_dir in library_result_map:
            lib_info = self.api.get_library(lib_id)
            lib_info['lanes'] = {}
            lib_db[lib_id] = lib_info

            for lane in lib_info['lane_set']:
                lane_key = (lane['flowcell'], lane['lane_number'])
                candidate_lanes[lane_key] = lib_id
                seq_dirs.add(os.path.join(self.sequences_path,
                                          'flowcells',
                                          lane['flowcell']))
        logger.debug("Seq_dirs = %s" % (unicode(seq_dirs)))
        candidate_seq_list = scan_for_sequences(seq_dirs)

        # at this point we have too many sequences as scan_for_sequences
        # returns all the sequences in a flowcell directory
        # so let's filter out the extras

        for seq in candidate_seq_list:
            lane_key = (seq.flowcell, seq.lane)
            lib_id = candidate_lanes.get(lane_key, None)
            if lib_id is not None:
                lib_info = lib_db[lib_id]
                lib_info['lanes'].setdefault(lane_key, set()).add(seq)

        return lib_db
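
    # Shape of the returned lib_db (library id, flowcell, and lane number
    # below are hypothetical):
    #
    #   {'11011': {...library fields from the API...,
    #              'lanes': {('42BW9AAXX', 3): set([<sequence from
    #                                               scan_for_sequences>, ...])}}}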

    def find_missing_targets(self, library_result_map, lib_db):
        """
        Check if the sequence file exists.
        This requires computing what the sequence name is and checking
        to see if it can be found in the sequence location.

        Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
        """
        fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
        fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
        # find what targets we're missing
        needed_targets = {}
        for lib_id, result_dir in library_result_map:
            lib = lib_db[lib_id]
            lane_dict = make_lane_dict(lib_db, lib_id)

            for lane_key, sequences in lib['lanes'].items():
                for seq in sequences:
                    seq.paired = lane_dict[seq.flowcell]['paired_end']
                    lane_status = lane_dict[seq.flowcell]['status']

                    if seq.paired and seq.read is None:
                        seq.read = 1
                    filename_attributes = {
                        'flowcell': seq.flowcell,
                        'lib_id': lib_id,
                        'lane': seq.lane,
                        'read': seq.read,
                        'cycle': seq.cycle
                        }
                    # skip bad runs
                    if lane_status == 'Failed':
                        continue
                    if seq.flowcell == '30DY0AAXX':
                        # 30DY0 only ran for 151 bases instead of 152
                        # it is actually 76 1st read, 75 2nd read
                        seq.mid_point = 76

                    # end filters
                    if seq.paired:
                        target_name = fastq_paired_template % filename_attributes
                    else:
                        target_name = fastq_single_template % filename_attributes

                    target_pathname = os.path.join(result_dir, target_name)
                    if self.force or not os.path.exists(target_pathname):
                        t = needed_targets.setdefault(target_pathname, {})
                        t[seq.filetype] = seq

        return needed_targets
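
    # For example, a paired-end qseq sequence from a hypothetical library
    # 11011 on flowcell 42BW9AAXX (152 cycles, lane 3, read 1) would yield
    # the target name 11011_42BW9AAXX_c152_l3_r1.fastq, and needed_targets
    # would map that pathname to {'qseq': <the source sequence>}.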

    def condor_srf_to_fastq(self,
                            srf_file,
                            target_pathname,
                            paired,
                            flowcell=None,
                            mid=None):
        py = srf2fastq.__file__
        args = [py, srf_file]
        if paired:
            args.extend(['--left', target_pathname])
            # this is ugly. I did it because I was pregenerating the target
            # names before I tried to figure out what sources could generate
            # those targets, and everything up to this point had been
            # one-to-one. So I couldn't figure out how to pair the
            # target names.
            # With this at least the command will run correctly.
            # however if we rename the default targets, this'll break
            # also I think it'll generate it twice.
            args.extend(['--right',
                         target_pathname.replace('_r1.fastq', '_r2.fastq')])
        else:
            args.extend(['--single', target_pathname])
        if flowcell is not None:
            args.extend(['--flowcell', flowcell])

        if mid is not None:
            args.extend(['-m', str(mid)])

        if self.force:
            args.extend(['--force'])

        script = """arguments="%s"
queue
""" % (" ".join(args),)

        return script
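
    # Illustrative rendered entry (the srf path and fastq names are
    # hypothetical); each such block queues one condor job under the header
    # from get_srf_condor_header:
    #
    #   arguments="/path/to/srf2fastq.py lane3.srf --left 11011_42BW9AAXX_c152_l3_r1.fastq --right 11011_42BW9AAXX_c152_l3_r2.fastq --flowcell 42BW9AAXX -m 76"
    #   queue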

    def condor_qseq_to_fastq(self, qseq_file, target_pathname, flowcell=None):
        py = qseq2fastq.__file__
        args = [py, '-i', qseq_file, '-o', target_pathname]
        if flowcell is not None:
            args.extend(['-f', flowcell])
        script = """arguments="%s"
queue
""" % (" ".join(args),)

        return script


def make_submit_script(target, header, body_list):
    """
    Write out a text file.

    This was intended for condor submit scripts.

    Args:
      target (str or stream):
        if target is a string, we will open and close the file
        if target is a stream, the caller is responsible.

      header (str):
        header to write at the beginning of the file
      body_list (list of strs):
        a list of blocks to add to the file.
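
    Example (illustrative)::

        make_submit_script('srf.fastq.condor', srf_condor_header,
                           srf_condor_entries)

        # or hand in an already-open stream and manage it yourself:
        make_submit_script(sys.stdout, srf_condor_header, srf_condor_entries)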
    """
    if type(target) in types.StringTypes:
        f = open(target, "w")
    else:
        f = target
    f.write(header)
    for entry in body_list:
        f.write(entry)
    if type(target) in types.StringTypes:
        f.close()


def make_lane_dict(lib_db, lib_id):
    """
    Convert the lane_set in a lib_db to a dictionary
    indexed by flowcell ID
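
    A sketch of the transformation (field values hypothetical)::

        [{'flowcell': '42BW9AAXX', 'lane_number': 3, 'paired_end': True, ...}]
        -> {'42BW9AAXX': {'flowcell': '42BW9AAXX', 'lane_number': 3, ...}}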
    """
    result = []
    for lane in lib_db[lib_id]['lane_set']:
        flowcell_id, status = parse_flowcell_id(lane['flowcell'])
        lane['flowcell'] = flowcell_id
        result.append((lane['flowcell'], lane))
    return dict(result)