r39083 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r39082 | r39083 | r39084 >
Date:20:17, 10 August 2008
Author:mark
Status:old
Tags:
Comment:
Rewrote most of the server adding/modification/removal handling code, fixing several bugs and halving it in size (What crack was I smoking when I wrote that!?). Also improved monitor reporting, implemented configurable depool thresholds, and prepared internal resolution of realserver hostnames (though this remains disabled for now).
Modified paths:
  • /trunk/pybal/pybal/ipvs.py (modified) (history)
  • /trunk/pybal/pybal/monitor.py (modified) (history)
  • /trunk/pybal/pybal/monitors/proxyfetch.py (modified) (history)
  • /trunk/pybal/pybal/pybal.py (modified) (history)

Diff [purge]

Index: trunk/pybal/pybal/ipvs.py
@@ -88,7 +88,7 @@
8989 server: Server
9090 """
9191
92 - return '-d ' + cls.subCommandService(service) + ' -r ' + server.host
 92+ return '-d ' + cls.subCommandService(service) + ' -r ' + (server.ip or server.host)
9393 commandRemoveServer = classmethod(commandRemoveServer)
9494
9595 def commandAddServer(cls, service, server):
@@ -100,7 +100,7 @@
101101 server: Server
102102 """
103103
104 - cmd = '-a ' + cls.subCommandService(service) + ' -r ' + server.host
 104+ cmd = '-a ' + cls.subCommandService(service) + ' -r ' + (server.ip or server.host)
105105
106106 # Include weight if specified
107107 if server.weight:
@@ -118,7 +118,7 @@
119119 server: Server
120120 """
121121
122 - cmd = '-e ' + cls.subCommandService(service) + ' -r ' + server.host
 122+ cmd = '-e ' + cls.subCommandService(service) + ' -r ' + (server.ip or server.host)
123123
124124 # Include weight if specified
125125 if server.weight:
@@ -141,7 +141,7 @@
142142 """Constructor"""
143143
144144 self.name = name
145 - self.servers = {}
 145+ self.servers = set()
146146
147147 if (protocol not in self.SVC_PROTOS
148148 or scheduler not in self.SVC_SCHEDULERS):
@@ -179,11 +179,6 @@
180180 # Remove a previous service and add the new one
181181 cmdList = [self.ipvsManager.commandRemoveService(self.service()),
182182 self.ipvsManager.commandAddService(self.service())]
183 -
184 - # Add realservers
185 - for server in self.servers.itervalues():
186 - cmdList.append(self.ipvsManager.commandAddServer(self.service(), server))
187 -
188183 self.ipvsManager.modifyState(cmdList)
189184
190185 def assignServers(self, newServers):
@@ -191,49 +186,24 @@
192187 Takes a (new) set of servers (as a host->Server dictionary) and updates
193188 the LVS state accordingly.
194189 """
195 -
196 - # Compute set of servers to delete and edit
197 - removeServers, editServers = [], []
198 - for hostname, server in self.servers.iteritems():
199 - if hostname not in newServers:
200 - removeServers.append(server)
201 - else:
202 - editServers.append(server)
203 -
204 - # Compute set of servers to add
205 - addServers = []
206 - for hostname, server in newServers.iteritems():
207 - if hostname not in self.servers:
208 - addServers.append(server)
209 -
210 - self.servers = dict(newServers) # shallow copy
211 - cmdList = []
212 -
213 - # Add new servers first
214 - for server in addServers:
215 - cmdList.append(self.ipvsManager.commandAddServer(self.service(), server))
216 - server.pooled = True
217 -
218 - # Edit existing servers
219 - for server in editServers:
220 - cmdList.append(self.ipvsManager.commandEditServer(self.service(), server))
221190
222 - # Remove servers
223 - for server in removeServers:
224 - cmdList.append(self.ipvsManager.commandRemoveServer(self.service(), server))
225 - server.pooled = False
 191+ cmdList = [self.ipvsManager.commandAddServer(self.service(), server) for server in newServers - self.servers] + \
 192+ [self.ipvsManager.commandEditServer(self.service(), server) for server in newServers & self.servers] + \
 193+ [self.ipvsManager.commandRemoveServer(self.service(), server) for server in self.servers - newServers]
226194
 195+ self.servers = newServers
