r84633 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: < r84632 | r84633 | r84634 >
Date: 21:38, 23 March 2011
Author: diederik
Status: deferred
Tags:
Comment: Kaggle dataset creator almost finished.
Modified paths:
  • /trunk/tools/editor_trends/etl/enricher.py (modified) (history)

Diff

Index: trunk/tools/editor_trends/etl/enricher.py
@@ -19,6 +19,7 @@


 import bz2
+import os
 import cStringIO
 import hashlib
 import codecs
@@ -27,6 +28,7 @@
 import progressbar
 from multiprocessing import JoinableQueue, Process, cpu_count, current_process
 from xml.etree.cElementTree import fromstring, iterparse
+from lxml import objectify
 from collections import deque

 if '..' not in sys.path:
@@ -75,7 +77,16 @@
 #109:'Book talk'
 }

+class Statistics:
+    def __init__(self):
+        self.count_articles = 0
+        self.count_revisions = 0

+    def summary(self):
+        print 'Number of articles: %s' % self.count_articles
+        print 'Number of revisions: %s' % self.count_revisions
+
+
 class Buffer:
     def __init__(self, storage, id):
         assert storage == 'cassandra' or storage == 'mongo' or storage == 'csv', \
@@ -89,6 +100,7 @@
                      'title', 'timestamp', 'hash', 'revert', 'bot', 'prev_size',
                      'cur_size', 'delta']
         self.setup_storage()
+        self.stats = Statistics()

     def setup_storage(self):
         if self.storage == 'cassandra':
@@ -111,7 +123,8 @@
         self.stringify(revision)
         id = revision['revision_id']
         self.revisions[id] = revision
-        if len(self.revisions) == 1000:
+        if len(self.revisions) > 1000:
+            print 'Emptying buffer'
             self.store()
             self.clear()

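The hunk above changes the flush check from == 1000 to > 1000: self.revisions is keyed by revision id, so its length does not necessarily pass through every integer, and an exact-equality test can be skipped forever, leaving the buffer to grow without bound. A minimal sketch of the batched-write pattern this implements (the class and method bodies here are illustrative, not the repository's):

    class BatchBuffer:
        def __init__(self, flush_threshold=1000):
            self.revisions = {}
            self.flush_threshold = flush_threshold

        def add(self, revision):
            self.revisions[revision['revision_id']] = revision
            #'>' rather than '==': duplicate ids mean len() can skip values,
            #so an exact comparison could miss the flush point entirely
            if len(self.revisions) > self.flush_threshold:
                self.store()
                self.revisions = {}

        def store(self):
            #write the whole batch to csv/MongoDB/Cassandra in one go
            for id in self.revisions:
                pass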
@@ -250,10 +263,10 @@
         if title.startswith(namespace):
             ns['namespace'] = namespaces[namespace]
     if ns == {}:
-        for namespace in NAMESPACE:
+        for namespace in NAMESPACE.values():
             if title.startswith(namespace):
                 ns = False #article does not belong to either the main namespace, user, talk or user talk namespace.
-                break
+                return ns
         ns['namespace'] = 0
     else:
         ns = False
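The loop above previously iterated over NAMESPACE directly, which yields the dict's keys (integers), so title.startswith(namespace) would raise a TypeError; iterating over NAMESPACE.values() compares against the prefix strings, and the early return prevents ns['namespace'] = 0 from running after ns was just set to False. A rough standalone rendering of the intended decision logic (the dictionaries and numbers below are illustrative, not the module's actual tables):

    TRACKED = {1: 'Talk:', 2: 'User:', 3: 'User talk:'}    #namespaces to keep
    OTHER = {4: 'Wikipedia:', 6: 'File:', 10: 'Template:'} #namespaces to drop

    def determine_namespace(title):
        for ns_id, prefix in TRACKED.items():
            if title.startswith(prefix):
                return {'namespace': ns_id}
        for prefix in OTHER.values():  #values(), not keys: compare strings
            if title.startswith(prefix):
                return False           #outside the namespaces we track
        return {'namespace': 0}        #no prefix at all: main namespace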
@@ -262,7 +275,7 @@

 def prefill_row(title, article_id, namespace):
     row = {}
-    row['title'] = title.text
+    row['title'] = title
     row['article_id'] = article_id
     row.update(namespace)
     return row
@@ -277,108 +290,93 @@
     return revert


-def create_variables(result_queue, storage, id):
-    bots = detector.retrieve_bots('en')
-    buffer = Buffer(storage, id)
-    i = 0
-    while True:
-        article = result_queue.get(block=True)
-        result_queue.task_done()
-        if article == None:
-            break
-        i += 1
-        #article = fromstring(article)
-        title = article['title'].text
-        namespace = determine_namespace(title)
-        if namespace != False:
-            #revisions = article.findall('revision')
-            article_id = article['id'].text
-            hashes = deque(maxlen=1000)
-            size = {}
-            for revision in article['revision']:
-                if revision == None:
-                    #the entire revision is empty, weird.
-                    continue
+def add_comment(revision_id, revision):
+    comment = {}
+    comment[revision_id] = revision.text
+    return comment
+

-                contributor = revision.find('contributor')
-                contributor = parse_contributor(contributor, bots)
-                if not contributor:
-                    #editor is anonymous, ignore
-                    continue
+def create_variables(article, cache, cache_comments, bots):

-                revision_id = revision.find('id')
-                revision_id = extracter.extract_revision_id(revision_id)
-                if revision_id == None:
-                    #revision_id is missing, which is weird
-                    continue
+    title = article['title'].text
+    namespace = determine_namespace(article['title'])
+
+    if namespace != False:
+        cache.stats.count_articles += 1
+        article_id = article['id'].text
+        hashes = deque()
+        size = {}
+        revisions = article['revisions']
+        for revision in revisions:
+            cache.stats.count_revisions += 1
+            if revision == None:
+                #the entire revision is empty, weird.
+                continue
+            contributor = revision.find('contributor')
+            contributor = parse_contributor(contributor, bots)
+            if not contributor:
+                #editor is anonymous, ignore
+                continue

-                row = prefill_row(title, article_id, namespace)
-                row['revision_id'] = revision_id
-                text = extract_revision_text(revision)
-                row.update(contributor)
+            revision_id = revision.find('id')
+            revision_id = extracter.extract_revision_id(revision_id)
+            if revision_id == None:
+                #revision_id is missing, which is weird
+                continue
+            comment = add_comment(revision_id, revision)
+            row = prefill_row(title, article_id, namespace)
+            row['revision_id'] = revision_id
+            text = extract_revision_text(revision)
+            row.update(contributor)

+            timestamp = revision.find('timestamp').text
+            row['timestamp'] = timestamp

-                timestamp = revision.find('timestamp').text
-                row['timestamp'] = timestamp
+            hash = create_md5hash(text)
+            revert = is_revision_reverted(hash['hash'], hashes)
+            hashes.append(hash['hash'])
+            size = calculate_delta_article_size(size, text)

-                hash = create_md5hash(text)
-                revert = is_revision_reverted(hash['hash'], hashes)
-                hashes.append(hash['hash'])
-                size = calculate_delta_article_size(size, text)
+            row.update(hash)
+            row.update(size)
+            row.update(revert)
+            cache.add(row)
+

-                row.update(hash)
-                row.update(size)
-                row.update(revert)
-                # print row
-                # if row['username'] == None:
-                #     contributor = revision.find('contributor')
-                #     attrs = contributor.getchildren()
-                #     for attr in attrs:
-                #         print attr.text
-                #print revision_id, hash, delta, prev_size\

-                buffer.add(row)
-        if i % 10000 == 0:
-            print 'Parsed %s articles' % i
-#    except ValueError, e:
-#        print e
-#    except UnicodeDecodeError, e:
-#        print e
-    buffer.empty()
-    print 'Buffer is empty'

-
-def parse_xml(source, result_queue):
-    context = iterparse(source, events=('end',))
+def parse_xml(buffer):
+    context = iterparse(buffer, events=('end',))
     context = iter(context)
     event, root = context.next()

     article = {}
     id = False
+    article[root.tag] = root
+    article['revisions'] = []
     for event, elem in context:
         if event == 'end' and elem.tag == 'revision':
-            article[elem.tag] = elem
+            article['revisions'].append(elem)
         elif event == 'end' and elem.tag == 'id' and id == False:
             article[elem.tag] = elem
             id = True
-    article[root.tag] = root
-    result_queue.put(article)
-    root.clear()

+    return article

-def stream_raw_xml(input_queue, result_queue):
+
+def stream_raw_xml(input_queue, storage, id):
     buffer = cStringIO.StringIO()
     parsing = False
-
+    bots = detector.retrieve_bots('en')
+    cache = Buffer(storage, id)
+    cache_comments = Buffer(storage, id)
+    i = 0
     while True:
         filename = input_queue.get()
         input_queue.task_done()
         if filename == None:
             break

-        #filesize = file_utils.determine_filesize('', filename)
-        #pbar = progressbar.ProgressBar().start()
-
         for data in unzip(filename):
             if data.startswith('<page>'):
                 parsing = True
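Revert detection is unchanged in spirit by this refactor: each revision's text is hashed with MD5 and compared against the hashes of earlier revisions of the same article, and a repeated hash means the text returned to a prior state. Note that r84633 builds the deque without the old maxlen=1000 bound, so the comparison window is now the article's entire history. A condensed, self-contained sketch of the mechanism (the helper names mirror the diff, but these bodies are assumptions):

    import hashlib
    from collections import deque

    def create_md5hash(text):
        if text == None:
            text = ''
        return {'hash': hashlib.md5(text.encode('utf-8')).hexdigest()}

    def is_revision_reverted(hash, hashes):
        #an identical hash seen earlier means this revision restored an old version
        return {'revert': 1 if hash in hashes else 0}

    hashes = deque()  #unbounded: every prior hash of the article is kept
    for text in ['foo', 'bar', 'foo']:
        hash = create_md5hash(text)
        print is_revision_reverted(hash['hash'], hashes)
        hashes.append(hash['hash'])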
@@ -387,14 +385,19 @@
                 buffer.write('\n')
                 if data == '</page>':
                     buffer.seek(0)
-                    parse_xml(buffer, result_queue)
+                    article = parse_xml(buffer)
+                    create_variables(article, cache, cache_comments, bots)
                     buffer = cStringIO.StringIO()
-                    #pbar.update(pbar.currval + len(data)) #is inaccurate!!!
+                    i += 1
+                    if i % 10000 == 0:
+                        print 'Parsed %s articles' % i

-
-    for x in xrange(cpu_count()):
-        result_queue.put(None)
+
+    cache.empty()
+    cache_comments.empty()
+    print 'Buffer is empty'
     print 'Finished parsing bz2 archives'
+    cache.stats.summary()

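With this hunk, stream_raw_xml parses pages itself instead of pushing them onto a result queue: it accumulates the decompressed dump line by line between <page> and </page> in a cStringIO buffer, then hands each completed page to iterparse. A self-contained sketch of that accumulate-and-parse loop (unzip and the queue plumbing are omitted; the generator name is hypothetical):

    import cStringIO
    from xml.etree.cElementTree import iterparse

    def pages_from_lines(lines):
        buffer = cStringIO.StringIO()
        parsing = False
        for data in lines:
            if data.startswith('<page>'):
                parsing = True
            if parsing:
                buffer.write(data)
                buffer.write('\n')
            if data == '</page>':
                #the buffer now holds exactly one well-formed <page> element
                buffer.seek(0)
                for event, elem in iterparse(buffer, events=('end',)):
                    if elem.tag == 'page':
                        yield elem
                buffer = cStringIO.StringIO()
                parsing = False

    lines = ['<page>', '<title>Foo</title>', '</page>']
    for page in pages_from_lines(lines):
        print page.find('title').text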

 def debug():
@@ -428,33 +431,34 @@


 def launcher():
-
     storage = 'csv'
     setup(storage)
     input_queue = JoinableQueue()
-    result_queue = JoinableQueue()
     #files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2']
-    files = ['/home/diederik/kaggle/enwiki-20100904-pages-meta-history2.xml.bz2']
+    #files = ['/home/diederik/kaggle/enwiki-20100904-pages-meta-history2.xml.bz2']
+    path = '/media/77fc623f-78c1-4f1e-be57-0f3043d778aa/wikipedia_dumps/batch1/'
+    files = file_utils.retrieve_file_list(path, 'bz2', mask=None)

     for file in files:
-        input_queue.put(file)
+        filename = os.path.join(path, file)
+        print filename
+        input_queue.put(filename)

     for x in xrange(cpu_count()):
         input_queue.put(None)

-    extracters = [Process(target=stream_raw_xml, args=[input_queue, result_queue])
+    extracters = [Process(target=stream_raw_xml, args=[input_queue, storage, x])
                   for x in xrange(cpu_count())]
     for extracter in extracters:
         extracter.start()

-    creators = [Process(target=create_variables, args=[result_queue, storage, x])
-                for x in xrange(cpu_count())]
-    for creator in creators:
-        creator.start()
+    #creators = [Process(target=create_variables, args=[result_queue, storage, x])
+    #            for x in xrange(cpu_count())]
+    #for creator in creators:
+    #    creator.start()


     input_queue.join()
-    result_queue.join()


 if __name__ == '__main__':
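After the change, launcher keeps the standard JoinableQueue producer/consumer shape: every task goes on the queue, one None sentinel is enqueued per worker so each process receives its own shutdown signal, and input_queue.join() blocks until every get() has been matched by a task_done(). A minimal sketch of that pattern with the worker body stubbed out:

    from multiprocessing import JoinableQueue, Process, cpu_count

    def worker(input_queue):
        while True:
            filename = input_queue.get()
            input_queue.task_done()
            if filename == None:
                break  #sentinel received, this worker shuts down
            print 'processing %s' % filename

    if __name__ == '__main__':
        input_queue = JoinableQueue()
        for filename in ['a.bz2', 'b.bz2']:
            input_queue.put(filename)
        for x in xrange(cpu_count()):
            input_queue.put(None)  #one sentinel per worker
        workers = [Process(target=worker, args=[input_queue])
                   for x in xrange(cpu_count())]
        for w in workers:
            w.start()
        input_queue.join()  #returns once all items were marked done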
