Index: trunk/tools/dbzip2/dbzip2 |
— | — | @@ -18,7 +18,6 @@ |
19 | 19 | # 2006-05-12 |
20 | 20 | |
21 | 21 | # TODO: |
22 | | -# Remote compressors |
23 | 22 | # Use a thread pool? |
24 | 23 | # Selectable remote threads |
25 | 24 | # Selectable block size |
— | — | @@ -28,10 +27,15 @@ |
29 | 28 | |
30 | 29 | import bz2 |
31 | 30 | import getopt |
| 31 | +import random |
| 32 | +import socket |
| 33 | +import struct |
32 | 34 | import sys |
33 | 35 | import thread |
34 | 36 | import time |
35 | 37 | |
| 38 | +import DistBits |
| 39 | + |
36 | 40 | class Compressor(object): |
37 | 41 | def __init__(self, args): |
38 | 42 | self.inputStream = sys.stdin |
— | — | @@ -45,6 +49,8 @@ |
46 | 50 | self.bytesRead = 0L |
47 | 51 | self.bytesWritten = 0L |
48 | 52 | |
| 53 | + self.remotes = [] |
| 54 | + |
49 | 55 | self.queue = [] |
50 | 56 | self.done = False |
51 | 57 | self.threadLock = thread.allocate_lock() |
— | — | @@ -52,13 +58,22 @@ |
53 | 59 | self.processArgs(args) |
54 | 60 | |
55 | 61 | def processArgs(self, args): |
56 | | - (options, remainder) = getopt.getopt(args, "vp:") |
| 62 | + (options, remainder) = getopt.getopt(args, "vp:r:") |
57 | 63 | for (opt, val) in options: |
58 | 64 | if opt == "-v": |
59 | 65 | self.verbosity += 1 |
60 | 66 | elif opt == "-p": |
61 | 67 | self.threads = int(val) |
| 68 | + elif opt == "-r": |
| 69 | + self.remotes.append(self.splitHost(val)) |
62 | 70 | |
| 71 | + def splitHost(self, val): |
| 72 | + if ":" in val: |
| 73 | + (host, port) = val.split(":") |
| 74 | + return (host, int(port)) |
| 75 | + else: |
| 76 | + return (val, 12345) |
| 77 | + |
63 | 78 | def debug(self, level, text): |
64 | 79 | if self.verbosity >= level: |
65 | 80 | sys.stderr.write(text + "\n") |
— | — | @@ -160,12 +175,20 @@ |
161 | 176 | |
162 | 177 | def remoteThread(self, block, buffer): |
163 | 178 | """Worker thread: send a block to a foreign server and receive data.""" |
164 | | - stream = RemoteCompressor() |
165 | | - stream.send(block) |
166 | | - data = stream.receive() |
| 179 | + compressor = self.pickCompressor() |
| 180 | + data = compressor.compress(block) |
167 | 181 | self.debug(4, "remoteThread: got data!") |
168 | 182 | buffer.set(data) |
169 | 183 | # After it's been fulfilled, the writer thread will dequeue and write it. |
| 184 | + |
| 185 | + def pickCompressor(self): |
| 186 | + numRemotes = len(self.remotes) |
| 187 | + if numRemotes: |
| 188 | + index = random.randint(0, numRemotes - 1) |
| 189 | + return RemoteCompressor(self.remotes[index]) |
| 190 | + else: |
| 191 | + # Need to take a local... |
| 192 | + return LocalCompressor() |
170 | 193 | |
171 | 194 | |
172 | 195 | class QueuedBuffer(object): |
— | — | @@ -186,22 +209,43 @@ |
187 | 210 | self.contents = data |
188 | 211 | |
189 | 212 | |
190 | | -class RemoteCompressor(object): |
class LocalCompressor(object):
    """Compress blocks in-process with bz2.

    Offers the same compress()/close() methods as RemoteCompressor, so
    callers can use whichever one pickCompressor hands back.
    """

    def algo(self, algo):
        """Select the compression algorithm; only "bzip2" is supported.

        Raises ValueError for anything else. An explicit raise is used
        because an assert would vanish when Python runs with -O.
        """
        if algo != "bzip2":
            raise ValueError("Unsupported algorithm: %r" % (algo,))

    def compress(self, block):
        """Return block compressed as a complete bzip2 stream."""
        return bz2.compress(block)

    def close(self):
        """Nothing to release for a local compressor."""
        pass
