e48afd5869b48a9bbc69059aa5ebcad90608312f
[htsworkflow.git] / htsworkflow / submission / condorfastq.py
1 """Convert srf and qseq archive files to fastqs
2 """
3 import logging
4 import os
5 from pprint import pformat,pprint
6 import sys
7 import types
8 from urlparse import urljoin, urlparse
9
10 from htsworkflow.pipelines.sequences import scan_for_sequences, \
11      update_model_sequence_library
12 from htsworkflow.pipelines.samplekey import SampleKey
13 from htsworkflow.pipelines import qseq2fastq
14 from htsworkflow.pipelines import srf2fastq
15 from htsworkflow.pipelines import desplit_fastq
16 from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \
17      fromTypedNode, \
18      libraryOntology, \
19      stripNamespace, \
20      rdfNS
21 from htsworkflow.util.conversion import parse_flowcell_id
22
23 from django.conf import settings
24 from django.template import Context, loader
25
26 import RDF
27
28 LOGGER = logging.getLogger(__name__)
29
30
31 class CondorFastqExtract(object):
32     def __init__(self, host, sequences_path,
33                  log_path='log',
34                  model=None,
35                  force=False):
36         """Extract fastqs from results archive
37
38         Args:
39           host (str): root of the htsworkflow api server
40           apidata (dict): id & key to post to the server
41           sequences_path (str): root of the directory tree to scan for files
42           log_path (str): where to put condor log files
43           force (bool): do we force overwriting current files?
44         """
45         self.host = host
46         self.model = get_model(model)
47         self.sequences_path = sequences_path
48         self.log_path = log_path
49         self.force = force
50         LOGGER.info("CondorFastq host={0}".format(self.host))
51         LOGGER.info("CondorFastq sequences_path={0}".format(self.sequences_path))
52         LOGGER.info("CondorFastq log_path={0}".format(self.log_path))
53
54     def create_scripts(self, result_map ):
55         """
56         Generate condor scripts to build any needed fastq files
57
58         Args:
59           result_map: htsworkflow.submission.results.ResultMap()
60         """
61         template_map = {'srf': 'srf.condor',
62                         'qseq': 'qseq.condor',
63                         'split_fastq': 'split_fastq.condor',
64                         'by_sample': 'lane_to_fastq.turtle',
65                         }
66
67         env = None
68         pythonpath = os.environ.get('PYTHONPATH', None)
69         if pythonpath is not None:
70             env = "PYTHONPATH=%s" % (pythonpath,)
71         condor_entries = self.build_condor_arguments(result_map)
72         for script_type in template_map.keys():
73             template = loader.get_template(template_map[script_type])
74             variables = {'python': sys.executable,
75                          'logdir': self.log_path,
76                          'env': env,
77                          'args': condor_entries[script_type],
78                          'root_url': self.host,
79                          }
80             context = Context(variables)
81
82             with open(script_type + '.condor','w+') as outstream:
83                 outstream.write(template.render(context))
84
85     def build_condor_arguments(self, result_map):
86         condor_entries = {'srf': [],
87                           'qseq': [],
88                           'split_fastq': []}
89
90         conversion_funcs = {'srf': self.condor_srf_to_fastq,
91                             'qseq': self.condor_qseq_to_fastq,
92                             'split_fastq': self.condor_desplit_fastq
93                             }
94         by_sample = {}
95         sequences = self.find_archive_sequence_files(result_map)
96         needed_targets = self.find_missing_targets(result_map, sequences)
97
98         for target_pathname, available_sources in needed_targets.items():
99             LOGGER.debug(' target : %s' % (target_pathname,))
100             LOGGER.debug(' candidate sources: %s' % (available_sources,))
101             for condor_type in available_sources.keys():
102                 conversion = conversion_funcs.get(condor_type, None)
103                 if conversion is None:
104                     errmsg = "Unrecognized type: {0} for {1}"
105                     print errmsg.format(condor_type,
106                                         pformat(available_sources))
107                     continue
108                 sources = available_sources.get(condor_type, None)
109
110                 if sources is not None:
111                     condor_entries.setdefault(condor_type, []).append(
112                         conversion(sources, target_pathname))
113                     for s in sources:
114                         by_sample.setdefault(s.lane_number,[]).append(
115                             target_pathname)
116             else:
117                 print " need file", target_pathname
118
119         condor_entries['by_sample'] = by_sample
120         return condor_entries
121
122     def find_archive_sequence_files(self,  result_map):
123         """
124         Find archived sequence files associated with our results.
125         """
126         self.import_libraries(result_map)
127         flowcell_ids = self.find_relavant_flowcell_ids()
128         self.import_sequences(flowcell_ids)
129
130
131         query = RDF.SPARQLQuery("""
132         prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
133         prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
134         prefix xsd: <http://www.w3.org/2001/XMLSchema#>
135
136         select ?filenode ?filetype ?cycle ?lane_number ?read
137                ?library  ?library_id
138                ?flowcell ?flowcell_id ?read_length
139                ?flowcell_type ?flowcell_status
140         where {
141             ?filenode libns:cycle ?cycle ;
142                       libns:lane_number ?lane_number ;
143                       libns:read ?read ;
144                       libns:flowcell ?flowcell ;
145                       libns:flowcell_id ?flowcell_id ;
146                       libns:library ?library ;
147                       libns:library_id ?library_id ;
148                       rdf:type ?filetype ;
149                       a libns:raw_file .
150             ?flowcell libns:read_length ?read_length ;
151                       libns:flowcell_type ?flowcell_type .
152             OPTIONAL { ?flowcell libns:flowcell_status ?flowcell_status }
153             FILTER(?filetype != libns:raw_file)
154         }
155         """)
156         results = []
157         for r in query.execute(self.model):
158             library_id = fromTypedNode(r['library_id'])
159             if library_id in result_map:
160                 results.append(SequenceResult(r))
161         return results
162
163     def import_libraries(self, result_map):
164         for lib_id in result_map.keys():
165             lib_id_encoded = lib_id.encode('utf-8')
166             liburl = urljoin(self.host, 'library/%s/' % (lib_id_encoded,))
167             library = RDF.Node(RDF.Uri(liburl))
168             self.import_library(library)
169
170     def import_library(self, library):
171         """Import library data into our model if we don't have it already
172         """
173         q = RDF.Statement(library, rdfNS['type'], libraryOntology['library'])
174         if not self.model.contains_statement(q):
175             load_into_model(self.model, 'rdfa', library)
176
177     def find_relavant_flowcell_ids(self):
178         """Generate set of flowcell ids that had samples of interest on them
179         """
180         flowcell_query =RDF.SPARQLQuery("""
181 prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
182
183 select distinct ?flowcell ?flowcell_id
184 WHERE {
185   ?library a libns:library ;
186            libns:has_lane ?lane .
187   ?lane libns:flowcell ?flowcell .
188   ?flowcell libns:flowcell_id ?flowcell_id .
189 }""")
190         flowcell_ids = set()
191         for r in flowcell_query.execute(self.model):
192             flowcell_ids.add( fromTypedNode(r['flowcell_id']) )
193             LOGGER.debug("Flowcells = %s" %(unicode(flowcell_ids)))
194             flowcell_test = RDF.Statement(r['flowcell'],
195                                           rdfNS['type'],
196                                           libraryOntology['illumina_flowcell'])
197             if not self.model.contains_statement(flowcell_test):
198                 # we probably lack full information about the flowcell.
199                 load_into_model(self.model, 'rdfa', r['flowcell'])
200         return flowcell_ids
201
202     def import_sequences(self, flowcell_ids):
203
204         seq_dirs = []
205         for f in flowcell_ids:
206             seq_dirs.append(os.path.join(self.sequences_path, str(f)))
207         sequences = scan_for_sequences(seq_dirs)
208         for seq in sequences:
209             seq.save_to_model(self.model, self.host)
210         update_model_sequence_library(self.model, self.host)
211
212     def find_missing_targets(self, result_map, raw_files):
213         """
214         Check if the sequence file exists.
215         This requires computing what the sequence name is and checking
216         to see if it can be found in the sequence location.
217
218         Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
219         """
220         fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
221         fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
222         # find what targets we're missing
223         needed_targets = {}
224         for seq in raw_files:
225             if not seq.isgood:
226                 continue
227             filename_attributes = {
228                 'flowcell': seq.flowcell_id,
229                 'lib_id': seq.library_id,
230                 'lane': seq.lane_number,
231                 'read': seq.read,
232                 'cycle': seq.cycle
233             }
234
235             if seq.ispaired:
236                 target_name = fastq_paired_template % \
237                               filename_attributes
238             else:
239                 target_name = fastq_single_template % \
240                               filename_attributes
241
242             result_dir = result_map[seq.library_id]
243             target_pathname = os.path.join(result_dir, target_name)
244             if self.force or not os.path.exists(target_pathname):
245                 t = needed_targets.setdefault(target_pathname, {})
246                 t.setdefault(seq.filetype, []).append(seq)
247
248         return needed_targets
249
250     def condor_srf_to_fastq(self, sources, target_pathname):
251         if len(sources) > 1:
252             raise ValueError("srf to fastq can only handle one file")
253
254         mid_point = None
255         if sources[0].flowcell_id == '30DY0AAXX':
256             mid_point = 76
257
258         return {
259             'sources': [sources[0].path],
260             'pyscript': srf2fastq.__file__,
261             'flowcell': sources[0].flowcell_id,
262             'ispaired': sources[0].ispaired,
263             'target': target_pathname,
264             'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
265             'mid': mid_point,
266             'force': self.force,
267         }
268
269     def condor_qseq_to_fastq(self, sources, target_pathname):
270         paths = []
271         for source in sources:
272             paths.append(source.path)
273         paths.sort()
274         return {
275             'pyscript': qseq2fastq.__file__,
276             'flowcell': sources[0].flowcell_id,
277             'target': target_pathname,
278             'sources': paths,
279             'ispaired': sources[0].ispaired,
280             'istar': len(sources) == 1,
281         }
282
283     def condor_desplit_fastq(self, sources, target_pathname):
284         paths = []
285         for source in sources:
286             paths.append(source.path)
287         paths.sort()
288         return {
289             'pyscript': desplit_fastq.__file__,
290             'target': target_pathname,
291             'sources': paths,
292             'ispaired': sources[0].ispaired,
293         }
294
295
def make_lane_dict(lib_db, lib_id):
    """Index a library's lane_set by flowcell ID.

    Note: rewrites each lane's 'flowcell' entry in place with the
    parsed flowcell id before using it as the key.
    """
    lanes_by_flowcell = {}
    for lane in lib_db[lib_id]['lane_set']:
        flowcell_id, status = parse_flowcell_id(lane['flowcell'])
        lane['flowcell'] = flowcell_id
        lanes_by_flowcell[flowcell_id] = lane
    return lanes_by_flowcell