227196 self.ipvsManager.modifyState(cmdList)
228197
229198 def addServer(self, server):
230199 """Adds (pools) a single Server to the LVS state"""
231200
232 - if server.host not in self.servers:
 201+ if server not in self.servers:
233202 cmdList = [self.ipvsManager.commandAddServer(self.service(), server)]
234203 else:
 204+ print "WARNING: possible bug; adding previously existing server to LVS"
235205 cmdList = [self.ipvsManager.commandEditServer(self.service(), server)]
236206
237 - self.servers[server.host] = server
 207+ self.servers.add(server)
238208
239209 self.ipvsManager.modifyState(cmdList)
240210 server.pooled = True
@@ -243,7 +213,12 @@
244214
245215 cmdList = [self.ipvsManager.commandRemoveServer(self.service(), server)]
246216
247 - del self.servers[server.host] # May raise KeyError
 217+ self.servers.remove(server) # May raise KeyError
248218
249219 server.pooled = False
250 - self.ipvsManager.modifyState(cmdList)
\ No newline at end of file
 220+ self.ipvsManager.modifyState(cmdList)
 221+
 222+ def getDepoolThreshold(self):
 223+ """Returns the threshold below which no more down servers will be depooled"""
 224+
 225+ return self.configuration.getfloat('depool-threshold', .5)
\ No newline at end of file
Index: trunk/pybal/pybal/monitors/proxyfetch.py
@@ -84,7 +84,7 @@
8585 def _fetchSuccessful(self, result):
8686 """Called when getProxyPage is finished successfully."""
8787
88 - print self.server.host + ': Fetch successful, %.2f s' % (seconds() - self.checkStartTime)
 88+ self.report('Fetch successful, %.3f s' % (seconds() - self.checkStartTime))
8989 self._resultUp()
9090
9191 return result
@@ -92,7 +92,7 @@
9393 def _fetchFailed(self, failure):
9494 """Called when getProxyPage finished with a failure."""
9595
96 - print self.server.host + ': Fetch failed, %.2f s' % (seconds() - self.checkStartTime)
 96+ self.report('Fetch failed, %.3f s' % (seconds() - self.checkStartTime))
9797
9898 self._resultDown(failure.getErrorMessage())
9999
Index: trunk/pybal/pybal/monitor.py
@@ -65,6 +65,13 @@
6666 if self.coordinator:
6767 self.coordinator.resultDown(self, reason)
6868
 69+ def report(self, text):
 70+ """
 71+ Common method for reporting/logging check results
 72+ """
 73+
 74+ print "[%s] %s (%s): %s" % (self.__name__, self.server.host, self.server.textStatus(), text)
 75+
6976 def _getConfigStringList(self, optionname):
7077 """
7178 Takes a (string) value, eval()s it and checks whether it
Index: trunk/pybal/pybal/pybal.py
@@ -34,15 +34,30 @@
3535 """Constructor"""
3636
3737 self.host = host
 38+ self.ip = None
3839 self.port = 80
3940
4041 self.monitors = []
4142
 43+ # A few invariants that SHOULD be maintained (but currently may not be):
 44+ # P0: pooled => enabled
 45+ # P1: up => pooled \/ !enabled
 46+ # P2: pooled => up \/ !canDepool
 47+
4248 self.weight = self.DEF_WEIGHT
4349 self.up = self.DEF_STATE
4450 self.pooled = self.up
45 - self.enabled = self.up
 51+ self.enabled = True
 52+ self.modified = None
4653
 54+ #self.resolveHostname()
 55+
 56+ def __eq__(self, other):
 57+ return instanceof(other, Server) and self.host == other.host
 58+
 59+ def __hash__(self):
 60+ return hash(self.host)
 61+
4762 def addMonitor(self, monitor):
4863 """Adds a monitor instance to the list"""
4964
@@ -60,29 +75,103 @@
6176
6277 for monitor in self.monitors:
6378 self.removeMonitor(monitor)
 79+
 80+ def resolveHostname(self):
 81+ """Attempts to resolve the server's hostname to an IP address for better reliability."""
 82+
 83+ if not self.ip:
 84+ from twisted.names import client
 85+ return client.lookupAddress(self.host).addCallback(self._hostnameResolved)
 86+ else:
 87+ from twisted.internet import defer
 88+ return defer.succeed(self.ip)