205 | 224 | |
class RemoteCompressor(object):
    """Client side of one compression session with a remote server.

    Opens a TCP connection, announces protocol version 1 and the bzip2
    algorithm, then exchanges HUGE/SMAL packets through DistBits.Connection.
    """

    def __init__(self, address):
        """Address is a (host, port) tuple."""
        self.connection = None
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.socket.connect(address)
            self.connection = DistBits.Connection(self.socket)
            self.connection.send("COMP", struct.pack(">l", 1))
            self.connection.send("ALGO", "bzip2")
        except:
            # Do not leak the socket when connect or the handshake fails;
            # the original error is re-raised unchanged.
            self._release()
            raise

    def compress(self, data):
        """Send data for compression and return the compressed reply.

        Raises Exception carrying the server's message on an EROR reply,
        and Exception when the connection closes or the reply is unknown.
        """
        self.connection.send("HUGE", data)
        (atom, retdata) = self.connection.receive()
        if atom == "SMAL":
            return retdata
        elif atom == "EROR":
            # Report what the server said, not the input we sent it.
            raise Exception(retdata)
        elif atom is None:
            raise Exception("Connection closed by remote compressor")
        else:
            raise Exception("Unknown return atom type")

    def close(self):
        """Send CLOS and release the connection; safe to call repeatedly."""
        if self.connection is None:
            return
        try:
            self.connection.send("CLOS")
        finally:
            self._release()

    def _release(self):
        """Close the packet stream and the socket without talking to the peer."""
        if self.connection is not None:
            self.connection.close()
            self.connection = None
        self.socket.close()
| 249 | + |
if __name__ == "__main__":
    # Everything after the script name is handed to Compressor as options.
    Compressor(sys.argv[1:]).run()
Index: trunk/tools/dbzip2/DistBits.py |
— | — | @@ -0,0 +1,78 @@ |
| 2 | +import socket |
| 3 | +import struct |
| 4 | + |
| 5 | +# |
| 6 | +# Packet format: |
| 7 | +# [atom] 4-byte ASCII token identifier |
| 8 | +# [len ] 32-bit big-endian integer with size of remaining data. Must be <2GB |
| 9 | +# [data] variable length depending on len. if zero, not present. |
| 10 | +# |
| 11 | +# Client sends: |
| 12 | +# COMP <version> |
| 13 | +# Requests remote compression. Version is 32-bit big-endian integer, |
| 14 | +# should be 1. Unsupported version on the server should drop or return |
| 15 | +# error. |
| 16 | +# ALGO <algorithm> |
| 17 | +# "bzip2": Create a full bzip2 stream. Use data size as block size. |
| 18 | +# HUGE <data> |
| 19 | +# Uncompressed input data; always the last packet. After this, wait |
| 20 | +# for response. You may issue multiple such requests as long as the |
| 21 | +# connection remains open. |
| 22 | +# CLOS <no data> |
| 23 | +# Close the connection. (Optional?) |
| 24 | +# |
| 25 | +# Server sends back one of: |
| 26 | +# SMAL <data> |
| 27 | +# Compressed output data. |
| 28 | +# -or- |
| 29 | +# EROR <error string> |
| 30 | +# Some error condition which can be reported gracefully. |
| 31 | +# |
| 32 | + |
class Connection(object):
    """Packet framing over a connected socket.

    Every packet is a 4-byte atom, a 32-bit big-endian signed payload
    length, and then that many payload bytes.
    """

    def __init__(self, socket):
        """Wrap a connected socket object in a buffered read/write stream."""
        # Explicit binary mode: packets carry struct-packed headers and
        # compressed payloads, never text.
        self.stream = socket.makefile("rwb")

    def close(self):
        """Close the stream. Calling it again is a no-op."""
        if self.stream is not None:
            self.stream.close()
            self.stream = None

    def send(self, atom, data=""):
        """Write one packet and flush it.

        atom must be exactly 4 bytes long (ValueError otherwise); data is
        the payload and may be empty.
        """
        if len(atom) != 4:
            raise ValueError("Atom must be exactly 4 bytes")

        header = struct.pack(">4sl", atom, len(data))

        self.stream.write(header)
        if len(data):
            self.stream.write(data)
        self.stream.flush()

    def receive(self):
        """Read one packet and return (atom, data).

        Returns (None, None) when the peer has closed the connection or
        this Connection has already been closed. Raises IOError when a
        packet is cut short or declares a negative size; explicit raises
        are used because asserts disappear under -O.
        """
        if self.stream is None:
            return (None, None)

        header = self.stream.read(8)
        if not header:
            # Connection closed
            return (None, None)
        if len(header) != 8:
            raise IOError("Truncated packet header")

        (atom, size) = struct.unpack(">4sl", header)
        if size < 0:
            raise IOError("Negative packet size")

        if size > 0:
            data = self.stream.read(size)
        else:
            data = ""
        if len(data) != size:
            raise IOError("Truncated packet data")

        return (atom, data)

    def isOpen(self):
        """Return True until close() has been called."""
        return self.stream is not None
