1 """Convert srf and qseq archive files to fastqs
5 from pprint import pformat,pprint
8 from urlparse import urljoin, urlparse
10 from htsworkflow.pipelines.sequences import scan_for_sequences, \
11 update_model_sequence_library
12 from htsworkflow.pipelines.samplekey import SampleKey
13 from htsworkflow.pipelines import qseq2fastq
14 from htsworkflow.pipelines import srf2fastq
15 from htsworkflow.pipelines import desplit_fastq
16 from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \
21 from htsworkflow.util.conversion import parse_flowcell_id
23 from django.conf import settings
24 from django.template import Context, loader
28 LOGGER = logging.getLogger(__name__)
31 class CondorFastqExtract(object):
# Constructor: record server/paths and initialize an empty RDF model.
# NOTE(review): the embedded line numbering jumps here (33-34, 43-44, 48
# missing), so part of the signature and body are not visible in this
# excerpt; self.host is logged below but its assignment is not shown --
# confirm it happens in the elided lines.
32 def __init__(self, host, sequences_path,
35 """Extract fastqs from results archive
38 host (str): root of the htsworkflow api server
39 apidata (dict): id & key to post to the server
40 sequences_path (str): root of the directory tree to scan for files
41 log_path (str): where to put condor log files
42 force (bool): do we force overwriting current files?
# get_model() builds the in-memory RDF model used by all later queries.
45 self.model = get_model()
46 self.sequences_path = sequences_path
47 self.log_path = log_path
49 LOGGER.info("CondorFastq host={0}".format(self.host))
50 LOGGER.info("CondorFastq sequences_path={0}".format(self.sequences_path))
51 LOGGER.info("CondorFastq log_path={0}".format(self.log_path))
# Render one condor submit script (or turtle description) per conversion
# type, using Django templates.
53 def create_scripts(self, result_map ):
55 Generate condor scripts to build any needed fastq files
58 result_map: htsworkflow.submission.results.ResultMap()
# Maps script type -> Django template used to render it.
60 template_map = {'srf': 'srf.condor',
61 'qseq': 'qseq.condor',
62 'split_fastq': 'split_fastq.condor',
63 'by_sample': 'lane_to_fastq.turtle',
# Propagate PYTHONPATH into the condor job environment when it is set.
67 pythonpath = os.environ.get('PYTHONPATH', None)
68 if pythonpath is not None:
69 env = "PYTHONPATH=%s" % (pythonpath,)
70 condor_entries = self.build_condor_arguments(result_map)
71 for script_type in template_map.keys():
72 template = loader.get_template(template_map[script_type])
73 variables = {'python': sys.executable,
74 'logdir': self.log_path,
76 'args': condor_entries[script_type],
77 'root_url': self.host,
79 context = Context(variables)
# NOTE(review): every output file gets a '.condor' suffix, including the
# 'by_sample' turtle output -- confirm that is intended.
81 with open(script_type + '.condor','w+') as outstream:
82 outstream.write(template.render(context))
# Build per-conversion-type argument lists for the condor templates.
# NOTE(review): the embedded numbering jumps (86-88, 92-93, 96, 106, 108,
# 112, 114-115, 117 missing) -- by_sample's initialization and the loop
# that binds 's' are not visible in this excerpt.
84 def build_condor_arguments(self, result_map):
85 condor_entries = {'srf': [],
# Dispatch table: archive file type -> builder for its condor arguments.
89 conversion_funcs = {'srf': self.condor_srf_to_fastq,
90 'qseq': self.condor_qseq_to_fastq,
91 'split_fastq': self.condor_desplit_fastq
94 sequences = self.find_archive_sequence_files(result_map)
95 needed_targets = self.find_missing_targets(result_map, sequences)
97 for target_pathname, available_sources in needed_targets.items():
98 LOGGER.debug(' target : %s' % (target_pathname,))
99 LOGGER.debug(' candidate sources: %s' % (available_sources,))
100 for condor_type in available_sources.keys():
101 conversion = conversion_funcs.get(condor_type, None)
102 if conversion is None:
# NOTE(review): unrecognized types are only printed (Python 2 print
# statement), not raised or sent through LOGGER -- consider logging.
103 errmsg = "Unrecognized type: {0} for {1}"
104 print errmsg.format(condor_type,
105 pformat(available_sources))
107 sources = available_sources.get(condor_type, None)
109 if sources is not None:
110 condor_entries.setdefault(condor_type, []).append(
111 conversion(sources, target_pathname))
113 by_sample.setdefault(s.lane_number,[]).append(
116 print " need file", target_pathname
118 condor_entries['by_sample'] = by_sample
119 return condor_entries
# Query the RDF model for archived sequence files backing our libraries:
# first load library + flowcell metadata and scan disk for sequences,
# then SPARQL over the populated model (raw_file entries filtered out)
# and keep only rows whose library is in result_map, wrapped as
# SequenceResult objects.
# NOTE(review): the embedded numbering jumps (153-155 missing), so the
# SPARQL string's closing quotes and the initialization of `results`
# are not visible in this excerpt; no comments are inserted below to
# avoid landing inside the open triple-quoted query string.
121 def find_archive_sequence_files(self, result_map):
123 Find archived sequence files associated with our results.
125 self.import_libraries(result_map)
126 flowcell_ids = self.find_relavant_flowcell_ids()
127 self.import_sequences(flowcell_ids)
130 query = RDF.SPARQLQuery("""
131 prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
132 prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
133 prefix xsd: <http://www.w3.org/2001/XMLSchema#>
135 select ?filenode ?filetype ?cycle ?lane_number ?read
137 ?flowcell ?flowcell_id ?read_length
138 ?flowcell_type ?flowcell_status
140 ?filenode libns:cycle ?cycle ;
141 libns:lane_number ?lane_number ;
143 libns:flowcell ?flowcell ;
144 libns:flowcell_id ?flowcell_id ;
145 libns:library ?library ;
146 libns:library_id ?library_id ;
149 ?flowcell libns:read_length ?read_length ;
150 libns:flowcell_type ?flowcell_type .
151 OPTIONAL { ?flowcell libns:flowcell_status ?flowcell_status }
152 FILTER(?filetype != libns:raw_file)
156 for r in query.execute(self.model):
157 library_id = fromTypedNode(r['library_id'])
158 if library_id in result_map:
159 results.append(SequenceResult(r))
162 def import_libraries(self, result_map):
163 for lib_id in result_map.keys():
164 lib_id_encoded = lib_id.encode('utf-8')
165 liburl = urljoin(self.host, 'library/%s/' % (lib_id_encoded,))
166 library = RDF.Node(RDF.Uri(liburl))
167 self.import_library(library)
169 def import_library(self, library):
170 """Import library data into our model if we don't have it already
172 q = RDF.Statement(library, rdfNS['type'], libraryOntology['library'])
173 if not self.model.contains_statement(q):
174 load_into_model(self.model, 'rdfa', library)
# Collect the distinct flowcell ids that carried lanes of our libraries,
# loading any flowcell we lack full RDFa information about.
# NOTE(review): the method name typo 'relavant' is kept -- renaming would
# break callers. The embedded numbering jumps (178, 181, 183, 188-189,
# 194, 199-200 missing): the SPARQL string's closing quotes and the
# initialization/return of the flowcell_ids set are not visible here, so
# no comments are inserted inside the open triple-quoted query string.
176 def find_relavant_flowcell_ids(self):
177 """Generate set of flowcell ids that had samples of interest on them
179 flowcell_query =RDF.SPARQLQuery("""
180 prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
182 select distinct ?library ?flowcell ?flowcell_id
184 ?library a libns:library ;
185 libns:has_lane ?lane .
186 ?lane libns:flowcell ?flowcell .
187 ?flowcell libns:flowcell_id ?flowcell_id .
190 for r in flowcell_query.execute(self.model):
191 flowcell_ids.add( fromTypedNode(r['flowcell_id']) )
# NOTE(review): this debug call runs once per result row; hoisting it
# below the loop would log the set only once.
192 LOGGER.debug("Flowcells = %s" %(unicode(flowcell_ids)))
193 flowcell_test = RDF.Statement(r['flowcell'],
195 libraryOntology['illumina_flowcell'])
196 if not self.model.contains_statement(flowcell_test):
197 # we probably lack full information about the flowcell.
198 load_into_model(self.model, 'rdfa', r['flowcell'])
201 def import_sequences(self, flowcell_ids):
204 for f in flowcell_ids:
205 seq_dirs.append(os.path.join(self.sequences_path, str(f)))
206 sequences = scan_for_sequences(seq_dirs)
207 for seq in sequences:
208 seq.save_to_model(self.model, self.host)
209 update_model_sequence_library(self.model, self.host)
# Determine which fastq target files still need to be generated, keyed by
# target path, with candidate sources grouped by filetype.
# NOTE(review): the embedded numbering jumps (212, 216, 218, 222, 224-225,
# 230-234, 236-237, 239-240, 246 missing) -- the paired/single choice
# condition and the needed_targets initialization are not visible here.
211 def find_missing_targets(self, result_map, raw_files):
213 Check if the sequence file exists.
214 This requires computing what the sequence name is and checking
215 to see if it can be found in the sequence location.
217 Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
219 fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
220 fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
221 # find what targets we're missing
223 for seq in raw_files:
226 filename_attributes = {
227 'flowcell': seq.flowcell_id,
228 'lib_id': seq.library_id,
229 'lane': seq.lane_number,
235 target_name = fastq_paired_template % \
238 target_name = fastq_single_template % \
241 result_dir = result_map[seq.library_id]
242 target_pathname = os.path.join(result_dir, target_name)
# self.force regenerates targets even when the file already exists.
243 if self.force or not os.path.exists(target_pathname):
244 t = needed_targets.setdefault(target_pathname, {})
# Group needed sources by filetype under each missing target path.
245 t.setdefault(seq.filetype, []).append(seq)
247 return needed_targets
# Build the template argument dict for one srf -> fastq condor job.
# NOTE(review): the embedded numbering jumps (250, 252-253, 255-257,
# 264-266 missing) -- the single-source guard condition, the body of the
# 30DY0AAXX special case, and the dict opening/closing are not visible.
249 def condor_srf_to_fastq(self, sources, target_pathname):
# Only a single srf source file per target is supported.
251 raise ValueError("srf to fastq can only handle one file")
# NOTE(review): flowcell 30DY0AAXX is special-cased here -- presumably a
# known-problematic run; confirm against the elided branch body.
254 if sources[0].flowcell_id == '30DY0AAXX':
258 'sources': [sources[0].path],
259 'pyscript': srf2fastq.__file__,
260 'flowcell': sources[0].flowcell_id,
261 'ispaired': sources[0].ispaired,
262 'target': target_pathname,
# The right-read target is derived by renaming _r1 -> _r2 in the path.
263 'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
# Build the template argument dict for a qseq -> fastq condor job.
# NOTE(review): the embedded numbering jumps (269, 272-273, 277, 280-281
# missing) -- the paths initialization and the dict opening/closing are
# not visible in this excerpt.
268 def condor_qseq_to_fastq(self, sources, target_pathname):
# qseq conversion accepts multiple input files for one target.
270 for source in sources:
271 paths.append(source.path)
274 'pyscript': qseq2fastq.__file__,
275 'flowcell': sources[0].flowcell_id,
276 'target': target_pathname,
278 'ispaired': sources[0].ispaired,
# 'istar': presumably flags a single tar-archive source -- confirm.
279 'istar': len(sources) == 1,
# Build the template argument dict for merging split fastqs into one
# target file.
# NOTE(review): the embedded numbering jumps (283, 286-287, 290, 292-293
# missing) -- the paths initialization and the dict opening/closing are
# not visible in this excerpt.
282 def condor_desplit_fastq(self, sources, target_pathname):
284 for source in sources:
285 paths.append(source.path)
288 'pyscript': desplit_fastq.__file__,
289 'target': target_pathname,
291 'ispaired': sources[0].ispaired,
# Module-level helper: map flowcell id -> lane record for one library.
# NOTE(review): the embedded numbering jumps (296, 299-300, 305-306
# missing) -- the docstring close, the `result` initialization, and the
# final return (presumably dict(result)) are not visible in this excerpt.
295 def make_lane_dict(lib_db, lib_id):
297 Convert the lane_set in a lib_db to a dictionary
298 indexed by flowcell ID
# parse_flowcell_id splits the raw value into a bare id and a status part.
301 for lane in lib_db[lib_id]['lane_set']:
302 flowcell_id, status = parse_flowcell_id(lane['flowcell'])
# Normalize the lane's flowcell field to the bare id before keying on it.
303 lane['flowcell'] = flowcell_id
304 result.append((lane['flowcell'], lane))
307 class SequenceResult(object):
308 """Convert the sparql query result from find_archive_sequence_files
# Unpack one SPARQL result row into typed attributes.
310 def __init__(self, result):
311 self.filenode = result['filenode']
# Kept as a raw RDF node; the filetype property strips the namespace.
312 self._filetype = result['filetype']
313 self.cycle = fromTypedNode(result['cycle'])
314 self.lane_number = fromTypedNode(result['lane_number'])
315 self.read = fromTypedNode(result['read'])
# NOTE(review): Python 2 types.StringTypes membership test; the branch
# body (embedded line 317, presumably coercing read to int) is not
# visible in this excerpt.
316 if type(self.read) in types.StringTypes:
318 self.library = result['library']
319 self.library_id = fromTypedNode(result['library_id'])
320 self.flowcell = result['flowcell']
321 self.flowcell_id = fromTypedNode(result['flowcell_id'])
322 self.flowcell_type = fromTypedNode(result['flowcell_type'])
323 self.flowcell_status = fromTypedNode(result['flowcell_status'])
326 """is this sequence / flowcell 'good enough'"""
327 if self.flowcell_status is not None and \
328 self.flowcell_status.lower() == "failed":
331 isgood = property(_is_good)
# True for paired-end runs, keyed off the flowcell_type string.
333 def _get_ispaired(self):
# NOTE(review): the branch bodies (embedded lines 335-337, presumably
# returning True/False) are not visible in this excerpt.
334 if self.flowcell_type.lower() == "paired":
338 ispaired = property(_get_ispaired)
340 def _get_filetype(self):
341 return stripNamespace(libraryOntology, self._filetype)
342 filetype = property(_get_filetype)
# NOTE(review): the 'def _get_path(self):' line and the file-scheme
# return (embedded lines 343-344, 347-348) are not visible in this
# excerpt.
345 url = urlparse(str(self.filenode.uri))
346 if url.scheme == 'file':
# Anything other than a file:// URL is rejected outright.
349 errmsg = u"Unsupported scheme {0} for {1}"
350 raise ValueError(errmsg.format(url.scheme, unicode(url)))
351 path = property(_get_path)