Index: trunk/udpmcast/compat.py |
— | — | @@ -0,0 +1,186 @@ |
| 2 | +# For Python 2.5/2.6 |
| 3 | + |
| 4 | +from operator import itemgetter |
| 5 | +from heapq import nlargest |
| 6 | +from itertools import repeat, ifilter |
| 7 | + |
class Counter(dict):
    '''Dict subclass for counting hashable objects.  Sometimes called a bag
    or multiset.  Elements are stored as dictionary keys and their counts
    are stored as dictionary values.

    Looking up a missing element returns 0 instead of raising KeyError,
    and does not insert the element.

    >>> Counter('zyzygy')
    Counter({'y': 3, 'z': 2, 'g': 1})

    '''

    def __init__(self, iterable=None, **kwds):
        '''Create a new, empty Counter object.  And if given, count elements
        from an input iterable.  Or, initialize the count from another mapping
        of elements to their counts.

        >>> c = Counter()                           # a new, empty counter
        >>> c = Counter('gallahad')                 # a new counter from an iterable
        >>> c = Counter({'a': 4, 'b': 2})           # a new counter from a mapping
        >>> c = Counter(a=4, b=2)                   # a new counter from keyword args

        '''
        self.update(iterable, **kwds)

    def __missing__(self, key):
        # Called by dict.__getitem__ for absent keys; the key is not stored.
        return 0

    def most_common(self, n=None):
        '''List the n most common elements and their counts from the most
        common to the least.  If n is None, then list all element counts.

        >>> Counter('abracadabra').most_common(3)
        [('a', 5), ('r', 2), ('b', 2)]

        '''
        by_count = itemgetter(1)
        if n is None:
            return sorted(self.items(), key=by_count, reverse=True)
        return nlargest(n, self.items(), key=by_count)

    def elements(self):
        '''Iterator over elements repeating each as many times as its count.

        >>> c = Counter('ABCABC')
        >>> sorted(c.elements())
        ['A', 'A', 'B', 'B', 'C', 'C']

        If an element's count has been set to zero or is a negative number,
        elements() will ignore it.

        '''
        for elem, count in self.items():
            # repeat() yields nothing for a zero or negative count.
            for _ in repeat(None, count):
                yield elem

    # Override dict methods where the meaning changes for Counter objects.

    @classmethod
    def fromkeys(cls, iterable, v=None):
        'Always raises NotImplementedError; use Counter(iterable) instead.'
        raise NotImplementedError(
            'Counter.fromkeys() is undefined.  Use Counter(iterable) instead.')

    def update(self, iterable=None, **kwds):
        '''Like dict.update() but add counts instead of replacing them.

        Source can be an iterable, a dictionary, or another Counter instance.
        Keyword arguments are added as element=count pairs.

        >>> c = Counter('which')
        >>> c.update('witch')           # add elements from another iterable
        >>> d = Counter('watch')
        >>> c.update(d)                 # add elements from another counter
        >>> c['h']                      # four 'h' in which, witch, and watch
        4

        '''
        if iterable is not None:
            # Treat the source as a mapping when it offers iteritems(), or
            # failing that both keys() and items(); otherwise its elements
            # are counted once each.
            items = getattr(iterable, 'iteritems', None)
            if items is None and hasattr(iterable, 'keys'):
                items = getattr(iterable, 'items', None)

            if items is None:
                self_get = self.get
                for elem in iterable:
                    self[elem] = self_get(elem, 0) + 1
            elif self:
                self_get = self.get
                for elem, count in items():
                    self[elem] = self_get(elem, 0) + count
            else:
                dict.update(self, iterable)  # fast path when counter is empty
        if kwds:
            self.update(kwds)

    def copy(self):
        'Like dict.copy() but returns an instance of the same class instead of a dict.'
        return self.__class__(self)

    def __delitem__(self, elem):
        'Like dict.__delitem__() but does not raise KeyError for missing values.'
        if elem in self:
            dict.__delitem__(self, elem)

    def __repr__(self):
        if not self:
            return '%s()' % self.__class__.__name__
        items = ', '.join(map('%r: %r'.__mod__, self.most_common()))
        return '%s({%s})' % (self.__class__.__name__, items)

    # Multiset-style mathematical operations discussed in:
    #       Knuth TAOCP Volume II section 4.6.3 exercise 19
    #       and at http://en.wikipedia.org/wiki/Multiset
    #
    # Outputs guaranteed to only include positive counts.
    #
    # To strip negative and zero counts, add-in an empty counter:
    #       c += Counter()

    def __add__(self, other):
        '''Add counts from two counters.

        >>> Counter('abbb') + Counter('bcc')
        Counter({'b': 4, 'c': 2, 'a': 1})

        '''
        if not isinstance(other, Counter):
            return NotImplemented
        result = Counter()
        for elem in set(self) | set(other):
            newcount = self[elem] + other[elem]
            if newcount > 0:
                result[elem] = newcount
        return result

    def __sub__(self, other):
        ''' Subtract count, but keep only results with positive counts.

        >>> Counter('abbbc') - Counter('bccd')
        Counter({'b': 2, 'a': 1})

        '''
        if not isinstance(other, Counter):
            return NotImplemented
        result = Counter()
        for elem in set(self) | set(other):
            newcount = self[elem] - other[elem]
            if newcount > 0:
                result[elem] = newcount
        return result

    def __or__(self, other):
        '''Union is the maximum of value in either of the input counters.

        >>> Counter('abbb') | Counter('bcc')
        Counter({'b': 3, 'c': 2, 'a': 1})

        '''
        if not isinstance(other, Counter):
            return NotImplemented
        result = Counter()
        for elem in set(self) | set(other):
            newcount = max(self[elem], other[elem])
            if newcount > 0:
                result[elem] = newcount
        return result

    def __and__(self, other):
        ''' Intersection is the minimum of corresponding counts.

        >>> Counter('abbb') & Counter('bcc')
        Counter({'b': 1})

        '''
        if not isinstance(other, Counter):
            return NotImplemented
        # Walk the smaller counter and probe the larger one.
        if len(self) < len(other):
            small, large = self, other
        else:
            small, large = other, self
        result = Counter()
        for elem in small:
            if elem in large:
                newcount = min(large[elem], small[elem])
                if newcount > 0:
                    result[elem] = newcount
        return result
