Index: trunk/tools/editor_trends/etl/enricher.py
@@ -34,6 +34,8 @@
 if '..' not in sys.path:
     sys.path.append('..')
 
+from utils import file_utils
+
 try:
     from database import cassandra
     import pycassa
@@ -265,7 +267,11 @@
     if ns == {}:
         for namespace in NAMESPACE.values():
             if title.startswith(namespace):
-                ns = False #article does not belong to either the main namespace, user, talk or user talk namespace.
+                '''
+                article does not belong to either the main namespace, user,
+                talk or user talk namespace.
+                '''
+                ns = False
                 return ns
         ns['namespace'] = 0
     else:
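
Note on the hunk above: determine_namespace returns False for any title whose
prefix appears in NAMESPACE.values(), i.e. pages outside the namespaces this
ETL step cares about. A minimal sketch of that prefix test, with hypothetical
prefixes standing in for the module's actual NAMESPACE table:

    # hypothetical stand-ins; the real NAMESPACE table lives elsewhere in enricher.py
    NAMESPACE = {'wikipedia': 'Wikipedia:', 'template': 'Template:'}

    def in_excluded_namespace(title):
        return any(title.startswith(prefix) for prefix in NAMESPACE.values())

    print in_excluded_namespace('Template:Infobox')               # True
    print in_excluded_namespace('Python (programming language)')  # False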
@@ -283,7 +289,7 @@
 
 def is_revision_reverted(hash_cur, hashes):
     revert = {}
-    if hash_cur in hashes:
+    if hash_cur in hashes and hash_cur != -1:
         revert['revert'] = 1
     else:
         revert['revert'] = 0
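
Note on the hunk above: the new hash_cur != -1 guard implies that -1 is the
sentinel hash assigned to revisions without text, so two empty revisions no
longer count as reverts of each other. A sketch of a hashing helper consistent
with that convention (an assumption; the module's real helper may differ):

    import hashlib

    def hash_revision_text(text):
        # -1 marks a revision with no text; it can never match a stored hash
        if text is None:
            return -1
        return hashlib.md5(text.encode('utf-8')).hexdigest()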
@@ -296,12 +302,34 @@
     if text != None and text.text != None:
         comment[revision_id] = text.text.encode('utf-8')
     return comment
-
 
+
+
+def count_edits(article, counts, bots):
+    title = article['title'].text
+    namespace = determine_namespace(article['title'])
+
+    if namespace != False:
+        article_id = article['id'].text
+        revisions = article['revisions']
+        for revision in revisions:
+            if revision == None:
+                #the entire revision is empty, weird.
+                continue
+            contributor = revision.find('contributor')
+            contributor = parse_contributor(contributor, bots)
+            if not contributor:
+                #editor is anonymous, ignore
+                continue
+            counts.setdefault(contributor['username'], 0)
+            counts[contributor['username']] += 1
+    return counts
+
+
 def create_variables(article, cache, bots):
     title = article['title'].text
     namespace = determine_namespace(article['title'])
-
+
     if namespace != False:
         cache.stats.count_articles += 1
         article_id = article['id'].text
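
Note on count_edits above: it builds a plain-dict tally of edits per username
via setdefault, which keeps the result trivially picklable for the counts.bin
hand-off later in this diff. The same idiom expressed with
collections.defaultdict, using illustrative editor names:

    from collections import defaultdict

    counts = defaultdict(int)
    for username in ('Alice', 'Bob', 'Alice'):  # illustrative editors
        counts[username] += 1
    print dict(counts)  # {'Alice': 2, 'Bob': 1} (key order may vary)

Both forms behave identically here; setdefault merely avoids the import.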
@@ -324,7 +352,7 @@
             if revision_id == None:
                 #revision_id is missing, which is weird
                 continue
-
+
             row = prefill_row(title, article_id, namespace)
             row['revision_id'] = revision_id
             text = extract_revision_text(revision)
@@ -332,7 +360,7 @@
 
             comment = extract_comment_text(revision_id, revision)
             cache.comments.update(comment)
-
+
             timestamp = revision.find('timestamp').text
             row['timestamp'] = timestamp
 
@@ -366,12 +394,17 @@
     return article
 
 
-def stream_raw_xml(input_queue, storage, id):
+def stream_raw_xml(input_queue, function, storage, id, dataset='training'):
     buffer = cStringIO.StringIO()
     parsing = False
+    i = 0
     bots = detector.retrieve_bots('en')
-    cache = Buffer(storage, id)
-    i = 0
+
+    if dataset == 'training':
+        cache = Buffer(storage, id)
+    else:
+        counts = {}
+
     while True:
         filename = input_queue.get()
         input_queue.task_done()
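
Note on the hunk above: stream_raw_xml now receives the per-article callback
(create_variables or count_edits) plus a dataset flag that selects between the
Buffer cache and a plain counts dict. A single-process smoke test from inside
this module, with a hypothetical dump chunk, could look like:

    queue = JoinableQueue()
    queue.put('/media/wikipedia_dumps/batch1/chunk1.xml.bz2')  # hypothetical file
    queue.put(None)  # sentinel: ends the worker's while loop
    stream_raw_xml(queue, create_variables, 'csv', 0, 'training')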
@@ -379,38 +412,32 @@
             break
 
         for data in unzip(filename):
-            if data.startswith('<page>'):
+            if data.find('<page>') > -1:
                 parsing = True
             if parsing:
                 buffer.write(data)
-                buffer.write('\n')
-                if data == '</page>':
+                if data.find('</page>') > -1:
                     i += 1
                     buffer.seek(0)
                     article = parse_xml(buffer)
-                    create_variables(article, cache, bots)
+                    if dataset == 'training':
+                        function(article, cache, bots)
+                    else:
+                        counts = function(article, counts, bots)
                     buffer = cStringIO.StringIO()
 
                     if i % 10000 == 0:
                         print 'Worker %s parsed %s articles' % (id, i)
 
-
-    cache.empty()
-    print 'Finished parsing bz2 archives'
-    cache.stats.summary()
+    if dataset == 'training':
+        cache.empty()
+        print 'Finished parsing bz2 archives'
+        cache.stats.summary()
+    else:
+        location = os.getcwd()
+        file_utils.store_object(counts, location, 'counts.bin')
 
 
-def debug():
-    input_queue = JoinableQueue()
-    result_queue = JoinableQueue()
-    files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2']
-
-    for file in files:
-        input_queue.put(file)
-
-    stream_raw_xml(input_queue, result_queue)
-
-
 def unzip(filename):
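
Note on the hunk above: file_utils.store_object is not shown in this diff. A
plausible reading, assuming it simply pickles the object into the given
directory, is:

    import os
    import cPickle

    def store_object(obj, location, filename):
        # assumption only; see utils/file_utils.py for the real helper
        fh = open(os.path.join(location, filename), 'wb')
        cPickle.dump(obj, fh, cPickle.HIGHEST_PROTOCOL)
        fh.close()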
@@ -419,26 +446,26 @@
     '''
     fh = bz2.BZ2File(filename, 'r')
     for line in fh:
-        line = line.strip()
         yield line
     fh.close()
     print 'Reached end of BZ2 file.'
 
+
 def setup(storage):
     keyspace_name = 'enwiki'
     if storage == 'cassandra':
         cassandra.install_schema(keyspace_name, drop_first=True)
 
 
-def launcher():
+def launcher(function, path, dataset='training'):
     storage = 'csv'
     setup(storage)
     input_queue = JoinableQueue()
     #files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2']
     #files = ['/home/diederik/kaggle/enwiki-20100904-pages-meta-history2.xml.bz2']
-    path = '/media/wikipedia_dumps/batch1/'
-    files = file_utils.retrieve_file_list(path, 'bz2', mask=None)
 
+    files = file_utils.retrieve_file_list(path, 'bz2')
+
     for file in files:
         filename = os.path.join(path, file)
         print filename
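
Note on the hunk above: the mask=None argument to
file_utils.retrieve_file_list is dropped. The helper's body is not part of
this diff; an equivalent sketch of listing the bz2 chunks in a dump directory
would be:

    import os

    def retrieve_file_list(path, extension):
        # assumption only; the real helper lives in utils/file_utils.py
        return [f for f in os.listdir(path) if f.endswith('.' + extension)]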
@@ -447,20 +474,19 @@
     for x in xrange(cpu_count()):
         input_queue.put(None)
 
-    extracters = [Process(target=stream_raw_xml, args=[input_queue, storage, x])
+    extracters = [Process(target=stream_raw_xml, args=[input_queue, function, storage, x, dataset])
                   for x in xrange(cpu_count())]
     for extracter in extracters:
         extracter.start()
 
-    #creators = [Process(target=create_variables, args=[result_queue, storage, x])
-    #            for x in xrange(cpu_count())]
-    #for creator in creators:
-    #    creator.start()
-
-
     input_queue.join()
 
 
 if __name__ == '__main__':
-    #debug()
-    launcher()
+    path1 = '/media/wikipedia_dumps/batch1/'
+    path2 = '/media/wikipedia_dumps/batch2/'
+    function1 = create_variables
+    function2 = count_edits
+
+    launcher(function1, path1, 'training')  # launcher for creating training data
+    launcher(function2, path2, 'test')      # launcher for creating test data
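
Note on the worker fan-out that __main__ now drives twice: launcher enqueues
one None sentinel per CPU so that every stream_raw_xml worker drains the
JoinableQueue and exits cleanly. The pattern reduced to its skeleton, with
hypothetical payloads:

    from multiprocessing import JoinableQueue, Process, cpu_count

    def worker(queue, wid):
        while True:
            item = queue.get()
            queue.task_done()
            if item is None:  # one sentinel per worker
                break
            print 'worker %s got %s' % (wid, item)

    if __name__ == '__main__':
        queue = JoinableQueue()
        for item in ('a.bz2', 'b.bz2'):  # hypothetical filenames
            queue.put(item)
        for x in xrange(cpu_count()):
            queue.put(None)
        workers = [Process(target=worker, args=[queue, x])
                   for x in xrange(cpu_count())]
        for w in workers:
            w.start()
        queue.join()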