r79664 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: r79663 | r79664 | r79665 >
Date: 20:42, 5 January 2011
Author: diederik
Status: deferred
Tags:
Comment:
* Added a workaround for the fact that OSX has only partial multiprocessing support; in particular, you cannot query the queue size. This patch adds a workaround that effectively ignores queue-size calls on OSX. These calls were only made for progress information, so the impact is minor.
* Numerous bug fixes.
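
A minimal sketch of the idea behind the workaround (illustrative only; the commit's actual helper is the show(func) function added in utils/messages.py): wrap queue-size lookups so that platforms where multiprocessing.Queue.qsize() is unavailable (OSX raises NotImplementedError because sem_getvalue() is not implemented there) fall back to a placeholder instead of crashing. The safe_qsize name below is hypothetical.

    # Illustrative sketch, assuming Python's standard multiprocessing module;
    # not the literal helper committed in utils/messages.py.
    import multiprocessing

    def safe_qsize(queue):
        '''
        Return the approximate queue size, or a placeholder on platforms
        where Queue.qsize() is not implemented (OSX raises
        NotImplementedError for this call).
        '''
        try:
            return queue.qsize()
        except NotImplementedError:
            return 'n/a'

    if __name__ == '__main__':
        tasks = multiprocessing.JoinableQueue()
        tasks.put('editor')
        # Progress output degrades gracefully instead of raising on OSX.
        print('The queue contains %s editors.' % safe_qsize(tasks))

Since the size was only used for progress messages, returning a placeholder on OSX loses nothing essential.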
Modified paths:
  • /trunk/tools/editor_trends/analyses/aggregates.py (modified) (history)
  • /trunk/tools/editor_trends/analyses/cohort_confidence_intervals.py (modified) (history)
  • /trunk/tools/editor_trends/bots/bots.py (modified) (history)
  • /trunk/tools/editor_trends/code-snippets/process_constructor.py (added) (history)
  • /trunk/tools/editor_trends/code-snippets/sqlite (added) (history)
  • /trunk/tools/editor_trends/code-snippets/sqlite/sqlite_logic.py (added) (history)
  • /trunk/tools/editor_trends/database/sqlite_logic.py (deleted) (history)
  • /trunk/tools/editor_trends/etl/chunker.py (modified) (history)
  • /trunk/tools/editor_trends/etl/exporter.py (modified) (history)
  • /trunk/tools/editor_trends/etl/extracter.py (modified) (history)
  • /trunk/tools/editor_trends/etl/models.py (modified) (history)
  • /trunk/tools/editor_trends/etl/sort.py (modified) (history)
  • /trunk/tools/editor_trends/etl/store.py (modified) (history)
  • /trunk/tools/editor_trends/etl/transformer.py (modified) (history)
  • /trunk/tools/editor_trends/experience/map.py (modified) (history)
  • /trunk/tools/editor_trends/manage.py (modified) (history)
  • /trunk/tools/editor_trends/statistics/stata/cohort_charts.do (modified) (history)
  • /trunk/tools/editor_trends/utils/messages.py (added) (history)
  • /trunk/tools/editor_trends/utils/process_constructor.py (deleted) (history)
  • /trunk/tools/editor_trends/utils/utils.py (modified) (history)

Diff

Index: trunk/tools/editor_trends/manage.py
@@ -40,8 +40,9 @@
4141 from utils import exceptions
4242 from database import db
4343 from etl import chunker
44 -from etl import extract
45 -from etl import loader
 44+from etl import extracter
 45+from etl import store
 46+from etl import sort
4647 from etl import transformer
4748 from etl import exporter
4849
@@ -233,7 +234,7 @@
234235 language_code = kwargs.pop('language_code')
235236 project = kwargs.pop('project')
236237 write_message_to_log(logger, args, location=location, language_code=language_code, project=project)
237 - extract.run_parse_editors(location, **kwargs)
 238+ extracter.parse_dumpfile(project, language_code, namespaces=['0'])
238239 timer.elapsed()
239240
240241
@@ -245,7 +246,7 @@
246247 output = os.path.join(location, 'sorted')
247248 final_output = os.path.join(location, 'dbready')
248249 write_message_to_log(logger, args, location=location, input=input, output=output, final_output=final_output)
249 - loader.mergesort_launcher(input, output)
 250+ sort.mergesort_launcher(input, output)
250251 #loader.mergesort_external_launcher(output, final_output)
251252 timer.elapsed()
252253
@@ -278,6 +279,9 @@
279280 timer.elapsed()
280281
281282
 283+def debug_launcher(args, logger, **kwargs):
 284+ pass
 285+
282286 def exporter_launcher(args, logger, **kwargs):
283287 print 'Start exporting dataset'
284288 timer = Timer()
@@ -319,8 +323,8 @@
320324 if clean:
321325 cleanup(logger, args, **kwargs)
322326
323 - if format != 'xml':
324 - ignore = ignore + ',extract'
 327+ #if format != 'xml':
 328+ # ignore = ignore + ',extract'
325329
326330 functions = ordered_dict.OrderedDict(((dump_downloader_launcher, 'download'),
327331 #(chunker_launcher, 'split'),
@@ -380,8 +384,8 @@
381385 def main():
382386 default_language = determine_default_language()
383387
384 - datasets = {'cohort': 'generate_cohort_dataset',
385 - 'long': 'generate_long_editor_dataset',
 388+ datasets = {'forward': 'generate_cohort_dataset_forward',
 389+ 'backward': 'generate_cohort_dataset_backward',
386390 'wide': 'generate_wide_editor_dataset',
387391 }
388392
@@ -426,6 +430,9 @@
427431 parser_dataset = subparsers.add_parser('export', help='Create a dataset from the MongoDB and write it to a csv file.')
428432 parser_dataset.set_defaults(func=exporter_launcher)
429433
 434+ parser_debug = subparsers.add_parser('debug', help='Input custom dump files for debugging purposes')
 435+ parser_debug.set_defaults(func=debug_launcher)
 436+
430437 parser_all = subparsers.add_parser('all', help='The all sub command runs the download, split, store and dataset commands.\n\nWARNING: THIS COULD TAKE DAYS DEPENDING ON THE CONFIGURATION OF YOUR MACHINE AND THE SIZE OF THE WIKIMEDIA DUMP FILE.')
431438 parser_all.set_defaults(func=all_launcher)
432439 parser_all.add_argument('-e', '--except', action='store',
@@ -477,7 +484,7 @@
478485 parser.add_argument('-d', '--datasets', action='store',
479486 choices=datasets.keys(),
480487 help='Indicate what type of data should be exported.',
481 - default=datasets['cohort'])
 488+ default=datasets['backward'])
482489
483490 parser.add_argument('-prog', '--progress', action='store_true', default=True,
484491 help='Indicate whether you want to have a progressbar.')
Index: trunk/tools/editor_trends/analyses/aggregates.py
@@ -1,3 +1,17 @@
 2+#!/usr/bin/python
 3+# -*- coding: utf-8 -*-
 4+'''
 5+Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
 6+This program is free software; you can redistribute it and/or
 7+modify it under the terms of the GNU General Public License version 2
 8+as published by the Free Software Foundation.
 9+This program is distributed in the hope that it will be useful,
 10+but WITHOUT ANY WARRANTY; without even the implied warranty of
 11+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 12+See the GNU General Public License for more details, at
 13+http://www.fsf.org/licenses/gpl.html
 14+'''
 15+
216 __author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
317 __author__email = 'dvanliere at gmail dot com'
418 __date__ = '2010-12-10'
@@ -14,6 +28,7 @@
1529 from database import db
1630 from etl import shaper
1731 from utils import utils
 32+from utils import messages
1833
1934
2035 class Dataset:
@@ -79,7 +94,7 @@
8095 tasks = multiprocessing.JoinableQueue()
8196 for editor in editors:
8297 tasks.put(editor)
83 - print 'The queue contains %s editors.' % tasks.qsize()
 98+ print 'The queue contains %s editors.' % messages.show(tasks.qsize)
8499 tasks.put(None)
85100 data = new_editor_count(tasks, dbname, collection, month=7)
86101 keys = data.keys()
@@ -95,7 +110,7 @@
96111 tasks = multiprocessing.JoinableQueue()
97112 for editor in editors:
98113 tasks.put(editor)
99 - print 'The queue contains %s editors.' % tasks.qsize()
 114+ print 'The queue contains %s editors.' % messages.show(tasks.qsize)
100115 tasks.put(None)
101116 data = active_editor_count(tasks, dbname, collection, month=7)
102117 keys = data.keys()
Index: trunk/tools/editor_trends/analyses/cohort_confidence_intervals.py
@@ -7,8 +7,9 @@
88 sys.path.append('..')
99
1010 import configuration
11 -settings = configuration.Settings()
 11+settings = configuration.Settings()
1212 from utils import utils
 13+from utils import messages
1314 from database import db
1415
1516
@@ -23,7 +24,7 @@
2425 # while True:
2526 # try:
2627 # id = input_queue.get(block=False)
27 -# print input_queue.qsize()
 28+# print messages.show(input_queue.qsize)
2829 # obs = editors.find_one({'editor': id})
2930 # obs = expand_observations(obs, vars_to_expand)
3031 # if x == 0:
@@ -45,5 +46,4 @@
4647
4748
4849 if __name__ == '__main__':
49 -
50 -
\ No newline at end of file
 50+
Index: trunk/tools/editor_trends/etl/exporter.py
@@ -30,7 +30,9 @@
3131 sys.path.append('..')
3232 import configuration
3333 settings = configuration.Settings()
34 -from utils import models, utils
 34+from utils import utils
 35+from utils import messages
 36+from utils import models
3537 from database import db
3638 from etl import shaper
3739 from analyses import cohort_charts
@@ -199,54 +201,54 @@
200202 return windows
201203
202204
203 -def generate_cohort_dataset_old(tasks, dbname, collection, **kwargs):
204 - mongo = db.init_mongo_db(dbname)
205 - editors = mongo[collection + '_dataset']
206 - windows = create_windows()
207 - data = shaper.create_datacontainer('dict')
208 - data = shaper.add_windows_to_datacontainer(data, windows)
 205+#def generate_cohort_dataset_old(tasks, dbname, collection, **kwargs):
 206+# mongo = db.init_mongo_db(dbname)
 207+# editors = mongo[collection + '_dataset']
 208+# windows = create_windows()
 209+# data = shaper.create_datacontainer('dict')
 210+# data = shaper.add_windows_to_datacontainer(data, windows)
 211+#
 212+# while True:
 213+# id = tasks.get(block=False)
 214+# tasks.task_done()
 215+# if id == None:
 216+# break
 217+# obs = editors.find_one({'editor': id}, {'first_edit': 1, 'final_edit': 1})
 218+#
 219+# first_edit = obs['first_edit']
 220+# last_edit = obs['final_edit']
 221+# editor_dt = relativedelta(last_edit, first_edit)
 222+# editor_dt = (editor_dt.years * 12) + editor_dt.months
 223+# edits = []
 224+# for year in xrange(2001, datetime.datetime.now().year + 1):
 225+# if first_edit.year > year or last_edit.year < year:
 226+# continue
 227+# window_end = datetime.datetime(year, 12, 31)
 228+# for window in windows:
 229+# window_start = window_end - relativedelta(months=window)
 230+# if window_start < datetime.datetime(2001, 1, 1):
 231+# window_start = datetime.datetime(2001, 1, 1)
 232+#
 233+# if editor_dt > 11:
 234+# if date_falls_in_window(window_start, window_end, first_edit):
 235+# edits.append(window)
 236+# elif window > editor_dt:
 237+# data[year][window] += 1
 238+# break
 239+#
 240+# if edits != []:
 241+# w = min(edits)
 242+# data[year][w] += 1
 243+# edits = []
 244+#
 245+#
 246+# print 'Storing data as %s' % os.path.join(settings.binary_location, dbname + '_cohort_data.bin')
 247+# utils.store_object(data, settings.binary_location, dbname + '_cohort_data.bin')
 248+# cohort_charts.prepare_cohort_dataset(dbname)
209249
210 - while True:
211 - id = tasks.get(block=False)
212 - tasks.task_done()
213 - if id == None:
214 - break
215 - obs = editors.find_one({'editor': id}, {'first_edit': 1, 'final_edit': 1})
216250
217 - first_edit = obs['first_edit']
218 - last_edit = obs['final_edit']
219 - editor_dt = relativedelta(last_edit, first_edit)
220 - editor_dt = (editor_dt.years * 12) + editor_dt.months
221 - edits = []
222 - for year in xrange(2001, datetime.datetime.now().year + 1):
223 - if first_edit.year > year or last_edit.year < year:
224 - continue
225 - window_end = datetime.datetime(year, 12, 31)
226 - for window in windows:
227 - window_start = window_end - relativedelta(months=window)
228 - if window_start < datetime.datetime(2001, 1, 1):
229 - window_start = datetime.datetime(2001, 1, 1)
230251
231 - if editor_dt > 11:
232 - if date_falls_in_window(window_start, window_end, first_edit):
233 - edits.append(window)
234 - elif window > editor_dt:
235 - data[year][window] += 1
236 - break
237252
238 - if edits != []:
239 - w = min(edits)
240 - data[year][w] += 1
241 - edits = []
242 -
243 -
244 - print 'Storing data as %s' % os.path.join(settings.binary_location, dbname + '_cohort_data.bin')
245 - utils.store_object(data, settings.binary_location, dbname + '_cohort_data.bin')
246 - cohort_charts.prepare_cohort_dataset(dbname)
247 -
248 -
249 -
250 -
251253 def generate_cohort_dataset_forward(tasks, dbname, collection, **kwargs):
252254 mongo = db.init_mongo_db(dbname)
253255 editors = mongo[collection + '_dataset']
@@ -353,7 +355,7 @@
354356 id = input_queue.get(block=False)
355357 if id == None:
356358 break
357 - print input_queue.qsize()
 359+ print messages.show(input_queue.qsize)
358360 obs = editors.find_one({'editor': id})
359361 obs = expand_observations(obs, vars_to_expand)
360362 if x == 0:
@@ -382,7 +384,7 @@
383385 #consumers = [multiprocessing.Process(target=target, args=(tasks, dbname, collection)) for i in xrange(settings.number_of_processes)]
384386 for editor in editors:
385387 tasks.put(editor)
386 - print 'The queue contains %s editors.' % tasks.qsize()
 388+ print 'The queue contains %s editors.' % messages.show(tasks.qsize)
387389 tasks.put(None)
388390 target(tasks, dbname, collection)
389391
Index: trunk/tools/editor_trends/etl/store.py
@@ -25,6 +25,7 @@
2626 import configuration
2727 settings = configuration.Settings()
2828 from utils import utils
 29+from utils import messages
2930 from database import cache
3031 from database import db
3132
@@ -35,8 +36,12 @@
3637 edits = 0
3738 while True:
3839 file = tasks.get(block=False)
 40+ tasks.task_done()
3941 if file == None:
 42+ print 'Swallowing a poison pill.'
4043 break
 44+ print '%s files left in the queue.' % messages.show(tasks.qsize)
 45+
4146 fh = utils.create_txt_filehandle(input, file, 'r', settings.encoding)
4247 for line in utils.readline(fh):
4348 if len(line) == 0:
@@ -46,7 +51,7 @@
4752 if prev_contributor != contributor:
4853 if edits > 9:
4954 editor_cache.add(prev_contributor, 'NEXT')
50 - print 'Stored %s' % prev_contributor
 55+ #print 'Stored %s' % prev_contributor
5156 else:
5257 editor_cache.clear(prev_contributor)
5358 edits = 0
Index: trunk/tools/editor_trends/etl/chunker.py
@@ -259,6 +259,6 @@
260260 'file': settings.input_filename,
261261 'project':'wiki',
262262 'language_code':'en',
263 - 'format': 'tsv'
 263+ 'format': 'csv'
264264 }
265265 split_file(**kwargs)
Index: trunk/tools/editor_trends/etl/extracter.py
@@ -44,7 +44,7 @@
4545
4646
4747 def remove_numeric_character_references(text):
48 - return re.sub(RE_NUMERIC_CHARACTER, lenient_deccharref, text).encode('utf-8')
 48+ return re.sub(RE_NUMERIC_CHARACTER, lenient_deccharref, text).encode(settings.encoding)
4949
5050
5151 def lenient_deccharref(m):
@@ -242,9 +242,9 @@
243243
244244 location = os.path.join(settings.input_location, language_code, project)
245245 output = os.path.join(settings.input_location, language_code, project, 'txt')
246 - filehandles = [utils.create_txt_filehandle(output, '%s.csv' % file, 'a', settings.encoding) for file in xrange(500)]
 246+ filehandles = [utils.create_txt_filehandle(output, '%s.csv' % file, 'a', settings.encoding) for file in xrange(settings.max_filehandles)]
247247
248 - fh = utils.create_txt_filehandle(location, 'enwiki-latest-stub-meta-history.xml', 'r', settings.encoding)
 248+ fh = utils.create_txt_filehandle(location, '%s%s-latest-stub-meta-history.xml' % (language_code, project), 'r', settings.encoding)
249249 total_pages, processed_pages = 0.0, 0.0
250250 for page in wikitree.parser.read_input(fh):
251251 title = page.find('title')
@@ -290,13 +290,13 @@
291291 def hash(id):
292292 '''
293293 A very simple hash function based on modulo. The except clause has been
294 - addde because there are instances where the username is stored in userid
 294+ added because there are instances where the username is stored in userid
295295 tag and hence that's a string and not an integer.
296296 '''
297297 try:
298 - return int(id) % 500
 298+ return int(id) % settings.max_filehandles
299299 except:
300 - return sum([ord(i) for i in id]) % 500
 300+ return sum([ord(i) for i in id]) % settings.max_filehandles
301301
302302 if __name__ == '__main__':
303303 project = 'wiki'
Index: trunk/tools/editor_trends/etl/transformer.py
@@ -27,9 +27,10 @@
2828 import configuration
2929 settings = configuration.Settings()
3030 from database import db
31 -from utils import process_constructor as pc
 31+#from utils import process_constructor as pc
3232 from utils import utils
3333 from utils import models
 34+from utils import messages
3435 import exporter
3536 import shaper
3637
@@ -46,7 +47,7 @@
4748 while True:
4849 new_editor = self.task_queue.get()
4950 self.task_queue.task_done()
50 - print '%s editors to go...' % self.task_queue.qsize()
 51+ print '%s editors to go...' % messages.show(self.task_queue.qsize)
5152 if new_editor == None:
5253 break
5354 new_editor()
@@ -96,7 +97,7 @@
9798 'monthly_edits': monthly_edits,
9899 'last_edit_by_year': last_edit_by_year,
99100 'username': username
100 - })
 101+ }, safe=True)
101102
102103
103104 def determine_last_edit_by_year(edits):
@@ -162,7 +163,7 @@
163164 for x in xrange(settings.number_of_processes):
164165 tasks.put(None)
165166
166 - print tasks.qsize()
 167+ print messages.show(tasks.qsize)
167168 for w in consumers:
168169 w.start()
169170
Index: trunk/tools/editor_trends/etl/models.py
@@ -27,6 +27,7 @@
2828
2929 from utils import models
3030 from utils import utils
 31+from utils import messages
3132 import wikitree
3233
3334 class TXTFile(object):
@@ -68,7 +69,7 @@
6970 if new_xmlfile == None:
7071 print 'Swallowed a poison pill'
7172 break
72 - print 'Queue is %s files long...' % (self.task_queue.qsize() - settings.number_of_processes)
 73+ print 'Queue is %s files long...' % (messages.show(self.task_queue.qsize) - settings.number_of_processes)
7374 new_xmlfile()
7475
7576
Index: trunk/tools/editor_trends/etl/sort.py
@@ -31,9 +31,10 @@
3232 from database import db
3333 from database import cache
3434 from utils import utils
35 -from utils import sort
 35+from utils import messages
3636
3737
 38+
3839 #def mergesort_external_launcher(input, output):
3940 # files = utils.retrieve_file_list(input, 'txt', mask='')
4041 # x = 0
@@ -112,9 +113,9 @@
113114
114115
115116 def write_sorted_file(sorted_data, file, output):
116 - file = file.split('.')
117 - file[0] = file[0] + '_sorted'
118 - file = '.'.join(file)
 117+ #file = file.split('.')
 118+ #file[0] = file[0] + '_sorted'
 119+ #file = '.'.join(file)
119120 fh = utils.create_txt_filehandle(output, file, 'w', settings.encoding)
120121 utils.write_list_to_csv(sorted_data, fh)
121122 fh.close()
@@ -135,7 +136,7 @@
136137 data = [d.split('\t') for d in data]
137138 sorted_data = mergesort(data)
138139 write_sorted_file(sorted_data, file, output)
139 - print file, tasks.qsize()
 140+ print file, messages.show(tasks.qsize)
140141 except Empty:
141142 break
142143
@@ -165,6 +166,6 @@
166167 output = os.path.join(settings.input_location, 'en', 'wiki', 'dbready')
167168 dbname = 'enwiki'
168169 collection = 'editors'
169 - #mergesort_launcher(input, intermediate_output)
 170+ mergesort_launcher(input, intermediate_output)
170171 #mergesort_external_launcher(intermediate_output, output)
171 - num_editors = store_editors(output, dbname, collection)
 172+ #num_editors = store_editors(output, dbname, collection)
Index: trunk/tools/editor_trends/experience/map.py
@@ -15,13 +15,14 @@
1616 import configuration
1717 settings = configuration.Settings()
1818
19 -from etl import extract
 19+
2020 from utils import models
21 -from wikitree import xml
 21+from utils import messages
2222 from utils import utils
 23+from etl import extract
2324 from etl import chunker
 25+from wikitree import parser
2426
25 -
2627 def extract_article_talk_pages(page, output, **kwargs):
2728 tags = {'title': xml.extract_text,
2829 'id': xml.extract_text,
@@ -88,7 +89,7 @@
8990 for x in xrange(settings.number_of_processes):
9091 tasks.put(None)
9192
92 - print tasks.qsize()
 93+ print messages.show(tasks.qsize)
9394 for w in consumers:
9495 w.start()
9596
Index: trunk/tools/editor_trends/statistics/stata/cohort_charts.do
@@ -76,7 +76,7 @@
7777 label var fewer_one_year_abs "Editors with less than one year experience"
7878 label var more_one_year_abs "Editors with more than one year experience"
7979
80 - twoway (line one_year_exp year), ylabel(0(10)100, labsize(vsmall)) ytitle(%, size(vsmall)) xtitle() xlabel(2001(1)2009, labsize(vsmall)) title(Percentage of Wikipedia editors with 1 year experience) note("Based on the `proj' project, dataset `obs' editors.", size(vsmall))
 80+ twoway (line one_year_exp year), ylabel(0(10)100, labsize(vsmall)) ytitle(%, size(vsmall)) xtitle() xlabel(2001(1)2010, labsize(vsmall)) title(Percentage of Wikipedia editors with 1 year experience) note("Based on the `proj' project, dataset `obs' editors.", size(vsmall))
8181 local f = "`loc'" + "`proj'" + "_line_rel_one_vs_multi_years.png"
8282 graph export `f', replace
8383 //subtitle(Editors are getting older and influx of new editors has stagnated)
Index: trunk/tools/editor_trends/utils/process_constructor.py
@@ -1,132 +0,0 @@
2 -#!/usr/bin/python
3 -# -*- coding: utf-8 -*-
4 -'''
5 -Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
6 -This program is free software; you can redistribute it and/or
7 -modify it under the terms of the GNU General Public License version 2
8 -as published by the Free Software Foundation.
9 -This program is distributed in the hope that it will be useful,
10 -but WITHOUT ANY WARRANTY; without even the implied warranty of
11 -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
12 -See the GNU General Public License for more details, at
13 -http://www.fsf.org/licenses/gpl.html
14 -'''
15 -
16 -__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
17 -__author__email = 'dvanliere at gmail dot com'
18 -__date__ = '2010-10-21'
19 -__version__ = '0.1'
20 -
21 -from multiprocessing import Process, Queue, JoinableQueue
22 -from Queue import Empty
23 -import sys
24 -
25 -
26 -sys.path.append('..')
27 -import configuration
28 -settings = configuration.Settings()
29 -import utils
30 -import models
31 -
32 -#3rd party dependency
33 -import progressbar
34 -
35 -
36 -def build_scaffolding(load_input_queue, main, obj, result_processor=False, result_queue=False, **kwargs):
37 - '''
38 - This a generic producer/consumer process launcher. It can launch two types
39 - of processes:
40 - a) Processes that take a task from a queue and do their thing
41 - b) Processes that take a task from a queue and put the result in the
42 - result_queue.
43 - If result_queue is False then a) is assumed.
44 -
45 - @load_input_queue is a function that is used to insert jobs into queue
46 -
47 - @main is the function that will process the input_queue
48 -
49 - @obj can be a pickled object or an enumerable variable that will be loaded
50 - into the input_queue
51 -
52 - @result_queue, if set to True will become a true queue and will be provided
53 - to main whose job it is to fill with new tasks. If False then this variable
54 - is ignored.
55 -
56 - @result_processor, name of the function to process the @result_queue
57 -
58 - @kwargs is a dictionary with optional variables. Used to supply to main
59 - '''
60 -
61 - nr_input_processors = kwargs.pop('nr_input_processors')
62 - nr_output_processors = kwargs.pop('nr_output_processors')
63 - poison_pill = kwargs.get('poison_pill', True)
64 - input_queues = {}
65 - result_queues = {}
66 -
67 - assert len(obj) == nr_input_processors
68 - if result_queue:
69 - assert len(obj)== nr_output_processors
70 -
71 - for i, o in enumerate(obj):
72 - input_queues[i] = load_input_queue(obj[o], poison_pill=poison_pill)
73 - if result_queue:
74 - result_queues[i] = JoinableQueue()
75 - else:
76 - result_queues[i] = False
77 -
78 - if settings.progressbar:
79 - size = sum([input_queues[q].qsize() for q in input_queues])
80 - pbar = progressbar.ProgressBar(maxval=size).start()
81 - kwargs['pbar'] = pbar
82 - else:
83 - pbar = False
84 -
85 - input_processes = [models.ProcessInputQueue(main, input_queues[i], result_queues[i],
86 - **kwargs) for i in xrange(nr_input_processors)]
87 -
88 - for input_process in input_processes:
89 - input_process.start()
90 - pids = [p.pid for p in input_processes]
91 - kwargs['pids'] = pids
92 -
93 - if result_queue:
94 - result_processes = [models.ProcessResultQueue(result_processor,
95 - result_queues[i], **kwargs) for i in xrange(nr_output_processors)]
96 - for result_process in result_processes:
97 - result_process.start(result_process.input_queue)
98 -
99 - for input_process in input_processes:
100 - print 'Waiting for input process to finish'
101 - input_process.join()
102 - print 'Input process finished'
103 -
104 - if result_queue:
105 - for result_process in result_processes:
106 - print 'Waiting for result process to finish.'
107 - result_process.join()
108 - print 'Result process finished'
109 -
110 - if pbar:
111 - pbar.finish()
112 - print 'Total elapsed time: %s.' % (utils.humanize_time_difference(pbar.seconds_elapsed))
113 -
114 -
115 -def load_queue(obj, poison_pill=False):
116 - '''
117 - @input_queue should be an instance of multiprocessing.Queue
118 -
119 - @obj either pickled or enumerable variable that contains the tasks
120 -
121 - @returns: queue with tasks
122 - '''
123 - input_queue = Queue()
124 - if isinstance(obj, type(list)):
125 - data = utils.load_object(obj)
126 - else:
127 - data = obj
128 - for d in data:
129 - input_queue.put(d)
130 -
131 - if poison_pill:
132 - input_queue.put(None)
133 - return input_queue
Index: trunk/tools/editor_trends/utils/utils.py
@@ -39,6 +39,7 @@
4040 import configuration
4141 settings = configuration.Settings()
4242 import exceptions
 43+import messages
4344
4445 try:
4546 import psyco
@@ -415,7 +416,7 @@
416417 '''
417418 Updates the progressbar by determining how much work is left in a queue
418419 '''
419 - x = pbar.maxval - queue.qsize()
 420+ x = pbar.maxval - messages.show(queue.qsize)
420421 '''
421422 Currently, calling the pbar.update function gives the following error:
422423 File "build\bdist.win32\egg\progressbar.py", line 352, in update
Index: trunk/tools/editor_trends/utils/messages.py
@@ -0,0 +1,32 @@
 2+#!/usr/bin/python
 3+# -*- coding: utf-8 -*-
 4+'''
 5+Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
 6+This program is free software; you can redistribute it and/or
 7+modify it under the terms of the GNU General Public License version 2
 8+as published by the Free Software Foundation.
 9+This program is distributed in the hope that it will be useful,
 10+but WITHOUT ANY WARRANTY; without even the implied warranty of
 11+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 12+See the GNU General Public License for more details, at
 13+http://www.fsf.org/licenses/gpl.html
 14+'''
 15+
 16+__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
 17+__author__email = 'dvanliere at gmail dot com'
 18+__date__ = '2011-01-05'
 19+__version__ = '0.1'
 20+
 21+import sys
 22+sys.path.append('..')
 23+
 24+import configuration
 25+settings = configuration.Settings()
 26+
 27+
 28+
 29+def show(func):
 30+ try:
 31+ func()
 32+ except:
 33+ print 'Calling function %s caused an error, probably your platform is not supporting this function' % func
Index: trunk/tools/editor_trends/database/sqlite_logic.py
@@ -1,156 +0,0 @@
2 -def retrieve_editor_ids_db():
3 - contributors = set()
4 - connection = db.init_database()
5 - cursor = connection.cursor()
6 - if settings.PROGRESS_BAR:
7 - cursor.execute('SELECT MAX(ROWID) FROM contributors')
8 - for id in cursor:
9 - pass
10 - pbar = progressbar.ProgressBar(maxval=id[0]).start()
11 -
12 - cursor.execute('SELECT contributor FROM contributors WHERE bot=0')
13 -
14 - print 'Retrieving contributors...'
15 - for x, contributor in enumerate(cursor):
16 - contributors.add(contributor[0])
17 - if x % 100000 == 0:
18 - pbar.update(x)
19 - print 'Serializing contributors...'
20 - utils.store_object(contributors, 'contributors')
21 - print 'Finished serializing contributors...'
22 -
23 - if pbar:
24 - pbar.finish()
25 - print 'Total elapsed time: %s.' % (utils.humanize_time_difference(pbar.seconds_elapsed))
26 -
27 - connection.close()
28 -
29 -def retrieve_edits_by_contributor(input_queue, result_queue, pbar):
30 - connection = db.init_database()
31 - cursor = connection.cursor()
32 -
33 - while True:
34 - try:
35 - contributor = input_queue.get(block=False)
36 - if contributor == None:
37 - break
38 -
39 - cursor.execute('SELECT contributor, timestamp, bot FROM contributors WHERE contributor=?', (contributor,))
40 - edits = {}
41 - edits[contributor] = set()
42 - for edit, timestamp, bot in cursor:
43 - date = utils.convert_timestamp_to_date(timestamp)
44 - edits[contributor].add(date)
45 - #print edit, timestamp, bot
46 -
47 - utils.write_data_to_csv(edits, retrieve_edits_by_contributor)
48 - if pbar:
49 - utils.update_progressbar(pbar, input_queue)
50 -
51 - except Empty:
52 - pass
53 -
54 - connection.close()
55 -
56 -
57 -def store_data_db(data_queue, pids):
58 - connection = db.init_database()
59 - cursor = connection.cursor()
60 - db.create_tables(cursor, db_settings.CONTRIBUTOR_TABLE)
61 - empty = 0
62 - values = []
63 - while True:
64 - try:
65 - chunk = data_queue.get(block=False)
66 - contributor = chunk['contributor'].encode(settings.encoding)
67 - article = chunk['article']
68 - timestamp = chunk['timestamp'].encode(settings.encoding)
69 - bot = chunk['bot']
70 - values.append((contributor, article, timestamp, bot))
71 -
72 - if len(values) == 50000:
73 - cursor.executemany('INSERT INTO contributors VALUES (?,?,?,?)', values)
74 - connection.commit()
75 - #print 'Size of queue: %s' % data_queue.qsize()
76 - values = []
77 -
78 - except Empty:
79 -
80 - if all([utils.check_if_process_is_running(pid) for pid in pids]):
81 - pass
82 - else:
83 - break
84 - connection.close()
85 -
86 -
87 -def create_bots_db(db_name):
88 - '''
89 - This function reads the csv file provided by Erik Zachte and constructs a
90 - sqlite memory database. The reason for this is that I suspect I will need
91 - some simple querying capabilities in the future, else a dictionary would
92 - suffice.
93 - '''
94 - connection = db.init_database('db_name')
95 - #connection = db.init_database('data/database/bots.db')
96 - cursor = connection.cursor()
97 - db.create_tables(cursor, db_settings.BOT_TABLE)
98 - values = []
99 - fields = [field[0] for field in db_settings.BOT_TABLE['bots']]
100 - for line in utils.read_data_from_csv('data/csv/StatisticsBots.csv', settings.encoding):
101 - line = line.split(',')
102 - row = []
103 - for x, (field, value) in enumerate(zip(fields, line)):
104 - if db_settings.BOT_TABLE['bots'][x][1] == 'INTEGER':
105 - value = int(value)
106 - elif db_settings.BOT_TABLE['bots'][x][1] == 'TEXT':
107 - value = value.replace('/', '-')
108 - #print field, value
109 - row.append(value)
110 - values.append(row)
111 -
112 - cursor.executemany('INSERT INTO bots VALUES (?,?,?,?,?,?,?,?,?,?);', values)
113 - connection.commit()
114 - if db_name == ':memory':
115 - return cursor
116 - else:
117 - connection.close()
118 -
119 -def retrieve_botnames_without_id(cursor, language):
120 - return cursor.execute('SELECT name FROM bots WHERE language=?', (language,)).fetchall()
121 -
122 -
123 -def add_id_to_botnames():
124 - '''
125 - This is the worker function for the multi-process version of
126 - lookup_username.First, the names of the bots are retrieved, then the
127 - multiprocess is launched by making a call to pc.build_scaffolding. This is a
128 - generic launcher that takes as input the function to load the input_queue,
129 - the function that will do the main work and the objects to be put in the
130 - input_queue. The launcher also accepts optional keyword arguments.
131 - '''
132 - cursor = create_bots_db(':memory')
133 - files = utils.retrieve_file_list(settings.input_location, 'xml')
134 -
135 - botnames = retrieve_botnames_without_id(cursor, 'en')
136 - bots = {}
137 - for botname in botnames:
138 - bots[botname[0]] = 1
139 - pc.build_scaffolding(pc.load_queue, lookup_username, files, bots=bots)
140 - cursor.close()
141 -
142 -
143 -def debug_lookup_username():
144 - '''
145 - This function launches the lookup_username function but then single
146 - threaded, this eases debugging. That's also the reason why the queue
147 - parameters are set to None. When launching this function make sure that
148 - debug=False when calling lookup_username
149 - '''
150 - cursor = create_bots_db(':memory')
151 - botnames = retrieve_botnames_without_id(cursor, 'en')
152 - bots = {}
153 - for botname in botnames:
154 - bots[botname[0]] = 1
155 -
156 - lookup_username('12.xml', None, None, bots, debug=True)
157 - cursor.close()
Index: trunk/tools/editor_trends/bots/bots.py
@@ -31,8 +31,10 @@
3232 import wikitree
3333 from database import db
3434 from utils import utils
 35+from utils import messages
3536 #from etl import extract
36 -from utils import process_constructor as pc
 37+#from utils import process_constructor as pc
 38+
3739 from etl import models
3840 import models as botmodels
3941
@@ -211,7 +213,7 @@
212214 if single:
213215 while True:
214216 try:
215 - print '%s files left in the queue...' % tasks.qsize()
 217+ print '%s files left in the queue...' % messages.show(tasks.qsize)
216218 task = tasks.get(block=False)
217219 bots = task(bots)
218220 except Empty:
Index: trunk/tools/editor_trends/code-snippets/sqlite/sqlite_logic.py
@@ -0,0 +1,156 @@
 2+def retrieve_editor_ids_db():
 3+ contributors = set()
 4+ connection = db.init_database()
 5+ cursor = connection.cursor()
 6+ if settings.PROGRESS_BAR:
 7+ cursor.execute('SELECT MAX(ROWID) FROM contributors')
 8+ for id in cursor:
 9+ pass
 10+ pbar = progressbar.ProgressBar(maxval=id[0]).start()
 11+
 12+ cursor.execute('SELECT contributor FROM contributors WHERE bot=0')
 13+
 14+ print 'Retrieving contributors...'
 15+ for x, contributor in enumerate(cursor):
 16+ contributors.add(contributor[0])
 17+ if x % 100000 == 0:
 18+ pbar.update(x)
 19+ print 'Serializing contributors...'
 20+ utils.store_object(contributors, 'contributors')
 21+ print 'Finished serializing contributors...'
 22+
 23+ if pbar:
 24+ pbar.finish()
 25+ print 'Total elapsed time: %s.' % (utils.humanize_time_difference(pbar.seconds_elapsed))
 26+
 27+ connection.close()
 28+
 29+def retrieve_edits_by_contributor(input_queue, result_queue, pbar):
 30+ connection = db.init_database()
 31+ cursor = connection.cursor()
 32+
 33+ while True:
 34+ try:
 35+ contributor = input_queue.get(block=False)
 36+ if contributor == None:
 37+ break
 38+
 39+ cursor.execute('SELECT contributor, timestamp, bot FROM contributors WHERE contributor=?', (contributor,))
 40+ edits = {}
 41+ edits[contributor] = set()
 42+ for edit, timestamp, bot in cursor:
 43+ date = utils.convert_timestamp_to_date(timestamp)
 44+ edits[contributor].add(date)
 45+ #print edit, timestamp, bot
 46+
 47+ utils.write_data_to_csv(edits, retrieve_edits_by_contributor)
 48+ if pbar:
 49+ utils.update_progressbar(pbar, input_queue)
 50+
 51+ except Empty:
 52+ pass
 53+
 54+ connection.close()
 55+
 56+
 57+def store_data_db(data_queue, pids):
 58+ connection = db.init_database()
 59+ cursor = connection.cursor()
 60+ db.create_tables(cursor, db_settings.CONTRIBUTOR_TABLE)
 61+ empty = 0
 62+ values = []
 63+ while True:
 64+ try:
 65+ chunk = data_queue.get(block=False)
 66+ contributor = chunk['contributor'].encode(settings.encoding)
 67+ article = chunk['article']
 68+ timestamp = chunk['timestamp'].encode(settings.encoding)
 69+ bot = chunk['bot']
 70+ values.append((contributor, article, timestamp, bot))
 71+
 72+ if len(values) == 50000:
 73+ cursor.executemany('INSERT INTO contributors VALUES (?,?,?,?)', values)
 74+ connection.commit()
 75+ #print 'Size of queue: %s' % data_queue.qsize()
 76+ values = []
 77+
 78+ except Empty:
 79+
 80+ if all([utils.check_if_process_is_running(pid) for pid in pids]):
 81+ pass
 82+ else:
 83+ break
 84+ connection.close()
 85+
 86+
 87+def create_bots_db(db_name):
 88+ '''
 89+ This function reads the csv file provided by Erik Zachte and constructs a
 90+ sqlite memory database. The reason for this is that I suspect I will need
 91+ some simple querying capabilities in the future, else a dictionary would
 92+ suffice.
 93+ '''
 94+ connection = db.init_database('db_name')
 95+ #connection = db.init_database('data/database/bots.db')
 96+ cursor = connection.cursor()
 97+ db.create_tables(cursor, db_settings.BOT_TABLE)
 98+ values = []
 99+ fields = [field[0] for field in db_settings.BOT_TABLE['bots']]
 100+ for line in utils.read_data_from_csv('data/csv/StatisticsBots.csv', settings.encoding):
 101+ line = line.split(',')
 102+ row = []
 103+ for x, (field, value) in enumerate(zip(fields, line)):
 104+ if db_settings.BOT_TABLE['bots'][x][1] == 'INTEGER':
 105+ value = int(value)
 106+ elif db_settings.BOT_TABLE['bots'][x][1] == 'TEXT':
 107+ value = value.replace('/', '-')
 108+ #print field, value
 109+ row.append(value)
 110+ values.append(row)
 111+
 112+ cursor.executemany('INSERT INTO bots VALUES (?,?,?,?,?,?,?,?,?,?);', values)
 113+ connection.commit()
 114+ if db_name == ':memory':
 115+ return cursor
 116+ else:
 117+ connection.close()
 118+
 119+def retrieve_botnames_without_id(cursor, language):
 120+ return cursor.execute('SELECT name FROM bots WHERE language=?', (language,)).fetchall()
 121+
 122+
 123+def add_id_to_botnames():
 124+ '''
 125+ This is the worker function for the multi-process version of
 126+ lookup_username.First, the names of the bots are retrieved, then the
 127+ multiprocess is launched by making a call to pc.build_scaffolding. This is a
 128+ generic launcher that takes as input the function to load the input_queue,
 129+ the function that will do the main work and the objects to be put in the
 130+ input_queue. The launcher also accepts optional keyword arguments.
 131+ '''
 132+ cursor = create_bots_db(':memory')
 133+ files = utils.retrieve_file_list(settings.input_location, 'xml')
 134+
 135+ botnames = retrieve_botnames_without_id(cursor, 'en')
 136+ bots = {}
 137+ for botname in botnames:
 138+ bots[botname[0]] = 1
 139+ pc.build_scaffolding(pc.load_queue, lookup_username, files, bots=bots)
 140+ cursor.close()
 141+
 142+
 143+def debug_lookup_username():
 144+ '''
 145+ This function launches the lookup_username function but then single
 146+ threaded, this eases debugging. That's also the reason why the queue
 147+ parameters are set to None. When launching this function make sure that
 148+ debug=False when calling lookup_username
 149+ '''
 150+ cursor = create_bots_db(':memory')
 151+ botnames = retrieve_botnames_without_id(cursor, 'en')
 152+ bots = {}
 153+ for botname in botnames:
 154+ bots[botname[0]] = 1
 155+
 156+ lookup_username('12.xml', None, None, bots, debug=True)
 157+ cursor.close()
Property changes on: trunk/tools/editor_trends/code-snippets/sqlite/sqlite_logic.py
___________________________________________________________________
Added: svn:eol-style
1158 + native
Index: trunk/tools/editor_trends/code-snippets/process_constructor.py
@@ -0,0 +1,132 @@
 2+#!/usr/bin/python
 3+# -*- coding: utf-8 -*-
 4+'''
 5+Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
 6+This program is free software; you can redistribute it and/or
 7+modify it under the terms of the GNU General Public License version 2
 8+as published by the Free Software Foundation.
 9+This program is distributed in the hope that it will be useful,
 10+but WITHOUT ANY WARRANTY; without even the implied warranty of
 11+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 12+See the GNU General Public License for more details, at
 13+http://www.fsf.org/licenses/gpl.html
 14+'''
 15+
 16+__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
 17+__author__email = 'dvanliere at gmail dot com'
 18+__date__ = '2010-10-21'
 19+__version__ = '0.1'
 20+
 21+from multiprocessing import Process, Queue, JoinableQueue
 22+from Queue import Empty
 23+import sys
 24+
 25+
 26+sys.path.append('..')
 27+import configuration
 28+settings = configuration.Settings()
 29+import utils
 30+import models
 31+
 32+#3rd party dependency
 33+import progressbar
 34+
 35+
 36+def build_scaffolding(load_input_queue, main, obj, result_processor=False, result_queue=False, **kwargs):
 37+ '''
 38+ This a generic producer/consumer process launcher. It can launch two types
 39+ of processes:
 40+ a) Processes that take a task from a queue and do their thing
 41+ b) Processes that take a task from a queue and put the result in the
 42+ result_queue.
 43+ If result_queue is False then a) is assumed.
 44+
 45+ @load_input_queue is a function that is used to insert jobs into queue
 46+
 47+ @main is the function that will process the input_queue
 48+
 49+ @obj can be a pickled object or an enumerable variable that will be loaded
 50+ into the input_queue
 51+
 52+ @result_queue, if set to True will become a true queue and will be provided
 53+ to main whose job it is to fill with new tasks. If False then this variable
 54+ is ignored.
 55+
 56+ @result_processor, name of the function to process the @result_queue
 57+
 58+ @kwargs is a dictionary with optional variables. Used to supply to main
 59+ '''
 60+
 61+ nr_input_processors = kwargs.pop('nr_input_processors')
 62+ nr_output_processors = kwargs.pop('nr_output_processors')
 63+ poison_pill = kwargs.get('poison_pill', True)
 64+ input_queues = {}
 65+ result_queues = {}
 66+
 67+ assert len(obj) == nr_input_processors
 68+ if result_queue:
 69+ assert len(obj)== nr_output_processors
 70+
 71+ for i, o in enumerate(obj):
 72+ input_queues[i] = load_input_queue(obj[o], poison_pill=poison_pill)
 73+ if result_queue:
 74+ result_queues[i] = JoinableQueue()
 75+ else:
 76+ result_queues[i] = False
 77+
 78+ if settings.progressbar:
 79+ size = sum([input_queues[q].qsize() for q in input_queues])
 80+ pbar = progressbar.ProgressBar(maxval=size).start()
 81+ kwargs['pbar'] = pbar
 82+ else:
 83+ pbar = False
 84+
 85+ input_processes = [models.ProcessInputQueue(main, input_queues[i], result_queues[i],
 86+ **kwargs) for i in xrange(nr_input_processors)]
 87+
 88+ for input_process in input_processes:
 89+ input_process.start()
 90+ pids = [p.pid for p in input_processes]
 91+ kwargs['pids'] = pids
 92+
 93+ if result_queue:
 94+ result_processes = [models.ProcessResultQueue(result_processor,
 95+ result_queues[i], **kwargs) for i in xrange(nr_output_processors)]
 96+ for result_process in result_processes:
 97+ result_process.start(result_process.input_queue)
 98+
 99+ for input_process in input_processes:
 100+ print 'Waiting for input process to finish'
 101+ input_process.join()
 102+ print 'Input process finished'
 103+
 104+ if result_queue:
 105+ for result_process in result_processes:
 106+ print 'Waiting for result process to finish.'
 107+ result_process.join()
 108+ print 'Result process finished'
 109+
 110+ if pbar:
 111+ pbar.finish()
 112+ print 'Total elapsed time: %s.' % (utils.humanize_time_difference(pbar.seconds_elapsed))
 113+
 114+
 115+def load_queue(obj, poison_pill=False):
 116+ '''
 117+ @input_queue should be an instance of multiprocessing.Queue
 118+
 119+ @obj either pickled or enumerable variable that contains the tasks
 120+
 121+ @returns: queue with tasks
 122+ '''
 123+ input_queue = Queue()
 124+ if isinstance(obj, type(list)):
 125+ data = utils.load_object(obj)
 126+ else:
 127+ data = obj
 128+ for d in data:
 129+ input_queue.put(d)
 130+
 131+ if poison_pill:
 132+ input_queue.put(None)
 133+ return input_queue
Property changes on: trunk/tools/editor_trends/code-snippets/process_constructor.py
___________________________________________________________________
Added: svn:eol-style
1134 + native
Added: svn:mime-type
2135 + text/plain
