r94108 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r94107‎ | r94108 | r94109 >
Date:14:34, 9 August 2011
Author:mark
Status:deferred
Tags:
Comment:
New version, implementing a window buffer for out-of-order packets
Modified paths:
  • /trunk/udpmcast/compat.py (added) (history)
  • /trunk/udpmcast/htcpseqcheck.py (modified) (history)

Diff [purge]

Index: trunk/udpmcast/compat.py
@@ -0,0 +1,186 @@
 2+# For Python 2.5/2.6
 3+
 4+from operator import itemgetter
 5+from heapq import nlargest
 6+from itertools import repeat, ifilter
 7+
class Counter(dict):
    '''Dict subclass for counting hashable objects. Sometimes called a bag
    or multiset. Elements are stored as dictionary keys and their counts
    are stored as dictionary values.

    This is a backport of collections.Counter for interpreters that lack
    it. It avoids Python-2-only dict methods so that the same code also
    runs unchanged on Python 3.

    >>> Counter('zyzygy')
    Counter({'y': 3, 'z': 2, 'g': 1})

    '''

    def __init__(self, iterable=None, **kwds):
        '''Create a new, empty Counter object. And if given, count elements
        from an input iterable. Or, initialize the count from another mapping
        of elements to their counts.

        >>> c = Counter()                   # a new, empty counter
        >>> c = Counter('gallahad')         # a new counter from an iterable
        >>> c = Counter({'a': 4, 'b': 2})   # a new counter from a mapping
        >>> c = Counter(a=4, b=2)           # a new counter from keyword args

        '''
        self.update(iterable, **kwds)

    def __missing__(self, key):
        'Report a count of zero for unknown elements without storing them.'
        return 0

    def most_common(self, n=None):
        '''List the n most common elements and their counts from the most
        common to the least. If n is None, then list all element counts.

        >>> Counter('abracadabra').most_common(3)
        [('a', 5), ('r', 2), ('b', 2)]

        '''
        if n is None:
            return sorted(self.items(), key=itemgetter(1), reverse=True)
        return nlargest(n, self.items(), key=itemgetter(1))

    def elements(self):
        '''Iterator over elements repeating each as many times as its count.

        >>> c = Counter('ABCABC')
        >>> sorted(c.elements())
        ['A', 'A', 'B', 'B', 'C', 'C']

        If an element's count has been set to zero or is a negative number,
        elements() will ignore it.

        '''
        for elem, count in self.items():
            # repeat() yields nothing for zero or negative counts.
            for _ in repeat(None, count):
                yield elem

    # Override dict methods where the meaning changes for Counter objects.

    @classmethod
    def fromkeys(cls, iterable, v=None):
        'Always raises NotImplementedError; use Counter(iterable) instead.'
        raise NotImplementedError(
            'Counter.fromkeys() is undefined. Use Counter(iterable) instead.')

    def update(self, iterable=None, **kwds):
        '''Like dict.update() but add counts instead of replacing them.

        Source can be an iterable, a dictionary, or another Counter instance.

        >>> c = Counter('which')
        >>> c.update('witch')       # add elements from another iterable
        >>> d = Counter('watch')
        >>> c.update(d)             # add elements from another counter
        >>> c['h']                  # four 'h' in which, witch, and watch
        4

        '''
        if iterable is not None:
            # Treat the source as a mapping when it exposes iteritems()
            # (Python 2) or, failing that, items() (Python 3).
            pairs = getattr(iterable, 'iteritems', None)
            if pairs is None:
                pairs = getattr(iterable, 'items', None)
            self_get = self.get
            if pairs is not None:
                if self:
                    for elem, count in pairs():
                        self[elem] = self_get(elem, 0) + count
                else:
                    dict.update(self, iterable)  # fast path when counter is empty
            else:
                for elem in iterable:
                    self[elem] = self_get(elem, 0) + 1
        if kwds:
            self.update(kwds)

    def copy(self):
        'Like dict.copy() but returns an instance of the same class, not a dict.'
        # Using the instance's own class keeps subclasses intact.
        return self.__class__(self)

    def __delitem__(self, elem):
        'Like dict.__delitem__() but does not raise KeyError for missing values.'
        if elem in self:
            dict.__delitem__(self, elem)

    def __repr__(self):
        if not self:
            return '%s()' % self.__class__.__name__
        items = ', '.join(map('%r: %r'.__mod__, self.most_common()))
        return '%s({%s})' % (self.__class__.__name__, items)

    # Multiset-style mathematical operations discussed in:
    #       Knuth TAOCP Volume II section 4.6.3 exercise 19
    #       and at http://en.wikipedia.org/wiki/Multiset
    #
    # Outputs guaranteed to only include positive counts.
    #
    # To strip negative and zero counts, add-in an empty counter:
    #       c += Counter()

    def __add__(self, other):
        '''Add counts from two counters.

        >>> Counter('abbb') + Counter('bcc')
        Counter({'b': 4, 'c': 2, 'a': 1})

        '''
        if not isinstance(other, Counter):
            return NotImplemented
        result = Counter()
        for elem in set(self) | set(other):
            newcount = self[elem] + other[elem]
            if newcount > 0:
                result[elem] = newcount
        return result

    def __sub__(self, other):
        ''' Subtract count, but keep only results with positive counts.

        >>> Counter('abbbc') - Counter('bccd')
        Counter({'b': 2, 'a': 1})

        '''
        if not isinstance(other, Counter):
            return NotImplemented
        result = Counter()
        for elem in set(self) | set(other):
            newcount = self[elem] - other[elem]
            if newcount > 0:
                result[elem] = newcount
        return result

    def __or__(self, other):
        '''Union is the maximum of value in either of the input counters.

        >>> Counter('abbb') | Counter('bcc')
        Counter({'b': 3, 'c': 2, 'a': 1})

        '''
        if not isinstance(other, Counter):
            return NotImplemented
        result = Counter()
        for elem in set(self) | set(other):
            newcount = max(self[elem], other[elem])
            if newcount > 0:
                result[elem] = newcount
        return result

    def __and__(self, other):
        ''' Intersection is the minimum of corresponding counts.

        >>> Counter('abbb') & Counter('bcc')
        Counter({'b': 1})

        '''
        if not isinstance(other, Counter):
            return NotImplemented
        result = Counter()
        # Walk the smaller counter and probe the larger one.
        if len(self) < len(other):
            self, other = other, self
        for elem in other:
            if elem in self:
                newcount = min(self[elem], other[elem])
                if newcount > 0:
                    result[elem] = newcount
        return result
