r82425 MediaWiki - Code Review archive

Repository:MediaWiki
Revision: < r82424 | r82425 | r82426 >
Date:23:43, 18 February 2011
Author:diederik
Status:deferred
Tags:
Comment:
Battle-tested version of the synchronized Variable class; it seems to work now for real.
Modified paths:
  • /trunk/tools/editor_trends/analyses/analyzer.py (modified) (history)
  • /trunk/tools/editor_trends/classes/dataset.py (modified) (history)

Diff [purge]

Index: trunk/tools/editor_trends/analyses/analyzer.py
@@ -17,7 +17,8 @@
1818 __date__ = '2010-12-10'
1919 __version__ = '0.1'
2020
21 -from multiprocessing import JoinableQueue, Lock, Manager, RLock
 21+from multiprocessing import JoinableQueue, Manager, RLock, Process
 22+from multiprocessing.managers import BaseManager
2223 from Queue import Empty
2324 import sys
2425 import cPickle
@@ -37,28 +38,16 @@
3839 from utils import timer
3940 from utils import log
4041
 42+
4143 class Analyzer(consumers.BaseConsumer):
42 -
4344 def __init__(self, rts, tasks, result, var):
4445 super(Analyzer, self).__init__(rts, tasks, result)
4546 self.var = var
4647
47 - def convert_synchronized_objects(self):
48 - for obs in self.var:
49 - obs = self.var[obs]
50 - obs.data = obs.data.value
51 -
52 - def store(self):
53 - #self.convert_synchronized_objects()
54 - location = os.path.join(self.rts.binary_location, '%s_%s.bin' % (self.var.name, self.name))
55 - fh = open(location, 'wb')
56 - cPickle.dump(self.var, fh)
57 - fh.close()
58 -
5948 def run(self):
6049 '''
6150 Generic loop function that loops over all the editors of a Wikipedia
62 - project and then calls the function that does the actual aggregation.
 51+ project and then calls the plugin that does the actual mapping.
6352 '''
6453 mongo = db.init_mongo_db(self.rts.dbname)
6554 coll = mongo[self.rts.editors_dataset]
@@ -67,8 +56,6 @@
6857 task = self.tasks.get(block=False)
6958 self.tasks.task_done()
7059 if task == None:
71 - #print self.var.number_of_obs(), len(self.var.obs)
72 - #self.store()
7360 self.result.put(self.var)
7461 break
7562 editor = coll.find_one({'editor': task.editor})
@@ -84,6 +71,24 @@
8572 self.editor = editor
8673
8774
 75+def reconstruct_observations(var):
 76+ '''
 77+ When the Task queue is empty then the Variable instance is returned. However,
 78+ the observations in var.obs are still pickled. This function does two things:
 79+ a) it uses an ordinary dictionary instead of a shared dictionary
 80+ b) it reconstructs the serialized observations to instances of Observation
 81+ '''
 82+ if not isinstance(var, dataset.Variable):
 83+ raise 'var should be an instance of Variable.'
 84+
 85+ keys = var.obs.keys()
 86+ d = {}
 87+ for key in keys:
 88+ d[key] = cPickle.loads(var.obs[key])
 89+ var.obs = d
 90+ return var
 91+
 92+
8893 def retrieve_plugin(func):
8994 functions = inventory.available_analyses()
9095 try:
@@ -118,11 +123,14 @@
119124 plugin = retrieve_plugin(func)
120125 feedback(plugin, rts)
121126
122 -
 127+ obs = dict()
123128 tasks = JoinableQueue()
124129 result = JoinableQueue()
 130+
125131 mgr = Manager()
126132 lock = mgr.RLock()
 133+ obs_proxy = mgr.dict(obs)
 134+
