Fix matching of scanned sequence files to library IDs for HiSeq runs.
htsworkflow/submission/condorfastq.py
1 """Convert srf and qseq archive files to fastqs
2 """
3 import logging
4 import os
5 from pprint import pformat,pprint
6 import sys
7 import types
8 from urlparse import urljoin, urlparse
9
10 from htsworkflow.pipelines.sequences import scan_for_sequences, \
11      update_model_sequence_library
12 from htsworkflow.pipelines.samplekey import SampleKey
13 from htsworkflow.pipelines import qseq2fastq
14 from htsworkflow.pipelines import srf2fastq
15 from htsworkflow.pipelines import desplit_fastq
16 from htsworkflow.util.rdfhelp import get_model, dump_model, load_into_model, \
17      fromTypedNode, \
18      libraryOntology, \
19      stripNamespace, \
20      rdfNS
21 from htsworkflow.util.conversion import parse_flowcell_id
22
23 from django.conf import settings
24 from django.template import Context, loader
25
26 import RDF
27
28 LOGGER = logging.getLogger(__name__)
29
30
class CondorFastqExtract(object):
    def __init__(self, host, sequences_path,
                 log_path='log',
                 force=False):
        """Extract fastqs from results archive

        Args:
          host (str): root of the htsworkflow api server
          sequences_path (str): root of the directory tree to scan for files
          log_path (str): where to put condor log files
          force (bool): do we force overwriting current files?
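
        A minimal usage sketch (the host URL and sequences path are
        made-up examples):

          extractor = CondorFastqExtract('http://jumpgate.caltech.edu/',
                                         '/archive/sequences')
          extractor.create_scripts(result_map)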
        """
        self.host = host
        self.model = get_model()
        self.sequences_path = sequences_path
        self.log_path = log_path
        self.force = force

    def create_scripts(self, result_map):
        """
        Generate condor scripts to build any needed fastq files

        Args:
          result_map: htsworkflow.submission.results.ResultMap()
        """
        template_map = {'srf': 'srf.condor',
                        'qseq': 'qseq.condor',
                        'split_fastq': 'split_fastq.condor',
                        'by_sample': 'lane_to_fastq.turtle',
                        }
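        # Each template is rendered once, with the matching entry list
        # from build_condor_arguments() supplied as 'args'.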

        env = None
        pythonpath = os.environ.get('PYTHONPATH', None)
        if pythonpath is not None:
            env = "PYTHONPATH=%s" % (pythonpath,)
        condor_entries = self.build_condor_arguments(result_map)
        for script_type in template_map.keys():
            template = loader.get_template(template_map[script_type])
            variables = {'python': sys.executable,
                         'logdir': self.log_path,
                         'env': env,
                         'args': condor_entries[script_type],
                         'root_url': self.host,
                         }
            context = Context(variables)

            with open(script_type + '.condor', 'w+') as outstream:
                outstream.write(template.render(context))

    def build_condor_arguments(self, result_map):
        condor_entries = {'srf': [],
                          'qseq': [],
                          'split_fastq': []}

        conversion_funcs = {'srf': self.condor_srf_to_fastq,
                            'qseq': self.condor_qseq_to_fastq,
                            'split_fastq': self.condor_desplit_fastq
                            }
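        # Each conversion function returns a dict of template variables
        # describing one condor job; see condor_srf_to_fastq and friends
        # below for the keys each template expects.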
        by_sample = {}
        sequences = self.find_archive_sequence_files(result_map)
        needed_targets = self.find_missing_targets(result_map, sequences)

        for target_pathname, available_sources in needed_targets.items():
            LOGGER.debug(' target : %s' % (target_pathname,))
            LOGGER.debug(' candidate sources: %s' % (available_sources,))
            found_conversion = False
            for condor_type in available_sources.keys():
                conversion = conversion_funcs.get(condor_type, None)
                if conversion is None:
                    errmsg = "Unrecognized type: {0} for {1}"
                    print errmsg.format(condor_type,
                                        pformat(available_sources))
                    continue
                sources = available_sources.get(condor_type, None)

                if sources is not None:
                    found_conversion = True
                    condor_entries.setdefault(condor_type, []).append(
                        conversion(sources, target_pathname))
                    for s in sources:
                        by_sample.setdefault(s.lane_number, []).append(
                            target_pathname)
            if not found_conversion:
                print " need file", target_pathname

        condor_entries['by_sample'] = by_sample
        return condor_entries

    def find_archive_sequence_files(self, result_map):
        """
        Find archived sequence files associated with our results.
        """
        self.import_libraries(result_map)
        flowcell_ids = self.find_relevant_flowcell_ids()
        self.import_sequences(flowcell_ids)

        query = RDF.SPARQLQuery("""
        prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>
        prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        prefix xsd: <http://www.w3.org/2001/XMLSchema#>

        select ?filenode ?filetype ?cycle ?lane_number ?read
               ?library ?library_id
               ?flowcell ?flowcell_id ?read_length
               ?flowcell_type ?flowcell_status
        where {
            ?filenode libns:cycle ?cycle ;
                      libns:lane_number ?lane_number ;
                      libns:read ?read ;
                      libns:flowcell ?flowcell ;
                      libns:flowcell_id ?flowcell_id ;
                      libns:library ?library ;
                      libns:library_id ?library_id ;
                      rdf:type ?filetype ;
                      a libns:raw_file .
            ?flowcell libns:read_length ?read_length ;
                      libns:flowcell_type ?flowcell_type .
            OPTIONAL { ?flowcell libns:flowcell_status ?flowcell_status }
            FILTER(?filetype != libns:raw_file)
        }
        """)
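        # The FILTER drops the generic libns:raw_file typing so that
        # ?filetype binds to the specific file type (e.g. srf or qseq).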
        results = []
        for r in query.execute(self.model):
            library_id = fromTypedNode(r['library_id'])
            if library_id in result_map:
                results.append(SequenceResult(r))
        return results

    def import_libraries(self, result_map):
        for lib_id in result_map.keys():
            liburl = urljoin(self.host, 'library/%s/' % (lib_id,))
            library = RDF.Node(RDF.Uri(liburl))
            self.import_library(library)

    def import_library(self, library):
        """Import library data into our model if we don't have it already
        """
        q = RDF.Statement(library, rdfNS['type'], libraryOntology['library'])
        if not self.model.contains_statement(q):
            load_into_model(self.model, 'rdfa', library)

    def find_relevant_flowcell_ids(self):
        """Generate set of flowcell ids that had samples of interest on them
        """
        flowcell_query = RDF.SPARQLQuery("""
prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#>

select distinct ?library ?flowcell ?flowcell_id
WHERE {
  ?library a libns:library ;
           libns:has_lane ?lane .
  ?lane libns:flowcell ?flowcell .
  ?flowcell libns:flowcell_id ?flowcell_id .
}""")
        flowcell_ids = set()
        for r in flowcell_query.execute(self.model):
            flowcell_ids.add(fromTypedNode(r['flowcell_id']))
        LOGGER.debug("Flowcells = %s" % (unicode(flowcell_ids)))
        return flowcell_ids

    def import_sequences(self, flowcell_ids):
        seq_dirs = (os.path.join(self.sequences_path, f)
                    for f in flowcell_ids)
        sequences = scan_for_sequences(seq_dirs)
        for seq in sequences:
            seq.save_to_model(self.model)
        update_model_sequence_library(self.model, self.host)

    def find_missing_targets(self, result_map, raw_files):
        """
        Check if the sequence file exists.
        This requires computing what the sequence name is and checking
        to see if it can be found in the sequence location.

        Returns a dictionary mapping each missing target pathname to
        the available source sequences, grouped by file type.
        """
        fastq_paired_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s_r%(read)s.fastq'
        fastq_single_template = '%(lib_id)s_%(flowcell)s_c%(cycle)s_l%(lane)s.fastq'
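        # For example (values illustrative): 12345_30DY0AAXX_c76_l3_r1.fastq
        # for a paired end lane, 12345_30DY0AAXX_c76_l3.fastq otherwise.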
        # find what targets we're missing
        needed_targets = {}
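        # needed_targets: target pathname -> {filetype: [SequenceResult]}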
        for seq in raw_files:
            if not seq.isgood:
                continue
            filename_attributes = {
                'flowcell': seq.flowcell_id,
                'lib_id': seq.library_id,
                'lane': seq.lane_number,
                'read': seq.read,
                'cycle': seq.cycle
            }

            if seq.ispaired:
                target_name = fastq_paired_template % filename_attributes
            else:
                target_name = fastq_single_template % filename_attributes

            result_dir = result_map[seq.library_id]
            target_pathname = os.path.join(result_dir, target_name)
            if self.force or not os.path.exists(target_pathname):
                t = needed_targets.setdefault(target_pathname, {})
                t.setdefault(seq.filetype, []).append(seq)

        return needed_targets

    def condor_srf_to_fastq(self, sources, target_pathname):
        if len(sources) > 1:
            raise ValueError("srf to fastq can only handle one file")

        mid_point = None
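        # Flowcell 30DY0AAXX appears to need its combined read split at
        # cycle 76 (inferred from the 'mid' argument handed to srf2fastq).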
        if sources[0].flowcell_id == '30DY0AAXX':
            mid_point = 76

        return {
            'sources': [sources[0].path],
            'pyscript': srf2fastq.__file__,
            'flowcell': sources[0].flowcell_id,
            'ispaired': sources[0].ispaired,
            'target': target_pathname,
            'target_right': target_pathname.replace('_r1.fastq', '_r2.fastq'),
            'mid': mid_point,
            'force': self.force,
        }

    def condor_qseq_to_fastq(self, sources, target_pathname):
        paths = []
        for source in sources:
            paths.append(source.path)
        paths.sort()
        return {
            'pyscript': qseq2fastq.__file__,
            'flowcell': sources[0].flowcell_id,
            'target': target_pathname,
            'sources': paths,
            'ispaired': sources[0].ispaired,
            'istar': len(sources) == 1,
        }

    def condor_desplit_fastq(self, sources, target_pathname):
        paths = []
        for source in sources:
            paths.append(source.path)
        paths.sort()
        return {
            'pyscript': desplit_fastq.__file__,
            'target': target_pathname,
            'sources': paths,
            'ispaired': sources[0].ispaired,
        }


def make_lane_dict(lib_db, lib_id):
    """
    Convert the lane_set in a lib_db to a dictionary
    indexed by flowcell ID
    """
    result = []
    for lane in lib_db[lib_id]['lane_set']:
        flowcell_id, status = parse_flowcell_id(lane['flowcell'])
        lane['flowcell'] = flowcell_id
        result.append((lane['flowcell'], lane))
    return dict(result)


class SequenceResult(object):
    """Convert the sparql query result from find_archive_sequence_files
    """
    def __init__(self, result):
        self.filenode = result['filenode']
        self._filetype = result['filetype']
        self.cycle = fromTypedNode(result['cycle'])
        self.lane_number = fromTypedNode(result['lane_number'])
        self.read = fromTypedNode(result['read'])
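        # Some stored results appear to encode ?read as a string rather
        # than an integer; normalize those to read 1.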
        if isinstance(self.read, types.StringTypes):
            self.read = 1
        self.library = result['library']
        self.library_id = fromTypedNode(result['library_id'])
        self.flowcell = result['flowcell']
        self.flowcell_id = fromTypedNode(result['flowcell_id'])
        self.flowcell_type = fromTypedNode(result['flowcell_type'])
        self.flowcell_status = fromTypedNode(result['flowcell_status'])

    def _is_good(self):
        """is this sequence / flowcell 'good enough'?"""
        if self.flowcell_status is not None and \
           self.flowcell_status.lower() == "failed":
            return False
        return True
    isgood = property(_is_good)

    def _get_ispaired(self):
        return self.flowcell_type.lower() == "paired"
    ispaired = property(_get_ispaired)

    def _get_filetype(self):
        return stripNamespace(libraryOntology, self._filetype)
    filetype = property(_get_filetype)

    def _get_path(self):
        url = urlparse(str(self.filenode.uri))
        if url.scheme == 'file':
            return url.path
        else:
            errmsg = u"Unsupported scheme {0} for {1}"
            raise ValueError(errmsg.format(url.scheme, unicode(url)))
    path = property(_get_path)