Property changes on: trunk/udpmcast/compat.py |
___________________________________________________________________ |
Added: svn:mime-type |
1 | 188 | + text/plain |
Index: trunk/udpmcast/htcpseqcheck.py |
— | — | @@ -7,12 +7,94 @@ |
8 | 8 | # $Id$ |
9 | 9 | |
10 | 10 | import socket, getopt, sys, os, signal, pwd, grp, struct |
| 11 | + |
| 12 | +from datetime import datetime, timedelta |
| 13 | +from collections import deque |
11 | 14 | |
| 15 | +try: |
| 16 | + from collections import Counter |
| 17 | +except ImportError: |
| 18 | + from compat import Counter |
| 19 | + |
| 20 | +# Globals |
| 21 | + |
12 | 22 | debugging = False |
13 | | -sourceseq = {} |
14 | | -loss, total = 0, 0 |
15 | | -sources = [] |
| 23 | +sourcebuf = {} |
| 24 | +totalcounts = Counter() |
16 | 25 | |
def debug(msg):
    """Write msg to standard error, prefixed with "DEBUG:", if debugging is enabled."""
    # The module-level 'debugging' flag is only read here, never rebound.
    if not debugging:
        return
    print >> sys.stderr, "DEBUG:", msg
| 30 | + |
| 31 | + |
class RingBuffer(deque):
    """
    Implements TCP window like behavior

    Entries are (seqnr, timestamp, received) tuples kept in sequence
    number order.  Gaps are filled with placeholder entries whose
    received flag is False; an entry still flagged False when it is
    purged is counted as lost.

    self.counts accumulates the per-call counters over the lifetime of
    the buffer; self.buffersize is the timedelta an entry stays buffered.
    """

    def __init__(self, iterable=(), maxlen=None, buffersize=timedelta(seconds=5)):
        # Attributes are set before deque.__init__, as in the original order.
        self.counts = Counter()
        self.buffersize = buffersize

        deque.__init__(self, iterable, maxlen)

    def add(self, seqnr):
        """
        Expects a sequence nr and adds it to the ringbuffer

        Returns a Counter describing this call.  'received' is always
        incremented; 'ancient', 'dups' or 'outoforder' are set depending
        on how seqnr relates to the buffered range, and 'dequeued' /
        'lost' are filled in by the purge that runs at the end.
        """

        ts = datetime.utcnow()
        counts = Counter()
        try:
            headseq, tailseq = self[0][0], self[-1][0]
        except IndexError:
            # Empty buffer: pretend the previous packet was seqnr-1 so the
            # first packet is handled as a normal in-order arrival.
            headseq, tailseq = seqnr-1, seqnr-1

        if seqnr == tailseq + 1:
            # Normal case, in-order arrival
            self.append((seqnr, ts, True))
            debug("Appended seqnr %d, timestamp %s" % (seqnr, ts))
        elif seqnr > tailseq + 1:
            # Packet(s) missing, fill the gap
            # NOTE(review): one placeholder is stored per missing id, so a
            # very large jump in sequence numbers is expensive.
            for seq in range(tailseq+1, seqnr):
                self.append((seq, ts, False))
            self.append((seqnr, ts, True))
            debug("Filled gap of %d packets before new packet seqnr %d, timestamp %s" % (seqnr-tailseq-1, seqnr, ts))
        elif seqnr < headseq:
            # Older than anything still buffered
            counts['ancient'] += 1
        elif seqnr <= tailseq:
            # Late packet; its slot is at the offset from the head
            slot = seqnr - headseq
            assert self[slot][0] == seqnr   # Incorrect seqnr?

            if self[slot][2]:
                counts['dups'] += 1         # Already exists
                debug("Duplicate packet %d" % seqnr)
            else:
                # Store with original timestamp
                self[slot] = (seqnr, self[slot][1], True)
                counts['outoforder'] += 1
                debug("Inserted late packet %d, timestamp %s" % (seqnr, ts))

        counts['received'] += 1
        # Purge old packets
        self.deque(ts, counts)
        return counts

    def deque(self, now=None, counts=None):
        """
        Purge entries older than now - buffersize from the head.

        now defaults to the current UTC time and counts to a fresh
        Counter, both evaluated at call time.  counts is updated in place
        with 'dequeued' and 'lost', then added to self.counts.
        """
        if now is None:
            now = datetime.utcnow()
        if counts is None:
            counts = Counter()

        cutoff = now - self.buffersize
        while self and self[0][1] < cutoff:
            packet = self.popleft()
            counts['dequeued'] += 1
            debug("Dequeued packet id %d, timestamp %s, received %s" % packet)
            if not packet[2]:
                counts['lost'] += 1

        self.counts.update(counts)
