1 """Convert srf and qseq archive files to fastqs
5 from pprint import pformat
9 from htsworkflow.pipelines.sequences import scan_for_sequences
10 from htsworkflow.pipelines import qseq2fastq
11 from htsworkflow.pipelines import srf2fastq
12 from htsworkflow.pipelines import desplit_fastq
13 from htsworkflow.util.api import HtswApi
14 from htsworkflow.util.conversion import parse_flowcell_id
16 from django.conf import settings
17 from django.template import Context, loader
19 LOGGER = logging.getLogger(__name__)
class CondorFastqExtract(object):
    def __init__(self, host, apidata, sequences_path,
                 log_path='log',
                 force=False):
        """Extract fastqs from results archive

        Args:
          host (str): root of the htsworkflow api server
          apidata (dict): id & key to post to the server
          sequences_path (str): root of the directory tree to scan for files
          log_path (str): where to put condor log files
          force (bool): do we force overwriting current files?
        """
        # client for looking up library / lane metadata on the server
        self.api = HtswApi(host, apidata)
        self.sequences_path = sequences_path
        self.log_path = log_path
        # NOTE(review): defaults for log_path/force reconstructed from the
        # docstring and later reads of self.force -- confirm against history
        self.force = force
39 def create_scripts(self, library_result_map ):
41 Generate condor scripts to build any needed fastq files
44 library_result_map (list): [(library_id, destination directory), ...]
46 template_map = {'srf': 'srf.condor',
47 'qseq': 'qseq.condor',
48 'split_fastq': 'split_fastq.condor'}
50 condor_entries = self.build_condor_arguments(library_result_map)
51 for script_type in template_map.keys():
52 template = loader.get_template(template_map[script_type])
53 variables = {'python': sys.executable,
54 'logdir': self.log_path,
55 'env': os.environ.get('PYTHONPATH', None),
56 'args': condor_entries[script_type],
59 context = Context(variables)
61 with open(script_type + '.condor','w+') as outstream:
62 outstream.write(template.render(context))
64 def build_condor_arguments(self, library_result_map):
65 condor_entries = {'srf': [],
68 conversion_funcs = {'srf': self.condor_srf_to_fastq,
69 'qseq': self.condor_qseq_to_fastq,
70 'split_fastq': self.condor_desplit_fastq
72 lib_db = self.find_archive_sequence_files(library_result_map)
74 needed_targets = self.find_missing_targets(library_result_map, lib_db)
76 for target_pathname, available_sources in needed_targets.items():
77 LOGGER.debug(' target : %s' % (target_pathname,))
78 LOGGER.debug(' candidate sources: %s' % (available_sources,))
79 for condor_type in available_sources.keys():
80 conversion = conversion_funcs.get(condor_type, None)
81 if conversion is None:
82 errmsg = "Unrecognized type: {0} for {1}"
83 print errmsg.format(condor_type,
84 pformat(available_sources))
86 sources = available_sources.get(condor_type, None)
88 if sources is not None:
89 condor_entries.setdefault(condor_type, []).append(
90 conversion(sources, target_pathname))
92 print " need file", target_pathname
96 def find_archive_sequence_files(self, library_result_map):
98 Find archived sequence files associated with our results.
100 LOGGER.debug("Searching for sequence files in: %s" %(self.sequences_path,))
105 for lib_id, result_dir in library_result_map:
106 lib_info = self.api.get_library(lib_id)
107 lib_info['lanes'] = {}
108 lib_db[lib_id] = lib_info
110 for lane in lib_info['lane_set']:
111 lane_key = (lane['flowcell'], lane['lane_number'])
112 candidate_lanes[lane_key] = lib_id
113 seq_dirs.add(os.path.join(self.sequences_path,
116 LOGGER.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
117 candidate_seq_list = scan_for_sequences(seq_dirs)
119 # at this point we have too many sequences as scan_for_sequences
120 # returns all the sequences in a flowcell directory
121 # so lets filter out the extras
123 for seq in candidate_seq_list:
124 lane_key = (seq.flowcell, seq.lane)
125 lib_id = candidate_lanes.get(lane_key, None)
126 if lib_id is not None:
127 lib_info = lib_db[lib_id]
128 lib_info['lanes'].setdefault(lane_key, set()).add(seq)
132 def find_missing_targets(self, library_result_map, lib_db):
134 Check if the sequence file exists.
135 This requires computing what the sequence name is and checking
136 to see if it can be found in the sequence location.
138 Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
140 fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
141 fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
142 # find what targets we're missing
144 for lib_id, result_dir in library_result_map:
146 lane_dict = make_lane_dict(lib_db, lib_id)
148 for lane_key, sequences in lib['lanes'].items():
149 for seq in sequences:
150 seq.paired = lane_dict[seq.flowcell]['paired_end']
151 lane_status = lane_dict[seq.flowcell]['status']
153 if seq.paired and seq.read is None:
155 filename_attributes = {
156 'flowcell': seq.flowcell,
163 if lane_status == 'Failed':
165 if seq.flowcell == '30DY0AAXX':
166 # 30DY0 only ran for 151 bases instead of 152
167 # it is actually 76 1st read, 75 2nd read
172 target_name = fastq_paired_template % \
175 target_name = fastq_single_template % \
178 target_pathname = os.path.join(result_dir, target_name)
179 if self.force or not os.path.exists(target_pathname):
180 t = needed_targets.setdefault(target_pathname, {})
181 t.setdefault(seq.filetype, []).append(seq)
183 return needed_targets
186 def condor_srf_to_fastq(self, sources, target_pathname):
188 raise ValueError("srf to fastq can only handle one file")
191 'sources': [sources[0].path],
192 'pyscript': srf2fastq.__file__,
193 'flowcell': sources[0].flowcell,
194 'ispaired': sources[0].paired,
195 'target': target_pathname,
196 'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
197 'mid': getattr(sources[0], 'mid_point', None),
201 def condor_qseq_to_fastq(self, sources, target_pathname):
203 for source in sources:
204 paths.append(source.path)
207 'pyscript': qseq2fastq.__file__,
208 'flowcell': sources[0].flowcell,
209 'target': target_pathname,
211 'ispaired': sources[0].paired,
212 'istar': len(sources) == 1,
215 def condor_desplit_fastq(self, sources, target_pathname):
217 for source in sources:
218 paths.append(source.path)
221 'pyscript': desplit_fastq.__file__,
222 'target': target_pathname,
224 'ispaired': sources[0].paired,
227 def make_lane_dict(lib_db, lib_id):
229 Convert the lane_set in a lib_db to a dictionary
230 indexed by flowcell ID
233 for lane in lib_db[lib_id]['lane_set']:
234 flowcell_id, status = parse_flowcell_id(lane['flowcell'])
235 lane['flowcell'] = flowcell_id
236 result.append((lane['flowcell'], lane))