r90195 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r90194‎ | r90195 | r90196 >
Date:07:10, 16 June 2011
Author:halfak
Status:deferred
Tags:
Comment:
new dump processors. ready for testing
Modified paths:
  • /trunk/tools/wsor/scripts/page_process.py (added) (history)
  • /trunk/tools/wsor/scripts/process_dumps.py (added) (history)
  • /trunk/tools/wsor/scripts/revision_diff_process.py (added) (history)
  • /trunk/tools/wsor/scripts/test_lock_speed.py (modified) (history)

Diff [purge]

Index: trunk/tools/wsor/scripts/page_process.py
@@ -0,0 +1,2 @@
def process(page, out):
    """Emit a single [id, title] row for the given page."""
    row = [page.getId(), page.getTitle()]
    out.put(row)
Index: trunk/tools/wsor/scripts/revision_diff_process.py
@@ -0,0 +1,99 @@
 2+import sys, subprocess, os, errno, re, argparse, logging, hashlib, types
 3+from difflib import SequenceMatcher
 4+from gl.containers import LimitedDictLists
 5+
 6+from text import STOP_WORDS, MARKUP
 7+
 8+
def tokenize(text):
    """Split wiki text into tokens.

    A token is a word, a piece of wiki markup ([[ ]] {{ }} ''' '' =+ {| |} |-),
    a run of newlines or spaces, an HTML entity, or any single leftover char.
    """
    pattern = r"[\w]+|\[\[|\]\]|\{\{|\}\}|\n+| +|&\w+;|'''|''|=+|\{\||\|\}|\|\-|."
    return re.findall(pattern, text)
 14+
def simpleDiff(a, b):
    """Return (added, removed) element lists for the change from a to b.

    Elements are collected from difflib.SequenceMatcher opcodes; 'equal'
    spans contribute nothing.
    """
    matcher = SequenceMatcher(None, a, b)
    added = []
    removed = []
    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        if tag == 'replace':
            removed.extend(a[i1:i2])
            added.extend(b[j1:j2])
        elif tag == 'delete':
            removed.extend(a[i1:i2])
        elif tag == 'insert':
            # BUG FIX: inserted elements live in b's j-range; the original
            # read b[i1:i2], which for an insert opcode is an empty slice
            # (i1 == i2), silently dropping every pure insertion.
            added.extend(b[j1:j2])
    return (added, removed)
 29+
 30+
 31+
def process(page, output):
    """Walk a page's revisions, detecting reverts and diffing token streams.

    For every revision: md5 the text to spot identity reverts against the
    last 15 revisions, then diff the token stream against the previous
    revision and emit per-revision add/remove counts (characters, tokens,
    words, markup).

    NOTE(review): this function cannot run as written — `revert`, `out`,
    and `metaHeaders` are all undefined names (see inline notes).
    """
    # Rolling window of recent revisions keyed by text checksum.
    # NOTE(review): nothing here ever inserts `checksum` into recentRevs,
    # so unless LimitedDictLists self-populates, the revert branch can
    # never fire — confirm against gl.containers.LimitedDictLists.
    recentRevs = LimitedDictLists(maxsize=15)
    lastTokens = []
    for revision in page.readRevisions():
        checksum = hashlib.md5(revision.getText().encode("utf-8")).hexdigest()
        if checksum in recentRevs:
            # found a revert: this revision's text is identical to a recent one
            revertedToRev = recentRevs[checksum]

            # get the revisions that were reverted (those newer than the
            # revision we reverted back to)
            revertedRevs = [r for (c, r) in recentRevs if r.getId() > revertedToRev.getId()]

            # write revert row
            # NOTE(review): `revert` is not defined anywhere in this file —
            # this raises NameError; presumably output.push was intended.
            revert.write(
                ['revert']+
                [
                    revision.getId(),
                    revertedToRev.getId(),
                    len(revertedRevs)
                ]
            )

            for rev in revertedRevs:
                # NOTE(review): `out` is undefined; the parameter is `output`.
                out.push(
                    ['reverted']+
                    [
                        rev.getId(),
                        revision.getId(),
                        revertedToRev.getId(),
                        len(revertedRevs)
                    ]
                )
        else:
            pass

        tokens = tokenize(revision.getText())

        tokensAdded, tokensRemoved = simpleDiff(lastTokens, tokens)

        row = {
            'rev_id': revision.getId(),
            'checksum': checksum,
            # NOTE(review): this is a character count (len of the raw text),
            # not a token count — len(tokens) was probably intended.
            'tokens': len(revision.getText()),
            'cs_added': 0,     # characters added
            'cs_removed': 0,   # characters removed
            'ts_added': 0,     # tokens added
            'ts_removed': 0,   # tokens removed
            'ws_added': 0,     # word (non-stopword, non-markup) tokens added
            'ws_removed': 0,   # word tokens removed
            'ms_added': 0,     # markup tokens added
            'ms_removed': 0    # markup tokens removed
        }
        for token in tokensAdded:
            row['ts_added'] += 1
            row['cs_added'] += len(token)
            # NOTE(review): this `pass` has no effect — whitespace tokens
            # still fall through to the markup/word test below; a `continue`
            # or an if/elif chain was probably intended.
            if token.strip() == '': pass
            if token in MARKUP: row['ms_added'] += 1
            elif token not in STOP_WORDS: row['ws_added'] += 1
        for token in tokensRemoved:
            row['ts_removed'] += 1
            row['cs_removed'] += len(token)
            # NOTE(review): same no-op `pass` as above.
            if token.strip() == '': pass
            if token in MARKUP: row['ms_removed'] += 1
            elif token not in STOP_WORDS: row['ws_removed'] += 1


        # NOTE(review): `metaHeaders` is undefined in this module — NameError.
        output.pushRow(['meta']+[row[h] for h in metaHeaders])

        lastTokens = tokens
