r86864 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: r86864
Date: 15:19, 25 April 2011
Author: diederik
Status: deferred
Tags:
Comment:
New queue module that adds extra functionality on top of the standard multiprocessing Queue classes: get() is retried when interrupted by EINTR, and qsize() degrades gracefully on platforms (such as OS X) where it is not supported.
Modified paths:
  • /trunk/tools/editor_trends/classes/queue.py (added)
  • /trunk/tools/editor_trends/classes/storage.py (modified)
  • /trunk/tools/editor_trends/etl/sort.py (modified)
  • /trunk/tools/editor_trends/etl/transformer.py (modified)

Diff

Index: trunk/tools/editor_trends/etl/transformer.py
@@ -34,17 +34,20 @@
     A simple class takes care of fetching an editor from the queue and start
     processing its edits.
     '''
-    def __init__(self, rts, tasks):
+    def __init__(self, rts, tasks, db_raw, db_dataset):
         super(EditorConsumer, self).__init__(rts, tasks)
+        self.db_raw = db_raw
+        self.db_dataset = db_dataset

     def run(self):
         while True:
-            new_editor = self.tasks.get()
+            editor = self.tasks.get()
             self.tasks.task_done()
             print '%s editors to go...' % messages.show(self.tasks.qsize)
-            if new_editor == None:
+            if editor == None:
                 break
-            new_editor()
+            editor = Editor(self.db_raw, self.db_dataset, editor)
+            editor()


 class Editor:
@@ -52,12 +55,12 @@
         self.editor_id = editor_id
         self.db_raw = db_raw #storage.init_database(self.rts.storage, self.rts.dbname, self.rts.editors_raw)
         self.db_dataset = db_dataset #storage.init_database(self.rts.storage, self.rts.dbname, self.rts.editors_dataset)
+        self.cutoff = 9

     def __str__(self):
         return '%s' % (self.editor_id)

     def __call__(self):
-        cutoff = 9
         editor = self.db_raw.find_one('editor', self.editor_id)
         if editor == None:
             return
@@ -74,7 +77,7 @@
         character_count = determine_edit_volume(edits, first_year, final_year)
         revert_count = determine_number_reverts(edits, first_year, final_year)

-        edits = sort_edits(edits)
+
         edit_count = determine_number_edits(edits, first_year, final_year)

         totals = {}
@@ -84,18 +87,21 @@
         totals = calculate_totals(totals, counts, article_count, 'article_count')
         totals = calculate_totals(totals, counts, edit_count, 'edit_count')

-        if len(edits) > cutoff:
-            new_wikipedian = edits[cutoff]['date']
+        if len(edits) > self.cutoff:
+            new_wikipedian = edits[self.cutoff]['date']
         else:
             new_wikipedian = False
-        cum_edit_count = len(edits)
+        cum_edit_count_main_ns, cum_edit_count_other_ns = calculate_cum_edits(edits)
+
+        edits = sort_edits(edits)
         first_edit = edits[0]['date']
         final_edit = edits[-1]['date']

         data = {'editor': self.editor_id,
                 'username': username,
                 'new_wikipedian': new_wikipedian,
-                'cum_edit_count': cum_edit_count,
+                'cum_edit_count_main_ns': cum_edit_count_main_ns,
+                'cum_edit_count_other_ns': cum_edit_count_other_ns,
                 'final_edit': final_edit,
                 'first_edit': first_edit,
                 'last_edit_by_year': last_edit_by_year,
@@ -109,6 +115,7 @@
                 }
         self.db_dataset.insert(data)

+
 def cleanup_datacontainer(dc, variable_type):
     '''
     valid variable_type are either a {}, a [] or 0.
@@ -154,15 +161,27 @@
     '''
     dc = data_converter.create_datacontainer(first_year, final_year)
     dc = data_converter.add_months_to_datacontainer(dc, 'dict')
-    for edit in edits:
-        ns = edit['ns']
-        year, month = str(edit['date'].year), edit['date'].month
-        dc[year][month].setdefault(ns, 0)
-        dc[year][month][ns] += 1
+    for year in edits:
+        for edit in edits[year]:
+            ns = edit['ns']
+            month = edit['date'].month
+            dc[year][month].setdefault(ns, 0)
+            dc[year][month][ns] += 1
     dc = cleanup_datacontainer(dc, {})
     return dc

+def calculate_cum_edits(edits):
+    cum_edit_count_main_ns = 0
+    cum_edit_count_other_ns = 0
+    for year in edits:
+        for edit in edits[year]:
+            if edit['ns'] == 0:
+                cum_edit_count_main_ns += 1
+            else:
+                cum_edit_count_other_ns += 1

+    return cum_edit_count_main_ns, cum_edit_count_other_ns
+
 def determine_articles_workedon(edits, first_year, final_year):
     '''
     This function creates a list of article_ids that an editor has worked on in
@@ -264,9 +283,9 @@
         for edit in edits[year]:
             date = str(edit['date'].year)
             if dc[date] == 0:
-                dc[date] = edit
-            elif dc[date] < edit:
-                dc[date] = edit
+                dc[date] = edit['date']
+            elif dc[date] < edit['date']:
+                dc[date] = edit['date']
     return dc


@@ -290,51 +309,52 @@
     return sorted(edits, key=itemgetter('date'))


+def setup_database(rts):
+    '''
+    Initialize the database, including setting indexes and dropping the older
+    version of the collection.
+    '''
+    db_raw = storage.init_database(rts.storage, rts.dbname, rts.editors_raw)
+    db_dataset = storage.init_database(rts.storage, rts.dbname, rts.editors_dataset)
+    db_dataset.drop_collection()
+    editors = db_raw.retrieve_editors()
+    return db_raw, db_dataset, editors
+
+
 def transform_editors_multi_launcher(rts):
-    tasks = multiprocessing.JoinableQueue()
     db_raw, db_dataset, editors = setup_database(rts)
-    transformers = [EditorConsumer(rts, tasks) for i in xrange(rts.number_of_processes)]
+    transformers = [EditorConsumer(rts, editors, db_raw, db_dataset) for i in xrange(rts.number_of_processes)]

-    for editor in editors:
-        tasks.put(Editor(rts, editor))

     for x in xrange(rts.number_of_processes):
-        tasks.put(None)
+        editors.put(None)

-    print messages.show(tasks.qsize)
     for transformer in transformers:
         transformer.start()

-    tasks.join()
+    editors.join()

     db_dataset.add_index('editor')
     db_dataset.add_index('new_wikipedian')


-def setup_database(rts):
-    '''
-    Initialize the database, including setting indexes and dropping the older
-    version of the collection.
-    '''
-    db_raw = storage.init_database(rts.storage, rts.dbname, rts.editors_raw)
-    db_dataset = storage.init_database(rts.storage, rts.dbname, rts.editors_dataset)
-    db_dataset.drop_collection()
-    editors = []
-    #editors = db_raw.retrieve_distinct_keys('editor')
-    #db_dataset.add_index('editor')
-    #db_dataset.add_index('new_wikipedian')
-
-    return db_raw, db_dataset, editors
-
-
 def transform_editors_single_launcher(rts):
     print rts.dbname, rts.editors_raw
     db_raw, db_dataset, editors = setup_database(rts)
     n = db_raw.count()
     pbar = progressbar.ProgressBar(maxval=n).start()
-    for editor in db_raw.find():
+
+    for x in xrange(rts.number_of_processes):
+        editors.put(None)
+
+    while True:
+        editor = editors.get()
+        editors.task_done()
+        if editor == None:
+            break
         editor = Editor(db_raw, db_dataset, editor)
         editor()
+
         pbar.update(pbar.currval + 1)

     db_dataset.add_index('editor')
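
With this change, the multi-process launcher no longer builds Editor objects up front on a local multiprocessing.JoinableQueue; setup_database() now hands back a joinable queue of editor ids (filled by retrieve_editors() in storage.py below), each consumer pulls an id and constructs the Editor itself, and one None sentinel per process signals shutdown. Below is a minimal sketch of the same producer/consumer pattern using only the standard multiprocessing module; worker() and process_editor() are illustrative stand-ins, not the project's classes.

from multiprocessing import JoinableQueue, Process

def process_editor(editor_id):
    # Stand-in for Editor(db_raw, db_dataset, editor_id)()
    print 'processing editor %s' % editor_id

def worker(tasks):
    while True:
        editor_id = tasks.get()
        tasks.task_done()              # mark the item done, even for the sentinel
        if editor_id is None:          # one None sentinel per consumer process
            break
        process_editor(editor_id)

def launcher(editor_ids, number_of_processes=4):
    tasks = JoinableQueue()
    for editor_id in editor_ids:
        tasks.put(editor_id)
    for _ in xrange(number_of_processes):
        tasks.put(None)
    consumers = [Process(target=worker, args=(tasks,))
                 for _ in xrange(number_of_processes)]
    for consumer in consumers:
        consumer.start()
    tasks.join()                       # returns once every queued item is marked done

if __name__ == '__main__':
    launcher(['editor_%d' % i for i in xrange(10)])

Calling task_done() immediately after get(), as the committed run() loop also does, keeps join() from blocking on the sentinel items themselves.
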
Index: trunk/tools/editor_trends/etl/sort.py
@@ -56,9 +56,9 @@
     for x, d in enumerate(data):
         d = d.strip().split('\t')
         #TEMP FIX:
-        editor = d[2]
-        d[2] = d[0]
-        d[0] = editor
+        #editor = d[2]
+        #d[2] = d[0]
+        #d[0] = editor
         #END TEMP FIX
         data[x] = d
     #data = [d.strip() for d in data]
Index: trunk/tools/editor_trends/classes/queue.py
@@ -0,0 +1,58 @@
+#!/usr/bin/python
+# coding=utf-8
+'''
+Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License version 2
+as published by the Free Software Foundation.
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+See the GNU General Public License for more details, at
+http://www.fsf.org/licenses/gpl.html
+'''
+
+
+__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)'])
+__email__ = 'dvanliere at gmail dot com'
+__date__ = '2011-04-21'
+__version__ = '0.1'
+
+
+from multiprocessing.queues import JoinableQueue, Queue
+import errno
+
+def retry_on_eintr(function, *args, **kw):
+    while True:
+        try:
+            return function(*args, **kw)
+        except IOError, e:
+            if e.errno == errno.EINTR:
+                continue
+            else:
+                raise
+
+class RetryQueue(Queue):
+    """Queue which will retry if interrupted with EINTR."""
+    def get(self, block=True, timeout=None):
+        return retry_on_eintr(Queue.get, self, block, timeout)
+
+    def qsize(self):
+        try:
+            return self.qsize()
+        except:
+            #OSX does not support the qsize function so we return unknown
+            return 'unknown'
+
+
+class JoinableRetryQueue(JoinableQueue):
+    """Queue which will retry if interrupted with EINTR."""
+    def get(self, block=True, timeout=None):
+        return retry_on_eintr(Queue.get, self, block, timeout)
+
+    def qsize(self):
+        try:
+            return self.qsize()
+        except:
+            #OSX does not support the qsize function so we return unknown
+            return 'unknown'
Property changes on: trunk/tools/editor_trends/classes/queue.py
___________________________________________________________________
Added: svn:eol-style
   + native
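
The retry wrapper simply re-issues a blocking call whenever the underlying system call is interrupted by a signal (EINTR). One thing worth noting is that, as committed, both qsize() overrides call self.qsize() and therefore recurse into themselves before the bare except returns 'unknown'. The sketch below shows what a delegating version could look like, assuming the intent was to fall back only where multiprocessing cannot report a queue size (Mac OS X raises NotImplementedError because sem_getvalue() is not available there).

from multiprocessing.queues import Queue
import errno

def retry_on_eintr(function, *args, **kw):
    # Re-issue the call whenever it is interrupted by a signal.
    while True:
        try:
            return function(*args, **kw)
        except IOError, e:
            if e.errno != errno.EINTR:
                raise

class RetryQueue(Queue):
    """Queue whose blocking get() survives EINTR interruptions."""
    def get(self, block=True, timeout=None):
        return retry_on_eintr(Queue.get, self, block, timeout)

    def qsize(self):
        try:
            # Delegate to the parent implementation rather than recursing.
            return Queue.qsize(self)
        except NotImplementedError:
            # Mac OS X lacks sem_getvalue(), so qsize() is unsupported there.
            return 'unknown'
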
Index: trunk/tools/editor_trends/classes/storage.py
@@ -23,10 +23,11 @@
 if '..' not in sys.path:
     sys.path.append('..')

-from classes import settings
+import settings
 settings = settings.Settings()

-from classes import exceptions
+import exceptions
+import queue
 from utils import file_utils

 import_error = 0
@@ -81,7 +82,7 @@
         '''Update an observation in a collection'''

     @abstractmethod
-    def find(self, key, value, qualifier):
+    def find(self, key, qualifier):
         '''Find multiple observations in a collection'''

     @abstractmethod
@@ -146,7 +147,7 @@
         assert isinstance(data, dict), 'You need to feed me dictionaries.'
         self.db[self.collection].update({key: value}, data, upsert=True)

-    def find(self, key=None, value=1, qualifier=None):
+    def find(self, key=None, qualifier=None):
         if qualifier == 'min':
             return self.db[self.collection].find({
                 key : {'$ne' : False}}).sort(key, pymongo.ASCENDING).limit(1)[0]
@@ -154,7 +155,7 @@
             return self.db[self.collection].find({
                 key : {'$ne' : False}}).sort(key, pymongo.DESCENDING).limit(1)[0]
         elif key != None:
-            return self.db[self.collection].find({key: value})
+            return self.db[self.collection].find({}, fields=[key])
         else:
             return self.db[self.collection].find()

@@ -171,6 +172,13 @@
     def count(self):
         return self.db[self.collection].count()

+    def retrieve_editors(self):
+        q = queue.JoinableRetryQueue()
+        cursor = self.find('editor')
+        for editor in cursor:
+            q.put(editor['editor'])
+        return q
+
     def retrieve_distinct_keys(self, key, force_new=False):
         '''
         TODO: figure out how big the index is and then take appropriate action,
@@ -290,7 +298,7 @@
     def update(self, key, value, data):
         return

-    def find(self, key, value, qualifier=None):
+    def find(self, key, qualifier=None):
         return

     def save(self, data):
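
The find() signature change is worth spelling out: previously find(key, value) filtered on a concrete value, whereas a bare key now returns every document with only that field projected, which is what the new retrieve_editors() uses to stream all editor ids into a JoinableRetryQueue. A rough sketch in raw pymongo terms (the pre-3.0 API, since the class uses the fields= keyword; host, database and collection names are assumptions for illustration only):

import pymongo

# Hypothetical connection details, purely for illustration.
connection = pymongo.Connection('localhost', 27017)
collection = connection['wikilytics']['editors_raw']

# Old behaviour of find(key, value): filter on a concrete value.
one_editor = collection.find({'editor': '12345'})

# New behaviour of find(key): all documents, projecting only that field.
all_editors = collection.find({}, fields=['editor'])

# retrieve_editors() walks such a cursor and fills the work queue with ids.
editor_ids = [doc['editor'] for doc in all_editors]
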