Index: trunk/tools/editor_trends/etl/enricher.py
— | — | @@ -1,823 +0,0 @@
2 | | -#!/usr/bin/python |
3 | | -# -*- coding: utf-8 -*- |
4 | | -''' |
5 | | -Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com) |
6 | | -This program is free software; you can redistribute it and/or |
7 | | -modify it under the terms of the GNU General Public License version 2 |
8 | | -as published by the Free Software Foundation. |
9 | | -This program is distributed in the hope that it will be useful, |
10 | | -but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | | -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
12 | | -See the GNU General Public License for more details, at |
13 | | -http://www.fsf.org/licenses/gpl.html |
14 | | -''' |
15 | | - |
16 | | -__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ]) |
17 | | -__email__ = 'dvanliere at gmail dot com' |
18 | | -__date__ = '2011-02-06' |
19 | | -__version__ = '0.1' |
20 | | - |
21 | | - |
22 | | -import os |
23 | | -import hashlib |
24 | | -import codecs |
25 | | -import sys |
26 | | -import itertools |
27 | | -import datetime |
28 | | -import progressbar |
29 | | -from multiprocessing import JoinableQueue, Process, cpu_count, RLock, Manager |
30 | | -from xml.etree.cElementTree import iterparse, dump |
31 | | -from collections import deque |
32 | | - |
33 | | -if '..' not in sys.path: |
34 | | - sys.path.append('..') |
35 | | - |
36 | | -from classes import storage |
37 | | -from analyses.adhoc import bot_detector |
38 | | -from utils import file_utils |
39 | | - |
40 | | -EXCLUDE_NAMESPACE = { |
41 | | - #0:'Main', |
42 | | - #1:'Talk', |
43 | | - #2:'User', |
44 | | - #3:'User talk', |
45 | | - #4:'Wikipedia', |
46 | | - #5:'Wikipedia talk', |
47 | | - 6:'File', |
48 | | - #7:'File talk', |
49 | | - 8:'MediaWiki', |
50 | | - #9:'MediaWiki talk', |
51 | | - 10:'Template', |
52 | | - #11:'Template talk', |
53 | | - 12:'Help', |
54 | | - #13:'Help talk', |
55 | | - 14:'Category', |
56 | | - #15:'Category talk', |
57 | | - 90:'Thread', |
58 | | - #91:'Thread talk', |
59 | | - 92:'Summary', |
60 | | - #93:'Summary talk', |
61 | | - 100:'Portal', |
62 | | - #101:'Portal talk', |
63 | | - 108:'Book', |
64 | | - #109:'Book talk' |
65 | | -} |
66 | | - |
67 | | -COUNT_EXCLUDE_NAMESPACE = { |
68 | | - 1:'Talk', |
69 | | - 2:'User', |
70 | | - 4:'Wikipedia', |
71 | | - 6:'File', |
72 | | - 8:'MediaWiki', |
73 | | - 10:'Template', |
74 | | - 12:'Help', |
75 | | - 14:'Category', |
76 | | - 90:'Thread', |
77 | | - 92:'Summary', |
78 | | - 100:'Portal', |
79 | | - 108:'Book', |
80 | | -} |
81 | | - |
82 | | - |
83 | | -class CustomLock: |
84 | | - def __init__(self, lock, open_handles): |
85 | | - self.lock = lock |
86 | | - self.open_handles = open_handles |
87 | | - |
88 | | - def available(self, handle): |
89 | | - self.lock.acquire() |
90 | | - try: |
91 | | - self.open_handles.index(handle) |
92 | | - #print 'RETRIEVED FILEHANDLE %s' % handle |
93 | | - return False |
94 | | - except (ValueError, Exception), error: |
95 | | - self.open_handles.append(handle) |
96 | | - #print 'ADDED FILEHANDLE %s' % handle |
97 | | - return True |
98 | | - finally: |
99 | | - #print 'FIles locked: %s' % len(self.open_handles) |
100 | | - self.lock.release() |
101 | | - |
102 | | - def release(self, handle): |
103 | | - #print 'RELEASED FILEHANDLE %s' % handle |
104 | | - self.open_handles.remove(handle) |
105 | | - |
106 | | - |
107 | | -class Buffer: |
108 | | - def __init__(self, process_id, rts, lock): |
109 | | - self.rts = rts |
110 | | - self.lock = lock |
111 | | - self.revisions = {} |
112 | | - self.comments = {} |
113 | | - self.articles = {} |
114 | | - self.process_id = process_id |
115 | | - self.count_articles = 0 |
116 | | - self.count_revisions = 0 |
117 | | - self.filehandles = [file_utils.create_txt_filehandle(self.rts.txt, |
118 | | - file_id, 'a', 'utf-8') for file_id in xrange(self.rts.max_filehandles)] |
119 | | - self.keys = ['revision_id', 'article_id', 'id', 'username', 'namespace', |
120 | | - 'title', 'timestamp', 'hash', 'revert', 'bot', 'cur_size', |
121 | | - 'delta'] |
122 | | - self.fh_articles = file_utils.create_txt_filehandle(self.rts.txt, |
123 | | - 'articles_%s' % self.process_id, 'w', 'utf-8') |
124 | | - self.fh_comments = file_utils.create_txt_filehandle(self.rts.txt, |
125 | | - 'comments_%s' % self.process_id, 'w', 'utf-8') |
126 | | - |
127 | | - def get_hash(self, id): |
128 | | - ''' |
129 | | - A very simple hash function based on modulo. The except clause has been |
130 | | - added because there are instances where the username is stored in userid |
131 | | - tag and hence that's a string and not an integer. |
132 | | - ''' |
133 | | - try: |
134 | | - return int(id) % self.rts.max_filehandles |
135 | | - except ValueError: |
136 | | - return sum([ord(i) for i in id]) % self.rts.max_filehandles |
137 | | - |
138 | | - def invert_dictionary(self, editors): |
139 | | - hashes = {} |
140 | | - for editor, file_id in editors.iteritems(): |
141 | | - hashes.setdefault(file_id, []) |
142 | | - hashes[file_id].append(editor) |
143 | | - return hashes |
144 | | - |
145 | | - def group_revisions_by_fileid(self): |
146 | | - ''' |
147 | | - This function groups observation by editor id and then by file_id, |
148 | | - this way we have to make fewer file opening calls and should reduce |
149 | | - processing time. |
150 | | - ''' |
151 | | - data = {} |
152 | | - editors = {} |
153 | | - #first, we group all revisions by editor |
154 | | - |
155 | | - for revision in self.revisions.values(): |
156 | | - row = [] |
157 | | - #strip away the keys and make sure that the values are always in the same sequence |
158 | | - for key in self.keys: |
159 | | - row.append(revision[key].decode('utf-8')) |
160 | | - editor_id = row[0] |
161 | | - data.setdefault(editor_id, []) |
162 | | - data[editor_id].append(row) |
163 | | - editors.setdefault(editor_id, self.get_hash(editor_id)) |
164 | | - |
165 | | - #now, we are going to group all editors by file_id |
166 | | - file_ids = self.invert_dictionary(editors) |
167 | | - self.revisions = {} |
168 | | - for file_id, editors in file_ids.iteritems(): |
169 | | - for editor in editors: |
170 | | - self.revisions.setdefault(file_id, []) |
171 | | - self.revisions[file_id].extend(data[editor]) |
172 | | - |
173 | | - def add(self, revision): |
174 | | - self.stringify(revision) |
175 | | - id = revision['revision_id'] |
176 | | - self.revisions[id] = revision |
177 | | - if len(self.revisions) > 10000: |
178 | | - #print '%s: Emptying buffer %s - buffer size %s' % (datetime.datetime.now(), self.id, len(self.revisions)) |
179 | | - self.store() |
180 | | - |
181 | | - |
182 | | - def stringify(self, revision): |
183 | | - for key, value in revision.iteritems(): |
184 | | - try: |
185 | | - value = str(value) |
186 | | - except UnicodeEncodeError: |
187 | | - value = value.encode('utf-8') |
188 | | - revision[key] = value |
189 | | - |
190 | | - |
191 | | - def summary(self): |
192 | | - print 'Worker %s: Number of articles: %s' % (self.process_id, self.count_articles) |
193 | | - print 'Worker %s: Number of revisions: %s' % (self.process_id, self.count_revisions) |
194 | | - |
195 | | - def store(self): |
196 | | - self.write_revisions() |
197 | | - self.write_articles() |
198 | | - self.write_comments() |
199 | | - |
200 | | - def close(self): |
201 | | - self.store() |
202 | | - self.filehandles = [fh.close() for fh in self.filehandles] |
203 | | - |
204 | | - def write_comments(self): |
205 | | - rows = [] |
206 | | - try: |
207 | | - for revision_id, comment in self.comments.iteritems(): |
208 | | - #comment = comment.decode('utf-8') |
209 | | - #row = '\t'.join([revision_id, comment]) + '\n' |
210 | | - rows.append([revision_id, comment]) |
211 | | - file_utils.write_list_to_csv(rows, self.fh_comments) |
212 | | - self.comments = {} |
213 | | - except Exception, error: |
214 | | - print '''Encountered the following error while writing comment data |
215 | | - to %s: %s''' % (self.fh_comments, error) |
216 | | - |
217 | | - def write_articles(self): |
218 | | - #t0 = datetime.datetime.now() |
219 | | - if len(self.articles.keys()) > 10000: |
220 | | - rows = [] |
221 | | - try: |
222 | | - for article_id, data in self.articles.iteritems(): |
223 | | - keys = data.keys() |
224 | | - keys.insert(0, 'id') |
225 | | - |
226 | | - values = data.values() |
227 | | - values.insert(0, article_id) |
228 | | - |
229 | | - row = zip(keys, values) |
230 | | - row = list(itertools.chain(*row)) |
231 | | - #row = '\t'.join([article_id, title]) + '\n' |
232 | | - rows.append(row) |
233 | | - file_utils.write_list_to_csv(rows, self.fh_articles, newline=False) |
234 | | - self.articles = {} |
235 | | - except Exception, error: |
236 | | - print '''Encountered the following error while writing article |
237 | | - data to %s: %s''' % (self.fh_articles, error) |
238 | | - #t1 = datetime.datetime.now() |
239 | | - #print '%s articles took %s' % (len(self.articles.keys()), (t1 - t0)) |
240 | | - |
241 | | - def write_revisions(self): |
242 | | - t0 = datetime.datetime.now() |
243 | | - self.group_revisions_by_fileid() |
244 | | - file_ids = self.revisions.keys() |
245 | | - for file_id in file_ids: |
246 | | - wait = True |
247 | | - for i, revision in enumerate(self.revisions[file_id]): |
248 | | - if i == 0: |
249 | | - while wait: |
250 | | - print file_id, self.lock |
251 | | - if self.lock.available(file_id): |
252 | | - fh = self.filehandles[file_id] |
253 | | - wait = False |
254 | | - try: |
255 | | - file_utils.write_list_to_csv(revision, fh) |
256 | | - except Exception, error: |
257 | | - print '''Encountered the following error while writing |
258 | | - revision data to %s: %s''' % (fh, error) |
259 | | - self.lock.release(file_id) |
260 | | - del self.revisions[file_id] |
261 | | - wait = True |
262 | | - t1 = datetime.datetime.now() |
263 | | - print 'Worker %s: %s revisions took %s' % (self.process_id, |
264 | | - len(self.revisions), |
265 | | - (t1 - t0)) |
266 | | - |
267 | | -def extract_categories(): |
268 | | - ''' |
269 | | - Field 1: page id |
270 | | - Field 2: name category |
271 | | - Field 3: sort key |
272 | | - Field 4: timestamp last change |
273 | | - ''' |
274 | | - filename = 'C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-categorylinks.sql' |
275 | | - output = codecs.open('categories.csv', 'w', encoding='utf-8') |
276 | | - fh = codecs.open(filename, 'r', encoding='utf-8') |
277 | | - |
278 | | - try: |
279 | | - for line in fh: |
280 | | - if line.startswith('INSERT INTO `categorylinks` VALUES ('): |
281 | | - line = line.replace('INSERT INTO `categorylinks` VALUES (', '') |
282 | | - line = line.replace("'", '') |
283 | | - categories = line.split('),(') |
284 | | - for category in categories: |
285 | | - category = category.split(',') |
286 | | - if len(category) == 4: |
287 | | - output.write('%s\t%s\n' % (category[0], category[1])) |
288 | | - except UnicodeDecodeError, e: |
289 | | - print e |
290 | | - |
291 | | - output.close() |
292 | | - fh.close() |
293 | | - |
294 | | - |
295 | | -def validate_hostname(address): |
296 | | - ''' |
297 | | - This is not a foolproof solution at all. The problem is that it's really |
298 | | - hard to determine whether a string is a hostname or not **reliably**. This |
299 | | - is a very fast rule of thumb. Will lead to false positives, |
300 | | - but that's life :) |
301 | | - ''' |
302 | | - parts = address.split(".") |
303 | | - if len(parts) > 2: |
304 | | - return True |
305 | | - else: |
306 | | - return False |
307 | | - |
308 | | - |
309 | | -def validate_ip(address): |
310 | | - parts = address.split(".") |
311 | | - if len(parts) != 4: |
312 | | - return False |
313 | | - parts = parts[:3] |
314 | | - for item in parts: |
315 | | - try: |
316 | | - if not 0 <= int(item) <= 255: |
317 | | - return False |
318 | | - except ValueError: |
319 | | - return False |
320 | | - return True |
321 | | - |
322 | | - |
323 | | -def extract_revision_text(revision): |
324 | | - rev = revision.find('ns0:text') |
325 | | - if rev != None: |
326 | | - if rev.text == None: |
327 | | - rev = fix_revision_text(revision) |
328 | | - return rev.text.encode('utf-8') |
329 | | - else: |
330 | | - return '' |
331 | | - |
332 | | - |
333 | | -def parse_title(title): |
334 | | - return title.text |
335 | | - |
336 | | - |
337 | | -def parse_title_meta_data(title, namespace): |
338 | | - ''' |
339 | | - This function categorizes an article to assist the Wikimedia Taxonomy |
340 | | - project. See |
341 | | - http://meta.wikimedia.org/wiki/Contribution_Taxonomy_Project/Research_Questions |
342 | | - ''' |
343 | | - title_meta = {} |
344 | | - if not namespace: |
345 | | - return title_meta |
346 | | - |
347 | | - title_meta['title'] = title |
348 | | - ns = namespace['namespace'] |
349 | | - title_meta['ns'] = ns |
350 | | - if title.startswith('List of'): |
351 | | - title_meta['category'] = 'List' |
352 | | - elif ns == 4 or ns == 5: |
353 | | - if title.find('Articles for deletion') > -1: |
354 | | - title_meta['category'] = 'Deletion' |
355 | | - elif title.find('Mediation Committee') > -1: |
356 | | - title_meta['category'] = 'Mediation' |
357 | | - elif title.find('Mediation Cabal') > -1: |
358 | | - title_meta['category'] = 'Mediation' |
359 | | - elif title.find('Arbitration') > -1: |
360 | | - title_meta['category'] = 'Arbitration' |
361 | | - elif title.find('Featured Articles') > -1: |
362 | | - title_meta['category'] = 'Featured Article' |
363 | | - elif title.find('Featured picture candidates') > -1: |
364 | | - title_meta['category'] = 'Featured Pictures' |
365 | | - elif title.find('Featured sound candidates') > -1: |
366 | | - title_meta['category'] = 'Featured Sounds' |
367 | | - elif title.find('Featured list candidates') > -1: |
368 | | - title_meta['category'] = 'Featured Lists' |
369 | | - elif title.find('Featured portal candidates') > -1: |
370 | | - title_meta['category'] = 'Featured Portal' |
371 | | - elif title.find('Featured topic candidates') > -1: |
372 | | - title_meta['category'] = 'Featured Topic' |
373 | | - elif title.find('Good Article') > -1: |
374 | | - title_meta['category'] = 'Good Article' |
375 | | - return title_meta |
376 | | - |
377 | | - |
378 | | -def extract_username(contributor, xml_namespace): |
379 | | - contributor = contributor.find('%s%s' % (xml_namespace, 'username')) |
380 | | - if contributor != None: |
381 | | - return contributor.text |
382 | | - else: |
383 | | - return None |
384 | | - |
385 | | - |
386 | | -def determine_username_is_bot(contributor, bots, xml_namespace): |
387 | | - ''' |
388 | | - #contributor is an xml element containing the id of the contributor |
389 | | - @bots should have a dict with all the bot ids and bot names |
390 | | - @Return False if username id is not in bot dict id or True if username id |
391 | | - is a bot id. |
392 | | - ''' |
393 | | - username = contributor.find('%s%s' % (xml_namespace, 'username')) |
394 | | - if username == None: |
395 | | - return 0 |
396 | | - else: |
397 | | - if username.text in bots: |
398 | | - return 1 |
399 | | - else: |
400 | | - return 0 |
401 | | - |
402 | | - |
403 | | -def extract_contributor_id(revision, xml_namespace): |
404 | | - ''' |
405 | | - @contributor is the xml contributor node containing a number of attributes |
406 | | - Currently, we are only interested in registered contributors, hence we |
407 | | - ignore anonymous editors. |
408 | | - ''' |
409 | | - if revision.get('deleted'): |
410 | | - # ASK: Not sure if this is the best way to code deleted contributors. |
411 | | - return None |
412 | | - elem = revision.find('%s%s' % (xml_namespace, 'id')) |
413 | | - if elem != None: |
414 | | - return {'id':elem.text} |
415 | | - else: |
416 | | - elem = revision.find('%s%s' % (xml_namespace, 'ip')) |
417 | | - if elem == None or elem.text == None: |
418 | | - return None |
419 | | - elif validate_ip(elem.text) == False and validate_hostname(elem.text) == False: |
420 | | - return {'username':elem.text, 'id': elem.text} |
421 | | - else: |
422 | | - return None |
423 | | - |
424 | | - |
425 | | -def fix_revision_text(revision): |
426 | | - if revision.text == None: |
427 | | - revision.text = '' |
428 | | - return revision |
429 | | - |
430 | | - |
431 | | -def create_md5hash(text): |
432 | | - hash = {} |
433 | | - if text != None: |
434 | | - m = hashlib.md5() |
435 | | - m.update(text) |
436 | | - #echo m.digest() |
437 | | - hash['hash'] = m.hexdigest() |
438 | | - else: |
439 | | - hash['hash'] = -1 |
440 | | - return hash |
441 | | - |
442 | | - |
443 | | -def calculate_delta_article_size(size, text): |
444 | | - if 'prev_size' not in size: |
445 | | - size['prev_size'] = 0 |
446 | | - size['cur_size'] = len(text) |
447 | | - size['delta'] = len(text) |
448 | | - else: |
449 | | - size['prev_size'] = size['cur_size'] |
450 | | - delta = len(text) - size['prev_size'] |
451 | | - size['cur_size'] = len(text) |
452 | | - size['delta'] = delta |
453 | | - return size |
454 | | - |
455 | | - |
456 | | -def parse_contributor(revision, bots, xml_namespace): |
457 | | - username = extract_username(revision, xml_namespace) |
458 | | - user_id = extract_contributor_id(revision, xml_namespace) |
459 | | - bot = determine_username_is_bot(revision, bots, xml_namespace) |
460 | | - editor = {} |
461 | | - editor['username'] = username |
462 | | - editor['bot'] = bot |
463 | | - if user_id != None: |
464 | | - editor.update(user_id) |
465 | | - else: |
466 | | - editor = False |
467 | | - return editor |
468 | | - |
469 | | - |
470 | | -def determine_namespace(title, namespaces, include_ns, exclude_ns): |
471 | | - ''' |
472 | | - You can only determine whether an article belongs to the Main Namespace |
473 | | - by ruling out that it does not belong to any other namepace |
474 | | - ''' |
475 | | - ns = {} |
476 | | - if title != None: |
477 | | - for key in include_ns: |
478 | | - namespace = namespaces.get(key) |
479 | | - if namespace and title.startswith(namespace): |
480 | | - ns['namespace'] = key |
481 | | - if ns == {}: |
482 | | - for key in exclude_ns: |
483 | | - namespace = namespaces.get(key) |
484 | | - if namespace and title.startswith(namespace): |
485 | | - '''article does not belong to any of the include_ns |
486 | | - namespaces''' |
487 | | - ns = False |
488 | | - return ns |
489 | | - ns['namespace'] = 0 |
490 | | - else: |
491 | | - ns = False |
492 | | - return ns |
493 | | - |
494 | | - |
495 | | -def prefill_row(title, article_id, namespace): |
496 | | - row = {} |
497 | | - row['title'] = title |
498 | | - row['article_id'] = article_id |
499 | | - row.update(namespace) |
500 | | - return row |
501 | | - |
502 | | - |
503 | | -def is_revision_reverted(hash_cur, hashes): |
504 | | - revert = {} |
505 | | - if hash_cur in hashes and hash_cur != -1: |
506 | | - revert['revert'] = 1 |
507 | | - else: |
508 | | - revert['revert'] = 0 |
509 | | - return revert |
510 | | - |
511 | | - |
512 | | -def extract_revision_id(revision_id): |
513 | | - if revision_id != None: |
514 | | - return revision_id.text |
515 | | - else: |
516 | | - return None |
517 | | - |
518 | | - |
519 | | -def extract_comment_text(revision_id, revision): |
520 | | - comment = {} |
521 | | - text = revision.find('comment') |
522 | | - if text != None and text.text != None: |
523 | | - comment[revision_id] = text.text.encode('utf-8') |
524 | | - return comment |
525 | | - |
526 | | - |
527 | | -def create_namespace_dict(siteinfo, xml_namespace): |
528 | | - ''' |
529 | | - This function determines the local names of the different namespaces. |
530 | | - ''' |
531 | | - namespaces = {} |
532 | | - print 'Detected xml namespace: %s' % xml_namespace |
533 | | - print 'Constructing namespace dictionary' |
534 | | - elements = siteinfo.find('%s%s' % (xml_namespace, 'namespaces')) |
535 | | - for elem in elements.getchildren(): |
536 | | - key = int(elem.get('key')) |
537 | | - namespaces[key] = elem.text #extract_text(ns) |
538 | | - text = elem.text if elem.text != None else '' |
539 | | - try: |
540 | | - print key, text.encode('utf-8') |
541 | | - except UnicodeEncodeError: |
542 | | - print key |
543 | | - elem.clear() |
544 | | - siteinfo.clear() |
545 | | - if namespaces == {}: |
546 | | - sys.exit(-1) |
547 | | - return namespaces |
548 | | - |
549 | | - |
550 | | -def determine_xml_namespace(siteinfo): |
551 | | - ''' |
552 | | - This function determines the xml_namespace version |
553 | | - ''' |
554 | | - for elem in siteinfo : |
555 | | - if elem.tag.endswith('sitename'): |
556 | | - xml_namespace = elem.tag |
557 | | - pos = xml_namespace.find('sitename') |
558 | | - xml_namespace = xml_namespace[0:pos] |
559 | | - elem.clear() |
560 | | - return xml_namespace |
561 | | - else: |
562 | | - elem.clear() |
563 | | - |
564 | | - |
565 | | -def count_edits(article, counts, bots, xml_namespace): |
566 | | - title = parse_title(article['title']) |
567 | | - namespace = determine_namespace(title, {}, COUNT_EXCLUDE_NAMESPACE) |
568 | | - if namespace != False: |
569 | | - article_id = article['id'].text |
570 | | - revisions = article['revisions'] |
571 | | - for revision in revisions: |
572 | | - if revision == None: |
573 | | - #the entire revision is empty, weird. |
574 | | - continue |
575 | | - |
576 | | - contributor = revision.find('%s%s' % (xml_namespace, 'contributor')) |
577 | | - contributor = parse_contributor(contributor, bots) |
578 | | - if not contributor: |
579 | | - #editor is anonymous, ignore |
580 | | - continue |
581 | | - counts.setdefault(contributor['username'], 0) |
582 | | - counts[contributor['username']] += 1 |
583 | | - revision.clear() |
584 | | - return counts |
585 | | - |
586 | | - |
587 | | -def create_variables(article, cache, bots, xml_namespace, comments=False): |
588 | | - include_ns = {3: 'User Talk', |
589 | | - 5: 'Wikipedia Talk', |
590 | | - 1: 'Talk', |
591 | | - 2: 'User', |
592 | | - 4: 'Wikipedia'} |
593 | | - |
594 | | - title = parse_title(article['title']) |
595 | | - namespaces = article['namespaces'] |
596 | | - namespace = determine_namespace(title, namespaces, include_ns, EXCLUDE_NAMESPACE) |
597 | | - title_meta = parse_title_meta_data(title, namespace) |
598 | | - if namespace != False: |
599 | | - cache.count_articles += 1 |
600 | | - article_id = article['id'].text |
601 | | - article['id'].clear() |
602 | | - cache.articles[article_id] = title_meta |
603 | | - hashes = deque() |
604 | | - size = {} |
605 | | - revisions = article['revisions'] |
606 | | - for revision in revisions: |
607 | | - cache.count_revisions += 1 |
608 | | - if revision == None: |
609 | | - #the entire revision is empty, weird. |
610 | | - continue |
611 | | - #dump(revision) |
612 | | - contributor = revision.find('%s%s' % (xml_namespace, 'contributor')) |
613 | | - contributor = parse_contributor(contributor, bots, xml_namespace) |
614 | | - if not contributor: |
615 | | - #editor is anonymous, ignore |
616 | | - continue |
617 | | - revision_id = revision.find('%s%s' % (xml_namespace, 'id')) |
618 | | - revision_id = extract_revision_id(revision_id) |
619 | | - if revision_id == None: |
620 | | - #revision_id is missing, which is weird |
621 | | - continue |
622 | | - |
623 | | - row = prefill_row(title, article_id, namespace) |
624 | | - row['revision_id'] = revision_id |
625 | | - text = extract_revision_text(revision) |
626 | | - row.update(contributor) |
627 | | - |
628 | | - if comments: |
629 | | - comment = extract_comment_text(revision_id, revision) |
630 | | - cache.comments.update(comment) |
631 | | - |
632 | | - timestamp = revision.find('%s%s' % (xml_namespace, 'timestamp')).text |
633 | | - row['timestamp'] = timestamp |
634 | | - |
635 | | - hash = create_md5hash(text) |
636 | | - revert = is_revision_reverted(hash['hash'], hashes) |
637 | | - hashes.append(hash['hash']) |
638 | | - size = calculate_delta_article_size(size, text) |
639 | | - |
640 | | - row.update(hash) |
641 | | - row.update(size) |
642 | | - row.update(revert) |
643 | | - cache.add(row) |
644 | | - revision.clear() |
645 | | - |
646 | | - |
647 | | -def parse_xml(fh, rts): |
648 | | - context = iterparse(fh, events=('end',)) |
649 | | - context = iter(context) |
650 | | - |
651 | | - article = {} |
652 | | - article['revisions'] = [] |
653 | | - elements = [] |
654 | | - id = False |
655 | | - ns = False |
656 | | - |
657 | | - try: |
658 | | - for event, elem in context: |
659 | | - if event == 'end' and elem.tag.endswith('siteinfo'): |
660 | | - xml_namespace = determine_xml_namespace(elem) |
661 | | - namespaces = create_namespace_dict(elem, xml_namespace) |
662 | | - article['namespaces'] = namespaces |
663 | | - ns = True |
664 | | - elif event == 'end' and elem.tag.endswith('title'): |
665 | | - article['title'] = elem |
666 | | - elif event == 'end' and elem.tag.endswith('revision'): |
667 | | - article['revisions'].append(elem) |
668 | | - elif event == 'end' and elem.tag.endswith('id') and id == False: |
669 | | - article['id'] = elem |
670 | | - id = True |
671 | | - elif event == 'end' and elem.tag.endswith('page'): |
672 | | - yield article, xml_namespace |
673 | | - elem.clear() |
674 | | - article = {} |
675 | | - article['revisions'] = [] |
676 | | - article['namespaces'] = namespaces |
677 | | - id = False |
678 | | - #elif event == 'end' and ns == True: |
679 | | - # elem.clear() |
680 | | - except SyntaxError, error: |
681 | | - print 'Encountered invalid XML tag. Error message: %s' % error |
682 | | - dump(elem) |
683 | | - sys.exit(-1) |
684 | | - except IOError, error: |
685 | | - print '''Archive file is possibly corrupted. Please delete this archive |
686 | | - and retry downloading. Error message: %s''' % error |
687 | | - sys.exit(-1) |
688 | | - |
689 | | -def stream_raw_xml(input_queue, process_id, function, dataset, lock, rts): |
690 | | - bots = bot_detector.retrieve_bots(rts.language.code) |
691 | | - |
692 | | - t0 = datetime.datetime.now() |
693 | | - i = 0 |
694 | | - if dataset == 'training': |
695 | | - cache = Buffer(process_id, rts, lock) |
696 | | - else: |
697 | | - counts = {} |
698 | | - |
699 | | - while True: |
700 | | - filename = input_queue.get() |
701 | | - input_queue.task_done() |
702 | | - if filename == None: |
703 | | - print '%s files left in the queue' % input_queue.qsize() |
704 | | - break |
705 | | - |
706 | | - fh = file_utils.create_streaming_buffer(filename) |
707 | | - filename = os.path.split(filename)[1] |
708 | | - filename = os.path.splitext(filename)[0] |
709 | | - for article, xml_namespace in parse_xml(fh, rts): |
710 | | - if dataset == 'training': |
711 | | - function(article, cache, bots, xml_namespace) |
712 | | - elif dataset == 'prediction': |
713 | | - counts = function(article, counts, bots, xml_namespace) |
714 | | - i += 1 |
715 | | - if i % 10000 == 0: |
716 | | - print 'Worker %s parsed %s articles' % (process_id, i) |
717 | | - fh.close() |
718 | | - |
719 | | - t1 = datetime.datetime.now() |
720 | | - print 'Worker %s: Processing of %s took %s' % (process_id, filename, (t1 - t0)) |
721 | | - print 'There are %s files left in the queue' % (input_queue.qsize()) |
722 | | - t0 = t1 |
723 | | - |
724 | | - if dataset == 'training': |
725 | | - cache.close() |
726 | | - cache.summary() |
727 | | - else: |
728 | | - location = os.getcwd() |
729 | | - keys = counts.keys() |
730 | | - filename = 'counts_%s.csv' % filename |
731 | | - fh = file_utils.create_txt_filehandle(location, filename, 'w', 'utf-8') |
732 | | - file_utils.write_dict_to_csv(counts, fh, keys) |
733 | | - fh.close() |
734 | | - |
735 | | - filename = 'counts_%s.bin' % filename |
736 | | - file_utils.store_object(counts, location, filename) |
737 | | - |
738 | | - print 'Finished parsing Wikipedia dump files.' |
739 | | - |
740 | | - |
741 | | -def setup(rts): |
742 | | - ''' |
743 | | - Depending on the storage system selected (cassandra, csv or mongo) some |
744 | | - preparations are made including setting up namespaces and cleaning up old |
745 | | - files. |
746 | | - ''' |
747 | | - res = file_utils.delete_file(rts.txt, None, directory=True) |
748 | | - if res: |
749 | | - res = file_utils.create_directory(rts.txt) |
750 | | - |
751 | | - |
752 | | -def multiprocessor_launcher(function, dataset, lock, rts): |
753 | | - mgr = Manager() |
754 | | - open_handles = [] |
755 | | - open_handles = mgr.list(open_handles) |
756 | | - clock = CustomLock(lock, open_handles) |
757 | | - input_queue = JoinableQueue() |
758 | | - |
759 | | - files = file_utils.retrieve_file_list(rts.input_location) |
760 | | - |
761 | | - if len(files) > cpu_count(): |
762 | | - processors = cpu_count() - 1 |
763 | | - else: |
764 | | - processors = len(files) |
765 | | - |
766 | | - for filename in files: |
767 | | - filename = os.path.join(rts.input_location, filename) |
768 | | - print filename |
769 | | - input_queue.put(filename) |
770 | | - |
771 | | - for x in xrange(processors): |
772 | | - print 'Inserting poison pill %s...' % x |
773 | | - input_queue.put(None) |
774 | | - |
775 | | - extracters = [Process(target=stream_raw_xml, args=[input_queue, |
776 | | - process_id, function, |
777 | | - dataset, clock, rts]) |
778 | | - for process_id in xrange(processors)] |
779 | | - for extracter in extracters: |
780 | | - extracter.start() |
781 | | - |
782 | | - input_queue.join() |
783 | | - |
784 | | - |
785 | | -def launcher_training(): |
786 | | - ''' |
787 | | - Launcher for creating training dataset for data competition |
788 | | - ''' |
789 | | - path = '/mnt/wikipedia_dumps/batch2/' |
790 | | - function = create_variables |
791 | | - dataset = 'training' |
792 | | - rts = DummyRTS(path) |
793 | | - locks = [] |
794 | | - multiprocessor_launcher(function, dataset, locks, rts) |
795 | | - |
796 | | - |
797 | | -def launcher_prediction(): |
798 | | - ''' |
799 | | - Launcher for creating prediction dataset for datacompetition |
800 | | - ''' |
801 | | - path = '/mnt/wikipedia_dumps/batch1/' |
802 | | - function = count_edits |
803 | | - dataset = 'prediction' |
804 | | - rts = DummyRTS(path) |
805 | | - locks = [] |
806 | | - multiprocessor_launcher(function, dataset, locks, rts) |
807 | | - |
808 | | - |
809 | | -def launcher(rts): |
810 | | - ''' |
811 | | - This is the generic entry point for regular Wikilytics usage. |
812 | | - ''' |
813 | | - # launcher for creating regular mongo dataset |
814 | | - function = create_variables |
815 | | - dataset = 'training' |
816 | | - lock = RLock() |
817 | | - setup(rts) |
818 | | - multiprocessor_launcher(function, dataset, lock, rts) |
819 | | - |
820 | | - |
821 | | -if __name__ == '__main__': |
822 | | - launcher_training() |
823 | | - #launcher_prediction() |
824 | | - #launcher(rts) |
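For reference, a minimal sketch of the editor-id bucketing that Buffer.get_hash in the removed enricher.py performs: numeric contributor ids are mapped to an output file via modulo, and string ids (usernames that ended up in the id tag) fall back to a sum of character ordinals. The value of max_filehandles below is an illustrative assumption; in the tool it comes from rts.max_filehandles.

# Sketch of the modulo bucketing used by Buffer.get_hash (illustrative only).
max_filehandles = 100  # assumed example value; normally rts.max_filehandles

def get_bucket(editor_id, max_filehandles=max_filehandles):
    try:
        # registered editors have a numeric id
        return int(editor_id) % max_filehandles
    except ValueError:
        # some revisions store the username in the id tag, so hash the string
        return sum(ord(ch) for ch in editor_id) % max_filehandles

print get_bucket('12345')     # 45
print get_bucket('SomeUser')  # deterministic bucket derived from the characters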
Index: trunk/tools/editor_trends/etl/variables.py |
— | — | @@ -0,0 +1,280 @@ |
| 2 | +#!/usr/bin/python |
| 3 | +# -*- coding: utf-8 -*- |
| 4 | +''' |
| 5 | +Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com) |
| 6 | +This program is free software; you can redistribute it and/or |
| 7 | +modify it under the terms of the GNU General Public License version 2 |
| 8 | +as published by the Free Software Foundation. |
| 9 | +This program is distributed in the hope that it will be useful, |
| 10 | +but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
| 12 | +See the GNU General Public License for more details, at |
| 13 | +http://www.fsf.org/licenses/gpl.html |
| 14 | +''' |
| 15 | + |
| 16 | + |
| 17 | +__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ]) |
| 18 | +__author__email = 'dvanliere at gmail dot com' |
| 19 | +__date__ = '2011-04-10' |
| 20 | +__version__ = '0.1' |
| 21 | + |
| 22 | +import sys
| 23 | +import hashlib
| 24 | +
| 24 | +def validate_hostname(address): |
| 25 | + ''' |
| 26 | + This is not a foolproof solution at all. The problem is that it's really |
| 27 | + hard to determine whether a string is a hostname or not **reliably**. This |
| 28 | + is a very fast rule of thumb. Will lead to false positives, |
| 29 | + but that's life :) |
| 30 | + ''' |
| 31 | + parts = address.split(".") |
| 32 | + if len(parts) > 2: |
| 33 | + return True |
| 34 | + else: |
| 35 | + return False |
| 36 | + |
| 37 | + |
| 38 | +def validate_ip(address): |
| 39 | + parts = address.split(".") |
| 40 | + if len(parts) != 4: |
| 41 | + return False |
| 42 | + parts = parts[:3] |
| 43 | + for item in parts: |
| 44 | + try: |
| 45 | + if not 0 <= int(item) <= 255: |
| 46 | + return False |
| 47 | + except ValueError: |
| 48 | + return False |
| 49 | + return True |
| 50 | + |
| 51 | + |
| 52 | +def extract_revision_text(revision): |
| 53 | + rev = revision.find('ns0:text') |
| 54 | + if rev != None: |
| 55 | + if rev.text == None: |
| 56 | + rev = fix_revision_text(revision) |
| 57 | + return rev.text.encode('utf-8') |
| 58 | + else: |
| 59 | + return '' |
| 60 | + |
| 61 | + |
| 62 | +def parse_title(title): |
| 63 | + return title.text |
| 64 | + |
| 65 | + |
| 66 | +def parse_title_meta_data(title, namespace): |
| 67 | + ''' |
| 68 | + This function categorizes an article to assist the Wikimedia Taxonomy |
| 69 | + project. See |
| 70 | + http://meta.wikimedia.org/wiki/Contribution_Taxonomy_Project/Research_Questions |
| 71 | + ''' |
| 72 | + title_meta = {} |
| 73 | + if not namespace: |
| 74 | + return title_meta |
| 75 | + |
| 76 | + title_meta['title'] = title |
| 77 | + ns = namespace['namespace'] |
| 78 | + title_meta['ns'] = ns |
| 79 | + if title.startswith('List of'): |
| 80 | + title_meta['category'] = 'List' |
| 81 | + elif ns == 4 or ns == 5: |
| 82 | + if title.find('Articles for deletion') > -1: |
| 83 | + title_meta['category'] = 'Deletion' |
| 84 | + elif title.find('Mediation Committee') > -1: |
| 85 | + title_meta['category'] = 'Mediation' |
| 86 | + elif title.find('Mediation Cabal') > -1: |
| 87 | + title_meta['category'] = 'Mediation' |
| 88 | + elif title.find('Arbitration') > -1: |
| 89 | + title_meta['category'] = 'Arbitration' |
| 90 | + elif title.find('Featured Articles') > -1: |
| 91 | + title_meta['category'] = 'Featured Article' |
| 92 | + elif title.find('Featured picture candidates') > -1: |
| 93 | + title_meta['category'] = 'Featured Pictures' |
| 94 | + elif title.find('Featured sound candidates') > -1: |
| 95 | + title_meta['category'] = 'Featured Sounds' |
| 96 | + elif title.find('Featured list candidates') > -1: |
| 97 | + title_meta['category'] = 'Featured Lists' |
| 98 | + elif title.find('Featured portal candidates') > -1: |
| 99 | + title_meta['category'] = 'Featured Portal' |
| 100 | + elif title.find('Featured topic candidates') > -1: |
| 101 | + title_meta['category'] = 'Featured Topic' |
| 102 | + elif title.find('Good Article') > -1: |
| 103 | + title_meta['category'] = 'Good Article' |
| 104 | + return title_meta |
| 105 | + |
| 106 | + |
| 107 | +def extract_username(contributor, xml_namespace): |
| 108 | + contributor = contributor.find('%s%s' % (xml_namespace, 'username')) |
| 109 | + if contributor != None: |
| 110 | + return contributor.text |
| 111 | + else: |
| 112 | + return None |
| 113 | + |
| 114 | + |
| 115 | +def determine_username_is_bot(contributor, bots, xml_namespace): |
| 116 | + ''' |
| 117 | + @contributor is the xml contributor element of a revision
| 118 | + @bots is a dict with all the bot ids and bot names
| 119 | + @Return 0 if the username is not a known bot and 1 if it is.
| 121 | + ''' |
| 122 | + username = contributor.find('%s%s' % (xml_namespace, 'username')) |
| 123 | + if username == None: |
| 124 | + return 0 |
| 125 | + else: |
| 126 | + if username.text in bots: |
| 127 | + return 1 |
| 128 | + else: |
| 129 | + return 0 |
| 130 | + |
| 131 | + |
| 132 | +def extract_contributor_id(revision, xml_namespace): |
| 133 | + ''' |
| 134 | + @contributor is the xml contributor node containing a number of attributes |
| 135 | + Currently, we are only interested in registered contributors, hence we |
| 136 | + ignore anonymous editors. |
| 137 | + ''' |
| 138 | + if revision.get('deleted'): |
| 139 | + # ASK: Not sure if this is the best way to code deleted contributors. |
| 140 | + return None |
| 141 | + elem = revision.find('%s%s' % (xml_namespace, 'id')) |
| 142 | + if elem != None: |
| 143 | + return {'id':elem.text} |
| 144 | + else: |
| 145 | + elem = revision.find('%s%s' % (xml_namespace, 'ip')) |
| 146 | + if elem == None or elem.text == None: |
| 147 | + return None |
| 148 | + elif validate_ip(elem.text) == False and validate_hostname(elem.text) == False: |
| 149 | + return {'username':elem.text, 'id': elem.text} |
| 150 | + else: |
| 151 | + return None |
| 152 | + |
| 153 | + |
| 154 | +def fix_revision_text(revision): |
| 155 | + if revision.text == None: |
| 156 | + revision.text = '' |
| 157 | + return revision |
| 158 | + |
| 159 | + |
| 160 | +def create_md5hash(text): |
| 161 | + hash = {} |
| 162 | + if text != None: |
| 163 | + m = hashlib.md5() |
| 164 | + m.update(text) |
| 165 | + #echo m.digest() |
| 166 | + hash['hash'] = m.hexdigest() |
| 167 | + else: |
| 168 | + hash['hash'] = -1 |
| 169 | + return hash |
| 170 | + |
| 171 | + |
| 172 | +def calculate_delta_article_size(size, text): |
| 173 | + if 'prev_size' not in size: |
| 174 | + size['prev_size'] = 0 |
| 175 | + size['cur_size'] = len(text) |
| 176 | + size['delta'] = len(text) |
| 177 | + else: |
| 178 | + size['prev_size'] = size['cur_size'] |
| 179 | + delta = len(text) - size['prev_size'] |
| 180 | + size['cur_size'] = len(text) |
| 181 | + size['delta'] = delta |
| 182 | + return size |
| 183 | + |
| 184 | + |
| 185 | +def parse_contributor(revision, bots, xml_namespace): |
| 186 | + username = extract_username(revision, xml_namespace) |
| 187 | + user_id = extract_contributor_id(revision, xml_namespace) |
| 188 | + bot = determine_username_is_bot(revision, bots, xml_namespace) |
| 189 | + editor = {} |
| 190 | + editor['username'] = username |
| 191 | + editor['bot'] = bot |
| 192 | + if user_id != None: |
| 193 | + editor.update(user_id) |
| 194 | + else: |
| 195 | + editor = False |
| 196 | + return editor |
| 197 | + |
| 198 | + |
| 199 | +def determine_namespace(title, namespaces, include_ns): |
| 200 | + ''' |
| 201 | + You can only determine whether an article belongs to the Main Namespace |
| 202 | + by ruling out that it belongs to any other namespace.
| 203 | + ''' |
| 204 | + ns = {} |
| 205 | + if title != None: |
| 206 | + for key in include_ns: |
| 207 | + namespace = namespaces.pop(key, None) |
| 208 | + if namespace and title.startswith(namespace): |
| 209 | + ns['namespace'] = key |
| 210 | + if ns == {}: |
| 211 | + for namespace in namespaces.itervalues(): |
| 212 | + if namespace and title.startswith(namespace): |
| 213 | + '''article does not belong to any of the include_ns |
| 214 | + namespaces''' |
| 215 | + return False |
| 216 | + ns = 0 |
| 217 | + else: |
| 218 | + ns = False |
| 219 | + return ns |
| 220 | + |
| 221 | + |
| 222 | +def is_revision_reverted(hash_cur, hashes): |
| 223 | + revert = {} |
| 224 | + if hash_cur in hashes and hash_cur != -1: |
| 225 | + revert['revert'] = 1 |
| 226 | + else: |
| 227 | + revert['revert'] = 0 |
| 228 | + return revert |
| 229 | + |
| 230 | + |
| 231 | +def extract_revision_id(revision_id): |
| 232 | + if revision_id != None: |
| 233 | + return revision_id.text |
| 234 | + else: |
| 235 | + return None |
| 236 | + |
| 237 | + |
| 238 | +def extract_comment_text(revision_id, revision): |
| 239 | + comment = {} |
| 240 | + text = revision.find('comment') |
| 241 | + if text != None and text.text != None: |
| 242 | + comment[revision_id] = text.text.encode('utf-8') |
| 243 | + return comment |
| 244 | + |
| 245 | + |
| 246 | +def create_namespace_dict(siteinfo, xml_namespace): |
| 247 | + ''' |
| 248 | + This function determines the local names of the different namespaces. |
| 249 | + ''' |
| 250 | + namespaces = {} |
| 251 | + print 'Detected xml namespace: %s' % xml_namespace |
| 252 | + print 'Constructing namespace dictionary' |
| 253 | + elements = siteinfo.find('%s%s' % (xml_namespace, 'namespaces')) |
| 254 | + for elem in elements.getchildren(): |
| 255 | + key = int(elem.get('key')) |
| 256 | + namespaces[key] = elem.text #extract_text(ns) |
| 257 | + text = elem.text if elem.text != None else '' |
| 258 | + try: |
| 259 | + print key, text.encode('utf-8') |
| 260 | + except UnicodeEncodeError: |
| 261 | + print key |
| 262 | + elem.clear() |
| 263 | + if namespaces == {}: |
| 264 | + print 'Could not determine namespaces. Exiting.' |
| 265 | + sys.exit(-1) |
| 266 | + return namespaces |
| 267 | + |
| 268 | + |
| 269 | +def determine_xml_namespace(siteinfo): |
| 270 | + ''' |
| 271 | + This function determines the xml namespace used in the element tags of the dump file.
| 272 | + ''' |
| 273 | + for elem in siteinfo : |
| 274 | + if elem.tag.endswith('sitename'): |
| 275 | + xml_namespace = elem.tag |
| 276 | + pos = xml_namespace.find('sitename') |
| 277 | + xml_namespace = xml_namespace[0:pos] |
| 278 | + elem.clear() |
| 279 | + return xml_namespace |
| 280 | + else: |
| 281 | + elem.clear() |
Property changes on: trunk/tools/editor_trends/etl/variables.py |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 282 | + native |
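To make the role of the new variables.py helpers concrete, here is a small usage sketch of the revert-detection and size-delta logic, mirroring how parse_revision in extracter.py chains them. The revision texts are made up, and the import assumes the editor_trends root is on sys.path, as the other etl modules arrange.

from collections import deque
from etl import variables

md5hashes = deque()
size = {}
# three fake revision texts: the third restores the first, i.e. a revert
for text in ['first version', 'vandalised version', 'first version']:
    hash = variables.create_md5hash(text)
    revert = variables.is_revision_reverted(hash['hash'], md5hashes)
    md5hashes.append(hash['hash'])
    size = variables.calculate_delta_article_size(size, text)
    print revert['revert'], size['delta']
# prints 0 13, 0 5, 1 -5: the last hash was seen before, so revert == 1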
Index: trunk/tools/editor_trends/etl/store.py |
— | — | @@ -108,24 +108,27 @@ |
109 | 109 | db.add_index('category') |
110 | 110 | |
111 | 111 | location = os.path.join(rts.input_location, rts.language.code, rts.project.name, 'txt') |
112 | | - fh = file_utils.create_txt_filehandle(location, 'titles.csv', 'r', 'utf-8') |
113 | | - print 'Storing article titles...' |
114 | | - for line in fh: |
115 | | - line = line.strip() |
116 | | - #print line.encode('utf-8') |
117 | | - line = line.split('\t') |
118 | | - data = {} |
119 | | - x, y = 0, 1 |
120 | | - while y < len(line): |
121 | | - key, value = line[x], line[y] |
122 | | - data[key] = value |
123 | | - x += 2 |
124 | | - y += 2 |
125 | | - db.insert(data) |
126 | | - fh.close() |
127 | | - print 'Done...' |
| 112 | + files = file_utils.retrieve_file_list(rts.txt, extension='csv', mask='article') |
128 | 113 | |
| 114 | + print 'Storing articles...'
| 115 | + for file in files: |
| 116 | + print file |
| 117 | + fh = file_utils.create_txt_filehandle(rts.txt, file, 'r', 'utf-8') |
| 118 | + for line in fh: |
| 119 | + line = line.strip() |
| 120 | + line = line.split('\t') |
| 121 | + data = {} |
| 122 | + x, y = 0, 1 |
| 123 | + while y < len(line): |
| 124 | + key, value = line[x], line[y] |
| 125 | + data[key] = value |
| 126 | + x += 2 |
| 127 | + y += 2 |
| 128 | + db.insert(data) |
| 129 | + fh.close() |
| 130 | + print 'Done storing articles...' |
129 | 131 | |
| 132 | + |
130 | 133 | def launcher(rts): |
131 | 134 | ''' |
132 | 135 | This is the main entry point and creates a number of workers and launches |
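The rewritten loop above treats each article line as alternating key/value columns separated by tabs, the same flat layout that write_articles() in the removed enricher.py produced by zipping keys and values into one row (and which the new classes.buffer writer is presumably expected to keep). A minimal sketch of that pairing, with an invented example line:

# example line in the alternating key/value layout (contents are made up)
line = 'id\t12\ttitle\tFoo\tns\t0\tcategory\tList'
parts = line.strip().split('\t')
# equivalent to the while loop above: pair every even column with the next one
data = dict(zip(parts[0::2], parts[1::2]))
print data  # {'id': '12', 'title': 'Foo', 'ns': '0', 'category': 'List'} (order may vary)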
Index: trunk/tools/editor_trends/etl/extracter.py |
— | — | @@ -12,460 +12,277 @@ |
13 | 13 | http://www.fsf.org/licenses/gpl.html |
14 | 14 | ''' |
15 | 15 | |
| 16 | + |
16 | 17 | __author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ]) |
17 | | -__email__ = 'dvanliere at gmail dot com' |
18 | | -__date__ = '2010-12-13' |
| 18 | +__author__email = 'dvanliere at gmail dot com' |
| 19 | +__date__ = '2011-04-10' |
19 | 20 | __version__ = '0.1' |
20 | 21 | |
| 22 | + |
| 23 | +from collections import deque |
21 | 24 | import sys |
22 | | -import re |
23 | | -import os |
24 | | -import multiprocessing |
25 | | -import progressbar |
26 | | -from Queue import Empty |
| 25 | +from xml.etree.cElementTree import iterparse, dump |
| 26 | +from multiprocessing import JoinableQueue, Process, cpu_count, RLock, Manager |
27 | 27 | |
28 | 28 | if '..' not in sys.path: |
29 | 29 | sys.path.append('..') |
30 | 30 | |
31 | | -import wikitree.parser |
32 | | -from bots import detector |
33 | | -from utils import file_utils |
34 | | -from utils import compression |
35 | | -from utils import log |
| 31 | +from etl import variables |
| 32 | +from classes import buffer |
| 33 | +from analyses.adhoc import bot_detector |
36 | 34 | |
37 | | -try: |
38 | | - import psyco |
39 | | - psyco.full() |
40 | | -except ImportError: |
41 | | - pass |
| 35 | +def parse_revision(revision, article, xml_namespace, cache, bots, md5hashes, size): |
| 36 | + if revision == None: |
| 37 | + #the entire revision is empty, weird. |
| 38 | + #dump(revision) |
| 39 | + return md5hashes, size |
42 | 40 | |
| 41 | + contributor = revision.find('%s%s' % (xml_namespace, 'contributor')) |
| 42 | + contributor = variables.parse_contributor(contributor, bots, xml_namespace) |
| 43 | + if not contributor: |
| 44 | + #editor is anonymous, ignore |
| 45 | + return md5hashes, size |
43 | 46 | |
44 | | -RE_NUMERIC_CHARACTER = re.compile('&#(\d+);') |
| 47 | + revision_id = revision.find('%s%s' % (xml_namespace, 'id')) |
| 48 | + revision_id = variables.extract_revision_id(revision_id) |
| 49 | + if revision_id == None: |
| 50 | + #revision_id is missing, which is weird |
| 51 | + return md5hashes, size |
45 | 52 | |
| 53 | + article['revision_id'] = revision_id |
| 54 | + text = variables.extract_revision_text(revision) |
| 55 | + article.update(contributor) |
46 | 56 | |
47 | | -def remove_numeric_character_references(rts, text): |
48 | | - return re.sub(RE_NUMERIC_CHARACTER, lenient_deccharref, text).encode('utf-8') |
| 57 | + comment = variables.extract_comment_text(revision_id, revision) |
| 58 | + cache.comments.update(comment) |
49 | 59 | |
| 60 | + timestamp = revision.find('%s%s' % (xml_namespace, 'timestamp')).text |
| 61 | + article['timestamp'] = timestamp |
50 | 62 | |
51 | | -def lenient_deccharref(m): |
52 | | - try: |
53 | | - return unichr(int(m.group(1))) |
54 | | - except ValueError: |
55 | | - ''' |
56 | | - There are a few articles that raise a Value Error here, the reason is |
57 | | - that I am using a narrow Python build (UCS2) instead of a wide build |
58 | | - (UCS4). The quick fix is to return an empty string... |
59 | | - Real solution is to rebuild Python with UCS4 support..... |
60 | | - ''' |
61 | | - return '' |
| 63 | + hash = variables.create_md5hash(text) |
| 64 | + revert = variables.is_revision_reverted(hash['hash'], md5hashes) |
| 65 | + md5hashes.append(hash['hash']) |
| 66 | + size = variables.calculate_delta_article_size(size, text) |
62 | 67 | |
| 68 | + article.update(hash) |
| 69 | + article.update(size) |
| 70 | + article.update(revert) |
| 71 | + cache.add(article) |
| 72 | + return md5hashes, size |
63 | 73 | |
64 | | -def build_namespaces_locale(namespaces, include=['0']): |
65 | | - ''' |
66 | | - @include is a list of namespace keys that should not be ignored, the default |
67 | | - setting is to ignore all namespaces except the main namespace. |
68 | | - ''' |
69 | | - ns = {} |
70 | | - for key, value in namespaces.iteritems(): |
71 | | - if key in include: |
72 | | - #value = namespaces[namespace].get(u'*', None) |
73 | | - #ns.append(value) |
74 | | - ns[key] = value |
75 | | - return ns |
| 74 | +def setup_parser(rts): |
| 75 | + bots = '' #bot_detector.retrieve_bots(rts.language.code) |
| 76 | + start = 'start'; end = 'end' |
| 77 | + context = iterparse(fh, events=(start, end)) |
| 78 | + context = iter(context) |
76 | 79 | |
| 80 | + include_ns = {3: 'User Talk', |
| 81 | + 5: 'Wikipedia Talk', |
| 82 | + 1: 'Talk', |
| 83 | + 2: 'User', |
| 84 | + 4: 'Wikipedia'} |
77 | 85 | |
78 | | -def parse_comments(rts, revisions, function): |
79 | | - for revision in revisions: |
80 | | - comment = revision.find('{%s}comment' % rts.xml_namespace) |
81 | | - if comment != None and comment.text != None: |
82 | | - comment.text = function(comment.text) |
83 | | - return revisions |
| 86 | + return bots, context, include_ns |
84 | 87 | |
85 | 88 | |
86 | | -def parse_article(elem, namespaces): |
87 | | - ''' |
88 | | - @namespaces is a list of valid namespaces that should be included in the analysis |
89 | | - if the article should be ignored then this function returns false, else it returns |
90 | | - the namespace identifier and namespace name. |
91 | | - ''' |
92 | | - title = elem.text |
93 | | - if title == None: |
94 | | - return False |
95 | | - ns = title.split(':') |
96 | | - if len(ns) == 1 and '0' in namespaces: |
97 | | - return {'id': 0, 'name': 'main namespace'} |
98 | | - else: |
99 | | - if ns[0] in namespaces.values(): |
100 | | - #print namespaces, title.decode('utf-8'), ns |
101 | | - return {'id': ns[0], 'name': ns[1]} |
102 | | - else: |
103 | | - return False |
| 89 | +def datacompetition_count_edits(rts, file_id): |
| 90 | + bots, context, include_ns = setup_parser(rts) |
| 91 | + counts = {} |
| 92 | + id = False |
| 93 | + ns = False |
| 94 | + parse = False |
104 | 95 | |
| 96 | + try: |
| 97 | + for event, elem in context: |
| 98 | + if event is end and elem.tag.endswith('siteinfo'): |
| 99 | + xml_namespace = variables.determine_xml_namespace(elem) |
| 100 | + namespaces = variables.create_namespace_dict(elem, xml_namespace) |
| 101 | + ns = True |
| 102 | + elem.clear() |
105 | 103 | |
106 | | -def validate_hostname(address): |
107 | | - ''' |
108 | | - This is not a foolproof solution at all. The problem is that it's really |
109 | | - hard to determine whether a string is a hostname or not **reliably**. This |
110 | | - is a very fast rule of thumb. Will lead to false positives, |
111 | | - but that's life :) |
112 | | - ''' |
113 | | - parts = address.split(".") |
114 | | - if len(parts) > 2: |
115 | | - return True |
116 | | - else: |
117 | | - return False |
| 104 | + elif event is end and elem.tag.endswith('title'): |
| 105 | + title = variables.parse_title(elem) |
| 106 | + article['title'] = title |
| 107 | + current_namespace = variables.determine_namespace(title, namespaces, include_ns) |
| 108 | + if current_namespace != False: |
| 109 | + parse = True |
| 110 | + article['namespace'] = current_namespace |
| 111 | + cache.count_articles += 1 |
| 112 | + elem.clear() |
118 | 113 | |
| 114 | + elif elem.tag.endswith('revision') and parse == True: |
| 115 | + if event is start: |
| 116 | + clear = False |
| 117 | + else: |
| 118 | + print 'IMPLEMENT' |
| 119 | + #md5hashes, size = parse_revision(elem, article, xml_namespace, cache, bots, md5hashes, size) |
| 120 | + cache.count_revisions += 1 |
| 121 | + clear = True |
| 122 | + if clear: |
| 123 | + elem.clear() |
119 | 124 | |
120 | | -def validate_ip(address): |
121 | | - parts = address.split(".") |
122 | | - if len(parts) != 4: |
123 | | - return False |
124 | | - parts = parts[:3] |
125 | | - for item in parts: |
126 | | - try: |
127 | | - if not 0 <= int(item) <= 255: |
128 | | - return False |
129 | | - except ValueError: |
130 | | - return False |
131 | | - return True |
| 125 | + elif event is end and elem.tag.endswith('page'): |
| 126 | + elem.clear() |
| 127 | + #Reset all variables for next article |
| 128 | + id = False |
| 129 | + parse = False |
132 | 130 | |
| 131 | + except SyntaxError, error: |
| 132 | + print 'Encountered invalid XML tag. Error message: %s' % error |
| 133 | + dump(elem) |
| 134 | + sys.exit(-1) |
| 135 | + except IOError, error: |
| 136 | + print '''Archive file is possibly corrupted. Please delete this archive |
| 137 | + and retry downloading. Error message: %s''' % error |
| 138 | + sys.exit(-1) |
133 | 139 | |
134 | | -def determine_username_is_bot(contributor, **kwargs): |
135 | | - ''' |
136 | | - #contributor is an xml element containing the id of the contributor |
137 | | - @bots should have a dict with all the bot ids and bot names |
138 | | - @Return False if username id is not in bot dict id or True if username id |
139 | | - is a bot id. |
140 | | - ''' |
141 | | - bots = kwargs.get('bots') |
142 | | - username = contributor.find('username') |
143 | | - if username == None: |
144 | | - return 0 |
145 | | - else: |
146 | | - if username.text in bots: |
147 | | - return 1 |
148 | | - else: |
149 | | - return 0 |
| 140 | + filename = 'counts_kaggle_%s.csv' % file_id |
| 141 | + fh = file_utils.create_txt_filehandle(rts.txt, filename, 'w', 'utf-8') |
| 142 | + file_utils.write_dict_to_csv(counts, fh, keys) |
| 143 | + fh.close() |
150 | 144 | |
| 145 | + filename = 'counts_kaggle_%s.bin' % file_id |
| 146 | + file_utils.store_object(counts, location, filename) |
151 | 147 | |
152 | | -def extract_username(contributor, **kwargs): |
153 | | - contributor = contributor.find('username') |
154 | | - if contributor != None and contributor.text != None: |
155 | | - contributor = contributor.text.encode('utf-8') |
156 | | - return contributor.decode('utf-8') |
157 | | - else: |
158 | | - return None |
159 | 148 | |
| 149 | +def parse_xml(fh, cache, rts): |
| 150 | + bots, context, include_ns = setup_parser(rts) |
| 151 | + article = {} |
| 152 | + size = {} |
| 153 | + id = False |
| 154 | + ns = False |
| 155 | + parse = False |
160 | 156 | |
161 | | -def extract_revision_id(revision_id, **kwargs): |
162 | | - if revision_id != None: |
163 | | - return revision_id.text |
164 | | - else: |
165 | | - return None |
| 157 | + try: |
| 158 | + for event, elem in context: |
| 159 | + if event is end and elem.tag.endswith('siteinfo'): |
| 160 | + xml_namespace = variables.determine_xml_namespace(elem) |
| 161 | + namespaces = variables.create_namespace_dict(elem, xml_namespace) |
| 162 | + ns = True |
| 163 | + elem.clear() |
166 | 164 | |
| 165 | + elif event is end and elem.tag.endswith('title'): |
| 166 | + title = variables.parse_title(elem) |
| 167 | + article['title'] = title |
| 168 | + current_namespace = variables.determine_namespace(title, namespaces, include_ns) |
| 169 | + title_meta = variables.parse_title_meta_data(title, current_namespace) |
| 170 | + if current_namespace != False: |
| 171 | + parse = True |
| 172 | + article['namespace'] = current_namespace |
| 173 | + cache.count_articles += 1 |
| 174 | + md5hashes = deque() |
| 175 | + elem.clear() |
167 | 176 | |
168 | | -def extract_contributor_id(contributor, **kwargs): |
169 | | - ''' |
170 | | - @contributor is the xml contributor node containing a number of attributes |
171 | | - Currently, we are only interested in registered contributors, hence we |
172 | | - ignore anonymous editors. |
173 | | - ''' |
174 | | - if contributor.get('deleted'): |
175 | | - # ASK: Not sure if this is the best way to code deleted contributors. |
176 | | - return None |
177 | | - elem = contributor.find('id') |
178 | | - if elem != None: |
179 | | - return {'id':elem.text} |
180 | | - else: |
181 | | - elem = contributor.find('ip') |
182 | | - if elem != None and elem.text != None \ |
183 | | - and validate_ip(elem.text) == False \ |
184 | | - and validate_hostname(elem.text) == False: |
185 | | - return {'username':elem.text, 'id': elem.text} |
186 | | - else: |
187 | | - return None |
| 177 | + elif elem.tag.endswith('revision') and parse == True: |
| 178 | + if event is start: |
| 179 | + clear = False |
| 180 | + else: |
| 181 | + md5hashes, size = parse_revision(elem, article, xml_namespace, cache, bots, md5hashes, size) |
| 182 | + cache.count_revisions += 1 |
| 183 | + clear = True |
| 184 | + if clear: |
| 185 | + elem.clear() |
188 | 186 | |
| 187 | + elif event is end and elem.tag.endswith('id') and id == False: |
| 188 | + article['article_id'] = elem.text |
| 189 | + if current_namespace: |
| 190 | + cache.articles[article['article_id']] = title_meta |
| 191 | + id = True |
| 192 | + elem.clear() |
189 | 193 | |
190 | | -def parse_title(title): |
191 | | - title_data = {} |
192 | | - t1 = type(title.text) |
193 | | - if type(title.text) != type('str'): |
194 | | - print 'encodign' |
195 | | - print title.text.encode('utf-8') |
196 | | - title = title.text.encode('utf-8') |
197 | | - title = title.decode('utf-8', 'ignore') |
198 | | - print title |
199 | | - else: |
200 | | - title = title.text |
| 194 | + elif event is end and elem.tag.endswith('page'): |
| 195 | + elem.clear() |
| 196 | + #Reset all variables for next article |
| 197 | + article = {} |
| 198 | + size = {} |
| 199 | + md5hashes = deque() |
| 200 | + id = False |
| 201 | + parse = False |
201 | 202 | |
202 | | - #title = title.encode('utf-8') |
203 | | - title_data['title'] = title #.decode('utf-8') |
204 | | - t2 = type(title_data['title']) |
205 | | - print t1, t2 |
206 | | - #title_data['title'] = title |
207 | | - if title_data['title'].startswith('List of'): |
208 | | - title_data['list'] = True |
209 | | - else: |
210 | | - title_data['list'] = False |
211 | | - return title_data |
212 | 203 | |
| 204 | + except SyntaxError, error: |
| 205 | + print 'Encountered invalid XML tag. Error message: %s' % error |
| 206 | + dump(elem) |
| 207 | + sys.exit(-1) |
| 208 | + except IOError, error: |
| 209 | + print '''Archive file is possibly corrupted. Please delete this archive |
| 210 | + and retry downloading. Error message: %s''' % error |
| 211 | + sys.exit(-1) |
213 | 212 | |
214 | | -def output_editor_information(revisions, page, bots, rts): |
215 | | - ''' |
216 | | - @elem is an XML element containing 1 revision from a page |
217 | | - @output is where to store the data, a filehandle |
218 | | - @**kwargs contains extra information |
219 | | - |
220 | | - the variable tags determines which attributes are being parsed, the values |
221 | | - in this dictionary are the functions used to extract the data. |
222 | | - ''' |
223 | | - headers = ['id', 'date', 'article', 'username', 'revision_id'] |
224 | | - tags = {'contributor': {'id': extract_contributor_id, |
225 | | - 'bot': determine_username_is_bot, |
226 | | - 'username': extract_username, |
227 | | - }, |
228 | | - 'timestamp': {'date': wikitree.parser.extract_text}, |
229 | | - 'id': {'revision_id': extract_revision_id, |
230 | | - } |
231 | | - } |
232 | | - vars = {} |
233 | | - flat = [] |
234 | 213 | |
235 | | - for x, revision in enumerate(revisions): |
236 | | - vars[x] = {} |
237 | | - vars[x]['article'] = page |
238 | | - for tag in tags: |
239 | | - el = revision.find('%s' % tag) |
240 | | - if el == None: |
241 | | - #print cElementTree.tostring(revision, 'utf-8') |
242 | | - del vars[x] |
243 | | - break |
244 | | - for function in tags[tag].keys(): |
245 | | - f = tags[tag][function] |
246 | | - value = f(el, bots=bots) |
247 | | - if isinstance(value, dict): |
248 | | - for kw in value: |
249 | | - vars[x][kw] = value[kw] |
250 | | - else: |
251 | | - vars[x][function] = value |
| 214 | +def stream_raw_xml(input_queue, process_id, lock, rts): |
| 215 | + t0 = datetime.datetime.now() |
| 216 | + i = 0 |
| 217 | + file_id = 0 |
| 218 | + cache = buffer.CSVBuffer(process_id, rts, lock) |
252 | 219 | |
253 | | - ''' |
254 | | - This loop determines for each observation whether it should be stored |
255 | | - or not. |
256 | | - ''' |
257 | | - for x in vars: |
258 | | - if vars[x]['bot'] == 1 or vars[x]['id'] == None \ |
259 | | - or vars[x]['username'] == None: |
260 | | - continue |
261 | | - else: |
262 | | - f = [] |
263 | | - for head in headers: |
264 | | - f.append(vars[x][head]) |
265 | | - flat.append(f) |
266 | | - return flat |
267 | | - |
268 | | - |
269 | | -def add_namespace_to_output(output, namespace): |
270 | | - for x, o in enumerate(output): |
271 | | - o.append(namespace['id']) |
272 | | - output[x] = o |
273 | | - return output |
274 | | - |
275 | | - |
276 | | -def parse_dumpfile(tasks, rts, lock): |
277 | | - bot_ids = detector.retrieve_bots(rts.language.code) |
278 | | - location = os.path.join(rts.input_location, rts.language.code, rts.project.name) |
279 | | - output = os.path.join(rts.input_location, rts.language.code, rts.project.name, 'txt') |
280 | | - widgets = log.init_progressbar_widgets('Extracting data') |
281 | | - filehandles = [file_utils.create_txt_filehandle(output, '%s.csv' % fh, 'a', |
282 | | - 'utf-8') for fh in xrange(rts.max_filehandles)] |
283 | 220 | while True: |
284 | | - total, processed = 0.0, 0.0 |
285 | | - try: |
286 | | - filename = tasks.get(block=False) |
287 | | - except Empty: |
288 | | - break |
289 | | - finally: |
290 | | - print tasks.qsize() |
291 | | - tasks.task_done() |
292 | | - |
| 221 | + filename = input_queue.get() |
| 222 | + input_queue.task_done() |
| 223 | + file_id += 1 |
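| | +        # a None filename is the poison pill signalling that this worker should stop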
293 | 224 | if filename == None: |
294 | | - print '\nThere are no more jobs in the queue left.' |
| 225 | + print '%s files left in the queue' % input_queue.qsize() |
295 | 226 | break |
296 | 227 | |
297 | | - filesize = file_utils.determine_filesize(location, filename) |
298 | | - print 'Opening %s...' % (os.path.join(location, filename)) |
299 | | - print 'Filesize: %s' % filesize |
300 | | - fh1 = file_utils.create_txt_filehandle(location, filename, 'r', 'ascii') |
301 | | - fh2 = file_utils.create_txt_filehandle(location, 'articles.csv', 'a', 'utf-8') |
302 | | - ns, xml_namespace = wikitree.parser.extract_meta_information(fh1) |
303 | | - ns = build_namespaces_locale(ns, rts.namespaces) |
304 | | - rts.xml_namespace = xml_namespace |
| 228 | + fh = file_utils.create_streaming_buffer(filename) |
305 | 229 | |
306 | | - pbar = progressbar.ProgressBar(widgets=widgets, maxval=filesize).start() |
307 | | - for page, article_size in wikitree.parser.read_input(fh1): |
308 | | - title = page.find('title') |
309 | | - total += 1 |
310 | | - namespace = parse_article(title, ns) |
311 | | - if namespace != False: |
312 | | - article_id = page.find('id').text |
313 | | - title = parse_title(title) |
314 | | - revisions = page.findall('revision') |
315 | | - revisions = parse_comments(rts, revisions, remove_numeric_character_references) |
316 | | - output = output_editor_information(revisions, article_id, bot_ids, rts) |
317 | | - output = add_namespace_to_output(output, namespace) |
318 | | - write_output(output, filehandles, lock, rts) |
319 | | - #file_utils.write_list_to_csv([article_id, title.values()], fh2, newline=False, lock=lock) |
320 | | - processed += 1 |
321 | | - page.clear() |
322 | | - pbar.update(pbar.currval + article_size) |
| 230 | + if rts.kaggle: |
| 231 | + datacompetition_count_edits(rts, file_id) |
| 232 | + else: |
| 233 | + parse_xml(fh, cache, rts) |
323 | 234 | |
324 | | - fh1.close() |
325 | | - fh2.close() |
326 | | - print 'Closing %s...' % (os.path.join(location, filename)) |
327 | | - print 'Total pages: %s' % total |
328 | | - try: |
329 | | - print 'Pages processed: %s (%s)' % (processed, processed / total) |
330 | | - except ZeroDivisionError: |
331 | | - print 'Pages processed: %s' % processed |
| 235 | + i += 1 |
| 236 | + if i % 10000 == 0: |
| 237 | +            print 'Worker %s processed %s files' % (process_id, i)
| 238 | + fh.close() |
332 | 239 | |
333 | | - return True |
| 240 | + t1 = datetime.datetime.now() |
| 241 | + print 'Worker %s: Processing of %s took %s' % (process_id, filename, (t1 - t0)) |
| 242 | + print 'There are %s files left in the queue' % (input_queue.qsize()) |
| 243 | + t0 = t1 |
334 | 244 | |
| 245 | + if rts.kaggle: |
| 246 | + cache.close() |
| 247 | + cache.summary() |
335 | 248 | |
336 | | -def group_observations(obs): |
337 | | - ''' |
338 | | - This function groups observation by editor id, this way we have to make |
339 | | - fewer fileopening calls. |
340 | | - ''' |
341 | | - d = {} |
342 | | - for o in obs: |
343 | | - id = o[0] |
344 | | - if id not in d: |
345 | | - d[id] = [] |
346 | | - #if len(o) == 6: |
347 | | - d[id].append(o) |
348 | | - return d |
349 | 249 | |
| 250 | +def debug(): |
| 251 | +    fh = 'c:\\wikimedia\\sv\\wiki\\svwiki-latest-stub-meta-history.xml'
350 | 252 | |
351 | | -def write_output(observations, filehandles, lock, rts): |
352 | | - observations = group_observations(observations) |
353 | | - for obs in observations: |
354 | | - #lock the write around all edits of an editor for a particular page |
355 | | - lock.acquire() |
356 | | - try: |
357 | | - for i, o in enumerate(observations[obs]): |
358 | | - if i == 0: |
359 | | - fh = filehandles[hash(rts, obs)] |
360 | | - file_utils.write_list_to_csv(o, fh) |
361 | 253 | |
362 | | - except Exception, error: |
363 | | - print 'Encountered the following error while writing data to %s: %s' % (error, fh) |
364 | | - finally: |
365 | | - lock.release() |
366 | | - |
367 | | - |
368 | | -def hash(rts, id): |
369 | | - ''' |
370 | | - A very simple hash function based on modulo. The except clause has been |
371 | | - added because there are instances where the username is stored in userid |
372 | | - tag and hence that's a string and not an integer. |
373 | | - ''' |
374 | | - try: |
375 | | - return int(id) % rts.max_filehandles |
376 | | - except ValueError: |
377 | | - return sum([ord(i) for i in id]) % rts.max_filehandles |
378 | | - |
379 | | - |
380 | | -def prepare(rts): |
381 | | - output_articles = os.path.join(rts.input_location, rts.language.code, |
382 | | - rts.project.name) |
383 | | - output_txt = os.path.join(rts.input_location, rts.language.code, |
384 | | - rts.project.name, 'txt') |
385 | | - res = file_utils.delete_file(output_articles, 'articles.csv') |
386 | | - res = file_utils.delete_file(output_txt, None, directory=True) |
387 | | - if res: |
388 | | - res = file_utils.create_directory(output_txt) |
389 | | - return res |
390 | | - |
391 | | - |
392 | | -def unzip(properties): |
393 | | - tasks = multiprocessing.JoinableQueue() |
394 | | - canonical_filename = file_utils.determine_canonical_name(properties.filename) |
395 | | - extension = file_utils.determine_file_extension(properties.filename) |
396 | | - files = file_utils.retrieve_file_list(properties.location, |
397 | | - extension, |
398 | | - mask=canonical_filename) |
399 | | - print properties.location |
400 | | - print 'Checking if dump file has been extracted...' |
401 | | - for fn in files: |
402 | | - file_without_ext = fn.replace('%s%s' % ('.', extension), '') |
403 | | - result = file_utils.check_file_exists(properties.location, file_without_ext) |
404 | | - if not result: |
405 | | - print 'Dump file %s has not yet been extracted...' % fn |
406 | | - retcode = compression.launch_zip_extractor(properties.location, |
407 | | - fn, |
408 | | - properties) |
409 | | - else: |
410 | | - print 'Dump file has already been extracted...' |
411 | | - retcode = 0 |
412 | | - if retcode == 0: |
413 | | - tasks.put(file_without_ext) |
414 | | - elif retcode != 0: |
415 | | - print 'There was an error while extracting %s, please make sure \ |
416 | | - that %s is valid archive.' % (fn, fn) |
417 | | - return False |
418 | | - return tasks |
419 | | - |
420 | | - |
421 | 254 | def launcher(rts): |
422 | | - ''' |
423 | | - This is the main entry point for the extact phase of the data processing |
424 | | - chain. First, it will put a the files that need to be extracted in a queue |
425 | | - called tasks, then it will remove some old files to make sure that there is |
426 | | - no data pollution and finally it will start the parser to actually extract |
427 | | - the variables from the different dump files. |
428 | | - ''' |
429 | | - tasks = unzip(rts) |
430 | | - if not tasks: |
431 | | - return False |
| 255 | + lock = RLock() |
| 256 | + mgr = Manager() |
| 257 | + open_handles = [] |
| 258 | + open_handles = mgr.list(open_handles) |
| 259 | + clock = CustomLock(lock, open_handles) |
| 260 | + input_queue = JoinableQueue() |
432 | 261 | |
433 | | - result = prepare(rts) |
434 | | - if not result: |
435 | | - return result |
| 262 | + files = file_utils.retrieve_file_list(rts.input_location) |
436 | 263 | |
437 | | - lock = multiprocessing.Lock() |
| 264 | + if len(files) > cpu_count(): |
| 265 | + processors = cpu_count() - 1 |
| 266 | + else: |
| 267 | + processors = len(files) |
438 | 268 | |
439 | | - consumers = [multiprocessing.Process(target=parse_dumpfile, |
440 | | - args=(tasks, |
441 | | - rts, |
442 | | - lock)) |
443 | | - for x in xrange(rts.number_of_processes)] |
| 269 | + for filename in files: |
| 270 | + filename = os.path.join(rts.input_location, filename) |
| 271 | + print filename |
| 272 | + input_queue.put(filename) |
444 | 273 | |
445 | | - for x in xrange(rts.number_of_processes): |
446 | | - tasks.put(None) |
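| | +    # put one poison pill (None) per worker so that every process terminates cleanly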
| 274 | + for x in xrange(processors): |
| 275 | + print 'Inserting poison pill %s...' % x |
| 276 | + input_queue.put(None) |
447 | 277 | |
448 | | - for w in consumers: |
449 | | - print 'Launching process...' |
450 | | - w.start() |
| 278 | + extracters = [Process(target=stream_raw_xml, args=[input_queue, process_id, |
| 279 | + clock, rts]) |
| 280 | + for process_id in xrange(processors)] |
| 281 | + for extracter in extracters: |
| 282 | + extracter.start() |
451 | 283 | |
452 | | - tasks.join() |
| 284 | + input_queue.join() |
| 285 | + print 'Finished parsing Wikipedia dump files.' |
453 | 286 | |
454 | | - result = sum([consumer.exitcode for consumer in consumers |
455 | | - if consumer.exitcode != None]) |
456 | 287 | |
457 | | - if result == 0: |
458 | | - return True |
459 | | - else: |
460 | | - return False |
461 | | - |
462 | | - |
463 | | -def debug(): |
464 | | - pass |
465 | | - #project = 'wiki' |
466 | | - #language_code = 'sv' |
467 | | - #filename = 'svwiki-latest-stub-meta-history.xml' |
468 | | - #parse_dumpfile(project, filename, language_code) |
469 | | - |
470 | | - |
471 | 288 | if __name__ == '__main__': |
472 | 289 | debug() |
Index: trunk/tools/editor_trends/etl/sort.py |
— | — | @@ -45,7 +45,7 @@ |
46 | 46 | self.result.put(None) |
47 | 47 | break |
48 | 48 | elif filename.startswith('comments') or \ |
49 | | - filename.startswith('title'): |
| 49 | + filename.startswith('article'): |
50 | 50 | continue |
51 | 51 | fh = file_utils.create_txt_filehandle(self.rts.txt, |
52 | 52 | filename, |
— | — | @@ -148,7 +148,6 @@ |
149 | 149 | rts is an instance of RunTimeSettings |
150 | 150 | ''' |
151 | 151 | files = file_utils.retrieve_file_list(rts.txt, 'csv') |
152 | | - |
153 | 152 | pbar = progressbar.ProgressBar(maxval=len(files)).start() |
154 | 153 | tasks = multiprocessing.JoinableQueue() |
155 | 154 | result = multiprocessing.JoinableQueue() |
Index: trunk/tools/editor_trends/classes/buffer.py |
— | — | @@ -0,0 +1,183 @@ |
| 2 | +class CustomLock: |
| 3 | + def __init__(self, lock, open_handles): |
| 4 | + self.lock = lock |
| 5 | + self.open_handles = open_handles |
| 6 | + |
| 7 | + def available(self, handle): |
| 8 | + self.lock.acquire() |
| 9 | + try: |
| 10 | + self.open_handles.index(handle) |
| 11 | + #print 'RETRIEVED FILEHANDLE %s' % handle |
| 12 | + return False |
| 13 | + except (ValueError, Exception), error: |
| 14 | + self.open_handles.append(handle) |
| 15 | + #print 'ADDED FILEHANDLE %s' % handle |
| 16 | + return True |
| 17 | + finally: |
| 18 | +            #print 'Files locked: %s' % len(self.open_handles)
| 19 | + self.lock.release() |
| 20 | + |
| 21 | + def release(self, handle): |
| 22 | + #print 'RELEASED FILEHANDLE %s' % handle |
| 23 | + self.open_handles.remove(handle) |
| 24 | + |
| 25 | + |
| 26 | +class CSVBuffer: |
| 27 | + def __init__(self, process_id, rts, lock): |
| 28 | + self.rts = rts |
| 29 | + self.lock = lock |
| 30 | + self.revisions = {} |
| 31 | + self.comments = {} |
| 32 | + self.articles = {} |
| 33 | + self.process_id = process_id |
| 34 | + self.count_articles = 0 |
| 35 | + self.count_revisions = 0 |
| 36 | + self.filehandles = [file_utils.create_txt_filehandle(self.rts.txt, |
| 37 | + file_id, 'a', 'utf-8') for file_id in xrange(self.rts.max_filehandles)] |
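| | +        # one output file per hash bucket; get_hash() maps an editor id onto one of these handles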
| 38 | + self.keys = ['revision_id', 'article_id', 'id', 'username', 'namespace', |
| 39 | + 'title', 'timestamp', 'hash', 'revert', 'bot', 'cur_size', |
| 40 | + 'delta'] |
| 41 | + self.fh_articles = file_utils.create_txt_filehandle(self.rts.txt, |
| 42 | + 'articles_%s' % self.process_id, 'w', 'utf-8') |
| 43 | + self.fh_comments = file_utils.create_txt_filehandle(self.rts.txt, |
| 44 | + 'comments_%s' % self.process_id, 'w', 'utf-8') |
| 45 | + |
| 46 | + def get_hash(self, id): |
| 47 | + ''' |
| 48 | +        A very simple hash function based on modulo. The except clause has been
| 49 | +        added because there are instances where the username is stored in the
| 50 | +        userid tag, and hence the id is a string rather than an integer.
| 51 | + ''' |
| 52 | + try: |
| 53 | + return int(id) % self.rts.max_filehandles |
| 54 | + except ValueError: |
| 55 | + return sum([ord(i) for i in id]) % self.rts.max_filehandles |
| 56 | + |
| 57 | + def invert_dictionary(self, editors): |
| 58 | + hashes = {} |
| 59 | + for editor, file_id in editors.iteritems(): |
| 60 | + hashes.setdefault(file_id, []) |
| 61 | + hashes[file_id].append(editor) |
| 62 | + return hashes |
| 63 | + |
| 64 | + def group_revisions_by_fileid(self): |
| 65 | + ''' |
| 66 | +        This function groups observations by editor id and then by file_id;
| 67 | +        this way we have to make fewer file-opening calls, which should reduce
| 68 | +        processing time.
| 69 | + ''' |
| 70 | + data = {} |
| 71 | + editors = {} |
| 72 | + #first, we group all revisions by editor |
| 73 | + |
| 74 | + for revision in self.revisions.values(): |
| 75 | + row = [] |
| 76 | + #strip away the keys and make sure that the values are always in the same sequence |
| 77 | + for key in self.keys: |
| 78 | + row.append(revision[key].decode('utf-8')) |
| 79 | + editor_id = row[0] |
| 80 | + data.setdefault(editor_id, []) |
| 81 | + data[editor_id].append(row) |
| 82 | + editors.setdefault(editor_id, self.get_hash(editor_id)) |
| 83 | + |
| 84 | + #now, we are going to group all editors by file_id |
| 85 | + file_ids = self.invert_dictionary(editors) |
| 86 | + self.revisions = {} |
| 87 | + for file_id, editors in file_ids.iteritems(): |
| 88 | + for editor in editors: |
| 89 | + self.revisions.setdefault(file_id, []) |
| 90 | + self.revisions[file_id].extend(data[editor]) |
| 91 | + |
| 92 | + def add(self, revision): |
| 93 | + self.stringify(revision) |
| 94 | + id = revision['revision_id'] |
| 95 | + self.revisions[id] = revision |
| 96 | + if len(self.revisions) > 10000: |
| 97 | + #print '%s: Emptying buffer %s - buffer size %s' % (datetime.datetime.now(), self.id, len(self.revisions)) |
| 98 | + self.store() |
| 99 | + |
| 100 | + |
| 101 | + def stringify(self, revision): |
| 102 | + for key, value in revision.iteritems(): |
| 103 | + try: |
| 104 | + value = str(value) |
| 105 | + except UnicodeEncodeError: |
| 106 | + value = value.encode('utf-8') |
| 107 | + revision[key] = value |
| 108 | + |
| 109 | + |
| 110 | + def summary(self): |
| 111 | + print 'Worker %s: Number of articles: %s' % (self.process_id, self.count_articles) |
| 112 | + print 'Worker %s: Number of revisions: %s' % (self.process_id, self.count_revisions) |
| 113 | + |
| 114 | + def store(self): |
| 115 | + self.write_revisions() |
| 116 | + self.write_articles() |
| 117 | + self.write_comments() |
| 118 | + |
| 119 | + def close(self): |
| 120 | + self.store() |
| 121 | + self.filehandles = [fh.close() for fh in self.filehandles] |
| 122 | + |
| 123 | + def write_comments(self): |
| 124 | + rows = [] |
| 125 | + try: |
| 126 | + for revision_id, comment in self.comments.iteritems(): |
| 127 | + #comment = comment.decode('utf-8') |
| 128 | + #row = '\t'.join([revision_id, comment]) + '\n' |
| 129 | + rows.append([revision_id, comment]) |
| 130 | + file_utils.write_list_to_csv(rows, self.fh_comments) |
| 131 | + self.comments = {} |
| 132 | + except Exception, error: |
| 133 | + print '''Encountered the following error while writing comment data |
| 134 | + to %s: %s''' % (self.fh_comments, error) |
| 135 | + |
| 136 | + def write_articles(self): |
| 137 | + #t0 = datetime.datetime.now() |
| 138 | + if len(self.articles.keys()) > 10000: |
| 139 | + rows = [] |
| 140 | + try: |
| 141 | + for article_id, data in self.articles.iteritems(): |
| 142 | + keys = data.keys() |
| 143 | + keys.insert(0, 'id') |
| 144 | + |
| 145 | + values = data.values() |
| 146 | + values.insert(0, article_id) |
| 147 | + |
| 148 | + row = zip(keys, values) |
| 149 | + row = list(itertools.chain(*row)) |
| 150 | + #row = '\t'.join([article_id, title]) + '\n' |
| 151 | + rows.append(row) |
| 152 | + file_utils.write_list_to_csv(rows, self.fh_articles, newline=False) |
| 153 | + self.articles = {} |
| 154 | + except Exception, error: |
| 155 | + print '''Encountered the following error while writing article |
| 156 | + data to %s: %s''' % (self.fh_articles, error) |
| 157 | + #t1 = datetime.datetime.now() |
| 158 | + #print '%s articles took %s' % (len(self.articles.keys()), (t1 - t0)) |
| 159 | + |
| 160 | + def write_revisions(self): |
| 161 | + #t0 = datetime.datetime.now() |
| 162 | + self.group_revisions_by_fileid() |
| 163 | + file_ids = self.revisions.keys() |
| 164 | + for file_id in file_ids: |
| 165 | + wait = True |
| 166 | + for i, revision in enumerate(self.revisions[file_id]): |
| 167 | + if i == 0: |
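| | +                    # busy-wait until the shared CustomLock marks this file handle as available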
| 168 | + while wait: |
| 169 | + #print file_id, self.lock |
| 170 | + if self.lock.available(file_id): |
| 171 | + fh = self.filehandles[file_id] |
| 172 | + wait = False |
| 173 | + try: |
| 174 | + file_utils.write_list_to_csv(revision, fh) |
| 175 | + except Exception, error: |
| 176 | + print '''Encountered the following error while writing |
| 177 | + revision data to %s: %s''' % (fh, error) |
| 178 | + self.lock.release(file_id) |
| 179 | + del self.revisions[file_id] |
| 180 | + wait = True |
| 181 | +# t1 = datetime.datetime.now() |
| 182 | +# print 'Worker %s: %s revisions took %s' % (self.process_id, |
| 183 | +# len([1]), |
| 184 | +# (t1 - t0)) |
Property changes on: trunk/tools/editor_trends/classes/buffer.py |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 185 | + native |