6489
65 - def merge(self, server):
66 - """Merges in configuration attributes of another instance"""
 90+ def _hostnameResolved(self, lookupResult):
 91+ try:
 92+ self.ip = lookupResult[0][0].payload.dottedQuad() # FIXME: IPv6
 93+ except:
 94+ pass
 95+
 96+ def destroy(self):
 97+ self.enabled = False
 98+ self.removeMonitors()
 99+
 100+ def createMonitoringInstances(self, coordinator, lvsservice):
 101+ """Creates and runs monitoring instances for this Server"""
67102
68 - for key, value in server.__dict__.iteritems():
69 - if (key, type(value)) in self.allowedConfigKeys:
70 - self.__dict__[key] = value
 103+ try:
 104+ monitorlist = eval(lvsservice.configuration['monitors'])
 105+ except KeyError:
 106+ print "LVS service", lvsservice.name, "does not have a 'monitors' configuration option set."
 107+ raise
 108+
 109+ if type(monitorlist) != list:
 110+ print "option 'monitors' in LVS service section", lvsservice.name, \
 111+ "is not a Python list."
 112+ else:
 113+ for monitorname in monitorlist:
 114+ try:
 115+ # FIXME: this is a hack?
 116+ monitormodule = getattr(sys.modules['pybal.monitors'], monitorname.lower())
 117+ monitorclass = getattr(monitormodule , monitorname + 'MonitoringProtocol' )
 118+ except AttributeError:
 119+ print "Monitor", monitorname, "does not exist."
 120+ else:
 121+ monitor = monitorclass(coordinator, self, lvsservice.configuration)
 122+ self.addMonitor(monitor)
 123+ monitor.run()
 124+
 125+ def calcStatus(self):
 126+ """AND quantification of monitor.up over all monitoring instances of a single Server"""
 127+
 128+ # Global status is up iff all monitors report up
 129+ return reduce(lambda b,monitor: b and monitor.up, self.monitors, self.monitors != [])
71130
 131+ def calcPartialStatus(self):
 132+ """OR quantification of monitor.up over all monitoring instances of a single Server"""
 133+
 134+ # Partial status is up iff one of the monitors reports up
 135+ return reduce(lambda b,monitor: b or monitor.up, self.monitors, self.monitors == [])
 136+
 137+ def textStatus(self):
 138+ return "%s/%s/%s" % (self.enabled and "enabled" or "disabled",
 139+ self.up and "up" or (self.calcPartialStatus() and "partially up" or "down"),
 140+ self.pooled and "pooled" or "not pooled")
 141+
 142+ def maintainState(self):
 143+ """Maintains a few invariants on configuration changes"""
 144+
 145+ # P0
 146+ if not self.enabled:
 147+ self.pooled = False
 148+ # P1
 149+ if not self.pooled and self.enabled:
 150+ self.up = False
 151+
 152+ def merge(self, configuration):
 153+ """Merges in configuration from a dictionary of (allowed) attributes"""
 154+
 155+ for key, value in configuration.iteritems():
 156+ if (key, type(value)) not in self.allowedConfigKeys:
 157+ del configuration[key]
 158+
 159+ # Overwrite configuration
 160+ self.__dict__.update(configuration)
 161+ self.maintainState()
 162+ self.modified = True # Indicate that this instance previously existed
 163+
 164+ @classmethod
72165 def buildServer(cls, configuration):
73166 """
74167 Factory method which builds a Server instance from a
75168 dictionary of (allowed) configuration attributes
76169 """
77170
78 - for key, value in configuration.iteritems():
79 - if (key, type(value)) not in cls.allowedConfigKeys:
80 - del configuration[key]
81 -
82171 server = cls(configuration['host']) # create a new instance...
83 - server.__dict__.update(configuration) # ...and override attributes
 172+ server.merge(configuration) # ...and override attributes
 173+ server.modified = False
