Index: trunk/tools/dbzip2/dbzip2 |
— | — | @@ -18,8 +18,6 @@ |
19 | 19 | # 2006-05-12 |
20 | 20 | |
21 | 21 | # TODO: |
22 | | -# Use a thread pool? |
23 | | -# Selectable remote threads |
24 | 22 | # Selectable block size |
25 | 23 | # Handle remote failures gracefully |
26 | 24 | # Merge blocks to single, more compatible stream |
— | — | @@ -34,24 +32,30 @@ |
35 | 33 | import thread |
36 | 34 | import time |
37 | 35 | |
| 36 | +from thread import start_new_thread |
| 37 | + |
38 | 38 | import DistBits |
39 | 39 | |
40 | 40 | class Compressor(object): |
41 | 41 | def __init__(self, args): |
42 | 42 | self.inputStream = sys.stdin |
43 | 43 | self.outputStream = sys.stdout |
| 44 | + |
44 | 45 | self.blockSize = 900000 # 900k default blocksize |
45 | | - self.threads = 1 |
| 46 | + self.threads = 1 # Number of local threads to start. |
| 47 | + self.remotes = [] |
46 | 48 | self.verbosity = 0 |
47 | 49 | |
48 | | - self.readCount = 0 |
49 | | - self.writeCount = 0 |
| 50 | + self.blocksRead = 0 |
| 51 | + self.blocksCompressed = 0 |
| 52 | + self.blocksWritten = 0 |
50 | 53 | self.bytesRead = 0L |
51 | 54 | self.bytesWritten = 0L |
52 | 55 | |
53 | | - self.remotes = [] |
| 56 | + self.compressors = [] |
| 57 | + self.inputQueue = [] # blocks to be compressed |
| 58 | + self.outputQueue = [] # buffers to be written |
54 | 59 | |
55 | | - self.queue = [] |
56 | 60 | self.done = False |
57 | 61 | self.threadLock = thread.allocate_lock() |
58 | 62 | |
— | — | @@ -79,7 +83,18 @@ |
80 | 84 | sys.stderr.write(text + "\n") |
81 | 85 | |
82 | 86 | def run(self): |
83 | | - thread.start_new_thread(self.readerThread, ()) |
| 87 | + """Start up the threads and goooo!""" |
| 88 | + for i in range(0, self.threads): |
| 89 | + self.compressors.append(LocalCompressor()) |
| 90 | + for addr in self.remotes: |
| 91 | + self.compressors.append(RemoteCompressor(addr)) |
| 92 | + assert len(self.compressors) >= 1 |
| 93 | + |
| 94 | + start_new_thread(self.readerThread, ()) |
| 95 | + |
| 96 | + for compressor in self.compressors: |
| 97 | + start_new_thread(self.compressorThread, (compressor,)) |
| 98 | + |
84 | 99 | self.writerThread() |
85 | 100 | |
86 | 101 | def sleep(self): |
— | — | @@ -99,18 +114,18 @@ |
100 | 115 | block = self.nextBlock() |
101 | 116 | while block: |
102 | 117 | while not self.ready(): |
103 | | - self.debug(4, "readerThread: full at %d; waiting" % len(self.queue)) |
| 118 | + self.debug(4, "readerThread: full at %d; waiting" % len(self.inputQueue)) |
104 | 119 | self.sleep() |
105 | 120 | |
106 | 121 | self.lock() |
107 | | - self.readCount += 1 |
108 | | - self.debug(2, "readerThread: dispatching block %d" % self.readCount) |
| 122 | + self.blocksRead += 1 |
| 123 | + self.debug(2, "readerThread: dispatching block %d" % self.blocksRead) |
109 | 124 | self.dispatch(block) |
110 | 125 | self.unlock() |
111 | 126 | |
112 | 127 | block = self.nextBlock() |
113 | 128 | self.done = True |
114 | | - self.debug(2, "readerThread: done; read %d blocks" % self.readCount) |
| 129 | + self.debug(2, "readerThread: done; read %d blocks" % self.blocksRead) |
115 | 130 | |
116 | 131 | def nextBlock(self): |
117 | 132 | buffer = self.inputStream.read(self.blockSize) |
— | — | @@ -119,94 +134,130 @@ |
120 | 135 | return buffer |
121 | 136 | |
122 | 137 | def ready(self): |
123 | | - """Check if we've gone over the limit of waiting connections.""" |
124 | | - return len(self.queue) < self.threads |
| 138 | + """Check if we have some free compressors. No sense filling up RAM.""" |
| 139 | + return len(self.inputQueue) < len(self.compressors) |
| 140 | + |
| 141 | + |
| 142 | + # Queue management |
125 | 143 | |
126 | 144 | def dispatch(self, block): |
127 | 145 | """Queue a block of data for remote compression.""" |
128 | 146 | assert self.threadLock.locked() |
129 | | - buffer = QueuedBuffer(self.readCount) |
130 | | - self.queue.append(buffer) |
131 | | - thread.start_new_thread(self.remoteThread, (block, buffer)) |
| 147 | + buffer = QueuedBuffer(self.blocksRead, block) |
| 148 | + self.inputQueue.append(buffer) # To the compressor threads |
| 149 | + self.outputQueue.append(buffer) # To the writer thread, in order! |
132 | 150 | |
| 151 | + def dequeueInput(self): |
| 152 | + """Fetch the next available block for compression.""" |
| 153 | + assert self.threadLock.locked() |
| 154 | + if len(self.inputQueue): |
| 155 | + return self.inputQueue.pop(0) |
| 156 | + else: |
| 157 | + return None |
133 | 158 | |
| 159 | + def dequeueOutput(self): |
| 160 | + """Fetch the next completed block for writing.""" |
| 161 | + assert self.threadLock.locked() |
| 162 | + if len(self.outputQueue) and self.outputQueue[0].ready(): |
| 163 | + return self.outputQueue.pop(0) |
| 164 | + else: |
| 165 | + return None |
| 166 | + |
| 167 | + |
134 | 168 | def writerThread(self): |
135 | 169 | """Consumer thread: as we receive compressed blocks from the |
136 | 170 | distributed compressors, write them to the output file. |
137 | 171 | Currently only writes blocks in order.""" |
138 | 172 | self.debug(2, "writerThread: starting") |
139 | 173 | startTime = time.time() |
140 | | - while not (self.done and self.writeCount == self.readCount): |
| 174 | + while not (self.done and self.blocksWritten == self.blocksRead): |
141 | 175 | self.lock() |
142 | | - buffer = self.dequeue() |
| 176 | + buffer = self.dequeueOutput() |
143 | 177 | self.unlock() |
144 | 178 | |
145 | 179 | if buffer: |
146 | | - self.writeCount += 1 |
147 | | - self.debug(2, "writerThread: writing block %d" % self.writeCount) |
| 180 | + self.debug(4, "writerThread: wtf") |
148 | 181 | self.writeBuffer(buffer) |
149 | 182 | else: |
150 | 183 | self.debug(4, "writerThread: sleeping") |
151 | 184 | self.sleep() |
| 185 | + |
152 | 186 | delta = time.time() - startTime |
153 | 187 | megabyte = 1024.0 * 1024.0 |
154 | 188 | rateIn = (float(self.bytesRead) / megabyte) / delta |
155 | 189 | rateOut = (float(self.bytesWritten) / megabyte) / delta |
156 | 190 | self.debug(1, "Wrote %d blocks in %0.1f seconds (%0.3f MB/s in, %0.3f MB/s out)" % ( |
157 | | - self.writeCount, delta, rateIn, rateOut)) |
| 191 | + self.blocksWritten, delta, rateIn, rateOut)) |
158 | 192 | |
159 | | - def dequeue(self): |
160 | | - """Fetch the next completed block for writing.""" |
161 | | - assert self.threadLock.locked() |
162 | | - if len(self.queue) and self.queue[0].ready(): |
163 | | - return self.queue.pop(0) |
164 | | - else: |
165 | | - return None |
166 | | - |
167 | 193 | def writeBuffer(self, buffer): |
168 | 194 | """Write a buffer to the file. Currently requires that buffers |
169 | 195 | be processed in streaming order.""" |
170 | | - self.debug(3, "writeBuffer: writing block %d" % buffer.index) |
171 | | - assert buffer.contents is not None |
172 | | - assert buffer.index == self.writeCount |
173 | | - self.bytesWritten += len(buffer.contents) |
174 | | - self.outputStream.write(buffer.contents) |
| 196 | + self.blocksWritten += 1 |
| 197 | + self.bytesWritten += len(buffer.output) |
| 198 | + self.debug(2, "writeBuffer: writing block %d (%d blocks, %d bytes)" % |
| 199 | + (buffer.index, self.blocksWritten, self.bytesWritten)) |
| 200 | + |
| 201 | + assert buffer.output is not None |
| 202 | + assert buffer.index == self.blocksWritten |
| 203 | + self.outputStream.write(buffer.output) |
175 | 204 | |
176 | 205 | |
177 | | - def remoteThread(self, block, buffer): |
| 206 | + def compressorThread(self, compressor): |
178 | 207 | """Worker thread: send a block to a foreign server and receive data.""" |
179 | | - compressor = self.pickCompressor() |
180 | | - data = compressor.compress(block) |
181 | | - self.debug(4, "remoteThread: got data!") |
182 | | - buffer.set(data) |
183 | | - # After it's been fulfilled, the writer thread will dequeue and write it. |
184 | | - |
185 | | - def pickCompressor(self): |
186 | | - numRemotes = len(self.remotes) |
187 | | - if numRemotes: |
188 | | - index = random.randint(0, numRemotes - 1) |
189 | | - return RemoteCompressor(self.remotes[index]) |
190 | | - else: |
191 | | - # Need to take a local... |
192 | | - return LocalCompressor() |
| 208 | + self.debug(3, "compressorThread: Started") |
| 209 | + blocksCompressed = 0 |
| 210 | + bytesRead = 0L |
| 211 | + bytesWritten = 0L |
| 212 | + startTime = time.time() |
| 213 | + |
| 214 | + while not (self.done and self.blocksCompressed == self.blocksRead): |
| 215 | + self.lock() |
| 216 | + buffer = self.dequeueInput() |
| 217 | + self.unlock() |
| 218 | + if buffer: |
| 219 | + self.debug(4, "compressorThread: compressing") |
| 220 | + data = compressor.compress(buffer.input) |
| 221 | + |
| 222 | + self.lock() |
| 223 | + buffer.set(data) |
| 224 | + |
| 225 | + self.blocksCompressed += 1 |
| 226 | + blocksCompressed += 1 |
| 227 | + bytesRead += len(buffer.input) |
| 228 | + bytesWritten += len(buffer.output) |
| 229 | + |
| 230 | + self.debug(4, "compressorThread: compressed %d blocks" % self.blocksCompressed) |
| 231 | + self.unlock() |
| 232 | + else: |
| 233 | + self.debug(4, "compressorThread: no input, sleeping") |
| 234 | + self.sleep() |
| 235 | + compressor.close() |
| 236 | + |
| 237 | + delta = time.time() - startTime |
| 238 | + megabyte = 1024.0 * 1024.0 |
| 239 | + rateIn = (float(bytesRead) / megabyte) / delta |
| 240 | + rateOut = (float(bytesWritten) / megabyte) / delta |
| 241 | + self.debug(1, "%s: processed %d blocks in %0.1f seconds (%0.3f MB/s in, %0.3f MB/s out)" % ( |
| 242 | + compressor, blocksCompressed, delta, rateIn, rateOut)) |
193 | 243 | |
194 | 244 | |
class QueuedBuffer(object):
    """One block in flight: its 1-based position in the stream, the raw
    input read for it, and (once a compressor finishes) the compressed
    output."""

    def __init__(self, index, input):
        """Create a pending entry that has no compressed output yet."""
        self.index = index
        self.input = input
        self.output = None

    def ready(self):
        """True once compressed output has been stored."""
        return not (self.output is None)

    def set(self, data):
        """Store the compressed output, marking the buffer ready to be
        written. May be called only once, and only with non-None data."""
        assert self.output is None
        assert data is not None
        self.output = data
211 | 262 | |
212 | 263 | |
213 | 264 | class LocalCompressor(object): |
— | — | @@ -220,10 +271,14 @@ |
221 | 272 | |
222 | 273 | def close(self): |
223 | 274 | pass |
| 275 | + |
| 276 | + def __str__(self): |
| 277 | + return "local thread" |
224 | 278 | |
225 | 279 | class RemoteCompressor(object): |
226 | 280 | def __init__(self, address): |
227 | 281 | """Address is a (host, port) tuple.""" |
| 282 | + self.address = address |
228 | 283 | self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
229 | 284 | self.socket.connect(address) |
230 | 285 | self.connection = DistBits.Connection(self.socket) |
— | — | @@ -245,6 +300,9 @@ |
246 | 301 | self.connection.close() |
247 | 302 | self.socket.close() |
248 | 303 | self.connection = None |
| 304 | + |
| 305 | + def __str__(self): |
| 306 | + return self.address[0] + ":" + str(self.address[1]) |
249 | 307 | |
250 | 308 | if __name__ == "__main__": |
251 | 309 | compressor = Compressor(sys.argv[1:]) |