r40109 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r40108‎ | r40109 | r40110 >
Date:21:27, 27 August 2008
Author:mark
Status:old
Tags:
Comment:
Implement "RunCommand" Monitor, which can run an arbitrary process and decide on server health based on the return code, or a timeout kill.
Modified paths:
  • /trunk/pybal/TODO (modified) (history)
  • /trunk/pybal/pybal/monitor.py (modified) (history)
  • /trunk/pybal/pybal/monitors/__init__.py (modified) (history)
  • /trunk/pybal/pybal/monitors/runcommand.py (added) (history)
  • /trunk/pybal/pybal/util.py (modified) (history)

Diff [purge]

Index: trunk/pybal/TODO
@@ -11,4 +11,5 @@
1212 * (Partial) lvsmon-like configuration from textfiles retrieved
1313 from the realservers, e.g. for computing the weight of a realserver
1414 * Reimplement IPVSManager using Netlink once the IPVS Netlink interface
15 - has entered the kernel
\ No newline at end of file
 15+ has entered the kernel
 16+* Revisit the whole config parsing wrt consistency and security
\ No newline at end of file
Index: trunk/pybal/pybal/util.py
@@ -44,7 +44,7 @@
4545 try:
4646 return int(self[key])
4747 except KeyError:
48 - if defaut is not None:
 48+ if default is not None:
