Move the code to scan the sequence file archive to its own module so
authorDiane Trout <diane@caltech.edu>
Thu, 10 Jun 2010 00:55:09 +0000 (00:55 +0000)
committerDiane Trout <diane@caltech.edu>
Thu, 10 Jun 2010 00:55:09 +0000 (00:55 +0000)
I can use it in scripts other than make-library-tree

htsworkflow/pipelines/sequences.py [new file with mode: 0644]
htsworkflow/pipelines/test/test_sequences.py [new file with mode: 0644]
scripts/make-library-tree

diff --git a/htsworkflow/pipelines/sequences.py b/htsworkflow/pipelines/sequences.py
new file mode 100644 (file)
index 0000000..cbdb433
--- /dev/null
@@ -0,0 +1,149 @@
+"""
+Utilities to work with the various eras of sequence archive files
+"""
+import logging
+import os
+import re
+
+eland_re = re.compile('s_(?P<lane>\d)(_(?P<read>\d))?_eland_')
+raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Z]+')
+qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Z]+_l[\d]_r[\d].tar.bz2')
+
+class SequenceFile(object):
+    """
+    Simple container class that holds the path to a sequence archive
+    and basic descriptive information.     
+    """
+    def __init__(self, filetype, path, flowcell, lane, read=None, pf=None, cycle=None):
+        self.filetype = filetype
+        self.path = path
+        self.flowcell = flowcell
+        self.lane = lane
+        self.read = read
+        self.pf = pf
+        self.cycle = cycle
+
+    def __hash__(self):
+        return hash(self.key())
+
+    def key(self):
+        return (self.flowcell, self.lane)
+
+    def unicode(self):
+        return unicode(self.path)
+
+    def __eq__(self, other):
+        """
+        Equality is defined if everything but the path matches
+        """
+        attributes = ['filetype','flowcell', 'lane', 'read', 'pf', 'cycle']
+        for a in attributes:
+            if getattr(self, a) != getattr(other, a):
+                return False
+
+        return True
+
+    def __repr__(self):
+        return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
+
+    def make_target_name(self, root):
+        """
+        Create target name for where we need to link this sequence too
+        """
+        path, basename = os.path.split(self.path)
+        # Because the names aren't unque we include the flowcel name
+        # because there were different eland files for different length
+        # analyses, we include the cycle length in the name.
+        if self.filetype == 'eland':
+            template = "%(flowcell)s_%(cycle)s_%(eland)s"
+            basename = template % { 'flowcell': self.flowcell,
+                                    'cycle': self.cycle,
+                                    'eland': basename }
+        # else:
+        # all the other file types have names that include flowcell/lane
+        # information and thus are unique so we don't have to do anything
+        return os.path.join(root, basename)
+        
+def parse_srf(path, filename):
+    basename, ext = os.path.splitext(filename)
+    records = basename.split('_')
+    flowcell = records[4]
+    lane = int(records[5][0])
+    fullpath = os.path.join(path, filename)
+    return SequenceFile('srf', fullpath, flowcell, lane)
+
+def parse_qseq(path, filename):
+    basename, ext = os.path.splitext(filename)
+    records = basename.split('_')
+    fullpath = os.path.join(path, filename)
+    flowcell = records[4]
+    lane = int(records[5][1])
+    read = int(records[6][1])
+    return SequenceFile('qseq', fullpath, flowcell, lane, read)
+
+def parse_fastq(path, filename):
+    basename, ext = os.path.splitext(filename)
+    records = basename.split('_')
+    fullpath = os.path.join(path, filename)
+    flowcell = records[4]
+    lane = int(records[5][1])
+    read = int(records[6][1])
+    if records[-1].startswith('pass'):
+        pf = True
+    elif records[-1].startswith('nopass'):
+        pf = False
+    else:
+        raise ValueError("Unrecognized fastq name")
+        
+    return SequenceFile('fastq', fullpath, flowcell, lane, read, pf=pf)
+
+def parse_eland(path, filename, eland_match=None):
+    if eland_match is None:
+        eland_match = eland_re.match(filename)
+    fullpath = os.path.join(path, filename)
+    rest, cycle = os.path.split(path)
+    rest, flowcell = os.path.split(rest)
+    if eland_match.group('lane'):
+        lane = int(eland_match.group('lane'))
+    else:
+        lane = None
+    if eland_match.group('read'):
+        read = int(eland_match.group('read'))
+    else:
+        read = None
+    return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=cycle)
+    
+def scan_for_sequences(dirs):
+    """
+    Scan through a list of directories for sequence like files
+    """
+    # be forgiving if someone just gives us a string
+    if type(dirs) != type([]):
+        dirs = [dirs]
+
+    sequences = []
+    for d in dirs:
+        logging.info("Scanning %s for sequences" % (d,))
+        for path, dirname, filenames in os.walk(d):
+            for f in filenames:
+                seq = None
+                # find sequence files
+                if raw_seq_re.match(f):
+                    if f.endswith('.md5'):
+                        continue
+                    elif f.endswith('.srf') or f.endswith('.srf.bz2'):
+                        seq = parse_srf(path, f)
+                    elif qseq_re.match(f):
+                        seq = parse_qseq(path, f)
+                    elif f.endswith('fastq') or f.endswith('.fastq.bz2'):
+                        seq = parse_fastq(path, f)
+                eland_match = eland_re.match(f)
+                if eland_match:
+                    if f.endswith('.md5'):
+                        continue
+                    seq = parse_eland(path, f, eland_match)
+                if seq:
+                    sequences.append(seq)
+                    logging.debug("Found sequence at %s" % (f,))
+                    
+    return sequences
diff --git a/htsworkflow/pipelines/test/test_sequences.py b/htsworkflow/pipelines/test/test_sequences.py
new file mode 100644 (file)
index 0000000..765df31
--- /dev/null
@@ -0,0 +1,105 @@
+#!/usr/bin/env python
+import os
+import unittest
+
+from htsworkflow.pipelines import sequences
+
+class SequenceFileTests(unittest.TestCase):
+    """
+    Make sure the sequence archive class works
+    """
+    def test_srf(self):
+        path = '/root/42BW9AAXX/C1-38'
+        name = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_4.srf'
+        pathname = os.path.join(path,name)
+        f = sequences.parse_srf(path, name)
+
+        self.failUnlessEqual(f.filetype, 'srf')
+        self.failUnlessEqual(f.path, pathname)
+        self.failUnlessEqual(f.flowcell, '42BW9AAXX')
+        self.failUnlessEqual(f.lane, 4)
+        self.failUnlessEqual(f.read, None)
+        self.failUnlessEqual(f.pf, None)
+        self.failUnlessEqual(f.cycle, None)
+
+    def test_qseq(self):
+        path = '/root/42BW9AAXX/C1-36'
+        name = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l4_r1.tar.bz2'
+        pathname = os.path.join(path,name)
+        f = sequences.parse_qseq(path, name)
+
+        self.failUnlessEqual(f.filetype, 'qseq')
+        self.failUnlessEqual(f.path, pathname)
+        self.failUnlessEqual(f.flowcell, '42BW9AAXX')
+        self.failUnlessEqual(f.lane, 4)
+        self.failUnlessEqual(f.read, 1)
+        self.failUnlessEqual(f.pf, None)
+        self.failUnlessEqual(f.cycle, None)
+
+    def test_fastq(self):
+        path = '/root/42BW9AAXX/C1-38'
+        name = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l4_r1_pass.fastq.bz2'
+        pathname = os.path.join(path,name)
+        f = sequences.parse_fastq(path, name)
+
+        self.failUnlessEqual(f.filetype, 'fastq')
+        self.failUnlessEqual(f.path, pathname)
+        self.failUnlessEqual(f.flowcell, '42BW9AAXX')
+        self.failUnlessEqual(f.lane, 4)
+        self.failUnlessEqual(f.read, 1)
+        self.failUnlessEqual(f.pf, True)
+        self.failUnlessEqual(f.cycle, None)
+
+        name = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l4_r2_nopass.fastq.bz2'
+        pathname = os.path.join(path,name)
+        f = sequences.parse_fastq(path, name)
+
+        self.failUnlessEqual(f.filetype, 'fastq')
+        self.failUnlessEqual(f.path, pathname)
+        self.failUnlessEqual(f.flowcell, '42BW9AAXX')
+        self.failUnlessEqual(f.lane, 4)
+        self.failUnlessEqual(f.read, 2)
+        self.failUnlessEqual(f.pf, False)
+        self.failUnlessEqual(f.cycle, None)
+
+    def test_eland(self):
+        path = '/root/42BW9AAXX/C1-38'
+        name = 's_4_eland_extended.txt.bz2'
+        pathname = os.path.join(path,name)
+        f = sequences.parse_eland(path, name)
+
+        self.failUnlessEqual(f.filetype, 'eland')
+        self.failUnlessEqual(f.path, pathname)
+        self.failUnlessEqual(f.flowcell, '42BW9AAXX')
+        self.failUnlessEqual(f.lane, 4)
+        self.failUnlessEqual(f.read, None)
+        self.failUnlessEqual(f.pf, None)
+        self.failUnlessEqual(f.cycle, 'C1-38')
+
+        path = '/root/42BW9AAXX/C1-152'
+        name = 's_4_1_eland_extended.txt.bz2'
+        pathname = os.path.join(path,name)
+        f = sequences.parse_eland(path, name)
+
+        self.failUnlessEqual(f.filetype, 'eland')
+        self.failUnlessEqual(f.path, pathname)
+        self.failUnlessEqual(f.flowcell, '42BW9AAXX')
+        self.failUnlessEqual(f.lane, 4)
+        self.failUnlessEqual(f.read, 1)
+        self.failUnlessEqual(f.pf, None)
+        self.failUnlessEqual(f.cycle, 'C1-152')
+
+    def test_sequence_file_equality(self):
+        path = '/root/42BW9AAXX/C1-38'
+        name = 'woldlab_090622_HWI-EAS229_0120_42BW9AAXX_l4_r1.tar.bz2'
+
+        f1_qseq = sequences.parse_qseq(path, name)
+        f2_qseq = sequences.parse_qseq(path, name)
+
+        self.failUnlessEqual(f1_qseq, f2_qseq)
+
+def suite():
+    return unittest.makeSuite(SequenceFileTests,'test')
+
+if __name__ == "__main__":
+    unittest.main(defaultTest="suite")
index b6edb9311eddbd1680fab7bcd86c816ad11c94c9..2ccbec6632717902e3ff2de3513d918952b93cef 100644 (file)
@@ -6,121 +6,10 @@ import logging
 import os
 from optparse import OptionParser
 import stat