307
class SequenceResult(object):
    """Convert the sparql query result from find_archive_sequence_files
    into an object with typed attribute access.
    """
    def __init__(self, result):
        # raw RDF nodes kept as-is; scalar values unpacked with fromTypedNode
        self.filenode = result['filenode']
        self._filetype = result['filetype']
        self.cycle = fromTypedNode(result['cycle'])
        self.lane_number = fromTypedNode(result['lane_number'])
        self.read = fromTypedNode(result['read'])
        # a non-numeric read value means unspecified; default to read 1.
        # (isinstance instead of type(...) in ... so str subclasses match too)
        if isinstance(self.read, types.StringTypes):
            self.read = 1
        self.library = result['library']
        self.library_id = fromTypedNode(result['library_id'])
        self.flowcell = result['flowcell']
        self.flowcell_id = fromTypedNode(result['flowcell_id'])
        self.flowcell_type = fromTypedNode(result['flowcell_type'])
        self.flowcell_status = fromTypedNode(result['flowcell_status'])

    def _is_good(self):
        """is this sequence / flowcell 'good enough'"""
        if self.flowcell_status is not None and \
           self.flowcell_status.lower() == "failed":
            return False
        return True
    isgood = property(_is_good)

    def _get_ispaired(self):
        """True when the flowcell was a paired-end run."""
        return self.flowcell_type.lower() == "paired"
    ispaired = property(_get_ispaired)

    def _get_filetype(self):
        """File type term with the library ontology namespace stripped."""
        return stripNamespace(libraryOntology, self._filetype)
    filetype = property(_get_filetype)

    def _get_path(self):
        """Local filesystem path of the file node; only file:// URIs work."""
        url = urlparse(str(self.filenode.uri))
        if url.scheme == 'file':
            return url.path
        else:
            errmsg = u"Unsupported scheme {0} for {1}"
            raise ValueError(errmsg.format(url.scheme, unicode(url)))
    path = property(_get_path)