r14213 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r14212‎ | r14213 | r14214 >
Date:03:56, 14 May 2006
Author:brion
Status:old
Tags:
Comment:
Initial crappy POC remote compressor demon. No error handling, probably sucks so far.
Modified paths:
  • /trunk/tools/dbzip2/DistBits.py (added) (history)
  • /trunk/tools/dbzip2/dbzip2 (modified) (history)
  • /trunk/tools/dbzip2/dbzip2d (added) (history)

Diff [purge]

Index: trunk/tools/dbzip2/dbzip2
@@ -18,7 +18,6 @@
1919 # 2006-05-12
2020
2121 # TODO:
22 -# Remote compressors
2322 # Use a thread pool?
2423 # Selectable remote threads
2524 # Selectable block size
@@ -28,10 +27,15 @@
2928
3029 import bz2
3130 import getopt
 31+import random
 32+import socket
 33+import struct
3234 import sys
3335 import thread
3436 import time
3537
 38+import DistBits
 39+
3640 class Compressor(object):
3741 def __init__(self, args):
3842 self.inputStream = sys.stdin
@@ -45,6 +49,8 @@
4650 self.bytesRead = 0L
4751 self.bytesWritten = 0L
4852
 53+ self.remotes = []
 54+
4955 self.queue = []
5056 self.done = False
5157 self.threadLock = thread.allocate_lock()
@@ -52,13 +58,22 @@
5359 self.processArgs(args)
5460
def processArgs(self, args):
    """Apply command-line options to this compressor.

    Recognized options:
      -v              increase self.verbosity by one (repeatable)
      -p N            set self.threads to N
      -r HOST[:PORT]  append a remote server, parsed by splitHost (repeatable)

    Non-option arguments returned by getopt are ignored.
    """
    parsed, _unused = getopt.getopt(args, "vp:r:")
    for flag, value in parsed:
        if flag == "-v":
            self.verbosity += 1
        elif flag == "-p":
            self.threads = int(value)
        elif flag == "-r":
            self.remotes.append(self.splitHost(value))
6270
def splitHost(self, val, defaultPort=12345):
    """Parse a "host[:port]" string into a (host, port) tuple.

    val         -- "host" or "host:port" as given to the -r option.
    defaultPort -- port used when val has no port, or an empty one ("host:").
    Returns (host, port) with port as an int.
    Raises ValueError if the port part is present but not an integer.
    """
    if ":" not in val:
        return (val, defaultPort)
    # Split on the last colon only, so extra colons stay in the host part
    # instead of causing a confusing tuple-unpacking error.
    (host, port) = val.rsplit(":", 1)
    if not port:
        return (host, defaultPort)
    return (host, int(port))
 77+
6378 def debug(self, level, text):
6479 if self.verbosity >= level:
6580 sys.stderr.write(text + "\n")
@@ -160,12 +175,20 @@
161176
def remoteThread(self, block, buffer):
    """Worker thread: compress one block and hand the result to its buffer.

    The compressor comes from self.pickCompressor(), so the work may run on a
    remote server or locally. It is always closed afterwards. Otherwise every
    remote block would leave its socket connection open.
    """
    compressor = self.pickCompressor()
    try:
        data = compressor.compress(block)
    finally:
        compressor.close()
    # NOTE(review): if compress() raises, buffer.set() is never called;
    # confirm how the writer thread copes with an unfilled buffer.
    self.debug(4, "remoteThread: got data!")
    buffer.set(data)
    # After it's been fulfilled, the writer thread will dequeue and write it.
 184+
def pickCompressor(self):
    """Return a compressor to use for a single block.

    With remote servers configured (self.remotes holds (host, port) tuples
    from -r), pick one uniformly at random and connect to it. Without any,
    fall back to compressing in-process.
    """
    if not self.remotes:
        # No remote servers configured: compress locally.
        return LocalCompressor()
    return RemoteCompressor(random.choice(self.remotes))
170193
171194
172195 class QueuedBuffer(object):
@@ -186,22 +209,43 @@
187210 self.contents = data
188211
189212
class LocalCompressor(object):
    """In-process bzip2 compressor.

    Offers the same compress()/close() calls as RemoteCompressor. It is the
    fallback used when no remote servers are configured.
    """

    def algo(self, algo):
        """Select the compression algorithm; only "bzip2" is accepted."""
        assert algo == "bzip2"

    def compress(self, block):
        """Return *block* compressed as a complete bzip2 stream."""
        return bz2.compress(block)

    def close(self):
        """Release resources; local compression holds none."""
        return None
