1 """Convert srf and qseq archive files to fastqs
5 from pprint import pformat
9 from htsworkflow.pipelines.sequences import scan_for_sequences
10 from htsworkflow.pipelines.samplekey import SampleKey
11 from htsworkflow.pipelines import qseq2fastq
12 from htsworkflow.pipelines import srf2fastq
13 from htsworkflow.pipelines import desplit_fastq
14 from htsworkflow.util.api import HtswApi
15 from htsworkflow.util.conversion import parse_flowcell_id
17 from django.conf import settings
18 from django.template import Context, loader
20 LOGGER = logging.getLogger(__name__)
23 class CondorFastqExtract(object):
# Builds HTCondor submit scripts that convert archived srf/qseq sequence
# files into per-library fastq files.
# NOTE(review): this excerpt is an elided numbered listing -- the leading
# integers are the original file's line numbers and they have gaps, so
# some statements are missing from view.  Code lines are left untouched.
24 def __init__(self, host, apidata, sequences_path,
# Constructor: record server connection info and filesystem locations.
# (Signature continuation is elided; log_path and force parameters are
# documented in the docstring below -- TODO confirm their defaults.)
27 """Extract fastqs from results archive
30 host (str): root of the htsworkflow api server
31 apidata (dict): id & key to post to the server
32 sequences_path (str): root of the directory tree to scan for files
33 log_path (str): where to put condor log files
34 force (bool): do we force overwriting current files?
36 self.api = HtswApi(host, apidata)
37 self.sequences_path = sequences_path
38 self.log_path = log_path
# NOTE(review): an elided line presumably also stores self.force, which
# find_missing_targets reads -- confirm against the full source.
41 def create_scripts(self, result_map ):
# Render one condor submit script per entry in template_map, writing
# '<script_type>.condor' into the current working directory.
# NOTE(review): elided listing -- docstring delimiters, the dict
# closings and the initial 'env' binding fall in the numbering gaps.
43 Generate condor scripts to build any needed fastq files
46 result_map: htsworkflow.submission.results.ResultMap()
48 template_map = {'srf': 'srf.condor',
49 'qseq': 'qseq.condor',
50 'split_fastq': 'split_fastq.condor',
51 'by_sample': 'lane_to_fastq.turtle',
# Forward the caller's PYTHONPATH to the condor jobs so they can import
# the same packages as this process.
55 pythonpath = os.environ.get('PYTHONPATH', None)
56 if pythonpath is not None:
57 env = "PYTHONPATH=%s" % (pythonpath,)
58 condor_entries = self.build_condor_arguments(result_map)
59 for script_type in template_map.keys():
60 template = loader.get_template(template_map[script_type])
61 variables = {'python': sys.executable,
62 'logdir': self.log_path,
64 'args': condor_entries[script_type],
65 'root_url': self.api.root_url,
67 context = Context(variables)
# Write the rendered Django template out as the submit script.
69 with open(script_type + '.condor','w+') as outstream:
70 outstream.write(template.render(context))
72 def build_condor_arguments(self, result_map):
# For every missing fastq target, select the conversion that can build
# it and collect that job's condor arguments, keyed by conversion type.
# Returns the condor_entries dict consumed by create_scripts().
73 condor_entries = {'srf': [],
# Dispatch table: conversion type -> argument-builder method below.
77 conversion_funcs = {'srf': self.condor_srf_to_fastq,
78 'qseq': self.condor_qseq_to_fastq,
79 'split_fastq': self.condor_desplit_fastq
82 lib_db = self.find_archive_sequence_files(result_map)
83 needed_targets = self.find_missing_targets(result_map, lib_db)
85 for target_pathname, available_sources in needed_targets.items():
86 LOGGER.debug(' target : %s' % (target_pathname,))
87 LOGGER.debug(' candidate sources: %s' % (available_sources,))
88 for condor_type in available_sources.keys():
89 conversion = conversion_funcs.get(condor_type, None)
90 if conversion is None:
91 errmsg = "Unrecognized type: {0} for {1}"
# Python 2 print statement: unknown source types are reported but not
# treated as fatal.
92 print errmsg.format(condor_type,
93 pformat(available_sources))
95 sources = available_sources.get(condor_type, None)
97 if sources is not None:
98 condor_entries.setdefault(condor_type, []).append(
99 conversion(sources, target_pathname))
# NOTE(review): 'by_sample' and 's' are bound on elided lines --
# presumably grouping source sequences by their lane_id; confirm.
101 by_sample.setdefault(s.lane_id,[]).append(
104 print " need file", target_pathname
106 condor_entries['by_sample'] = by_sample
107 return condor_entries
109 def find_archive_sequence_files(self, result_map):
111 Find archived sequence files associated with our results.
113 LOGGER.debug("Searching for sequence files in: %s" %(self.sequences_path,))
# Build lib_db (library id -> library info carrying a 'lanes' mapping)
# and record every (flowcell, lane_number) pair we need sources for.
# NOTE(review): lib_db, candidate_lanes and seq_dirs are initialized on
# elided lines.
118 for lib_id in result_map.keys():
119 lib_info = self.api.get_library(lib_id)
120 lib_info['lanes'] = {}
121 lib_db[lib_id] = lib_info
123 for lane in lib_info['lane_set']:
124 lane_key = (lane['flowcell'], lane['lane_number'])
125 candidate_lanes[lane_key] = (lib_id, lane['lane_id'])
126 seq_dirs.add(os.path.join(self.sequences_path,
129 LOGGER.debug("Seq_dirs = %s" %(unicode(seq_dirs)))
130 candidate_seq_list = scan_for_sequences(seq_dirs)
132 # at this point we have too many sequences as scan_for_sequences
133 # returns all the sequences in a flowcell directory
134 # so lets filter out the extras
136 for seq in candidate_seq_list:
137 lane_key = (seq.flowcell, seq.lane)
138 candidate_key = candidate_lanes.get(lane_key, None)
139 if candidate_key is not None:
140 lib_id, lane_id = candidate_key
# Tag the sequence with its lane and file it under the owning
# library's 'lanes' mapping, keyed by (flowcell, lane).
141 seq.lane_id = lane_id
142 lib_info = lib_db[lib_id]
143 lib_info['lanes'].setdefault(lane_key, set()).add(seq)
# NOTE(review): the 'return lib_db' statement falls in a numbering gap.
147 def find_missing_targets(self, result_map, lib_db):
149 Check if the sequence file exists.
150 This requires computing what the sequence name is and checking
151 to see if it can be found in the sequence location.
153 Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
# Returns needed_targets: target fastq pathname -> {filetype: [seqs]}.
155 fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
156 fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
157 # find what targets we're missing
159 for lib_id in result_map.keys():
160 result_dir = result_map[lib_id]
162 lane_dict = make_lane_dict(lib_db, lib_id)
# NOTE(review): 'lib' is bound on an elided line -- presumably
# lib = lib_db[lib_id]; confirm against the full source.
164 for lane_key, sequences in lib['lanes'].items():
165 for seq in sequences:
166 seq.paired = lane_dict[seq.flowcell]['paired_end']
167 lane_status = lane_dict[seq.flowcell]['status']
169 if seq.paired and seq.read is None:
171 filename_attributes = {
172 'flowcell': seq.flowcell,
# Elided: the remaining filename attributes plus the bodies of the
# failed-lane and 30DY0AAXX special-case branches below.
179 if lane_status == 'Failed':
181 if seq.flowcell == '30DY0AAXX':
182 # 30DY0 only ran for 151 bases instead of 152
183 # it is actually 76 1st read, 75 2nd read
188 target_name = fastq_paired_template % \
191 target_name = fastq_single_template % \
194 target_pathname = os.path.join(result_dir, target_name)
# Only (re)build targets that do not already exist, unless self.force
# requests overwriting everything.
195 if self.force or not os.path.exists(target_pathname):
196 t = needed_targets.setdefault(target_pathname, {})
# Group the candidate source sequences for this target by filetype.
197 t.setdefault(seq.filetype, []).append(seq)
199 return needed_targets
202 def condor_srf_to_fastq(self, sources, target_pathname):
# Build the condor argument dict for one srf -> fastq conversion job.
# srf conversion accepts exactly one source file; the guard condition
# preceding the raise falls in a numbering gap of this listing.
204 raise ValueError("srf to fastq can only handle one file")
207 'sources': [os.path.abspath(sources[0].path)],
208 'pyscript': srf2fastq.__file__,
209 'flowcell': sources[0].flowcell,
210 'ispaired': sources[0].paired,
211 'target': target_pathname,
# Paired-end runs also write a right-read fastq (_r1 -> _r2).
212 'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
# mid_point is only present on some sequences (presumably set for the
# 30DY0AAXX case noted in find_missing_targets) -- default to None.
213 'mid': getattr(sources[0], 'mid_point', None),
217 def condor_qseq_to_fastq(self, sources, target_pathname):
# Build the condor argument dict for a qseq -> fastq conversion; all
# source paths are handed to a single job.
219 for source in sources:
220 paths.append(source.path)
# NOTE(review): 'paths' is initialized, and the argument dict opened,
# closed and returned, on elided lines.
223 'pyscript': qseq2fastq.__file__,
224 'flowcell': sources[0].flowcell,
225 'target': target_pathname,
227 'ispaired': sources[0].paired,
# 'istar' presumably flags a single tarred qseq archive -- confirm.
228 'istar': len(sources) == 1,
231 def condor_desplit_fastq(self, sources, target_pathname):
# Build the condor argument dict for merging split fastq files back
# into the single target fastq.
233 for source in sources:
234 paths.append(source.path)
# NOTE(review): 'paths' initialization and the dict open/close/return
# fall in numbering gaps of this listing.
237 'pyscript': desplit_fastq.__file__,
238 'target': target_pathname,
240 'ispaired': sources[0].paired,
243 def lane_rdf(self, sources, target_pathname):
# NOTE(review): body entirely elided from this listing; presumably
# produces the per-lane data consumed by the 'by_sample'
# lane_to_fastq.turtle template -- confirm against the full source.
246 def make_lane_dict(lib_db, lib_id):
# Module-level helper: map flowcell id -> lane record for one library.
248 Convert the lane_set in a lib_db to a dictionary
249 indexed by flowcell ID
# NOTE(review): the 'result' accumulator is initialized on an elided
# line.
252 for lane in lib_db[lib_id]['lane_set']:
# parse_flowcell_id splits the raw flowcell value into (id, status);
# the bare id replaces the lane's 'flowcell' field and becomes the key.
253 flowcell_id, status = parse_flowcell_id(lane['flowcell'])
254 lane['flowcell'] = flowcell_id
255 result.append((lane['flowcell'], lane))
# NOTE(review): the function continues past this excerpt -- presumably
# returns dict(result); confirm against the full source.