84174
85175 return server
86 - buildServer = classmethod(buildServer)
87176
88177 class Coordinator:
89178 """
@@ -99,7 +188,7 @@
100189
101190 self.lvsservice = lvsservice
102191 self.servers = {}
103 - self.pooledDownServers = []
 192+ self.pooledDownServers = set()
104193 self.configHash = None
105194 self.serverConfigURL = configURL
106195
@@ -107,53 +196,16 @@
108197 from twisted.internet import task
109198 task.LoopingCall(self.loadServers).start(self.intvLoadServers)
110199
111 - def assignServers(self, servers):
 200+ def assignServers(self):
112201 """
113202 Takes a new set of servers (as a host->Server dict) and
114203 hands them over to LVSService
115204 """
116 -
117 - self.servers = servers
118205
119206 # Hand over enabled servers to LVSService
120207 self.lvsservice.assignServers(
121 - dict([(server.host, server) for server in servers.itervalues() if server.enabled]))
122 -
123 - def createMonitoringInstances(self, servers=None):
124 - """Creates and runs monitoring instances for a list of Servers"""
125 -
126 - # Use self.servers by default
127 - if servers is None:
128 - servers = self.servers.itervalues()
129 -
130 - for server in servers:
131 - if not server.enabled: continue
132 -
133 - try:
134 - monitorlist = eval(self.lvsservice.configuration['monitors'])
135 - except KeyError:
136 - print "LVS service", self.lvsservice.name, "does not have a 'monitors' configuration option set."
 208+ set([server for server in self.servers.itervalues() if server.pooled]))
137209
138 - if type(monitorlist) != list:
139 - print "option 'monitors' in LVS service section", self.lvsservice.name, \
140 - "is not a Python list."
141 - else:
142 - for monitorname in monitorlist:
143 - try:
144 - # FIXME: this is a hack?
145 - monitormodule = getattr(sys.modules['pybal.monitors'], monitorname.lower())
146 - monitorclass = getattr(monitormodule , monitorname + 'MonitoringProtocol' )
147 - server.addMonitor(monitorclass(self, server, self.lvsservice.configuration))
148 - except AttributeError:
149 - print "Monitor", monitorname, "does not exist."
150 -
151 - # Set initial status
152 - #server.up = self.calcStatus(server)
153 -
154 - # Run all instances
155 - for monitor in server.monitors:
156 - monitor.run()
157 -
158210 def resultDown(self, monitor, reason=None):
159211 """
160212 Accepts a 'down' notification status result from a single monitoring instance
@@ -162,11 +214,11 @@
163215
164216 server = monitor.server
165217
166 - print 'Monitoring instance', monitor.name(), 'reports server', server.host, 'down:', (reason or '(reason unknown)')
 218+ print "Monitoring instance %s reports servers %s (%s) down:" % (monitor.name(), server.host, server.textStatus()), (reason or '(reason unknown)')
167219
168220 if server.up:
169221 server.up = False
170 - self.depool(server)
 222+ if server.pooled: self.depool(server)
171223
172224 def resultUp(self, monitor):
173225 """
@@ -176,30 +228,21 @@
177229
178230 server = monitor.server
179231
180 - if not server.up and self.calcStatus(server):
 232+ if not server.up and server.calcStatus():
 233+ print "Server %s (%s) is up" % (server.host, server.textStatus())
181234 server.up = True
182 - self.repool(server)
183 -
184 - print 'Server', server.host, 'is up'
185 -
186 - def calcStatus(self, server):
187 - """AND quantification of monitor.up over all monitoring instances of a single Server"""
188 -
189 - # Global status is up iff all monitors report up
190 - return reduce(lambda b,monitor: b and monitor.up, server.monitors, server.monitors != [])
 235+ if server.enabled: self.repool(server)
191236
192237 def depool(self, server):
193238 """Depools a single Server, if possible"""
194239
195 - if not server.pooled: return
 240+ assert server.pooled
196241
197 - if self.canDepool(server):
 242+ if self.canDepool():
198243 self.lvsservice.removeServer(server)
199 - try: self.pooledDownServers.remove(server)
200 - except ValueError: pass
 244+ self.pooledDownServers.discard(server)
201245 else:
202 - if server not in self.pooledDownServers:
203 - self.pooledDownServers.append(server)
 246+ self.pooledDownServers.add(server)
204247 print 'Could not depool server', server.host, 'because of too many down!'
205248
206249 def repool(self, server):
@@ -208,28 +251,28 @@
209252 not be depooled then because of too many hosts down.
210253 """
211254
212 - if not server.pooled and server.enabled:
 255+ assert server.enabled
 256+
 257+ if not server.pooled:
213258 self.lvsservice.addServer(server)
 259+ else:
 260+ print "Leaving previously pooled but down server", server.host, "pooled"
