r82762 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: r82761 | r82762 | r82763
Date: 23:36, 24 February 2011
Author: diederik
Status: deferred
Tags:
Comment: Generic XML parser that creates variables and can output to different databases (Cassandra and Mongo).
Modified paths:
  • /trunk/tools/editor_trends/etl/enricher.py (modified)
  • /trunk/tools/editor_trends/etl/extracter.py (modified)

Diff
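
The committed enricher.py wires a two-stage multiprocessing pipeline: create_article() workers stream <page> fragments out of a bz2 dump into a JoinableQueue, and create_variables() workers parse each fragment and hand rows to a Buffer that batches them (flushing every 100 rows) toward Cassandra or MongoDB. Below is a minimal sketch of that producer/consumer shape; the dump filename, the fragment contents, and the print stand-ins for unzip() and Buffer.add() are illustrative, not the committed API.

from multiprocessing import JoinableQueue, Process

def producer(input_queue, result_queue):
    while True:
        filename = input_queue.get(block=True)
        input_queue.task_done()
        if filename == None:
            break
        # stand-in for unzip() plus the <page>...</page> buffering
        # that create_article() performs
        for fragment in ['<page>%d</page>' % x for x in xrange(3)]:
            result_queue.put(fragment)
    result_queue.put(None)

def consumer(result_queue):
    while True:
        fragment = result_queue.get(block=True)
        result_queue.task_done()
        if fragment == None:
            break
        # stand-in for the fromstring() parsing and Buffer.add()
        # that create_variables() performs
        print 'parsed', fragment

if __name__ == '__main__':
    input_queue = JoinableQueue()
    result_queue = JoinableQueue()
    input_queue.put('dump.xml.bz2')
    input_queue.put(None)  # one sentinel per producer, as launcher() does
    Process(target=producer, args=[input_queue, result_queue]).start()
    Process(target=consumer, args=[result_queue]).start()
    input_queue.join()
    result_queue.join()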

Index: trunk/tools/editor_trends/etl/enricher.py
@@ -23,12 +23,64 @@
 import hashlib
 import codecs
 import re
-from multiprocessing import JoinableQueue, Process
-#from xml.etree.cElementTree. import iterparse
-from xml.etree.cElementTree import fromstring
+import sys
+import progressbar
+from multiprocessing import JoinableQueue, Process, cpu_count
+from xml.etree.cElementTree import fromstring, dump
+from collections import deque
 
-RE_CATEGORY= re.compile('\(.*\`\,\.\-\:\'\)')
+try:
+    import pycassa
+except ImportError:
+    print 'I am not going to use Cassandra today, it\'s my off day.'
 
+if '..' not in sys.path:
+    sys.path.append('..')
+
+from database import db
+from bots import detector
+from utils import file_utils
+import extracter
+
+try:
+    import psyco
+    psyco.full()
+except ImportError:
+    print 'and psyco is having an off day as well...'
+
+RE_CATEGORY = re.compile('\(.*\`\,\.\-\:\'\)')
+
+
+class Buffer:
+    def __init__(self, storage):
+        assert storage == 'cassandra' or storage == 'mongo', \
+            'Valid storage options are cassandra and mongo.'
+        self.storage = storage
+        self.revisions = []
+        self.setup_db()
+
+    def setup_db(self):
+        if self.storage == 'cassandra':
+            pass
+        else:
+            self.db = db.init_mongo_db('enwiki')
+            self.collection = self.db['kaggle']
+
+    def add(self, revision):
+        self.revisions.append(revision)
+        if len(self.revisions) == 100:
+            self.store()
+
+    def empty(self):
+        self.store()
+
+    def store(self):
+        if self.storage == 'cassandra':
+            print 'insert into cassandra'
+        else:
+            print 'insert into mongo'
+
+
 def extract_categories():
     '''
     Field 1: page id
@@ -36,57 +88,117 @@
     Field 3: sort key
     Field 4: timestamp last change
     '''
-    filename = '/Users/diederik/Downloads/enwiki-20110115-categorylinks.sql'
+    filename = 'C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-categorylinks.sql'
     output = codecs.open('categories.csv', 'w', encoding='utf-8')
     fh = codecs.open(filename, 'r', encoding='utf-8')
-
-    for line in fh:
-        if line.startswith('INSERT INTO `categorylinks` VALUES ('):
-            line = line.replace('INSERT INTO `categorylinks` VALUES (','')
-            line = line.replace("'",'')
-            categories = line.split('),(')
-            for category in categories:
-                category = category.split(',')
-                if len(category) ==4:
-                    output.write('%s\t%s\n' % (category[0], category[1]))
-
+
+    try:
+        for line in fh:
+            if line.startswith('INSERT INTO `categorylinks` VALUES ('):
+                line = line.replace('INSERT INTO `categorylinks` VALUES (', '')
+                line = line.replace("'", '')
+                categories = line.split('),(')
+                for category in categories:
+                    category = category.split(',')
+                    if len(category) == 4:
+                        output.write('%s\t%s\n' % (category[0], category[1]))
+    except UnicodeDecodeError, e:
+        print e
+
     output.close()
     fh.close()
 
+
 def extract_revision_text(revision):
     rev = revision.find('text')
     if rev != None:
         return rev.text.encode('utf-8')
     else:
         return None
-
+
+
 def create_md5hash(revision):
-    if revision == None:
-        return False
     rev = extract_revision_text(revision)
+    hash = {}
     if rev != None:
         m = hashlib.md5()
         m.update(rev)
         #echo m.digest()
-        return m.hexdigest()
+        hash['hash'] = m.hexdigest()
     else:
-        return False
+        hash['hash'] = -1
+    return hash
 
-
+
 def calculate_delta_article_size(prev_size, revision):
-    if revision == None:
-        return False
-    rev= extract_revision_text(revision)
-    if rev == None:
-        return 0, prev_size
+    rev = extract_revision_text(revision)
+    size = {}
+    if prev_size == None:
+        size['prev_size'] = 0
+        size['cur_size'] = len(rev)
     else:
+        size['prev_size'] = prev_size
         delta = len(rev) - prev_size
         prev_size = len(rev)
-        return delta, prev_size
-
+        size['cur_size'] = delta
+    return size
 
 
-def create_variables(result_queue):
+def parse_contributor(contributor, bots):
+    username = extracter.extract_username(contributor)
+    user_id = extracter.extract_contributor_id(contributor)
+    bot = extracter.determine_username_is_bot(contributor, bots=bots)
+    contributor = {}
+    contributor['username'] = username
+    contributor['bot'] = bot
+    if user_id != None:
+        contributor.update(user_id)
+    else:
+        contributor = False
+    return contributor
+
+
+def determine_namespace(title):
+    namespaces = {'User': 2,
+                  'Talk': 1,
+                  'User Talk': 3,
+                  }
+    ns = {}
+    if title.text != None:
+        title = title.text
+        title = title.split(':')
+        if len(title) == 1:
+            ns['namespace'] = 0
+        elif len(title) == 2:
+            if title[0] in namespaces:
+                ns['namespace'] = namespaces[title[0]]
+            else:
+                ns = False  # article is not in the main, user, talk or user talk namespace
+        else:
+            ns = False
+    return ns
+
+
+def prefill_row(title, article_id, namespace):
+    row = {}
+    row['title'] = title.text
+    row['article_id'] = article_id
+    row.update(namespace)
+    return row
+
+
+def is_revision_reverted(hash_cur, hashes):
+    revert = {}
+    if hash_cur in hashes:
+        revert['revert'] = 1
+    else:
+        revert['revert'] = 0
+    return revert
+
+
+def create_variables(result_queue, storage):
+    bots = detector.retrieve_bots('en')
+    buffer = Buffer(storage)
     while True:
         try:
             article = result_queue.get(block=True)
@@ -94,26 +206,56 @@
             if article == None:
                 break
             article = fromstring(article)
-            prev_size = 0
-            revisions = article.findall('revision')
-            for revision in revisions:
-                revision_id = revision.find('id').text
-                hash = create_md5hash(revision)
-                delta, prev_size = calculate_delta_article_size(prev_size, revision)
-                print revision_id, hash, delta, prev_size
+            title = article.find('title')
+            namespace = determine_namespace(title)
+            if namespace != False:
+                prev_size = None
+                revisions = article.findall('revision')
+                article_id = article.find('id').text
+                hashes = deque(maxlen=100)
+                for revision in revisions:
+                    row = prefill_row(title, article_id, namespace)
+                    if revision == None:
+                        continue
+
+                    contributor = revision.find('contributor')
+                    contributor = parse_contributor(contributor, bots)
+                    if not contributor:
+                        #editor is anonymous, ignore
+                        continue
+
+                    row.update(contributor)
+                    revision_id = revision.find('id')
+                    revision_id = extracter.extract_revision_id(revision_id)
+                    row['revision_id'] = revision_id
+
+                    hash = create_md5hash(revision)
+                    revert = is_revision_reverted(hash['hash'], hashes)
+                    hashes.append(hash['hash'])
+                    size = calculate_delta_article_size(prev_size, revision)
+
+                    row.update(hash)
+                    row.update(size)
+                    row.update(revert)
+                    print row
+                    # if row['username'] == None:
+                    #     contributor = revision.find('contributor')
+                    #     attrs = contributor.getchildren()
+                    #     for attr in attrs:
+                    #         print attr.text
+                    #print revision_id, hash, delta, prev_size
+
+                    buffer.add(row)
+
         except ValueError, e:
-            pass
-            #print e
-
+            print e
+        except UnicodeDecodeError, e:
+            print e
+    buffer.empty()
 
 
 def create_article(input_queue, result_queue):
-    '''
-    This function creates three variables:
-    1) a MD5 hash for each revision
-    2) the size of the current revision
-    3) the delta size of the current revision and the previous revision
-    '''
     buffer = cStringIO.StringIO()
     parsing = False
     while True:
@@ -121,29 +263,30 @@
         input_queue.task_done()
         if filename == None:
             break
-
+        filesize = file_utils.determine_filesize('', filename)
+        pbar = progressbar.ProgressBar(maxval=filesize).start()
         for data in unzip(filename):
             if data.startswith('<page>'):
                 parsing = True
-                #print data
             if parsing:
                 buffer.write(data)
                 if data == '</page>':
-                    xml1 = buffer.getvalue()
-                    #xml1 = xml1.decode('utf-8')
-                    #xml1 = xml1.encode('utf-8')
-                    #xml1 = fromstring(xml1)
-                    if xml1 != None:
-                        result_queue.put(xml1)
+                    xml_string = buffer.getvalue()
+                    if xml_string != None:
+                        result_queue.put(xml_string)
                     buffer = cStringIO.StringIO()
-
-    result_queue.put(None)
+            pbar.update(pbar.currval + len(data))  #is inaccurate!!!
+
+
+    result_queue.put(None)
     print 'Finished parsing bz2 archives'
 
+
 def unzip(filename):
     '''
-    Filename should be a fully qualified path to the bz2 file that will be decompressed.
-    It will iterate line by line and yield this back to create_article
+    Filename should be a fully qualified path to the bz2 file that will be
+    decompressed. It will iterate line by line and yield this back to
+    create_article
     '''
     fh = bz2.BZ2File(filename, 'r')
     for line in fh:
@@ -156,27 +299,29 @@
 def launcher():
     input_queue = JoinableQueue()
     result_queue = JoinableQueue()
-    files = ['/Users/diederik/Downloads/enwiki-20110115-pages-articles1.xml.bz2']
+    storage = 'cassandra'
+    files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2']
     for file in files:
         input_queue.put(file)
-
-    for x in xrange(2):
+
+    for x in xrange(cpu_count()):
         input_queue.put(None)
-
-    extracters = [Process(target=create_article, args=[input_queue, result_queue]) for x in xrange(2)]
+
+    extracters = [Process(target=create_article, args=[input_queue, result_queue])
+                  for x in xrange(cpu_count())]
     for extracter in extracters:
         extracter.start()
-
-    creators = [Process(target=create_variables, args=[result_queue]) for x in xrange(2)]
+
+    creators = [Process(target=create_variables, args=[result_queue, storage])
+                for x in xrange(cpu_count())]
     for creator in creators:
         creator.start()
-
-
+
+
     input_queue.join()
     result_queue.join()
-
-
 
+
 if __name__ == '__main__':
-    extract_categories()
-    #launcher()
\ No newline at end of file
+    #extract_categories()
+    launcher()
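
The heart of the new create_variables() is an identity-revert check: each revision text is hashed with MD5, the hashes of the last 100 revisions are kept in a collections.deque, and a revision whose hash already occurs in that window is flagged with revert = 1 because it restored an earlier text byte for byte. A self-contained sketch of the same check, with made-up revision texts:

import hashlib
from collections import deque

def md5_of(text):
    m = hashlib.md5()
    m.update(text)
    return m.hexdigest()

hashes = deque(maxlen=100)  # rolling window, as in create_variables()
texts = ['first version', 'vandalised version', 'first version']

for text in texts:
    hash_cur = md5_of(text)
    revert = 1 if hash_cur in hashes else 0
    hashes.append(hash_cur)
    print revert, text
# prints 0, 0 and 1: the third revision restores the first text verbatim

The bounded deque keeps memory per article constant; the trade-off is that a revert to a version more than 100 revisions back goes undetected.
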
Index: trunk/tools/editor_trends/etl/extracter.py
@@ -153,6 +153,13 @@
     return None
 
 
+def extract_revision_id(revision_id, **kwargs):
+    if revision_id != None:
+        return revision_id.text
+    else:
+        return None
+
+
 def extract_contributor_id(contributor, **kwargs):
     '''
     @contributor is the xml contributor node containing a number of attributes
@@ -184,18 +191,19 @@
     the variable tags determines which attributes are being parsed, the values
     in this dictionary are the functions used to extract the data.
     '''
-    headers = ['id', 'date', 'article', 'username']
+    headers = ['id', 'date', 'article', 'username', 'revision_id']
     tags = {'contributor': {'id': extract_contributor_id,
                             'bot': determine_username_is_bot,
                             'username': extract_username,
                             },
             'timestamp': {'date': wikitree.parser.extract_text},
+            'id': {'revision_id': extract_revision_id,
+                   }
             }
     vars = {}
     flat = []
 
     for x, revision in enumerate(revisions):
-        #print len(revision.getchildren())
         vars[x] = {}
         vars[x]['article'] = page
         for tag in tags:
@@ -208,7 +216,6 @@
                     f = tags[tag][function]
                     value = f(el, bots=bots)
                     if isinstance(value, dict):
-                        #if type(value) == type({}):
                         for kw in value:
                             vars[x][kw] = value[kw]
                     else:
@@ -237,7 +244,7 @@
     return output
 
 
-def parse_dumpfile(tasks, rts, filehandles, lock):
+def parse_dumpfile(tasks, rts, lock):
     bot_ids = detector.retrieve_bots(rts.language.code)
     location = os.path.join(rts.input_location, rts.language.code, rts.project.name)
     output = os.path.join(rts.input_location, rts.language.code, rts.project.name, 'txt')
@@ -351,6 +358,7 @@
     files = file_utils.retrieve_file_list(properties.location,
                                           extension,
                                           mask=canonical_filename)
+    print properties.location
    print 'Checking if dump file has been extracted...'
    for fn in files:
        file_without_ext = fn.replace('%s%s' % ('.', extension), '')
@@ -369,7 +377,6 @@
             print 'There was an error while extracting %s, please make sure \
                 that %s is valid archive.' % (fn, fn)
             return False
-    print tasks.qsize()
     return tasks
 
 
@@ -392,14 +399,10 @@
         return result
 
     lock = multiprocessing.Lock()
-#    filehandles = [file_utils.create_txt_filehandle(output, '%s.csv' % fh, 'a',
-#                   settings.encoding) for fh in xrange(settings.max_filehandles)]
 
-    filehandles = []
     consumers = [multiprocessing.Process(target=parse_dumpfile,
                                          args=(tasks,
                                                rts,
-                                               filehandles,
                                                lock))
                  for x in xrange(rts.number_of_processes)]
@@ -411,11 +414,11 @@
         w.start()
 
     tasks.join()
-    filehandles = [fh.close() for fh in filehandles]
 
-    result = sum([consumer.exitcode for consumer in consumers])
+    result = sum([consumer.exitcode for consumer in consumers
+                  if consumer.exitcode != None])
 
-    if restult == 0:
+    if result == 0:
         return True
     else:
         return False
@@ -427,5 +430,6 @@
     filename = 'svwiki-latest-stub-meta-history.xml'
     parse_dumpfile(project, filename, language_code)
 
+
 if __name__ == '__main__':
     debug()
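
One change in the last hunks is worth spelling out: multiprocessing.Process.exitcode is None until a process has terminated, so summing the exit codes unconditionally can raise a TypeError; the revision filters out None values (and corrects the earlier restult typo). A minimal illustration of the repaired check:

import multiprocessing

def work():
    pass  # a worker that exits cleanly with status 0

if __name__ == '__main__':
    consumers = [multiprocessing.Process(target=work) for x in xrange(2)]
    for consumer in consumers:
        consumer.start()
    for consumer in consumers:
        consumer.join()
    # exitcode stays None for a process that has not terminated,
    # hence the filter before summing
    result = sum([consumer.exitcode for consumer in consumers
                  if consumer.exitcode != None])
    print result == 0  # True when every worker exited with status 0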

Status & tagging log