Index: trunk/tools/editor_trends/manage.py |
— | — | @@ -40,8 +40,9 @@ |
41 | 41 | from utils import exceptions |
42 | 42 | from database import db |
43 | 43 | from etl import chunker |
44 | | -from etl import extract |
45 | | -from etl import loader |
| 44 | +from etl import extracter |
| 45 | +from etl import store |
| 46 | +from etl import sort |
46 | 47 | from etl import transformer |
47 | 48 | from etl import exporter |
48 | 49 | |
— | — | @@ -233,7 +234,7 @@ |
234 | 235 | language_code = kwargs.pop('language_code') |
235 | 236 | project = kwargs.pop('project') |
236 | 237 | write_message_to_log(logger, args, location=location, language_code=language_code, project=project) |
237 | | - extract.run_parse_editors(location, **kwargs) |
| 238 | + extracter.parse_dumpfile(project, language_code, namespaces=['0']) |
238 | 239 | timer.elapsed() |
239 | 240 | |
240 | 241 | |
— | — | @@ -245,7 +246,7 @@ |
246 | 247 | output = os.path.join(location, 'sorted') |
247 | 248 | final_output = os.path.join(location, 'dbready') |
248 | 249 | write_message_to_log(logger, args, location=location, input=input, output=output, final_output=final_output) |
249 | | - loader.mergesort_launcher(input, output) |
| 250 | + sort.mergesort_launcher(input, output) |
250 | 251 | #loader.mergesort_external_launcher(output, final_output) |
251 | 252 | timer.elapsed() |
252 | 253 | |
— | — | @@ -278,6 +279,9 @@ |
279 | 280 | timer.elapsed() |
280 | 281 | |
281 | 282 | |
| 283 | +def debug_launcher(args, logger, **kwargs): |
| 284 | + pass |
| 285 | + |
282 | 286 | def exporter_launcher(args, logger, **kwargs): |
283 | 287 | print 'Start exporting dataset' |
284 | 288 | timer = Timer() |
— | — | @@ -319,8 +323,8 @@ |
320 | 324 | if clean: |
321 | 325 | cleanup(logger, args, **kwargs) |
322 | 326 | |
323 | | - if format != 'xml': |
324 | | - ignore = ignore + ',extract' |
| 327 | + #if format != 'xml': |
| 328 | + # ignore = ignore + ',extract' |
325 | 329 | |
326 | 330 | functions = ordered_dict.OrderedDict(((dump_downloader_launcher, 'download'), |
327 | 331 | #(chunker_launcher, 'split'), |
— | — | @@ -380,8 +384,8 @@ |
381 | 385 | def main(): |
382 | 386 | default_language = determine_default_language() |
383 | 387 | |
384 | | - datasets = {'cohort': 'generate_cohort_dataset', |
385 | | - 'long': 'generate_long_editor_dataset', |
| 388 | + datasets = {'forward': 'generate_cohort_dataset_forward', |
| 389 | + 'backward': 'generate_cohort_dataset_backward', |
386 | 390 | 'wide': 'generate_wide_editor_dataset', |
387 | 391 | } |
388 | 392 | |
— | — | @@ -426,6 +430,9 @@ |
427 | 431 | parser_dataset = subparsers.add_parser('export', help='Create a dataset from the MongoDB and write it to a csv file.') |
428 | 432 | parser_dataset.set_defaults(func=exporter_launcher) |
429 | 433 | |
| 434 | + parser_debug = subparsers.add_parser('debug', help='Input custom dump files for debugging purposes') |
| 435 | + parser_debug.set_defaults(func=debug_launcher) |
| 436 | + |
430 | 437 | parser_all = subparsers.add_parser('all', help='The all sub command runs the download, split, store and dataset commands.\n\nWARNING: THIS COULD TAKE DAYS DEPENDING ON THE CONFIGURATION OF YOUR MACHINE AND THE SIZE OF THE WIKIMEDIA DUMP FILE.') |
431 | 438 | parser_all.set_defaults(func=all_launcher) |
432 | 439 | parser_all.add_argument('-e', '--except', action='store', |
— | — | @@ -477,7 +484,7 @@ |
478 | 485 | parser.add_argument('-d', '--datasets', action='store', |
479 | 486 | choices=datasets.keys(), |
480 | 487 | help='Indicate what type of data should be exported.', |
481 | | - default=datasets['cohort']) |
| 488 | + default=datasets['backward']) |
482 | 489 | |
483 | 490 | parser.add_argument('-prog', '--progress', action='store_true', default=True, |
484 | 491 | help='Indicate whether you want to have a progressbar.') |
Index: trunk/tools/editor_trends/analyses/aggregates.py |
— | — | @@ -1,3 +1,17 @@ |
| 2 | +#!/usr/bin/python |
| 3 | +# -*- coding: utf-8 -*- |
| 4 | +''' |
| 5 | +Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com) |
| 6 | +This program is free software; you can redistribute it and/or |
| 7 | +modify it under the terms of the GNU General Public License version 2 |
| 8 | +as published by the Free Software Foundation. |
| 9 | +This program is distributed in the hope that it will be useful, |
| 10 | +but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
| 12 | +See the GNU General Public License for more details, at |
| 13 | +http://www.fsf.org/licenses/gpl.html |
| 14 | +''' |
| 15 | + |
2 | 16 | __author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ]) |
3 | 17 | __author__email = 'dvanliere at gmail dot com' |
4 | 18 | __date__ = '2010-12-10' |
— | — | @@ -14,6 +28,7 @@ |
15 | 29 | from database import db |
16 | 30 | from etl import shaper |
17 | 31 | from utils import utils |
| 32 | +from utils import messages |
18 | 33 | |
19 | 34 | |
20 | 35 | class Dataset: |
— | — | @@ -79,7 +94,7 @@ |
80 | 95 | tasks = multiprocessing.JoinableQueue() |
81 | 96 | for editor in editors: |
82 | 97 | tasks.put(editor) |
83 | | - print 'The queue contains %s editors.' % tasks.qsize() |
| 98 | + print 'The queue contains %s editors.' % messages.show(tasks.qsize) |
84 | 99 | tasks.put(None) |
85 | 100 | data = new_editor_count(tasks, dbname, collection, month=7) |
86 | 101 | keys = data.keys() |
— | — | @@ -95,7 +110,7 @@ |
96 | 111 | tasks = multiprocessing.JoinableQueue() |
97 | 112 | for editor in editors: |
98 | 113 | tasks.put(editor) |
99 | | - print 'The queue contains %s editors.' % tasks.qsize() |
| 114 | + print 'The queue contains %s editors.' % messages.show(tasks.qsize) |
100 | 115 | tasks.put(None) |
101 | 116 | data = active_editor_count(tasks, dbname, collection, month=7) |
102 | 117 | keys = data.keys() |
Index: trunk/tools/editor_trends/analyses/cohort_confidence_intervals.py |
— | — | @@ -7,8 +7,9 @@ |
8 | 8 | sys.path.append('..') |
9 | 9 | |
10 | 10 | import configuration |
11 | | -settings = configuration.Settings() |
| 11 | +settings = configuration.Settings() |
12 | 12 | from utils import utils |
| 13 | +from utils import messages |
13 | 14 | from database import db |
14 | 15 | |
15 | 16 | |
— | — | @@ -23,7 +24,7 @@ |
24 | 25 | # while True: |
25 | 26 | # try: |
26 | 27 | # id = input_queue.get(block=False) |
27 | | -# print input_queue.qsize() |
| 28 | +# print messages.show(input_queue.qsize) |
28 | 29 | # obs = editors.find_one({'editor': id}) |
29 | 30 | # obs = expand_observations(obs, vars_to_expand) |
30 | 31 | # if x == 0: |
— | — | @@ -45,5 +46,4 @@ |
46 | 47 | |
47 | 48 | |
48 | 49 | if __name__ == '__main__': |
49 | | - |
50 | | - |
\ No newline at end of file |
| 50 | + |
Index: trunk/tools/editor_trends/etl/exporter.py |
— | — | @@ -30,7 +30,9 @@ |
31 | 31 | sys.path.append('..') |
32 | 32 | import configuration |
33 | 33 | settings = configuration.Settings() |
34 | | -from utils import models, utils |
| 34 | +from utils import utils |
| 35 | +from utils import messages |
| 36 | +from utils import models |
35 | 37 | from database import db |
36 | 38 | from etl import shaper |
37 | 39 | from analyses import cohort_charts |
— | — | @@ -199,54 +201,54 @@ |
200 | 202 | return windows |
201 | 203 | |
202 | 204 | |
203 | | -def generate_cohort_dataset_old(tasks, dbname, collection, **kwargs): |
204 | | - mongo = db.init_mongo_db(dbname) |
205 | | - editors = mongo[collection + '_dataset'] |
206 | | - windows = create_windows() |
207 | | - data = shaper.create_datacontainer('dict') |
208 | | - data = shaper.add_windows_to_datacontainer(data, windows) |
| 205 | +#def generate_cohort_dataset_old(tasks, dbname, collection, **kwargs): |
| 206 | +# mongo = db.init_mongo_db(dbname) |
| 207 | +# editors = mongo[collection + '_dataset'] |
| 208 | +# windows = create_windows() |
| 209 | +# data = shaper.create_datacontainer('dict') |
| 210 | +# data = shaper.add_windows_to_datacontainer(data, windows) |
| 211 | +# |
| 212 | +# while True: |
| 213 | +# id = tasks.get(block=False) |
| 214 | +# tasks.task_done() |
| 215 | +# if id == None: |
| 216 | +# break |
| 217 | +# obs = editors.find_one({'editor': id}, {'first_edit': 1, 'final_edit': 1}) |
| 218 | +# |
| 219 | +# first_edit = obs['first_edit'] |
| 220 | +# last_edit = obs['final_edit'] |
| 221 | +# editor_dt = relativedelta(last_edit, first_edit) |
| 222 | +# editor_dt = (editor_dt.years * 12) + editor_dt.months |
| 223 | +# edits = [] |
| 224 | +# for year in xrange(2001, datetime.datetime.now().year + 1): |
| 225 | +# if first_edit.year > year or last_edit.year < year: |
| 226 | +# continue |
| 227 | +# window_end = datetime.datetime(year, 12, 31) |
| 228 | +# for window in windows: |
| 229 | +# window_start = window_end - relativedelta(months=window) |
| 230 | +# if window_start < datetime.datetime(2001, 1, 1): |
| 231 | +# window_start = datetime.datetime(2001, 1, 1) |
| 232 | +# |
| 233 | +# if editor_dt > 11: |
| 234 | +# if date_falls_in_window(window_start, window_end, first_edit): |
| 235 | +# edits.append(window) |
| 236 | +# elif window > editor_dt: |
| 237 | +# data[year][window] += 1 |
| 238 | +# break |
| 239 | +# |
| 240 | +# if edits != []: |
| 241 | +# w = min(edits) |
| 242 | +# data[year][w] += 1 |
| 243 | +# edits = [] |
| 244 | +# |
| 245 | +# |
| 246 | +# print 'Storing data as %s' % os.path.join(settings.binary_location, dbname + '_cohort_data.bin') |
| 247 | +# utils.store_object(data, settings.binary_location, dbname + '_cohort_data.bin') |
| 248 | +# cohort_charts.prepare_cohort_dataset(dbname) |
209 | 249 | |
210 | | - while True: |
211 | | - id = tasks.get(block=False) |
212 | | - tasks.task_done() |
213 | | - if id == None: |
214 | | - break |
215 | | - obs = editors.find_one({'editor': id}, {'first_edit': 1, 'final_edit': 1}) |
216 | 250 | |
217 | | - first_edit = obs['first_edit'] |
218 | | - last_edit = obs['final_edit'] |
219 | | - editor_dt = relativedelta(last_edit, first_edit) |
220 | | - editor_dt = (editor_dt.years * 12) + editor_dt.months |
221 | | - edits = [] |
222 | | - for year in xrange(2001, datetime.datetime.now().year + 1): |
223 | | - if first_edit.year > year or last_edit.year < year: |
224 | | - continue |
225 | | - window_end = datetime.datetime(year, 12, 31) |
226 | | - for window in windows: |
227 | | - window_start = window_end - relativedelta(months=window) |
228 | | - if window_start < datetime.datetime(2001, 1, 1): |
229 | | - window_start = datetime.datetime(2001, 1, 1) |
230 | 251 | |
231 | | - if editor_dt > 11: |
232 | | - if date_falls_in_window(window_start, window_end, first_edit): |
233 | | - edits.append(window) |
234 | | - elif window > editor_dt: |
235 | | - data[year][window] += 1 |
236 | | - break |
237 | 252 | |
238 | | - if edits != []: |
239 | | - w = min(edits) |
240 | | - data[year][w] += 1 |
241 | | - edits = [] |
242 | | - |
243 | | - |
244 | | - print 'Storing data as %s' % os.path.join(settings.binary_location, dbname + '_cohort_data.bin') |
245 | | - utils.store_object(data, settings.binary_location, dbname + '_cohort_data.bin') |
246 | | - cohort_charts.prepare_cohort_dataset(dbname) |
247 | | - |
248 | | - |
249 | | - |
250 | | - |
251 | 253 | def generate_cohort_dataset_forward(tasks, dbname, collection, **kwargs): |
252 | 254 | mongo = db.init_mongo_db(dbname) |
253 | 255 | editors = mongo[collection + '_dataset'] |
— | — | @@ -353,7 +355,7 @@ |
354 | 356 | id = input_queue.get(block=False) |
355 | 357 | if id == None: |
356 | 358 | break |
357 | | - print input_queue.qsize() |
| 359 | + print messages.show(input_queue.qsize) |
358 | 360 | obs = editors.find_one({'editor': id}) |
359 | 361 | obs = expand_observations(obs, vars_to_expand) |
360 | 362 | if x == 0: |
— | — | @@ -382,7 +384,7 @@ |
383 | 385 | #consumers = [multiprocessing.Process(target=target, args=(tasks, dbname, collection)) for i in xrange(settings.number_of_processes)] |
384 | 386 | for editor in editors: |
385 | 387 | tasks.put(editor) |
386 | | - print 'The queue contains %s editors.' % tasks.qsize() |
| 388 | + print 'The queue contains %s editors.' % messages.show(tasks.qsize) |
387 | 389 | tasks.put(None) |
388 | 390 | target(tasks, dbname, collection) |
389 | 391 | |
Index: trunk/tools/editor_trends/etl/store.py |
— | — | @@ -25,6 +25,7 @@ |
26 | 26 | import configuration |
27 | 27 | settings = configuration.Settings() |
28 | 28 | from utils import utils |
| 29 | +from utils import messages |
29 | 30 | from database import cache |
30 | 31 | from database import db |
31 | 32 | |
— | — | @@ -35,8 +36,12 @@ |
36 | 37 | edits = 0 |
37 | 38 | while True: |
38 | 39 | file = tasks.get(block=False) |
| 40 | + tasks.task_done() |
39 | 41 | if file == None: |
| 42 | + print 'Swallowing a poison pill.' |
40 | 43 | break |
| 44 | + print '%s files left in the queue.' % messages.show(tasks.qsize) |
| 45 | + |
41 | 46 | fh = utils.create_txt_filehandle(input, file, 'r', settings.encoding) |
42 | 47 | for line in utils.readline(fh): |
43 | 48 | if len(line) == 0: |
— | — | @@ -46,7 +51,7 @@ |
47 | 52 | if prev_contributor != contributor: |
48 | 53 | if edits > 9: |
49 | 54 | editor_cache.add(prev_contributor, 'NEXT') |
50 | | - print 'Stored %s' % prev_contributor |
| 55 | + #print 'Stored %s' % prev_contributor |
51 | 56 | else: |
52 | 57 | editor_cache.clear(prev_contributor) |
53 | 58 | edits = 0 |
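
Note on the change above: the store worker now acknowledges every queue item with task_done() before checking for the None sentinel (the "poison pill" that ends the loop), and reports how many files remain. A minimal, self-contained sketch of that consumer pattern, under the assumption of a plain JoinableQueue; the names consume and tasks are illustrative, not part of the module:

from multiprocessing import JoinableQueue

def consume(tasks):
    while True:
        task = tasks.get()
        tasks.task_done()              # acknowledge before testing for the sentinel
        if task is None:
            print 'Swallowing a poison pill.'
            break
        print 'Processing %s' % task

tasks = JoinableQueue()
for filename in ['0.csv', '1.csv']:
    tasks.put(filename)
tasks.put(None)                        # poison pill terminates the worker
consume(tasks)
tasks.join()                           # returns because every item was acknowledged
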
Index: trunk/tools/editor_trends/etl/chunker.py |
— | — | @@ -259,6 +259,6 @@ |
260 | 260 | 'file': settings.input_filename, |
261 | 261 | 'project':'wiki', |
262 | 262 | 'language_code':'en', |
263 | | - 'format': 'tsv' |
| 263 | + 'format': 'csv' |
264 | 264 | } |
265 | 265 | split_file(**kwargs) |
Index: trunk/tools/editor_trends/etl/extracter.py |
— | — | @@ -44,7 +44,7 @@ |
45 | 45 | |
46 | 46 | |
47 | 47 | def remove_numeric_character_references(text): |
48 | | - return re.sub(RE_NUMERIC_CHARACTER, lenient_deccharref, text).encode('utf-8') |
| 48 | + return re.sub(RE_NUMERIC_CHARACTER, lenient_deccharref, text).encode(settings.encoding) |
49 | 49 | |
50 | 50 | |
51 | 51 | def lenient_deccharref(m): |
— | — | @@ -242,9 +242,9 @@ |
243 | 243 | |
244 | 244 | location = os.path.join(settings.input_location, language_code, project) |
245 | 245 | output = os.path.join(settings.input_location, language_code, project, 'txt') |
246 | | - filehandles = [utils.create_txt_filehandle(output, '%s.csv' % file, 'a', settings.encoding) for file in xrange(500)] |
| 246 | + filehandles = [utils.create_txt_filehandle(output, '%s.csv' % file, 'a', settings.encoding) for file in xrange(settings.max_filehandles)] |
247 | 247 | |
248 | | - fh = utils.create_txt_filehandle(location, 'enwiki-latest-stub-meta-history.xml', 'r', settings.encoding) |
| 248 | + fh = utils.create_txt_filehandle(location, '%s%s-latest-stub-meta-history.xml' % (language_code, project), 'r', settings.encoding) |
249 | 249 | total_pages, processed_pages = 0.0, 0.0 |
250 | 250 | for page in wikitree.parser.read_input(fh): |
251 | 251 | title = page.find('title') |
— | — | @@ -290,13 +290,13 @@ |
291 | 291 | def hash(id): |
292 | 292 | ''' |
293 | 293 | A very simple hash function based on modulo. The except clause has been |
294 | | - addde because there are instances where the username is stored in userid |
| 294 | + added because there are instances where the username is stored in userid |
295 | 295 | tag and hence that's a string and not an integer. |
296 | 296 | ''' |
297 | 297 | try: |
298 | | - return int(id) % 500 |
| 298 | + return int(id) % settings.max_filehandles |
299 | 299 | except: |
300 | | - return sum([ord(i) for i in id]) % 500 |
| 300 | + return sum([ord(i) for i in id]) % settings.max_filehandles |
301 | 301 | |
302 | 302 | if __name__ == '__main__': |
303 | 303 | project = 'wiki' |
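
Note on the hash() change above: the hardcoded bucket count of 500 is replaced by settings.max_filehandles, which determines which per-bucket csv file an editor id is written to. A standalone sketch of the same modulo sharding, assuming a bucket count of 500 (the value that was previously hardcoded); shard is an illustrative name:

def shard(id, max_filehandles=500):
    # Numeric user ids hash by simple modulo; usernames that ended up in the
    # userid tag are strings, so fall back to summing their character codes.
    try:
        return int(id) % max_filehandles
    except ValueError:
        return sum(ord(char) for char in id) % max_filehandles

print shard('1234567')    # 67 -> appended to 67.csv
print shard('SomeUser')   # deterministic bucket for a string id
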
Index: trunk/tools/editor_trends/etl/transformer.py |
— | — | @@ -27,9 +27,10 @@ |
28 | 28 | import configuration |
29 | 29 | settings = configuration.Settings() |
30 | 30 | from database import db |
31 | | -from utils import process_constructor as pc |
| 31 | +#from utils import process_constructor as pc |
32 | 32 | from utils import utils |
33 | 33 | from utils import models |
| 34 | +from utils import messages |
34 | 35 | import exporter |
35 | 36 | import shaper |
36 | 37 | |
— | — | @@ -46,7 +47,7 @@ |
47 | 48 | while True: |
48 | 49 | new_editor = self.task_queue.get() |
49 | 50 | self.task_queue.task_done() |
50 | | - print '%s editors to go...' % self.task_queue.qsize() |
| 51 | + print '%s editors to go...' % messages.show(self.task_queue.qsize) |
51 | 52 | if new_editor == None: |
52 | 53 | break |
53 | 54 | new_editor() |
— | — | @@ -96,7 +97,7 @@ |
97 | 98 | 'monthly_edits': monthly_edits, |
98 | 99 | 'last_edit_by_year': last_edit_by_year, |
99 | 100 | 'username': username |
100 | | - }) |
| 101 | + }, safe=True) |
101 | 102 | |
102 | 103 | |
103 | 104 | def determine_last_edit_by_year(edits): |
— | — | @@ -162,7 +163,7 @@ |
163 | 164 | for x in xrange(settings.number_of_processes): |
164 | 165 | tasks.put(None) |
165 | 166 | |
166 | | - print tasks.qsize() |
| 167 | + print messages.show(tasks.qsize) |
167 | 168 | for w in consumers: |
168 | 169 | w.start() |
169 | 170 | |
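
Note on the safe=True change above: with the pymongo releases of this era (pre-3.0), passing safe=True to a write makes the driver wait for the server's acknowledgement and raise OperationFailure on error instead of failing silently. A hedged sketch of that pattern; the connection details, collection name, and use of insert() below are illustrative, not taken from the module:

import pymongo

connection = pymongo.Connection('localhost', 27017)   # old-style pymongo client
collection = connection['enwiki']['editors_dataset']
# safe=True: block until the server confirms the write, raising on failure.
collection.insert({'editor': '12345', 'edit_count': 42}, safe=True)
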
Index: trunk/tools/editor_trends/etl/models.py |
— | — | @@ -27,6 +27,7 @@ |
28 | 28 | |
29 | 29 | from utils import models |
30 | 30 | from utils import utils |
| 31 | +from utils import messages |
31 | 32 | import wikitree |
32 | 33 | |
33 | 34 | class TXTFile(object): |
— | — | @@ -68,7 +69,7 @@ |
69 | 70 | if new_xmlfile == None: |
70 | 71 | print 'Swallowed a poison pill' |
71 | 72 | break |
72 | | - print 'Queue is %s files long...' % (self.task_queue.qsize() - settings.number_of_processes) |
| 73 | + print 'Queue is %s files long...' % (messages.show(self.task_queue.qsize) - settings.number_of_processes) |
73 | 74 | new_xmlfile() |
74 | 75 | |
75 | 76 | |
Index: trunk/tools/editor_trends/etl/sort.py |
— | — | @@ -31,9 +31,10 @@ |
32 | 32 | from database import db |
33 | 33 | from database import cache |
34 | 34 | from utils import utils |
35 | | -from utils import sort |
| 35 | +from utils import messages |
36 | 36 | |
37 | 37 | |
| 38 | + |
38 | 39 | #def mergesort_external_launcher(input, output): |
39 | 40 | # files = utils.retrieve_file_list(input, 'txt', mask='') |
40 | 41 | # x = 0 |
— | — | @@ -112,9 +113,9 @@ |
113 | 114 | |
114 | 115 | |
115 | 116 | def write_sorted_file(sorted_data, file, output): |
116 | | - file = file.split('.') |
117 | | - file[0] = file[0] + '_sorted' |
118 | | - file = '.'.join(file) |
| 117 | + #file = file.split('.') |
| 118 | + #file[0] = file[0] + '_sorted' |
| 119 | + #file = '.'.join(file) |
119 | 120 | fh = utils.create_txt_filehandle(output, file, 'w', settings.encoding) |
120 | 121 | utils.write_list_to_csv(sorted_data, fh) |
121 | 122 | fh.close() |
— | — | @@ -135,7 +136,7 @@ |
136 | 137 | data = [d.split('\t') for d in data] |
137 | 138 | sorted_data = mergesort(data) |
138 | 139 | write_sorted_file(sorted_data, file, output) |
139 | | - print file, tasks.qsize() |
| 140 | + print file, messages.show(tasks.qsize) |
140 | 141 | except Empty: |
141 | 142 | break |
142 | 143 | |
— | — | @@ -165,6 +166,6 @@ |
166 | 167 | output = os.path.join(settings.input_location, 'en', 'wiki', 'dbready') |
167 | 168 | dbname = 'enwiki' |
168 | 169 | collection = 'editors' |
169 | | - #mergesort_launcher(input, intermediate_output) |
| 170 | + mergesort_launcher(input, intermediate_output) |
170 | 171 | #mergesort_external_launcher(intermediate_output, output) |
171 | | - num_editors = store_editors(output, dbname, collection) |
| 172 | + #num_editors = store_editors(output, dbname, collection) |
Index: trunk/tools/editor_trends/experience/map.py |
— | — | @@ -15,13 +15,14 @@ |
16 | 16 | import configuration |
17 | 17 | settings = configuration.Settings() |
18 | 18 | |
19 | | -from etl import extract |
| 19 | + |
20 | 20 | from utils import models |
21 | | -from wikitree import xml |
| 21 | +from utils import messages |
22 | 22 | from utils import utils |
| 23 | +from etl import extract |
23 | 24 | from etl import chunker |
| 25 | +from wikitree import parser |
24 | 26 | |
25 | | - |
26 | 27 | def extract_article_talk_pages(page, output, **kwargs): |
27 | 28 | tags = {'title': xml.extract_text, |
28 | 29 | 'id': xml.extract_text, |
— | — | @@ -88,7 +89,7 @@ |
89 | 90 | for x in xrange(settings.number_of_processes): |
90 | 91 | tasks.put(None) |
91 | 92 | |
92 | | - print tasks.qsize() |
| 93 | + print messages.show(tasks.qsize) |
93 | 94 | for w in consumers: |
94 | 95 | w.start() |
95 | 96 | |
Index: trunk/tools/editor_trends/statistics/stata/cohort_charts.do |
— | — | @@ -76,7 +76,7 @@ |
77 | 77 | label var fewer_one_year_abs "Editors with less than one year experience" |
78 | 78 | label var more_one_year_abs "Editors with more than one year experience" |
79 | 79 | |
80 | | - twoway (line one_year_exp year), ylabel(0(10)100, labsize(vsmall)) ytitle(%, size(vsmall)) xtitle() xlabel(2001(1)2009, labsize(vsmall)) title(Percentage of Wikipedia editors with 1 year experience) note("Based on the `proj' project, dataset `obs' editors.", size(vsmall)) |
| 80 | + twoway (line one_year_exp year), ylabel(0(10)100, labsize(vsmall)) ytitle(%, size(vsmall)) xtitle() xlabel(2001(1)2010, labsize(vsmall)) title(Percentage of Wikipedia editors with 1 year experience) note("Based on the `proj' project, dataset `obs' editors.", size(vsmall)) |
81 | 81 | local f = "`loc'" + "`proj'" + "_line_rel_one_vs_multi_years.png" |
82 | 82 | graph export `f', replace |
83 | 83 | //subtitle(Editors are getting older and influx of new editors has stagnated) |
Index: trunk/tools/editor_trends/utils/process_constructor.py |
— | — | @@ -1,132 +0,0 @@ |
2 | | -#!/usr/bin/python |
3 | | -# -*- coding: utf-8 -*- |
4 | | -''' |
5 | | -Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com) |
6 | | -This program is free software; you can redistribute it and/or |
7 | | -modify it under the terms of the GNU General Public License version 2 |
8 | | -as published by the Free Software Foundation. |
9 | | -This program is distributed in the hope that it will be useful, |
10 | | -but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | | -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
12 | | -See the GNU General Public License for more details, at |
13 | | -http://www.fsf.org/licenses/gpl.html |
14 | | -''' |
15 | | - |
16 | | -__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ]) |
17 | | -__author__email = 'dvanliere at gmail dot com' |
18 | | -__date__ = '2010-10-21' |
19 | | -__version__ = '0.1' |
20 | | - |
21 | | -from multiprocessing import Process, Queue, JoinableQueue |
22 | | -from Queue import Empty |
23 | | -import sys |
24 | | - |
25 | | - |
26 | | -sys.path.append('..') |
27 | | -import configuration |
28 | | -settings = configuration.Settings() |
29 | | -import utils |
30 | | -import models |
31 | | - |
32 | | -#3rd party dependency |
33 | | -import progressbar |
34 | | - |
35 | | - |
36 | | -def build_scaffolding(load_input_queue, main, obj, result_processor=False, result_queue=False, **kwargs): |
37 | | - ''' |
38 | | - This a generic producer/consumer process launcher. It can launch two types |
39 | | - of processes: |
40 | | - a) Processes that take a task from a queue and do their thing |
41 | | - b) Processes that take a task from a queue and put the result in the |
42 | | - result_queue. |
43 | | - If result_queue is False then a) is assumed. |
44 | | - |
45 | | - @load_input_queue is a function that is used to insert jobs into queue |
46 | | - |
47 | | - @main is the function that will process the input_queue |
48 | | - |
49 | | - @obj can be a pickled object or an enumerable variable that will be loaded |
50 | | - into the input_queue |
51 | | - |
52 | | - @result_queue, if set to True will become a true queue and will be provided |
53 | | - to main whose job it is to fill with new tasks. If False then this variable |
54 | | - is ignored. |
55 | | - |
56 | | - @result_processor, name of the function to process the @result_queue |
57 | | - |
58 | | - @kwargs is a dictionary with optional variables. Used to supply to main |
59 | | - ''' |
60 | | - |
61 | | - nr_input_processors = kwargs.pop('nr_input_processors') |
62 | | - nr_output_processors = kwargs.pop('nr_output_processors') |
63 | | - poison_pill = kwargs.get('poison_pill', True) |
64 | | - input_queues = {} |
65 | | - result_queues = {} |
66 | | - |
67 | | - assert len(obj) == nr_input_processors |
68 | | - if result_queue: |
69 | | - assert len(obj)== nr_output_processors |
70 | | - |
71 | | - for i, o in enumerate(obj): |
72 | | - input_queues[i] = load_input_queue(obj[o], poison_pill=poison_pill) |
73 | | - if result_queue: |
74 | | - result_queues[i] = JoinableQueue() |
75 | | - else: |
76 | | - result_queues[i] = False |
77 | | - |
78 | | - if settings.progressbar: |
79 | | - size = sum([input_queues[q].qsize() for q in input_queues]) |
80 | | - pbar = progressbar.ProgressBar(maxval=size).start() |
81 | | - kwargs['pbar'] = pbar |
82 | | - else: |
83 | | - pbar = False |
84 | | - |
85 | | - input_processes = [models.ProcessInputQueue(main, input_queues[i], result_queues[i], |
86 | | - **kwargs) for i in xrange(nr_input_processors)] |
87 | | - |
88 | | - for input_process in input_processes: |
89 | | - input_process.start() |
90 | | - pids = [p.pid for p in input_processes] |
91 | | - kwargs['pids'] = pids |
92 | | - |
93 | | - if result_queue: |
94 | | - result_processes = [models.ProcessResultQueue(result_processor, |
95 | | - result_queues[i], **kwargs) for i in xrange(nr_output_processors)] |
96 | | - for result_process in result_processes: |
97 | | - result_process.start(result_process.input_queue) |
98 | | - |
99 | | - for input_process in input_processes: |
100 | | - print 'Waiting for input process to finish' |
101 | | - input_process.join() |
102 | | - print 'Input process finished' |
103 | | - |
104 | | - if result_queue: |
105 | | - for result_process in result_processes: |
106 | | - print 'Waiting for result process to finish.' |
107 | | - result_process.join() |
108 | | - print 'Result process finished' |
109 | | - |
110 | | - if pbar: |
111 | | - pbar.finish() |
112 | | - print 'Total elapsed time: %s.' % (utils.humanize_time_difference(pbar.seconds_elapsed)) |
113 | | - |
114 | | - |
115 | | -def load_queue(obj, poison_pill=False): |
116 | | - ''' |
117 | | - @input_queue should be an instance of multiprocessing.Queue |
118 | | - |
119 | | - @obj either pickled or enumerable variable that contains the tasks |
120 | | - |
121 | | - @returns: queue with tasks |
122 | | - ''' |
123 | | - input_queue = Queue() |
124 | | - if isinstance(obj, type(list)): |
125 | | - data = utils.load_object(obj) |
126 | | - else: |
127 | | - data = obj |
128 | | - for d in data: |
129 | | - input_queue.put(d) |
130 | | - |
131 | | - if poison_pill: |
132 | | - input_queue.put(None) |
133 | | - return input_queue |
Index: trunk/tools/editor_trends/utils/utils.py |
— | — | @@ -39,6 +39,7 @@ |
40 | 40 | import configuration |
41 | 41 | settings = configuration.Settings() |
42 | 42 | import exceptions |
| 43 | +import messages |
43 | 44 | |
44 | 45 | try: |
45 | 46 | import psyco |
— | — | @@ -415,7 +416,7 @@ |
416 | 417 | ''' |
417 | 418 | Updates the progressbar by determining how much work is left in a queue |
418 | 419 | ''' |
419 | | - x = pbar.maxval - queue.qsize() |
| 420 | + x = pbar.maxval - messages.show(queue.qsize) |
420 | 421 | ''' |
421 | 422 | Currently, calling the pbar.update function gives the following error: |
422 | 423 | File "build\bdist.win32\egg\progressbar.py", line 352, in update |
Index: trunk/tools/editor_trends/utils/messages.py |
— | — | @@ -0,0 +1,32 @@ |
 | 2 | +#!/usr/bin/python |
 | 3 | +# -*- coding: utf-8 -*- |
 | 4 | +''' |
 | 5 | +Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com) |
 | 6 | +This program is free software; you can redistribute it and/or |
 | 7 | +modify it under the terms of the GNU General Public License version 2 |
 | 8 | +as published by the Free Software Foundation. |
 | 9 | +This program is distributed in the hope that it will be useful, |
 | 10 | +but WITHOUT ANY WARRANTY; without even the implied warranty of |
 | 11 | +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
 | 12 | +See the GNU General Public License for more details, at |
 | 13 | +http://www.fsf.org/licenses/gpl.html |
 | 14 | +''' |
 | 15 | + |
 | 16 | +__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ]) |
 | 17 | +__author__email = 'dvanliere at gmail dot com' |
 | 18 | +__date__ = '2011-01-05' |
 | 19 | +__version__ = '0.1' |
 | 20 | + |
 | 21 | +import sys |
 | 22 | +sys.path.append('..') |
 | 23 | + |
 | 24 | +import configuration |
 | 25 | +settings = configuration.Settings() |
 | 26 | + |
 | 27 | + |
 | 28 | + |
 | 29 | +def show(func): |
 | 30 | +    try: |
 | 31 | +        func() |
 | 32 | +    except: |
 | 33 | +        print 'Calling function %s caused an error, probably your platform is not supporting this function' % func |
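
Note on the new messages module above: multiprocessing.Queue.qsize() is not implemented on every platform (it raises NotImplementedError on Mac OS X, for instance), which is why the qsize() calls in the progress messages are now routed through this guard. A minimal standalone sketch of the same idea; safe_qsize is an illustrative name, not part of the module:

import multiprocessing

def safe_qsize(queue):
    # Degrade gracefully where the platform does not support qsize().
    try:
        return queue.qsize()
    except NotImplementedError:
        return 'unknown'

tasks = multiprocessing.JoinableQueue()
tasks.put('some editor id')
print 'The queue contains %s editors.' % safe_qsize(tasks)
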
Index: trunk/tools/editor_trends/database/sqlite_logic.py |
— | — | @@ -1,156 +0,0 @@ |
2 | | -def retrieve_editor_ids_db(): |
3 | | - contributors = set() |
4 | | - connection = db.init_database() |
5 | | - cursor = connection.cursor() |
6 | | - if settings.PROGRESS_BAR: |
7 | | - cursor.execute('SELECT MAX(ROWID) FROM contributors') |
8 | | - for id in cursor: |
9 | | - pass |
10 | | - pbar = progressbar.ProgressBar(maxval=id[0]).start() |
11 | | - |
12 | | - cursor.execute('SELECT contributor FROM contributors WHERE bot=0') |
13 | | - |
14 | | - print 'Retrieving contributors...' |
15 | | - for x, contributor in enumerate(cursor): |
16 | | - contributors.add(contributor[0]) |
17 | | - if x % 100000 == 0: |
18 | | - pbar.update(x) |
19 | | - print 'Serializing contributors...' |
20 | | - utils.store_object(contributors, 'contributors') |
21 | | - print 'Finished serializing contributors...' |
22 | | - |
23 | | - if pbar: |
24 | | - pbar.finish() |
25 | | - print 'Total elapsed time: %s.' % (utils.humanize_time_difference(pbar.seconds_elapsed)) |
26 | | - |
27 | | - connection.close() |
28 | | - |
29 | | -def retrieve_edits_by_contributor(input_queue, result_queue, pbar): |
30 | | - connection = db.init_database() |
31 | | - cursor = connection.cursor() |
32 | | - |
33 | | - while True: |
34 | | - try: |
35 | | - contributor = input_queue.get(block=False) |
36 | | - if contributor == None: |
37 | | - break |
38 | | - |
39 | | - cursor.execute('SELECT contributor, timestamp, bot FROM contributors WHERE contributor=?', (contributor,)) |
40 | | - edits = {} |
41 | | - edits[contributor] = set() |
42 | | - for edit, timestamp, bot in cursor: |
43 | | - date = utils.convert_timestamp_to_date(timestamp) |
44 | | - edits[contributor].add(date) |
45 | | - #print edit, timestamp, bot |
46 | | - |
47 | | - utils.write_data_to_csv(edits, retrieve_edits_by_contributor) |
48 | | - if pbar: |
49 | | - utils.update_progressbar(pbar, input_queue) |
50 | | - |
51 | | - except Empty: |
52 | | - pass |
53 | | - |
54 | | - connection.close() |
55 | | - |
56 | | - |
57 | | -def store_data_db(data_queue, pids): |
58 | | - connection = db.init_database() |
59 | | - cursor = connection.cursor() |
60 | | - db.create_tables(cursor, db_settings.CONTRIBUTOR_TABLE) |
61 | | - empty = 0 |
62 | | - values = [] |
63 | | - while True: |
64 | | - try: |
65 | | - chunk = data_queue.get(block=False) |
66 | | - contributor = chunk['contributor'].encode(settings.encoding) |
67 | | - article = chunk['article'] |
68 | | - timestamp = chunk['timestamp'].encode(settings.encoding) |
69 | | - bot = chunk['bot'] |
70 | | - values.append((contributor, article, timestamp, bot)) |
71 | | - |
72 | | - if len(values) == 50000: |
73 | | - cursor.executemany('INSERT INTO contributors VALUES (?,?,?,?)', values) |
74 | | - connection.commit() |
75 | | - #print 'Size of queue: %s' % data_queue.qsize() |
76 | | - values = [] |
77 | | - |
78 | | - except Empty: |
79 | | - |
80 | | - if all([utils.check_if_process_is_running(pid) for pid in pids]): |
81 | | - pass |
82 | | - else: |
83 | | - break |
84 | | - connection.close() |
85 | | - |
86 | | - |
87 | | -def create_bots_db(db_name): |
88 | | - ''' |
89 | | - This function reads the csv file provided by Erik Zachte and constructs a |
90 | | - sqlite memory database. The reason for this is that I suspect I will need |
91 | | - some simple querying capabilities in the future, else a dictionary would |
92 | | - suffice. |
93 | | - ''' |
94 | | - connection = db.init_database('db_name') |
95 | | - #connection = db.init_database('data/database/bots.db') |
96 | | - cursor = connection.cursor() |
97 | | - db.create_tables(cursor, db_settings.BOT_TABLE) |
98 | | - values = [] |
99 | | - fields = [field[0] for field in db_settings.BOT_TABLE['bots']] |
100 | | - for line in utils.read_data_from_csv('data/csv/StatisticsBots.csv', settings.encoding): |
101 | | - line = line.split(',') |
102 | | - row = [] |
103 | | - for x, (field, value) in enumerate(zip(fields, line)): |
104 | | - if db_settings.BOT_TABLE['bots'][x][1] == 'INTEGER': |
105 | | - value = int(value) |
106 | | - elif db_settings.BOT_TABLE['bots'][x][1] == 'TEXT': |
107 | | - value = value.replace('/', '-') |
108 | | - #print field, value |
109 | | - row.append(value) |
110 | | - values.append(row) |
111 | | - |
112 | | - cursor.executemany('INSERT INTO bots VALUES (?,?,?,?,?,?,?,?,?,?);', values) |
113 | | - connection.commit() |
114 | | - if db_name == ':memory': |
115 | | - return cursor |
116 | | - else: |
117 | | - connection.close() |
118 | | - |
119 | | -def retrieve_botnames_without_id(cursor, language): |
120 | | - return cursor.execute('SELECT name FROM bots WHERE language=?', (language,)).fetchall() |
121 | | - |
122 | | - |
123 | | -def add_id_to_botnames(): |
124 | | - ''' |
125 | | - This is the worker function for the multi-process version of |
126 | | - lookup_username.First, the names of the bots are retrieved, then the |
127 | | - multiprocess is launched by making a call to pc.build_scaffolding. This is a |
128 | | - generic launcher that takes as input the function to load the input_queue, |
129 | | - the function that will do the main work and the objects to be put in the |
130 | | - input_queue. The launcher also accepts optional keyword arguments. |
131 | | - ''' |
132 | | - cursor = create_bots_db(':memory') |
133 | | - files = utils.retrieve_file_list(settings.input_location, 'xml') |
134 | | - |
135 | | - botnames = retrieve_botnames_without_id(cursor, 'en') |
136 | | - bots = {} |
137 | | - for botname in botnames: |
138 | | - bots[botname[0]] = 1 |
139 | | - pc.build_scaffolding(pc.load_queue, lookup_username, files, bots=bots) |
140 | | - cursor.close() |
141 | | - |
142 | | - |
143 | | -def debug_lookup_username(): |
144 | | - ''' |
145 | | - This function launches the lookup_username function but then single |
146 | | - threaded, this eases debugging. That's also the reason why the queue |
147 | | - parameters are set to None. When launching this function make sure that |
148 | | - debug=False when calling lookup_username |
149 | | - ''' |
150 | | - cursor = create_bots_db(':memory') |
151 | | - botnames = retrieve_botnames_without_id(cursor, 'en') |
152 | | - bots = {} |
153 | | - for botname in botnames: |
154 | | - bots[botname[0]] = 1 |
155 | | - |
156 | | - lookup_username('12.xml', None, None, bots, debug=True) |
157 | | - cursor.close() |
Index: trunk/tools/editor_trends/bots/bots.py |
— | — | @@ -31,8 +31,10 @@ |
32 | 32 | import wikitree |
33 | 33 | from database import db |
34 | 34 | from utils import utils |
| 35 | +from utils import messages |
35 | 36 | #from etl import extract |
36 | | -from utils import process_constructor as pc |
| 37 | +#from utils import process_constructor as pc |
| 38 | + |
37 | 39 | from etl import models |
38 | 40 | import models as botmodels |
39 | 41 | |
— | — | @@ -211,7 +213,7 @@ |
212 | 214 | if single: |
213 | 215 | while True: |
214 | 216 | try: |
215 | | - print '%s files left in the queue...' % tasks.qsize() |
| 217 | + print '%s files left in the queue...' % messages.show(tasks.qsize) |
216 | 218 | task = tasks.get(block=False) |
217 | 219 | bots = task(bots) |
218 | 220 | except Empty: |
Index: trunk/tools/editor_trends/code-snippets/sqlite/sqlite_logic.py |
— | — | @@ -0,0 +1,156 @@ |
| 2 | +def retrieve_editor_ids_db(): |
| 3 | + contributors = set() |
| 4 | + connection = db.init_database() |
| 5 | + cursor = connection.cursor() |
| 6 | + if settings.PROGRESS_BAR: |
| 7 | + cursor.execute('SELECT MAX(ROWID) FROM contributors') |
| 8 | + for id in cursor: |
| 9 | + pass |
| 10 | + pbar = progressbar.ProgressBar(maxval=id[0]).start() |
| 11 | + |
| 12 | + cursor.execute('SELECT contributor FROM contributors WHERE bot=0') |
| 13 | + |
| 14 | + print 'Retrieving contributors...' |
| 15 | + for x, contributor in enumerate(cursor): |
| 16 | + contributors.add(contributor[0]) |
| 17 | + if x % 100000 == 0: |
| 18 | + pbar.update(x) |
| 19 | + print 'Serializing contributors...' |
| 20 | + utils.store_object(contributors, 'contributors') |
| 21 | + print 'Finished serializing contributors...' |
| 22 | + |
| 23 | + if pbar: |
| 24 | + pbar.finish() |
| 25 | + print 'Total elapsed time: %s.' % (utils.humanize_time_difference(pbar.seconds_elapsed)) |
| 26 | + |
| 27 | + connection.close() |
| 28 | + |
| 29 | +def retrieve_edits_by_contributor(input_queue, result_queue, pbar): |
| 30 | + connection = db.init_database() |
| 31 | + cursor = connection.cursor() |
| 32 | + |
| 33 | + while True: |
| 34 | + try: |
| 35 | + contributor = input_queue.get(block=False) |
| 36 | + if contributor == None: |
| 37 | + break |
| 38 | + |
| 39 | + cursor.execute('SELECT contributor, timestamp, bot FROM contributors WHERE contributor=?', (contributor,)) |
| 40 | + edits = {} |
| 41 | + edits[contributor] = set() |
| 42 | + for edit, timestamp, bot in cursor: |
| 43 | + date = utils.convert_timestamp_to_date(timestamp) |
| 44 | + edits[contributor].add(date) |
| 45 | + #print edit, timestamp, bot |
| 46 | + |
| 47 | + utils.write_data_to_csv(edits, retrieve_edits_by_contributor) |
| 48 | + if pbar: |
| 49 | + utils.update_progressbar(pbar, input_queue) |
| 50 | + |
| 51 | + except Empty: |
| 52 | + pass |
| 53 | + |
| 54 | + connection.close() |
| 55 | + |
| 56 | + |
| 57 | +def store_data_db(data_queue, pids): |
| 58 | + connection = db.init_database() |
| 59 | + cursor = connection.cursor() |
| 60 | + db.create_tables(cursor, db_settings.CONTRIBUTOR_TABLE) |
| 61 | + empty = 0 |
| 62 | + values = [] |
| 63 | + while True: |
| 64 | + try: |
| 65 | + chunk = data_queue.get(block=False) |
| 66 | + contributor = chunk['contributor'].encode(settings.encoding) |
| 67 | + article = chunk['article'] |
| 68 | + timestamp = chunk['timestamp'].encode(settings.encoding) |
| 69 | + bot = chunk['bot'] |
| 70 | + values.append((contributor, article, timestamp, bot)) |
| 71 | + |
| 72 | + if len(values) == 50000: |
| 73 | + cursor.executemany('INSERT INTO contributors VALUES (?,?,?,?)', values) |
| 74 | + connection.commit() |
| 75 | + #print 'Size of queue: %s' % data_queue.qsize() |
| 76 | + values = [] |
| 77 | + |
| 78 | + except Empty: |
| 79 | + |
| 80 | + if all([utils.check_if_process_is_running(pid) for pid in pids]): |
| 81 | + pass |
| 82 | + else: |
| 83 | + break |
| 84 | + connection.close() |
| 85 | + |
| 86 | + |
| 87 | +def create_bots_db(db_name): |
| 88 | + ''' |
| 89 | + This function reads the csv file provided by Erik Zachte and constructs a |
| 90 | + sqlite memory database. The reason for this is that I suspect I will need |
| 91 | + some simple querying capabilities in the future, else a dictionary would |
| 92 | + suffice. |
| 93 | + ''' |
| 94 | + connection = db.init_database('db_name') |
| 95 | + #connection = db.init_database('data/database/bots.db') |
| 96 | + cursor = connection.cursor() |
| 97 | + db.create_tables(cursor, db_settings.BOT_TABLE) |
| 98 | + values = [] |
| 99 | + fields = [field[0] for field in db_settings.BOT_TABLE['bots']] |
| 100 | + for line in utils.read_data_from_csv('data/csv/StatisticsBots.csv', settings.encoding): |
| 101 | + line = line.split(',') |
| 102 | + row = [] |
| 103 | + for x, (field, value) in enumerate(zip(fields, line)): |
| 104 | + if db_settings.BOT_TABLE['bots'][x][1] == 'INTEGER': |
| 105 | + value = int(value) |
| 106 | + elif db_settings.BOT_TABLE['bots'][x][1] == 'TEXT': |
| 107 | + value = value.replace('/', '-') |
| 108 | + #print field, value |
| 109 | + row.append(value) |
| 110 | + values.append(row) |
| 111 | + |
| 112 | + cursor.executemany('INSERT INTO bots VALUES (?,?,?,?,?,?,?,?,?,?);', values) |
| 113 | + connection.commit() |
| 114 | + if db_name == ':memory': |
| 115 | + return cursor |
| 116 | + else: |
| 117 | + connection.close() |
| 118 | + |
| 119 | +def retrieve_botnames_without_id(cursor, language): |
| 120 | + return cursor.execute('SELECT name FROM bots WHERE language=?', (language,)).fetchall() |
| 121 | + |
| 122 | + |
| 123 | +def add_id_to_botnames(): |
| 124 | + ''' |
| 125 | + This is the worker function for the multi-process version of |
| 126 | + lookup_username.First, the names of the bots are retrieved, then the |
| 127 | + multiprocess is launched by making a call to pc.build_scaffolding. This is a |
| 128 | + generic launcher that takes as input the function to load the input_queue, |
| 129 | + the function that will do the main work and the objects to be put in the |
| 130 | + input_queue. The launcher also accepts optional keyword arguments. |
| 131 | + ''' |
| 132 | + cursor = create_bots_db(':memory') |
| 133 | + files = utils.retrieve_file_list(settings.input_location, 'xml') |
| 134 | + |
| 135 | + botnames = retrieve_botnames_without_id(cursor, 'en') |
| 136 | + bots = {} |
| 137 | + for botname in botnames: |
| 138 | + bots[botname[0]] = 1 |
| 139 | + pc.build_scaffolding(pc.load_queue, lookup_username, files, bots=bots) |
| 140 | + cursor.close() |
| 141 | + |
| 142 | + |
| 143 | +def debug_lookup_username(): |
| 144 | + ''' |
| 145 | + This function launches the lookup_username function but then single |
| 146 | + threaded, this eases debugging. That's also the reason why the queue |
| 147 | + parameters are set to None. When launching this function make sure that |
| 148 | + debug=False when calling lookup_username |
| 149 | + ''' |
| 150 | + cursor = create_bots_db(':memory') |
| 151 | + botnames = retrieve_botnames_without_id(cursor, 'en') |
| 152 | + bots = {} |
| 153 | + for botname in botnames: |
| 154 | + bots[botname[0]] = 1 |
| 155 | + |
| 156 | + lookup_username('12.xml', None, None, bots, debug=True) |
| 157 | + cursor.close() |
Property changes on: trunk/tools/editor_trends/code-snippets/sqlite/sqlite_logic.py |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 158 | + native |
Index: trunk/tools/editor_trends/code-snippets/process_constructor.py |
— | — | @@ -0,0 +1,132 @@ |
| 2 | +#!/usr/bin/python |
| 3 | +# -*- coding: utf-8 -*- |
| 4 | +''' |
| 5 | +Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com) |
| 6 | +This program is free software; you can redistribute it and/or |
| 7 | +modify it under the terms of the GNU General Public License version 2 |
| 8 | +as published by the Free Software Foundation. |
| 9 | +This program is distributed in the hope that it will be useful, |
| 10 | +but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
| 12 | +See the GNU General Public License for more details, at |
| 13 | +http://www.fsf.org/licenses/gpl.html |
| 14 | +''' |
| 15 | + |
| 16 | +__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ]) |
| 17 | +__author__email = 'dvanliere at gmail dot com' |
| 18 | +__date__ = '2010-10-21' |
| 19 | +__version__ = '0.1' |
| 20 | + |
| 21 | +from multiprocessing import Process, Queue, JoinableQueue |
| 22 | +from Queue import Empty |
| 23 | +import sys |
| 24 | + |
| 25 | + |
| 26 | +sys.path.append('..') |
| 27 | +import configuration |
| 28 | +settings = configuration.Settings() |
| 29 | +import utils |
| 30 | +import models |
| 31 | + |
| 32 | +#3rd party dependency |
| 33 | +import progressbar |
| 34 | + |
| 35 | + |
| 36 | +def build_scaffolding(load_input_queue, main, obj, result_processor=False, result_queue=False, **kwargs): |
| 37 | + ''' |
| 38 | + This a generic producer/consumer process launcher. It can launch two types |
| 39 | + of processes: |
| 40 | + a) Processes that take a task from a queue and do their thing |
| 41 | + b) Processes that take a task from a queue and put the result in the |
| 42 | + result_queue. |
| 43 | + If result_queue is False then a) is assumed. |
| 44 | + |
| 45 | + @load_input_queue is a function that is used to insert jobs into queue |
| 46 | + |
| 47 | + @main is the function that will process the input_queue |
| 48 | + |
| 49 | + @obj can be a pickled object or an enumerable variable that will be loaded |
| 50 | + into the input_queue |
| 51 | + |
| 52 | + @result_queue, if set to True will become a true queue and will be provided |
| 53 | + to main whose job it is to fill with new tasks. If False then this variable |
| 54 | + is ignored. |
| 55 | + |
| 56 | + @result_processor, name of the function to process the @result_queue |
| 57 | + |
| 58 | + @kwargs is a dictionary with optional variables. Used to supply to main |
| 59 | + ''' |
| 60 | + |
| 61 | + nr_input_processors = kwargs.pop('nr_input_processors') |
| 62 | + nr_output_processors = kwargs.pop('nr_output_processors') |
| 63 | + poison_pill = kwargs.get('poison_pill', True) |
| 64 | + input_queues = {} |
| 65 | + result_queues = {} |
| 66 | + |
| 67 | + assert len(obj) == nr_input_processors |
| 68 | + if result_queue: |
| 69 | + assert len(obj)== nr_output_processors |
| 70 | + |
| 71 | + for i, o in enumerate(obj): |
| 72 | + input_queues[i] = load_input_queue(obj[o], poison_pill=poison_pill) |
| 73 | + if result_queue: |
| 74 | + result_queues[i] = JoinableQueue() |
| 75 | + else: |
| 76 | + result_queues[i] = False |
| 77 | + |
| 78 | + if settings.progressbar: |
| 79 | + size = sum([input_queues[q].qsize() for q in input_queues]) |
| 80 | + pbar = progressbar.ProgressBar(maxval=size).start() |
| 81 | + kwargs['pbar'] = pbar |
| 82 | + else: |
| 83 | + pbar = False |
| 84 | + |
| 85 | + input_processes = [models.ProcessInputQueue(main, input_queues[i], result_queues[i], |
| 86 | + **kwargs) for i in xrange(nr_input_processors)] |
| 87 | + |
| 88 | + for input_process in input_processes: |
| 89 | + input_process.start() |
| 90 | + pids = [p.pid for p in input_processes] |
| 91 | + kwargs['pids'] = pids |
| 92 | + |
| 93 | + if result_queue: |
| 94 | + result_processes = [models.ProcessResultQueue(result_processor, |
| 95 | + result_queues[i], **kwargs) for i in xrange(nr_output_processors)] |
| 96 | + for result_process in result_processes: |
| 97 | + result_process.start(result_process.input_queue) |
| 98 | + |
| 99 | + for input_process in input_processes: |
| 100 | + print 'Waiting for input process to finish' |
| 101 | + input_process.join() |
| 102 | + print 'Input process finished' |
| 103 | + |
| 104 | + if result_queue: |
| 105 | + for result_process in result_processes: |
| 106 | + print 'Waiting for result process to finish.' |
| 107 | + result_process.join() |
| 108 | + print 'Result process finished' |
| 109 | + |
| 110 | + if pbar: |
| 111 | + pbar.finish() |
| 112 | + print 'Total elapsed time: %s.' % (utils.humanize_time_difference(pbar.seconds_elapsed)) |
| 113 | + |
| 114 | + |
| 115 | +def load_queue(obj, poison_pill=False): |
| 116 | + ''' |
| 117 | + @input_queue should be an instance of multiprocessing.Queue |
| 118 | + |
| 119 | + @obj either pickled or enumerable variable that contains the tasks |
| 120 | + |
| 121 | + @returns: queue with tasks |
| 122 | + ''' |
| 123 | + input_queue = Queue() |
| 124 | + if isinstance(obj, type(list)): |
| 125 | + data = utils.load_object(obj) |
| 126 | + else: |
| 127 | + data = obj |
| 128 | + for d in data: |
| 129 | + input_queue.put(d) |
| 130 | + |
| 131 | + if poison_pill: |
| 132 | + input_queue.put(None) |
| 133 | + return input_queue |
Property changes on: trunk/tools/editor_trends/code-snippets/process_constructor.py |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 134 | + native |
Added: svn:mime-type |
2 | 135 | + text/plain |