r14487 MediaWiki - Code Review archive

Repository:MediaWiki
Revision: < r14486 | r14487 | r14488 >
Date:23:04, 30 May 2006
Author:brion
Status:old
Tags:
Comment:
Use port 16986 ("BZ") by default instead of 12345
Allow user-selectable block size with -1 through -9 options ("BLOK" atom in net protocol)
Fix connection-close bug in network daemon
Modified paths:
  • /trunk/dbzip2/DistBits.py (modified) (history)
  • /trunk/dbzip2/dbzip2 (modified) (history)
  • /trunk/dbzip2/dbzip2d (modified) (history)

Diff [purge]

Index: trunk/dbzip2/dbzip2
@@ -8,7 +8,6 @@
99 # 2006-05-12
1010
1111 # TODO:
12 -# Selectable block size
1312 # Handle remote failures gracefully
1413 # Accept file input/output, behavior like bzip2
1514
@@ -36,6 +35,7 @@
3736 self.threads = 1 # Number of local threads to start.
3837 self.remotes = []
3938 self.verbosity = 0
 39+ self.blockSize100k = 9
4040
4141 self.crc = 0L
4242
@@ -55,21 +55,23 @@
5656 self.processArgs(args)
5757
5858 def processArgs(self, args):
59 - (options, remainder) = getopt.getopt(args, "vp:r:")
 59+ (options, remainder) = getopt.getopt(args, "123456789p:r:v")
6060 for (opt, val) in options:
61 - if opt == "-v":
62 - self.verbosity += 1
 61+ if opt >= "-1" and opt <= "-9":
 62+ self.blockSize100k = int(opt[1])
 63+ elif opt == "-r":
 64+ self.remotes.append(self.splitHost(val))
6365 elif opt == "-p":
6466 self.threads = int(val)
65 - elif opt == "-r":
66 - self.remotes.append(self.splitHost(val))
 67+ elif opt == "-v":
 68+ self.verbosity += 1
6769
6870 def splitHost(self, val):
6971 if ":" in val:
7072 (host, port) = val.split(":")
7173 return (host, int(port))
7274 else:
73 - return (val, 12345)
 75+ return (val, 16986)
7476
7577 def debug(self, level, text):
7678 if self.verbosity >= level:
@@ -78,9 +80,9 @@
7981 def run(self):
8082 """Start up the threads and goooo!"""
8183 for i in range(0, self.threads):
82 - self.compressors.append(LocalCompressor())
 84+ self.compressors.append(LocalCompressor(self.blockSize100k))
8385 for addr in self.remotes:
84 - self.compressors.append(RemoteCompressor(addr))
 86+ self.compressors.append(RemoteCompressor(addr, self.blockSize100k))
8587 assert len(self.compressors) >= 1
8688
8789 start_new_thread(self.readerThread, ())
@@ -104,7 +106,7 @@
105107 def readerThread(self):
106108 """Producer thread: run through the file handing out blocks."""
107109 self.debug(2, "readerThread: starting!")
108 - dbzutil.readblock(self.inputStream, self.dispatch)
 110+ dbzutil.readblock(self.inputStream, self.dispatch, self.blockSize100k)