127135 editors = db.retrieve_distinct_keys(rts.dbname, rts.editors_dataset, 'editor')
128136 min_year, max_year = determine_project_year_range(rts.dbname,
129137 rts.editors_dataset,
@@ -133,7 +141,7 @@
134142 kwargs['max_year'] = max_year
135143
136144 pbar = progressbar.ProgressBar(maxval=len(editors)).start()
137 - var = dataset.Variable('count', time_unit, lock, **kwargs)
 145+ var = dataset.Variable('count', time_unit, lock, obs_proxy, **kwargs)
138146
139147 for editor in editors:
140148 tasks.put(Task(plugin, editor))
@@ -141,13 +149,16 @@
142150 consumers = [Analyzer(rts, tasks, result, var) for
143151 x in xrange(rts.number_of_processes)]
144152
 153+
145154 for x in xrange(rts.number_of_processes):
146155 tasks.put(None)
147156
148157 for w in consumers:
149158 w.start()
150159
 160+
151161 ppills = rts.number_of_processes
 162+ vars = []
152163 while True:
153164 while ppills > 0:
154165 try:
@@ -157,21 +168,23 @@
158169 else:
159170 ppills -= 1
160171 var = res
 172+ #if res.number_of_obs() > var.number_of_obs():
 173+ #vars.append(res)
161174 except Empty:
162175 pass
163176 break
164177
 178+ tasks.join()
165179
166 - tasks.join()
 180+ reconstruct_observations(var)
167181 ds = dataset.Dataset(plugin.func_name, rts, format=fmt)
168 - #var = consumers[0].var
169182 ds.add_variable(var)
170183
171184 stopwatch.elapsed()
172 - write_output(ds, rts, stopwatch)
 185+ #write_output(ds, rts, stopwatch)
173186
174187 ds.summary()
175 - return True
 188+ #return True
176189
177190
178191 def determine_project_year_range(dbname, collection, var):
@@ -204,8 +217,8 @@
205218 # generate_chart_data(rts, 'total_number_of_new_wikipedians', time_unit='year')
206219 # generate_chart_data(rts, 'total_number_of_articles', time_unit='year')
207220 # generate_chart_data(rts, 'total_cumulative_edits', time_unit='year')
208 -# generate_chart_data(rts, 'cohort_dataset_forward_histogram', time_unit='month', cutoff=5, cum_cutoff=0)
209 -# generate_chart_data(rts, 'cohort_dataset_backward_bar', time_unit='year', cutoff=10, cum_cutoff=0, format='wide')
 221+# generate_chart_data(rts, 'cohort_dataset_forward_histogram', time_unit='month', cutoff=1, cum_cutoff=10)
 222+ generate_chart_data(rts, 'cohort_dataset_backward_bar', time_unit='year', cutoff=1, cum_cutoff=10, format='wide')
210223 # generate_chart_data(rts, 'cohort_dataset_forward_bar', time_unit='year', cutoff=5, cum_cutoff=0, format='wide')
211224 # generate_chart_data(rts, 'histogram_edits', time_unit='year', cutoff=0)
212225 # generate_chart_data(rts, 'time_to_new_wikipedian', time_unit='year', cutoff=0)
Index: trunk/tools/editor_trends/classes/dataset.py
@@ -23,9 +23,10 @@
2424 import math
2525 import operator
2626 import sys
 27+import cPickle
2728 import hashlib
2829 from pymongo.son_manipulator import SONManipulator
29 -from multiprocessing import RLock, Array, Value
 30+from multiprocessing import Manager
3031 from texttable import Texttable
3132 from datetime import timedelta
3233
@@ -161,10 +162,8 @@
162163 def __init__(self, date, time_unit, id, meta):
163164 assert isinstance(date, datetime.datetime), '''Date variable should be
164165 a datetime.datetime instance.'''
165 - #self.lock = lock #Lock()
166166 self.date = date
167167 self.data = 0
168 - #self.data = Value('i', 0)
169168 self.time_unit = time_unit
170169 self.t1, self.t0 = self.set_date_range(date)
171170 self.id = id
@@ -194,24 +193,36 @@
195194 def __getitem__(self, key):
196195 return getattr(self, key, [])
197196
 197+ def serialize(self):
 198+ return cPickle.dumps(self)
 199+
 200+ def deserialize(self):
 201+ return cPickle.loads(self)
 202+
198203 def add(self, value):
199204 '''
200205 '''
201 - #self.lock.acquire()
202 - #try:
203206 if isinstance(value, list):
204207 if self.count == 0:
205208 self.data = []
206 - #self.data = Array('i', 0)
207209 self.data.append(value)
208210 else:
209211 self.data += value
210 - #self.data.value += value
211 - #finally:
212212 self.count += 1
213 - #self.lock.release()
 213+# self.lock.acquire()
 214+# try:
 215+# if isinstance(value, list):
 216+# if self.count == 0:
 217+# self.data = []
 218+# self.data.append(value)
 219+# else:
 220+# self.data += value
 221+# finally:
 222+# self.count += 1
 223+# self.lock.release()
214224
215225
 226+
216227 def get_date_range(self):
217228 return '%s-%s-%s:%s-%s-%s' % (self.t0.month, self.t0.day, self.t0.year, \
218229 self.t1.month, self.t1.day, self.t1.year)
@@ -220,10 +231,10 @@
221232 '''
222233 This class constructs a time-based variable.
223234 '''
224 - def __init__(self, name, time_unit, lock, **kwargs):
 235+ def __init__(self, name, time_unit, lock, obs, **kwargs):
225236 self.name = name
226237 self.lock = lock
227 - self.obs = {}
 238+ self.obs = obs
228239 self.time_unit = time_unit
229240 self.groupbys = []
230241 self._type = 'variable'
@@ -265,14 +276,7 @@
266277 return [o for o in self.itervalues()]
267278
268279 def get_observation(self, id, date, meta):
269 - self.lock.acquire()
270 - try:
271 - obs = self.obs.get(id, Observation(date, self.time_unit, id, meta))
272 - #self.obs[id] = obs
273 - x = len(self.obs)
274 - finally:
275 - self.lock.release()
276 - return obs
 280+ return self.obs.get(id, Observation(date, self.time_unit, id, meta).serialize())
277281
278282 def add(self, date, value, meta={}):
279283 '''
@@ -300,10 +304,13 @@
301305 values.insert(0, end)
302306 values.insert(0, start)
303307 id = self.__hash__(values)
304 - obs = self.get_observation(id, date, meta)
305 - obs.add(value)
 308+
 309+ self.lock.acquire()
306310 try:
307 - self.lock.acquire()
 311+ obs = self.get_observation(id, date, meta)
 312+ obs = cPickle.loads(obs)
 313+ obs.add(value)
 314+ obs = obs.serialize()
308315 self.obs[id] = obs
309316 finally:
310317 self.lock.release()
@@ -376,7 +383,6 @@
377384 name = kwargs.pop('name')
378385 setattr(self, name, Variable(name, **kwargs))
379386 self.variables.append(name)
380 - #self.filename = self.create_filename()
381387
382388 def __repr__(self):
383389 return 'Dataset contains %s variables' % (len(self.variables))
@@ -426,15 +432,18 @@
427433 self.filename = filename
428434
429435
430 - def add_variable(self, var):
 436+ def add_variable(self, vars):
431437 '''
432438 Call this function to add a Variable to a dataset.
433439 '''
434 - if isinstance(var, Variable):
435 - self.variables.append(var.name)
436 - setattr(self, var.name, var)
437 - else:
438 - raise TypeError('You can only add an instance of Variable to a dataset.')
 440+ if not isinstance(vars, list):
 441+ vars = [vars]
 442+ for var in vars:
 443+ if isinstance(var, Variable):
 444+ self.variables.append(var.name)
 445+ setattr(self, var.name, var)
 446+ else:
 447+ raise TypeError('You can only add an instance of Variable to a dataset.')
439448
440449 def write(self, format='csv'):
441450 '''

Status & tagging log