"""
Analyze ELAND files
"""
-
+import collections
from glob import glob
import logging
import os
XML_VERSION = 2
LANE = 'ResultLane'
- def __init__(self, pathname=None, lane_id=None, end=None, xml=None):
- self.pathname = pathname
+ def __init__(self, pathnames=None, lane_id=None, end=None, xml=None):
+ self.pathnames = pathnames
self._sample_name = None
self.lane_id = lane_id
self.end = end
def _update_name(self):
# extract the sample name
- if self.pathname is None:
+ if self.pathnames is None or len(self.pathnames) == 0:
return
- path, name = os.path.split(self.pathname)
- split_name = name.split('_')
- self._sample_name = split_name[0]
+ sample_names = set()
+ for pathname in self.pathnames:
+ path, name = os.path.split(pathname)
+ split_name = name.split('_')
+ sample_names.add(split_name[0])
+ if len(sample_names) > 1:
+ errmsg = "Attempting to update from more than one sample %s"
+ raise RuntimeError(errmsg % (",".join(sample_names)))
+ self._sample_name = sample_names.pop()
+ return self._sample_name
def _get_sample_name(self):
if self._sample_name is None:
SCORE_QC = 1
SCORE_READ = 2
- def __init__(self, pathname=None, lane_id=None, end=None, genome_map=None, eland_type=None, xml=None):
- super(ElandLane, self).__init__(pathname, lane_id, end)
+ def __init__(self, pathnames=None, lane_id=None, end=None, genome_map=None, eland_type=None, xml=None):
+ super(ElandLane, self).__init__(pathnames, lane_id, end)
self._mapped_reads = None
self._match_codes = None
Actually read the file and actually count the reads
"""
# can't do anything if we don't have a file to process
- if self.pathname is None:
+ if self.pathnames is None or len(self.pathnames) == 0:
return
- self._guess_eland_type(self.pathname)
+ pathname = self.pathnames[-1]
+ self._guess_eland_type(pathname)
- if os.stat(self.pathname)[stat.ST_SIZE] == 0:
+ if os.stat(pathname)[stat.ST_SIZE] == 0:
raise RuntimeError("Eland isn't done, try again later.")
- LOGGER.debug("summarizing results for %s" % (self.pathname))
+ LOGGER.debug("summarizing results for %s" % (pathname))
+ self._match_codes = MatchCodes()
+ self._mapped_reads = MappedReads()
+ self._reads = 0
+
+ for pathname in self.pathnames:
+ stream = autoopen(pathname, 'r')
+ if self.eland_type == ELAND_SINGLE:
+ result = self._update_eland_result(stream)
+ elif self.eland_type == ELAND_MULTI or \
+ self.eland_type == ELAND_EXTENDED:
+ result = self._update_eland_multi(stream)
+ elif self.eland_type == ELAND_EXPORT:
+ result = self._update_eland_export(stream)
+ else:
+ errmsg = "Only support single/multi/extended eland files"
+ raise NotImplementedError(errmsg)
+ stream.close()
- stream = autoopen(self.pathname, 'r')
- if self.eland_type == ELAND_SINGLE:
- result = self._update_eland_result(stream)
- elif self.eland_type == ELAND_MULTI or \
- self.eland_type == ELAND_EXTENDED:
- result = self._update_eland_multi(stream)
- elif self.eland_type == ELAND_EXPORT:
- result = self._update_eland_export(stream)
- else:
- raise NotImplementedError("Only support single/multi/extended eland files")
- self._match_codes, self._mapped_reads, self._reads = result
+ match, mapped, reads = result
+ self._match_codes += match
+ self._mapped_reads += mapped
+ self._reads += reads
def _update_eland_result(self, instream):
reads = 0
- mapped_reads = {}
+ mapped_reads = MappedReads()
+ match_codes = MatchCodes()
- match_codes = {'NM':0, 'QC':0, 'RM':0,
- 'U0':0, 'U1':0, 'U2':0,
- 'R0':0, 'R1':0, 'R2':0,
- }
for line in instream:
reads += 1
fields = line.split()
MATCH_INDEX = 2
LOCATION_INDEX = 3
reads = 0
- mapped_reads = {}
+ mapped_reads = MappedReads()
+ match_codes = MatchCodes()
- match_codes = {'NM':0, 'QC':0, 'RM':0,
- 'U0':0, 'U1':0, 'U2':0,
- 'R0':0, 'R1':0, 'R2':0,
- }
for line in instream:
reads += 1
fields = line.split()
LOCATION_INDEX = 10
DESCRIPTOR_INDEX= 13
reads = 0
- mapped_reads = {}
-
- match_codes = {'NM':0, 'QC':0, 'RM':0,
- 'U0':0, 'U1':0, 'U2':0,
- 'R0':0, 'R1':0, 'R2':0,
- }
+ mapped_reads = MappedReads()
+ match_codes = MatchCodes()
for line in instream:
reads += 1
groups = ElandLane.MATCH_COUNTS_RE.match(match)
if groups is None:
# match is not of the form [\d]+:[\d]+:[\d]+
- if match_codes.has_key(match):
+ if match in match_codes:
# match is one quality control codes QC/NM etc
match_codes[match] += 1
return ElandLane.SCORE_QC
else:
LOGGER.warn("ElandLane unrecognized tag %s" % (element.tag,))
+
+class MatchCodes(collections.MutableMapping):
+    """Mapping of eland match-code counts (NM, QC, RM, U0-U2, R0-R2).
+
+    The key set is fixed; supports combining two count sets with +.
+    """
+    def __init__(self, initializer=None):
+        self.match_codes = {'NM':0, 'QC':0, 'RM':0,
+                            'U0':0, 'U1':0, 'U2':0,
+                            'R0':0, 'R1':0, 'R2':0,
+                            }
+
+        if initializer is not None:
+            if not isinstance(initializer, collections.Mapping):
+                raise ValueError("Expected dictionary like class")
+            for key in initializer:
+                if key not in self.match_codes:
+                    errmsg = "Initializer can only contain: %s"
+                    raise ValueError(errmsg % (",".join(self.match_codes.keys())))
+                self.match_codes[key] += initializer[key]
+
+    def __iter__(self):
+        return iter(self.match_codes)
+
+    def __delitem__(self, key):
+        # the set of match codes is fixed, so deleting one makes no sense
+        raise RuntimeError("delete not allowed")
+
+    def __getitem__(self, key):
+        return self.match_codes[key]
+
+    def __setitem__(self, key, value):
+        if key not in self.match_codes:
+            errmsg = "Unrecognized key, allowed values are: %s"
+            raise ValueError(errmsg % (",".join(self.match_codes.keys())))
+        self.match_codes[key] = value
+
+    def __len__(self):
+        return len(self.match_codes)
+
+    def __add__(self, other):
+        if not isinstance(other, MatchCodes):
+            # bug fix: use % interpolation; a comma here passed the type
+            # name as a second exception argument instead of putting it
+            # into the message
+            raise ValueError("Expected a MatchCodes, got %s" % (str(type(other)),))
+
+        newobj = MatchCodes(self)
+        for key, value in other.items():
+            newobj[key] = self.get(key, 0) + other[key]
+
+        return newobj
+
+
+class MappedReads(collections.MutableMapping):
+    """Mapping of reference sequence name to mapped read count.
+
+    Unlike MatchCodes the key set is open; supports combining two
+    sets of counts with +.
+    """
+    def __init__(self, initializer=None):
+        self.mapped_reads = {}
+
+        if initializer is not None:
+            if not isinstance(initializer, collections.Mapping):
+                raise ValueError("Expected dictionary like class")
+            for key in initializer:
+                self[key] = self.setdefault(key, 0) + initializer[key]
+
+    def __iter__(self):
+        return iter(self.mapped_reads)
+
+    def __delitem__(self, key):
+        del self.mapped_reads[key]
+
+    def __getitem__(self, key):
+        return self.mapped_reads[key]
+
+    def __setitem__(self, key, value):
+        self.mapped_reads[key] = value
+
+    def __len__(self):
+        return len(self.mapped_reads)
+
+    def __add__(self, other):
+        if not isinstance(other, MappedReads):
+            # bug fix: use % interpolation; a comma here passed the type
+            # name as a second exception argument instead of putting it
+            # into the message
+            raise ValueError("Expected a MappedReads, got %s" % (str(type(other)),))
+
+        newobj = MappedReads(self)
+        for key in other:
+            newobj[key] = self.get(key, 0) + other[key]
+
+        return newobj
+
class SequenceLane(ResultLane):
XML_VERSION=1
LANE = 'SequenceLane'
Actually read the file and actually count the reads
"""
# can't do anything if we don't have a file to process
- if self.pathname is None:
+ if self.pathnames is None:
return
- if os.stat(self.pathname)[stat.ST_SIZE] == 0:
+ pathname = self.pathnames[-1]
+ if os.stat(pathname)[stat.ST_SIZE] == 0:
raise RuntimeError("Sequencing isn't done, try again later.")
- self._guess_sequence_type(self.pathname)
+ self._guess_sequence_type(pathname)
- LOGGER.info("summarizing results for %s" % (self.pathname))
+ LOGGER.info("summarizing results for %s" % (pathname))
lines = 0
- f = open(self.pathname)
+ f = open(pathname)
for l in f.xreadlines():
lines += 1
f.close()
elif self.sequence_type == SequenceLane.FASTQ_TYPE:
self._reads = lines / 4
else:
- raise NotImplementedError("This only supports scarf or fastq squence files")
+        errmsg = "This only supports scarf or fastq sequence files"
+ raise NotImplementedError(errmsg)
def get_elements(self):
lane = ElementTree.Element(SequenceLane.LANE,
self.results[end][lane_id] = lane
def check_for_eland_file(basedir, pattern, lane_id, end):
-    if end is None:
-        full_lane_id = lane_id
-    else:
-        full_lane_id = "%d_%d" % ( lane_id, end )
-
-    basename = pattern % (full_lane_id,)
-    LOGGER.debug("Eland pattern: %s" %(basename,))
-    pathname = os.path.join(basedir, basename)
-    if os.path.exists(pathname):
-        LOGGER.info('found eland file in %s' % (pathname,))
-        return pathname
-    else:
-        return None
-
-def update_result_with_eland(gerald, results, lane_id, end, pathname, genome_maps):
+    # pattern is now a regular expression template; substitute lane and
+    # end into it and collect every matching file, since CASAVA can
+    # split one lane's results across several numbered files.
+    # NOTE(review): relies on "import re" at module scope -- confirm it
+    # is present, it is not among the imports touched by this patch.
+    eland_files = []
+    eland_pattern = pattern % (lane_id, end)
+    eland_re = re.compile(eland_pattern)
+    LOGGER.debug("Eland pattern: %s" % (eland_pattern,))
+    for filename in os.listdir(basedir):
+        if eland_re.match(filename):
+            LOGGER.info('found eland file %s' % (filename,))
+            eland_files.append(os.path.join(basedir, filename))
+
+    # os.listdir order is arbitrary; sort so multi-part files are
+    # processed in a deterministic order
+    eland_files.sort()
+    return eland_files
+
+def update_result_with_eland(gerald, results, lane_id, end, pathnames, genome_maps):
# yes the lane_id is also being computed in ElandLane._update
# I didn't want to clutter up my constructor
# but I needed to persist the sample_name/lane_id for
# runfolder summary_report
- path, name = os.path.split(pathname)
- LOGGER.info("Adding eland file %s" %(name,))
- # split_name = name.split('_')
- # lane_id = int(split_name[1])
+ names = [ os.path.split(p)[1] for p in pathnames]
+ LOGGER.info("Adding eland files %s" %(",".join(names),))
genome_map = {}
if genome_maps is not None:
if genome_dir is not None:
genome_map = build_genome_fasta_map(genome_dir)
- lane = ElandLane(pathname, lane_id, end, genome_map)
+ lane = ElandLane(pathnames, lane_id, end, genome_map)
if end is None:
effective_end = 0
if os.path.isdir(basedir_temp):
basedirs.append(basedir_temp)
+ # So how about scanning for Project*/Sample* directories as well
+ sample_pattern = os.path.join(gerald_dir, 'Project_*', 'Sample_*')
+ basedirs.extend(glob(sample_pattern))
# the order in patterns determines the preference for what
# will be found.
MAPPED_ELAND = 0
SEQUENCE = 1
- patterns = [('s_%s_eland_result.txt', MAPPED_ELAND),
- ('s_%s_eland_result.txt.bz2', MAPPED_ELAND),
- ('s_%s_eland_result.txt.gz', MAPPED_ELAND),
- ('s_%s_eland_extended.txt', MAPPED_ELAND),
- ('s_%s_eland_extended.txt.bz2', MAPPED_ELAND),
- ('s_%s_eland_extended.txt.gz', MAPPED_ELAND),
- ('s_%s_eland_multi.txt', MAPPED_ELAND),
- ('s_%s_eland_multi.txt.bz2', MAPPED_ELAND),
- ('s_%s_eland_multi.txt.gz', MAPPED_ELAND),
- ('s_%s_export.txt', MAPPED_ELAND),
- ('s_%s_export.txt.bz2', MAPPED_ELAND),
- ('s_%s_export.txt.gz', MAPPED_ELAND),
- ('s_%s_sequence.txt', SEQUENCE),]
+ patterns = [
+ ('(?P<sampleId>[^_]+)_(?P<index>(NoIndex|[AGCT])+)_L00%s(_R%s)_(?P<part>[\d]+)_export.txt(?P<ext>(\.bz2|\.gz)?)', MAPPED_ELAND),
+ ('s_(?P<lane>%s)(_(?P<end>%s))?_eland_result.txt(?P<ext>(\.bz2|\.gz)?)',
+ MAPPED_ELAND),
+ ('s_(?P<lane>%s)(_(?P<end>%s))?_eland_extended.txt(?P<ext>(\.bz2|\.gz)?)',
+ MAPPED_ELAND),
+ ('s_(?P<lane>%s)(_(?P<end>%s))?_eland_multi.txt(?P<ext>(\.bz2|\.gz)?)',
+ MAPPED_ELAND),
+ ('s_(?P<lane>%s)(_(?P<end>%s))?_export.txt(?P<ext>(\.bz2|\.gz)?)',
+ MAPPED_ELAND),
+ ('s_(?P<lane>%s)(_(?P<end>%s))?_sequence.txt(?P<ext>(\.bz2|\.gz)?)',
+ SEQUENCE),
+
+ #('s_%s_eland_result.txt', MAPPED_ELAND),
+ #('s_%s_eland_result.txt.bz2', MAPPED_ELAND),
+ #('s_%s_eland_result.txt.gz', MAPPED_ELAND),
+ #('s_%s_eland_extended.txt', MAPPED_ELAND),
+ #('s_%s_eland_extended.txt.bz2', MAPPED_ELAND),
+ #('s_%s_eland_extended.txt.gz', MAPPED_ELAND),
+ #('s_%s_eland_multi.txt', MAPPED_ELAND),
+ #('s_%s_eland_multi.txt.bz2', MAPPED_ELAND),
+ #('s_%s_eland_multi.txt.gz', MAPPED_ELAND),
+ #('s_%s_export.txt', MAPPED_ELAND),
+ #('s_%s_export.txt.bz2', MAPPED_ELAND),
+ #('s_%s_export.txt.gz', MAPPED_ELAND),
+ #('s_%s_sequence.txt', SEQUENCE),
+ ]
for basedir in basedirs:
for end in ends:
for lane_id in lane_ids:
for p in patterns:
- pathname = check_for_eland_file(basedir, p[0], lane_id, end)
- if pathname is not None:
+ pathnames = check_for_eland_file(basedir, p[0], lane_id, end)
+ if len(pathnames) > 0:
if p[1] == MAPPED_ELAND:
- update_result_with_eland(gerald, e.results, lane_id, end, pathname, genome_maps)
+ update_result_with_eland(gerald, e.results, lane_id, end, pathnames, genome_maps)
elif p[1] == SEQUENCE:
- update_result_with_sequence(gerald, e.results, lane_id, end, pathname)
+ update_result_with_sequence(gerald, e.results, lane_id, end, pathnames)
break
else:
LOGGER.debug("No eland file found in %s for lane %s and end %s" %(basedir, lane_id, end))
from optparse import OptionParser
parser = OptionParser("%prog: <gerald dir>+")
opts, args = parser.parse_args(cmdline)
- LOGGER.basicConfig(level=logging.DEBUG)
+ logging.basicConfig(level=logging.DEBUG)
for a in args:
LOGGER.info("Starting scan of %s" % (a,))
e = eland(a)
if xml is not None:
self.set_elements(xml)
+    def _get_date(self):
+        """Timestamp for this analysis: the source file's mtime when known.
+
+        Falls back to the current date when no pathname is available.
+        """
+        if self.pathname is None:
+            return datetime.today()
+        mtime = os.stat(self.pathname)[stat.ST_MTIME]
+        return datetime.fromtimestamp(mtime)
+
def _get_time(self):
return time.mktime(self.date.timetuple())
time = property(_get_time, doc='return run time as seconds since epoch')
print 'config.xml:', self.tree
self.summary.dump()
- def get_elements(self):
+ def get_elements(self, root_tag):
if self.tree is None or self.summary is None:
return None
- gerald = ElementTree.Element(Gerald.GERALD,
+ gerald = ElementTree.Element(root_tag,
{'version': unicode(Gerald.XML_VERSION)})
gerald.append(self.tree)
gerald.append(self.summary.get_elements())
gerald.append(self.eland_results.get_elements())
return gerald
- def set_elements(self, tree):
- if tree.tag != self.__class__.GERALD:
- raise ValueError('expected GERALD')
+ def set_elements(self, tree, root_tag):
+ if tree.tag != root_tag:
+ raise ValueError('expected %s' % (self.__class__.GERALD,))
xml_version = int(tree.attrib.get('version', 0))
if xml_version > Gerald.XML_VERSION:
LOGGER.warn('XML tree is a higher version than this class')
def _get_date(self):
if self.tree is None:
return datetime.today()
+
timestamp = self.tree.findtext('ChipWideRunParameters/TIME_STAMP')
if timestamp is not None:
epochstamp = time.mktime(time.strptime(timestamp, '%c'))
return datetime.fromtimestamp(epochstamp)
- if self.pathname is not None:
- epochstamp = os.stat(self.pathname)[stat.ST_MTIME]
- return datetime.fromtimestamp(epochstamp)
- return datetime.today()
+ return super(Gerald, self)._get_date()
date = property(_get_date)
+ def get_elements(self):
+ return super(Gerald, self).get_elements(Gerald.GERALD)
+
+ def set_elements(self, tree):
+ return super(Gerald, self).set_elements(tree, Gerald.GERALD)
+
def _get_experiment_root(self):
if self.tree is None:
return None
class CASAVA(Alignment):
GERALD='Casava'
+ def __init__(self, xml=None, pathname=None, tree=None):
+ super(CASAVA, self).__init__(xml=xml, pathname=pathname, tree=tree)
+
+ self._add_timestamp()
+
+ def _add_timestamp(self):
+ """Manually add a time stamp to CASAVA runs"""
+ if self.tree is None:
+ return
+ if len(self.tree.xpath('TIME_STAMP')) == 0:
+ time_stamp = self.date.strftime('%c')
+ time_element = ElementTree.Element('TIME_STAMP')
+ time_element.text = time_stamp
+ self.tree.append(time_element)
+
+ def _get_date(self):
+ if self.tree is None:
+ return None
+ time_element = self.tree.xpath('TIME_STAMP')
+ if len(time_element) == 1:
+ return datetime.strptime(time_element[0].text, '%c')
+ return super(CASAVA, self)._get_date()
+ date = property(_get_date)
+
+ def get_elements(self):
+ tree = super(CASAVA, self).get_elements(CASAVA.GERALD)
+ return tree
+
+ def set_elements(self, tree):
+ return super(CASAVA, self).set_elements(tree, CASAVA.GERALD)
+
def _get_runfolder_name(self):
if self.tree is None:
return None
g = CASAVA(pathname=pathname, tree=config_tree)
LOGGER.info("Parsing %s" % (report_summary,))
g.summary = SummaryHiSeq(report_summary)
+ g.eland_results = eland(g.pathname, g)
# parse eland files
return g
for lanes_dictionary in gerald_object.eland_results.results:
for eland_lane in lanes_dictionary.values():
- source_name = eland_lane.pathname
- if source_name is None:
- LOGGER.info(
- "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
- else:
- path, name = os.path.split(source_name)
- dest_name = os.path.join(cycle_dir, name)
- LOGGER.info("Saving eland file %s to %s" % \
- (source_name, dest_name))
-
- if is_compressed(name):
- LOGGER.info('Already compressed, Saving to %s' % (dest_name,))
- shutil.copy(source_name, dest_name)
- else:
- # not compressed
- dest_name += '.bz2'
- args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
- bz_commands.append(" ".join(args))
- #LOGGER.info('Running: %s' % ( " ".join(args) ))
- #bzip_dest = open(dest_name, 'w')
- #bzip = subprocess.Popen(args, stdout=bzip_dest)
- #LOGGER.info('Saving to %s' % (dest_name, ))
- #bzip.wait()
+ for source_name in eland_lane.pathnames:
+ if source_name is None:
+ LOGGER.info(
+ "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
+ else:
+ path, name = os.path.split(source_name)
+ dest_name = os.path.join(cycle_dir, name)
+ LOGGER.info("Saving eland file %s to %s" % \
+ (source_name, dest_name))
+
+ if is_compressed(name):
+ LOGGER.info('Already compressed, Saving to %s' % (dest_name,))
+ shutil.copy(source_name, dest_name)
+ else:
+ # not compressed
+ dest_name += '.bz2'
+ args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
+ bz_commands.append(" ".join(args))
+ #LOGGER.info('Running: %s' % ( " ".join(args) ))
+ #bzip_dest = open(dest_name, 'w')
+ #bzip = subprocess.Popen(args, stdout=bzip_dest)
+ #LOGGER.info('Saving to %s' % (dest_name, ))
+ #bzip.wait()
if len(bz_commands) > 0:
q = QueueCommands(bz_commands, num_jobs)
summary_dest = os.path.join(paths.summary_dir, 'Sample_Summary.htm')
shutil.copy(summary_source, summary_dest)
- body = get_unaligned_sample_export(lane, index_seq)
+ body = get_aligned_sample_export(lane, index_seq)
for split in ['001','002']:
for read in UNALIGNED_READS:
suffix = 'R{0}_{1}_export.txt.gz'.format(read, split)
""".format(flowcell=flowcell_id, lane=lane, index=index_seq)
return seq
-def get_unaligned_sample_export(lane, index_seq):
+def get_aligned_sample_export(lane, index_seq):
body = """HWI-ST0787\t102\t{lane}\t1101\t1207\t1993\t{index}\t1\tAANGGATTCGATCCGGCTTAAGAGATGAAAACCGAAAGGGCCGACCGAA\taaBS`ccceg[`ae[dRR_[[SPPPP__ececfYYWaegh^\\ZLLY\\X`\tNM\t\t\t\t\t\t
-HWI-ST0787\t102 {lane} 1101 1478 1997 {index} 1 CAAGAACCCCGGGGGGGGGGGGGCAGAGAGGGGGAATTTTTTTTTTGTT BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB NM N
-HWI-ST0787 102 {lane} 1101 1625 1994 {index} 1 AANAATGCTACAGAGACAAAACAAAACTGATATGAAAGTTGAGAATAAA \^BS\cccgegg[Q[QQQ[`egdgffbeggfgh^^YcfgfhXaHY^O^c chrII.fa
+HWI-ST0787\t102\t{lane}\t1101\t1478\t1997\t{index}\t1\tCAAGAACCCCGGGGGGGGGGGGGCAGAGAGGGGGAATTTTTTTTTTGTT\tBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\tNM\t\t\t\t\t\t\t\t\t\t\tN
+HWI-ST0787\t102\t{lane}\t1101\t1625\t1994\t{index}\t1\tAANAATGCTACAGAGACAAAACAAAACTGATATGAAAGTTGAGAATAAA\tB^BS\cccgegg[Q[QQQ[`egdgffbeggfgh^^YcfgfhXaHY^O^c\tchrII.fa\t67717938\tR\t99\t72
+HWI-ST0787\t102\t{lane}\t1101\t1625\t1994\t{index}\t1\tAANAATGCTACAGAGACAAAACAAAACTGATATGAAAGTTGAGAATAAA\tB^BS\cccgegg[Q[QQQ[`egdgffbeggfgh^^YcfgfhXaHY^O^c\t3:4:3\t\t\t\t\t\t\t\t\t\t\tY
""".format(lane=lane, index=index_seq)
return body
from StringIO import StringIO
import unittest
-from htsworkflow.pipelines.eland import ElandLane
+from htsworkflow.pipelines.eland import ElandLane, MatchCodes, MappedReads
+
+class MatchCodeTests(unittest.TestCase):
+    """Unit tests for the MatchCodes mapping."""
+    def test_initializer(self):
+        self.assertRaises(ValueError, MatchCodes, {'foo':'bar'})
+        self.assertRaises(ValueError, MatchCodes, 3)
+        # a None initializer is allowed and means "all counts zero"
+        mc = MatchCodes(None)
+        self.assertEqual(sum(mc.values()), 0)
+
+    def test_dictlike(self):
+        mc = MatchCodes()
+        match_codes = {'NM':0, 'QC':0, 'RM':0,
+                       'U0':0, 'U1':0, 'U2':0,
+                       'R0':0, 'R1':0, 'R2':0,
+                       }
+        # compare sorted lists so the test does not depend on dictionary
+        # iteration order, which is not guaranteed to agree between a
+        # MatchCodes instance and a dict literal
+        self.assertEqual(sorted(mc.keys()), sorted(match_codes.keys()))
+        self.assertEqual(sorted(mc.items()), sorted(match_codes.items()))
+        self.assertEqual(sorted(mc.values()), sorted(match_codes.values()))
+        self.assertRaises(KeyError, mc.__getitem__, 'foo')
+
+    def test_addition(self):
+        mc1 = MatchCodes()
+        mc2 = MatchCodes({'NM':5, 'QC':10, 'U0': 100})
+
+        mc1['NM'] += 5
+        self.assertEqual(mc1['NM'], 5)
+        self.assertEqual(mc1['QC'], 0)
+        self.assertEqual(mc1['U0'], 0)
+        mc1 += mc2
+        self.assertEqual(mc1['NM'], 10)
+        self.assertEqual(mc1['QC'], 10)
+        self.assertEqual(mc1['U0'], 100)
+
+
+class TestMappedReads(unittest.TestCase):
+    """Unit tests for the MappedReads mapping."""
+    def test_initializer(self):
+        mr1 = MappedReads()
+        self.assertEqual(len(mr1), 0)
+        mr2 = MappedReads({'hg19': 100, 'newcontamUK.fa': 12})
+        self.assertEqual(len(mr2), 2)
+        self.assertEqual(mr2['hg19'], 100)
+
+        self.assertRaises(ValueError, MappedReads, 3)
+
+    def test_dictionaryness(self):
+        mr1 = MappedReads()
+        mr1['chr9'] = 7
+        self.assertEqual(list(mr1.keys()), ['chr9'])
+        self.assertEqual(mr1['chr9'], 7)
+        # wrap in list() so the comparison works whether items() returns
+        # a list (python 2 mixin) or a view
+        self.assertEqual(list(mr1.items()), [('chr9', 7)])
+        del mr1['chr9']
+        self.assertEqual(len(mr1), 0)
+
+    def test_addition(self):
+        mr1 = MappedReads({'hg19': 100, 'Lambda1': 5})
+        mr2 = MappedReads({'hg19': 100, 'newcontamUK.fa': 10})
+        mr3 = mr1 + mr2
+
+        # addition must not mutate its operands
+        self.assertEqual(len(mr1), 2)
+        self.assertEqual(len(mr2), 2)
+        self.assertEqual(len(mr3), 3)
+        # shared keys are summed
+        self.assertEqual(mr3['hg19'], 200)
+
+        self.assertEqual(mr1['Lambda1'], 5)
+        self.assertRaises(KeyError, mr1.__getitem__, 'newcontamUK.fa')
+        self.assertEqual(mr1.get('newcontamUK.fa', None), None)
+
+        mr3['Lambda3'] = 2
+        self.assertEqual(mr3['Lambda3'], 2)
class ElandTests(unittest.TestCase):
"""Test specific Eland modules
"""
def compare_match_array(self, current, expected):
for key in expected.keys():
- self.failUnlessEqual(current[key], expected[key],
+ self.assertEqual(current[key], expected[key],
"Key %s: %s != %s" % (key,current[key],expected[key]))
def test_eland_score_mapped_mismatches(self):
'R0':0, 'R1':0, 'R2':0,
}
r = eland._score_mapped_mismatches("QC", match_codes)
- self.failUnlessEqual(r, ElandLane.SCORE_QC)
- self.compare_match_array(match_codes,
+ self.assertEqual(r, ElandLane.SCORE_QC)
+ self.compare_match_array(match_codes,
{'NM':0, 'QC':1, 'RM':0,
'U0':0, 'U1':0, 'U2':0,
'R0':0, 'R1':0, 'R2':0,
})
r = eland._score_mapped_mismatches("NM", match_codes)
- self.failUnlessEqual(r, ElandLane.SCORE_QC)
- self.compare_match_array(match_codes,
+ self.assertEqual(r, ElandLane.SCORE_QC)
+ self.compare_match_array(match_codes,
{'NM':1, 'QC':1, 'RM':0,
'U0':0, 'U1':0, 'U2':0,
'R0':0, 'R1':0, 'R2':0,
})
r = eland._score_mapped_mismatches("1:0:0", match_codes)
- self.failUnlessEqual(r, ElandLane.SCORE_READ)
- self.compare_match_array(match_codes,
+ self.assertEqual(r, ElandLane.SCORE_READ)
+ self.compare_match_array(match_codes,
{'NM':1, 'QC':1, 'RM':0,
'U0':1, 'U1':0, 'U2':0,
'R0':0, 'R1':0, 'R2':0,
})
r = eland._score_mapped_mismatches("2:4:16", match_codes)
- self.failUnlessEqual(r, ElandLane.SCORE_READ)
- self.compare_match_array(match_codes,
+ self.assertEqual(r, ElandLane.SCORE_READ)
+ self.compare_match_array(match_codes,
{'NM':1, 'QC':1, 'RM':0,
'U0':1, 'U1':0, 'U2':0,
'R0':2, 'R1':4, 'R2':16,
})
r = eland._score_mapped_mismatches("1:1:1", match_codes)
- self.failUnlessEqual(r, ElandLane.SCORE_READ)
- self.compare_match_array(match_codes,
+ self.assertEqual(r, ElandLane.SCORE_READ)
+ self.compare_match_array(match_codes,
{'NM':1, 'QC':1, 'RM':0,
'U0':2, 'U1':1, 'U2':1,
'R0':2, 'R1':4, 'R2':16,
})
r = eland._score_mapped_mismatches("1:0:0", match_codes)
- self.failUnlessEqual(r, ElandLane.SCORE_READ)
- self.compare_match_array(match_codes,
+ self.assertEqual(r, ElandLane.SCORE_READ)
+ self.compare_match_array(match_codes,
{'NM':1, 'QC':1, 'RM':0,
'U0':3, 'U1':1, 'U2':1,
'R0':2, 'R1':4, 'R2':16,
})
r = eland._score_mapped_mismatches("0:0:1", match_codes)
- self.failUnlessEqual(r, ElandLane.SCORE_READ)
- self.compare_match_array(match_codes,
+ self.assertEqual(r, ElandLane.SCORE_READ)
+ self.compare_match_array(match_codes,
{'NM':1, 'QC':1, 'RM':0,
'U0':3, 'U1':1, 'U2':2,
'R0':2, 'R1':4, 'R2':16,
})
r = eland._score_mapped_mismatches("chr3.fa", match_codes)
- self.failUnlessEqual(r, ElandLane.SCORE_UNRECOGNIZED)
- self.compare_match_array(match_codes,
+ self.assertEqual(r, ElandLane.SCORE_UNRECOGNIZED)
+ self.compare_match_array(match_codes,
{'NM':1, 'QC':1, 'RM':0,
'U0':3, 'U1':1, 'U2':2,
'R0':2, 'R1':4, 'R2':16,
})
-
+
def test_count_mapped_export(self):
eland = ElandLane()
mapped_reads = {}
r = eland._count_mapped_export(mapped_reads, "chr3.fa", "38")
- self.failUnlessEqual(mapped_reads['chr3.fa'], 1)
- self.failUnlessEqual(r, 'U0')
+ self.assertEqual(mapped_reads['chr3.fa'], 1)
+ self.assertEqual(r, 'U0')
mapped_reads = {}
r = eland._count_mapped_export(mapped_reads, "chr3.fa", "36A4")
- self.failUnlessEqual(mapped_reads['chr3.fa'], 1)
- self.failUnlessEqual(r, 'U1')
+ self.assertEqual(mapped_reads['chr3.fa'], 1)
+ self.assertEqual(r, 'U1')
mapped_reads = {}
r = eland._count_mapped_export(mapped_reads, "chr3.fa", "30A2T2")
- self.failUnlessEqual(mapped_reads['chr3.fa'], 1)
- self.failUnlessEqual(r, 'U2')
+ self.assertEqual(mapped_reads['chr3.fa'], 1)
+ self.assertEqual(r, 'U2')
mapped_reads = {}
r = eland._count_mapped_export(mapped_reads, "chr3.fa", "26AG2T2")
- self.failUnlessEqual(mapped_reads['chr3.fa'], 1)
- self.failUnlessEqual(r, 'U2')
+ self.assertEqual(mapped_reads['chr3.fa'], 1)
+ self.assertEqual(r, 'U2')
# deletion
mapped_reads = {}
r = eland._count_mapped_export(mapped_reads, "chr3.fa", "26^AG$4")
- self.failUnlessEqual(mapped_reads['chr3.fa'], 1)
- self.failUnlessEqual(r, 'U2')
+ self.assertEqual(mapped_reads['chr3.fa'], 1)
+ self.assertEqual(r, 'U2')
# insertion
mapped_reads = {}
r = eland._count_mapped_export(mapped_reads, "chr3.fa", "26^2$4")
- self.failUnlessEqual(mapped_reads['chr3.fa'], 1)
- self.failUnlessEqual(r, 'U0')
+ self.assertEqual(mapped_reads['chr3.fa'], 1)
+ self.assertEqual(r, 'U0')
def test_update_eland_export(self):
"""Test scoring the pipeline export file"""
multi_read = StringIO("ILLUMINA-33A494 1 1 1 4405 1046 0 1 GTGGTTTCGCTGGATAGTNNGTAGGGACAGTGGGAATC ``````````__a__V^XBB^SW^^a_____a______ 9:2:1")
match_codes, match_reads, reads = eland._update_eland_export(qc_read)
- self.compare_match_array(match_codes,
+ self.compare_match_array(match_codes,
{'NM':0, 'QC':1, 'RM':0,
'U0':0, 'U1':0, 'U2':0,
'R0':0, 'R1':0, 'R2':0,
})
- self.failUnlessEqual(len(match_reads), 0)
- self.failUnlessEqual(reads, 1)
+ self.assertEqual(len(match_reads), 0)
+ self.assertEqual(reads, 1)
match_codes, match_reads, reads = eland._update_eland_export(one_read_exact)
- self.compare_match_array(match_codes,
+ self.compare_match_array(match_codes,
{'NM':0, 'QC':0, 'RM':0,
'U0':1, 'U1':0, 'U2':0,
'R0':0, 'R1':0, 'R2':0,
})
- self.failUnlessEqual(match_reads['chrX.fa'], 1)
- self.failUnlessEqual(reads, 1)
+ self.assertEqual(match_reads['chrX.fa'], 1)
+ self.assertEqual(reads, 1)
match_codes, match_reads, reads = eland._update_eland_export(one_read_mismatch)
- self.compare_match_array(match_codes,
+ self.compare_match_array(match_codes,
{'NM':0, 'QC':0, 'RM':0,
'U0':0, 'U1':0, 'U2':1,
'R0':0, 'R1':0, 'R2':0,
})
- self.failUnlessEqual(match_reads['chrX.fa'], 1)
- self.failUnlessEqual(reads, 1)
+ self.assertEqual(match_reads['chrX.fa'], 1)
+ self.assertEqual(reads, 1)
match_codes, match_reads, reads = eland._update_eland_export(multi_read)
- self.compare_match_array(match_codes,
+ self.compare_match_array(match_codes,
{'NM':0, 'QC':0, 'RM':0,
'U0':0, 'U1':0, 'U2':1,
'R0':9, 'R1':2, 'R2':0,
})
- self.failUnlessEqual(len(match_reads), 0)
- self.failUnlessEqual(reads, 1)
-
+ self.assertEqual(len(match_reads), 0)
+ self.assertEqual(reads, 1)
-def suite():
- return unittest.makeSuite(ElandTests, 'test')
if __name__ == "__main__":
- unittest.main(defaultTest="suite")
+ unittest.main()
obj.image_analysis_dir = intensities_dir
obj.bustard_dir = unaligned_dir
obj.gerald_dir = aligned_dir
+ obj.reads = 2
class RunfolderTests(unittest.TestCase):
(1854131, 429053.2), (4777517, 592904.0),
]
- self.failUnlessEqual(len(g.summary), 2)
+ self.failUnlessEqual(len(g.summary), self.reads)
for i in range(1,9):
summary_lane = g.summary[0][i]
self.failUnlessEqual(summary_lane.cluster, clusters[i])
xml = g.get_elements()
# just make sure that element tree can serialize the tree
xml_str = ElementTree.tostring(xml)
- g2 = gerald.Gerald(xml=xml)
- return
+ g2 = gerald.CASAVA(xml=xml)
# do it all again after extracting from the xml file
self.failUnlessEqual(g.software, g2.software)
self.failUnlessEqual(len(g.lanes.items()), len(g2.lanes.items()))
# test lane specific parameters from gerald config file
- for i in range(1,9):
+ for i in g.lanes.keys():
g_lane = g.lanes[i]
g2_lane = g2.lanes[i]
self.failUnlessEqual(g_lane.analysis, g2_lane.analysis)
self.failUnlessEqual(g_lane.use_bases, g2_lane.use_bases)
# test (some) summary elements
- self.failUnlessEqual(len(g.summary), 1)
+ self.failUnlessEqual(len(g.summary), self.reads)
for i in range(1,9):
g_summary = g.summary[0][i]
g2_summary = g2.summary[0][i]
def test_eland(self):
- return
hg_map = {'Lambda.fa': 'Lambda.fa'}
for i in range(1,22):
short_name = 'chr%d.fa' % (i,)