214261
215262 # If it had been pooled in down state before, remove it from the list
216 - try: self.pooledDownServers.remove(server)
217 - except ValueError: pass
 263+ self.pooledDownServers.discard(server)
218264
219265 # See if we can depool any servers that could not be depooled before
220 - for server in self.pooledDownServers:
221 - if self.canDepool(server):
222 - self.depool(server)
223 - else: # then we can't depool any further servers either...
224 - break
 266+ while len(self.pooledDownServers) > 0 and self.canDepool():
 267+ self.depool(self.pooledDownServers.pop())
225268
226 - def canDepool(self, server):
 269+ def canDepool(self):
227270 """Returns a boolean denoting whether another server can be depooled"""
228271
229272 # Construct a list of servers that have status 'down'
230273 downServers = [server for server in self.servers.itervalues() if not server.up]
231274
232 - # Only allow depooling if less than half of the total amount of servers are down
233 - return len(downServers) <= len(self.servers) / 2
 275+ # The total amount of pooled servers may never drop below a configured threshold
 276+ return len(self.servers) - len(downServers) >= len(self.servers) * self.lvsservice.getDepoolThreshold()
234277
235278 def loadServers(self, configURL=None):
236279 """Periodic task to load a new server list/configuration file from a specified URL."""
@@ -267,7 +310,6 @@
268311 """Parses the server list and changes the state accordingly."""
269312
270313 delServers = self.servers.copy() # Shallow copy
271 - setupMonitoring = []
272314
273315 for line in lines:
274316 line = line.rstrip('\n').strip()
@@ -279,37 +321,23 @@
280322 if host in self.servers:
281323 # Existing server. merge
282324 server = delServers.pop(host)
283 - newServer = Server.buildServer(serverdict)
284 -
285 - print "Merging server %s, weight %d" % ( host, newServer.weight )
286 -
287 - # FIXME: Doesn't "enabled" mean "monitored, but not pooled"?
288 - if not newServer.enabled and server.enabled:
289 - server.removeMonitors()
290 - elif newServer.enabled and not server.enabled:
291 - setupMonitoring.append(newServer)
292 -
293 - # FIXME: BUG. .pooled and .up will remain in default state (up),
294 - # and monitoring instances may not be setup.
295 - server.merge(newServer)
 325+ server.merge(serverdict)
 326+ print "Merged %s server %s, weight %d" % (server.enabled and "enabled" or "disabled", host, server.weight)
296327 else:
297328 # New server
298329 server = Server.buildServer(serverdict)
299330 self.servers[host] = server
300 -
301 - print "New server %s, weight %d" % ( host, server.weight )
302 -
303 - setupMonitoring.append(server)
304 -
305 - self.createMonitoringInstances(setupMonitoring)
306 -
 331+ server.createMonitoringInstances(self, self.lvsservice)
 332+ print "New %s server %s, weight %d" % (server.enabled and "enabled" or "disabled", host, server.weight )
 333+
307334 # Remove old servers
308335 for host, server in delServers.iteritems():
309 - server.enabled = False
310 - server.removeMonitors()
 336+ print "Removing server %s (no longer found in new configuration)" % host
 337+ server.destroy()
311338 del self.servers[host]
312339
313 - self.assignServers(self.servers) # FIXME
 340+ # Assign the updated list of enabled servers to the LVSService instance
 341+ self.assignServers()
314342
315343 class BGPFailover:
316344 """Class for maintaining a BGP session to a router for IP address failover"""
@@ -334,7 +362,7 @@
335363 for prefix in BGPFailover.prefixes:
336364 attrSet = bgp.FrozenAttributeSet([bgp.OriginAttribute(),
337365 bgp.ASPathAttribute(asPath),
338 - bgp.NextHopAttribute(str(prefix))])
 366+ bgp.NextHopAttribute(bgp.NextHopAttribute.ANY)])
339367 advertisements.add(bgp.Advertisement(prefix, attrSet))
340368
341369 self.bgpPeering.setAdvertisements(advertisements)
@@ -388,6 +416,16 @@
389417 return False
390418 else:
391419 raise ValueError
 420+
 421+ def getfloat(self, key, default=None):
 422+ try:
 423+ return float(self[key])
 424+ except KeyError:
 425+ if default is not None:
 426+ return default
 427+ else:
 428+ raise
 429+ # do not intercept ValueError
392430
393431 def parseCommandLine(configuration):
394432 """

Status & tagging log