r84500 MediaWiki - Code Review archive

Repository:MediaWiki
Revision: < r84499 | r84500 | r84501 >
Date:23:28, 21 March 2011
Author:diederik
Status:deferred
Tags:
Comment:
Converted the XML parser for the Kaggle competition to a streaming parser; hopefully no more out-of-memory errors.
Modified paths:
  • /trunk/tools/editor_trends/etl/enricher.py (modified) (history)

Diff [purge]

Index: trunk/tools/editor_trends/etl/enricher.py
@@ -26,7 +26,7 @@
2727 import sys
2828 import progressbar
2929 from multiprocessing import JoinableQueue, Process, cpu_count, current_process
30 -from xml.etree.cElementTree import fromstring
 30+from xml.etree.cElementTree import fromstring, iterparse
3131 from collections import deque
3232
3333 if '..' not in sys.path:
@@ -287,15 +287,15 @@
288288 if article == None:
289289 break
290290 i += 1
291 - article = fromstring(article)
292 - title = article.find('title')
 291+ #article = fromstring(article)
 292+ title = article['title'].text
293293 namespace = determine_namespace(title)
294294 if namespace != False:
295 - revisions = article.findall('revision')
296 - article_id = article.find('id').text
 295+ #revisions = article.findall('revision')
 296+ article_id = article['id'].text
297297 hashes = deque(maxlen=1000)
298298 size = {}
299 - for revision in revisions:
 299+ for revision in article['revision']:
300300 if revision == None:
301301 #the entire revision is empty, weird.
302302 continue
@@ -348,34 +348,66 @@
349349 print 'Buffer is empty'
350350
351351
352 -def create_article(input_queue, result_queue):
 352+def parse_xml(source, result_queue):
 353+ context = iterparse(source, events=('end',))
 354+ context = iter(context)
 355+ event, root = context.next()
 356+
 357+ article = {}
 358+ id = False
 359+ for event, elem in context:
 360+ if event == 'end' and elem.tag == 'revision':
 361+ article[elem.tag] = elem
 362+ elif event == 'end' and elem.tag == 'id' and id == False:
 363+ article[elem.tag] = elem
 364+ id = True
 365+ article[root.tag] = root
 366+ result_queue.put(article)
 367+ root.clear()
 368+
 369+
 370+def stream_raw_xml(input_queue, result_queue):
353371 buffer = cStringIO.StringIO()
354372 parsing = False
 373+
355374 while True:
356375 filename = input_queue.get()
357376 input_queue.task_done()
358377 if filename == None:
359378 break
 379+
360380 #filesize = file_utils.determine_filesize('', filename)
361381 #pbar = progressbar.ProgressBar().start()
 382+
362383 for data in unzip(filename):
363384 if data.startswith('<page>'):
364385 parsing = True
365386 if parsing:
366387 buffer.write(data)
 388+ buffer.write('\n')
367389 if data == '</page>':
368 - xml_string = buffer.getvalue()
369 - if xml_string != None:
370 - result_queue.put(xml_string)
 390+ buffer.seek(0)
 391+ parse_xml(buffer, result_queue)
371392 buffer = cStringIO.StringIO()
372393 #pbar.update(pbar.currval + len(data)) #is inaccurate!!!
373394
374395
375 - #for x in xrange(cpu_count()):
376 - result_queue.put(None)
 396+ for x in xrange(cpu_count()):
 397+ result_queue.put(None)
377398 print 'Finished parsing bz2 archives'
378399
379400
 401+def debug():
 402+ input_queue = JoinableQueue()
 403+ result_queue = JoinableQueue()
 404+ files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2']
 405+
 406+ for file in files:
 407+ input_queue.put(file)
 408+
 409+ stream_raw_xml(input_queue, result_queue)
 410+
 411+
380412 def unzip(filename):
381413 '''
382414 Filename should be a fully qualified path to the bz2 file that will be
@@ -410,7 +442,7 @@
411443 for x in xrange(cpu_count()):
412444 input_queue.put(None)
413445
414 - extracters = [Process(target=create_article, args=[input_queue, result_queue])
 446+ extracters = [Process(target=stream_raw_xml, args=[input_queue, result_queue])
415447 for x in xrange(cpu_count())]
416448 for extracter in extracters:
417449 extracter.start()
@@ -426,5 +458,5 @@
427459
428460
429461 if __name__ == '__main__':
430 - #extract_categories()
 462+ #debug()
431463 launcher()

Status & tagging log