-import re
 import shelve
 
 from htsworkflow.util import api
-
-eland_re = re.compile('s_(?P<lane>\d)(?P<read>_\d)?_eland_')
-raw_seq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Z]+')
-qseq_re = re.compile('woldlab_[0-9]{6}_[^_]+_[\d]+_[\dA-Z]+_l[\d]_r[\d].tar.bz2')
-
-class SequenceFile(object):
-    def __init__(self, filetype, path, flowcell, lane, read=None, pf=None, cycle=None):
-        self.filetype = filetype
-        self.path = path
-        self.flowcell = flowcell
-        self.lane = lane
-        self.read = read
-        self.pf = pf
-        self.cycle = cycle
-
-    def __hash__(self):
-        return hash(self.key())
-
-    def key(self):
-        return (self.flowcell, self.lane)
-
-    def unicode(self):
-        return unicode(self.path)
-
-    def __repr__(self):
-        return u"<%s %s %s %s>" % (self.filetype, self.flowcell, self.lane, self.path)
-
-    def make_target_name(self, root):
-        """
-        Create target name for where we need to link this sequence too
-        """
-        path, basename = os.path.split(self.path)
-        # Because the names aren't unque we include the flowcel name
-        # because there were different eland files for different length
-        # analyses, we include the cycle length in the name.
-        if self.filetype == 'eland':
-            template = "%(flowcell)s_%(cycle)s_%(eland)s"
-            basename = template % { 'flowcell': self.flowcell,
-                                    'cycle': self.cycle,
-                                    'eland': basename }
-        # else:
-        # all the other file types have names that include flowcell/lane
-        # information and thus are unique so we don't have to do anything
-        return os.path.join(root, basename)
-        
-def parse_srf(path, filename):
-    basename, ext = os.path.splitext(filename)
-    records = basename.split('_')
-    flowcell = records[4]
-    lane = int(records[5][0])
-    fullpath = os.path.join(path, filename)
-    return SequenceFile('srf', fullpath, flowcell, lane)
-
-def parse_qseq(path, filename):
-    basename, ext = os.path.splitext(filename)
-    records = basename.split('_')
-    fullpath = os.path.join(path, filename)
-    flowcell = records[4]
-    lane = int(records[5][1])
-    read = int(records[6][1])
-    return SequenceFile('qseq', fullpath, flowcell, lane, read)
-
-def parse_fastq(path, filename):
-    basename, ext = os.path.splitext(filename)
-    records = basename.split('_')
-    fullpath = os.path.join(path, filename)
-    flowcell = records[4]
-    lane = int(records[5][1])
-    read = int(records[6][1])
-    if records[-1] == 'pass':
-        pf = True
-    else:
-        pf = False
-    return SequenceFile('fastq', fullpath, flowcell, lane, read, pf=pf)
-
-def parse_eland(path, filename, eland_match):
-    fullpath = os.path.join(path, filename)
-    rest, cycle = os.path.split(path)
-    rest, flowcell = os.path.split(rest)
-    lane = eland_match.group('lane')
-    read = eland_match.group('read')
-    return SequenceFile('eland', fullpath, flowcell, lane, read, cycle=cycle)
-    
-def scan_for_sequences(dirs):
-    sequences = []
-    for d in dirs:
-        logging.info("Scanning %s for sequences" % (d,))
-        for path, dirname, filenames in os.walk(d):
-            for f in filenames:
-                seq = None
-                # find sequence files
-                if raw_seq_re.match(f):
-                    if f.endswith('.md5'):
-                        continue
-                    elif f.endswith('.srf') or f.endswith('.srf.bz2'):
-                        seq = parse_srf(path, f)
-                    elif qseq_re.match(f):
-                        seq = parse_qseq(path, f)
-                    elif f.endswith('fastq') or f.endswith('.fastq.bz2'):
-                        seq = parse_fastq(path, f)
-                eland_match = eland_re.match(f)
-                if eland_match:
-                    if f.endswith('.md5'):
-                        continue
-                    seq = parse_eland(path, f, eland_match)
-                if seq:
-                    sequences.append(seq)
-                    logging.debug("Found sequence at %s" % (f,))
-                    
-    return sequences
-
+from htsworkflow.pipelines.sequences import scan_for_sequences
 
 def build_flowcell_db(fcdb_filename, sequences, baseurl, apiid, apikey):
     """