Update fastqname code & test code to better support having optional compression exten...
[htsworkflow.git] / htsworkflow / submission / fastqname.py
1 """Standardize reading and writing fastq submission names.
2 """
3 import collections
4 import re
5 PAIRED_TEMPLATE = '{lib_id}_{flowcell}_c{cycle}_l{lane}_r{read}.fastq{compression_extension}'
6 SINGLE_TEMPLATE = '{lib_id}_{flowcell}_c{cycle}_l{lane}.fastq{compression_extension}'
7
8 FASTQ_RE = re.compile(
9     '(?P<lib_id>[^_]+)_(?P<flowcell>[^_]+)_'\
10     'c(?P<cycle>[\d]+)_l(?P<lane>[\d]+)(_r(?P<read>[\d]))?\.fastq(?P<compression_extension>.[\w]+)?')
11
12 class FastqName(collections.Mapping):
13     """Utility class to convert to the standardized submission fastq name.
14     """
15     def __init__(self, is_paired=None, **kwargs):
16         """Create a fastq name handler.
17
18         Takes filename or common attributes like flowcell, lib_id, lane, read, cycle
19         """
20         self._attributes = ('flowcell', 'lib_id', 'lane', 'read', 'cycle', 'compression_extension')
21         self._is_paired = is_paired
22
23         if len(kwargs) == 0:
24             return
25         if 'filename' in kwargs:
26             self._init_by_filename(**kwargs)
27         else:
28             self._init_by_attributes(**kwargs)
29
30     def _init_by_attributes(self, **kwargs):
31         for k in self._attributes:
32             value = None
33             if k in kwargs:
34                 value = kwargs[k]
35             self[k] = value
36
37     def _init_by_filename(self, filename):
38         match = FASTQ_RE.match(filename)
39         if match is None:
40             raise ValueError('Is "{0}" a submission fastq?'.format(filename))
41
42         for k in self._attributes:
43             self[k] = match.group(k)
44
45     def _get_is_paired(self):
46         if self._is_paired is None:
47             return getattr(self, 'read', None) is not None
48         else:
49             return self._is_paired
50     def _set_is_paired(self, value):
51         self._is_paired = value
52     is_paired = property(_get_is_paired, _set_is_paired)
53
54     def _is_valid(self):
55         if self.is_paired and self['read'] is None:
56             return False
57
58         for k in self.keys():
59             if k == 'read':
60                 continue
61             elif k == 'compression_extension':
62                 if self[k] not in (None, '', '.gz', '.bz2'):
63                     return False
64             elif self[k] is None:
65                 return False
66         return True
67     is_valid = property(_is_valid)
68
69     def _get_filename(self):
70         if not self.is_valid:
71             raise ValueError(
72                 "Please set all needed variables before generating a filename")
73
74         T = PAIRED_TEMPLATE if self.is_paired else SINGLE_TEMPLATE
75         attributes = {}
76         for k in self:
77             v = self[k]
78             attributes[k] = v if v is not None else ''
79         return T.format(**attributes)
80     filename = property(_get_filename)
81
82     def __iter__(self):
83         return iter(self._attributes)
84
85     def __getitem__(self, key):
86         return getattr(self, key, None)
87
88     def __setitem__(self, key, value):
89         if key in self._attributes:
90             setattr(self, key, value)
91         else:
92             raise ValueError("Unrecognized key {0}".format(key))
93
94     def __len__(self):
95         return len([k for k in self if self[k] is not None])