Index: trunk/tools/editor_trends/etl/transformer.py |
— | — | @@ -17,10 +17,7 @@ |
18 | 18 | __date__ = '2010-11-02' |
19 | 19 | __version__ = '0.1' |
20 | 20 | |
21 | | -import sys |
22 | | -import datetime |
23 | 21 | import multiprocessing |
24 | | -from Queue import Empty |
25 | 22 | from operator import itemgetter |
26 | 23 | from copy import deepcopy |
27 | 24 | |
— | — | @@ -31,7 +28,14 @@ |
32 | 29 | from utils import data_converter |
33 | 30 | from classes import consumers |
34 | 31 | |
| 32 | + |
35 | 33 | class EditorConsumer(consumers.BaseConsumer): |
| 34 | + ''' |
| 35 | + A simple class takes care of fetching an editor from the queue and start |
| 36 | + processing its edits. |
| 37 | + ''' |
def __init__(self, *args, **kwargs):
    '''
    Initialize the consumer by forwarding every constructor argument to
    consumers.BaseConsumer.

    transform_editors_multi_launcher builds workers as
    EditorConsumer(tasks, None), so this constructor must accept and pass
    on those arguments (presumably the task queue and a result queue, i.e.
    what BaseConsumer received before this class defined its own
    __init__ -- confirm against classes/consumers.py). A bare
    __init__(self) would raise TypeError at that call site.
    '''
    super(EditorConsumer, self).__init__(*args, **kwargs)
36 | 40 | |
37 | 41 | def run(self): |
38 | 42 | while True: |
— | — | @@ -43,9 +47,9 @@ |
44 | 48 | new_editor() |
45 | 49 | |
46 | 50 | |
47 | | -class Editor(object): |
48 | | - def __init__(self, id, db_raw, db_dataset, **kwargs): |
49 | | - self.id = id |
| 51 | +class Editor: |
| 52 | + def __init__(self, editor_id, db_raw, db_dataset, **kwargs): |
| 53 | + self.id = editor_id |
50 | 54 | self.db_raw = db_raw |
51 | 55 | self.db_dataset = db_dataset |
52 | 56 | for kw in kwargs: |
— | — | @@ -121,6 +125,13 @@ |
122 | 126 | |
123 | 127 | |
124 | 128 | def calculate_totals(totals, counts, dc, var): |
| 129 | + ''' |
| 130 | + So far, counting a variable for an editor happens per month per year but |
| 131 | + this makes it cumbersome to determine how many edits an editor has made in |
| 132 | + a single year (you need to iterate over all the months and that can become |
| 133 | + quite expensive when you have 10000s of editors. Hence, this little helper |
| 134 | + function counts the total number of actions on a yearly basis. |
| 135 | + ''' |
125 | 136 | cnts = deepcopy(counts) |
126 | 137 | totals.setdefault(var, {}) |
127 | 138 | for year in dc: |
— | — | @@ -140,6 +151,9 @@ |
141 | 152 | |
142 | 153 | |
143 | 154 | def determine_number_edits(edits, first_year, final_year): |
| 155 | + ''' |
| 156 | + This function counts the number of edits per namespace per month per year. |
| 157 | + ''' |
144 | 158 | dc = data_converter.create_datacontainer(first_year, final_year) |
145 | 159 | dc = data_converter.add_months_to_datacontainer(dc, 'dict') |
146 | 160 | for edit in edits: |
— | — | @@ -152,6 +166,10 @@ |
153 | 167 | |
154 | 168 | |
155 | 169 | def determine_articles_workedon(edits, first_year, final_year): |
| 170 | + ''' |
| 171 | + This function creates a list of article_ids that an editor has worked on in |
| 172 | + a given month/year. |
| 173 | + ''' |
156 | 174 | dc = data_converter.create_datacontainer(first_year, final_year) |
157 | 175 | dc = data_converter.add_months_to_datacontainer(dc, 'dict') |
158 | 176 | for year in edits: |
— | — | @@ -161,6 +179,7 @@ |
162 | 180 | dc[year][month].setdefault(ns, set()) |
163 | 181 | dc[year][month][ns].add(edit['article']) |
164 | 182 | |
| 183 | + #convert the set to a list as mongo cannot store sets. |
165 | 184 | for year in dc: |
166 | 185 | for month in dc[year]: |
167 | 186 | for ns in dc[year][month]: |
— | — | @@ -170,6 +189,10 @@ |
171 | 190 | |
172 | 191 | |
173 | 192 | def determine_namespaces_workedon(edits, first_year, final_year): |
| 193 | + ''' |
| 194 | + This function creates a list of namespaces that an editor has worked on in |
| 195 | + a given month/year. |
| 196 | + ''' |
174 | 197 | dc = data_converter.create_datacontainer(first_year, final_year) |
175 | 198 | dc = data_converter.add_months_to_datacontainer(dc, 'set') |
176 | 199 | for year in edits: |
— | — | @@ -184,6 +207,10 @@ |
185 | 208 | |
186 | 209 | |
187 | 210 | def determine_number_reverts(edits, first_year, final_year): |
| 211 | + ''' |
| 212 | + This function counts the number of times an edit was reverted in a given |
| 213 | + month/year. |
| 214 | + ''' |
188 | 215 | dc = data_converter.create_datacontainer(first_year, final_year) |
189 | 216 | dc = data_converter.add_months_to_datacontainer(dc, 'dict') |
190 | 217 | for year in edits: |
— | — | @@ -220,6 +247,9 @@ |
221 | 248 | |
222 | 249 | |
223 | 250 | def determine_year_range(edits): |
| 251 | + ''' |
| 252 | + This function determines the first and final year that an editor was active. |
| 253 | + ''' |
224 | 254 | years = [year for year in edits if edits[year] != []] |
225 | 255 | first_year = int(min(years)) |
226 | 256 | final_year = int(max(years)) + 1 |
— | — | @@ -227,6 +257,10 @@ |
228 | 258 | |
229 | 259 | |
230 | 260 | def determine_last_edit_by_year(edits, first_year, final_year): |
| 261 | + ''' |
| 262 | + This function determines the date of the last edit in a given year for a |
| 263 | + given editor. |
| 264 | + ''' |
231 | 265 | dc = data_converter.create_datacontainer(first_year, final_year, 0) |
232 | 266 | for year in edits: |
233 | 267 | for edit in edits[year]: |
— | — | @@ -259,23 +293,29 @@ |
260 | 294 | |
def transform_editors_multi_launcher(rts):
    '''
    Transform all editors in parallel.

    Starts rts.number_of_processes EditorConsumer workers that pull Editor
    jobs from a shared JoinableQueue, then blocks until the queue is fully
    processed.

    rts is the runtime-settings object: this function reads
    rts.number_of_processes, and setup_database reads the storage,
    database and collection names from it.
    '''
    tasks = multiprocessing.JoinableQueue()
    input_db, output_db, editors = setup_database(rts)
    transformers = [EditorConsumer(tasks, None)
                    for _ in xrange(rts.number_of_processes)]

    for editor_id in editors:
        # Editor's signature is (editor_id, db_raw, db_dataset); use the same
        # arguments as transform_editors_single_launcher.
        # NOTE(review): objects put on a multiprocessing queue are pickled,
        # so the database handles must be picklable -- confirm with storage.
        tasks.put(Editor(editor_id, input_db, output_db))

    # One None per worker; presumably each consumer stops when it reads one.
    for _ in xrange(rts.number_of_processes):
        tasks.put(None)

    # messages.show is handed the bound qsize method, not its result.
    print(messages.show(tasks.qsize))
    for transformer in transformers:
        transformer.start()

    tasks.join()
