def list(self):
"""Get a directory listing"""
- dirs_to_copy = []
args = copy.copy(self.cmd)
args.append(self.source_base)
logging.debug("Rsync cmd:" + " ".join(args))
short_process = subprocess.Popen(args, stdout=subprocess.PIPE)
- direntries = [ x.split() for x in short_process.stdout ]
+ return self.list_parser(short_process.stdout)
+
+ def list_filter(self, lines):
+ """
+ parse rsync directory listing
+ """
+ dirs_to_copy = []
+ direntries = [ x[0:42].split() + [x[43:]] for x in lines ]
for permissions, size, filedate, filetime, filename in direntries:
if permissions[0] == 'd':
# hey its a directory, the first step to being something we want to
self.failUnlessEqual(len(c.notify_users), 1)
self.failUnlessEqual(c.notify_users[0], 'user3@example.fake')
+ def test_dirlist_filter(self):
+ """
+ test our dir listing parser
+ """
+ # everyone should have a root dir, and since we're not
+ # currently writing files... it should all be good
+ r = copier.rsync('/', '/', '/')
+
+ listing = [
+ '-rwxrw-r-- 123268 2007/12/29 17:39:31 2038EAAXX.rtf',
+ '-rwxrw-r-- 6 2007/12/29 15:10:29 New Text Document.txt',
+ ]
+
+ result = r.list_filter(listing)
+
def suite():
return unittest.makeSuite(testCopier,'test')