Index: trunk/tools/wsor/scripts/page_process.py |
— | — | @@ -1,2 +1,2 @@ |
2 | | -def process(page, out):
|
3 | | - out.put([page.getId(), page.getTitle()])
|
| 2 | +def process(dump, page):
|
| 3 | + yield (page.getId(), page.getTitle())
|
Index: trunk/tools/wsor/scripts/process_dumps.py |
— | — | @@ -1,5 +1,5 @@ |
2 | | -import sys, logging, re, types, argparse, os |
3 | | -from multiprocessing import Process, Queue, Lock, cpu_count |
| 2 | +import sys, logging, re, types, argparse, os, subprocess |
| 3 | +from multiprocessing import Process, Queue, Lock, cpu_count, Value |
4 | 4 | from Queue import Empty |
5 | 5 | from gl import wp |
6 | 6 | |
— | — | @@ -30,23 +30,26 @@ |
31 | 31 | |
32 | 32 | class Processor(Process): |
33 | 33 | |
34 | | - def __init__(self, fileQueue, process, output, logger): |
35 | | - self.fileQueue = fileQueue |
36 | | - self.process = process |
37 | | - self.output = output |
38 | | - self.logger = logger |
| 34 | + def __init__(self, input, processPage, output, callback, logger): |
| 35 | + self.input = input |
| 36 | + self.processPage = processPage |
| 37 | + self.output = output |
| 38 | + self.callback = callback |
| 39 | + self.logger = logger |
39 | 40 | Process.__init__(self) |
40 | 41 | |
41 | | - def start(self): |
| 42 | + def run(self): |
42 | 43 | try: |
43 | 44 | while True: |
44 | | - fn = self.fileQueue.get(block=False) |
| 45 | + foo = self.input.qsize() |
| 46 | + fn = self.input.get(block=False) |
45 | 47 | self.logger.info("Processing dump file %s." % fn) |
46 | | - dump = wp.dump.Iterator(fn) |
47 | | - for page in dump: |
48 | | - self.logger.debug("Processing dump file %s." % fn) |
| 48 | + dump = wp.dump.Iterator(openDumpFile(fn)) |
| 49 | + for page in dump.readPages(): |
| 50 | + self.logger.debug("Processing page %s:%s." % (page.getId(), page.getTitle())) |
49 | 51 | try: |
50 | | - self.process(page, output) |
| 52 | + for out in self.processPage(dump, page): |
| 53 | + self.output.put(out) |
51 | 54 | except Exception as e: |
52 | 55 | self.logger.error( |
53 | 56 | "Failed to process page %s:%s - %s" % ( |
— | — | @@ -55,11 +58,16 @@ |
56 | 59 | e |
57 | 60 | ) |
58 | 61 | ) |
| 62 | + |
| 63 | + |
59 | 64 | |
| 65 | + |
60 | 66 | except Empty: |
61 | 67 | self.logger.info("Nothing left to do. Shutting down thread.") |
62 | | - except Exception as e: |
63 | | - raise e |
| 68 | + finally: |
| 69 | + self.callback() |
| 70 | + |
| 71 | + |
64 | 72 | |
65 | 73 | |
66 | 74 | def main(args): |
— | — | @@ -69,37 +77,49 @@ |
70 | 78 | logging.basicConfig( |
71 | 79 | level=level, |
72 | 80 | stream=LOGGING_STREAM, |
73 | | - format='%(asctime)s %(levelname)-8s %(message)s', |
| 81 | + format='%(name)s: %(asctime)s %(levelname)-8s %(message)s', |
74 | 82 | datefmt='%b-%d %H:%M:%S' |
75 | 83 | ) |
76 | | - logging.info("Starting dump processor with %s threads." % args.threads) |
| 84 | + logging.info("Starting dump processor with %s threads." % min(args.threads, len(args.dump))) |
| 85 | + for row in process_dumps(args.dump, args.processor.process, args.threads): |
| 86 | + print('\t'.join(encode(v) for v in row)) |
| 87 | + |
| 88 | +def process_dumps(dumps, processPage, threads): |
| 89 | + input = dumpFiles(dumps) |
| 90 | + output = Queue(maxsize=10000) |
| 91 | + running = Value('i', 0) |
77 | 92 | |
78 | | - dumpQueue = dumpFiles(args.dump) |
79 | | - output = SafeOutput(args.out) |
80 | | - processors = [] |
81 | | - for i in range(0, min(args.threads, len(args.dump))): |
82 | | - p = Processor( |
83 | | - dumpQueue, |
84 | | - args.processor.process, |
| 93 | + def dec(): running.value -= 1 |
| 94 | + |
| 95 | + for i in range(0, min(threads, input.qsize())): |
| 96 | + running.value += 1 |
| 97 | + Processor( |
| 98 | + input, |
| 99 | + processPage, |
85 | 100 | output, |
| 101 | + dec, |
86 | 102 | logging.getLogger("Process %s" % i) |
87 | | - ) |
88 | | - processors.append(p) |
89 | | - |
| 103 | + ).start() |
| 104 | + |
| 105 | + |
| 106 | + #output while processes are running |
| 107 | + while running.value > 0: |
| 108 | + try: yield output.get(timeout=.25) |
| 109 | + except Empty: pass |
| 110 | + |
| 111 | + #finish yielding output buffer |
90 | 112 | try: |
91 | | - for i in processors: |
92 | | - processor.join() |
93 | | - |
94 | | - except KeyboardInterrupt: |
95 | | - logging |
96 | | - |
| 113 | + while True: yield output.get(block=False) |
| 114 | + except Empty: |
| 115 | + pass |
97 | 116 | |
98 | 117 | |
99 | 118 | |
100 | 119 | EXTENSIONS = { |
101 | 120 | 'xml': "cat", |
102 | 121 | 'bz2': "bzcat", |
103 | | - '7z': "7z e -so" |
| 122 | + '7z': "7z e -so 2>/dev/null", |
| 123 | + 'lzma':"lzcat" |
104 | 124 | } |
105 | 125 | |
106 | 126 | EXT_RE = re.compile(r'\.([^\.]+)$') |
— | — | @@ -120,6 +140,16 @@ |
121 | 141 | q = Queue() |
122 | 142 | for path in paths: q.put(dumpFile(path)) |
123 | 143 | return q |
| 144 | + |
| 145 | +def openDumpFile(path): |
| 146 | + match = EXT_RE.search(path) |
| 147 | + ext = match.groups()[0] |
| 148 | + p = subprocess.Popen( |
| 149 | + "%s %s" % (EXTENSIONS[ext], path), |
| 150 | + shell=True, |
| 151 | + stdout=subprocess.PIPE |
| 152 | + ) |
| 153 | + return p.stdout |
124 | 154 | |
125 | 155 | |
126 | 156 | if __name__ == "__main__": |
— | — | @@ -153,4 +183,4 @@ |
154 | 184 | ) |
155 | 185 | args = parser.parse_args() |
156 | 186 | main(args) |
157 | | - |
| 187 | + |