1 """Convert srf and qseq archive files to fastqs
import logging
import os
from pprint import pformat
import sys
import types

from htsworkflow.pipelines.sequences import scan_for_sequences
from htsworkflow.pipelines import qseq2fastq
from htsworkflow.pipelines import srf2fastq
from htsworkflow.pipelines import desplit_fastq
from htsworkflow.util.api import HtswApi
from htsworkflow.util.conversion import parse_flowcell_id
# Module-level logger; all diagnostics below go through this.
LOGGER = logging.getLogger(__name__)
class CondorFastqExtract(object):
    """Generate condor submit scripts that convert srf/qseq archives to fastq.

    NOTE(review): reconstructed from a partially garbled listing; lines
    marked "reconstructed" below filled gaps in the original and should be
    confirmed against version history.
    """
    def __init__(self, host, apidata, sequences_path,
                 log_path='log', force=False):
        """Extract fastqs from results archive

        Args:
          host (str): root of the htsworkflow api server
          apidata (dict): id & key to post to the server
          sequences_path (str): root of the directory tree to scan for files
          log_path (str): where to put condor log files
          force (bool): do we force overwriting current files?
        """
        self.api = HtswApi(host, apidata)
        self.sequences_path = sequences_path
        self.log_path = log_path
        # reconstructed: self.force is read in find_missing_targets and
        # condor_srf_to_fastq, so it must be stored here
        self.force = force

    def create_scripts(self, library_result_map):
        """
        Generate condor scripts to build any needed fastq files

        Args:
          library_result_map (list): [(library_id, destination directory), ...]
        """
        headers = {'srf': self.get_srf_condor_header(),
                   'qseq': self.get_qseq_condor_header(),
                   'split_fastq': self.get_split_fastq_condor_header(),
                   }

        condor_entries = self.build_condor_arguments(library_result_map)

        # one submit script per conversion type, e.g. "srf.condor"
        for script_type in headers.keys():
            make_submit_script('{0}.condor'.format(script_type),
                               headers[script_type],
                               condor_entries[script_type])

    def build_condor_arguments(self, library_result_map):
        """Build per-job condor 'arguments' stanzas for every missing target.

        Returns:
          dict mapping conversion type ('srf', 'qseq', 'split_fastq') to a
          list of condor submit-script fragments.
        """
        condor_entries = {'srf': [],
                          'qseq': [],
                          'split_fastq': [],
                          }
        conversion_funcs = {'srf': self.condor_srf_to_fastq,
                            'qseq': self.condor_qseq_to_fastq,
                            'split_fastq': self.condor_desplit_fastq,
                            }
        lib_db = self.find_archive_sequence_files(library_result_map)

        needed_targets = self.find_missing_targets(library_result_map, lib_db)

        for target_pathname, available_sources in needed_targets.items():
            LOGGER.debug(' target : %s', target_pathname)
            LOGGER.debug(' candidate sources: %s', available_sources)
            for condor_type in available_sources.keys():
                conversion = conversion_funcs.get(condor_type, None)
                if conversion is None:
                    # was a bare py2 print; routed through the module logger.
                    # 'continue' added: without it we would fall through and
                    # attempt to call None below.
                    LOGGER.error("Unrecognized type: {0} for {1}".format(
                        condor_type, pformat(available_sources)))
                    continue
                sources = available_sources.get(condor_type, None)

                if sources is not None:
                    condor_entries.setdefault(condor_type, []).append(
                        conversion(sources, target_pathname))
                else:
                    # no usable source for this target — reconstructed as the
                    # else-branch of the sources check; was a py2 print
                    LOGGER.warning(" need file %s", target_pathname)

        return condor_entries

    def get_split_fastq_condor_header(self):
        """Condor submit header for the fastq merge (desplit) jobs."""
        # 'executable=' line reconstructed — TODO confirm against history
        return """Universe=vanilla
executable=%(exe)s
error=%(log)s/fastq.$(process).out
output=%(log)s/fastq.$(process).out
log=%(log)s/fastq.log
environment="PYTHONPATH=%(env)s"

