1 """Convert srf and qseq archive files to fastqs
5 from pprint import pformat
9 from htsworkflow.pipelines.sequences import scan_for_sequences
10 from htsworkflow.pipelines import qseq2fastq
11 from htsworkflow.pipelines import srf2fastq
12 from htsworkflow.pipelines import desplit_fastq
13 from htsworkflow.util.api import HtswApi
14 from htsworkflow.util.conversion import parse_flowcell_id
16 from django.conf import settings
17 from django.template import Context, loader
19 LOGGER = logging.getLogger(__name__)
# NOTE(review): this excerpt is a line-sampled copy of the original module --
# the original file line number is fused onto each line and many intervening
# lines are missing (docstring closers, dict closers, `return` statements,
# several loop bodies).  Comments below describe only what the visible code
# shows; anything depending on an elided line is marked as an assumption.
#
# Purpose (from visible code): drive conversion of archived srf/qseq sequence
# files into per-library fastq files by rendering HTCondor submit scripts
# (srf.condor / qseq.condor / split_fastq.condor) from Django templates.
21 class CondorFastqExtract(object):
# Constructor: stores the htsworkflow API client and the paths used by the
# script generators.  The signature tail (log_path/force parameters, orig
# lines 23-24) is elided here -- presumably `log_path` and `force` with
# defaults, matching the attribute assignments and docstring below; confirm
# against the full source.
22 def __init__(self, host, apidata, sequences_path,
25 """Extract fastqs from results archive
28 host (str): root of the htsworkflow api server
29 apidata (dict): id & key to post to the server
30 sequences_path (str): root of the directory tree to scan for files
31 log_path (str): where to put condor log files
32 force (bool): do we force overwriting current files?
# (docstring closer at orig line 33 elided)
34 self.api = HtswApi(host, apidata)
35 self.sequences_path = sequences_path
36 self.log_path = log_path
# NOTE(review): self.force is read in find_missing_targets (orig 180) so an
# elided line (orig 37-38) presumably assigns self.force -- confirm.
# Render one .condor submit file per conversion type, using Django templates
# named in template_map.  Writes srf.condor / qseq.condor / split_fastq.condor
# into the current working directory.
39 def create_scripts(self, result_map ):
41 Generate condor scripts to build any needed fastq files
44 result_map: htsworkflow.submission.results.ResultMap()
46 template_map = {'srf': 'srf.condor',
47 'qseq': 'qseq.condor',
48 'split_fastq': 'split_fastq.condor'}
50 condor_entries = self.build_condor_arguments(result_map)
51 for script_type in template_map.keys():
52 template = loader.get_template(template_map[script_type])
# Template variables: interpreter + log dir + PYTHONPATH passthrough + the
# per-target argument dicts built by build_condor_arguments.
53 variables = {'python': sys.executable,
54 'logdir': self.log_path,
55 'env': os.environ.get('PYTHONPATH', None),
56 'args': condor_entries[script_type],
# (dict closer and any further keys, orig 57-58, elided)
59 context = Context(variables)
61 with open(script_type + '.condor','w+') as outstream:
62 outstream.write(template.render(context))
# Build, per conversion type, the list of argument dicts consumed by the
# condor templates.  Dispatches on the filetype of each candidate source via
# conversion_funcs; unknown types are reported (Python 2 print) rather than
# raised.
64 def build_condor_arguments(self, result_map):
65 condor_entries = {'srf': [],
# (remaining initial keys of condor_entries, orig 66-67, elided)
68 conversion_funcs = {'srf': self.condor_srf_to_fastq,
69 'qseq': self.condor_qseq_to_fastq,
70 'split_fastq': self.condor_desplit_fastq
73 lib_db = self.find_archive_sequence_files(result_map)
74 needed_targets = self.find_missing_targets(result_map, lib_db)
76 for target_pathname, available_sources in needed_targets.items():
77 LOGGER.debug(' target : %s' % (target_pathname,))
78 LOGGER.debug(' candidate sources: %s' % (available_sources,))
79 for condor_type in available_sources.keys():
80 conversion = conversion_funcs.get(condor_type, None)
81 if conversion is None:
82 errmsg = "Unrecognized type: {0} for {1}"
83 print errmsg.format(condor_type,
84 pformat(available_sources))
86 sources = available_sources.get(condor_type, None)
88 if sources is not None:
89 condor_entries.setdefault(condor_type, []).append(
90 conversion(sources, target_pathname))
# Reached when no sources were found for the target (the guarding
# else/branch line, orig 91, is elided) -- best-effort report only.
92 print " need file", target_pathname
# (return of condor_entries, orig 93-95, elided -- create_scripts consumes
# the return value, so a `return condor_entries` presumably follows; confirm)
# Scan the sequence archive for files belonging to the libraries in
# result_map.  Returns lib_db keyed by library id, with a 'lanes' dict of
# (flowcell, lane) -> set of sequence objects.
96 def find_archive_sequence_files(self, result_map):
98 Find archived sequence files associated with our results.
100 LOGGER.debug("Searching for sequence files in: %s" %(self.sequences_path,))
# (initialization of lib_db / candidate_lanes / seq_dirs, orig 101-104,
# elided -- they are used below, so elided lines presumably create them)
105 for lib_id in result_map.keys():
106 lib_info = self.api.get_library(lib_id)
107 lib_info['lanes'] = {}
108 lib_db[lib_id] = lib_info
110 for lane in lib_info['lane_set']:
111 lane_key = (lane['flowcell'], lane['lane_number'])
112 candidate_lanes[lane_key] = lib_id
# Directory to scan for this lane (path components continue on the
# elided orig lines 114-115 -- presumably the flowcell id).
113 seq_dirs.add(os.path.join(self.sequences_path,
116 LOGGER.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
117 candidate_seq_list = scan_for_sequences(seq_dirs)
119 # at this point we have too many sequences as scan_for_sequences
120 # returns all the sequences in a flowcell directory
121 # so lets filter out the extras
123 for seq in candidate_seq_list:
124 lane_key = (seq.flowcell, seq.lane)
125 lib_id = candidate_lanes.get(lane_key, None)
# Keep only sequences whose (flowcell, lane) belongs to a requested library.
126 if lib_id is not None:
127 lib_info = lib_db[lib_id]
128 lib_info['lanes'].setdefault(lane_key, set()).add(seq)
# (return of lib_db, orig 129-131, elided -- callers use the return value)
# Determine which fastq targets still need to be generated, mapping each
# missing target path to { filetype: [source sequences] }.  Also annotates
# each sequence with seq.paired from the lane metadata.
132 def find_missing_targets(self, result_map, lib_db):
134 Check if the sequence file exists.
135 This requires computing what the sequence name is and checking
136 to see if it can be found in the sequence location.
138 Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
# Filename templates: paired-end names carry an _r<read> suffix.
140 fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
141 fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
142 # find what targets we're missing
# (initialization of needed_targets, orig 143, elided)
144 for lib_id in result_map.keys():
145 result_dir = result_map[lib_id]
147 lane_dict = make_lane_dict(lib_db, lib_id)
# NOTE(review): `lib` is not assigned on any visible line (presumably
# `lib = lib_db[lib_id]` on elided orig 146/148) -- confirm.
149 for lane_key, sequences in lib['lanes'].items():
150 for seq in sequences:
151 seq.paired = lane_dict[seq.flowcell]['paired_end']
152 lane_status = lane_dict[seq.flowcell]['status']
# Paired-end sequence with no read number cannot be named -- the
# handling (orig 155) is elided, presumably skip/continue.
154 if seq.paired and seq.read is None:
156 filename_attributes = {
157 'flowcell': seq.flowcell,
# (remaining filename_attributes keys, orig 158-163, elided)
# Failed lanes get special treatment (body at orig 165 elided).
164 if lane_status == 'Failed':
166 if seq.flowcell == '30DY0AAXX':
167 # 30DY0 only ran for 151 bases instead of 152
168 # it is actually 76 1st read, 75 2nd read
# (30DY0AAXX mid-point adjustment and the paired/single branch
# conditions, orig 169-175, elided)
173 target_name = fastq_paired_template % \
176 target_name = fastq_single_template % \
179 target_pathname = os.path.join(result_dir, target_name)
# Only queue work for targets that are absent, unless forced.
180 if self.force or not os.path.exists(target_pathname):
181 t = needed_targets.setdefault(target_pathname, {})
182 t.setdefault(seq.filetype, []).append(seq)
184 return needed_targets
# Build the template argument dict for converting a single srf file to
# fastq.  The guard condition (orig 188) is elided; from the visible raise
# it presumably rejects len(sources) > 1.
187 def condor_srf_to_fastq(self, sources, target_pathname):
189 raise ValueError("srf to fastq can only handle one file")
# (start of the returned dict literal, orig 190-191, elided)
192 'sources': [os.path.abspath(sources[0].path)],
193 'pyscript': srf2fastq.__file__,
194 'flowcell': sources[0].flowcell,
195 'ispaired': sources[0].paired,
196 'target': target_pathname,
# Right-read target derived from the left-read name by suffix swap.
197 'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
198 'mid': getattr(sources[0], 'mid_point', None),
# (dict closer / remaining keys, orig 199-201, elided)
# Build the template argument dict for merging one or more qseq files into
# a fastq target.
202 def condor_qseq_to_fastq(self, sources, target_pathname):
# (initialization of paths, orig 203, elided -- presumably `paths = []`)
204 for source in sources:
205 paths.append(source.path)
# (orig 206-207 elided; presumably sorts paths and opens the return dict)
208 'pyscript': qseq2fastq.__file__,
209 'flowcell': sources[0].flowcell,
210 'target': target_pathname,
212 'ispaired': sources[0].paired,
# 'istar' flags the single-source (tarred) case.
213 'istar': len(sources) == 1,
# Build the template argument dict for recombining split fastq files into
# a single fastq target.
216 def condor_desplit_fastq(self, sources, target_pathname):
# (initialization of paths, orig 217, elided -- presumably `paths = []`)
218 for source in sources:
219 paths.append(source.path)
# (orig 220-221 elided; presumably sorts paths and opens the return dict)
222 'pyscript': desplit_fastq.__file__,
223 'target': target_pathname,
225 'ispaired': sources[0].paired,
228 def make_lane_dict(lib_db, lib_id):
230 Convert the lane_set in a lib_db to a dictionary
231 indexed by flowcell ID
234 for lane in lib_db[lib_id]['lane_set']:
235 flowcell_id, status = parse_flowcell_id(lane['flowcell'])
236 lane['flowcell'] = flowcell_id
237 result.append((lane['flowcell'], lane))