1 """Convert srf and qseq archive files to fastqs
5 from pprint import pformat
9 from htsworkflow.pipelines.sequences import scan_for_sequences
10 from htsworkflow.pipelines.samplekey import SampleKey
11 from htsworkflow.pipelines import qseq2fastq
12 from htsworkflow.pipelines import srf2fastq
13 from htsworkflow.pipelines import desplit_fastq
14 from htsworkflow.util.api import HtswApi
15 from htsworkflow.util.conversion import parse_flowcell_id
17 from django.conf import settings
18 from django.template import Context, loader
20 LOGGER = logging.getLogger(__name__)
class CondorFastqExtract(object):
    # Drives the conversion of archived srf/qseq sequence files into the
    # fastq files a submission needs, by rendering condor submit scripts
    # from django templates.
    #
    # NOTE(review): this copy of the file appears to be missing physical
    # lines (truncated signatures, unclosed dict literals, absent
    # initializers). The comments below describe only the code that is
    # visible; fragment indentation is a best-effort reconstruction.

    def __init__(self, host, apidata, sequences_path,
        # NOTE(review): the remainder of this signature (log_path, force)
        # is not visible here — TODO confirm against the full file.
        """Extract fastqs from results archive

        host (str): root of the htsworkflow api server
        apidata (dict): id & key to post to the server
        sequences_path (str): root of the directory tree to scan for files
        log_path (str): where to put condor log files
        force (bool): do we force overwriting current files?
        """
        # Cache the API connection and paths used by the other methods.
        # (self.force / any further assignments presumably happen on lines
        # not visible here — TODO confirm.)
        self.api = HtswApi(host, apidata)
        self.sequences_path = sequences_path
        self.log_path = log_path

    def create_scripts(self, result_map ):
        """
        Generate condor scripts to build any needed fastq files

        Args:
          result_map: htsworkflow.submission.results.ResultMap()
        """
        # Script type -> django template used to render that script.
        # NOTE(review): the closing brace of this dict is on a line not
        # visible in this copy.
        template_map = {'srf': 'srf.condor',
                        'qseq': 'qseq.condor',
                        'split_fastq': 'split_fastq.condor',
                        'by_sample': 'lane_to_fastq.turtle',

        condor_entries = self.build_condor_arguments(result_map)
        # Render one output file per script type, named "<type>.condor",
        # in the current working directory.
        for script_type in template_map.keys():
            template = loader.get_template(template_map[script_type])
            # Variables handed to the template; 'args' carries the
            # per-target argument dicts built by build_condor_arguments.
            variables = {'python': sys.executable,
                         'logdir': self.log_path,
                         'env': os.environ.get('PYTHONPATH', None),
                         'args': condor_entries[script_type],
                         'root_url': self.api.root_url,
            context = Context(variables)

            with open(script_type + '.condor','w+') as outstream:
                outstream.write(template.render(context))

    def build_condor_arguments(self, result_map):
        # Build, for each conversion type, the list of argument dicts
        # describing every fastq target that still needs to be produced.
        # NOTE(review): further initializers for this dict (and for
        # by_sample) are on lines not visible here — TODO confirm.
        condor_entries = {'srf': [],
        # Dispatch table: source file type -> builder of its condor args.
        conversion_funcs = {'srf': self.condor_srf_to_fastq,
                            'qseq': self.condor_qseq_to_fastq,
                            'split_fastq': self.condor_desplit_fastq

        lib_db = self.find_archive_sequence_files(result_map)
        needed_targets = self.find_missing_targets(result_map, lib_db)

        for target_pathname, available_sources in needed_targets.items():
            LOGGER.debug(' target : %s' % (target_pathname,))
            LOGGER.debug(' candidate sources: %s' % (available_sources,))
            for condor_type in available_sources.keys():
                conversion = conversion_funcs.get(condor_type, None)
                if conversion is None:
                    errmsg = "Unrecognized type: {0} for {1}"
                    # python 2 print statement (file predates python 3)
                    print errmsg.format(condor_type,
                                        pformat(available_sources))
                sources = available_sources.get(condor_type, None)

                if sources is not None:
                    condor_entries.setdefault(condor_type, []).append(
                        conversion(sources, target_pathname))
                    # Group sequences by lane for the by_sample output.
                    # NOTE(review): the surrounding loop that binds `s`
                    # is not visible in this copy — TODO confirm.
                    by_sample.setdefault(s.lane_id,[]).append(
                print " need file", target_pathname

        condor_entries['by_sample'] = by_sample
        return condor_entries

    def find_archive_sequence_files(self, result_map):
        """
        Find archived sequence files associated with our results.
        """
        LOGGER.debug("Searching for sequence files in: %s" %(self.sequences_path,))

        # NOTE(review): initializers for lib_db, candidate_lanes and
        # seq_dirs are on lines not visible here — TODO confirm.
        for lib_id in result_map.keys():
            lib_info = self.api.get_library(lib_id)
            lib_info['lanes'] = {}
            lib_db[lib_id] = lib_info

            # Record every (flowcell, lane number) this library ran on,
            # and the flowcell directory that may hold its sequences.
            for lane in lib_info['lane_set']:
                lane_key = (lane['flowcell'], lane['lane_number'])
                candidate_lanes[lane_key] = (lib_id, lane['lane_id'])
                seq_dirs.add(os.path.join(self.sequences_path,

        LOGGER.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
        candidate_seq_list = scan_for_sequences(seq_dirs)

        # at this point we have too many sequences as scan_for_sequences
        # returns all the sequences in a flowcell directory
        # so lets filter out the extras

        for seq in candidate_seq_list:
            lane_key = (seq.flowcell, seq.lane)
            candidate_key = candidate_lanes.get(lane_key, None)
            if candidate_key is not None:
                # This sequence belongs to one of our libraries; attach
                # it to that library's per-lane set.
                lib_id, lane_id = candidate_key
                seq.lane_id = lane_id
                lib_info = lib_db[lib_id]
                lib_info['lanes'].setdefault(lane_key, set()).add(seq)

    def find_missing_targets(self, result_map, lib_db):
        """
        Check if the sequence file exists.
        This requires computing what the sequence name is and checking
        to see if it can be found in the sequence location.

        Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
        """
        fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
        fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
        # find what targets we're missing
        # NOTE(review): the needed_targets initializer and the binding of
        # `lib` are on lines not visible here — TODO confirm.
        for lib_id in result_map.keys():
            result_dir = result_map[lib_id]

            lane_dict = make_lane_dict(lib_db, lib_id)

            for lane_key, sequences in lib['lanes'].items():
                for seq in sequences:
                    # paired/status metadata comes from the library's
                    # lane records, keyed by flowcell id.
                    seq.paired = lane_dict[seq.flowcell]['paired_end']
                    lane_status = lane_dict[seq.flowcell]['status']

                    # NOTE(review): the bodies of the conditionals below
                    # are largely elided in this copy of the file.
                    if seq.paired and seq.read is None:

                    filename_attributes = {
                        'flowcell': seq.flowcell,

                    if lane_status == 'Failed':

                    if seq.flowcell == '30DY0AAXX':
                        # 30DY0 only ran for 151 bases instead of 152
                        # it is actually 76 1st read, 75 2nd read

                        # paired-end reads get a _r<read> suffix in the name
                        target_name = fastq_paired_template % \

                        target_name = fastq_single_template % \

                    target_pathname = os.path.join(result_dir, target_name)
                    if self.force or not os.path.exists(target_pathname):
                        # Only schedule a conversion when the fastq is
                        # absent (or force-overwrite was requested),
                        # grouped by the source file type.
                        t = needed_targets.setdefault(target_pathname, {})
                        t.setdefault(seq.filetype, []).append(seq)

        return needed_targets

    def condor_srf_to_fastq(self, sources, target_pathname):
        # Build the argument dict for one srf -> fastq condor job.
        # NOTE(review): the guard condition before this raise, and the
        # `return {` opener for the dict entries below, are not visible
        # in this copy of the file — TODO confirm.
            raise ValueError("srf to fastq can only handle one file")

            'sources': [os.path.abspath(sources[0].path)],
            'pyscript': srf2fastq.__file__,
            'flowcell': sources[0].flowcell,
            'ispaired': sources[0].paired,
            'target': target_pathname,
            # paired conversions write read 2 next to read 1
            'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
            # mid_point is optional on the source sequence object
            'mid': getattr(sources[0], 'mid_point', None),

    def condor_qseq_to_fastq(self, sources, target_pathname):
        # Build the argument dict for a qseq -> fastq condor job.
        # NOTE(review): the `paths` initializer and the `return {` opener
        # are not visible in this copy — TODO confirm.
        for source in sources:
            paths.append(source.path)
            'pyscript': qseq2fastq.__file__,
            'flowcell': sources[0].flowcell,
            'target': target_pathname,
            'ispaired': sources[0].paired,
            # a single source presumably indicates a tar of qseq files —
            # TODO confirm against qseq2fastq's expectations.
            'istar': len(sources) == 1,

    def condor_desplit_fastq(self, sources, target_pathname):
        # Build the argument dict for merging split fastq files into one.
        # NOTE(review): the `paths` initializer and the `return {` opener
        # are not visible in this copy — TODO confirm.
        for source in sources:
            paths.append(source.path)
            'pyscript': desplit_fastq.__file__,
            'target': target_pathname,
            'ispaired': sources[0].paired,

    def lane_rdf(self, sources, target_pathname):
        # NOTE(review): the body of this method is not visible in this
        # copy of the file.
def make_lane_dict(lib_db, lib_id):
    """
    Convert the lane_set in a lib_db to a dictionary
    indexed by flowcell ID
    """
    # NOTE(review): the `result` initializer and the final return are on
    # lines not visible in this copy of the file — TODO confirm.
    for lane in lib_db[lib_id]['lane_set']:
        # Normalize the flowcell name via parse_flowcell_id before using
        # it as the lookup key; the status half of the tuple is unused
        # here beyond being unpacked.
        flowcell_id, status = parse_flowcell_id(lane['flowcell'])
        lane['flowcell'] = flowcell_id
        result.append((lane['flowcell'], lane))