1 """Convert srf and qseq archive files to fastqs
2 """
import logging
import os
from pprint import pformat, pprint
import sys
import types
from urlparse import urljoin, urlparse

from htsworkflow.pipelines.sequences import scan_for_sequences, \
     update_model_sequence_library
from htsworkflow.pipelines.samplekey import SampleKey
from htsworkflow.pipelines import qseq2fastq
from htsworkflow.pipelines import srf2fastq
from htsworkflow.pipelines import desplit_fastq
from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \
     fromTypedNode, \
     stripNamespace
from htsworkflow.util.rdfns import rdfNS, libraryOntology, dcNS
from htsworkflow.util.conversion import parse_flowcell_id

from django.conf import settings
from django.template import Context, loader

import RDF

LOGGER = logging.getLogger(__name__)


class CondorFastqExtract(object):
    def __init__(self, host, sequences_path,
                 log_path='log',
                 model=None,
                 force=False):
        """Extract fastqs from results archive

        Args:
          host (str): root of the htsworkflow api server
          sequences_path (str): root of the directory tree to scan for files
          log_path (str): where to put condor log files
          model: RDF model to reuse (passed to get_model; one is created
            when None)
          force (bool): do we force overwriting current files?
        """
        self.host = host
        self.model = get_model(model)
        self.sequences_path = sequences_path
        self.log_path = log_path
        self.force = force
        LOGGER.info("CondorFastq host={0}".format(self.host))
        LOGGER.info("CondorFastq sequences_path={0}".format(self.sequences_path))
        LOGGER.info("CondorFastq log_path={0}".format(self.log_path))

    def create_scripts(self, result_map):
        """
        Generate condor scripts to build any needed fastq files

        Args:
          result_map: htsworkflow.submission.results.ResultMap()
        """
        template_map = {'srf': 'srf.condor',
                        'qseq': 'qseq.condor',
                        'split_fastq': 'split_fastq.condor',
                        }

        env = None
        pythonpath = os.environ.get('PYTHONPATH', None)
        if pythonpath is not None:
            env = "PYTHONPATH=%s" % (pythonpath,)
        condor_entries = self.build_condor_arguments(result_map)
        for script_type in template_map.keys():
            template = loader.get_template(template_map[script_type])
            variables = {'python': sys.executable,
                         'logdir': self.log_path,
                         'env': env,
                         'args': condor_entries[script_type],
                         'root_url': self.host,
                         }
            context = Context(variables)

            with open(script_type + '.condor', 'w+') as outstream:
                outstream.write(template.render(context))

    def build_condor_arguments(self, result_map):
        condor_entries = {'srf': [],
                          'qseq': [],
                          'split_fastq': []}

        conversion_funcs = {'srf': self.condor_srf_to_fastq,
                            'qseq': self.condor_qseq_to_fastq,
                            'split_fastq': self.condor_desplit_fastq
                            }
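        # each conversion function returns a dict of template variables;
        # the per-type lists collected here become the 'args' that
        # create_scripts() renders into the matching condor submit file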
        sequences = self.find_archive_sequence_files(result_map)
        needed_targets = self.update_fastq_targets(result_map, sequences)

        for target_pathname, available_sources in needed_targets.items():
            LOGGER.debug(' target : %s' % (target_pathname,))
            LOGGER.debug(' candidate sources: %s' % (available_sources,))
            if not available_sources:
                # nothing on disk can produce this target
                print " need file", target_pathname
                continue
            for condor_type in available_sources.keys():
                conversion = conversion_funcs.get(condor_type, None)
                if conversion is None:
                    errmsg = "Unrecognized type: {0} for {1}"
                    print errmsg.format(condor_type,
                                        pformat(available_sources))
                    continue
                sources = available_sources.get(condor_type, None)
                if sources is not None:
                    condor_entries.setdefault(condor_type, []).append(
                        conversion(sources, target_pathname))

        return condor_entries

    def find_archive_sequence_files(self, result_map):
        """
        Find archived sequence files associated with our results.
        """
        self.import_libraries(result_map)
        flowcell_ids = self.find_relevant_flowcell_ids()
        self.import_sequences(flowcell_ids)

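        # one row per sequence result file; raw sequencer_result files
        # are excluded by the FILTER clause at the end of the query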
        query_text = """
        prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
        prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        prefix xsd: <http://www.w3.org/2001/XMLSchema#>

        select ?filenode ?filetype ?cycle ?lane_number ?read
               ?library  ?library_id
               ?flowcell ?flowcell_id ?read_length
               ?flowcell_type ?flowcell_status
        where {
            ?filenode libns:cycle ?cycle ;
                      libns:lane_number ?lane_number ;
                      libns:read ?read ;
                      libns:flowcell ?flowcell ;
                      libns:flowcell_id ?flowcell_id ;
                      libns:library ?library ;
                      libns:library_id ?library_id ;
                      libns:file_type ?filetype ;
                      a libns:IlluminaResult .
            ?flowcell libns:read_length ?read_length ;
                      libns:flowcell_type ?flowcell_type .
            OPTIONAL { ?flowcell libns:flowcell_status ?flowcell_status }
            FILTER(?filetype != libns:sequencer_result)
        }
        """
        LOGGER.debug("find_archive_sequence_files query: %s",
                     query_text)
        query = RDF.SPARQLQuery(query_text)
        results = []
        for r in query.execute(self.model):
            library_id = fromTypedNode(r['library_id'])
            if library_id in result_map:
                seq = SequenceResult(r)
                LOGGER.debug("Creating sequence result for library %s: %s",
                             library_id,
                             repr(seq))
                results.append(seq)
        return results

    def import_libraries(self, result_map):
        for lib_id in result_map.keys():
            lib_id_encoded = lib_id.encode('utf-8')
            liburl = urljoin(self.host, 'library/%s/' % (lib_id_encoded,))
            library = RDF.Node(RDF.Uri(liburl))
            self.import_library(library)

    def import_library(self, library):
        """Import library data into our model if we don't have it already
        """
        q = RDF.Statement(library, rdfNS['type'], libraryOntology['Library'])
        imported = False
        if not self.model.contains_statement(q):
            imported = True
            load_into_model(self.model, 'rdfa', library)
        LOGGER.debug("Did we import %s: %s", library.uri, imported)

    def find_relevant_flowcell_ids(self):
        """Generate set of flowcell ids that had samples of interest on them
        """
        flowcell_query = RDF.SPARQLQuery("""
prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>

select distinct ?flowcell ?flowcell_id
WHERE {
  ?library a libns:Library ;
           libns:has_lane ?lane .
  ?lane libns:flowcell ?flowcell .
  ?flowcell libns:flowcell_id ?flowcell_id .
}""")
        flowcell_ids = set()
        for r in flowcell_query.execute(self.model):
            flowcell_ids.add(fromTypedNode(r['flowcell_id']))
            imported = False
            a_lane = self.model.get_target(r['flowcell'],
                                           libraryOntology['has_lane'])
            if a_lane is None:
                imported = True
                # we lack information about which lanes were on this flowcell
                load_into_model(self.model, 'rdfa', r['flowcell'])
            LOGGER.debug("Did we import %s: %s", r['flowcell'].uri, imported)

        return flowcell_ids

    def import_sequences(self, flowcell_ids):
        seq_dirs = [os.path.join(self.sequences_path, str(f))
                    for f in flowcell_ids]
        sequences = scan_for_sequences(seq_dirs)
        for seq in sequences:
            seq.save_to_model(self.model, self.host)
        update_model_sequence_library(self.model, self.host)

    def update_fastq_targets(self, result_map, raw_files):
        """Return a dict of fastq targets that need to be built,
        mapped to their available source files.

        Also update model with links between illumina result files
        and our target fastq files.
        """
        fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
        fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
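        # target names encode library id, flowcell, cycle count and lane;
        # paired-end results additionally carry the read number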
        # find what targets we're missing
        needed_targets = {}
        for seq in raw_files:
            if not seq.isgood:
                continue
            filename_attributes = {
                'flowcell': seq.flowcell_id,
                'lib_id': seq.library_id,
                'lane': seq.lane_number,
                'read': seq.read,
                'cycle': seq.cycle
            }

            if seq.ispaired:
                target_name = fastq_paired_template % filename_attributes
            else:
                target_name = fastq_single_template % filename_attributes

            result_dir = result_map[seq.library_id]
            target_pathname = os.path.join(result_dir, target_name)
            if self.force or not os.path.exists(target_pathname):
                t = needed_targets.setdefault(target_pathname, {})
                t.setdefault(seq.filetype, []).append(seq)
            self.add_target_source_links(target_pathname, seq)
        return needed_targets

    def add_target_source_links(self, target, seq):
        """Add link between target pathname and the 'lane' that produced it
        (note lane objects are now post demultiplexing.)
        """
        target_uri = 'file://' + target.encode('utf-8')
        target_node = RDF.Node(RDF.Uri(target_uri))
        source_stmt = RDF.Statement(target_node, dcNS['source'], seq.filenode)
        self.model.add_statement(source_stmt)

    def condor_srf_to_fastq(self, sources, target_pathname):
        if len(sources) > 1:
            raise ValueError("srf to fastq can only handle one file")

        mid_point = None
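        # flowcell-specific workaround: pass a 'mid' point so srf2fastq
        # splits this run's records at base 76 (read 1 and read 2 were,
        # presumably, stored concatenated for this one flowcell)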
        if sources[0].flowcell_id == '30DY0AAXX':
            mid_point = 76

        return {
            'sources': [sources[0].path],
            'pyscript': srf2fastq.__file__,
            'flowcell': sources[0].flowcell_id,
            'ispaired': sources[0].ispaired,
            'target': target_pathname,
            'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
            'mid': mid_point,
            'force': self.force,
        }

    def condor_qseq_to_fastq(self, sources, target_pathname):
        paths = sorted(source.path for source in sources)
        return {
            'pyscript': qseq2fastq.__file__,
            'flowcell': sources[0].flowcell_id,
            'target': target_pathname,
            'sources': paths,
            'ispaired': sources[0].ispaired,
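            # 'istar' marks the single source as (we assume) a tar archive
            # of qseq files rather than one qseq file per lane and read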
            'istar': len(sources) == 1,
        }

    def condor_desplit_fastq(self, sources, target_pathname):
        paths = sorted(source.path for source in sources)
        return {
            'pyscript': desplit_fastq.__file__,
            'target': target_pathname,
            'sources': paths,
            'ispaired': sources[0].ispaired,
        }


def make_lane_dict(lib_db, lib_id):
    """
    Convert the lane_set in a lib_db to a dictionary
    indexed by flowcell ID
    """
    result = []
    for lane in lib_db[lib_id]['lane_set']:
        flowcell_id, status = parse_flowcell_id(lane['flowcell'])
        lane['flowcell'] = flowcell_id
        result.append((lane['flowcell'], lane))
    return dict(result)


class SequenceResult(object):
    """Wrap a sparql query result row from find_archive_sequence_files
    """
    def __init__(self, result):
        self.filenode = result['filenode']
        self._filetype = result['filetype']
        self.cycle = fromTypedNode(result['cycle'])
        self.lane_number = fromTypedNode(result['lane_number'])
        self.read = fromTypedNode(result['read'])
        if isinstance(self.read, types.StringTypes):
            # a non-numeric read value is treated as read 1
            self.read = 1
        self.library = result['library']
        self.library_id = fromTypedNode(result['library_id'])
        self.flowcell = result['flowcell']
        self.flowcell_id = fromTypedNode(result['flowcell_id'])
        self.flowcell_type = fromTypedNode(result['flowcell_type'])
        self.flowcell_status = fromTypedNode(result['flowcell_status'])

    def _is_good(self):
        """is this sequence / flowcell 'good enough'"""
        if self.flowcell_status is not None and \
           self.flowcell_status.lower() == "failed":
            return False
        return True
    isgood = property(_is_good)

    def _get_ispaired(self):
        return self.flowcell_type.lower() == "paired"
    ispaired = property(_get_ispaired)

    def _get_filetype(self):
        return stripNamespace(libraryOntology, self._filetype)
    filetype = property(_get_filetype)

    def _get_path(self):
        url = urlparse(str(self.filenode.uri))
        if url.scheme == 'file':
            return url.path
        else:
            errmsg = u"Unsupported scheme {0} for {1}"
            raise ValueError(errmsg.format(url.scheme, unicode(url)))
    path = property(_get_path)

    def __repr__(self):
        return "SequenceResult({0},{1},{2})".format(
            str(self.filenode),
            str(self.library_id),
            str(self.flowcell_id))