Index: trunk/dbzip2/dbzip2 |
— | — | @@ -0,0 +1,309 @@ |
| 2 | +#!/usr/bin/python |
| 3 | + |
| 4 | +# In progress proof of concept for distributed parallel bzip2 compression. |
| 5 | +# Breaks incoming data stream into blocks, sends them out for separate |
| 6 | +# compression on multiple threads. |
| 7 | +# |
| 8 | +# Currently running bzip2 chunks in local threads, will try moving to |
| 9 | +# remotable network daemons distcc-style. It might or might not be worth |
| 10 | +# it depending on the overhead of slinging around all the data. |
| 11 | +# |
| 12 | +# Like pbzip2 the output files will work with the regular 'bzip2' tool |
| 13 | +# but may not work with other programs because each chunk is its own |
| 14 | +# separate stream. Other tools tend to end at the first stream's end. |
| 15 | +# It might be possible to combine the streams in some way; should |
| 16 | +# investigate. |
| 17 | +# |
| 18 | +# Brion Vibber <brion@pobox.com> |
| 19 | +# 2006-05-12 |
| 20 | + |
| 21 | +# TODO: |
| 22 | +# Selectable block size |
| 23 | +# Handle remote failures gracefully |
| 24 | +# Merge blocks to single, more compatible stream |
| 25 | +# Accept file input/output, behavior like bzip2 |
| 26 | + |
| 27 | +import bz2 |
| 28 | +import getopt |
| 29 | +import random |
| 30 | +import socket |
| 31 | +import struct |
| 32 | +import sys |
| 33 | +import thread |
| 34 | +import time |
| 35 | + |
| 36 | +from thread import start_new_thread |
| 37 | + |
| 38 | +import DistBits |
| 39 | + |
class Compressor(object):
    """Coordinates the distributed compression pipeline.

    One reader thread slices stdin into fixed-size blocks, a pool of
    compressor threads (local and/or remote) compresses them, and the
    main thread writes completed blocks to stdout in original order.
    Shared queues are guarded by a single lock; counters double as
    progress/termination state.
    """

    def __init__(self, args):
        # Streams: raw data in on stdin, concatenated bzip2 streams out
        # on stdout. NOTE(review): binary data through text-mode stdio —
        # fine on Unix/Python 2, would need binary mode elsewhere.
        self.inputStream = sys.stdin
        self.outputStream = sys.stdout

        self.blockSize = 900000 # 900k default blocksize
        self.threads = 1 # Number of local threads to start.
        self.remotes = []       # (host, port) tuples for remote helpers.
        self.verbosity = 0      # -v flags accumulate; gates debug().

        # Progress counters; also used in the thread-termination tests.
        self.blocksRead = 0
        self.blocksCompressed = 0
        self.blocksWritten = 0
        self.bytesRead = 0L
        self.bytesWritten = 0L

        self.compressors = []
        self.inputQueue = [] # blocks to be compressed
        self.outputQueue = [] # buffers to be written

        # done is set by the reader thread once stdin is exhausted.
        self.done = False
        # Single lock protecting both queues and the shared counters.
        self.threadLock = thread.allocate_lock()

        self.processArgs(args)

    def processArgs(self, args):
        """Parse command-line flags: -v (verbosity, repeatable),
        -p <n> (local threads), -r <host[:port]> (remote compressor)."""
        (options, remainder) = getopt.getopt(args, "vp:r:")
        for (opt, val) in options:
            if opt == "-v":
                self.verbosity += 1
            elif opt == "-p":
                self.threads = int(val)
            elif opt == "-r":
                self.remotes.append(self.splitHost(val))

    def splitHost(self, val):
        """Split 'host:port' into (host, port); port defaults to 12345."""
        if ":" in val:
            (host, port) = val.split(":")
            return (host, int(port))
        else:
            return (val, 12345)

    def debug(self, level, text):
        """Write text to stderr if verbosity is at least level."""
        if self.verbosity >= level:
            sys.stderr.write(text + "\n")

    def run(self):
        """Start up the threads and goooo!"""
        for i in range(0, self.threads):
            self.compressors.append(LocalCompressor())
        for addr in self.remotes:
            self.compressors.append(RemoteCompressor(addr))
        assert len(self.compressors) >= 1

        start_new_thread(self.readerThread, ())

        for compressor in self.compressors:
            start_new_thread(self.compressorThread, (compressor,))

        # The writer runs on the main thread and blocks until all
        # blocks have been written.
        self.writerThread()

    def sleep(self):
        """Wait a short time when out of data."""
        time.sleep(0.01)

    def lock(self):
        # Acquire the shared queue/counter lock.
        self.threadLock.acquire()
        assert self.threadLock.locked()

    def unlock(self):
        self.threadLock.release()

    def readerThread(self):
        """Producer thread: run through the file handing out blocks."""
        self.debug(2, "readerThread: starting!")
        block = self.nextBlock()
        while block:
            # Throttle: don't read ahead of the compressor pool.
            while not self.ready():
                self.debug(4, "readerThread: full at %d; waiting" % len(self.inputQueue))
                self.sleep()

            self.lock()
            self.blocksRead += 1
            self.debug(2, "readerThread: dispatching block %d" % self.blocksRead)
            self.dispatch(block)
            self.unlock()

            block = self.nextBlock()
        # Signals the compressor and writer threads that no more input
        # is coming; they drain their queues and exit.
        self.done = True
        self.debug(2, "readerThread: done; read %d blocks" % self.blocksRead)

    def nextBlock(self):
        """Read and return the next raw block from the input stream;
        returns "" (falsy) at end of input."""
        buffer = self.inputStream.read(self.blockSize)
        self.bytesRead += len(buffer)
        self.debug(3, "nextBlock: %d" % len(buffer))
        return buffer

    def ready(self):
        """Check if we have some free compressors. No sense filling up RAM."""
        return len(self.inputQueue) < len(self.compressors)


    # Queue management

    def dispatch(self, block):
        """Queue a block of data for remote compression."""
        assert self.threadLock.locked()
        buffer = QueuedBuffer(self.blocksRead, block)
        self.inputQueue.append(buffer) # To the compressor threads
        self.outputQueue.append(buffer) # To the writer thread, in order!

    def dequeueInput(self):
        """Fetch the next available block for compression."""
        assert self.threadLock.locked()
        if len(self.inputQueue):
            return self.inputQueue.pop(0)
        else:
            return None

    def dequeueOutput(self):
        """Fetch the next completed block for writing."""
        assert self.threadLock.locked()
        # Only the head of the queue may be returned, and only once its
        # compressed data has arrived — preserves output ordering.
        if len(self.outputQueue) and self.outputQueue[0].ready():
            return self.outputQueue.pop(0)
        else:
            return None


    def writerThread(self):
        """Consumer thread: as we receive compressed blocks from the
        distributed compressors, write them to the output file.
        Currently only writes blocks in order."""
        self.debug(2, "writerThread: starting")
        startTime = time.time()
        while not (self.done and self.blocksWritten == self.blocksRead):
            self.lock()
            buffer = self.dequeueOutput()
            self.unlock()

            if buffer:
                self.debug(4, "writerThread: wtf")
                self.writeBuffer(buffer)
            else:
                self.debug(4, "writerThread: sleeping")
                self.sleep()

        # Report overall throughput at verbosity >= 1.
        delta = time.time() - startTime
        megabyte = 1024.0 * 1024.0
        rateIn = (float(self.bytesRead) / megabyte) / delta
        rateOut = (float(self.bytesWritten) / megabyte) / delta
        self.debug(1, "Wrote %d blocks in %0.1f seconds (%0.3f MB/s in, %0.3f MB/s out)" % (
            self.blocksWritten, delta, rateIn, rateOut))

    def writeBuffer(self, buffer):
        """Write a buffer to the file. Currently requires that buffers
        be processed in streaming order."""
        self.blocksWritten += 1
        self.bytesWritten += len(buffer.output)
        self.debug(2, "writeBuffer: writing block %d (%d blocks, %d bytes)" %
            (buffer.index, self.blocksWritten, self.bytesWritten))

        # Ordering invariant: block indices are 1-based and must match
        # the running write count exactly.
        assert buffer.output is not None
        assert buffer.index == self.blocksWritten
        self.outputStream.write(buffer.output)


    def compressorThread(self, compressor):
        """Worker thread: send a block to a foreign server and receive data."""
        self.debug(3, "compressorThread: Started")
        # Per-thread statistics (shadow the instance-wide counters on
        # purpose, for the per-compressor report below).
        blocksCompressed = 0
        bytesRead = 0L
        bytesWritten = 0L
        startTime = time.time()

        while not (self.done and self.blocksCompressed == self.blocksRead):
            self.lock()
            buffer = self.dequeueInput()
            self.unlock()
            if buffer:
                self.debug(4, "compressorThread: compressing")
                # The actual compression runs outside the lock so other
                # workers can proceed in parallel.
                data = compressor.compress(buffer.input)

                self.lock()
                buffer.set(data)

                self.blocksCompressed += 1
                blocksCompressed += 1
                bytesRead += len(buffer.input)
                bytesWritten += len(buffer.output)

                self.debug(4, "compressorThread: compressed %d blocks" % self.blocksCompressed)
                self.unlock()
            else:
                self.debug(4, "compressorThread: no input, sleeping")
                self.sleep()
        compressor.close()

        delta = time.time() - startTime
        megabyte = 1024.0 * 1024.0
        rateIn = (float(bytesRead) / megabyte) / delta
        rateOut = (float(bytesWritten) / megabyte) / delta
        self.debug(1, "%s: processed %d blocks in %0.1f seconds (%0.3f MB/s in, %0.3f MB/s out)" % (
            compressor, blocksCompressed, delta, rateIn, rateOut))