Index: trunk/tools/wsor/scripts/process_dumps.py
@@ -0,0 +1,156 @@
 2+import sys, logging, re, types, argparse, os
 3+from multiprocessing import Process, Queue, Lock, cpu_count
 4+from Queue import Empty
 5+from gl import wp
 6+
 7+class FileTypeError(Exception):pass
 8+
def encode(v):
    """Serialize one value for tab-separated output.

    Floats are truncated to integer strings, None becomes the MySQL
    NULL literal "\\N", and everything else is repr()'d.
    """
    # isinstance() replaces the Python-2-only `type(v) == types.FloatType`,
    # and `is None` replaces the `== None` comparison.
    if isinstance(v, float):
        return str(int(v))
    elif v is None:
        return "\\N"
    else:
        return repr(v)


class SafeOutput:
    """Serializes rows to a file object under a lock so that multiple
    processes/threads can share one output stream without interleaving."""

    def __init__(self, fp):
        self.fp = fp
        self.l = Lock()

    def push(self, row, encode=encode):
        if __debug__:
            # Materialize generators up front so any failure surfaces
            # before the lock is held.
            row = tuple(row)

        with self.l:
            # BUG FIX: the original called an undefined clean(); the
            # per-value serializer is the `encode` parameter.
            self.fp.write("\t".join(encode(v) for v in row) + "\n")
 31+
class Processor(Process):
    """Worker process that drains a queue of dump-file paths.

    Each file is opened with wp.dump.Iterator and every page is handed to
    the supplied process(page, output) callable.  The worker exits cleanly
    when the queue is empty.
    """

    def __init__(self, fileQueue, process, output, logger):
        self.fileQueue = fileQueue
        self.process = process
        self.output = output
        self.logger = logger
        Process.__init__(self)

    def run(self):
        # BUG FIX: the work loop belongs in run(), not start().  Overriding
        # start() executes the loop in the parent process and never forks a
        # child; multiprocessing calls run() inside the spawned process.
        try:
            while True:
                # Non-blocking get: an empty queue raises Empty and ends the worker.
                fn = self.fileQueue.get(block=False)
                self.logger.info("Processing dump file %s." % fn)
                dump = wp.dump.Iterator(fn)
                for page in dump:
                    # BUG FIX: debug line was a copy-paste of the per-file
                    # message; log the page being processed instead.
                    self.logger.debug("Processing page %s." % page.getId())
                    try:
                        # BUG FIX: was the undefined global `output`.
                        self.process(page, self.output)
                    except Exception as e:
                        # A bad page shouldn't kill the whole dump file.
                        self.logger.error(
                            "Failed to process page %s:%s - %s" % (
                                page.getId(),
                                page.getTitle(),
                                e
                            )
                        )
        except Empty:
            self.logger.info("Nothing left to do. Shutting down thread.")
 64+
 65+
