Change unittest2 back into unittest.
[htsworkflow.git] / htsworkflow / pipelines / test / test_sequences.py
1 #!/usr/bin/env python
2 import os
3 import shutil
4 import tempfile
5 from unittest import TestCase
6
7 import RDF
8
9 from htsworkflow.pipelines import sequences
10 from htsworkflow.util.rdfhelp import get_model, load_string_into_model, \
11      rdfNS, libraryOntology, dump_model, fromTypedNode
12
13 class SequenceFileTests(TestCase):
14     """
15     Make sure the sequence archive class works
16     """
17     def test_get_flowcell_cycle(self):
18         tests = [
19             ('/root/42BW9AAXX/C1-152',
20              sequences.FlowcellPath('42BW9AAXX', 1, 152, None)),
21             ('/root/42BW9AAXX/C1-152/',
22              sequences.FlowcellPath('42BW9AAXX', 1, 152, None)),
23             ('/root/42BW9AAXX/C1-152/Project_12345',
24              sequences.FlowcellPath('42BW9AAXX', 1, 152, 'Project_12345')),
25             ('/root/42BW9AAXX/C1-152/Project_12345/',
26              sequences.FlowcellPath('42BW9AAXX', 1, 152, 'Project_12345')),
27         ]
28
29         for t in tests:
30             path = sequences.get_flowcell_cycle(t[0])
31             self.assertEqual(path, t[1])
32
33     def test_flowcell_cycle(self):
34         """
35         Make sure code to parse directory heirarchy works
36         """
37         path = '/root/42BW9AAXX/C1-152'
38         flowcell, start, stop, project = sequences.get_flowcell_cycle(path)
39
40         self.assertEqual(flowcell, '42BW9AAXX')
41         self.assertEqual(start, 1)
42         self.assertEqual(stop, 152)
43         self.assertEqual(project, None)
44
45         path = '/root/42BW9AAXX/other'
46         self.assertRaises(ValueError, sequences.get_flowcell_cycle, path)
47
48     def test_flowcell_project_cycle(self):
49         """
50         Make sure code to parse directory heirarchy works
51         """
52         path = '/root/42BW9AAXX/C1-152/Project_12345_Index1'
53         flowcell, start, stop, project = sequences.get_flowcell_cycle(path)
54
55         self.assertEqual(flowcell, '42BW9AAXX')
56         self.assertEqual(start, 1)
57         self.assertEqual(stop, 152)
58         self.assertEqual(project, 'Project_12345_Index1')
59
60         path = '/root/42BW9AAXX/other'
61         self.assertRaises(ValueError, sequences.get_flowcell_cycle, path)
62
63     def test_srf(self):
64         path = '/root/42BW9AAXX/C1-38'
65         name = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_4.srf'
66         other = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_5.srf'
67         pathname = os.path.join(path,name)
68         f0 = sequences.parse_srf(path, name)
69         f1 = sequences.parse_srf(path, name)
70         fother = sequences.parse_srf(path, other)
71
72         self.assertEqual(f0.filetype, 'srf')
73         self.assertEqual(f0.path, pathname)
74         self.assertEqual(unicode(f0), unicode(pathname))
75         self.assertEqual(repr(f0), "<srf 42BW9AAXX 4 %s>" % (pathname,))
76         self.assertEqual(f0.flowcell, '42BW9AAXX')
77         self.assertEqual(f0.lane, '4')
78         self.assertEqual(f0.read, None)
79         self.assertEqual(f0.pf, None)
80         self.assertEqual(f0.cycle, 38)
81         self.assertEqual(f0.make_target_name('/tmp'),
82                          os.path.join('/tmp', name))
83
84         self.assertEqual(f0, f1)
85         self.assertNotEqual(f0, fother)
86
87
88     def test_qseq(self):
89         path = '/root/42BW9AAXX/C1-36'
90         name = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l4_r1.tar.bz2'
91         other = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l5_r1.tar.bz2'
92         pathname = os.path.join(path,name)
93         f0 = sequences.parse_qseq(path, name)
94         f1 = sequences.parse_qseq(path, name)
95         fother = sequences.parse_qseq(path, other)
96
97         self.assertEqual(f0.filetype, 'qseq')
98         self.assertEqual(f0.path, pathname)
99         self.assertEqual(unicode(f0), unicode(pathname))
100         self.assertEqual(repr(f0), "<qseq 42BW9AAXX 4 %s>" %(pathname,))
101         self.assertEqual(f0.flowcell, '42BW9AAXX')
102         self.assertEqual(f0.lane, '4')
103         self.assertEqual(f0.read, 1)
104         self.assertEqual(f0.pf, None)
105         self.assertEqual(f0.cycle, 36)
106         self.assertEqual(f0.make_target_name('/tmp'),
107                          os.path.join('/tmp', name))
108
109         self.assertEqual(f0, f1)
110         self.assertNotEqual(f0, fother)
111
112         path = '/root/ilmn200901/C1-202'
113         name = 'woldlab_090125_HWI-EAS_0000_ilmn200901_l1_r1.tar.bz2'
114         other = 'woldlab_090125_HWI-EAS_0000_ilmn200901_l1_r2.tar.bz2'
115         pathname = os.path.join(path, name)
116         f0 = sequences.parse_qseq(path, name)
117         f1 = sequences.parse_qseq(path, name)
118         fother = sequences.parse_qseq(path, other)
119
120         self.assertEqual(f0.filetype, 'qseq')
121         self.assertEqual(f0.path, pathname)
122         self.assertEqual(unicode(f0), unicode(pathname))
123         self.assertEqual(repr(f0), "<qseq ilmn200901 1 %s>" %(pathname,))
124         self.assertEqual(f0.lane, '1')
125         self.assertEqual(f0.read, 1)
126         self.assertEqual(f0.pf, None)
127         self.assertEqual(f0.cycle, 202)
128         self.assertEqual(f0.make_target_name('/tmp'),
129                          os.path.join('/tmp', name))
130
131         self.assertEqual(f0, f1)
132         self.assertNotEqual(f0, fother)
133
134     def test_fastq(self):
135         path = '/root/42BW9AAXX/C1-38'
136         name = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l4_r1_pass.fastq.bz2'
137         other = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l5_r1_pass.fastq.bz2'
138         pathname = os.path.join(path,name)
139         f0 = sequences.parse_fastq(path, name)
140         f1 = sequences.parse_fastq(path, name)
141         fother = sequences.parse_fastq(path, other)
142
143         self.assertEqual(f0.filetype, 'fastq')
144         self.assertEqual(f0.path, pathname)
145         self.assertEqual(unicode(f0), unicode(pathname))
146         self.assertEqual(repr(f0), "<fastq 42BW9AAXX 4 %s>" % (pathname,))
147         self.assertEqual(f0.flowcell, '42BW9AAXX')
148         self.assertEqual(f0.lane, '4')
149         self.assertEqual(f0.read, 1)
150         self.assertEqual(f0.pf, True)
151         self.assertEqual(f0.cycle, 38)
152         self.assertEqual(f0.make_target_name('/tmp'),
153                          os.path.join('/tmp', name))
154
155         self.assertEqual(f0, f1)
156         self.assertNotEqual(f0, fother)
157
158         name = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l4_r2_nopass.fastq.bz2'
159         other = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r2_nopass.fastq.bz2'
160         pathname = os.path.join(path,name)
161         f0 = sequences.parse_fastq(path, name)
162         f1 = sequences.parse_fastq(path, name)
163         fother = sequences.parse_fastq(path, other)
164
165         self.assertEqual(f0.filetype, 'fastq')
166         self.assertEqual(f0.path, pathname)
167         self.assertEqual(unicode(f0), unicode(pathname))
168         self.assertEqual(repr(f0), "<fastq 42BW9AAXX 4 %s>" %(pathname,))
169         self.assertEqual(f0.flowcell, '42BW9AAXX')
170         self.assertEqual(f0.lane, '4')
171         self.assertEqual(f0.read, 2)
172         self.assertEqual(f0.pf, False)
173         self.assertEqual(f0.cycle, 38)
174         self.assertEqual(f0.make_target_name('/tmp'),
175                          os.path.join('/tmp', name))
176
177         self.assertEqual(f0, f1)
178         self.assertNotEqual(f0, fother)
179
180     def test_project_fastq(self):
181         path = '/root/42BW9AAXX/C1-38/Project_12345'
182         name = '11111_NoIndex_L001_R1_001.fastq.gz'
183         other = '22222_NoIndex_L001_R1_001.fastq.gz'
184         pathname = os.path.join(path,name)
185         f0 = sequences.parse_fastq(path, name)
186         f1 = sequences.parse_fastq(path, name)
187         fother = sequences.parse_fastq(path, other)
188
189         self.assertEqual(f0.filetype, 'split_fastq')
190         self.assertEqual(f0.path, pathname)
191         self.assertEqual(unicode(f0), unicode(pathname))
192         self.assertEqual(repr(f0), "<split_fastq 42BW9AAXX 1 %s>" %(pathname,))
193         self.assertEqual(f0.flowcell, '42BW9AAXX')
194         self.assertEqual(f0.lane, '1')
195         self.assertEqual(f0.read, 1)
196         self.assertEqual(f0.pf, True)
197         self.assertEqual(f0.project, '11111')
198         self.assertEqual(f0.index, 'NoIndex')
199         self.assertEqual(f0.cycle, 38)
200         self.assertEqual(f0.make_target_name('/tmp'),
201                          os.path.join('/tmp', name))
202
203         self.assertEqual(f0, f1)
204         self.assertNotEqual(f0, fother)
205
206         name = '11112_AAATTT_L001_R2_003.fastq.gz'
207         other = '11112_AAATTT_L002_R2_003.fastq.gz'
208         pathname = os.path.join(path,name)
209         f0 = sequences.parse_fastq(path, name)
210         f1 = sequences.parse_fastq(path, name)
211         fother = sequences.parse_fastq(path, other)
212
213         self.assertEqual(f0.filetype, 'split_fastq')
214         self.assertEqual(f0.path, pathname)
215         self.assertEqual(unicode(f0), unicode(pathname))
216         self.assertEqual(repr(f0), "<split_fastq 42BW9AAXX 1 %s>" % (pathname,))
217         self.assertEqual(f0.flowcell, '42BW9AAXX')
218         self.assertEqual(f0.lane, '1')
219         self.assertEqual(f0.read, 2)
220         self.assertEqual(f0.pf, True)
221         self.assertEqual(f0.project, '11112')
222         self.assertEqual(f0.index, 'AAATTT')
223         self.assertEqual(f0.cycle, 38)
224         self.assertEqual(f0.make_target_name('/tmp'),
225                          os.path.join('/tmp', name))
226
227         self.assertEqual(f0, f1)
228         self.assertNotEqual(f0, fother)
229
230     def test_parse_fastq_pf_flag(self):
231         other = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r2_nopass.fastq.bz2'
232         data = ['woldlab', '090622', 'HWI-EAS229', '0120', '42BW9AAXX',
233                 'l1', 'r2', 'nopass']
234         self.assertEqual(sequences.parse_fastq_pf_flag(data), False)
235
236         data = ['woldlab', '090622', 'HWI-EAS229', '0120', '42BW9AAXX',
237                 'l1', 'r2', 'pass']
238         self.assertEqual(sequences.parse_fastq_pf_flag(data), True)
239
240         data = ['woldlab', '090622', 'HWI-EAS229', '0120', '42BW9AAXX',
241                 'l1', 'r2', 'all']
242         self.assertEqual(sequences.parse_fastq_pf_flag(data), None)
243
244         data = ['woldlab', '090622', 'HWI-EAS229', '0120', '42BW9AAXX',
245                 'l1', 'r2']
246         self.assertEqual(sequences.parse_fastq_pf_flag(data), None)
247
248         data = ['woldlab', '090622', 'HWI-EAS229', '0120', '42BW9AAXX',
249                 'l1', 'r2', 'all', 'newthing']
250         self.assertRaises(ValueError, sequences.parse_fastq_pf_flag, data)
251
252
253     def test_project_fastq_hashing(self):
254         """Can we tell the difference between sequence files?
255         """
256         path = '/root/42BW9AAXX/C1-38/Project_12345'
257         names = [('11111_NoIndex_L001_R1_001.fastq.gz',
258                   '11111_NoIndex_L001_R2_001.fastq.gz'),
259                  ('11112_NoIndex_L001_R1_001.fastq.gz',
260                   '11112_NoIndex_L001_R1_002.fastq.gz')
261                  ]
262         for a_name, b_name in names:
263             a = sequences.parse_fastq(path, a_name)
264             b = sequences.parse_fastq(path, b_name)
265             self.assertNotEqual(a, b)
266             self.assertNotEqual(a.key(), b.key())
267             self.assertNotEqual(hash(a), hash(b))
268
269     def test_eland(self):
270         path = '/root/42BW9AAXX/C1-38'
271         name = 's_4_eland_extended.txt.bz2'
272         pathname = os.path.join(path,name)
273         f = sequences.parse_eland(path, name)
274
275         self.assertEqual(f.filetype, 'eland')
276         self.assertEqual(f.path, pathname)
277         self.assertEqual(f.flowcell, '42BW9AAXX')
278         self.assertEqual(f.lane, '4')
279         self.assertEqual(f.read, None)
280         self.assertEqual(f.pf, None)
281         self.assertEqual(f.cycle, 38)
282         self.assertEqual(f.make_target_name('/tmp'),
283                          '/tmp/42BW9AAXX_38_s_4_eland_extended.txt.bz2')
284
285         path = '/root/42BW9AAXX/C1-152'
286         name = 's_4_1_eland_extended.txt.bz2'
287         pathname = os.path.join(path,name)
288         f = sequences.parse_eland(path, name)
289
290         self.assertEqual(f.filetype, 'eland')
291         self.assertEqual(f.path, pathname)
292         self.assertEqual(f.flowcell, '42BW9AAXX')
293         self.assertEqual(f.lane, '4')
294         self.assertEqual(f.read, 1)
295         self.assertEqual(f.pf, None)
296         self.assertEqual(f.cycle, 152)
297         self.assertEqual(f.make_target_name('/tmp'),
298                          '/tmp/42BW9AAXX_152_s_4_1_eland_extended.txt.bz2')
299
300     def _generate_sequences(self):
301         seqs = []
302         data = [('/root/42BW9AAXX/C1-152',
303                 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r1.tar.bz2'),
304                 ('/root/42BW9AAXX/C1-152',
305                 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r2.tar.bz2'),
306                 ('/root/42BW9AAXX/C1-152',
307                 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l2_r1.tar.bz2'),
308                 ('/root/42BW9AAXX/C1-152',
309                 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l2_r21.tar.bz2'),]
310
311         for path, name in data:
312             seqs.append(sequences.parse_qseq(path, name))
313
314         path = '/root/42BW9AAXX/C1-38/Project_12345'
315         name = '12345_AAATTT_L003_R1_001.fastq.gz'
316         pathname = os.path.join(path,name)
317         seqs.append(sequences.parse_fastq(path, name))
318         self.assertEqual(len(seqs), 5)
319         return seqs
320
321
322     def test_sql(self):
323         """
324         Make sure that the quick and dirty sql interface in sequences works
325         """
326         import sqlite3
327         db = sqlite3.connect(":memory:")
328         c = db.cursor()
329         sequences.create_sequence_table(c)
330
331         for seq in self._generate_sequences():
332             seq.save_to_sql(c)
333
334         count = c.execute("select count(*) from sequences")
335         row = count.fetchone()
336         self.assertEqual(row[0], 5)
337
338     def test_basic_rdf_scan(self):
339         """Make sure we can save to RDF model"""
340         import RDF
341         model = get_model()
342
343         for seq in self._generate_sequences():
344             seq.save_to_model(model)
345
346         files = list(model.find_statements(
347             RDF.Statement(None,
348                           rdfNS['type'],
349                           libraryOntology['IlluminaResult'])))
350         self.assertEqual(len(files), 5)
351         files = list(model.find_statements(
352             RDF.Statement(None,
353                           libraryOntology['file_type'],
354                           libraryOntology['qseq'])))
355         self.assertEqual(len(files), 4)
356         files = list(model.find_statements(
357             RDF.Statement(None,
358                           libraryOntology['file_type'],
359                           libraryOntology['split_fastq'])))
360         self.assertEqual(len(files), 1)
361
362         files = list(model.find_statements(
363             RDF.Statement(None, libraryOntology['library_id'], None)))
364         self.assertEqual(len(files), 1)
365
366         files = list(model.find_statements(
367             RDF.Statement(None, libraryOntology['flowcell_id'], None)))
368         self.assertEqual(len(files), 5)
369
370         files = list(model.find_statements(
371             RDF.Statement(None, libraryOntology['flowcell'], None)))
372         self.assertEqual(len(files), 0)
373
374         files = list(model.find_statements(
375             RDF.Statement(None, libraryOntology['library'], None)))
376         self.assertEqual(len(files), 0)
377
378     def test_rdf_scan_with_url(self):
379         """Make sure we can save to RDF model"""
380         import RDF
381         model = get_model()
382         base_url = 'http://localhost'
383         for seq in self._generate_sequences():
384             seq.save_to_model(model, base_url=base_url)
385         localFC = RDF.NS(base_url + '/flowcell/')
386         localLibrary = RDF.NS(base_url + '/library/')
387
388         files = list(model.find_statements(
389             RDF.Statement(None, libraryOntology['flowcell'], None)))
390         self.assertEqual(len(files), 5)
391         for f in files:
392             self.assertEqual(f.object, localFC['42BW9AAXX/'])
393
394         files = list(model.find_statements(
395             RDF.Statement(None, libraryOntology['library'], None)))
396         self.assertEqual(len(files), 1)
397         self.assertEqual(files[0].object, localLibrary['12345'])
398
399     def test_rdf_fixup_library(self):
400         """Make sure we can save to RDF model"""
401         base_url = 'http://localhost'
402         localLibrary = RDF.NS(base_url + '/library/')
403
404         flowcellInfo = """@prefix libns: <http://jumpgate.caltech.edu/wiki/LibraryOntology#> .
405
406 <{base}/flowcell/42BW9AAXX/>
407     libns:flowcell_id "42BW9AXX"@en ;
408     libns:has_lane <{base}/lane/1169>, <{base}/lane/1170>,
409                    <{base}/lane/1171>, <{base}/lane/1172> ;
410     libns:read_length 75 ;
411     a libns:IlluminaFlowcell .
412
413 <{base}/lane/1169>
414     libns:lane_number "1" ; libns:library <{base}/library/10923/> .
415 <{base}/lane/1170>
416     libns:lane_number "2" ; libns:library <{base}/library/10924/> .
417 <{base}/lane/1171>
418     libns:lane_number "3" ; libns:library <{base}/library/12345/> .
419 <{base}/lane/1172>
420     libns:lane_number "3" ; libns:library <{base}/library/10930/> .
421 """.format(base=base_url)
422         model = get_model()
423         load_string_into_model(model, 'turtle', flowcellInfo)
424         for seq in self._generate_sequences():
425             seq.save_to_model(model)
426         f = sequences.update_model_sequence_library(model, base_url=base_url)
427
428         libTerm = libraryOntology['library']
429         libIdTerm = libraryOntology['library_id']
430
431         url = 'file:///root/42BW9AAXX/C1-152/woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r2.tar.bz2'
432         nodes = list(model.get_targets(RDF.Uri(url), libTerm))
433         self.assertEqual(len(nodes), 1)
434         self.assertEqual(nodes[0], localLibrary['10923/'])
435         nodes = list(model.get_targets(RDF.Uri(url), libIdTerm))
436         self.assertEqual(len(nodes), 1)
437         self.assertEqual(fromTypedNode(nodes[0]), '10923')
438
439         url = 'file:///root/42BW9AAXX/C1-152/woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l2_r1.tar.bz2'
440         nodes = list(model.get_targets(RDF.Uri(url), libTerm))
441         self.assertEqual(len(nodes), 1)
442         self.assertEqual(nodes[0], localLibrary['10924/'])
443         nodes = list(model.get_targets(RDF.Uri(url), libIdTerm))
444         self.assertEqual(len(nodes), 1)
445         self.assertEqual(fromTypedNode(nodes[0]), '10924')
446
447         url = 'file:///root/42BW9AAXX/C1-38/Project_12345/12345_AAATTT_L003_R1_001.fastq.gz'
448         nodes = list(model.get_targets(RDF.Uri(url), libTerm))
449         self.assertEqual(len(nodes), 1)
450         self.assertEqual(nodes[0], localLibrary['12345/'])
451         nodes = list(model.get_targets(RDF.Uri(url), libIdTerm))
452         self.assertEqual(len(nodes), 1)
453         self.assertEqual(fromTypedNode(nodes[0]), '12345')
454
455     def test_load_from_model(self):
456         """Can we round trip through a RDF model"""
457         model = get_model()
458         path = '/root/42BW9AAXX/C1-38/Project_12345/'
459         filename = '12345_AAATTT_L003_R1_001.fastq.gz'
460         seq = sequences.parse_fastq(path, filename)
461         seq.save_to_model(model)
462
463         seq_id = 'file://'+path+filename
464         seqNode = RDF.Node(RDF.Uri(seq_id))
465         libNode = RDF.Node(RDF.Uri('http://localhost/library/12345'))
466         model.add_statement(
467             RDF.Statement(seqNode, libraryOntology['library'], libNode))
468         seq2 = sequences.SequenceFile.load_from_model(model, seq_id)
469
470         self.assertEqual(seq.flowcell, seq2.flowcell)
471         self.assertEqual(seq.flowcell, '42BW9AAXX')
472         self.assertEqual(seq.filetype, seq2.filetype)
473         self.assertEqual(seq2.filetype, 'split_fastq')
474         self.assertEqual(seq.lane, seq2.lane)
475         self.assertEqual(seq2.lane, '3')
476         self.assertEqual(seq.read, seq2.read)
477         self.assertEqual(seq2.read, 1)
478         self.assertEqual(seq.project, seq2.project)
479         self.assertEqual(seq2.project, '12345')
480         self.assertEqual(seq.index, seq2.index)
481         self.assertEqual(seq2.index, 'AAATTT')
482         self.assertEqual(seq.split, seq2.split)
483         self.assertEqual(seq2.split, '001')
484         self.assertEqual(seq.cycle, seq2.cycle)
485         self.assertEqual(seq.pf, seq2.pf)
486         self.assertEqual(seq2.libraryNode, libNode)
487         self.assertEqual(seq.path, seq2.path)
488
489     def test_scan_for_sequences(self):
490         # simulate tree
491         file_types_seen = set()
492         file_types_to_see = set(['fastq', 'srf', 'eland', 'qseq'])
493         lanes = set()
494         lanes_to_see = set(('1','2','3'))
495         with SimulateSimpleTree() as tree:
496             seqs = sequences.scan_for_sequences([tree.root, '/a/b/c/98345'])
497             for s in seqs:
498                 self.assertEquals(s.flowcell, '42BW9AAXX')
499                 self.assertEquals(s.cycle, 33)
500                 self.assertEquals(s.project, None)
501                 lanes.add(s.lane)
502                 file_types_seen.add(s.filetype)
503
504             self.assertEquals(len(seqs), 8)
505
506         self.assertEqual(lanes, lanes_to_see)
507         self.assertEqual(file_types_to_see, file_types_seen)
508         self.assertRaises(ValueError, sequences.scan_for_sequences, '/tmp')
509
510     def test_scan_for_hiseq_sequences(self):
511         # simulate tree
512         file_types_seen = set()
513         file_types_to_see = set(['split_fastq'])
514         lanes = set()
515         lanes_to_see = set(('1','2'))
516         projects_seen = set()
517         projects_to_see = set(('11111', '21111', '31111'))
518         with SimulateHiSeqTree() as tree:
519             seqs = sequences.scan_for_sequences([tree.root, '/a/b/c/98345'])
520             for s in seqs:
521                 self.assertEquals(s.flowcell, 'C02AAACXX')
522                 self.assertEquals(s.cycle, 101)
523                 lanes.add(s.lane)
524                 file_types_seen.add(s.filetype)
525                 projects_seen.add(s.project)
526
527             self.assertEquals(len(seqs), 12)
528
529         self.assertEqual(lanes, lanes_to_see)
530         self.assertEqual(file_types_to_see, file_types_seen)
531         self.assertEqual(projects_to_see, projects_seen)
532         # make sure we require a list, and not the confusing iterating over
533         # a string
534         self.assertRaises(ValueError, sequences.scan_for_sequences, '/tmp')
535
class SimulateTree(object):
    """
    Context manager base class for building throwaway directory trees

    The methods read self.root, which this class never assigns; it has to
    be set (for example by a subclass __init__) before they are used.
    Leaving the with-block removes self.root and everything below it.
    """
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # returns None, so an exception raised in the with-block propagates
        shutil.rmtree(self.root)

    def mkflowcell(self, *components):
        """
        Create nested directories below self.root, one per component

        Directories that already exist are reused.
        Returns the path of the innermost directory.
        """
        head = self.root
        for c in components:
            # os.path.join discards head when c is absolute, so passing
            # self.root as the first component is harmless
            head = os.path.join(head, c)
            if not os.path.exists(head):
                os.mkdir(head)
        return head

    def mkfile(self, flowcell, filename):
        """
        Write a small file named filename into the directory flowcell

        The file contains its own pathname followed by os.linesep.
        """
        pathname = os.path.join(flowcell, filename)
        # the with-block closes the file even when a write fails
        with open(pathname, 'w') as stream:
            stream.write(pathname)
            stream.write(os.linesep)
557
class SimulateHiSeqTree(SimulateTree):
    """
    Temporary tree holding the cycle directory C02AAACXX/C1-101

    Fastq files are written into Project_* subdirectories and export
    files directly into the cycle directory.
    """
    def __init__(self):
        self.root = tempfile.mkdtemp(prefix='sequences_')

        # (subdirectory of the cycle directory, file name)
        fastq_files = [
            ('Project_11111', '11111_AAGGCC_L001_R1_001.fastq.gz'),
            ('Project_11111', '11111_AAGGCC_L001_R1_002.fastq.gz'),
            ('Project_11111', '11111_AAGGCC_L001_R2_001.fastq.gz'),
            ('Project_11111', '11111_AAGGCC_L001_R2_002.fastq.gz'),
            ('Project_21111', '21111_TTTTTT_L001_R1_001.fastq.gz'),
            ('Project_21111', '21111_TTTTTT_L001_R1_002.fastq.gz'),
            ('Project_21111', '21111_TTTTTT_L001_R2_001.fastq.gz'),
            ('Project_21111', '21111_TTTTTT_L001_R2_002.fastq.gz'),
            ('Project_31111', '31111_NoIndex_L002_R1_001.fastq.gz'),
            ('Project_31111', '31111_NoIndex_L002_R1_002.fastq.gz'),
            ('Project_31111', '31111_NoIndex_L002_R2_001.fastq.gz'),
            ('Project_31111', '31111_NoIndex_L002_R2_002.fastq.gz'),
        ]
        # '.' places these in the cycle directory itself
        export_files = [
            ('.', '11111_AAGGCC_L001_R1_001_export.txt.gz'),
            ('.', '11111_AAGGCC_L001_R1_002_export.txt.gz'),
            ('.', '11111_AAGGCC_L001_R2_001_export.txt.gz'),
            ('.', '11111_AAGGCC_L001_R2_002_export.txt.gz'),
            ('.', '21111_AAGGCC_L001_R1_001_export.txt.gz'),
            ('.', '21111_AAGGCC_L001_R1_002_export.txt.gz'),
            ('.', '21111_AAGGCC_L001_R2_001_export.txt.gz'),
            ('.', '21111_AAGGCC_L001_R2_002_export.txt.gz'),
            ('.', '31111_NoIndex_L002_R1_001_export.txt.gz'),
            ('.', '31111_NoIndex_L002_R1_002_export.txt.gz'),
            ('.', '31111_NoIndex_L002_R2_001_export.txt.gz'),
            ('.', '31111_NoIndex_L002_R2_002_export.txt.gz'),
        ]
        for subdir, filename in fastq_files + export_files:
            target = self.mkflowcell(
                self.root, 'C02AAACXX', 'C1-101', subdir)
            self.mkfile(target, filename)
591
class SimulateSimpleTree(SimulateTree):
    """
    Temporary tree holding the cycle directory 42BW9AAXX/C1-33

    The directory is filled with qseq, srf, fastq and eland file names
    plus some .md5 companions.
    """
    def __init__(self):
        self.root = tempfile.mkdtemp(prefix='sequences_')

        cycle_dir = self.mkflowcell(self.root, '42BW9AAXX', 'C1-33')
        filenames = (
            'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r1.tar.bz2',
            'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r2.tar.bz2',
            'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r1.tar.bz2.md5',
            'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l1_r2.tar.bz2.md5',
            'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_2.srf',
            'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l3_r1_pass.fastq.bz2',
            'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l3_r2_pass.fastq.bz2',
            'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l3_r1_nopass.fastq.bz2',
            'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l3_r2_nopass.fastq.bz2',
            's_1_eland_extended.txt.bz2',
            's_1_eland_extended.txt.bz2.md5',
        )
        for filename in filenames:
            self.mkfile(cycle_dir, filename)
612
613
def suite():
    """Return a TestSuite holding every test in SequenceFileTests"""
    from unittest import TestSuite, defaultTestLoader
    collected = TestSuite()
    loaded = defaultTestLoader.loadTestsFromTestCase(SequenceFileTests)
    collected.addTests(loaded)
    return collected
619
620
# run the tests collected by suite() when executed as a script
if __name__ == "__main__":
    import unittest
    unittest.main(defaultTest="suite")