Index: trunk/tools/editor_trends/analyses/analyzer.py |
— | — | @@ -17,7 +17,8 @@ |
18 | 18 | __date__ = '2010-12-10' |
19 | 19 | __version__ = '0.1' |
20 | 20 | |
21 | | -from multiprocessing import JoinableQueue, Lock, Manager, RLock |
| 21 | +from multiprocessing import JoinableQueue, Manager, RLock, Process |
| 22 | +from multiprocessing.managers import BaseManager |
22 | 23 | from Queue import Empty |
23 | 24 | import sys |
24 | 25 | import cPickle |
— | — | @@ -37,28 +38,16 @@ |
38 | 39 | from utils import timer |
39 | 40 | from utils import log |
40 | 41 | |
| 42 | + |
41 | 43 | class Analyzer(consumers.BaseConsumer): |
42 | | - |
43 | 44 | def __init__(self, rts, tasks, result, var): |
44 | 45 | super(Analyzer, self).__init__(rts, tasks, result) |
45 | 46 | self.var = var |
46 | 47 | |
47 | | - def convert_synchronized_objects(self): |
48 | | - for obs in self.var: |
49 | | - obs = self.var[obs] |
50 | | - obs.data = obs.data.value |
51 | | - |
52 | | - def store(self): |
53 | | - #self.convert_synchronized_objects() |
54 | | - location = os.path.join(self.rts.binary_location, '%s_%s.bin' % (self.var.name, self.name)) |
55 | | - fh = open(location, 'wb') |
56 | | - cPickle.dump(self.var, fh) |
57 | | - fh.close() |
58 | | - |
59 | 48 | def run(self): |
60 | 49 | ''' |
61 | 50 | Generic loop function that loops over all the editors of a Wikipedia |
62 | | - project and then calls the function that does the actual aggregation. |
| 51 | + project and then calls the plugin that does the actual mapping. |
63 | 52 | ''' |
64 | 53 | mongo = db.init_mongo_db(self.rts.dbname) |
65 | 54 | coll = mongo[self.rts.editors_dataset] |
— | — | @@ -67,8 +56,6 @@ |
68 | 57 | task = self.tasks.get(block=False) |
69 | 58 | self.tasks.task_done() |
70 | 59 | if task == None: |
71 | | - #print self.var.number_of_obs(), len(self.var.obs) |
72 | | - #self.store() |
73 | 60 | self.result.put(self.var) |
74 | 61 | break |
75 | 62 | editor = coll.find_one({'editor': task.editor}) |
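
The loop above follows the poison-pill pattern: each Analyzer drains the shared
JoinableQueue and, when it pulls None, puts its partial result on the result
queue and exits. A minimal standalone sketch of the same pattern, using
illustrative names that are not part of this patch:

    from multiprocessing import JoinableQueue, Process
    from Queue import Empty

    def worker(tasks, results):
        partial = 0                        # stands in for self.var
        while True:
            try:
                task = tasks.get(block=False)
            except Empty:
                continue
            tasks.task_done()
            if task is None:               # poison pill: flush result, exit
                results.put(partial)
                break
            partial += task                # stands in for the plugin call

    if __name__ == '__main__':
        tasks, results = JoinableQueue(), JoinableQueue()
        for i in xrange(10):
            tasks.put(i)
        procs = [Process(target=worker, args=(tasks, results))
                 for x in xrange(2)]
        for p in procs:
            tasks.put(None)                # one pill per worker
        for p in procs:
            p.start()
        tasks.join()
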
— | — | @@ -84,6 +71,24 @@ |
85 | 72 | self.editor = editor |
86 | 73 | |
87 | 74 | |
| 75 | +def reconstruct_observations(var): |
| 76 | + ''' |
| 77 | + When the task queue is empty, the Variable instance is returned; however,
| 78 | + the observations in var.obs are still pickled. This function does two things:
| 79 | + a) it replaces the shared dictionary with an ordinary dictionary, and
| 80 | + b) it deserializes the pickled observations back into Observation instances.
| 81 | + ''' |
| 82 | + if not isinstance(var, dataset.Variable): |
 | 83 | + raise TypeError('var should be an instance of Variable.')
| 84 | + |
| 85 | + keys = var.obs.keys() |
| 86 | + d = {} |
| 87 | + for key in keys: |
| 88 | + d[key] = cPickle.loads(var.obs[key]) |
| 89 | + var.obs = d |
| 90 | + return var |
| 91 | + |
| 92 | + |
88 | 93 | def retrieve_plugin(func): |
89 | 94 | functions = inventory.available_analyses() |
90 | 95 | try: |
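
reconstruct_observations() is needed because this patch stores each observation
in the shared dictionary as a pickled string; once the workers have finished,
the payloads have to be unpickled again. The round trip it performs, in an
illustrative standalone form:

    import cPickle

    shared = {'obs-1': cPickle.dumps({'count': 3})}   # stands in for var.obs
    plain = {}
    for key in shared.keys():
        plain[key] = cPickle.loads(shared[key])
    assert plain['obs-1']['count'] == 3
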
— | — | @@ -118,11 +123,14 @@ |
119 | 124 | plugin = retrieve_plugin(func) |
120 | 125 | feedback(plugin, rts) |
121 | 126 | |
122 | | - |
| 127 | + obs = dict() |
123 | 128 | tasks = JoinableQueue() |
124 | 129 | result = JoinableQueue() |
| 130 | + |
125 | 131 | mgr = Manager() |
126 | 132 | lock = mgr.RLock() |
| 133 | + obs_proxy = mgr.dict(obs) |
| 134 | + |
127 | 135 | editors = db.retrieve_distinct_keys(rts.dbname, rts.editors_dataset, 'editor') |
128 | 136 | min_year, max_year = determine_project_year_range(rts.dbname, |
129 | 137 | rts.editors_dataset, |
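
The Manager-based setup above replaces the plain dictionary that Variable used
to own: mgr.dict() returns a proxy that can be handed to child processes, and
mgr.RLock() serializes access to it. A sketch of the same wiring, independent
of this codebase:

    from multiprocessing import Manager, Process

    def bump(shared, lock, key):
        lock.acquire()
        try:
            shared[key] = shared.get(key, 0) + 1
        finally:
            lock.release()

    if __name__ == '__main__':
        mgr = Manager()
        shared = mgr.dict()
        lock = mgr.RLock()
        procs = [Process(target=bump, args=(shared, lock, 'edits'))
                 for x in xrange(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print shared['edits']              # prints 4
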
— | — | @@ -133,7 +141,7 @@ |
134 | 142 | kwargs['max_year'] = max_year |
135 | 143 | |
136 | 144 | pbar = progressbar.ProgressBar(maxval=len(editors)).start() |
137 | | - var = dataset.Variable('count', time_unit, lock, **kwargs) |
| 145 | + var = dataset.Variable('count', time_unit, lock, obs_proxy, **kwargs) |
138 | 146 | |
139 | 147 | for editor in editors: |
140 | 148 | tasks.put(Task(plugin, editor)) |
— | — | @@ -141,13 +149,16 @@ |
142 | 150 | consumers = [Analyzer(rts, tasks, result, var) for |
143 | 151 | x in xrange(rts.number_of_processes)] |
144 | 152 | |
| 153 | + |
145 | 154 | for x in xrange(rts.number_of_processes): |
146 | 155 | tasks.put(None) |
147 | 156 | |
148 | 157 | for w in consumers: |
149 | 158 | w.start() |
150 | 159 | |
| 160 | + |
151 | 161 | ppills = rts.number_of_processes |
| 162 | + vars = [] |
152 | 163 | while True: |
153 | 164 | while ppills > 0: |
154 | 165 | try: |
— | — | @@ -157,21 +168,23 @@ |
158 | 169 | else: |
159 | 170 | ppills -= 1 |
160 | 171 | var = res |
| 172 | + #if res.number_of_obs() > var.number_of_obs(): |
| 173 | + #vars.append(res) |
161 | 174 | except Empty: |
162 | 175 | pass |
163 | 176 | break |
164 | 177 | |
| 178 | + tasks.join() |
165 | 179 | |
166 | | - tasks.join() |
| 180 | + reconstruct_observations(var) |
167 | 181 | ds = dataset.Dataset(plugin.func_name, rts, format=fmt) |
168 | | - #var = consumers[0].var |
169 | 182 | ds.add_variable(var) |
170 | 183 | |
171 | 184 | stopwatch.elapsed() |
172 | | - write_output(ds, rts, stopwatch) |
| 185 | + #write_output(ds, rts, stopwatch) |
173 | 186 | |
174 | 187 | ds.summary() |
175 | | - return True |
| 188 | + #return True |
176 | 189 | |
177 | 190 | |
178 | 191 | def determine_project_year_range(dbname, collection, var): |
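
On the parent side, the collection loop counts down one poison pill per worker:
every result taken off the queue decrements ppills, and Empty is swallowed
while workers are still running. Since every Analyzer was handed the same
observation proxy, whichever Variable arrives carries the complete data, so
keeping only the last res is safe. A simplified sketch of just the countdown
(illustrative names; details of the full loop are omitted):

    from Queue import Empty

    def collect(result, number_of_processes):
        ppills = number_of_processes
        var = None
        while ppills > 0:
            try:
                res = result.get(block=False)
                ppills -= 1
                var = res                  # last Variable received wins
            except Empty:
                pass
        return var
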
— | — | @@ -204,8 +217,8 @@ |
205 | 218 | # generate_chart_data(rts, 'total_number_of_new_wikipedians', time_unit='year') |
206 | 219 | # generate_chart_data(rts, 'total_number_of_articles', time_unit='year') |
207 | 220 | # generate_chart_data(rts, 'total_cumulative_edits', time_unit='year') |
208 | | -# generate_chart_data(rts, 'cohort_dataset_forward_histogram', time_unit='month', cutoff=5, cum_cutoff=0) |
209 | | -# generate_chart_data(rts, 'cohort_dataset_backward_bar', time_unit='year', cutoff=10, cum_cutoff=0, format='wide') |
| 221 | +# generate_chart_data(rts, 'cohort_dataset_forward_histogram', time_unit='month', cutoff=1, cum_cutoff=10) |
| 222 | + generate_chart_data(rts, 'cohort_dataset_backward_bar', time_unit='year', cutoff=1, cum_cutoff=10, format='wide') |
210 | 223 | # generate_chart_data(rts, 'cohort_dataset_forward_bar', time_unit='year', cutoff=5, cum_cutoff=0, format='wide') |
211 | 224 | # generate_chart_data(rts, 'histogram_edits', time_unit='year', cutoff=0) |
212 | 225 | # generate_chart_data(rts, 'time_to_new_wikipedian', time_unit='year', cutoff=0) |
Index: trunk/tools/editor_trends/classes/dataset.py |
— | — | @@ -23,9 +23,10 @@ |
24 | 24 | import math |
25 | 25 | import operator |
26 | 26 | import sys |
| 27 | +import cPickle |
27 | 28 | import hashlib |
28 | 29 | from pymongo.son_manipulator import SONManipulator |
29 | | -from multiprocessing import RLock, Array, Value |
| 30 | +from multiprocessing import Manager |
30 | 31 | from texttable import Texttable |
31 | 32 | from datetime import timedelta |
32 | 33 | |
— | — | @@ -161,10 +162,8 @@ |
162 | 163 | def __init__(self, date, time_unit, id, meta): |
163 | 164 | assert isinstance(date, datetime.datetime), '''Date variable should be |
164 | 165 | a datetime.datetime instance.''' |
165 | | - #self.lock = lock #Lock() |
166 | 166 | self.date = date |
167 | 167 | self.data = 0 |
168 | | - #self.data = Value('i', 0) |
169 | 168 | self.time_unit = time_unit |
170 | 169 | self.t1, self.t0 = self.set_date_range(date) |
171 | 170 | self.id = id |
— | — | @@ -194,24 +193,36 @@ |
195 | 194 | def __getitem__(self, key): |
196 | 195 | return getattr(self, key, []) |
197 | 196 | |
| 197 | + def serialize(self): |
| 198 | + return cPickle.dumps(self) |
| 199 | + |
 | 200 | + def deserialize(self, data):
 | 201 | + return cPickle.loads(data)
| 202 | + |
198 | 203 | def add(self, value): |
199 | 204 | ''' |
200 | 205 | ''' |
201 | | - #self.lock.acquire() |
202 | | - #try: |
203 | 206 | if isinstance(value, list): |
204 | 207 | if self.count == 0: |
205 | 208 | self.data = [] |
206 | | - #self.data = Array('i', 0) |
207 | 209 | self.data.append(value) |
208 | 210 | else: |
209 | 211 | self.data += value |
210 | | - #self.data.value += value |
211 | | - #finally: |
212 | 212 | self.count += 1 |
213 | | - #self.lock.release() |
| 213 | +# self.lock.acquire() |
| 214 | +# try: |
| 215 | +# if isinstance(value, list): |
| 216 | +# if self.count == 0: |
| 217 | +# self.data = [] |
| 218 | +# self.data.append(value) |
| 219 | +# else: |
| 220 | +# self.data += value |
| 221 | +# finally: |
| 222 | +# self.count += 1 |
| 223 | +# self.lock.release() |
214 | 224 | |
215 | 225 | |
| 226 | + |
216 | 227 | def get_date_range(self): |
217 | 228 | return '%s-%s-%s:%s-%s-%s' % (self.t0.month, self.t0.day, self.t0.year, \ |
218 | 229 | self.t1.month, self.t1.day, self.t1.year) |
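
Observation.add() accumulates scalar values and collects list values; the first
list switches self.data from the integer 0 to a list. Hypothetical usage,
assuming Observation is importable from classes.dataset:

    import datetime
    from classes.dataset import Observation

    obs = Observation(datetime.datetime(2010, 1, 1), 'year', 1, {})
    obs.add(2)
    obs.add(3)                   # scalars accumulate: obs.data == 5
    blob = obs.serialize()       # pickled payload for the shared dictionary
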
— | — | @@ -220,10 +231,10 @@ |
221 | 232 | ''' |
222 | 233 | This class constructs a time-based variable. |
223 | 234 | ''' |
224 | | - def __init__(self, name, time_unit, lock, **kwargs): |
| 235 | + def __init__(self, name, time_unit, lock, obs, **kwargs): |
225 | 236 | self.name = name |
226 | 237 | self.lock = lock |
227 | | - self.obs = {} |
| 238 | + self.obs = obs |
228 | 239 | self.time_unit = time_unit |
229 | 240 | self.groupbys = [] |
230 | 241 | self._type = 'variable' |
— | — | @@ -265,14 +276,7 @@ |
266 | 277 | return [o for o in self.itervalues()] |
267 | 278 | |
268 | 279 | def get_observation(self, id, date, meta): |
269 | | - self.lock.acquire() |
270 | | - try: |
271 | | - obs = self.obs.get(id, Observation(date, self.time_unit, id, meta)) |
272 | | - #self.obs[id] = obs |
273 | | - x = len(self.obs) |
274 | | - finally: |
275 | | - self.lock.release() |
276 | | - return obs |
| 280 | + return self.obs.get(id, Observation(date, self.time_unit, id, meta).serialize()) |
277 | 281 | |
278 | 282 | def add(self, date, value, meta={}): |
279 | 283 | ''' |
— | — | @@ -300,10 +304,13 @@ |
301 | 305 | values.insert(0, end) |
302 | 306 | values.insert(0, start) |
303 | 307 | id = self.__hash__(values) |
304 | | - obs = self.get_observation(id, date, meta) |
305 | | - obs.add(value) |
| 308 | + |
| 309 | + self.lock.acquire() |
306 | 310 | try: |
307 | | - self.lock.acquire() |
| 311 | + obs = self.get_observation(id, date, meta) |
| 312 | + obs = cPickle.loads(obs) |
| 313 | + obs.add(value) |
| 314 | + obs = obs.serialize() |
308 | 315 | self.obs[id] = obs |
309 | 316 | finally: |
310 | 317 | self.lock.release() |
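
Moving the lock so that it spans the whole cycle matters: individual proxy-dict
operations are atomic, but the get/loads/add/dumps/store sequence is not, so
without the lock two workers could deserialize the same observation and one
update would be silently lost. The guarded read-modify-write, reduced to an
illustrative helper:

    import cPickle

    def locked_add(shared_obs, lock, id, value):
        lock.acquire()
        try:
            blob = shared_obs.get(id, cPickle.dumps(0))   # read (pickled default)
            data = cPickle.loads(blob)
            data += value                                 # modify
            shared_obs[id] = cPickle.dumps(data)          # write back
        finally:
            lock.release()
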
— | — | @@ -376,7 +383,6 @@ |
377 | 384 | name = kwargs.pop('name') |
378 | 385 | setattr(self, name, Variable(name, **kwargs)) |
379 | 386 | self.variables.append(name) |
380 | | - #self.filename = self.create_filename() |
381 | 387 | |
382 | 388 | def __repr__(self): |
383 | 389 | return 'Dataset contains %s variables' % (len(self.variables)) |
— | — | @@ -426,15 +432,18 @@ |
427 | 433 | self.filename = filename |
428 | 434 | |
429 | 435 | |
430 | | - def add_variable(self, var): |
| 436 | + def add_variable(self, vars): |
431 | 437 | ''' |
432 | 438 | Call this function to add a Variable to a dataset. |
433 | 439 | ''' |
434 | | - if isinstance(var, Variable): |
435 | | - self.variables.append(var.name) |
436 | | - setattr(self, var.name, var) |
437 | | - else: |
438 | | - raise TypeError('You can only add an instance of Variable to a dataset.') |
| 440 | + if not isinstance(vars, list): |
| 441 | + vars = [vars] |
| 442 | + for var in vars: |
| 443 | + if isinstance(var, Variable): |
| 444 | + self.variables.append(var.name) |
| 445 | + setattr(self, var.name, var) |
| 446 | + else: |
| 447 | + raise TypeError('You can only add an instance of Variable to a dataset.') |
439 | 448 | |
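
With the list handling above, Dataset.add_variable() accepts either a single
Variable or a list of them. Illustrative call patterns (ds, var_a and var_b
are assumed to exist):

    ds.add_variable(var_a)               # a single Variable still works
    ds.add_variable([var_a, var_b])      # a list of Variables is now accepted
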
440 | 449 | def write(self, format='csv'): |
441 | 450 | ''' |