r14231 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: < r14230 | r14231 | r14232 >
Date: 22:53, 14 May 2006
Author: brion
Status: old
Tags:
Comment:
Move to top level; tools subdirectory is silly and just makes URLs longer
Modified paths:
  • /trunk/dbzip2 (added) (history)
  • /trunk/tools/dbzip2 (deleted) (history)

Diff

Index: trunk/dbzip2/dbzip2
@@ -0,0 +1,309 @@
+#!/usr/bin/python
+
+# In-progress proof of concept for distributed parallel bzip2 compression.
+# Breaks the incoming data stream into blocks and sends them out for
+# separate compression on multiple threads.
+#
+# Currently runs bzip2 chunks in local threads; will try moving to
+# remotable network daemons, distcc-style. It might or might not be
+# worth it depending on the overhead of slinging around all the data.
+#
+# Like pbzip2, the output files will work with the regular 'bzip2' tool
+# but may not work with other programs, because each chunk is its own
+# separate stream. Other tools tend to stop at the first stream's end.
+# It might be possible to combine the streams in some way; should
+# investigate.
+#
+# Brion Vibber <brion@pobox.com>
+# 2006-05-12
+
+# TODO:
+# Selectable block size
+# Handle remote failures gracefully
+# Merge blocks into a single, more compatible stream
+# Accept file input/output, behaving like bzip2
+
+import bz2
+import getopt
+import random
+import socket
+import struct
+import sys
+import thread
+import time
+
+from thread import start_new_thread
+
+import DistBits
+
+class Compressor(object):
+    def __init__(self, args):
+        self.inputStream = sys.stdin
+        self.outputStream = sys.stdout
+
+        self.blockSize = 900000  # 900k default block size
+        self.threads = 1         # Number of local threads to start.
+        self.remotes = []
+        self.verbosity = 0
+
+        self.blocksRead = 0
+        self.blocksCompressed = 0
+        self.blocksWritten = 0
+        self.bytesRead = 0L
+        self.bytesWritten = 0L
+
+        self.compressors = []
+        self.inputQueue = []   # blocks to be compressed
+        self.outputQueue = []  # buffers to be written
+
+        self.done = False
+        self.threadLock = thread.allocate_lock()
+
+        self.processArgs(args)
+
+    def processArgs(self, args):
+        (options, remainder) = getopt.getopt(args, "vp:r:")
+        for (opt, val) in options:
+            if opt == "-v":
+                self.verbosity += 1
+            elif opt == "-p":
+                self.threads = int(val)
+            elif opt == "-r":
+                self.remotes.append(self.splitHost(val))
+
+    def splitHost(self, val):
+        if ":" in val:
+            (host, port) = val.split(":")
+            return (host, int(port))
+        else:
+            return (val, 12345)
+
+    def debug(self, level, text):
+        if self.verbosity >= level:
+            sys.stderr.write(text + "\n")
+
+    def run(self):
+        """Start up the threads and goooo!"""
+        for i in range(0, self.threads):
+            self.compressors.append(LocalCompressor())
+        for addr in self.remotes:
+            self.compressors.append(RemoteCompressor(addr))
+        assert len(self.compressors) >= 1
+
+        start_new_thread(self.readerThread, ())
+
+        for compressor in self.compressors:
+            start_new_thread(self.compressorThread, (compressor,))
+
+        self.writerThread()
+
+    def sleep(self):
+        """Wait a short time when out of data."""
+        time.sleep(0.01)
+
+    def lock(self):
+        self.threadLock.acquire()
+        assert self.threadLock.locked()
+
+    def unlock(self):
+        self.threadLock.release()
+
+    def readerThread(self):
+        """Producer thread: run through the file handing out blocks."""
+        self.debug(2, "readerThread: starting!")
+        block = self.nextBlock()
+        while block:
+            while not self.ready():
+                self.debug(4, "readerThread: full at %d; waiting" % len(self.inputQueue))
+                self.sleep()
+
+            self.lock()
+            self.blocksRead += 1
+            self.debug(2, "readerThread: dispatching block %d" % self.blocksRead)
+            self.dispatch(block)
+            self.unlock()
+
+            block = self.nextBlock()
+        self.done = True
+        self.debug(2, "readerThread: done; read %d blocks" % self.blocksRead)
+
+    def nextBlock(self):
+        buffer = self.inputStream.read(self.blockSize)
+        self.bytesRead += len(buffer)
+        self.debug(3, "nextBlock: %d" % len(buffer))
+        return buffer
+
+    def ready(self):
+        """Check if we have some free compressors. No sense filling up RAM."""
+        return len(self.inputQueue) < len(self.compressors)
+
+
+    # Queue management
+
+    def dispatch(self, block):
+        """Queue a block of data for remote compression."""
+        assert self.threadLock.locked()
+        buffer = QueuedBuffer(self.blocksRead, block)
+        self.inputQueue.append(buffer)   # To the compressor threads
+        self.outputQueue.append(buffer)  # To the writer thread, in order!
+
+    def dequeueInput(self):
+        """Fetch the next available block for compression."""
+        assert self.threadLock.locked()
+        if len(self.inputQueue):
+            return self.inputQueue.pop(0)
+        else:
+            return None
+
+    def dequeueOutput(self):
+        """Fetch the next completed block for writing."""
+        assert self.threadLock.locked()
+        if len(self.outputQueue) and self.outputQueue[0].ready():
+            return self.outputQueue.pop(0)
+        else:
+            return None
+
+
+    def writerThread(self):
+        """Consumer thread: as we receive compressed blocks from the
+        distributed compressors, write them to the output file.
+        Currently only writes blocks in order."""
+        self.debug(2, "writerThread: starting")
+        startTime = time.time()
+        while not (self.done and self.blocksWritten == self.blocksRead):
+            self.lock()
+            buffer = self.dequeueOutput()
+            self.unlock()
+
+            if buffer:
+                self.debug(4, "writerThread: writing buffer")
+                self.writeBuffer(buffer)
+            else:
+                self.debug(4, "writerThread: sleeping")
+                self.sleep()
+
+        delta = time.time() - startTime
+        megabyte = 1024.0 * 1024.0
+        rateIn = (float(self.bytesRead) / megabyte) / delta
+        rateOut = (float(self.bytesWritten) / megabyte) / delta
+        self.debug(1, "Wrote %d blocks in %0.1f seconds (%0.3f MB/s in, %0.3f MB/s out)" % (
+            self.blocksWritten, delta, rateIn, rateOut))
+
+    def writeBuffer(self, buffer):
+        """Write a buffer to the file. Currently requires that buffers
+        be processed in streaming order."""
+        assert buffer.output is not None
+
+        self.blocksWritten += 1
+        self.bytesWritten += len(buffer.output)
+        self.debug(2, "writeBuffer: writing block %d (%d blocks, %d bytes)" %
+            (buffer.index, self.blocksWritten, self.bytesWritten))
+
+        assert buffer.index == self.blocksWritten
+        self.outputStream.write(buffer.output)
+
+
+    def compressorThread(self, compressor):
+        """Worker thread: send blocks to a compressor and collect the results."""
+        self.debug(3, "compressorThread: Started")
+        blocksCompressed = 0
+        bytesRead = 0L
+        bytesWritten = 0L
+        startTime = time.time()
+
+        while not (self.done and self.blocksCompressed == self.blocksRead):
+            self.lock()
+            buffer = self.dequeueInput()
+            self.unlock()
+            if buffer:
+                self.debug(4, "compressorThread: compressing")
+                data = compressor.compress(buffer.input)
+
+                self.lock()
+                buffer.set(data)
+
+                self.blocksCompressed += 1
+                blocksCompressed += 1
+                bytesRead += len(buffer.input)
+                bytesWritten += len(buffer.output)
+
+                self.debug(4, "compressorThread: compressed %d blocks" % self.blocksCompressed)
+                self.unlock()
+            else:
+                self.debug(4, "compressorThread: no input, sleeping")
+                self.sleep()
+        compressor.close()
+
+        delta = time.time() - startTime
+        megabyte = 1024.0 * 1024.0
+        rateIn = (float(bytesRead) / megabyte) / delta
+        rateOut = (float(bytesWritten) / megabyte) / delta
+        self.debug(1, "%s: processed %d blocks in %0.1f seconds (%0.3f MB/s in, %0.3f MB/s out)" % (
+            compressor, blocksCompressed, delta, rateIn, rateOut))
+
+
+class QueuedBuffer(object):
+    """Placeholder for received compressed buffer items."""
+
+    def __init__(self, index, input):
+        """Initialize an empty placeholder, no data yet."""
+        self.index = index
+        self.input = input
+        self.output = None
+
+    def ready(self):
+        return self.output is not None
+
+    def set(self, data):
+        """Store data and declare that we're ready to be flushed out."""
+        assert self.output is None
+        assert data is not None
+        self.output = data
+
+
+class LocalCompressor(object):
+    """For initial testing, we just compress locally."""
+
+    def algo(self, algo):
+        assert algo == "bzip2"
+
+    def compress(self, block):
+        return bz2.compress(block)
+
+    def close(self):
+        pass
+
+    def __str__(self):
+        return "local thread"
+
+class RemoteCompressor(object):
+    def __init__(self, address):
+        """Address is a (host, port) tuple."""
+        self.address = address
+        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.socket.connect(address)
+        self.connection = DistBits.Connection(self.socket)
+        self.connection.send("COMP", struct.pack(">l", 1))
+        self.connection.send("ALGO", "bzip2")
+
+    def compress(self, data):
+        self.connection.send("HUGE", data)
+        (atom, retdata) = self.connection.receive()
+        if atom == "SMAL":
+            return retdata
+        elif atom == "EROR":
+            raise Exception(retdata)
+        else:
+            raise Exception("Unknown return atom type")
+
+    def close(self):
+        self.connection.send("CLOS")
+        self.connection.close()
+        self.socket.close()
+        self.connection = None
+
+    def __str__(self):
+        return self.address[0] + ":" + str(self.address[1])
+
+if __name__ == "__main__":
+    compressor = Compressor(sys.argv[1:])
+    compressor.run()
Property changes on: trunk/dbzip2/dbzip2
___________________________________________________________________
Added: svn:executable
   + *
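
The multi-stream layout described in the header comment is easy to demonstrate:
each block becomes a complete, self-contained bzip2 stream, and the streams are
simply concatenated. A minimal sketch (illustration only, not part of this
commit; Python 2 like the rest of the code) of compressing blocks this way and
then walking the concatenated streams back out:

    import bz2

    blocks = ["hello " * 1000, "world " * 1000]

    # Each block becomes its own complete bzip2 stream, as dbzip2 does.
    compressed = "".join([bz2.compress(b) for b in blocks])

    # The one-shot bz2.decompress() only understands a single stream (the
    # limitation other readers hit), so recover the data by walking the
    # streams one at a time via unused_data.
    output = []
    remaining = compressed
    while remaining:
        d = bz2.BZ2Decompressor()
        output.append(d.decompress(remaining))
        remaining = d.unused_data
    assert "".join(output) == "".join(blocks)
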
Index: trunk/dbzip2/DistBits.py
@@ -0,0 +1,78 @@
+import socket
+import struct
+
+#
+# Packet format:
+# [atom] 4-byte ASCII token identifier
+# [len ] 32-bit big-endian integer with the size of the remaining data.
+#        Must be < 2 GB.
+# [data] variable length, depending on len; if zero, not present.
+#
+# Client sends:
+# COMP <version>
+#   Requests remote compression. Version is a 32-bit big-endian integer
+#   and should be 1. On an unsupported version the server should drop
+#   the connection or return an error.
+# ALGO <algorithm>
+#   "bzip2": Create a full bzip2 stream. Use the data size as block size.
+# HUGE <data>
+#   Uncompressed input data; always the last packet. After this, wait
+#   for the response. You may issue multiple such requests as long as
+#   the connection remains open.
+# CLOS <no data>
+#   Close the connection. (Optional?)
+#
+# Server sends back one of:
+# SMAL <data>
+#   Compressed output data.
+# -or-
+# EROR <error string>
+#   Some error condition which can be reported gracefully.
+#
+
+class Connection(object):
+    def __init__(self, sock):
+        self.stream = sock.makefile("rw")
+
+    def close(self):
+        self.stream.close()
+        self.stream = None
+
+    def send(self, atom, data=""):
+        assert len(atom) == 4
+
+        header = struct.pack(">4sl", atom, len(data))
+        assert len(header) == 8
+
+        self.stream.write(header)
+        if len(data):
+            self.stream.write(data)
+        self.stream.flush()
+
+    def receive(self):
+        header = self.stream.read(8)
+        if header == "":
+            # Connection closed
+            return (None, None)
+
+        assert len(header) == 8
+
+        (atom, size) = struct.unpack(">4sl", header)
+        assert len(atom) == 4
+        assert size >= 0
+
+        if size > 0:
+            data = self.stream.read(size)
+        else:
+            data = ""
+        assert len(data) == size
+
+        return (atom, data)
+
+    def isOpen(self):
+        return self.stream is not None
Property changes on: trunk/dbzip2/DistBits.py
___________________________________________________________________
Added: svn:eol-style
   + native
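
On the wire, each packet is just this 8-byte header packed with struct,
followed by the payload. A quick sketch (illustration only, not part of the
commit) of framing and unframing a COMP packet for protocol version 1:

    import struct

    atom = "COMP"
    payload = struct.pack(">l", 1)                    # 32-bit big-endian version
    header = struct.pack(">4sl", atom, len(payload))  # 4-byte atom + 4-byte length

    packet = header + payload
    assert len(header) == 8

    # The receiving side reverses it:
    (ratom, size) = struct.unpack(">4sl", packet[0:8])
    assert (ratom, size) == ("COMP", 4)
    assert packet[8:8 + size] == payload
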
Index: trunk/dbzip2/README
@@ -0,0 +1,55 @@
+Compression with bzip2 is hideously slow. This doesn't really become apparent
+until you've got a 350 gigabyte XML data dump to compress and it can
+literally take days.
+
+I came across a "Parallel BZIP2" utility on the net, which claims linear
+speedup of bzip2 compression on SMP systems. In a quick test on my dual-core
+G5 box I found it gave a 52% speedup over the straight single-threaded bzip2
+when set for two processors, so there might be some potential here.
+
+This pbzip2 utility exploits two properties of bzip2:
+# Compression is done on largish blocks, making it very convenient to pass
+those blocks out to threads for simultaneous processing.
+# When uncompressing, the bzip2 utility will combine multiple concatenated
+compressed streams into a single output file.
+
+While the bzip2 scheme works on blocks, usually the data stream has one
+header marker and runs through to the end. pbzip2 just calls into libbz2's
+high-level functions to create a complete self-contained compressed stream
+for each block. This adds a tiny overhead in extra header bytes, but it's
+pretty negligible.
+
+The main downside is that other programs capable of reading bzip2 streams
+might not read the whole file; both PHP's compress.bzip2:// streams and
+the Apache Java thingy I'm using in mwdumper stop at the end of the first
+block.
+
+That can probably be worked around, though, if this works well enough.
+It might be possible to stitch the blocks back together into a single
+stream, too, but I'll have to dive deeper into bzip2 to figure that out.
+
+I've hacked together this Python prototype, which replicates the basic
+compression functionality of pbzip2 using local threads; performance
+looks pretty similar for two threads.
+
+Next I'll try a simple TCP interface to run the compression on multiple
+machines on the LAN, and do some tests to see if it's feasible to speed
+up streaming block compression with several helper boxes.
+
+If this is promising, I'll also want to look at 7zip and see if something
+similar can be done there.
+
+-- brion vibber (brion @ pobox.com / brion @ wikimedia.org)
+
+
+== References ==
+
+Parallel BZIP2:
+* http://compression.ca/pbzip2/
+
+Copy of the distcc protocol documentation, for inspiration/nicking:
+* http://www.opensource.apple.com/darwinsource/DevToolsNov2005/distcc-39/distcc_dist/doc/protocol-1.txt
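
For the record, the workflow these scripts imply (hypothetical invocation;
the helper host names are made up): run dbzip2d on each helper box, then
point dbzip2 at them with -r, reading stdin and writing stdout like bzip2:

    # On each helper machine (listens on TCP port 12345 by default):
    ./dbzip2d

    # On the box with the dump: two local threads plus two remote helpers:
    ./dbzip2 -v -p 2 -r helper1 -r helper2:12345 < dump.xml > dump.xml.bz2
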
Index: trunk/dbzip2/dbzip2d
@@ -0,0 +1,65 @@
+#!/usr/bin/python
+
+import bz2
+import getopt
+import struct
+import sys
+import thread
+import time
+
+import DistBits
+from SocketServer import BaseRequestHandler, ForkingTCPServer
+
+class CompressorHandler(BaseRequestHandler):
+    def debug(self, level, text):
+        print text
+
+    def setup(self):
+        self.connection = DistBits.Connection(self.request)
+        self.version = None
+        self.algo = None
+
+    def handle(self):
+        self.debug(2, "Opened connection")
+
+        handlers = {
+            "COMP": self.handleComp,
+            "ALGO": self.handleAlgo,
+            "HUGE": self.handleHuge,
+            "CLOS": self.handleClos }
+        (atom, data) = self.connection.receive()
+        while atom:
+            self.debug(3, "Received %s atom, %d bytes." % (atom, len(data)))
+            assert atom in handlers
+            handlers[atom](data)
+            (atom, data) = self.connection.receive()
+        self.debug(3, "End of connection.")
+
+    def handleComp(self, data):
+        assert self.version is None
+        assert len(data) == 4
+        self.version = struct.unpack(">l", data)[0]
+        assert self.version == 1
+
+    def handleAlgo(self, data):
+        assert self.version is not None
+        assert data == "bzip2"
+        self.algo = data
+
+    def handleHuge(self, data):
+        assert self.version is not None
+        self.connection.send("SMAL", self.compress(data))
+
+    def handleClos(self, data):
+        self.connection.close()
+
+    def compress(self, data):
+        assert self.algo == "bzip2"
+        return bz2.compress(data)
+
+if __name__ == "__main__":
+    port = 12345
+    server = ForkingTCPServer(("", port), CompressorHandler)
+    server.serve_forever()
Property changes on: trunk/dbzip2/dbzip2d
___________________________________________________________________
Added: svn:executable
   + *
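
To exercise the daemon by hand, a minimal client round trip using the
DistBits module from this commit might look like this (sketch only; assumes
a daemon is already running on localhost:12345):

    import bz2
    import socket
    import struct

    import DistBits

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(("localhost", 12345))
    conn = DistBits.Connection(sock)

    # Handshake, then one compression request, per the packet format
    # documented in DistBits.py.
    conn.send("COMP", struct.pack(">l", 1))
    conn.send("ALGO", "bzip2")
    conn.send("HUGE", "some uncompressed data " * 1000)

    (atom, data) = conn.receive()
    assert atom == "SMAL"
    assert bz2.decompress(data) == "some uncompressed data " * 1000

    conn.send("CLOS")
    sock.close()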
