Index: trunk/tools/editor_trends/etl/store.py |
— | — | @@ -60,19 +60,34 @@ |
61 | 61 | fh = file_utils.create_txt_filehandle(self.rts.sorted, filename, |
62 | 62 | 'r', self.rts.encoding) |
63 | 63 | for line in file_utils.read_raw_data(fh): |
64 | | - if len(line) > 1: |
65 | | - contributor = line[0] |
| 64 | + if len(line) == 12: |
| 65 | + article_id = int(line[1]) |
| 66 | + contributor = line[2] |
66 | 67 | #print 'Parsing %s' % contributor |
67 | 68 | if prev_contributor != contributor and prev_contributor != -1: |
68 | 69 | editor_cache.add(prev_contributor, 'NEXT') |
69 | | - date = text_utils.convert_timestamp_to_datetime_utc(line[1]) |
70 | | - article_id = int(line[2]) |
| 70 | + |
71 | 71 | username = line[3].encode(self.rts.encoding) |
72 | | - ns = int(line[5]) |
| 72 | + |
| 73 | + ns = int(line[4]) |
| 74 | + date = text_utils.convert_timestamp_to_datetime_utc(line[6]) |
| 75 | + hash = line[7] |
| 76 | + revert = int(line[8]) |
| 77 | + bot = int(line[9]) |
| 78 | + cur_size = int(line[10]) |
| 79 | + delta = int(line[11]) |
| 80 | + |
73 | 81 | value = {'date': date, |
74 | 82 | 'article': article_id, |
75 | 83 | 'username': username, |
76 | | - 'ns': ns} |
| 84 | + 'ns': ns, |
| 85 | + 'hash': hash, |
| 86 | + 'revert': revert, |
| 87 | + 'cur_size': cur_size, |
| 88 | + 'delta': delta, |
| 89 | + 'bot': bot |
| 90 | + } |
| 91 | + |
77 | 92 | editor_cache.add(contributor, value) |
78 | 93 | prev_contributor = contributor |
79 | 94 | fh.close() |
— | — | @@ -81,18 +96,18 @@ |
82 | 97 | |
83 | 98 | def store_articles(rts): |
84 | 99 | mongo = db.init_mongo_db(rts.dbname) |
| 100 | + db.drop_collection(rts.dbname, rts.articles_raw) |
85 | 101 | collection = mongo[rts.articles_raw] |
| 102 | + db.add_index_to_collection(rts.dbname, rts.articles_raw, 'id') |
| 103 | + db.add_index_to_collection(rts.dbname, rts.articles_raw, 'title') |
86 | 104 | |
87 | | - location = os.path.join(rts.input_location, rts.language.code, rts.project.name) |
88 | | - fh = file_utils.create_txt_filehandle(location, 'articles.csv', 'r', rts.encoding) |
| 105 | + location = os.path.join(rts.input_location, rts.language.code, rts.project.name, 'txt') |
| 106 | + fh = file_utils.create_txt_filehandle(location, 'titles.csv', 'r', rts.encoding) |
89 | 107 | print 'Storing article titles...' |
90 | | - print location |
91 | 108 | for line in fh: |
92 | 109 | line = line.strip() |
93 | | - try: |
94 | | - id, title = line.split('\t') |
95 | | - except ValueError: |
96 | | - print line.encode('utf-8') |
| 110 | + #print line.encode('utf-8') |
| 111 | + id, title = line.split('\t') |
97 | 112 | collection.insert({'id':id, 'title':title}) |
98 | 113 | fh.close() |
99 | 114 | print 'Done...' |
— | — | @@ -105,6 +120,7 @@ |
106 | 121 | ''' |
107 | 122 | store_articles(rts) |
108 | 123 | print 'Input directory is: %s ' % rts.sorted |
| 124 | + db.drop_collection(rts.dbname, rts.editors_dataset) |
109 | 125 | mongo = db.init_mongo_db(rts.dbname) |
110 | 126 | coll = mongo[rts.editors_raw] |
111 | 127 | coll.ensure_index('editor') |
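
For reference, the reindexed columns read above match the 12-field rows that enricher.py's Buffer writes later in this diff (its keys list). A minimal sketch of that assumed layout, using a hypothetical parse_sorted_row helper that mirrors the indexing in store.py:

    # Column layout assumed from Buffer.keys in enricher.py (later in this diff);
    # parse_sorted_row is a hypothetical helper, not part of this commit.
    SORTED_COLUMNS = ['revision_id', 'article_id', 'id', 'username', 'namespace',
                      'title', 'timestamp', 'hash', 'revert', 'bot', 'cur_size',
                      'delta']

    def parse_sorted_row(line):
        if len(line) != 12:
            return None
        return {'article': int(line[1]),   # article_id
                'contributor': line[2],    # editor id
                'username': line[3],
                'ns': int(line[4]),
                'timestamp': line[6],      # converted to a UTC datetime in store.py
                'hash': line[7],
                'revert': int(line[8]),
                'bot': int(line[9]),
                'cur_size': int(line[10]),
                'delta': int(line[11])}
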
Index: trunk/tools/editor_trends/etl/enricher.py |
— | — | @@ -37,19 +37,18 @@ |
38 | 38 | import pycassa |
39 | 39 | |
40 | 40 | except ImportError: |
41 | | - print 'I am not going to use Cassandra today, it\'s my off day.' |
| 41 | + pass |
42 | 42 | |
43 | | - |
44 | 43 | from database import db |
45 | 44 | from bots import detector |
46 | 45 | from utils import file_utils |
47 | 46 | |
48 | | -NAMESPACE = { |
| 47 | +EXCLUDE_NAMESPACE = { |
49 | 48 | #0:'Main', |
50 | 49 | #1:'Talk', |
51 | 50 | #2:'User', |
52 | 51 | #3:'User talk', |
53 | | - 4:'Wikipedia', |
| 52 | + #4:'Wikipedia', |
54 | 53 | #5:'Wikipedia talk', |
55 | 54 | 6:'File', |
56 | 55 | #7:'File talk', |
— | — | @@ -71,6 +70,21 @@ |
72 | 71 | #109:'Book talk' |
73 | 72 | } |
74 | 73 | |
| 74 | +COUNT_EXCLUDE_NAMESPACE = { |
| 75 | + 1:'Talk', |
| 76 | + 2:'User', |
| 77 | + 4:'Wikipedia', |
| 78 | + 6:'File', |
| 79 | + 8:'MediaWiki', |
| 80 | + 10:'Template', |
| 81 | + 12:'Help', |
| 82 | + 14:'Category', |
| 83 | + 90:'Thread', |
| 84 | + 92:'Summary', |
| 85 | + 100:'Portal', |
| 86 | + 108:'Book', |
| 87 | +} |
| 88 | + |
75 | 89 | class Statistics: |
76 | 90 | def __init__(self): |
77 | 91 | self.count_articles = 0 |
— | — | @@ -82,18 +96,18 @@ |
83 | 97 | |
84 | 98 | |
85 | 99 | class Buffer: |
86 | | - def __init__(self, storage, id, rts=None, filehandles=None, locks=None): |
| 100 | + def __init__(self, storage, process_id, rts=None, filehandles=None, locks=None): |
87 | 101 | assert storage == 'cassandra' or storage == 'mongo' or storage == 'csv', \ |
88 | 102 | 'Valid storage options are cassandra and mongo.' |
89 | 103 | self.storage = storage |
90 | 104 | self.revisions = {} |
91 | 105 | self.comments = {} |
92 | 106 | self.titles = {} |
93 | | - self.id = id |
| 107 | + self.process_id = process_id |
94 | 108 | self.keyspace_name = 'enwiki' |
95 | | - self.keys = ['revision_id', 'article_id', 'id', 'namespace', |
96 | | - 'title', 'timestamp', 'hash', 'revert', 'bot', 'prev_size', |
97 | | - 'cur_size', 'delta'] |
| 109 | + self.keys = ['revision_id', 'article_id', 'id', 'username', 'namespace', |
| 110 | + 'title', 'timestamp', 'hash', 'revert', 'bot', 'cur_size', |
| 111 | + 'delta'] |
98 | 112 | self.setup_storage() |
99 | 113 | self.stats = Statistics() |
100 | 114 | if storage == 'csv': |
— | — | @@ -101,7 +115,9 @@ |
102 | 116 | self.lock1 = locks[0] #lock for generic data |
103 | 117 | self.lock2 = locks[1] #lock for comment data |
104 | 118 | self.lock3 = locks[2] #lock for article titles |
105 | | - self.filehandles = filehandles |
| 119 | + self.filehandles = filehandles[0] |
| 120 | + self.fh_titles = filehandles[1] |
| 121 | + self.fh_comments = filehandles[2] |
106 | 122 | |
107 | 123 | def setup_storage(self): |
108 | 124 | if self.storage == 'cassandra': |
— | — | @@ -112,14 +128,6 @@ |
113 | 129 | self.db = db.init_mongo_db(self.keyspace_name) |
114 | 130 | self.collection = self.db['kaggle'] |
115 | 131 | |
116 | | - else: |
117 | | - title_file = 'titles.csv' |
118 | | - comment_file = 'comments.csv' |
119 | | - file_utils.delete_file('', title_file, directory=False) |
120 | | - file_utils.delete_file('', comment_file, directory=False) |
121 | | - self.fh_titles = codecs.open(title_file, 'a', 'utf-8') |
122 | | - self.fh_comments = codecs.open(comment_file, 'a', 'utf-8') |
123 | | - |
124 | 132 | def get_hash(self, id): |
125 | 133 | ''' |
126 | 134 | A very simple hash function based on modulo. The except clause has been |
— | — | @@ -164,10 +172,8 @@ |
165 | 173 | def empty(self): |
166 | 174 | self.store() |
167 | 175 | self.clear() |
168 | | - if self.storage == 'csv': |
169 | | - self.fh_main.close() |
170 | | - self.fh_extra.close() |
171 | 176 | |
| 177 | + |
172 | 178 | def clear(self): |
173 | 179 | self.revisions = {} |
174 | 180 | self.comments = {} |
— | — | @@ -184,31 +190,44 @@ |
185 | 191 | values = [] |
186 | 192 | for key in self.keys: |
187 | 193 | values.append(revision[key].decode('utf-8')) |
188 | | - values.insert(0, id) |
| 194 | + #values.insert(0, id) |
189 | 195 | rows.append(values) |
190 | 196 | self.write_output(rows) |
191 | 197 | |
192 | 198 | if self.comments: |
193 | 199 | self.lock2.acquire() |
194 | 200 | try: |
| 201 | + rows = [] |
195 | 202 | for revision_id, comment in self.comments.iteritems(): |
196 | | - comment = comment.decode('utf-8') |
197 | | - row = '\t'.join([revision_id, comment]) + '\n' |
198 | | - file_utils.write_list_to_csv(row, fh_comments, lock=self.lock2) |
199 | | - except: |
200 | | - pass |
| 203 | + #comment = comment.decode('utf-8') |
| 204 | + #row = '\t'.join([revision_id, comment]) + '\n' |
| 205 | + rows.append([revision_id, comment]) |
| 206 | + file_utils.write_list_to_csv(rows, self.fh_comments) |
| 207 | + except Exception, error: |
| 208 | + print error |
201 | 209 | finally: |
202 | 210 | self.lock2.release() |
203 | 211 | |
204 | 212 | elif self.titles: |
205 | 213 | self.lock3.acquire() |
206 | 214 | try: |
207 | | - for article_id, title in self.titles.iteritems(): |
208 | | - title = title.decode('utf-8') |
209 | | - row = '\t'.join([article_id, title]) + '\n' |
210 | | - file_utils.write_list_to_csv(row, fh_titles, lock=self.lock3) |
211 | | - except: |
212 | | - pass |
| 215 | + rows = [] |
| 216 | + for article_id, dict in self.titles.iteritems(): |
| 217 | + keys = dict.keys() |
| 218 | + value = [] |
| 219 | + for key in keys: |
| 220 | + #obs = '%s=%s' % (key, dict[key]) |
| 221 | + value.append(dict[key]) |
| 222 | + #if key != 'ns' and key != 'title': |
| 223 | + # print dict['title'], obs |
| 224 | + #article_id = 'id=%s' % article_id |
| 225 | + value.insert(0, article_id) |
| 226 | + #title = title.encode('ascii') |
| 227 | + #row = '\t'.join([article_id, title]) + '\n' |
| 228 | + rows.append(value) |
| 229 | + file_utils.write_list_to_csv(rows, self.fh_titles, newline=False) |
| 230 | + except Exception, error: |
| 231 | + print error |
213 | 232 | finally: |
214 | 233 | self.lock3.release() |
215 | 234 | |
— | — | @@ -298,6 +317,51 @@ |
299 | 318 | return '' |
300 | 319 | |
301 | 320 | |
| 321 | +def parse_title(title): |
| 322 | + return title.text |
| 323 | + |
| 324 | + |
| 325 | +def parse_title_meta_data(title, namespace): |
| 326 | + ''' |
| 327 | + This function categorizes an article to assist the Wikimedia Taxonomy |
| 328 | + project. See |
| 329 | + http://meta.wikimedia.org/wiki/Contribution_Taxonomy_Project/Research_Questions |
| 330 | + ''' |
| 331 | + title_meta = {} |
| 332 | + if not namespace: |
| 333 | + return title_meta |
| 334 | + |
| 335 | + title_meta['title'] = title |
| 336 | + ns = namespace['namespace'] |
| 337 | + title_meta['ns'] = ns |
| 338 | + if title.startswith('List of'): |
| 339 | + title_meta['list'] = True |
| 340 | + elif ns == 4 or ns == 5: |
| 341 | + if title.find('Articles for deletion') > -1: |
| 342 | + title_meta['category'] = 'Deletion' |
| 343 | + elif title.find('Mediation Committee') > -1: |
| 344 | + title_meta['category'] = 'Mediation' |
| 345 | + elif title.find('Mediation Cabal') > -1: |
| 346 | + title_meta['category'] = 'Mediation' |
| 347 | + elif title.find('Arbitration') > -1: |
| 348 | + title_meta['category'] = 'Arbitration' |
| 349 | + elif title.find('Featured Articles') > -1: |
| 350 | + title_meta['category'] = 'Featured Article' |
| 351 | + elif title.find('Featured picture candidates') > -1: |
| 352 | + title_meta['category'] = 'Featured Pictures' |
| 353 | + elif title.find('Featured sound candidates') > -1: |
| 354 | + title_meta['category'] = 'Featured Sounds' |
| 355 | + elif title.find('Featured list candidates') > -1: |
| 356 | + title_meta['category'] = 'Featured Lists' |
| 357 | + elif title.find('Featured portal candidates') > -1: |
| 358 | + title_meta['category'] = 'Featured Portal' |
| 359 | + elif title.find('Featured topic candidates') > -1: |
| 360 | + title_meta['category'] = 'Featured Topic' |
| 361 | + elif title.find('Good Article') > -1: |
| 362 | + title_meta['category'] = 'Good Article' |
| 363 | + return title_meta |
| 364 | + |
| 365 | + |
302 | 366 | def extract_username(contributor, xml_namespace): |
303 | 367 | contributor = contributor.find('%s%s' % (xml_namespace, 'username')) |
304 | 368 | if contributor != None: |
— | — | @@ -380,7 +444,6 @@ |
381 | 445 | username = extract_username(revision, xml_namespace) |
382 | 446 | user_id = extract_contributor_id(revision, xml_namespace) |
383 | 447 | bot = determine_username_is_bot(revision, bots, xml_namespace) |
384 | | - revision.clear() |
385 | 448 | editor = {} |
386 | 449 | editor['username'] = username |
387 | 450 | editor['bot'] = bot |
— | — | @@ -391,19 +454,21 @@ |
392 | 455 | return editor |
393 | 456 | |
394 | 457 | |
395 | | -def determine_namespace(title, namespaces): |
| 458 | +def determine_namespace(title, include_ns, exclude_ns): |
| 459 | + ''' |
| 460 | + You can only determine whether an article belongs to the Main Namespace |
| 461 | + by ruling out that it does not belong to any other namepace |
| 462 | + ''' |
396 | 463 | ns = {} |
397 | 464 | if title != None: |
398 | | - for namespace in namespaces: |
399 | | - if title.startswith(namespace): |
400 | | - ns['namespace'] = namespaces[namespace] |
| 465 | + for namespace in include_ns: |
| 466 | + if title.startswith(namespace): |
| 467 | + ns['namespace'] = include_ns[namespace] |
401 | 468 | if ns == {}: |
402 | | - for namespace in NAMESPACE.values(): |
| 469 | + for namespace in exclude_ns.values(): |
403 | 470 | if title.startswith(namespace): |
404 | | - ''' |
405 | | - article does not belong to either the main namespace, user, |
406 | | - talk or user talk namespace. |
407 | | - ''' |
| 471 | + '''article does not belong to any of the include_ns |
| 472 | + namespaces''' |
408 | 473 | ns = False |
409 | 474 | return ns |
410 | 475 | ns['namespace'] = 0 |
— | — | @@ -445,9 +510,8 @@ |
446 | 511 | |
447 | 512 | |
448 | 513 | def count_edits(article, counts, bots, xml_namespace): |
449 | | - namespaces = {} |
450 | | - title = article['title'].text |
451 | | - namespace = determine_namespace(title, namespaces) |
| 514 | + title = parse_title(article['title']) |
| 515 | + namespace = determine_namespace(title, {}, COUNT_EXCLUDE_NAMESPACE) |
452 | 516 | if namespace != False: |
453 | 517 | article_id = article['id'].text |
454 | 518 | revisions = article['revisions'] |
— | — | @@ -469,18 +533,21 @@ |
470 | 534 | return counts |
471 | 535 | |
472 | 536 | |
473 | | -def create_variables(article, cache, bots, xml_namespace): |
474 | | - namespaces = {'User': 2, |
| 537 | +def create_variables(article, cache, bots, xml_namespace, comments=False): |
| 538 | + include_ns = {'User Talk': 3, |
| 539 | + 'Wikipedia Talk': 5, |
475 | 540 | 'Talk': 1, |
476 | | - 'User Talk': 3, |
| 541 | + 'User': 2, |
| 542 | + 'Wikipedia': 4, |
477 | 543 | } |
478 | | - title = article['title'].text |
479 | | - namespace = determine_namespace(title, namespaces) |
480 | | - |
| 544 | + title = parse_title(article['title']) |
| 545 | + namespace = determine_namespace(title, include_ns, EXCLUDE_NAMESPACE) |
| 546 | + title_meta = parse_title_meta_data(title, namespace) |
481 | 547 | if namespace != False: |
482 | 548 | cache.stats.count_articles += 1 |
483 | 549 | article_id = article['id'].text |
484 | 550 | article['id'].clear() |
| 551 | + cache.titles[article_id] = title_meta |
485 | 552 | hashes = deque() |
486 | 553 | size = {} |
487 | 554 | revisions = article['revisions'] |
— | — | @@ -495,7 +562,6 @@ |
496 | 563 | if not contributor: |
497 | 564 | #editor is anonymous, ignore |
498 | 565 | continue |
499 | | - |
500 | 566 | revision_id = revision.find('%s%s' % (xml_namespace, 'id')) |
501 | 567 | revision_id = extract_revision_id(revision_id) |
502 | 568 | if revision_id == None: |
— | — | @@ -507,8 +573,9 @@ |
508 | 574 | text = extract_revision_text(revision) |
509 | 575 | row.update(contributor) |
510 | 576 | |
511 | | - comment = extract_comment_text(revision_id, revision) |
512 | | - cache.comments.update(comment) |
| 577 | + if comments: |
| 578 | + comment = extract_comment_text(revision_id, revision) |
| 579 | + cache.comments.update(comment) |
513 | 580 | |
514 | 581 | timestamp = revision.find('%s%s' % (xml_namespace, 'timestamp')).text |
515 | 582 | row['timestamp'] = timestamp |
— | — | @@ -525,8 +592,7 @@ |
526 | 593 | revision.clear() |
527 | 594 | |
528 | 595 | |
529 | | - |
530 | | -def parse_xml(fh, xml_namespace): |
| 596 | +def parse_xml(fh, xml_namespace, wikilytics=True): |
531 | 597 | context = iterparse(fh, events=('end',)) |
532 | 598 | context = iter(context) |
533 | 599 | |
— | — | @@ -548,28 +614,31 @@ |
549 | 615 | article = {} |
550 | 616 | article['revisions'] = [] |
551 | 617 | id = False |
552 | | - #elif event == 'end': |
| 618 | + #elif wikilytics == True and event == 'end': |
553 | 619 | # elem.clear() |
554 | 620 | |
555 | 621 | |
556 | | -def delayediter(iterable): |
557 | | - iterable = iter(iterable) |
558 | | - prev = iterable.next() |
559 | | - for item in iterable: |
560 | | - yield prev |
561 | | - prev = item |
562 | | - yield prev |
563 | | - |
564 | | -def stream_raw_xml(input_queue, storage, id, function, dataset, locks, rts): |
| 622 | +def stream_raw_xml(input_queue, storage, process_id, function, dataset, locks, rts): |
565 | 623 | bots = detector.retrieve_bots('en') |
566 | 624 | xml_namespace = '{http://www.mediawiki.org/xml/export-0.4/}' |
567 | 625 | path = os.path.join(rts.location, 'txt') |
| 626 | + |
568 | 627 | filehandles = [file_utils.create_txt_filehandle(path, '%s.csv' % fh, 'a', |
569 | 628 | rts.encoding) for fh in xrange(rts.max_filehandles)] |
| 629 | + |
| 630 | + title_file = os.path.join(path, 'titles.csv') |
| 631 | + comment_file = os.path.join(path, 'comments.csv') |
| 632 | + #file_utils.delete_file(path, title_file, directory=False) |
| 633 | + #file_utils.delete_file(path, comment_file, directory=False) |
| 634 | + fh_titles = codecs.open(title_file, 'a', 'utf-8') |
| 635 | + fh_comments = codecs.open(comment_file, 'a', 'utf-8') |
| 636 | + handles = [filehandles, fh_titles, fh_comments] |
| 637 | + wikilytics = False |
| 638 | + |
570 | 639 | t0 = datetime.datetime.now() |
571 | 640 | i = 0 |
572 | 641 | if dataset == 'training': |
573 | | - cache = Buffer(storage, id, rts, filehandles, locks) |
| 642 | + cache = Buffer(storage, process_id, rts, handles, locks) |
574 | 643 | else: |
575 | 644 | counts = {} |
576 | 645 | |
— | — | @@ -580,31 +649,45 @@ |
581 | 650 | break |
582 | 651 | |
583 | 652 | fh = file_utils.create_streaming_buffer(filename) |
| 653 | + filename = os.path.split(filename)[1] |
| 654 | + filename = os.path.splitext(filename)[0] |
584 | 655 | for article in parse_xml(fh, xml_namespace): |
585 | 656 | if dataset == 'training': |
586 | | - function(article, cache, bots, xml_namespace) |
| 657 | + function(article, cache, bots, xml_namespace, wikilytics) |
587 | 658 | elif dataset == 'prediction': |
588 | 659 | counts = function(article, counts, bots, xml_namespace) |
589 | 660 | i += 1 |
590 | 661 | if i % 10000 == 0: |
591 | | - print 'Worker %s parsed %s articles' % (id, i) |
| 662 | + print 'Worker %s parsed %s articles' % (process_id, i) |
592 | 663 | fh.close() |
593 | 664 | |
594 | 665 | t1 = datetime.datetime.now() |
595 | | - print 'Processing took %s' % (t1 - t0) |
| 666 | + print 'Processing of %s took %s' % (filename, (t1 - t0)) |
596 | 667 | t0 = t1 |
597 | 668 | |
598 | 669 | if dataset == 'training': |
599 | 670 | cache.empty() |
600 | 671 | cache.stats.summary() |
601 | | - print 'Finished parsing bz2 archives' |
602 | 672 | else: |
603 | 673 | location = os.getcwd() |
604 | | - filename = 'counts_%s.bin' % id |
| 674 | + keys = counts.keys() |
| 675 | + filename_csv = 'counts_%s.csv' % filename |
| 676 | + fh = file_utils.create_txt_filehandle(location, filename_csv, 'w', 'utf-8') |
| 677 | + file_utils.write_dict_to_csv(counts, fh, keys) |
| 678 | + fh.close() |
| 679 | + |
| 680 | + filename = 'counts_%s.bin' % filename |
605 | 681 | file_utils.store_object(counts, location, filename) |
606 | 682 | |
| 683 | + print 'Finished parsing bz2 archives' |
607 | 684 | |
| 685 | + |
608 | 686 | def setup(storage, rts=None): |
| 687 | + ''' |
| 688 | + Depending on the storage system selected (cassandra, csv or mongo), some |
| 689 | + preparations are made, including setting up namespaces and cleaning up old |
| 690 | + files. |
| 691 | + ''' |
609 | 692 | keyspace_name = 'enwiki' |
610 | 693 | if storage == 'cassandra': |
611 | 694 | cassandra.install_schema(keyspace_name, drop_first=True) |
— | — | @@ -624,35 +707,34 @@ |
625 | 708 | #files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2'] |
626 | 709 | #files = ['/home/diederik/kaggle/enwiki-20100904-pages-meta-history2.xml.bz2'] |
627 | 710 | |
628 | | - files = file_utils.retrieve_file_list(path, extension) |
| 711 | + files = file_utils.retrieve_file_list(rts.location, extension) |
| 712 | + #files = files[0:1] |
629 | 713 | |
630 | | - for file in files: |
631 | | - filename = os.path.join(path, file) |
| 714 | + for filename in files: |
| 715 | + filename = os.path.join(path, filename) |
632 | 716 | print filename |
633 | 717 | input_queue.put(filename) |
634 | 718 | |
635 | 719 | for x in xrange(processors): |
636 | 720 | input_queue.put(None) |
637 | 721 | |
638 | | - extracters = [Process(target=stream_raw_xml, args=[input_queue, storage, id, function, dataset, locks, rts]) |
639 | | - for id in xrange(processors)] |
| 722 | + extracters = [Process(target=stream_raw_xml, args=[input_queue, storage, |
| 723 | + process_id, function, |
| 724 | + dataset, locks, rts]) |
| 725 | + for process_id in xrange(processors)] |
640 | 726 | for extracter in extracters: |
641 | 727 | extracter.start() |
642 | 728 | |
643 | 729 | input_queue.join() |
| 730 | + #filehandles = [fh.close() for fh in filehandles] |
| 731 | + #fh_titles.close() |
| 732 | + #fh_comments.close() |
644 | 733 | |
645 | 734 | |
646 | | - |
647 | | -def debug(): |
648 | | - path = '/mnt/wikipedia_dumps/batch2/' |
649 | | - files = file_utils.retrieve_file_list(path, 'bz2') |
650 | | - for file in files: |
651 | | - filename = os.path.join(path, file) |
652 | | - unzip(filename) |
653 | | - |
654 | | - |
655 | 735 | def launcher_training(): |
656 | | - # launcher for creating training data |
| 736 | + ''' |
| 737 | + Launcher for creating the training dataset for the data competition |
| 738 | + ''' |
657 | 739 | path = '/mnt/wikipedia_dumps/batch2/' |
658 | 740 | function = create_variables |
659 | 741 | storage = 'csv' |
— | — | @@ -664,7 +746,9 @@ |
665 | 747 | |
666 | 748 | |
667 | 749 | def launcher_prediction(): |
668 | | - # launcher for creating test data |
| 750 | + ''' |
| 751 | + Launcher for creating the prediction dataset for the data competition |
| 752 | + ''' |
669 | 753 | path = '/mnt/wikipedia_dumps/batch1/' |
670 | 754 | function = count_edits |
671 | 755 | storage = 'csv' |
— | — | @@ -676,6 +760,9 @@ |
677 | 761 | |
678 | 762 | |
679 | 763 | def launcher(rts): |
| 764 | + ''' |
| 765 | + This is the generic entry point for regular Wikilytics usage. |
| 766 | + ''' |
680 | 767 | # launcher for creating regular mongo dataset |
681 | 768 | path = rts.location |
682 | 769 | function = create_variables |
— | — | @@ -694,4 +781,4 @@ |
695 | 782 | if __name__ == '__main__': |
696 | 783 | #launcher_training() |
697 | 784 | #launcher_prediction() |
698 | | - launcher() |
| 785 | + launcher(rts) |
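
The rewritten determine_namespace in this file works by elimination: try the include_ns prefixes, then the exclude list, and only fall back to the Main namespace (0) when nothing matches. A standalone sketch of that rule under the prefixes shown above (classify is a hypothetical name and the dicts are abridged):

    # Sketch of the namespace rule used in enricher.py; dicts abridged from the diff.
    INCLUDE_NS = {'User Talk': 3, 'Wikipedia Talk': 5, 'Talk': 1, 'User': 2, 'Wikipedia': 4}
    EXCLUDE_NS = {6: 'File', 8: 'MediaWiki', 10: 'Template'}   # abridged

    def classify(title, include_ns=INCLUDE_NS, exclude_ns=EXCLUDE_NS):
        # longest prefixes first so 'User Talk' is not swallowed by 'User'
        for prefix in sorted(include_ns, key=len, reverse=True):
            if title.startswith(prefix):
                return {'namespace': include_ns[prefix]}
        for name in exclude_ns.values():
            if title.startswith(name):
                return False            # excluded namespace, article is skipped
        return {'namespace': 0}         # nothing matched: Main namespace

    # classify('Talk:Prime number')  -> {'namespace': 1}
    # classify('File:Example.jpg')   -> False
    # classify('Prime number')       -> {'namespace': 0}
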
Index: trunk/tools/editor_trends/etl/shaper.py |
— | — | @@ -50,7 +50,7 @@ |
51 | 51 | ''' |
52 | 52 | data = {} |
53 | 53 | for x in xrange(first_year, final_year): |
54 | | - data[x] = add_datatype(datatype) |
| 54 | + data[str(x)] = add_datatype(datatype) |
55 | 55 | return data |
56 | 56 | |
57 | 57 | def add_windows_to_datacontainer(datacontainer, windows): |
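
The str(x) keys introduced here line up with cache.py at the end of this diff, which now buckets edits under str(year), and with MongoDB, since BSON document keys must be strings. A minimal sketch of the resulting container shape (create_yearly_container is a hypothetical stand-in, not the real shaper helper):

    # Hedged stand-in for shaper.create_datacontainer after the str(x) change;
    # the real helper also supports sets/dicts via add_datatype.
    def create_yearly_container(first_year, final_year, default=0):
        return dict((str(year), default) for year in range(first_year, final_year))

    container = create_yearly_container(2009, 2012)
    container['2010'] += 1      # keys are strings, as BSON/MongoDB requires
    assert sorted(container) == ['2009', '2010', '2011']
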
Index: trunk/tools/editor_trends/etl/transformer.py |
— | — | @@ -70,20 +70,37 @@ |
71 | 71 | first_year, final_year = determine_year_range(edits) |
72 | 72 | monthly_edits = determine_edits_by_month(edits, first_year, final_year) |
73 | 73 | monthly_edits = db.stringify_keys(monthly_edits) |
| 74 | + |
| 75 | + edits_by_year = determine_edits_by_year(edits, first_year, final_year) |
| 76 | + edits_by_year = db.stringify_keys(edits_by_year) |
| 77 | + |
| 78 | + last_edit_by_year = determine_last_edit_by_year(edits, first_year, final_year) |
| 79 | + last_edit_by_year = db.stringify_keys(last_edit_by_year) |
| 80 | + |
| 81 | + articles_edited = determine_articles_workedon(edits, first_year, final_year) |
| 82 | + articles_edited = db.stringify_keys(articles_edited) |
| 83 | + |
| 84 | + articles_by_year = determine_articles_by_year(articles_edited, first_year, final_year) |
| 85 | + articles_by_year = db.stringify_keys(articles_by_year) |
| 86 | + |
| 87 | + namespaces_edited = determine_namespaces_workedon(edits, first_year, final_year) |
| 88 | + namespaces_edited = db.stringify_keys(namespaces_edited) |
| 89 | + |
| 90 | + character_counts = determine_edit_volume(edits, first_year, final_year) |
| 91 | + character_counts = db.stringify_keys(character_counts) |
| 92 | + |
| 93 | + count_reverts = determine_number_reverts(edits, first_year, final_year) |
| 94 | + count_reverts = db.stringify_keys(count_reverts) |
| 95 | + |
74 | 96 | edits = sort_edits(edits) |
75 | | - edit_count = len(edits) |
| 97 | + edit_count = determine_number_edits(edits, first_year, final_year) |
| 98 | + |
76 | 99 | if len(edits) > cutoff: |
77 | 100 | new_wikipedian = edits[cutoff]['date'] |
78 | 101 | else: |
79 | 102 | new_wikipedian = False |
80 | 103 | first_edit = edits[0]['date'] |
81 | 104 | final_edit = edits[-1]['date'] |
82 | | - edits_by_year = determine_edits_by_year(edits, first_year, final_year) |
83 | | - edits_by_year = db.stringify_keys(edits_by_year) |
84 | | - last_edit_by_year = determine_last_edit_by_year(edits, first_year, final_year) |
85 | | - last_edit_by_year = db.stringify_keys(last_edit_by_year) |
86 | | - articles_by_year = determine_articles_by_year(edits, first_year, final_year) |
87 | | - articles_by_year = db.stringify_keys(articles_by_year) |
88 | 105 | edits = edits[:cutoff] |
89 | 106 | |
90 | 107 | self.output_db.insert({'editor': self.id, |
— | — | @@ -96,10 +113,75 @@ |
97 | 114 | 'articles_by_year': articles_by_year, |
98 | 115 | 'monthly_edits': monthly_edits, |
99 | 116 | 'last_edit_by_year': last_edit_by_year, |
100 | | - 'username': username |
| 117 | + 'username': username, |
| 118 | + 'articles_edited': articles_edited, |
| 119 | + 'namespaces_edited': namespaces_edited, |
| 120 | + 'character_counts': character_counts, |
101 | 121 | }, safe=True) |
102 | 122 | |
103 | 123 | |
| 124 | +def determine_number_edits(edits, first_year, final_year): |
| 125 | + count = 0 |
| 126 | + for year in edits: |
| 127 | + for edit in edits[year]: |
| 128 | + if edit['ns'] == 0: |
| 129 | + count += 1 |
| 130 | + return count |
| 131 | + |
| 132 | + |
| 133 | +def determine_articles_workedon(edits, first_year, final_year): |
| 134 | + dc = shaper.create_datacontainer(first_year, final_year) |
| 135 | + dc = shaper.add_months_to_datacontainer(dc, 'set') |
| 136 | + for year in edits: |
| 137 | + for edit in edits[year]: |
| 138 | + month = edit['date'].month |
| 139 | + dc[year][month].add(edit['article']) |
| 140 | + |
| 141 | + for year in dc: |
| 142 | + for month in dc[year]: |
| 143 | + dc[year][month] = list(dc[year][month]) |
| 144 | + return dc |
| 145 | + |
| 146 | + |
| 147 | +def determine_namespaces_workedon(edits, first_year, final_year): |
| 148 | + dc = shaper.create_datacontainer(first_year, final_year) |
| 149 | + dc = shaper.add_months_to_datacontainer(dc, 'set') |
| 150 | + for year in edits: |
| 151 | + for edit in edits[year]: |
| 152 | + month = edit['date'].month |
| 153 | + dc[year][month].add(edit['ns']) |
| 154 | + for year in dc: |
| 155 | + for month in dc[year]: |
| 156 | + dc[year][month] = list(dc[year][month]) |
| 157 | + return dc |
| 158 | + |
| 159 | + |
| 160 | +def determine_number_reverts(edits, first_year, final_year): |
| 161 | + dc = shaper.create_datacontainer(first_year, final_year) |
| 162 | + dc = shaper.add_months_to_datacontainer(dc, 0) |
| 163 | + for year in edits: |
| 164 | + for edit in edits[year]: |
| 165 | + month = edit['date'].month |
| 166 | + if edit['revert']: |
| 167 | + dc[year][month] += 1 |
| 168 | + return dc |
| 169 | + |
| 170 | + |
| 171 | +def determine_edit_volume(edits, first_year, final_year): |
| 172 | + dc = shaper.create_datacontainer(first_year, final_year) |
| 173 | + dc = shaper.add_months_to_datacontainer(dc, 'dict') |
| 174 | + for year in edits: |
| 175 | + for edit in edits[year]: |
| 176 | + month = edit['date'].month |
| 177 | + dc[year][month].setdefault('added', 0) |
| 178 | + dc[year][month].setdefault('removed', 0) |
| 179 | + if edit['delta'] < 0: |
| 180 | + dc[year][month]['removed'] += edit['delta'] |
| 181 | + elif edit['delta'] > 0: |
| 182 | + dc[year][month]['added'] += edit['delta'] |
| 183 | + return dc |
| 184 | + |
| 185 | + |
104 | 186 | def determine_year_range(edits): |
105 | 187 | years = [year for year in edits if edits[year] != []] |
106 | 188 | first_year = int(min(years)) |
— | — | @@ -109,12 +191,13 @@ |
110 | 192 | |
111 | 193 | def determine_last_edit_by_year(edits, first_year, final_year): |
112 | 194 | dc = shaper.create_datacontainer(first_year, final_year, 0) |
113 | | - for edit in edits: |
114 | | - edit = edit['date'] |
115 | | - if dc[edit.year] == 0: |
116 | | - dc[edit.year] = edit |
117 | | - elif dc[edit.year] < edit: |
118 | | - dc[edit.year] = edit |
| 195 | + for year in edits: |
| 196 | + for edit in edits[year]: |
| 197 | + date = edit['date'] |
| 198 | + if dc[year] == 0: |
| 199 | + dc[year] = date |
| 200 | + elif dc[year] < date: |
| 201 | + dc[year] = date |
119 | 202 | return dc |
120 | 203 | |
121 | 204 | |
— | — | @@ -124,7 +207,7 @@ |
125 | 208 | for year in edits: |
126 | 209 | for edit in edits[year]: |
127 | 210 | m = edit['date'].month |
128 | | - dc[int(year)][m] += 1 |
| 211 | + dc[year][m] += 1 |
129 | 212 | return dc |
130 | 213 | |
131 | 214 | |
— | — | @@ -133,24 +216,25 @@ |
134 | 217 | This function counts the number of edits by year made by a particular editor. |
135 | 218 | ''' |
136 | 219 | dc = shaper.create_datacontainer(first_year, final_year, 0) |
137 | | - for edit in edits: |
138 | | - year = edit['date'].year |
139 | | - dc[year] += 1 |
| 220 | + for year in edits: |
| 221 | + for edit in edits[year]: |
| 222 | + year = str(edit['date'].year) |
| 223 | + dc[year] += 1 |
140 | 224 | return dc |
141 | 225 | |
142 | 226 | |
143 | | -def determine_articles_by_year(dates, first_year, final_year): |
| 227 | +def determine_articles_by_year(articles_edited, first_year, final_year): |
144 | 228 | ''' |
145 | 229 | This function counts the number of unique articles by year edited by a |
146 | 230 | particular editor. |
147 | 231 | ''' |
148 | | - articles = shaper.create_datacontainer(first_year, final_year, 'set') |
149 | | - for date in dates: |
150 | | - year = date['date'].year |
151 | | - articles[year].add(date['article']) |
152 | | - for year in articles: |
153 | | - articles[year] = len(articles[year]) |
154 | | - return articles |
| 232 | + dc = shaper.create_datacontainer(first_year, final_year) |
| 233 | + for year in articles_edited: |
| 234 | + edits = set() |
| 235 | + for month in articles_edited[year]: |
| 236 | + edits.update(articles_edited[year][month]) |
| 237 | + dc[year] = len(edits) |
| 238 | + return dc |
155 | 239 | |
156 | 240 | |
157 | 241 | def sort_edits(edits): |
— | — | @@ -160,9 +244,6 @@ |
161 | 245 | |
162 | 246 | def transform_editors_multi_launcher(rts): |
163 | 247 | ids = db.retrieve_distinct_keys(rts.dbname, rts.editors_raw, 'editor') |
164 | | -# kwargs = {'definition': 'traditional', |
165 | | -# 'pbar': True, |
166 | | -# } |
167 | 248 | tasks = multiprocessing.JoinableQueue() |
168 | 249 | consumers = [EditorConsumer(tasks, None) for i in xrange(rts.number_of_processes)] |
169 | 250 | |
— | — | @@ -181,6 +262,7 @@ |
182 | 263 | def setup_database(rts): |
183 | 264 | mongo = db.init_mongo_db(rts.dbname) |
184 | 265 | input_db = mongo[rts.editors_raw] |
| 266 | + db.drop_collection(rts.dbname, rts.editors_dataset) |
185 | 267 | output_db = mongo[rts.editors_dataset] |
186 | 268 | |
187 | 269 | output_db.ensure_index('editor') |
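
The new determine_* helpers in this file all walk the same nested structure: edits keyed by string year, with per-month buckets created by the shaper helpers. A self-contained sketch of that aggregation pattern, assuming the nested layout rather than reusing shaper (articles_worked_on is a hypothetical stand-in for determine_articles_workedon):

    from datetime import datetime

    def articles_worked_on(edits, first_year, final_year):
        # year -> month -> set of article ids; mirrors create_datacontainer plus
        # add_months_to_datacontainer(dc, 'set') as used above (assumed layout)
        dc = dict((str(y), dict((m, set()) for m in range(1, 13)))
                  for y in range(first_year, final_year))
        for year in edits:
            for edit in edits[year]:
                dc[year][edit['date'].month].add(edit['article'])
        # convert sets to lists so the result can be stored in MongoDB
        for year in dc:
            for month in dc[year]:
                dc[year][month] = list(dc[year][month])
        return dc

    edits = {'2010': [{'date': datetime(2010, 3, 1), 'article': 42},
                      {'date': datetime(2010, 3, 2), 'article': 42}]}
    print articles_worked_on(edits, 2010, 2011)['2010'][3]   # prints [42]
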
Index: trunk/tools/editor_trends/database/cache.py |
— | — | @@ -69,7 +69,7 @@ |
70 | 70 | else: |
71 | 71 | value.pop('username') |
72 | 72 | |
73 | | - year = value['date'].year |
| 73 | + year = str(value['date'].year) |
74 | 74 | self.editors[key]['edits'][year].append(value) |
75 | 75 | self.editors[key]['obs'] += 1 |
76 | 76 | |