Index: trunk/tools/editor_trends/manage.py
— | — | @@ -20,6 +20,7 @@ |
21 | 21 | import os |
22 | 22 | import sys |
23 | 23 | import subprocess |
| 24 | +import datetime |
24 | 25 | from argparse import ArgumentParser |
25 | 26 | from argparse import RawTextHelpFormatter |
26 | 27 | import locale |
— | — | @@ -34,17 +35,30 @@ |
35 | 36 | from utils import dump_downloader |
36 | 37 | from etl import chunker |
37 | 38 | from etl import extract |
38 | | -from etl import optimize_editors |
39 | | -from etl import construct_datasets |
| 39 | +from etl import loader |
| 40 | +from etl import transformer |
| 41 | +from etl import exporter |
40 | 42 | import config |
41 | 43 | |
42 | 44 | |
| 45 | +class Timer(object): |
| 46 | + def __init__(self): |
| 47 | + self.t0 = datetime.datetime.now() |
| 48 | + |
| 49 | + def stop(self): |
| 50 | + self.t1 = datetime.datetime.now() |
| 51 | + |
| 52 | + def elapsed(self): |
| 53 | + self.stop() |
| 54 | + print 'Processing time: %s' % (self.t1 - self.t0) |
| 55 | + |
| 56 | + |
43 | 57 | def get_value(args, key): |
44 | 58 | return getattr(args, key, None) |
45 | 59 | |
46 | 60 | |
47 | | -def config_launcher(args, location, filename, project, full_project, language_code, language): |
48 | | - config.load_configuration(args) |
| 61 | +def config_launcher(args, **kwargs): |
| 62 | + settings.load_configuration() |
49 | 63 | |
50 | 64 | |
51 | 65 | def determine_default_language(): |
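
The new Timer class records a start timestamp at construction and prints the elapsed wall-clock time when elapsed() is called, so every launcher can report how long its step took. A minimal sketch of the intended usage (do_work is a placeholder standing in for a launcher body, not a function from this codebase):

    import time

    def do_work():
        time.sleep(1)                # stands in for the actual launcher logic

    timer = Timer()                  # t0 is captured in __init__
    do_work()
    timer.elapsed()                  # calls stop() and prints 'Processing time: <timedelta>'
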
— | — | @@ -99,10 +113,11 @@ |
100 | 114 | return locations |
101 | 115 | |
102 | 116 | |
103 | | - |
104 | | - |
105 | | -def show_settings(args, location, filename, project, full_project, language_code, language): |
106 | | - project = settings.projects.get(project, 'wiki') |
| 117 | +def show_settings(args, **kwargs): |
| 118 | + project = settings.projects.get(kwargs.pop('project'), 'wiki') |
| 119 | + language_code = kwargs.pop('language_code') |
| 120 | + language = kwargs.pop('language') |
| 121 | + location = kwargs.pop('location') |
107 | 122 | project = project.title() |
108 | 123 | language_map = utils.invert_dict(languages.MAPPING) |
109 | 124 | print 'Project: %s' % (project) |
— | — | @@ -111,18 +126,30 @@ |
112 | 127 | print 'Output directory: %s and subdirectories' % location |
113 | 128 | |
114 | 129 | |
115 | | -def dump_downloader_launcher(args, location, filename, project, full_project, language_code, language): |
| 130 | +def dump_downloader_launcher(args, **kwargs): |
116 | 131 | print 'dump downloader' |
| 132 | + timer = Timer() |
| 133 | + filename = kwargs.get('filename') |
 | 134 | +    project = kwargs.get('project')
| 135 | + location = kwargs.get('location') |
117 | 136 | pbar = get_value(args, 'progress') |
118 | 137 | domain = settings.wp_dump_location |
119 | 138 | path = '/%s/latest/' % project |
120 | 139 | extension = utils.determine_file_extension(filename) |
121 | 140 | filemode = utils.determine_file_mode(extension) |
122 | 141 | dump_downloader.download_wiki_file(domain, path, filename, location, filemode, pbar) |
| 142 | + timer.elapsed() |
123 | 143 | |
124 | 144 | |
125 | | -def cruncher_launcher(args, location, filename, project, full_project, language_code, language): |
| 145 | +def chunker_launcher(args, **kwargs): |
126 | 146 | print 'split_settings.input_filename_launcher' |
| 147 | + timer = Timer() |
| 148 | + filename = kwargs.pop('filename') |
| 149 | + filename = 'en-latest-pages-meta-history.xml.bz2' |
| 150 | + location = kwargs.pop('location') |
| 151 | + project = kwargs.pop('project') |
| 152 | + language = kwargs.pop('language') |
| 153 | + language_code = kwargs.pop('language_code') |
127 | 154 | ext = utils.determine_file_extension(filename) |
128 | 155 | if ext in settings.compression_extensions: |
129 | 156 | ext = '.%s' % ext |
— | — | @@ -135,9 +162,12 @@ |
136 | 163 | if retcode != 0: |
137 | 164 | sys.exit(retcode) |
138 | 165 | chunker.split_file(location, file, project, language_code, language) |
| 166 | + timer.elapsed() |
| 167 | + #settings.set_custom_settings(xml_namespace='http://www.mediawiki.org/xml/export-0.3/') |
139 | 168 | |
140 | 169 | |
141 | 170 | def launch_zip_extractor(args, location, file): |
| 171 | + timer = Timer() |
142 | 172 | path = settings.detect_installed_program('7zip') |
143 | 173 | source = os.path.join(location, file) |
144 | 174 | p = None |
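
launch_zip_extractor detects an installed 7-Zip binary and hands the compressed dump to it before chunking; in this revision only the Windows branch is implemented. A sketch of the kind of subprocess call this boils down to (the binary path and output directory are illustrative, not read from settings here):

    import subprocess

    def extract_with_7zip(seven_zip, source, destination):
        # '7z x' extracts with full paths; -o<dir> sets the output directory (no space after -o)
        p = subprocess.Popen([seven_zip, 'x', source, '-o%s' % destination])
        p.wait()
        return p.returncode
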
— | — | @@ -150,30 +180,57 @@ |
151 | 181 | raise NotImplementedError |
152 | 182 | else: |
153 | 183 | raise exceptions.PlatformNotSupportedError |
| 184 | + timer.elapsed() |
154 | 185 | return p |
155 | 186 | |
156 | 187 | |
157 | | -def mongodb_script_launcher(args, location, filename, project, full_project, language_code, language): |
| 188 | +def extract_launcher(args, **kwargs): |
158 | 189 | print 'mongodb_script_launcher' |
159 | | - extract.run_parse_editors(project, language_code, location) |
| 190 | + timer = Timer() |
| 191 | + location = kwargs.pop('location') |
| 192 | + language_code = kwargs.pop('language_code') |
| 193 | + project = kwargs.pop('project') |
| 194 | + extract.run_parse_editors(location, **kwargs) |
| 195 | + timer.elapsed() |
160 | 196 | |
161 | 197 | |
162 | | -def sort_launcher(args, location, filename, project, full_project, language_code): |
163 | | - raise NotImplementedError |
| 198 | +def sort_launcher(args, **kwargs): |
| 199 | + timer = Timer() |
| 200 | + location = kwargs.pop('location') |
| 201 | + input = os.path.join(location, 'txt') |
| 202 | + output = os.path.join(location, 'sorted') |
| 203 | + dbname = kwargs.pop('full_project') |
| 204 | + loader.mergesort_launcher(input, output) |
| 205 | + filename = loader.mergesort_external_launcher(dbname, output, output) |
| 206 | + loader.store_editors(output, filename, dbname, 'editors') |
| 207 | + timer.elapsed() |
164 | 208 | |
165 | 209 | |
166 | | -def dataset_launcher(args, full_project): |
| 210 | +def transformer_launcher(args, **kwargs): |
167 | 211 | print 'dataset launcher' |
168 | | - optimize_editors.run_optimize_editors(project) |
169 | | - construct_datasets.generate_editor_dataset_launcher(project) |
| 212 | + timer = Timer() |
| 213 | + project = kwargs.pop('full_project') |
| 214 | + transformer.run_optimize_editors(project) |
| 215 | + timer.elapsed() |
170 | 216 | |
171 | 217 | |
172 | | -def all_launcher(args, location, filename, project, full_project, language_code, language): |
| 218 | +def exporter_launcher(args, **kwargs): |
| 219 | + timer = Timer() |
| 220 | + project = kwargs.pop('full_project') |
| 221 | + exporter.generate_editor_dataset_launcher(project) |
| 222 | + timer.elapsed() |
| 223 | + |
| 224 | + |
| 225 | +def all_launcher(args, **kwargs): |
173 | 226 | print 'all_launcher' |
174 | | - dump_downloader_launcher(args, location, filename, project, language_code) |
175 | | - split_settings.input_filename_launcher(args, location, filename, project, language_code) |
176 | | - mongodb_script_launcher(args, location, filename, project, language_code) |
177 | | - dataset_launcher(args, location, filename, project, language_code) |
| 227 | + timer = Timer() |
| 228 | + dump_downloader_launcher(args, **kwargs) |
| 229 | + chunker_launcher(args, **kwargs) |
| 230 | + extract_launcher(args, **kwargs) |
| 231 | + sort_launcher(args, **kwargs) |
| 232 | + transformer_launcher(args, **kwargs) |
| 233 | + exporter_launcher(args, **kwargs) |
| 234 | + timer.elapsed() |
178 | 235 | |
179 | 236 | |
180 | 237 | def supported_languages(): |
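
all_launcher now simply chains the individual launchers, passing each one the parsed args plus the keyword arguments built from the locations dictionary. Several launchers consume their keys with kwargs.pop(), which mutates the dictionary they receive, so a caller that wants to run the whole chain against one shared dictionary would have to hand each step its own copy. A sketch under that assumption (the launcher order mirrors all_launcher):

    def run_all(args, **kwargs):
        launchers = [dump_downloader_launcher, chunker_launcher, extract_launcher,
                     sort_launcher, transformer_launcher, exporter_launcher]
        for launcher in launchers:
            # give every launcher a fresh copy so pop() in one step does not
            # remove keys that a later step still needs
            launcher(args, **dict(kwargs))
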
— | — | @@ -237,17 +294,20 @@ |
238 | 295 | parser_download.set_defaults(func=dump_downloader_launcher) |
239 | 296 | |
240 | 297 | 	 parser_split = subparsers.add_parser('split', help='The split sub command splits the downloaded file into smaller chunks to parallelize extracting information.')
241 | | - parser_split.set_defaults(func=cruncher_launcher) |
| 298 | + parser_split.set_defaults(func=chunker_launcher) |
242 | 299 | |
243 | 300 | 	 parser_sort = subparsers.add_parser('sort', help='By presorting the data, significant processing time reductions are achieved.')
244 | 301 | parser_sort.set_defaults(func=sort_launcher) |
245 | 302 | |
246 | | - parser_create = subparsers.add_parser('store', help='The store sub command parsers the XML chunk files, extracts the information and stores it in a MongoDB.') |
247 | | - parser_create.set_defaults(func=mongodb_script_launcher) |
 | 303 | +    parser_create = subparsers.add_parser('extract', help='The extract sub command parses the XML chunk files, extracts the information and stores it in MongoDB.')
| 304 | + parser_create.set_defaults(func=extract_launcher) |
248 | 305 | |
249 | | - parser_dataset = subparsers.add_parser('dataset', help='Create a dataset from the MongoDB and write it to a csv file.') |
250 | | - parser_dataset.set_defaults(func=dataset_launcher) |
 | 306 | +    parser_transform = subparsers.add_parser('transform', help='Transform the raw database into an enriched dataset that can be exported.')
| 307 | + parser_transform.set_defaults(func=transformer_launcher) |
251 | 308 | |
| 309 | + parser_dataset = subparsers.add_parser('export', help='Create a dataset from the MongoDB and write it to a csv file.') |
| 310 | + parser_dataset.set_defaults(func=exporter_launcher) |
| 311 | + |
252 | 312 | parser_all = subparsers.add_parser('all', help='The all sub command runs the download, split, store and dataset commands.\n\nWARNING: THIS COULD TAKE DAYS DEPENDING ON THE CONFIGURATION OF YOUR MACHINE AND THE SIZE OF THE WIKIMEDIA DUMP FILE.') |
253 | 313 | parser_all.set_defaults(func=all_launcher) |
254 | 314 | |
— | — | @@ -277,12 +337,14 @@ |
278 | 338 | detect_python_version() |
279 | 339 | about() |
280 | 340 | args = parser.parse_args() |
281 | | - config.load_configuration(args) |
| 341 | + if not os.path.exists('wiki.cfg'): |
| 342 | + config.create_configuration(settings, args) |
282 | 343 | locations = determine_file_locations(args) |
283 | | - #prepare_file_locations(locations['location']) |
284 | 344 | settings.verify_environment([locations['location']]) |
285 | 345 | show_settings(args, **locations) |
| 346 | + #locations['settings'] = settings |
286 | 347 | args.func(args, **locations) |
| 348 | + t1 = datetime.datetime.now() |
287 | 349 | |
288 | 350 | |
289 | 351 | if __name__ == '__main__': |
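
Each sub parser registers its launcher via set_defaults(func=...), so after parsing, main() only has to call args.func(args, **locations) to dispatch to the right step. A stripped-down, self-contained sketch of that pattern (the 'demo' sub command and its launcher are made up for illustration):

    from argparse import ArgumentParser

    def demo_launcher(args, **kwargs):
        print 'launching with %s' % kwargs

    parser = ArgumentParser(prog='manage.py')
    subparsers = parser.add_subparsers(help='sub-command help')
    parser_demo = subparsers.add_parser('demo', help='Illustrative sub command.')
    parser_demo.set_defaults(func=demo_launcher)

    args = parser.parse_args(['demo'])
    args.func(args, location='/tmp/wikimedia', project='wiki')
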
Index: trunk/tools/editor_trends/etl/optimize_editors.py |
— | — | @@ -1,174 +0,0 @@ |
2 | | -#!/usr/bin/python |
3 | | -# -*- coding: utf-8 -*- |
4 | | -''' |
5 | | -Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com) |
6 | | -This program is free software; you can redistribute it and/or |
7 | | -modify it under the terms of the GNU General Public License version 2 |
8 | | -as published by the Free Software Foundation. |
9 | | -This program is distributed in the hope that it will be useful, |
10 | | -but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | | -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
12 | | -See the GNU General Public License for more details, at |
13 | | -http://www.fsf.org/licenses/gpl.html |
14 | | -''' |
15 | | - |
16 | | -__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ]) |
17 | | -__author__email = 'dvanliere at gmail dot com' |
18 | | -__date__ = '2010-11-02' |
19 | | -__version__ = '0.1' |
20 | | - |
21 | | -from multiprocessing import Queue |
22 | | -from Queue import Empty |
23 | | -from operator import itemgetter |
24 | | -import datetime |
25 | | -import sys |
26 | | - |
27 | | -sys.path.append('..') |
28 | | -import configuration |
29 | | -settings = configuration.Settings() |
30 | | -from database import db |
31 | | -from utils import process_constructor as pc |
32 | | -from utils import utils |
33 | | -import construct_datasets |
34 | | - |
35 | | - |
36 | | -try: |
37 | | - import psyco |
38 | | - psyco.full() |
39 | | -except ImportError: |
40 | | - pass |
41 | | - |
42 | | - |
43 | | -def create_datacontainer(init_value=0): |
44 | | - ''' |
45 | | - This function initializes an empty dictionary with as key the year (starting |
46 | | - 2001 and running through) and as value @init_value, in most cases this will |
47 | | - be zero so the dictionary will act as a running tally for a variable but |
48 | | - @init_value can also a list, [], or a dictionary, {}, or a set, set(). |
49 | | - ''' |
50 | | - data = {} |
51 | | - year = datetime.datetime.now().year + 1 |
52 | | - for x in xrange(2001, year): |
53 | | - data[str(x)] = init_value |
54 | | - return data |
55 | | - |
56 | | - |
57 | | -def add_months_to_datacontainer(datacontainer): |
58 | | - for dc in datacontainer: |
59 | | - datacontainer[dc] = {} |
60 | | - for x in xrange(1, 13): |
61 | | - datacontainer[dc][str(x)] = 0 |
62 | | - return datacontainer |
63 | | - |
64 | | - |
65 | | -def determine_edits_by_month(edits): |
66 | | - datacontainer = create_datacontainer(init_value=0) |
67 | | - datacontainer = add_months_to_datacontainer(datacontainer) |
68 | | - for year in edits: |
69 | | - months = set() |
70 | | - for edit in edits[year]: |
71 | | - m = str(edit['date'].month) |
72 | | - if m not in months: |
73 | | - datacontainer[year][m] = 1 |
74 | | - months.add(m) |
75 | | - if len(months) == 12: |
76 | | - break |
77 | | - return datacontainer |
78 | | - |
79 | | - |
80 | | -def determine_edits_by_year(dates): |
81 | | - ''' |
82 | | - This function counts the number of edits by year made by a particular editor. |
83 | | - ''' |
84 | | - edits = create_datacontainer() |
85 | | - for date in dates: |
86 | | - year = str(date['date'].year) |
87 | | - edits[year] += 1 |
88 | | - return edits |
89 | | - |
90 | | - |
91 | | -def determine_articles_by_year(dates): |
92 | | - ''' |
93 | | - This function counts the number of unique articles by year edited by a |
94 | | - particular editor. |
95 | | - ''' |
96 | | - articles = create_datacontainer(set()) |
97 | | - for date in dates: |
98 | | - year = str(date['date'].year) |
99 | | - articles[year].add(date['article']) |
100 | | - for article in articles: |
101 | | - articles[article] = len(articles[article]) |
102 | | - return articles |
103 | | - |
104 | | - |
105 | | -def sort_edits(edits): |
106 | | - edits = utils.merge_list(edits) |
107 | | - return sorted(edits, key=itemgetter('date')) |
108 | | - |
109 | | - |
110 | | -def optimize_editors(input_queue, result_queue, pbar, **kwargs): |
111 | | - dbname = kwargs.pop('dbname') |
112 | | - mongo = db.init_mongo_db(dbname) |
113 | | - input = mongo['test'] |
114 | | - output = mongo['dataset'] |
115 | | - output.ensure_index('editor') |
116 | | - output.ensure_index('year_joined') |
117 | | - definition = kwargs.pop('definition') |
118 | | - while True: |
119 | | - try: |
120 | | - id = input_queue.get(block=False) |
121 | | - editor = input.find_one({'editor': id}) |
122 | | - if editor == None: |
123 | | - continue |
124 | | - edits = editor['edits'] |
125 | | - monthly_edits = determine_edits_by_month(edits) |
126 | | - edits = sort_edits(edits) |
127 | | - edit_count = len(edits) |
128 | | - new_wikipedian = edits[9]['date'] |
129 | | - first_edit = edits[0]['date'] |
130 | | - final_edit = edits[-1]['date'] |
131 | | - edits_by_year = determine_edits_by_year(edits) |
132 | | - articles_by_year = determine_articles_by_year(edits) |
133 | | - |
134 | | - edits = edits[:10] |
135 | | - |
136 | | - output.insert({'editor': id, 'edits': edits, |
137 | | - 'edits_by_year': edits_by_year, |
138 | | - 'new_wikipedian': new_wikipedian, |
139 | | - 'edit_count': edit_count, |
140 | | - 'final_edit': final_edit, |
141 | | - 'first_edit': first_edit, |
142 | | - 'articles_by_year': articles_by_year, |
143 | | - 'monthly_edits': monthly_edits}) |
144 | | - print 'Items left: %s' % input_queue.qsize() |
145 | | - except Empty: |
146 | | - break |
147 | | - |
148 | | - |
149 | | -def run_optimize_editors(dbname): |
150 | | - ids = construct_datasets.retrieve_editor_ids_mongo(dbname, 'editors') |
151 | | - kwargs = {'definition': 'traditional', |
152 | | - 'pbar': True, |
153 | | - 'dbname': 'enwiki', |
154 | | - 'nr_input_processors': 1, |
155 | | - 'nr_output_processors': 0, |
156 | | - 'poison_pill': False |
157 | | - } |
158 | | - print len(ids) |
159 | | - ids = list(ids) |
160 | | - chunks = {0: ids} |
161 | | - pc.build_scaffolding(pc.load_queue, optimize_editors, chunks, False, False, **kwargs) |
162 | | - |
163 | | - |
164 | | -def debug_optimize_editors(dbname): |
165 | | - ids = construct_datasets.retrieve_editor_ids_mongo(dbname, 'editors') |
166 | | - q = pc.load_queue(ids) |
167 | | - kwargs = {'definition': 'traditional', |
168 | | - 'dbname': dbname |
169 | | - } |
170 | | - optimize_editors(q, False, True, kwargs) |
171 | | - |
172 | | - |
173 | | -if __name__ == '__main__': |
174 | | - #debug_optimize_editors('test') |
175 | | - run_optimize_editors('enwiki') |
\ No newline at end of file |
Index: trunk/tools/editor_trends/etl/construct_datasets.py |
— | — | @@ -1,256 +0,0 @@ |
2 | | -#!/usr/bin/python |
3 | | -# -*- coding: utf-8 -*- |
4 | | -''' |
5 | | -Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com) |
6 | | -This program is free software; you can redistribute it and/or |
7 | | -modify it under the terms of the GNU General Public License version 2 |
8 | | -as published by the Free Software Foundation. |
9 | | -This program is distributed in the hope that it will be useful, |
10 | | -but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | | -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
12 | | -See the GNU General Public License for more details, at |
13 | | -http://www.fsf.org/licenses/gpl.html |
14 | | -''' |
15 | | - |
16 | | -__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ]) |
17 | | -__author__email = 'dvanliere at gmail dot com' |
18 | | -__date__ = '2010-10-21' |
19 | | -__version__ = '0.1' |
20 | | - |
21 | | -from multiprocessing import Queue |
22 | | -from Queue import Empty |
23 | | -import datetime |
24 | | -from dateutil.relativedelta import * |
25 | | -import sys |
26 | | -import progressbar |
27 | | - |
28 | | -sys.path.append('..') |
29 | | -import configuration |
30 | | -settings = configuration.Settings() |
31 | | -from utils import models, utils |
32 | | -from database import db |
33 | | -from utils import process_constructor as pc |
34 | | - |
35 | | -try: |
36 | | - import psyco |
37 | | - psyco.full() |
38 | | -except ImportError: |
39 | | - pass |
40 | | - |
41 | | - |
42 | | -def retrieve_editor_ids_mongo(dbname, collection): |
43 | | - if utils.check_file_exists(settings.binary_location, |
44 | | - 'editors.bin'): |
45 | | - ids = utils.load_object(settings.binary_location, |
46 | | - 'editors.bin') |
47 | | - else: |
48 | | - mongo = db.init_mongo_db(dbname) |
49 | | - editors = mongo[collection] |
50 | | - ids = editors.distinct('editor') |
51 | | - utils.store_object(ids, settings.binary_location, retrieve_editor_ids_mongo) |
52 | | - return ids |
53 | | - |
54 | | - |
55 | | -def expand_edits(edits): |
56 | | - data = [] |
57 | | - for edit in edits: |
58 | | - data.append(edit['date']) |
59 | | - return data |
60 | | - |
61 | | - |
62 | | -def expand_observations(obs, vars_to_expand): |
63 | | - for var in vars_to_expand: |
64 | | - if var == 'edits': |
65 | | - obs[var] = expand_edits(obs[var]) |
66 | | - elif var == 'edits_by_year': |
67 | | - keys = obs[var].keys() |
68 | | - keys.sort() |
69 | | - edits = [] |
70 | | - for key in keys: |
71 | | - edits.append(str(obs[var][key])) |
72 | | - obs[var] = edits |
73 | | - return obs |
74 | | - |
75 | | -def write_longitudinal_data(id, edits, fh): |
76 | | - years = edits.keys() |
77 | | - years.sort() |
78 | | - for year in years: |
79 | | - months = edits[year].keys() |
80 | | - months = [int(m) for m in months] |
81 | | - months.sort() |
82 | | - for m in months: |
83 | | - date = datetime.date(int(year), int(m), 1) |
84 | | - fh.write('%s\t%s\t%s\n' % (id, date, edits[year][str(m)])) |
85 | | - |
86 | | - |
87 | | -def expand_headers(headers, vars_to_expand, obs): |
88 | | - for var in vars_to_expand: |
89 | | - l = len(obs[var]) |
90 | | - pos = headers.index(var) |
91 | | - for i in xrange(l): |
92 | | - if var.endswith('year'): |
93 | | - suffix = 2001 + i |
94 | | - elif var.endswith('edits'): |
95 | | - suffix = 1 + i |
96 | | - headers.insert(pos + i, '%s_%s' % (var, suffix)) |
97 | | - headers.remove(var) |
98 | | - return headers |
99 | | - |
100 | | - |
101 | | -def generate_long_editor_dataset(input_queue, data_queue, pbar, **kwargs): |
102 | | - debug = kwargs.pop('debug') |
103 | | - dbname = kwargs.pop('dbname') |
104 | | - mongo = db.init_mongo_db(dbname) |
105 | | - editors = mongo['dataset'] |
106 | | - name = dbname + '_long_editors.csv' |
107 | | - fh = utils.create_txt_filehandle(settings.dataset_location, name, 'a', settings.encoding) |
108 | | - x = 0 |
109 | | - vars_to_expand = [] |
110 | | - while True: |
111 | | - try: |
112 | | - id = input_queue.get(block=False) |
113 | | - obs = editors.find_one({'editor': id}, {'monthly_edits': 1}) |
114 | | - if x == 0: |
115 | | - headers = obs.keys() |
116 | | - headers.sort() |
117 | | - headers = expand_headers(headers, vars_to_expand, obs) |
118 | | - utils.write_list_to_csv(headers, fh) |
119 | | - write_longitudinal_data(id, obs['monthly_edits'], fh) |
120 | | - #utils.write_list_to_csv(data, fh) |
121 | | - x += 1 |
122 | | - except Empty: |
123 | | - break |
124 | | - |
125 | | - |
126 | | -def generate_cohort_analysis(input_queue, data_queue, pbar, **kwargs): |
127 | | - dbname = kwargs.get('dbname') |
128 | | - pbar = kwargs.get('pbar') |
129 | | - mongo = db.init_mongo_db(dbname) |
130 | | - editors = mongo['dataset'] |
131 | | - year = datetime.datetime.now().year + 1 |
132 | | - begin = year - 2001 |
133 | | - p = [3, 6, 9] |
134 | | - periods = [y * 12 for y in xrange(1, begin)] |
135 | | - periods = p + periods |
136 | | - data = {} |
137 | | - while True: |
138 | | - try: |
139 | | - id = input_queue.get(block=False) |
140 | | - obs = editors.find_one({'editor': id}, {'first_edit': 1, 'final_edit': 1}) |
141 | | - first_edit = obs['first_edit'] |
142 | | - last_edit = obs['final_edit'] |
143 | | - for y in xrange(2001, year): |
144 | | - if y == 2010 and first_edit > datetime.datetime(2010, 1, 1): |
145 | | - print 'debug' |
146 | | - if y not in data: |
147 | | - data[y] = {} |
148 | | - data[y]['n'] = 0 |
149 | | - window_end = datetime.datetime(y, 12, 31) |
150 | | - if window_end > datetime.datetime.now(): |
151 | | - now = datetime.datetime.now() |
152 | | - m = now.month - 1 #Dump files are always lagging at least one month.... |
153 | | - d = now.day |
154 | | - window_end = datetime.datetime(y, m, d) |
155 | | - edits = [] |
156 | | - for period in periods: |
157 | | - if period not in data[y]: |
158 | | - data[y][period] = 0 |
159 | | - window_start = datetime.datetime(y, 12, 31) - relativedelta(months=period) |
160 | | - if window_start < datetime.datetime(2001, 1, 1): |
161 | | - window_start = datetime.datetime(2001, 1, 1) |
162 | | - if date_falls_in_window(window_start, window_end, first_edit, last_edit): |
163 | | - edits.append(period) |
164 | | - if edits != []: |
165 | | - p = min(edits) |
166 | | - data[y]['n'] += 1 |
167 | | - data[y][p] += 1 |
168 | | - #pbar.update(+1) |
169 | | - except Empty: |
170 | | - break |
171 | | - utils.store_object(data, settings.binary_location, 'cohort_data') |
172 | | - |
173 | | -def date_falls_in_window(window_start, window_end, first_edit, last_edit): |
174 | | - if first_edit >= window_start and first_edit <= window_end: |
175 | | - return True |
176 | | - else: |
177 | | - return False |
178 | | - |
179 | | - |
180 | | -def generate_wide_editor_dataset(input_queue, data_queue, pbar, **kwargs): |
181 | | - dbname = kwargs.pop('dbname') |
182 | | - mongo = db.init_mongo_db(dbname) |
183 | | - editors = mongo['dataset'] |
184 | | - name = dbname + '_wide_editors.csv' |
185 | | - fh = utils.create_txt_filehandle(settings.dataset_location, name, 'a', settings.encoding) |
186 | | - x = 0 |
187 | | - vars_to_expand = ['edits', 'edits_by_year', 'articles_by_year'] |
188 | | - while True: |
189 | | - try: |
190 | | - if debug: |
191 | | - id = u'99797' |
192 | | - else: |
193 | | - id = input_queue.get(block=False) |
194 | | - print input_queue.qsize() |
195 | | - obs = editors.find_one({'editor': id}) |
196 | | - obs = expand_observations(obs, vars_to_expand) |
197 | | - if x == 0: |
198 | | - headers = obs.keys() |
199 | | - headers.sort() |
200 | | - headers = expand_headers(headers, vars_to_expand, obs) |
201 | | - utils.write_list_to_csv(headers, fh) |
202 | | - data = [] |
203 | | - keys = obs.keys() |
204 | | - keys.sort() |
205 | | - for key in keys: |
206 | | - data.append(obs[key]) |
207 | | - utils.write_list_to_csv(data, fh) |
208 | | - |
209 | | - x += 1 |
210 | | - except Empty: |
211 | | - break |
212 | | - fh.close() |
213 | | - |
214 | | - |
215 | | -def retrieve_edits_by_contributor_launcher(): |
216 | | - pc.build_scaffolding(pc.load_queue, retrieve_edits_by_contributor, 'contributors') |
217 | | - |
218 | | - |
219 | | -def debug_retrieve_edits_by_contributor_launcher(dbname): |
220 | | - kwargs = {'debug': False, |
221 | | - 'dbname': dbname, |
222 | | - } |
223 | | - ids = retrieve_editor_ids_mongo(dbname, 'editors') |
224 | | - input_queue = pc.load_queue(ids) |
225 | | - q = Queue() |
226 | | - generate_editor_dataset(input_queue, q, False, kwargs) |
227 | | - |
228 | | - |
229 | | -def generate_editor_dataset_launcher(dbname): |
230 | | - kwargs = {'nr_input_processors': 1, |
231 | | - 'nr_output_processors': 1, |
232 | | - 'debug': False, |
233 | | - 'dbname': dbname, |
234 | | - 'poison_pill':False, |
235 | | - 'pbar': True |
236 | | - } |
237 | | - ids = retrieve_editor_ids_mongo(dbname, 'editors') |
238 | | - ids = list(ids) |
239 | | - chunks = dict({0: ids}) |
240 | | - pc.build_scaffolding(pc.load_queue, generate_cohort_analysis, chunks, False, False, **kwargs) |
241 | | - |
242 | | - |
243 | | -def generate_editor_dataset_debug(dbname): |
244 | | - ids = retrieve_editor_ids_mongo(dbname, 'editors') |
245 | | - input_queue = pc.load_queue(ids) |
246 | | - kwargs = {'nr_input_processors': 1, |
247 | | - 'nr_output_processors': 1, |
248 | | - 'debug': True, |
249 | | - 'dbname': dbname, |
250 | | - } |
251 | | - generate_editor_dataset(input_queue, False, False, kwargs) |
252 | | - |
253 | | - |
254 | | -if __name__ == '__main__': |
255 | | - #generate_editor_dataset_debug('test') |
256 | | - generate_editor_dataset_launcher('enwiki') |
257 | | - #debug_retrieve_edits_by_contributor_launcher() |
Index: trunk/tools/editor_trends/etl/exporter.py |
— | — | @@ -0,0 +1,256 @@ |
| 2 | +#!/usr/bin/python |
| 3 | +# -*- coding: utf-8 -*- |
| 4 | +''' |
| 5 | +Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com) |
| 6 | +This program is free software; you can redistribute it and/or |
| 7 | +modify it under the terms of the GNU General Public License version 2 |
| 8 | +as published by the Free Software Foundation. |
| 9 | +This program is distributed in the hope that it will be useful, |
| 10 | +but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
| 12 | +See the GNU General Public License for more details, at |
| 13 | +http://www.fsf.org/licenses/gpl.html |
| 14 | +''' |
| 15 | + |
| 16 | +__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ]) |
| 17 | +__author__email = 'dvanliere at gmail dot com' |
| 18 | +__date__ = '2010-10-21' |
| 19 | +__version__ = '0.1' |
| 20 | + |
| 21 | +from multiprocessing import Queue |
| 22 | +from Queue import Empty |
| 23 | +import datetime |
| 24 | +from dateutil.relativedelta import * |
| 25 | +import sys |
| 26 | +import progressbar |
| 27 | + |
| 28 | +sys.path.append('..') |
| 29 | +import configuration |
| 30 | +settings = configuration.Settings() |
| 31 | +from utils import models, utils |
| 32 | +from database import db |
| 33 | +from utils import process_constructor as pc |
| 34 | + |
| 35 | +try: |
| 36 | + import psyco |
| 37 | + psyco.full() |
| 38 | +except ImportError: |
| 39 | + pass |
| 40 | + |
| 41 | + |
| 42 | +def retrieve_editor_ids_mongo(dbname, collection): |
| 43 | + if utils.check_file_exists(settings.binary_location, |
| 44 | + 'editors.bin'): |
| 45 | + ids = utils.load_object(settings.binary_location, |
| 46 | + 'editors.bin') |
| 47 | + else: |
| 48 | + mongo = db.init_mongo_db(dbname) |
| 49 | + editors = mongo[collection] |
| 50 | + ids = editors.distinct('editor') |
| 51 | + utils.store_object(ids, settings.binary_location, retrieve_editor_ids_mongo) |
| 52 | + return ids |
| 53 | + |
| 54 | + |
| 55 | +def expand_edits(edits): |
| 56 | + data = [] |
| 57 | + for edit in edits: |
| 58 | + data.append(edit['date']) |
| 59 | + return data |
| 60 | + |
| 61 | + |
| 62 | +def expand_observations(obs, vars_to_expand): |
| 63 | + for var in vars_to_expand: |
| 64 | + if var == 'edits': |
| 65 | + obs[var] = expand_edits(obs[var]) |
| 66 | + elif var == 'edits_by_year': |
| 67 | + keys = obs[var].keys() |
| 68 | + keys.sort() |
| 69 | + edits = [] |
| 70 | + for key in keys: |
| 71 | + edits.append(str(obs[var][key])) |
| 72 | + obs[var] = edits |
| 73 | + return obs |
| 74 | + |
| 75 | +def write_longitudinal_data(id, edits, fh): |
| 76 | + years = edits.keys() |
| 77 | + years.sort() |
| 78 | + for year in years: |
| 79 | + months = edits[year].keys() |
| 80 | + months = [int(m) for m in months] |
| 81 | + months.sort() |
| 82 | + for m in months: |
| 83 | + date = datetime.date(int(year), int(m), 1) |
| 84 | + fh.write('%s\t%s\t%s\n' % (id, date, edits[year][str(m)])) |
| 85 | + |
| 86 | + |
| 87 | +def expand_headers(headers, vars_to_expand, obs): |
| 88 | + for var in vars_to_expand: |
| 89 | + l = len(obs[var]) |
| 90 | + pos = headers.index(var) |
| 91 | + for i in xrange(l): |
| 92 | + if var.endswith('year'): |
| 93 | + suffix = 2001 + i |
| 94 | + elif var.endswith('edits'): |
| 95 | + suffix = 1 + i |
| 96 | + headers.insert(pos + i, '%s_%s' % (var, suffix)) |
| 97 | + headers.remove(var) |
| 98 | + return headers |
| 99 | + |
| 100 | + |
| 101 | +def generate_long_editor_dataset(input_queue, data_queue, pbar, **kwargs): |
| 102 | + debug = kwargs.pop('debug') |
| 103 | + dbname = kwargs.pop('dbname') |
| 104 | + mongo = db.init_mongo_db(dbname) |
| 105 | + editors = mongo['dataset'] |
| 106 | + name = dbname + '_long_editors.csv' |
| 107 | + fh = utils.create_txt_filehandle(settings.dataset_location, name, 'a', settings.encoding) |
| 108 | + x = 0 |
| 109 | + vars_to_expand = [] |
| 110 | + while True: |
| 111 | + try: |
| 112 | + id = input_queue.get(block=False) |
| 113 | + obs = editors.find_one({'editor': id}, {'monthly_edits': 1}) |
| 114 | + if x == 0: |
| 115 | + headers = obs.keys() |
| 116 | + headers.sort() |
| 117 | + headers = expand_headers(headers, vars_to_expand, obs) |
| 118 | + utils.write_list_to_csv(headers, fh) |
| 119 | + write_longitudinal_data(id, obs['monthly_edits'], fh) |
| 120 | + #utils.write_list_to_csv(data, fh) |
| 121 | + x += 1 |
| 122 | + except Empty: |
| 123 | + break |
| 124 | + |
| 125 | + |
| 126 | +def generate_cohort_analysis(input_queue, data_queue, pbar, **kwargs): |
| 127 | + dbname = kwargs.get('dbname') |
| 128 | + pbar = kwargs.get('pbar') |
| 129 | + mongo = db.init_mongo_db(dbname) |
| 130 | + editors = mongo['dataset'] |
| 131 | + year = datetime.datetime.now().year + 1 |
| 132 | + begin = year - 2001 |
| 133 | + p = [3, 6, 9] |
| 134 | + periods = [y * 12 for y in xrange(1, begin)] |
| 135 | + periods = p + periods |
| 136 | + data = {} |
| 137 | + while True: |
| 138 | + try: |
| 139 | + id = input_queue.get(block=False) |
| 140 | + obs = editors.find_one({'editor': id}, {'first_edit': 1, 'final_edit': 1}) |
| 141 | + first_edit = obs['first_edit'] |
| 142 | + last_edit = obs['final_edit'] |
| 143 | + for y in xrange(2001, year): |
| 144 | + if y == 2010 and first_edit > datetime.datetime(2010, 1, 1): |
| 145 | + print 'debug' |
| 146 | + if y not in data: |
| 147 | + data[y] = {} |
| 148 | + data[y]['n'] = 0 |
| 149 | + window_end = datetime.datetime(y, 12, 31) |
| 150 | + if window_end > datetime.datetime.now(): |
| 151 | + now = datetime.datetime.now() |
| 152 | + m = now.month - 1 #Dump files are always lagging at least one month.... |
| 153 | + d = now.day |
| 154 | + window_end = datetime.datetime(y, m, d) |
| 155 | + edits = [] |
| 156 | + for period in periods: |
| 157 | + if period not in data[y]: |
| 158 | + data[y][period] = 0 |
| 159 | + window_start = datetime.datetime(y, 12, 31) - relativedelta(months=period) |
| 160 | + if window_start < datetime.datetime(2001, 1, 1): |
| 161 | + window_start = datetime.datetime(2001, 1, 1) |
| 162 | + if date_falls_in_window(window_start, window_end, first_edit, last_edit): |
| 163 | + edits.append(period) |
| 164 | + if edits != []: |
| 165 | + p = min(edits) |
| 166 | + data[y]['n'] += 1 |
| 167 | + data[y][p] += 1 |
| 168 | + #pbar.update(+1) |
| 169 | + except Empty: |
| 170 | + break |
| 171 | + utils.store_object(data, settings.binary_location, 'cohort_data') |
| 172 | + |
| 173 | +def date_falls_in_window(window_start, window_end, first_edit, last_edit): |
| 174 | + if first_edit >= window_start and first_edit <= window_end: |
| 175 | + return True |
| 176 | + else: |
| 177 | + return False |
| 178 | + |
| 179 | + |
| 180 | +def generate_wide_editor_dataset(input_queue, data_queue, pbar, **kwargs): |
| 181 | + dbname = kwargs.pop('dbname') |
| 182 | + mongo = db.init_mongo_db(dbname) |
| 183 | + editors = mongo['dataset'] |
| 184 | + name = dbname + '_wide_editors.csv' |
| 185 | + fh = utils.create_txt_filehandle(settings.dataset_location, name, 'a', settings.encoding) |
| 186 | + x = 0 |
| 187 | + vars_to_expand = ['edits', 'edits_by_year', 'articles_by_year'] |
| 188 | + while True: |
| 189 | + try: |
| 190 | + if debug: |
| 191 | + id = u'99797' |
| 192 | + else: |
| 193 | + id = input_queue.get(block=False) |
| 194 | + print input_queue.qsize() |
| 195 | + obs = editors.find_one({'editor': id}) |
| 196 | + obs = expand_observations(obs, vars_to_expand) |
| 197 | + if x == 0: |
| 198 | + headers = obs.keys() |
| 199 | + headers.sort() |
| 200 | + headers = expand_headers(headers, vars_to_expand, obs) |
| 201 | + utils.write_list_to_csv(headers, fh) |
| 202 | + data = [] |
| 203 | + keys = obs.keys() |
| 204 | + keys.sort() |
| 205 | + for key in keys: |
| 206 | + data.append(obs[key]) |
| 207 | + utils.write_list_to_csv(data, fh) |
| 208 | + |
| 209 | + x += 1 |
| 210 | + except Empty: |
| 211 | + break |
| 212 | + fh.close() |
| 213 | + |
| 214 | + |
| 215 | +def retrieve_edits_by_contributor_launcher(): |
| 216 | + pc.build_scaffolding(pc.load_queue, retrieve_edits_by_contributor, 'contributors') |
| 217 | + |
| 218 | + |
| 219 | +def debug_retrieve_edits_by_contributor_launcher(dbname): |
| 220 | + kwargs = {'debug': False, |
| 221 | + 'dbname': dbname, |
| 222 | + } |
| 223 | + ids = retrieve_editor_ids_mongo(dbname, 'editors') |
| 224 | + input_queue = pc.load_queue(ids) |
| 225 | + q = Queue() |
| 226 | + generate_editor_dataset(input_queue, q, False, kwargs) |
| 227 | + |
| 228 | + |
| 229 | +def generate_editor_dataset_launcher(dbname): |
| 230 | + kwargs = {'nr_input_processors': 1, |
| 231 | + 'nr_output_processors': 1, |
| 232 | + 'debug': False, |
| 233 | + 'dbname': dbname, |
| 234 | + 'poison_pill':False, |
| 235 | + 'pbar': True |
| 236 | + } |
| 237 | + ids = retrieve_editor_ids_mongo(dbname, 'editors') |
| 238 | + ids = list(ids) |
| 239 | + chunks = dict({0: ids}) |
| 240 | + pc.build_scaffolding(pc.load_queue, generate_cohort_analysis, chunks, False, False, **kwargs) |
| 241 | + |
| 242 | + |
| 243 | +def generate_editor_dataset_debug(dbname): |
| 244 | + ids = retrieve_editor_ids_mongo(dbname, 'editors') |
| 245 | + input_queue = pc.load_queue(ids) |
| 246 | + kwargs = {'nr_input_processors': 1, |
| 247 | + 'nr_output_processors': 1, |
| 248 | + 'debug': True, |
| 249 | + 'dbname': dbname, |
| 250 | + } |
| 251 | + generate_editor_dataset(input_queue, False, False, kwargs) |
| 252 | + |
| 253 | + |
| 254 | +if __name__ == '__main__': |
| 255 | + #generate_editor_dataset_debug('test') |
| 256 | + generate_editor_dataset_launcher('enwiki') |
| 257 | + #debug_retrieve_edits_by_contributor_launcher() |
Property changes on: trunk/tools/editor_trends/etl/exporter.py |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 258 | + native |
Added: svn:mime-type |
2 | 259 | + text/plain |
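
The cohort analysis in exporter.py buckets editors per calendar year and per activity window (3, 6 and 9 months plus whole-year multiples), crediting each editor to the smallest window of that year whose boundaries contain the first edit. A condensed sketch of the window test, leaving out the clamping to 2001-01-01 and to the dump date that the full function performs:

    import datetime
    from dateutil.relativedelta import relativedelta

    def smallest_active_period(first_edit, year, periods=(3, 6, 9, 12, 24)):
        window_end = datetime.datetime(year, 12, 31)
        for period in sorted(periods):
            window_start = window_end - relativedelta(months=period)
            if window_start <= first_edit <= window_end:
                return period
        return None

    # an editor whose first edit was 2009-11-15 lands in the 3 month window of 2009
    print smallest_active_period(datetime.datetime(2009, 11, 15), 2009)
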
Index: trunk/tools/editor_trends/etl/extract.py |
— | — | @@ -28,11 +28,12 @@ |
29 | 29 | import re |
30 | 30 | from operator import itemgetter |
31 | 31 | import xml.etree.cElementTree as cElementTree |
32 | | -from multiprocessing import Queue, JoinableQueue |
| 32 | +import multiprocessing |
33 | 33 | from Queue import Empty |
34 | 34 | import pymongo |
35 | 35 | |
36 | 36 | # Custom written files |
| 37 | +sys.path.append('..') |
37 | 38 | import configuration |
38 | 39 | settings = configuration.Settings() |
39 | 40 | from utils import utils, models |
— | — | @@ -50,7 +51,89 @@ |
51 | 52 | except ImportError: |
52 | 53 | pass |
53 | 54 | |
| 55 | +class XMLFileConsumer(models.BaseConsumer): |
54 | 56 | |
| 57 | + def run(self): |
| 58 | + while True: |
| 59 | + new_xmlfile = self.task_queue.get() |
| 60 | + self.task_queue.task_done() |
| 61 | + if new_xmlfile == None: |
| 62 | + print 'Swallowed a poison pill' |
| 63 | + break |
| 64 | + new_xmlfile() |
| 65 | + |
| 66 | +class XMLFile(object): |
| 67 | + def __init__(self, input, output, file, bots, **kwargs): |
| 68 | + self.file = file |
| 69 | + self.input = input |
| 70 | + self.output = output |
| 71 | + self.bots = bots |
| 72 | + for kw in kwargs: |
| 73 | + setattr(self, kw, kwargs[kw]) |
| 74 | + |
| 75 | + def create_file_handle(self): |
| 76 | + if self.destination == 'file': |
| 77 | + self.name = self.file[:-4] + '.txt' |
| 78 | + self.fh = utils.create_txt_filehandle(self.output, self.name, 'w', settings.encoding) |
| 79 | + |
| 80 | + def __str__(self): |
| 81 | + return '%s' % (self.file) |
| 82 | + |
| 83 | + def __call__(self): |
| 84 | + if settings.debug: |
| 85 | + messages = {} |
| 86 | + vars = {} |
| 87 | + |
| 88 | + data = xml.read_input(utils.create_txt_filehandle(self.input, |
| 89 | + self.file, 'r', |
| 90 | + encoding=settings.encoding)) |
| 91 | + self.create_file_handle() |
| 92 | + for raw_data in data: |
| 93 | + xml_buffer = cStringIO.StringIO() |
| 94 | + raw_data.insert(0, '<?xml version="1.0" encoding="UTF-8" ?>\n') |
| 95 | + |
| 96 | + try: |
| 97 | + raw_data = ''.join(raw_data) |
| 98 | + xml_buffer.write(raw_data) |
| 99 | + elem = cElementTree.XML(xml_buffer.getvalue()) |
| 100 | + output_editor_information(elem, self.fh, bots=self.bots, destination=self.destination) |
| 101 | + except SyntaxError, error: |
| 102 | + print error |
| 103 | + ''' |
 | 104 | +                There are a few cases with invalid tokens; they are ignored.
| 105 | + ''' |
| 106 | + if settings.debug: |
| 107 | + utils.track_errors(xml_buffer, error, self.file, messages) |
| 108 | + except UnicodeEncodeError, error: |
| 109 | + print error |
| 110 | + if settings.debug: |
| 111 | + utils.track_errors(xml_buffer, error, self.file, messages) |
| 112 | + except MemoryError, error: |
| 113 | + print self.file, error |
| 114 | + print raw_data[:12] |
| 115 | + print 'String was supposed to be %s characters long' % sum([len(raw) for raw in raw_data]) |
| 116 | + |
| 117 | + if self.destination == 'queue': |
| 118 | + self.output.put('NEXT') |
| 119 | + while True: |
| 120 | + if self.output.qsize() < 100000: |
| 121 | + break |
| 122 | + else: |
| 123 | + time.sleep(10) |
| 124 | + print 'Still sleeping, queue is %s items long' % self.output.qsize() |
| 125 | + |
| 126 | + else: |
| 127 | + self.fh.close() |
| 128 | + |
| 129 | + |
| 130 | + if self.destination == 'queue': |
 | 131 | +            self.output.put(None)
| 132 | + |
| 133 | + if settings.debug: |
| 134 | + utils.report_error_messages(messages, output_editor_information) |
| 135 | + |
| 136 | + |
| 137 | + |
55 | 138 | def determine_username_is_bot(username, kwargs): |
56 | 139 | ''' |
57 | 140 | @username is the xml element containing the id of the user |
— | — | @@ -70,6 +153,13 @@ |
71 | 154 | return 0 |
72 | 155 | |
73 | 156 | |
| 157 | +def extract_username(contributor, kwargs): |
| 158 | + for elem in contributor: |
| 159 | + if elem.tag == 'username': |
| 160 | + return elem.text #.encode(settings.encoding) |
| 161 | + else: |
| 162 | + return None |
| 163 | + |
74 | 164 | def extract_contributor_id(contributor, kwargs): |
75 | 165 | ''' |
76 | 166 | @contributor is the xml contributor node containing a number of attributes |
— | — | @@ -84,7 +174,7 @@ |
85 | 175 | for elem in contributor: |
86 | 176 | if elem.tag in tags: |
87 | 177 | if elem.text != None: |
88 | | - return elem.text.decode('utf-8') |
| 178 | + return elem.text.encode(settings.encoding) |
89 | 179 | else: |
90 | 180 | return - 1 |
91 | 181 | |
— | — | @@ -99,11 +189,13 @@ |
100 | 190 | this dictionary are the functions used to extract the data. |
101 | 191 | ''' |
102 | 192 | tags = {'contributor': {'editor': extract_contributor_id, |
103 | | - 'bot': determine_username_is_bot}, |
| 193 | + 'bot': determine_username_is_bot, |
| 194 | + 'username': extract_username, |
| 195 | + }, |
104 | 196 | 'timestamp': {'date': xml.extract_text}, |
105 | 197 | } |
106 | 198 | vars = {} |
107 | | - headers = ['editor', 'date', 'article'] |
| 199 | + headers = ['editor', 'date', 'article', 'username'] |
108 | 200 | destination = kwargs.pop('destination') |
109 | 201 | revisions = elem.findall('revision') |
110 | 202 | for revision in revisions: |
— | — | @@ -128,167 +220,100 @@ |
129 | 221 | vars = {} |
130 | 222 | |
131 | 223 | |
132 | | -def parse_editors(xml_queue, data_queue, **kwargs): |
133 | | - ''' |
134 | | - @xml_queue contains the filenames of the files to be parsed |
135 | | - @data_queue is an instance of Queue where the extracted data is stored for |
136 | | - further processing |
137 | | - @pbar is an instance of progressbar to display the progress |
138 | | - @bots is a list of id's of known Wikipedia bots |
139 | | - @debug is a flag to indicate whether the function is called for debugging. |
140 | | - |
141 | | - Output is the data_queue that will be used by store_editors() |
142 | | - ''' |
143 | | - input = kwargs.get('input', None) |
144 | | - output = kwargs.get('output', None) |
145 | | - debug = kwargs.get('debug', False) |
146 | | - destination = kwargs.get('destination', 'file') |
147 | | - bots = kwargs.get('bots', None) |
148 | | - pbar = kwargs.get('pbar', None) |
149 | | - if settings.debug: |
150 | | - messages = {} |
151 | | - vars = {} |
| 224 | +#def parse_editors(xml_queue, data_queue, **kwargs): |
| 225 | +# ''' |
| 226 | +# @xml_queue contains the filenames of the files to be parsed |
| 227 | +# @data_queue is an instance of Queue where the extracted data is stored for |
| 228 | +# further processing |
| 229 | +# @pbar is an instance of progressbar to display the progress |
| 230 | +# @bots is a list of id's of known Wikipedia bots |
| 231 | +# @debug is a flag to indicate whether the function is called for debugging. |
| 232 | +# |
| 233 | +# Output is the data_queue that will be used by store_editors() |
| 234 | +# ''' |
| 235 | +# input = kwargs.get('input', None) |
| 236 | +# output = kwargs.get('output', None) |
| 237 | +# debug = kwargs.get('debug', False) |
| 238 | +# destination = kwargs.get('destination', 'file') |
| 239 | +# bots = kwargs.get('bots', None) |
| 240 | +# pbar = kwargs.get('pbar', None) |
| 241 | +# if settings.debug: |
| 242 | +# messages = {} |
| 243 | +# vars = {} |
| 244 | +# |
| 245 | +# while True: |
| 246 | +# try: |
| 247 | +# if debug: |
| 248 | +# file = xml_queue |
| 249 | +# else: |
| 250 | +# file = xml_queue.get(block=False) |
| 251 | +# if file == None: |
| 252 | +# print 'Swallowed a poison pill' |
| 253 | +# break |
| 254 | +# |
| 255 | +# data = xml.read_input(utils.create_txt_filehandle(input, |
| 256 | +# file, 'r', |
| 257 | +# encoding=settings.encoding)) |
| 258 | +# if destination == 'file': |
| 259 | +# name = file[:-4] + '.txt' |
| 260 | +# fh = utils.create_txt_filehandle(output, name, 'w', settings.encoding) |
| 261 | +# for raw_data in data: |
| 262 | +# xml_buffer = cStringIO.StringIO() |
| 263 | +# raw_data.insert(0, '<?xml version="1.0" encoding="UTF-8" ?>\n') |
| 264 | +# |
| 265 | +# try: |
| 266 | +# raw_data = ''.join(raw_data) |
| 267 | +# xml_buffer.write(raw_data) |
| 268 | +# elem = cElementTree.XML(xml_buffer.getvalue()) |
| 269 | +# output_editor_information(elem, fh, bots=bots, destination=destination) |
| 270 | +# except SyntaxError, error: |
| 271 | +# print error |
| 272 | +# ''' |
| 273 | +# There are few cases with invalid tokens, they are fixed |
| 274 | +# here and then reinserted into the XML DOM |
| 275 | +# data = convert_html_entities(xml_buffer.getvalue()) |
| 276 | +# elem = cElementTree.XML(data) |
| 277 | +# output_editor_information(elem) |
| 278 | +# ''' |
| 279 | +# if settings.debug: |
| 280 | +# utils.track_errors(xml_buffer, error, file, messages) |
| 281 | +# except UnicodeEncodeError, error: |
| 282 | +# print error |
| 283 | +# if settings.debug: |
| 284 | +# utils.track_errors(xml_buffer, error, file, messages) |
| 285 | +# except MemoryError, error: |
| 286 | +# print file, error |
| 287 | +# print raw_data[:12] |
| 288 | +# print 'String was supposed to be %s characters long' % sum([len(raw) for raw in raw_data]) |
| 289 | +# if destination == 'queue': |
| 290 | +# output.put('NEXT') |
| 291 | +# while True: |
| 292 | +# if output.qsize() < 100000: |
| 293 | +# break |
| 294 | +# else: |
| 295 | +# time.sleep(10) |
| 296 | +# print 'Still sleeping, queue is %s items long' % output.qsize() |
| 297 | +# |
| 298 | +# else: |
| 299 | +# fh.close() |
| 300 | +# |
| 301 | +# if pbar: |
| 302 | +# print file, xml_queue.qsize() |
| 303 | +# #utils.update_progressbar(pbar, xml_queue) |
| 304 | +# |
| 305 | +# if debug: |
| 306 | +# break |
| 307 | +# |
| 308 | +# except Empty: |
| 309 | +# break |
| 310 | +# |
| 311 | +# if destination == 'queue': |
| 312 | +# data_queue.put(None) |
| 313 | +# |
| 314 | +# if settings.debug: |
| 315 | +# utils.report_error_messages(messages, parse_editors) |
152 | 316 | |
153 | | - while True: |
154 | | - try: |
155 | | - if debug: |
156 | | - file = xml_queue |
157 | | - else: |
158 | | - file = xml_queue.get(block=False) |
159 | | - if file == None: |
160 | | - print 'Swallowed a poison pill' |
161 | | - break |
162 | 317 | |
163 | | - data = xml.read_input(utils.create_txt_filehandle(input, |
164 | | - file, 'r', |
165 | | - encoding=settings.encoding)) |
166 | | - if destination == 'file': |
167 | | - name = file[:-4] + '.txt' |
168 | | - fh = utils.create_txt_filehandle(output, name, 'w', settings.encoding) |
169 | | - for raw_data in data: |
170 | | - xml_buffer = cStringIO.StringIO() |
171 | | - raw_data.insert(0, '<?xml version="1.0" encoding="UTF-8" ?>\n') |
172 | | - |
173 | | - try: |
174 | | - raw_data = ''.join(raw_data) |
175 | | - xml_buffer.write(raw_data) |
176 | | - elem = cElementTree.XML(xml_buffer.getvalue()) |
177 | | - output_editor_information(elem, fh, bots=bots, destination=destination) |
178 | | - except SyntaxError, error: |
179 | | - print error |
180 | | - ''' |
181 | | - There are few cases with invalid tokens, they are fixed |
182 | | - here and then reinserted into the XML DOM |
183 | | - data = convert_html_entities(xml_buffer.getvalue()) |
184 | | - elem = cElementTree.XML(data) |
185 | | - output_editor_information(elem) |
186 | | - ''' |
187 | | - if settings.debug: |
188 | | - utils.track_errors(xml_buffer, error, file, messages) |
189 | | - except UnicodeEncodeError, error: |
190 | | - print error |
191 | | - if settings.debug: |
192 | | - utils.track_errors(xml_buffer, error, file, messages) |
193 | | - except MemoryError, error: |
194 | | - print file, error |
195 | | - print raw_data[:12] |
196 | | - print 'String was supposed to be %s characters long' % sum([len(raw) for raw in raw_data]) |
197 | | - if destination == 'queue': |
198 | | - output.put('NEXT') |
199 | | - while True: |
200 | | - if output.qsize() < 100000: |
201 | | - break |
202 | | - else: |
203 | | - time.sleep(10) |
204 | | - print 'Still sleeping, queue is %s items long' % output.qsize() |
205 | | - |
206 | | - else: |
207 | | - fh.close() |
208 | | - |
209 | | - if pbar: |
210 | | - print file, xml_queue.qsize() |
211 | | - #utils.update_progressbar(pbar, xml_queue) |
212 | | - |
213 | | - if debug: |
214 | | - break |
215 | | - |
216 | | - except Empty: |
217 | | - break |
218 | | - |
219 | | - if destination == 'queue': |
220 | | - data_queue.put(None) |
221 | | - |
222 | | - if settings.debug: |
223 | | - utils.report_error_messages(messages, parse_editors) |
224 | | - |
225 | | - |
226 | | -def store_editors(data_queue, **kwargs): |
227 | | - ''' |
228 | | - @data_queue is an instance of Queue containing information extracted by |
229 | | - parse_editors() |
230 | | - @pids is a list of PIDs used to check if other processes are finished |
231 | | - running |
232 | | - @dbname is the name of the MongoDB collection where to store the information. |
233 | | - ''' |
234 | | - dbname = kwargs.get('dbname', None) |
235 | | - mongo = db.init_mongo_db(dbname) |
236 | | - collection = mongo['editors'] |
237 | | - mongo.collection.ensure_index('editor') |
238 | | - editor_cache = cache.EditorCache(collection) |
239 | | - |
240 | | - while True: |
241 | | - try: |
242 | | - edit = data_queue.get(block=False) |
243 | | - data_queue.task_done() |
244 | | - if edit == None: |
245 | | - print 'Swallowing poison pill' |
246 | | - break |
247 | | - elif edit == 'NEXT': |
248 | | - editor_cache.add('NEXT', '') |
249 | | - else: |
250 | | - contributor = edit['editor'] |
251 | | - value = {'date': edit['date'], 'article': edit['article']} |
252 | | - editor_cache.add(contributor, value) |
253 | | - #collection.update({'editor': contributor}, {'$push': {'edits': value}}, True) |
254 | | - #'$inc': {'edit_count': 1}, |
255 | | - |
256 | | - except Empty: |
257 | | - ''' |
258 | | - This checks whether the Queue is empty because the preprocessors are |
259 | | - finished or because this function is faster in emptying the Queue |
260 | | - then the preprocessors are able to fill it. If the preprocessors |
261 | | - are finished and this Queue is empty than break, else wait for the |
262 | | - Queue to fill. |
263 | | - ''' |
264 | | - pass |
265 | | - |
266 | | - print 'Emptying entire cache.' |
267 | | - editor_cache.store() |
268 | | - print 'Time elapsed: %s and processed %s items.' % (datetime.datetime.now() - editor_cache.init_time, editor_cache.cumulative_n) |
269 | | - |
270 | | - |
271 | | -def load_cache_objects(): |
272 | | - cache = {} |
273 | | - files = utils.retrieve_file_list(settings.binary_location, '.bin') |
274 | | - for x, file in enumerate(files): |
275 | | - cache[x] = utils.load_object(settings.binary_location, file) |
276 | | - return cache |
277 | | - |
278 | | - |
279 | | -def search_cache_for_missed_editors(dbname): |
280 | | - mongo = db.init_mongo_db(dbname) |
281 | | - collection = mongo['editors'] |
282 | | - editor_cache = cache.EditorCache(collection) |
283 | | - cache = load_cache_objects() |
284 | | - for c in cache: |
285 | | - for editor in cache[c]: |
286 | | - editor_cache.add(editor, cache[c][editor]) |
287 | | - cache[c] = {} |
288 | | - editor_cache.add('NEXT', '') |
289 | | - cache = {} |
290 | | - |
291 | | - |
292 | | - |
293 | 318 | def load_bot_ids(): |
294 | 319 | ''' |
295 | 320 | Loader function to retrieve list of id's of known Wikipedia bots. |
— | — | @@ -302,30 +327,32 @@ |
303 | 328 | return ids |
304 | 329 | |
305 | 330 | |
306 | | -def run_parse_editors(location, language, project): |
307 | | - ids = load_bot_ids() |
308 | | - base = os.path.join(location, language, project) |
309 | | - input = os.path.join(base, 'chunks') |
310 | | - output = os.path.join(base, 'txt') |
| 331 | +def run_parse_editors(location, **kwargs): |
| 332 | + bots = load_bot_ids() |
| 333 | + input = os.path.join(location, 'chunks') |
| 334 | + output = os.path.join(location, 'txt') |
311 | 335 | settings.verify_environment([input, output]) |
312 | 336 | files = utils.retrieve_file_list(input, 'xml') |
313 | | - |
314 | | - kwargs = {'bots': ids, |
315 | | - 'dbname': language + project, |
316 | | - 'language': language, |
317 | | - 'project': project, |
318 | | - 'pbar': True, |
319 | | - 'destination': 'file', |
320 | | - 'nr_input_processors': settings.number_of_processes, |
321 | | - 'nr_output_processors': settings.number_of_processes, |
322 | | - 'input': input, |
323 | | - 'output': output, |
324 | | - } |
| 337 | + kwargs['destination'] = 'file' |
325 | 338 | |
326 | | - chunks = utils.split_list(files, settings.number_of_processes) |
327 | | - pc.build_scaffolding(pc.load_queue, parse_editors, chunks, False, False, **kwargs) |
| 339 | + |
| 340 | + tasks = multiprocessing.JoinableQueue() |
| 341 | + consumers = [XMLFileConsumer(tasks, None) for i in xrange(settings.number_of_processes)] |
| 342 | + for file in files: |
| 343 | + tasks.put(XMLFile(input, output, file, bots, **kwargs)) |
| 344 | + for x in xrange(settings.number_of_processes): |
| 345 | + tasks.put(None) |
328 | 346 | |
| 347 | + print tasks.qsize() |
| 348 | + for w in consumers: |
| 349 | + w.start() |
329 | 350 | |
| 351 | + tasks.join() |
| 352 | + |
| 353 | + #chunks = utils.split_list(files, settings.number_of_processes) |
| 354 | + #pc.build_scaffolding(pc.load_queue, parse_editors, chunks, False, False, **kwargs) |
| 355 | + |
| 356 | + |
330 | 357 | def debug_parse_editors(dbname): |
331 | 358 | q = JoinableQueue() |
332 | 359 | parse_editors('522.xml', q, None, None, debug=True, destination='file') |
— | — | @@ -334,5 +361,4 @@ |
335 | 362 | |
336 | 363 | if __name__ == "__main__": |
337 | 364 | #debug_parse_editors('test2') |
338 | | - run_parse_editors(settings.input_location, 'en', 'wiki') |
339 | | - pass |
| 365 | + run_parse_editors(os.path.join(settings.input_location, 'en', 'wiki')) |
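
run_parse_editors now follows a consumer pattern: each chunked XML file becomes a callable XMLFile task on a multiprocessing.JoinableQueue, one None poison pill is queued per consumer process, and tasks.join() blocks until every item has been marked done. A self-contained sketch of the same pattern with a dummy picklable task (XMLFileConsumer in the diff inherits from models.BaseConsumer, assumed to be a multiprocessing.Process subclass):

    import multiprocessing

    class DummyTask(object):
        '''Stand-in for XMLFile: a picklable callable that does the work.'''
        def __init__(self, name):
            self.name = name

        def __call__(self):
            print 'processing %s' % self.name

    class Consumer(multiprocessing.Process):
        def __init__(self, task_queue):
            multiprocessing.Process.__init__(self)
            self.task_queue = task_queue

        def run(self):
            while True:
                task = self.task_queue.get()
                self.task_queue.task_done()
                if task is None:          # poison pill: no more work for this consumer
                    break
                task()

    if __name__ == '__main__':
        tasks = multiprocessing.JoinableQueue()
        consumers = [Consumer(tasks) for i in xrange(4)]
        for i in xrange(10):
            tasks.put(DummyTask('chunk-%s.xml' % i))
        for consumer in consumers:
            tasks.put(None)               # one pill per consumer
        for consumer in consumers:
            consumer.start()
        tasks.join()                      # returns once every task_done() has been called
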
Index: trunk/tools/editor_trends/etl/store.py |
— | — | @@ -0,0 +1,97 @@ |
| 2 | +#!/usr/bin/python |
| 3 | +# -*- coding: utf-8 -*- |
| 4 | +''' |
| 5 | +Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com) |
| 6 | +This program is free software; you can redistribute it and/or |
| 7 | +modify it under the terms of the GNU General Public License version 2 |
| 8 | +as published by the Free Software Foundation. |
| 9 | +This program is distributed in the hope that it will be useful, |
| 10 | +but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
| 12 | +See the GNU General Public License for more details, at |
| 13 | +http://www.fsf.org/licenses/gpl.html |
| 14 | +''' |
| 15 | + |
| 16 | +__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ]) |
| 17 | +__author__email = 'dvanliere at gmail dot com' |
| 18 | +__date__ = '2010-11-19' |
| 19 | +__version__ = '0.1' |
| 20 | + |
| 21 | + |
| 22 | +from Queue import Empty |
| 23 | +import datetime |
| 24 | +import sys |
| 25 | +sys.path.append('..') |
| 26 | + |
| 27 | +import configuration |
| 28 | +settings = configuration.Settings() |
| 29 | + |
 | 30 | +from database import db
 | 31 | +from database import cache
 | 32 | +from utils import utils
| 31 | + |
| 32 | + |
| 33 | +def store_editors(data_queue, **kwargs): |
| 34 | + ''' |
| 35 | + @data_queue is an instance of Queue containing information extracted by |
| 36 | + parse_editors() |
| 37 | + @pids is a list of PIDs used to check if other processes are finished |
| 38 | + running |
| 39 | + @dbname is the name of the MongoDB collection where to store the information. |
| 40 | + ''' |
| 41 | + dbname = kwargs.get('dbname', None) |
| 42 | + mongo = db.init_mongo_db(dbname) |
| 43 | + collection = mongo['editors'] |
 | 44 | +    collection.ensure_index('editor')
| 45 | + editor_cache = cache.EditorCache(collection) |
| 46 | + |
| 47 | + while True: |
| 48 | + try: |
| 49 | + edit = data_queue.get(block=False) |
| 50 | + data_queue.task_done() |
| 51 | + if edit == None: |
| 52 | + print 'Swallowing poison pill' |
| 53 | + break |
| 54 | + elif edit == 'NEXT': |
| 55 | + editor_cache.add('NEXT', '') |
| 56 | + else: |
| 57 | + contributor = edit['editor'] |
| 58 | + value = {'date': edit['date'], 'article': edit['article']} |
| 59 | + editor_cache.add(contributor, value) |
| 60 | + #collection.update({'editor': contributor}, {'$push': {'edits': value}}, True) |
| 61 | + #'$inc': {'edit_count': 1}, |
| 62 | + |
| 63 | + except Empty: |
| 64 | + ''' |
| 65 | + This checks whether the Queue is empty because the preprocessors are |
| 66 | + finished or because this function is faster in emptying the Queue |
| 67 | + then the preprocessors are able to fill it. If the preprocessors |
| 68 | + are finished and this Queue is empty than break, else wait for the |
| 69 | + Queue to fill. |
| 70 | + ''' |
| 71 | + pass |
| 72 | + |
| 73 | + print 'Emptying entire cache.' |
| 74 | + editor_cache.store() |
| 75 | + print 'Time elapsed: %s and processed %s items.' % (datetime.datetime.now() - editor_cache.init_time, editor_cache.cumulative_n) |
| 76 | + |
| 77 | + |
| 78 | +def load_cache_objects(): |
| 79 | + cache = {} |
| 80 | + files = utils.retrieve_file_list(settings.binary_location, '.bin') |
| 81 | + for x, file in enumerate(files): |
| 82 | + cache[x] = utils.load_object(settings.binary_location, file) |
| 83 | + return cache |
| 84 | + |
| 85 | + |
| 86 | +def search_cache_for_missed_editors(dbname): |
| 87 | + mongo = db.init_mongo_db(dbname) |
| 88 | + collection = mongo['editors'] |
| 89 | + editor_cache = cache.EditorCache(collection) |
| 90 | + cache = load_cache_objects() |
| 91 | + for c in cache: |
| 92 | + for editor in cache[c]: |
| 93 | + editor_cache.add(editor, cache[c][editor]) |
| 94 | + cache[c] = {} |
| 95 | + editor_cache.add('NEXT', '') |
| 96 | + cache = {} |
| 97 | + |
| 98 | + |
Property changes on: trunk/tools/editor_trends/etl/store.py |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 99 | + native |
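store.py buffers edits per editor in an EditorCache and only writes to MongoDB when it sees the 'NEXT' marker or drains the queue; the commented-out update call shows the alternative of pushing each edit straight into the editor document. A hedged sketch of that direct approach with pymongo, where the database, collection, and field names are only illustrative and the old pymongo 1.x Connection API is assumed:

    import datetime

    import pymongo


    def store_edit(collection, editor_id, article_id, date):
        '''Append one edit to the editor document, creating the document if needed.'''
        value = {'date': date, 'article': article_id}
        collection.update({'editor': editor_id},
                          {'$push': {'edits': value}, '$inc': {'edit_count': 1}},
                          upsert=True)


    if __name__ == '__main__':
        connection = pymongo.Connection('localhost', 27017)
        collection = connection['example_db']['editors']
        collection.ensure_index('editor')
        store_edit(collection, 12345, 678, datetime.datetime(2010, 11, 19, 12, 0))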
Index: trunk/tools/editor_trends/etl/chunker.py |
— | — | @@ -112,10 +112,10 @@ |
113 | 113 | return True |
114 | 114 | |
115 | 115 | |
116 | | -def write_xml_file(element, fh, counter, language): |
| 116 | +def write_xml_file(element, fh, output, counter): |
117 | 117 | '''Get file handle and write xml element to file''' |
118 | 118 | size = len(cElementTree.tostring(element)) |
119 | | - fh, counter = create_file_handle(fh, counter, size, language) |
| 119 | + fh, counter = create_file_handle(fh, output, counter, size) |
120 | 120 | try: |
121 | 121 | fh.write(cElementTree.tostring(element)) |
122 | 122 | except MemoryError: |
— | — | @@ -124,15 +124,15 @@ |
125 | 125 | return fh, counter |
126 | 126 | |
127 | 127 | |
128 | | -def create_file_handle(fh, counter, size, language): |
| 128 | +def create_file_handle(fh, output, counter, size): |
129 | 129 | '''Create file handle if none is supplied or if file size > max file size.''' |
130 | 130 | if not counter: |
131 | 131 | counter = 0 |
132 | | - path = os.path.join(settings.input_location, language, '%s.xml' % counter) |
| 132 | + path = os.path.join(output, '%s.xml' % counter) |
133 | 133 | if not fh: |
134 | 134 | fh = codecs.open(path, 'w', encoding=settings.encoding) |
135 | 135 | return fh, counter |
136 | | - elif (fh.tell() + size) > settings.binary_location: |
| 136 | + elif (fh.tell() + size) > settings.max_settings_xmlfile_size: |
137 | 137 | print 'Created chunk %s' % counter |
138 | 138 | fh.close |
139 | 139 | counter += 1 |
— | — | @@ -156,10 +156,11 @@ |
157 | 157 | return flat |
158 | 158 | |
159 | 159 | |
160 | | -def split_file(output, input, project, language_code, language, format='xml'): |
| 160 | +def split_file(location, file, project, language_code, language, format='xml'): |
161 | 161 | '''Reads xml file and splits it in N chunks''' |
162 | 162 | #location = os.path.join(settings.input_location, language) |
163 | | - output = os.path.join(output, language_code, project) |
| 163 | + input = os.path.join(location, file) |
| 164 | + output = os.path.join(location, 'chunks') |
164 | 165 | settings.verify_environment([output]) |
165 | 166 | if format == 'xml': |
166 | 167 | fh = None |
— | — | @@ -170,15 +171,12 @@ |
171 | 172 | ns = load_namespace(language_code) |
172 | 173 | ns = build_namespaces_locale(ns) |
173 | 174 | |
174 | | - |
| 175 | + settings.xml_namespace = 'http://www.mediawiki.org/xml/export-0.3/' |
175 | 176 | counter = None |
176 | 177 | tag = '{%s}page' % settings.xml_namespace |
177 | | - |
178 | | - |
179 | 178 | context = cElementTree.iterparse(input, events=('start', 'end')) |
180 | 179 | context = iter(context) |
181 | 180 | event, root = context.next() #get the root element of the XML doc |
182 | | - |
183 | 181 | try: |
184 | 182 | for event, elem in context: |
185 | 183 | if event == 'end': |
— | — | @@ -188,7 +186,7 @@ |
189 | 187 | page = elem.find('id').text |
190 | 188 | elem = parse_comments(elem, remove_numeric_character_references) |
191 | 189 | if format == 'xml': |
192 | | - fh, counter = write_settings.input_filename(elem, fh, counter, language_code) |
| 190 | + fh, counter = write_xml_file(elem, fh, output, counter) |
193 | 191 | else: |
194 | 192 | data = [el.getchildren() for el in elem if el.tag == 'revision'] |
195 | 193 | data = flatten_xml_elements(data, page) |
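The chunker now writes its output to a 'chunks' subdirectory and rolls over to a new numbered file once fh.tell() plus the size of the next serialized element would pass the configured maximum. The rollover logic in isolation looks roughly like this; the size limit and encoding are assumptions for the example:

    import codecs
    import os

    MAX_XMLFILE_SIZE = 1 << 26  # 64 MB, illustrative value only


    def get_file_handle(fh, output, counter, size, encoding='utf-8'):
        '''Return (fh, counter); open the next numbered chunk when the current one is full.'''
        if counter is None:
            counter = 0
        if fh is None:
            path = os.path.join(output, '%s.xml' % counter)
            return codecs.open(path, 'w', encoding=encoding), counter
        if fh.tell() + size > MAX_XMLFILE_SIZE:
            fh.close()
            counter += 1
            path = os.path.join(output, '%s.xml' % counter)
            return codecs.open(path, 'w', encoding=encoding), counter
        return fh, counter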
Index: trunk/tools/editor_trends/etl/transformer.py |
— | — | @@ -0,0 +1,215 @@ |
| 2 | +#!/usr/bin/python |
| 3 | +# -*- coding: utf-8 -*- |
| 4 | +''' |
| 5 | +Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com) |
| 6 | +This program is free software; you can redistribute it and/or |
| 7 | +modify it under the terms of the GNU General Public License version 2 |
| 8 | +as published by the Free Software Foundation. |
| 9 | +This program is distributed in the hope that it will be useful, |
| 10 | +but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
| 12 | +See the GNU General Public License for more details, at |
| 13 | +http://www.fsf.org/licenses/gpl.html |
| 14 | +''' |
| 15 | + |
| 16 | +__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ]) |
| 17 | +__author__email = 'dvanliere at gmail dot com' |
| 18 | +__date__ = '2010-11-02' |
| 19 | +__version__ = '0.1' |
| 20 | + |
| 21 | +import multiprocessing |
| 22 | +from Queue import Empty |
| 23 | +from operator import itemgetter |
| 24 | +import datetime |
| 25 | +import sys |
| 26 | + |
| 27 | +sys.path.append('..') |
| 28 | +import configuration |
| 29 | +settings = configuration.Settings() |
| 30 | +from database import db |
| 31 | +from utils import process_constructor as pc |
| 32 | +from utils import utils |
| 33 | +from utils import models |
| 34 | +import construct_datasets |
| 35 | + |
| 36 | + |
| 37 | +try: |
| 38 | + import psyco |
| 39 | + psyco.full() |
| 40 | +except ImportError: |
| 41 | + pass |
| 42 | + |
| 43 | + |
| 44 | +class EditorConsumer(models.BaseConsumer): |
| 45 | + |
| 46 | + def run(self): |
| 47 | + while True: |
| 48 | + new_editor = self.task_queue.get() |
| 49 | + self.task_queue.task_done() |
| 50 | + if new_editor == None: |
| 51 | + break |
| 52 | + new_editor() |
| 53 | + |
| 54 | + |
| 55 | +class Editor(object): |
| 56 | + def __init__(self, dbname, id, **kwargs): |
| 57 | + self.dbname = dbname |
| 58 | + self.id = id |
| 59 | + for kw in kwargs: |
| 60 | + setattr(self, kw, kwargs[kw]) |
| 61 | + |
| 62 | + def __str__(self): |
| 63 | + return '%s' % (self.id) |
| 64 | + # mongo = db.init_mongo_db(dbname) |
| 65 | + # input = mongo[dbname] |
| 66 | + # output = mongo['dataset'] |
| 67 | + # output.ensure_index('editor') |
| 68 | + # output.ensure_index('year_joined') |
| 69 | + |
| 70 | + def __call__(self): |
| 71 | + self.mongo = db.init_mongo_db(self.dbname) |
| 72 | + input_db = self.mongo['editors'] |
| 73 | + output_db = self.mongo['dataset'] |
| 74 | + |
| 75 | + output_db.ensure_index('editor') |
| 76 | + output_db.create_index('editor') |
| 77 | + |
| 78 | + editor = input_db.find_one({'editor': self.id}) |
| 79 | + if editor == None: |
| 80 | + return |
| 81 | + edits = editor['edits'] |
| 82 | + username = editor['username'] |
| 83 | + monthly_edits = determine_edits_by_month(edits) |
| 84 | + edits = sort_edits(edits) |
| 85 | + edit_count = len(edits) |
| 86 | + new_wikipedian = edits[9]['date'] |
| 87 | + first_edit = edits[0]['date'] |
| 88 | + final_edit = edits[-1]['date'] |
| 89 | + edits_by_year = determine_edits_by_year(edits) |
| 90 | + articles_by_year = determine_articles_by_year(edits) |
| 91 | + edits = edits[:10] |
| 92 | + output_db.insert({'editor': self.id, |
| 93 | + 'edits': edits, |
| 94 | + 'edits_by_year': edits_by_year, |
| 95 | + 'new_wikipedian': new_wikipedian, |
| 96 | + 'edit_count': edit_count, |
| 97 | + 'final_edit': final_edit, |
| 98 | + 'first_edit': first_edit, |
| 99 | + 'articles_by_year': articles_by_year, |
| 100 | + 'monthly_edits': monthly_edits, |
| 101 | + 'username': username |
| 102 | + }) |
| 103 | + |
| 104 | +def create_datacontainer(init_value=0): |
| 105 | + ''' |
| 106 | + This function initializes a dictionary keyed by year (as a string, starting |
| 107 | + in 2001 and running through the current year) with @init_value as each value. |
| 108 | + In most cases this will be zero, so the dictionary acts as a running tally, |
| 109 | + but @init_value can also be a list [], a dictionary {}, or a set, set(). |
| 110 | + ''' |
| 111 | + data = {} |
| 112 | + year = datetime.datetime.now().year + 1 |
| 113 | + for x in xrange(2001, year): |
| 114 | + if init_value == 'set': |
| 115 | + data[str(x)] = set() |
| 116 | + else: |
| 117 | + data[str(x)] = init_value |
| 118 | + return data |
| 119 | + |
| 120 | + |
| 121 | +def add_months_to_datacontainer(datacontainer): |
| 122 | + for dc in datacontainer: |
| 123 | + datacontainer[dc] = {} |
| 124 | + for x in xrange(1, 13): |
| 125 | + datacontainer[dc][str(x)] = 0 |
| 126 | + return datacontainer |
| 127 | + |
| 128 | + |
| 129 | +def determine_edits_by_month(edits): |
| 130 | + datacontainer = create_datacontainer(init_value=0) |
| 131 | + datacontainer = add_months_to_datacontainer(datacontainer) |
| 132 | + for year in edits: |
| 133 | + months = set() |
| 134 | + for edit in edits[year]: |
| 135 | + m = str(edit['date'].month) |
| 136 | + if m not in months: |
| 137 | + datacontainer[year][m] = 1 |
| 138 | + months.add(m) |
| 139 | + if len(months) == 12: |
| 140 | + break |
| 141 | + return datacontainer |
| 142 | + |
| 143 | + |
| 144 | +def determine_edits_by_year(dates): |
| 145 | + ''' |
| 146 | + This function counts the number of edits by year made by a particular editor. |
| 147 | + ''' |
| 148 | + edits = create_datacontainer() |
| 149 | + for date in dates: |
| 150 | + year = str(date['date'].year) |
| 151 | + edits[year] += 1 |
| 152 | + return edits |
| 153 | + |
| 154 | + |
| 155 | +def determine_articles_by_year(dates): |
| 156 | + ''' |
| 157 | + This function counts the number of unique articles by year edited by a |
| 158 | + particular editor. |
| 159 | + ''' |
| 160 | + articles = create_datacontainer('set') |
| 161 | + for date in dates: |
| 162 | + year = str(date['date'].year) |
| 163 | + articles[year].add(date['article']) |
| 164 | + for year in articles: |
| 165 | + articles[year] = len(articles[year]) |
| 166 | + return articles |
| 167 | + |
| 168 | + |
| 169 | +def sort_edits(edits): |
| 170 | + edits = utils.merge_list(edits) |
| 171 | + return sorted(edits, key=itemgetter('date')) |
| 172 | + |
| 173 | +#def optimize_editors(input_queue, result_queue, pbar, **kwargs): |
| 174 | +# dbname = kwargs.pop('dbname') |
| 175 | +# mongo = db.init_mongo_db(dbname) |
| 176 | +# input = mongo[dbname] |
| 177 | +# output = mongo['dataset'] |
| 178 | +# output.ensure_index('editor') |
| 179 | +# output.ensure_index('year_joined') |
| 180 | +# definition = kwargs.pop('definition') |
| 181 | + |
| 182 | + |
| 183 | +def run_optimize_editors(dbname): |
| 184 | + ids = construct_datasets.retrieve_editor_ids_mongo(dbname, 'editors') |
| 185 | + kwargs = {'definition': 'traditional', |
| 186 | + 'pbar': True, |
| 187 | + } |
| 188 | + #input_db = db.init_mongo_db(dbname) |
| 189 | + #output_db = db.init_mongo_db('dataset') |
| 190 | + tasks = multiprocessing.JoinableQueue() |
| 191 | + consumers = [EditorConsumer(tasks, None) for i in xrange(settings.number_of_processes)] |
| 192 | + |
| 193 | + for id in ids: |
| 194 | + tasks.put(Editor(dbname, id)) |
| 195 | + for x in xrange(settings.number_of_processes): |
| 196 | + tasks.put(None) |
| 197 | + |
| 198 | + print tasks.qsize() |
| 199 | + for w in consumers: |
| 200 | + w.start() |
| 201 | + |
| 202 | + tasks.join() |
| 203 | + |
| 204 | + |
| 205 | +def debug_optimize_editors(dbname): |
| 206 | + ids = construct_datasets.retrieve_editor_ids_mongo(dbname, 'editors') |
| 207 | + q = pc.load_queue(ids) |
| 208 | + kwargs = {'definition': 'traditional', |
| 209 | + 'dbname': dbname |
| 210 | + } |
| 211 | + optimize_editors(q, False, True, kwargs) |
| 212 | + |
| 213 | + |
| 214 | +if __name__ == '__main__': |
| 215 | + #debug_optimize_editors('test') |
| 216 | + run_optimize_editors('enwiki') |
Property changes on: trunk/tools/editor_trends/etl/transformer.py |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 217 | + native |
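The Editor.__call__ aggregation above leans on year-keyed data containers; a short usage example of the same idea with dummy edits (the dates and article ids are invented):

    import datetime


    def create_datacontainer(init_value=0):
        '''Dictionary keyed by year (as string) from 2001 through the current year.'''
        data = {}
        for year in xrange(2001, datetime.datetime.now().year + 1):
            data[str(year)] = set() if init_value == 'set' else init_value
        return data


    def determine_edits_by_year(edits):
        counts = create_datacontainer(0)
        for edit in edits:
            counts[str(edit['date'].year)] += 1
        return counts


    if __name__ == '__main__':
        edits = [{'date': datetime.datetime(2009, 5, 1), 'article': 100},
                 {'date': datetime.datetime(2009, 6, 2), 'article': 101},
                 {'date': datetime.datetime(2010, 1, 3), 'article': 100}]
        print determine_edits_by_year(edits)  # counts 2 edits in 2009, 1 in 2010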
Index: trunk/tools/editor_trends/etl/loader.py |
— | — | @@ -32,23 +32,21 @@ |
33 | 33 | |
34 | 34 | |
35 | 35 | |
36 | | -def store_editors(input, filename, dbname): |
| 36 | +def store_editors(input, filename, dbname, collection): |
37 | 37 | fh = utils.create_txt_filehandle(input, filename, 'r', settings.encoding) |
38 | 38 | mongo = db.init_mongo_db(dbname) |
39 | | - collection = mongo['test'] |
40 | | - mongo.collection.ensure_index('editor') |
41 | | - mongo.collection.create_index('editor') |
| 39 | + collection = mongo[collection] |
| 40 | + collection.ensure_index('editor') |
| 41 | + collection.create_index('editor') |
42 | 42 | editor_cache = cache.EditorCache(collection) |
43 | 43 | prev_contributor = -1 |
44 | 44 | x = 0 |
45 | 45 | edits = 0 |
46 | 46 | editors = set() |
47 | | - for line in readline(fh): |
| 47 | + for line in sort.readline(fh): |
48 | 48 | if len(line) == 0: |
49 | 49 | continue |
50 | 50 | contributor = int(line[0]) |
51 | | - if contributor == 5767932: |
52 | | - print 'debug' |
53 | 51 | if prev_contributor != contributor: |
54 | 52 | if edits >= 10: |
55 | 53 | result = editor_cache.add(prev_contributor, 'NEXT') |
— | — | @@ -61,9 +59,11 @@ |
62 | 60 | editor_cache.clear(prev_contributor) |
63 | 61 | edits = 0 |
64 | 62 | edits += 1 |
65 | | - date = utils.convert_timestamp_to_date(line[1]) #+ datetime.timedelta(days=1) |
| 63 | + date = utils.convert_timestamp_to_datetime_utc(line[1]) #+ datetime.timedelta(days=1) |
66 | 64 | article_id = int(line[2]) |
67 | | - value = {'date': date, 'article': article_id} |
| 65 | + username = line[3].encode(settings.encoding) |
| 66 | + #print line[3] |
| 67 | + value = {'date': date, 'article': article_id, 'username': username} |
68 | 68 | editor_cache.add(contributor, value) |
69 | 69 | prev_contributor = contributor |
70 | 70 | fh.close() |
— | — | @@ -80,48 +80,42 @@ |
81 | 81 | chunks = utils.split_list(files, int(x)) |
82 | 82 | '''1st iteration external mergesort''' |
83 | 83 | for chunk in chunks: |
84 | | - #filehandles = [utils.create_txt_filehandle(input, file, 'r', settings.encoding) for file in chunks[chunk]] |
85 | | - #filename = sort.merge_sorted_files(output, filehandles, chunk) |
86 | | - #filehandles = [fh.close() for fh in filehandles] |
87 | | - pass |
| 84 | + filehandles = [utils.create_txt_filehandle(input, file, 'r', settings.encoding) for file in chunks[chunk]] |
| 85 | + filename = sort.merge_sorted_files(output, filehandles, chunk) |
| 86 | + filehandles = [fh.close() for fh in filehandles] |
| 87 | +# pass |
88 | 88 | '''2nd iteration external mergesort, if necessary''' |
89 | 89 | if len(chunks) > 1: |
90 | 90 | files = utils.retrieve_file_list(output, 'txt', mask='[merged]') |
91 | 91 | filehandles = [utils.create_txt_filehandle(output, file, 'r', settings.encoding) for file in files] |
92 | 92 | filename = sort.merge_sorted_files(output, filehandles, 'final') |
93 | 93 | filehandles = [fh.close() for fh in filehandles] |
94 | | - filename = 'merged_final.txt' |
95 | | - store_editors(output, filename, dbname) |
| 94 | + filename = 'merged_final.txt' |
| 95 | + return filename |
96 | 96 | |
97 | 97 | |
98 | | -def mergesort_feeder(input_queue, result_queue, **kwargs): |
| 98 | +def mergesort_feeder(task_queue, **kwargs): |
99 | 99 | input = kwargs.get('input', None) |
100 | 100 | output = kwargs.get('output', None) |
101 | | - while True: |
102 | | - try: |
103 | | - file = input_queue.get(block=False) |
104 | | - fh = utils.create_txt_filehandle(input, file, 'r', settings.encoding) |
105 | | - data = fh.readlines() |
106 | | - fh.close() |
107 | | - data = [d.replace('\n', '') for d in data] |
108 | | - data = [d.split('\t') for d in data] |
109 | | - sorted_data = sort.mergesort(data) |
110 | | - sort.write_sorted_file(sorted_data, file, output) |
111 | | - except Empty: |
112 | | - break |
| 101 | + #while True: |
| 102 | + # try: |
| 103 | + #file = input_queue.get(block=False) |
| 104 | + for file in task_queue: |
| 105 | + fh = utils.create_txt_filehandle(input, file, 'r', settings.encoding) |
| 106 | + data = fh.readlines() |
| 107 | + fh.close() |
| 108 | + data = [d.replace('\n', '') for d in data] |
| 109 | + data = [d.split('\t') for d in data] |
| 110 | + sorted_data = sort.mergesort(data) |
| 111 | + sort.write_sorted_file(sorted_data, file, output) |
113 | 112 | |
114 | 113 | |
115 | 114 | def mergesort_launcher(input, output): |
116 | | - kwargs = {'pbar': True, |
117 | | - 'nr_input_processors': settings.number_of_processes, |
118 | | - 'nr_output_processors': settings.number_of_processes, |
119 | | - 'input': input, |
120 | | - 'output': output, |
121 | | - 'poison_pill': False |
122 | | - } |
| 115 | + settings.verify_environment([input, output]) |
123 | 116 | files = utils.retrieve_file_list(input, 'txt') |
124 | | - chunks = utils.split_list(files, settings.number_of_processes) |
125 | | - pc.build_scaffolding(pc.load_queue, mergesort_feeder, chunks, False, False, **kwargs) |
| 117 | + mergesort_feeder(files, input=input, output=output) |
| 118 | + #chunks = utils.split_list(files, settings.number_of_processes) |
| 119 | + #pc.build_scaffolding(pc.load_queue, mergesort_feeder, chunks, False, False, **kwargs) |
126 | 120 | |
127 | 121 | |
128 | 122 | def debug_mergesort_feeder(input, output): |
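loader.py now performs the external mergesort in a single process: each txt chunk is read, sorted in memory, written back, and the sorted chunks are then merged into one file. A generic sketch of that two-phase approach using heapq.merge; the filenames and the tab-separated record layout (contributor id in the first column) are assumptions:

    import heapq
    import os


    def sort_chunk(input_dir, output_dir, filename):
        '''Phase 1: sort one chunk in memory on contributor id and write it back out.'''
        fh = open(os.path.join(input_dir, filename))
        rows = [line.rstrip('\n').split('\t') for line in fh if line.strip()]
        fh.close()
        rows.sort(key=lambda row: int(row[0]))
        out = open(os.path.join(output_dir, filename), 'w')
        for row in rows:
            out.write('\t'.join(row) + '\n')
        out.close()


    def merge_sorted_chunks(output_dir, filenames, merged_name='merged_final.txt'):
        '''Phase 2: k-way merge of the individually sorted chunk files.'''
        handles = [open(os.path.join(output_dir, name)) for name in filenames]
        streams = [((int(line.split('\t')[0]), line) for line in fh) for fh in handles]
        out = open(os.path.join(output_dir, merged_name), 'w')
        for key, line in heapq.merge(*streams):
            out.write(line)
        out.close()
        for fh in handles:
            fh.close()
        return merged_name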
Index: trunk/tools/editor_trends/config.py |
— | — | @@ -21,46 +21,30 @@ |
22 | 22 | import os |
23 | 23 | import ConfigParser |
24 | 24 | |
25 | | - |
26 | | -import configuration |
27 | | -settings = configuration.Settings() |
28 | 25 | from utils import utils |
29 | 26 | |
30 | 27 | |
31 | | - |
32 | | -def load_configuration(args): |
| 28 | +def create_configuration(settings, args): |
33 | 29 | config = ConfigParser.RawConfigParser() |
34 | | - if not utils.check_file_exists(settings.working_directory, 'wiki.cfg'): |
35 | | - working_directory = raw_input('Please indicate where you installed Editor Trends Analytics.\nCurrent location is %s\nPress Enter to accept default.' % os.getcwd()) |
36 | | - if working_directory == '': |
37 | | - working_directory = os.getcwd() |
38 | 30 | |
39 | | - settings.input_location = raw_input('Please indicate where to store the Wikipedia dump files.\nDefault is: %s\nPress Enter to accept default.' % settings.input_location) |
40 | | - if settings.input_location == '': |
41 | | - settings.input_location = settings.input_location |
42 | | - |
43 | | - create_configuration(working_directory=working_directory, input_location=settings.input_location) |
44 | | - |
45 | | - config.read('wiki.cfg') |
46 | | - settings.working_directory = config.get('file_locations', 'working_directory') |
47 | | - settings.input_location = config.get('file_locations', 'settings.input_location') |
48 | | - |
49 | | - |
50 | | -def create_configuration(**kwargs): |
51 | | - working_directory = kwargs.get('working_directory', settings.working_directory) |
| 31 | + working_directory = raw_input('Please indicate where you installed Editor Trends Analytics.\nCurrent location is %s\nPress Enter to accept default.' % os.getcwd()) |
| 32 | + input_location = raw_input('Please indicate where to store the Wikipedia dump files.\nDefault is: %s\nPress Enter to accept default.' % settings.input_location) |
| 33 | + input_location = input_location if len(input_location) > 0 else settings.input_location |
| 34 | + working_directory = working_directory if len(working_directory) > 0 else os.getcwd() |
| 35 | + |
52 | 36 | config = ConfigParser.RawConfigParser() |
53 | 37 | config.add_section('file_locations') |
54 | 38 | config.set('file_locations', 'working_directory', working_directory) |
55 | | - config.set('file_locations', 'settings.input_location', kwargs.get('settings.input_location', settings.input_location)) |
| 39 | + config.set('file_locations', 'input_location', input_location) |
56 | 40 | |
57 | 41 | fh = utils.create_binary_filehandle(working_directory, 'wiki.cfg', 'wb') |
58 | 42 | config.write(fh) |
59 | | - fh.close() |
| 43 | + fh.close() |
| 44 | + |
| 45 | + settings.working_directory = config.get('file_locations', 'working_directory') |
| 46 | + settings.input_location = config.get('file_locations', 'input_location') |
| 47 | + return settings |
60 | 48 | |
61 | 49 | |
62 | 50 | if __name__ == '__main__': |
63 | | - p =detect_windows_program('7zip') |
64 | | - print p |
65 | | - #load_configuration([]) |
66 | | - |
67 | | - |
| 51 | + pass |
\ No newline at end of file |
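create_configuration now prompts for the two locations, writes wiki.cfg, and immediately pushes the stored values back onto the settings object. The ConfigParser round trip, stripped of the prompts, looks roughly like this (file and section names follow the diff, the rest is standalone):

    import ConfigParser
    import os


    def write_config(working_directory, input_location, name='wiki.cfg'):
        config = ConfigParser.RawConfigParser()
        config.add_section('file_locations')
        config.set('file_locations', 'working_directory', working_directory)
        config.set('file_locations', 'input_location', input_location)
        fh = open(os.path.join(working_directory, name), 'wb')
        config.write(fh)
        fh.close()


    def read_config(working_directory, name='wiki.cfg'):
        config = ConfigParser.RawConfigParser()
        config.read(os.path.join(working_directory, name))
        return (config.get('file_locations', 'working_directory'),
                config.get('file_locations', 'input_location'))


    if __name__ == '__main__':
        write_config(os.getcwd(), os.path.join(os.getcwd(), 'data'))
        print read_config(os.getcwd())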
Index: trunk/tools/editor_trends/configuration.py |
— | — | @@ -23,6 +23,7 @@ |
24 | 24 | ''' |
25 | 25 | |
26 | 26 | from multiprocessing import cpu_count |
| 27 | +import ConfigParser |
27 | 28 | import os |
28 | 29 | import sys |
29 | 30 | import platform |
— | — | @@ -42,7 +43,7 @@ |
43 | 44 | |
44 | 45 | class Settings(object): |
45 | 46 | |
46 | | - def __init__(self, debug=True, process_multiplier=1): |
| 47 | + def __init__(self, debug=True, process_multiplier=1, **kwargs): |
47 | 48 | self.debug = debug |
48 | 49 | self.progressbar = True |
49 | 50 | self.encoding = 'utf-8' |
— | — | @@ -69,7 +70,8 @@ |
70 | 71 | self.max_filehandles = self.determine_max_filehandles_open() |
71 | 72 | |
72 | 73 | self.windows_register = {'7zip': 'Software\\7-Zip', } |
73 | | - |
| 74 | + self.load_configuration() |
| 75 | + self.set_custom_settings(**kwargs) |
74 | 76 | self.projects = {'commons': 'commonswiki', |
75 | 77 | 'wikibooks': 'wikibooks', |
76 | 78 | 'wikinews': 'wikinews', |
— | — | @@ -88,7 +90,17 @@ |
89 | 91 | 'multilingual wikisource': None |
90 | 92 | } |
91 | 93 | |
| 94 | + def set_custom_settings(self, **kwargs): |
| 95 | + for kw in kwargs: |
| 96 | + setattr(self, kw, kwargs[kw]) |
92 | 97 | |
| 98 | + def load_configuration(self): |
| 99 | + if os.path.exists(os.path.join(self.working_directory, 'wiki.cfg')): |
| 100 | + config = ConfigParser.RawConfigParser() |
| 101 | + config.read(os.path.join(self.working_directory, 'wiki.cfg')) |
| 102 | + self.working_directory = config.get('file_locations', 'working_directory') |
| 103 | + self.input_location = config.get('file_locations', 'input_location') |
| 104 | + |
93 | 105 | def determine_working_directory(self): |
94 | 106 | cwd = os.getcwd() |
95 | 107 | if not cwd.endswith('editor_trends%s' % os.sep): |
— | — | @@ -105,10 +117,9 @@ |
106 | 118 | |
107 | 119 | def verify_environment(self, directories): |
108 | 120 | for dir in directories: |
109 | | - result = os.path.exists(dir) |
110 | | - if not result: |
| 121 | + if not os.path.exists(dir): |
111 | 122 | try: |
112 | | - os.mkdir(dir) |
| 123 | + os.makedirs(dir) |
113 | 124 | except IOError: |
114 | 125 | raise 'Configuration Error, could not create directory.' |
115 | 126 | |
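Two small but consequential changes in configuration.py: Settings now accepts keyword overrides, and verify_environment uses os.makedirs, so nested output paths such as location/language/project/chunks can be created in one call. A compact illustration, with class and attribute names simplified:

    import os


    class Settings(object):
        def __init__(self, **kwargs):
            self.input_location = os.path.join(os.getcwd(), 'data')
            # any keyword argument overrides the default attribute of the same name
            for key, value in kwargs.items():
                setattr(self, key, value)

        def verify_environment(self, directories):
            for directory in directories:
                if not os.path.exists(directory):
                    os.makedirs(directory)  # creates intermediate dirs, unlike os.mkdir


    if __name__ == '__main__':
        settings = Settings(input_location='/tmp/editor_trends_example')
        settings.verify_environment([os.path.join(settings.input_location,
                                                  'en', 'wiki', 'chunks')])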
Index: trunk/tools/editor_trends/utils/utils.py |
— | — | @@ -30,6 +30,7 @@ |
31 | 31 | import codecs |
32 | 32 | import os |
33 | 33 | import ctypes |
| 34 | +import time |
34 | 35 | |
35 | 36 | import configuration |
36 | 37 | settings = configuration.Settings() |
— | — | @@ -49,13 +50,21 @@ |
50 | 51 | |
51 | 52 | |
52 | 53 | def convert_timestamp_to_date(timestamp): |
53 | | - return datetime.datetime.strptime(timestamp[:10], settings.DATE_FORMAT) |
| 54 | + return datetime.datetime.strptime(timestamp[:10], settings.date_format) |
54 | 55 | |
55 | 56 | |
56 | | -def convert_timestamp_to_datetime(timestamp): |
57 | | - return datetime.datetime.strptime(timestamp, settings.DATETIME_FORMAT) |
| 57 | +def convert_timestamp_to_datetime_naive(timestamp): |
| 58 | + return datetime.datetime.strptime(timestamp, settings.timestamp_format) |
58 | 59 | |
59 | 60 | |
| 61 | +def convert_timestamp_to_datetime_utc(timestamp): |
| 62 | + tz = datetime.tzinfo('utc') |
| 63 | + d = convert_timestamp_to_datetime_naive(timestamp) |
| 64 | + #return d.replace(tzinfo=tz) #enabling this line crashes pymongo |
| 65 | + return d |
| 66 | + |
| 67 | + |
| 68 | + |
60 | 69 | def check_if_process_is_running(pid): |
61 | 70 | try: |
62 | 71 | if settings.OS == 'Windows': |
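utils.py splits the timestamp helpers into a naive and a UTC-flavoured variant because attaching a tzinfo to the parsed datetime reportedly crashed pymongo at the time, so the UTC version still returns a naive value. A standalone version of the parsing step; the format strings are the usual MediaWiki export layout, assumed here instead of being read from settings:

    import datetime

    TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%SZ'  # e.g. 2010-11-19T12:00:00Z
    DATE_FORMAT = '%Y-%m-%d'


    def convert_timestamp_to_datetime_naive(timestamp):
        '''Parse a dump timestamp into a naive datetime; the value is implicitly UTC.'''
        return datetime.datetime.strptime(timestamp, TIMESTAMP_FORMAT)


    def convert_timestamp_to_date(timestamp):
        '''Only the date part, for day-level work.'''
        return datetime.datetime.strptime(timestamp[:10], DATE_FORMAT)


    if __name__ == '__main__':
        print convert_timestamp_to_datetime_naive('2010-11-19T12:00:00Z')
        print convert_timestamp_to_date('2010-11-19T12:00:00Z')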
Index: trunk/tools/editor_trends/utils/models.py |
— | — | @@ -20,27 +20,29 @@ |
21 | 21 | import multiprocessing |
22 | 22 | |
23 | 23 | |
24 | | -class ProcessInputQueue(multiprocessing.Process): |
| 24 | +class BaseConsumer(multiprocessing.Process): |
25 | 25 | |
26 | | - def __init__(self, target, input_queue, result_queue, **kwargs): |
| 26 | + def __init__(self, task_queue, result_queue): |
27 | 27 | multiprocessing.Process.__init__(self) |
28 | | - self.input_queue = input_queue |
| 28 | + self.task_queue = task_queue |
29 | 29 | self.result_queue = result_queue |
30 | | - self.target = target |
31 | | - for kw in kwargs: |
32 | | - setattr(self, kw, kwargs[kw]) |
33 | 30 | |
34 | | - def start(self): |
35 | | - proc_name = self.name |
36 | | - kwargs = {} |
37 | | - IGNORE = ['input_queue', 'result_queue', 'target'] |
38 | | - for kw in self.__dict__: |
39 | | - if kw not in IGNORE and not kw.startswith('_'): |
40 | | - kwargs[kw] = getattr(self, kw) |
41 | | - self._popen = True |
42 | | - self.target(self.input_queue, self.result_queue, **kwargs) |
43 | 31 | |
| 32 | + |
44 | 33 | |
| 34 | +# for kw in kwargs: |
| 35 | +# setattr(self, kw, kwargs[kw]) |
| 36 | +# |
| 37 | +# def run(self): |
| 38 | +# proc_name = self.name |
| 39 | +# kwargs = {} |
| 40 | +# IGNORE = ['input_queue', 'result_queue', 'target'] |
| 41 | +# for kw in self.__dict__: |
| 42 | +# if kw not in IGNORE and not kw.startswith('_'): |
| 43 | +# kwargs[kw] = getattr(self, kw) |
| 44 | +# self.target(self.input_queue, self.result_queue, **kwargs) |
| 45 | + |
| 46 | + |
45 | 47 | class ProcessResultQueue(multiprocessing.Process): |
46 | 48 | |
47 | 49 | def __init__(self, target, result_queue, **kwargs): |
— | — | @@ -51,12 +53,11 @@ |
52 | 54 | setattr(self, kw, kwargs[kw]) |
53 | 55 | |
54 | 56 | |
55 | | - def start(self): |
| 57 | + def run(self): |
56 | 58 | proc_name = self.name |
57 | 59 | kwargs = {} |
58 | 60 | IGNORE = ['result_queue', 'target'] |
59 | 61 | for kw in self.__dict__: |
60 | 62 | if kw not in IGNORE and not kw.startswith('_'): |
61 | 63 | kwargs[kw] = getattr(self, kw) |
62 | | - self._popen = True |
63 | 64 | self.target(self.result_queue, **kwargs) |
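The models.py rewrite matters because the old classes overrode Process.start(), which meant the target ran in the parent process instead of a forked child; BaseConsumer only overrides run(), which start() then executes in the child. The distinction in miniature:

    import multiprocessing
    import os


    class Worker(multiprocessing.Process):
        def run(self):
            # run() executes in the child created by start(); never override start() itself
            print 'child pid: %s' % os.getpid()


    if __name__ == '__main__':
        print 'parent pid: %s' % os.getpid()
        worker = Worker()
        worker.start()
        worker.join()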
Index: trunk/tools/editor_trends/utils/process_constructor.py |
— | — | @@ -63,9 +63,9 @@ |
64 | 64 | input_queues = {} |
65 | 65 | result_queues = {} |
66 | 66 | |
67 | | - #assert len(obj) == nr_input_processors |
68 | | - #if result_queue: |
69 | | - # assert len(obj)== nr_output_processors |
| 67 | + assert len(obj) == nr_input_processors |
| 68 | + if result_queue: |
| 69 | + assert len(obj)== nr_output_processors |
70 | 70 | |
71 | 71 | for i, o in enumerate(obj): |
72 | 72 | input_queues[i] = load_input_queue(obj[o], poison_pill=poison_pill) |
— | — | @@ -93,7 +93,7 @@ |
94 | 94 | result_processes = [models.ProcessResultQueue(result_processor, |
95 | 95 | result_queues[i], **kwargs) for i in xrange(nr_output_processors)] |
96 | 96 | for result_process in result_processes: |
97 | | - result_process.start() |
| 97 | + result_process.start(result_process.input_queue) |
98 | 98 | |
99 | 99 | for input_process in input_processes: |
100 | 100 | print 'Waiting for input process to finish' |
Index: trunk/tools/editor_trends/utils/sort.py |
— | — | @@ -75,8 +75,10 @@ |
76 | 76 | |
77 | 77 | |
78 | 78 | def readline(file): |
| 79 | + ''' |
| 80 | + @file should be a file object |
| 81 | + ''' |
79 | 82 | for line in file: |
80 | | - print file.stream.name |
81 | 83 | line = line.replace('\n', '') |
82 | 84 | if line == '': |
83 | 85 | continue |
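sort.readline is what loader.store_editors now iterates over; a generator along these lines, assuming tab-separated records (the diff only shows part of the function, so the exact yield is an assumption):

    def readline(fh):
        '''Yield every non-empty line of an open file as a list of tab-separated fields.'''
        for line in fh:
            line = line.replace('\n', '')
            if line == '':
                continue
            yield line.split('\t')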
Index: trunk/tools/editor_trends/database/sqlite_logic.py |
— | — | @@ -0,0 +1,156 @@ |
| 2 | +def retrieve_editor_ids_db(): |
| 3 | + contributors = set() |
| 4 | + connection = db.init_database() |
| 5 | + cursor = connection.cursor() |
| 6 | + if settings.PROGRESS_BAR: |
| 7 | + cursor.execute('SELECT MAX(ROWID) FROM contributors') |
| 8 | + for id in cursor: |
| 9 | + pass |
| 10 | + pbar = progressbar.ProgressBar(maxval=id[0]).start() |
| 11 | + |
| 12 | + cursor.execute('SELECT contributor FROM contributors WHERE bot=0') |
| 13 | + |
| 14 | + print 'Retrieving contributors...' |
| 15 | + for x, contributor in enumerate(cursor): |
| 16 | + contributors.add(contributor[0]) |
| 17 | + if x % 100000 == 0: |
| 18 | + pbar.update(x) |
| 19 | + print 'Serializing contributors...' |
| 20 | + utils.store_object(contributors, 'contributors') |
| 21 | + print 'Finished serializing contributors...' |
| 22 | + |
| 23 | + if pbar: |
| 24 | + pbar.finish() |
| 25 | + print 'Total elapsed time: %s.' % (utils.humanize_time_difference(pbar.seconds_elapsed)) |
| 26 | + |
| 27 | + connection.close() |
| 28 | + |
| 29 | +def retrieve_edits_by_contributor(input_queue, result_queue, pbar): |
| 30 | + connection = db.init_database() |
| 31 | + cursor = connection.cursor() |
| 32 | + |
| 33 | + while True: |
| 34 | + try: |
| 35 | + contributor = input_queue.get(block=False) |
| 36 | + if contributor == None: |
| 37 | + break |
| 38 | + |
| 39 | + cursor.execute('SELECT contributor, timestamp, bot FROM contributors WHERE contributor=?', (contributor,)) |
| 40 | + edits = {} |
| 41 | + edits[contributor] = set() |
| 42 | + for edit, timestamp, bot in cursor: |
| 43 | + date = utils.convert_timestamp_to_date(timestamp) |
| 44 | + edits[contributor].add(date) |
| 45 | + #print edit, timestamp, bot |
| 46 | + |
| 47 | + utils.write_data_to_csv(edits, retrieve_edits_by_contributor) |
| 48 | + if pbar: |
| 49 | + utils.update_progressbar(pbar, input_queue) |
| 50 | + |
| 51 | + except Empty: |
| 52 | + pass |
| 53 | + |
| 54 | + connection.close() |
| 55 | + |
| 56 | + |
| 57 | +def store_data_db(data_queue, pids): |
| 58 | + connection = db.init_database() |
| 59 | + cursor = connection.cursor() |
| 60 | + db.create_tables(cursor, db_settings.CONTRIBUTOR_TABLE) |
| 61 | + empty = 0 |
| 62 | + values = [] |
| 63 | + while True: |
| 64 | + try: |
| 65 | + chunk = data_queue.get(block=False) |
| 66 | + contributor = chunk['contributor'].encode(settings.encoding) |
| 67 | + article = chunk['article'] |
| 68 | + timestamp = chunk['timestamp'].encode(settings.encoding) |
| 69 | + bot = chunk['bot'] |
| 70 | + values.append((contributor, article, timestamp, bot)) |
| 71 | + |
| 72 | + if len(values) == 50000: |
| 73 | + cursor.executemany('INSERT INTO contributors VALUES (?,?,?,?)', values) |
| 74 | + connection.commit() |
| 75 | + #print 'Size of queue: %s' % data_queue.qsize() |
| 76 | + values = [] |
| 77 | + |
| 78 | + except Empty: |
| 79 | + |
| 80 | + if all([utils.check_if_process_is_running(pid) for pid in pids]): |
| 81 | + pass |
| 82 | + else: |
| 83 | + break |
| 84 | + connection.close() |
| 85 | + |
| 86 | + |
| 87 | +def create_bots_db(db_name): |
| 88 | + ''' |
| 89 | + This function reads the csv file provided by Erik Zachte and constructs a |
| 90 | + sqlite memory database. The reason for this is that I suspect I will need |
| 91 | + some simple querying capabilities in the future, else a dictionary would |
| 92 | + suffice. |
| 93 | + ''' |
| 94 | + connection = db.init_database('db_name') |
| 95 | + #connection = db.init_database('data/database/bots.db') |
| 96 | + cursor = connection.cursor() |
| 97 | + db.create_tables(cursor, db_settings.BOT_TABLE) |
| 98 | + values = [] |
| 99 | + fields = [field[0] for field in db_settings.BOT_TABLE['bots']] |
| 100 | + for line in utils.read_data_from_csv('data/csv/StatisticsBots.csv', settings.encoding): |
| 101 | + line = line.split(',') |
| 102 | + row = [] |
| 103 | + for x, (field, value) in enumerate(zip(fields, line)): |
| 104 | + if db_settings.BOT_TABLE['bots'][x][1] == 'INTEGER': |
| 105 | + value = int(value) |
| 106 | + elif db_settings.BOT_TABLE['bots'][x][1] == 'TEXT': |
| 107 | + value = value.replace('/', '-') |
| 108 | + #print field, value |
| 109 | + row.append(value) |
| 110 | + values.append(row) |
| 111 | + |
| 112 | + cursor.executemany('INSERT INTO bots VALUES (?,?,?,?,?,?,?,?,?,?);', values) |
| 113 | + connection.commit() |
| 114 | + if db_name == ':memory': |
| 115 | + return cursor |
| 116 | + else: |
| 117 | + connection.close() |
| 118 | + |
| 119 | +def retrieve_botnames_without_id(cursor, language): |
| 120 | + return cursor.execute('SELECT name FROM bots WHERE language=?', (language,)).fetchall() |
| 121 | + |
| 122 | + |
| 123 | +def add_id_to_botnames(): |
| 124 | + ''' |
| 125 | + This is the worker function for the multi-process version of |
| 126 | + lookup_username.First, the names of the bots are retrieved, then the |
| 127 | + multiprocess is launched by making a call to pc.build_scaffolding. This is a |
| 128 | + generic launcher that takes as input the function to load the input_queue, |
| 129 | + the function that will do the main work and the objects to be put in the |
| 130 | + input_queue. The launcher also accepts optional keyword arguments. |
| 131 | + ''' |
| 132 | + cursor = create_bots_db(':memory') |
| 133 | + files = utils.retrieve_file_list(settings.input_location, 'xml') |
| 134 | + |
| 135 | + botnames = retrieve_botnames_without_id(cursor, 'en') |
| 136 | + bots = {} |
| 137 | + for botname in botnames: |
| 138 | + bots[botname[0]] = 1 |
| 139 | + pc.build_scaffolding(pc.load_queue, lookup_username, files, bots=bots) |
| 140 | + cursor.close() |
| 141 | + |
| 142 | + |
| 143 | +def debug_lookup_username(): |
| 144 | + ''' |
| 145 | + This function launches the lookup_username function but then single |
| 146 | + threaded, this eases debugging. That's also the reason why the queue |
| 147 | + parameters are set to None. When launching this function make sure that |
| 148 | + debug=False when calling lookup_username |
| 149 | + ''' |
| 150 | + cursor = create_bots_db(':memory') |
| 151 | + botnames = retrieve_botnames_without_id(cursor, 'en') |
| 152 | + bots = {} |
| 153 | + for botname in botnames: |
| 154 | + bots[botname[0]] = 1 |
| 155 | + |
| 156 | + lookup_username('12.xml', None, None, bots, debug=True) |
| 157 | + cursor.close() |
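The bot helpers retained in sqlite_logic.py build an in-memory sqlite database so that usernames can be checked against the bot list with simple queries. A condensed, self-contained version of that idea using the standard sqlite3 module; the table layout and sample rows are invented:

    import sqlite3


    def create_bots_db(botnames):
        '''Load (name, language) pairs into an in-memory sqlite table.'''
        connection = sqlite3.connect(':memory:')
        cursor = connection.cursor()
        cursor.execute('CREATE TABLE bots (name TEXT, language TEXT)')
        cursor.executemany('INSERT INTO bots VALUES (?, ?)', botnames)
        connection.commit()
        return connection


    if __name__ == '__main__':
        connection = create_bots_db([(u'ExampleBot', 'en'), (u'AnotherBot', 'en')])
        cursor = connection.cursor()
        rows = cursor.execute('SELECT name FROM bots WHERE language=?', ('en',)).fetchall()
        bots = dict((name, 1) for (name,) in rows)
        print bots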
Index: trunk/tools/editor_trends/database/cache.py |
— | — | @@ -70,7 +70,7 @@ |
71 | 71 | |
72 | 72 | def add(self, key, value): |
73 | 73 | if value == 'NEXT': |
74 | | - result = self.insert(key, self.editors[key]['edits']) |
| 74 | + result = self.insert(key, self.editors[key]['edits'], self.editors[key]['username']) |
75 | 75 | self.n -= self.editors[key]['obs'] |
76 | 76 | self.number_editors -= 1 |
77 | 77 | del self.editors[key] |
— | — | @@ -84,11 +84,13 @@ |
85 | 85 | self.editors[key]['edits'] = {} |
86 | 86 | self.add_years(key) |
87 | 87 | self.number_editors += 1 |
88 | | - |
| 88 | + self.editors[key]['username'] = value['username'] |
| 89 | + |
89 | 90 | id = str(self.editors[key]['obs']) |
90 | 91 | year = str(value['date'].year) |
91 | 92 | self.editors[key]['edits'][year].append(value) |
92 | 93 | self.editors[key]['obs'] += 1 |
| 94 | + |
93 | 95 | |
94 | 96 | #if self.editors[key]['obs'] == self.treshold: |
95 | 97 | # self.treshold_editors.add(key) |
— | — | @@ -102,9 +104,9 @@ |
103 | 105 | def update(self, editor, values): |
104 | 106 | self.collection.update({'editor': editor}, {'$pushAll': {'edits': values}}, upsert=True) |
105 | 107 | |
106 | | - def insert(self, editor, values): |
| 108 | + def insert(self, editor, values, username): |
107 | 109 | try: |
108 | | - self.collection.insert({'editor': editor, 'edits': values}) |
| 110 | + self.collection.insert({'editor': editor, 'edits': values, 'username': username}) |
109 | 111 | return True |
110 | 112 | except: |
111 | 113 | return False |
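With the cache.py change each editor is written to MongoDB in a single insert that carries the username next to the year-keyed edits, instead of one update per edit. Roughly the document shape being inserted; all values below are dummies:

    import datetime

    # Illustrative document as handed to collection.insert() by EditorCache.insert()
    editor_document = {
        'editor': 12345,
        'username': u'ExampleUser',
        'edits': {
            '2009': [{'date': datetime.datetime(2009, 5, 1), 'article': 100,
                      'username': u'ExampleUser'}],
            '2010': [{'date': datetime.datetime(2010, 1, 3), 'article': 101,
                      'username': u'ExampleUser'}],
        },
    }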