Index: trunk/udpmcast/htcpseqcheck.pyconf |
— | — | @@ -0,0 +1,61 @@ |
| 2 | +# Ganglia Python gmond module configuration file |
| 3 | + |
| 4 | +modules { |
| 5 | + module { |
| 6 | + name = "htcpseqcheck" |
| 7 | + language = "python" |
| 8 | + |
| 9 | + param multicast_group { |
| 10 | + value = "239.128.0.112" |
| 11 | + } |
| 12 | + |
| 13 | + param port { |
| 14 | + value = 4827 |
| 15 | + } |
| 16 | + } |
| 17 | +} |
| 18 | + |
| 19 | +collection_group { |
| 20 | + collect_every = 15 |
| 21 | + time_threshold = 15 |
| 22 | + |
| 23 | + metric { |
| 24 | + name = "htcp_losspct" |
| 25 | + title = "HTCP packet loss percentage" |
| 26 | + } |
| 27 | + |
| 28 | + metric { |
| 29 | + name = "htcp_dequeued" |
| 30 | + title = "Dequeued HTCP packets" |
| 31 | + } |
| 32 | + |
| 33 | + metric { |
| 34 | + name = "htcp_dups" |
| 35 | + title = "Duplicate HTCP packets" |
| 36 | + } |
| 37 | + |
| 38 | + metric { |
| 39 | + name = "htcp_ancient" |
| 40 | + title = "Ancient HTCP packets" |
| 41 | + } |
| 42 | + |
| 43 | + metric { |
| 44 | + name = "htcp_received" |
| 45 | + title = "Received HTCP packets" |
| 46 | + } |
| 47 | + |
| 48 | + metric { |
| 49 | + name = "htcp_sources" |
| 50 | + title = "Unique HTCP senders" |
| 51 | + } |
| 52 | + |
| 53 | + metric { |
| 54 | + name = "htcp_lost" |
| 55 | + title = "Lost HTCP packets" |
| 56 | + } |
| 57 | + |
| 58 | + metric { |
| 59 | + name = "htcp_outoforder" |
| 60 | + title = "HTCP packets received out-of-order" |
| 61 | + } |
| 62 | +} |
\ No newline at end of file |
Index: trunk/udpmcast/util.py |
— | — | @@ -1,8 +1,45 @@ |
2 | 2 | # util.py |
3 | 3 | # utility functions shared by udpmcast and htcpseqcheck |
4 | 4 | |
5 | | -import os, signal |
| 5 | +import sys, os, signal, socket |
6 | 6 | |
| 7 | +# Globals |
| 8 | + |
# Verbose-output switch.  The scripts turn it on with `util.debugging = True`.
debugging = False


def debug(msg):
    """Write msg to stderr, prefixed with "DEBUG:", if debugging is enabled.

    Nothing is written while the module-level `debugging` flag is false.  The
    flag is looked up on every call, so assigning `util.debugging = True`
    takes effect for all later calls.
    """
    if debugging:
        # sys.stderr.write() rather than the Python-2-only "print >>" form, so
        # this module parses under both Python 2 and Python 3.
        sys.stderr.write("DEBUG: %s\n" % (msg,))
