r94156 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r94155‎ | r94156 | r94157 >
Date:14:57, 10 August 2011
Author:mark
Status:deferred
Tags:
Comment:
* Implement Ganglia gmond python plugin; htcpseqcheck can now run standalone or as part of gmond
* Move more common functions out of htcpseqcheck and udpmcast into util.py
* Implement sliding counts for percentages
Modified paths:
  • /trunk/udpmcast/htcpseqcheck.py (modified) (history)
  • /trunk/udpmcast/htcpseqcheck.pyconf (added) (history)
  • /trunk/udpmcast/htcpseqcheck_ganglia.py (added) (history)
  • /trunk/udpmcast/udpmcast.py (modified) (history)
  • /trunk/udpmcast/util.py (modified) (history)

Diff [purge]

Index: trunk/udpmcast/htcpseqcheck.pyconf
@@ -0,0 +1,61 @@
 2+# Ganglia Python gmond module configuration file
 3+
 4+modules {
 5+ module {
 6+ name = "htcpseqcheck"
 7+ language = "python"
 8+
 9+ param multicast_group {
 10+ value = "239.128.0.112"
 11+ }
 12+
 13+ param port {
 14+ value = 4827
 15+ }
 16+ }
 17+}
 18+
 19+collection_group {
 20+ collect_every = 15
 21+ time_threshold = 15
 22+
 23+ metric {
 24+ name = "htcp_losspct"
 25+ title = "HTCP packet loss percentage"
 26+ }
 27+
 28+ metric {
 29+ name = "htcp_dequeued"
 30+ title = "Dequeued HTCP packets"
 31+ }
 32+
 33+ metric {
 34+ name = "htcp_dups"
 35+ title = "Duplicate HTCP packets"
 36+ }
 37+
 38+ metric {
 39+ name = "htcp_ancient"
 40+ title = "Ancient HTCP packets"
 41+ }
 42+
 43+ metric {
 44+ name = "htcp_received"
 45+ title = "Received HTCP packets"
 46+ }
 47+
 48+ metric {
 49+ name = "htcp_sources"
 50+ title = "Unique HTCP senders"
 51+ }
 52+
 53+ metric {
 54+ name = "htcp_lost"
 55+ title = "Lost HTCP packets"
 56+ }
 57+
 58+ metric {
 59+ name = "htcp_outoforder"
 60+ title = "HTCP packets received out-of-order"
 61+ }
 62+}
\ No newline at end of file
Index: trunk/udpmcast/util.py
@@ -1,8 +1,45 @@
22 # util.py
33 # utility functions shared by udpmcast and htcpseqcheck
44
5 -import os, signal
 5+import sys, os, signal, socket
66
# Globals

# Debug switch shared by the tools; they enable it with
# ``util.debugging = True`` when -v is given.
debugging = False

def debug(msg):
    """Write *msg* to stderr, prefixed with "DEBUG:", when debugging is on."""
    if not debugging:
        return
    sys.stderr.write("DEBUG: %s\n" % (msg,))
 16+
def open_htcp_socket(host="", portnr=4827):
    """Create a UDP socket bound to (*host*, *portnr*) and return it.

    An empty *host* binds to all local IPv4 addresses.  SO_REUSEADDR is
    enabled before binding (presumably so several receivers can share the
    HTCP port - confirm against deployment).
    """
    udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
                             socket.IPPROTO_UDP)
    udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    address = (host, portnr)
    udp_sock.bind(address)
    return udp_sock
 24+
def join_multicast_group(sock, multicast_group):
    """Subscribe *sock* to the IPv4 multicast group *multicast_group*.

    The group is joined on the interface the kernel picks (INADDR_ANY),
    and multicast loopback is turned off for this socket.
    """
    import struct

    group_addr = socket.inet_aton(multicast_group)
    # struct ip_mreq: group address followed by the local interface address.
    membership = struct.pack('!4sl', group_addr, socket.INADDR_ANY)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, membership)

    # We do not want to see our own messages back
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 0)
 36+
def set_multicast_ttl(sock, ttl):
    """Set the IP multicast TTL for datagrams sent on *sock* to *ttl*."""
    level, option = socket.IPPROTO_IP, socket.IP_MULTICAST_TTL
    sock.setsockopt(level, option, ttl)
 42+
 43+
