import re
import stat
import sys
+import types
from htsworkflow.pipelines.runfolder import ElementTree, LANE_LIST
+from htsworkflow.pipelines.samplekey import SampleKey
from htsworkflow.util.ethelp import indent, flatten
from htsworkflow.util.opener import autoopen
XML_VERSION = 2
LANE = 'ResultLane'
- def __init__(self, pathnames=None, lane_id=None, end=None, xml=None):
+ def __init__(self, pathnames=None, sample=None, lane_id=None, end=None,
+ xml=None):
self.pathnames = pathnames
- self._sample_name = None
+ self.sample_name = sample
self.lane_id = lane_id
self.end = end
self._reads = None
"""
pass
- def _update_name(self):
- # extract the sample name
- if self.pathnames is None or len(self.pathnames) == 0:
- return
-
- sample_names = set()
- for pathname in self.pathnames:
- path, name = os.path.split(pathname)
- split_name = name.split('_')
- sample_names.add(split_name[0])
- if len(sample_names) > 1:
- errmsg = "Attempting to update from more than one sample %s"
- raise RuntimeError(errmsg % (",".join(sample_names)))
- self._sample_name = sample_names.pop()
- return self._sample_name
-
- def _get_sample_name(self):
- if self._sample_name is None:
- self._update_name()
- return self._sample_name
- sample_name = property(_get_sample_name)
-
def _get_reads(self):
if self._reads is None:
self._update()
def get_elements(self):
return None
+ def __repr__(self):
+ name = []
+
+ name.append('L%s' % (self.lane_id,))
+ name.append('R%s' % (self.end,))
+ name.append('S%s' % (self.sample_name,))
+
+ return '<ResultLane(' + ",".join(name) + ')>'
+
class ElandLane(ResultLane):
"""
Process an eland result file
SCORE_QC = 1
SCORE_READ = 2
- def __init__(self, pathnames=None, lane_id=None, end=None, genome_map=None, eland_type=None, xml=None):
- super(ElandLane, self).__init__(pathnames, lane_id, end)
+ def __init__(self, pathnames=None, sample=None, lane_id=None, end=None,
+ genome_map=None, eland_type=None, xml=None):
+ super(ElandLane, self).__init__(pathnames, sample, lane_id, end)
self._mapped_reads = None
self._match_codes = None
+ self._reads = None
if genome_map is None:
genome_map = {}
self.genome_map = genome_map
if xml is not None:
self.set_elements(xml)
+ def __repr__(self):
+ name = []
+
+ name.append('L%s' % (self.lane_id,))
+ name.append('R%s' % (self.end,))
+ name.append('S%s' % (self.sample_name,))
+
+ reads = str(self._reads) if self._reads is not None else 'Uncounted'
+ return '<ElandLane(' + ",".join(name) + ' = '+ reads + ')>'
+
def _guess_eland_type(self, pathname):
if self.eland_type is None:
# attempt autodetect eland file type
for element in tree:
tag = element.tag.lower()
if tag == SAMPLE_NAME.lower():
- self._sample_name = element.text
+ self.sample_name = element.text
elif tag == LANE_ID.lower():
self.lane_id = int(element.text)
elif tag == END.lower():
FASTQ_TYPE = 2
SEQUENCE_DESCRIPTION = { NONE_TYPE: 'None', SCARF_TYPE: 'SCARF', FASTQ_TYPE: 'FASTQ' }
- def __init__(self, pathname=None, lane_id=None, end=None, xml=None):
+ def __init__(self, pathnames=None, sample=None, lane_id=None, end=None,
+ xml=None):
self.sequence_type = None
- super(SequenceLane, self).__init__(pathname, lane_id, end, xml)
+ super(SequenceLane, self).__init__(pathnames, sample, lane_id, end, xml)
def _guess_sequence_type(self, pathname):
"""
for element in tree:
tag = element.tag.lower()
if tag == SAMPLE_NAME.lower():
- self._sample_name = element.text
+ self.sample_name = element.text
elif tag == LANE_ID.lower():
self.lane_id = int(element.text)
elif tag == END.lower():
else:
LOGGER.warn("SequenceLane unrecognized tag %s" % (element.tag,))
-class ELAND(object):
+class ELAND(collections.MutableMapping):
"""
Summarize information from eland files
"""
ELAND = 'ElandCollection'
LANE = 'Lane'
LANE_ID = 'id'
+ SAMPLE = 'sample'
END = 'end'
def __init__(self, xml=None):
# we need information from the gerald config.xml
- self.results = [{},{}]
+ self.results = collections.OrderedDict()
if xml is not None:
self.set_elements(xml)
- if len(self.results[0]) == 0:
- # Initialize our eland object with meaningless junk
- for l in LANE_LIST:
- self.results[0][l] = ResultLane(lane_id=l, end=0)
def __getitem__(self, key):
    """Return the lane stored under ``key``.

    Raises ValueError when ``key`` is not a SampleKey; a missing key
    raises KeyError from the underlying ``results`` mapping.
    """
    if not isinstance(key, SampleKey):
        # name the expected class; type(SampleKey) would print the metaclass
        raise ValueError("Key must be a %s" % (SampleKey.__name__,))
    return self.results[key]
def __setitem__(self, key, value):
    """Store ``value`` (a lane object) under ``key``.

    Raises ValueError when ``key`` is not a SampleKey.
    """
    if not isinstance(key, SampleKey):
        # name the expected class; type(SampleKey) would print the metaclass
        raise ValueError("Key must be a %s" % (SampleKey.__name__,))
    self.results[key] = value
+
+ def __delitem__(self, key):
+ del self.result[key]
+
+ def __iter__(self):
+ return self.results.iterkeys()
+
+ def __len__(self):
+ return len(self.results)
+
def find_keys(self, search):
    """Yield every stored key that matches ``search``.

    ``search`` must be a SampleKey; otherwise ValueError is raised when
    the generator is first advanced.  A fully specified key yields itself
    if it is stored and nothing otherwise.  A wildcard key (one with a
    None attribute) yields every stored key for which ``key.matches(search)``
    is true.
    """
    if not isinstance(search, SampleKey):
        raise ValueError("Key must be a %s" % (SampleKey.__name__,))

    if not search.iswild:
        # exact lookup: always yield keys, never lane objects, and do not
        # fall through to the scan (which would report the key twice)
        if search in self.results:
            yield search
        return

    # iterate over a snapshot so callers may modify the container while
    # consuming the generator
    for key in list(self.results):
        if key.matches(search):
            yield key
def get_elements(self):
    """Serialize all stored lanes into one ElementTree element.

    Returns the root element, tagged ``ELAND.ELAND`` and carrying the XML
    version.  Each lane contributes the element returned by its own
    ``get_elements()``, annotated with the end, lane id and sample name.
    Lanes whose ``get_elements()`` returns None are skipped.
    """
    root = ElementTree.Element(ELAND.ELAND,
                               {'version': unicode(ELAND.XML_VERSION)})

    for key in self:
        lane = self[key]
        eland_lane = lane.get_elements()
        if eland_lane is None:
            # nothing to serialize for this lane
            continue
        # The XML stores a zero-based end (set_elements adds 1 back);
        # a lane without an end is written as end 0.
        end = 0 if lane.end is None else lane.end - 1
        eland_lane.attrib[ELAND.END] = unicode(end)
        eland_lane.attrib[ELAND.LANE_ID] = unicode(lane.lane_id)
        eland_lane.attrib[ELAND.SAMPLE] = unicode(lane.sample_name)
        root.append(eland_lane)
    return root
def set_elements(self, tree):
    """Populate ``results`` from the child elements of ``tree``.

    Each child must carry a lane id attribute; the end attribute defaults
    to 0 and the sample attribute to 's' when absent.  Children tagged as
    an ElandLane or SequenceLane are rebuilt from their XML and stored
    under ``SampleKey(lane, end + 1, sample)``.  Children with any other
    tag are logged and skipped.
    """
    for element in list(tree):
        lane_id = int(element.attrib[ELAND.LANE_ID])
        end = int(element.attrib.get(ELAND.END, 0))
        sample = element.attrib.get(ELAND.SAMPLE, 's')

        tag = element.tag.lower()
        if tag == ElandLane.LANE.lower():
            lane = ElandLane(xml=element)
        elif tag == SequenceLane.LANE.lower():
            lane = SequenceLane(xml=element)
        else:
            # without this branch ``lane`` would be unbound, or would
            # silently reuse the lane built for the previous element
            LOGGER.warn("ELAND unrecognized tag %s" % (element.tag,))
            continue

        # the XML end is zero-based, SampleKey reads are one-based
        key = SampleKey(lane=lane_id, read=end + 1, sample=sample)
        self.results[key] = lane
-def check_for_eland_file(basedir, pattern, lane_id, end):
- eland_files = []
- eland_pattern = pattern % (lane_id, end)
- eland_re = re.compile(eland_pattern)
- #LOGGER.debug("Eland pattern: %s" %(eland_pattern,))
- for filename in os.listdir(basedir):
- if eland_re.match(filename):
- LOGGER.info('found eland file %s' % (filename,))
- eland_files.append(os.path.join(basedir, filename))
- return eland_files
+ def update_result_with_eland(self, gerald, key, pathnames,
+ genome_maps):
+ # yes the lane_id is also being computed in ElandLane._update
+ # I didn't want to clutter up my constructor
+ # but I needed to persist the sample_name/lane_id for
+ # runfolder summary_report
+ names = [ os.path.split(p)[1] for p in pathnames]
+ LOGGER.info("Adding eland files %s" %(",".join(names),))
-def update_result_with_eland(gerald, results, lane_id, end, pathnames, genome_maps):
- # yes the lane_id is also being computed in ElandLane._update
- # I didn't want to clutter up my constructor
- # but I needed to persist the sample_name/lane_id for
- # runfolder summary_report
- names = [ os.path.split(p)[1] for p in pathnames]
- LOGGER.info("Adding eland files %s" %(",".join(names),))
+ genome_map = {}
+ if genome_maps is not None:
+ genome_map = genome_maps[key.lane]
+ elif gerald is not None:
+ genome_dir = gerald.lanes[key.lane].eland_genome
+ if genome_dir is not None:
+ genome_map = build_genome_fasta_map(genome_dir)
- genome_map = {}
- if genome_maps is not None:
- genome_map = genome_maps[lane_id]
- elif gerald is not None:
- genome_dir = gerald.lanes[lane_id].eland_genome
- if genome_dir is not None:
- genome_map = build_genome_fasta_map(genome_dir)
+ lane = ElandLane(pathnames, key.sample, key.lane, key.read, genome_map)
- lane = ElandLane(pathnames, lane_id, end, genome_map)
+ self.results[key] = lane
- if end is None:
- effective_end = 0
- else:
- effective_end = end - 1
+ def update_result_with_sequence(self, gerald, key, pathnames,
+ genome_maps=None):
+ self.results[key] = SequenceLane(pathnames,
+ key.sample, key.lane, key.read)
- results[effective_end][lane_id] = lane
-def update_result_with_sequence(gerald, results, lane_id, end, pathname):
- result = SequenceLane(pathname, lane_id, end)
def eland(gerald_dir, gerald=None, genome_maps=None):
    """Build an ELAND container from the files found under ``gerald_dir``.

    Every file below ``gerald_dir`` is offered to an ElandMatches
    collector.  Each resulting sample key is then passed to
    ``ElandMatches.count`` together with ``gerald`` and ``genome_maps``,
    which stores a lane object in the container.  Returns the container.
    """
    e = ELAND()
    eland_files = ElandMatches(e)

    # collect every candidate file, wherever it sits in the tree
    for path, dirnames, filenames in os.walk(gerald_dir):
        for filename in filenames:
            eland_files.add(os.path.abspath(os.path.join(path, filename)))

    # The container keeps insertion order, so add lanes in sorted key
    # order to make the result (and its XML) reproducible between runs.
    for key in sorted(eland_files):
        eland_files.count(key, gerald, genome_maps)
    return e
- if end is None:
- effective_end = 0
- else:
- effective_end = end - 1
- results[effective_end][lane_id] = result
+class ElandMatches(collections.MutableMapping):
+ def __init__(self, eland_container):
+ # the order in patterns determines the preference for what
+ # will be found.
+ self.eland_container = eland_container
+ MAPPED = eland_container.update_result_with_eland
+ SEQUENCE = eland_container.update_result_with_sequence
+
+ sample = '(?P<sample>[^_]+)'
+ hiIndex = '_(?P<index>(NoIndex|[AGCT])+)'
+ hiLane = '_L(?P<lane>[\d]+)'
+ gaLane = '_(?P<lane>[\d]+)'
+ hiRead = '_R(?P<read>[\d]+)'
+ gaRead = '(_(?P<read>[\d])+)?'
+ part = '_(?P<part>[\d]+)'
+ ext = '(?P<extention>(\.bz2|\.gz)?)'
+
+ hiPrefix = sample + hiIndex + hiLane + hiRead + part
+ gaPrefix = sample + gaLane + gaRead
+ P = collections.namedtuple('Patterns', 'pattern counter priority')
+ self.patterns = [
+ P(hiPrefix +'_export.txt' + ext, MAPPED, 6),
+ P(gaPrefix + '_eland_result.txt' + ext, MAPPED, 5),
+ P(gaPrefix + '_eland_extended.txt' + ext, MAPPED, 4),
+ P(gaPrefix + '_eland_multi.txt' + ext, MAPPED, 3),
+ P(gaPrefix + '_export.txt' + ext, MAPPED, 2),
+ P(gaPrefix + '_sequence.txt' + ext, SEQUENCE, 1),
+ ]
+ self.file_sets = {}
+ self.file_priority = {}
+ self.file_counter = {}
+
def add(self, pathname):
    """Record ``pathname`` if its filename matches a known pattern.

    The file is grouped under the sample key derived from its name.  For
    each key only files of the highest priority seen so far are kept: a
    higher-priority file replaces the current set, an equal-priority file
    joins it, and a lower-priority file is ignored.
    """
    filename = os.path.split(pathname)[1]

    for pattern, counter, priority in self.patterns:
        rematch = re.match(pattern, filename)
        if rematch is None:
            continue

        m = ElandMatch(pathname, counter, **rematch.groupdict())
        key = m.make_samplekey()
        old_priority = self.file_priority.get(key, 0)

        if priority == old_priority:
            self.file_sets[key].add(m)
        elif priority > old_priority:
            # a better kind of file supersedes everything seen before
            self.file_sets[key] = set((m,))
            self.file_counter[key] = counter
            self.file_priority[key] = priority
+
def count(self, key, gerald=None, genome_maps=None):
    """Run the counter registered for ``key`` over its matched files.

    The counter is called as ``counter(gerald, key, filenames,
    genome_maps)`` and its result is returned.  ``filenames`` is sorted
    because the matches are held in a set, whose order is arbitrary;
    sorting hands multi-part files to the counter in a stable order.

    Raises KeyError when no file has been added for ``key``.
    """
    counter = self.file_counter[key]
    filenames = sorted(f.filename for f in self.file_sets[key])
    return counter(gerald, key, filenames, genome_maps)
+
+ def __iter__(self):
+ return iter(self.file_sets)
+ def __len__(self):
+ return len(self.file_sets)
-def eland(gerald_dir, gerald=None, genome_maps=None):
- e = ELAND()
+ def __getitem__(self, key):
+ return self.file_sets[key]
- lane_ids = range(1,9)
- ends = [None, 1, 2]
-
- basedirs = [gerald_dir]
-
- # if there is a basedir/Temp change basedir to point to the temp
- # directory, as 1.1rc1 moves most of the files we've historically
- # cared about to that subdirectory.
- # we should look into what the official 'result' files are.
- # and 1.3 moves them back
- basedir_temp = os.path.join(gerald_dir, 'Temp')
- if os.path.isdir(basedir_temp):
- basedirs.append(basedir_temp)
-
- # So how about scanning for Project*/Sample* directories as well
- sample_pattern = os.path.join(gerald_dir, 'Project_*', 'Sample_*')
- basedirs.extend(glob(sample_pattern))
-
- # the order in patterns determines the preference for what
- # will be found.
- MAPPED_ELAND = 0
- SEQUENCE = 1
- patterns = [
- ('(?P<sampleId>[^_]+)_(?P<index>(NoIndex|[AGCT])+)_L00%s(_R%s)_(?P<part>[\d]+)_export.txt(?P<ext>(\.bz2|\.gz)?)', MAPPED_ELAND),
- ('s_(?P<lane>%s)(_(?P<end>%s))?_eland_result.txt(?P<ext>(\.bz2|\.gz)?)',
- MAPPED_ELAND),
- ('s_(?P<lane>%s)(_(?P<end>%s))?_eland_extended.txt(?P<ext>(\.bz2|\.gz)?)',
- MAPPED_ELAND),
- ('s_(?P<lane>%s)(_(?P<end>%s))?_eland_multi.txt(?P<ext>(\.bz2|\.gz)?)',
- MAPPED_ELAND),
- ('s_(?P<lane>%s)(_(?P<end>%s))?_export.txt(?P<ext>(\.bz2|\.gz)?)',
- MAPPED_ELAND),
- ('s_(?P<lane>%s)(_(?P<end>%s))?_sequence.txt(?P<ext>(\.bz2|\.gz)?)',
- SEQUENCE),
-
- #('s_%s_eland_result.txt', MAPPED_ELAND),
- #('s_%s_eland_result.txt.bz2', MAPPED_ELAND),
- #('s_%s_eland_result.txt.gz', MAPPED_ELAND),
- #('s_%s_eland_extended.txt', MAPPED_ELAND),
- #('s_%s_eland_extended.txt.bz2', MAPPED_ELAND),
- #('s_%s_eland_extended.txt.gz', MAPPED_ELAND),
- #('s_%s_eland_multi.txt', MAPPED_ELAND),
- #('s_%s_eland_multi.txt.bz2', MAPPED_ELAND),
- #('s_%s_eland_multi.txt.gz', MAPPED_ELAND),
- #('s_%s_export.txt', MAPPED_ELAND),
- #('s_%s_export.txt.bz2', MAPPED_ELAND),
- #('s_%s_export.txt.gz', MAPPED_ELAND),
- #('s_%s_sequence.txt', SEQUENCE),
- ]
-
- for basedir in basedirs:
- for end in ends:
- for lane_id in lane_ids:
- for p in patterns:
- pathnames = check_for_eland_file(basedir, p[0], lane_id, end)
- if len(pathnames) > 0:
- if p[1] == MAPPED_ELAND:
- update_result_with_eland(gerald, e.results, lane_id, end, pathnames, genome_maps)
- elif p[1] == SEQUENCE:
- update_result_with_sequence(gerald, e.results, lane_id, end, pathnames)
- break
- else:
- LOGGER.debug("No eland file found in %s for lane %s and end %s" %(basedir, lane_id, end))
- continue
+ def __setitem__(self, key, value):
+ if not isintance(value, set):
+ raise ValueError("Expected set for value")
+ self.file_sets[key] = value
- return e
+ def __delitem__(self, key):
+ del self.file_sets[key]
+
class ElandMatch(object):
    """One file whose name matched a result-file pattern.

    The lane, read and part are kept as the raw captured text in
    ``_lane``, ``_read`` and ``_part``; the ``lane``, ``read`` and ``part``
    properties return them as integers, or None when nothing was captured.
    Unrecognised keyword arguments are accepted and ignored.
    """
    def __init__(self, pathname, counter,
                 lane=None, read=None, extension=None,
                 sample=None, index=None, part=None, **kwargs):
        self.filename = pathname
        self.counter = counter
        self._lane = lane
        self._read = read
        self.extension = extension
        self.sample = sample
        self.index = index
        self._part = part
        LOGGER.info("Found %s: L%s R%s Samp%s" % (
            self.filename, self._lane, self._read, self.sample))

    def make_samplekey(self):
        """Return the SampleKey for this file; a missing read counts as 1."""
        if self._read is None:
            read = 1
        else:
            read = self._read
        return SampleKey(lane=self.lane, read=read, sample=self.sample)

    @staticmethod
    def _int_or_none(text):
        """Convert captured text to int, passing None through unchanged."""
        if text is None:
            return None
        return int(text)

    def _get_lane(self):
        return self._int_or_none(self._lane)
    lane = property(_get_lane)

    def _get_read(self):
        return self._int_or_none(self._read)
    read = property(_get_read)

    def _get_part(self):
        return self._int_or_none(self._part)
    part = property(_get_part)

    def __repr__(self):
        """Return ``<ElandMatch(...)>`` listing only the fields that are set."""
        pieces = []
        if self.sample is not None:
            pieces.append(self.sample)
        # numeric fields are shown through their int properties
        for raw, label, value in ((self._lane, 'L', self.lane),
                                  (self._read, 'R', self.read),
                                  (self._part, 'P', self.part)):
            if raw is not None:
                pieces.append('%s%s' % (label, value))
        return '<ElandMatch(' + "_".join(pieces) + ')>'
def build_genome_fasta_map(genome_dir):
# build fasta to fasta file map
for a in args:
LOGGER.info("Starting scan of %s" % (a,))
e = eland(a)
- print e.get_elements()
-
+ print ElementTree.tostring(e.get_elements())
return
+"""Provide access to information stored in the GERALD directory.
"""
-Provide access to information stored in the GERALD directory.
-"""
+import collections
from datetime import datetime, date
import logging
import os
return self.__get_attribute('USE_BASES1')
use_bases = property(_get_use_bases)
-class LaneSpecificRunParameters(object):
+class LaneSpecificRunParameters(collections.MutableMapping):
"""
Provide access to LaneSpecificRunParameters
"""
def __init__(self, gerald):
self._gerald = gerald
- self._lane = None
+ self._lanes = None
- def _initalize_lanes(self):
+ def _initialize_lanes(self):
"""
build dictionary of LaneParameters
"""
element)
def __iter__(self):
+ if self._lanes is None:
+ self._initialize_lanes()
return self._lanes.iterkeys()
+
def __getitem__(self, key):
- if self._lane is None:
- self._initalize_lanes()
+ if self._lanes is None:
+ self._initialize_lanes()
return self._lanes[key]
- def get(self, key, default):
- if self._lane is None:
- self._initalize_lanes()
- return self._lanes.get(key, None)
- def keys(self):
- if self._lane is None:
- self._initalize_lanes()
- return self._lanes.keys()
- def values(self):
- if self._lane is None:
- self._initalize_lanes()
- return self._lanes.values()
- def items(self):
- if self._lane is None:
- self._initalize_lanes()
- return self._lanes.items()
+
+ def __setitem__(self, key, value):
+ self._lanes[key] = value
+
+ def __delitem__(self, key):
+ del self._lanes[key]
+
def __len__(self):
- if self._lane is None:
- self._initalize_lanes()
+ if self._lanes is None:
+ self._initialize_lanes()
return len(self._lanes)
# copy & bzip eland files
bz_commands = []
- for lanes_dictionary in gerald_object.eland_results.results:
- for eland_lane in lanes_dictionary.values():
- for source_name in eland_lane.pathnames:
- if source_name is None:
- LOGGER.info(
- "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
- else:
- path, name = os.path.split(source_name)
- dest_name = os.path.join(cycle_dir, name)
- LOGGER.info("Saving eland file %s to %s" % \
- (source_name, dest_name))
-
- if is_compressed(name):
- LOGGER.info('Already compressed, Saving to %s' % (dest_name,))
- shutil.copy(source_name, dest_name)
- else:
- # not compressed
- dest_name += '.bz2'
- args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
- bz_commands.append(" ".join(args))
- #LOGGER.info('Running: %s' % ( " ".join(args) ))
- #bzip_dest = open(dest_name, 'w')
- #bzip = subprocess.Popen(args, stdout=bzip_dest)
- #LOGGER.info('Saving to %s' % (dest_name, ))
- #bzip.wait()
+ for key in gerald_object.eland_results:
+ eland_lane = gerald_object.eland_results[key]
+ for source_name in eland_lane.pathnames:
+ if source_name is None:
+ LOGGER.info(
+ "Lane ID %s does not have a filename." % (eland_lane.lane_id,))
+ else:
+ path, name = os.path.split(source_name)
+ dest_name = os.path.join(cycle_dir, name)
+ LOGGER.info("Saving eland file %s to %s" % \
+ (source_name, dest_name))
+
+ if is_compressed(name):
+ LOGGER.info('Already compressed, Saving to %s' % (dest_name,))
+ shutil.copy(source_name, dest_name)
+ else:
+ # not compressed
+ dest_name += '.bz2'
+ args = ['bzip2', '-9', '-c', source_name, '>', dest_name ]
+ bz_commands.append(" ".join(args))
+ #LOGGER.info('Running: %s' % ( " ".join(args) ))
+ #bzip_dest = open(dest_name, 'w')
+ #bzip = subprocess.Popen(args, stdout=bzip_dest)
+ #LOGGER.info('Saving to %s' % (dest_name, ))
+ #bzip.wait()
if len(bz_commands) > 0:
q = QueueCommands(bz_commands, num_jobs)
--- /dev/null
class SampleKey(object):
    """Identifier for a sample in a particular 'location' on a flowcell.

    A key is the triple (lane, read, sample).  ``lane`` and ``read`` are
    converted to int when given.  An attribute left as None makes the key
    a wildcard: ``matches`` ignores that attribute.

    Keys are hashable and ordered by lane, then sample, then read.  A None
    attribute sorts before any real value.  Comparing against an object
    that is not a SampleKey returns NotImplemented instead of failing
    with AttributeError.
    """
    def __init__(self, lane=None, read=None, sample=None):
        self.lane = int(lane) if lane is not None else None
        self.read = int(read) if read is not None else None
        self.sample = sample

    def _iswild(self):
        """Return True when any of lane, read or sample is None."""
        return None in (self.lane, self.read, self.sample)
    iswild = property(_iswild)

    def matches(self, other):
        """Test non-None attributes

        An attribute takes part in the comparison only when it is set on
        both keys; all such attributes must be equal.
        """
        pairs = ((self.lane, other.lane),
                 (self.read, other.read),
                 (self.sample, other.sample))
        for mine, theirs in pairs:
            if mine is None or theirs is None:
                continue
            if mine != theirs:
                return False
        return True

    def _identity(self):
        """Return the tuple that decides equality."""
        return (self.lane, self.read, self.sample)

    def _sort_key(self):
        """Return the tuple that decides ordering (lane, sample, read).

        Each value is paired with an "is set" flag so that None sorts
        first without ever being compared against an int or a string.
        """
        return tuple((value is not None, value)
                     for value in (self.lane, self.sample, self.read))

    def __eq__(self, other):
        if not isinstance(other, SampleKey):
            return NotImplemented
        return self._identity() == other._identity()

    def __ne__(self, other):
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __lt__(self, other):
        if not isinstance(other, SampleKey):
            return NotImplemented
        return self._sort_key() < other._sort_key()

    def __le__(self, other):
        if not isinstance(other, SampleKey):
            return NotImplemented
        return self._sort_key() <= other._sort_key()

    def __gt__(self, other):
        if not isinstance(other, SampleKey):
            return NotImplemented
        return self._sort_key() > other._sort_key()

    def __ge__(self, other):
        if not isinstance(other, SampleKey):
            return NotImplemented
        return self._sort_key() >= other._sort_key()

    def __hash__(self):
        return hash((self.sample, self.lane, self.read))

    def __repr__(self):
        fields = ",".join([
            'L%s' % (self.lane,),
            'R%s' % (self.read,),
            'S%s' % (self.sample,),
        ])
        return '<SampleKey(' + fields + ')>'
+
from StringIO import StringIO
import unittest
-from htsworkflow.pipelines.eland import ElandLane, MatchCodes, MappedReads
+from htsworkflow.pipelines.eland import ELAND, ElandLane, ElandMatches, \
+ SampleKey, MatchCodes, MappedReads
class MatchCodeTests(unittest.TestCase):
def test_initializer(self):
self.assertEqual(len(match_reads), 0)
self.assertEqual(reads, 1)
+class TestElandMatches(unittest.TestCase):
+ def test_eland_replacing(self):
+ key = SampleKey(1, 1, 's')
+ e = ELAND()
+ em = ElandMatches(e)
+ em.add('s_1_sequence.txt')
+ self.assertEqual(len(em), 1)
+ self.assertEqual(len(em[key]), 1)
+ filename = iter(em[key]).next().filename
+ self.assertEqual(filename, 's_1_sequence.txt')
+ self.assertEqual(em.keys(), [key])
+ em.add('s_1_eland_result.txt')
+ self.assertEqual(len(em), 1)
+ self.assertEqual(len(em[key]), 1)
+ filename = iter(em[key]).next().filename
+ self.assertEqual(filename, 's_1_eland_result.txt')
+ self.assertEqual(em.keys(), [key])
+
+ def test_parts(self):
+ key11111 = SampleKey(1, 1, '11111')
+ key11112 = SampleKey(1, 1, '11112')
+ e = ELAND()
+ em = ElandMatches(e)
+ em.add('11111_CCAATT_L001_R1_001_export.txt.gz')
+ em.add('11111_CCAATT_L001_R1_002_export.txt.gz')
+ em.add('11111_CCAATT_L001_R1_003_export.txt.gz')
+ em.add('11112_AAGGTT_L001_R1_001_export.txt.gz')
+ em.add('11112_AAGGTT_L001_R1_002_export.txt.gz')
+ self.assertEqual(len(em), 2)
+ self.assertEqual(len(em[key11111]), 3)
+ self.assertEqual(len(em[key11112]), 2)
if __name__ == "__main__":
unittest.main()
from simulate_runfolder import TESTDATA_DIR
from htsworkflow.pipelines.runfolder import load_pipeline_run_xml
+from htsworkflow.pipelines.eland import SampleKey
class testLoadRunXML(unittest.TestCase):
- def _check_run_xml(self, run_xml_name, results):
+ def _check_run_xml(self, run_xml_name, results, eland_results=8):
run_xml_path = os.path.join(TESTDATA_DIR, run_xml_name)
run = load_pipeline_run_xml(run_xml_path)
-
+
self.failUnlessEqual(run.image_analysis.start, results['cycle_start'])
self.failUnlessEqual(run.image_analysis.stop, results['cycle_stop'])
-
- eland_summary_by_lane = run.gerald.eland_results.results[0]
- self.failUnlessEqual(len(eland_summary_by_lane), 8)
+
+ query = SampleKey(read=1)
+ eland_summary_by_lane = run.gerald.eland_results.find_keys(query)
+ self.failUnlessEqual(len(list(eland_summary_by_lane)), eland_results)
runfolder_name = results['runfolder_name']
self.failUnlessEqual(run.runfolder_name, runfolder_name)
for (end, lane), lane_results in results['lane_results'].items():
for name, test_value in lane_results.items():
xml_value = getattr(run.gerald.summary[end][lane], name)
-
+
self.failUnlessEqual(xml_value, test_value,
"%s[%s][%s]: %s %s != %s" % (run_xml_name, end, lane, name, xml_value, test_value))
-
+
def testVersion0(self):
run_xml_name = 'run_FC12150_2007-09-27.xml'
results = {'runfolder_name': '070924_USI-EAS44_0022_FC12150',
}
}
}
- self._check_run_xml(run_xml_name, results)
+ self._check_run_xml(run_xml_name, results, eland_results=0)
def testVersion1(self):
# end, lane
}
}
- self._check_run_xml(run_xml_name, results)
+ self._check_run_xml(run_xml_name, results, eland_results=8)
def testVersion2(self):
run_xml_name = 'run_62DJMAAXX_2011-01-09.xml'
}
}
}
- self._check_run_xml(run_xml_name, results)
+ self._check_run_xml(run_xml_name, results, eland_results=8)
def suite():
return unittest.makeSuite(testLoadRunXML,'test')
g_eland = g.eland_results
g2_eland = g2.eland_results
- for lane in g_eland.results[0].keys():
- g_results = g_eland.results[0][lane]
- g2_results = g2_eland.results[0][lane]
+ for key in g_eland:
+ g_results = g_eland[key]
+ g2_results = g2_eland[key]
self.failUnlessEqual(g_results.reads,
g2_results.reads)
self.failUnlessEqual(len(g_results.mapped_reads),
5:dm3_map, 6:dm3_map, 7:dm3_map, 8:dm3_map }
eland = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
- for i in range(1,9):
- lane = eland.results[0][i]
+ for key in eland:
+ lane = eland[key]
self.failUnlessEqual(lane.reads, 4)
self.failUnlessEqual(lane.sample_name, "s")
- self.failUnlessEqual(lane.lane_id, i)
+ self.failUnlessEqual(lane.lane_id, key.lane)
self.failUnlessEqual(len(lane.mapped_reads), 3)
self.failUnlessEqual(lane.mapped_reads['Lambda.fa'], 1)
self.failUnlessEqual(lane.mapped_reads['dm3/chr2L.fa'], 1)
xml_str = ElementTree.tostring(xml)
e2 = gerald.ELAND(xml=xml)
- for i in range(1,9):
- l1 = eland.results[0][i]
- l2 = e2.results[0][i]
+ for key in eland:
+ l1 = eland[key]
+ l2 = e2[key]
self.failUnlessEqual(l1.reads, l2.reads)
self.failUnlessEqual(l1.sample_name, l2.sample_name)
self.failUnlessEqual(l1.lane_id, l2.lane_id)
g_eland = g.eland_results
g2_eland = g2.eland_results
- for lane in g_eland.results[0].keys():
- g_results = g_eland.results[0][lane]
- g2_results = g2_eland.results[0][lane]
+ for key in g_eland:
+ g_results = g_eland[key]
+ g2_results = g2_eland[key]
self.failUnlessEqual(g_results.reads,
g2_results.reads)
self.failUnlessEqual(len(g_results.mapped_reads),
5:dm3_map, 6:dm3_map, 7:dm3_map, 8:dm3_map }
eland = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
- for i in range(1,9):
- lane = eland.results[0][i]
+ for key in eland:
+ lane = eland[key]
self.failUnlessEqual(lane.reads, 4)
self.failUnlessEqual(lane.sample_name, "s")
- self.failUnlessEqual(lane.lane_id, i)
+ self.failUnlessEqual(lane.lane_id, key.lane)
self.failUnlessEqual(len(lane.mapped_reads), 3)
self.failUnlessEqual(lane.mapped_reads['Lambda.fa'], 1)
self.failUnlessEqual(lane.mapped_reads['dm3/chr2L.fa'], 1)
xml_str = ElementTree.tostring(xml)
e2 = gerald.ELAND(xml=xml)
- for i in range(1,9):
- l1 = eland.results[0][i]
- l2 = e2.results[0][i]
+ for key in eland:
+ l1 = eland[key]
+ l2 = e2[key]
self.failUnlessEqual(l1.reads, l2.reads)
self.failUnlessEqual(l1.sample_name, l2.sample_name)
self.failUnlessEqual(l1.lane_id, l2.lane_id)
g_eland = g.eland_results
g2_eland = g2.eland_results
- for lane in g_eland.results[0].keys():
- g_results = g_eland.results[0][lane]
- g2_results = g2_eland.results[0][lane]
+ for key in g_eland:
+ g_results = g_eland[key]
+ g2_results = g2_eland[key]
self.failUnlessEqual(g_results.reads,
g2_results.reads)
self.failUnlessEqual(len(g_results.mapped_reads),
5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
eland = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
- for i in range(1,9):
- lane = eland.results[0][i]
+ for key in eland:
+ lane = eland[key]
self.failUnlessEqual(lane.reads, 6)
self.failUnlessEqual(lane.sample_name, "s")
- self.failUnlessEqual(lane.lane_id, i)
+ self.failUnlessEqual(lane.lane_id, key.lane)
self.failUnlessEqual(len(lane.mapped_reads), 17)
self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
self.failUnlessEqual(lane.match_codes['U0'], 3)
xml_str = ElementTree.tostring(xml)
e2 = gerald.ELAND(xml=xml)
- for i in range(1,9):
- l1 = eland.results[0][i]
- l2 = e2.results[0][i]
+ for key in eland:
+ l1 = eland[key]
+ l2 = e2[key]
self.failUnlessEqual(l1.reads, l2.reads)
self.failUnlessEqual(l1.sample_name, l2.sample_name)
self.failUnlessEqual(l1.lane_id, l2.lane_id)
g_eland = g.eland_results
g2_eland = g2.eland_results
- for lane in g_eland.results[0].keys():
- g_results = g_eland.results[0][lane]
- g2_results = g2_eland.results[0][lane]
+ for key in g_eland:
+ g_results = g_eland[key]
+ g2_results = g2_eland[key]
self.failUnlessEqual(g_results.reads,
g2_results.reads)
if isinstance(g_results, eland.ElandLane):
# test fastq
for i in range(1,4):
- lane = eland_container.results[0][i]
+ key = eland.SampleKey(lane=i, read=1, sample='s')
+ lane = eland_container[key]
self.failUnlessEqual(lane.reads, 3)
self.failUnlessEqual(lane.sample_name, 's')
self.failUnlessEqual(lane.lane_id, i)
# I added sequence lanes to the last 2 lanes of this test case
for i in range(4,9):
- lane = eland_container.results[0][i]
+ key = eland.SampleKey(lane=i, read=1, sample='s')
+ lane = eland_container[key]
self.failUnlessEqual(lane.reads, 28)
self.failUnlessEqual(lane.sample_name, "s")
self.failUnlessEqual(lane.lane_id, i)
xml_str = ElementTree.tostring(xml)
e2 = gerald.ELAND(xml=xml)
- for i in range(1,9):
- l1 = eland_container.results[0][i]
- l2 = e2.results[0][i]
+ for key in eland_container:
+ l1 = eland_container[key]
+ l2 = e2[key]
self.failUnlessEqual(l1.reads, l2.reads)
self.failUnlessEqual(l1.sample_name, l2.sample_name)
self.failUnlessEqual(l1.lane_id, l2.lane_id)
g_eland = g.eland_results
g2_eland = g2.eland_results
- for lane in g_eland.results[0].keys():
- g_results = g_eland.results[0][lane]
- g2_results = g2_eland.results[0][lane]
+ for key in g_eland:
+ g_results = g_eland[key]
+ g2_results = g2_eland[key]
self.failUnlessEqual(g_results.reads,
g2_results.reads)
self.failUnlessEqual(len(g_results.mapped_reads),
5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
eland = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
- for i in range(1,9):
- lane = eland.results[0][i]
+ for key in eland:
+ lane = eland[key]
self.failUnlessEqual(lane.reads, 6)
self.failUnlessEqual(lane.sample_name, "s")
- self.failUnlessEqual(lane.lane_id, i)
+ self.failUnlessEqual(lane.lane_id, key.lane)
self.failUnlessEqual(len(lane.mapped_reads), 17)
self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
self.failUnlessEqual(lane.mapped_reads['spike.fa/sample1'], 1)
xml_str = ElementTree.tostring(xml)
e2 = gerald.ELAND(xml=xml)
- for i in range(1,9):
- l1 = eland.results[0][i]
- l2 = e2.results[0][i]
+ for key in eland:
+ l1 = eland[key]
+ l2 = e2[key]
self.failUnlessEqual(l1.reads, l2.reads)
self.failUnlessEqual(l1.sample_name, l2.sample_name)
self.failUnlessEqual(l1.lane_id, l2.lane_id)
g_eland = g.eland_results
g2_eland = g2.eland_results
- for lane in g_eland.results[0].keys():
- g_results = g_eland.results[0][lane]
- g2_results = g2_eland.results[0][lane]
+ for key in g_eland:
+ g_results = g_eland[key]
+ g2_results = g2_eland[key]
self.failUnlessEqual(g_results.reads,
g2_results.reads)
if isinstance(g_results, eland.ElandLane):
eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
# I added sequence lanes to the last 2 lanes of this test case
- for i in range(1,7):
- lane = eland_container.results[0][i]
- self.failUnlessEqual(lane.reads, 6)
- self.failUnlessEqual(lane.sample_name, "s")
- self.failUnlessEqual(lane.lane_id, i)
- self.failUnlessEqual(len(lane.mapped_reads), 17)
- self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
- self.failUnlessEqual(lane.match_codes['U0'], 3)
- self.failUnlessEqual(lane.match_codes['R0'], 2)
- self.failUnlessEqual(lane.match_codes['U1'], 1)
- self.failUnlessEqual(lane.match_codes['R1'], 9)
- self.failUnlessEqual(lane.match_codes['U2'], 0)
- self.failUnlessEqual(lane.match_codes['R2'], 12)
- self.failUnlessEqual(lane.match_codes['NM'], 1)
- self.failUnlessEqual(lane.match_codes['QC'], 0)
-
- # test scarf
- lane = eland_container.results[0][7]
- self.failUnlessEqual(lane.reads, 5)
- self.failUnlessEqual(lane.sample_name, 's')
- self.failUnlessEqual(lane.lane_id, 7)
- self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
-
- # test fastq
- lane = eland_container.results[0][8]
- self.failUnlessEqual(lane.reads, 3)
- self.failUnlessEqual(lane.sample_name, 's')
- self.failUnlessEqual(lane.lane_id, 8)
- self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
+ for key in eland_container:
+ lane = eland_container[key]
+ if key.lane in [1,2,3,4,5,6]:
+ self.failUnlessEqual(lane.reads, 6)
+ self.failUnlessEqual(lane.sample_name, "s")
+ self.failUnlessEqual(lane.lane_id, key.lane)
+ self.failUnlessEqual(len(lane.mapped_reads), 17)
+ self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
+ self.failUnlessEqual(lane.match_codes['U0'], 3)
+ self.failUnlessEqual(lane.match_codes['R0'], 2)
+ self.failUnlessEqual(lane.match_codes['U1'], 1)
+ self.failUnlessEqual(lane.match_codes['R1'], 9)
+ self.failUnlessEqual(lane.match_codes['U2'], 0)
+ self.failUnlessEqual(lane.match_codes['R2'], 12)
+ self.failUnlessEqual(lane.match_codes['NM'], 1)
+ self.failUnlessEqual(lane.match_codes['QC'], 0)
+ elif key.lane == 7:
+ self.failUnlessEqual(lane.reads, 5)
+ self.failUnlessEqual(lane.sample_name, 's')
+ self.failUnlessEqual(lane.lane_id, 7)
+ self.failUnlessEqual(lane.sequence_type,
+ eland.SequenceLane.SCARF_TYPE)
+ elif key.lane == 8:
+ self.failUnlessEqual(lane.reads, 3)
+ self.failUnlessEqual(lane.sample_name, 's')
+ self.failUnlessEqual(lane.lane_id, 8)
+ self.failUnlessEqual(lane.sequence_type,
+ eland.SequenceLane.FASTQ_TYPE)
xml = eland_container.get_elements()
# just make sure that element tree can serialize the tree
xml_str = ElementTree.tostring(xml)
e2 = gerald.ELAND(xml=xml)
- for i in range(1,9):
- l1 = eland_container.results[0][i]
- l2 = e2.results[0][i]
+ for key in eland_container:
+ l1 = eland_container[key]
+ l2 = e2[key]
self.failUnlessEqual(l1.reads, l2.reads)
self.failUnlessEqual(l1.sample_name, l2.sample_name)
self.failUnlessEqual(l1.lane_id, l2.lane_id)
from htsworkflow.pipelines import firecrest
from htsworkflow.pipelines import bustard
from htsworkflow.pipelines import gerald
+from htsworkflow.pipelines.eland import SampleKey
from htsworkflow.pipelines import runfolder
from htsworkflow.pipelines.runfolder import ElementTree
g_eland = g.eland_results
g2_eland = g2.eland_results
- for lane in g_eland.results[end].keys():
- g_results = g_eland.results[end][lane]
- g2_results = g_eland.results[end][lane]
+ for key in g_eland:
+ g_results = g_eland[key]
+ g2_results = g2_eland[key]
self.failUnlessEqual(g_results.reads,
g2_results.reads)
self.failUnlessEqual(len(g_results.mapped_reads),
eland = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
# check first end
- for i in range(1,9):
- lane = eland.results[0][i]
+ for key in eland.find_keys(SampleKey(read=1)):
+ lane = eland[key]
self.failUnlessEqual(lane.reads, 6)
self.failUnlessEqual(lane.sample_name, "s")
- self.failUnlessEqual(lane.lane_id, i)
+ self.failUnlessEqual(lane.lane_id, key.lane)
self.failUnlessEqual(len(lane.mapped_reads), 17)
self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
self.failUnlessEqual(lane.match_codes['U0'], 3)
self.failUnlessEqual(lane.match_codes['QC'], 0)
# check second end
- for i in range(1,9):
- lane = eland.results[1][i]
+ for key in eland.find_keys(SampleKey(read=2)):
+ lane = eland[key]
self.failUnlessEqual(lane.reads, 7)
self.failUnlessEqual(lane.sample_name, "s")
- self.failUnlessEqual(lane.lane_id, i)
+ self.failUnlessEqual(lane.lane_id, key.lane)
self.failUnlessEqual(len(lane.mapped_reads), 17)
self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
self.failUnlessEqual(lane.match_codes['U0'], 3)
xml_str = ElementTree.tostring(xml)
e2 = gerald.ELAND(xml=xml)
- for end in [0, 1]:
- for i in range(1,9):
- l1 = eland.results[end][i]
- l2 = e2.results[end][i]
- self.failUnlessEqual(l1.reads, l2.reads)
- self.failUnlessEqual(l1.sample_name, l2.sample_name)
- self.failUnlessEqual(l1.lane_id, l2.lane_id)
- self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
- self.failUnlessEqual(len(l1.mapped_reads), 17)
- for k in l1.mapped_reads.keys():
- self.failUnlessEqual(l1.mapped_reads[k],
- l2.mapped_reads[k])
-
- self.failUnlessEqual(len(l1.match_codes), 9)
- self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
- for k in l1.match_codes.keys():
- self.failUnlessEqual(l1.match_codes[k],
- l2.match_codes[k])
+ for key in eland:
+ l1 = eland[key]
+ l2 = e2[key]
+ self.failUnlessEqual(l1.reads, l2.reads)
+ self.failUnlessEqual(l1.sample_name, l2.sample_name)
+ self.failUnlessEqual(l1.lane_id, l2.lane_id)
+ self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
+ self.failUnlessEqual(len(l1.mapped_reads), 17)
+ for k in l1.mapped_reads.keys():
+ self.failUnlessEqual(l1.mapped_reads[k],
+ l2.mapped_reads[k])
+
+ self.failUnlessEqual(len(l1.match_codes), 9)
+ self.failUnlessEqual(len(l1.match_codes), len(l2.match_codes))
+ for k in l1.match_codes.keys():
+ self.failUnlessEqual(l1.match_codes[k],
+ l2.match_codes[k])
def test_runfolder(self):
runs = runfolder.get_runs(self.runfolder_dir)
import unittest
from htsworkflow.pipelines import eland
+from htsworkflow.pipelines.samplekey import SampleKey
from htsworkflow.pipelines import ipar
from htsworkflow.pipelines import bustard
from htsworkflow.pipelines import gerald
g_eland = g.eland_results
g2_eland = g2.eland_results
- for lane in g_eland.results[0].keys():
- g_results = g_eland.results[0][lane]
- g2_results = g2_eland.results[0][lane]
+ for key in g_eland:
+ g_results = g_eland[key]
+ g2_results = g2_eland[key]
self.failUnlessEqual(g_results.reads,
g2_results.reads)
if isinstance(g_results, eland.ElandLane):
long_name = 'hg18/chr%d.fa' % (i,)
hg_map[short_name] = long_name
- genome_maps = { 1:hg_map, 2:hg_map, 3:hg_map, 4:hg_map,
- 5:hg_map, 6:hg_map, 7:hg_map, 8:hg_map }
+ samples = set(('11111', '11112', '11113', '11114', '11115',
+ '11116', '11117', '11118', '11119', '11120'))
+ genome_maps = {}
+ for i in range(1,9):
+ genome_maps[i] = hg_map
+
eland_container = gerald.eland(self.gerald_dir, genome_maps=genome_maps)
- # I added sequence lanes to the last 2 lanes of this test case
- for i in range(1,7):
- lane = eland_container.results[0][i]
- self.failUnlessEqual(lane.reads, 6)
- self.failUnlessEqual(lane.sample_name, "s")
- self.failUnlessEqual(lane.lane_id, i)
- self.failUnlessEqual(len(lane.mapped_reads), 17)
- self.failUnlessEqual(lane.mapped_reads['hg18/chr5.fa'], 4)
- self.failUnlessEqual(lane.match_codes['U0'], 3)
- self.failUnlessEqual(lane.match_codes['R0'], 2)
- self.failUnlessEqual(lane.match_codes['U1'], 1)
- self.failUnlessEqual(lane.match_codes['R1'], 9)
- self.failUnlessEqual(lane.match_codes['U2'], 0)
- self.failUnlessEqual(lane.match_codes['R2'], 12)
- self.failUnlessEqual(lane.match_codes['NM'], 1)
- self.failUnlessEqual(lane.match_codes['QC'], 0)
-
- # test scarf
- lane = eland_container.results[0][7]
- self.failUnlessEqual(lane.reads, 5)
- self.failUnlessEqual(lane.sample_name, 's')
- self.failUnlessEqual(lane.lane_id, 7)
- self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.SCARF_TYPE)
-
- # test fastq
- lane = eland_container.results[0][8]
- self.failUnlessEqual(lane.reads, 3)
- self.failUnlessEqual(lane.sample_name, 's')
- self.failUnlessEqual(lane.lane_id, 8)
- self.failUnlessEqual(lane.sequence_type, eland.SequenceLane.FASTQ_TYPE)
+ for lane in eland_container.values():
+ # sample 11113 is expected to carry three times the counts of every other sample (24 reads vs 8)
+ if lane.sample_name == '11113':
+ self.assertEqual(lane.reads, 24)
+ self.assertEqual(lane.mapped_reads['hg18/chr9.fa'], 6)
+ self.assertEqual(lane.match_codes['U0'], 6)
+ self.assertEqual(lane.match_codes['R0'], 18)
+ self.assertEqual(lane.match_codes['R1'], 24)
+ self.assertEqual(lane.match_codes['R2'], 18)
+ self.assertEqual(lane.match_codes['NM'], 12)
+ else:
+ self.assertEqual(lane.reads, 8)
+ self.assertEqual(lane.mapped_reads['hg18/chr9.fa'], 2)
+ self.assertEqual(lane.match_codes['U0'], 2)
+ self.assertEqual(lane.match_codes['R0'], 6)
+ self.assertEqual(lane.match_codes['R1'], 8)
+ self.assertEqual(lane.match_codes['R2'], 6)
+ self.assertEqual(lane.match_codes['NM'], 4)
+
+ self.assertIn(lane.sample_name, samples)
+ #self.assertEqual(lane.lane_id, 1)
+ self.assertEqual(len(lane.mapped_reads), 1)
+ self.assertEqual(lane.match_codes['U1'], 0)
+ self.assertEqual(lane.match_codes['U2'], 0)
+ self.assertEqual(lane.match_codes['QC'], 0)
xml = eland_container.get_elements()
# just make sure that element tree can serialize the tree
xml_str = ElementTree.tostring(xml)
e2 = gerald.ELAND(xml=xml)
- for i in range(1,9):
- l1 = eland_container.results[0][i]
- l2 = e2.results[0][i]
+ for key in eland_container.results:
+ l1 = eland_container.results[key]
+ l2 = e2.results[key]
self.failUnlessEqual(l1.reads, l2.reads)
self.failUnlessEqual(l1.sample_name, l2.sample_name)
self.failUnlessEqual(l1.lane_id, l2.lane_id)
if isinstance(l1, eland.ElandLane):
self.failUnlessEqual(len(l1.mapped_reads), len(l2.mapped_reads))
- self.failUnlessEqual(len(l1.mapped_reads), 17)
+ self.failUnlessEqual(len(l1.mapped_reads), 1)
for k in l1.mapped_reads.keys():
self.failUnlessEqual(l1.mapped_reads[k],
l2.mapped_reads[k])
--- /dev/null
+#!/usr/bin/env python
+"""Unit tests for SampleKey equality, hashing, ordering and query matching.
+"""
+from StringIO import StringIO
+import unittest
+
+from htsworkflow.pipelines.samplekey import SampleKey
+
+class TestSampleKey(unittest.TestCase):
+ def test_equality(self):
+ k1 = SampleKey(lane=1, read='1', sample='12345')
+ k2 = SampleKey(lane=1, read=1, sample='12345')
+ k3 = SampleKey(lane=1, read=2, sample='12345')
+
+ self.assertEqual(k1, k2)
+ self.assertEqual(hash(k1), hash(k2))
+ self.assertNotEqual(k1, k3)
+
+ self.assertLess(k1, k3)
+ self.assertLessEqual(k1, k2)
+
+ self.assertGreater(k3, k1)
+
+
+ def test_matching(self):
+ k1 = SampleKey(lane=1, read='1', sample='12345')
+ k2 = SampleKey(lane=1, read=1, sample='12345')
+ k3 = SampleKey(lane=1, read=2, sample='12345')
+
+ q1 = SampleKey()
+ q2 = SampleKey(read=1)
+ q3 = SampleKey(sample='12345')
+
+ self.assertTrue(k1.matches(q1))
+ self.assertTrue(k2.matches(q1))
+ self.assertTrue(k3.matches(q1))
+
+ self.assertTrue(k1.matches(q2))
+ self.assertTrue(k2.matches(q2))
+ self.assertFalse(k3.matches(q2))
+
+ self.assertTrue(k1.matches(q3))
+ self.assertTrue(k2.matches(q3))
+ self.assertTrue(k3.matches(q3))
+
+if __name__ == "__main__":
+ unittest.main()
</Lane>
</Lanes>
<Projects>
- <Project name="12383">
+ <Project name="11111">
<ANALYSIS>eland_extended</ANALYSIS>
<CHROM_NAME_SOURCE>fileName</CHROM_NAME_SOURCE>
<ELAND_GENOME>/g/hg18/chromosomes/</ELAND_GENOME>