1 """Convert srf and qseq archive files to fastqs
8 from htsworkflow.pipelines.sequences import scan_for_sequences
9 from htsworkflow.pipelines import qseq2fastq
10 from htsworkflow.pipelines import srf2fastq
11 from htsworkflow.util.api import HtswApi
12 from htsworkflow.util.conversion import parse_flowcell_id
14 LOGGER = logging.getLogger(__name__)
16 class CondorFastqExtract(object):
17 def __init__(self, host, apidata, sequences_path,
20 """Extract fastqs from results archive
23 host (str): root of the htsworkflow api server
24 apidata (dict): id & key to post to the server
25 sequences_path (str): root of the directory tree to scan for files
26 log_path (str): where to put condor log files
27 force (bool): do we force overwriting current files?
29 self.api = HtswApi(host, apidata)
30 self.sequences_path = sequences_path
31 self.log_path = log_path
34 def build_fastqs(self, library_result_map ):
36 Generate condor scripts to build any needed fastq files
39 library_result_map (list): [(library_id, destination directory), ...]
41 qseq_condor_header = self.get_qseq_condor_header()
42 qseq_condor_entries = []
43 srf_condor_header = self.get_srf_condor_header()
44 srf_condor_entries = []
45 lib_db = self.find_archive_sequence_files(library_result_map)
47 needed_targets = self.find_missing_targets(library_result_map, lib_db)
49 for target_pathname, available_sources in needed_targets.items():
50 LOGGER.debug(' target : %s' % (target_pathname,))
51 LOGGER.debug(' candidate sources: %s' % (available_sources,))
52 if available_sources.has_key('qseq'):
53 source = available_sources['qseq']
54 qseq_condor_entries.append(
55 self.condor_qseq_to_fastq(source.path,
59 elif available_sources.has_key('srf'):
60 source = available_sources['srf']
61 mid = getattr(source, 'mid_point', None)
62 srf_condor_entries.append(
63 self.condor_srf_to_fastq(source.path,
70 print " need file", target_pathname
72 if len(srf_condor_entries) > 0:
73 make_submit_script('srf.fastq.condor',
77 if len(qseq_condor_entries) > 0:
78 make_submit_script('qseq.fastq.condor',
83 def get_qseq_condor_header(self):
84 return """Universe=vanilla
86 error=%(log)s/qseq2fastq.$(process).out
87 output=%(log)s/qseq2fastq.$(process).out
88 log=%(log)s/qseq2fastq.log
90 """ % {'exe': sys.executable,
91 'log': self.log_path }
93 def get_srf_condor_header(self):
94 return """Universe=vanilla
96 output=%(log)s/srf_pair_fastq.$(process).out
97 error=%(log)s/srf_pair_fastq.$(process).out
98 log=%(log)s/srf_pair_fastq.log
99 environment="PYTHONPATH=%(env)s"
101 """ % {'exe': sys.executable,
102 'log': self.log_path,
103 'env': os.environ.get('PYTHONPATH', '')
106 def find_archive_sequence_files(self, library_result_map):
108 Find archived sequence files associated with our results.
110 LOGGER.debug("Searching for sequence files in: %s" %(self.sequences_path,))
115 for lib_id, result_dir in library_result_map:
116 lib_info = self.api.get_library(lib_id)
117 lib_info['lanes'] = {}
118 lib_db[lib_id] = lib_info
120 for lane in lib_info['lane_set']:
121 lane_key = (lane['flowcell'], lane['lane_number'])
122 candidate_lanes[lane_key] = lib_id
123 seq_dirs.add(os.path.join(self.sequences_path,
126 LOGGER.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
127 candidate_seq_list = scan_for_sequences(seq_dirs)
129 # at this point we have too many sequences as scan_for_sequences
130 # returns all the sequences in a flowcell directory
131 # so lets filter out the extras
133 for seq in candidate_seq_list:
134 lane_key = (seq.flowcell, seq.lane)
135 lib_id = candidate_lanes.get(lane_key, None)
136 if lib_id is not None:
137 lib_info = lib_db[lib_id]
138 lib_info['lanes'].setdefault(lane_key, set()).add(seq)
142 def find_missing_targets(self, library_result_map, lib_db):
144 Check if the sequence file exists.
145 This requires computing what the sequence name is and checking
146 to see if it can be found in the sequence location.
148 Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
150 fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
151 fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
152 # find what targets we're missing
154 for lib_id, result_dir in library_result_map:
156 lane_dict = make_lane_dict(lib_db, lib_id)
158 for lane_key, sequences in lib['lanes'].items():
159 for seq in sequences:
160 seq.paired = lane_dict[seq.flowcell]['paired_end']
161 lane_status = lane_dict[seq.flowcell]['status']
163 if seq.paired and seq.read is None:
165 filename_attributes = {
166 'flowcell': seq.flowcell,
173 if lane_status == 'Failed':
175 if seq.flowcell == '30DY0AAXX':
176 # 30DY0 only ran for 151 bases instead of 152
177 # it is actually 76 1st read, 75 2nd read
182 target_name = fastq_paired_template % filename_attributes
184 target_name = fastq_single_template % filename_attributes
186 target_pathname = os.path.join(result_dir, target_name)
187 if self.force or not os.path.exists(target_pathname):
188 t = needed_targets.setdefault(target_pathname, {})
189 t[seq.filetype] = seq
191 return needed_targets
194 def condor_srf_to_fastq(self,
200 py = srf2fastq.__file__
201 args = [ py, srf_file, '--verbose']
203 args.extend(['--left', target_pathname])
204 # this is ugly. I did it because I was pregenerating the target
205 # names before I tried to figure out what sources could generate
206 # those targets, and everything up to this point had been
207 # one-to-one. So I couldn't figure out how to pair the
209 # With this at least the command will run correctly.
210 # however if we rename the default targets, this'll break
211 # also I think it'll generate it twice.
212 args.extend(['--right',
213 target_pathname.replace('_r1.fastq', '_r2.fastq')])
215 args.extend(['--single', target_pathname ])
216 if flowcell is not None:
217 args.extend(['--flowcell', flowcell])
220 args.extend(['-m', str(mid)])
223 args.extend(['--force'])
225 script = """arguments="%s"
227 """ % (" ".join(args),)
232 def condor_qseq_to_fastq(self, qseq_file, target_pathname, flowcell=None):
233 py = qseq2fastq.__file__
234 args = [py, '-i', qseq_file, '-o', target_pathname ]
235 if flowcell is not None:
236 args.extend(['-f', flowcell])
237 script = """arguments="%s"
239 """ % (" ".join(args))
243 def make_submit_script(target, header, body_list):
245 write out a text file
247 this was intended for condor submit scripts
250 target (str or stream):
251 if target is a string, we will open and close the file
252 if target is a stream, the caller is responsible.
255 header to write at the beginning of the file
256 body_list (list of strs):
257 a list of blocks to add to the file.
259 if type(target) in types.StringTypes:
264 for entry in body_list:
266 if type(target) in types.StringTypes:
269 def make_lane_dict(lib_db, lib_id):
271 Convert the lane_set in a lib_db to a dictionary
272 indexed by flowcell ID
275 for lane in lib_db[lib_id]['lane_set']:
276 flowcell_id, status = parse_flowcell_id(lane['flowcell'])
277 lane['flowcell'] = flowcell_id
278 result.append((lane['flowcell'], lane))