r77110 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: < r77109 | r77110 | r77111 >
Date: 17:51, 22 November 2010
Author: diederik
Status: deferred
Tags:
Comment:
Preparing for beta release.
1) Continued the renaming exercise.
2) Rewrote modules using objects.
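The rewrite replaces the old function-plus-kwargs pipelines with callable task objects that worker processes pull from a multiprocessing.JoinableQueue, as seen in the new XMLFileConsumer/XMLFile and EditorConsumer/Editor pairs in the diff below. A minimal, self-contained sketch of that pattern (Task and Consumer are illustrative names, not from the codebase):

    import multiprocessing

    class Consumer(multiprocessing.Process):
        def __init__(self, task_queue):
            multiprocessing.Process.__init__(self)
            self.task_queue = task_queue

        def run(self):
            while True:
                task = self.task_queue.get()
                self.task_queue.task_done()
                if task is None:     # poison pill: one per consumer
                    break
                task()               # each task is a callable object

    class Task(object):
        def __init__(self, payload):
            self.payload = payload

        def __call__(self):
            print('processing %s' % self.payload)

    if __name__ == '__main__':
        tasks = multiprocessing.JoinableQueue()
        consumers = [Consumer(tasks) for i in range(4)]
        for payload in ['a', 'b', 'c']:
            tasks.put(Task(payload))
        for c in consumers:          # one poison pill per consumer
            tasks.put(None)
        for c in consumers:
            c.start()
        tasks.join()                 # returns once every task_done() is in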
Modified paths:
  • /trunk/tools/editor_trends/config.py (modified) (history)
  • /trunk/tools/editor_trends/configuration.py (modified) (history)
  • /trunk/tools/editor_trends/database/cache.py (modified) (history)
  • /trunk/tools/editor_trends/database/sqlite_logic.py (added) (history)
  • /trunk/tools/editor_trends/etl/chunker.py (modified) (history)
  • /trunk/tools/editor_trends/etl/construct_datasets.py (deleted) (history)
  • /trunk/tools/editor_trends/etl/exporter.py (added) (history)
  • /trunk/tools/editor_trends/etl/extract.py (modified) (history)
  • /trunk/tools/editor_trends/etl/loader.py (modified) (history)
  • /trunk/tools/editor_trends/etl/optimize_editors.py (deleted) (history)
  • /trunk/tools/editor_trends/etl/store.py (added) (history)
  • /trunk/tools/editor_trends/etl/transformer.py (added) (history)
  • /trunk/tools/editor_trends/manage.py (modified) (history)
  • /trunk/tools/editor_trends/utils/models.py (modified) (history)
  • /trunk/tools/editor_trends/utils/process_constructor.py (modified) (history)
  • /trunk/tools/editor_trends/utils/sort.py (modified) (history)
  • /trunk/tools/editor_trends/utils/utils.py (modified) (history)

Diff [purge]

Index: trunk/tools/editor_trends/manage.py
@@ -20,6 +20,7 @@
2121 import os
2222 import sys
2323 import subprocess
 24+import datetime
2425 from argparse import ArgumentParser
2526 from argparse import RawTextHelpFormatter
2627 import locale
@@ -34,17 +35,30 @@
3536 from utils import dump_downloader
3637 from etl import chunker
3738 from etl import extract
38 -from etl import optimize_editors
39 -from etl import construct_datasets
 39+from etl import loader
 40+from etl import transformer
 41+from etl import exporter
4042 import config
4143
4244
 45+class Timer(object):
 46+ def __init__(self):
 47+ self.t0 = datetime.datetime.now()
 48+
 49+ def stop(self):
 50+ self.t1 = datetime.datetime.now()
 51+
 52+ def elapsed(self):
 53+ self.stop()
 54+ print 'Processing time: %s' % (self.t1 - self.t0)
 55+
 56+
4357 def get_value(args, key):
4458 return getattr(args, key, None)
4559
4660
47 -def config_launcher(args, location, filename, project, full_project, language_code, language):
48 - config.load_configuration(args)
 61+def config_launcher(args, **kwargs):
 62+ settings.load_configuration()
4963
5064
5165 def determine_default_language():
@@ -99,10 +113,11 @@
100114 return locations
101115
102116
103 -
104 -
105 -def show_settings(args, location, filename, project, full_project, language_code, language):
106 - project = settings.projects.get(project, 'wiki')
 117+def show_settings(args, **kwargs):
 118+ project = settings.projects.get(kwargs.pop('project'), 'wiki')
 119+ language_code = kwargs.pop('language_code')
 120+ language = kwargs.pop('language')
 121+ location = kwargs.pop('location')
107122 project = project.title()
108123 language_map = utils.invert_dict(languages.MAPPING)
109124 print 'Project: %s' % (project)
@@ -111,18 +126,30 @@
112127 print 'Output directory: %s and subdirectories' % location
113128
114129
115 -def dump_downloader_launcher(args, location, filename, project, full_project, language_code, language):
 130+def dump_downloader_launcher(args, **kwargs):
116131 print 'dump downloader'
 132+ timer = Timer()
 133+ filename = kwargs.get('filename')
 134+ project = kwargs.get('project') # used by the dump path below; extension is recomputed
 135+ location = kwargs.get('location')
117136 pbar = get_value(args, 'progress')
118137 domain = settings.wp_dump_location
119138 path = '/%s/latest/' % project
120139 extension = utils.determine_file_extension(filename)
121140 filemode = utils.determine_file_mode(extension)
122141 dump_downloader.download_wiki_file(domain, path, filename, location, filemode, pbar)
 142+ timer.elapsed()
123143
124144
125 -def cruncher_launcher(args, location, filename, project, full_project, language_code, language):
 145+def chunker_launcher(args, **kwargs):
126146 print 'split_settings.input_filename_launcher'
 147+ timer = Timer()
 148+ filename = kwargs.pop('filename')
 149+ #filename = 'en-latest-pages-meta-history.xml.bz2' # debug override, disabled so the kwargs value is used
 150+ location = kwargs.pop('location')
 151+ project = kwargs.pop('project')
 152+ language = kwargs.pop('language')
 153+ language_code = kwargs.pop('language_code')
127154 ext = utils.determine_file_extension(filename)
128155 if ext in settings.compression_extensions:
129156 ext = '.%s' % ext
@@ -135,9 +162,12 @@
136163 if retcode != 0:
137164 sys.exit(retcode)
138165 chunker.split_file(location, file, project, language_code, language)
 166+ timer.elapsed()
 167+ #settings.set_custom_settings(xml_namespace='http://www.mediawiki.org/xml/export-0.3/')
139168
140169
141170 def launch_zip_extractor(args, location, file):
 171+ timer = Timer()
142172 path = settings.detect_installed_program('7zip')
143173 source = os.path.join(location, file)
144174 p = None
@@ -150,30 +180,57 @@
151181 raise NotImplementedError
152182 else:
153183 raise exceptions.PlatformNotSupportedError
 184+ timer.elapsed()
154185 return p
155186
156187
157 -def mongodb_script_launcher(args, location, filename, project, full_project, language_code, language):
 188+def extract_launcher(args, **kwargs):
158189 print 'mongodb_script_launcher'
159 - extract.run_parse_editors(project, language_code, location)
 190+ timer = Timer()
 191+ location = kwargs.pop('location')
 192+ language_code = kwargs.pop('language_code')
 193+ project = kwargs.pop('project')
 194+ extract.run_parse_editors(location, **kwargs)
 195+ timer.elapsed()
160196
161197
162 -def sort_launcher(args, location, filename, project, full_project, language_code):
163 - raise NotImplementedError
 198+def sort_launcher(args, **kwargs):
 199+ timer = Timer()
 200+ location = kwargs.pop('location')
 201+ input = os.path.join(location, 'txt')
 202+ output = os.path.join(location, 'sorted')
 203+ dbname = kwargs.pop('full_project')
 204+ loader.mergesort_launcher(input, output)
 205+ filename = loader.mergesort_external_launcher(dbname, output, output)
 206+ loader.store_editors(output, filename, dbname, 'editors')
 207+ timer.elapsed()
164208
165209
166 -def dataset_launcher(args, full_project):
 210+def transformer_launcher(args, **kwargs):
167211 print 'dataset launcher'
168 - optimize_editors.run_optimize_editors(project)
169 - construct_datasets.generate_editor_dataset_launcher(project)
 212+ timer = Timer()
 213+ project = kwargs.pop('full_project')
 214+ transformer.run_optimize_editors(project)
 215+ timer.elapsed()
170216
171217
172 -def all_launcher(args, location, filename, project, full_project, language_code, language):
 218+def exporter_launcher(args, **kwargs):
 219+ timer = Timer()
 220+ project = kwargs.pop('full_project')
 221+ exporter.generate_editor_dataset_launcher(project)
 222+ timer.elapsed()
 223+
 224+
 225+def all_launcher(args, **kwargs):
173226 print 'all_launcher'
174 - dump_downloader_launcher(args, location, filename, project, language_code)
175 - split_settings.input_filename_launcher(args, location, filename, project, language_code)
176 - mongodb_script_launcher(args, location, filename, project, language_code)
177 - dataset_launcher(args, location, filename, project, language_code)
 227+ timer = Timer()
 228+ dump_downloader_launcher(args, **kwargs)
 229+ chunker_launcher(args, **kwargs)
 230+ extract_launcher(args, **kwargs)
 231+ sort_launcher(args, **kwargs)
 232+ transformer_launcher(args, **kwargs)
 233+ exporter_launcher(args, **kwargs)
 234+ timer.elapsed()
178235
179236
180237 def supported_languages():
@@ -237,17 +294,20 @@
238295 parser_download.set_defaults(func=dump_downloader_launcher)
239296
240297 parser_split = subparsers.add_parser('split', help='The split sub command splits the downloaded file into smaller chunks to parallelize information extraction.')
241 - parser_split.set_defaults(func=cruncher_launcher)
 298+ parser_split.set_defaults(func=chunker_launcher)
242299
243300 parser_sort = subparsers.add_parser('sort', help='By presorting the data, significant processing time reductions are achieved.')
244301 parser_sort.set_defaults(func=sort_launcher)
245302
246 - parser_create = subparsers.add_parser('store', help='The store sub command parsers the XML chunk files, extracts the information and stores it in a MongoDB.')
247 - parser_create.set_defaults(func=mongodb_script_launcher)
 303+ parser_create = subparsers.add_parser('extract', help='The extract sub command parses the XML chunk files, extracts the information and stores it in a MongoDB.')
 304+ parser_create.set_defaults(func=extract_launcher)
248305
249 - parser_dataset = subparsers.add_parser('dataset', help='Create a dataset from the MongoDB and write it to a csv file.')
250 - parser_dataset.set_defaults(func=dataset_launcher)
 306+ parser_transform = subparsers.add_parser('transform', help='Transform the raw database into an enriched dataset that can be exported.')
 307+ parser_transform.set_defaults(func=transformer_launcher)
251308
 309+ parser_dataset = subparsers.add_parser('export', help='Create a dataset from the MongoDB and write it to a csv file.')
 310+ parser_dataset.set_defaults(func=exporter_launcher)
 311+
252312 parser_all = subparsers.add_parser('all', help='The all sub command runs the download, split, extract, sort, transform and export commands.\n\nWARNING: THIS COULD TAKE DAYS DEPENDING ON THE CONFIGURATION OF YOUR MACHINE AND THE SIZE OF THE WIKIMEDIA DUMP FILE.')
253313 parser_all.set_defaults(func=all_launcher)
254314
@@ -277,12 +337,14 @@
278338 detect_python_version()
279339 about()
280340 args = parser.parse_args()
281 - config.load_configuration(args)
 341+ if not os.path.exists('wiki.cfg'):
 342+ config.create_configuration(settings, args)
282343 locations = determine_file_locations(args)
283 - #prepare_file_locations(locations['location'])
284344 settings.verify_environment([locations['location']])
285345 show_settings(args, **locations)
 346+ #locations['settings'] = settings
286347 args.func(args, **locations)
 348+ t1 = datetime.datetime.now()
287349
288350
289351 if __name__ == '__main__':
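The driver in manage.py binds each sub command to a launcher with set_defaults(func=...) and then invokes args.func(args, **locations). Roughly, under the same argparse conventions (the 'demo' command and the locations dict are hypothetical stand-ins for the real commands and determine_file_locations()):

    import datetime
    from argparse import ArgumentParser

    class Timer(object):                      # same shape as the new Timer class above
        def __init__(self):
            self.t0 = datetime.datetime.now()

        def elapsed(self):
            print('Processing time: %s' % (datetime.datetime.now() - self.t0))

    def demo_launcher(args, **kwargs):
        timer = Timer()
        print('running demo for project %s' % kwargs.get('project'))
        timer.elapsed()

    parser = ArgumentParser()
    subparsers = parser.add_subparsers()
    parser_demo = subparsers.add_parser('demo', help='Hypothetical sub command.')
    parser_demo.set_defaults(func=demo_launcher)

    args = parser.parse_args(['demo'])
    locations = {'project': 'wiki'}           # stand-in for determine_file_locations()
    args.func(args, **locations)

Passing locations as **kwargs is what lets each launcher above pop only the keys it needs.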
Index: trunk/tools/editor_trends/etl/optimize_editors.py
@@ -1,174 +0,0 @@
2 -#!/usr/bin/python
3 -# -*- coding: utf-8 -*-
4 -'''
5 -Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
6 -This program is free software; you can redistribute it and/or
7 -modify it under the terms of the GNU General Public License version 2
8 -as published by the Free Software Foundation.
9 -This program is distributed in the hope that it will be useful,
10 -but WITHOUT ANY WARRANTY; without even the implied warranty of
11 -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
12 -See the GNU General Public License for more details, at
13 -http://www.fsf.org/licenses/gpl.html
14 -'''
15 -
16 -__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
17 -__author__email = 'dvanliere at gmail dot com'
18 -__date__ = '2010-11-02'
19 -__version__ = '0.1'
20 -
21 -from multiprocessing import Queue
22 -from Queue import Empty
23 -from operator import itemgetter
24 -import datetime
25 -import sys
26 -
27 -sys.path.append('..')
28 -import configuration
29 -settings = configuration.Settings()
30 -from database import db
31 -from utils import process_constructor as pc
32 -from utils import utils
33 -import construct_datasets
34 -
35 -
36 -try:
37 - import psyco
38 - psyco.full()
39 -except ImportError:
40 - pass
41 -
42 -
43 -def create_datacontainer(init_value=0):
44 - '''
45 - This function initializes an empty dictionary with as key the year (starting
46 - 2001 and running through) and as value @init_value, in most cases this will
47 - be zero so the dictionary will act as a running tally for a variable but
48 - @init_value can also a list, [], or a dictionary, {}, or a set, set().
49 - '''
50 - data = {}
51 - year = datetime.datetime.now().year + 1
52 - for x in xrange(2001, year):
53 - data[str(x)] = init_value
54 - return data
55 -
56 -
57 -def add_months_to_datacontainer(datacontainer):
58 - for dc in datacontainer:
59 - datacontainer[dc] = {}
60 - for x in xrange(1, 13):
61 - datacontainer[dc][str(x)] = 0
62 - return datacontainer
63 -
64 -
65 -def determine_edits_by_month(edits):
66 - datacontainer = create_datacontainer(init_value=0)
67 - datacontainer = add_months_to_datacontainer(datacontainer)
68 - for year in edits:
69 - months = set()
70 - for edit in edits[year]:
71 - m = str(edit['date'].month)
72 - if m not in months:
73 - datacontainer[year][m] = 1
74 - months.add(m)
75 - if len(months) == 12:
76 - break
77 - return datacontainer
78 -
79 -
80 -def determine_edits_by_year(dates):
81 - '''
82 - This function counts the number of edits by year made by a particular editor.
83 - '''
84 - edits = create_datacontainer()
85 - for date in dates:
86 - year = str(date['date'].year)
87 - edits[year] += 1
88 - return edits
89 -
90 -
91 -def determine_articles_by_year(dates):
92 - '''
93 - This function counts the number of unique articles by year edited by a
94 - particular editor.
95 - '''
96 - articles = create_datacontainer(set())
97 - for date in dates:
98 - year = str(date['date'].year)
99 - articles[year].add(date['article'])
100 - for article in articles:
101 - articles[article] = len(articles[article])
102 - return articles
103 -
104 -
105 -def sort_edits(edits):
106 - edits = utils.merge_list(edits)
107 - return sorted(edits, key=itemgetter('date'))
108 -
109 -
110 -def optimize_editors(input_queue, result_queue, pbar, **kwargs):
111 - dbname = kwargs.pop('dbname')
112 - mongo = db.init_mongo_db(dbname)
113 - input = mongo['test']
114 - output = mongo['dataset']
115 - output.ensure_index('editor')
116 - output.ensure_index('year_joined')
117 - definition = kwargs.pop('definition')
118 - while True:
119 - try:
120 - id = input_queue.get(block=False)
121 - editor = input.find_one({'editor': id})
122 - if editor == None:
123 - continue
124 - edits = editor['edits']
125 - monthly_edits = determine_edits_by_month(edits)
126 - edits = sort_edits(edits)
127 - edit_count = len(edits)
128 - new_wikipedian = edits[9]['date']
129 - first_edit = edits[0]['date']
130 - final_edit = edits[-1]['date']
131 - edits_by_year = determine_edits_by_year(edits)
132 - articles_by_year = determine_articles_by_year(edits)
133 -
134 - edits = edits[:10]
135 -
136 - output.insert({'editor': id, 'edits': edits,
137 - 'edits_by_year': edits_by_year,
138 - 'new_wikipedian': new_wikipedian,
139 - 'edit_count': edit_count,
140 - 'final_edit': final_edit,
141 - 'first_edit': first_edit,
142 - 'articles_by_year': articles_by_year,
143 - 'monthly_edits': monthly_edits})
144 - print 'Items left: %s' % input_queue.qsize()
145 - except Empty:
146 - break
147 -
148 -
149 -def run_optimize_editors(dbname):
150 - ids = construct_datasets.retrieve_editor_ids_mongo(dbname, 'editors')
151 - kwargs = {'definition': 'traditional',
152 - 'pbar': True,
153 - 'dbname': 'enwiki',
154 - 'nr_input_processors': 1,
155 - 'nr_output_processors': 0,
156 - 'poison_pill': False
157 - }
158 - print len(ids)
159 - ids = list(ids)
160 - chunks = {0: ids}
161 - pc.build_scaffolding(pc.load_queue, optimize_editors, chunks, False, False, **kwargs)
162 -
163 -
164 -def debug_optimize_editors(dbname):
165 - ids = construct_datasets.retrieve_editor_ids_mongo(dbname, 'editors')
166 - q = pc.load_queue(ids)
167 - kwargs = {'definition': 'traditional',
168 - 'dbname': dbname
169 - }
170 - optimize_editors(q, False, True, kwargs)
171 -
172 -
173 -if __name__ == '__main__':
174 - #debug_optimize_editors('test')
175 - run_optimize_editors('enwiki')
\ No newline at end of file
Index: trunk/tools/editor_trends/etl/construct_datasets.py
@@ -1,256 +0,0 @@
2 -#!/usr/bin/python
3 -# -*- coding: utf-8 -*-
4 -'''
5 -Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
6 -This program is free software; you can redistribute it and/or
7 -modify it under the terms of the GNU General Public License version 2
8 -as published by the Free Software Foundation.
9 -This program is distributed in the hope that it will be useful,
10 -but WITHOUT ANY WARRANTY; without even the implied warranty of
11 -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
12 -See the GNU General Public License for more details, at
13 -http://www.fsf.org/licenses/gpl.html
14 -'''
15 -
16 -__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
17 -__author__email = 'dvanliere at gmail dot com'
18 -__date__ = '2010-10-21'
19 -__version__ = '0.1'
20 -
21 -from multiprocessing import Queue
22 -from Queue import Empty
23 -import datetime
24 -from dateutil.relativedelta import *
25 -import sys
26 -import progressbar
27 -
28 -sys.path.append('..')
29 -import configuration
30 -settings = configuration.Settings()
31 -from utils import models, utils
32 -from database import db
33 -from utils import process_constructor as pc
34 -
35 -try:
36 - import psyco
37 - psyco.full()
38 -except ImportError:
39 - pass
40 -
41 -
42 -def retrieve_editor_ids_mongo(dbname, collection):
43 - if utils.check_file_exists(settings.binary_location,
44 - 'editors.bin'):
45 - ids = utils.load_object(settings.binary_location,
46 - 'editors.bin')
47 - else:
48 - mongo = db.init_mongo_db(dbname)
49 - editors = mongo[collection]
50 - ids = editors.distinct('editor')
51 - utils.store_object(ids, settings.binary_location, retrieve_editor_ids_mongo)
52 - return ids
53 -
54 -
55 -def expand_edits(edits):
56 - data = []
57 - for edit in edits:
58 - data.append(edit['date'])
59 - return data
60 -
61 -
62 -def expand_observations(obs, vars_to_expand):
63 - for var in vars_to_expand:
64 - if var == 'edits':
65 - obs[var] = expand_edits(obs[var])
66 - elif var == 'edits_by_year':
67 - keys = obs[var].keys()
68 - keys.sort()
69 - edits = []
70 - for key in keys:
71 - edits.append(str(obs[var][key]))
72 - obs[var] = edits
73 - return obs
74 -
75 -def write_longitudinal_data(id, edits, fh):
76 - years = edits.keys()
77 - years.sort()
78 - for year in years:
79 - months = edits[year].keys()
80 - months = [int(m) for m in months]
81 - months.sort()
82 - for m in months:
83 - date = datetime.date(int(year), int(m), 1)
84 - fh.write('%s\t%s\t%s\n' % (id, date, edits[year][str(m)]))
85 -
86 -
87 -def expand_headers(headers, vars_to_expand, obs):
88 - for var in vars_to_expand:
89 - l = len(obs[var])
90 - pos = headers.index(var)
91 - for i in xrange(l):
92 - if var.endswith('year'):
93 - suffix = 2001 + i
94 - elif var.endswith('edits'):
95 - suffix = 1 + i
96 - headers.insert(pos + i, '%s_%s' % (var, suffix))
97 - headers.remove(var)
98 - return headers
99 -
100 -
101 -def generate_long_editor_dataset(input_queue, data_queue, pbar, **kwargs):
102 - debug = kwargs.pop('debug')
103 - dbname = kwargs.pop('dbname')
104 - mongo = db.init_mongo_db(dbname)
105 - editors = mongo['dataset']
106 - name = dbname + '_long_editors.csv'
107 - fh = utils.create_txt_filehandle(settings.dataset_location, name, 'a', settings.encoding)
108 - x = 0
109 - vars_to_expand = []
110 - while True:
111 - try:
112 - id = input_queue.get(block=False)
113 - obs = editors.find_one({'editor': id}, {'monthly_edits': 1})
114 - if x == 0:
115 - headers = obs.keys()
116 - headers.sort()
117 - headers = expand_headers(headers, vars_to_expand, obs)
118 - utils.write_list_to_csv(headers, fh)
119 - write_longitudinal_data(id, obs['monthly_edits'], fh)
120 - #utils.write_list_to_csv(data, fh)
121 - x += 1
122 - except Empty:
123 - break
124 -
125 -
126 -def generate_cohort_analysis(input_queue, data_queue, pbar, **kwargs):
127 - dbname = kwargs.get('dbname')
128 - pbar = kwargs.get('pbar')
129 - mongo = db.init_mongo_db(dbname)
130 - editors = mongo['dataset']
131 - year = datetime.datetime.now().year + 1
132 - begin = year - 2001
133 - p = [3, 6, 9]
134 - periods = [y * 12 for y in xrange(1, begin)]
135 - periods = p + periods
136 - data = {}
137 - while True:
138 - try:
139 - id = input_queue.get(block=False)
140 - obs = editors.find_one({'editor': id}, {'first_edit': 1, 'final_edit': 1})
141 - first_edit = obs['first_edit']
142 - last_edit = obs['final_edit']
143 - for y in xrange(2001, year):
144 - if y == 2010 and first_edit > datetime.datetime(2010, 1, 1):
145 - print 'debug'
146 - if y not in data:
147 - data[y] = {}
148 - data[y]['n'] = 0
149 - window_end = datetime.datetime(y, 12, 31)
150 - if window_end > datetime.datetime.now():
151 - now = datetime.datetime.now()
152 - m = now.month - 1 #Dump files are always lagging at least one month....
153 - d = now.day
154 - window_end = datetime.datetime(y, m, d)
155 - edits = []
156 - for period in periods:
157 - if period not in data[y]:
158 - data[y][period] = 0
159 - window_start = datetime.datetime(y, 12, 31) - relativedelta(months=period)
160 - if window_start < datetime.datetime(2001, 1, 1):
161 - window_start = datetime.datetime(2001, 1, 1)
162 - if date_falls_in_window(window_start, window_end, first_edit, last_edit):
163 - edits.append(period)
164 - if edits != []:
165 - p = min(edits)
166 - data[y]['n'] += 1
167 - data[y][p] += 1
168 - #pbar.update(+1)
169 - except Empty:
170 - break
171 - utils.store_object(data, settings.binary_location, 'cohort_data')
172 -
173 -def date_falls_in_window(window_start, window_end, first_edit, last_edit):
174 - if first_edit >= window_start and first_edit <= window_end:
175 - return True
176 - else:
177 - return False
178 -
179 -
180 -def generate_wide_editor_dataset(input_queue, data_queue, pbar, **kwargs):
181 - dbname = kwargs.pop('dbname')
182 - mongo = db.init_mongo_db(dbname)
183 - editors = mongo['dataset']
184 - name = dbname + '_wide_editors.csv'
185 - fh = utils.create_txt_filehandle(settings.dataset_location, name, 'a', settings.encoding)
186 - x = 0
187 - vars_to_expand = ['edits', 'edits_by_year', 'articles_by_year']
188 - while True:
189 - try:
190 - if debug:
191 - id = u'99797'
192 - else:
193 - id = input_queue.get(block=False)
194 - print input_queue.qsize()
195 - obs = editors.find_one({'editor': id})
196 - obs = expand_observations(obs, vars_to_expand)
197 - if x == 0:
198 - headers = obs.keys()
199 - headers.sort()
200 - headers = expand_headers(headers, vars_to_expand, obs)
201 - utils.write_list_to_csv(headers, fh)
202 - data = []
203 - keys = obs.keys()
204 - keys.sort()
205 - for key in keys:
206 - data.append(obs[key])
207 - utils.write_list_to_csv(data, fh)
208 -
209 - x += 1
210 - except Empty:
211 - break
212 - fh.close()
213 -
214 -
215 -def retrieve_edits_by_contributor_launcher():
216 - pc.build_scaffolding(pc.load_queue, retrieve_edits_by_contributor, 'contributors')
217 -
218 -
219 -def debug_retrieve_edits_by_contributor_launcher(dbname):
220 - kwargs = {'debug': False,
221 - 'dbname': dbname,
222 - }
223 - ids = retrieve_editor_ids_mongo(dbname, 'editors')
224 - input_queue = pc.load_queue(ids)
225 - q = Queue()
226 - generate_editor_dataset(input_queue, q, False, kwargs)
227 -
228 -
229 -def generate_editor_dataset_launcher(dbname):
230 - kwargs = {'nr_input_processors': 1,
231 - 'nr_output_processors': 1,
232 - 'debug': False,
233 - 'dbname': dbname,
234 - 'poison_pill':False,
235 - 'pbar': True
236 - }
237 - ids = retrieve_editor_ids_mongo(dbname, 'editors')
238 - ids = list(ids)
239 - chunks = dict({0: ids})
240 - pc.build_scaffolding(pc.load_queue, generate_cohort_analysis, chunks, False, False, **kwargs)
241 -
242 -
243 -def generate_editor_dataset_debug(dbname):
244 - ids = retrieve_editor_ids_mongo(dbname, 'editors')
245 - input_queue = pc.load_queue(ids)
246 - kwargs = {'nr_input_processors': 1,
247 - 'nr_output_processors': 1,
248 - 'debug': True,
249 - 'dbname': dbname,
250 - }
251 - generate_editor_dataset(input_queue, False, False, kwargs)
252 -
253 -
254 -if __name__ == '__main__':
255 - #generate_editor_dataset_debug('test')
256 - generate_editor_dataset_launcher('enwiki')
257 - #debug_retrieve_edits_by_contributor_launcher()
Index: trunk/tools/editor_trends/etl/exporter.py
@@ -0,0 +1,256 @@
 2+#!/usr/bin/python
 3+# -*- coding: utf-8 -*-
 4+'''
 5+Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
 6+This program is free software; you can redistribute it and/or
 7+modify it under the terms of the GNU General Public License version 2
 8+as published by the Free Software Foundation.
 9+This program is distributed in the hope that it will be useful,
 10+but WITHOUT ANY WARRANTY; without even the implied warranty of
 11+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 12+See the GNU General Public License for more details, at
 13+http://www.fsf.org/licenses/gpl.html
 14+'''
 15+
 16+__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
 17+__author__email = 'dvanliere at gmail dot com'
 18+__date__ = '2010-10-21'
 19+__version__ = '0.1'
 20+
 21+from multiprocessing import Queue
 22+from Queue import Empty
 23+import datetime
 24+from dateutil.relativedelta import *
 25+import sys
 26+import progressbar
 27+
 28+sys.path.append('..')
 29+import configuration
 30+settings = configuration.Settings()
 31+from utils import models, utils
 32+from database import db
 33+from utils import process_constructor as pc
 34+
 35+try:
 36+ import psyco
 37+ psyco.full()
 38+except ImportError:
 39+ pass
 40+
 41+
 42+def retrieve_editor_ids_mongo(dbname, collection):
 43+ if utils.check_file_exists(settings.binary_location,
 44+ 'editors.bin'):
 45+ ids = utils.load_object(settings.binary_location,
 46+ 'editors.bin')
 47+ else:
 48+ mongo = db.init_mongo_db(dbname)
 49+ editors = mongo[collection]
 50+ ids = editors.distinct('editor')
 51+ utils.store_object(ids, settings.binary_location, 'editors.bin') # store under the filename checked above, not the function object
 52+ return ids
 53+
 54+
 55+def expand_edits(edits):
 56+ data = []
 57+ for edit in edits:
 58+ data.append(edit['date'])
 59+ return data
 60+
 61+
 62+def expand_observations(obs, vars_to_expand):
 63+ for var in vars_to_expand:
 64+ if var == 'edits':
 65+ obs[var] = expand_edits(obs[var])
 66+ elif var == 'edits_by_year':
 67+ keys = obs[var].keys()
 68+ keys.sort()
 69+ edits = []
 70+ for key in keys:
 71+ edits.append(str(obs[var][key]))
 72+ obs[var] = edits
 73+ return obs
 74+
 75+def write_longitudinal_data(id, edits, fh):
 76+ years = edits.keys()
 77+ years.sort()
 78+ for year in years:
 79+ months = edits[year].keys()
 80+ months = [int(m) for m in months]
 81+ months.sort()
 82+ for m in months:
 83+ date = datetime.date(int(year), int(m), 1)
 84+ fh.write('%s\t%s\t%s\n' % (id, date, edits[year][str(m)]))
 85+
 86+
 87+def expand_headers(headers, vars_to_expand, obs):
 88+ for var in vars_to_expand:
 89+ l = len(obs[var])
 90+ pos = headers.index(var)
 91+ for i in xrange(l):
 92+ if var.endswith('year'):
 93+ suffix = 2001 + i
 94+ elif var.endswith('edits'):
 95+ suffix = 1 + i
 96+ headers.insert(pos + i, '%s_%s' % (var, suffix))
 97+ headers.remove(var)
 98+ return headers
 99+
 100+
 101+def generate_long_editor_dataset(input_queue, data_queue, pbar, **kwargs):
 102+ debug = kwargs.pop('debug')
 103+ dbname = kwargs.pop('dbname')
 104+ mongo = db.init_mongo_db(dbname)
 105+ editors = mongo['dataset']
 106+ name = dbname + '_long_editors.csv'
 107+ fh = utils.create_txt_filehandle(settings.dataset_location, name, 'a', settings.encoding)
 108+ x = 0
 109+ vars_to_expand = []
 110+ while True:
 111+ try:
 112+ id = input_queue.get(block=False)
 113+ obs = editors.find_one({'editor': id}, {'monthly_edits': 1})
 114+ if x == 0:
 115+ headers = obs.keys()
 116+ headers.sort()
 117+ headers = expand_headers(headers, vars_to_expand, obs)
 118+ utils.write_list_to_csv(headers, fh)
 119+ write_longitudinal_data(id, obs['monthly_edits'], fh)
 120+ #utils.write_list_to_csv(data, fh)
 121+ x += 1
 122+ except Empty:
 123+ break
 124+
 125+
 126+def generate_cohort_analysis(input_queue, data_queue, pbar, **kwargs):
 127+ dbname = kwargs.get('dbname')
 128+ pbar = kwargs.get('pbar')
 129+ mongo = db.init_mongo_db(dbname)
 130+ editors = mongo['dataset']
 131+ year = datetime.datetime.now().year + 1
 132+ begin = year - 2001
 133+ p = [3, 6, 9]
 134+ periods = [y * 12 for y in xrange(1, begin)]
 135+ periods = p + periods
 136+ data = {}
 137+ while True:
 138+ try:
 139+ id = input_queue.get(block=False)
 140+ obs = editors.find_one({'editor': id}, {'first_edit': 1, 'final_edit': 1})
 141+ first_edit = obs['first_edit']
 142+ last_edit = obs['final_edit']
 143+ for y in xrange(2001, year):
 144+ if y == 2010 and first_edit > datetime.datetime(2010, 1, 1):
 145+ print 'debug'
 146+ if y not in data:
 147+ data[y] = {}
 148+ data[y]['n'] = 0
 149+ window_end = datetime.datetime(y, 12, 31)
 150+ if window_end > datetime.datetime.now():
 151+ now = datetime.datetime.now()
 152+ m = now.month - 1 #Dump files are always lagging at least one month....
 153+ d = now.day
 154+ window_end = datetime.datetime(y, m, d)
 155+ edits = []
 156+ for period in periods:
 157+ if period not in data[y]:
 158+ data[y][period] = 0
 159+ window_start = datetime.datetime(y, 12, 31) - relativedelta(months=period)
 160+ if window_start < datetime.datetime(2001, 1, 1):
 161+ window_start = datetime.datetime(2001, 1, 1)
 162+ if date_falls_in_window(window_start, window_end, first_edit, last_edit):
 163+ edits.append(period)
 164+ if edits != []:
 165+ p = min(edits)
 166+ data[y]['n'] += 1
 167+ data[y][p] += 1
 168+ #pbar.update(+1)
 169+ except Empty:
 170+ break
 171+ utils.store_object(data, settings.binary_location, 'cohort_data')
 172+
 173+def date_falls_in_window(window_start, window_end, first_edit, last_edit):
 174+ if first_edit >= window_start and first_edit <= window_end:
 175+ return True
 176+ else:
 177+ return False
 178+
 179+
 180+def generate_wide_editor_dataset(input_queue, data_queue, pbar, **kwargs):
 181+ dbname = kwargs.pop('dbname')
 182+ mongo = db.init_mongo_db(dbname)
 183+ editors = mongo['dataset']
 184+ name = dbname + '_wide_editors.csv'
 185+ fh = utils.create_txt_filehandle(settings.dataset_location, name, 'a', settings.encoding)
 186+ x = 0
 187+ vars_to_expand = ['edits', 'edits_by_year', 'articles_by_year']
 188+ while True:
 189+ try:
 190+ if debug:
 191+ id = u'99797'
 192+ else:
 193+ id = input_queue.get(block=False)
 194+ print input_queue.qsize()
 195+ obs = editors.find_one({'editor': id})
 196+ obs = expand_observations(obs, vars_to_expand)
 197+ if x == 0:
 198+ headers = obs.keys()
 199+ headers.sort()
 200+ headers = expand_headers(headers, vars_to_expand, obs)
 201+ utils.write_list_to_csv(headers, fh)
 202+ data = []
 203+ keys = obs.keys()
 204+ keys.sort()
 205+ for key in keys:
 206+ data.append(obs[key])
 207+ utils.write_list_to_csv(data, fh)
 208+
 209+ x += 1
 210+ except Empty:
 211+ break
 212+ fh.close()
 213+
 214+
 215+def retrieve_edits_by_contributor_launcher():
 216+ pc.build_scaffolding(pc.load_queue, retrieve_edits_by_contributor, 'contributors')
 217+
 218+
 219+def debug_retrieve_edits_by_contributor_launcher(dbname):
 220+ kwargs = {'debug': False,
 221+ 'dbname': dbname,
 222+ }
 223+ ids = retrieve_editor_ids_mongo(dbname, 'editors')
 224+ input_queue = pc.load_queue(ids)
 225+ q = Queue()
 226+ generate_editor_dataset(input_queue, q, False, kwargs)
 227+
 228+
 229+def generate_editor_dataset_launcher(dbname):
 230+ kwargs = {'nr_input_processors': 1,
 231+ 'nr_output_processors': 1,
 232+ 'debug': False,
 233+ 'dbname': dbname,
 234+ 'poison_pill':False,
 235+ 'pbar': True
 236+ }
 237+ ids = retrieve_editor_ids_mongo(dbname, 'editors')
 238+ ids = list(ids)
 239+ chunks = dict({0: ids})
 240+ pc.build_scaffolding(pc.load_queue, generate_cohort_analysis, chunks, False, False, **kwargs)
 241+
 242+
 243+def generate_editor_dataset_debug(dbname):
 244+ ids = retrieve_editor_ids_mongo(dbname, 'editors')
 245+ input_queue = pc.load_queue(ids)
 246+ kwargs = {'nr_input_processors': 1,
 247+ 'nr_output_processors': 1,
 248+ 'debug': True,
 249+ 'dbname': dbname,
 250+ }
 251+ generate_editor_dataset(input_queue, False, False, kwargs)
 252+
 253+
 254+if __name__ == '__main__':
 255+ #generate_editor_dataset_debug('test')
 256+ generate_editor_dataset_launcher('enwiki')
 257+ #debug_retrieve_edits_by_contributor_launcher()
Property changes on: trunk/tools/editor_trends/etl/exporter.py
___________________________________________________________________
Added: svn:eol-style
1258 + native
Added: svn:mime-type
2259 + text/plain
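generate_cohort_analysis assigns each editor to the shortest activity window (3, 6, 9, then multiples of 12 months, counted back from a year's end) that contains the first edit. A self-contained illustration of the window arithmetic with dateutil.relativedelta (the dates are made up):

    import datetime
    from dateutil.relativedelta import relativedelta

    first_edit = datetime.datetime(2005, 8, 15)      # made-up first edit
    window_end = datetime.datetime(2005, 12, 31)     # end of the cohort year
    periods = [3, 6, 9, 12, 24]                      # months, as in the module

    for period in periods:
        window_start = window_end - relativedelta(months=period)
        if window_start < datetime.datetime(2001, 1, 1):
            window_start = datetime.datetime(2001, 1, 1)
        if window_start <= first_edit <= window_end:
            print('first edit falls in the %s-month window' % period)
            break    # the module keeps min(edits): the shortest window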
Index: trunk/tools/editor_trends/etl/extract.py
@@ -28,11 +28,12 @@
2929 import re
3030 from operator import itemgetter
3131 import xml.etree.cElementTree as cElementTree
32 -from multiprocessing import Queue, JoinableQueue
 32+import multiprocessing
3333 from Queue import Empty
3434 import pymongo
3535
3636 # Custom written files
 37+sys.path.append('..')
3738 import configuration
3839 settings = configuration.Settings()
3940 from utils import utils, models
@@ -50,7 +51,89 @@
5152 except ImportError:
5253 pass
5354
 55+class XMLFileConsumer(models.BaseConsumer):
5456
 57+ def run(self):
 58+ while True:
 59+ new_xmlfile = self.task_queue.get()
 60+ self.task_queue.task_done()
 61+ if new_xmlfile == None:
 62+ print 'Swallowed a poison pill'
 63+ break
 64+ new_xmlfile()
 65+
 66+class XMLFile(object):
 67+ def __init__(self, input, output, file, bots, **kwargs):
 68+ self.file = file
 69+ self.input = input
 70+ self.output = output
 71+ self.bots = bots
 72+ for kw in kwargs:
 73+ setattr(self, kw, kwargs[kw])
 74+
 75+ def create_file_handle(self):
 76+ if self.destination == 'file':
 77+ self.name = self.file[:-4] + '.txt'
 78+ self.fh = utils.create_txt_filehandle(self.output, self.name, 'w', settings.encoding)
 79+
 80+ def __str__(self):
 81+ return '%s' % (self.file)
 82+
 83+ def __call__(self):
 84+ if settings.debug:
 85+ messages = {}
 86+ vars = {}
 87+
 88+ data = xml.read_input(utils.create_txt_filehandle(self.input,
 89+ self.file, 'r',
 90+ encoding=settings.encoding))
 91+ self.create_file_handle()
 92+ for raw_data in data:
 93+ xml_buffer = cStringIO.StringIO()
 94+ raw_data.insert(0, '<?xml version="1.0" encoding="UTF-8" ?>\n')
 95+
 96+ try:
 97+ raw_data = ''.join(raw_data)
 98+ xml_buffer.write(raw_data)
 99+ elem = cElementTree.XML(xml_buffer.getvalue())
 100+ output_editor_information(elem, self.fh, bots=self.bots, destination=self.destination)
 101+ except SyntaxError, error:
 102+ print error
 103+ '''
 104+ There are a few cases with invalid tokens; they are ignored
 105+ '''
 106+ if settings.debug:
 107+ utils.track_errors(xml_buffer, error, self.file, messages)
 108+ except UnicodeEncodeError, error:
 109+ print error
 110+ if settings.debug:
 111+ utils.track_errors(xml_buffer, error, self.file, messages)
 112+ except MemoryError, error:
 113+ print self.file, error
 114+ print raw_data[:12]
 115+ print 'String was supposed to be %s characters long' % sum([len(raw) for raw in raw_data])
 116+
 117+ if self.destination == 'queue':
 118+ self.output.put('NEXT')
 119+ while True:
 120+ if self.output.qsize() < 100000:
 121+ break
 122+ else:
 123+ time.sleep(10)
 124+ print 'Still sleeping, queue is %s items long' % self.output.qsize()
 125+
 126+ else:
 127+ self.fh.close()
 128+
 129+
 130+ if self.destination == 'queue':
 131+ self.output.put(None) # data_queue does not exist here; the output queue was meant
 132+
 133+ if settings.debug:
 134+ utils.report_error_messages(messages, output_editor_information)
 135+
 136+
 137+
55138 def determine_username_is_bot(username, kwargs):
56139 '''
57140 @username is the xml element containing the id of the user
@@ -70,6 +153,13 @@
71154 return 0
72155
73156
 157+def extract_username(contributor, kwargs):
 158+ for elem in contributor:
 159+ if elem.tag == 'username':
 160+ return elem.text #.encode(settings.encoding)
 161+ return None # only after checking all children, not on the first mismatch
 163+
74164 def extract_contributor_id(contributor, kwargs):
75165 '''
76166 @contributor is the xml contributor node containing a number of attributes
@@ -84,7 +174,7 @@
85175 for elem in contributor:
86176 if elem.tag in tags:
87177 if elem.text != None:
88 - return elem.text.decode('utf-8')
 178+ return elem.text.encode(settings.encoding)
89179 else:
90180 return - 1
91181
@@ -99,11 +189,13 @@
100190 this dictionary are the functions used to extract the data.
101191 '''
102192 tags = {'contributor': {'editor': extract_contributor_id,
103 - 'bot': determine_username_is_bot},
 193+ 'bot': determine_username_is_bot,
 194+ 'username': extract_username,
 195+ },
104196 'timestamp': {'date': xml.extract_text},
105197 }
106198 vars = {}
107 - headers = ['editor', 'date', 'article']
 199+ headers = ['editor', 'date', 'article', 'username']
108200 destination = kwargs.pop('destination')
109201 revisions = elem.findall('revision')
110202 for revision in revisions:
@@ -128,167 +220,100 @@
129221 vars = {}
130222
131223
132 -def parse_editors(xml_queue, data_queue, **kwargs):
133 - '''
134 - @xml_queue contains the filenames of the files to be parsed
135 - @data_queue is an instance of Queue where the extracted data is stored for
136 - further processing
137 - @pbar is an instance of progressbar to display the progress
138 - @bots is a list of id's of known Wikipedia bots
139 - @debug is a flag to indicate whether the function is called for debugging.
140 -
141 - Output is the data_queue that will be used by store_editors()
142 - '''
143 - input = kwargs.get('input', None)
144 - output = kwargs.get('output', None)
145 - debug = kwargs.get('debug', False)
146 - destination = kwargs.get('destination', 'file')
147 - bots = kwargs.get('bots', None)
148 - pbar = kwargs.get('pbar', None)
149 - if settings.debug:
150 - messages = {}
151 - vars = {}
 224+#def parse_editors(xml_queue, data_queue, **kwargs):
 225+# '''
 226+# @xml_queue contains the filenames of the files to be parsed
 227+# @data_queue is an instance of Queue where the extracted data is stored for
 228+# further processing
 229+# @pbar is an instance of progressbar to display the progress
 230+# @bots is a list of id's of known Wikipedia bots
 231+# @debug is a flag to indicate whether the function is called for debugging.
 232+#
 233+# Output is the data_queue that will be used by store_editors()
 234+# '''
 235+# input = kwargs.get('input', None)
 236+# output = kwargs.get('output', None)
 237+# debug = kwargs.get('debug', False)
 238+# destination = kwargs.get('destination', 'file')
 239+# bots = kwargs.get('bots', None)
 240+# pbar = kwargs.get('pbar', None)
 241+# if settings.debug:
 242+# messages = {}
 243+# vars = {}
 244+#
 245+# while True:
 246+# try:
 247+# if debug:
 248+# file = xml_queue
 249+# else:
 250+# file = xml_queue.get(block=False)
 251+# if file == None:
 252+# print 'Swallowed a poison pill'
 253+# break
 254+#
 255+# data = xml.read_input(utils.create_txt_filehandle(input,
 256+# file, 'r',
 257+# encoding=settings.encoding))
 258+# if destination == 'file':
 259+# name = file[:-4] + '.txt'
 260+# fh = utils.create_txt_filehandle(output, name, 'w', settings.encoding)
 261+# for raw_data in data:
 262+# xml_buffer = cStringIO.StringIO()
 263+# raw_data.insert(0, '<?xml version="1.0" encoding="UTF-8" ?>\n')
 264+#
 265+# try:
 266+# raw_data = ''.join(raw_data)
 267+# xml_buffer.write(raw_data)
 268+# elem = cElementTree.XML(xml_buffer.getvalue())
 269+# output_editor_information(elem, fh, bots=bots, destination=destination)
 270+# except SyntaxError, error:
 271+# print error
 272+# '''
 273+# There are few cases with invalid tokens, they are fixed
 274+# here and then reinserted into the XML DOM
 275+# data = convert_html_entities(xml_buffer.getvalue())
 276+# elem = cElementTree.XML(data)
 277+# output_editor_information(elem)
 278+# '''
 279+# if settings.debug:
 280+# utils.track_errors(xml_buffer, error, file, messages)
 281+# except UnicodeEncodeError, error:
 282+# print error
 283+# if settings.debug:
 284+# utils.track_errors(xml_buffer, error, file, messages)
 285+# except MemoryError, error:
 286+# print file, error
 287+# print raw_data[:12]
 288+# print 'String was supposed to be %s characters long' % sum([len(raw) for raw in raw_data])
 289+# if destination == 'queue':
 290+# output.put('NEXT')
 291+# while True:
 292+# if output.qsize() < 100000:
 293+# break
 294+# else:
 295+# time.sleep(10)
 296+# print 'Still sleeping, queue is %s items long' % output.qsize()
 297+#
 298+# else:
 299+# fh.close()
 300+#
 301+# if pbar:
 302+# print file, xml_queue.qsize()
 303+# #utils.update_progressbar(pbar, xml_queue)
 304+#
 305+# if debug:
 306+# break
 307+#
 308+# except Empty:
 309+# break
 310+#
 311+# if destination == 'queue':
 312+# data_queue.put(None)
 313+#
 314+# if settings.debug:
 315+# utils.report_error_messages(messages, parse_editors)
152316
153 - while True:
154 - try:
155 - if debug:
156 - file = xml_queue
157 - else:
158 - file = xml_queue.get(block=False)
159 - if file == None:
160 - print 'Swallowed a poison pill'
161 - break
162317
163 - data = xml.read_input(utils.create_txt_filehandle(input,
164 - file, 'r',
165 - encoding=settings.encoding))
166 - if destination == 'file':
167 - name = file[:-4] + '.txt'
168 - fh = utils.create_txt_filehandle(output, name, 'w', settings.encoding)
169 - for raw_data in data:
170 - xml_buffer = cStringIO.StringIO()
171 - raw_data.insert(0, '<?xml version="1.0" encoding="UTF-8" ?>\n')
172 -
173 - try:
174 - raw_data = ''.join(raw_data)
175 - xml_buffer.write(raw_data)
176 - elem = cElementTree.XML(xml_buffer.getvalue())
177 - output_editor_information(elem, fh, bots=bots, destination=destination)
178 - except SyntaxError, error:
179 - print error
180 - '''
181 - There are few cases with invalid tokens, they are fixed
182 - here and then reinserted into the XML DOM
183 - data = convert_html_entities(xml_buffer.getvalue())
184 - elem = cElementTree.XML(data)
185 - output_editor_information(elem)
186 - '''
187 - if settings.debug:
188 - utils.track_errors(xml_buffer, error, file, messages)
189 - except UnicodeEncodeError, error:
190 - print error
191 - if settings.debug:
192 - utils.track_errors(xml_buffer, error, file, messages)
193 - except MemoryError, error:
194 - print file, error
195 - print raw_data[:12]
196 - print 'String was supposed to be %s characters long' % sum([len(raw) for raw in raw_data])
197 - if destination == 'queue':
198 - output.put('NEXT')
199 - while True:
200 - if output.qsize() < 100000:
201 - break
202 - else:
203 - time.sleep(10)
204 - print 'Still sleeping, queue is %s items long' % output.qsize()
205 -
206 - else:
207 - fh.close()
208 -
209 - if pbar:
210 - print file, xml_queue.qsize()
211 - #utils.update_progressbar(pbar, xml_queue)
212 -
213 - if debug:
214 - break
215 -
216 - except Empty:
217 - break
218 -
219 - if destination == 'queue':
220 - data_queue.put(None)
221 -
222 - if settings.debug:
223 - utils.report_error_messages(messages, parse_editors)
224 -
225 -
226 -def store_editors(data_queue, **kwargs):
227 - '''
228 - @data_queue is an instance of Queue containing information extracted by
229 - parse_editors()
230 - @pids is a list of PIDs used to check if other processes are finished
231 - running
232 - @dbname is the name of the MongoDB collection where to store the information.
233 - '''
234 - dbname = kwargs.get('dbname', None)
235 - mongo = db.init_mongo_db(dbname)
236 - collection = mongo['editors']
237 - mongo.collection.ensure_index('editor')
238 - editor_cache = cache.EditorCache(collection)
239 -
240 - while True:
241 - try:
242 - edit = data_queue.get(block=False)
243 - data_queue.task_done()
244 - if edit == None:
245 - print 'Swallowing poison pill'
246 - break
247 - elif edit == 'NEXT':
248 - editor_cache.add('NEXT', '')
249 - else:
250 - contributor = edit['editor']
251 - value = {'date': edit['date'], 'article': edit['article']}
252 - editor_cache.add(contributor, value)
253 - #collection.update({'editor': contributor}, {'$push': {'edits': value}}, True)
254 - #'$inc': {'edit_count': 1},
255 -
256 - except Empty:
257 - '''
258 - This checks whether the Queue is empty because the preprocessors are
259 - finished or because this function is faster in emptying the Queue
260 - then the preprocessors are able to fill it. If the preprocessors
261 - are finished and this Queue is empty than break, else wait for the
262 - Queue to fill.
263 - '''
264 - pass
265 -
266 - print 'Emptying entire cache.'
267 - editor_cache.store()
268 - print 'Time elapsed: %s and processed %s items.' % (datetime.datetime.now() - editor_cache.init_time, editor_cache.cumulative_n)
269 -
270 -
271 -def load_cache_objects():
272 - cache = {}
273 - files = utils.retrieve_file_list(settings.binary_location, '.bin')
274 - for x, file in enumerate(files):
275 - cache[x] = utils.load_object(settings.binary_location, file)
276 - return cache
277 -
278 -
279 -def search_cache_for_missed_editors(dbname):
280 - mongo = db.init_mongo_db(dbname)
281 - collection = mongo['editors']
282 - editor_cache = cache.EditorCache(collection)
283 - cache = load_cache_objects()
284 - for c in cache:
285 - for editor in cache[c]:
286 - editor_cache.add(editor, cache[c][editor])
287 - cache[c] = {}
288 - editor_cache.add('NEXT', '')
289 - cache = {}
290 -
291 -
292 -
293318 def load_bot_ids():
294319 '''
295320 Loader function to retrieve list of id's of known Wikipedia bots.
@@ -302,30 +327,32 @@
303328 return ids
304329
305330
306 -def run_parse_editors(location, language, project):
307 - ids = load_bot_ids()
308 - base = os.path.join(location, language, project)
309 - input = os.path.join(base, 'chunks')
310 - output = os.path.join(base, 'txt')
 331+def run_parse_editors(location, **kwargs):
 332+ bots = load_bot_ids()
 333+ input = os.path.join(location, 'chunks')
 334+ output = os.path.join(location, 'txt')
311335 settings.verify_environment([input, output])
312336 files = utils.retrieve_file_list(input, 'xml')
313 -
314 - kwargs = {'bots': ids,
315 - 'dbname': language + project,
316 - 'language': language,
317 - 'project': project,
318 - 'pbar': True,
319 - 'destination': 'file',
320 - 'nr_input_processors': settings.number_of_processes,
321 - 'nr_output_processors': settings.number_of_processes,
322 - 'input': input,
323 - 'output': output,
324 - }
 337+ kwargs['destination'] = 'file'
325338
326 - chunks = utils.split_list(files, settings.number_of_processes)
327 - pc.build_scaffolding(pc.load_queue, parse_editors, chunks, False, False, **kwargs)
 339+
 340+ tasks = multiprocessing.JoinableQueue()
 341+ consumers = [XMLFileConsumer(tasks, None) for i in xrange(settings.number_of_processes)]
 342+ for file in files:
 343+ tasks.put(XMLFile(input, output, file, bots, **kwargs))
 344+ for x in xrange(settings.number_of_processes):
 345+ tasks.put(None)
328346
 347+ print tasks.qsize()
 348+ for w in consumers:
 349+ w.start()
329350
 351+ tasks.join()
 352+
 353+ #chunks = utils.split_list(files, settings.number_of_processes)
 354+ #pc.build_scaffolding(pc.load_queue, parse_editors, chunks, False, False, **kwargs)
 355+
 356+
330357 def debug_parse_editors(dbname):
331358 q = JoinableQueue()
332359 parse_editors('522.xml', q, None, None, debug=True, destination='file')
@@ -334,5 +361,4 @@
335362
336363 if __name__ == "__main__":
337364 #debug_parse_editors('test2')
338 - run_parse_editors(settings.input_location, 'en', 'wiki')
339 - pass
 365+ run_parse_editors(os.path.join(settings.input_location, 'en', 'wiki'))
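Each XMLFile task buffers one chunk of dump XML and parses it with cElementTree before handing the element to output_editor_information, which now also pulls the username. A stripped-down version of that parse step (the inline sample stands in for a chunk file; real dump tags carry a MediaWiki XML namespace unless the chunker strips it):

    import xml.etree.cElementTree as cElementTree

    raw_data = '''<?xml version="1.0" encoding="UTF-8" ?>
    <page>
      <revision>
        <timestamp>2010-11-22T17:51:00Z</timestamp>
        <contributor><id>42</id><username>Example</username></contributor>
      </revision>
    </page>'''

    elem = cElementTree.XML(raw_data)
    for revision in elem.findall('revision'):
        contributor = revision.find('contributor')
        editor = contributor.find('id').text          # extract_contributor_id, roughly
        username = contributor.find('username').text  # extract_username, roughly
        date = revision.find('timestamp').text
        print('%s\t%s\t%s' % (editor, date, username))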
Index: trunk/tools/editor_trends/etl/store.py
@@ -0,0 +1,97 @@
 2+#!/usr/bin/python
 3+# -*- coding: utf-8 -*-
 4+'''
 5+Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
 6+This program is free software; you can redistribute it and/or
 7+modify it under the terms of the GNU General Public License version 2
 8+as published by the Free Software Foundation.
 9+This program is distributed in the hope that it will be useful,
 10+but WITHOUT ANY WARRANTY; without even the implied warranty of
 11+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 12+See the GNU General Public License for more details, at
 13+http://www.fsf.org/licenses/gpl.html
 14+'''
 15+
 16+__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
 17+__author__email = 'dvanliere at gmail dot com'
 18+__date__ = '2010-11-19'
 19+__version__ = '0.1'
 20+
 21+
 22+from Queue import Empty
 23+import datetime
 24+import sys
 25+sys.path.append('..')
 26+
 27+import configuration
 28+settings = configuration.Settings()
 29+
 30+from database import db, cache
 31+
 32+
 33+def store_editors(data_queue, **kwargs):
 34+ '''
 35+ @data_queue is an instance of Queue containing information extracted by
 36+ parse_editors()
 37+ @pids is a list of PIDs used to check if other processes are finished
 38+ running
 39+ @dbname is the name of the MongoDB collection where to store the information.
 40+ '''
 41+ dbname = kwargs.get('dbname', None)
 42+ mongo = db.init_mongo_db(dbname)
 43+ collection = mongo['editors']
 44+ collection.ensure_index('editor')
 45+ editor_cache = cache.EditorCache(collection)
 46+
 47+ while True:
 48+ try:
 49+ edit = data_queue.get(block=False)
 50+ data_queue.task_done()
 51+ if edit == None:
 52+ print 'Swallowing poison pill'
 53+ break
 54+ elif edit == 'NEXT':
 55+ editor_cache.add('NEXT', '')
 56+ else:
 57+ contributor = edit['editor']
 58+ value = {'date': edit['date'], 'article': edit['article']}
 59+ editor_cache.add(contributor, value)
 60+ #collection.update({'editor': contributor}, {'$push': {'edits': value}}, True)
 61+ #'$inc': {'edit_count': 1},
 62+
 63+ except Empty:
 64+ '''
 65+ This checks whether the Queue is empty because the preprocessors are
 66+ finished or because this function is faster at emptying the Queue
 67+ than the preprocessors are able to fill it. If the preprocessors
 68+ are finished and this Queue is empty then break, else wait for the
 69+ Queue to fill.
 70+ '''
 71+ pass
 72+
 73+ print 'Emptying entire cache.'
 74+ editor_cache.store()
 75+ print 'Time elapsed: %s and processed %s items.' % (datetime.datetime.now() - editor_cache.init_time, editor_cache.cumulative_n)
 76+
 77+
 78+def load_cache_objects():
 79+ cache = {}
 80+ files = utils.retrieve_file_list(settings.binary_location, '.bin')
 81+ for x, file in enumerate(files):
 82+ cache[x] = utils.load_object(settings.binary_location, file)
 83+ return cache
 84+
 85+
 86+def search_cache_for_missed_editors(dbname):
 87+ mongo = db.init_mongo_db(dbname)
 88+ collection = mongo['editors']
 89+ editor_cache = cache.EditorCache(collection)
 90+ cache = load_cache_objects()
 91+ for c in cache:
 92+ for editor in cache[c]:
 93+ editor_cache.add(editor, cache[c][editor])
 94+ cache[c] = {}
 95+ editor_cache.add('NEXT', '')
 96+ cache = {}
 97+
 98+
Property changes on: trunk/tools/editor_trends/etl/store.py
___________________________________________________________________
Added: svn:eol-style
199 + native
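store_editors drains its queue non-blocking and distinguishes three message kinds: None as the poison pill, 'NEXT' as a flush marker, and plain edit dicts. The skeleton of that loop, with MiniCache as a stand-in for database.cache.EditorCache:

    from multiprocessing import JoinableQueue
    from Queue import Empty

    class MiniCache(object):             # stand-in for database.cache.EditorCache
        def __init__(self):
            self.data = {}
        def add(self, key, value):
            if key != 'NEXT':            # the real cache flushes on 'NEXT'
                self.data.setdefault(key, []).append(value)
        def store(self):
            print('flushing %s editors' % len(self.data))

    def drain(data_queue, editor_cache):
        while True:
            try:
                edit = data_queue.get(block=False)
                data_queue.task_done()
                if edit is None:         # poison pill: producers are done
                    break
                elif edit == 'NEXT':     # marker: safe point to flush
                    editor_cache.add('NEXT', '')
                else:
                    editor_cache.add(edit['editor'],
                                     {'date': edit['date'], 'article': edit['article']})
            except Empty:
                pass    # producers may still be filling the queue; keep polling
        editor_cache.store()

    queue = JoinableQueue()
    for msg in [{'editor': '42', 'date': '2010-11-22', 'article': '1'}, 'NEXT', None]:
        queue.put(msg)
    drain(queue, MiniCache())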
Index: trunk/tools/editor_trends/etl/chunker.py
@@ -112,10 +112,10 @@
113113 return True
114114
115115
116 -def write_xml_file(element, fh, counter, language):
 116+def write_xml_file(element, fh, output, counter):
117117 '''Get file handle and write xml element to file'''
118118 size = len(cElementTree.tostring(element))
119 - fh, counter = create_file_handle(fh, counter, size, language)
 119+ fh, counter = create_file_handle(fh, output, counter, size)
120120 try:
121121 fh.write(cElementTree.tostring(element))
122122 except MemoryError:
@@ -124,15 +124,15 @@
125125 return fh, counter
126126
127127
128 -def create_file_handle(fh, counter, size, language):
 128+def create_file_handle(fh, output, counter, size):
129129 '''Create file handle if none is supplied or if file size > max file size.'''
130130 if not counter:
131131 counter = 0
132 - path = os.path.join(settings.input_location, language, '%s.xml' % counter)
 132+ path = os.path.join(output, '%s.xml' % counter)
133133 if not fh:
134134 fh = codecs.open(path, 'w', encoding=settings.encoding)
135135 return fh, counter
136 - elif (fh.tell() + size) > settings.binary_location:
 136+ elif (fh.tell() + size) > settings.max_settings_xmlfile_size:
137137 print 'Created chunk %s' % counter
138138 fh.close
139139 counter += 1
@@ -156,10 +156,11 @@
157157 return flat
158158
159159
160 -def split_file(output, input, project, language_code, language, format='xml'):
 160+def split_file(location, file, project, language_code, language, format='xml'):
161161 '''Reads xml file and splits it in N chunks'''
162162 #location = os.path.join(settings.input_location, language)
163 - output = os.path.join(output, language_code, project)
 163+ input = os.path.join(location, file)
 164+ output = os.path.join(location, 'chunks')
164165 settings.verify_environment([output])
165166 if format == 'xml':
166167 fh = None
@@ -170,15 +171,12 @@
171172 ns = load_namespace(language_code)
172173 ns = build_namespaces_locale(ns)
173174
174 -
 175+ settings.xml_namespace = 'http://www.mediawiki.org/xml/export-0.3/'
175176 counter = None
176177 tag = '{%s}page' % settings.xml_namespace
177 -
178 -
179178 context = cElementTree.iterparse(input, events=('start', 'end'))
180179 context = iter(context)
181180 event, root = context.next() #get the root element of the XML doc
182 -
183181 try:
184182 for event, elem in context:
185183 if event == 'end':
@@ -188,7 +186,7 @@
189187 page = elem.find('id').text
190188 elem = parse_comments(elem, remove_numeric_character_references)
191189 if format == 'xml':
192 - fh, counter = write_settings.input_filename(elem, fh, counter, language_code)
 190+ fh, counter = write_xml_file(elem, fh, output, counter)
193191 else:
194192 data = [el.getchildren() for el in elem if el.tag == 'revision']
195193 data = flatten_xml_elements(data, page)
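split_file streams the dump through cElementTree.iterparse instead of loading it whole; grabbing the root element first and clearing it after every processed <page> keeps memory bounded. A minimal version of that loop (the inline sample replaces the dump file handle):

    import xml.etree.cElementTree as cElementTree
    from cStringIO import StringIO

    sample = StringIO(
        '<mediawiki><page><id>1</id></page><page><id>2</id></page></mediawiki>')

    context = iter(cElementTree.iterparse(sample, events=('start', 'end')))
    event, root = context.next()       # grab the root element first
    counter = 0
    for event, elem in context:
        if event == 'end' and elem.tag == 'page':
            # the real module calls write_xml_file(elem, fh, output, counter) here
            print('chunk %s: %s' % (counter, cElementTree.tostring(elem)))
            counter += 1
            root.clear()               # discard processed pages to bound memory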
Index: trunk/tools/editor_trends/etl/transformer.py
@@ -0,0 +1,215 @@
 2+#!/usr/bin/python
 3+# -*- coding: utf-8 -*-
 4+'''
 5+Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
 6+This program is free software; you can redistribute it and/or
 7+modify it under the terms of the GNU General Public License version 2
 8+as published by the Free Software Foundation.
 9+This program is distributed in the hope that it will be useful,
 10+but WITHOUT ANY WARRANTY; without even the implied warranty of
 11+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 12+See the GNU General Public License for more details, at
 13+http://www.fsf.org/licenses/gpl.html
 14+'''
 15+
 16+__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
 17+__author__email = 'dvanliere at gmail dot com'
 18+__date__ = '2010-11-02'
 19+__version__ = '0.1'
 20+
 21+import multiprocessing
 22+from Queue import Empty
 23+from operator import itemgetter
 24+import datetime
 25+import sys
 26+
 27+sys.path.append('..')
 28+import configuration
 29+settings = configuration.Settings()
 30+from database import db
 31+from utils import process_constructor as pc
 32+from utils import utils
 33+from utils import models
 34+import exporter # construct_datasets is deleted in this revision; its helpers moved to exporter
 35+
 36+
 37+try:
 38+ import psyco
 39+ psyco.full()
 40+except ImportError:
 41+ pass
 42+
 43+
 44+class EditorConsumer(models.BaseConsumer):
 45+
 46+ def run(self):
 47+ while True:
 48+ new_editor = self.task_queue.get()
 49+ self.task_queue.task_done()
 50+ if new_editor == None:
 51+ break
 52+ new_editor()
 53+
 54+
 55+class Editor(object):
 56+ def __init__(self, dbname, id, **kwargs):
 57+ self.dbname = dbname
 58+ self.id = id
 59+ for kw in kwargs:
 60+ setattr(self, kw, kwargs[kw])
 61+
 62+ def __str__(self):
 63+ return '%s' % (self.id)
 64+ # mongo = db.init_mongo_db(dbname)
 65+ # input = mongo[dbname]
 66+ # output = mongo['dataset']
 67+ # output.ensure_index('editor')
 68+ # output.ensure_index('year_joined')
 69+
 70+ def __call__(self):
 71+ self.mongo = db.init_mongo_db(self.dbname)
 72+ input_db = self.mongo['editors']
 73+ output_db = self.mongo['dataset']
 74+
 75+ output_db.ensure_index('editor')
 76+ output_db.create_index('editor')
 77+
 78+ editor = input_db.find_one({'editor': self.id})
 79+ if editor == None:
 80+ return
 81+ edits = editor['edits']
 82+ username = editor['username']
 83+ monthly_edits = determine_edits_by_month(edits)
 84+ edits = sort_edits(edits)
 85+ edit_count = len(edits)
 86+ new_wikipedian = edits[9]['date']
 87+ first_edit = edits[0]['date']
 88+ final_edit = edits[-1]['date']
 89+ edits_by_year = determine_edits_by_year(edits)
 90+ articles_by_year = determine_articles_by_year(edits)
 91+ edits = edits[:10]
 92+ output_db.insert({'editor': self.id,
 93+ 'edits': edits,
 94+ 'edits_by_year': edits_by_year,
 95+ 'new_wikipedian': new_wikipedian,
 96+ 'edit_count': edit_count,
 97+ 'final_edit': final_edit,
 98+ 'first_edit': first_edit,
 99+ 'articles_by_year': articles_by_year,
 100+ 'monthly_edits': monthly_edits,
 101+ 'username': username
 102+ })
 103+
 104+def create_datacontainer(init_value=0):
 105+ '''
 106+ This function initializes a dictionary keyed by year (from 2001 through
 107+ the current year) with @init_value as each value. In most cases this will
 108+ be zero so the dictionary acts as a running tally for a variable, but
 109+ @init_value can also be a list [], a dictionary {}, or a set ('set').
 110+ '''
 111+ data = {}
 112+ year = datetime.datetime.now().year + 1
 113+ for x in xrange(2001, year):
 114+ if init_value == 'set':
 115+ data[str(x)] = set()
 116+ else:
 117+ data[str(x)] = init_value
 118+ return data
 119+
 120+
 121+def add_months_to_datacontainer(datacontainer):
 122+ for dc in datacontainer:
 123+ datacontainer[dc] = {}
 124+ for x in xrange(1, 13):
 125+ datacontainer[dc][str(x)] = 0
 126+ return datacontainer
 127+
 128+
 129+def determine_edits_by_month(edits):
 130+ datacontainer = create_datacontainer(init_value=0)
 131+ datacontainer = add_months_to_datacontainer(datacontainer)
 132+ for year in edits:
 133+ months = set()
 134+ for edit in edits[year]:
 135+ m = str(edit['date'].month)
 136+ if m not in months:
 137+ datacontainer[year][m] = 1
 138+ months.add(m)
 139+ if len(months) == 12:
 140+ break
 141+ return datacontainer
 142+
 143+
 144+def determine_edits_by_year(dates):
 145+ '''
 146+ This function counts the number of edits by year made by a particular editor.
 147+ '''
 148+ edits = create_datacontainer()
 149+ for date in dates:
 150+ year = str(date['date'].year)
 151+ edits[year] += 1
 152+ return edits
 153+
 154+
 155+def determine_articles_by_year(dates):
 156+ '''
 157+ This function counts the number of unique articles by year edited by a
 158+ particular editor.
 159+ '''
 160+ articles = create_datacontainer('set')
 161+ for date in dates:
 162+ year = str(date['date'].year)
 163+ articles[year].add(date['article'])
 164+ for year in articles:
 165+ articles[year] = len(articles[year])
 166+ return articles
 167+
 168+
 169+def sort_edits(edits):
 170+ edits = utils.merge_list(edits)
 171+ return sorted(edits, key=itemgetter('date'))
 172+
 173+#def optimize_editors(input_queue, result_queue, pbar, **kwargs):
 174+# dbname = kwargs.pop('dbname')
 175+# mongo = db.init_mongo_db(dbname)
 176+# input = mongo[dbname]
 177+# output = mongo['dataset']
 178+# output.ensure_index('editor')
 179+# output.ensure_index('year_joined')
 180+# definition = kwargs.pop('definition')
 181+
 182+
 183+def run_optimize_editors(dbname):
 184+ ids = construct_datasets.retrieve_editor_ids_mongo(dbname, 'editors')
 185+ kwargs = {'definition': 'traditional',
 186+ 'pbar': True,
 187+ }
 188+ #input_db = db.init_mongo_db(dbname)
 189+ #output_db = db.init_mongo_db('dataset')
 190+ tasks = multiprocessing.JoinableQueue()
 191+ consumers = [EditorConsumer(tasks, None) for i in xrange(settings.number_of_processes)]
 192+
 193+ for id in ids:
 194+ tasks.put(Editor(dbname, id))
 195+ for x in xrange(settings.number_of_processes):
 196+ tasks.put(None)
 197+
 198+ print tasks.qsize()
 199+ for w in consumers:
 200+ w.start()
 201+
 202+ tasks.join()
 203+
 204+
 205+def debug_optimize_editors(dbname):
 206+ ids = construct_datasets.retrieve_editor_ids_mongo(dbname, 'editors')
 207+ q = pc.load_queue(ids)
 208+ kwargs = {'definition': 'traditional',
 209+ 'dbname': dbname
 210+ }
 211+ optimize_editors(q, False, True, kwargs)
 212+
 213+
 214+if __name__ == '__main__':
 215+ #debug_optimize_editors('test')
 216+ run_optimize_editors('enwiki')
Property changes on: trunk/tools/editor_trends/etl/transformer.py
___________________________________________________________________
Added: svn:eol-style
1217 + native
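The EditorConsumer/run_optimize_editors pair above is the standard multiprocessing consumer pattern: callable tasks on a JoinableQueue, one None sentinel per consumer as a poison pill, and a final join() on the queue. A minimal, self-contained sketch of that pattern (the Job class is illustrative, not part of this commit):

    import multiprocessing


    class Consumer(multiprocessing.Process):
        def __init__(self, task_queue):
            multiprocessing.Process.__init__(self)
            self.task_queue = task_queue

        def run(self):
            while True:
                task = self.task_queue.get()
                self.task_queue.task_done()   # every get() is matched
                if task is None:
                    break                     # poison pill: one per consumer
                task()                        # tasks are callables, like Editor


    class Job(object):
        def __init__(self, n):
            self.n = n

        def __call__(self):
            print 'processing %s' % self.n


    if __name__ == '__main__':
        tasks = multiprocessing.JoinableQueue()
        consumers = [Consumer(tasks) for i in xrange(4)]
        for n in xrange(20):
            tasks.put(Job(n))
        for consumer in consumers:
            tasks.put(None)                   # one sentinel per consumer
        for consumer in consumers:
            consumer.start()
        tasks.join()                          # returns once all task_done() calls are in

Calling task_done() before the sentinel check, as the commit does, is correct here: the sentinels are also queue items, so they too must be acknowledged before join() can return.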
Index: trunk/tools/editor_trends/etl/loader.py
@@ -32,23 +32,21 @@
3333
3434
3535
36 -def store_editors(input, filename, dbname):
 36+def store_editors(input, filename, dbname, collection):
3737 fh = utils.create_txt_filehandle(input, filename, 'r', settings.encoding)
3838 mongo = db.init_mongo_db(dbname)
39 - collection = mongo['test']
40 - mongo.collection.ensure_index('editor')
41 - mongo.collection.create_index('editor')
 39+ collection = mongo[collection]
 40+ collection.ensure_index('editor')
 41+ collection.create_index('editor')
4242 editor_cache = cache.EditorCache(collection)
4343 prev_contributor = -1
4444 x = 0
4545 edits = 0
4646 editors = set()
47 - for line in readline(fh):
 47+ for line in sort.readline(fh):
4848 if len(line) == 0:
4949 continue
5050 contributor = int(line[0])
51 - if contributor == 5767932:
52 - print 'debug'
5351 if prev_contributor != contributor:
5452 if edits >= 10:
5553 result = editor_cache.add(prev_contributor, 'NEXT')
@@ -61,9 +59,11 @@
6260 editor_cache.clear(prev_contributor)
6361 edits = 0
6462 edits += 1
65 - date = utils.convert_timestamp_to_date(line[1]) #+ datetime.timedelta(days=1)
 63+ date = utils.convert_timestamp_to_datetime_utc(line[1]) #+ datetime.timedelta(days=1)
6664 article_id = int(line[2])
67 - value = {'date': date, 'article': article_id}
 65+ username = line[3].encode(settings.encoding)
 66+ #print line[3]
 67+ value = {'date': date, 'article': article_id, 'username': username}
6868 editor_cache.add(contributor, value)
6969 prev_contributor = contributor
7070 fh.close()
@@ -80,48 +80,42 @@
8181 chunks = utils.split_list(files, int(x))
8282 '''1st iteration external mergesort'''
8383 for chunk in chunks:
84 - #filehandles = [utils.create_txt_filehandle(input, file, 'r', settings.encoding) for file in chunks[chunk]]
85 - #filename = sort.merge_sorted_files(output, filehandles, chunk)
86 - #filehandles = [fh.close() for fh in filehandles]
87 - pass
 84+ filehandles = [utils.create_txt_filehandle(input, file, 'r', settings.encoding) for file in chunks[chunk]]
 85+ filename = sort.merge_sorted_files(output, filehandles, chunk)
 86+ filehandles = [fh.close() for fh in filehandles]
 87+# pass
8888 '''2nd iteration external mergesort, if necessary'''
8989 if len(chunks) > 1:
9090 files = utils.retrieve_file_list(output, 'txt', mask='[merged]')
9191 filehandles = [utils.create_txt_filehandle(output, file, 'r', settings.encoding) for file in files]
9292 filename = sort.merge_sorted_files(output, filehandles, 'final')
9393 filehandles = [fh.close() for fh in filehandles]
94 - filename = 'merged_final.txt'
95 - store_editors(output, filename, dbname)
 94+ filename = 'merged_final.txt'
 95+ return filename
9696
9797
98 -def mergesort_feeder(input_queue, result_queue, **kwargs):
 98+def mergesort_feeder(task_queue, **kwargs):
9999 input = kwargs.get('input', None)
100100 output = kwargs.get('output', None)
101 - while True:
102 - try:
103 - file = input_queue.get(block=False)
104 - fh = utils.create_txt_filehandle(input, file, 'r', settings.encoding)
105 - data = fh.readlines()
106 - fh.close()
107 - data = [d.replace('\n', '') for d in data]
108 - data = [d.split('\t') for d in data]
109 - sorted_data = sort.mergesort(data)
110 - sort.write_sorted_file(sorted_data, file, output)
111 - except Empty:
112 - break
 101+ #while True:
 102+ # try:
 103+ #file = input_queue.get(block=False)
 104+ for file in task_queue:
 105+ fh = utils.create_txt_filehandle(input, file, 'r', settings.encoding)
 106+ data = fh.readlines()
 107+ fh.close()
 108+ data = [d.replace('\n', '') for d in data]
 109+ data = [d.split('\t') for d in data]
 110+ sorted_data = sort.mergesort(data)
 111+ sort.write_sorted_file(sorted_data, file, output)
113112
114113
115114 def mergesort_launcher(input, output):
116 - kwargs = {'pbar': True,
117 - 'nr_input_processors': settings.number_of_processes,
118 - 'nr_output_processors': settings.number_of_processes,
119 - 'input': input,
120 - 'output': output,
121 - 'poison_pill': False
122 - }
 115+ settings.verify_environment([input, output])
123116 files = utils.retrieve_file_list(input, 'txt')
124 - chunks = utils.split_list(files, settings.number_of_processes)
125 - pc.build_scaffolding(pc.load_queue, mergesort_feeder, chunks, False, False, **kwargs)
 117+ mergesort_feeder(files, input=input, output=output)
 118+ #chunks = utils.split_list(files, settings.number_of_processes)
 119+ #pc.build_scaffolding(pc.load_queue, mergesort_feeder, chunks, False, False, **kwargs)
126120
127121
128122 def debug_mergesort_feeder(input, output):
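The two-pass structure above is a classic external mergesort: merge the sorted chunk files in groups small enough to respect settings.max_filehandles, then merge the intermediate '[merged]' files into 'merged_final.txt'. A sketch of the k-way merge step using only the standard library (the function name mirrors the sort.merge_sorted_files called above; the real implementation may differ):

    import heapq


    def merge_sorted_files(output_path, input_paths):
        '''
        Merge any number of individually sorted text files into one sorted
        file; heapq.merge keeps only one pending line per input in memory.
        '''
        handles = [open(path, 'r') for path in input_paths]
        out = open(output_path, 'w')
        try:
            for line in heapq.merge(*handles):
                out.write(line)
        finally:
            out.close()
            for fh in handles:
                fh.close()

heapq.merge (Python 2.6+) assumes each input is already sorted, which is exactly what mergesort_feeder guarantees per file.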
Index: trunk/tools/editor_trends/config.py
@@ -21,46 +21,30 @@
2222 import os
2323 import ConfigParser
2424
25 -
26 -import configuration
27 -settings = configuration.Settings()
2825 from utils import utils
2926
3027
31 -
32 -def load_configuration(args):
 28+def create_configuration(settings, args):
3329 config = ConfigParser.RawConfigParser()
34 - if not utils.check_file_exists(settings.working_directory, 'wiki.cfg'):
35 - working_directory = raw_input('Please indicate where you installed Editor Trends Analytics.\nCurrent location is %s\nPress Enter to accept default.' % os.getcwd())
36 - if working_directory == '':
37 - working_directory = os.getcwd()
3830
39 - settings.input_location = raw_input('Please indicate where to store the Wikipedia dump files.\nDefault is: %s\nPress Enter to accept default.' % settings.input_location)
40 - if settings.input_location == '':
41 - settings.input_location = settings.input_location
42 -
43 - create_configuration(working_directory=working_directory, input_location=settings.input_location)
44 -
45 - config.read('wiki.cfg')
46 - settings.working_directory = config.get('file_locations', 'working_directory')
47 - settings.input_location = config.get('file_locations', 'settings.input_location')
48 -
49 -
50 -def create_configuration(**kwargs):
51 - working_directory = kwargs.get('working_directory', settings.working_directory)
 31+ working_directory = raw_input('Please indicate where you installed Editor Trends Analytics.\nCurrent location is %s\nPress Enter to accept default.' % os.getcwd())
 32+ input_location = raw_input('Please indicate where to store the Wikipedia dump files.\nDefault is: %s\nPress Enter to accept default.' % settings.input_location)
 33+ input_location = input_location if len(input_location) > 0 else settings.input_location
 34+ working_directory = working_directory if len(working_directory) > 0 else os.getcwd()
 35+
5236 config = ConfigParser.RawConfigParser()
5337 config.add_section('file_locations')
5438 config.set('file_locations', 'working_directory', working_directory)
55 - config.set('file_locations', 'settings.input_location', kwargs.get('settings.input_location', settings.input_location))
 39+ config.set('file_locations', 'input_location', input_location)
5640
5741 fh = utils.create_binary_filehandle(working_directory, 'wiki.cfg', 'wb')
5842 config.write(fh)
59 - fh.close()
 43+ fh.close()
 44+
 45+ settings.working_directory = config.get('file_locations', 'working_directory')
 46+ settings.input_location = config.get('file_locations', 'input_location')
 47+ return settings
6048
6149
6250 if __name__ == '__main__':
63 - p =detect_windows_program('7zip')
64 - print p
65 - #load_configuration([])
66 -
67 -
 51+ pass
\ No newline at end of file
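The rewritten create_configuration boils down to a ConfigParser round-trip: prompt for two paths, persist them to wiki.cfg, and read them back onto the settings object so later runs pick them up (see configuration.py below). The round-trip in isolation, with illustrative paths:

    import ConfigParser

    config = ConfigParser.RawConfigParser()
    config.add_section('file_locations')
    config.set('file_locations', 'working_directory', '/tmp/editor_trends')
    config.set('file_locations', 'input_location', '/tmp/dumps')

    fh = open('wiki.cfg', 'wb')
    config.write(fh)
    fh.close()

    config = ConfigParser.RawConfigParser()
    config.read('wiki.cfg')
    print config.get('file_locations', 'working_directory')   # /tmp/editor_trends
    print config.get('file_locations', 'input_location')      # /tmp/dumps

RawConfigParser is a sensible choice here: unlike the interpolating parsers, it stores values containing '%' verbatim.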
Index: trunk/tools/editor_trends/configuration.py
@@ -23,6 +23,7 @@
2424 '''
2525
2626 from multiprocessing import cpu_count
 27+import ConfigParser
2728 import os
2829 import sys
2930 import platform
@@ -42,7 +43,7 @@
4344
4445 class Settings(object):
4546
46 - def __init__(self, debug=True, process_multiplier=1):
 47+ def __init__(self, debug=True, process_multiplier=1, **kwargs):
4748 self.debug = debug
4849 self.progressbar = True
4950 self.encoding = 'utf-8'
@@ -69,7 +70,8 @@
7071 self.max_filehandles = self.determine_max_filehandles_open()
7172
7273 self.windows_register = {'7zip': 'Software\\7-Zip', }
73 -
 74+ self.load_configuration()
 75+ self.set_custom_settings(**kwargs)
7476 self.projects = {'commons': 'commonswiki',
7577 'wikibooks': 'wikibooks',
7678 'wikinews': 'wikinews',
@@ -88,7 +90,17 @@
8991 'multilingual wikisource': None
9092 }
9193
 94+ def set_custom_settings(self, **kwargs):
 95+ for kw in kwargs:
 96+ setattr(self, kw, kwargs[kw])
9297
 98+ def load_configuration(self):
 99+ if os.path.exists(os.path.join(self.working_directory, 'wiki.cfg')):
 100+ config = ConfigParser.RawConfigParser()
 101+ config.read(os.path.join(self.working_directory, 'wiki.cfg'))
 102+ self.working_directory = config.get('file_locations', 'working_directory')
 103+ self.input_location = config.get('file_locations', 'input_location')
 104+
93105 def determine_working_directory(self):
94106 cwd = os.getcwd()
95107 if not cwd.endswith('editor_trends%s' % os.sep):
@@ -105,10 +117,9 @@
106118
107119 def verify_environment(self, directories):
108120 for dir in directories:
109 - result = os.path.exists(dir)
110 - if not result:
 121+ if not os.path.exists(dir):
111122 try:
112 - os.mkdir(dir)
 123+ os.makedirs(dir)
113124 except IOError:
114125 raise 'Configuration Error, could not create directory.'
115126
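Two details in the verify_environment change are worth spelling out: os.makedirs also creates missing intermediate directories, where os.mkdir only creates the leaf; and directory-creation failures surface as OSError, not IOError, so the except clause in the surrounding context will generally not fire (raising a bare string is also invalid in modern Python 2). A sketch with the broader exception, under those assumptions:

    import os


    def verify_environment(directories):
        for directory in directories:
            if not os.path.exists(directory):
                try:
                    # makedirs creates intermediate directories as needed
                    os.makedirs(directory)
                except OSError, error:
                    raise Exception('Configuration Error, could not create %s: %s'
                                    % (directory, error))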
Index: trunk/tools/editor_trends/utils/utils.py
@@ -30,6 +30,7 @@
3131 import codecs
3232 import os
3333 import ctypes
 34+import time
3435
3536 import configuration
3637 settings = configuration.Settings()
@@ -49,13 +50,21 @@
5051
5152
5253 def convert_timestamp_to_date(timestamp):
53 - return datetime.datetime.strptime(timestamp[:10], settings.DATE_FORMAT)
 54+ return datetime.datetime.strptime(timestamp[:10], settings.date_format)
5455
5556
56 -def convert_timestamp_to_datetime(timestamp):
57 - return datetime.datetime.strptime(timestamp, settings.DATETIME_FORMAT)
 57+def convert_timestamp_to_datetime_naive(timestamp):
 58+ return datetime.datetime.strptime(timestamp, settings.timestamp_format)
5859
5960
 61+def convert_timestamp_to_datetime_utc(timestamp):
 62+ tz = datetime.tzinfo('utc')
 63+ d = convert_timestamp_to_datetime_naive(timestamp)
 64+ #return d.replace(tzinfo=tz) #enabling this line crashes pymongo
 65+ return d
 66+
 67+
 68+
6069 def check_if_process_is_running(pid):
6170 try:
6271 if settings.OS == 'Windows':
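convert_timestamp_to_datetime_utc above returns a naive datetime, and the commented-out line hints at why: datetime.tzinfo is an abstract base class, so datetime.tzinfo('utc') yields an object whose utcoffset(), tzname() and dst() are not implemented, which is presumably what made pymongo choke. A working UTC class in Python 2 needs those three methods (a sketch; the format string shown is MediaWiki's dump timestamp format and is assumed to match settings.timestamp_format):

    import datetime


    class UTC(datetime.tzinfo):
        '''Concrete UTC tzinfo; datetime.timezone only arrived in Python 3.'''

        def utcoffset(self, dt):
            return datetime.timedelta(0)

        def tzname(self, dt):
            return 'UTC'

        def dst(self, dt):
            return datetime.timedelta(0)


    def convert_timestamp_to_datetime_utc(timestamp):
        # e.g. '2010-11-22T17:51:00Z', as found in MediaWiki dumps
        d = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
        return d.replace(tzinfo=UTC())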
Index: trunk/tools/editor_trends/utils/models.py
@@ -20,27 +20,29 @@
2121 import multiprocessing
2222
2323
24 -class ProcessInputQueue(multiprocessing.Process):
 24+class BaseConsumer(multiprocessing.Process):
2525
26 - def __init__(self, target, input_queue, result_queue, **kwargs):
 26+ def __init__(self, task_queue, result_queue):
2727 multiprocessing.Process.__init__(self)
28 - self.input_queue = input_queue
 28+ self.task_queue = task_queue
2929 self.result_queue = result_queue
30 - self.target = target
31 - for kw in kwargs:
32 - setattr(self, kw, kwargs[kw])
3330
34 - def start(self):
35 - proc_name = self.name
36 - kwargs = {}
37 - IGNORE = ['input_queue', 'result_queue', 'target']
38 - for kw in self.__dict__:
39 - if kw not in IGNORE and not kw.startswith('_'):
40 - kwargs[kw] = getattr(self, kw)
41 - self._popen = True
42 - self.target(self.input_queue, self.result_queue, **kwargs)
4331
 32+
4433
 34+# for kw in kwargs:
 35+# setattr(self, kw, kwargs[kw])
 36+#
 37+# def run(self):
 38+# proc_name = self.name
 39+# kwargs = {}
 40+# IGNORE = ['input_queue', 'result_queue', 'target']
 41+# for kw in self.__dict__:
 42+# if kw not in IGNORE and not kw.startswith('_'):
 43+# kwargs[kw] = getattr(self, kw)
 44+# self.target(self.input_queue, self.result_queue, **kwargs)
 45+
 46+
4547 class ProcessResultQueue(multiprocessing.Process):
4648
4749 def __init__(self, target, result_queue, **kwargs):
@@ -51,12 +53,11 @@
5254 setattr(self, kw, kwargs[kw])
5355
5456
55 - def start(self):
 57+ def run(self):
5658 proc_name = self.name
5759 kwargs = {}
5860 IGNORE = ['result_queue', 'target']
5961 for kw in self.__dict__:
6062 if kw not in IGNORE and not kw.startswith('_'):
6163 kwargs[kw] = getattr(self, kw)
62 - self._popen = True
6364 self.target(self.result_queue, **kwargs)
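The shift in models.py from overriding start() to overriding run() follows the supported multiprocessing idiom: start() forks the child and does internal bookkeeping (which the old code had to fake with self._popen = True), while run() is the hook that executes inside the child. A minimal usage sketch:

    import multiprocessing


    class Worker(multiprocessing.Process):
        def __init__(self, task_queue, result_queue):
            multiprocessing.Process.__init__(self)
            self.task_queue = task_queue
            self.result_queue = result_queue

        def run(self):
            # run() executes in the child process; never override start()
            for task in iter(self.task_queue.get, None):
                self.result_queue.put(task * 2)


    if __name__ == '__main__':
        tasks, results = multiprocessing.Queue(), multiprocessing.Queue()
        worker = Worker(tasks, results)
        worker.start()               # forks the child, which then calls run()
        for n in xrange(5):
            tasks.put(n)
        tasks.put(None)              # sentinel ends the iter() loop
        for n in xrange(5):
            print results.get()
        worker.join()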
Index: trunk/tools/editor_trends/utils/process_constructor.py
@@ -63,9 +63,9 @@
6464 input_queues = {}
6565 result_queues = {}
6666
67 - #assert len(obj) == nr_input_processors
68 - #if result_queue:
69 - # assert len(obj)== nr_output_processors
 67+ assert len(obj) == nr_input_processors
 68+ if result_queue:
 69+ assert len(obj)== nr_output_processors
7070
7171 for i, o in enumerate(obj):
7272 input_queues[i] = load_input_queue(obj[o], poison_pill=poison_pill)
@@ -93,7 +93,7 @@
9494 result_processes = [models.ProcessResultQueue(result_processor,
9595 result_queues[i], **kwargs) for i in xrange(nr_output_processors)]
9696 for result_process in result_processes:
97 - result_process.start()
 97+ result_process.start(result_process.input_queue)
9898
9999 for input_process in input_processes:
100100 print 'Waiting for input process to finish'
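Re-enabling the asserts is sound, but the new launch call looks off: multiprocessing.Process.start() takes no arguments, so result_process.start(result_process.input_queue) will raise a TypeError. Since ProcessResultQueue.run() already pulls everything it needs from instance attributes, the loop presumably just wants:

    for result_process in result_processes:
        result_process.start()   # start() takes no arguments; run()
                                 # reads its queue from the instance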
Index: trunk/tools/editor_trends/utils/sort.py
@@ -75,8 +75,10 @@
7676
7777
7878 def readline(file):
 79+ '''
 80+ @file should be a file object
 81+ '''
7982 for line in file:
80 - print file.stream.name
8183 line = line.replace('\n', '')
8284 if line == '':
8385 continue
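loader.py consumes sort.readline as if it yields a list of fields per non-empty line (line[0] is the contributor, line[3] the username), so the generator presumably splits each stripped line as well. A self-contained version consistent with that usage (the tab delimiter is inferred from mergesort_feeder, not shown in this hunk):

    def readline(fh):
        '''
        @fh should be a file object; yields each non-empty line as a list
        of tab-separated fields, with the trailing newline removed.
        '''
        for line in fh:
            line = line.replace('\n', '')
            if line == '':
                continue
            yield line.split('\t')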
Index: trunk/tools/editor_trends/database/sqlite_logic.py
@@ -0,0 +1,156 @@
 2+def retrieve_editor_ids_db():
 3+ contributors = set()
 4+ connection = db.init_database()
 5+ cursor = connection.cursor()
 6+ if settings.PROGRESS_BAR:
 7+ cursor.execute('SELECT MAX(ROWID) FROM contributors')
 8+ for id in cursor:
 9+ pass
 10+ pbar = progressbar.ProgressBar(maxval=id[0]).start()
 11+
 12+ cursor.execute('SELECT contributor FROM contributors WHERE bot=0')
 13+
 14+ print 'Retrieving contributors...'
 15+ for x, contributor in enumerate(cursor):
 16+ contributors.add(contributor[0])
 17+ if x % 100000 == 0:
 18+ pbar.update(x)
 19+ print 'Serializing contributors...'
 20+ utils.store_object(contributors, 'contributors')
 21+ print 'Finished serializing contributors...'
 22+
 23+ if pbar:
 24+ pbar.finish()
 25+ print 'Total elapsed time: %s.' % (utils.humanize_time_difference(pbar.seconds_elapsed))
 26+
 27+ connection.close()
 28+
 29+def retrieve_edits_by_contributor(input_queue, result_queue, pbar):
 30+ connection = db.init_database()
 31+ cursor = connection.cursor()
 32+
 33+ while True:
 34+ try:
 35+ contributor = input_queue.get(block=False)
 36+ if contributor == None:
 37+ break
 38+
 39+ cursor.execute('SELECT contributor, timestamp, bot FROM contributors WHERE contributor=?', (contributor,))
 40+ edits = {}
 41+ edits[contributor] = set()
 42+ for edit, timestamp, bot in cursor:
 43+ date = utils.convert_timestamp_to_date(timestamp)
 44+ edits[contributor].add(date)
 45+ #print edit, timestamp, bot
 46+
 47+ utils.write_data_to_csv(edits, retrieve_edits_by_contributor)
 48+ if pbar:
 49+ utils.update_progressbar(pbar, input_queue)
 50+
 51+ except Empty:
 52+ pass
 53+
 54+ connection.close()
 55+
 56+
 57+def store_data_db(data_queue, pids):
 58+ connection = db.init_database()
 59+ cursor = connection.cursor()
 60+ db.create_tables(cursor, db_settings.CONTRIBUTOR_TABLE)
 61+ empty = 0
 62+ values = []
 63+ while True:
 64+ try:
 65+ chunk = data_queue.get(block=False)
 66+ contributor = chunk['contributor'].encode(settings.encoding)
 67+ article = chunk['article']
 68+ timestamp = chunk['timestamp'].encode(settings.encoding)
 69+ bot = chunk['bot']
 70+ values.append((contributor, article, timestamp, bot))
 71+
 72+ if len(values) == 50000:
 73+ cursor.executemany('INSERT INTO contributors VALUES (?,?,?,?)', values)
 74+ connection.commit()
 75+ #print 'Size of queue: %s' % data_queue.qsize()
 76+ values = []
 77+
 78+ except Empty:
 79+
 80+ if all([utils.check_if_process_is_running(pid) for pid in pids]):
 81+ pass
 82+ else:
 83+ break
 84+ connection.close()
 85+
 86+
 87+def create_bots_db(db_name):
 88+ '''
 89+ This function reads the csv file provided by Erik Zachte and constructs a
 90+ sqlite in-memory database. The reason for this is that I suspect I will
 91+ need some simple querying capabilities in the future; otherwise a
 92+ dictionary would suffice.
 93+ '''
 94+ connection = db.init_database('db_name')
 95+ #connection = db.init_database('data/database/bots.db')
 96+ cursor = connection.cursor()
 97+ db.create_tables(cursor, db_settings.BOT_TABLE)
 98+ values = []
 99+ fields = [field[0] for field in db_settings.BOT_TABLE['bots']]
 100+ for line in utils.read_data_from_csv('data/csv/StatisticsBots.csv', settings.encoding):
 101+ line = line.split(',')
 102+ row = []
 103+ for x, (field, value) in enumerate(zip(fields, line)):
 104+ if db_settings.BOT_TABLE['bots'][x][1] == 'INTEGER':
 105+ value = int(value)
 106+ elif db_settings.BOT_TABLE['bots'][x][1] == 'TEXT':
 107+ value = value.replace('/', '-')
 108+ #print field, value
 109+ row.append(value)
 110+ values.append(row)
 111+
 112+ cursor.executemany('INSERT INTO bots VALUES (?,?,?,?,?,?,?,?,?,?);', values)
 113+ connection.commit()
 114+ if db_name == ':memory':
 115+ return cursor
 116+ else:
 117+ connection.close()
 118+
 119+def retrieve_botnames_without_id(cursor, language):
 120+ return cursor.execute('SELECT name FROM bots WHERE language=?', (language,)).fetchall()
 121+
 122+
 123+def add_id_to_botnames():
 124+ '''
 125+ This is the worker function for the multi-process version of
 126+ lookup_username. First, the names of the bots are retrieved, then the
 127+ multi-process run is launched by calling pc.build_scaffolding. This is a
 128+ generic launcher that takes as input the function to load the input_queue,
 129+ the function that will do the main work and the objects to be put in the
 130+ input_queue. The launcher also accepts optional keyword arguments.
 131+ '''
 132+ cursor = create_bots_db(':memory')
 133+ files = utils.retrieve_file_list(settings.input_location, 'xml')
 134+
 135+ botnames = retrieve_botnames_without_id(cursor, 'en')
 136+ bots = {}
 137+ for botname in botnames:
 138+ bots[botname[0]] = 1
 139+ pc.build_scaffolding(pc.load_queue, lookup_username, files, bots=bots)
 140+ cursor.close()
 141+
 142+
 143+def debug_lookup_username():
 144+ '''
 144+ This function launches the lookup_username function single-threaded,
 145+ which eases debugging; that is also why the queue parameters are set to
 146+ None. When launching this function, make sure that debug=False when
 147+ calling lookup_username.
 149+ '''
 150+ cursor = create_bots_db(':memory')
 151+ botnames = retrieve_botnames_without_id(cursor, 'en')
 152+ bots = {}
 153+ for botname in botnames:
 154+ bots[botname[0]] = 1
 155+
 156+ lookup_username('12.xml', None, None, bots, debug=True)
 157+ cursor.close()
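store_data_db shows the usual bulk-insert pattern for sqlite: buffer rows and flush 50,000 at a time with executemany plus a single commit, which is far cheaper than committing per row. One caveat the module trips over: sqlite's in-memory database name is ':memory:' with a trailing colon, while create_bots_db and its callers pass ':memory'. The batching pattern in isolation (table schema is illustrative):

    import sqlite3

    BATCH_SIZE = 50000


    def store_rows(rows):
        connection = sqlite3.connect(':memory:')  # note the trailing colon
        cursor = connection.cursor()
        cursor.execute('CREATE TABLE contributors '
                       '(contributor TEXT, article INTEGER, timestamp TEXT, bot INTEGER)')
        values = []
        for row in rows:
            values.append(row)
            if len(values) == BATCH_SIZE:
                cursor.executemany('INSERT INTO contributors VALUES (?,?,?,?)', values)
                connection.commit()   # one commit per batch, not per row
                values = []
        if values:
            cursor.executemany('INSERT INTO contributors VALUES (?,?,?,?)', values)
            connection.commit()       # flush the remainder
        connection.close()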
Index: trunk/tools/editor_trends/database/cache.py
@@ -70,7 +70,7 @@
7171
7272 def add(self, key, value):
7373 if value == 'NEXT':
74 - result = self.insert(key, self.editors[key]['edits'])
 74+ result = self.insert(key, self.editors[key]['edits'], self.editors[key]['username'])
7575 self.n -= self.editors[key]['obs']
7676 self.number_editors -= 1
7777 del self.editors[key]
@@ -84,11 +84,13 @@
8585 self.editors[key]['edits'] = {}
8686 self.add_years(key)
8787 self.number_editors += 1
88 -
 88+ self.editors[key]['username'] = value['username']
 89+
8990 id = str(self.editors[key]['obs'])
9091 year = str(value['date'].year)
9192 self.editors[key]['edits'][year].append(value)
9293 self.editors[key]['obs'] += 1
 94+
9395
9496 #if self.editors[key]['obs'] == self.treshold:
9597 # self.treshold_editors.add(key)
@@ -102,9 +104,9 @@
103105 def update(self, editor, values):
104106 self.collection.update({'editor': editor}, {'$pushAll': {'edits': values}}, upsert=True)
105107
106 - def insert(self, editor, values):
 108+ def insert(self, editor, values, username):
107109 try:
108 - self.collection.insert({'editor': editor, 'edits': values})
 110+ self.collection.insert({'editor': editor, 'edits': values, 'username': username})
109111 return True
110112 except:
111113 return False
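EditorCache.insert now carries the username along, and the update method shown above leans on MongoDB's $pushAll with upsert=True: the editor document is created on first contact, and each flush appends a whole list of edits to the 'edits' array in one call. In isolation, with pymongo's 2010-era API (database and collection names are illustrative):

    import pymongo

    connection = pymongo.Connection()            # pre-2.x pymongo entry point
    collection = connection['wikilytics']['dataset']

    # upsert=True creates the document if no {'editor': ...} match exists;
    # $pushAll appends every element of the given list to the 'edits' array.
    collection.update(
        {'editor': 12345},
        {'$pushAll': {'edits': [{'date': '2010-11-22', 'article': 42}]}},
        upsert=True)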
