r69278 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:< r69277 | r69278 | r69279 >
Date:01:15, 12 July 2010
Author:tstarling
Status:deferred
Tags:
Comment:
* Fixed MySQL metrics, were completely untested and broken.
* Updated data format for Ganglia 3.1. Added human-readable descriptions and titles.
* Added configuration file.
* Split disk stats off into their own package.
* Added diskio_devices string metric.
Modified paths:
  • /trunk/ganglia_metrics/DiskStats.py (added) (history)
  • /trunk/ganglia_metrics/GangliaMetrics.py (modified) (history)
  • /trunk/ganglia_metrics/MySQLStats.py (modified) (history)
  • /trunk/ganglia_metrics/debian/changelog (modified) (history)
  • /trunk/ganglia_metrics/gmetricd.py (modified) (history)
  • /trunk/ganglia_metrics/setup.py (modified) (history)

Diff [purge]

Index: trunk/ganglia_metrics/DiskStats.py
@@ -0,0 +1,173 @@
 2+from GangliaMetrics import *
 3+import time, re, sys, logging
 4+
 5+"""
 6+Utilisation metric
 7+"""
 8+class DiskUtilItem(DeltaMetricItem):
 9+ def __init__(self, name, meta):
 10+ DeltaMetricItem.__init__(self, name, meta, '%')
 11+
 12+ def getValue(self):
 13+ # Get the time spent doing I/O, in milliseconds per second
 14+ value = DeltaMetricItem.getValue(self)
 15+ if self.lastElapsed and value != None:
 16+ # Convert to a percentage of the elapsed time
 17+ return value / 10
 18+ else:
 19+ return None
 20+
 21+"""
 22+Load metric
 23+"""
 24+class DiskLoadItem(DeltaMetricItem):
 25+ def __init__(self, name, meta):
 26+ DeltaMetricItem.__init__(self, name, meta)
 27+
 28+ def getValue(self):
 29+ # Get the time spent doing I/O, in milliseconds per second
 30+ value = DeltaMetricItem.getValue(self)
 31+ if self.lastElapsed and value != None:
 32+ # Convert to a plain ratio
 33+ return value / 1000
 34+ else:
 35+ return None
 36+
 37+"""
 38+Statistics from /proc/diskstats
 39+Requires Linux 2.6+
 40+"""
 41+class DiskStats(MetricCollection):
 42+ # Field indexes
 43+ BLANK_INITIAL_SPACE = 0
 44+ MAJOR = 1
 45+ MINOR = 2
 46+ NAME = 3
 47+ READS = 4
 48+ READS_MERGED = 5
 49+ SECTORS_READ = 6
 50+ MS_READING = 7
 51+ WRITES = 8
 52+ WRITES_MERGED = 9
 53+ SECTORS_WRITTEN = 10
 54+ MS_WRITING = 11
 55+ REQS_PENDING = 12
 56+ MS_TOTAL = 13
 57+ MS_WEIGHTED = 14
 58+
 59+ def __init__(self):
 60+ self.metrics = {
 61+ 'diskio_read_bytes': DeltaMetricItem(
 62+ 'diskio_read_bytes',
 63+ {
 64+ 'TITLE': 'Disk read bytes',
 65+ 'DESC': 'Bytes read from block devices',
 66+ 'GROUP': 'disk'
 67+ },
 68+ 'bytes/sec'),
 69+ 'diskio_write_bytes': DeltaMetricItem(
 70+ 'diskio_write_bytes',
 71+ {
 72+ 'TITLE': 'Disk write bytes',
 73+ 'DESC': 'Bytes written to block devices',
 74+ 'GROUP': 'disk'
 75+ },
 76+ 'bytes/sec'),
 77+ 'diskio_read_load': DiskLoadItem(
 78+ 'diskio_read_load',
 79+ {
 80+ 'TITLE': 'Disk read load',
 81+ 'DESC': 'Time spent reading divided by wall clock time, averaged over devices (/proc/diskstats field #4)',
 82+ 'GROUP': 'disk'
 83+ }),
 84+ 'diskio_write_load': DiskLoadItem(
 85+ 'diskio_write_load',
 86+ {
 87+ 'TITLE': 'Disk write load',
 88+ 'DESC': 'Time spent writing divided by wall clock time, averaged over devices (/proc/diskstats field #8)',
 89+ 'GROUP': 'disk'
 90+ }),
 91+ 'diskio_total_load': DiskLoadItem(
 92+ 'diskio_total_load',
 93+ {
 94+ 'TITLE': 'Disk total load',
 95+ 'DESC': 'Queue size multiplied by total I/O time, divided by wall clock time, averaged over devices (/proc/diskstats field #11)',
 96+ 'GROUP': 'disk'
 97+ }),
 98+ 'diskio_util': DiskUtilItem(
 99+ 'diskio_util',
 100+ {
 101+ 'TITLE': 'Disk utilisation',
 102+ 'DESC': 'Time spent in I/O divided by wall clock time, averaged over devices (/proc/diskstats field #10)',
 103+ 'GROUP': 'disk',
 104+ }),
 105+ 'diskio_devices': StringMetric(
 106+ 'diskio_devices',
 107+ {
 108+ 'TITLE': 'Disk devices monitored',
 109+ 'DESC': 'Devices monitored in diskio_* metrics',
 110+ 'GROUP': 'disk'
 111+ })
 112+ }
 113+ self.delimiterRegex = re.compile(r"\s+")
 114+ self.deviceRegex = re.compile(r"^[sh]d[a-z]$")
 115+ self.disabled = False
 116+
 117+ def update(self):
 118+ if self.disabled:
 119+ return False
 120+
 121+ try:
 122+ procfile = open('/proc/diskstats', 'r')
 123+ except IOError:
 124+ type, value = sys.exc_info()[:2]
 125+ logger = logging.getLogger('GangliaMetrics')
 126+ logger.warning("Unable to open /proc/diskstats: %s\n" % value)
 127+ self.disabled = True
 128+ return False
 129+
 130+ contents = procfile.read(100000)
 131+ refTime = time.time()
 132+ procfile.close()
 133+ lines = contents.splitlines()
 134+
 135+ devCount = 0
 136+ sums = None
 137+ devNames = ''
 138+ for line in lines:
 139+ fields = self.delimiterRegex.split(line)
 140+ if self.deviceRegex.search(fields[self.NAME]) == None or \
 141+ len(fields) < self.MS_WEIGHTED or \
 142+ fields[self.READS] == 0:
 143+ continue
 144+
 145+ if sums == None:
 146+ sums = [0] * len(fields)
 147+
 148+ # Sum the summable stats
 149+ for i in xrange(len(fields)):
 150+ if fields[i].isdigit():
 151+ sums[i] += long(fields[i])
 152+
 153+ devCount += 1
 154+ if devNames != '':
 155+ devNames += ', '
 156+ devNames += fields[self.NAME]
 157+
 158+ # Put the summed stats into metrics
 159+ if devCount:
 160+ # The sector size in this case is hard-coded in the kernel as 512 bytes
 161+ # There doesn't appear to be any simple way to retrieve that figure
 162+ self.metrics['diskio_read_bytes'].set(sums[self.SECTORS_READ] * 512, refTime)
 163+ self.metrics['diskio_write_bytes'].set(sums[self.SECTORS_WRITTEN] * 512, refTime)
 164+
 165+ self.metrics['diskio_read_load'].set(sums[self.MS_READING], refTime, devCount)
 166+ self.metrics['diskio_write_load'].set(sums[self.MS_WRITING], refTime, devCount)
 167+ self.metrics['diskio_total_load'].set(sums[self.MS_WEIGHTED], refTime, devCount)
 168+ self.metrics['diskio_util'].set(sums[self.MS_TOTAL], refTime, devCount)
 169+
 170+ self.metrics['diskio_devices'].set(devNames)
 171+
 172+ return devCount != 0
 173+
 174+
