Older rdf library doesn't like unicode strings as nodes
[htsworkflow.git] / htsworkflow / submission / condorfastq.py
1 """Convert srf and qseq archive files to fastqs
2 """
3 import logging
4 import os
5 from pprint import pformat,pprint
6 import sys
7 import types
8 from urlparse import urljoin, urlparse
9
10 from htsworkflow.pipelines.sequences import scan_for_sequences, \
11      update_model_sequence_library
12 from htsworkflow.pipelines.samplekey import SampleKey
13 from htsworkflow.pipelines import qseq2fastq
14 from htsworkflow.pipelines import srf2fastq
15 from htsworkflow.pipelines import desplit_fastq
16 from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \
17      fromTypedNode, \
18      libraryOntology, \
19      stripNamespace, \
20      rdfNS
21 from htsworkflow.util.conversion import parse_flowcell_id
22
23 from django.conf import settings
24 from django.template import Context, loader
25
26 import RDF
27
28 LOGGER = logging.getLogger(__name__)
29
30
31 class CondorFastqExtract(object):
32     def __init__(self, host, sequences_path,
33                  log_path='log',
34                  force=False):
35         """Extract fastqs from results archive
36
37         Args:
38           host (str): root of the htsworkflow api server
39           apidata (dict): id & key to post to the server
40           sequences_path (str): root of the directory tree to scan for files
41           log_path (str): where to put condor log files
42           force (bool): do we force overwriting current files?
43         """
44         self.host = host
45         self.model = get_model()
46         self.sequences_path = sequences_path
47         self.log_path = log_path
48         self.force = force
49
50     def create_scripts(self, result_map ):
51         """
52         Generate condor scripts to build any needed fastq files
53
54         Args:
55           result_map: htsworkflow.submission.results.ResultMap()
56         """
57         template_map = {'srf': 'srf.condor',
58                         'qseq': 'qseq.condor',
59                         'split_fastq': 'split_fastq.condor',
60                         'by_sample': 'lane_to_fastq.turtle',
61                         }
62
63         env = None
64         pythonpath = os.environ.get('PYTHONPATH', None)
65         if pythonpath is not None:
66             env = "PYTHONPATH=%s" % (pythonpath,)
67         condor_entries = self.build_condor_arguments(result_map)
68         for script_type in template_map.keys():
69             template = loader.get_template(template_map[script_type])
70             variables = {'python': sys.executable,
71                          'logdir': self.log_path,
72                          'env': env,
73                          'args': condor_entries[script_type],
74                          'root_url': self.host,
75                          }
76             context = Context(variables)
77
78             with open(script_type + '.condor','w+') as outstream:
79                 outstream.write(template.render(context))
80
81     def build_condor_arguments(self, result_map):
82         condor_entries = {'srf': [],
83                           'qseq': [],
84                           'split_fastq': []}
85
86         conversion_funcs = {'srf': self.condor_srf_to_fastq,
87                             'qseq': self.condor_qseq_to_fastq,
88                             'split_fastq': self.condor_desplit_fastq
89                             }
90         by_sample = {}
91         sequences = self.find_archive_sequence_files(result_map)
92         needed_targets = self.find_missing_targets(result_map, sequences)
93
94         for target_pathname, available_sources in needed_targets.items():
95             LOGGER.debug(' target : %s' % (target_pathname,))
96             LOGGER.debug(' candidate sources: %s' % (available_sources,))
97             for condor_type in available_sources.keys():
98                 conversion = conversion_funcs.get(condor_type, None)
99                 if conversion is None:
100                     errmsg = "Unrecognized type: {0} for {1}"
101                     print errmsg.format(condor_type,
102                                         pformat(available_sources))
103                     continue
104                 sources = available_sources.get(condor_type, None)
105
106                 if sources is not None:
107                     condor_entries.setdefault(condor_type, []).append(
108                         conversion(sources, target_pathname))
109                     for s in sources:
110                         by_sample.setdefault(s.lane_number,[]).append(
111                             target_pathname)
112             else:
113                 print " need file", target_pathname
114
115         condor_entries['by_sample'] = by_sample
116         return condor_entries
117
118     def find_archive_sequence_files(self,  result_map):
119         """
120         Find archived sequence files associated with our results.
121         """
122         self.import_libraries(result_map)
123         flowcell_ids = self.find_relavant_flowcell_ids()
124         self.import_sequences(flowcell_ids)
125
126         query = RDF.SPARQLQuery("""
127         prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
128         prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
129         prefix xsd: <http://www.w3.org/2001/XMLSchema#>
130
131         select ?filenode ?filetype ?cycle ?lane_number ?read
132                ?library  ?library_id
133                ?flowcell ?flowcell_id ?read_length
134                ?flowcell_type ?flowcell_status
135         where {
136             ?filenode libns:cycle ?cycle ;
137                       libns:lane_number ?lane_number ;
138                       libns:read ?read ;
139                       libns:flowcell ?flowcell ;
140                       libns:flowcell_id ?flowcell_id ;
141                       libns:library ?library ;
142                       libns:library_id ?library_id ;
143                       rdf:type ?filetype ;
144                       a libns:raw_file .
145             ?flowcell libns:read_length ?read_length ;
146                       libns:flowcell_type ?flowcell_type .
147             OPTIONAL { ?flowcell libns:flowcell_status ?flowcell_status }
148             FILTER(?filetype != libns:raw_file)
149         }
150         """)
151         results = []
152         for r in query.execute(self.model):
153             library_id = fromTypedNode(r['library_id'])
154             if library_id in result_map:
155                 results.append(SequenceResult(r))
156         return results
157
158     def import_libraries(self, result_map):
159         for lib_id in result_map.keys():
160             lib_id_encoded = lib_id.encode('utf-8')
161             liburl = urljoin(self.host, 'library/%s/' % (lib_id_encoded,))
162             library = RDF.Node(RDF.Uri(liburl))
163             self.import_library(library)
164
165     def import_library(self, library):
166         """Import library data into our model if we don't have it already
167         """
168         q = RDF.Statement(library, rdfNS['type'], libraryOntology['library'])
169         if not self.model.contains_statement(q):
170             load_into_model(self.model, 'rdfa', library)
171
172     def find_relavant_flowcell_ids(self):
173         """Generate set of flowcell ids that had samples of interest on them
174         """
175         flowcell_query =RDF.SPARQLQuery("""
176 prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
177
178 select distinct ?library ?flowcell ?flowcell_id
179 WHERE {
180   ?library a libns:library ;
181            libns:has_lane ?lane .
182   ?lane libns:flowcell ?flowcell .
183   ?flowcell libns:flowcell_id ?flowcell_id .
184 }""")
185         flowcell_ids = set()
186         for r in flowcell_query.execute(self.model):
187             flowcell_ids.add( fromTypedNode(r['flowcell_id']) )
188         LOGGER.debug("Flowcells = %s" %(unicode(flowcell_ids)))
189         return flowcell_ids
190
191     def import_sequences(self, flowcell_ids):
192         seq_dirs = ( os.path.join(self.sequences_path, f)
193                      for f in flowcell_ids )
194         sequences = scan_for_sequences(seq_dirs)
195         for seq in sequences:
196             seq.save_to_model(self.model)
197         update_model_sequence_library(self.model, self.host)
198
199     def find_missing_targets(self, result_map, raw_files):
200         """
201         Check if the sequence file exists.
202         This requires computing what the sequence name is and checking
203         to see if it can be found in the sequence location.
204
205         Adds seq.paired flag to sequences listed in lib_db[*]['lanes']
206         """
207         fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
208         fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
209         # find what targets we're missing
210         needed_targets = {}
211         for seq in raw_files:
212             if not seq.isgood:
213                 continue
214             filename_attributes = {
215                 'flowcell': seq.flowcell_id,
216                 'lib_id': seq.library_id,
217                 'lane': seq.lane_number,
218                 'read': seq.read,
219                 'cycle': seq.cycle
220             }
221
222             if seq.ispaired:
223                 target_name = fastq_paired_template % \
224                               filename_attributes
225             else:
226                 target_name = fastq_single_template % \
227                               filename_attributes
228
229             result_dir = result_map[seq.library_id]
230             target_pathname = os.path.join(result_dir, target_name)
231             if self.force or not os.path.exists(target_pathname):
232                 t = needed_targets.setdefault(target_pathname, {})
233                 t.setdefault(seq.filetype, []).append(seq)
234
235         return needed_targets
236
237     def condor_srf_to_fastq(self, sources, target_pathname):
238         if len(sources) > 1:
239             raise ValueError("srf to fastq can only handle one file")
240
241         mid_point = None
242         if sources[0].flowcell_id == '30DY0AAXX':
243             mid_point = 76
244
245         return {
246             'sources': [sources[0].path],
247             'pyscript': srf2fastq.__file__,
248             'flowcell': sources[0].flowcell_id,
249             'ispaired': sources[0].ispaired,
250             'target': target_pathname,
251             'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
252             'mid': mid_point,
253             'force': self.force,
254         }
255
256     def condor_qseq_to_fastq(self, sources, target_pathname):
257         paths = []
258         for source in sources:
259             paths.append(source.path)
260         paths.sort()
261         return {
262             'pyscript': qseq2fastq.__file__,
263             'flowcell': sources[0].flowcell_id,
264             'target': target_pathname,
265             'sources': paths,
266             'ispaired': sources[0].ispaired,
267             'istar': len(sources) == 1,
268         }
269
270     def condor_desplit_fastq(self, sources, target_pathname):
271         paths = []
272         for source in sources:
273             paths.append(source.path)
274         paths.sort()
275         return {
276             'pyscript': desplit_fastq.__file__,
277             'target': target_pathname,
278             'sources': paths,
279             'ispaired': sources[0].ispaired,
280         }
281
282
def make_lane_dict(lib_db, lib_id):
    """
    Convert the lane_set in a lib_db to a dictionary
    indexed by flowcell ID
    """
    lanes_by_flowcell = {}
    for lane in lib_db[lib_id]['lane_set']:
        # normalize the flowcell name in place, stripping any status suffix
        flowcell_id, _status = parse_flowcell_id(lane['flowcell'])
        lane['flowcell'] = flowcell_id
        lanes_by_flowcell[flowcell_id] = lane
    return lanes_by_flowcell