| 243 | + |
| 244 | + |
class QueuedBuffer(object):
    """A slot in the output queue: pairs one input block with its
    compressed result, which a worker thread fills in later."""

    def __init__(self, index, input):
        """Create the slot for block number index; no output data yet."""
        self.index = index
        self.input = input
        self.output = None

    def ready(self):
        """True once compressed data has been attached."""
        return self.output is not None

    def set(self, data):
        """Attach the compressed data; legal exactly once per buffer."""
        assert data is not None
        assert self.output is None
        self.output = data
| 262 | + |
| 263 | + |
class LocalCompressor(object):
    """Compressor backend that runs bzip2 in-process, on this thread."""

    def algo(self, algo):
        # bzip2 is the only algorithm the local backend supports.
        assert algo == "bzip2"

    def compress(self, block):
        """Return block compressed as a self-contained bzip2 stream."""
        return bz2.compress(block)

    def close(self):
        """No connection or other resource to release locally."""
        pass

    def __str__(self):
        return "local thread"
| 278 | + |
class RemoteCompressor(object):
    """Compressor backend that ships blocks to a dbzip2d server over TCP
    using the DistBits atom protocol (COMP/ALGO handshake, then one
    HUGE request per block answered by SMAL or EROR)."""

    def __init__(self, address):
        """Address is a (host, port) tuple."""
        self.address = address
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect(address)
        self.connection = DistBits.Connection(self.socket)
        # Handshake: protocol version 1, bzip2 algorithm.
        self.connection.send("COMP", struct.pack(">l", 1))
        self.connection.send("ALGO", "bzip2")

    def compress(self, data):
        """Send one uncompressed block; return the compressed reply.

        Raises Exception with the server's error string on EROR, or a
        generic message on any unrecognized reply atom."""
        self.connection.send("HUGE", data)
        (atom, retdata) = self.connection.receive()
        if atom == "SMAL":
            return retdata
        elif atom == "EROR":
            # BUG FIX: raise the server's error string (retdata), not
            # the multi-hundred-KB input payload we just sent.
            raise Exception(retdata)
        else:
            raise Exception("Unknown return atom type")

    def close(self):
        """Politely close the protocol connection and the socket."""
        self.connection.send("CLOS")
        self.connection.close()
        self.socket.close()
        self.connection = None

    def __str__(self):
        return self.address[0] + ":" + str(self.address[1])