Property changes on: trunk/ganglia_metrics/DiskStats.py
___________________________________________________________________
Added: svn:eol-style
1175 + native
Index: trunk/ganglia_metrics/GangliaMetrics.py
@@ -1,22 +1,41 @@
22 from xdrlib import Packer
3 -import time, re, sys, logging
 3+import time, logging, socket
44
55 """ Metric base class """
66
77 class Metric(object):
8 -
9 - def __init__(self, name, units = '', type = 'double'):
 8+ def __init__(self, name, meta, units = ''):
109 self.name = name
1110 self.units = units
12 - self.type = type
 11+ self.type = 'float'
 12+ self.meta = meta
1313 self.lastSendTime = 0
 14+ self.lastMetadataSendTime = 0
1415
1516 self.slope = 'both'
1617 self.tmax = 60
1718 self.dmax = 0
1819 self.interval = 10
 20+ self.metadataInterval = 60
 21+ self.hostname = socket.gethostname()
 22+ self.format = '%.1f'
1923
2024 self.value = 0
 25+
 26+ self.gangliaVersion = 3 # TODO: get this from the system
 27+ self.debug = False
 28+
 29+ self.formatIDs = {
 30+ 'full': 128,
 31+ 'ushort': 128 + 1,
 32+ 'short': 128 + 2,
 33+ 'int': 128 + 3,
 34+ 'uint': 128 + 4,
 35+ 'string': 128 + 5,
 36+ 'float': 128 + 6,
 37+ 'double': 128 + 7,
 38+ 'request': 128 + 8
 39+ }
