17e463351282b7b8091f922760e8b1fd69a2c934
[htsworkflow.git] / htsworkflow / submission / condorfastq.py
1 """Convert srf and qseq archive files to fastqs
2 """
3 import logging
4 import os
5 from pprint import pformat,pprint
6 import sys
7 import types
8 from urlparse import urljoin, urlparse
9
10 from htsworkflow.pipelines.sequences import scan_for_sequences, \
11      update_model_sequence_library
12 from htsworkflow.pipelines.samplekey import SampleKey
13 from htsworkflow.pipelines import qseq2fastq
14 from htsworkflow.pipelines import srf2fastq
15 from htsworkflow.pipelines import desplit_fastq
16 from htsworkflow.submission.fastqname import FastqName
17 from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \
18      fromTypedNode, \
19      stripNamespace
20 from htsworkflow.util.rdfns import *
21 from htsworkflow.util.conversion import parse_flowcell_id
22
23 from django.conf import settings
24 from django.template import Context, loader
25
26 import RDF
27
28 LOGGER = logging.getLogger(__name__)
29
30
class CondorFastqExtract(object):
    """Build condor submit scripts that convert archived sequencer
    results (srf, qseq, or pre-split fastq files) into the fastq
    files needed for a submission.
    """
    def __init__(self, host, sequences_path,
                 log_path='log',
                 model=None,
                 force=False):
        """Extract fastqs from results archive

        Args:
          host (str): root of the htsworkflow api server
          sequences_path (str): root of the directory tree to scan for files
          log_path (str): where to put condor log files
          model: name of the RDF model to open (passed to get_model)
          force (bool): do we force overwriting current files?
        """
        self.host = host
        self.model = get_model(model)
        self.sequences_path = sequences_path
        self.log_path = log_path
        self.force = force
        LOGGER.info("CondorFastq host={0}".format(self.host))
        LOGGER.info("CondorFastq sequences_path={0}".format(self.sequences_path))
        LOGGER.info("CondorFastq log_path={0}".format(self.log_path))

    def create_scripts(self, result_map):
        """
        Generate condor scripts to build any needed fastq files

        Args:
          result_map: htsworkflow.submission.results.ResultMap()
        """
        # one condor submit template per source file type
        template_map = {'srf': 'srf.condor',
                        'qseq': 'qseq.condor',
                        'split_fastq': 'split_fastq.condor',
                        }

        env = None
        pythonpath = os.environ.get('PYTHONPATH', None)
        if pythonpath is not None:
            # let the condor jobs see the same python search path we have
            env = "PYTHONPATH=%s" % (pythonpath,)
        condor_entries = self.build_condor_arguments(result_map)
        for script_type, template_name in template_map.items():
            template = loader.get_template(template_name)
            variables = {'python': sys.executable,
                         'logdir': self.log_path,
                         'env': env,
                         'args': condor_entries[script_type],
                         'root_url': self.host,
                         }
            context = Context(variables)

            with open(script_type + '.condor', 'w+') as outstream:
                outstream.write(template.render(context))

    def build_condor_arguments(self, result_map):
        """Collect condor job argument dictionaries for every fastq
        file that still needs to be built.

        Returns:
          dict mapping conversion type ('srf', 'qseq', 'split_fastq')
          to a list of template argument dictionaries.
        """
        condor_entries = {'srf': [],
                          'qseq': [],
                          'split_fastq': []}

        conversion_funcs = {'srf': self.condor_srf_to_fastq,
                            'qseq': self.condor_qseq_to_fastq,
                            'split_fastq': self.condor_desplit_fastq,
                            }
        sequences = self.find_archive_sequence_files(result_map)
        needed_targets = self.update_fastq_targets(result_map, sequences)

        for target_pathname, available_sources in needed_targets.items():
            LOGGER.debug(' target : %s' % (target_pathname,))
            LOGGER.debug(' candidate sources: %s' % (available_sources,))
            for condor_type in available_sources.keys():
                conversion = conversion_funcs.get(condor_type, None)
                if conversion is None:
                    errmsg = "Unrecognized type: {0} for {1}"
                    LOGGER.error(errmsg.format(condor_type,
                                        pformat(available_sources)))
                    continue
                sources = available_sources.get(condor_type, None)

                if sources is not None:
                    condor_entries.setdefault(condor_type, []).append(
                        conversion(sources, target_pathname))
                else:
                    # BUG FIX: this used to be a for/else clause; since the
                    # loop has no break, the else ran after every iteration
                    # and warned about every target. Warn only when no
                    # sources are actually available for this type.
                    LOGGER.warning(" need file %s", target_pathname)

        return condor_entries

    def find_archive_sequence_files(self, result_map):
        """
        Find archived sequence files associated with our results.

        Imports library and flowcell metadata into the model as a side
        effect, then queries for every non-raw result file.

        Returns:
          list of SequenceResult for files whose library id is in result_map
        """
        self.import_libraries(result_map)
        flowcell_ids = self.find_relevant_flowcell_ids()
        self.import_sequences(flowcell_ids)

        query_text = """
        prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
        prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        prefix xsd: <http://www.w3.org/2001/XMLSchema#>

        select ?filenode ?filetype ?cycle ?lane_number ?read
               ?library  ?library_id
               ?flowcell ?flowcell_id ?read_length
               ?flowcell_type ?flowcell_status
        where {
            ?filenode libns:cycle ?cycle ;
                      libns:lane_number ?lane_number ;
                      libns:read ?read ;
                      libns:flowcell ?flowcell ;
                      libns:flowcell_id ?flowcell_id ;
                      libns:library ?library ;
                      libns:library_id ?library_id ;
                      libns:file_type ?filetype ;
                      a libns:IlluminaResult .
            ?flowcell libns:read_length ?read_length ;
                      libns:flowcell_type ?flowcell_type .
            OPTIONAL { ?flowcell libns:flowcell_status ?flowcell_status }
            FILTER(?filetype != libns:sequencer_result)
        }
        """
        LOGGER.debug("find_archive_sequence_files query: %s",
                     query_text)
        query = RDF.SPARQLQuery(query_text)
        results = []
        for r in query.execute(self.model):
            library_id = fromTypedNode(r['library_id'])
            if library_id in result_map:
                seq = SequenceResult(r)
                LOGGER.debug("Creating sequence result for library %s: %s",
                             library_id,
                             repr(seq))
                results.append(seq)
        return results

    def import_libraries(self, result_map):
        """Import metadata for every library in result_map into our model.
        """
        for lib_id in result_map.keys():
            lib_id_encoded = lib_id.encode('utf-8')
            liburl = urljoin(self.host, 'library/%s/' % (lib_id_encoded,))
            library = RDF.Node(RDF.Uri(liburl))
            self.import_library(library)

    def import_library(self, library):
        """Import library data into our model if we don't have it already

        Args:
          library: RDF.Node pointing at the library detail page
        """
        q = RDF.Statement(library, rdfNS['type'], libraryOntology['Library'])
        # 'loaded' is True when we actually had to fetch the library
        loaded = False
        if not self.model.contains_statement(q):
            loaded = True
            load_into_model(self.model, 'rdfa', library)
        LOGGER.debug("Did we import %s: %s", library.uri, loaded)

    def find_relevant_flowcell_ids(self):
        """Generate set of flowcell ids that had samples of interest on them
        """
        flowcell_query = RDF.SPARQLQuery("""
prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>

select distinct ?flowcell ?flowcell_id
WHERE {
  ?library a libns:Library ;
           libns:has_lane ?lane .
  ?lane libns:flowcell ?flowcell .
  ?flowcell libns:flowcell_id ?flowcell_id .
}""")
        flowcell_ids = set()
        for r in flowcell_query.execute(self.model):
            flowcell_ids.add(fromTypedNode(r['flowcell_id']))
            imported = False
            a_lane = self.model.get_target(r['flowcell'],
                                           libraryOntology['has_lane'])
            if a_lane is None:
                imported = True
                # we lack information about which lanes were on this flowcell
                load_into_model(self.model, 'rdfa', r['flowcell'])
            LOGGER.debug("Did we import %s: %s" % (r['flowcell'].uri,
                                                   imported))

        return flowcell_ids

    def import_sequences(self, flowcell_ids):
        """Scan the archive directories for the given flowcells and
        record any sequence files found in our model.
        """
        seq_dirs = []
        for f in flowcell_ids:
            seq_dirs.append(os.path.join(self.sequences_path, str(f)))
        sequences = scan_for_sequences(seq_dirs)
        for seq in sequences:
            seq.save_to_model(self.model, self.host)
        update_model_sequence_library(self.model, self.host)

    def update_fastq_targets(self, result_map, raw_files):
        """Return dict of fastq files that need to be built.

        Also update model with link between illumina result files
        and our target fastq file.

        Returns:
          dict mapping target fastq pathname to a dict of
          {filetype: [SequenceResult, ...]} candidate sources.
        """
        # find what targets we're missing
        needed_targets = {}
        for seq in raw_files:
            # skip sequences from failed flowcells
            if not seq.isgood:
                continue
            filename_attributes = {
                'flowcell': seq.flowcell_id,
                'lib_id': seq.library_id,
                'lane': seq.lane_number,
                'read': seq.read,
                'cycle': seq.cycle,
                'is_paired': seq.ispaired
            }

            # FastqName encapsulates the target filename convention
            fqName = FastqName(**filename_attributes)

            result_dir = result_map[seq.library_id]
            target_pathname = os.path.join(result_dir, fqName.filename)
            if self.force or not os.path.exists(target_pathname):
                t = needed_targets.setdefault(target_pathname, {})
                t.setdefault(seq.filetype, []).append(seq)
            # record the provenance link even when the file already exists
            self.add_target_source_links(target_pathname, seq)
        return needed_targets

    def add_target_source_links(self, target, seq):
        """Add link between target pathname and the 'lane' that produced it
        (note lane objects are now post demultiplexing.)
        """
        target_uri = 'file://' + target.encode('utf-8')
        target_node = RDF.Node(RDF.Uri(target_uri))
        source_stmt = RDF.Statement(target_node, dcNS['source'], seq.filenode)
        self.model.add_statement(source_stmt)

    def condor_srf_to_fastq(self, sources, target_pathname):
        """Build template arguments for one srf -> fastq conversion job.

        Raises:
          ValueError: if more than one source srf file is supplied
        """
        if len(sources) > 1:
            raise ValueError("srf to fastq can only handle one file")

        # flowcell 30DY0AAXX stored both reads in one srf; split at cycle 76
        mid_point = None
        if sources[0].flowcell_id == '30DY0AAXX':
            mid_point = 76

        return {
            'sources': [sources[0].path],
            'pyscript': srf2fastq.__file__,
            'flowcell': sources[0].flowcell_id,
            'ispaired': sources[0].ispaired,
            'target': target_pathname,
            'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
            'mid': mid_point,
            'force': self.force,
        }

    def condor_qseq_to_fastq(self, sources, target_pathname):
        """Build template arguments for a qseq -> fastq conversion job."""
        paths = sorted(source.path for source in sources)
        return {
            'pyscript': qseq2fastq.__file__,
            'flowcell': sources[0].flowcell_id,
            'target': target_pathname,
            'sources': paths,
            'ispaired': sources[0].ispaired,
            # a single source means the qseqs are still in one tar file
            'istar': len(sources) == 1,
        }

    def condor_desplit_fastq(self, sources, target_pathname):
        """Build template arguments for merging split fastqs into one."""
        paths = sorted(source.path for source in sources)
        return {
            'pyscript': desplit_fastq.__file__,
            'target': target_pathname,
            'sources': paths,
            'ispaired': sources[0].ispaired,
        }
