Index: trunk/dbzip2/dbzip2 |
— | — | @@ -8,7 +8,6 @@ |
9 | 9 | # 2006-05-12 |
10 | 10 | |
11 | 11 | # TODO: |
12 | | -# Selectable block size |
13 | 12 | # Handle remote failures gracefully |
14 | 13 | # Accept file input/output, behavior like bzip2 |
15 | 14 | |
— | — | @@ -36,6 +35,7 @@ |
37 | 36 | self.threads = 1 # Number of local threads to start. |
38 | 37 | self.remotes = [] |
39 | 38 | self.verbosity = 0 |
| 39 | + self.blockSize100k = 9 |
40 | 40 | |
41 | 41 | self.crc = 0L |
42 | 42 | |
— | — | @@ -55,21 +55,23 @@ |
56 | 56 | self.processArgs(args) |
57 | 57 | |
58 | 58 | def processArgs(self, args): |
59 | | - (options, remainder) = getopt.getopt(args, "vp:r:") |
| 59 | + (options, remainder) = getopt.getopt(args, "123456789p:r:v") |
60 | 60 | for (opt, val) in options: |
61 | | - if opt == "-v": |
62 | | - self.verbosity += 1 |
| 61 | + if opt >= "-1" and opt <= "-9": |
| 62 | + self.blockSize100k = int(opt[1]) |
| 63 | + elif opt == "-r": |
| 64 | + self.remotes.append(self.splitHost(val)) |
63 | 65 | elif opt == "-p": |
64 | 66 | self.threads = int(val) |
65 | | - elif opt == "-r": |
66 | | - self.remotes.append(self.splitHost(val)) |
| 67 | + elif opt == "-v": |
| 68 | + self.verbosity += 1 |
67 | 69 | |
68 | 70 | def splitHost(self, val): |
69 | 71 | if ":" in val: |
70 | 72 | (host, port) = val.split(":") |
71 | 73 | return (host, int(port)) |
72 | 74 | else: |
73 | | - return (val, 12345) |
| 75 | + return (val, 16986) |
74 | 76 | |
75 | 77 | def debug(self, level, text): |
76 | 78 | if self.verbosity >= level: |
— | — | @@ -78,9 +80,9 @@ |
79 | 81 | def run(self): |
80 | 82 | """Start up the threads and goooo!""" |
81 | 83 | for i in range(0, self.threads): |
82 | | - self.compressors.append(LocalCompressor()) |
| 84 | + self.compressors.append(LocalCompressor(self.blockSize100k)) |
83 | 85 | for addr in self.remotes: |
84 | | - self.compressors.append(RemoteCompressor(addr)) |
| 86 | + self.compressors.append(RemoteCompressor(addr, self.blockSize100k)) |
85 | 87 | assert len(self.compressors) >= 1 |
86 | 88 | |
87 | 89 | start_new_thread(self.readerThread, ()) |
— | — | @@ -104,7 +106,7 @@ |
105 | 107 | def readerThread(self): |
106 | 108 | """Producer thread: run through the file handing out blocks.""" |
107 | 109 | self.debug(2, "readerThread: starting!") |
108 | | - dbzutil.readblock(self.inputStream, self.dispatch) |
| 110 | + dbzutil.readblock(self.inputStream, self.dispatch, self.blockSize100k) |
109 | 111 | self.done = True |
110 | 112 | self.debug(2, "readerThread: done; read %d blocks" % self.blocksRead) |
111 | 113 | |
— | — | @@ -199,7 +201,6 @@ |
200 | 202 | |
201 | 203 | assert buffer.output is not None |
202 | 204 | assert buffer.index == self.blocksWritten |
203 | | - #self.outputStream.write(buffer.output) |
204 | 205 | |
205 | 206 | (offset, overflow, crc) = findBzTrail(buffer.output) |
206 | 207 | self.debug(2, "writeBuffer: block stream crc %08x, offset %d bits" % |
— | — | @@ -220,7 +221,7 @@ |
221 | 222 | def writeHeader(self): |
222 | 223 | self.debug(4, "writing file header") |
223 | | - # hardcoded 900k blocksize |
224 | | - self.bitStream.write("BZh9") |
| 224 | + # block size digit comes from the -1 .. -9 options (default 9) |
| 225 | + self.bitStream.write("BZh" + str(self.blockSize100k)) |
225 | 226 | |
226 | 227 | def writeTrailer(self): |
227 | 228 | self.debug(4, "writing file trailer, combined CRC %08x" % self.crc) |
— | — | @@ -289,13 +290,13 @@ |
290 | 291 | |
291 | 292 | |
292 | 293 | class LocalCompressor(object): |
293 | | - """For initial testing, we just compress locally.""" |
| 294 | + """Compression tasks to run on a local thread.""" |
294 | 295 | |
295 | | - def algo(self, algo): |
296 | | - assert algo == "bzip2" |
| 296 | + def __init__(self, blockSize100k): |
| 297 | + self.blockSize100k = blockSize100k |
297 | 298 | |
298 | 299 | def compress(self, block): |
299 | | - return bz2.compress(block) |
| 300 | + return bz2.compress(block, self.blockSize100k) |
300 | 301 | |
301 | 302 | def close(self): |
302 | 303 | pass |
— | — | @@ -304,14 +305,18 @@ |
305 | 306 | return "local thread" |
306 | 307 | |
307 | 308 | class RemoteCompressor(object): |
308 | | - def __init__(self, address): |
| 309 | + def __init__(self, address, blockSize100k): |
309 | 310 | """Address is a (host, port) tuple.""" |
310 | 311 | self.address = address |
| 312 | + self.blockSize100k = blockSize100k |
| 313 | + |
311 | 314 | self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
312 | 315 | self.socket.connect(address) |
| 316 | + |
313 | 317 | self.connection = DistBits.Connection(self.socket) |
314 | 318 | self.connection.send("COMP", struct.pack(">l", 1)) |
315 | 319 | self.connection.send("ALGO", "bzip2") |
| 320 | + self.connection.send("BLOK", str(blockSize100k)) |
316 | 321 | |
317 | 322 | def compress(self, data): |
318 | 323 | self.connection.send("HUGE", data) |
Index: trunk/dbzip2/DistBits.py |
— | — | @@ -13,11 +13,16 @@ |
14 | 14 | # should be 1. Unsupported version on the server should drop or return |
15 | 15 | # error. |
16 | 16 | # ALGO <algorithm> |
17 | | -# "bzip2": Create a full bzip2 stream. Use data size as block size. |
| 17 | +# "bzip2": Create a full bzip2 stream. Resets block size to 900k. |
| 18 | +# BLOK <size in 100k> |
| 19 | +# Optional; must follow ALGO. "1" through "9" selects bzip2 block size. |
18 | 20 | # HUGE <data> |
19 | | -# Uncompressed input data; always the last packet. After this, wait |
20 | | -# for response. You may issue multiple such requests as long as the |
21 | | -# connection remains open. |
| 21 | +# Uncompressed input data. For each HUGE atom sent by the client, the |
| 22 | +# server will return a SMAL atom containing compressed data. You may |
| 23 | +# issue multiple such requests as long as the connection remains open; |
| 24 | +# the last set ALGO and BLOK settings continue to apply. |
| 25 | +# Multiple requests may be pipelined if the sides support it, but this |
| 26 | +# is not required; both clients and servers may block on each request. |
22 | 27 | # CLOS <no data> |
23 | 28 | # Close the connection. (Optional?) |
24 | 29 | # |
Index: trunk/dbzip2/dbzip2d |
— | — | @@ -1,7 +1,10 @@ |
2 | 2 | #!/usr/bin/python |
3 | 3 | |
| 4 | +# Not to be confused with dbzip3d... |
| 5 | + |
4 | 6 | import bz2 |
5 | 7 | import getopt |
| 8 | +import os |
6 | 9 | import struct |
7 | 10 | import sys |
8 | 11 | import thread |
— | — | @@ -10,14 +13,23 @@ |
11 | 14 | import DistBits |
12 | 15 | from SocketServer import BaseRequestHandler, ForkingTCPServer |
13 | 16 | |
| 17 | +listen = "" |
| 18 | +port = 16986 # "BZ" |
| 19 | +daemonize = False |
| 20 | +user = None |
| 21 | +verbosity = 0 |
| 22 | +pidFileName = None |
| 23 | + |
14 | 24 | class CompressorHandler(BaseRequestHandler): |
15 | 25 | def debug(self, level, text): |
16 | | - print text |
| 26 | + if verbosity >= level: |
| 27 | + sys.stderr.write(text + "\n") |
17 | 28 | |
18 | 29 | def setup(self): |
19 | 30 | self.connection = DistBits.Connection(self.request) |
20 | 31 | self.version = None |
21 | 32 | self.algo = None |
| 33 | + self.blockSize100k = None |
22 | 34 | |
23 | 35 | def handle(self): |
24 | 36 | self.debug(2, "Opened connection") |
— | — | @@ -25,17 +37,18 @@ |
26 | 38 | handlers = { |
27 | 39 | "COMP": self.handleComp, |
28 | 40 | "ALGO": self.handleAlgo, |
| 41 | + "BLOK": self.handleBlok, |
29 | 42 | "HUGE": self.handleHuge, |
30 | 43 | "CLOS": self.handleClos } |
31 | | - (atom, data) = self.connection.receive() |
32 | | - while atom: |
33 | | - if atom: |
34 | | - self.debug(3, "Received %s atom, %d bytes." % (atom, len(data))) |
35 | | - assert atom in handlers |
36 | | - handlers[atom](data) |
| 44 | + while True: |
| 45 | + if self.connection.isOpen(): |
| 46 | + (atom, data) = self.connection.receive() |
37 | 47 | else: |
38 | 48 | self.debug(3, "End of connection.") |
39 | | - (atom, data) = self.connection.receive() |
| 49 | + break |
| 50 | + self.debug(3, "Received %s atom, %d bytes." % (atom, len(data))) |
| 51 | + assert atom in handlers |
| 52 | + handlers[atom](data) |
40 | 53 | |
41 | 54 | def handleComp(self, data): |
42 | 55 | assert self.version is None |
— | — | @@ -47,7 +60,15 @@ |
48 | 61 | assert self.version is not None |
49 | 62 | assert data == "bzip2" |
50 | 63 | self.algo = data |
| 64 | + self.blockSize100k = 9 |
51 | 65 | |
| 66 | + def handleBlok(self, data): |
| 67 | + assert self.version is not None |
| 68 | + assert self.algo == "bzip2" |
| 69 | + assert len(data) == 1 |
| 70 | + assert data >= "1" and data <= "9" |
| 71 | + self.blockSize100k = int(data) |
| 72 | + |
52 | 73 | def handleHuge(self, data): |
53 | 74 | assert self.version is not None |
54 | 75 | self.connection.send("SMAL", self.compress(data)) |
— | — | @@ -57,9 +78,47 @@ |
58 | 79 | |
59 | 80 | def compress(self, data): |
60 | 81 | assert self.algo == "bzip2" |
61 | | - return bz2.compress(data) |
| 82 | + assert self.blockSize100k >= 1 and self.blockSize100k <= 9 |
| 83 | + return bz2.compress(data, self.blockSize100k) |
62 | 84 | |
63 | 85 | if __name__ == "__main__": |
64 | | - port = 12345 |
65 | | - server = ForkingTCPServer(("", port), CompressorHandler) |
| 86 | + |
| 87 | + (options, remainder) = getopt.getopt(sys.argv[1:], "dl:p:u:v", ["pid-file="]) |
| 88 | + for (opt, val) in options: |
| 89 | + if opt == "-d": |
| 90 | + daemonize = True |
| 91 | + elif opt == "-l": |
| 92 | + listen = val |
| 93 | + elif opt == "-p": |
| 94 | + port = int(val) |
| 95 | + elif opt == "-u": |
| 96 | + user = val |
| 97 | + elif opt == "-v": |
| 98 | + verbosity += 1 |
| 99 | + elif opt == "--pid-file": |
| 100 | + pidFileName = val |
| 101 | + |
| 102 | + if daemonize: |
| 103 | + if os.fork(): sys.exit(0) |
| 104 | + os.setsid() |
| 105 | + if os.fork(): sys.exit(0) |
| 106 | + |
| 107 | + # FIXME: finish detaching (chdir, umask, redirect stdio to /dev/null) |
| 108 | + #os.chdir("/") |
| 109 | + #os.umask(0) |
| 110 | + #os.close(sys.stdin.fileno()) |
| 111 | + #sys.stdin = open("/dev/null", "r") |
| 112 | + #os.close(sys.stdout.fileno()) |
| 113 | + #sys.stdout = open("/dev/null", "a") |
| 114 | + #os.close(sys.stderr.fileno()) |
| 115 | + #sys.stderr = open("/dev/null", "a") |
| 116 | + |
| 117 | + if pidFileName: |
| 118 | + if os.path.exists(pidFileName): |
| 119 | + os.unlink(pidFileName) |
| 120 | + pidFile = file(pidFileName, "w") |
| 121 | + pidFile.write(str(os.getpid())) |
| 122 | + pidFile.close() |
| 123 | + |
| 124 | + server = ForkingTCPServer((listen, port), CompressorHandler) |
66 | 125 | server.serve_forever() |