| 16 | + |
def open_htcp_socket(host="", portnr=4827):
    """Create a UDP socket bound to (host, portnr) and return it.

    SO_REUSEADDR is enabled before binding.  host defaults to "" and portnr
    to 4827.

    Any exception raised while configuring or binding the socket propagates
    to the caller; the socket is closed first so its descriptor is not leaked.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, portnr))
    except Exception:
        # The caller only ever receives the socket on success, so nobody else
        # would be able to close it.
        sock.close()
        raise

    return sock
| 24 | + |
def join_multicast_group(sock, multicast_group):
    """Subscribe sock to the IPv4 multicast group multicast_group.

    multicast_group is an address string accepted by socket.inet_aton().  The
    membership request names INADDR_ANY as the local interface.  Multicast
    loopback is switched off on the socket afterwards.
    """
    import struct

    # Membership request: the packed group address followed by the packed
    # interface address, both in network byte order.
    group_addr = socket.inet_aton(multicast_group)
    membership = group_addr + struct.pack('!l', socket.INADDR_ANY)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, membership)

    # We do not want to see our own messages back
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 0)
| 36 | + |
def set_multicast_ttl(sock, ttl):
    """Apply ttl as the IP_MULTICAST_TTL option of sock."""
    level, option = socket.IPPROTO_IP, socket.IP_MULTICAST_TTL
    sock.setsockopt(level, option, ttl)
| 42 | + |
| 43 | + |
7 | 44 | def createDaemon(): |
8 | 45 | """ |
9 | 46 | Detach a process from the controlling terminal and run it in the |
Index: trunk/udpmcast/udpmcast.py |
— | — | @@ -6,15 +6,11 @@ |
7 | 7 | # |
8 | 8 | # $Id$ |
9 | 9 | |
| 10 | +import util |
10 | 11 | import socket, getopt, sys, pwd, grp |
11 | 12 | |
12 | | -debugging = False |
from util import debug, debugging
13 | 14 | |
14 | | -def debug(msg): |
15 | | - global debugging |
16 | | - if debugging: |
17 | | - print msg; |
18 | | - |
19 | 15 | def multicast_diagrams(sock, addrrules): |
20 | 16 | portnr = sock.getsockname()[1]; |
21 | 17 | |
— | — | @@ -88,7 +84,7 @@ |
89 | 85 | elif option == '-g': |
90 | 86 | group = value |
91 | 87 | elif option == '-v': |
92 | | - debugging = True |
| 88 | + util.debugging = True |
93 | 89 | elif option == '-t': |
94 | 90 | multicast_ttl = int(value) |
95 | 91 | |
— | — | @@ -101,25 +97,20 @@ |
102 | 98 | print "Error: Could not change uid or gid." |
103 | 99 | sys.exit(-1) |
104 | 100 | |
| 101 | + |
105 | 102 | # Become a daemon |
106 | 103 | if daemon: |
107 | | - from util import createDaemon |
108 | | - createDaemon() |
| 104 | + util.createDaemon() |
109 | 105 | |
110 | | - # Open the UDP socket |
111 | | - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
112 | | - sock.bind((host, portnr)) |
113 | | - |
114 | | - # Set the multicast TTL if requested |
| 106 | + sock = util.open_htcp_socket(host, portnr) |
| 107 | + |
115 | 108 | if multicast_ttl is not None: |
116 | | - sock.setsockopt(socket.IPPROTO_IP, |
117 | | - socket.IP_MULTICAST_TTL, |
118 | | - multicast_ttl) |
| 109 | + util.set_multicast_ttl(sock, multicast_ttl) |
119 | 110 | |
120 | 111 | # Join a multicast group if requested |
121 | 112 | if multicast_group is not None: |
122 | 113 | debug('Joining multicast group ' + multicast_group) |
123 | | - join_multicast_group(sock, multicast_group) |
| 114 | + util.join_multicast_group(sock, multicast_group) |
124 | 115 | |
125 | 116 | # Parse the argument list |
126 | 117 | addrrules = { 0: [] } |
Index: trunk/udpmcast/htcpseqcheck.py |
— | — | @@ -6,7 +6,10 @@ |
7 | 7 | # |
8 | 8 | # $Id$ |
9 | 9 | |
10 | | -import socket, getopt, sys, pwd, grp, struct |
| 10 | +import util |
| 11 | +import socket, getopt, sys, pwd, grp, struct, threading |
| 12 | + |
| 13 | +from util import debug |
11 | 14 | |
12 | 15 | from datetime import datetime, timedelta |
13 | 16 | from collections import deque |
— | — | @@ -18,16 +21,11 @@ |
19 | 22 | |
20 | 23 | # Globals |
21 | 24 | |
22 | | -debugging = False |
23 | 25 | sourcebuf = {} |
24 | | -totalcounts = Counter() |
| 26 | +totalcounts, slidingcounts = Counter(), Counter() |
| 27 | +slidingdeque = deque() |
| 28 | +stats_lock = threading.Lock() |
25 | 29 | |
26 | | -def debug(msg): |
27 | | - global debugging |
28 | | - if debugging: |
29 | | - print >> sys.stderr, "DEBUG:", msg |
30 | | - |
31 | | - |
32 | 30 | class RingBuffer(deque): |
33 | 31 | """ |
34 | 32 | Implements TCP window like behavior |
— | — | @@ -106,56 +104,60 @@ |
107 | 105 | |
108 | 106 | checkhtcpseq(diagram, srcaddr[0]) |
109 | 107 | |
def update_sliding_counts(counts, maxlen=10000):
    """Fold counts into the sliding-window totals.

    counts is added to the global slidingcounts and queued on slidingdeque.
    Once more than maxlen entries are queued, the oldest entry is removed from
    the queue and subtracted from slidingcounts again.
    """
    global slidingdeque, slidingcounts

    slidingcounts += counts
    slidingdeque.append(counts)

    if len(slidingdeque) <= maxlen:
        return

    # Window overflowed: retire the oldest contribution.
    expired = slidingdeque.popleft()
    slidingcounts -= expired
| 117 | + |
def checkhtcpseq(diagram, srcaddr):
    """Account for one received HTCP datagram and report losses on stdout.

    diagram is the raw datagram; bytes 8-11 are unpacked as a big-endian
    unsigned 32-bit transaction id.  srcaddr identifies the sender and is the
    key of its RingBuffer in sourcebuf.

    The per-source buffer, totalcounts and the sliding-window counters are
    updated while holding stats_lock.  Reporting is skipped entirely when
    sys.stdout is falsy (e.g. None).  In the "Totals" line the loss figures
    come from the sliding window (slidingcounts); the remaining figures come
    from totalcounts.
    """
    global sourcebuf, totalcounts, slidingcounts, stats_lock

    # NOTE(review): a datagram shorter than 12 bytes makes struct.unpack raise
    # struct.error here; presumably the caller validates the length -- confirm.
    transid = struct.unpack('!I', diagram[8:12])[0]

    with stats_lock:    # Critical section
        sb = sourcebuf.setdefault(srcaddr, RingBuffer())
        try:
            counts = sb.add(transid)
        except IndexError:
            # Deliberately ignored, as before: nothing is counted or printed
            # for this packet.
            return

        totalcounts.update(counts)
        update_sliding_counts(counts)

        # Don't bother printing stats if sys.stdout is set to None
        if not sys.stdout:
            return

        # print(<single expression>) behaves the same on Python 2 and 3.
        if counts['lost']:
            # Lost packets
            print("%d lost packet(s) from %s, last id %d" % (
                counts['lost'], srcaddr, transid))
        elif counts['ancient']:
            print("Ancient packet from %s, id %d" % (srcaddr, transid))

        if counts['lost'] and sb.counts['dequeued']:
            print("%d/%d losses (%.2f%%), %d out-of-order, %d dups, %d ancient, %d received from %s" % (
                sb.counts['lost'],
                sb.counts['dequeued'],
                float(sb.counts['lost'])*100/sb.counts['dequeued'],
                sb.counts['outoforder'],
                sb.counts['dups'],
                sb.counts['ancient'],
                sb.counts['received'],
                srcaddr))

            # The guard above only covers this source's own counter; the
            # sliding window is checked separately so an empty window cannot
            # raise ZeroDivisionError.
            window_dequeued = slidingcounts['dequeued']
            if window_dequeued:
                window_losspct = float(slidingcounts['lost'])*100/window_dequeued
            else:
                window_losspct = 0.0

            print("Totals: %d/%d losses (%.2f%%), %d out-of-order, %d dups, %d ancient, %d received from %d sources" % (
                slidingcounts['lost'],
                window_dequeued,
                window_losspct,
                totalcounts['outoforder'],
                totalcounts['dups'],
                totalcounts['ancient'],
                totalcounts['received'],
                len(sourcebuf)))
147 | 161 | |
148 | | -def join_multicast_group(sock, multicast_group): |
149 | | - import struct |
150 | | - |
151 | | - ip_mreq = struct.pack('!4sl', socket.inet_aton(multicast_group), |
152 | | - socket.INADDR_ANY) |
153 | | - sock.setsockopt(socket.IPPROTO_IP, |
154 | | - socket.IP_ADD_MEMBERSHIP, |
155 | | - ip_mreq) |
156 | | - |
157 | | - # We do not want to see our own messages back |
158 | | - sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 0) |
159 | | - |
160 | 162 | def print_help(): |
161 | 163 | print 'Usage:\n\thtcpseqcheck [ options ]\n' |
162 | 164 | print 'Options:' |
— | — | @@ -191,7 +193,7 @@ |
192 | 194 | elif option == '-g': |
193 | 195 | group = value |
194 | 196 | elif option == '-v': |
195 | | - debugging = True |
| 197 | + util.debugging = True |
196 | 198 | |
197 | 199 | try: |
198 | 200 | # Change uid and gid |
— | — | @@ -204,23 +206,25 @@ |
205 | 207 | |
206 | 208 | # Become a daemon |
207 | 209 | if daemon: |
208 | | - from util import createDaemon |
209 | | - createDaemon() |
| 210 | + util.createDaemon() |
210 | 211 | |
211 | | - # Open the UDP socket |
212 | | - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) |
213 | | - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
214 | | - sock.bind((host, portnr)) |
| 212 | + sock = util.open_htcp_socket(host, portnr) |
215 | 213 | |
216 | 214 | # Join a multicast group if requested |
217 | 215 | if multicast_group is not None: |
218 | 216 | debug('Joining multicast group ' + multicast_group) |
219 | | - join_multicast_group(sock, multicast_group) |
| 217 | + util.join_multicast_group(sock, multicast_group) |
220 | 218 | |
221 | | - # Multiplex everything that comes in |
| 219 | + # Start receiving HTCP packets |
222 | 220 | receive_htcp(sock) |
223 | 221 | except socket.error, msg: |
224 | 222 | print msg[1]; |
225 | 223 | except KeyboardInterrupt: |
226 | 224 | pass |
227 | 225 | |
| 226 | + |
| 227 | +# Ganglia gmond module support |
| 228 | +try: |
| 229 | + from htcpseqcheck_ganglia import metric_init, metric_cleanup |
| 230 | +except ImportError: |
| 231 | + pass |
\ No newline at end of file |
Index: trunk/udpmcast/htcpseqcheck_ganglia.py |
— | — | @@ -0,0 +1,149 @@ |
| 2 | +#!/usr/bin/env python |
| 3 | + |
| 4 | +# htcpseqcheck_ganglia.py |
| 5 | +# Ganglia gmond module integration |
| 6 | + |
| 7 | +import htcpseqcheck, util |
| 8 | +import threading, sys, socket, datetime |
| 9 | + |
| 10 | +from util import debug |
| 11 | + |
| 12 | +# Globals |
| 13 | +metrics = {} |
| 14 | + |
class HTCPSeqCheckThread(threading.Thread):
    """Thread that opens the HTCP socket and runs htcpseqcheck.receive_htcp().

    params is a dict of module parameters.  The keys read by run() are
    'host', 'port' and 'multicast_group'; all of them are optional.
    """

    name = "HTCPSeqCheck"
    daemon = True

    def __init__(self, params=None):
        threading.Thread.__init__(self)
        # Thread.start() invokes run() without arguments, so the parameters
        # have to travel on the instance.  Copied so that later changes made
        # by the caller do not affect the thread.
        self.params = dict(params or {})

    def run(self, kwargs=None):
        """Open the socket, optionally join the multicast group, and receive.

        kwargs overrides the parameters given to the constructor when run()
        is called directly.  On a socket error the last element of the
        error's args is written to stderr and SystemExit(1) is raised.
        """
        if kwargs is None:
            kwargs = self.params

        try:
            # NOTE(review): the port is coerced with int() because gmond may
            # hand parameter values over as strings -- TODO confirm.
            sock = util.open_htcp_socket(kwargs.get('host', ""),
                                         int(kwargs.get('port', 4827)))

            # Join a multicast group if requested
            if 'multicast_group' in kwargs:
                debug('Joining multicast group ' + kwargs['multicast_group'])
                util.join_multicast_group(sock, kwargs['multicast_group'])

            # NOTE(review): sys.stdout is not modified here.  htcpseqcheck
            # skips its printing when sys.stdout is None, which presumably
            # relies on gmond having set it that way -- confirm.

            # Start receiving HTCP packets
            htcpseqcheck.receive_htcp(sock)
        except socket.error as msg:
            # args[-1] is the message text for (errno, text) errors and also
            # works for single-argument errors.
            sys.stderr.write("%s\n" % (msg.args[-1],))
            sys.exit(1)


def build_metrics_dict():
    """Build the metric descriptor dicts, keyed by metric name.

    Every descriptor carries 'name', 'value_type', 'units', 'format', 'slope',
    'description', 'int_name', 'call_back' (always metric_handler),
    'time_max' (15) and 'groups' ("htcp").  'int_name' is the counter key
    metric_handler looks up in htcpseqcheck.totalcounts; it is None for the
    two metrics that metric_handler computes itself.
    """
    # (name, value_type, units, format, slope, description, int_name)
    rows = (
        ('htcp_losspct', "float", "%", "%.2f", "both",
         "HTCP packet loss percentage", None),
        ('htcp_lost', "uint", "packets/s", "%u", "positive",
         "Lost HTCP packets", "lost"),
        ('htcp_dequeued', "uint", "packets/s", "%u", "positive",
         "Dequeued HTCP packets", "dequeued"),
        ('htcp_outoforder', "uint", "packets/s", "%u", "positive",
         "HTCP packets received out-of-order", "outoforder"),
        ('htcp_dups', "uint", "dups/s", "%u", "positive",
         "Duplicate HTCP packets", "dups"),
        ('htcp_ancient', "uint", "packets/s", "%u", "positive",
         "Ancient HTCP packets", "ancient"),
        ('htcp_received', "uint", "packets/s", "%u", "positive",
         "Received HTCP packets", "received"),
        ('htcp_sources', "uint", "sources", "%u", "both",
         "Unique HTCP senders", None),
    )

    descriptors = {}
    for (metricname, value_type, units, fmt, slope, description,
         int_name) in rows:
        descriptors[metricname] = {
            'value_type': value_type,
            'units': units,
            'format': fmt,
            'slope': slope,
            'description': description,
            'int_name': int_name,
            # Values common to all metrics
            'name': metricname,
            'call_back': metric_handler,
            'time_max': 15,
            'groups': "htcp",
        }

    return descriptors


def metric_init(params):
    """gmond module initialization.

    Starts HTCP metrics collection in a separate thread, stores the metric
    descriptors in the module-level `metrics` dict and returns them as a list.
    """
    global metrics

    # Forward the module parameters; without them the configured
    # multicast_group and port would never reach the collector thread.
    HTCPSeqCheckThread(params).start()

    metrics = build_metrics_dict()
    return list(metrics.values())
| 128 | + |
def metric_cleanup(params):
    """gmond clean-up hook; params is ignored and nothing is released."""
    return None
| 131 | + |
def metric_handler(name):
    """Return the current value of the metric called name.

    "htcp_losspct" is the lost/dequeued ratio of the sliding window as a
    percentage, "htcp_sources" is the number of entries in
    htcpseqcheck.sourcebuf, and every other metric is read from
    htcpseqcheck.totalcounts using the descriptor's 'int_name'.  The counters
    are read while holding htcpseqcheck.stats_lock.

    Returns None when the loss percentage is undefined (nothing dequeued in
    the window) or when reading the counters fails.  Raises KeyError if name
    is not a known metric.
    """
    global metrics

    metric = metrics[name]

    try:
        with htcpseqcheck.stats_lock:    # Critical section
            if name == "htcp_losspct":
                dequeued = htcpseqcheck.slidingcounts['dequeued']
                if not dequeued:
                    # Same result as before, without relying on a caught
                    # ZeroDivisionError.
                    return None
                return float(htcpseqcheck.slidingcounts['lost']) / dequeued * 100
            elif name == "htcp_sources":
                return len(htcpseqcheck.sourcebuf)
            else:
                return htcpseqcheck.totalcounts[metric['int_name']]
    except Exception:
        # Kept as a best-effort fallback, but no longer a bare except:
        # SystemExit and KeyboardInterrupt now propagate.
        return None
| 147 | + |
if __name__ == '__main__':
    # Run as a script: print one gmond "metric { ... }" configuration stanza
    # per metric descriptor.
    stanza = " metric {\n name = \"%(name)s\"\n title = \"%(description)s\"\n }\n"
    for metric in build_metrics_dict().values():
        print(stanza % metric)
\ No newline at end of file |
Property changes on: trunk/udpmcast/htcpseqcheck_ganglia.py |
___________________________________________________________________ |
Added: svn:mime-type |
1 | 151 | + text/plain |