[project @ rename python module to gaworkflow from uashelper]
authorDiane Trout <diane@caltech.edu>
Tue, 20 Nov 2007 01:04:19 +0000 (01:04 +0000)
committerDiane Trout <diane@caltech.edu>
Tue, 20 Nov 2007 01:04:19 +0000 (01:04 +0000)
gaworkflow/__init__.py [new file with mode: 0644]
gaworkflow/copier.py [new file with mode: 0644]
gaworkflow/spoolwatcher.py [new file with mode: 0644]
scripts/copier
scripts/spoolwatcher
uashelper/__init__.py [deleted file]
uashelper/copier.py [deleted file]
uashelper/spoolwatcher.py [deleted file]

diff --git a/gaworkflow/__init__.py b/gaworkflow/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/gaworkflow/copier.py b/gaworkflow/copier.py
new file mode 100644 (file)
index 0000000..2655320
--- /dev/null
@@ -0,0 +1,106 @@
+import copy
+import logging
+import logging.handlers
+import os
+import re
+import subprocess
+import sys
+import time
+import traceback
+
+from benderjab.bot import BenderFactory
+
+class rsync(object):
+  def __init__(self, pwfile):
+    self.pwfile = pwfile
+    self.cmd = ['/usr/bin/rsync', ]
+    self.cmd.append('--password-file=%s' % (pwfile))
+    self.source_base = 'rsync://sequencer@jumpgate.caltech.edu:8730/sequencer/'
+    self.dest_base = '/home/diane/gec/'
+    self.processes = {}
+    self.exit_code = None
+
+  def list(self):
+    """Get a directory listing"""
+    dirs_to_copy = []
+    args = copy.copy(self.cmd)
+    args.append(self.source_base)
+
+    logging.debug("Rsync cmd:" + " ".join(args))
+    short_process = subprocess.Popen(args, stdout=subprocess.PIPE)
+    direntries = [ x.split() for x in short_process.stdout ]
+    for permissions, size, filedate, filetime, filename in direntries:
+      if permissions[0] == 'd':
+        # it's a directory, which is the first requirement for
+        # something we want to copy
+        if re.match("[0-9]{6}", filename):
+          # and it starts with what looks like a six-digit date,
+          # which is good enough for us
+          dirs_to_copy.append(filename)
+    return dirs_to_copy
+
+  def create_copy_process(self, dirname):
+    args = copy.copy(self.cmd)
+    # we want to copy everything
+    args.append('-rlt') 
+    # from here
+    args.append(os.path.join(self.source_base, dirname))
+    # to here
+    args.append(self.dest_base)
+    logging.debug("Rsync cmd:" + " ".join(args))
+    return subprocess.Popen(args)
+  def copy(self):
+    """copy any interesting looking directories over
+    """
+    dirs_to_copy = self.list()
+    for d in dirs_to_copy:
+      process = self.processes.get(d, None)
+      if process is None:
+        # we don't have a process, so make one
+        logging.info("rsyncing %s" % (d))
+        self.processes[d] = self.create_copy_process(d)
+      else:
+        retcode = process.poll()
+        if retcode is not None:
+           # we finished
+           logging.info("finished rsyncing %s, exitcode %d" % (d, retcode))
+           del self.processes[d]
+
+class copier_bot_parser(object):
+  def __init__(self, ):
+    self.rsync = rsync('/home/diane/.sequencer')
+  
+  def __call__(self, msg, who):
+    try:
+      if re.match("start copy", msg):
+        logging.info("starting copy for %s" % (who.getStripped()))
+        self.rsync.copy()
+    except Exception, e:
+      errmsg = "Exception: " + str(e)
+      logging.error(errmsg)
+      logging.error(traceback.format_exc())
+      return errmsg
+
+def main(args=None):
+  if args is None or len(args) != 1:
+    sys.exit("need .benderjab config name")
+  configname = args[0]
+
+  logging.basicConfig(level=logging.INFO, 
+                      format='%(asctime)s %(levelname)s %(message)s')
+  log = logging.getLogger()
+  log.addHandler(logging.handlers.RotatingFileHandler(
+                   '/tmp/copier_bot.log', maxBytes=1000000, backupCount = 3)
+                )
+  bot = BenderFactory(configname)
+  bot.parser = copier_bot_parser()
+  bot.logon()
+  bot.eventLoop()
+  logging.shutdown()
+  return 0
+
+if __name__ == "__main__":
+  sys.exit(main(sys.argv[1:]))
+
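
For context, the copier can be exercised without the Jabber bot; a minimal sketch, assuming the rsync password file sits at /home/diane/.sequencer (the path hard-coded above) and that gaworkflow is importable:

    from gaworkflow.copier import rsync

    r = rsync('/home/diane/.sequencer')
    print r.list()   # run directories currently exported by the sequencer module
    r.copy()         # launch (or poll) one background rsync per run directory

copy() is what copier_bot_parser invokes when it receives a "start copy" message; it calls list() internally to find candidate run directories.
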
diff --git a/gaworkflow/spoolwatcher.py b/gaworkflow/spoolwatcher.py
new file mode 100644 (file)
index 0000000..bba9fdb
--- /dev/null
@@ -0,0 +1,98 @@
+#!/usr/bin/env python
+
+import os
+import re
+import sys
+import time
+
+# this uses pyinotify
+import pyinotify
+from pyinotify import EventsCodes
+
+from benderjab.xsend import send
+
+class Handler(pyinotify.ProcessEvent):
+    def __init__(self, watchmanager, runner_jid):
+      self.last_event_time = None
+      self.watchmanager = watchmanager
+      self.runner_jid = runner_jid
+
+    def process_IN_CREATE(self, event):
+      self.last_event_time = time.time()
+      msg = "Create: %s" %  os.path.join(event.path, event.name)
+      if event.name.lower() == "run.completed":
+        print "Run is done!"
+        try:
+          send(self.runner_jid, "a run finished, launch it, and swap the drive")
+        except IOError, e:
+          print "ERROR: couldn't send message"
+          print str(e)
+      print msg
+
+    def process_IN_DELETE(self, event):
+      print "Remove: %s" %  os.path.join(event.path, event.name)
+
+    def process_IN_UNMOUNT(self, event):
+      print "Unmounted ", str(event)
+
+class TreeWriteDoneNotifier:
+  """
+  Watch a directory and send a message when another process is done writing.
+
+  This monitors a directory tree using inotify (Linux specific); once some
+  files have been written, it sends a message after write_timeout seconds
+  with no further writes.
+
+  (Basically, when the Solexa machine finishes dumping a round of data,
+  this will hopefully send out a message saying that data is available.)
+  """
+
+  def __init__(self, watchdir, 
+                     copy_jid, runner_jid, 
+                     write_timeout=10, notify_timeout=5):
+     self.watchdir = watchdir
+     self.copy_jid = copy_jid
+     self.runner_jid = runner_jid
+     self.write_timeout = write_timeout
+     self.notify_timeout = int(notify_timeout * 1000)
+
+     self.wm = pyinotify.WatchManager()
+     self.handler = Handler(self.wm, self.runner_jid)
+     self.notifier = pyinotify.Notifier(self.wm, self.handler)
+     
+     self.add_watch(self.watchdir)
+
+  def add_watch(self, watchdir):
+     print "Watching:", watchdir
+     mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT
+     # rec traverses the tree and adds all the directories that are there 
+     # at the start.
+     # auto_add will add in new directories as they are created
+     self.wdd = self.wm.add_watch(watchdir, mask, rec=True, auto_add=True)
+
+  def event_loop(self):
+    while True:  # loop forever
+      try:
+        # process the queue of pending events
+        self.notifier.process_events()
+        # check_events() waits up to notify_timeout milliseconds
+        if self.notifier.check_events(self.notify_timeout):
+          # read notified events and enqueue them
+          self.notifier.read_events()
+        # has the tree been quiet for longer than write_timeout?
+        last_event_time = self.handler.last_event_time
+        if last_event_time is not None:
+          time_delta = time.time() - last_event_time
+          if time_delta > self.write_timeout:
+            self.copying_paused()
+            self.handler.last_event_time = None
+
+      except KeyboardInterrupt:
+        # destroy the inotify instance on interrupt (stop monitoring)
+        self.notifier.stop()
+        break
+
+  def copying_paused(self):
+    print "more than 10 seconds have elapsed"
+    send(self.copy_jid, "start copy")
+
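
The scripts/spoolwatcher script further below wires this class up; a minimal sketch of the same usage, in which the watch directory and runner JID are placeholder assumptions (only copy_jid appears in the script):

    from gaworkflow.spoolwatcher import TreeWriteDoneNotifier

    copy_jid = "cellthumper@chaos.caltech.edu"   # from scripts/spoolwatcher
    runner_jid = "runner@example.com"            # hypothetical runner address
    watchdir = "/tmp/solexa-spool"               # hypothetical spool directory

    notifier = TreeWriteDoneNotifier(watchdir, copy_jid, runner_jid,
                                     write_timeout=10, notify_timeout=5)
    notifier.event_loop()

Once the watched tree has gone write_timeout seconds without a new file, copying_paused() sends "start copy" to copy_jid, which the copier bot above answers by launching rsync.
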
diff --git a/scripts/copier b/scripts/copier
index f0d780a4d4b35043b614cf60f5b42999468163f3..898f68521045a3caa107b6c403ddbc536d23d62e 100644 (file)
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 import sys
-from uashelper.copier import main
+from gaworkflow.copier import main
 
 if __name__ == "__main__":
   sys.exit(main(sys.argv[1:]))
diff --git a/scripts/spoolwatcher b/scripts/spoolwatcher
index 1e5d029aa2508b01e5d16df3c7114faeb7bfa101..96c452fe209b3313361345722f6b7ce29e5b2212 100644 (file)
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 
-from uashelper.spoolwatcher import TreeWriteDoneNotifier
+from gaworkflow.spoolwatcher import TreeWriteDoneNotifier
 
 if __name__ == "__main__":
   copy_jid = "cellthumper@chaos.caltech.edu"
diff --git a/uashelper/__init__.py b/uashelper/__init__.py
deleted file mode 100644 (file)
index e69de29..0000000
diff --git a/uashelper/copier.py b/uashelper/copier.py
deleted file mode 100644 (file)
index 2655320..0000000
+++ /dev/null
@@ -1,106 +0,0 @@
-import copy
-import logging
-import logging.handlers
-import os
-import re
-import subprocess
-import sys
-import time
-import traceback
-
-from benderjab.bot import BenderFactory
-
-class rsync(object):
-  def __init__(self, pwfile):
-    self.pwfile = pwfile
-    self.cmd = ['/usr/bin/rsync', ]
-    self.cmd.append('--password-file=%s' % (pwfile))
-    self.source_base = 'rsync://sequencer@jumpgate.caltech.edu:8730/sequencer/'
-    self.dest_base = '/home/diane/gec/'
-    self.processes = {}
-    self.exit_code = None
-
-  def list(self):
-    """Get a directory listing"""
-    dirs_to_copy = []
-    args = copy.copy(self.cmd)
-    args.append(self.source_base)
-
-    logging.debug("Rsync cmd:" + " ".join(args))
-    short_process = subprocess.Popen(args, stdout=subprocess.PIPE)
-    direntries = [ x.split() for x in short_process.stdout ]
-    for permissions, size, filedate, filetime, filename in direntries:
-      if permissions[0] == 'd':
-        # it's a directory, which is the first requirement for
-        # something we want to copy
-        if re.match("[0-9]{6}", filename):
-          # and it starts with what looks like a six-digit date,
-          # which is good enough for us
-          dirs_to_copy.append(filename)
-    return dirs_to_copy
-
-  def create_copy_process(self, dirname):
-    args = copy.copy(self.cmd)
-    # we want to copy everything
-    args.append('-rlt') 
-    # from here
-    args.append(os.path.join(self.source_base, dirname))
-    # to here
-    args.append(self.dest_base)
-    logging.debug("Rsync cmd:" + " ".join(args))
-    return subprocess.Popen(args)
-  def copy(self):
-    """copy any interesting looking directories over
-    """
-    dirs_to_copy = self.list()
-    for d in dirs_to_copy:
-      process = self.processes.get(d, None)
-      if process is None:
-        # we don't have a process, so make one
-        logging.info("rsyncing %s" % (d))
-        self.processes[d] = self.create_copy_process(d)
-      else:
-        retcode = process.poll()
-        if retcode is not None:
-           # we finished
-           logging.info("finished rsyncing %s, exitcode %d" % (d, retcode))
-           del self.processes[d]
-
-class copier_bot_parser(object):
-  def __init__(self, ):
-    self.rsync = rsync('/home/diane/.sequencer')
-  
-  def __call__(self, msg, who):
-    try:
-      if re.match("start copy", msg):
-        logging.info("starting copy for %s" % (who.getStripped()))
-        self.rsync.copy()
-    except Exception, e:
-      errmsg = "Exception: " + str(e)
-      logging.error(errmsg)
-      logging.error(traceback.format_exc())
-      return errmsg
-
-def main(args=None):
-  if args is None or len(args) != 1:
-    sys.exit("need .benderjab config name")
-  configname = args[0]
-
-  logging.basicConfig(level=logging.INFO, 
-                      format='%(asctime)s %(levelname)s %(message)s')
-  log = logging.getLogger()
-  log.addHandler(logging.handlers.RotatingFileHandler(
-                   '/tmp/copier_bot.log', maxBytes=1000000, backupCount = 3)
-                )
-  bot = BenderFactory(configname)
-  bot.parser = copier_bot_parser()
-  bot.logon()
-  bot.eventLoop()
-  logging.shutdown()
-  return 0
-
-if __name__ == "__main__":
-  sys.exit(main(sys.argv[1:]))
-
diff --git a/uashelper/spoolwatcher.py b/uashelper/spoolwatcher.py
deleted file mode 100644 (file)
index bba9fdb..0000000
+++ /dev/null
@@ -1,98 +0,0 @@
-#!/usr/bin/env python
-
-import os
-import re
-import sys
-import time
-
-# this uses pyinotify
-import pyinotify
-from pyinotify import EventsCodes
-
-from benderjab.xsend import send
-
-class Handler(pyinotify.ProcessEvent):
-    def __init__(self, watchmanager, runner_jid):
-      self.last_event_time = None
-      self.watchmanager = watchmanager
-      self.runner_jid = runner_jid
-
-    def process_IN_CREATE(self, event):
-      self.last_event_time = time.time()
-      msg = "Create: %s" %  os.path.join(event.path, event.name)
-      if event.name.lower() == "run.completed":
-        print "Run is done!"
-        try:
-          send(self.runner_jid, "a run finished, launch it, and swap the drive")
-        except IOError, e:
-          print "ERROR: couldn't send message"
-          print str(e)
-      print msg
-
-    def process_IN_DELETE(self, event):
-      print "Remove: %s" %  os.path.join(event.path, event.name)
-
-    def process_IN_UNMOUNT(self, event):
-      print "Unmounted ", str(event)
-
-class TreeWriteDoneNotifier:
-  """
-  Watch a directory and send a message when another process is done writing.
-
-  This monitors a directory tree using inotify (Linux specific); once some
-  files have been written, it sends a message after write_timeout seconds
-  with no further writes.
-
-  (Basically, when the Solexa machine finishes dumping a round of data,
-  this will hopefully send out a message saying that data is available.)
-  """
-
-  def __init__(self, watchdir, 
-                     copy_jid, runner_jid, 
-                     write_timeout=10, notify_timeout=5):
-     self.watchdir = watchdir
-     self.copy_jid = copy_jid
-     self.runner_jid = runner_jid
-     self.write_timeout = write_timeout
-     self.notify_timeout = int(notify_timeout * 1000)
-
-     self.wm = pyinotify.WatchManager()
-     self.handler = Handler(self.wm, self.runner_jid)
-     self.notifier = pyinotify.Notifier(self.wm, self.handler)
-     
-     self.add_watch(self.watchdir)
-
-  def add_watch(self, watchdir):
-     print "Watching:", watchdir
-     mask = EventsCodes.IN_CREATE | EventsCodes.IN_UNMOUNT
-     # rec traverses the tree and adds all the directories that are there 
-     # at the start.
-     # auto_add will add in new directories as they are created
-     self.wdd = self.wm.add_watch(watchdir, mask, rec=True, auto_add=True)
-
-  def event_loop(self):
-    while True:  # loop forever
-      try:
-        # process the queue of pending events
-        self.notifier.process_events()
-        # check_events() waits up to notify_timeout milliseconds
-        if self.notifier.check_events(self.notify_timeout):
-          # read notified events and enqueue them
-          self.notifier.read_events()
-        # has the tree been quiet for longer than write_timeout?
-        last_event_time = self.handler.last_event_time
-        if last_event_time is not None:
-          time_delta = time.time() - last_event_time
-          if time_delta > self.write_timeout:
-            self.copying_paused()
-            self.handler.last_event_time = None
-
-      except KeyboardInterrupt:
-        # destroy the inotify instance on interrupt (stop monitoring)
-        self.notifier.stop()
-        break
-
-  def copying_paused(self):
-    print "more than 10 seconds have elapsed"
-    send(self.copy_jid, "start copy")
-