def main(args):
    """Configure logging, launch the worker processes, and wait for them."""
    LOGGING_STREAM = sys.stderr
    if __debug__: level = logging.DEBUG
    else: level = logging.INFO
    logging.basicConfig(
        level=level,
        stream=LOGGING_STREAM,
        format='%(asctime)s %(levelname)-8s %(message)s',
        datefmt='%b-%d %H:%M:%S'
    )
    logging.info("Starting dump processor with %s threads." % args.threads)

    dumpQueue = dumpFiles(args.dump)
    output = SafeOutput(args.out)
    processors = []
    # Never launch more workers than there are dump files to process.
    for i in range(0, min(args.threads, len(args.dump))):
        p = Processor(
            dumpQueue,
            args.processor.process,
            output,
            logging.getLogger("Process %s" % i)
        )
        processors.append(p)
        # BUG FIX: workers were constructed but never started.
        p.start()

    try:
        # BUG FIX: the original loop iterated `i` but joined the undefined
        # name `processor`, raising NameError on the first iteration.
        for p in processors:
            p.join()
    except KeyboardInterrupt:
        # BUG FIX: the original had a bare, no-op `logging` expression here.
        logging.warning("Interrupted; waiting for workers to terminate.")
 96+
 97+
 98+
 99+
# Shell commands capable of streaming each supported dump file type to stdout.
EXTENSIONS = {
    'xml': "cat",
    'bz2': "bzcat",
    '7z': "7z e -so"
}

# Captures the final file extension, e.g. "bz2" from "dump.xml.bz2".
EXT_RE = re.compile(r'\.([^\.]+)$')

def dumpFile(path):
    """Validate a dump file path and return it (with ~ expanded).

    Raises FileTypeError when the file is missing, has no extension, or
    has an extension not listed in EXTENSIONS.
    """
    path = os.path.expanduser(path)
    if not os.path.isfile(path):
        raise FileTypeError("Can't find file %s" % path)

    match = EXT_RE.search(path)
    if match is None:
        raise FileTypeError("No extension found for %s." % path)
    elif match.groups()[0] not in EXTENSIONS:
        # BUG FIX: the message claimed to report the file type but
        # formatted the whole path; report the offending extension.
        raise FileTypeError("File type %r is not supported." % match.groups()[0])
    else:
        return path
 119+
def dumpFiles(paths):
    """Validate every path with dumpFile() and return them loaded in a Queue."""
    validated = [dumpFile(p) for p in paths]
    fileQueue = Queue()
    for p in validated:
        fileQueue.put(p)
    return fileQueue
 124+
 125+
 126+if __name__ == "__main__":
 127+ parser = argparse.ArgumentParser(
 128+ description='Maps a function across pages of MediaWiki dump files'
 129+ )
 130+ parser.add_argument(
 131+ '-o', '--out',
 132+ metavar="<path>",
 133+ type=lambda path:open(path, "w"),
 134+ help='the path to an output file to write putput to (defaults to stdout)',
 135+ default=sys.stdout
 136+ )
 137+ parser.add_argument(
 138+ '-t', '--threads',
 139+ metavar="",
 140+ type=int,
 141+ help='the number of threads to start (defaults to # of cores -1)',
 142+ default=cpu_count()-1
 143+ )
 144+ parser.add_argument(
 145+ 'processor',
 146+ type=__import__,
 147+ help='the class path to the function to use to process each page'
 148+ )
 149+ parser.add_argument(
 150+ 'dump',
 151+ type=dumpFile,
 152+ help='the XML dump file(s) to process',
 153+ nargs="+"
 154+ )
 155+ args = parser.parse_args()
 156+ main(args)
 157+
Index: trunk/tools/wsor/scripts/test_lock_speed.py
@@ -3,7 +3,7 @@
44
55 num = 0
66 start = time.time()
7 -for i in range(0, 1000):
 7+for i in range(0, 10000):
88 num = sum(n for n in range(0, i))
99
1010 print("Without lock took %s seconds" % (time.time()-start))
@@ -11,7 +11,7 @@
1212 num = 0
1313 l = Lock()
1414 start = time.time()
15 -for i in range(0, 1000):
 15+for i in range(0, 10000):
1616 l.acquire()
1717 num = sum(n for n in range(0, i))
1818 l.release()

Status & tagging log