Index: trunk/tools/editor_trends/etl/enricher.py
@@ -19,6 +19,7 @@
 
 
 import bz2
+import os
 import cStringIO
 import hashlib
 import codecs
@@ -27,6 +28,7 @@
 import progressbar
 from multiprocessing import JoinableQueue, Process, cpu_count, current_process
 from xml.etree.cElementTree import fromstring, iterparse
+from lxml import objectify
 from collections import deque
 
 if '..' not in sys.path:
@@ -75,7 +77,16 @@
 #109:'Book talk'
 }
 
+class Statistics:
+    def __init__(self):
+        self.count_articles = 0
+        self.count_revisions = 0
 
+    def summary(self):
+        print 'Number of articles: %s' % self.count_articles
+        print 'Number of revisions: %s' % self.count_revisions
+
+
 class Buffer:
     def __init__(self, storage, id):
         assert storage == 'cassandra' or storage == 'mongo' or storage == 'csv', \
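
A note on the new Statistics class: each worker process builds its own Buffer and therefore its own Statistics instance, so summary() reports per-worker totals rather than a grand total across the whole dump. A minimal sketch, assuming cross-process totals were wanted instead; the Value-based counter here is hypothetical and not part of this changeset:

from multiprocessing import Process, Value

def worker(count_articles):
    # Stand-in for parsing work; each increment is lock-protected so
    # concurrent workers cannot lose updates.
    for _ in xrange(1000):
        with count_articles.get_lock():
            count_articles.value += 1

if __name__ == '__main__':
    count_articles = Value('i', 0)  # 'i' = shared signed int
    workers = [Process(target=worker, args=(count_articles,))
               for _ in xrange(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print 'Number of articles: %s' % count_articles.value  # 4000
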
@@ -89,6 +100,7 @@
             'title', 'timestamp', 'hash', 'revert', 'bot', 'prev_size',
             'cur_size', 'delta']
         self.setup_storage()
+        self.stats = Statistics()
 
     def setup_storage(self):
         if self.storage == 'cassandra':
@@ -111,7 +123,8 @@
         self.stringify(revision)
         id = revision['revision_id']
         self.revisions[id] = revision
-        if len(self.revisions) == 1000:
+        if len(self.revisions) > 1000:
+            print 'Emptying buffer'
             self.store()
             self.clear()
 
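
The flush trigger changes from == 1000 to > 1000, so the buffer is now emptied at 1001 buffered revisions instead of exactly 1000. Since add() inserts one revision at a time, either comparison fires reliably; >= would flush exactly at capacity. A minimal sketch of the pattern, with the class and threshold names made up for illustration:

FLUSH_THRESHOLD = 1000

class RevisionBuffer(object):
    def __init__(self):
        self.revisions = {}

    def add(self, revision):
        self.revisions[revision['revision_id']] = revision
        # '>=' flushes at exactly FLUSH_THRESHOLD entries; the patch's
        # '>' lets the buffer grow to FLUSH_THRESHOLD + 1 first.
        if len(self.revisions) >= FLUSH_THRESHOLD:
            self.store()
            self.clear()

    def store(self):
        print 'Emptying buffer: %s revisions' % len(self.revisions)

    def clear(self):
        self.revisions = {}
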
@@ -250,10 +263,10 @@
         if title.startswith(namespace):
             ns['namespace'] = namespaces[namespace]
     if ns == {}:
-        for namespace in NAMESPACE:
+        for namespace in NAMESPACE.values():
             if title.startswith(namespace):
                 ns = False #article does not belong to either the main namespace, user, talk or user talk namespace.
-                break
+                return ns
         ns['namespace'] = 0
     else:
         ns = False
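
Two fixes land in determine_namespace. Iterating NAMESPACE directly yields its keys, which (judging by the commented #109:'Book talk' entry above) are numeric ids, so title.startswith(namespace) would raise TypeError; .values() supplies the name prefixes instead. And returning ns immediately replaces the old break, after which execution fell through to ns['namespace'] = 0 and subscripted the False it had just assigned. A simplified reconstruction of the fixed control flow, with a stand-in NAMESPACE mapping:

NAMESPACE = {2: 'User:', 3: 'User talk:', 109: 'Book talk:'}

def determine_namespace(title):
    ns = {}
    for namespace in NAMESPACE.values():
        if title.startswith(namespace):
            # Title sits in an excluded namespace: bail out now.
            # The old 'break' fell through to ns['namespace'] = 0,
            # which raises TypeError once ns has been set to False.
            return False
    ns['namespace'] = 0
    return ns

print determine_namespace('User talk:Example')  # False
print determine_namespace('Earth')              # {'namespace': 0}
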
@@ -262,7 +275,7 @@
 
 def prefill_row(title, article_id, namespace):
     row = {}
-    row['title'] = title.text
+    row['title'] = title
     row['article_id'] = article_id
     row.update(namespace)
     return row
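
prefill_row now receives the already-extracted title string: create_variables (below) reads article['title'].text once, passing the element to determine_namespace and the string here, so the .text dereference moved to the caller. For reference, with cElementTree the text of an element is read via its .text attribute:

from xml.etree.cElementTree import fromstring

title = fromstring('<title>Earth</title>')
print title.text  # 'Earth' -- the string prefill_row now receives directly
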
@@ -277,108 +290,93 @@
     return revert
 
 
-def create_variables(result_queue, storage, id):
-    bots = detector.retrieve_bots('en')
-    buffer = Buffer(storage, id)
-    i = 0
-    while True:
-        article = result_queue.get(block=True)
-        result_queue.task_done()
-        if article == None:
-            break
-        i += 1
-        #article = fromstring(article)
-        title = article['title'].text
-        namespace = determine_namespace(title)
-        if namespace != False:
-            #revisions = article.findall('revision')
-            article_id = article['id'].text
-            hashes = deque(maxlen=1000)
-            size = {}
-            for revision in article['revision']:
-                if revision == None:
-                    #the entire revision is empty, weird.
-                    continue
+def add_comment(revision_id, revision):
+    comment = {}
+    comment[revision_id] = revision.text
+    return comment
+
 
-                contributor = revision.find('contributor')
-                contributor = parse_contributor(contributor, bots)
-                if not contributor:
-                    #editor is anonymous, ignore
-                    continue
+def create_variables(article, cache, cache_comments, bots):
 
-                revision_id = revision.find('id')
-                revision_id = extracter.extract_revision_id(revision_id)
-                if revision_id == None:
-                    #revision_id is missing, which is weird
-                    continue
+    title = article['title'].text
+    namespace = determine_namespace(article['title'])
+
+    if namespace != False:
+        cache.stats.count_articles += 1
+        article_id = article['id'].text
+        hashes = deque()
+        size = {}
+        revisions = article['revisions']
+        for revision in revisions:
+            cache.stats.count_revisions += 1
+            if revision == None:
+                #the entire revision is empty, weird.
+                continue
+            contributor = revision.find('contributor')
+            contributor = parse_contributor(contributor, bots)
+            if not contributor:
+                #editor is anonymous, ignore
+                continue
 
-                row = prefill_row(title, article_id, namespace)
-                row['revision_id'] = revision_id
-                text = extract_revision_text(revision)
-                row.update(contributor)
+            revision_id = revision.find('id')
+            revision_id = extracter.extract_revision_id(revision_id)
+            if revision_id == None:
+                #revision_id is missing, which is weird
+                continue
+            comment = add_comment(revision_id, revision)
+            row = prefill_row(title, article_id, namespace)
+            row['revision_id'] = revision_id
+            text = extract_revision_text(revision)
+            row.update(contributor)
 
+            timestamp = revision.find('timestamp').text
+            row['timestamp'] = timestamp
 
-                timestamp = revision.find('timestamp').text
-                row['timestamp'] = timestamp
+            hash = create_md5hash(text)
+            revert = is_revision_reverted(hash['hash'], hashes)
+            hashes.append(hash['hash'])
+            size = calculate_delta_article_size(size, text)
 
-                hash = create_md5hash(text)
-                revert = is_revision_reverted(hash['hash'], hashes)
-                hashes.append(hash['hash'])
-                size = calculate_delta_article_size(size, text)
+            row.update(hash)
+            row.update(size)
+            row.update(revert)
+            cache.add(row)
+
 
-                row.update(hash)
-                row.update(size)
-                row.update(revert)
-                # print row
-                # if row['username'] == None:
-                #     contributor = revision.find('contributor')
-                #     attrs = contributor.getchildren()
-                #     for attr in attrs:
-                #         print attr.text
-                #print revision_id, hash, delta, prev_size\
 
-            buffer.add(row)
-            if i % 10000 == 0:
-                print 'Parsed %s articles' % i
-#        except ValueError, e:
-#            print e
-#        except UnicodeDecodeError, e:
-#            print e
-    buffer.empty()
-    print 'Buffer is empty'
 
-
-def parse_xml(source, result_queue):
-    context = iterparse(source, events=('end',))
+def parse_xml(buffer):
+    context = iterparse(buffer, events=('end',))
     context = iter(context)
     event, root = context.next()
 
     article = {}
     id = False
+    article[root.tag] = root
+    article['revisions'] = []
     for event, elem in context:
         if event == 'end' and elem.tag == 'revision':
-            article[elem.tag] = elem
+            article['revisions'].append(elem)
         elif event == 'end' and elem.tag == 'id' and id == False:
             article[elem.tag] = elem
             id = True
-    article[root.tag] = root
-    result_queue.put(article)
-    root.clear()
 
+    return article
 
-def stream_raw_xml(input_queue, result_queue):
+
+def stream_raw_xml(input_queue, storage, id):
     buffer = cStringIO.StringIO()
     parsing = False
-
+    bots = detector.retrieve_bots('en')
+    cache = Buffer(storage, id)
+    cache_comments = Buffer(storage, id)
+    i = 0
     while True:
        filename = input_queue.get()
        input_queue.task_done()
        if filename == None:
            break
 
-        #filesize = file_utils.determine_filesize('', filename)
-        #pbar = progressbar.ProgressBar().start()
-
         for data in unzip(filename):
             if data.startswith('<page>'):
                 parsing = True
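
The net effect of this hunk: the former create_variables consumer stage is folded into each stream_raw_xml worker, so parsed pages no longer have to cross a result_queue between processes (putting ElementTree elements on a multiprocessing queue requires pickling them), and each worker writes straight into its own Buffer. A self-contained sketch of the per-page streaming pattern the worker now uses, with a tiny inline dump standing in for a real bz2 archive:

import cStringIO
from xml.etree.cElementTree import iterparse

def pages(lines):
    # Accumulate the lines between <page> and </page>, then hand the
    # completed fragment over as a file-like object.
    buffer = cStringIO.StringIO()
    parsing = False
    for data in lines:
        if data.startswith('<page>'):
            parsing = True
        if parsing:
            buffer.write(data)
            buffer.write('\n')
            if data == '</page>':
                buffer.seek(0)
                yield buffer
                buffer = cStringIO.StringIO()
                parsing = False

dump = ['<page>', '<title>Earth</title>', '<id>9228</id>',
        '<revision><id>1</id></revision>', '</page>']
for page in pages(dump):
    for event, elem in iterparse(page, events=('end',)):
        print event, elem.tag  # walk (event, elem) pairs as parse_xml does
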
@@ -387,14 +385,19 @@
                 buffer.write('\n')
             if data == '</page>':
                 buffer.seek(0)
-                parse_xml(buffer, result_queue)
+                article = parse_xml(buffer)
+                create_variables(article, cache, cache_comments, bots)
                 buffer = cStringIO.StringIO()
-                #pbar.update(pbar.currval + len(data)) #is inaccurate!!!
+                i += 1
+                if i % 10000 == 0:
+                    print 'Parsed %s articles' % i
 
-
-    for x in xrange(cpu_count()):
-        result_queue.put(None)
+
+    cache.empty()
+    cache_comments.empty()
+    print 'Buffer is empty'
     print 'Finished parsing bz2 archives'
+    cache.stats.summary()
 
 
 def debug():
@@ -428,33 +431,34 @@
 
 
 def launcher():
-
     storage = 'csv'
     setup(storage)
     input_queue = JoinableQueue()
-    result_queue = JoinableQueue()
     #files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2']
-    files = ['/home/diederik/kaggle/enwiki-20100904-pages-meta-history2.xml.bz2']
+    #files = ['/home/diederik/kaggle/enwiki-20100904-pages-meta-history2.xml.bz2']
+    path = '/media/77fc623f-78c1-4f1e-be57-0f3043d778aa/wikipedia_dumps/batch1/'
+    files = file_utils.retrieve_file_list(path, 'bz2', mask=None)
 
     for file in files:
-        input_queue.put(file)
+        filename = os.path.join(path, file)
+        print filename
+        input_queue.put(filename)
 
     for x in xrange(cpu_count()):
         input_queue.put(None)
 
-    extracters = [Process(target=stream_raw_xml, args=[input_queue, result_queue])
+    extracters = [Process(target=stream_raw_xml, args=[input_queue, storage, x])
                   for x in xrange(cpu_count())]
     for extracter in extracters:
         extracter.start()
 
-    creators = [Process(target=create_variables, args=[result_queue, storage, x])
-                for x in xrange(cpu_count())]
-    for creator in creators:
-        creator.start()
+    #creators = [Process(target=create_variables, args=[result_queue, storage, x])
+    #            for x in xrange(cpu_count())]
+    #for creator in creators:
+    #    creator.start()
 
 
     input_queue.join()
-    result_queue.join()
 
 
 if __name__ == '__main__':
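
With the consumer stage commented out, launcher() now runs a single-stage fan-out: every worker consumes filenames from one JoinableQueue, and one None sentinel per CPU shuts the workers down before input_queue.join() returns. A minimal sketch of that pattern, with placeholder filenames:

from multiprocessing import JoinableQueue, Process, cpu_count

def worker(input_queue, worker_id):
    while True:
        filename = input_queue.get()
        input_queue.task_done()  # every get(), including the sentinel
        if filename is None:
            break
        print 'worker %s processing %s' % (worker_id, filename)

if __name__ == '__main__':
    input_queue = JoinableQueue()
    for filename in ['a.bz2', 'b.bz2', 'c.bz2']:
        input_queue.put(filename)
    for x in xrange(cpu_count()):
        input_queue.put(None)  # one sentinel per worker
    workers = [Process(target=worker, args=[input_queue, x])
               for x in xrange(cpu_count())]
    for w in workers:
        w.start()
    input_queue.join()  # returns once every item has been task_done()'d
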