1 """Convert srf and qseq archive files to fastqs
2 """
3 import logging
4 import os
5 from pprint import pformat,pprint
6 import sys
7 import types
8 from urlparse import urljoin, urlparse
9
10 from htsworkflow.pipelines.sequences import scan_for_sequences, \
11      update_model_sequence_library
12 from htsworkflow.pipelines.samplekey import SampleKey
13 from htsworkflow.pipelines import qseq2fastq
14 from htsworkflow.pipelines import srf2fastq
15 from htsworkflow.pipelines import desplit_fastq
16 from htsworkflow.submission.fastqname import FastqName
17 from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \
18      fromTypedNode, \
19      strip_namespace
20 from htsworkflow.util.rdfns import *
21 from htsworkflow.util.conversion import parse_flowcell_id
22
23 from django.conf import settings
24 from django.template import Context, loader
25
26 import RDF
27
28 LOGGER = logging.getLogger(__name__)
29
30
class CondorFastqExtract(object):
    def __init__(self, host, sequences_path,
                 log_path='log',
                 model=None,
                 force=False):
        """Extract fastqs from results archive

        Args:
          host (str): root of the htsworkflow api server
          sequences_path (str): root of the directory tree to scan for files
          log_path (str): where to put condor log files
          model: rdf model to cache library and sequence metadata in
                 (passed to get_model)
          force (bool): do we force overwriting current files?
        """
        self.host = host
        self.model = get_model(model)
        self.sequences_path = sequences_path
        self.log_path = log_path
        self.force = force
        LOGGER.info("CondorFastq host={0}".format(self.host))
        LOGGER.info("CondorFastq sequences_path={0}".format(self.sequences_path))
        LOGGER.info("CondorFastq log_path={0}".format(self.log_path))

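    # Minimal usage sketch; the host, paths, and result_map below are
    # hypothetical (result_map is an
    # htsworkflow.submission.results.ResultMap, per create_scripts):
    #
    #   extractor = CondorFastqExtract('http://localhost',
    #                                  '/archive/sequences',
    #                                  log_path='condor-logs')
    #   extractor.create_scripts(result_map)
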
    def create_scripts(self, result_map):
        """
        Generate condor scripts to build any needed fastq files

        Args:
          result_map: htsworkflow.submission.results.ResultMap()
        """
        template_map = {'srf': 'srf.condor',
                        'qseq': 'qseq.condor',
                        'split_fastq': 'split_fastq.condor',
                        }

        env = None
        pythonpath = os.environ.get('PYTHONPATH', None)
        if pythonpath is not None:
            env = "PYTHONPATH=%s" % (pythonpath,)
        condor_entries = self.build_condor_arguments(result_map)
        for script_type in template_map.keys():
            template = loader.get_template(template_map[script_type])
            variables = {'python': sys.executable,
                         'logdir': self.log_path,
                         'env': env,
                         'args': condor_entries[script_type],
                         'root_url': self.host,
                         }
            context = Context(variables)

            with open(script_type + '.condor', 'w+') as outstream:
                outstream.write(template.render(context))
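
        # one condor submit file per conversion type is written to the
        # current working directory: srf.condor, qseq.condor, and
        # split_fastq.condor.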

    def build_condor_arguments(self, result_map):
        condor_entries = {'srf': [],
                          'qseq': [],
                          'split_fastq': []}

        conversion_funcs = {'srf': self.condor_srf_to_fastq,
                            'qseq': self.condor_qseq_to_fastq,
                            'split_fastq': self.condor_desplit_fastq
                            }
        sequences = self.find_archive_sequence_files(result_map)
        needed_targets = self.update_fastq_targets(result_map, sequences)

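        # needed_targets maps each missing fastq pathname to its candidate
        # sources grouped by file type, roughly:
        #   {'/some/dir/12345_..._r1.fastq': {'qseq': [SequenceResult, ...]}}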
        for target_pathname, available_sources in needed_targets.items():
            LOGGER.debug(' target : %s' % (target_pathname,))
            LOGGER.debug(' candidate sources: %s' % (available_sources,))
            for condor_type in available_sources.keys():
                conversion = conversion_funcs.get(condor_type, None)
                if conversion is None:
                    errmsg = "Unrecognized type: {0} for {1}"
                    LOGGER.error(errmsg.format(condor_type,
                                               pformat(available_sources)))
                    continue
                sources = available_sources.get(condor_type, None)
                if sources is not None:
                    condor_entries.setdefault(condor_type, []).append(
                        conversion(sources, target_pathname))
                else:
                    LOGGER.warning(" need file %s", target_pathname)

        return condor_entries

    def find_archive_sequence_files(self, result_map):
        """
        Find archived sequence files associated with our results.
        """
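        # three passes: pull library RDFa from the htsworkflow server,
        # discover which flowcells carried those libraries, then index the
        # archived sequence files for just those flowcells.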
        self.import_libraries(result_map)
        flowcell_ids = self.find_relevant_flowcell_ids()
        self.import_sequences(flowcell_ids)

        query_text = """
        prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
        prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        prefix xsd: <http://www.w3.org/2001/XMLSchema#>

        select ?filenode ?filetype ?cycle ?lane_number ?read
               ?library ?library_id
               ?flowcell ?flowcell_id ?read_length
               ?flowcell_type ?flowcell_status
        where {
            ?filenode libns:cycle ?cycle ;
                      libns:lane_number ?lane_number ;
                      libns:read ?read ;
                      libns:flowcell ?flowcell ;
                      libns:flowcell_id ?flowcell_id ;
                      libns:library ?library ;
                      libns:library_id ?library_id ;
                      libns:file_type ?filetype ;
                      a libns:IlluminaResult .
            ?flowcell libns:read_length ?read_length ;
                      libns:flowcell_type ?flowcell_type .
            OPTIONAL { ?flowcell libns:flowcell_status ?flowcell_status }
            FILTER(?filetype != libns:sequencer_result)
        }
        """
        LOGGER.debug("find_archive_sequence_files query: %s",
                     query_text)
        query = RDF.SPARQLQuery(query_text)
        results = []
        for r in query.execute(self.model):
            library_id = fromTypedNode(r['library_id'])
            if library_id in result_map:
                seq = SequenceResult(r)
                LOGGER.debug("Creating sequence result for library %s: %s",
                             library_id,
                             repr(seq))
                results.append(seq)
        return results

    def import_libraries(self, result_map):
        for lib_id in result_map.keys():
            lib_id_encoded = lib_id.encode('utf-8')
            liburl = urljoin(self.host, 'library/%s/' % (lib_id_encoded,))
            library = RDF.Node(RDF.Uri(liburl))
            self.import_library(library)

    def import_library(self, library):
        """Import library data into our model if we don't have it already
        """
        q = RDF.Statement(library, rdfNS['type'], libraryOntology['Library'])
        imported = False
        if not self.model.contains_statement(q):
            imported = True
            load_into_model(self.model, 'rdfa', library)
        LOGGER.debug("Did we import %s: %s", library.uri, imported)

    def find_relevant_flowcell_ids(self):
        """Generate set of flowcell ids that had samples of interest on them
        """
        flowcell_query = RDF.SPARQLQuery("""
prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>

select distinct ?flowcell ?flowcell_id
WHERE {
  ?library a libns:Library ;
           libns:has_lane ?lane .
  ?lane libns:flowcell ?flowcell .
  ?flowcell libns:flowcell_id ?flowcell_id .
}""")
        flowcell_ids = set()
        for r in flowcell_query.execute(self.model):
            flowcell_ids.add(fromTypedNode(r['flowcell_id']))
            imported = False
            a_lane = self.model.get_target(r['flowcell'],
                                           libraryOntology['has_lane'])
            if a_lane is None:
                imported = True
                # we lack information about which lanes were on this flowcell
                load_into_model(self.model, 'rdfa', r['flowcell'])
            LOGGER.debug("Did we import %s: %s", r['flowcell'].uri, imported)

        return flowcell_ids

    def import_sequences(self, flowcell_ids):
        seq_dirs = [os.path.join(self.sequences_path, str(f))
                    for f in flowcell_ids]
        sequences = scan_for_sequences(seq_dirs)
        for seq in sequences:
            seq.save_to_model(self.model, self.host)
        update_model_sequence_library(self.model, self.host)

    def update_fastq_targets(self, result_map, raw_files):
        """Return a dictionary of fastq files that need to be built.

        The dictionary maps each missing target pathname to its available
        source files grouped by file type. Also updates the model with a
        link between the illumina result files and our target fastq file.
        """
        # find what targets we're missing
        needed_targets = {}
        for seq in raw_files:
            if not seq.isgood:
                continue
            filename_attributes = {
                'flowcell': seq.flowcell_id,
                'lib_id': seq.library_id,
                'lane': seq.lane_number,
                'read': seq.read,
                'cycle': seq.cycle,
                'is_paired': seq.ispaired
            }

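            # FastqName (the filename templates moved there from this
            # module) renders these attributes into the target fastq
            # filename, e.g. something like
            # <lib_id>_<flowcell>_c<cycle>_l<lane>_r<read>.fastq.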
            fqName = FastqName(**filename_attributes)

            result_dir = result_map[seq.library_id]
            target_pathname = os.path.join(result_dir, fqName.filename)
            if self.force or not os.path.exists(target_pathname):
                t = needed_targets.setdefault(target_pathname, {})
                t.setdefault(seq.filetype, []).append(seq)
            self.add_target_source_links(target_pathname, seq)
        return needed_targets

    def add_target_source_links(self, target, seq):
        """Add link between target pathname and the 'lane' that produced it
        (note lane objects are now post demultiplexing.)
        """
        target_uri = 'file://' + target.encode('utf-8')
        target_node = RDF.Node(RDF.Uri(target_uri))
        source_stmt = RDF.Statement(target_node, dcNS['source'], seq.filenode)
        self.model.add_statement(source_stmt)

    def condor_srf_to_fastq(self, sources, target_pathname):
        if len(sources) > 1:
            raise ValueError("srf to fastq can only handle one file")

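        # historical special case: the srf file for flowcell 30DY0AAXX
        # apparently holds both reads of the paired-end run end to end,
        # so srf2fastq is told where to split read 1 from read 2.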
        mid_point = None
        if sources[0].flowcell_id == '30DY0AAXX':
            mid_point = 76

        return {
            'sources': [sources[0].path],
            'pyscript': srf2fastq.__file__,
            'flowcell': sources[0].flowcell_id,
            'ispaired': sources[0].ispaired,
            'target': target_pathname,
            'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
            'mid': mid_point,
            'force': self.force,
        }

    def condor_qseq_to_fastq(self, sources, target_pathname):
        paths = sorted(source.path for source in sources)
        return {
            'pyscript': qseq2fastq.__file__,
            'flowcell': sources[0].flowcell_id,
            'target': target_pathname,
            'sources': paths,
            'ispaired': sources[0].ispaired,
            'istar': len(sources) == 1,
        }

    def condor_desplit_fastq(self, sources, target_pathname):
        paths = sorted(source.path for source in sources)
        return {
            'pyscript': desplit_fastq.__file__,
            'target': target_pathname,
            'sources': paths,
            'ispaired': sources[0].ispaired,
        }


def make_lane_dict(lib_db, lib_id):
    """
    Convert the lane_set in a lib_db to a dictionary
    indexed by flowcell ID
    """
    result = []
    for lane in lib_db[lib_id]['lane_set']:
        flowcell_id, status = parse_flowcell_id(lane['flowcell'])
        lane['flowcell'] = flowcell_id
        result.append((lane['flowcell'], lane))
    return dict(result)
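
# make_lane_dict sketch, assuming lane_set entries look like the library
# API's lane dictionaries and that parse_flowcell_id strips a status
# suffix such as ' (failed)':
#   [{'flowcell': '30DY0AAXX (failed)', ...}]
#   -> {'30DY0AAXX': {'flowcell': '30DY0AAXX', ...}}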


class SequenceResult(object):
    """Convert the sparql query result from find_archive_sequence_files
    """
    def __init__(self, result):
        self.filenode = result['filenode']
        self._filetype = result['filetype']
        self.cycle = fromTypedNode(result['cycle'])
        self.lane_number = fromTypedNode(result['lane_number'])
        self.read = fromTypedNode(result['read'])
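        # if the read number came back as a plain (untyped) string rather
        # than an integer, assume read 1.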
        if isinstance(self.read, types.StringTypes):
            self.read = 1
        self.library = result['library']
        self.library_id = fromTypedNode(result['library_id'])
        self.flowcell = result['flowcell']
        self.flowcell_id = fromTypedNode(result['flowcell_id'])
        self.flowcell_type = fromTypedNode(result['flowcell_type'])
        self.flowcell_status = fromTypedNode(result['flowcell_status'])

    def _is_good(self):
        """is this sequence / flowcell 'good enough'"""
        if self.flowcell_status is not None and \
           self.flowcell_status.lower() == "failed":
            return False
        return True
    isgood = property(_is_good)

    def _get_ispaired(self):
        return self.flowcell_type.lower() == "paired"
    ispaired = property(_get_ispaired)

    def _get_filetype(self):
        return strip_namespace(libraryOntology, self._filetype)
    filetype = property(_get_filetype)

    def _get_path(self):
        url = urlparse(str(self.filenode.uri))
        if url.scheme == 'file':
            return url.path
        else:
            errmsg = u"Unsupported scheme {0} for {1}"
            raise ValueError(errmsg.format(url.scheme, url.geturl()))
    path = property(_get_path)

    def __repr__(self):
        return "SequenceResult({0},{1},{2})".format(
            str(self.filenode),
            str(self.library_id),
            str(self.flowcell_id))
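
# Reading SequenceResult objects (sketch): .path is the local filesystem
# path (file:// URIs only), .filetype is the libns file_type with its
# namespace stripped, and .isgood screens out failed flowcells, e.g.:
#
#   for seq in extract.find_archive_sequence_files(result_map):
#       if seq.isgood and seq.filetype == 'qseq':
#           print seq.path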