01fe6c5a19274869b22e41821011135ee8ccb6ae
[htsworkflow.git] / htsworkflow / submission / condorfastq.py
1 """Convert srf and qseq archive files to fastqs
2 """
3 import logging
4 import os
5 from pprint import pformat,pprint
6 import sys
7 import types
8 from urlparse import urljoin, urlparse
9
10 from htsworkflow.pipelines.sequences import scan_for_sequences, \
11      update_model_sequence_library
12 from htsworkflow.pipelines.samplekey import SampleKey
13 from htsworkflow.pipelines import qseq2fastq
14 from htsworkflow.pipelines import srf2fastq
15 from htsworkflow.pipelines import desplit_fastq
16 from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \
17      fromTypedNode, \
18      stripNamespace
19 from htsworkflow.util.rdfns import *
20 from htsworkflow.util.conversion import parse_flowcell_id
21
22 from django.conf import settings
23 from django.template import Context, loader
24
25 import RDF
26
27 LOGGER = logging.getLogger(__name__)
28
29
30 class CondorFastqExtract(object):
31     def __init__(self, host, sequences_path,
32                  log_path='log',
33                  model=None,
34                  force=False):
35         """Extract fastqs from results archive
36
37         Args:
38           host (str): root of the htsworkflow api server
39           apidata (dict): id & key to post to the server
40           sequences_path (str): root of the directory tree to scan for files
41           log_path (str): where to put condor log files
42           force (bool): do we force overwriting current files?
43         """
44         self.host = host
45         self.model = get_model(model)
46         self.sequences_path = sequences_path
47         self.log_path = log_path
48         self.force = force
49         LOGGER.info("CondorFastq host={0}".format(self.host))
50         LOGGER.info("CondorFastq sequences_path={0}".format(self.sequences_path))
51         LOGGER.info("CondorFastq log_path={0}".format(self.log_path))
52
53     def create_scripts(self, result_map ):
54         """
55         Generate condor scripts to build any needed fastq files
56
57         Args:
58           result_map: htsworkflow.submission.results.ResultMap()
59         """
60         template_map = {'srf': 'srf.condor',
61                         'qseq': 'qseq.condor',
62                         'split_fastq': 'split_fastq.condor',
63                         }
64
65         env = None
66         pythonpath = os.environ.get('PYTHONPATH', None)
67         if pythonpath is not None:
68             env = "PYTHONPATH=%s" % (pythonpath,)
69         condor_entries = self.build_condor_arguments(result_map)
70         for script_type in template_map.keys():
71             template = loader.get_template(template_map[script_type])
72             variables = {'python': sys.executable,
73                          'logdir': self.log_path,
74                          'env': env,
75                          'args': condor_entries[script_type],
76                          'root_url': self.host,
77                          }
78             context = Context(variables)
79
80             with open(script_type + '.condor','w+') as outstream:
81                 outstream.write(template.render(context))
82
83     def build_condor_arguments(self, result_map):
84         condor_entries = {'srf': [],
85                           'qseq': [],
86                           'split_fastq': []}
87
88         conversion_funcs = {'srf': self.condor_srf_to_fastq,
89                             'qseq': self.condor_qseq_to_fastq,
90                             'split_fastq': self.condor_desplit_fastq
91                             }
92         sequences = self.find_archive_sequence_files(result_map)
93         needed_targets = self.update_fastq_targets(result_map, sequences)
94
95         for target_pathname, available_sources in needed_targets.items():
96             LOGGER.debug(' target : %s' % (target_pathname,))
97             LOGGER.debug(' candidate sources: %s' % (available_sources,))
98             for condor_type in available_sources.keys():
99                 conversion = conversion_funcs.get(condor_type, None)
100                 if conversion is None:
101                     errmsg = "Unrecognized type: {0} for {1}"
102                     print errmsg.format(condor_type,
103                                         pformat(available_sources))
104                     continue
105                 sources = available_sources.get(condor_type, None)
106
107                 if sources is not None:
108                     condor_entries.setdefault(condor_type, []).append(
109                         conversion(sources, target_pathname))
110             else:
111                 print " need file", target_pathname
112
113         return condor_entries
114
115     def find_archive_sequence_files(self,  result_map):
116         """
117         Find archived sequence files associated with our results.
118         """
119         self.import_libraries(result_map)
120         flowcell_ids = self.find_relevant_flowcell_ids()
121         self.import_sequences(flowcell_ids)
122
123         query_text = """
124         prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
125         prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
126         prefix xsd: <http://www.w3.org/2001/XMLSchema#>
127
128         select ?filenode ?filetype ?cycle ?lane_number ?read
129                ?library  ?library_id
130                ?flowcell ?flowcell_id ?read_length
131                ?flowcell_type ?flowcell_status
132         where {
133             ?filenode libns:cycle ?cycle ;
134                       libns:lane_number ?lane_number ;
135                       libns:read ?read ;
136                       libns:flowcell ?flowcell ;
137                       libns:flowcell_id ?flowcell_id ;
138                       libns:library ?library ;
139                       libns:library_id ?library_id ;
140                       libns:file_type ?filetype ;
141                       a libns:IlluminaResult .
142             ?flowcell libns:read_length ?read_length ;
143                       libns:flowcell_type ?flowcell_type .
144             OPTIONAL { ?flowcell libns:flowcell_status ?flowcell_status }
145             FILTER(?filetype != libns:sequencer_result)
146         }
147         """
148         LOGGER.debug("find_archive_sequence_files query: %s",
149                      query_text)
150         query = RDF.SPARQLQuery(query_text)
151         results = []
152         for r in query.execute(self.model):
153             library_id = fromTypedNode(r['library_id'])
154             if library_id in result_map:
155                 seq = SequenceResult(r)
156                 LOGGER.debug("Creating sequence result for library %s: %s",
157                              library_id,
158                              repr(seq))
159                 results.append(seq)
160         return results
161
162     def import_libraries(self, result_map):
163         for lib_id in result_map.keys():
164             lib_id_encoded = lib_id.encode('utf-8')
165             liburl = urljoin(self.host, 'library/%s/' % (lib_id_encoded,))
166             library = RDF.Node(RDF.Uri(liburl))
167             self.import_library(library)
168
169     def import_library(self, library):
170         """Import library data into our model if we don't have it already
171         """
172         q = RDF.Statement(library, rdfNS['type'], libraryOntology['Library'])
173         present = False
174         if not self.model.contains_statement(q):
175             present = True
176             load_into_model(self.model, 'rdfa', library)
177         LOGGER.debug("Did we import %s: %s", library.uri, present)
178
179     def find_relevant_flowcell_ids(self):
180         """Generate set of flowcell ids that had samples of interest on them
181         """
182         flowcell_query = RDF.SPARQLQuery("""
183 prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
184
185 select distinct ?flowcell ?flowcell_id
186 WHERE {
187   ?library a libns:Library ;
188            libns:has_lane ?lane .
189   ?lane libns:flowcell ?flowcell .
190   ?flowcell libns:flowcell_id ?flowcell_id .
191 }""")
192         flowcell_ids = set()
193         for r in flowcell_query.execute(self.model):
194             flowcell_ids.add( fromTypedNode(r['flowcell_id']) )
195             imported = False
196             a_lane = self.model.get_target(r['flowcell'],
197                                            libraryOntology['has_lane'])
198             print a_lane
199             if a_lane is None:
200                 imported = True
201                 # we lack information about which lanes were on this flowcell
202                 load_into_model(self.model, 'rdfa', r['flowcell'])
203             LOGGER.debug("Did we imported %s: %s" % (r['flowcell'].uri,
204                                                      imported))
205
206         return flowcell_ids
207
208     def import_sequences(self, flowcell_ids):
209         seq_dirs = []
210         for f in flowcell_ids:
211             seq_dirs.append(os.path.join(self.sequences_path, str(f)))
212         sequences = scan_for_sequences(seq_dirs)
213         for seq in sequences:
214             seq.save_to_model(self.model, self.host)
215         update_model_sequence_library(self.model, self.host)
216
217     def update_fastq_targets(self, result_map, raw_files):
218         """Return list of fastq files that need to be built.
219
220         Also update model with link between illumina result files
221         and our target fastq file.
222         """
223         fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
224         fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
225         # find what targets we're missing
226         needed_targets = {}
227         for seq in raw_files:
228             if not seq.isgood:
229                 continue
230             filename_attributes = {
231                 'flowcell': seq.flowcell_id,
232                 'lib_id': seq.library_id,
233                 'lane': seq.lane_number,
234                 'read': seq.read,
235                 'cycle': seq.cycle
236             }
237
238             if seq.ispaired:
239                 target_name = fastq_paired_template % \
240                               filename_attributes
241             else:
242                 target_name = fastq_single_template % \
243                               filename_attributes
244
245             result_dir = result_map[seq.library_id]
246             target_pathname = os.path.join(result_dir, target_name)
247             if self.force or not os.path.exists(target_pathname):
248                 t = needed_targets.setdefault(target_pathname, {})
249                 t.setdefault(seq.filetype, []).append(seq)
250             self.add_target_source_links(target_pathname, seq)
251         return needed_targets
252
253     def add_target_source_links(self, target, seq):
254         """Add link between target pathname and the 'lane' that produced it
255         (note lane objects are now post demultiplexing.)
256         """
257         target_uri = 'file://' + target.encode('utf-8')
258         target_node = RDF.Node(RDF.Uri(target_uri))
259         source_stmt = RDF.Statement(target_node, dcNS['source'], seq.filenode)
260         self.model.add_statement(source_stmt)
261
262     def condor_srf_to_fastq(self, sources, target_pathname):
263         if len(sources) > 1:
264             raise ValueError("srf to fastq can only handle one file")
265
266         mid_point = None
267         if sources[0].flowcell_id == '30DY0AAXX':
268             mid_point = 76
269
270         return {
271             'sources': [sources[0].path],
272             'pyscript': srf2fastq.__file__,
273             'flowcell': sources[0].flowcell_id,
274             'ispaired': sources[0].ispaired,
275             'target': target_pathname,
276             'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
277             'mid': mid_point,
278             'force': self.force,
279         }
280
281     def condor_qseq_to_fastq(self, sources, target_pathname):
282         paths = []
283         for source in sources:
284             paths.append(source.path)
285         paths.sort()
286         return {
287             'pyscript': qseq2fastq.__file__,
288             'flowcell': sources[0].flowcell_id,
289             'target': target_pathname,
290             'sources': paths,
291             'ispaired': sources[0].ispaired,
292             'istar': len(sources) == 1,
293         }
294
295     def condor_desplit_fastq(self, sources, target_pathname):
296         paths = []
297         for source in sources:
298             paths.append(source.path)
299         paths.sort()
300         return {
301             'pyscript': desplit_fastq.__file__,
302             'target': target_pathname,
303             'sources': paths,
304             'ispaired': sources[0].ispaired,
305         }
306
307
308 def make_lane_dict(lib_db, lib_id):
309     """
310     Convert the lane_set in a lib_db to a dictionary
311     indexed by flowcell ID
312     """
313     result = []
314     for lane in lib_db[lib_id]['lane_set']:
315         flowcell_id, status = parse_flowcell_id(lane['flowcell'])
316         lane['flowcell'] = flowcell_id
317         result.append((lane['flowcell'], lane))
318     return dict(result)
319
320 class SequenceResult(object):
321     """Convert the sparql query result from find_archive_sequence_files
322     """
323     def __init__(self, result):
324         self.filenode = result['filenode']
325         self._filetype = result['filetype']
326         self.cycle = fromTypedNode(result['cycle'])
327         self.lane_number = fromTypedNode(result['lane_number'])
328         self.read = fromTypedNode(result['read'])
329         if type(self.read) in types.StringTypes:
330             self.read = 1
331         self.library = result['library']
332         self.library_id = fromTypedNode(result['library_id'])
333         self.flowcell = result['flowcell']
334         self.flowcell_id = fromTypedNode(result['flowcell_id'])
335         self.flowcell_type = fromTypedNode(result['flowcell_type'])
336         self.flowcell_status = fromTypedNode(result['flowcell_status'])
337
338     def _is_good(self):
339         """is this sequence / flowcell 'good enough'"""
340         if self.flowcell_status is not None and \
341            self.flowcell_status.lower() == "failed":
342             return False
343         return True
344     isgood = property(_is_good)
345
346     def _get_ispaired(self):
347         if self.flowcell_type.lower() == "paired":
348             return True
349         else:
350             return False
351     ispaired = property(_get_ispaired)
352
353     def _get_filetype(self):
354         return stripNamespace(libraryOntology, self._filetype)
355     filetype = property(_get_filetype)
356
357     def _get_path(self):
358         url = urlparse(str(self.filenode.uri))
359         if url.scheme == 'file':
360             return url.path
361         else:
362             errmsg = u"Unsupported scheme {0} for {1}"
363             raise ValueError(errmsg.format(url.scheme, unicode(url)))
364     path = property(_get_path)
365
366     def __repr__(self):
367         return "SequenceResult({0},{1},{2})".format(
368             str(self.filenode),
369             str(self.library_id),
370             str(self.flowcell_id))