205224
class RemoteCompressor(object):
    """Client side of the DistBits protocol: compress blocks on a remote server."""

    def __init__(self, address):
        """Connect to a compression server and negotiate the session.

        address -- (host, port) tuple.
        Sends COMP with protocol version 1, then ALGO "bzip2". If connecting
        or negotiating raises, the socket is closed before the error
        propagates, so a failed attempt does not leak it.
        """
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connection = None
        try:
            self.socket.connect(address)
            self.connection = DistBits.Connection(self.socket)
            self.connection.send("COMP", struct.pack(">l", 1))
            self.connection.send("ALGO", "bzip2")
        except Exception:
            self.socket.close()
            raise

    def compress(self, data):
        """Send one uncompressed block and return the server's compressed reply.

        Raises Exception with the server's message on an EROR reply, and also
        if the connection closes before a reply or the reply atom is unknown.
        """
        self.connection.send("HUGE", data)
        (atom, retdata) = self.connection.receive()
        if atom == "SMAL":
            return retdata
        elif atom == "EROR":
            # Report the server's error text, not the (possibly huge) input.
            raise Exception(retdata)
        elif atom is None:
            raise Exception("Connection closed by server before reply")
        else:
            raise Exception("Unknown return atom type")

    def close(self):
        """Send CLOS, then release the connection and the socket.

        The socket is closed even if sending CLOS fails. Calling close()
        again afterwards does nothing.
        """
        connection = self.connection
        if connection is None:
            return
        self.connection = None
        try:
            try:
                connection.send("CLOS")
            finally:
                connection.close()
        finally:
            self.socket.close()
 249+
if __name__ == "__main__":
    # Script entry point: configure from the command line, then compress.
    Compressor(sys.argv[1:]).run()
Index: trunk/tools/dbzip2/DistBits.py
@@ -0,0 +1,78 @@
 2+import socket
 3+import struct
 4+
 5+#
 6+# Packet format:
 7+# [atom] 4-byte ASCII token identifier
 8+# [len ] Signed 32-bit big-endian integer giving the size of [data] in bytes. Must be < 2 GB.
 9+# [data] Payload of exactly [len] bytes; omitted entirely when [len] is zero.
 10+#
 11+# Client sends:
 12+# COMP <version>
 13+# Requests remote compression. Version is a 32-bit big-endian integer
 14+# and must currently be 1. A server that does not support the requested
 15+# version should either drop the connection or reply with EROR.
 16+# ALGO <algorithm>
 17+# "bzip2": Create a full bzip2 stream. Use data size as block size.
 18+# HUGE <data>
 19+# Uncompressed input data; always the last packet. After this, wait
 20+# for response. You may issue multiple such requests as long as the
 21+# connection remains open.
 22+# CLOS <no data>
 23+# Close the connection. (Optional?)
 24+#
 25+# Server sends back one of:
 26+# SMAL <data>
 27+# Compressed output data.
 28+# -or-
 29+# EROR <error string>
 30+# Some error condition which can be reported gracefully.
 31+#
 32+
class Connection(object):
    """Packet framing on top of a connected stream socket.

    Each packet is an 8-byte header, then the payload. The header holds a
    4-byte ASCII atom and a big-endian signed 32-bit payload length.
    """

    # Header layout: 4-byte atom followed by big-endian signed 32-bit length.
    HEADER_FORMAT = ">4sl"
    HEADER_SIZE = struct.calcsize(HEADER_FORMAT)

    def __init__(self, socket):
        """Wrap an already-connected socket in a buffered read/write stream.

        The parameter keeps its original name for backward compatibility. It
        shadows the socket module inside this method only.
        """
        self.stream = socket.makefile("rw")

    def close(self):
        """Close the stream created by makefile(); repeated calls are harmless.

        The socket passed to __init__ is not closed here; its owner must do that.
        """
        if self.stream is not None:
            self.stream.close()
            self.stream = None

    def send(self, atom, data=""):
        """Write one packet and flush it.

        atom -- 4-byte packet type identifier, e.g. "COMP".
        data -- payload; its length must fit in a signed 32-bit integer.
        Raises ValueError if atom is not exactly 4 bytes long.
        """
        if len(atom) != 4:
            raise ValueError("Atom must be exactly 4 bytes: %r" % (atom,))
        self.stream.write(struct.pack(self.HEADER_FORMAT, atom, len(data)))
        if data:
            self.stream.write(data)
        self.stream.flush()

    def receive(self):
        """Read one packet and return it as an (atom, data) tuple.

        Returns (None, None) in two cases: the peer closed the stream cleanly
        between packets, or this Connection was already closed locally.
        Raises EOFError if the stream ends partway through a packet.
        Raises ValueError if the header declares a negative length.
        These are real exceptions rather than asserts because the bytes come
        from the network.
        """
        if self.stream is None:
            return (None, None)

        header = self.stream.read(self.HEADER_SIZE)
        if not header:
            # Clean end of stream before a new packet started.
            return (None, None)
        if len(header) != self.HEADER_SIZE:
            raise EOFError("Connection closed mid-header (%d of %d bytes)"
                           % (len(header), self.HEADER_SIZE))

        (atom, size) = struct.unpack(self.HEADER_FORMAT, header)
        if size < 0:
            raise ValueError("Negative packet length %d" % size)
        if size == 0:
            return (atom, "")

        data = self.stream.read(size)
        if len(data) != size:
            raise EOFError("Connection closed mid-packet (%d of %d bytes)"
                           % (len(data), size))
        return (atom, data)

    def isOpen(self):
        """Return True until close() has been called."""
        return self.stream is not None