2140
2241 def isReady(self):
2342 return time.time() - self.lastSendTime >= self.interval
@@ -24,23 +43,83 @@
2544 def send(self, sock, address):
2645 value = self.getValue()
2746 if value != None:
28 - packer = Packer()
29 - packer.pack_enum(0) # metric_user_defined
30 - packer.pack_string(self.type)
31 - packer.pack_string(self.name)
32 - packer.pack_string(str(value))
33 - packer.pack_string(self.units)
34 - if self.slope == 'zero':
35 - slope = 0
36 - else:
37 - slope = 3
38 - packer.pack_uint(slope)
39 - packer.pack_uint(self.tmax)
40 - packer.pack_uint(self.dmax)
 47+ if self.debug:
 48+ logging.getLogger('GangliaMetrics').info('Sending %s = %s' % (self.name, value))
4149
42 - sock.sendto(packer.get_buffer(), address)
 50+ if self.gangliaVersion >= 3:
 51+ self.sendV3Data(sock, address, value)
 52+ else:
 53+ self.sendV2Data(sock, address, value)
4354 self.lastSendTime = time.time()
 55+
 56+ def sendV2Data(self, sock, address, value):
 57+ packer = Packer()
 58+ packer.pack_enum(0) # metric_user_defined
 59+ packer.pack_string(self.type)
 60+ packer.pack_string(self.name)
 61+ packer.pack_string(str(value))
 62+ packer.pack_string(self.units)
 63+ if self.slope == 'zero':
 64+ slope = 0
 65+ else:
 66+ slope = 3 # both
 67+ packer.pack_uint(slope)
 68+ packer.pack_uint(self.tmax)
 69+ packer.pack_uint(self.dmax)
 70+
 71+ sock.sendto(packer.get_buffer(), address)
