1 """Convert srf and qseq archive files to fastqs
5 from pprint import pformat,pprint
8 from urlparse import urljoin, urlparse
10 from htsworkflow.pipelines.sequences import scan_for_sequences, \
11 update_model_sequence_library
12 from htsworkflow.pipelines.samplekey import SampleKey
13 from htsworkflow.pipelines import qseq2fastq
14 from htsworkflow.pipelines import srf2fastq
15 from htsworkflow.pipelines import desplit_fastq
16 from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \
21 from htsworkflow.util.conversion import parse_flowcell_id
23 from django.conf import settings
24 from django.template import Context, loader
28 LOGGER = logging.getLogger(__name__)
class CondorFastqExtract(object):
    # Builds condor submit scripts that convert archived srf/qseq sequence
    # files into fastq files, driven by metadata held in an RDF model.
    def __init__(self, host, sequences_path,
                 # NOTE(review): further parameters elided from this listing —
                 # the docstring below names apidata, log_path, force, and the
                 # body reads a 'model' name; confirm against full source.
        """Extract fastqs from results archive

        Args:
          host (str): root of the htsworkflow api server
          apidata (dict): id & key to post to the server
          sequences_path (str): root of the directory tree to scan for files
          log_path (str): where to put condor log files
          force (bool): do we force overwriting current files?
        """
        # [attribute assignments for host/apidata/force elided from listing]
        self.model = get_model(model)  # RDF model backing all metadata queries
        self.sequences_path = sequences_path
        self.log_path = log_path
        # Record the configuration so condor runs are traceable in the logs.
        LOGGER.info("CondorFastq host={0}".format(self.host))
        LOGGER.info("CondorFastq sequences_path={0}".format(self.sequences_path))
        LOGGER.info("CondorFastq log_path={0}".format(self.log_path))
    def create_scripts(self, result_map ):
        """
        Generate condor scripts to build any needed fastq files

        Args:
          result_map: htsworkflow.submission.results.ResultMap()
        """
        # Map each conversion type to the Django template that renders its
        # condor submit script (or the per-sample turtle metadata file).
        template_map = {'srf': 'srf.condor',
                        'qseq': 'qseq.condor',
                        'split_fastq': 'split_fastq.condor',
                        'by_sample': 'lane_to_fastq.turtle',
                        # [closing brace and following lines elided from listing]
        # Propagate PYTHONPATH into the condor job environment when present.
        pythonpath = os.environ.get('PYTHONPATH', None)
        if pythonpath is not None:
            env = "PYTHONPATH=%s" % (pythonpath,)
        condor_entries = self.build_condor_arguments(result_map)
        # Render one output file per conversion type.
        for script_type in template_map.keys():
            template = loader.get_template(template_map[script_type])
            variables = {'python': sys.executable,
                         'logdir': self.log_path,
                         # [an 'env' entry is presumably elided here — confirm]
                         'args': condor_entries[script_type],
                         'root_url': self.host,
                         # [closing brace elided from listing]
            context = Context(variables)
            # Written to the current working directory, e.g. 'srf.condor'.
            with open(script_type + '.condor','w+') as outstream:
                outstream.write(template.render(context))
    def build_condor_arguments(self, result_map):
        """Collect per-conversion-type argument dicts for the condor templates.

        Returns a dict keyed by conversion type ('srf', 'qseq',
        'split_fastq', 'by_sample') whose values feed create_scripts().
        """
        condor_entries = {'srf': [],
                          # [remaining initial keys and closer elided from listing]
        # Dispatch table: conversion type -> method building one argument dict.
        conversion_funcs = {'srf': self.condor_srf_to_fastq,
                            'qseq': self.condor_qseq_to_fastq,
                            'split_fastq': self.condor_desplit_fastq
        # [elided lines, including by_sample initialization — confirm]
        sequences = self.find_archive_sequence_files(result_map)
        needed_targets = self.find_missing_targets(result_map, sequences)

        for target_pathname, available_sources in needed_targets.items():
            LOGGER.debug(' target : %s' % (target_pathname,))
            LOGGER.debug(' candidate sources: %s' % (available_sources,))
            for condor_type in available_sources.keys():
                conversion = conversion_funcs.get(condor_type, None)
                if conversion is None:
                    errmsg = "Unrecognized type: {0} for {1}"
                    print errmsg.format(condor_type,
                                        pformat(available_sources))
                    # [line elided here — presumably a 'continue'; confirm]
                sources = available_sources.get(condor_type, None)

                if sources is not None:
                    condor_entries.setdefault(condor_type, []).append(
                        conversion(sources, target_pathname))
                    # [elided: loop over sources feeding by_sample]
                    by_sample.setdefault(s.lane_number,[]).append(
                    # [elided continuation and else-branch]
            print " need file", target_pathname

        condor_entries['by_sample'] = by_sample
        return condor_entries
    def find_archive_sequence_files(self, result_map):
        """
        Find archived sequence files associated with our results.

        Loads library, flowcell, and sequence metadata into self.model,
        then queries it; returns SequenceResult objects for files whose
        library id appears in result_map.
        """
        self.import_libraries(result_map)
        flowcell_ids = self.find_relavant_flowcell_ids()
        self.import_sequences(flowcell_ids)

        # [elided lines before the query construction]
        query = RDF.SPARQLQuery("""
        prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
        prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        prefix xsd: <http://www.w3.org/2001/XMLSchema#>

        select ?filenode ?filetype ?cycle ?lane_number ?read
        ?flowcell ?flowcell_id ?read_length
        ?flowcell_type ?flowcell_status
        ?filenode libns:cycle ?cycle ;
        libns:lane_number ?lane_number ;
        libns:flowcell ?flowcell ;
        libns:flowcell_id ?flowcell_id ;
        libns:library ?library ;
        libns:library_id ?library_id ;
        ?flowcell libns:read_length ?read_length ;
        libns:flowcell_type ?flowcell_type .
        OPTIONAL { ?flowcell libns:flowcell_status ?flowcell_status }
        FILTER(?filetype != libns:raw_file)
        # [string closer and 'results' initialization elided from listing]
        for r in query.execute(self.model):
            library_id = fromTypedNode(r['library_id'])
            # Keep only files belonging to libraries we were asked about.
            if library_id in result_map:
                results.append(SequenceResult(r))
        # [return of results elided from listing]
