r90365 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: r90364 | r90365 | r90366 >
Date: 21:07, 18 June 2011
Author: halfak
Status: deferred
Tags:
Comment:
Working dump processor
Modified paths:
  • /trunk/tools/wsor/scripts/page_process.py (modified) (history)
  • /trunk/tools/wsor/scripts/process_dumps.py (modified) (history)

Diff

Index: trunk/tools/wsor/scripts/page_process.py
@@ -1,2 +1,2 @@
2 -def process(page, out):
3 - out.put([page.getId(), page.getTitle()])
 2+def process(dump, page):
 3+ yield (page.getId(), page.getTitle())
Index: trunk/tools/wsor/scripts/process_dumps.py
@@ -1,5 +1,5 @@
2 -import sys, logging, re, types, argparse, os
3 -from multiprocessing import Process, Queue, Lock, cpu_count
 2+import sys, logging, re, types, argparse, os, subprocess
 3+from multiprocessing import Process, Queue, Lock, cpu_count, Value
44 from Queue import Empty
55 from gl import wp
66
@@ -30,23 +30,26 @@
3131
3232 class Processor(Process):
3333
34 - def __init__(self, fileQueue, process, output, logger):
35 - self.fileQueue = fileQueue
36 - self.process = process
37 - self.output = output
38 - self.logger = logger
 34+ def __init__(self, input, processPage, output, callback, logger):
 35+ self.input = input
 36+ self.processPage = processPage
 37+ self.output = output
 38+ self.callback = callback
 39+ self.logger = logger
3940 Process.__init__(self)
4041
41 - def start(self):
 42+ def run(self):
4243 try:
4344 while True:
44 - fn = self.fileQueue.get(block=False)
 45+ foo = self.input.qsize()
 46+ fn = self.input.get(block=False)
4547 self.logger.info("Processing dump file %s." % fn)
46 - dump = wp.dump.Iterator(fn)
47 - for page in dump:
48 - self.logger.debug("Processing dump file %s." % fn)
 48+ dump = wp.dump.Iterator(openDumpFile(fn))
 49+ for page in dump.readPages():
 50+ self.logger.debug("Processing page %s:%s." % (page.getId(), page.getTitle()))
4951 try:
50 - self.process(page, output)
 52+ for out in self.processPage(dump, page):
 53+ self.output.put(out)
5154 except Exception as e:
5255 self.logger.error(
5356 "Failed to process page %s:%s - %s" % (
@@ -55,11 +58,16 @@
5659 e
5760 )
5861 )
 62+
 63+
5964
 65+
6066 except Empty:
6167 self.logger.info("Nothing left to do. Shutting down thread.")
62 - except Exception as e:
63 - raise e
 68+ finally:
 69+ self.callback()
 70+
 71+
6472
6573
6674 def main(args):
@@ -69,37 +77,49 @@
7078 logging.basicConfig(
7179 level=level,
7280 stream=LOGGING_STREAM,
73 - format='%(asctime)s %(levelname)-8s %(message)s',
 81+ format='%(name)s: %(asctime)s %(levelname)-8s %(message)s',
7482 datefmt='%b-%d %H:%M:%S'
7583 )
76 - logging.info("Starting dump processor with %s threads." % args.threads)
 84+ logging.info("Starting dump processor with %s threads." % min(args.threads, len(args.dump)))
 85+ for row in process_dumps(args.dump, args.processor.process, args.threads):
 86+ print('\t'.join(encode(v) for v in row))
 87+
 88+def process_dumps(dumps, processPage, threads):
 89+ input = dumpFiles(dumps)
 90+ output = Queue(maxsize=10000)
 91+ running = Value('i', 0)
7792
78 - dumpQueue = dumpFiles(args.dump)
79 - output = SafeOutput(args.out)
80 - processors = []
81 - for i in range(0, min(args.threads, len(args.dump))):
82 - p = Processor(
83 - dumpQueue,
84 - args.processor.process,
 93+ def dec(): running.value -= 1
 94+
 95+ for i in range(0, min(threads, input.qsize())):
 96+ running.value += 1
 97+ Processor(
 98+ input,
 99+ processPage,
85100 output,
 101+ dec,
86102 logging.getLogger("Process %s" % i)
87 - )
88 - processors.append(p)
89 -
 103+ ).start()
 104+
 105+
 106+ #output while processes are running
 107+ while running.value > 0:
 108+ try: yield output.get(timeout=.25)
 109+ except Empty: pass
 110+
 111+ #finish yielding output buffer
90112 try:
91 - for i in processors:
92 - processor.join()
93 -
94 - except KeyboardInterrupt:
95 - logging
96 -
 113+ while True: yield output.get(block=False)
 114+ except Empty:
 115+ pass
97116
98117
99118
100119 EXTENSIONS = {
101120 'xml': "cat",
102121 'bz2': "bzcat",
103 - '7z': "7z e -so"
 122+ '7z': "7z e -so 2>/dev/null",
 123+ 'lzma':"lzcat"
104124 }
105125
106126 EXT_RE = re.compile(r'\.([^\.]+)$')
@@ -120,6 +140,16 @@
121141 q = Queue()
122142 for path in paths: q.put(dumpFile(path))
123143 return q
 144+
 145+def openDumpFile(path):
 146+ match = EXT_RE.search(path)
 147+ ext = match.groups()[0]
 148+ p = subprocess.Popen(
 149+ "%s %s" % (EXTENSIONS[ext], path),
 150+ shell=True,
 151+ stdout=subprocess.PIPE
 152+ )
 153+ return p.stdout
124154
125155
126156 if __name__ == "__main__":
@@ -153,4 +183,4 @@
154184 )
155185 args = parser.parse_args()
156186 main(args)
157 -
 187+

Status & tagging log