r85805 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: r85804 | r85805 | r85806
Date: 17:30, 11 April 2011
Author: diederik
Status: deferred
Tags:
Comment:
Major refactoring of the extracter code. The previous file is split into three components:
1) buffer.py, containing the CSVBuffer class
2) variables.py, containing the actual logic for extracting information and constructing variables
3) extracter.py, the entry point for manage.py that sets up the different processes

This should make it easier to reuse code, fix a memory leak, and improve overall performance. A minimal sketch of how the new components fit together follows.
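
Below is a minimal, hypothetical sketch of how the three new components are meant to cooperate. Only the names CSVBuffer, create_md5hash and launcher are taken from the diff below; the function bodies and the revision_texts parameter are simplified stand-ins for illustration, not the committed code.

import hashlib

# classes/buffer.py -- buffers extracted rows and flushes them periodically
class CSVBuffer(object):
    def __init__(self, process_id):
        self.process_id = process_id
        self.rows = []

    def add(self, row):
        self.rows.append(row)
        # 10000 is the flush threshold used by the old Buffer class
        if len(self.rows) > 10000:
            self.flush()

    def flush(self):
        # the committed class groups rows by editor and file id and writes
        # them to per-file CSV handles; omitted here for brevity
        self.rows = []


# etl/variables.py -- I/O-free helpers that turn revision data into variables
def create_md5hash(text):
    if text is None:
        return {'hash': -1}
    return {'hash': hashlib.md5(text.encode('utf-8')).hexdigest()}


# etl/extracter.py -- entry point called from manage.py
def launcher(process_id, revision_texts):
    cache = CSVBuffer(process_id)
    for text in revision_texts:
        cache.add(create_md5hash(text))
    cache.flush()


if __name__ == '__main__':
    launcher(0, ['first revision text', 'second revision text'])

Keeping the helpers in variables.py free of I/O is what lets both code paths in extracter.py (the regular parse_xml path and the Kaggle/data-competition path) reuse them, which is the code reuse the comment above refers to.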
Modified paths:
  • /trunk/tools/editor_trends/classes/buffer.py (added) (history)
  • /trunk/tools/editor_trends/etl/enricher.py (deleted) (history)
  • /trunk/tools/editor_trends/etl/extracter.py (replaced) (history)
  • /trunk/tools/editor_trends/etl/sort.py (modified) (history)
  • /trunk/tools/editor_trends/etl/store.py (modified) (history)
  • /trunk/tools/editor_trends/etl/variables.py (added) (history)

Diff

Index: trunk/tools/editor_trends/etl/enricher.py
@@ -1,823 +0,0 @@
2 -#!/usr/bin/python
3 -# -*- coding: utf-8 -*-
4 -'''
5 -Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
6 -This program is free software; you can redistribute it and/or
7 -modify it under the terms of the GNU General Public License version 2
8 -as published by the Free Software Foundation.
9 -This program is distributed in the hope that it will be useful,
10 -but WITHOUT ANY WARRANTY; without even the implied warranty of
11 -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
12 -See the GNU General Public License for more details, at
13 -http://www.fsf.org/licenses/gpl.html
14 -'''
15 -
16 -__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
17 -__email__ = 'dvanliere at gmail dot com'
18 -__date__ = '2011-02-06'
19 -__version__ = '0.1'
20 -
21 -
22 -import os
23 -import hashlib
24 -import codecs
25 -import sys
26 -import itertools
27 -import datetime
28 -import progressbar
29 -from multiprocessing import JoinableQueue, Process, cpu_count, RLock, Manager
30 -from xml.etree.cElementTree import iterparse, dump
31 -from collections import deque
32 -
33 -if '..' not in sys.path:
34 - sys.path.append('..')
35 -
36 -from classes import storage
37 -from analyses.adhoc import bot_detector
38 -from utils import file_utils
39 -
40 -EXCLUDE_NAMESPACE = {
41 - #0:'Main',
42 - #1:'Talk',
43 - #2:'User',
44 - #3:'User talk',
45 - #4:'Wikipedia',
46 - #5:'Wikipedia talk',
47 - 6:'File',
48 - #7:'File talk',
49 - 8:'MediaWiki',
50 - #9:'MediaWiki talk',
51 - 10:'Template',
52 - #11:'Template talk',
53 - 12:'Help',
54 - #13:'Help talk',
55 - 14:'Category',
56 - #15:'Category talk',
57 - 90:'Thread',
58 - #91:'Thread talk',
59 - 92:'Summary',
60 - #93:'Summary talk',
61 - 100:'Portal',
62 - #101:'Portal talk',
63 - 108:'Book',
64 - #109:'Book talk'
65 -}
66 -
67 -COUNT_EXCLUDE_NAMESPACE = {
68 - 1:'Talk',
69 - 2:'User',
70 - 4:'Wikipedia',
71 - 6:'File',
72 - 8:'MediaWiki',
73 - 10:'Template',
74 - 12:'Help',
75 - 14:'Category',
76 - 90:'Thread',
77 - 92:'Summary',
78 - 100:'Portal',
79 - 108:'Book',
80 -}
81 -
82 -
83 -class CustomLock:
84 - def __init__(self, lock, open_handles):
85 - self.lock = lock
86 - self.open_handles = open_handles
87 -
88 - def available(self, handle):
89 - self.lock.acquire()
90 - try:
91 - self.open_handles.index(handle)
92 - #print 'RETRIEVED FILEHANDLE %s' % handle
93 - return False
94 - except (ValueError, Exception), error:
95 - self.open_handles.append(handle)
96 - #print 'ADDED FILEHANDLE %s' % handle
97 - return True
98 - finally:
99 - #print 'FIles locked: %s' % len(self.open_handles)
100 - self.lock.release()
101 -
102 - def release(self, handle):
103 - #print 'RELEASED FILEHANDLE %s' % handle
104 - self.open_handles.remove(handle)
105 -
106 -
107 -class Buffer:
108 - def __init__(self, process_id, rts, lock):
109 - self.rts = rts
110 - self.lock = lock
111 - self.revisions = {}
112 - self.comments = {}
113 - self.articles = {}
114 - self.process_id = process_id
115 - self.count_articles = 0
116 - self.count_revisions = 0
117 - self.filehandles = [file_utils.create_txt_filehandle(self.rts.txt,
118 - file_id, 'a', 'utf-8') for file_id in xrange(self.rts.max_filehandles)]
119 - self.keys = ['revision_id', 'article_id', 'id', 'username', 'namespace',
120 - 'title', 'timestamp', 'hash', 'revert', 'bot', 'cur_size',
121 - 'delta']
122 - self.fh_articles = file_utils.create_txt_filehandle(self.rts.txt,
123 - 'articles_%s' % self.process_id, 'w', 'utf-8')
124 - self.fh_comments = file_utils.create_txt_filehandle(self.rts.txt,
125 - 'comments_%s' % self.process_id, 'w', 'utf-8')
126 -
127 - def get_hash(self, id):
128 - '''
129 - A very simple hash function based on modulo. The except clause has been
130 - added because there are instances where the username is stored in userid
131 - tag and hence that's a string and not an integer.
132 - '''
133 - try:
134 - return int(id) % self.rts.max_filehandles
135 - except ValueError:
136 - return sum([ord(i) for i in id]) % self.rts.max_filehandles
137 -
138 - def invert_dictionary(self, editors):
139 - hashes = {}
140 - for editor, file_id in editors.iteritems():
141 - hashes.setdefault(file_id, [])
142 - hashes[file_id].append(editor)
143 - return hashes
144 -
145 - def group_revisions_by_fileid(self):
146 - '''
147 - This function groups observation by editor id and then by file_id,
148 - this way we have to make fewer file opening calls and should reduce
149 - processing time.
150 - '''
151 - data = {}
152 - editors = {}
153 - #first, we group all revisions by editor
154 -
155 - for revision in self.revisions.values():
156 - row = []
157 - #strip away the keys and make sure that the values are always in the same sequence
158 - for key in self.keys:
159 - row.append(revision[key].decode('utf-8'))
160 - editor_id = row[0]
161 - data.setdefault(editor_id, [])
162 - data[editor_id].append(row)
163 - editors.setdefault(editor_id, self.get_hash(editor_id))
164 -
165 - #now, we are going to group all editors by file_id
166 - file_ids = self.invert_dictionary(editors)
167 - self.revisions = {}
168 - for file_id, editors in file_ids.iteritems():
169 - for editor in editors:
170 - self.revisions.setdefault(file_id, [])
171 - self.revisions[file_id].extend(data[editor])
172 -
173 - def add(self, revision):
174 - self.stringify(revision)
175 - id = revision['revision_id']
176 - self.revisions[id] = revision
177 - if len(self.revisions) > 10000:
178 - #print '%s: Emptying buffer %s - buffer size %s' % (datetime.datetime.now(), self.id, len(self.revisions))
179 - self.store()
180 -
181 -
182 - def stringify(self, revision):
183 - for key, value in revision.iteritems():
184 - try:
185 - value = str(value)
186 - except UnicodeEncodeError:
187 - value = value.encode('utf-8')
188 - revision[key] = value
189 -
190 -
191 - def summary(self):
192 - print 'Worker %s: Number of articles: %s' % (self.process_id, self.count_articles)
193 - print 'Worker %s: Number of revisions: %s' % (self.process_id, self.count_revisions)
194 -
195 - def store(self):
196 - self.write_revisions()
197 - self.write_articles()
198 - self.write_comments()
199 -
200 - def close(self):
201 - self.store()
202 - self.filehandles = [fh.close() for fh in self.filehandles]
203 -
204 - def write_comments(self):
205 - rows = []
206 - try:
207 - for revision_id, comment in self.comments.iteritems():
208 - #comment = comment.decode('utf-8')
209 - #row = '\t'.join([revision_id, comment]) + '\n'
210 - rows.append([revision_id, comment])
211 - file_utils.write_list_to_csv(rows, self.fh_comments)
212 - self.comments = {}
213 - except Exception, error:
214 - print '''Encountered the following error while writing comment data
215 - to %s: %s''' % (self.fh_comments, error)
216 -
217 - def write_articles(self):
218 - #t0 = datetime.datetime.now()
219 - if len(self.articles.keys()) > 10000:
220 - rows = []
221 - try:
222 - for article_id, data in self.articles.iteritems():
223 - keys = data.keys()
224 - keys.insert(0, 'id')
225 -
226 - values = data.values()
227 - values.insert(0, article_id)
228 -
229 - row = zip(keys, values)
230 - row = list(itertools.chain(*row))
231 - #row = '\t'.join([article_id, title]) + '\n'
232 - rows.append(row)
233 - file_utils.write_list_to_csv(rows, self.fh_articles, newline=False)
234 - self.articles = {}
235 - except Exception, error:
236 - print '''Encountered the following error while writing article
237 - data to %s: %s''' % (self.fh_articles, error)
238 - #t1 = datetime.datetime.now()
239 - #print '%s articles took %s' % (len(self.articles.keys()), (t1 - t0))
240 -
241 - def write_revisions(self):
242 - t0 = datetime.datetime.now()
243 - self.group_revisions_by_fileid()
244 - file_ids = self.revisions.keys()
245 - for file_id in file_ids:
246 - wait = True
247 - for i, revision in enumerate(self.revisions[file_id]):
248 - if i == 0:
249 - while wait:
250 - print file_id, self.lock
251 - if self.lock.available(file_id):
252 - fh = self.filehandles[file_id]
253 - wait = False
254 - try:
255 - file_utils.write_list_to_csv(revision, fh)
256 - except Exception, error:
257 - print '''Encountered the following error while writing
258 - revision data to %s: %s''' % (fh, error)
259 - self.lock.release(file_id)
260 - del self.revisions[file_id]
261 - wait = True
262 - t1 = datetime.datetime.now()
263 - print 'Worker %s: %s revisions took %s' % (self.process_id,
264 - len(self.revisions),
265 - (t1 - t0))
266 -
267 -def extract_categories():
268 - '''
269 - Field 1: page id
270 - Field 2: name category
271 - Field 3: sort key
272 - Field 4: timestamp last change
273 - '''
274 - filename = 'C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-categorylinks.sql'
275 - output = codecs.open('categories.csv', 'w', encoding='utf-8')
276 - fh = codecs.open(filename, 'r', encoding='utf-8')
277 -
278 - try:
279 - for line in fh:
280 - if line.startswith('INSERT INTO `categorylinks` VALUES ('):
281 - line = line.replace('INSERT INTO `categorylinks` VALUES (', '')
282 - line = line.replace("'", '')
283 - categories = line.split('),(')
284 - for category in categories:
285 - category = category.split(',')
286 - if len(category) == 4:
287 - output.write('%s\t%s\n' % (category[0], category[1]))
288 - except UnicodeDecodeError, e:
289 - print e
290 -
291 - output.close()
292 - fh.close()
293 -
294 -
295 -def validate_hostname(address):
296 - '''
297 - This is not a foolproof solution at all. The problem is that it's really
298 - hard to determine whether a string is a hostname or not **reliably**. This
299 - is a very fast rule of thumb. Will lead to false positives,
300 - but that's life :)
301 - '''
302 - parts = address.split(".")
303 - if len(parts) > 2:
304 - return True
305 - else:
306 - return False
307 -
308 -
309 -def validate_ip(address):
310 - parts = address.split(".")
311 - if len(parts) != 4:
312 - return False
313 - parts = parts[:3]
314 - for item in parts:
315 - try:
316 - if not 0 <= int(item) <= 255:
317 - return False
318 - except ValueError:
319 - return False
320 - return True
321 -
322 -
323 -def extract_revision_text(revision):
324 - rev = revision.find('ns0:text')
325 - if rev != None:
326 - if rev.text == None:
327 - rev = fix_revision_text(revision)
328 - return rev.text.encode('utf-8')
329 - else:
330 - return ''
331 -
332 -
333 -def parse_title(title):
334 - return title.text
335 -
336 -
337 -def parse_title_meta_data(title, namespace):
338 - '''
339 - This function categorizes an article to assist the Wikimedia Taxonomy
340 - project. See
341 - http://meta.wikimedia.org/wiki/Contribution_Taxonomy_Project/Research_Questions
342 - '''
343 - title_meta = {}
344 - if not namespace:
345 - return title_meta
346 -
347 - title_meta['title'] = title
348 - ns = namespace['namespace']
349 - title_meta['ns'] = ns
350 - if title.startswith('List of'):
351 - title_meta['category'] = 'List'
352 - elif ns == 4 or ns == 5:
353 - if title.find('Articles for deletion') > -1:
354 - title_meta['category'] = 'Deletion'
355 - elif title.find('Mediation Committee') > -1:
356 - title_meta['category'] = 'Mediation'
357 - elif title.find('Mediation Cabal') > -1:
358 - title_meta['category'] = 'Mediation'
359 - elif title.find('Arbitration') > -1:
360 - title_meta['category'] = 'Arbitration'
361 - elif title.find('Featured Articles') > -1:
362 - title_meta['category'] = 'Featured Article'
363 - elif title.find('Featured picture candidates') > -1:
364 - title_meta['category'] = 'Featured Pictures'
365 - elif title.find('Featured sound candidates') > -1:
366 - title_meta['category'] = 'Featured Sounds'
367 - elif title.find('Featured list candidates') > -1:
368 - title_meta['category'] = 'Featured Lists'
369 - elif title.find('Featured portal candidates') > -1:
370 - title_meta['category'] = 'Featured Portal'
371 - elif title.find('Featured topic candidates') > -1:
372 - title_meta['category'] = 'Featured Topic'
373 - elif title.find('Good Article') > -1:
374 - title_meta['category'] = 'Good Article'
375 - return title_meta
376 -
377 -
378 -def extract_username(contributor, xml_namespace):
379 - contributor = contributor.find('%s%s' % (xml_namespace, 'username'))
380 - if contributor != None:
381 - return contributor.text
382 - else:
383 - return None
384 -
385 -
386 -def determine_username_is_bot(contributor, bots, xml_namespace):
387 - '''
388 - #contributor is an xml element containing the id of the contributor
389 - @bots should have a dict with all the bot ids and bot names
390 - @Return False if username id is not in bot dict id or True if username id
391 - is a bot id.
392 - '''
393 - username = contributor.find('%s%s' % (xml_namespace, 'username'))
394 - if username == None:
395 - return 0
396 - else:
397 - if username.text in bots:
398 - return 1
399 - else:
400 - return 0
401 -
402 -
403 -def extract_contributor_id(revision, xml_namespace):
404 - '''
405 - @contributor is the xml contributor node containing a number of attributes
406 - Currently, we are only interested in registered contributors, hence we
407 - ignore anonymous editors.
408 - '''
409 - if revision.get('deleted'):
410 - # ASK: Not sure if this is the best way to code deleted contributors.
411 - return None
412 - elem = revision.find('%s%s' % (xml_namespace, 'id'))
413 - if elem != None:
414 - return {'id':elem.text}
415 - else:
416 - elem = revision.find('%s%s' % (xml_namespace, 'ip'))
417 - if elem == None or elem.text == None:
418 - return None
419 - elif validate_ip(elem.text) == False and validate_hostname(elem.text) == False:
420 - return {'username':elem.text, 'id': elem.text}
421 - else:
422 - return None
423 -
424 -
425 -def fix_revision_text(revision):
426 - if revision.text == None:
427 - revision.text = ''
428 - return revision
429 -
430 -
431 -def create_md5hash(text):
432 - hash = {}
433 - if text != None:
434 - m = hashlib.md5()
435 - m.update(text)
436 - #echo m.digest()
437 - hash['hash'] = m.hexdigest()
438 - else:
439 - hash['hash'] = -1
440 - return hash
441 -
442 -
443 -def calculate_delta_article_size(size, text):
444 - if 'prev_size' not in size:
445 - size['prev_size'] = 0
446 - size['cur_size'] = len(text)
447 - size['delta'] = len(text)
448 - else:
449 - size['prev_size'] = size['cur_size']
450 - delta = len(text) - size['prev_size']
451 - size['cur_size'] = len(text)
452 - size['delta'] = delta
453 - return size
454 -
455 -
456 -def parse_contributor(revision, bots, xml_namespace):
457 - username = extract_username(revision, xml_namespace)
458 - user_id = extract_contributor_id(revision, xml_namespace)
459 - bot = determine_username_is_bot(revision, bots, xml_namespace)
460 - editor = {}
461 - editor['username'] = username
462 - editor['bot'] = bot
463 - if user_id != None:
464 - editor.update(user_id)
465 - else:
466 - editor = False
467 - return editor
468 -
469 -
470 -def determine_namespace(title, namespaces, include_ns, exclude_ns):
471 - '''
472 - You can only determine whether an article belongs to the Main Namespace
473 - by ruling out that it does not belong to any other namepace
474 - '''
475 - ns = {}
476 - if title != None:
477 - for key in include_ns:
478 - namespace = namespaces.get(key)
479 - if namespace and title.startswith(namespace):
480 - ns['namespace'] = key
481 - if ns == {}:
482 - for key in exclude_ns:
483 - namespace = namespaces.get(key)
484 - if namespace and title.startswith(namespace):
485 - '''article does not belong to any of the include_ns
486 - namespaces'''
487 - ns = False
488 - return ns
489 - ns['namespace'] = 0
490 - else:
491 - ns = False
492 - return ns
493 -
494 -
495 -def prefill_row(title, article_id, namespace):
496 - row = {}
497 - row['title'] = title
498 - row['article_id'] = article_id
499 - row.update(namespace)
500 - return row
501 -
502 -
503 -def is_revision_reverted(hash_cur, hashes):
504 - revert = {}
505 - if hash_cur in hashes and hash_cur != -1:
506 - revert['revert'] = 1
507 - else:
508 - revert['revert'] = 0
509 - return revert
510 -
511 -
512 -def extract_revision_id(revision_id):
513 - if revision_id != None:
514 - return revision_id.text
515 - else:
516 - return None
517 -
518 -
519 -def extract_comment_text(revision_id, revision):
520 - comment = {}
521 - text = revision.find('comment')
522 - if text != None and text.text != None:
523 - comment[revision_id] = text.text.encode('utf-8')
524 - return comment
525 -
526 -
527 -def create_namespace_dict(siteinfo, xml_namespace):
528 - '''
529 - This function determines the local names of the different namespaces.
530 - '''
531 - namespaces = {}
532 - print 'Detected xml namespace: %s' % xml_namespace
533 - print 'Constructing namespace dictionary'
534 - elements = siteinfo.find('%s%s' % (xml_namespace, 'namespaces'))
535 - for elem in elements.getchildren():
536 - key = int(elem.get('key'))
537 - namespaces[key] = elem.text #extract_text(ns)
538 - text = elem.text if elem.text != None else ''
539 - try:
540 - print key, text.encode('utf-8')
541 - except UnicodeEncodeError:
542 - print key
543 - elem.clear()
544 - siteinfo.clear()
545 - if namespaces == {}:
546 - sys.exit(-1)
547 - return namespaces
548 -
549 -
550 -def determine_xml_namespace(siteinfo):
551 - '''
552 - This function determines the xml_namespace version
553 - '''
554 - for elem in siteinfo :
555 - if elem.tag.endswith('sitename'):
556 - xml_namespace = elem.tag
557 - pos = xml_namespace.find('sitename')
558 - xml_namespace = xml_namespace[0:pos]
559 - elem.clear()
560 - return xml_namespace
561 - else:
562 - elem.clear()
563 -
564 -
565 -def count_edits(article, counts, bots, xml_namespace):
566 - title = parse_title(article['title'])
567 - namespace = determine_namespace(title, {}, COUNT_EXCLUDE_NAMESPACE)
568 - if namespace != False:
569 - article_id = article['id'].text
570 - revisions = article['revisions']
571 - for revision in revisions:
572 - if revision == None:
573 - #the entire revision is empty, weird.
574 - continue
575 -
576 - contributor = revision.find('%s%s' % (xml_namespace, 'contributor'))
577 - contributor = parse_contributor(contributor, bots)
578 - if not contributor:
579 - #editor is anonymous, ignore
580 - continue
581 - counts.setdefault(contributor['username'], 0)
582 - counts[contributor['username']] += 1
583 - revision.clear()
584 - return counts
585 -
586 -
587 -def create_variables(article, cache, bots, xml_namespace, comments=False):
588 - include_ns = {3: 'User Talk',
589 - 5: 'Wikipedia Talk',
590 - 1: 'Talk',
591 - 2: 'User',
592 - 4: 'Wikipedia'}
593 -
594 - title = parse_title(article['title'])
595 - namespaces = article['namespaces']
596 - namespace = determine_namespace(title, namespaces, include_ns, EXCLUDE_NAMESPACE)
597 - title_meta = parse_title_meta_data(title, namespace)
598 - if namespace != False:
599 - cache.count_articles += 1
600 - article_id = article['id'].text
601 - article['id'].clear()
602 - cache.articles[article_id] = title_meta
603 - hashes = deque()
604 - size = {}
605 - revisions = article['revisions']
606 - for revision in revisions:
607 - cache.count_revisions += 1
608 - if revision == None:
609 - #the entire revision is empty, weird.
610 - continue
611 - #dump(revision)
612 - contributor = revision.find('%s%s' % (xml_namespace, 'contributor'))
613 - contributor = parse_contributor(contributor, bots, xml_namespace)
614 - if not contributor:
615 - #editor is anonymous, ignore
616 - continue
617 - revision_id = revision.find('%s%s' % (xml_namespace, 'id'))
618 - revision_id = extract_revision_id(revision_id)
619 - if revision_id == None:
620 - #revision_id is missing, which is weird
621 - continue
622 -
623 - row = prefill_row(title, article_id, namespace)
624 - row['revision_id'] = revision_id
625 - text = extract_revision_text(revision)
626 - row.update(contributor)
627 -
628 - if comments:
629 - comment = extract_comment_text(revision_id, revision)
630 - cache.comments.update(comment)
631 -
632 - timestamp = revision.find('%s%s' % (xml_namespace, 'timestamp')).text
633 - row['timestamp'] = timestamp
634 -
635 - hash = create_md5hash(text)
636 - revert = is_revision_reverted(hash['hash'], hashes)
637 - hashes.append(hash['hash'])
638 - size = calculate_delta_article_size(size, text)
639 -
640 - row.update(hash)
641 - row.update(size)
642 - row.update(revert)
643 - cache.add(row)
644 - revision.clear()
645 -
646 -
647 -def parse_xml(fh, rts):
648 - context = iterparse(fh, events=('end',))
649 - context = iter(context)
650 -
651 - article = {}
652 - article['revisions'] = []
653 - elements = []
654 - id = False
655 - ns = False
656 -
657 - try:
658 - for event, elem in context:
659 - if event == 'end' and elem.tag.endswith('siteinfo'):
660 - xml_namespace = determine_xml_namespace(elem)
661 - namespaces = create_namespace_dict(elem, xml_namespace)
662 - article['namespaces'] = namespaces
663 - ns = True
664 - elif event == 'end' and elem.tag.endswith('title'):
665 - article['title'] = elem
666 - elif event == 'end' and elem.tag.endswith('revision'):
667 - article['revisions'].append(elem)
668 - elif event == 'end' and elem.tag.endswith('id') and id == False:
669 - article['id'] = elem
670 - id = True
671 - elif event == 'end' and elem.tag.endswith('page'):
672 - yield article, xml_namespace
673 - elem.clear()
674 - article = {}
675 - article['revisions'] = []
676 - article['namespaces'] = namespaces
677 - id = False
678 - #elif event == 'end' and ns == True:
679 - # elem.clear()
680 - except SyntaxError, error:
681 - print 'Encountered invalid XML tag. Error message: %s' % error
682 - dump(elem)
683 - sys.exit(-1)
684 - except IOError, error:
685 - print '''Archive file is possibly corrupted. Please delete this archive
686 - and retry downloading. Error message: %s''' % error
687 - sys.exit(-1)
688 -
689 -def stream_raw_xml(input_queue, process_id, function, dataset, lock, rts):
690 - bots = bot_detector.retrieve_bots(rts.language.code)
691 -
692 - t0 = datetime.datetime.now()
693 - i = 0
694 - if dataset == 'training':
695 - cache = Buffer(process_id, rts, lock)
696 - else:
697 - counts = {}
698 -
699 - while True:
700 - filename = input_queue.get()
701 - input_queue.task_done()
702 - if filename == None:
703 - print '%s files left in the queue' % input_queue.qsize()
704 - break
705 -
706 - fh = file_utils.create_streaming_buffer(filename)
707 - filename = os.path.split(filename)[1]
708 - filename = os.path.splitext(filename)[0]
709 - for article, xml_namespace in parse_xml(fh, rts):
710 - if dataset == 'training':
711 - function(article, cache, bots, xml_namespace)
712 - elif dataset == 'prediction':
713 - counts = function(article, counts, bots, xml_namespace)
714 - i += 1
715 - if i % 10000 == 0:
716 - print 'Worker %s parsed %s articles' % (process_id, i)
717 - fh.close()
718 -
719 - t1 = datetime.datetime.now()
720 - print 'Worker %s: Processing of %s took %s' % (process_id, filename, (t1 - t0))
721 - print 'There are %s files left in the queue' % (input_queue.qsize())
722 - t0 = t1
723 -
724 - if dataset == 'training':
725 - cache.close()
726 - cache.summary()
727 - else:
728 - location = os.getcwd()
729 - keys = counts.keys()
730 - filename = 'counts_%s.csv' % filename
731 - fh = file_utils.create_txt_filehandle(location, filename, 'w', 'utf-8')
732 - file_utils.write_dict_to_csv(counts, fh, keys)
733 - fh.close()
734 -
735 - filename = 'counts_%s.bin' % filename
736 - file_utils.store_object(counts, location, filename)
737 -
738 - print 'Finished parsing Wikipedia dump files.'
739 -
740 -
741 -def setup(rts):
742 - '''
743 - Depending on the storage system selected (cassandra, csv or mongo) some
744 - preparations are made including setting up namespaces and cleaning up old
745 - files.
746 - '''
747 - res = file_utils.delete_file(rts.txt, None, directory=True)
748 - if res:
749 - res = file_utils.create_directory(rts.txt)
750 -
751 -
752 -def multiprocessor_launcher(function, dataset, lock, rts):
753 - mgr = Manager()
754 - open_handles = []
755 - open_handles = mgr.list(open_handles)
756 - clock = CustomLock(lock, open_handles)
757 - input_queue = JoinableQueue()
758 -
759 - files = file_utils.retrieve_file_list(rts.input_location)
760 -
761 - if len(files) > cpu_count():
762 - processors = cpu_count() - 1
763 - else:
764 - processors = len(files)
765 -
766 - for filename in files:
767 - filename = os.path.join(rts.input_location, filename)
768 - print filename
769 - input_queue.put(filename)
770 -
771 - for x in xrange(processors):
772 - print 'Inserting poison pill %s...' % x
773 - input_queue.put(None)
774 -
775 - extracters = [Process(target=stream_raw_xml, args=[input_queue,
776 - process_id, function,
777 - dataset, clock, rts])
778 - for process_id in xrange(processors)]
779 - for extracter in extracters:
780 - extracter.start()
781 -
782 - input_queue.join()
783 -
784 -
785 -def launcher_training():
786 - '''
787 - Launcher for creating training dataset for data competition
788 - '''
789 - path = '/mnt/wikipedia_dumps/batch2/'
790 - function = create_variables
791 - dataset = 'training'
792 - rts = DummyRTS(path)
793 - locks = []
794 - multiprocessor_launcher(function, dataset, locks, rts)
795 -
796 -
797 -def launcher_prediction():
798 - '''
799 - Launcher for creating prediction dataset for datacompetition
800 - '''
801 - path = '/mnt/wikipedia_dumps/batch1/'
802 - function = count_edits
803 - dataset = 'prediction'
804 - rts = DummyRTS(path)
805 - locks = []
806 - multiprocessor_launcher(function, dataset, locks, rts)
807 -
808 -
809 -def launcher(rts):
810 - '''
811 - This is the generic entry point for regular Wikilytics usage.
812 - '''
813 - # launcher for creating regular mongo dataset
814 - function = create_variables
815 - dataset = 'training'
816 - lock = RLock()
817 - setup(rts)
818 - multiprocessor_launcher(function, dataset, lock, rts)
819 -
820 -
821 -if __name__ == '__main__':
822 - launcher_training()
823 - #launcher_prediction()
824 - #launcher(rts)
Index: trunk/tools/editor_trends/etl/variables.py
@@ -0,0 +1,280 @@
 2+#!/usr/bin/python
 3+# -*- coding: utf-8 -*-
 4+'''
 5+Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
 6+This program is free software; you can redistribute it and/or
 7+modify it under the terms of the GNU General Public License version 2
 8+as published by the Free Software Foundation.
 9+This program is distributed in the hope that it will be useful,
 10+but WITHOUT ANY WARRANTY; without even the implied warranty of
 11+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 12+See the GNU General Public License for more details, at
 13+http://www.fsf.org/licenses/gpl.html
 14+'''
 15+
 16+
 17+__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
 18+__author__email = 'dvanliere at gmail dot com'
 19+__date__ = '2011-04-10'
 20+__version__ = '0.1'
 21+
 22+import hashlib
 23+
 24+def validate_hostname(address):
 25+ '''
 26+ This is not a foolproof solution at all. The problem is that it's really
 27+ hard to determine whether a string is a hostname or not **reliably**. This
 28+ is a very fast rule of thumb. Will lead to false positives,
 29+ but that's life :)
 30+ '''
 31+ parts = address.split(".")
 32+ if len(parts) > 2:
 33+ return True
 34+ else:
 35+ return False
 36+
 37+
 38+def validate_ip(address):
 39+ parts = address.split(".")
 40+ if len(parts) != 4:
 41+ return False
 42+ parts = parts[:3]
 43+ for item in parts:
 44+ try:
 45+ if not 0 <= int(item) <= 255:
 46+ return False
 47+ except ValueError:
 48+ return False
 49+ return True
 50+
 51+
 52+def extract_revision_text(revision):
 53+ rev = revision.find('ns0:text')
 54+ if rev != None:
 55+ if rev.text == None:
 56+ rev = fix_revision_text(revision)
 57+ return rev.text.encode('utf-8')
 58+ else:
 59+ return ''
 60+
 61+
 62+def parse_title(title):
 63+ return title.text
 64+
 65+
 66+def parse_title_meta_data(title, namespace):
 67+ '''
 68+ This function categorizes an article to assist the Wikimedia Taxonomy
 69+ project. See
 70+ http://meta.wikimedia.org/wiki/Contribution_Taxonomy_Project/Research_Questions
 71+ '''
 72+ title_meta = {}
 73+ if not namespace:
 74+ return title_meta
 75+
 76+ title_meta['title'] = title
 77+ ns = namespace['namespace']
 78+ title_meta['ns'] = ns
 79+ if title.startswith('List of'):
 80+ title_meta['category'] = 'List'
 81+ elif ns == 4 or ns == 5:
 82+ if title.find('Articles for deletion') > -1:
 83+ title_meta['category'] = 'Deletion'
 84+ elif title.find('Mediation Committee') > -1:
 85+ title_meta['category'] = 'Mediation'
 86+ elif title.find('Mediation Cabal') > -1:
 87+ title_meta['category'] = 'Mediation'
 88+ elif title.find('Arbitration') > -1:
 89+ title_meta['category'] = 'Arbitration'
 90+ elif title.find('Featured Articles') > -1:
 91+ title_meta['category'] = 'Featured Article'
 92+ elif title.find('Featured picture candidates') > -1:
 93+ title_meta['category'] = 'Featured Pictures'
 94+ elif title.find('Featured sound candidates') > -1:
 95+ title_meta['category'] = 'Featured Sounds'
 96+ elif title.find('Featured list candidates') > -1:
 97+ title_meta['category'] = 'Featured Lists'
 98+ elif title.find('Featured portal candidates') > -1:
 99+ title_meta['category'] = 'Featured Portal'
 100+ elif title.find('Featured topic candidates') > -1:
 101+ title_meta['category'] = 'Featured Topic'
 102+ elif title.find('Good Article') > -1:
 103+ title_meta['category'] = 'Good Article'
 104+ return title_meta
 105+
 106+
 107+def extract_username(contributor, xml_namespace):
 108+ contributor = contributor.find('%s%s' % (xml_namespace, 'username'))
 109+ if contributor != None:
 110+ return contributor.text
 111+ else:
 112+ return None
 113+
 114+
 115+def determine_username_is_bot(contributor, bots, xml_namespace):
 116+ '''
 117+ #contributor is an xml element containing the id of the contributor
 118+ @bots should have a dict with all the bot ids and bot names
 119+ @Return False if username id is not in bot dict id or True if username id
 120+ is a bot id.
 121+ '''
 122+ username = contributor.find('%s%s' % (xml_namespace, 'username'))
 123+ if username == None:
 124+ return 0
 125+ else:
 126+ if username.text in bots:
 127+ return 1
 128+ else:
 129+ return 0
 130+
 131+
 132+def extract_contributor_id(revision, xml_namespace):
 133+ '''
 134+ @contributor is the xml contributor node containing a number of attributes
 135+ Currently, we are only interested in registered contributors, hence we
 136+ ignore anonymous editors.
 137+ '''
 138+ if revision.get('deleted'):
 139+ # ASK: Not sure if this is the best way to code deleted contributors.
 140+ return None
 141+ elem = revision.find('%s%s' % (xml_namespace, 'id'))
 142+ if elem != None:
 143+ return {'id':elem.text}
 144+ else:
 145+ elem = revision.find('%s%s' % (xml_namespace, 'ip'))
 146+ if elem == None or elem.text == None:
 147+ return None
 148+ elif validate_ip(elem.text) == False and validate_hostname(elem.text) == False:
 149+ return {'username':elem.text, 'id': elem.text}
 150+ else:
 151+ return None
 152+
 153+
 154+def fix_revision_text(revision):
 155+ if revision.text == None:
 156+ revision.text = ''
 157+ return revision
 158+
 159+
 160+def create_md5hash(text):
 161+ hash = {}
 162+ if text != None:
 163+ m = hashlib.md5()
 164+ m.update(text)
 165+ #echo m.digest()
 166+ hash['hash'] = m.hexdigest()
 167+ else:
 168+ hash['hash'] = -1
 169+ return hash
 170+
 171+
 172+def calculate_delta_article_size(size, text):
 173+ if 'prev_size' not in size:
 174+ size['prev_size'] = 0
 175+ size['cur_size'] = len(text)
 176+ size['delta'] = len(text)
 177+ else:
 178+ size['prev_size'] = size['cur_size']
 179+ delta = len(text) - size['prev_size']
 180+ size['cur_size'] = len(text)
 181+ size['delta'] = delta
 182+ return size
 183+
 184+
 185+def parse_contributor(revision, bots, xml_namespace):
 186+ username = extract_username(revision, xml_namespace)
 187+ user_id = extract_contributor_id(revision, xml_namespace)
 188+ bot = determine_username_is_bot(revision, bots, xml_namespace)
 189+ editor = {}
 190+ editor['username'] = username
 191+ editor['bot'] = bot
 192+ if user_id != None:
 193+ editor.update(user_id)
 194+ else:
 195+ editor = False
 196+ return editor
 197+
 198+
 199+def determine_namespace(title, namespaces, include_ns):
 200+ '''
 201+ You can only determine whether an article belongs to the Main Namespace
 202+ by ruling out that it does not belong to any other namepace
 203+ '''
 204+ ns = {}
 205+ if title != None:
 206+ for key in include_ns:
 207+ namespace = namespaces.pop(key, None)
 208+ if namespace and title.startswith(namespace):
 209+ ns['namespace'] = key
 210+ if ns == {}:
 211+ for namespace in namespaces.itervalues():
 212+ if namespace and title.startswith(namespace):
 213+ '''article does not belong to any of the include_ns
 214+ namespaces'''
 215+ return False
 216+ ns = 0
 217+ else:
 218+ ns = False
 219+ return ns
 220+
 221+
 222+def is_revision_reverted(hash_cur, hashes):
 223+ revert = {}
 224+ if hash_cur in hashes and hash_cur != -1:
 225+ revert['revert'] = 1
 226+ else:
 227+ revert['revert'] = 0
 228+ return revert
 229+
 230+
 231+def extract_revision_id(revision_id):
 232+ if revision_id != None:
 233+ return revision_id.text
 234+ else:
 235+ return None
 236+
 237+
 238+def extract_comment_text(revision_id, revision):
 239+ comment = {}
 240+ text = revision.find('comment')
 241+ if text != None and text.text != None:
 242+ comment[revision_id] = text.text.encode('utf-8')
 243+ return comment
 244+
 245+
 246+def create_namespace_dict(siteinfo, xml_namespace):
 247+ '''
 248+ This function determines the local names of the different namespaces.
 249+ '''
 250+ namespaces = {}
 251+ print 'Detected xml namespace: %s' % xml_namespace
 252+ print 'Constructing namespace dictionary'
 253+ elements = siteinfo.find('%s%s' % (xml_namespace, 'namespaces'))
 254+ for elem in elements.getchildren():
 255+ key = int(elem.get('key'))
 256+ namespaces[key] = elem.text #extract_text(ns)
 257+ text = elem.text if elem.text != None else ''
 258+ try:
 259+ print key, text.encode('utf-8')
 260+ except UnicodeEncodeError:
 261+ print key
 262+ elem.clear()
 263+ if namespaces == {}:
 264+ print 'Could not determine namespaces. Exiting.'
 265+ sys.exit(-1)
 266+ return namespaces
 267+
 268+
 269+def determine_xml_namespace(siteinfo):
 270+ '''
 271+ This function determines the xml_namespace version
 272+ '''
 273+ for elem in siteinfo :
 274+ if elem.tag.endswith('sitename'):
 275+ xml_namespace = elem.tag
 276+ pos = xml_namespace.find('sitename')
 277+ xml_namespace = xml_namespace[0:pos]
 278+ elem.clear()
 279+ return xml_namespace
 280+ else:
 281+ elem.clear()
Property changes on: trunk/tools/editor_trends/etl/variables.py
___________________________________________________________________
Added: svn:eol-style
1282 + native
Index: trunk/tools/editor_trends/etl/store.py
@@ -108,24 +108,27 @@
109109 db.add_index('category')
110110
111111 location = os.path.join(rts.input_location, rts.language.code, rts.project.name, 'txt')
112 - fh = file_utils.create_txt_filehandle(location, 'titles.csv', 'r', 'utf-8')
113 - print 'Storing article titles...'
114 - for line in fh:
115 - line = line.strip()
116 - #print line.encode('utf-8')
117 - line = line.split('\t')
118 - data = {}
119 - x, y = 0, 1
120 - while y < len(line):
121 - key, value = line[x], line[y]
122 - data[key] = value
123 - x += 2
124 - y += 2
125 - db.insert(data)
126 - fh.close()
127 - print 'Done...'
 112+ files = file_utils.retrieve_file_list(rts.txt, extension='csv', mask='article')
128113
 114+ print 'Storing article...'
 115+ for file in files:
 116+ print file
 117+ fh = file_utils.create_txt_filehandle(rts.txt, file, 'r', 'utf-8')
 118+ for line in fh:
 119+ line = line.strip()
 120+ line = line.split('\t')
 121+ data = {}
 122+ x, y = 0, 1
 123+ while y < len(line):
 124+ key, value = line[x], line[y]
 125+ data[key] = value
 126+ x += 2
 127+ y += 2
 128+ db.insert(data)
 129+ fh.close()
 130+ print 'Done storing articles...'
129131
 132+
130133 def launcher(rts):
131134 '''
132135 This is the main entry point and creates a number of workers and launches
Index: trunk/tools/editor_trends/etl/extracter.py
@@ -12,460 +12,277 @@
1313 http://www.fsf.org/licenses/gpl.html
1414 '''
1515
 16+
1617 __author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
17 -__email__ = 'dvanliere at gmail dot com'
18 -__date__ = '2010-12-13'
 18+__author__email = 'dvanliere at gmail dot com'
 19+__date__ = '2011-04-10'
1920 __version__ = '0.1'
2021
 22+
 23+from collections import deque
2124 import sys
22 -import re
23 -import os
24 -import multiprocessing
25 -import progressbar
26 -from Queue import Empty
 25+from xml.etree.cElementTree import iterparse, dump
 26+from multiprocessing import JoinableQueue, Process, cpu_count, RLock, Manager
2727
2828 if '..' not in sys.path:
2929 sys.path.append('..')
3030
31 -import wikitree.parser
32 -from bots import detector
33 -from utils import file_utils
34 -from utils import compression
35 -from utils import log
 31+from etl import variables
 32+from classes import buffer
 33+from analyses.adhoc import bot_detector
3634
37 -try:
38 - import psyco
39 - psyco.full()
40 -except ImportError:
41 - pass
 35+def parse_revision(revision, article, xml_namespace, cache, bots, md5hashes, size):
 36+ if revision == None:
 37+ #the entire revision is empty, weird.
 38+ #dump(revision)
 39+ return md5hashes, size
4240
 41+ contributor = revision.find('%s%s' % (xml_namespace, 'contributor'))
 42+ contributor = variables.parse_contributor(contributor, bots, xml_namespace)
 43+ if not contributor:
 44+ #editor is anonymous, ignore
 45+ return md5hashes, size
4346
44 -RE_NUMERIC_CHARACTER = re.compile('&#(\d+);')
 47+ revision_id = revision.find('%s%s' % (xml_namespace, 'id'))
 48+ revision_id = variables.extract_revision_id(revision_id)
 49+ if revision_id == None:
 50+ #revision_id is missing, which is weird
 51+ return md5hashes, size
4552
 53+ article['revision_id'] = revision_id
 54+ text = variables.extract_revision_text(revision)
 55+ article.update(contributor)
4656
47 -def remove_numeric_character_references(rts, text):
48 - return re.sub(RE_NUMERIC_CHARACTER, lenient_deccharref, text).encode('utf-8')
 57+ comment = variables.extract_comment_text(revision_id, revision)
 58+ cache.comments.update(comment)
4959
 60+ timestamp = revision.find('%s%s' % (xml_namespace, 'timestamp')).text
 61+ article['timestamp'] = timestamp
5062
51 -def lenient_deccharref(m):
52 - try:
53 - return unichr(int(m.group(1)))
54 - except ValueError:
55 - '''
56 - There are a few articles that raise a Value Error here, the reason is
57 - that I am using a narrow Python build (UCS2) instead of a wide build
58 - (UCS4). The quick fix is to return an empty string...
59 - Real solution is to rebuild Python with UCS4 support.....
60 - '''
61 - return ''
 63+ hash = variables.create_md5hash(text)
 64+ revert = variables.is_revision_reverted(hash['hash'], md5hashes)
 65+ md5hashes.append(hash['hash'])
 66+ size = variables.calculate_delta_article_size(size, text)
6267
 68+ article.update(hash)
 69+ article.update(size)
 70+ article.update(revert)
 71+ cache.add(article)
 72+ return md5hashes, size
6373
64 -def build_namespaces_locale(namespaces, include=['0']):
65 - '''
66 - @include is a list of namespace keys that should not be ignored, the default
67 - setting is to ignore all namespaces except the main namespace.
68 - '''
69 - ns = {}
70 - for key, value in namespaces.iteritems():
71 - if key in include:
72 - #value = namespaces[namespace].get(u'*', None)
73 - #ns.append(value)
74 - ns[key] = value
75 - return ns
 74+def setup_parser(rts):
 75+ bots = '' #bot_detector.retrieve_bots(rts.language.code)
 76+ start = 'start'; end = 'end'
 77+ context = iterparse(fh, events=(start, end))
 78+ context = iter(context)
7679
 80+ include_ns = {3: 'User Talk',
 81+ 5: 'Wikipedia Talk',
 82+ 1: 'Talk',
 83+ 2: 'User',
 84+ 4: 'Wikipedia'}
7785
78 -def parse_comments(rts, revisions, function):
79 - for revision in revisions:
80 - comment = revision.find('{%s}comment' % rts.xml_namespace)
81 - if comment != None and comment.text != None:
82 - comment.text = function(comment.text)
83 - return revisions
 86+ return bots, context, include_ns
8487
8588
86 -def parse_article(elem, namespaces):
87 - '''
88 - @namespaces is a list of valid namespaces that should be included in the analysis
89 - if the article should be ignored then this function returns false, else it returns
90 - the namespace identifier and namespace name.
91 - '''
92 - title = elem.text
93 - if title == None:
94 - return False
95 - ns = title.split(':')
96 - if len(ns) == 1 and '0' in namespaces:
97 - return {'id': 0, 'name': 'main namespace'}
98 - else:
99 - if ns[0] in namespaces.values():
100 - #print namespaces, title.decode('utf-8'), ns
101 - return {'id': ns[0], 'name': ns[1]}
102 - else:
103 - return False
 89+def datacompetition_count_edits(rts, file_id):
 90+ bots, context, include_ns = setup_parser(rts)
 91+ counts = {}
 92+ id = False
 93+ ns = False
 94+ parse = False
10495
 96+ try:
 97+ for event, elem in context:
 98+ if event is end and elem.tag.endswith('siteinfo'):
 99+ xml_namespace = variables.determine_xml_namespace(elem)
 100+ namespaces = variables.create_namespace_dict(elem, xml_namespace)
 101+ ns = True
 102+ elem.clear()
105103
106 -def validate_hostname(address):
107 - '''
108 - This is not a foolproof solution at all. The problem is that it's really
109 - hard to determine whether a string is a hostname or not **reliably**. This
110 - is a very fast rule of thumb. Will lead to false positives,
111 - but that's life :)
112 - '''
113 - parts = address.split(".")
114 - if len(parts) > 2:
115 - return True
116 - else:
117 - return False
 104+ elif event is end and elem.tag.endswith('title'):
 105+ title = variables.parse_title(elem)
 106+ article['title'] = title
 107+ current_namespace = variables.determine_namespace(title, namespaces, include_ns)
 108+ if current_namespace != False:
 109+ parse = True
 110+ article['namespace'] = current_namespace
 111+ cache.count_articles += 1
 112+ elem.clear()
118113
 114+ elif elem.tag.endswith('revision') and parse == True:
 115+ if event is start:
 116+ clear = False
 117+ else:
 118+ print 'IMPLEMENT'
 119+ #md5hashes, size = parse_revision(elem, article, xml_namespace, cache, bots, md5hashes, size)
 120+ cache.count_revisions += 1
 121+ clear = True
 122+ if clear:
 123+ elem.clear()
119124
120 -def validate_ip(address):
121 - parts = address.split(".")
122 - if len(parts) != 4:
123 - return False
124 - parts = parts[:3]
125 - for item in parts:
126 - try:
127 - if not 0 <= int(item) <= 255:
128 - return False
129 - except ValueError:
130 - return False
131 - return True
 125+ elif event is end and elem.tag.endswith('page'):
 126+ elem.clear()
 127+ #Reset all variables for next article
 128+ id = False
 129+ parse = False
132130
 131+ except SyntaxError, error:
 132+ print 'Encountered invalid XML tag. Error message: %s' % error
 133+ dump(elem)
 134+ sys.exit(-1)
 135+ except IOError, error:
 136+ print '''Archive file is possibly corrupted. Please delete this archive
 137+ and retry downloading. Error message: %s''' % error
 138+ sys.exit(-1)
133139
134 -def determine_username_is_bot(contributor, **kwargs):
135 - '''
136 - #contributor is an xml element containing the id of the contributor
137 - @bots should have a dict with all the bot ids and bot names
138 - @Return False if username id is not in bot dict id or True if username id
139 - is a bot id.
140 - '''
141 - bots = kwargs.get('bots')
142 - username = contributor.find('username')
143 - if username == None:
144 - return 0
145 - else:
146 - if username.text in bots:
147 - return 1
148 - else:
149 - return 0
 140+ filename = 'counts_kaggle_%s.csv' % file_id
 141+ fh = file_utils.create_txt_filehandle(rts.txt, filename, 'w', 'utf-8')
 142+ file_utils.write_dict_to_csv(counts, fh, keys)
 143+ fh.close()
150144
 145+ filename = 'counts_kaggle_%s.bin' % file_id
 146+ file_utils.store_object(counts, location, filename)
151147
152 -def extract_username(contributor, **kwargs):
153 - contributor = contributor.find('username')
154 - if contributor != None and contributor.text != None:
155 - contributor = contributor.text.encode('utf-8')
156 - return contributor.decode('utf-8')
157 - else:
158 - return None
159148
 149+def parse_xml(fh, cache, rts):
 150+ bots, context, include_ns = setup_parser(rts)
 151+ article = {}
 152+ size = {}
 153+ id = False
 154+ ns = False
 155+ parse = False
160156
161 -def extract_revision_id(revision_id, **kwargs):
162 - if revision_id != None:
163 - return revision_id.text
164 - else:
165 - return None
 157+ try:
 158+ for event, elem in context:
 159+ if event is end and elem.tag.endswith('siteinfo'):
 160+ xml_namespace = variables.determine_xml_namespace(elem)
 161+ namespaces = variables.create_namespace_dict(elem, xml_namespace)
 162+ ns = True
 163+ elem.clear()
166164
 165+ elif event is end and elem.tag.endswith('title'):
 166+ title = variables.parse_title(elem)
 167+ article['title'] = title
 168+ current_namespace = variables.determine_namespace(title, namespaces, include_ns)
 169+ title_meta = variables.parse_title_meta_data(title, current_namespace)
 170+ if current_namespace != False:
 171+ parse = True
 172+ article['namespace'] = current_namespace
 173+ cache.count_articles += 1
 174+ md5hashes = deque()
 175+ elem.clear()
167176
168 -def extract_contributor_id(contributor, **kwargs):
169 - '''
170 - @contributor is the xml contributor node containing a number of attributes
171 - Currently, we are only interested in registered contributors, hence we
172 - ignore anonymous editors.
173 - '''
174 - if contributor.get('deleted'):
175 - # ASK: Not sure if this is the best way to code deleted contributors.
176 - return None
177 - elem = contributor.find('id')
178 - if elem != None:
179 - return {'id':elem.text}
180 - else:
181 - elem = contributor.find('ip')
182 - if elem != None and elem.text != None \
183 - and validate_ip(elem.text) == False \
184 - and validate_hostname(elem.text) == False:
185 - return {'username':elem.text, 'id': elem.text}
186 - else:
187 - return None
 177+ elif elem.tag.endswith('revision') and parse == True:
 178+ if event is start:
 179+ clear = False
 180+ else:
 181+ md5hashes, size = parse_revision(elem, article, xml_namespace, cache, bots, md5hashes, size)
 182+ cache.count_revisions += 1
 183+ clear = True
 184+ if clear:
 185+ elem.clear()
188186
 187+ elif event is end and elem.tag.endswith('id') and id == False:
 188+ article['article_id'] = elem.text
 189+ if current_namespace:
 190+ cache.articles[article['article_id']] = title_meta
 191+ id = True
 192+ elem.clear()
189193
190 -def parse_title(title):
191 - title_data = {}
192 - t1 = type(title.text)
193 - if type(title.text) != type('str'):
194 - print 'encodign'
195 - print title.text.encode('utf-8')
196 - title = title.text.encode('utf-8')
197 - title = title.decode('utf-8', 'ignore')
198 - print title
199 - else:
200 - title = title.text
 194+ elif event is end and elem.tag.endswith('page'):
 195+ elem.clear()
 196+ #Reset all variables for next article
 197+ article = {}
 198+ size = {}
 199+ md5hashes = deque()
 200+ id = False
 201+ parse = False
201202
202 - #title = title.encode('utf-8')
203 - title_data['title'] = title #.decode('utf-8')
204 - t2 = type(title_data['title'])
205 - print t1, t2
206 - #title_data['title'] = title
207 - if title_data['title'].startswith('List of'):
208 - title_data['list'] = True
209 - else:
210 - title_data['list'] = False
211 - return title_data
212203
 204+ except SyntaxError, error:
 205+ print 'Encountered invalid XML tag. Error message: %s' % error
 206+ dump(elem)
 207+ sys.exit(-1)
 208+ except IOError, error:
 209+ print '''Archive file is possibly corrupted. Please delete this archive
 210+ and retry downloading. Error message: %s''' % error
 211+ sys.exit(-1)
213212
214 -def output_editor_information(revisions, page, bots, rts):
215 - '''
216 - @elem is an XML element containing 1 revision from a page
217 - @output is where to store the data, a filehandle
218 - @**kwargs contains extra information
219 -
220 - the variable tags determines which attributes are being parsed, the values
221 - in this dictionary are the functions used to extract the data.
222 - '''
223 - headers = ['id', 'date', 'article', 'username', 'revision_id']
224 - tags = {'contributor': {'id': extract_contributor_id,
225 - 'bot': determine_username_is_bot,
226 - 'username': extract_username,
227 - },
228 - 'timestamp': {'date': wikitree.parser.extract_text},
229 - 'id': {'revision_id': extract_revision_id,
230 - }
231 - }
232 - vars = {}
233 - flat = []
234213
235 - for x, revision in enumerate(revisions):
236 - vars[x] = {}
237 - vars[x]['article'] = page
238 - for tag in tags:
239 - el = revision.find('%s' % tag)
240 - if el == None:
241 - #print cElementTree.tostring(revision, 'utf-8')
242 - del vars[x]
243 - break
244 - for function in tags[tag].keys():
245 - f = tags[tag][function]
246 - value = f(el, bots=bots)
247 - if isinstance(value, dict):
248 - for kw in value:
249 - vars[x][kw] = value[kw]
250 - else:
251 - vars[x][function] = value
 214+def stream_raw_xml(input_queue, process_id, lock, rts):
 215+ t0 = datetime.datetime.now()
 216+ i = 0
 217+ file_id = 0
 218+ cache = buffer.CSVBuffer(process_id, rts, lock)
252219
253 - '''
254 - This loop determines for each observation whether it should be stored
255 - or not.
256 - '''
257 - for x in vars:
258 - if vars[x]['bot'] == 1 or vars[x]['id'] == None \
259 - or vars[x]['username'] == None:
260 - continue
261 - else:
262 - f = []
263 - for head in headers:
264 - f.append(vars[x][head])
265 - flat.append(f)
266 - return flat
267 -
268 -
269 -def add_namespace_to_output(output, namespace):
270 - for x, o in enumerate(output):
271 - o.append(namespace['id'])
272 - output[x] = o
273 - return output
274 -
275 -
276 -def parse_dumpfile(tasks, rts, lock):
277 - bot_ids = detector.retrieve_bots(rts.language.code)
278 - location = os.path.join(rts.input_location, rts.language.code, rts.project.name)
279 - output = os.path.join(rts.input_location, rts.language.code, rts.project.name, 'txt')
280 - widgets = log.init_progressbar_widgets('Extracting data')
281 - filehandles = [file_utils.create_txt_filehandle(output, '%s.csv' % fh, 'a',
282 - 'utf-8') for fh in xrange(rts.max_filehandles)]
283220 while True:
284 - total, processed = 0.0, 0.0
285 - try:
286 - filename = tasks.get(block=False)
287 - except Empty:
288 - break
289 - finally:
290 - print tasks.qsize()
291 - tasks.task_done()
292 -
 221+ filename = input_queue.get()
 222+ input_queue.task_done()
 223+ file_id += 1
293224 if filename == None:
294 - print '\nThere are no more jobs in the queue left.'
 225+ print '%s files left in the queue' % input_queue.qsize()
295226 break
296227
297 - filesize = file_utils.determine_filesize(location, filename)
298 - print 'Opening %s...' % (os.path.join(location, filename))
299 - print 'Filesize: %s' % filesize
300 - fh1 = file_utils.create_txt_filehandle(location, filename, 'r', 'ascii')
301 - fh2 = file_utils.create_txt_filehandle(location, 'articles.csv', 'a', 'utf-8')
302 - ns, xml_namespace = wikitree.parser.extract_meta_information(fh1)
303 - ns = build_namespaces_locale(ns, rts.namespaces)
304 - rts.xml_namespace = xml_namespace
 228+ fh = file_utils.create_streaming_buffer(filename)
305229
306 - pbar = progressbar.ProgressBar(widgets=widgets, maxval=filesize).start()
307 - for page, article_size in wikitree.parser.read_input(fh1):
308 - title = page.find('title')
309 - total += 1
310 - namespace = parse_article(title, ns)
311 - if namespace != False:
312 - article_id = page.find('id').text
313 - title = parse_title(title)
314 - revisions = page.findall('revision')
315 - revisions = parse_comments(rts, revisions, remove_numeric_character_references)
316 - output = output_editor_information(revisions, article_id, bot_ids, rts)
317 - output = add_namespace_to_output(output, namespace)
318 - write_output(output, filehandles, lock, rts)
319 - #file_utils.write_list_to_csv([article_id, title.values()], fh2, newline=False, lock=lock)
320 - processed += 1
321 - page.clear()
322 - pbar.update(pbar.currval + article_size)
 230+ if rts.kaggle:
 231+ datacompetition_count_edits(rts, file_id)
 232+ else:
 233+ parse_xml(fh, cache, rts)
323234
324 - fh1.close()
325 - fh2.close()
326 - print 'Closing %s...' % (os.path.join(location, filename))
327 - print 'Total pages: %s' % total
328 - try:
329 - print 'Pages processed: %s (%s)' % (processed, processed / total)
330 - except ZeroDivisionError:
331 - print 'Pages processed: %s' % processed
 235+ i += 1
 236+ if i % 10000 == 0:
 237+ print 'Worker %s parsed %s articles' % (process_id, i)
 238+ fh.close()
332239
333 - return True
 240+ t1 = datetime.datetime.now()
 241+ print 'Worker %s: Processing of %s took %s' % (process_id, filename, (t1 - t0))
 242+ print 'There are %s files left in the queue' % (input_queue.qsize())
 243+ t0 = t1
334244
 245+ if rts.kaggle:
 246+ cache.close()
 247+ cache.summary()
335248
336 -def group_observations(obs):
337 - '''
338 - This function groups observation by editor id, this way we have to make
339 - fewer fileopening calls.
340 - '''
341 - d = {}
342 - for o in obs:
343 - id = o[0]
344 - if id not in d:
345 - d[id] = []
346 - #if len(o) == 6:
347 - d[id].append(o)
348 - return d
349249
 250+def debug():
 251+ fh = 'c:\\wikimedia\sv\wiki\svwiki-latest-stub-meta-history.xml'
350252
351 -def write_output(observations, filehandles, lock, rts):
352 - observations = group_observations(observations)
353 - for obs in observations:
354 - #lock the write around all edits of an editor for a particular page
355 - lock.acquire()
356 - try:
357 - for i, o in enumerate(observations[obs]):
358 - if i == 0:
359 - fh = filehandles[hash(rts, obs)]
360 - file_utils.write_list_to_csv(o, fh)
361253
362 - except Exception, error:
363 - print 'Encountered the following error while writing data to %s: %s' % (error, fh)
364 - finally:
365 - lock.release()
366 -
367 -
368 -def hash(rts, id):
369 - '''
370 - A very simple hash function based on modulo. The except clause has been
371 - added because there are instances where the username is stored in userid
372 - tag and hence that's a string and not an integer.
373 - '''
374 - try:
375 - return int(id) % rts.max_filehandles
376 - except ValueError:
377 - return sum([ord(i) for i in id]) % rts.max_filehandles
378 -
379 -
380 -def prepare(rts):
381 - output_articles = os.path.join(rts.input_location, rts.language.code,
382 - rts.project.name)
383 - output_txt = os.path.join(rts.input_location, rts.language.code,
384 - rts.project.name, 'txt')
385 - res = file_utils.delete_file(output_articles, 'articles.csv')
386 - res = file_utils.delete_file(output_txt, None, directory=True)
387 - if res:
388 - res = file_utils.create_directory(output_txt)
389 - return res
390 -
391 -
392 -def unzip(properties):
393 - tasks = multiprocessing.JoinableQueue()
394 - canonical_filename = file_utils.determine_canonical_name(properties.filename)
395 - extension = file_utils.determine_file_extension(properties.filename)
396 - files = file_utils.retrieve_file_list(properties.location,
397 - extension,
398 - mask=canonical_filename)
399 - print properties.location
400 - print 'Checking if dump file has been extracted...'
401 - for fn in files:
402 - file_without_ext = fn.replace('%s%s' % ('.', extension), '')
403 - result = file_utils.check_file_exists(properties.location, file_without_ext)
404 - if not result:
405 - print 'Dump file %s has not yet been extracted...' % fn
406 - retcode = compression.launch_zip_extractor(properties.location,
407 - fn,
408 - properties)
409 - else:
410 - print 'Dump file has already been extracted...'
411 - retcode = 0
412 - if retcode == 0:
413 - tasks.put(file_without_ext)
414 - elif retcode != 0:
415 - print 'There was an error while extracting %s, please make sure \
416 -            that %s is a valid archive.' % (fn, fn)
417 - return False
418 - return tasks
419 -
420 -
421254 def launcher(rts):
422 - '''
423 -    This is the main entry point for the extract phase of the data processing
424 -    chain. First, it will put the files that need to be extracted in a queue
425 -    called tasks, then it will remove some old files to make sure that there is
426 -    no data pollution, and finally it will start the parser to actually extract
427 -    the variables from the different dump files.
428 - '''
429 - tasks = unzip(rts)
430 - if not tasks:
431 - return False
 255+ lock = RLock()
 256+ mgr = Manager()
 257+ open_handles = []
 258+ open_handles = mgr.list(open_handles)
 259+ clock = CustomLock(lock, open_handles)
 260+ input_queue = JoinableQueue()
432261
433 - result = prepare(rts)
434 - if not result:
435 - return result
 262+ files = file_utils.retrieve_file_list(rts.input_location)
436263
437 - lock = multiprocessing.Lock()
 264+ if len(files) > cpu_count():
 265+ processors = cpu_count() - 1
 266+ else:
 267+ processors = len(files)
438268
439 - consumers = [multiprocessing.Process(target=parse_dumpfile,
440 - args=(tasks,
441 - rts,
442 - lock))
443 - for x in xrange(rts.number_of_processes)]
 269+ for filename in files:
 270+ filename = os.path.join(rts.input_location, filename)
 271+ print filename
 272+ input_queue.put(filename)
444273
445 - for x in xrange(rts.number_of_processes):
446 - tasks.put(None)
 274+ for x in xrange(processors):
 275+ print 'Inserting poison pill %s...' % x
 276+ input_queue.put(None)
447277
448 - for w in consumers:
449 - print 'Launching process...'
450 - w.start()
 278+ extracters = [Process(target=stream_raw_xml, args=[input_queue, process_id,
 279+ clock, rts])
 280+ for process_id in xrange(processors)]
 281+ for extracter in extracters:
 282+ extracter.start()
451283
452 - tasks.join()
 284+ input_queue.join()
 285+ print 'Finished parsing Wikipedia dump files.'
453286
454 - result = sum([consumer.exitcode for consumer in consumers
455 - if consumer.exitcode != None])
456287
457 - if result == 0:
458 - return True
459 - else:
460 - return False
461 -
462 -
463 -def debug():
464 - pass
465 - #project = 'wiki'
466 - #language_code = 'sv'
467 - #filename = 'svwiki-latest-stub-meta-history.xml'
468 - #parse_dumpfile(project, filename, language_code)
469 -
470 -
471288 if __name__ == '__main__':
472289 debug()
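The replacement launcher wires extraction up as a standard multiprocessing fan-out: dump file names go onto a JoinableQueue, one poison pill (None) per worker signals shutdown, and input_queue.join() blocks until every queued item has been acknowledged. A self-contained sketch of that pattern; the worker body here is a placeholder, not the real stream_raw_xml:

from multiprocessing import JoinableQueue, Process, cpu_count

def worker(queue, process_id):
    while True:
        filename = queue.get()
        if filename is None:      # poison pill: no more work for this worker
            queue.task_done()
            break
        print 'Worker %s parsing %s' % (process_id, filename)
        queue.task_done()

if __name__ == '__main__':
    files = ['dump_part_1.xml', 'dump_part_2.xml', 'dump_part_3.xml']
    processors = min(len(files), max(cpu_count() - 1, 1))
    queue = JoinableQueue()
    for fn in files:
        queue.put(fn)
    for _ in xrange(processors):
        queue.put(None)           # one pill per worker
    workers = [Process(target=worker, args=(queue, pid))
               for pid in xrange(processors)]
    for w in workers:
        w.start()
    queue.join()                  # returns once every item is task_done()
    print 'Finished parsing all dump files.'

Note that join() only returns because the worker acknowledges every item it takes off the queue with task_done(), including the poison pill.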
Index: trunk/tools/editor_trends/etl/sort.py
@@ -45,7 +45,7 @@
4646 self.result.put(None)
4747 break
4848 elif filename.startswith('comments') or \
49 - filename.startswith('title'):
 49+ filename.startswith('article'):
5050 continue
5151 fh = file_utils.create_txt_filehandle(self.rts.txt,
5252 filename,
@@ -148,7 +148,6 @@
149149 rts is an instance of RunTimeSettings
150150 '''
151151 files = file_utils.retrieve_file_list(rts.txt, 'csv')
152 -
153152 pbar = progressbar.ProgressBar(maxval=len(files)).start()
154153 tasks = multiprocessing.JoinableQueue()
155154 result = multiprocessing.JoinableQueue()
Index: trunk/tools/editor_trends/classes/buffer.py
@@ -0,0 +1,183 @@
 2+class CustomLock:
 3+ def __init__(self, lock, open_handles):
 4+ self.lock = lock
 5+ self.open_handles = open_handles
 6+
 7+ def available(self, handle):
 8+ self.lock.acquire()
 9+ try:
 10+ self.open_handles.index(handle)
 11+ #print 'RETRIEVED FILEHANDLE %s' % handle
 12+ return False
 13+ except (ValueError, Exception), error:
 14+ self.open_handles.append(handle)
 15+ #print 'ADDED FILEHANDLE %s' % handle
 16+ return True
 17+ finally:
 18+            #print 'Files locked: %s' % len(self.open_handles)
 19+ self.lock.release()
 20+
 21+ def release(self, handle):
 22+ #print 'RELEASED FILEHANDLE %s' % handle
 23+ self.open_handles.remove(handle)
 24+
 25+
 26+class CSVBuffer:
 27+ def __init__(self, process_id, rts, lock):
 28+ self.rts = rts
 29+ self.lock = lock
 30+ self.revisions = {}
 31+ self.comments = {}
 32+ self.articles = {}
 33+ self.process_id = process_id
 34+ self.count_articles = 0
 35+ self.count_revisions = 0
 36+ self.filehandles = [file_utils.create_txt_filehandle(self.rts.txt,
 37+ file_id, 'a', 'utf-8') for file_id in xrange(self.rts.max_filehandles)]
 38+ self.keys = ['revision_id', 'article_id', 'id', 'username', 'namespace',
 39+ 'title', 'timestamp', 'hash', 'revert', 'bot', 'cur_size',
 40+ 'delta']
 41+ self.fh_articles = file_utils.create_txt_filehandle(self.rts.txt,
 42+ 'articles_%s' % self.process_id, 'w', 'utf-8')
 43+ self.fh_comments = file_utils.create_txt_filehandle(self.rts.txt,
 44+ 'comments_%s' % self.process_id, 'w', 'utf-8')
 45+
 46+ def get_hash(self, id):
 47+ '''
 48+        A very simple hash function based on modulo. The except clause has been
 49+        added because there are instances where the username is stored in the
 50+        userid tag and hence it is a string and not an integer.
 51+ '''
 52+ try:
 53+ return int(id) % self.rts.max_filehandles
 54+ except ValueError:
 55+ return sum([ord(i) for i in id]) % self.rts.max_filehandles
 56+
 57+ def invert_dictionary(self, editors):
 58+ hashes = {}
 59+ for editor, file_id in editors.iteritems():
 60+ hashes.setdefault(file_id, [])
 61+ hashes[file_id].append(editor)
 62+ return hashes
 63+
 64+ def group_revisions_by_fileid(self):
 65+ '''
 66+        This function groups observations by editor id and then by file_id;
 67+        this way we have to make fewer file-opening calls, which should reduce
 68+        processing time.
 69+ '''
 70+ data = {}
 71+ editors = {}
 72+ #first, we group all revisions by editor
 73+
 74+ for revision in self.revisions.values():
 75+ row = []
 76+ #strip away the keys and make sure that the values are always in the same sequence
 77+ for key in self.keys:
 78+ row.append(revision[key].decode('utf-8'))
 79+ editor_id = row[0]
 80+ data.setdefault(editor_id, [])
 81+ data[editor_id].append(row)
 82+ editors.setdefault(editor_id, self.get_hash(editor_id))
 83+
 84+ #now, we are going to group all editors by file_id
 85+ file_ids = self.invert_dictionary(editors)
 86+ self.revisions = {}
 87+ for file_id, editors in file_ids.iteritems():
 88+ for editor in editors:
 89+ self.revisions.setdefault(file_id, [])
 90+ self.revisions[file_id].extend(data[editor])
 91+
 92+ def add(self, revision):
 93+ self.stringify(revision)
 94+ id = revision['revision_id']
 95+ self.revisions[id] = revision
 96+ if len(self.revisions) > 10000:
 97+ #print '%s: Emptying buffer %s - buffer size %s' % (datetime.datetime.now(), self.id, len(self.revisions))
 98+ self.store()
 99+
 100+
 101+ def stringify(self, revision):
 102+ for key, value in revision.iteritems():
 103+ try:
 104+ value = str(value)
 105+ except UnicodeEncodeError:
 106+ value = value.encode('utf-8')
 107+ revision[key] = value
 108+
 109+
 110+ def summary(self):
 111+ print 'Worker %s: Number of articles: %s' % (self.process_id, self.count_articles)
 112+ print 'Worker %s: Number of revisions: %s' % (self.process_id, self.count_revisions)
 113+
 114+ def store(self):
 115+ self.write_revisions()
 116+ self.write_articles()
 117+ self.write_comments()
 118+
 119+ def close(self):
 120+ self.store()
 121+ self.filehandles = [fh.close() for fh in self.filehandles]
 122+
 123+ def write_comments(self):
 124+ rows = []
 125+ try:
 126+ for revision_id, comment in self.comments.iteritems():
 127+ #comment = comment.decode('utf-8')
 128+ #row = '\t'.join([revision_id, comment]) + '\n'
 129+ rows.append([revision_id, comment])
 130+ file_utils.write_list_to_csv(rows, self.fh_comments)
 131+ self.comments = {}
 132+ except Exception, error:
 133+ print '''Encountered the following error while writing comment data
 134+ to %s: %s''' % (self.fh_comments, error)
 135+
 136+ def write_articles(self):
 137+ #t0 = datetime.datetime.now()
 138+ if len(self.articles.keys()) > 10000:
 139+ rows = []
 140+ try:
 141+ for article_id, data in self.articles.iteritems():
 142+ keys = data.keys()
 143+ keys.insert(0, 'id')
 144+
 145+ values = data.values()
 146+ values.insert(0, article_id)
 147+
 148+ row = zip(keys, values)
 149+ row = list(itertools.chain(*row))
 150+ #row = '\t'.join([article_id, title]) + '\n'
 151+ rows.append(row)
 152+ file_utils.write_list_to_csv(rows, self.fh_articles, newline=False)
 153+ self.articles = {}
 154+ except Exception, error:
 155+ print '''Encountered the following error while writing article
 156+ data to %s: %s''' % (self.fh_articles, error)
 157+ #t1 = datetime.datetime.now()
 158+ #print '%s articles took %s' % (len(self.articles.keys()), (t1 - t0))
 159+
 160+ def write_revisions(self):
 161+ #t0 = datetime.datetime.now()
 162+ self.group_revisions_by_fileid()
 163+ file_ids = self.revisions.keys()
 164+ for file_id in file_ids:
 165+ wait = True
 166+ for i, revision in enumerate(self.revisions[file_id]):
 167+ if i == 0:
 168+ while wait:
 169+ #print file_id, self.lock
 170+ if self.lock.available(file_id):
 171+ fh = self.filehandles[file_id]
 172+ wait = False
 173+ try:
 174+ file_utils.write_list_to_csv(revision, fh)
 175+ except Exception, error:
 176+ print '''Encountered the following error while writing
 177+ revision data to %s: %s''' % (fh, error)
 178+ self.lock.release(file_id)
 179+ del self.revisions[file_id]
 180+ wait = True
 181+# t1 = datetime.datetime.now()
 182+# print 'Worker %s: %s revisions took %s' % (self.process_id,
 183+# len([1]),
 184+# (t1 - t0))
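CSVBuffer's central trick is to accumulate revisions in memory and flush only once roughly 10,000 rows have piled up; on flush, rows are regrouped per output file and each file handle is claimed through CustomLock.available() so two workers never append to the same CSV at once. A stripped-down sketch of that buffer-and-flush cycle, using a flat dict, a plain write call, and numeric editor ids for brevity (the committed code groups by editor first and writes through file_utils.write_list_to_csv):

class TinyBuffer:
    '''Illustrative only: buffer rows in memory and flush in batches per file id.'''
    FLUSH_AT = 10000

    def __init__(self, lock, filehandles, max_filehandles):
        self.lock = lock                 # CustomLock-style guard around handles
        self.filehandles = filehandles   # one open handle per bucket
        self.max_filehandles = max_filehandles
        self.rows = []

    def add(self, row):
        self.rows.append(row)
        if len(self.rows) >= self.FLUSH_AT:
            self.flush()

    def flush(self):
        # regroup buffered rows by file id so each handle is claimed only once
        by_file = {}
        for row in self.rows:
            file_id = int(row[0]) % self.max_filehandles  # assumes numeric ids
            by_file.setdefault(file_id, []).append(row)
        for file_id, rows in by_file.iteritems():
            while not self.lock.available(file_id):
                pass                     # busy-wait until the handle is free
            fh = self.filehandles[file_id]
            for row in rows:
                fh.write('\t'.join(row) + '\n')
            self.lock.release(file_id)
        self.rows = []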
Property changes on: trunk/tools/editor_trends/classes/buffer.py
___________________________________________________________________
Added: svn:eol-style
1185 + native