294
class SequenceResult(object):
    """Convert the sparql query result from find_archive_sequence_files

    Wraps one result row, decoding typed literal nodes into plain Python
    values and exposing derived attributes as properties.
    """
    def __init__(self, result):
        # raw RDF nodes kept as-is
        self.filenode = result['filenode']
        self._filetype = result['filetype']
        # typed literals decoded to Python values
        self.cycle = fromTypedNode(result['cycle'])
        self.lane_number = fromTypedNode(result['lane_number'])
        self.read = fromTypedNode(result['read'])
        # non-numeric read values collapse to read 1
        # (was `type(self.read) in types.StringTypes`; isinstance also
        # covers str/unicode subclasses)
        if isinstance(self.read, types.StringTypes):
            self.read = 1
        self.library = result['library']
        self.library_id = fromTypedNode(result['library_id'])
        self.flowcell = result['flowcell']
        self.flowcell_id = fromTypedNode(result['flowcell_id'])
        self.flowcell_type = fromTypedNode(result['flowcell_type'])
        # optional in the query, so may be None
        self.flowcell_status = fromTypedNode(result['flowcell_status'])

    @property
    def isgood(self):
        """is this sequence / flowcell 'good enough'"""
        if self.flowcell_status is not None and \
           self.flowcell_status.lower() == "failed":
            return False
        return True

    @property
    def ispaired(self):
        """True when the flowcell was a paired-end run."""
        if self.flowcell_type.lower() == "paired":
            return True
        else:
            return False

    @property
    def filetype(self):
        """File type term with the library ontology namespace stripped."""
        return stripNamespace(libraryOntology, self._filetype)

    @property
    def path(self):
        """Local filesystem path for the file node.

        Raises ValueError for any URL scheme other than file://.
        """
        url = urlparse(str(self.filenode.uri))
        if url.scheme == 'file':
            return url.path
        else:
            errmsg = u"Unsupported scheme {0} for {1}"
            raise ValueError(errmsg.format(url.scheme, unicode(url)))