aab906b8e2212916bbc716dccc76b2225d97d3eb
[htsworkflow.git] / htsworkflow / submission / condorfastq.py
1 """Convert srf and qseq archive files to fastqs
2 """
3 import logging
4 import os
5 from pprint import pformat,pprint
6 import sys
7 import types
8 from urlparse import urljoin, urlparse
9
10 from htsworkflow.pipelines.sequences import scan_for_sequences, \
11      update_model_sequence_library
12 from htsworkflow.pipelines.samplekey import SampleKey
13 from htsworkflow.pipelines import qseq2fastq
14 from htsworkflow.pipelines import srf2fastq
15 from htsworkflow.pipelines import desplit_fastq
16 from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \
17      fromTypedNode, \
18      libraryOntology, \
19      stripNamespace, \
20      rdfNS
21 from htsworkflow.util.conversion import parse_flowcell_id
22
23 from django.conf import settings
24 from django.template import Context, loader
25
26 import RDF
27
28 LOGGER = logging.getLogger(__name__)
29
30
31 class CondorFastqExtract(object):
32     def __init__(self, host, sequences_path,
33                  log_path='log',
34                  force=False):
35         """Extract fastqs from results archive
36
37         Args:
38           host (str): root of the htsworkflow api server
39           apidata (dict): id & key to post to the server
40           sequences_path (str): root of the directory tree to scan for files
41           log_path (str): where to put condor log files
42           force (bool): do we force overwriting current files?
43         """
44         self.host = host
45         self.model = get_model()
46         self.sequences_path = sequences_path
47         self.log_path = log_path
48         self.force = force
49         LOGGER.info("CondorFastq host={0}".format(self.host))
50         LOGGER.info("CondorFastq sequences_path={0}".format(self.sequences_path))
51         LOGGER.info("CondorFastq log_path={0}".format(self.log_path))
52
53     def create_scripts(self, result_map ):
54         """
55         Generate condor scripts to build any needed fastq files
56
57         Args:
58           result_map: htsworkflow.submission.results.ResultMap()
59         """
60         template_map = {'srf': 'srf.condor',
61                         'qseq': 'qseq.condor',
62                         'split_fastq': 'split_fastq.condor',
63                         'by_sample': 'lane_to_fastq.turtle',
64                         }
65
66         env = None
67         pythonpath = os.environ.get('PYTHONPATH', None)
68         if pythonpath is not None:
69             env = "PYTHONPATH=%s" % (pythonpath,)
70         condor_entries = self.build_condor_arguments(result_map)
71         for script_type in template_map.keys():
72             template = loader.get_template(template_map[script_type])
73             variables = {'python': sys.executable,
74                          'logdir': self.log_path,
75                          'env': env,
76                          'args': condor_entries[script_type],
77                          'root_url': self.host,
78                          }
79             context = Context(variables)
80
81             with open(script_type + '.condor','w+') as outstream:
82                 outstream.write(template.render(context))
83
84     def build_condor_arguments(self, result_map):
85         condor_entries = {'srf': [],
86                           'qseq': [],
87                           'split_fastq': []}
88
89         conversion_funcs = {'srf': self.condor_srf_to_fastq,
90                             'qseq': self.condor_qseq_to_fastq,
91                             'split_fastq': self.condor_desplit_fastq
92                             }
93         by_sample = {}
94         sequences = self.find_archive_sequence_files(result_map)
95         needed_targets = self.find_missing_targets(result_map, sequences)
96
97         for target_pathname, available_sources in needed_targets.items():
98             LOGGER.debug(' target : %s' % (target_pathname,))
99             LOGGER.debug(' candidate sources: %s' % (available_sources,))
100             for condor_type in available_sources.keys():
101                 conversion = conversion_funcs.get(condor_type, None)
102                 if conversion is None:
103                     errmsg = "Unrecognized type: {0} for {1}"
104                     print errmsg.format(condor_type,
105                                         pformat(available_sources))
106                     continue
107                 sources = available_sources.get(condor_type, None)
108
109                 if sources is not None:
110                     condor_entries.setdefault(condor_type, []).append(
111                         conversion(sources, target_pathname))
112                     for s in sources:
113                         by_sample.setdefault(s.lane_number,[]).append(
114                             target_pathname)
115             else:
116                 print " need file", target_pathname
117
118         condor_entries['by_sample'] = by_sample
119         return condor_entries
120
121     def find_archive_sequence_files(self,  result_map):
122         """
123         Find archived sequence files associated with our results.
124         """
125         self.import_libraries(result_map)
126         flowcell_ids = self.find_relavant_flowcell_ids()
127         self.import_sequences(flowcell_ids)
128
129
130         query = RDF.SPARQLQuery("""
131         prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
132         prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
133         prefix xsd: <http://www.w3.org/2001/XMLSchema#>
134
135         select ?filenode ?filetype ?cycle ?lane_number ?read
136                ?library  ?library_id
137                ?flowcell ?flowcell_id ?read_length
138                ?flowcell_type ?flowcell_status
139         where {
140             ?filenode libns:cycle ?cycle ;
141                       libns:lane_number ?lane_number ;
142                       libns:read ?read ;
143                       libns:flowcell ?flowcell ;
144                       libns:flowcell_id ?flowcell_id ;
145                       libns:library ?library ;
146                       libns:library_id ?library_id ;
147                       rdf:type ?filetype ;
148                       a libns:raw_file .
149             ?flowcell libns:read_length ?read_length ;
150                       libns:flowcell_type ?flowcell_type .
151             OPTIONAL { ?flowcell libns:flowcell_status ?flowcell_status }
152             FILTER(?filetype != libns:raw_file)
153         }
154         """)
155         results = []
156         for r in query.execute(self.model):
157             library_id = fromTypedNode(r['library_id'])
158             if library_id in result_map:
159                 results.append(SequenceResult(r))
160         return results
161
162     def import_libraries(self, result_map):
163         for lib_id in result_map.keys():
164             lib_id_encoded = lib_id.encode('utf-8')
165             liburl = urljoin(self.host, 'library/%s/' % (lib_id_encoded,))
166             library = RDF.Node(RDF.Uri(liburl))
167             self.import_library(library)
168
169     def import_library(self, library):
170         """Import library data into our model if we don't have it already
171         """
172         q = RDF.Statement(library, rdfNS['type'], libraryOntology['library'])
173         if not self.model.contains_statement(q):
174             load_into_model(self.model, 'rdfa', library)
175
176     def find_relavant_flowcell_ids(self):
177         """Generate set of flowcell ids that had samples of interest on them
178         """
179         flowcell_query =RDF.SPARQLQuery("""
180 prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
181
182 select distinct ?library ?flowcell ?flowcell_id
183 WHERE {
184   ?library a libns:library ;
185            libns:has_lane ?lane .
186   ?lane libns:flowcell ?flowcell .
187   ?flowcell libns:flowcell_id ?flowcell_id .
188 }""")
189         flowcell_ids = set()
190         for r in flowcell_query.execute(self.model):
191             flowcell_ids.add( fromTypedNode(r['flowcell_id']) )
192             LOGGER.debug("Flowcells = %s" %(unicode(flowcell_ids)))
193             flowcell_test = RDF.Statement(r['flowcell'],
194                                           rdfNS['type'],
195                                           libraryOntology['illumina_flowcell'])
196         if not self.model.contains_statement(flowcell_test):
197             # we probably lack full information about the flowcell.
198             load_into_model(self.model, 'rdfa', r['flowcell'])
199         return flowcell_ids
200
201     def import_sequences(self, flowcell_ids):
202
203         seq_dirs = []
204         for f in flowcell_ids:
205             seq_dirs.append(os.path.join(self.sequences_path, str(f)))
206         sequences = scan_for_sequences(seq_dirs)
207         for seq in sequences:
208             seq.save_to_model(self.model, self.host)
209         update_model_sequence_library(self.model, self.host)
210
211     def find_missing_targets(self, result_map, raw_files):
212         """
213         Check if the sequence file exists.
214         This requires computing what the sequence name is and checking
215         to see if it can be found in the sequence location.
216
217         Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
218         """
219         fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
220         fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
221         # find what targets we're missing
222         needed_targets = {}
223         for seq in raw_files:
224             if not seq.isgood:
225                 continue
226             filename_attributes = {
227                 'flowcell': seq.flowcell_id,
228                 'lib_id': seq.library_id,
229                 'lane': seq.lane_number,
230                 'read': seq.read,
231                 'cycle': seq.cycle
232             }
233
234             if seq.ispaired:
235                 target_name = fastq_paired_template % \
236                               filename_attributes
237             else:
238                 target_name = fastq_single_template % \
239                               filename_attributes
240
241             result_dir = result_map[seq.library_id]
242             target_pathname = os.path.join(result_dir, target_name)
243             if self.force or not os.path.exists(target_pathname):
244                 t = needed_targets.setdefault(target_pathname, {})
245                 t.setdefault(seq.filetype, []).append(seq)
246
247         return needed_targets
248
249     def condor_srf_to_fastq(self, sources, target_pathname):
250         if len(sources) > 1:
251             raise ValueError("srf to fastq can only handle one file")
252
253         mid_point = None
254         if sources[0].flowcell_id == '30DY0AAXX':
255             mid_point = 76
256
257         return {
258             'sources': [sources[0].path],
259             'pyscript': srf2fastq.__file__,
260             'flowcell': sources[0].flowcell_id,
261             'ispaired': sources[0].ispaired,
262             'target': target_pathname,
263             'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
264             'mid': mid_point,
265             'force': self.force,
266         }
267
268     def condor_qseq_to_fastq(self, sources, target_pathname):
269         paths = []
270         for source in sources:
271             paths.append(source.path)
272         paths.sort()
273         return {
274             'pyscript': qseq2fastq.__file__,
275             'flowcell': sources[0].flowcell_id,
276             'target': target_pathname,
277             'sources': paths,
278             'ispaired': sources[0].ispaired,
279             'istar': len(sources) == 1,
280         }
281
282     def condor_desplit_fastq(self, sources, target_pathname):
283         paths = []
284         for source in sources:
285             paths.append(source.path)
286         paths.sort()
287         return {
288             'pyscript': desplit_fastq.__file__,
289             'target': target_pathname,
290             'sources': paths,
291             'ispaired': sources[0].ispaired,
292         }
293
294
295 def make_lane_dict(lib_db, lib_id):
296     """
297     Convert the lane_set in a lib_db to a dictionary
298     indexed by flowcell ID
299     """
300     result = []
301     for lane in lib_db[lib_id]['lane_set']:
302         flowcell_id, status = parse_flowcell_id(lane['flowcell'])
303         lane['flowcell'] = flowcell_id
304         result.append((lane['flowcell'], lane))
305     return dict(result)
306
307 class SequenceResult(object):
308     """Convert the sparql query result from find_archive_sequence_files
309     """
310     def __init__(self, result):
311         self.filenode = result['filenode']
312         self._filetype = result['filetype']
313         self.cycle = fromTypedNode(result['cycle'])
314         self.lane_number = fromTypedNode(result['lane_number'])
315         self.read = fromTypedNode(result['read'])
316         if type(self.read) in types.StringTypes:
317             self.read = 1
318         self.library = result['library']
319         self.library_id = fromTypedNode(result['library_id'])
320         self.flowcell = result['flowcell']
321         self.flowcell_id = fromTypedNode(result['flowcell_id'])
322         self.flowcell_type = fromTypedNode(result['flowcell_type'])
323         self.flowcell_status = fromTypedNode(result['flowcell_status'])
324
325     def _is_good(self):
326         """is this sequence / flowcell 'good enough'"""
327         if self.flowcell_status is not None and \
328            self.flowcell_status.lower() == "failed":
329             return False
330         return True
331     isgood = property(_is_good)
332
333     def _get_ispaired(self):
334         if self.flowcell_type.lower() == "paired":
335             return True
336         else:
337             return False
338     ispaired = property(_get_ispaired)
339
340     def _get_filetype(self):
341         return stripNamespace(libraryOntology, self._filetype)
342     filetype = property(_get_filetype)
343
344     def _get_path(self):
345         url = urlparse(str(self.filenode.uri))
346         if url.scheme == 'file':
347             return url.path
348         else:
349             errmsg = u"Unsupported scheme {0} for {1}"
350             raise ValueError(errmsg.format(url.scheme, unicode(url)))
351     path = property(_get_path)