| 97 | + |
| 98 | + |
17 | 99 | def createDaemon(): |
18 | 100 | """ |
19 | 101 | Detach a process from the controlling terminal and run it in the |
— | — | @@ -82,16 +164,11 @@ |
83 | 165 | |
84 | 166 | # Redirect the standard file descriptors to /dev/null. |
85 | 167 | os.open("/dev/null", os.O_RDONLY) # standard input (0) |
86 | | - os.open("/dev/null", os.O_RDWR) # standard output (1) |
87 | | - os.open("/dev/null", os.O_RDWR) # standard error (2) |
| 168 | + os.open("/dev/null", os.O_RDWR) # standard output (1) |
| 169 | + os.open("/dev/null", os.O_RDWR) # standard error (2) |
88 | 170 | |
89 | 171 | return(0) |
90 | 172 | |
91 | | -def debug(msg): |
92 | | - global debugging |
93 | | - if debugging: |
94 | | - print msg; |
95 | | - |
96 | 173 | def receive_htcp(sock): |
97 | 174 | portnr = sock.getsockname()[1]; |
98 | 175 | |
— | — | @@ -102,24 +179,42 @@ |
103 | 180 | checkhtcpseq(diagram, srcaddr[0]) |
104 | 181 | |
def checkhtcpseq(diagram, srcaddr):
    """
    Track the transaction id of one datagram and report losses.

    diagram is the raw datagram; bytes 8..11 are unpacked as a big-endian
    unsigned 32-bit transaction id.  srcaddr is the key for the per-source
    RingBuffer in sourcebuf.  Per-call counts are merged into totalcounts,
    and loss statistics are printed when packets were counted as lost.
    """
    if len(diagram) < 12:
        # Too short to hold the transaction id; struct.unpack would raise.
        debug("Ignoring short datagram (%d bytes) from %s" % (len(diagram), srcaddr))
        return

    transid = struct.unpack('!I', diagram[8:12])[0]

    # Only build a RingBuffer the first time a source is seen.
    try:
        sb = sourcebuf[srcaddr]
    except KeyError:
        sb = sourcebuf[srcaddr] = RingBuffer()

    try:
        counts = sb.add(transid)
    except IndexError:
        debug("Could not place packet id %d from %s in its buffer" % (transid, srcaddr))
        return

    totalcounts.update(counts)
    if counts['lost']:
        # Lost packets
        print("%d lost packet(s) from %s, last id %d" % (counts['lost'], srcaddr, transid))
    elif counts['ancient']:
        if sb:
            print("Ancient packet from %s, id %d, latest id was %d" % (srcaddr, transid, sb[-1][0]))
        else:
            # The purge inside add() emptied the buffer, so no latest id.
            print("Ancient packet from %s, id %d" % (srcaddr, transid))

    if counts['lost'] and sb.counts['dequeued']:
        print("%d/%d losses (%.2f%%), %d out-of-order, %d dups, %d ancient, %d received from %s" % (
            sb.counts['lost'],
            sb.counts['dequeued'],
            float(sb.counts['lost'])*100/sb.counts['dequeued'],
            sb.counts['outoforder'],
            sb.counts['dups'],
            sb.counts['ancient'],
            sb.counts['received'],
            srcaddr))
        print("Totals: %d/%d losses (%.2f%%), %d out-of-order, %d dups, %d ancient, %d received from %d sources" % (
            totalcounts['lost'],
            totalcounts['dequeued'],
            float(totalcounts['lost'])*100/totalcounts['dequeued'],
            totalcounts['outoforder'],
            totalcounts['dups'],
            totalcounts['ancient'],
            totalcounts['received'],
            len(sourcebuf)))
124 | 219 | |
125 | 220 | def join_multicast_group(sock, multicast_group): |
126 | 221 | import struct |
— | — | @@ -185,8 +280,8 @@ |
186 | 281 | |
187 | 282 | # Open the UDP socket |
188 | 283 | sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) |
189 | | - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
190 | | - sock.bind((host, portnr)) |
| 284 | + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| 285 | + sock.bind((host, portnr)) |
191 | 286 | |
192 | 287 | # Join a multicast group if requested |
193 | 288 | if multicast_group is not None: |