r14215 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r14214 | r14215 | r14216 >
Date:06:12, 14 May 2006
Author:brion
Status:old
Tags:
Comment:
Rearrange dispatching to let compressor threads pull from a queue.
Possibly still some deadlock problems, but appears to mostly work.
Now reports per-compressor throughput rates with -v.
Modified paths:
  • /trunk/tools/dbzip2/dbzip2 (modified) (history)

Diff [purge]

Index: trunk/tools/dbzip2/dbzip2
@@ -18,8 +18,6 @@
1919 # 2006-05-12
2020
2121 # TODO:
22 -# Use a thread pool?
23 -# Selectable remote threads
2422 # Selectable block size
2523 # Handle remote failures gracefully
2624 # Merge blocks to single, more compatible stream
@@ -34,24 +32,30 @@
3533 import thread
3634 import time
3735
 36+from thread import start_new_thread
 37+
3838 import DistBits
3939
4040 class Compressor(object):
4141 def __init__(self, args):
4242 self.inputStream = sys.stdin
4343 self.outputStream = sys.stdout
 44+
4445 self.blockSize = 900000 # 900k default blocksize
45 - self.threads = 1
 46+ self.threads = 1 # Number of local threads to start.
 47+ self.remotes = []
4648 self.verbosity = 0
4749
48 - self.readCount = 0
49 - self.writeCount = 0
 50+ self.blocksRead = 0
 51+ self.blocksCompressed = 0
 52+ self.blocksWritten = 0
5053 self.bytesRead = 0L
5154 self.bytesWritten = 0L
5255
53 - self.remotes = []
 56+ self.compressors = []
 57+ self.inputQueue = [] # blocks to be compressed
 58+ self.outputQueue = [] # buffers to be written
5459
55 - self.queue = []
5660 self.done = False
5761 self.threadLock = thread.allocate_lock()
5862
@@ -79,7 +83,18 @@
8084 sys.stderr.write(text + "\n")
8185
8286 def run(self):
83 - thread.start_new_thread(self.readerThread, ())
 87+ """Start up the threads and goooo!"""
 88+ for i in range(0, self.threads):
 89+ self.compressors.append(LocalCompressor())
 90+ for addr in self.remotes:
 91+ self.compressors.append(RemoteCompressor(addr))
 92+ assert len(self.compressors) >= 1
 93+
 94+ start_new_thread(self.readerThread, ())
 95+
 96+ for compressor in self.compressors:
 97+ start_new_thread(self.compressorThread, (compressor,))
 98+
8499 self.writerThread()
85100
86101 def sleep(self):
@@ -99,18 +114,18 @@
100115 block = self.nextBlock()
101116 while block:
102117 while not self.ready():
103 - self.debug(4, "readerThread: full at %d; waiting" % len(self.queue))
 118+ self.debug(4, "readerThread: full at %d; waiting" % len(self.inputQueue))
104119 self.sleep()
105120
106121 self.lock()
107 - self.readCount += 1
108 - self.debug(2, "readerThread: dispatching block %d" % self.readCount)
 122+ self.blocksRead += 1
 123+ self.debug(2, "readerThread: dispatching block %d" % self.blocksRead)
109124 self.dispatch(block)
110125 self.unlock()
111126
112127 block = self.nextBlock()
113128 self.done = True
114 - self.debug(2, "readerThread: done; read %d blocks" % self.readCount)
 129+ self.debug(2, "readerThread: done; read %d blocks" % self.blocksRead)
115130
116131 def nextBlock(self):
117132 buffer = self.inputStream.read(self.blockSize)
@@ -119,94 +134,130 @@
120135 return buffer
121136
122137 def ready(self):
123 - """Check if we've gone over the limit of waiting connections."""
124 - return len(self.queue) < self.threads
 138+ """Check if we have some free compressors. No sense filling up RAM."""
 139+ return len(self.inputQueue) < len(self.compressors)
 140+
 141+
 142+ # Queue management
125143
126144 def dispatch(self, block):
127145 """Queue a block of data for remote compression."""
128146 assert self.threadLock.locked()
129 - buffer = QueuedBuffer(self.readCount)
130 - self.queue.append(buffer)
131 - thread.start_new_thread(self.remoteThread, (block, buffer))
 147+ buffer = QueuedBuffer(self.blocksRead, block)
 148+ self.inputQueue.append(buffer) # To the compressor threads
 149+ self.outputQueue.append(buffer) # To the writer thread, in order!
132150
 151+ def dequeueInput(self):
 152+ """Fetch the next available block for compression."""
 153+ assert self.threadLock.locked()
 154+ if len(self.inputQueue):
 155+ return self.inputQueue.pop(0)
 156+ else:
 157+ return None
133158
 159+ def dequeueOutput(self):
 160+ """Fetch the next completed block for writing."""
 161+ assert self.threadLock.locked()
 162+ if len(self.outputQueue) and self.outputQueue[0].ready():
 163+ return self.outputQueue.pop(0)
 164+ else:
 165+ return None
 166+
 167+
134168 def writerThread(self):
135169 """Consumer thread: as we receive compressed blocks from the
136170 distributed compressors, write them to the output file.
137171 Currently only writes blocks in order."""
138172 self.debug(2, "writerThread: starting")
139173 startTime = time.time()
140 - while not (self.done and self.writeCount == self.readCount):
 174+ while not (self.done and self.blocksWritten == self.blocksRead):
141175 self.lock()
142 - buffer = self.dequeue()
 176+ buffer = self.dequeueOutput()
