From 2efb06f20edd9eea5bcd606dd95450da98cf3a4a Mon Sep 17 00:00:00 2001 From: Diane Trout Date: Fri, 29 Jun 2012 20:06:33 -0700 Subject: [PATCH] Support having an eland file split into multiple fragments. The interesting part was changing the match_codes and mapped_reads dictionaries into their own classes so I can add them together; addition sums matching dictionary entries, e.g. MatchCodes({'U0':5}) + MatchCodes({'U0':5}) == MatchCodes({'U0': 10}). Unfortunately, because there are multiple samples per lane on the HiSeq, this still isn't enough to support counting aligned results yet. --- htsworkflow/pipelines/eland.py | 283 ++++++++++++++++------- htsworkflow/pipelines/runfolder.py | 46 ++-- htsworkflow/pipelines/test/test_eland.py | 157 +++++++++---- 3 files changed, 331 insertions(+), 155 deletions(-) diff --git a/htsworkflow/pipelines/eland.py b/htsworkflow/pipelines/eland.py index cf6de6c..e63eab6 100644 --- a/htsworkflow/pipelines/eland.py +++ b/htsworkflow/pipelines/eland.py @@ -1,7 +1,7 @@ """ Analyze ELAND files """ - +import collections from glob import glob import logging import os @@ -40,8 +40,8 @@ class ResultLane(object): XML_VERSION = 2 LANE = 'ResultLane' - def __init__(self, pathname=None, lane_id=None, end=None, xml=None): - self.pathname = pathname + def __init__(self, pathnames=None, lane_id=None, end=None, xml=None): + self.pathnames = pathnames self._sample_name = None self.lane_id = lane_id self.end = end @@ -58,12 +58,19 @@ class ResultLane(object): def _update_name(self): # extract the sample name - if self.pathname is None: + if self.pathnames is None or len(self.pathnames) == 0: return - path, name = os.path.split(self.pathname) - split_name = name.split('_') - self._sample_name = split_name[0] + sample_names = set() + for pathname in self.pathnames: + path, name = os.path.split(pathname) + split_name = name.split('_') + sample_names.add(split_name[0]) + if len(sample_names) > 1: + errmsg = "Attempting to update from more than one sample %s" + raise RuntimeError(errmsg % (",".join(sample_names))) + self._sample_name = sample_names.pop() + return self._sample_name def _get_sample_name(self): if self._sample_name is None: @@ -93,8 +100,8 @@ class ElandLane(ResultLane): SCORE_QC = 1 SCORE_READ = 2 - def __init__(self, pathname=None, lane_id=None, end=None, genome_map=None, eland_type=None, xml=None): - super(ElandLane, self).__init__(pathname, lane_id, end) + def __init__(self, pathnames=None, lane_id=None, end=None, genome_map=None, eland_type=None, xml=None): + super(ElandLane, self).__init__(pathnames, lane_id, end) self._mapped_reads = None self._match_codes = None @@ -126,35 +133,43 @@ class ElandLane(ResultLane): Actually read the file and actually count the reads """ # can't do anything if we don't have a file to process - if self.pathname is None: + if self.pathnames is None or len(self.pathnames) == 0: return - self._guess_eland_type(self.pathname) + pathname = self.pathnames[-1] + self._guess_eland_type(pathname) - if os.stat(self.pathname)[stat.ST_SIZE] == 0: + if os.stat(pathname)[stat.ST_SIZE] == 0: raise RuntimeError("Eland isn't done, try again later.") - LOGGER.debug("summarizing results for %s" % (self.pathname)) + LOGGER.debug("summarizing results for %s" % (pathname)) + self._match_codes = MatchCodes() + self._mapped_reads = MappedReads() + self._reads = 0 + + for pathname in self.pathnames: + stream = autoopen(pathname, 'r') + if self.eland_type == ELAND_SINGLE: + result = self._update_eland_result(stream) + elif 
self.eland_type == ELAND_MULTI or \ + self.eland_type == ELAND_EXTENDED: + result = self._update_eland_multi(stream) + elif self.eland_type == ELAND_EXPORT: + result = self._update_eland_export(stream) + else: + errmsg = "Only support single/multi/extended eland files" + raise NotImplementedError(errmsg) + stream.close() - stream = autoopen(self.pathname, 'r') - if self.eland_type == ELAND_SINGLE: - result = self._update_eland_result(stream) - elif self.eland_type == ELAND_MULTI or \ - self.eland_type == ELAND_EXTENDED: - result = self._update_eland_multi(stream) - elif self.eland_type == ELAND_EXPORT: - result = self._update_eland_export(stream) - else: - raise NotImplementedError("Only support single/multi/extended eland files") - self._match_codes, self._mapped_reads, self._reads = result + match, mapped, reads = result + self._match_codes += match + self._mapped_reads += mapped + self._reads += reads def _update_eland_result(self, instream): reads = 0 - mapped_reads = {} + mapped_reads = MappedReads() + match_codes = MatchCodes() - match_codes = {'NM':0, 'QC':0, 'RM':0, - 'U0':0, 'U1':0, 'U2':0, - 'R0':0, 'R1':0, 'R2':0, - } for line in instream: reads += 1 fields = line.split() @@ -174,12 +189,9 @@ class ElandLane(ResultLane): MATCH_INDEX = 2 LOCATION_INDEX = 3 reads = 0 - mapped_reads = {} + mapped_reads = MappedReads() + match_codes = MatchCodes() - match_codes = {'NM':0, 'QC':0, 'RM':0, - 'U0':0, 'U1':0, 'U2':0, - 'R0':0, 'R1':0, 'R2':0, - } for line in instream: reads += 1 fields = line.split() @@ -204,12 +216,8 @@ class ElandLane(ResultLane): LOCATION_INDEX = 10 DESCRIPTOR_INDEX= 13 reads = 0 - mapped_reads = {} - - match_codes = {'NM':0, 'QC':0, 'RM':0, - 'U0':0, 'U1':0, 'U2':0, - 'R0':0, 'R1':0, 'R2':0, - } + mapped_reads = MappedReads() + match_codes = MatchCodes() for line in instream: reads += 1 @@ -240,7 +248,7 @@ class ElandLane(ResultLane): groups = ElandLane.MATCH_COUNTS_RE.match(match) if groups is None: # match is not of the form [\d]+:[\d]+:[\d]+ - if match_codes.has_key(match): + if match in match_codes: # match is one quality control codes QC/NM etc match_codes[match] += 1 return ElandLane.SCORE_QC @@ -431,6 +439,93 @@ class ElandLane(ResultLane): else: LOGGER.warn("ElandLane unrecognized tag %s" % (element.tag,)) + +class MatchCodes(collections.MutableMapping): + """Mapping to hold match counts - + supports combining two match count sets together + """ + def __init__(self, initializer=None): + self.match_codes = {'NM':0, 'QC':0, 'RM':0, + 'U0':0, 'U1':0, 'U2':0, + 'R0':0, 'R1':0, 'R2':0, + } + + if initializer is not None: + if not isinstance(initializer, collections.Mapping): + raise ValueError("Expected dictionary like class") + for key in initializer: + if key not in self.match_codes: + errmsg = "Initializer can only contain: %s" + raise ValueError(errmsg % (",".join(self.match_codes.keys()))) + self.match_codes[key] += initializer[key] + + def __iter__(self): + return iter(self.match_codes) + + def __delitem__(self, key): + raise RuntimeError("delete not allowed") + + def __getitem__(self, key): + return self.match_codes[key] + + def __setitem__(self, key, value): + if key not in self.match_codes: + errmsg = "Unrecognized key, allowed values are: %s" + raise ValueError(errmsg % (",".join(self.match_codes.keys()))) + self.match_codes[key] = value + + def __len__(self): + return len(self.match_codes) + + def __add__(self, other): + if not isinstance(other, MatchCodes): + raise ValueError("Expected a MatchCodes, got %s", str(type(other))) + + newobj = 
MatchCodes(self) + for key, value in other.items(): + newobj[key] = self.get(key, 0) + other[key] + + return newobj + + +class MappedReads(collections.MutableMapping): + """Mapping to hold mapped reads - + supports combining two mapped read sets together + """ + def __init__(self, initializer=None): + self.mapped_reads = {} + + if initializer is not None: + if not isinstance(initializer, collections.Mapping): + raise ValueError("Expected dictionary like class") + for key in initializer: + self[key] = self.setdefault(key, 0) + initializer[key] + + def __iter__(self): + return iter(self.mapped_reads) + + def __delitem__(self, key): + del self.mapped_reads[key] + + def __getitem__(self, key): + return self.mapped_reads[key] + + def __setitem__(self, key, value): + self.mapped_reads[key] = value + + def __len__(self): + return len(self.mapped_reads) + + def __add__(self, other): + if not isinstance(other, MappedReads): + raise ValueError("Expected a MappedReads, got %s", str(type(other))) + + newobj = MappedReads(self) + for key in other: + newobj[key] = self.get(key, 0) + other[key] + + return newobj + class SequenceLane(ResultLane): XML_VERSION=1 LANE = 'SequenceLane' @@ -465,17 +560,18 @@ class SequenceLane(ResultLane): Actually read the file and actually count the reads """ # can't do anything if we don't have a file to process - if self.pathname is None: + if self.pathnames is None: return - if os.stat(self.pathname)[stat.ST_SIZE] == 0: + pathname = self.pathnames[-1] + if os.stat(pathname)[stat.ST_SIZE] == 0: raise RuntimeError("Sequencing isn't done, try again later.") - self._guess_sequence_type(self.pathname) + self._guess_sequence_type(pathname) - LOGGER.info("summarizing results for %s" % (self.pathname)) + LOGGER.info("summarizing results for %s" % (pathname)) lines = 0 - f = open(self.pathname) + f = open(pathname) for l in f.xreadlines(): lines += 1 f.close() @@ -485,7 +581,8 @@ class SequenceLane(ResultLane): elif self.sequence_type == SequenceLane.FASTQ_TYPE: self._reads = lines / 4 else: - raise NotImplementedError("This only supports scarf or fastq squence files") + errmsg = "This only supports scarf or fastq squence files" + raise NotImplementedError(errmsg) def get_elements(self): lane = ElementTree.Element(SequenceLane.LANE, @@ -576,29 +673,28 @@ class ELAND(object): self.results[end][lane_id] = lane def check_for_eland_file(basedir, pattern, lane_id, end): - if end is None: - full_lane_id = lane_id - else: - full_lane_id = "%d_%d" % ( lane_id, end ) - - basename = pattern % (full_lane_id,) - LOGGER.debug("Eland pattern: %s" %(basename,)) - pathname = os.path.join(basedir, basename) - if os.path.exists(pathname): - LOGGER.info('found eland file in %s' % (pathname,)) - return pathname - else: - return None - -def update_result_with_eland(gerald, results, lane_id, end, pathname, genome_maps): + #if end is None: + # full_lane_id = lane_id + #else: + # full_lane_id = "%d_%d" % ( lane_id, end ) + eland_files = [] + eland_pattern = pattern % (lane_id, end) + eland_re = re.compile(eland_pattern) + LOGGER.debug("Eland pattern: %s" %(eland_pattern,)) + for filename in os.listdir(basedir): + if eland_re.match(filename): + LOGGER.info('found eland file %s' % (filename,)) + eland_files.append(os.path.join(basedir, filename)) + + return eland_files + +def update_result_with_eland(gerald, results, lane_id, end, pathnames, genome_maps): # yes the lane_id is also being computed in ElandLane._update # I didn't want to clutter up my constructor # but I needed to persist the 
sample_name/lane_id for # runfolder summary_report - path, name = os.path.split(pathname) - LOGGER.info("Adding eland file %s" %(name,)) - # split_name = name.split('_') - # lane_id = int(split_name[1]) + names = [ os.path.split(p)[1] for p in pathnames] + LOGGER.info("Adding eland files %s" %(",".join(names),)) genome_map = {} if genome_maps is not None: @@ -608,7 +704,7 @@ def update_result_with_eland(gerald, results, lane_id, end, pathname, genome_map if genome_dir is not None: genome_map = build_genome_fasta_map(genome_dir) - lane = ElandLane(pathname, lane_id, end, genome_map) + lane = ElandLane(pathnames, lane_id, end, genome_map) if end is None: effective_end = 0 @@ -645,35 +741,52 @@ def eland(gerald_dir, gerald=None, genome_maps=None): if os.path.isdir(basedir_temp): basedirs.append(basedir_temp) + # So how about scanning for Project*/Sample* directories as well + sample_pattern = os.path.join(gerald_dir, 'Project_*', 'Sample_*') + basedirs.extend(glob(sample_pattern)) # the order in patterns determines the preference for what # will be found. MAPPED_ELAND = 0 SEQUENCE = 1 - patterns = [('s_%s_eland_result.txt', MAPPED_ELAND), - ('s_%s_eland_result.txt.bz2', MAPPED_ELAND), - ('s_%s_eland_result.txt.gz', MAPPED_ELAND), - ('s_%s_eland_extended.txt', MAPPED_ELAND), - ('s_%s_eland_extended.txt.bz2', MAPPED_ELAND), - ('s_%s_eland_extended.txt.gz', MAPPED_ELAND), - ('s_%s_eland_multi.txt', MAPPED_ELAND), - ('s_%s_eland_multi.txt.bz2', MAPPED_ELAND), - ('s_%s_eland_multi.txt.gz', MAPPED_ELAND), - ('s_%s_export.txt', MAPPED_ELAND), - ('s_%s_export.txt.bz2', MAPPED_ELAND), - ('s_%s_export.txt.gz', MAPPED_ELAND), - ('s_%s_sequence.txt', SEQUENCE),] + patterns = [ + ('(?P<sample>[^_]+)_(?P<index>(NoIndex|[AGCT])+)_L00%s(_R%s)_(?P<part>[\d]+)_export.txt(?P<ext>(\.bz2|\.gz)?)', MAPPED_ELAND), + ('s_(?P<lane>%s)(_(?P<end>%s))?_eland_result.txt(?P<ext>(\.bz2|\.gz)?)', + MAPPED_ELAND), + ('s_(?P<lane>%s)(_(?P<end>%s))?_eland_extended.txt(?P<ext>(\.bz2|\.gz)?)', + MAPPED_ELAND), + ('s_(?P<lane>%s)(_(?P<end>%s))?_eland_multi.txt(?P<ext>(\.bz2|\.gz)?)', + MAPPED_ELAND), + ('s_(?P<lane>%s)(_(?P<end>%s))?_export.txt(?P<ext>(\.bz2|\.gz)?)', + MAPPED_ELAND), + ('s_(?P<lane>%s)(_(?P<end>%s))?_sequence.txt(?P<ext>(\.bz2|\.gz)?)', + SEQUENCE), + + #('s_%s_eland_result.txt', MAPPED_ELAND), + #('s_%s_eland_result.txt.bz2', MAPPED_ELAND), + #('s_%s_eland_result.txt.gz', MAPPED_ELAND), + #('s_%s_eland_extended.txt', MAPPED_ELAND), + #('s_%s_eland_extended.txt.bz2', MAPPED_ELAND), + #('s_%s_eland_extended.txt.gz', MAPPED_ELAND), + #('s_%s_eland_multi.txt', MAPPED_ELAND), + #('s_%s_eland_multi.txt.bz2', MAPPED_ELAND), + #('s_%s_eland_multi.txt.gz', MAPPED_ELAND), + #('s_%s_export.txt', MAPPED_ELAND), + #('s_%s_export.txt.bz2', MAPPED_ELAND), + #('s_%s_export.txt.gz', MAPPED_ELAND), + #('s_%s_sequence.txt', SEQUENCE), + ] for basedir in basedirs: for end in ends: for lane_id in lane_ids: for p in patterns: - pathname = check_for_eland_file(basedir, p[0], lane_id, end) - if pathname is not None: + pathnames = check_for_eland_file(basedir, p[0], lane_id, end) + if len(pathnames) > 0: if p[1] == MAPPED_ELAND: - update_result_with_eland(gerald, e.results, lane_id, end, pathname, genome_maps) + update_result_with_eland(gerald, e.results, lane_id, end, pathnames, genome_maps) elif p[1] == SEQUENCE: - update_result_with_sequence(gerald, e.results, lane_id, end, pathname) + update_result_with_sequence(gerald, e.results, lane_id, end, pathnames) break else: LOGGER.debug("No eland file found in %s for lane %s and end %s" %(basedir, lane_id, end)) @@ -719,7 +832,7 @@ def main(cmdline=None): from optparse import OptionParser 
parser = OptionParser("%prog: +") opts, args = parser.parse_args(cmdline) - LOGGER.basicConfig(level=logging.DEBUG) + logging.basicConfig(level=logging.DEBUG) for a in args: LOGGER.info("Starting scan of %s" % (a,)) e = eland(a) diff --git a/htsworkflow/pipelines/runfolder.py b/htsworkflow/pipelines/runfolder.py index 0fce8f8..0013a86 100644 --- a/htsworkflow/pipelines/runfolder.py +++ b/htsworkflow/pipelines/runfolder.py @@ -506,29 +506,29 @@ def compress_eland_results(gerald_object, cycle_dir, num_jobs=1): for lanes_dictionary in gerald_object.eland_results.results: for eland_lane in lanes_dictionary.values(): - source_name = eland_lane.pathname - if source_name is None: - LOGGER.info( - "Lane ID %s does not have a filename." % (eland_lane.lane_id,)) - else: - path, name = os.path.split(source_name) - dest_name = os.path.join(cycle_dir, name) - LOGGER.info("Saving eland file %s to %s" % \ - (source_name, dest_name)) - - if is_compressed(name): - LOGGER.info('Already compressed, Saving to %s' % (dest_name,)) - shutil.copy(source_name, dest_name) - else: - # not compressed - dest_name += '.bz2' - args = ['bzip2', '-9', '-c', source_name, '>', dest_name ] - bz_commands.append(" ".join(args)) - #LOGGER.info('Running: %s' % ( " ".join(args) )) - #bzip_dest = open(dest_name, 'w') - #bzip = subprocess.Popen(args, stdout=bzip_dest) - #LOGGER.info('Saving to %s' % (dest_name, )) - #bzip.wait() + for source_name in eland_lane.pathnames: + if source_name is None: + LOGGER.info( + "Lane ID %s does not have a filename." % (eland_lane.lane_id,)) + else: + path, name = os.path.split(source_name) + dest_name = os.path.join(cycle_dir, name) + LOGGER.info("Saving eland file %s to %s" % \ + (source_name, dest_name)) + + if is_compressed(name): + LOGGER.info('Already compressed, Saving to %s' % (dest_name,)) + shutil.copy(source_name, dest_name) + else: + # not compressed + dest_name += '.bz2' + args = ['bzip2', '-9', '-c', source_name, '>', dest_name ] + bz_commands.append(" ".join(args)) + #LOGGER.info('Running: %s' % ( " ".join(args) )) + #bzip_dest = open(dest_name, 'w') + #bzip = subprocess.Popen(args, stdout=bzip_dest) + #LOGGER.info('Saving to %s' % (dest_name, )) + #bzip.wait() if len(bz_commands) > 0: q = QueueCommands(bz_commands, num_jobs) diff --git a/htsworkflow/pipelines/test/test_eland.py b/htsworkflow/pipelines/test/test_eland.py index 4ffb3e7..66e2ce8 100644 --- a/htsworkflow/pipelines/test/test_eland.py +++ b/htsworkflow/pipelines/test/test_eland.py @@ -4,14 +4,80 @@ from StringIO import StringIO import unittest -from htsworkflow.pipelines.eland import ElandLane +from htsworkflow.pipelines.eland import ElandLane, MatchCodes, MappedReads + +class MatchCodeTests(unittest.TestCase): + def test_initializer(self): + self.assertRaises(ValueError, MatchCodes, {'foo':'bar'}) + self.assertRaises(ValueError, MatchCodes, 3) + mc = MatchCodes(None) + + def test_dictlike(self): + mc = MatchCodes() + match_codes = {'NM':0, 'QC':0, 'RM':0, + 'U0':0, 'U1':0, 'U2':0, + 'R0':0, 'R1':0, 'R2':0, + } + self.assertEqual(mc.keys(), match_codes.keys()) + self.assertEqual(mc.items(), match_codes.items()) + self.assertEqual(mc.values(), match_codes.values()) + self.assertRaises(KeyError, mc.__getitem__, 'foo') + + def test_addition(self): + mc1 = MatchCodes() + mc2 = MatchCodes({'NM':5, 'QC':10, 'U0': 100}) + + mc1['NM'] += 5 + self.assertEqual(mc1['NM'], 5) + self.assertEqual(mc1['QC'], 0) + self.assertEqual(mc1['U0'], 0) + mc1 += mc2 + self.assertEqual(mc1['NM'], 10) + self.assertEqual(mc1['QC'], 10) + 
self.assertEqual(mc1['U0'], 100) + + +class TestMappedReads(unittest.TestCase): + def test_initializer(self): + mr1 = MappedReads() + self.assertEqual(len(mr1), 0) + mr2 = MappedReads({'hg19': 100, 'newcontamUK.fa': 12}) + self.assertEqual(len(mr2), 2) + self.assertEqual(mr2['hg19'], 100) + + self.assertRaises(ValueError, MappedReads, 3) + + def test_dictionaryness(self): + mr1 = MappedReads() + mr1['chr9'] = 7 + self.assertEqual(list(mr1.keys()), ['chr9']) + self.assertEqual(mr1['chr9'], 7) + self.assertEqual(mr1.items(), [('chr9', 7)]) + del mr1['chr9'] + self.assertEqual(len(mr1), 0) + + def test_addition(self): + mr1 = MappedReads({'hg19': 100, 'Lambda1': 5}) + mr2 = MappedReads({'hg19': 100, 'newcontamUK.fa': 10}) + mr3 = mr1 + mr2 + + self.assertEqual(len(mr1), 2) + self.assertEqual(len(mr2), 2) + self.assertEqual(len(mr3), 3) + + self.assertEqual(mr1['Lambda1'], 5) + self.assertRaises(KeyError, mr1.__getitem__, 'newcontamUK.fa') + self.assertEqual(mr1.get('newcontamUK.fa', None), None) + + mr3['Lambda3'] = 2 + self.assertEqual(mr3['Lambda3'], 2) class ElandTests(unittest.TestCase): """Test specific Eland modules """ def compare_match_array(self, current, expected): for key in expected.keys(): - self.failUnlessEqual(current[key], expected[key], + self.assertEqual(current[key], expected[key], "Key %s: %s != %s" % (key,current[key],expected[key])) def test_eland_score_mapped_mismatches(self): @@ -21,102 +87,102 @@ class ElandTests(unittest.TestCase): 'R0':0, 'R1':0, 'R2':0, } r = eland._score_mapped_mismatches("QC", match_codes) - self.failUnlessEqual(r, ElandLane.SCORE_QC) - self.compare_match_array(match_codes, + self.assertEqual(r, ElandLane.SCORE_QC) + self.compare_match_array(match_codes, {'NM':0, 'QC':1, 'RM':0, 'U0':0, 'U1':0, 'U2':0, 'R0':0, 'R1':0, 'R2':0, }) r = eland._score_mapped_mismatches("NM", match_codes) - self.failUnlessEqual(r, ElandLane.SCORE_QC) - self.compare_match_array(match_codes, + self.assertEqual(r, ElandLane.SCORE_QC) + self.compare_match_array(match_codes, {'NM':1, 'QC':1, 'RM':0, 'U0':0, 'U1':0, 'U2':0, 'R0':0, 'R1':0, 'R2':0, }) r = eland._score_mapped_mismatches("1:0:0", match_codes) - self.failUnlessEqual(r, ElandLane.SCORE_READ) - self.compare_match_array(match_codes, + self.assertEqual(r, ElandLane.SCORE_READ) + self.compare_match_array(match_codes, {'NM':1, 'QC':1, 'RM':0, 'U0':1, 'U1':0, 'U2':0, 'R0':0, 'R1':0, 'R2':0, }) r = eland._score_mapped_mismatches("2:4:16", match_codes) - self.failUnlessEqual(r, ElandLane.SCORE_READ) - self.compare_match_array(match_codes, + self.assertEqual(r, ElandLane.SCORE_READ) + self.compare_match_array(match_codes, {'NM':1, 'QC':1, 'RM':0, 'U0':1, 'U1':0, 'U2':0, 'R0':2, 'R1':4, 'R2':16, }) r = eland._score_mapped_mismatches("1:1:1", match_codes) - self.failUnlessEqual(r, ElandLane.SCORE_READ) - self.compare_match_array(match_codes, + self.assertEqual(r, ElandLane.SCORE_READ) + self.compare_match_array(match_codes, {'NM':1, 'QC':1, 'RM':0, 'U0':2, 'U1':1, 'U2':1, 'R0':2, 'R1':4, 'R2':16, }) r = eland._score_mapped_mismatches("1:0:0", match_codes) - self.failUnlessEqual(r, ElandLane.SCORE_READ) - self.compare_match_array(match_codes, + self.assertEqual(r, ElandLane.SCORE_READ) + self.compare_match_array(match_codes, {'NM':1, 'QC':1, 'RM':0, 'U0':3, 'U1':1, 'U2':1, 'R0':2, 'R1':4, 'R2':16, }) r = eland._score_mapped_mismatches("0:0:1", match_codes) - self.failUnlessEqual(r, ElandLane.SCORE_READ) - self.compare_match_array(match_codes, + self.assertEqual(r, ElandLane.SCORE_READ) + self.compare_match_array(match_codes, 
{'NM':1, 'QC':1, 'RM':0, 'U0':3, 'U1':1, 'U2':2, 'R0':2, 'R1':4, 'R2':16, }) r = eland._score_mapped_mismatches("chr3.fa", match_codes) - self.failUnlessEqual(r, ElandLane.SCORE_UNRECOGNIZED) - self.compare_match_array(match_codes, + self.assertEqual(r, ElandLane.SCORE_UNRECOGNIZED) + self.compare_match_array(match_codes, {'NM':1, 'QC':1, 'RM':0, 'U0':3, 'U1':1, 'U2':2, 'R0':2, 'R1':4, 'R2':16, }) - + def test_count_mapped_export(self): eland = ElandLane() mapped_reads = {} r = eland._count_mapped_export(mapped_reads, "chr3.fa", "38") - self.failUnlessEqual(mapped_reads['chr3.fa'], 1) - self.failUnlessEqual(r, 'U0') + self.assertEqual(mapped_reads['chr3.fa'], 1) + self.assertEqual(r, 'U0') mapped_reads = {} r = eland._count_mapped_export(mapped_reads, "chr3.fa", "36A4") - self.failUnlessEqual(mapped_reads['chr3.fa'], 1) - self.failUnlessEqual(r, 'U1') + self.assertEqual(mapped_reads['chr3.fa'], 1) + self.assertEqual(r, 'U1') mapped_reads = {} r = eland._count_mapped_export(mapped_reads, "chr3.fa", "30A2T2") - self.failUnlessEqual(mapped_reads['chr3.fa'], 1) - self.failUnlessEqual(r, 'U2') + self.assertEqual(mapped_reads['chr3.fa'], 1) + self.assertEqual(r, 'U2') mapped_reads = {} r = eland._count_mapped_export(mapped_reads, "chr3.fa", "26AG2T2") - self.failUnlessEqual(mapped_reads['chr3.fa'], 1) - self.failUnlessEqual(r, 'U2') + self.assertEqual(mapped_reads['chr3.fa'], 1) + self.assertEqual(r, 'U2') # deletion mapped_reads = {} r = eland._count_mapped_export(mapped_reads, "chr3.fa", "26^AG$4") - self.failUnlessEqual(mapped_reads['chr3.fa'], 1) - self.failUnlessEqual(r, 'U2') + self.assertEqual(mapped_reads['chr3.fa'], 1) + self.assertEqual(r, 'U2') # insertion mapped_reads = {} r = eland._count_mapped_export(mapped_reads, "chr3.fa", "26^2$4") - self.failUnlessEqual(mapped_reads['chr3.fa'], 1) - self.failUnlessEqual(r, 'U0') + self.assertEqual(mapped_reads['chr3.fa'], 1) + self.assertEqual(r, 'U0') def test_update_eland_export(self): """Test scoring the pipeline export file""" @@ -127,44 +193,41 @@ class ElandTests(unittest.TestCase): multi_read = StringIO("ILLUMINA-33A494 1 1 1 4405 1046 0 1 GTGGTTTCGCTGGATAGTNNGTAGGGACAGTGGGAATC ``````````__a__V^XBB^SW^^a_____a______ 9:2:1") match_codes, match_reads, reads = eland._update_eland_export(qc_read) - self.compare_match_array(match_codes, + self.compare_match_array(match_codes, {'NM':0, 'QC':1, 'RM':0, 'U0':0, 'U1':0, 'U2':0, 'R0':0, 'R1':0, 'R2':0, }) - self.failUnlessEqual(len(match_reads), 0) - self.failUnlessEqual(reads, 1) + self.assertEqual(len(match_reads), 0) + self.assertEqual(reads, 1) match_codes, match_reads, reads = eland._update_eland_export(one_read_exact) - self.compare_match_array(match_codes, + self.compare_match_array(match_codes, {'NM':0, 'QC':0, 'RM':0, 'U0':1, 'U1':0, 'U2':0, 'R0':0, 'R1':0, 'R2':0, }) - self.failUnlessEqual(match_reads['chrX.fa'], 1) - self.failUnlessEqual(reads, 1) + self.assertEqual(match_reads['chrX.fa'], 1) + self.assertEqual(reads, 1) match_codes, match_reads, reads = eland._update_eland_export(one_read_mismatch) - self.compare_match_array(match_codes, + self.compare_match_array(match_codes, {'NM':0, 'QC':0, 'RM':0, 'U0':0, 'U1':0, 'U2':1, 'R0':0, 'R1':0, 'R2':0, }) - self.failUnlessEqual(match_reads['chrX.fa'], 1) - self.failUnlessEqual(reads, 1) + self.assertEqual(match_reads['chrX.fa'], 1) + self.assertEqual(reads, 1) match_codes, match_reads, reads = eland._update_eland_export(multi_read) - self.compare_match_array(match_codes, + self.compare_match_array(match_codes, {'NM':0, 'QC':0, 'RM':0, 
'U0':0, 'U1':0, 'U2':1, 'R0':9, 'R1':2, 'R2':0, }) - self.failUnlessEqual(len(match_reads), 0) - self.failUnlessEqual(reads, 1) - + self.assertEqual(len(match_reads), 0) + self.assertEqual(reads, 1) -def suite(): - return unittest.makeSuite(ElandTests, 'test') if __name__ == "__main__": - unittest.main(defaultTest="suite") + unittest.main() -- 2.30.2
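
Note: the following is a minimal standalone sketch (not code from the patch) of the additive-mapping idea behind the MatchCodes and MappedReads classes introduced above: a mapping with a fixed key set that implements the MutableMapping ABC and defines __add__ so per-fragment tallies can be summed. The class name AdditiveCounts is illustrative only, and the sketch uses Python 3's collections.abc, whereas the patch itself targets the older Python 2 collections ABCs.

# Standalone illustration of the additive fixed-key counter pattern.
import collections.abc


class AdditiveCounts(collections.abc.MutableMapping):
    """Fixed-key counter mapping whose instances can be summed with '+'."""

    KEYS = ('NM', 'QC', 'RM', 'U0', 'U1', 'U2', 'R0', 'R1', 'R2')

    def __init__(self, initializer=None):
        # every known key starts at zero
        self._counts = dict.fromkeys(self.KEYS, 0)
        if initializer is not None:
            for key, value in dict(initializer).items():
                # unknown keys are rejected by __setitem__
                self[key] = self._counts.get(key, 0) + value

    def __getitem__(self, key):
        return self._counts[key]

    def __setitem__(self, key, value):
        if key not in self._counts:
            raise ValueError("unknown key %r, allowed: %s" % (key, ", ".join(self.KEYS)))
        self._counts[key] = value

    def __delitem__(self, key):
        raise RuntimeError("delete not allowed")

    def __iter__(self):
        return iter(self._counts)

    def __len__(self):
        return len(self._counts)

    def __add__(self, other):
        # element-wise sum, so counts from several file fragments accumulate;
        # '+=' also works because Python falls back to __add__
        result = AdditiveCounts(self)
        for key, value in other.items():
            result[key] = result[key] + value
        return result


if __name__ == '__main__':
    a = AdditiveCounts({'U0': 5})
    b = AdditiveCounts({'U0': 5, 'QC': 2})
    total = a + b
    assert total['U0'] == 10 and total['QC'] == 2
    print(dict(total))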