And now being split out I can reasonably test it.
This is needed so when we're reporting where the genome
locations mapped to we can summarize them as a genome
instead of chr1 chr2 chr3 etc.
from htsworkflow.pipelines.runfolder import ElementTree, LANE_LIST
from htsworkflow.pipelines.samplekey import SampleKey
+from htsworkflow.pipelines.genomemap import GenomeMap
from htsworkflow.util.ethelp import indent, flatten
from htsworkflow.util.opener import autoopen
self._mapped_reads = None
self._match_codes = None
self._reads = None
- if genome_map is None:
- genome_map = {}
- self.genome_map = genome_map
+ self.genome_map = GenomeMap(genome_map)
self.eland_type = None
if xml is not None:
gs_template.format(key.sample, key.lane)))
- genome_map = {}
+ genome_map = GenomeMap()
if genome_maps is not None:
- genome_map = genome_maps[key.lane]
+ genome_map = GenomeMap(genome_maps[key.lane])
elif len(genomesize) > 0:
- print "Found {0}".format(genomesize)
- genome_map = build_genome_size_map(genomesize[0])
+ LOGGER.info("Found {0}".format(genomesize))
+ genome_map.parse_genomesize(genomesize[0])
elif gerald is not None:
genome_dir = gerald.lanes[key].eland_genome
if genome_dir is not None:
- genome_map = build_genome_fasta_map(genome_dir)
+ genome_map.scan_genome_dir(genome_dir)
lane = ElandLane(pathnames, key.sample, key.lane, key.read, genome_map)
if self._part is not None: name.append('P%s' % (self.part,))
return '<ElandMatch(' + "_".join(name) + ')>'
-def build_genome_fasta_map(genome_dir):
- # build fasta to fasta file map
- LOGGER.info("Building genome map")
- genome = genome_dir.split(os.path.sep)[-1]
- fasta_map = {}
- for vld_file in glob(os.path.join(genome_dir, '*.vld')):
- is_link = False
- if os.path.islink(vld_file):
- is_link = True
- vld_file = os.path.realpath(vld_file)
- path, vld_name = os.path.split(vld_file)
- name, ext = os.path.splitext(vld_name)
- if is_link:
- fasta_map[name] = name
- else:
- fasta_map[name] = os.path.join(genome, name)
- return fasta_map
-
-def build_genome_size_map(pathname):
- """Guess what genome we're using"""
- sizes = {}
- tree = ElementTree.parse(pathname)
- for element in tree.getroot():
- name = element.attrib['contigName']
- bases = int(element.attrib['totalBases'])
- sizes[name] = bases
-
- # guess genome names
- if sizes.get('chr1', 0) == 197195432:
- genome = 'mm9'
- elif sizes.get('chr1', 0) == 247249719:
- genome = 'hg19'
- elif sizes.get('chrI', 0) == 230218:
- genome = 'sacCer3'
- elif len(sizes) == 1:
- genome = os.path.splitext(sizes.keys()[0])[0]
- else:
- raise RuntimeError("Unrecognized genome type, update detection")
-
- fasta_map = {}
- for k,v in sizes.items():
- fasta_map[k] = genome + '/' + k
-
- return fasta_map
def extract_eland_sequence(instream, outstream, start, end):
"""
--- /dev/null
+"""Convert chromosome names to genome/chromsome names
+"""
+from glob import glob
+import os
+import collections
+
+vldInfo = collections.namedtuple('vldInfo', 'name is_link')
+
+class GenomeMap(collections.MutableMapping):
+ def __init__(self, items=None):
+ self._contigs = {}
+ if items is not None:
+ self._contigs.update(items)
+
+ def __len__(self):
+ return len(self._contigs)
+
+ def __iter__(self):
+ return self._contigs.iterkeys()
+
+ def __getitem__(self, name):
+ return self._contigs[name]
+
+ def __setitem__(self, name, value):
+ self._contigs[name] = value
+
+ def __delitem__(self, name):
+ del self._contigs[name]
+
+ def scan_genome_dir(self, genome_dir):
+ """Build map from a genome directory"""
+ genome = genome_dir.split(os.path.sep)[-1]
+ vld_files = glob(os.path.join(genome_dir, '*.vld'))
+ vld = [ vldInfo(f, os.path.islink(f)) for f in vld_files ]
+ self.build_map_from_dir(genome, vld)
+
+ def build_map_from_dir(self, genome_name, vld_list):
+ """Initialize contig genome map from list of vldInfo tuples
+ """
+ # build fasta to fasta file map
+ for vld_file, is_link in vld_list:
+ vld_file = os.path.realpath(vld_file)
+ path, vld_name = os.path.split(vld_file)
+ name, ext = os.path.splitext(vld_name)
+ if is_link:
+ self._contigs[name] = name
+ else:
+ self._contigs[name] = genome_name + '/' + name
+
+ def parse_genomesize(self, pathname):
+ """Parse genomesize.xml file
+ """
+ tree = ElementTree.parse(pathname)
+ self.build_map_from_element(tree.getroot())
+
+ def build_map_from_element(self, root):
+ """Build genome map from a parsed HiSeq genomesizes.xml file
+ """
+ sizes = {}
+ filenames = {}
+ for element in root:
+ contig = element.attrib['contigName']
+ filename = element.attrib['fileName']
+ filenames[contig] = filename
+ bases = int(element.attrib['totalBases'])
+ sizes[contig] = bases
+
+ genome = guess_genome(sizes)
+
+ for contig, basese in sizes.items():
+ name = filenames[contig]
+ self._contigs[name] = genome + '/' + name
+
+def guess_genome(contig_sizes):
+ """Guess what genome we're using"""
+ # guess genome names
+
+ genomes = {'chr1': {197195432: 'mm9',
+ 247249719: 'hg19',
+ },
+ 'chrI': {230218, 'sacCer3'},
+ }
+
+ for key in genomes:
+ size = contig_sizes.get(key, 0)
+ if size in genomes[key]:
+ return genomes[key][size]
+
+ if len(contig_sizes) == 1:
+ return os.path.splitext(contig_sizes.keys()[0])[0]
+
+ raise RuntimeError("Unrecognized genome type, update detection code.")
--- /dev/null
+#!/usr/bin/env python
+"""More direct synthetic test cases for the eland output file processing
+"""
+import os
+from StringIO import StringIO
+import shutil
+import tempfile
+import unittest
+
+from htsworkflow.pipelines.runfolder import ElementTree
+from htsworkflow.pipelines import genomemap
+
+class TestGenomeMap(unittest.TestCase):
+ def test_genomesizes_xml(self):
+ xml = ElementTree.fromstring("""<sequenceSizes>
+ <chromosome fileName="chr2.fa" contigName="chr2" totalBases="181748087"/>
+ <chromosome fileName="chr1.fa" contigName="chr1" totalBases="197195432"/>
+ </sequenceSizes>
+""")
+ g = genomemap.GenomeMap()
+ g.build_map_from_element(xml)
+
+ self.assertTrue('chr1.fa' in g)
+ self.assertEqual(g['chr1.fa'], 'mm9/chr1.fa')
+
+ def test_simulated_genome_dir(self):
+ vlds = [genomemap.vldInfo('chr1.fa.vld', False),
+ genomemap.vldInfo('chr2.fa.vld', False),
+ genomemap.vldInfo('chr3.fa.vld', False),
+ genomemap.vldInfo('Lambda.fa.vld', True),]
+
+ g = genomemap.GenomeMap()
+ g.build_map_from_dir('mm9', vlds)
+
+ self.assertTrue('chr1.fa' in g)
+ self.assertEqual(len(g), 4)
+ self.assertEqual(g['chr1.fa'], 'mm9/chr1.fa')
+ self.assertEqual(g['Lambda.fa'], 'Lambda.fa')
+
+ def test_genome_dir(self):
+ g = genomemap.GenomeMap()
+ names = ['chr1', 'chr2', 'chr3']
+ tempdir = None
+ try:
+ tempdir = tempfile.mkdtemp(prefix='tmp_mm9')
+ for base in names:
+ name = os.path.join(tempdir, base + '.fa.vld')
+ stream = open(name, 'w')
+ stream.write(name)
+ stream.close()
+ g.scan_genome_dir(tempdir)
+ finally:
+ if tempdir is not None:
+ shutil.rmtree(tempdir)
+
+ temppath, tempgenome = os.path.split(tempdir)
+ self.assertTrue('chr1.fa' in g)
+ self.assertEqual(len(g), 3)
+ self.assertEqual(g['chr1.fa'], '{0}/chr1.fa'.format(tempgenome))
+if __name__ == "__main__":
+ unittest.main()