Property changes on: trunk/tools/dbzip2/DistBits.py
___________________________________________________________________
Added: svn:eol-style
180 + native
Index: trunk/tools/dbzip2/dbzip2d
@@ -0,0 +1,65 @@
 2+#!/usr/bin/python
 3+
 4+import bz2
 5+import getopt
 6+import struct
 7+import sys
 8+import thread
 9+import time
 10+
 11+import DistBits
 12+from SocketServer import BaseRequestHandler, ForkingTCPServer
 13+
class CompressorHandler(BaseRequestHandler):
    """Serve one client connection speaking the DistBits compression protocol.

    Expected sequence:
      1. The client sends COMP (protocol version) and ALGO (algorithm).
      2. Each HUGE block is answered with a SMAL packet of compressed data,
         repeatable while the connection stays open.
      3. CLOS ends the session.

    Protocol violations are answered with EROR, and the connection is then
    closed.
    """

    def debug(self, level, text):
        """Write a debug message to stdout.

        Verbosity filtering is not implemented: *level* is currently ignored
        and every message is written.
        """
        sys.stdout.write(text + "\n")

    def setup(self):
        """Wrap the accepted socket and reset the per-connection protocol state."""
        self.connection = DistBits.Connection(self.request)
        self.version = None  # protocol version from COMP; None until received
        self.algo = None  # algorithm name from ALGO; None until received

    def handle(self):
        """Dispatch incoming packets until either side closes the connection."""
        self.debug(2, "Opened connection")

        handlers = {
            "COMP": self.handleComp,
            "ALGO": self.handleAlgo,
            "HUGE": self.handleHuge,
            "CLOS": self.handleClos }
        # Handlers may close the connection (CLOS, or a protocol error), so
        # check before every read rather than reading from a closed stream.
        while self.connection.isOpen():
            (atom, data) = self.connection.receive()
            if atom is None:
                self.debug(3, "End of connection.")
                break
            self.debug(3, "Received %s atom, %d bytes." % (atom, len(data)))
            handler = handlers.get(atom)
            if handler is None:
                self._fail("Unknown atom %r" % (atom,))
            else:
                handler(data)

    def _fail(self, message):
        """Send an EROR packet carrying *message*, then close the connection."""
        self.debug(1, "Protocol error: " + message)
        self.connection.send("EROR", message)
        self.connection.close()

    def handleComp(self, data):
        """COMP: record the client's protocol version; only version 1 is accepted."""
        if self.version is not None:
            return self._fail("Duplicate COMP")
        if len(data) != 4:
            return self._fail("Malformed COMP: expected 4 bytes, got %d" % len(data))
        version = struct.unpack(">l", data)[0]
        if version != 1:
            return self._fail("Unsupported protocol version %d" % version)
        self.version = version

    def handleAlgo(self, data):
        """ALGO: select the compression algorithm; only "bzip2" is supported."""
        if self.version is None:
            return self._fail("ALGO received before COMP")
        if data != "bzip2":
            return self._fail("Unsupported algorithm %r" % (data,))
        self.algo = data

    def handleHuge(self, data):
        """HUGE: compress one block and send the result back as SMAL."""
        if self.version is None:
            return self._fail("HUGE received before COMP")
        if self.algo is None:
            return self._fail("HUGE received before ALGO")
        self.connection.send("SMAL", self.compress(data))

    def handleClos(self, data):
        """CLOS: the client is finished; close our side of the connection."""
        self.connection.close()

    def compress(self, data):
        """Compress *data* with the negotiated algorithm (bzip2 only)."""
        assert self.algo == "bzip2"
        return bz2.compress(data)
 62+
if __name__ == "__main__":
    # Listen on all interfaces. The default port 12345 matches the default
    # used by dbzip2's -r option; override it with "-p PORT".
    port = 12345
    (options, remainder) = getopt.getopt(sys.argv[1:], "p:")
    for (opt, val) in options:
        if opt == "-p":
            port = int(val)
    # Let the daemon restart immediately instead of failing with
    # "Address already in use" while old sockets sit in TIME_WAIT.
    ForkingTCPServer.allow_reuse_address = True
    server = ForkingTCPServer(("", port), CompressorHandler)
    server.serve_forever()
Property changes on: trunk/tools/dbzip2/dbzip2d
___________________________________________________________________
Added: svn:executable
167 + *

Status & tagging log