143177 self.unlock()
144178
145179 if buffer:
146 - self.writeCount += 1
147 - self.debug(2, "writerThread: writing block %d" % self.writeCount)
 180+ self.debug(4, "writerThread: wtf")
148181 self.writeBuffer(buffer)
149182 else:
150183 self.debug(4, "writerThread: sleeping")
151184 self.sleep()
 185+
152186 delta = time.time() - startTime
153187 megabyte = 1024.0 * 1024.0
154188 rateIn = (float(self.bytesRead) / megabyte) / delta
155189 rateOut = (float(self.bytesWritten) / megabyte) / delta
156190 self.debug(1, "Wrote %d blocks in %0.1f seconds (%0.3f MB/s in, %0.3f MB/s out)" % (
157 - self.writeCount, delta, rateIn, rateOut))
 191+ self.blocksWritten, delta, rateIn, rateOut))
158192
159 - def dequeue(self):
160 - """Fetch the next completed block for writing."""
161 - assert self.threadLock.locked()
162 - if len(self.queue) and self.queue[0].ready():
163 - return self.queue.pop(0)
164 - else:
165 - return None
166 -
167193 def writeBuffer(self, buffer):
168194 """Write a buffer to the file. Currently requires that buffers
169195 be processed in streaming order."""
170 - self.debug(3, "writeBuffer: writing block %d" % buffer.index)
171 - assert buffer.contents is not None
172 - assert buffer.index == self.writeCount
173 - self.bytesWritten += len(buffer.contents)
174 - self.outputStream.write(buffer.contents)
 196+ self.blocksWritten += 1
 197+ self.bytesWritten += len(buffer.output)
 198+ self.debug(2, "writeBuffer: writing block %d (%d blocks, %d bytes)" %
 199+ (buffer.index, self.blocksWritten, self.bytesWritten))
 200+
 201+ assert buffer.output is not None
 202+ assert buffer.index == self.blocksWritten
 203+ self.outputStream.write(buffer.output)
175204
176205
177 - def remoteThread(self, block, buffer):
 206+ def compressorThread(self, compressor):
178207 """Worker thread: send a block to a foreign server and receive data."""
179 - compressor = self.pickCompressor()
180 - data = compressor.compress(block)
181 - self.debug(4, "remoteThread: got data!")
182 - buffer.set(data)
183 - # After it's been fulfilled, the writer thread will dequeue and write it.
184 -
185 - def pickCompressor(self):
186 - numRemotes = len(self.remotes)
187 - if numRemotes:
188 - index = random.randint(0, numRemotes - 1)
189 - return RemoteCompressor(self.remotes[index])
190 - else:
191 - # Need to take a local...
192 - return LocalCompressor()
 208+ self.debug(3, "compressorThread: Started")
 209+ blocksCompressed = 0
 210+ bytesRead = 0L
 211+ bytesWritten = 0L
 212+ startTime = time.time()
 213+
 214+ while not (self.done and self.blocksCompressed == self.blocksRead):
 215+ self.lock()
 216+ buffer = self.dequeueInput()
 217+ self.unlock()
 218+ if buffer:
 219+ self.debug(4, "compressorThread: compressing")
 220+ data = compressor.compress(buffer.input)
 221+
 222+ self.lock()
 223+ buffer.set(data)
 224+
 225+ self.blocksCompressed += 1
 226+ blocksCompressed += 1
 227+ bytesRead += len(buffer.input)
 228+ bytesWritten += len(buffer.output)
 229+
 230+ self.debug(4, "compressorThread: compressed %d blocks" % self.blocksCompressed)
 231+ self.unlock()
 232+ else:
 233+ self.debug(4, "compressorThread: no input, sleeping")
 234+ self.sleep()
 235+ compressor.close()
 236+
 237+ delta = time.time() - startTime
 238+ megabyte = 1024.0 * 1024.0
 239+ rateIn = (float(bytesRead) / megabyte) / delta
 240+ rateOut = (float(bytesWritten) / megabyte) / delta
 241+ self.debug(1, "%s: processed %d blocks in %0.1f seconds (%0.3f MB/s in, %0.3f MB/s out)" % (
 242+ compressor, blocksCompressed, delta, rateIn, rateOut))
193243
194244
195245 class QueuedBuffer(object):
196246 """Placeholder for received compressed buffer items."""
197247
198 - def __init__(self, index):
 248+ def __init__(self, index, input):
199249 """Initialize an empty placeholder, no data yet."""
200 - self.contents = None
201250 self.index = index
 251+ self.input = input
 252+ self.output = None
202253
203254 def ready(self):
204 - return self.contents is not None
 255+ return self.output is not None
205256
206257 def set(self, data):
207258 """Store data and declare that we're ready to be flushed out."""
208 - assert self.contents is None
 259+ assert self.output is None
209260 assert data is not None
210 - self.contents = data
 261+ self.output = data
211262
212263
213264 class LocalCompressor(object):
@@ -220,10 +271,14 @@
221272
222273 def close(self):
223274 pass
 275+
 276+ def __str__(self):
 277+ return "local thread"
224278
225279 class RemoteCompressor(object):
226280 def __init__(self, address):
227281 """Address is a (host, port) tuple."""
 282+ self.address = address
228283 self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
229284 self.socket.connect(address)
230285 self.connection = DistBits.Connection(self.socket)
@@ -245,6 +300,9 @@
246301 self.connection.close()
247302 self.socket.close()
248303 self.connection = None
 304+
 305+ def __str__(self):
 306+ return self.address[0] + ":" + str(self.address[1])
249307
250308 if __name__ == "__main__":
251309 compressor = Compressor(sys.argv[1:])

Status & tagging log