109111 self.done = True
110112 self.debug(2, "readerThread: done; read %d blocks" % self.blocksRead)
111113
@@ -199,7 +201,6 @@
200202
201203 assert buffer.output is not None
202204 assert buffer.index == self.blocksWritten
203 - #self.outputStream.write(buffer.output)
204205
205206 (offset, overflow, crc) = findBzTrail(buffer.output)
206207 self.debug(2, "writeBuffer: block stream crc %08x, offset %d bits" %
@@ -220,7 +221,7 @@
221222 def writeHeader(self):
222223 self.debug(4, "writing file header")
223224 # hardcoded 900k blocksize
224 - self.bitStream.write("BZh9")
 225+ self.bitStream.write("BZh" + str(self.blockSize100k))
225226
226227 def writeTrailer(self):
227228 self.debug(4, "writing file trailer, combined CRC %08x" % self.crc)
@@ -289,13 +290,13 @@
290291
291292
292293 class LocalCompressor(object):
293 - """For initial testing, we just compress locally."""
 294+ """Compression tasks to run on a local thread."""
294295
295 - def algo(self, algo):
296 - assert algo == "bzip2"
 296+ def __init__(self, blockSize100k):
 297+ self.blockSize100k = blockSize100k
297298
298299 def compress(self, block):
299 - return bz2.compress(block)
 300+ return bz2.compress(block, self.blockSize100k)
300301
301302 def close(self):
302303 pass
@@ -304,14 +305,18 @@
305306 return "local thread"
306307
307308 class RemoteCompressor(object):
308 - def __init__(self, address):
 309+ def __init__(self, address, blockSize100k):
309310 """Address is a (host, port) tuple."""
310311 self.address = address
 312+ self.blockSize100k = blockSize100k
 313+
311314 self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
312315 self.socket.connect(address)
 316+
313317 self.connection = DistBits.Connection(self.socket)
314318 self.connection.send("COMP", struct.pack(">l", 1))
315319 self.connection.send("ALGO", "bzip2")
 320+ self.connection.send("BLOK", str(blockSize100k))
316321
317322 def compress(self, data):
318323 self.connection.send("HUGE", data)
Index: trunk/dbzip2/DistBits.py
@@ -13,11 +13,16 @@
1414 # should be 1. Unsupported version on the server should drop or return
1515 # error.
1616 # ALGO <algorithm>
17 -# "bzip2": Create a full bzip2 stream. Use data size as block size.
 17+# "bzip2": Create a full bzip2 stream. Default block size is 900k.
 18+# BLOK <size in 100k>
 19+# Optional; "1" through "9" to select bzip2 block size.
1820 # HUGE <data>
19 -# Uncompressed input data; always the last packet. After this, wait
20 -# for response. You may issue multiple such requests as long as the
21 -# connection remains open.
 21+# Uncompressed input data. For each HUGE atom sent by the client, the
 22+# server will return a SMAL atom containing compressed data. You may
 23+# issue multiple such requests as long as the connection remains open;
 24+# the last set ALGO and BLOK settings continue to apply.
 25+# Multiple requests may be pipelined if the sides support it, but this
 26+# is not required; both clients and servers may block on each request.
2227 # CLOS <no data>
2328 # Close the connection. (Optional?)
2429 #
Index: trunk/dbzip2/dbzip2d
@@ -1,7 +1,10 @@
22 #!/usr/bin/python
33
 4+# Not to be confused with dbzip3d...
 5+
46 import bz2
57 import getopt
 8+import os
69 import struct
710 import sys
811 import thread
@@ -10,14 +13,23 @@
1114 import DistBits
1215 from SocketServer import BaseRequestHandler, ForkingTCPServer
1316
 17+listen = ""
 18+port = 16986 # "BZ"
 19+daemonize = False
 20+user = None
 21+verbosity = 0
 22+pidFileName = None
 23+
1424 class CompressorHandler(BaseRequestHandler):
1525 def debug(self, level, text):
16 - print text
 26+ if verbosity >= level:
 27+ sys.stderr.write(text + "\n")
1728
1829 def setup(self):
1930 self.connection = DistBits.Connection(self.request)
2031 self.version = None
2132 self.algo = None
 33+ self.blockSize100k = None
2234
2335 def handle(self):
2436 self.debug(2, "Opened connection")
@@ -25,17 +37,18 @@
2638 handlers = {
2739 "COMP": self.handleComp,
2840 "ALGO": self.handleAlgo,
 41+ "BLOK": self.handleBlok,
2942 "HUGE": self.handleHuge,
3043 "CLOS": self.handleClos }
31 - (atom, data) = self.connection.receive()
32 - while atom:
33 - if atom:
34 - self.debug(3, "Received %s atom, %d bytes." % (atom, len(data)))
35 - assert atom in handlers
36 - handlers[atom](data)
 44+ while True:
 45+ if self.connection.isOpen():
 46+ (atom, data) = self.connection.receive()
3747 else:
3848 self.debug(3, "End of connection.")
39 - (atom, data) = self.connection.receive()
 49+ break
 50+ self.debug(3, "Received %s atom, %d bytes." % (atom, len(data)))
 51+ assert atom in handlers
 52+ handlers[atom](data)
4053
4154 def handleComp(self, data):
4255 assert self.version is None
@@ -47,7 +60,15 @@
4861 assert self.version is not None
4962 assert data == "bzip2"
5063 self.algo = data
 64+ self.blockSize100k = 9
5165
 66+ def handleBlok(self, data):
 67+ assert self.version is not None
 68+ assert self.algo == "bzip2"
 69+ assert len(data) == 1
 70+ assert data >= "1" and data <= "9"
 71+ self.blockSize100k = int(data)
 72+
5273 def handleHuge(self, data):
5374 assert self.version is not None
5475 self.connection.send("SMAL", self.compress(data))
@@ -57,9 +78,47 @@
5879
5980 def compress(self, data):
6081 assert self.algo == "bzip2"
61 - return bz2.compress(data)
 82+ assert self.blockSize100k >= 1 and self.blockSize100k <= 9
 83+ return bz2.compress(data, self.blockSize100k)
6284
6385 if __name__ == "__main__":
64 - port = 12345
65 - server = ForkingTCPServer(("", port), CompressorHandler)
 86+
 87+ (options, remainder) = getopt.getopt(sys.argv[1:], "dl:p:u:v", ["pid-file="])
 88+ for (opt, val) in options:
 89+ if opt == "-d":
 90+ daemonize = True
 91+ elif opt == "-l":
 92+ listen = val
 93+ elif opt == "-p":
 94+ port = int(val)
 95+ elif opt == "-u":
 96+ user = val
 97+ elif opt == "-v":
 98+ verbosity += 1
 99+ elif opt == "--pid-file":
 100+ pidFileName = val
 101+
 102+ if daemonize:
 103+ if os.fork(): sys.exit(0)
 104+ os.setsid()
 105+ if os.fork(): sys.exit(0)
 106+
 107+ # Fixme: work this crap out
 108+ #os.chdir("/")
 109+ #os.umask(0)
 110+ #os.close(sys.stdin.fileno())
 111+ #sys.stdin = open("/dev/null", "r")
 112+ #os.close(sys.stdout.fileno())
 113+ #sys.stdout = open("/dev/null", "a")
 114+ #os.close(sys.stderr.fileno())
 115+ #sys.stderr = open("/dev/null", "a")
 116+
 117+ if pidFileName:
 118+ if os.path.exists(pidFileName):
 119+ os.path.unlink(pidFileName)
 120+ pidFile = file(pidFileName, "w")
 121+ pidFile.write(str(os.getpid()))
 122+ pidFile.close()
 123+
 124+ server = ForkingTCPServer((listen, port), CompressorHandler)
66125 server.serve_forever()

Status & tagging log