744 def createDaemon():
845 """
946 Detach a process from the controlling terminal and run it in the
Index: trunk/udpmcast/udpmcast.py
@@ -6,15 +6,11 @@
77 #
88 # $Id$
99
import util
import socket, getopt, sys, pwd, grp

# debug() now lives in util; main() still calls it unqualified.
from util import debugging
from util import debug
1314
14 -def debug(msg):
15 - global debugging
16 - if debugging:
17 - print msg;
18 -
1915 def multicast_diagrams(sock, addrrules):
2016 portnr = sock.getsockname()[1];
2117
@@ -88,7 +84,7 @@
8985 elif option == '-g':
9086 group = value
9187 elif option == '-v':
92 - debugging = True
 88+ util.debugging = True
9389 elif option == '-t':
9490 multicast_ttl = int(value)
9591
@@ -101,25 +97,20 @@
10298 print "Error: Could not change uid or gid."
10399 sys.exit(-1)
104100
 101+
105102 # Become a daemon
106103 if daemon:
107 - from util import createDaemon
108 - createDaemon()
 104+ util.createDaemon()
109105
110 - # Open the UDP socket
111 - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
112 - sock.bind((host, portnr))
113 -
114 - # Set the multicast TTL if requested
 106+ sock = util.open_htcp_socket(host, portnr)
 107+
115108 if multicast_ttl is not None:
116 - sock.setsockopt(socket.IPPROTO_IP,
117 - socket.IP_MULTICAST_TTL,
118 - multicast_ttl)
 109+ util.set_multicast_ttl(sock, multicast_ttl)
119110
120111 # Join a multicast group if requested
121112 if multicast_group is not None:
122113 debug('Joining multicast group ' + multicast_group)
123 - join_multicast_group(sock, multicast_group)
 114+ util.join_multicast_group(sock, multicast_group)
124115
125116 # Parse the argument list
126117 addrrules = { 0: [] }
Index: trunk/udpmcast/htcpseqcheck.py
@@ -6,7 +6,10 @@
77 #
88 # $Id$
99
10 -import socket, getopt, sys, pwd, grp, struct
 10+import util
 11+import socket, getopt, sys, pwd, grp, struct, threading
 12+
 13+from util import debug
1114
1215 from datetime import datetime, timedelta
1316 from collections import deque
@@ -18,16 +21,11 @@
1922
2023 # Globals
2124
22 -debugging = False
2325 sourcebuf = {}
24 -totalcounts = Counter()
 26+totalcounts, slidingcounts = Counter(), Counter()
 27+slidingdeque = deque()
 28+stats_lock = threading.Lock()
2529
26 -def debug(msg):
27 - global debugging
28 - if debugging:
29 - print >> sys.stderr, "DEBUG:", msg
30 -
31 -
3230 class RingBuffer(deque):
3331 """
3432 Implements TCP window like behavior
@@ -106,56 +104,60 @@
107105
108106 checkhtcpseq(diagram, srcaddr[0])
109107
def update_sliding_counts(counts, maxlen=10000):
    """Add *counts* to the sliding window of the last *maxlen* updates.

    slidingdeque holds the Counters currently inside the window, and
    slidingcounts holds their sum.  Both module-level objects are modified
    in place rather than rebound (as ``+=``/``-=`` on a Counter would do).
    This keeps every reference to ``slidingcounts`` valid and avoids
    copying the Counter for each packet.

    checkhtcpseq() calls this while holding stats_lock.
    """
    slidingcounts.update(counts)
    slidingdeque.append(counts)

    # Expire the oldest entries.  "while" also copes with a smaller maxlen
    # than on earlier calls; the emptiness check keeps maxlen < 0 safe.
    while slidingdeque and len(slidingdeque) > maxlen:
        for key, value in slidingdeque.popleft().items():
            slidingcounts[key] -= value
def checkhtcpseq(diagram, srcaddr):
    """Account for one HTCP datagram received from *srcaddr*.

    The transaction id (unsigned 32-bit, network byte order, bytes 8-11 of
    *diagram*) is fed into the per-source RingBuffer.  The resulting counts
    are then added to the running totals and to the sliding window.  All of
    this happens while holding stats_lock, which the gmond metric handler
    also takes.  When sys.stdout is usable, loss reports and statistics
    are printed.
    """
    transid = struct.unpack('!I', diagram[8:12])[0]

    with stats_lock:  # Critical section
        # Look up first: setdefault() would construct (and throw away) a new
        # RingBuffer for every packet from an already known source.
        sb = sourcebuf.get(srcaddr)
        if sb is None:
            sb = sourcebuf[srcaddr] = RingBuffer()

        try:
            counts = sb.add(transid)
        except IndexError:
            return

        totalcounts.update(counts)
        update_sliding_counts(counts)

        # Don't bother printing stats if sys.stdout is set to None
        if not sys.stdout:
            return

        if counts['lost']:
            # Lost packets
            print("%d lost packet(s) from %s, last id %d" % (counts['lost'], srcaddr, transid))
        elif counts['ancient']:
            print("Ancient packet from %s, id %d" % (srcaddr, transid))

        if counts['lost'] and sb.counts['dequeued']:
            print("%d/%d losses (%.2f%%), %d out-of-order, %d dups, %d ancient, %d received from %s" % (
                sb.counts['lost'],
                sb.counts['dequeued'],
                float(sb.counts['lost'])*100/sb.counts['dequeued'],
                sb.counts['outoforder'],
                sb.counts['dups'],
                sb.counts['ancient'],
                sb.counts['received'],
                srcaddr))

            # Loss figures come from the sliding window, the other figures
            # from the running totals.  The window's dequeued count is not
            # guarded by the condition above and may be zero.
            window_lost = slidingcounts['lost']
            window_dequeued = slidingcounts['dequeued']
            if window_dequeued:
                window_pct = float(window_lost)*100/window_dequeued
            else:
                window_pct = 0.0
            print("Totals: %d/%d losses (%.2f%%), %d out-of-order, %d dups, %d ancient, %d received from %d sources" % (
                window_lost,
                window_dequeued,
                window_pct,
                totalcounts['outoforder'],
                totalcounts['dups'],
                totalcounts['ancient'],
                totalcounts['received'],
                len(sourcebuf)))