| 307 | + |
if __name__ == "__main__":
    # Entry point: options come from argv, raw data from stdin, and the
    # concatenated bzip2 streams go to stdout.
    compressor = Compressor(sys.argv[1:])
    compressor.run()
Property changes on: trunk/dbzip2/dbzip2 |
___________________________________________________________________ |
Added: svn:executable |
1 | 311 | + * |
Index: trunk/dbzip2/DistBits.py |
— | — | @@ -0,0 +1,78 @@ |
| 2 | +import socket |
| 3 | +import struct |
| 4 | + |
| 5 | +# |
| 6 | +# Packet format: |
| 7 | +# [atom] 4-byte ASCII token identifier |
| 8 | +# [len ] 32-bit big-endian integer with size of remaining data. Must be <2GB |
| 9 | +# [data] variable length depending on len. if zero, not present. |
| 10 | +# |
| 11 | +# Client sends: |
| 12 | +# COMP <version> |
| 13 | +# Requests remote compression. Version is 32-bit big-endian integer, |
| 14 | +# should be 1. Unsupported version on the server should drop or return |
| 15 | +# error. |
| 16 | +# ALGO <algorithm> |
| 17 | +# "bzip2": Create a full bzip2 stream. Use data size as block size. |
| 18 | +# HUGE <data> |
| 19 | +# Uncompressed input data; always the last packet. After this, wait |
| 20 | +# for response. You may issue multiple such requests as long as the |
| 21 | +# connection remains open. |
| 22 | +# CLOS <no data> |
| 23 | +# Close the connection. (Optional?) |
| 24 | +# |
| 25 | +# Server sends back one of: |
| 26 | +# SMAL <data> |
| 27 | +# Compressed output data. |
| 28 | +# -or- |
| 29 | +# EROR <error string> |
| 30 | +# Some error condition which can be reported gracefully. |
| 31 | +# |
| 32 | + |
class Connection(object):
    """Framed atom-protocol connection over a socket.

    Each packet is an 8-byte header — a 4-char ASCII atom plus a
    big-endian signed 32-bit payload length — followed by the payload.
    See the protocol notes at the top of this module.
    """

    def __init__(self, socket):
        # NOTE(review): the parameter shadows the imported socket module
        # inside this method; harmless here but worth renaming.
        # NOTE(review): mode "rw" and str-based I/O assume Python 2 file
        # semantics — confirm before any Python 3 port.
        self.stream = socket.makefile("rw")

    def close(self):
        """Close the buffered stream and mark the connection unusable."""
        self.stream.close()
        self.stream = None

    def send(self, atom, data=""):
        """Send one packet: 4-char atom, 4-byte length, then the data."""
        assert atom == atom and len(atom) == 4 if False else len(atom) == 4

        header = struct.pack(">4sl", atom, len(data))
        assert len(header) == 8
        #header = "%4s%08xd" % (atom, len(data))
        #assert len(header) == 12

        self.stream.write(header)
        if len(data):
            self.stream.write(data)
        # Flush so the peer sees the packet immediately.
        self.stream.flush()

    def receive(self):
        """Read one packet; returns (atom, data), or (None, None) if the
        peer has closed the connection."""
        header = self.stream.read(8)
        #print "Read: '%s'" % header

        if header == "":
            # Connection closed
            return (None, None)

        # Anything shorter than a full header is a protocol violation.
        assert len(header) == 8

        (atom, size) = struct.unpack(">4sl", header)
        #atom = header[0:4]
        #size = int(header[4:12], 16)
        assert len(atom) == 4
        assert size >= 0

        if size > 0:
            data = self.stream.read(size)
        else:
            data = ""
        # A short read here means the peer hung up mid-packet.
        assert len(data) == size

        return (atom, data)

    def isOpen(self):
        """True until close() has been called."""
        return self.stream is not None