""" % {'exe': sys.executable,
       'log': self.log_path,
       'env': os.environ.get('PYTHONPATH', '')
       }

    def get_qseq_condor_header(self):
        """Condor submit header for qseq -> fastq jobs."""
        # 'executable=' line reconstructed — TODO confirm against history
        return """Universe=vanilla
executable=%(exe)s
error=%(log)s/qseq2fastq.$(process).out
output=%(log)s/qseq2fastq.$(process).out
log=%(log)s/qseq2fastq.log
environment="PYTHONPATH=%(env)s"

""" % {'exe': sys.executable,
       'log': self.log_path,
       'env': os.environ.get('PYTHONPATH', '')
       }

    def get_srf_condor_header(self):
        """Condor submit header for srf -> fastq jobs."""
        # 'executable=' line reconstructed — TODO confirm against history
        return """Universe=vanilla
executable=%(exe)s
output=%(log)s/srf_pair_fastq.$(process).out
error=%(log)s/srf_pair_fastq.$(process).out
log=%(log)s/srf_pair_fastq.log
environment="PYTHONPATH=%(env)s"

""" % {'exe': sys.executable,
       'log': self.log_path,
       'env': os.environ.get('PYTHONPATH', '')
       }

    def find_archive_sequence_files(self, library_result_map):
        """
        Find archived sequence files associated with our results.

        Returns lib_db: {lib_id: library info dict with a 'lanes' key
        mapping (flowcell, lane_number) -> set of sequence objects}.
        """
        LOGGER.debug("Searching for sequence files in: %s", self.sequences_path)

        # reconstructed initializers
        lib_db = {}
        seq_dirs = set()
        candidate_lanes = {}
        for lib_id, result_dir in library_result_map:
            lib_info = self.api.get_library(lib_id)
            lib_info['lanes'] = {}
            lib_db[lib_id] = lib_info

            for lane in lib_info['lane_set']:
                lane_key = (lane['flowcell'], lane['lane_number'])
                candidate_lanes[lane_key] = lib_id
                # assumes archive layout <sequences_path>/<flowcell> —
                # the path components after sequences_path were dropped from
                # the listing; TODO confirm the exact join arguments
                seq_dirs.add(os.path.join(self.sequences_path,
                                          lane['flowcell']))
        LOGGER.debug("Seq_dirs = %s", seq_dirs)
        candidate_seq_list = scan_for_sequences(seq_dirs)

        # at this point we have too many sequences as scan_for_sequences
        # returns all the sequences in a flowcell directory
        # so lets filter out the extras
        for seq in candidate_seq_list:
            lane_key = (seq.flowcell, seq.lane)
            lib_id = candidate_lanes.get(lane_key, None)
            if lib_id is not None:
                lib_info = lib_db[lib_id]
                lib_info['lanes'].setdefault(lane_key, set()).add(seq)

        return lib_db

    def find_missing_targets(self, library_result_map, lib_db):
        """
        Check if the sequence file exists.
        This requires computing what the sequence name is and checking
        to see if it can be found in the sequence location.

        Adds seq.paired flag to sequences listed in lib_db[*]['lanes']

        Returns:
          {target_pathname: {filetype: [sequence, ...]}}
        """
        fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
        fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
        # find what targets we're missing
        needed_targets = {}
        for lib_id, result_dir in library_result_map:
            lib = lib_db[lib_id]
            lane_dict = make_lane_dict(lib_db, lib_id)

            for lane_key, sequences in lib['lanes'].items():
                for seq in sequences:
                    seq.paired = lane_dict[seq.flowcell]['paired_end']
                    lane_status = lane_dict[seq.flowcell]['status']

                    if seq.paired and seq.read is None:
                        # reconstructed: treat unnumbered reads from a paired
                        # run as read 1 — TODO confirm
                        seq.read = 1
                    # keys reconstructed from the two templates above
                    filename_attributes = {
                        'flowcell': seq.flowcell,
                        'lib_id': lib_id,
                        'lane': seq.lane,
                        'read': seq.read,
                        'cycle': seq.cycle,
                        }
                    # skip bad runs (reconstructed 'continue')
                    if lane_status == 'Failed':
                        continue
                    if seq.flowcell == '30DY0AAXX':
                        # 30DY0 only ran for 151 bases instead of 152
                        # it is actually 76 1st read, 75 2nd read
                        seq.mid_point = 76

                    if seq.paired:
                        target_name = fastq_paired_template % \
                                      filename_attributes
                    else:
                        target_name = fastq_single_template % \
                                      filename_attributes

                    target_pathname = os.path.join(result_dir, target_name)
                    if self.force or not os.path.exists(target_pathname):
                        t = needed_targets.setdefault(target_pathname, {})
                        t.setdefault(seq.filetype, []).append(seq)

        return needed_targets

    def condor_srf_to_fastq(self, sources, target_pathname):
        """Build the condor arguments stanza for one srf -> fastq job."""
        if len(sources) > 1:
            raise ValueError("srf to fastq can only handle one file")
        source = sources[0]
        py = srf2fastq.__file__
        flowcell = source.flowcell
        mid = getattr(source, 'mid_point', None)
        args = [ py, source.path, '--verbose']
        if source.paired:
            args.extend(['--left', target_pathname])
            # this is ugly. I did it because I was pregenerating the target
            # names before I tried to figure out what sources could generate
            # those targets, and everything up to this point had been
            # one-to-one. So I couldn't figure out how to pair the
            # two target names.
            # With this at least the command will run correctly.
            # however if we rename the default targets, this'll break
            # also I think it'll generate it twice.
            args.extend(['--right',
                         target_pathname.replace('_r1.fastq', '_r2.fastq')])
        else:
            args.extend(['--single', target_pathname ])
        if flowcell is not None:
            args.extend(['--flowcell', flowcell])

        if mid is not None:
            args.extend(['-m', str(mid)])

        if self.force:
            args.extend(['--force'])

        script = """arguments="%s"