147161
148 -def join_multicast_group(sock, multicast_group):
149 - import struct
150 -
151 - ip_mreq = struct.pack('!4sl', socket.inet_aton(multicast_group),
152 - socket.INADDR_ANY)
153 - sock.setsockopt(socket.IPPROTO_IP,
154 - socket.IP_ADD_MEMBERSHIP,
155 - ip_mreq)
156 -
157 - # We do not want to see our own messages back
158 - sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 0)
159 -
160162 def print_help():
161163 print 'Usage:\n\thtcpseqcheck [ options ]\n'
162164 print 'Options:'
@@ -191,7 +193,7 @@
192194 elif option == '-g':
193195 group = value
194196 elif option == '-v':
195 - debugging = True
 197+ util.debugging = True
196198
197199 try:
198200 # Change uid and gid
@@ -204,23 +206,25 @@
205207
206208 # Become a daemon
207209 if daemon:
208 - from util import createDaemon
209 - createDaemon()
 210+ util.createDaemon()
210211
211 - # Open the UDP socket
212 - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
213 - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
214 - sock.bind((host, portnr))
 212+ sock = util.open_htcp_socket(host, portnr)
215213
216214 # Join a multicast group if requested
217215 if multicast_group is not None:
218216 debug('Joining multicast group ' + multicast_group)
219 - join_multicast_group(sock, multicast_group)
 217+ util.join_multicast_group(sock, multicast_group)
220218
221 - # Multiplex everything that comes in
 219+ # Start receiving HTCP packets
222220 receive_htcp(sock)
223221 except socket.error, msg:
224222 print msg[1];
225223 except KeyboardInterrupt:
226224 pass
227225
 226+
# Ganglia gmond module support: gmond loads this module by name (see
# htcpseqcheck.pyconf) and uses the metric_init/metric_cleanup re-exported here.
# Skip this when running as a standalone script.  htcpseqcheck_ganglia does
# "import htcpseqcheck", which would otherwise load and execute this file a
# second time as a separate module.
if __name__ != '__main__':
    try:
        from htcpseqcheck_ganglia import metric_init, metric_cleanup
    except ImportError:
        pass
