r90557 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: r90556 | r90557 | r90558 >
Date: 22:47, 21 June 2011
Author: halfak
Status: deferred
Tags:
Comment:
added wikimedia utilities
Modified paths:
  • /trunk/tools/wsor/scripts/process_dumps.py (deleted) (history)
  • /trunk/tools/wsor/ts_samples/testing.sql (modified) (history)
  • /trunk/tools/wsor/wikimedia (added) (history)
  • /trunk/tools/wsor/wikimedia/setup.py (added) (history)
  • /trunk/tools/wsor/wikimedia/wmf (added) (history)
  • /trunk/tools/wsor/wikimedia/wmf/__init__.py (added) (history)
  • /trunk/tools/wsor/wikimedia/wmf/dump (added) (history)
  • /trunk/tools/wsor/wikimedia/wmf/dump/__init__.py (added) (history)
  • /trunk/tools/wsor/wikimedia/wmf/dump/iterator.py (added) (history)
  • /trunk/tools/wsor/wikimedia/wmf/dump/map.py (added) (history)
  • /trunk/tools/wsor/wikimedia/wmf/dump/tests (added) (history)
  • /trunk/tools/wsor/wikimedia/wmf/dump/tests/__init__.py (added) (history)
  • /trunk/tools/wsor/wikimedia/wmf/dump/tests/sample (added) (history)
  • /trunk/tools/wsor/wikimedia/wmf/dump/tests/sample.py (added) (history)
  • /trunk/tools/wsor/wikimedia/wmf/dump/tests/sample/__init__.py (added) (history)
  • /trunk/tools/wsor/wikimedia/wmf/dump/tests/sample/large.xml.lzma (added) (history)
  • /trunk/tools/wsor/wikimedia/wmf/dump/tests/sample/small.xml.lzma (added) (history)
  • /trunk/tools/wsor/wikimedia/wmf/dump/tests/sample/test.py (added) (history)
  • /trunk/tools/wsor/wikimedia/wmf/dump/tests/test_iterator.py (added) (history)
  • /trunk/tools/wsor/wikimedia/wmf/dump/tests/test_map.py (added) (history)
  • /trunk/tools/wsor/wikimedia/wmf/dump/xml_iterator.py (added) (history)
  • /trunk/tools/wsor/wikimedia/wmf/util.py (added) (history)

Diff

Index: trunk/tools/wsor/wikimedia/setup.py
@@ -0,0 +1,27 @@
 2+
 3+from setuptools import setup, find_packages
 4+
 5+setup(
 6+ name='util',
 7+ version='1.0',
 8+ description="WMF utilities",
 9+ long_description="""
 10+ A set of utilities originally authored by Aaron Halfaker
 11+ during the 2011 Wikimedia Summer of Research. The utilities
 12+ in this package are intended to aid in processing of
 13+ MediaWiki data related to Wikimedia projects. Many of the
 14+ utilities have been specifically designed to allow
 15+ processing of the massive amount of data (currently) found
 16+ in the full history dump of the English Wikipedia
 17+ """
 18+ author='Aaron Halfaker',
 19+ author_email='aaron.halfaker@gmail.com',
 20+ url='http://meta.wikimedia.org/wiki/User:EpochFail',
 21+ packages=find_packages(),
 22+ entry_points = {
 23+ 'distutils.commands': [
 24+ 'dump_map = wmf.dump.map:main',
 25+ ]
 26+ },
 27+
 28+)
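
For context (not part of the commit), a minimal usage sketch of the package this setup.py installs, assuming `python setup.py install` has been run from trunk/tools/wsor/wikimedia so that `find_packages()` has picked up the `wmf` package added below:

    # Hypothetical example; the `wmf` package and its `util` module are the
    # files added in this revision.
    from wmf import util
    print(util.wpAPIURL("de"))  # -> http://de.wikipedia.org/w/api.php
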
Index: trunk/tools/wsor/wikimedia/wmf/util.py
@@ -0,0 +1,236 @@
 2+from __future__ import with_statement, absolute_import
 3+import re, types
 4+import time, calendar, datetime
 5+import hashlib
 6+import urllib
 7+
 8+__docformat__ = "restructuredtext en"
 9+
 10+"""
 11+This module contains utility functions for interacting with Wikipedia.
 12+"""
 13+
 14+LONG_WP_TIME_STRING = '%Y-%m-%dT%H:%M:%SZ'
 15+"""
 16+The longhand version of Wikipedia timestamps.
 17+"""
 18+
 19+SHORT_WP_TIME_STRING = '%Y%m%d%H%M%S'
 20+"""
 21+The shorthand version of Wikipedia timestamps.
 22+"""
 23+
 24+WPAPI_URL = "http://%s.wikipedia.org/w/api.php"
 25+"""
 26+The Wikipedia API URL. A positional format token is included so that the
 27+language-specific prefix can be formatted in. See `wpAPIURL()`.
 28+"""
 29+
 30+
 31+VLOOSE_RE = re.compile(r'''
 32+ (^revert\ to.+using)
 33+ | (^reverted\ edits\ by.+using)
 34+ | (^reverted\ edits\ by.+to\ last\ version\ by)
 35+ | (^bot\ -\ rv.+to\ last\ version\ by)
 36+ | (-assisted\ reversion)
 37+ | (^(revert(ed)?|rv).+to\ last)
 38+ | (^undo\ revision.+by)
 39+ ''', re.IGNORECASE | re.DOTALL | re.VERBOSE)
 40+
 41+VSTRICT_RE = re.compile(r'''
 42+ (\brvv)
 43+ | (\brv[/ ]v)
 44+ | (vandal(?!proof|bot))
 45+ | (\b(rv|rev(ert)?|rm)\b.*(blank|spam|nonsense|porn|mass\sdelet|vand))
 46+ ''', re.IGNORECASE | re.DOTALL | re.VERBOSE)
 47+
 48+NAMESPACES = {
 49+ 'en': set([
 50+ 'Media',
 51+ 'Special',
 52+ 'Talk',
 53+ 'User talk',
 54+ 'Wikipedia talk',
 55+ 'Image talk',
 56+ 'MediaWiki talk',
 57+ 'Template talk',
 58+ 'Help talk',
 59+ 'Category talk',
 60+ 'Portal talk',
 61+ 'File talk',
 62+ 'User',
 63+ 'Wikipedia',
 64+ 'Image',
 65+ 'MediaWiki',
 66+ 'Template',
 67+ 'Help',
 68+ 'Category',
 69+ 'Portal',
 70+ 'File'
 71+ ])
 72+}
 73+
 74+NAMESPACE_RE = re.compile(r'^((?:%s)):' % ')|(?:'.join(NAMESPACES['en']),
 75+ re.IGNORECASE)
 76+
 77+def wpAPIURL(prefix="en"):
 78+ """
 79+ Creates the URL for the Wikipedia API based on a language prefix.
 80+
 81+ :Parameters:
 82+ prefix : string
 83+ the prefix to be formatted into the url
 84+
 85+ :Return:
 86+ the Wikipedia API url for a given language prefix
 87+ """
 88+ return WPAPI_URL % prefix
 89+
 90+
 91+def wp2Timestamp(wpTime):
 92+ """
 93+ Converts a Wikipedia timestamp to a Unix Epoch-based timestamp (seconds
 94+ since Jan. 1st 1970 GMT). This function will handle both long
 95+ (see `LONG_WP_TIME_STRING`) and short (see `SHORT_WP_TIME_STRING`)
 96+ time formats.
 97+
 98+ :Parameters:
 99+ wpTime : string
 100+ Wikipedia timestamp to be converted
 101+
 102+ :Return:
 103+ integer Unix Epoch-based timestamp (seconds since Jan. 1st 1970
 104+ GMT) version of the provided wpTime.
 105+ """
 106+ try:
 107+ myTime = time.strptime(wpTime, LONG_WP_TIME_STRING)
 108+ except ValueError as e:
 109+ try:
 110+ myTime = time.strptime(wpTime, SHORT_WP_TIME_STRING)
 111+ except ValueError as e:
 112+ raise ValueError("'%s' is not a valid Wikipedia date format" % wpTime)
 113+
 114+ return int(calendar.timegm(myTime))
 115+
 116+def timestamp2WP(timestamp):
 117+ """
 118+ Converts a Unix Epoch-based timestamp (seconds since Jan. 1st 1970 GMT)
 119+ timestamp to one acceptable by Wikipedia.
 120+
 121+ :Parameters:
 122+ timestamp : int
 123+ Unix timestamp to be converted
 124+
 125+ :Return:
 126+ string Wikipedia style timestamp
 127+ """
 128+
 129+ return datetime.datetime.utcfromtimestamp(timestamp).strftime('%Y%m%d%H%M%S')
 130+
 131+def digest(content):
 132+ return hashlib.md5(content.encode("utf-8")).hexdigest()
 133+
 134+
 135+def normalize(name):
 136+ """
 137+ Normalizes text from a Wikipedia title/segment by capitalizing the
 138+ first letter, replacing underscores with spaces, and stripping
 139+ leading and trailing whitespace.
 140+
 141+ :Parameters:
 142+ name : string
 143+ Namespace or title portion of a Wikipedia page name.
 144+
 145+ :Return:
 146+ string Normalized text
 147+ """
 148+
 149+ return name.capitalize().replace("_", " ").strip()
 150+
 151+def normalizeTitle(title, namespaces=NAMESPACES['en']):
 152+ """
 153+ Normalizes a Wikipedia page title and splits the title into
 154+ namespace and title pieces.
 155+
 156+ :Parameters:
 157+ title : string
 158+ The title of a Wikipedia page.
 159+ namespaces : set
 160+ A set of namespaces to look for in the title.
 161+
 162+ :Return:
 163+ The namespace, title tuple
 164+ """
 165+
 166+ if type(title) == types.UnicodeType:
 167+ title = title.encode('utf-8')
 168+
 169+ title = title.strip()
 170+ parts = title.split(":", 1)
 171+ if len(parts) == 1:
 172+ namespace = None
 173+ title = normalize(parts[0])
 174+ elif parts[1] == '':
 175+ namespace = None
 176+ title = normalize(title)
 177+ else:
 178+ nsPart = normalize(parts[0])
 179+ if nsPart in namespaces:
 180+ namespace = nsPart
 181+ title = normalize(parts[1])
 182+ else:
 183+ namespace = None
 184+ title = normalize(title)
 185+
 186+ return (namespace, title)
 187+
 188+def normalizeURLTitle(title, namespaces=NAMESPACES['en']):
 189+ """
 190+ Normalizes a Wikipedia page title obtained from a URL and splits
 191+ the title into namespace and title pieces.
 192+
 193+ :Parameters:
 194+ title : string
 195+ The title of a Wikipedia page.
 196+ namespaces : set
 197+ A set of namespaces to look for in the title.
 198+
 199+ :Return:
 200+ The namespace, title tuple
 201+ """
 202+
 203+ if type(title) == types.UnicodeType:
 204+ title = title.encode('utf-8')
 205+ title = urllib.unquote(title).split('#')[0]
 206+ ns = NAMESPACE_RE.match(title)
 207+ if not ns:
 208+ namespace = ""
 209+ title = normalize(title)
 210+ else:
 211+ nsPart = ns.group(1).capitalize()
 212+ if nsPart in namespaces:
 213+ namespace = nsPart
 214+ title = normalize(title[ns.end():])
 215+ else: namespace = ""; title = normalize(title) #unknown prefix: treat it as part of the title
 216+ return (namespace, title)
 216+
 217+def isVandalismByComment(editComment, testLoose=True, testStrict=True):
 218+ '''
 219+ Checks the given edit comment against the VLOOSE and VSTRICT regexes
 220+ as configured, and returns a boolean indicating whether it matches.
 221+
 222+ @param editComment: The edit comment to test.
 223+ @type editComment: str
 224+
 225+ @param testLoose: If the edit comment matches VLOOSE_RE, True is returned
 226+ @type testLoose: bool
 227+
 228+ @param testStrict: If the edit comment matches VSTRICT_RE, True is returned
 229+ @type testStrict: bool
 230+ '''
 231+
 232+ if testLoose and VLOOSE_RE.search(editComment):
 233+ return True
 234+ if testStrict and VSTRICT_RE.search(editComment):
 235+ return True
 236+
 237+ return False
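
For reference, a minimal usage sketch (not part of the commit) of the helpers in wmf/util.py above; the inputs are made up, and the results in the comments follow from the code as written:

    from wmf import util

    # Round-trip between Wikipedia timestamps and Unix epoch seconds.
    epoch = util.wp2Timestamp("2008-05-19T01:41:53Z")  # long format -> int
    print(util.timestamp2WP(epoch))                    # -> '20080519014153'

    # Split and normalize a page title into (namespace, title).
    print(util.normalizeTitle("talk:pilsbury_block"))  # -> ('Talk', 'Pilsbury block')

    # Heuristic revert/vandalism detection on an edit comment.
    print(util.isVandalismByComment("Reverted edits by Example to last version by Foo"))  # -> True
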
Index: trunk/tools/wsor/wikimedia/wmf/dump/iterator.py
@@ -0,0 +1,220 @@
 2+from xml_iterator import XMLIterator
 3+from ..util import wp2Timestamp
 4+
 5+def cleanTag(prefix, raw):
 6+ return raw[len(prefix):]
 7+
 8+
 9+class Iterator:
 10+ """
 11+ Wiki dump file processor. This class is constructed with a file pointer
 12+ to a Wikipedia XML dump file.
 13+
 14+ """
 15+
 16+ def __init__(self, fp):
 17+ """
 18+ Constructor
 19+
 20+ :Parameters:
 21+ fp : file pointer
 22+ a file pointer to the xml file to process.
 23+ """
 24+
 25+ self.fp = fp #:The file pointer passed to the constructor
 26+ self.namespaces = {} #:A map of possible namespaces
 27+ self.siteName = None #:The name of the site
 28+ self.base = None #:Base of the xml file
 29+ self.generator = None #:Generator of the dump
 30+ self.case = None #:The default title case
 31+
 32+ self.mediawikiElement = XMLIterator(fp)
 33+ self.ns = self.mediawikiElement.tag[:-len('mediawiki')]
 34+
 35+ pageCount = 0
 36+ done = False
 37+ for element in self.mediawikiElement:
 38+ tag = cleanTag(self.ns, element.tag)
 39+ if tag == "siteinfo":
 40+ self.loadSiteInfo(element)
 41+ element.clear()
 42+ break
 43+
 44+
 45+
 46+ def loadSiteInfo(self, siteInfoElement):
 47+ for element in siteInfoElement:
 48+ tag = cleanTag(self.ns, element.tag)
 49+
 50+ if tag == 'sitename':
 51+ self.siteName = element.text
 52+ elif tag == 'base':
 53+ self.base = element.text
 54+ elif tag == 'generator':
 55+ self.generator = element.text
 56+ elif tag == 'case':
 57+ self.case = element.text
 58+ elif tag == 'namespaces':
 59+ self.loadNamespaces(element)
 60+ element.clear()
 61+
 62+
 63+
 64+ def loadNamespaces(self, namespacesElement):
 65+ for element in namespacesElement:
 66+ tag = cleanTag(self.ns, element.tag)
 67+
 68+ if tag == "namespace":
 69+ namespace = Namespace(element)
 70+ self.namespaces[namespace.getName()] = namespace.getId()
 71+ else:
 72+ assert False, "This should never happen"
 73+
 74+
 75+ def readPages(self):
 76+ for element in self.mediawikiElement:
 77+ tag = cleanTag(self.ns, element.tag)
 78+ if tag == "page":
 79+ yield Page(self.ns, element)
 80+
 81+
 82+
 83+
 84+class Namespace:
 85+
 86+ def __init__(self, nsElement):
 87+ self.setId(nsElement.get('key'))
 88+ self.setName(nsElement.text)
 89+
 90+ def setId(self, id): self.id = int(id)
 91+ def getId(self): return self.id
 92+
 93+ def setName(self, name):
 94+ if name == None:
 95+ self.name = None
 96+ else:
 97+ self.name = unicode(name)
 98+ def getName(self): return self.name
 99+
 100+ def __repr__(self):
 101+ return "%s(%r, %r)" % (
 102+ self.__class__.__name__,
 103+ self.getId(),
 104+ self.getName()
 105+ )
 106+
 107+ def __eq__(self, other):
 108+ try:
 109+ return (
 110+ self.getId() == other.getId() and
 111+ self.getName() == other.getName()
 112+ )
 113+ except AttributeError:
 114+ return False
 115+
 116+class Page:
 117+
 118+ def __init__(self, ns, pageElement):
 119+ self.id = None
 120+ self.title = None
 121+ self.pageElement = pageElement
 122+ self.ns = ns
 123+ for element in pageElement:
 124+ tag = cleanTag(ns, element.tag)
 125+ if tag == "id":
 126+ self.setId(element.text)
 127+ elif tag == "title":
 128+ self.setTitle(element.text)
 129+
 130+ if self.id != None and self.title != None:
 131+ break
 132+
 133+ def readRevisions(self):
 134+ for element in self.pageElement:
 135+ tag = cleanTag(self.ns, element.tag)
 136+ if tag == "revision":
 137+ yield Revision(self.ns, element)
 138+ #element.clear()
 139+
 140+
 141+
 142+ def setId(self, id): self.id = int(id)
 143+ def getId(self): return self.id
 144+
 145+ def setTitle(self, title): self.title = unicode(title)
 146+ def getTitle(self): return self.title
 147+
 148+
 149+
 150+class Revision:
 151+
 152+ TAG_MAP = {
 153+ 'id': lambda s,e:s.setId(e.text),
 154+ 'timestamp': lambda s,e:s.setTimestamp(e.text),
 155+ 'contributor': lambda s,e:s.setContributor(e),
 156+ 'minor': lambda s,e:s.setMinor(True),
 157+ 'comment': lambda s,e:s.setComment(e.text),
 158+ 'text': lambda s,e:s.setText(e.text)
 159+ }
 160+
 161+ def __init__(self, ns, revisionElement):
 162+ self.ns = ns
 163+ self.id = None
 164+ self.timestamp = None
 165+ self.contributor = None
 166+ self.minor = False #No <minor/> tag means the edit is not minor
 167+ self.comment = None
 168+ self.text = None
 169+ for element in revisionElement:
 170+ tag = cleanTag(ns, element.tag)
 171+ self.TAG_MAP[tag](self, element)
 172+
 173+ def setId(self, id): self.id = int(id)
 174+ def getId(self): return self.id
 175+
 176+ def setTimestamp(self, timestamp):
 177+ try: self.timestamp = int(timestamp)
 178+ except ValueError: self.timestamp = wp2Timestamp(timestamp)
 179+ def getTimestamp(self): return self.timestamp
 180+
 181+ def setContributor(self, element):
 182+ if element.get("deleted", None) == "deleted":
 183+ self.contributor = None
 184+ else:
 185+ self.contributor = Contributor(self.ns, element)
 186+
 187+ def getContributor(self): return self.contributor
 188+
 189+ def setMinor(self, minor): self.minor = minor == True
 190+ def getMinor(self): return self.minor
 191+
 192+ def setComment(self, comment): self.comment = unicode(comment)
 193+ def getComment(self): return self.comment
 194+
 195+ def setText(self, text):
 196+ if text == None: self.text = u''
 197+ else: self.text = unicode(text)
 198+ def getText(self): return self.text
 199+
 200+class Contributor:
 201+
 202+ TAG_MAP = {
 203+ 'id': lambda s,e:s.setId(e.text),
 204+ 'username': lambda s,e:s.setUsername(e.text),
 205+ 'ip': lambda s,e:s.setUsername(e.text)
 206+ }
 207+
 208+ def __init__(self, ns, contributorElement):
 209+ self.id = None
 210+ for element in contributorElement:
 211+ tag = cleanTag(ns, element.tag)
 212+ self.TAG_MAP[tag](self, element)
 213+
 214+ def setId(self, id): self.id = int(id)
 215+ def getId(self): return self.id
 216+
 217+ def setUsername(self, username): self.username = unicode(username)
 218+ def getUsername(self): return self.username
 219+
 220+
 221+
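
For reference, a minimal usage sketch (not part of the commit); "pages-meta-history.xml" is a stand-in for a path to an uncompressed MediaWiki XML dump:

    from wmf.dump.iterator import Iterator

    dump = Iterator(open("pages-meta-history.xml"))
    print("%s namespaces read from %s" % (len(dump.namespaces), dump.siteName))
    for page in dump.readPages():
        for revision in page.readRevisions():
            # Pages and revisions must be consumed in order; the file is streamed.
            print("%s\t%s\t%s" % (page.getId(), revision.getId(), revision.getTimestamp()))
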
Index: trunk/tools/wsor/wikimedia/wmf/dump/xml_iterator.py
@@ -0,0 +1,76 @@
 2+try:
 3+ import xml.etree.cElementTree as etree
 4+except ImportError:
 5+ import xml.etree.ElementTree as etree
 6+
 7+def XMLIterator(fp):
 8+ xmlIterator = etree.iterparse(fp, events=("start","end"))
 9+ return ElementIterator(xmlIterator.next()[1], xmlIterator)
 10+
 11+class ElementIteratorError(Exception): pass
 12+
 13+class ElementIterator:
 14+
 15+ def __init__(self, element, xmlIterator):
 16+ self.element = element
 17+ self.xmlIterator = xmlIterator
 18+ self.tagStack = [self.element.tag]
 19+
 20+ def __iter__(self):
 21+ if len(self.tagStack) == 0:
 22+ raise ElementIteratorError("Element has already been iterated through.")
 23+
 24+ for event, element in self.xmlIterator:
 25+ if event == "start":
 26+ element = ElementIterator(element, self.xmlIterator)
 27+ yield element
 28+ element.clear()
 29+
 30+ else: #event == "end"
 31+ assert element.tag == self.element.tag, "Expected %r, got %r" % (self.element.tag, element.tag)
 32+ self.tagStack.pop()
 33+
 34+ if len(self.tagStack) == 0:
 35+ break
 36+
 37+
 38+ def get(self, key, alt=None):
 39+ return self.element.attrib.get(key, alt)
 40+
 41+
 42+ def complete(self):
 43+ if len(self.tagStack) != 0:
 44+ for event, element in self.xmlIterator:
 45+ if event == "start":
 46+ self.tagStack.append(element.tag)
 47+ element.clear()
 48+
 49+ else: #event == "end"
 50+ assert self.tagStack[-1] == element.tag, "Expected %r at the end of %r" % (element.tag, self.tagStack)
 51+ self.tagStack.pop()
 52+
 53+ if len(self.tagStack) == 0:
 54+ break
 55+
 56+
 57+ def clear(self):
 58+ self.complete()
 59+ self.element.clear()
 60+
 61+
 62+ def __del__(self):
 63+ self.clear()
 64+
 65+ def __getattr__(self, attr):
 66+ if attr == "attrib":
 67+ return self.element.attrib
 68+ elif attr == "tag":
 69+ return self.element.tag
 70+ elif attr == "tail":
 71+ return self.element.tail
 72+ elif attr == "text":
 73+ self.complete()
 74+ return self.element.text
 75+ else:
 76+ raise AttributeError("%s has no attribute %r" % (self.__class__.__name__, attr))
 77+
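
A minimal sketch (not part of the commit) of how the ElementIterator wrapper behaves, using an in-memory document; Python 2's StringIO stands in for a real file pointer:

    from StringIO import StringIO
    from wmf.dump.xml_iterator import XMLIterator

    root = XMLIterator(StringIO("<page><title>Foo</title><id>42</id></page>"))
    print(root.tag)  # -> 'page'
    for child in root:
        # Reading .text consumes the child element's events before moving on.
        print("%s = %s" % (child.tag, child.text))  # -> title = Foo, then id = 42
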
Index: trunk/tools/wsor/wikimedia/wmf/dump/tests/test_iterator.py
@@ -0,0 +1,81 @@
 2+import sys, logging
 3+from nose.tools import eq_
 4+from . import sample
 5+from ..iterator import Iterator, Namespace
 6+from ... import util
 7+
 8+logging.basicConfig(level=logging.INFO)
 9+
 10+def test_small():
 11+ fp = sample.getSmallXMLFilePointer()
 12+ wf = Iterator(fp)
 13+ for key in [
 14+ -2, -1, 0, 1, 2, 3, 4, 5, 6,
 15+ 7, 8, 9, 10, 11, 12, 13, 14, 15,
 16+ 100,101,108,109
 17+ ]:
 18+ assert key in wf.namespaces.values(), "Key %s not found in %s" % (key, wf.namespaces)
 19+
 20+ for page in wf.readPages():
 21+ eq_(
 22+ page.getTitle(),
 23+ u'Talk:Pilsbury Block'
 24+ )
 25+ for revision in page.readRevisions():
 26+ eq_(
 27+ revision.getId(),
 28+ 213377884
 29+ )
 30+ eq_(
 31+ revision.getTimestamp(),
 32+ util.wp2Timestamp("2008-05-19T01:41:53Z")
 33+ )
 34+ eq_(
 35+ revision.getContributor().getId(),
 36+ 905763
 37+ )
 38+ eq_(
 39+ revision.getContributor().getUsername(),
 40+ u"Swampyank"
 41+ )
 42+ eq_(
 43+ revision.getMinor(),
 44+ False
 45+ )
 46+ eq_(
 47+ revision.getComment(),
 48+ u"[[WP:AES|\u2190]]Created page with '{{WikiProject National Register of Historic Places|class=Stub}} {{WikiProject Maine|class=Stub|importance=Low}} {{reqphoto|in=Maine}}'"
 49+ )
 50+
 51+ eq_(
 52+ revision.getText(),
 53+ u"{{WikiProject National Register of Historic Places|class=Stub}}\n" +
 54+ u"{{WikiProject Maine|class=Stub|importance=Low}}\n" +
 55+ u"{{reqphoto|in=Maine}}"
 56+ )
 57+
 58+
 59+
 60+def test_large():
 61+ fp = sample.getLargeXMLFilePointer()
 62+ wf = Iterator(fp)
 63+ pageCounter = 0
 64+ revisionCounter = 0
 65+ for page in wf.readPages():
 66+ pageCounter += 1
 67+ for revision in page.readRevisions():
 68+ assert revision.getId() != None
 69+ assert revision.getTimestamp() != None
 70+ __ = revision.getContributor()
 71+ __ = revision.getComment()
 72+ assert revision.getMinor() != None
 73+ assert revision.getText() != None
 74+ #sys.stderr.write(".")
 75+ revisionCounter += 1
 76+ if revisionCounter >= 100: break
 77+
 78+
 79+ eq_(pageCounter, 1)
 80+ #eq_(revisionCounter, 15180)
 81+ eq_(revisionCounter, 100)
 82+
Index: trunk/tools/wsor/wikimedia/wmf/dump/tests/sample/__init__.py
@@ -0,0 +1,28 @@
 2+import os, subprocess
 3+
 4+def extractFile(fileName):
 5+ decompressCall = "lzma -c -q -d %s" % fileName
 6+ process = subprocess.Popen(
 7+ decompressCall,
 8+ stdout=subprocess.PIPE,
 9+ stderr=subprocess.PIPE,
 10+ shell=True
 11+ )
 12+ return process.stdout
 13+
 14+def getSmallXMLFilePath():
 15+ pwd = os.path.dirname(os.path.realpath(__file__))
 16+ return os.path.join(pwd, "small.xml.lzma")
 17+
 18+
 19+def getLargeXMLFilePath():
 20+ pwd = os.path.dirname(os.path.realpath(__file__))
 21+ return os.path.join(pwd, "large.xml.lzma")
 22+
 23+
 24+def getSmallXMLFilePointer():
 25+ return extractFile(getSmallXMLFilePath())
 26+
 27+def getLargeXMLFilePointer():
 28+ return extractFile(getLargeXMLFilePath())
 29+
\ No newline at end of file
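
For reference, a minimal sketch (not part of the commit) of how these sample helpers feed the dump iterator, as the tests do; it assumes the `lzma` command-line tool used by extractFile() is available:

    from wmf.dump.tests import sample
    from wmf.dump.iterator import Iterator

    dump = Iterator(sample.getSmallXMLFilePointer())
    for page in dump.readPages():
        print(page.getTitle())  # the small sample contains 'Talk:Pilsbury Block'
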
Index: trunk/tools/wsor/wikimedia/wmf/dump/tests/sample/test.py
@@ -0,0 +1,4 @@
 2+import os
 3+print(__file__)
 4+print(os.path.realpath(__file__))
 5+print(os.path.realpath(__file__)[:-1*len(__file__)])
Index: trunk/tools/wsor/wikimedia/wmf/dump/tests/sample/small.xml.lzma
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: trunk/tools/wsor/wikimedia/wmf/dump/tests/sample/small.xml.lzma
___________________________________________________________________
Added: svn:mime-type
16 + application/octet-stream
Index: trunk/tools/wsor/wikimedia/wmf/dump/tests/sample/large.xml.lzma
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream
Property changes on: trunk/tools/wsor/wikimedia/wmf/dump/tests/sample/large.xml.lzma
___________________________________________________________________
Added: svn:mime-type
27 + application/octet-stream
Index: trunk/tools/wsor/wikimedia/wmf/dump/tests/test_map.py
@@ -0,0 +1,25 @@
 2+import sys, logging
 3+from nose.tools import eq_
 5+from . import sample
 6+from ..map import map
 7+
 8+
 9+def test_simple_map():
 10+ dumps = [sample.getSmallXMLFilePath(), sample.getLargeXMLFilePath()]
 11+
 12+ def processPage(dump, page):
 13+ assert hasattr(dump, "namespaces")
 14+ assert hasattr(page, "readRevisions")
 15+
 16+ count = 0
 17+ for rev in page.readRevisions():
 18+ count += 1
 19+ if count >= 100: break
 20+
 21+ yield (page.getId(), count)
 22+
 23+ output = dict(map(dumps, processPage))
 24+
 25+ eq_(output[17500012], 1)
 26+ eq_(output[12], 100)
Index: trunk/tools/wsor/wikimedia/wmf/dump/tests/__init__.py
Index: trunk/tools/wsor/wikimedia/wmf/dump/tests/sample.py
@@ -0,0 +1 @@
 2+
Index: trunk/tools/wsor/wikimedia/wmf/dump/map.py
@@ -0,0 +1,255 @@
 2+"""
 3+Dump Mapper
 4+
 5+This script acts as a map function over the pages in a set of MediaWiki
 6+database dump files. This script allows the algorithm for processing a set of
 7+pages to be spread across the available processor cores of a system for faster
 8+analysis.
 9+
 10+This script can also be imported as a module to expose the `map()` function
 11+that returns an iterator over output rather than printing to stdout.
 12+
 13+Examples:
 14+
 15+python -O process_dumps.py revision_meta /dumps/enwiki-20110115-pages-meta-history* > ~/data/revision_meta.tsv
 16+"""
 17+import sys, logging, re, types, argparse, os, subprocess
 18+from multiprocessing import Process, Queue, Lock, cpu_count, Value
 19+from Queue import Empty
 20+
 21+from .iterator import Iterator
 22+
 23+class FileTypeError(Exception):pass
 24+
 25+class Processor(Process):
 26+ """
 27+ A processor for managing the reading of dump files from a queue and
 28+ the application of a function to each 'page'.
 29+ """
 30+
 31+ def __init__(self, input, processPage, output, callback, logger):
 32+ """
 33+ Constructor
 34+
 35+ :Parameters:
 36+ input : `multiprocessing.Queue`
 37+ a queue of paths to dump files to process
 38+ processPage : function
 39+ a function to apply to each page of a dump file
 40+ output : `multiprocessing.Queue`
 41+ a queue to send processing output to
 42+ callback : function
 43+ a function to run upon completion
 44+ logger : `logging.Logger`
 45+ a logger object to send logging events to
 46+ """
 47+ self.input = input
 48+ self.processPage = processPage
 49+ self.output = output
 50+ self.callback = callback
 51+ self.logger = logger
 52+ Process.__init__(self)
 53+
 54+ def run(self):
 55+ try:
 56+ while True:
 57+ foo = self.input.qsize()
 58+ fn = self.input.get(block=False)
 59+ self.logger.info("Processing dump file %s." % fn)
 60+ dump = Iterator(openDumpFile(fn))
 61+ for page in dump.readPages():
 62+ self.logger.debug("Processing page %s:%s." % (page.getId(), page.getTitle()))
 63+ try:
 64+ for out in self.processPage(dump, page):
 65+ self.output.put(out)
 66+ except Exception as e:
 67+ self.logger.error(
 68+ "Failed to process page %s:%s - %s" % (
 69+ page.getId(),
 70+ page.getTitle(),
 71+ e
 72+ )
 73+ )
 74+
 75+
 76+
 77+
 78+ except Empty:
 79+ self.logger.info("Nothing left to do. Shutting down thread.")
 80+ finally:
 81+ self.callback()
 82+
 83+
 84+def map(dumps, processPage, threads=cpu_count()-1):
 85+ """
 86+ Maps a function across all of the pages in a set of dump files and returns
 87+ an iterator over the output (order is not guaranteed).
 88+
 89+ :Parameters:
 90+ dumps : list
 91+ a list of paths to dump files to process
 92+ processPage : function
 93+ a function to run on every page of a set of dump files.
 94+ threads : int
 95+ the number of individual processing threads to spool up
 96+ """
 97+
 98+ input = dumpFiles(dumps)
 99+ output = Queue(maxsize=10000)
 100+ running = Value('i', 0)
 101+
 102+ def dec(): running.value -= 1
 103+
 104+ for i in range(0, min(threads, input.qsize())):
 105+ running.value += 1
 106+ Processor(
 107+ input,
 108+ processPage,
 109+ output,
 110+ dec,
 111+ logging.getLogger("Process %s" % i)
 112+ ).start()
 113+
 114+
 115+ #output while processes are running
 116+ while running.value > 0:
 117+ try: yield output.get(timeout=.25)
 118+ except Empty: pass
 119+
 120+ #finish yielding output buffer
 121+ try:
 122+ while True: yield output.get(block=False)
 123+ except Empty:
 124+ pass
 125+
 126+
 127+
 128+EXTENSIONS = {
 129+ 'xml': "cat",
 130+ 'bz2': "bzcat",
 131+ '7z': "7z e -so 2>/dev/null",
 132+ 'lzma':"lzcat"
 133+}
 134+"""
 135+A map from file extension to the command to run to extract the data to standard out.
 136+"""
 137+
 138+EXT_RE = re.compile(r'\.([^\.]+)$')
 139+"""
 140+A regular expression for extracting the final extension of a file.
 141+"""
 142+
 143+
 144+def dumpFile(path):
 145+ """
 146+ Verifies that a file exists at a given path and that the file has a
 147+ known extension type.
 148+
 149+ :Parameters:
 150+ path : `str`
 151+ the path to a dump file
 152+
 153+ """
 154+ path = os.path.expanduser(path)
 155+ if not os.path.isfile(path):
 156+ raise FileTypeError("Can't find file %s" % path)
 157+
 158+ match = EXT_RE.search(path)
 159+ if match == None:
 160+ raise FileTypeError("No extension found for %s." % path)
 161+ elif match.groups()[0] not in EXTENSIONS:
 162+ raise FileTypeError("File type %r is not supported." % path)
 163+ else:
 164+ return path
 165+
 166+def dumpFiles(paths):
 167+ """
 168+ Produces a `multiprocessing.Queue` containing a path for each value in
 169+ `paths`, to be used by the `Processor`s.
 170+
 171+ :Parameters:
 172+ paths : iterable
 173+ the paths to add to the processing queue
 174+ """
 175+ q = Queue()
 176+ for path in paths: q.put(dumpFile(path))
 177+ return q
 178+
 179+def openDumpFile(path):
 180+ """
 181+ Turns a path to a dump file into a file-like object of (decompressed)
 182+ XML data.
 183+
 184+ :Parameters:
 185+ path : `str`
 186+ the path to the dump file to read
 187+ """
 188+ match = EXT_RE.search(path)
 189+ ext = match.groups()[0]
 190+ p = subprocess.Popen(
 191+ "%s %s" % (EXTENSIONS[ext], path),
 192+ shell=True,
 193+ stdout=subprocess.PIPE
 194+ )
 195+ return p.stdout
 196+
 197+
 198+def encode(v):
 199+ """
 200+ Encodes an output value as a string intended to be read by eval()
 201+ """
 202+ if type(v) == types.FloatType:
 203+ return str(int(v))
 204+ elif v == None:
 205+ return "\\N"
 206+ else:
 207+ return repr(v)
 208+
 209+
 210+
 211+def main():
 212+ parser = argparse.ArgumentParser(
 213+ description='Maps a function across pages of MediaWiki dump files'
 214+ )
 215+ parser.add_argument(
 216+ '-o', '--out',
 217+ metavar="<path>",
 218+ type=lambda path:open(path, "w"),
 219+ help='the path to an output file to write output to (defaults to stdout)',
 220+ default=sys.stdout
 221+ )
 222+ parser.add_argument(
 223+ '-t', '--threads',
 224+ metavar="",
 225+ type=int,
 226+ help='the number of threads to start (defaults to # of cores -1)',
 227+ default=cpu_count()-1
 228+ )
 229+ parser.add_argument(
 230+ 'processor',
 231+ type=__import__,
 232+ help='the class path to the module that contains the process() function to be passed each page'
 233+ )
 234+ parser.add_argument(
 235+ 'dump',
 236+ type=dumpFile,
 237+ help='the XML dump file(s) to process',
 238+ nargs="+"
 239+ )
 240+ args = parser.parse_args()
 241+
 242+ LOGGING_STREAM = sys.stderr
 243+ if __debug__: level = logging.DEBUG
 244+ else: level = logging.INFO
 245+ logging.basicConfig(
 246+ level=level,
 247+ stream=LOGGING_STREAM,
 248+ format='%(name)s: %(asctime)s %(levelname)-8s %(message)s',
 249+ datefmt='%b-%d %H:%M:%S'
 250+ )
 251+ logging.info("Starting dump processor with %s threads." % min(args.threads, len(args.dump)))
 252+ for row in map(args.dump, args.processor.process, args.threads):
 253+ print('\t'.join(encode(v) for v in row))
 254+
 255+if __name__ == "__main__":
 256+ main()
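
For reference, a minimal usage sketch (not part of the commit) of the importable map() function above; the dump path and the fields yielded are made up:

    from wmf.dump.map import map as dump_map

    def processPage(dump, page):
        # Must be a generator: each yielded value becomes one output row.
        revisions = sum(1 for _ in page.readRevisions())
        yield (page.getId(), page.getTitle(), revisions)

    for row in dump_map(["enwiki-pages-meta-history1.xml.bz2"], processPage, threads=2):
        print("\t".join(str(v) for v in row))
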
Index: trunk/tools/wsor/wikimedia/wmf/dump/__init__.py
@@ -0,0 +1,2 @@
 2+from .iterator import Iterator
 3+from .map import map
Index: trunk/tools/wsor/wikimedia/wmf/__init__.py
@@ -0,0 +1,2 @@
 2+from __future__ import absolute_import
 3+from .util import *
Index: trunk/tools/wsor/scripts/process_dumps.py
@@ -1,186 +0,0 @@
2 -import sys, logging, re, types, argparse, os, subprocess
3 -from multiprocessing import Process, Queue, Lock, cpu_count, Value
4 -from Queue import Empty
5 -from gl import wp
6 -
7 -class FileTypeError(Exception):pass
8 -
9 -def encode(v):
10 - if type(v) == types.FloatType:
11 - return str(int(v))
12 - elif v == None:
13 - return "\\N"
14 - else:
15 - return repr(v)
16 -
17 -
18 -
19 -class SafeOutput:
20 -
21 - def __init__(self, fp):
22 - self.fp = fp
23 - self.l = Lock()
24 -
25 - def push(self, row, encode=encode):
26 - if __debug__:
27 - row = tuple(row)
28 -
29 - with self.l:
30 - self.fp.write("\t".join(clean(v) for v in row) + "\n")
31 -
32 -class Processor(Process):
33 -
34 - def __init__(self, input, processPage, output, callback, logger):
35 - self.input = input
36 - self.processPage = processPage
37 - self.output = output
38 - self.callback = callback
39 - self.logger = logger
40 - Process.__init__(self)
41 -
42 - def run(self):
43 - try:
44 - while True:
45 - foo = self.input.qsize()
46 - fn = self.input.get(block=False)
47 - self.logger.info("Processing dump file %s." % fn)
48 - dump = wp.dump.Iterator(openDumpFile(fn))
49 - for page in dump.readPages():
50 - self.logger.debug("Processing page %s:%s." % (page.getId(), page.getTitle()))
51 - try:
52 - for out in self.processPage(dump, page):
53 - self.output.put(out)
54 - except Exception as e:
55 - self.logger.error(
56 - "Failed to process page %s:%s - %s" % (
57 - page.getId(),
58 - page.getTitle(),
59 - e
60 - )
61 - )
62 -
63 -
64 -
65 -
66 - except Empty:
67 - self.logger.info("Nothing left to do. Shutting down thread.")
68 - finally:
69 - self.callback()
70 -
71 -
72 -
73 -
74 -def main(args):
75 - LOGGING_STREAM = sys.stderr
76 - if __debug__: level = logging.DEBUG
77 - else: level = logging.INFO
78 - logging.basicConfig(
79 - level=level,
80 - stream=LOGGING_STREAM,
81 - format='%(name)s: %(asctime)s %(levelname)-8s %(message)s',
82 - datefmt='%b-%d %H:%M:%S'
83 - )
84 - logging.info("Starting dump processor with %s threads." % min(args.threads, len(args.dump)))
85 - for row in process_dumps(args.dump, args.processor.process, args.threads):
86 - print('\t'.join(encode(v) for v in row))
87 -
88 -def process_dumps(dumps, processPage, threads):
89 - input = dumpFiles(dumps)
90 - output = Queue(maxsize=10000)
91 - running = Value('i', 0)
92 -
93 - def dec(): running.value -= 1
94 -
95 - for i in range(0, min(threads, input.qsize())):
96 - running.value += 1
97 - Processor(
98 - input,
99 - processPage,
100 - output,
101 - dec,
102 - logging.getLogger("Process %s" % i)
103 - ).start()
104 -
105 -
106 - #output while processes are running
107 - while running.value > 0:
108 - try: yield output.get(timeout=.25)
109 - except Empty: pass
110 -
111 - #finish yielding output buffer
112 - try:
113 - while True: yield output.get(block=False)
114 - except Empty:
115 - pass
116 -
117 -
118 -
119 -EXTENSIONS = {
120 - 'xml': "cat",
121 - 'bz2': "bzcat",
122 - '7z': "7z e -so 2>/dev/null",
123 - 'lzma':"lzcat"
124 -}
125 -
126 -EXT_RE = re.compile(r'\.([^\.]+)$')
127 -def dumpFile(path):
128 - path = os.path.expanduser(path)
129 - if not os.path.isfile(path):
130 - raise FileTypeError("Can't find file %s" % path)
131 -
132 - match = EXT_RE.search(path)
133 - if match == None:
134 - raise FileTypeError("No extension found for %s." % path)
135 - elif match.groups()[0] not in EXTENSIONS:
136 - raise FileTypeError("File type %r is not supported." % path)
137 - else:
138 - return path
139 -
140 -def dumpFiles(paths):
141 - q = Queue()
142 - for path in paths: q.put(dumpFile(path))
143 - return q
144 -
145 -def openDumpFile(path):
146 - match = EXT_RE.search(path)
147 - ext = match.groups()[0]
148 - p = subprocess.Popen(
149 - "%s %s" % (EXTENSIONS[ext], path),
150 - shell=True,
151 - stdout=subprocess.PIPE
152 - )
153 - return p.stdout
154 -
155 -
156 -if __name__ == "__main__":
157 - parser = argparse.ArgumentParser(
158 - description='Maps a function across pages of MediaWiki dump files'
159 - )
160 - parser.add_argument(
161 - '-o', '--out',
162 - metavar="<path>",
163 - type=lambda path:open(path, "w"),
164 - help='the path to an output file to write putput to (defaults to stdout)',
165 - default=sys.stdout
166 - )
167 - parser.add_argument(
168 - '-t', '--threads',
169 - metavar="",
170 - type=int,
171 - help='the number of threads to start (defaults to # of cores -1)',
172 - default=cpu_count()-1
173 - )
174 - parser.add_argument(
175 - 'processor',
176 - type=__import__,
177 - help='the class path to the function to use to process each page'
178 - )
179 - parser.add_argument(
180 - 'dump',
181 - type=dumpFile,
182 - help='the XML dump file(s) to process',
183 - nargs="+"
184 - )
185 - args = parser.parse_args()
186 - main(args)
187 -
Index: trunk/tools/wsor/ts_samples/testing.sql
@@ -40,3 +40,30 @@
4141 CREATE UNIQUE INDEX user_id_idx ON halfak.user_meta (user_id);
4242 CREATE INDEX first_edit_idx ON halfak.user_meta (first_edit);
4343 CREATE INDEX last_edit_idx ON halfak.user_meta (last_edit);
 44+
 45+
 46+SELECT
 47+ year,
 48+ biannual,
 49+ count(*)
 50+FROM
 51+(
 52+SELECT
 53+ u.user_id,
 54+ SUBSTRING(first_edit, 1,4) as year,
 55+ SUBSTRING(first_edit, 5,2) >= "07" as biannual
 56+FROM halfak.user_meta um
 57+INNER JOIN user u
 58+ ON u.user_id = um.user_id
 59+INNER JOIN page p
 60+ ON p.page_title = u.user_name
 61+ AND p.page_namespace = 3
 62+INNER JOIN revision r
 63+ ON um.user_id != r.rev_user
 64+ AND p.page_id = r.rev_page
 65+GROUP BY
 66+ user_id,
 67+ SUBSTRING(first_edit, 1,4),
 68+ SUBSTRING(first_edit, 5,2)
 69+) as foo
 70+GROUP BY year, biannual;
