1 """Convert srf and qseq archive files to fastqs
5 from pprint import pformat,pprint
8 from urlparse import urljoin, urlparse
10 from htsworkflow.pipelines.sequences import scan_for_sequences, \
11 update_model_sequence_library
12 from htsworkflow.pipelines.samplekey import SampleKey
13 from htsworkflow.pipelines import qseq2fastq
14 from htsworkflow.pipelines import srf2fastq
15 from htsworkflow.pipelines import desplit_fastq
16 from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \
21 from htsworkflow.util.conversion import parse_flowcell_id
23 from django.conf import settings
24 from django.template import Context, loader
28 LOGGER = logging.getLogger(__name__)
31 class CondorFastqExtract(object):
# Builds HTCondor submission scripts that convert archived srf/qseq
# sequence files into fastq files for the requested libraries.
32 def __init__(self, host, sequences_path,
# NOTE(review): the remainder of the parameter list (log_path, force, ...)
# is elided from this view -- the signature above is incomplete; confirm
# against the full source before relying on it.
35 """Extract fastqs from results archive
38 host (str): root of the htsworkflow api server
39 apidata (dict): id & key to post to the server
40 sequences_path (str): root of the directory tree to scan for files
41 log_path (str): where to put condor log files
42 force (bool): do we force overwriting current files?
# get_model() creates the RDF model that accumulates library/flowcell/
# sequence metadata used by the SPARQL queries below.
45 self.model = get_model()
46 self.sequences_path = sequences_path
47 self.log_path = log_path
50 def create_scripts(self, result_map ):
52 Generate condor scripts to build any needed fastq files
55 result_map: htsworkflow.submission.results.ResultMap()
# One Django template per conversion type; rendered output is written to
# <script_type>.condor in the current directory.
57 template_map = {'srf': 'srf.condor',
58 'qseq': 'qseq.condor',
59 'split_fastq': 'split_fastq.condor',
60 'by_sample': 'lane_to_fastq.turtle',
# Propagate the caller's PYTHONPATH into the condor job environment so the
# helper scripts can find htsworkflow.
64 pythonpath = os.environ.get('PYTHONPATH', None)
65 if pythonpath is not None:
66 env = "PYTHONPATH=%s" % (pythonpath,)
67 condor_entries = self.build_condor_arguments(result_map)
68 for script_type in template_map.keys():
69 template = loader.get_template(template_map[script_type])
70 variables = {'python': sys.executable,
71 'logdir': self.log_path,
# NOTE(review): an elided line (original 72) presumably adds 'env' to this
# dict -- otherwise the env computed above is unused; confirm.
73 'args': condor_entries[script_type],
74 'root_url': self.host,
76 context = Context(variables)
78 with open(script_type + '.condor','w+') as outstream:
79 outstream.write(template.render(context))
81 def build_condor_arguments(self, result_map):
# Maps each conversion type to a list of per-target argument dicts that
# the condor templates iterate over.
82 condor_entries = {'srf': [],
# Dispatch table: conversion type -> builder for one condor job entry.
86 conversion_funcs = {'srf': self.condor_srf_to_fastq,
87 'qseq': self.condor_qseq_to_fastq,
88 'split_fastq': self.condor_desplit_fastq
91 sequences = self.find_archive_sequence_files(result_map)
92 needed_targets = self.find_missing_targets(result_map, sequences)
94 for target_pathname, available_sources in needed_targets.items():
95 LOGGER.debug(' target : %s' % (target_pathname,))
96 LOGGER.debug(' candidate sources: %s' % (available_sources,))
97 for condor_type in available_sources.keys():
98 conversion = conversion_funcs.get(condor_type, None)
99 if conversion is None:
# Python 2 print statement; unrecognized types are reported but not fatal.
100 errmsg = "Unrecognized type: {0} for {1}"
101 print errmsg.format(condor_type,
102 pformat(available_sources))
104 sources = available_sources.get(condor_type, None)
106 if sources is not None:
107 condor_entries.setdefault(condor_type, []).append(
108 conversion(sources, target_pathname))
# NOTE(review): by_sample's initialization and the loop binding `s` are
# elided from this view (original lines 83-85, 109-112); confirm upstream.
110 by_sample.setdefault(s.lane_number,[]).append(
113 print " need file", target_pathname
115 condor_entries['by_sample'] = by_sample
116 return condor_entries
118 def find_archive_sequence_files(self, result_map):
120 Find archived sequence files associated with our results.
# Populate the RDF model first: library metadata, then the flowcells they
# ran on, then the sequence files found under those flowcell directories.
122 self.import_libraries(result_map)
123 flowcell_ids = self.find_relavant_flowcell_ids()
124 self.import_sequences(flowcell_ids)
# SPARQL query over the populated model; raw_file entries are excluded.
# (No comments may be inserted below: the query text is a string literal.)
126 query = RDF.SPARQLQuery("""
127 prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
128 prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
129 prefix xsd: <http://www.w3.org/2001/XMLSchema#>
131 select ?filenode ?filetype ?cycle ?lane_number ?read
133 ?flowcell ?flowcell_id ?read_length
134 ?flowcell_type ?flowcell_status
136 ?filenode libns:cycle ?cycle ;
137 libns:lane_number ?lane_number ;
139 libns:flowcell ?flowcell ;
140 libns:flowcell_id ?flowcell_id ;
141 libns:library ?library ;
142 libns:library_id ?library_id ;
145 ?flowcell libns:read_length ?read_length ;
146 libns:flowcell_type ?flowcell_type .
147 OPTIONAL { ?flowcell libns:flowcell_status ?flowcell_status }
148 FILTER(?filetype != libns:raw_file)
# Keep only rows whose library is one the caller asked about.
# NOTE(review): `results` initialization and the final return are elided
# from this view (original lines 149-151, 156); presumably returns results.
152 for r in query.execute(self.model):
153 library_id = fromTypedNode(r['library_id'])
154 if library_id in result_map:
155 results.append(SequenceResult(r))
def import_libraries(self, result_map):
    """Load RDF metadata for every library id listed in result_map.

    Builds the library detail-page URL for each id and delegates to
    import_library(), which only fetches pages not already in the model.
    """
    for library_id in result_map.keys():
        encoded_id = library_id.encode('utf-8')
        library_url = urljoin(self.host, 'library/%s/' % (encoded_id,))
        library_node = RDF.Node(RDF.Uri(library_url))
        self.import_library(library_node)
165 def import_library(self, library):
166 """Import library data into our model if we don't have it already
# Membership test: if the model already asserts `library rdf:type
# libns:library`, the page was loaded previously and is skipped.
168 q = RDF.Statement(library, rdfNS['type'], libraryOntology['library'])
169 if not self.model.contains_statement(q):
# Fetches the library page and parses its RDFa markup into the model.
170 load_into_model(self.model, 'rdfa', library)
# NOTE(review): "relavant" is a misspelling of "relevant", but the name is
# part of the public interface (called from find_archive_sequence_files and
# possibly elsewhere) so it is kept as-is.
172 def find_relavant_flowcell_ids(self):
173 """Generate set of flowcell ids that had samples of interest on them
175 flowcell_query =RDF.SPARQLQuery("""
176 prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
178 select distinct ?library ?flowcell ?flowcell_id
180 ?library a libns:library ;
181 libns:has_lane ?lane .
182 ?lane libns:flowcell ?flowcell .
183 ?flowcell libns:flowcell_id ?flowcell_id .
# NOTE(review): flowcell_ids set initialization (original ~185) and the
# final `return flowcell_ids` (original 189-190) are elided from this view.
186 for r in flowcell_query.execute(self.model):
187 flowcell_ids.add( fromTypedNode(r['flowcell_id']) )
188 LOGGER.debug("Flowcells = %s" %(unicode(flowcell_ids)))
def import_sequences(self, flowcell_ids):
    """Scan each flowcell's directory for sequence files and record them.

    Every discovered sequence is saved into the RDF model, then the model
    is linked back to library records on the htsworkflow server.
    """
    flowcell_dirs = (os.path.join(self.sequences_path, flowcell)
                     for flowcell in flowcell_ids)
    for sequence in scan_for_sequences(flowcell_dirs):
        sequence.save_to_model(self.model)
    update_model_sequence_library(self.model, self.host)
199 def find_missing_targets(self, result_map, raw_files):
201 Check if the sequence file exists.
202 This requires computing what the sequence name is and checking
203 to see if it can be found in the sequence location.
205 Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
# Target fastq filename patterns; paired-end reads get an _r<read> suffix.
207 fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
208 fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
209 # find what targets we're missing
# NOTE(review): needed_targets initialization (original ~210) is elided.
211 for seq in raw_files:
214 filename_attributes = {
215 'flowcell': seq.flowcell_id,
216 'lib_id': seq.library_id,
217 'lane': seq.lane_number,
# NOTE(review): the 'read'/'cycle' keys and the paired-vs-single branch
# condition (original lines 218-228) are elided from this view.
223 target_name = fastq_paired_template % \
226 target_name = fastq_single_template % \
229 result_dir = result_map[seq.library_id]
230 target_pathname = os.path.join(result_dir, target_name)
# Rebuild when forced, or when the fastq is not already on disk.
231 if self.force or not os.path.exists(target_pathname):
232 t = needed_targets.setdefault(target_pathname, {})
# Group candidate source files per target, keyed by file type (srf/qseq/...).
233 t.setdefault(seq.filetype, []).append(seq)
235 return needed_targets
# Builds the template-variable dict for one srf -> fastq condor job.
237 def condor_srf_to_fastq(self, sources, target_pathname):
# NOTE(review): the guard condition (original ~238, presumably
# len(sources) > 1) is elided from this view.
239 raise ValueError("srf to fastq can only handle one file")
# Special case for flowcell 30DY0AAXX; the branch body (original 243-245)
# is elided here -- confirm what it overrides.
242 if sources[0].flowcell_id == '30DY0AAXX':
246 'sources': [sources[0].path],
247 'pyscript': srf2fastq.__file__,
248 'flowcell': sources[0].flowcell_id,
249 'ispaired': sources[0].ispaired,
250 'target': target_pathname,
# For paired reads the second-read output swaps _r1 -> _r2 in the filename.
251 'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
# Builds the template-variable dict for one qseq -> fastq condor job.
256 def condor_qseq_to_fastq(self, sources, target_pathname):
# NOTE(review): `paths` initialization (original ~257) and the start of the
# returned dict (original 260-261) are elided from this view.
258 for source in sources:
259 paths.append(source.path)
262 'pyscript': qseq2fastq.__file__,
263 'flowcell': sources[0].flowcell_id,
264 'target': target_pathname,
265 'ispaired': sources[0].ispaired,
266 'ispaired': sources[0].ispaired,
267 'istar': len(sources) == 1,
# Builds the template-variable dict for merging split fastq files into one.
270 def condor_desplit_fastq(self, sources, target_pathname):
# NOTE(review): `paths` initialization (original ~271) and the start/end of
# the returned dict (original 274-275, 278, 280-282) are elided from view.
272 for source in sources:
273 paths.append(source.path)
276 'pyscript': desplit_fastq.__file__,
277 'target': target_pathname,
279 'ispaired': sources[0].ispaired,
283 def make_lane_dict(lib_db, lib_id):
285 Convert the lane_set in a lib_db to a dictionary
286 indexed by flowcell ID
# NOTE(review): `result` initialization (original ~288) and the final
# return (original 293-294, presumably dict(result)) are elided from view.
289 for lane in lib_db[lib_id]['lane_set']:
# Normalize the raw flowcell value to a bare id; status is discarded here.
290 flowcell_id, status = parse_flowcell_id(lane['flowcell'])
# Mutates the lane dict in place -- callers see the normalized id too.
291 lane['flowcell'] = flowcell_id
292 result.append((lane['flowcell'], lane))
295 class SequenceResult(object):
296 """Convert the sparql query result from find_archive_sequence_files
# Unpack one SPARQL result row; fromTypedNode converts RDF literals to
# plain Python values, raw RDF nodes are kept for node-valued fields.
298 def __init__(self, result):
299 self.filenode = result['filenode']
300 self._filetype = result['filetype']
301 self.cycle = fromTypedNode(result['cycle'])
302 self.lane_number = fromTypedNode(result['lane_number'])
303 self.read = fromTypedNode(result['read'])
# Python 2 only: types.StringTypes covers str and unicode. The branch body
# (original ~305, presumably normalizing a string read to an int) is elided.
304 if type(self.read) in types.StringTypes:
306 self.library = result['library']
307 self.library_id = fromTypedNode(result['library_id'])
308 self.flowcell = result['flowcell']
309 self.flowcell_id = fromTypedNode(result['flowcell_id'])
310 self.flowcell_type = fromTypedNode(result['flowcell_type'])
# May be None: flowcell_status was OPTIONAL in the SPARQL query.
311 self.flowcell_status = fromTypedNode(result['flowcell_status'])
314 """is this sequence / flowcell 'good enough'"""
315 if self.flowcell_status is not None and \
316 self.flowcell_status.lower() == "failed":
319 isgood = property(_is_good)
# True when the flowcell was a paired-end run.
321 def _get_ispaired(self):
# NOTE(review): the return statements (original 323-325) are elided;
# presumably True for "paired" and False otherwise. Will raise
# AttributeError if flowcell_type is None -- confirm upstream guarantees.
322 if self.flowcell_type.lower() == "paired":
326 ispaired = property(_get_ispaired)
def _get_filetype(self):
    """Return the file type term with the libraryOntology prefix stripped."""
    raw_node = self._filetype
    return stripNamespace(libraryOntology, raw_node)
filetype = property(_get_filetype)
# NOTE(review): the def _get_path(self) line (original ~332) and the
# file-scheme success branch (original 335-336, presumably returning
# url.path) are elided from this view.
333 url = urlparse(str(self.filenode.uri))
334 if url.scheme == 'file':
# Any non-file scheme is rejected: only local paths are usable by condor.
337 errmsg = u"Unsupported scheme {0} for {1}"
338 raise ValueError(errmsg.format(url.scheme, unicode(url)))
339 path = property(_get_path)