\ No newline at end of file
Index: trunk/udpmcast/htcpseqcheck_ganglia.py
@@ -0,0 +1,149 @@
 2+#!/usr/bin/env python
 3+
 4+# htcpseqcheck_ganglia.py
 5+# Ganglia gmond module integration
 6+
 7+import htcpseqcheck, util
 8+import threading, sys, socket, datetime
 9+
 10+from util import debug
 11+
 12+# Globals
 13+metrics = {}
 14+
 15+class HTCPSeqCheckThread(threading.Thread):
 16+
 17+ name = "HTCPSeqCheck"
 18+ daemon = True
 19+
 20+ def run(self, kwargs={}):
 21+ try:
 22+ sock = util.open_htcp_socket(kwargs.get('host', ""), kwargs.get('port', 4827))
 23+
 24+ # Join a multicast group if requested
 25+ if 'multicast_group' in kwargs:
 26+ debug('Joining multicast group ' + kwargs['multicast_group'])
 27+ util.join_multicast_group(sock, kwargs['multicast_group'])
 28+
 29+ # Set sys.stdout to None; ganglia will do so anyway, and we
 30+ # can detect this in htcpseqcheck.
 31+
 32+ # Start receiving HTCP packets
 33+ htcpseqcheck.receive_htcp(sock)
 34+ except socket.error, msg:
 35+ print >> sys.stderr, msg[1]
 36+ sys.exit(1)
 37+
 38+def build_metrics_dict():
 39+ "Builds a dict of metric parameter dicts"
 40+
 41+ metrics = {
 42+ 'htcp_losspct': {
 43+ 'value_type': "float",
 44+ 'units': "%",
 45+ 'format': "%.2f",
 46+ 'slope': "both",
 47+ 'description': "HTCP packet loss percentage",
 48+ 'int_name': None,
 49+ },
 50+ 'htcp_lost': {
 51+ 'value_type': "uint",
 52+ 'units': "packets/s",
 53+ 'format': "%u",
 54+ 'slope': "positive",
 55+ 'description': "Lost HTCP packets",
 56+ 'int_name': "lost",
 57+ },
 58+ 'htcp_dequeued': {
 59+ 'value_type': "uint",
 60+ 'units': "packets/s",
 61+ 'format': "%u",
 62+ 'slope': "positive",
 63+ 'description': "Dequeued HTCP packets",
 64+ 'int_name': "dequeued",
 65+ },
 66+ 'htcp_outoforder': {
 67+ 'value_type': "uint",
 68+ 'units': "packets/s",
 69+ 'format': "%u",
 70+ 'slope': "positive",
 71+ 'description': "HTCP packets received out-of-order",
 72+ 'int_name': "outoforder",
 73+ },
 74+ 'htcp_dups': {
 75+ 'value_type': "uint",
 76+ 'units': "dups/s",
 77+ 'format': "%u",
 78+ 'slope': "positive",
 79+ 'description': "Duplicate HTCP packets",
 80+ 'int_name': "dups",
 81+ },
 82+ 'htcp_ancient': {
 83+ 'value_type': "uint",
 84+ 'units': "packets/s",
 85+ 'format': "%u",
 86+ 'slope': "positive",
 87+ 'description': "Ancient HTCP packets",
 88+ 'int_name': "ancient",
 89+ },
 90+ 'htcp_received': {
 91+ 'value_type': "uint",
 92+ 'units': "packets/s",
 93+ 'format': "%u",
 94+ 'slope': "positive",
 95+ 'description': "Received HTCP packets",
 96+ 'int_name': "received",
 97+ },
 98+ 'htcp_sources': {
 99+ 'value_type': "uint",
 100+ 'units': "sources",
 101+ 'format': "%u",
 102+ 'slope': "both",
 103+ 'description': "Unique HTCP senders",
 104+ 'int_name': None,
 105+ }
 106+ }
 107+
 108+ # Add common values
 109+ for metricname, metric in metrics.iteritems():
 110+ metric.update({
 111+ 'name': metricname,
 112+ 'call_back': metric_handler,
 113+ 'time_max': 15,
 114+ 'groups': "htcp"
 115+ })
 116+
 117+ return metrics
 118+
 119+def metric_init(params):
 120+ # gmond module initialization
 121+ global metrics
 122+
 123+ # Start HTCP metrics collection in a separate thread
 124+ HTCPSeqCheckThread().start()
 125+
 126+ metrics = build_metrics_dict()
 127+ return list(metrics.values())
 128+
def metric_cleanup(params=None):
    """gmond module shutdown hook; there is nothing to release.

    gmond's Python module interface calls metric_cleanup() with no
    arguments, so *params* is optional.  It is still accepted for callers
    that pass one.
    """
    return None
 131+
def metric_handler(name):
    """gmond callback: return the current value of metric *name*.

    Values are read from htcpseqcheck's shared counters while holding
    stats_lock:
      - htcp_losspct: lost/dequeued percentage over the sliding window
        (0.0 while the window holds no dequeued packets);
      - htcp_sources: number of senders seen so far;
      - anything else: running total of the metric's 'int_name' counter.
    Raises KeyError if *name* is not in ``metrics``.
    """
    metric = metrics[name]

    with htcpseqcheck.stats_lock:  # Critical section
        if name == "htcp_losspct":
            window = htcpseqcheck.slidingcounts
            dequeued = window['dequeued']
            if not dequeued:
                # Nothing dequeued in the window yet: report no loss rather
                # than dividing by zero.
                return 0.0
            return float(window['lost']) / dequeued * 100
        elif name == "htcp_sources":
            return len(htcpseqcheck.sourcebuf)
        else:
            return htcpseqcheck.totalcounts[metric['int_name']]
 147+
if __name__ == '__main__':
    # Emit one "metric { ... }" stanza per metric, in the shape used by the
    # collection_group section of htcpseqcheck.pyconf.
    stanza = " metric {\n name = \"%(name)s\"\n title = \"%(description)s\"\n }\n"
    for descriptor in build_metrics_dict().values():
        print(stanza % descriptor)
\ No newline at end of file
Property changes on: trunk/udpmcast/htcpseqcheck_ganglia.py
___________________________________________________________________
Added: svn:mime-type
1151 + text/plain

Status & tagging log