163 def import_libraries(self, result_map):
164 for lib_id in result_map.keys():
165 lib_id_encoded = lib_id.encode('utf-8')
166 liburl = urljoin(self.host, 'library/%s/' % (lib_id_encoded,))
167 library = RDF.Node(RDF.Uri(liburl))
168 self.import_library(library)
    def import_library(self, library):
        """Import library data into our model if we don't have it already.

        library: RDF node naming the library's detail page.
        """
        # Fetch the library's RDFa page only once: skip when a
        # (library, rdf:type, libns:library) triple already exists.
        q = RDF.Statement(library, rdfNS['type'], libraryOntology['library'])
        if not self.model.contains_statement(q):
            load_into_model(self.model, 'rdfa', library)
    def find_relavant_flowcell_ids(self):
        """Generate set of flowcell ids that had samples of interest on them.
        """
        flowcell_query =RDF.SPARQLQuery("""
        prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
        select distinct ?flowcell ?flowcell_id
        ?library a libns:library ;
        libns:has_lane ?lane .
        ?lane libns:flowcell ?flowcell .
        ?flowcell libns:flowcell_id ?flowcell_id .
        # [string closer and initialization of flowcell_ids (a set) elided]
        for r in flowcell_query.execute(self.model):
            flowcell_ids.add( fromTypedNode(r['flowcell_id']) )
            # NOTE(review): this debug call logs the growing set on every
            # iteration — intentional? Confirm against full source.
            LOGGER.debug("Flowcells = %s" %(unicode(flowcell_ids)))
            flowcell_test = RDF.Statement(r['flowcell'],
                                          # [predicate argument elided, likely rdfNS['type']]
                                          libraryOntology['illumina_flowcell'])
            if not self.model.contains_statement(flowcell_test):
                # we probably lack full information about the flowcell.
                # Pull its RDFa page into the model so later queries succeed.
                load_into_model(self.model, 'rdfa', r['flowcell'])
        # [return of flowcell_ids elided from listing]
    def import_sequences(self, flowcell_ids):
        """Scan per-flowcell directories and load found sequences into the model.
        """
        # [elided lines — seq_dirs is presumably initialized to [] here; confirm]
        for f in flowcell_ids:
            seq_dirs.append(os.path.join(self.sequences_path, str(f)))
        sequences = scan_for_sequences(seq_dirs)
        for seq in sequences:
            seq.save_to_model(self.model, self.host)
        # Attach library metadata to the newly added sequence nodes.
        update_model_sequence_library(self.model, self.host)
    def find_missing_targets(self, result_map, raw_files):
        """
        Check if the sequence file exists.
        This requires computing what the sequence name is and checking
        to see if it can be found in the sequence location.

        Adds seq.paired flag to sequences listed in lib_db[*]['lanes']

        Returns a dict: target fastq pathname -> {filetype: [sequences]}.
        """
        fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
        fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
        # find what targets we're missing
        # [initialization of needed_targets (a dict) elided from listing]
        for seq in raw_files:
            # [filtering/guard lines elided from listing]
            filename_attributes = {
                'flowcell': seq.flowcell_id,
                'lib_id': seq.library_id,
                'lane': seq.lane_number,
                # ['read'/'cycle' entries and closing brace elided]
            # Paired-end runs get an _r<read> suffix; single-ended do not.
            # [pairing condition elided between the two assignments]
            target_name = fastq_paired_template % \
            target_name = fastq_single_template % \
            result_dir = result_map[seq.library_id]
            target_pathname = os.path.join(result_dir, target_name)
            # Only schedule a conversion when forced or the fastq is absent.
            if self.force or not os.path.exists(target_pathname):
                t = needed_targets.setdefault(target_pathname, {})
                t.setdefault(seq.filetype, []).append(seq)
        return needed_targets
    def condor_srf_to_fastq(self, sources, target_pathname):
        """Build the template argument dict for one srf -> fastq conversion.

        Raises ValueError when given more than one source file.
        """
        # [guard condition elided — presumably len(sources) != 1]
        raise ValueError("srf to fastq can only handle one file")
        # [elided lines]
        # NOTE(review): flowcell 30DY0AAXX gets special treatment; the
        # branch body is elided from this listing — confirm what it does.
        if sources[0].flowcell_id == '30DY0AAXX':
        # [elided lines; the returned dict for srf.condor begins here]
            'sources': [sources[0].path],
            'pyscript': srf2fastq.__file__,
            'flowcell': sources[0].flowcell_id,
            'ispaired': sources[0].ispaired,
            'target': target_pathname,
            # Right-hand read target derived by swapping the read suffix.
            'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
            # [remaining entries and return elided from listing]
    def condor_qseq_to_fastq(self, sources, target_pathname):
        """Build the template argument dict for a qseq -> fastq conversion.
        """
        # [elided — paths presumably initialized to [] here; confirm]
        for source in sources:
            paths.append(source.path)
        # [elided lines; the returned dict for qseq.condor begins here]
            'pyscript': qseq2fastq.__file__,
            'flowcell': sources[0].flowcell_id,
            'target': target_pathname,
            # ['sources' entry presumably elided here — confirm]
            'ispaired': sources[0].ispaired,
            # istar: a single source appears to mean a tarred qseq set — confirm
            'istar': len(sources) == 1,
            # [closing brace and return elided from listing]
    def condor_desplit_fastq(self, sources, target_pathname):
        """Build the template argument dict for merging split fastq files.
        """
        # [elided — paths presumably initialized to [] here; confirm]
        for source in sources:
            paths.append(source.path)
        # [elided lines; the returned dict for split_fastq.condor begins here]
            'pyscript': desplit_fastq.__file__,
            'target': target_pathname,
            # ['sources' entry presumably elided here — confirm]
            'ispaired': sources[0].ispaired,
            # [closing brace and return elided from listing]
def make_lane_dict(lib_db, lib_id):
    """
    Convert the lane_set in a lib_db to a dictionary
    indexed by flowcell ID
    """
    # [initialization of result (a list of pairs) elided from listing]
    for lane in lib_db[lib_id]['lane_set']:
        # Normalize the flowcell name: strip any status suffix from the id.
        flowcell_id, status = parse_flowcell_id(lane['flowcell'])
        lane['flowcell'] = flowcell_id
        result.append((lane['flowcell'], lane))
    # [return elided — presumably dict(result); confirm]
class SequenceResult(object):
    """Convert the sparql query result from find_archive_sequence_files
    into an object with plain-Python attribute access.
    """
    def __init__(self, result):
        # RDF nodes are kept as-is; scalar values are unwrapped with
        # fromTypedNode.
        self.filenode = result['filenode']
        self._filetype = result['filetype']
        self.cycle = fromTypedNode(result['cycle'])
        self.lane_number = fromTypedNode(result['lane_number'])
        self.read = fromTypedNode(result['read'])
        if type(self.read) in types.StringTypes:
            # [body elided — presumably normalizes a string read value; confirm]
        self.library = result['library']
        self.library_id = fromTypedNode(result['library_id'])
        self.flowcell = result['flowcell']
        self.flowcell_id = fromTypedNode(result['flowcell_id'])
        self.flowcell_type = fromTypedNode(result['flowcell_type'])
        # flowcell_status is OPTIONAL in the query, so this may be None.
        self.flowcell_status = fromTypedNode(result['flowcell_status'])

    # [def _is_good(self): header elided from this listing]
        """is this sequence / flowcell 'good enough'"""
        # A flowcell explicitly marked "failed" is rejected.
        if self.flowcell_status is not None and \
           self.flowcell_status.lower() == "failed":
            # [return statements elided from listing]
    isgood = property(_is_good)

    def _get_ispaired(self):
        # True for paired-end flowcells, judging by flowcell_type.
        if self.flowcell_type.lower() == "paired":
            # [return statements elided from listing]
    ispaired = property(_get_ispaired)

    def _get_filetype(self):
        # Strip the ontology namespace, leaving the bare type name.
        return stripNamespace(libraryOntology, self._filetype)
    filetype = property(_get_filetype)

    # [def _get_path(self): header elided from this listing]
        url = urlparse(str(self.filenode.uri))
        if url.scheme == 'file':
            # [return of the local path elided from listing]
        # Only local file:// URLs are supported; anything else is an error.
        errmsg = u"Unsupported scheme {0} for {1}"
        raise ValueError(errmsg.format(url.scheme, unicode(url)))
    path = property(_get_path)