Property changes on: trunk/udpmcast/compat.py
___________________________________________________________________
Added: svn:mime-type
1188 + text/plain
Index: trunk/udpmcast/htcpseqcheck.py
@@ -7,12 +7,94 @@
88 # $Id$
99
1010 import socket, getopt, sys, os, signal, pwd, grp, struct
 11+
 12+from datetime import datetime, timedelta
 13+from collections import deque
1114
 15+try:
 16+ from collections import Counter
 17+except ImportError:
 18+ from compat import Counter
 19+
 20+# Globals
 21+
1222 debugging = False
13 -sourceseq = {}
14 -loss, total = 0, 0
15 -sources = []
 23+sourcebuf = {}
 24+totalcounts = Counter()
1625
def debug(msg):
    """
    Write msg to standard error, prefixed with "DEBUG:", when the
    module-level `debugging` flag is set; otherwise do nothing.
    """
    # Only reads the flag, so no `global` declaration is needed.
    if debugging:
        # A plain write() emits the same bytes as the Python 2
        # print-chevron form and also works on Python 3.
        sys.stderr.write("DEBUG: %s\n" % (msg,))
 30+
 31+
class RingBuffer(deque):
    """
    Implements TCP window like behavior

    Each entry is a (seqnr, timestamp, received) tuple. Entries are kept
    contiguous by sequence number: when a packet arrives ahead of the
    tail, the missing numbers are inserted as placeholders with
    received=False so that a late arrival can still be slotted in.
    Entries older than `buffersize` are purged, and placeholders that
    were never filled are then counted as lost.

    Running totals for this buffer are kept in `self.counts` under the
    keys 'received', 'outoforder', 'dups', 'ancient', 'dequeued' and
    'lost'.
    """

    def __init__(self, iterable=(), maxlen=None, buffersize=timedelta(seconds=5)):
        """
        iterable and maxlen are passed straight to deque; buffersize is
        the timedelta an entry stays in the window before being purged.
        """
        self.counts = Counter()
        self.buffersize = buffersize

        deque.__init__(self, iterable, maxlen)

    def add(self, seqnr):
        """
        Expects a sequence nr and adds it to the ringbuffer

        Returns a Counter describing what happened for this one packet,
        including any 'dequeued' / 'lost' entries purged as a side
        effect. The same numbers are folded into `self.counts`.
        """

        ts = datetime.utcnow()
        counts = Counter()
        if self:
            headseq, tailseq = self[0][0], self[-1][0]
        else:
            # Empty window: pretend the previous packet was seqnr-1 so
            # this one is handled as an ordinary in-order arrival.
            headseq = tailseq = seqnr - 1

        if seqnr == tailseq + 1:
            # Normal case, in-order arrival
            self.append((seqnr, ts, True))
            debug("Appended seqnr %d, timestamp %s" % (seqnr, ts))
        elif seqnr > tailseq + 1:
            # Packet(s) missing, fill the gap
            # NOTE(review): one placeholder is allocated per missing
            # seqnr, so a very large forward jump makes this loop (and
            # the buffer) correspondingly large.
            for seq in range(tailseq + 1, seqnr):
                self.append((seq, ts, False))
            self.append((seqnr, ts, True))
            debug("Filled gap of %d packets before new packet seqnr %d, timestamp %s" % (seqnr-tailseq-1, seqnr, ts))
        elif seqnr < headseq:
            # Older than anything still in the window
            counts['ancient'] += 1
        else:
            # Late packet: headseq <= seqnr <= tailseq
            index = seqnr - headseq
            # Internal invariant: entries are contiguous, so the slot at
            # this offset must carry exactly this sequence number.
            assert self[index][0] == seqnr   # Incorrect seqnr?

            if self[index][2]:
                counts['dups'] += 1   # Already exists
                debug("Duplicate packet %d" % seqnr)
            else:
                # Store with original timestamp
                self[index] = (seqnr, self[index][1], True)
                counts['outoforder'] += 1
                debug("Inserted late packet %d, timestamp %s" % (seqnr, ts))

        # Every packet handed to add() counts as received, whichever
        # branch handled it.
        counts['received'] += 1
        # Purge old packets
        self.deque(ts, counts)
        return counts

    def deque(self, now=None, counts=None):
        """
        Purge entries whose timestamp is older than now - buffersize.

        now defaults to the current UTC time and counts to a fresh
        Counter, both evaluated at call time. counts is updated in place
        with 'dequeued' and 'lost' tallies and then added to
        `self.counts`.
        """
        if now is None:
            now = datetime.utcnow()
        if counts is None:
            counts = Counter()

        cutoff = now - self.buffersize
        while self and self[0][1] < cutoff:
            packet = self.popleft()
            counts['dequeued'] += 1
            debug("Dequeued packet id %d, timestamp %s, received %s" % packet)
            if not packet[2]:
                # Placeholder that was never filled by a late arrival
                counts['lost'] += 1

        self.counts.update(counts)
 97+
 98+
