1 """Convert srf and qseq archive files to fastqs
5 from pprint import pformat
9 from htsworkflow.pipelines.sequences import scan_for_sequences
10 from htsworkflow.pipelines import qseq2fastq
11 from htsworkflow.pipelines import srf2fastq
12 from htsworkflow.pipelines import desplit_fastq
13 from htsworkflow.util.api import HtswApi
14 from htsworkflow.util.conversion import parse_flowcell_id
16 LOGGER = logging.getLogger(__name__)
18 class CondorFastqExtract(object):
19 def __init__(self, host, apidata, sequences_path,
22 """Extract fastqs from results archive
25 host (str): root of the htsworkflow api server
26 apidata (dict): id & key to post to the server
27 sequences_path (str): root of the directory tree to scan for files
28 log_path (str): where to put condor log files
29 force (bool): do we force overwriting current files?
31 self.api = HtswApi(host, apidata)
32 self.sequences_path = sequences_path
33 self.log_path = log_path
36 def create_scripts(self, library_result_map ):
38 Generate condor scripts to build any needed fastq files
41 library_result_map (list): [(library_id, destination directory), ...]
43 headers = {'srf': self.get_srf_condor_header(),
44 'qseq': self.get_qseq_condor_header(),
45 'split_fastq': self.get_split_fastq_condor_header(),
48 condor_entries = self.build_condor_arguments(library_result_map)
50 for script_type in headers.keys():
51 make_submit_script('{0}.condor'.format(script_type),
53 condor_entries[script_type])
55 def build_condor_arguments(self, library_result_map):
56 condor_entries = {'srf': [],
59 conversion_funcs = {'srf': self.condor_srf_to_fastq,
60 'qseq': self.condor_qseq_to_fastq,
61 'split_fastq': self.condor_desplit_fastq
63 lib_db = self.find_archive_sequence_files(library_result_map)
65 needed_targets = self.find_missing_targets(library_result_map, lib_db)
67 for target_pathname, available_sources in needed_targets.items():
68 LOGGER.debug(' target : %s' % (target_pathname,))
69 LOGGER.debug(' candidate sources: %s' % (available_sources,))
70 for condor_type in available_sources.keys():
71 conversion = conversion_funcs.get(condor_type, None)
72 if conversion is None:
73 errmsg = "Unrecognized type: {0} for {1}"
74 print errmsg.format(condor_type,
75 pformat(available_sources))
77 sources = available_sources.get(condor_type, None)
78 if sources is not None:
79 condor_entries.setdefault(condor_type, []).append(
80 conversion(sources, target_pathname)
83 print " need file", target_pathname
87 def get_split_fastq_condor_header(self):
88 return """Universe=vanilla
90 error=%(log)s/fastq.$(process).out
91 output=%(log)s/fastq.$(process).out
94 """ % {'exe': sys.executable,
95 'log': self.log_path }
97 def get_qseq_condor_header(self):
98 return """Universe=vanilla
100 error=%(log)s/qseq2fastq.$(process).out
101 output=%(log)s/qseq2fastq.$(process).out
102 log=%(log)s/qseq2fastq.log
104 """ % {'exe': sys.executable,
105 'log': self.log_path }
107 def get_srf_condor_header(self):
108 return """Universe=vanilla
110 output=%(log)s/srf_pair_fastq.$(process).out
111 error=%(log)s/srf_pair_fastq.$(process).out
112 log=%(log)s/srf_pair_fastq.log
113 environment="PYTHONPATH=%(env)s"
115 """ % {'exe': sys.executable,
116 'log': self.log_path,
117 'env': os.environ.get('PYTHONPATH', '')
120 def find_archive_sequence_files(self, library_result_map):
122 Find archived sequence files associated with our results.
124 LOGGER.debug("Searching for sequence files in: %s" %(self.sequences_path,))
129 for lib_id, result_dir in library_result_map:
130 lib_info = self.api.get_library(lib_id)
131 lib_info['lanes'] = {}
132 lib_db[lib_id] = lib_info
134 for lane in lib_info['lane_set']:
135 lane_key = (lane['flowcell'], lane['lane_number'])
136 candidate_lanes[lane_key] = lib_id
137 seq_dirs.add(os.path.join(self.sequences_path,
140 LOGGER.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
141 candidate_seq_list = scan_for_sequences(seq_dirs)
143 # at this point we have too many sequences as scan_for_sequences
144 # returns all the sequences in a flowcell directory
145 # so lets filter out the extras
147 for seq in candidate_seq_list:
148 lane_key = (seq.flowcell, seq.lane)
149 lib_id = candidate_lanes.get(lane_key, None)
150 if lib_id is not None:
151 lib_info = lib_db[lib_id]
152 lib_info['lanes'].setdefault(lane_key, set()).add(seq)
156 def find_missing_targets(self, library_result_map, lib_db):
158 Check if the sequence file exists.
159 This requires computing what the sequence name is and checking
160 to see if it can be found in the sequence location.
162 Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
164 fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
165 fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
166 # find what targets we're missing
168 for lib_id, result_dir in library_result_map:
170 lane_dict = make_lane_dict(lib_db, lib_id)
172 for lane_key, sequences in lib['lanes'].items():
173 for seq in sequences:
174 seq.paired = lane_dict[seq.flowcell]['paired_end']
175 lane_status = lane_dict[seq.flowcell]['status']
177 if seq.paired and seq.read is None:
179 filename_attributes = {
180 'flowcell': seq.flowcell,
187 if lane_status == 'Failed':
189 if seq.flowcell == '30DY0AAXX':
190 # 30DY0 only ran for 151 bases instead of 152
191 # it is actually 76 1st read, 75 2nd read
196 target_name = fastq_paired_template % \
199 target_name = fastq_single_template % \
202 target_pathname = os.path.join(result_dir, target_name)
203 if self.force or not os.path.exists(target_pathname):
204 t = needed_targets.setdefault(target_pathname, {})
205 t.setdefault(seq.filetype, []).append(seq)
207 return needed_targets
210 def condor_srf_to_fastq(self, sources, target_pathname):
212 raise ValueError("srf to fastq can only handle one file")
214 py = srf2fastq.__file__
215 flowcell = source.flowcell
216 mid = getattr(source, 'mid_point', None)
217 args = [ py, source.path, '--verbose']
219 args.extend(['--left', target_pathname])
220 # this is ugly. I did it because I was pregenerating the target
221 # names before I tried to figure out what sources could generate
222 # those targets, and everything up to this point had been
223 # one-to-one. So I couldn't figure out how to pair the
225 # With this at least the command will run correctly.
226 # however if we rename the default targets, this'll break
227 # also I think it'll generate it twice.
228 args.extend(['--right',
229 target_pathname.replace('_r1.fastq', '_r2.fastq')])
231 args.extend(['--single', target_pathname ])
233 if flowcell is not None:
234 args.extend(['--flowcell', flowcell])
237 args.extend(['-m', str(mid)])
240 args.extend(['--force'])
242 script = """arguments="%s"
244 """ % (" ".join(args),)
249 def condor_qseq_to_fastq(self, sources, target_pathname):
250 flowcell = sources[0].flowcell
251 py = qseq2fastq.__file__
253 args = [py, '-o', target_pathname ]
254 if flowcell is not None:
255 args.extend(['-f', flowcell])
256 if len(sources) == 1:
257 args += (['-i', sources[0].path])
259 for source in sources:
261 script = """arguments="%s"
263 """ % (" ".join(args))
267 def condor_desplit_fastq(self, sources, target_pathname):
268 py = desplit_fastq.__file__
269 args = [py, '-o', target_pathname, ]
271 for source in sources:
272 paths.append(source.path)
275 script = 'arguments="%s"\nqueue\n' % ( ' '.join(args))
278 def make_submit_script(target, header, body_list):
280 write out a text file
282 this was intended for condor submit scripts
285 target (str or stream):
286 if target is a string, we will open and close the file
287 if target is a stream, the caller is responsible.
290 header to write at the beginning of the file
291 body_list (list of strs):
292 a list of blocks to add to the file.
294 if type(target) in types.StringTypes:
299 for entry in body_list:
301 if type(target) in types.StringTypes:
304 def make_lane_dict(lib_db, lib_id):
306 Convert the lane_set in a lib_db to a dictionary
307 indexed by flowcell ID
310 for lane in lib_db[lib_id]['lane_set']:
311 flowcell_id, status = parse_flowcell_id(lane['flowcell'])
312 lane['flowcell'] = flowcell_id
313 result.append((lane['flowcell'], lane))