Property changes on: trunk/tools/dbzip2/DistBits.py |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 80 | + native |
Index: trunk/tools/dbzip2/dbzip2d |
— | — | @@ -0,0 +1,65 @@ |
| 2 | +#!/usr/bin/python |
| 3 | + |
| 4 | +import bz2 |
| 5 | +import getopt |
| 6 | +import struct |
| 7 | +import sys |
| 8 | +import thread |
| 9 | +import time |
| 10 | + |
| 11 | +import DistBits |
| 12 | +from SocketServer import BaseRequestHandler, ForkingTCPServer |
| 13 | + |
class CompressorHandler(BaseRequestHandler):
    """Serve one client: read packets and answer HUGE with compressed SMAL.

    Protocol violations raise ValueError inside the per-atom handlers;
    handle() turns them into an EROR packet and then drops the connection.
    """

    def debug(self, level, text):
        """Print a diagnostic line. level is accepted but not filtered on."""
        print(text)

    def setup(self):
        """Wrap the request socket and reset the per-connection state."""
        self.connection = DistBits.Connection(self.request)
        self.version = None
        self.algo = None

    def handle(self):
        """Dispatch incoming packets until the connection ends."""
        self.debug(2, "Opened connection")

        handlers = {
            "COMP": self.handleComp,
            "ALGO": self.handleAlgo,
            "HUGE": self.handleHuge,
            "CLOS": self.handleClos }
        # Re-check isOpen() every round: handleClos and the error path close
        # the connection, and receiving on it afterwards must not happen.
        while self.connection.isOpen():
            (atom, data) = self.connection.receive()
            if not atom:
                self.debug(3, "End of connection.")
                break
            self.debug(3, "Received %s atom, %d bytes." % (atom, len(data)))
            try:
                if atom not in handlers:
                    raise ValueError("Unknown atom type")
                handlers[atom](data)
            except ValueError:
                message = str(sys.exc_info()[1])
                self.debug(1, "Protocol error: " + message)
                self.connection.send("EROR", message)
                self.connection.close()

    def handleComp(self, data):
        """COMP: record the protocol version; only version 1 is accepted."""
        if self.version is not None:
            raise ValueError("COMP sent more than once")
        if len(data) != 4:
            raise ValueError("COMP version must be 4 bytes")
        version = struct.unpack(">l", data)[0]
        if version != 1:
            raise ValueError("Unsupported protocol version")
        self.version = version

    def handleAlgo(self, data):
        """ALGO: select the algorithm; must follow COMP and be "bzip2"."""
        if self.version is None:
            raise ValueError("ALGO sent before COMP")
        if data != "bzip2":
            raise ValueError("Unsupported algorithm")
        self.algo = data

    def handleHuge(self, data):
        """HUGE: compress the payload and reply with a SMAL packet."""
        if self.version is None:
            raise ValueError("HUGE sent before COMP")
        self.connection.send("SMAL", self.compress(data))

    def handleClos(self, data):
        """CLOS: close the connection, which ends the handle() loop."""
        self.connection.close()

    def compress(self, data):
        """Return data as a bzip2 stream; ValueError if no ALGO was selected."""
        if self.algo != "bzip2":
            raise ValueError("No compression algorithm selected")
        return bz2.compress(data)
| 62 | + |
if __name__ == "__main__":
    # Bind TCP port 12345 with an empty host string and serve until killed;
    # ForkingTCPServer handles each connection in a forked child process.
    listenAddress = ("", 12345)
    ForkingTCPServer(listenAddress, CompressorHandler).serve_forever()
Property changes on: trunk/tools/dbzip2/dbzip2d |
___________________________________________________________________ |
Added: svn:executable |
1 | 67 | + * |