4472
 73+ def sendV3Data(self, sock, address, value):
 74+ if time.time() - self.lastMetadataSendTime >= self.metadataInterval:
 75+ self.sendMetadata(sock, address)
 76+
 77+ packer = Packer()
 78+ packer.pack_enum(self.formatIDs[self.type])
 79+ packer.pack_string(self.hostname)
 80+ packer.pack_string(self.name)
 81+ packer.pack_bool(False) # spoof = false
 82+ packer.pack_string(self.format)
 83+ self.packValue(packer, value)
 84+
 85+ sock.sendto(packer.get_buffer(), address)
 86+
 87+ def packValue(self, packer, value):
 88+ if self.type == 'int':
 89+ packer.pack_int(value)
 90+ elif self.type == 'string':
 91+ packer.pack_string(value)
 92+ elif self.type == 'float':
 93+ packer.pack_float(value)
 94+ elif self.type == 'double':
 95+ packer.pack_double(value)
 96+ else:
 97+ logging.getLogger('GangliaMetrics').error('Cannot pack type ' + self.type)
 98+
 99+ def sendMetadata(self, sock, address):
 100+ self.lastMetadataSendTime = time.time()
 101+ packer = Packer()
 102+ packer.pack_enum(self.formatIDs['full'])
 103+ packer.pack_string(self.hostname)
 104+ packer.pack_string(self.name)
 105+ packer.pack_bool(False) # spoof = false
 106+ packer.pack_string(self.type)
 107+ packer.pack_string(self.name)
 108+ packer.pack_string(self.units)
 109+ if self.slope == 'zero':
 110+ slope = 0
 111+ else:
 112+ slope = 3
 113+ packer.pack_uint(slope)
 114+ packer.pack_uint(self.tmax)
 115+ packer.pack_uint(self.dmax)
 116+
 117+ packer.pack_uint(len(self.meta)) # array length
 118+ for name, value in self.meta.items():
 119+ packer.pack_string(name)
 120+ packer.pack_string(value)
 121+
 122+ sock.sendto(packer.get_buffer(), address)
 123+
45124 def getValue(self):
46125 return self.value
47126
@@ -53,8 +132,8 @@
54133 between consecutive values is calculated, the result is a count per second.
55134 """
56135 class DeltaMetric(Metric):
57 - def __init__(self, name, units = '', type = 'double'):
58 - Metric.__init__(self, name, units, type)
 136+ def __init__(self, name, meta, units = ''):
 137+ Metric.__init__(self, name, meta, units)
59138 self.lastCounterValue = 0
60139 self.lastRefTime = None
61140 self.lastElapsed = None
@@ -92,8 +171,8 @@
93172 A rolling average metric
94173 """
95174 class RollingMetric(Metric):
96 - def __init__(self, name, avPeriod = 60, units = '', type = 'double'):
97 - Metric.__init__(self, name, units, type)
 175+ def __init__(self, name, meta, avPeriod = 60, units = ''):
 176+ Metric.__init__(self, name, meta, units)
98177 self.queue = []
99178 self.sum = 0
100179 self.targetSize = avPeriod / self.interval
@@ -124,8 +203,8 @@
125204 If no value is pushed during a given polling interval, the previous average is returned
126205 """
127206 class PushMetric(Metric):
128 - def __init__(self, name, units = '', type = 'double'):
129 - Metric.__init__(self, name, units, type)
 207+ def __init__(self, name, meta, units = ''):
 208+ Metric.__init__(self, name, meta, units)
130209 self.lastAv = None
131210 self.sum = 0
132211 self.count = 0
@@ -179,120 +258,12 @@
180259 self.metrics[metric.name] = metric
181260
182261 """
183 -Utilisation metric for DiskStats
 262+String metric