4949 return default
5050 else:
5151 raise
Index: trunk/pybal/pybal/monitors/__init__.py
@@ -7,4 +7,4 @@
88 $Id$
99 """
1010
11 -__all__ = [ 'proxyfetch', 'idleconnection' ]
\ No newline at end of file
 11+__all__ = [ 'proxyfetch', 'idleconnection', 'runcommand' ]
\ No newline at end of file
Index: trunk/pybal/pybal/monitors/runcommand.py
@@ -0,0 +1,302 @@
 2+"""
 3+runcommand.py
 4+Copyright (C) 2008 by Mark Bergsma <mark@nedworks.org>
 5+
 6+Monitor class implementations for PyBal
 7+
 8+$Id$
 9+"""
 10+
 import errno
 import os
 import signal
 import sys
 import traceback

 from twisted.internet import error, process, reactor
 from twisted.python import log

 from pybal import monitor
 16+
 17+class ProcessGroupProcess(process.Process):
 18+ """
 19+ Derivative of twisted.internet.process that supports Unix
 20+ process groups, sessions and timeouts
 21+ """
 22+
 23+ def __init__(self,
 24+ reactor, command, args, environment, path, proto,
 25+ uid=None, gid=None, childFDs=None,
 26+ sessionLeader=False, timeout=None):
 27+ """Spawn an operating-system process.
 28+
 29+ This is where the hard work of disconnecting all currently open
 30+ files / forking / executing the new process happens. (This is
 31+ executed automatically when a Process is instantiated.)
 32+
 33+ This will also run the subprocess as a given user ID and group ID, if
 34+ specified. (Implementation Note: this doesn't support all the arcane
 35+ nuances of setXXuid on UNIX: it will assume that either your effective
 36+ or real UID is 0.)
 37+ """
 38+ if not proto:
 39+ assert 'r' not in childFDs.values()
 40+ assert 'w' not in childFDs.values()
 41+ if not signal.getsignal(signal.SIGCHLD):
 42+ log.msg("spawnProcess called, but the SIGCHLD handler is not "
 43+ "installed. This probably means you have not yet "
 44+ "called reactor.run, or called "
 45+ "reactor.run(installSignalHandler=0). You will probably "
 46+ "never see this process finish, and it may become a "
 47+ "zombie process.")
 48+ # if you see this message during a unit test, look in
 49+ # test-standard.xhtml or twisted.test.test_process.SignalMixin
 50+ # for a workaround
 51+
 52+ self.lostProcess = False
 53+
 54+ settingUID = (uid is not None) or (gid is not None)
 55+ if settingUID:
 56+ curegid = os.getegid()
 57+ currgid = os.getgid()
 58+ cureuid = os.geteuid()
 59+ curruid = os.getuid()
 60+ if uid is None:
 61+ uid = cureuid
 62+ if gid is None:
 63+ gid = curegid
 64+ # prepare to change UID in subprocess
 65+ os.setuid(0)
 66+ os.setgid(0)
 67+
 68+ self.pipes = {}
 69+ # keys are childFDs, we can sense them closing
 70+ # values are ProcessReader/ProcessWriters
 71+
 72+ helpers = {}
 73+ # keys are childFDs
 74+ # values are parentFDs
 75+
 76+ if childFDs is None:
 77+ childFDs = {0: "w", # we write to the child's stdin
 78+ 1: "r", # we read from their stdout
 79+ 2: "r", # and we read from their stderr
 80+ }
 81+
 82+ debug = self.debug
 83+ if debug: print "childFDs", childFDs
 84+
 85+ # fdmap.keys() are filenos of pipes that are used by the child.
 86+ fdmap = {} # maps childFD to parentFD
 87+ for childFD, target in childFDs.items():
 88+ if debug: print "[%d]" % childFD, target
 89+ if target == "r":
 90+ # we need a pipe that the parent can read from
 91+ readFD, writeFD = os.pipe()
 92+ if debug: print "readFD=%d, writeFD%d" % (readFD, writeFD)
 93+ fdmap[childFD] = writeFD # child writes to this
 94+ helpers[childFD] = readFD # parent reads from this
 95+ elif target == "w":
 96+ # we need a pipe that the parent can write to
 97+ readFD, writeFD = os.pipe()
 98+ if debug: print "readFD=%d, writeFD=%d" % (readFD, writeFD)
 99+ fdmap[childFD] = readFD # child reads from this
 100+ helpers[childFD] = writeFD # parent writes to this
 101+ else:
 102+ assert type(target) == int, '%r should be an int' % (target,)
 103+ fdmap[childFD] = target # parent ignores this
 104+ if debug: print "fdmap", fdmap
 105+ if debug: print "helpers", helpers
 106+ # the child only cares about fdmap.values()
 107+
 108+ self.pid = os.fork()
 109+ if self.pid == 0: # pid is 0 in the child process
 110+
 111+ # do not put *ANY* code outside the try block. The child process
 112+ # must either exec or _exit. If it gets outside this block (due
 113+ # to an exception that is not handled here, but which might be
 114+ # handled higher up), there will be two copies of the parent
 115+ # running in parallel, doing all kinds of damage.
 116+
 117+ # After each change to this code, review it to make sure there
 118+ # are no exit paths.
 119+
 120+ try:
 121+ # stop debugging, if I am! I don't care anymore!
 122+ sys.settrace(None)
 123+ # close all parent-side pipes
 124+ self._setupChild(fdmap)
 125+ # Make a session/process group leader if requested
 126+ if sessionLeader: self._setupSession()
 127+ self._execChild(path, settingUID, uid, gid,
 128+ command, args, environment)
 129+ except:
 130+ # If there are errors, bail and try to write something
 131+ # descriptive to stderr.
 132+ # XXX: The parent's stderr isn't necessarily fd 2 anymore, or
 133+ # even still available
 134+ # XXXX: however even libc assumes write(2,err) is a useful
 135+ # thing to attempt
 136+ try:
 137+ stderr = os.fdopen(2,'w')
 138+ stderr.write("Upon execvpe %s %s in environment %s\n:" %
 139+ (command, str(args),
 140+ "id %s" % id(environment)))
 141+ traceback.print_exc(file=stderr)
 142+ stderr.flush()
 143+ for fd in range(3):
 144+ os.close(fd)
 145+ except:
 146+ pass # make *sure* the child terminates
 147+ # Did you read the comment about not adding code here?
 148+ os._exit(1)
 149+
 150+ # we are the parent
 151+
 152+ if settingUID:
 153+ os.setregid(currgid, curegid)
 154+ os.setreuid(curruid, cureuid)
 155+ self.status = -1 # this records the exit status of the child
 156+
 157+ if timeout:
 158+ self.timeoutCall = reactor.callLater(timeout, self._processTimeout)
 159+
 160+ self.proto = proto
 161+
 162+ # arrange for the parent-side pipes to be read and written
 163+ for childFD, parentFD in helpers.items():
 164+ os.close(fdmap[childFD])
 165+
 166+ if childFDs[childFD] == "r":
 167+ reader = process.ProcessReader(reactor, self, childFD, parentFD)
 168+ self.pipes[childFD] = reader
 169+
 170+ if childFDs[childFD] == "w":
 171+ writer = process.ProcessWriter(reactor, self, childFD, parentFD, forceReadHack=True)
 172+ self.pipes[childFD] = writer
 173+
 174+ try:
 175+ # the 'transport' is used for some compatibility methods
 176+ if self.proto is not None:
 177+ self.proto.makeConnection(self)
 178+ except:
 179+ log.err()
 180+ process.registerReapProcessHandler(self.pid, self)
 181+
 182+ def processEnded(self, status):
 183+ if self.timeoutCall:
 184+ try:
 185+ self.timeoutCall.cancel()
 186+ except:
 187+ pass
 188+
 189+ process.Process.processEnded(self, status)
 190+
 191+ def _setupSession(self):
 192+ os.setsid()
 193+
 194+ def _processTimeout(self):
 195+ """
 196+ Called when the timeout expires.
 197+ """
 198+
 199+ # Kill the process group
 200+ if not self.lostProcess:
 201+ os.kill(-self.pid, signal.SIGKILL)
 202+
 203+class RunCommandMonitoringProtocol(monitor.MonitoringProtocol):
 204+ """
 205+ Monitor that checks server uptime by repeatedly fetching a certain URL
 206+ """
 207+
 208+ __name__ = 'RunCommand'
 209+
 210+ INTV_CHECK = 60
 211+
 212+ TIMEOUT_RUN = 20
 213+
 214+ def __init__(self, coordinator, server, configuration={}):
 215+ """Constructor"""
 216+
 217+ # Call ancestor constructor
 218+ super(RunCommandMonitoringProtocol, self).__init__(coordinator, server, configuration)
 219+
 220+ self.intvCheck = self._getConfigInt('interval', self.INTV_CHECK)
 221+ self.timeout = self._getConfigInt('timeout', self.TIMEOUT_RUN)
 222+ self.command = self._getConfigString('command')
 223+ self.arguments = self._getConfigStringList('arguments')
 224+ self.logOutput = self._getConfigBool('log-output', True)
 225+
 226+ self.checkCall = None
 227+ self.runningProcess = None
 228+
 229+ # Install cleanup handler
 230+ reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
 231+
 232+ def run(self):
 233+ """Start the monitoring"""
 234+
 235+ super(RunCommandMonitoringProtocol, self).run()
 236+
 237+ if not self.checkCall or not self.checkCall.active():
 238+ self.checkCall = reactor.callLater(self.intvCheck, self.runCommand)
 239+
 240+ def stop(self):
 241+ """Stop all running and/or upcoming checks"""
 242+
 243+ super(RunCommandMonitoringProtocol, self).stop()
 244+
 245+ if self.checkCall and self.checkCall.active():
 246+ self.checkCall.cancel()
 247+
 248+ # FIXME: Kill any currently running check
 249+
 250+ def runCommand(self):
 251+ """Periodically called method that does a single uptime check."""
 252+
 253+ self.runningProcess = self._spawnProcess(self, self.command, [self.command] + self.arguments,
 254+ sessionLeader=True, timeout=(self.timeout or None))
 255+
 256+ def makeConnection(self, process):
 257+ pass
 258+
 259+ def childDataReceived(self, childFD, data):
 260+ if not self.logOutput: return
 261+
 262+ # Escape control chars
 263+ map = {'\n': r'\n',
 264+ '\r': r'\r',
 265+ '\t': r'\t'}
 266+ for char, subst in map.iteritems():
 267+ data = data.replace(char, subst)
 268+
 269+ self.report("Cmd stdout: " + data)
 270+
 271+ def childConnectionLost(self, childFD):
 272+ pass
 273+
 274+ def processEnded(self, reason):
 275+ """
 276+ Called when the process has either ended
 277+ """
 278+
 279+ if reason.check(error.ProcessDone):
 280+ self._resultUp()
 281+ elif reason.check(error.ProcessTerminated):
 282+ self._resultDown(reason.getErrorMessage())
 283+
 284+ # Schedule the next check
 285+ if self.active:
 286+ self.checkCall = reactor.callLater(self.intvCheck, self.runCommand)
 287+
 288+ reason.trap(error.ProcessDone, error.ProcessTerminated)
 289+
 290+ def _spawnProcess(self, processProtocol, executable, args=(),
 291+ env={}, path=None,
 292+ uid=None, gid=None, childFDs=None,
 293+ sessionLeader=False, timeout=None):
 294+ """
 295+ Replacement for posixbase.PosixReactorBase.spawnProcess with added
 296+ process group / session and timeout support, and support for
 297+ non-POSIX platforms and PTYs removed.
 298+ """
 299+
 300+ args, env = reactor._checkProcessArgs(args, env)
 301+ return ProcessGroupProcess(reactor, executable, args, env, path,
 302+ processProtocol, uid, gid, childFDs,
 303+ sessionLeader, timeout)
Index: trunk/pybal/pybal/monitor.py
@@ -72,6 +72,19 @@
7373
7474 print "[%s] %s (%s): %s" % (self.__name__, self.server.host, self.server.textStatus(), text)
7575
 76+ def _getConfigBool(self, optionname, default=None):
 77+ return self.configuration.getboolean('%s.%s' % (self.__name__.lower(), optionname), default)
 78+
 79+ def _getConfigInt(self, optionname, default=None):
 80+ return self.configuration.getint('%s.%s' % (self.__name__.lower(), optionname), default)
 81+
 82+ def _getConfigString(self, optionname):
 83+ val = self.configuration[self.__name__.lower() + '.' + optionname]
 84+ if type(val) == str:
 85+ return val
 86+ else:
 87+ raise ValueError, "Value of %s is not a string" % optionname
 88+
7689 def _getConfigStringList(self, optionname):
7790 """
7891 Takes a (string) value, eval()s it and checks whether it
@@ -86,5 +99,5 @@
87100 # Checked that each list member is a string
88101 return val
89102 else:
90 - raise ValueError, "Value '%s' is not a string or stringlist" % val
 103+ raise ValueError, "Value of %s is not a string or stringlist" % optionname
91104
\ No newline at end of file