1799 def createDaemon():
18100 """
19101 Detach a process from the controlling terminal and run it in the
@@ -82,16 +164,11 @@
83165
84166 # Redirect the standard file descriptors to /dev/null.
85167 os.open("/dev/null", os.O_RDONLY) # standard input (0)
86 - os.open("/dev/null", os.O_RDWR) # standard output (1)
87 - os.open("/dev/null", os.O_RDWR) # standard error (2)
 168+ os.open("/dev/null", os.O_RDWR) # standard output (1)
 169+ os.open("/dev/null", os.O_RDWR) # standard error (2)
88170
89171 return(0)
90172
91 -def debug(msg):
92 - global debugging
93 - if debugging:
94 - print msg;
95 -
96173 def receive_htcp(sock):
97174 portnr = sock.getsockname()[1];
98175
@@ -102,24 +179,42 @@
103180 checkhtcpseq(diagram, srcaddr[0])
104181
def checkhtcpseq(diagram, srcaddr):
    """
    Record the sequence number of one received datagram and report loss.

    Bytes 8..11 of diagram are unpacked as a big-endian unsigned 32-bit
    integer (transid) and fed to the RingBuffer kept for srcaddr in the
    module-level `sourcebuf` dict; a buffer is created the first time a
    source is seen. The per-packet counts are added to the module-level
    `totalcounts`, and loss or ancient-packet messages are printed to
    standard output.
    """
    transid = struct.unpack('!I', diagram[8:12])[0]

    # Look up first so that a RingBuffer is only constructed for a new
    # source, not for every packet.
    try:
        sb = sourcebuf[srcaddr]
    except KeyError:
        sb = sourcebuf[srcaddr] = RingBuffer()

    # Remember the newest id before add(): the purge inside add() may
    # leave the buffer empty, in which case sb[-1] would raise.
    if sb:
        latest = sb[-1][0]
    else:
        latest = None

    try:
        counts = sb.add(transid)
    except IndexError:
        # Best effort: skip a packet that cannot be placed in the
        # window, but leave a trace when debugging is on.
        debug("Could not place packet id %d from %s" % (transid, srcaddr))
        return

    totalcounts.update(counts)
    if counts['lost']:
        # Lost packets
        print("%d lost packet(s) from %s, last id %d" % (counts['lost'], srcaddr, transid))
    elif counts['ancient'] and latest is not None:
        print("Ancient packet from %s, id %d, latest id was %d" % (srcaddr, transid, latest))

    # sb.counts['dequeued'] is checked to avoid dividing by zero below;
    # totalcounts['dequeued'] is at least as large because it
    # accumulates the same per-packet counts.
    if counts['lost'] and sb.counts['dequeued']:
        print("%d/%d losses (%.2f%%), %d out-of-order, %d dups, %d ancient, %d received from %s" % (
            sb.counts['lost'],
            sb.counts['dequeued'],
            float(sb.counts['lost'])*100/sb.counts['dequeued'],
            sb.counts['outoforder'],
            sb.counts['dups'],
            sb.counts['ancient'],
            sb.counts['received'],
            srcaddr))
        print("Totals: %d/%d losses (%.2f%%), %d out-of-order, %d dups, %d ancient, %d received from %d sources" % (
            totalcounts['lost'],
            totalcounts['dequeued'],
            float(totalcounts['lost'])*100/totalcounts['dequeued'],
            totalcounts['outoforder'],
            totalcounts['dups'],
            totalcounts['ancient'],
            totalcounts['received'],
            len(sourcebuf)))
124219
125220 def join_multicast_group(sock, multicast_group):
126221 import struct
@@ -185,8 +280,8 @@
186281
187282 # Open the UDP socket
188283 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
189 - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
190 - sock.bind((host, portnr))
 284+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 285+ sock.bind((host, portnr))
191286
192287 # Join a multicast group if requested
193288 if multicast_group is not None:

Status & tagging log