Index: trunk/tools/dbzip2/dbzip2 |
— | — | @@ -37,13 +37,18 @@ |
38 | 38 | self.inputStream = sys.stdin |
39 | 39 | self.outputStream = sys.stdout |
40 | 40 | self.blockSize = 900000 # 900k default blocksize |
41 | | - self.queue = [] |
42 | | - self.done = False |
43 | 41 | self.threads = 1 |
| 42 | + self.verbosity = 0 |
| 43 | + |
44 | 44 | self.readCount = 0 |
45 | 45 | self.writeCount = 0 |
46 | | - self.verbosity = 0 |
| 46 | + self.bytesRead = 0L |
| 47 | + self.bytesWritten = 0L |
| 48 | + |
| 49 | + self.queue = [] |
| 50 | + self.done = False |
47 | 51 | self.threadLock = thread.allocate_lock() |
| 52 | + |
48 | 53 | self.processArgs(args) |
49 | 54 | |
50 | 55 | def processArgs(self, args): |
— | — | @@ -75,7 +80,7 @@ |
76 | 81 | |
77 | 82 | def readerThread(self): |
78 | 83 | """Producer thread: run through the file handing out blocks.""" |
79 | | - self.debug(1, "readerThread: starting!") |
| 84 | + self.debug(2, "readerThread: starting!") |
80 | 85 | block = self.nextBlock() |
81 | 86 | while block: |
82 | 87 | while not self.ready(): |
— | — | @@ -84,17 +89,17 @@ |
85 | 90 | |
86 | 91 | self.lock() |
87 | 92 | self.readCount += 1 |
88 | | - self.debug(1, "readerThread: dispatching block %d" % self.readCount) |
| 93 | + self.debug(2, "readerThread: dispatching block %d" % self.readCount) |
89 | 94 | self.dispatch(block) |
90 | 95 | self.unlock() |
91 | 96 | |
92 | 97 | block = self.nextBlock() |
93 | 98 | self.done = True |
94 | | - self.debug(1, "readerThread: done; read %d blocks" % self.readCount) |
95 | | - # FIXME: we're not _totally_ done until it's processed. |
| 99 | + self.debug(2, "readerThread: done; read %d blocks" % self.readCount) |
96 | 100 | |
97 | 101 | def nextBlock(self): |
98 | 102 | buffer = self.inputStream.read(self.blockSize) |
| 103 | + self.bytesRead += len(buffer) |
99 | 104 | self.debug(3, "nextBlock: %d" % len(buffer)) |
100 | 105 | return buffer |
101 | 106 | |
— | — | @@ -114,7 +119,8 @@ |
115 | 120 | """Consumer thread: as we receive compressed blocks from the |
116 | 121 | distributed compressors, write them to the output file. |
117 | 122 | Currently only writes blocks in order.""" |
118 | | - self.debug(1, "writerThread: starting") |
| 123 | + self.debug(2, "writerThread: starting") |
| 124 | + startTime = time.time() |
119 | 125 | while not (self.done and self.writeCount == self.readCount): |
120 | 126 | self.lock() |
121 | 127 | buffer = self.dequeue() |
— | — | @@ -122,12 +128,17 @@ |
123 | 129 | |
124 | 130 | if buffer: |
125 | 131 | self.writeCount += 1 |
126 | | - self.debug(1, "writerThread: writing block %d" % self.writeCount) |
| 132 | + self.debug(2, "writerThread: writing block %d" % self.writeCount) |
127 | 133 | self.writeBuffer(buffer) |
128 | 134 | else: |
129 | | - self.debug(3, "writerThread: sleeping") |
| 135 | + self.debug(4, "writerThread: sleeping") |
130 | 136 | self.sleep() |
131 | | - self.debug(1, "writerThread: done; wrote %d blocks" % self.writeCount) |
| 137 | + delta = time.time() - startTime |
| 138 | + megabyte = 1024.0 * 1024.0 |
| 139 | + rateIn = (float(self.bytesRead) / megabyte) / delta |
| 140 | + rateOut = (float(self.bytesWritten) / megabyte) / delta |
| 141 | + self.debug(1, "Wrote %d blocks in %0.1f seconds (%0.3f MB/s in, %0.3f MB/s out)" % ( |
| 142 | + self.writeCount, delta, rateIn, rateOut)) |
132 | 143 | |
133 | 144 | def dequeue(self): |
134 | 145 | """Fetch the next completed block for writing.""" |
— | — | @@ -140,9 +151,10 @@ |
141 | 152 | def writeBuffer(self, buffer): |
142 | 153 | """Write a buffer to the file. Currently requires that buffers |
143 | 154 | be processed in streaming order.""" |
144 | | - self.debug(1, "writeBuffer: writing block %d" % buffer.index) |
| 155 | + self.debug(3, "writeBuffer: writing block %d" % buffer.index) |
145 | 156 | assert buffer.contents is not None |
146 | 157 | assert buffer.index == self.writeCount |
| 158 | + self.bytesWritten += len(buffer.contents) |
147 | 159 | self.outputStream.write(buffer.contents) |
148 | 160 | |
149 | 161 | |