302
303
def make_lane_dict(lib_db, lib_id):
    """Map flowcell ID -> lane record for one library's lane_set.

    The lane records' 'flowcell' values are normalized in place to the
    bare flowcell id (status suffix stripped).
    """
    lanes_by_flowcell = {}
    for lane in lib_db[lib_id]['lane_set']:
        flowcell_id, _status = parse_flowcell_id(lane['flowcell'])
        # normalize the stored flowcell name in place
        lane['flowcell'] = flowcell_id
        lanes_by_flowcell[flowcell_id] = lane
    return lanes_by_flowcell
315
class SequenceResult(object):
    """Convert the sparql query result from find_archive_sequence_files
    into an object with typed attributes.
    """
    def __init__(self, result):
        """Unpack one query result row.

        Args:
          result: mapping of variable name to RDF node from the
                  find_archive_sequence_files sparql query.
        """
        # raw RDF nodes are kept as-is
        self.filenode = result['filenode']
        self._filetype = result['filetype']
        self.cycle = fromTypedNode(result['cycle'])
        self.lane_number = fromTypedNode(result['lane_number'])
        self.read = fromTypedNode(result['read'])
        # some results report the read as a string; treat those as read 1.
        # (isinstance replaces the old `type(x) in types.StringTypes` check)
        if isinstance(self.read, types.StringTypes):
            self.read = 1
        self.library = result['library']
        self.library_id = fromTypedNode(result['library_id'])
        self.flowcell = result['flowcell']
        self.flowcell_id = fromTypedNode(result['flowcell_id'])
        self.flowcell_type = fromTypedNode(result['flowcell_type'])
        # flowcell_status comes from an OPTIONAL clause, so it may be None
        self.flowcell_status = fromTypedNode(result['flowcell_status'])

    def _is_good(self):
        """is this sequence / flowcell 'good enough'"""
        if self.flowcell_status is not None and \
           self.flowcell_status.lower() == "failed":
            return False
        return True
    isgood = property(_is_good)

    def _get_ispaired(self):
        # case-insensitive check of the flowcell type
        return self.flowcell_type.lower() == "paired"
    ispaired = property(_get_ispaired)

    def _get_filetype(self):
        # strip the ontology prefix, leaving the short type name
        return stripNamespace(libraryOntology, self._filetype)
    filetype = property(_get_filetype)

    def _get_path(self):
        """Return the local filesystem path of this result file.

        Raises:
          ValueError: if the file node is not a file:// url
        """
        url = urlparse(str(self.filenode.uri))
        if url.scheme == 'file':
            return url.path
        else:
            errmsg = u"Unsupported scheme {0} for {1}"
            raise ValueError(errmsg.format(url.scheme, unicode(url)))
    path = property(_get_path)

    def __repr__(self):
        return "SequenceResult({0},{1},{2})".format(
            str(self.filenode),
            str(self.library_id),
            str(self.flowcell_id))