Index: trunk/tools/dbzip2/dbzip2 |
— | — | @@ -20,7 +20,6 @@ |
21 | 21 | # TODO: |
22 | 22 | # Remote compressors |
23 | 23 | # Use a thread pool? |
24 | | -# Selectable number of threads |
25 | 24 | # Selectable remote threads |
26 | 25 | # Selectable block size |
27 | 26 | # Handle remote failures gracefully |
— | — | @@ -28,30 +27,38 @@ |
29 | 28 | # Accept file input/output, behavior like bzip2 |
30 | 29 | |
31 | 30 | import bz2 |
| 31 | +import getopt |
32 | 32 | import sys |
33 | 33 | import thread |
34 | 34 | import time |
35 | 35 | |
36 | 36 | class Compressor(object): |
37 | | - def __init__(self, inputStream, outputStream): |
38 | | - self.inputStream = inputStream |
39 | | - self.outputStream = outputStream |
| 37 | + def __init__(self, args): |
| 38 | + self.inputStream = sys.stdin |
| 39 | + self.outputStream = sys.stdout |
40 | 40 | self.blockSize = 900000 # 900k default blocksize |
41 | 41 | self.queue = [] |
42 | 42 | self.done = False |
43 | | - self.maxQueued = 2 |
| 43 | + self.threads = 1 |
44 | 44 | self.readCount = 0 |
45 | 45 | self.writeCount = 0 |
46 | 46 | self.verbosity = 0 |
47 | 47 | self.threadLock = thread.allocate_lock() |
| 48 | + self.processArgs(args) |
48 | 49 | |
| 50 | + def processArgs(self, args): |
| 51 | + (options, remainder) = getopt.getopt(args, "vp:") |
| 52 | + for (opt, val) in options: |
| 53 | + if opt == "-v": |
| 54 | + self.verbosity += 1 |
| 55 | + elif opt == "-p": |
| 56 | + self.threads = int(val) |
| 57 | + |
49 | 58 | def debug(self, level, text): |
50 | 59 | if self.verbosity >= level: |
51 | 60 | sys.stderr.write(text + "\n") |
52 | 61 | |
53 | 62 | def run(self): |
54 | | - #thread.start_new_thread(self.writerThread, ()) |
55 | | - #self.readerThread() |
56 | 63 | thread.start_new_thread(self.readerThread, ()) |
57 | 64 | self.writerThread() |
58 | 65 | |
— | — | @@ -93,7 +100,7 @@ |
94 | 101 | |
95 | 102 | def ready(self): |
96 | 103 | """Check if we've gone over the limit of waiting connections.""" |
97 | | - return len(self.queue) < self.maxQueued |
| 104 | + return len(self.queue) < self.threads |
98 | 105 | |
99 | 106 | def dispatch(self, block): |
100 | 107 | """Queue a block of data for remote compression.""" |
— | — | @@ -184,5 +191,5 @@ |
185 | 192 | return bz2.compress(self.input) |
186 | 193 | |
187 | 194 | if __name__ == "__main__": |
188 | | - compressor = Compressor(sys.stdin, sys.stdout) |
| 195 | + compressor = Compressor(sys.argv[1:]) |
189 | 196 | compressor.run() |