Index: trunk/tools/editor_trends/etl/enricher.py
@@ -26,7 +26,7 @@
 import sys
 import progressbar
 from multiprocessing import JoinableQueue, Process, cpu_count, current_process
-from xml.etree.cElementTree import fromstring
+from xml.etree.cElementTree import fromstring, iterparse
 from collections import deque
 
 if '..' not in sys.path:
@@ -287,15 +287,15 @@
         if article == None:
             break
         i += 1
-        article = fromstring(article)
-        title = article.find('title')
+        #article = fromstring(article)
+        title = article['title'].text
         namespace = determine_namespace(title)
         if namespace != False:
-            revisions = article.findall('revision')
-            article_id = article.find('id').text
+            #revisions = article.findall('revision')
+            article_id = article['id'].text
             hashes = deque(maxlen=1000)
             size = {}
-            for revision in revisions:
+            for revision in article['revision']:
                 if revision == None:
                     #the entire revision is empty, weird.
                     continue
@@ -348,34 +348,66 @@
             print 'Buffer is empty'
 
 
-def create_article(input_queue, result_queue):
+def parse_xml(source, result_queue):
+    context = iterparse(source, events=('end',))
+    context = iter(context)
+    event, root = context.next()
+
+    article = {}
+    id = False
+    for event, elem in context:
+        if event == 'end' and elem.tag == 'revision':
+            article[elem.tag] = elem
+        elif event == 'end' and elem.tag == 'id' and id == False:
+            article[elem.tag] = elem
+            id = True
+    article[root.tag] = root
+    result_queue.put(article)
+    root.clear()
+
+
+def stream_raw_xml(input_queue, result_queue):
     buffer = cStringIO.StringIO()
     parsing = False
+
     while True:
         filename = input_queue.get()
         input_queue.task_done()
         if filename == None:
             break
+
         #filesize = file_utils.determine_filesize('', filename)
         #pbar = progressbar.ProgressBar().start()
+
         for data in unzip(filename):
             if data.startswith('<page>'):
                 parsing = True
             if parsing:
                 buffer.write(data)
+                buffer.write('\n')
                 if data == '</page>':
-                    xml_string = buffer.getvalue()
-                    if xml_string != None:
-                        result_queue.put(xml_string)
+                    buffer.seek(0)
+                    parse_xml(buffer, result_queue)
                     buffer = cStringIO.StringIO()
             #pbar.update(pbar.currval + len(data)) #is inaccurate!!!
 
 
-    #for x in xrange(cpu_count()):
-    result_queue.put(None)
+    for x in xrange(cpu_count()):
+        result_queue.put(None)
     print 'Finished parsing bz2 archives'
 
 
+def debug():
+    input_queue = JoinableQueue()
+    result_queue = JoinableQueue()
+    files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2']
+
+    for file in files:
+        input_queue.put(file)
+
+    stream_raw_xml(input_queue, result_queue)
+
+
 def unzip(filename):
     '''
     Filename should be a fully qualified path to the bz2 file that will be
@@ -410,7 +442,7 @@
     for x in xrange(cpu_count()):
         input_queue.put(None)
 
-    extracters = [Process(target=create_article, args=[input_queue, result_queue])
+    extracters = [Process(target=stream_raw_xml, args=[input_queue, result_queue])
                   for x in xrange(cpu_count())]
     for extracter in extracters:
         extracter.start()
@@ -426,5 +458,5 @@
 
 
 if __name__ == '__main__':
-    #extract_categories()
+    #debug()
     launcher()
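
Note: below is a minimal, hedged sketch of the iterparse pattern the new parse_xml relies on: walk the 'end' events of a single <page> fragment (for example, the cStringIO buffer that stream_raw_xml fills per page) and collect the title, the first page-level id, and the revision elements. The function name and the returned dict layout are illustrative only; the committed code stores the raw elements keyed by tag and pushes the dict onto the result queue instead of returning it.

from xml.etree.cElementTree import iterparse

def parse_page_fragment(fragment):
    # 'fragment' is a file-like object holding one <page>...</page> block,
    # so the tags are not namespaced the way a full dump file would be.
    article = {'revisions': []}
    for event, elem in iterparse(fragment, events=('end',)):
        # An 'end' event fires once an element has been read completely.
        if elem.tag == 'title':
            article['title'] = elem.text
        elif elem.tag == 'id' and 'id' not in article:
            # The first <id> directly under <page> is the article id; later
            # <id> tags belong to revisions and contributors, so keep only one.
            article['id'] = elem.text
        elif elem.tag == 'revision':
            article['revisions'].append(elem)
    return article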