Property changes on: trunk/dbzip2/DistBits.py |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 80 | + native |
Index: trunk/dbzip2/README |
— | — | @@ -0,0 +1,55 @@ |
| 2 | +Compression with bzip2 is hideously slow. This doesn't really become apparent |
| 3 | +until you've got a 350 gigabyte XML data dump to compress and it literally |
| 4 | +can take days. |
| 5 | + |
| 6 | +I came across a "Parallel BZIP2" utility on the net, which claims linear |
| 7 | +speedup of bzip2 compression on SMP systems. In a quick test on my dual-core |
| 8 | +G5 box I found it gave a 52% speedup over the straight single-threaded bzip2 |
| 9 | +when set for two processors; so there might be some potential here. |
| 10 | + |
| 11 | +This pbzip2 utility exploits two properties of bzip2: |
| 12 | +# Compression is done on largish blocks, making it very convenient to pass |
| 13 | +those blocks out to threads for simultaneous processing. |
| 14 | +# When uncompressing, the bzip2 utility will combine multiple concatenated |
| 15 | +compressed streams into a single output file. |
| 16 | + |
| 17 | + |
| 18 | +While the bzip2 scheme works on blocks, usually the data stream has one |
| 19 | +header marker and runs through to the end. pbzip2 just calls into libbz2's |
| 20 | +high-level functions to create a complete self-contained compressed stream |
| 21 | +for each block. This has a tiny overhead in extra header bytes, but is |
| 22 | +pretty negligible. |
| 23 | + |
| 24 | +The main downside is that other programs capable of reading bzip2 streams |
| 25 | +might not read the whole file; both PHP's compress.bzip2:// streams and |
| 26 | +the Apache Java thingy I'm using in mwdumper stop at the end of the first |
| 27 | +block. |
| 28 | + |
| 29 | + |
| 30 | +That can probably be worked around, though, if this works well enough. |
| 31 | +It might be possible to stitch the blocks back together into a single |
| 32 | +stream, too, but I'll have to dive deeper into Bzip2 to figure that out. |
| 33 | + |
| 34 | + |
| 35 | +I've hacked together this Python prototype which basically replicates |
| 36 | +the basic compression functionality of pbzip2, using local threads; |
| 37 | +performance looks pretty similar for two threads. |
| 38 | + |
| 39 | +Will next try a simple TCP interface to run the compression on multiple |
| 40 | +machines on the LAN, and do some tests to see if it's feasible to speed |
| 41 | +up streaming block compression with several helper boxes. |
| 42 | + |
| 43 | + |
| 44 | +If this is promising, I also will want to look at 7zip, see if something |
| 45 | +similar can be done there. |
| 46 | + |
| 47 | +-- brion vibber (brion @ pobox.com / brion @ wikimedia.org) |
| 48 | + |
| 49 | + |
| 50 | +== References == |
| 51 | + |
| 52 | +Parallel BZIP2: |
| 53 | +* http://compression.ca/pbzip2/ |
| 54 | + |
| 55 | +Copy of distcc protocol documentation, for inspiration/nicking: |
| 56 | +* http://www.opensource.apple.com/darwinsource/DevToolsNov2005/distcc-39/distcc_dist/doc/protocol-1.txt |
Index: trunk/dbzip2/dbzip2d |
— | — | @@ -0,0 +1,65 @@ |
| 2 | +#!/usr/bin/python |
| 3 | + |
| 4 | +import bz2 |
| 5 | +import getopt |
| 6 | +import struct |
| 7 | +import sys |
| 8 | +import thread |
| 9 | +import time |
| 10 | + |
| 11 | +import DistBits |
| 12 | +from SocketServer import BaseRequestHandler, ForkingTCPServer |
| 13 | + |
| 14 | +class CompressorHandler(BaseRequestHandler): |
| 15 | + def debug(self, level, text): |
| 16 | + print text |
| 17 | + |
| 18 | + def setup(self): |
| 19 | + self.connection = DistBits.Connection(self.request) |
| 20 | + self.version = None |
| 21 | + self.algo = None |
| 22 | + |
| 23 | + def handle(self): |
| 24 | + self.debug(2, "Opened connection") |
| 25 | + |
| 26 | + handlers = { |
| 27 | + "COMP": self.handleComp, |
| 28 | + "ALGO": self.handleAlgo, |
| 29 | + "HUGE": self.handleHuge, |
| 30 | + "CLOS": self.handleClos } |
| 31 | + (atom, data) = self.connection.receive() |
| 32 | + while atom: |
| 33 | + if atom: |
| 34 | + self.debug(3, "Received %s atom, %d bytes." % (atom, len(data))) |
| 35 | + assert atom in handlers |
| 36 | + handlers[atom](data) |
| 37 | + else: |
| 38 | + self.debug(3, "End of connection.") |
| 39 | + (atom, data) = self.connection.receive() |
| 40 | + |
| 41 | + def handleComp(self, data): |
| 42 | + assert self.version is None |
| 43 | + assert len(data) == 4 |
| 44 | + self.version = struct.unpack(">l", data)[0] |
| 45 | + assert self.version == 1 |
| 46 | + |
| 47 | + def handleAlgo(self, data): |
| 48 | + assert self.version is not None |
| 49 | + assert data == "bzip2" |
| 50 | + self.algo = data |
| 51 | + |
| 52 | + def handleHuge(self, data): |
| 53 | + assert self.version is not None |
| 54 | + self.connection.send("SMAL", self.compress(data)) |
| 55 | + |
| 56 | + def handleClos(self, data): |
| 57 | + self.connection.close() |
| 58 | + |
| 59 | + def compress(self, data): |
| 60 | + assert self.algo == "bzip2" |
| 61 | + return bz2.compress(data) |
| 62 | + |
if __name__ == "__main__":
    # Listen on all interfaces; 12345 matches the dbzip2 client's
    # default port, forking one child process per connection.
    port = 12345
    server = ForkingTCPServer(("", port), CompressorHandler)
    server.serve_forever()
Property changes on: trunk/dbzip2/dbzip2d |
___________________________________________________________________ |
Added: svn:executable |
1 | 67 | + * |