queue
""" % (" ".join(args),)

        return script

    def condor_qseq_to_fastq(self, sources, target_pathname):
        """Build the condor arguments stanza for a qseq -> fastq job."""
        flowcell = sources[0].flowcell
        py = qseq2fastq.__file__

        args = [py, '-o', target_pathname ]
        if flowcell is not None:
            args.extend(['-f', flowcell])
        if len(sources) == 1:
            args += (['-i', sources[0].path])
        else:
            # reconstructed: multiple qseq inputs each get their own -i
            for source in sources:
                args += (['-i', source.path])

        script = """arguments="%s"
queue
""" % (" ".join(args))

        return script

    def condor_desplit_fastq(self, sources, target_pathname):
        """Build the condor arguments stanza for merging split fastqs."""
        py = desplit_fastq.__file__
        args = [py, '-o', target_pathname, ]
        # reconstructed accumulator; sorted for a deterministic merge order
        paths = []
        for source in sources:
            paths.append(source.path)
        paths.sort()
        args += paths

        script = 'arguments="%s"\nqueue\n' % ( ' '.join(args))
        return script
def make_submit_script(target, header, body_list):
    """
    write out a text file

    this was intended for condor submit scripts

    Args:
      target (str or stream):
        if target is a string, we will open and close the file
        if target is a stream, the caller is responsible.
      header (str):
        header to write at the beginning of the file
      body_list (list of strs):
        a list of blocks to add to the file.
    """
    # py2's types.StringTypes was removed in py3; fall back to (str,)
    string_types = getattr(types, 'StringTypes', (str,))
    if isinstance(target, string_types):
        stream = open(target, 'w')
        opened = True
    else:
        stream = target
        opened = False
    try:
        stream.write(header)
        for entry in body_list:
            stream.write(entry)
    finally:
        # close only files we opened; caller owns streams it passed in
        if opened:
            stream.close()
def make_lane_dict(lib_db, lib_id):
    """
    Convert the lane_set in a lib_db to a dictionary
    indexed by flowcell ID

    NOTE: mutates each lane dict in place — lane['flowcell'] is replaced
    with the bare flowcell id returned by parse_flowcell_id.  If a library
    has several lanes on the same flowcell, the last one wins.
    """
    # reconstructed accumulator and return — the original listing dropped
    # the first and last lines of this function
    result = []
    for lane in lib_db[lib_id]['lane_set']:
        flowcell_id, status = parse_flowcell_id(lane['flowcell'])
        lane['flowcell'] = flowcell_id
        result.append((lane['flowcell'], lane))
    return dict(result)