184263 """
185 -class DiskUtilItem(DeltaMetricItem):
186 - def __init__(self, name):
187 - DeltaMetricItem.__init__(self, name, '%')
188 -
189 - def getValue(self):
190 - # Get the time spent doing I/O, in milliseconds per second
191 - value = DeltaMetricItem.getValue(self)
192 - if self.lastElapsed and value != None:
193 - # Convert to a percentage of the elapsed time
194 - return value / 10
195 - else:
196 - return None
197 -
198 -"""
199 -Load metric for DiskStats
200 -"""
201 -class DiskLoadItem(DeltaMetricItem):
202 - def __init__(self, name):
203 - DeltaMetricItem.__init__(self, name)
204 -
205 - def getValue(self):
206 - # Get the time spent doing I/O, in milliseconds per second
207 - value = DeltaMetricItem.getValue(self)
208 - if self.lastElapsed and value != None:
209 - # Convert to a plain ratio
210 - return value / 1000
211 - else:
212 - return None
213 -
214 -"""
215 -Statistics from /proc/diskstats
216 -Requires Linux 2.6+
217 -"""
218 -class DiskStats(MetricCollection):
219 - # Field indexes
220 - BLANK_INITIAL_SPACE = 0
221 - MAJOR = 1
222 - MINOR = 2
223 - NAME = 3
224 - READS = 4
225 - READS_MERGED = 5
226 - SECTORS_READ = 6
227 - MS_READING = 7
228 - WRITES = 8
229 - WRITES_MERGED = 9
230 - SECTORS_WRITTEN = 10
231 - MS_WRITING = 11
232 - REQS_PENDING = 12
233 - MS_TOTAL = 13
234 - MS_WEIGHTED = 14
235 -
236 - def __init__(self):
237 - self.metrics = {
238 - 'diskio_read_bytes': DeltaMetricItem('diskio_read_bytes', 'bytes/sec'),
239 - 'diskio_write_bytes': DeltaMetricItem('diskio_write_bytes', 'bytes/sec'),
240 - 'diskio_read_load': DiskLoadItem('diskio_read_load'),
241 - 'diskio_write_load': DiskLoadItem('diskio_write_load'),
242 - 'diskio_total_load': DiskLoadItem('diskio_total_load'),
243 - 'diskio_util': DiskUtilItem('diskio_util')
244 - }
245 - self.delimiterRegex = re.compile(r"\s+")
246 - self.deviceRegex = re.compile(r"^[sh]d[a-z]$")
247 - self.disabled = False
248 -
249 - def update(self):
250 - if self.disabled:
251 - return False
252 -
253 - try:
254 - procfile = open('/proc/diskstats', 'r')
255 - except IOError:
256 - type, value = sys.exc_info()[:2]
257 - logger = logging.getLogger('GangliaMetrics')
258 - logger.warning("Unable to open /proc/diskstats: %s\n" % value)
259 - self.disabled = True
260 - return False
261 -
262 - contents = procfile.read(100000)
263 - refTime = time.time()
264 - procfile.close()
265 - lines = contents.splitlines()
266 -
267 - devCount = 0
268 - sums = None
269 - for line in lines:
270 - fields = self.delimiterRegex.split(line)
271 - if self.deviceRegex.search(fields[self.NAME]) == None or \
272 - len(fields) < self.MS_WEIGHTED or \
273 - fields[self.READS] == 0:
274 - continue
275 -
276 - if sums == None:
277 - sums = [0] * len(fields)
278 -
279 - # Sum the summable stats
280 - for i in xrange(len(fields)):
281 - if fields[i].isdigit():
282 - sums[i] += long(fields[i])
283 - devCount += 1
284 -
285 - # Put the summed stats into metrics
286 - if devCount:
287 - # The sector size in this case is hard-coded in the kernel as 512 bytes
288 - # There doesn't appear to be any simple way to retrieve that figure
289 - self.metrics['diskio_read_bytes'].set(sums[self.SECTORS_READ] * 512, refTime)
290 - self.metrics['diskio_write_bytes'].set(sums[self.SECTORS_WRITTEN] * 512, refTime)
291 -
292 - self.metrics['diskio_read_load'].set(sums[self.MS_READING], refTime, devCount)
293 - self.metrics['diskio_write_load'].set(sums[self.MS_WRITING], refTime, devCount)
294 - self.metrics['diskio_total_load'].set(sums[self.MS_WEIGHTED], refTime, devCount)
295 - self.metrics['diskio_util'].set(sums[self.MS_TOTAL], refTime, devCount)
296 -
297 - return devCount != 0
298 -
299 -
 264+class StringMetric(Metric):
 265+ def __init__(self, name, meta):
 266+ Metric.__init__(self, name, meta)
 267+ self.interval = 60
 268+ self.type = 'string'
 269+ self.format = '%s'
 270+ self.value = ''
Index: trunk/ganglia_metrics/debian/changelog
@@ -1,3 +1,11 @@
 2+ganglia-metrics (1.3-1) lucid; urgency=low
 3+
 4+ * Fixed MySQL metrics
 5+ * Updated for Ganglia 3.1
 6+ * Added configuration file
 7+
 8+ -- Tim Starling <tstarling@wikimedia.org> Mon, 12 Jul 2010 11:07:17 +1000
 9+
210 ganglia-metrics (1.2-1) hardy; urgency=low
311
412 * Depend on ganglia-monitor | gmond
Index: trunk/ganglia_metrics/gmetricd.py
@@ -1,21 +1,37 @@
22 #! /usr/bin/env python
33
44 from xdrlib import Packer
5 -import sys, socket, re, GangliaMetrics, MySQLStats, time, os, signal, pwd, logging, commands
 5+import sys, socket, re, GangliaMetrics, DiskStats, MySQLStats, time, os, signal, pwd, logging
 6+import StringIO, ConfigParser
67 from SelectServer import *
78
89 # Configuration
910
10 -conf = {
11 - 'gmondConf': '/etc/gmond.conf',
 11+configParser = ConfigParser.ConfigParser( {
 12+ 'gmondconf': '/etc/gmond.conf',
1213 'sock': '/tmp/gmetric.sock',
1314 'log': '/var/log/gmetricd/gmetricd.log',
1415 'pid': '/var/run/gmetricd.pid',
1516 'user': 'gmetric',
16 - 'dbuser': 'wikiadmin',
17 - 'dbpassword': commands.getoutput('/home/wikipedia/bin/wikiadmin_pass')
18 -}
 17+ 'dbuser': '',
 18+ 'dbpassword': '',
 19+ 'mysqlclient': 'mysql',
 20+} )
1921
 22+try:
 23+ configFile = open('/etc/gmetricd.conf')
 24+except:
 25+ configFile = False
 26+if configFile:
 27+ configData = "[DEFAULT]\n"
 28+ configData += configFile.read()
 29+ configFile.close()
 30+ configParser.readfp(StringIO.StringIO(configData))
 31+
 32+conf = {}
 33+for name, value in configParser.items('DEFAULT'):
 34+ conf[name] = value
 35+
2036 unixSocket = None
2137
2238 class GmetricListenSocket(ListenSocket):
@@ -106,7 +122,7 @@
107123 selectServer = SelectServer()
108124
109125 # Determine the multicast address
110 -gmondFile = open(conf['gmondConf'])
 126+gmondFile = open(conf['gmondconf'])
111127 addrRegex = re.compile(r"^\s*mcast_join\s*=\s*([0-9.:]+)")
112128 portRegex = re.compile(r"^\s*port\s*=\s*([0-9]+)")
113129 ttlRegex = re.compile(r"^\s*ttl\s*=\s*([0-9]+)")
@@ -161,7 +177,7 @@
162178 selectServer.addReader(unixSocket)
163179
164180 # Create the metrics
165 -diskStats = GangliaMetrics.DiskStats()
 181+diskStats = DiskStats.DiskStats()
166182 pushMetrics = GangliaMetrics.MetricCollection()
167183
168184 mysqlStats = MySQLStats.MySQLStats( conf['dbuser'], conf['dbpassword'] )
Index: trunk/ganglia_metrics/setup.py
@@ -3,7 +3,7 @@
44 from distutils.core import setup
55
66 setup(name='ganglia_metrics',
7 - version='1.0',
 7+ version='1.3',
88 description='Ganglia metric daemon',
99 author='Tim Starling',
1010 author_email='tstarling@wikimedia.org',
Index: trunk/ganglia_metrics/MySQLStats.py
@@ -1,4 +1,4 @@
2 -import logging, commands, time
 2+import logging, subprocess, time, pprint
33 from GangliaMetrics import *
44 from xml.dom.minidom import parseString
55
@@ -8,54 +8,95 @@
99 class MySQLStats(MetricCollection):
1010 def __init__(self, user, password):
1111 self.metrics = {
12 - 'mysql_questions': DeltaMetricItem('mysql_questions', 'q/s'),
13 - 'mysql_threads_connected': RollingMetric('mysql_threads_connected', 60),
14 - 'mysql_threads_running': RollingMetric('mysql_threads_running', 60),
15 - 'mysql_slave_lag': Metric('mysql_slave_lag', 's')
 12+ 'mysql_questions': DeltaMetricItem(
 13+ 'mysql_questions',
 14+ {
 15+ 'TITLE': 'MySQL queries',
 16+ 'DESC': 'Queries per second received at this MySQL server',
 17+ 'GROUP': 'mysql'
 18+ },
 19+ 'q/s'),
 20+ 'mysql_threads_connected': RollingMetric(
 21+ 'mysql_threads_connected',
 22+ {
 23+ 'TITLE': 'MySQL threads connected',
 24+ 'DESC': 'Number of threads connected to this MySQL server',
 25+ 'GROUP': 'mysql'
 26+ },
 27+ 60),
 28+ 'mysql_threads_running': RollingMetric(
 29+ 'mysql_threads_running',
 30+ {
 31+ 'TITLE': 'MySQL threads running',
 32+ 'DESC': 'Number of MySQL threads in a non-sleep state',
 33+ 'GROUP': 'mysql'
 34+ },
 35+ 60)
1636 }
1737 self.user = user
1838 self.password = password
19 - self.pipes = None
2039
2140 if self.query('select 1') == None:
2241 self.disabled = True
 42+ logger = logging.getLogger('GangliaMetrics')
 43+ logger.warning('Unable to run query, disabling MySQL statistics')
2344 else:
2445 self.disabled = False
25 - logger = logging.getLogger('GangliaMetrics')
26 - logger.warning('Unable to run query, disabling MySQL statistics')
 46+ lag = self.getLag()
 47+ if lag != None:
 48+ self.addLagMetric()
2749
 50+ def addLagMetric(self):
 51+ self.metrics['mysql_slave_lag'] = Metric(
 52+ 'mysql_slave_lag',
 53+ {
 54+ 'TITLE': 'MySQL slave lag',
 55+ 'DESC': 'MySQL slave lag in seconds (may be zero if replication is broken)',
 56+ 'GROUP': 'mysql'
 57+ },
 58+ 's')
 59+
2860 def update(self):
29 - if disabled:
 61+ if self.disabled:
3062 return False
3163
3264 refTime = time.time()
3365 status = self.showStatus()
34 - if not status:
35 - self.markDown()
36 - return False
 66+ if status:
 67+ self.metrics['mysql_questions'].set(int(status['Questions']), refTime)
 68+ self.metrics['mysql_threads_connected'].set(int(status['Threads_connected']))
 69+ self.metrics['mysql_threads_running'].set(int(status['Threads_running']))
3770
3871 lag = self.getLag()
 72+ if lag != None:
 73+ if 'mysql_slave_lag' not in self.metrics:
 74+ self.addLagMetric()
 75+ self.metrics['mysql_slave_lag'].set(int(lag))
3976
40 - self.metrics['mysql_questions'].set(int(status['Questions']), refTime)
41 - self.metrics['mysql_threads_connected'].set(int(status['Threads_connected']))
42 - self.metrics['mysql_threads_running'].set(int(status['Threads_running']))
43 - self.metrics['mysql_slave_lag'].set(float(lag)) # float = wishful thinking
4477 return True
4578
46 - def escapeshellarg(self, s):
47 - return s.replace( "\\", "\\\\").replace( "'", "'\\''")
 79+ def query(self, sql):
 80+ global conf
 81+ proc = subprocess.Popen(
 82+ [
 83+ conf['mysqlclient'], '-XB',
 84+ '--user=' + self.user,
 85+ '--password=' + self.password,
 86+ '-e', sql
 87+ ],
 88+ stdout = subprocess.PIPE,
 89+ stderr = subprocess.PIPE )
 90+ (out, stderr) = proc.communicate()
 91+ if proc.returncode:
 92+ logger = logging.getLogger('GangliaMetrics')
 93+ logger.warning("SQL error: " + stderr.rstrip())
 94+ return None
4895
49 - def query(self, sql):
50 - out = commands.getoutput("mysql -XB -u '%s' -p'%s' -e '%s'" % (
51 - self.escapeshellarg(self.user),
52 - self.escapeshellarg(self.password),
53 - self.escapeshellarg(sql)
54 - ))
5596 try:
5697 dom = parseString(out)
5798 except:
5899 logger = logging.getLogger('GangliaMetrics')
59 - logger.warning("SQL error: Unable to parse XML result\n")
 100+ logger.warning("SQL error: Unable to parse XML result")
60101 return None
61102 return dom
62103
@@ -64,35 +105,39 @@
65106 self.metrics['mysql_threads_connected'].set(None)
66107 self.metrics['mysql_threads_running'].set(None)
67108 self.metrics['mysql_slave_lag'].set(None)
68 - self.conn = None
69109
70110 def showStatus(self):
71 - result = self.query("SHOW STATUS")
 111+ result = self.query("SHOW /*!50002 GLOBAL */ STATUS")
72112 if not result:
73113 return None
74114
75115 resultHash = {}
76116 for row in result.documentElement.getElementsByTagName('row'):
77 - name = row.getElementsByTagName('Variable_name')[0].childNodes[0].data
78 - value = row.getElementsByTagName('Value')[0].childNodes[0].data
79 - resultHash[name] = value
 117+ name = None
 118+ value = None
 119+ for field in row.childNodes:
 120+ if field.nodeName != 'field' or field.firstChild == None:
 121+ continue
 122+ if field.getAttribute('name') == 'Variable_name':
 123+ name = field.firstChild.data
 124+ elif field.getAttribute('name') == 'Value':
 125+ value = field.firstChild.data
 126+ if name != None and value != None:
 127+ resultHash[name] = value
80128 return resultHash
81129
82130 def getLag(self):
83 - result = self.query("SHOW PROCESSLIST")
 131+ result = self.query("SHOW SLAVE STATUS")
84132 if not result:
85133 return None
86134
87 - for row in result.documentElement.getElementsByTagName('row'):
88 - user = row.getElementsByTagName('User')[0].childNodes[0].data
89 - time = row.getElementsByTagName('Time')[0].childNodes[0].data
90 - state = row.getElementsByTagName('State')[0].childNodes[0].data
91 - if user == 'system user' and \
92 - state != 'Waiting for master to send event' and \
93 - state != 'Connecting to master' and \
94 - state != 'Queueing master event to the relay log' and \
95 - state != 'Waiting for master update' and \
96 - state != 'Requesting binlog dump':
97 - return time
 135+ fields = result.documentElement.getElementsByTagName('field')
 136+ if not fields.length:
 137+ return None
 138+
 139+ for field in fields:
 140+ if field.getAttribute('name') == 'Seconds_Behind_Master' and field.firstChild:
 141+ return field.firstChild.data
 142+
98143 return None
99 -
 144+

Follow-up revisions

Revision | Commit summary | Author | Date
r69279 | Fixes to r69278 following package testing:... | tstarling | 01:30, 12 July 2010

Status & tagging log