275 | 311 | |
276 | 312 | |
277 | 313 | def setup_database(rts): |
278 | | - db_raw = storage.Database(rts.storage, rts.dbname, rts.editors_raw) |
279 | | - db_dataset = storage.Database(rts.storage, rts.dbname, rts.editors_dataset) |
| 314 | + ''' |
| 315 | + Initialize the database, including setting indexes and dropping the older |
| 316 | + version of the collection. |
| 317 | + ''' |
| 318 | + db_raw = storage.init_database(rts.storage, rts.dbname, rts.editors_raw) |
| 319 | + db_dataset = storage.init_database(rts.storage, rts.dbname, rts.editors_dataset) |
280 | 320 | db_dataset.drop_collection() |
281 | 321 | ids = db_dataset.retrieve_distinct_keys('editor') |
282 | 322 | db_dataset.add_index('editor') |
— | — | @@ -286,14 +326,15 @@ |
287 | 327 | |
def transform_editors_single_launcher(rts):
    '''
    Transform all editors sequentially in the current process, updating a
    progress bar after each editor.

    rts is the runtime-settings object: rts.dbname and rts.editors_raw are
    echoed first, and setup_database reads the rest of the configuration.
    '''
    print('%s %s' % (rts.dbname, rts.editors_raw))
    input_db, output_db, editors = setup_database(rts)
    pbar = progressbar.ProgressBar(maxval=len(editors)).start()
    for editor_id in editors:
        # Pass the id yielded by setup_database. Using ``id`` here would
        # resolve to the builtin function, not an editor id.
        editor = Editor(editor_id, input_db, output_db)
        editor()
        pbar.update(pbar.currval + 1)
296 | 336 | |
297 | 337 | |
if __name__ == '__main__':
    # NOTE(review): ``settings`` is only a placeholder. The launchers read
    # dbname, editors_raw, editors_dataset, storage and number_of_processes
    # from it, so running this module directly with None fails. Supply a
    # real runtime-settings object before use.
    settings = None
    transform_editors_single_launcher(settings)
    # Parallel alternative:
    #transform_editors_multi_launcher(settings)