r77457 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: < r77456 | r77457 | r77458 >
Date: 05:07, 30 November 2010
Author: diederik
Status: deferred
Tags:
Comment:
* Started implementation of Python logging module
* Rewriting multiprocessing launchers
* Numerous small bugfixes
Modified paths:
  • /trunk/tools/editor_trends/bots/bots.py (modified)
  • /trunk/tools/editor_trends/bots/models.py (modified)
  • /trunk/tools/editor_trends/configuration.py (modified)
  • /trunk/tools/editor_trends/etl/chunker.py (modified)
  • /trunk/tools/editor_trends/etl/exporter.py (modified)
  • /trunk/tools/editor_trends/etl/loader.py (modified)
  • /trunk/tools/editor_trends/etl/models.py (modified)
  • /trunk/tools/editor_trends/manage.py (modified)
  • /trunk/tools/editor_trends/utils/compression.py (modified)
  • /trunk/tools/editor_trends/utils/exceptions.py (modified)
  • /trunk/tools/editor_trends/utils/utils.py (modified)

Diff

Index: trunk/tools/editor_trends/manage.py
@@ -18,6 +18,7 @@
 __version__ = '0.1'
 
 import os
+import logging
 import sys
 import datetime
 from argparse import ArgumentParser
@@ -32,6 +33,7 @@
 import languages
 from utils import utils
 from utils import dump_downloader
+from utils import compression
 from etl import chunker
 from etl import extract
 from etl import loader
@@ -65,12 +67,12 @@
     return language_code.split('_')[0]
 
 
-def retrieve_projectname(args):
-    language_code = retrieve_language(args)
+def get_projectname(args):
+    language_code = get_language(args)
     if language_code == None:
         print 'Entered language: %s is not a valid Wikimedia language' % get_value(args, 'language')
         sys.exit(-1)
-    project = retrieve_project(args)
+    project = get_project(args)
 
     if project == None:
         print 'Entered project: %s is not valid Wikimedia Foundation project.' % get_value(args, 'project')
@@ -81,59 +83,87 @@
     return '%s%s' % (language_code, project)
 
 
-def retrieve_language(args):
+def get_language(args):
     language = get_value(args, 'language')
     language = language.title()
     return languages.MAPPING.get(language, 'en')
 
 
-def retrieve_project(args):
+def get_namespaces(args):
+    namespaces = get_value(args, 'namespace')
+    if namespaces != None:
+        return namespaces.split(',')
+    else:
+        return namespaces
+
+def write_message_to_log(logger, args, message=None, verb=None, **kwargs):
+    function = get_value(args, 'func')
+    logger.debug('Starting %s task' % function.func_name)
+    if message:
+        logger.debug(message)
+    for kw in kwargs:
+        if verb:
+            logger.debug('Action: %s\tSetting: %s' % (verb, kwargs[kw]))
+        else:
+            logger.debug('Key: %s\tSetting: %s' % (kw, kwargs[kw]))
+
+
+
+def get_project(args):
     project = get_value(args, 'project')
     if project != 'wiki':
         project = settings.projects.get(project, None)
     return project
 
 
-def generate_wikidump_filename(project, args):
-    return '%s-%s-%s' % (project, 'latest', get_value(args, 'file'))
+def generate_wikidump_filename(language_code, project, args):
+    return '%s%s-%s-%s' % (language_code, project, 'latest', get_value(args, 'file'))
 
 
-def determine_file_locations(args):
-    locations = {}
+def determine_file_locations(args, logger):
+    config = {}
     location = get_value(args, 'location') if get_value(args, 'location') != None else settings.input_location
-    project = retrieve_project(args)
-    language_code = retrieve_language(args)
-    locations['language_code'] = language_code
-    locations['language'] = get_value(args, 'language')
-    locations['location'] = os.path.join(location, language_code, project)
-    locations['chunks'] = os.path.join(locations['location'], 'chunks')
-    locations['txt'] = os.path.join(locations['location'], 'txt')
-    locations['sorted'] = os.path.join(locations['location'], 'sorted')
-    locations['dbready'] = os.path.join(locations['location'], 'dbready')
-    locations['project'] = project
-    locations['full_project'] = retrieve_projectname(args)
-    locations['filename'] = generate_wikidump_filename(project, args)
-    locations['collection'] = get_value(args, 'collection')
-    locations['directories'] = [locations['chunks'], locations['location'], locations['txt'], locations['sorted'], locations['dbready']]
-    return locations
+    project = get_project(args)
+    language_code = get_language(args)
+    config['language_code'] = language_code
+    config['language'] = get_value(args, 'language')
+    config['location'] = os.path.join(location, language_code, project)
+    config['chunks'] = os.path.join(config['location'], 'chunks')
+    config['txt'] = os.path.join(config['location'], 'txt')
+    config['sorted'] = os.path.join(config['location'], 'sorted')
+    config['dbready'] = os.path.join(config['location'], 'dbready')
+    config['project'] = project
+    config['full_project'] = get_projectname(args)
+    config['filename'] = generate_wikidump_filename(language_code, project, args)
+    config['collection'] = get_value(args, 'collection')
+    config['namespaces'] = get_namespaces(args)
+    config['directories'] = [config['location'], config['chunks'], config['txt'], config['sorted'], config['dbready']]
 
+    message = 'Settings as generated from the configuration module.'
+    write_message_to_log(logger, args, message, None, **config)
+    #for c in config:
+    #    logger.debug('Key: %s - Setting: %s' % (c, config[c]))
+    return config
 
-def show_settings(args, **kwargs):
-    project = settings.projects.get(kwargs.pop('project'), 'wiki')
+
+def show_settings(args, logger, **kwargs):
+    language_map = languages.language_map()
+    language = kwargs.pop('language')
     language_code = kwargs.pop('language_code')
-    language = kwargs.pop('language')
-    location = kwargs.pop('location')
-    project = project.title()
-    language_map = languages.language_map()
-    print 'Project: %s' % (project)
-    print 'Language: %s / %s' % (language_map[language_code].decode(settings.encoding), language.decode(settings.encoding))
-    print 'Input directory: %s' % location
-    print 'Output directory: %s and subdirectories' % location
+    config = {}
+    config['Project'] = settings.projects.get(kwargs.pop('project'), 'wiki').title()
+    config['Language'] = '%s / %s' % (language_map[language_code].decode(settings.encoding), language.decode(settings.encoding))
+    config['Input directory'] = kwargs.get('location')
+    config['Output directory'] = '%s and subdirectories' % kwargs.get('location')
 
+    message = 'Final settings after parsing command line arguments:'
+    write_message_to_log(logger, args, message, None, **config)
 
-def dump_downloader_launcher(args, **kwargs):
+
+def dump_downloader_launcher(args, logger, **kwargs):
     print 'dump downloader'
     timer = Timer()
+    write_message_to_log(logger, args, **kwargs)
     filename = kwargs.get('filename')
     extension = kwargs.get('extension')
     location = kwargs.get('location')
@@ -146,37 +176,42 @@
     timer.elapsed()
 
 
-def chunker_launcher(args, **kwargs):
+def chunker_launcher(args, logger, **kwargs):
     print 'split_settings.input_filename_launcher'
     timer = Timer()
+    write_message_to_log(logger, args, **kwargs)
     filename = kwargs.pop('filename')
-    filename = 'en-latest-pages-meta-history.xml.bz2'
     location = kwargs.pop('location')
     project = kwargs.pop('project')
     language = kwargs.pop('language')
     language_code = kwargs.pop('language_code')
+    namespaces = kwargs.pop('namespaces')
+
     ext = utils.determine_file_extension(filename)
-    if ext in settings.compression.keys():
-        file = filename.replace('.'+ ext, '')
+    file = filename.replace('.' + ext, '')
     result = utils.check_file_exists(location, file)
     if not result:
-        retcode = launch_zip_extractor(args, location, filename, ext)
+        retcode = launch_zip_extractor(args, location, filename)
     else:
         retcode = 0
     if retcode != 0:
         sys.exit(retcode)
-    chunker.split_file(location, file, project, language_code, language)
+
+    chunker.split_file(location, file, project, language_code, namespaces, format='xml', zip=False)
     timer.elapsed()
 
 
-def launch_zip_extractor(args, location, file, ext):
+def launch_zip_extractor(args, location, file):
     timer = Timer()
-    utils.zip_extract(location, file, ext)
+    write_message_to_log(logger, args, location=location, file=file)
+    compressor = compression.Compressor(location, file)
+    compressor.extract()
     timer.elapsed()
 
 
-def extract_launcher(args, **kwargs):
+def extract_launcher(args, logger, **kwargs):
     timer = Timer()
+    write_message_to_log(logger, args, **kwargs)
     location = kwargs.pop('location')
     language_code = kwargs.pop('language_code')
     project = kwargs.pop('project')
@@ -184,8 +219,9 @@
     timer.elapsed()
 
 
-def sort_launcher(args, **kwargs):
+def sort_launcher(args, logger, **kwargs):
     timer = Timer()
+    write_message_to_log(logger, args, **kwargs)
     location = kwargs.pop('location')
     input = os.path.join(location, 'txt')
     output = os.path.join(location, 'sorted')
@@ -196,8 +232,9 @@
     timer.elapsed()
 
 
-def store_launcher(args, **kwargs):
+def store_launcher(args, logger, **kwargs):
     timer = Timer()
+    write_message_to_log(logger, args, **kwargs)
     location = kwargs.pop('location')
     input = os.path.join(location, 'dbready')
     dbname = kwargs.pop('full_project')
@@ -206,37 +243,47 @@
     timer.elapsed()
 
 
-def transformer_launcher(args, **kwargs):
+def transformer_launcher(args, logger, **kwargs):
     print 'dataset launcher'
     timer = Timer()
+    write_message_to_log(logger, args, **kwargs)
     project = kwargs.pop('full_project')
     collection = kwargs.pop('collection')
     transformer.run_optimize_editors(project, collection)
     timer.elapsed()
 
 
-def exporter_launcher(args, **kwargs):
+def exporter_launcher(args, logger, **kwargs):
     timer = Timer()
+    write_message_to_log(logger, args, **kwargs)
     project = kwargs.pop('full_project')
     exporter.generate_editor_dataset_launcher(project)
     timer.elapsed()
 
 
-def all_launcher(args, **kwargs):
+def all_launcher(args, logger, **kwargs):
     print 'all_launcher'
     timer = Timer()
+    message = 'Starting '
+    write_message_to_log(logger, args, message, **kwargs)
     ignore = get_value(args, 'except')
+    clean = get_value(args, 'clean')
+    if clean:
+        dirs = kwargs.get('directories')[1:]
+        for dir in dirs:
+            write_message_to_log(logger, args, verb='Deleting', **kwargs)
+            utils.delete_file(dir, '')
     functions = {dump_downloader_launcher: 'download',
                  chunker_launcher: 'split',
                  extract_launcher: 'extract',
                  sort_launcher: 'sort',
-                 transformer_launcher: 'transform',
+                 transformer_launcher: 'transform',
                  exporter_launcher: 'export'
                  }
     for function, callname in functions.iteritems():
         if callname not in ignore:
             function(args, **kwargs)
-
+
     timer.elapsed()
 
 
@@ -265,8 +312,9 @@
         print '%s' % language
 
 
-def detect_python_version():
+def detect_python_version(logger):
     version = sys.version_info[0:2]
+    logger.debug('Python version: %s' % '.'.join(str(version)))
     if version < settings.minimum_python_version:
         raise 'Please upgrade to Python 2.6 or higher (but not Python 3.x).'
 
@@ -279,6 +327,9 @@
 
 
 def main():
+    logger = logging.getLogger('manager')
+    logger.setLevel(logging.DEBUG)
+
     default_language = determine_default_language()
     file_choices = ('stub-meta-history.xml.gz',
                     'stub-meta-current.xml.gz',
@@ -303,6 +354,7 @@
     parser_download.set_defaults(func=dump_downloader_launcher)
 
     parser_split = subparsers.add_parser('split', help='The split sub command splits the downloaded file in smaller chunks to parallelize extracting information.')
+
     parser_split.set_defaults(func=chunker_launcher)
 
     parser_create = subparsers.add_parser('extract', help='The store sub command parsers the XML chunk files, extracts the information and stores it in a MongoDB.')
@@ -317,7 +369,7 @@
                                help='Name of MongoDB collection',
                                default='editors')
 
-    parser_transform = subparsers.add_parser('transform', help='Transform the raw datatabe to an enriched dataset that can be exported.')
+    parser_transform = subparsers.add_parser('transform', help='Transform the raw datatable to an enriched dataset that can be exported.')
     parser_transform.set_defaults(func=transformer_launcher)
     parser_transform.add_argument('-c', '--collection', action='store',
                                   help='Name of MongoDB collection',
@@ -331,8 +383,11 @@
     parser_all.add_argument('-e', '--except', action='store',
                             help='Should be a list of functions that are to be ignored when executing \'all\'.',
                             default=[])
-
+    parser_all.add_argument('-n', '--new', action='store_false',
+                            help='This will delete all previous output and starts from scratch. Mostly useful for debugging purposes.',
+                            default=False)
 
+
     parser.add_argument('-l', '--language', action='store',
                         help='Example of valid languages.',
                         choices=supported_languages(),
@@ -346,25 +401,29 @@
     parser.add_argument('-o', '--location', action='store',
                         help='Indicate where you want to store the downloaded file.',
                         )
-                        #default=settings.input_location)
 
+    parser.add_argument('-n', '--namespace', action='store',
+                        help='A list of namespaces to include for analysis.',
+                        default='0')
+
+
     parser.add_argument('-f', '--file', action='store',
                         choices=file_choices,
                         help='Indicate which dump you want to download. Valid choices are:\n %s' % ''.join([f + ',\n' for f in file_choices]),
-                        default='stub-meta-current.xml.gz')
+                        default='stub-meta-history.xml.gz')
 
     parser.add_argument('-prog', '--progress', action='store_true', default=True,
                         help='Indicate whether you want to have a progressbar.')
 
-    detect_python_version()
+    detect_python_version(logger)
     about()
     args = parser.parse_args()
    config.create_configuration(settings, args)
-    locations = determine_file_locations(args)
+    locations = determine_file_locations(args, logger)
     settings.verify_environment(locations['directories'])
-    show_settings(args, **locations)
+    show_settings(args, logger, **locations)
     #locations['settings'] = settings
-    args.func(args, **locations)
+    args.func(args, logger, **locations)
     t1 = datetime.datetime.now()
 
 
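A note on the new logging calls above: main() creates the 'manager' logger and sets it to DEBUG, but no handler is attached anywhere in this hunk, and the stdlib logging module discards records (and, on Python 2, prints a "No handlers could be found" warning) when a logger has none. A minimal, self-contained sketch of the setup that write_message_to_log() appears to rely on; the console handler and format string are assumptions, not part of this revision:

import logging

logger = logging.getLogger('manager')
logger.setLevel(logging.DEBUG)
# Assumed here: attach a console handler so the debug records are visible;
# r77457 itself does not show where (or whether) a handler is added.
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s'))
logger.addHandler(handler)

logger.debug('Starting %s task' % 'split')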
Index: trunk/tools/editor_trends/etl/exporter.py
@@ -21,7 +21,7 @@
 import datetime
 from dateutil.relativedelta import *
 import calendar
-from multiprocessing import Queue
+import multiprocessing
 from Queue import Empty
 
 
@@ -32,8 +32,6 @@
 from utils import models, utils
 from database import db
 from etl import shaper
-from utils import process_constructor as pc
-import progressbar
 
 try:
     import psyco
@@ -181,10 +179,10 @@
     return headers
 
 
-def generate_long_editor_dataset(input_queue, vars, **kwargs):
-    dbname = kwargs.pop('dbname')
+def generate_long_editor_dataset(tasks, dbname, collection, **kwargs):
     mongo = db.init_mongo_db(dbname)
-    editors = mongo['dataset']
+    vars = ['monthly_edits']
+    editors = mongo[collection + 'dataset']
     name = dbname + '_long_editors.csv'
     #fh = utils.create_txt_filehandle(settings.dataset_location, name, 'w', settings.encoding)
     vars_to_expand = []
@@ -202,11 +200,9 @@
     ld.write_longitudinal_data()
 
 
-def generate_cohort_analysis(input_queue, **kwargs):
-    dbname = kwargs.get('dbname')
-    pbar = kwargs.get('pbar')
+def generate_cohort_dataset(tasks, dbname, collection, **kwargs):
     mongo = db.init_mongo_db(dbname)
-    editors = mongo['dataset']
+    editors = mongo[collection + 'dataset']
     year = datetime.datetime.now().year + 1
     begin = year - 2001
     p = [3, 6, 9]
@@ -215,7 +211,7 @@
     data = {}
     while True:
         try:
-            id = input_queue.get(block=False)
+            id = tasks.get(block=False)
             obs = editors.find_one({'editor': id}, {'first_edit': 1, 'final_edit': 1})
             first_edit = obs['first_edit']
             last_edit = obs['final_edit']
@@ -244,7 +240,6 @@
                 p = min(edits)
                 data[y]['n'] += 1
                 data[y][p] += 1
-            #pbar.update(+1)
         except Empty:
             break
     utils.store_object(data, settings.binary_location, 'cohort_data')
@@ -257,20 +252,16 @@
         return False
 
 
-def generate_wide_editor_dataset(input_queue, **kwargs):
-    dbname = kwargs.pop('dbname')
+def generate_wide_editor_dataset(tasks, dbname, collection, **kwargs):
     mongo = db.init_mongo_db(dbname)
-    editors = mongo['dataset']
+    editors = mongo[collection + 'dataset']
     name = dbname + '_wide_editors.csv'
     fh = utils.create_txt_filehandle(settings.dataset_location, name, 'a', settings.encoding)
     x = 0
     vars_to_expand = ['edits', 'edits_by_year', 'articles_by_year']
     while True:
         try:
-            if debug:
-                id = u'99797'
-            else:
-                id = input_queue.get(block=False)
+            id = input_queue.get(block=False)
             print input_queue.qsize()
             obs = editors.find_one({'editor': id})
             obs = expand_observations(obs, vars_to_expand)
@@ -292,49 +283,25 @@
     fh.close()
 
 
-def retrieve_edits_by_contributor_launcher():
-    pc.build_scaffolding(pc.load_queue, retrieve_edits_by_contributor, 'contributors')
+def dataset_launcher(dbname, collection, target):
+    editors = retrieve_editor_ids_mongo(dbname, collection)
+    tasks = multiprocessing.JoinableQueue()
+    consumers = [multiprocessing.Process(target=target, args=(tasks, dbname, collection)) for i in xrange(settings.number_of_processes)]
+    for editor in editors:
+        tasks.put(editor)
+    print 'The queue contains %s editors.' % tasks.qsize()
+    for x in xrange(settings.number_of_processes):
+        tasks.put(None)
 
+    for w in consumers:
+        w.start()
 
-def debug_retrieve_edits_by_contributor_launcher(dbname):
-    kwargs = {'debug': False,
-              'dbname': dbname,
-              }
-    ids = retrieve_editor_ids_mongo(dbname, 'editors')
-    input_queue = pc.load_queue(ids)
-    q = Queue()
-    generate_editor_dataset(input_queue, q, False, kwargs)
+    tasks.join()
 
 
-def generate_editor_dataset_launcher(dbname):
-    kwargs = {'nr_input_processors': 1,
-              'nr_output_processors': 1,
-              'debug': False,
-              'dbname': dbname,
-              'poison_pill':False,
-              'pbar': True
-              }
-    ids = retrieve_editor_ids_mongo(dbname, 'editors')
-    ids = list(ids)
-    chunks = dict({0: ids})
-    pc.build_scaffolding(pc.load_queue, generate_cohort_analysis, chunks, False, False, **kwargs)
-
-
-def generate_editor_dataset_debug(dbname):
-    ids = retrieve_editor_ids_mongo(dbname, 'editors')
-    #ids = list(ids)[:1000]
-    input_queue = pc.load_queue(ids)
-    kwargs = {'nr_input_processors': 1,
-              'nr_output_processors': 1,
-              'debug': True,
-              'dbname': dbname,
-              }
-    #generate_editor_dataset(input_queue, False, False, kwargs)
-    vars = ['monthly_edits']
-    generate_long_editor_dataset(input_queue, vars, **kwargs)
-
 if __name__ == '__main__':
-    #generate_editor_dataset_debug('test')
-    #generate_editor_dataset_launcher('enwiki')
-    generate_editor_dataset_debug('enwiki')
-    #debug_retrieve_edits_by_contributor_launcher()
+    dbname = 'enwiki'
+    collection = 'editors'
+    dataset_launcher(dbname, collection, generate_cohort_dataset)
+    dataset_launcher(dbname, collection, generate_long_editor_dataset)
+    dataset_launcher(dbname, collection, generate_wide_editor_dataset)
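The rewritten dataset_launcher() is a standard multiprocessing producer/consumer: fill a JoinableQueue, add one None sentinel per worker, start the workers, then join(). One stdlib detail worth keeping in mind when reading this revision: JoinableQueue.join() only returns once task_done() has been called for every put() item. A self-contained sketch of the pattern with the task_done() calls included; the worker body and queue contents are illustrative, not code from this revision:

import multiprocessing
from Queue import Empty

def worker(tasks):
    while True:
        try:
            task = tasks.get(block=False)
            if task == None:
                tasks.task_done()  # the sentinel counts as a task too
                break
            # ... process one editor id here ...
            tasks.task_done()      # join() returns only when every item is marked done
        except Empty:
            break

if __name__ == '__main__':
    number_of_processes = 4        # stands in for settings.number_of_processes
    tasks = multiprocessing.JoinableQueue()
    consumers = [multiprocessing.Process(target=worker, args=(tasks,))
                 for i in xrange(number_of_processes)]
    for editor in xrange(100):     # illustrative payload
        tasks.put(editor)
    for x in xrange(number_of_processes):
        tasks.put(None)            # one poison pill per consumer
    for w in consumers:
        w.start()
    tasks.join()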
Index: trunk/tools/editor_trends/etl/chunker.py
@@ -87,7 +87,7 @@
     '''
     ns = []
     for namespace in namespaces:
-        if int(namespace) not in include:
+        if namespace not in include:
             value = namespaces[namespace].get(u'*', None)
             ns.append(value)
     return ns
@@ -165,8 +165,14 @@
     return flat
 
 
-def split_file(location, file, project, language_code, include, format='xml', zip=False):
-    '''Reads xml file and splits it in N chunks'''
+def split_file(location, file, project, language_code, namespaces=[0], format='xml', zip=False):
+    '''
+    Reads xml file and splits it in N chunks
+
+    @namespaces is a list indicating which namespaces should be included, default
+    is to include namespace 0 (main namespace)
+    @zip indicates whether to compress the chunk or not
+    '''
     #location = os.path.join(settings.input_location, language)
     input = os.path.join(location, file)
     output = os.path.join(location, 'chunks')
@@ -178,7 +184,7 @@
     fh = utils.create_txt_filehandle(output, '%s.tsv' % f, 'w', settings.encoding)
 
     ns = load_namespace(language_code)
-    ns = build_namespaces_locale(ns, include)
+    ns = build_namespaces_locale(ns, namespaces)
 
     counter = 0
     tag = '{%s}page' % settings.xml_namespace
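With the int() cast gone, the membership test in build_namespaces_locale() compares the namespace keys directly against the include list; get_namespaces() in manage.py produces that list by splitting the command-line string on commas, so both sides are strings. A toy illustration of what the function computes, namely the localized names of the namespaces that are not included, using an invented namespaces mapping in the shape the code expects:

# Hypothetical data: namespace number -> {u'*': localized name}.
namespaces = {'0': {u'*': u''}, '1': {u'*': u'Talk'}, '2': {u'*': u'User'}}
include = ['0']   # as returned by get_namespaces(): strings, not ints

ns = []
for namespace in namespaces:
    if namespace not in include:        # plain string comparison after r77457
        ns.append(namespaces[namespace].get(u'*', None))

print ns   # [u'Talk', u'User'] (dict order may vary)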
Index: trunk/tools/editor_trends/etl/models.py
@@ -52,19 +52,19 @@
         self.lock = None
         for kw in kwargs:
             setattr(self, kw, kwargs[kw])
-
+
     def create_file_handle(self):
         self.mode = 'a'
         if self.output_file == None:
             self.mode = 'w'
             self.output_file = self.file[:-4] + '.txt'
-
+
         self.fh = utils.create_txt_filehandle(self.output, self.output_file, self.mode, settings.encoding)
 
     def __str__(self):
         return '%s' % (self.file)
 
-    def __call__(self):
+    def __call__(self, bots=None):
         if settings.debug:
             messages = {}
             vars = {}
@@ -104,5 +104,5 @@
 
         if settings.debug:
             utils.report_error_messages(messages, self.target)
-
-        return self.bots
\ No newline at end of file
+
+        return self.bots
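The new bots=None parameter makes XMLFile tasks callable with shared state threaded through them: the single-process consumer in bots/bots.py now runs bots = task(bots). A stripped-down sketch of that callable-task idiom; the class and file name are illustrative:

class Task(object):
    def __init__(self, file):
        self.file = file

    def __str__(self):
        return '%s' % self.file

    def __call__(self, bots=None):
        # Parse self.file here, updating the shared bots mapping,
        # then hand it back to the caller for the next task.
        bots = bots if bots != None else {}
        return bots

task = Task('chunk_0.xml')
bots = task({})   # mirrors 'bots = task(bots)' in bots/bots.py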
Index: trunk/tools/editor_trends/etl/loader.py
@@ -19,19 +19,21 @@
 
 import os
 import sys
+import multiprocessing
 from Queue import Empty
 
 sys.path.append('..')
-import configuration
+import configuration
 settings = configuration.Settings()
 from database import db
 from database import cache
 from utils import utils
 from utils import sort
-import process_constructor as pc
 
+#import process_constructor as pc
 
 
+
 def store_editors(input, dbname, collection):
     filename = utils.retrieve_file_list(input, 'txt', mask=None)[0]
     fh = utils.create_txt_filehandle(input, filename, 'r', settings.encoding)
@@ -47,7 +49,7 @@
     for line in sort.readline(fh):
         if len(line) == 0:
             continue
-        contributor = int(line[0])
+        contributor = int(line[0])
         if prev_contributor != contributor:
             if edits >= 10:
                 result = editor_cache.add(prev_contributor, 'NEXT')
@@ -94,31 +96,47 @@
     filehandles = [fh.close() for fh in filehandles]
     filename = 'merged_final.txt'
     for r in to_remove:
-        utils.delete_file(output ,r)
-
-
+        utils.delete_file(output , r)
 
 
-def mergesort_feeder(task_queue, **kwargs):
-    input = kwargs.get('input', None)
-    output = kwargs.get('output', None)
-    #while True:
-    #    try:
-    #file = input_queue.get(block=False)
-    for file in task_queue:
-        fh = utils.create_txt_filehandle(input, file, 'r', settings.encoding)
-        data = fh.readlines()
-        fh.close()
-        data = [d.replace('\n', '') for d in data]
-        data = [d.split('\t') for d in data]
-        sorted_data = sort.mergesort(data)
-        sort.write_sorted_file(sorted_data, file, output)
 
 
+def mergesort_feeder(tasks, input, output):
+    while True:
+        try:
+            file = tasks.get(block=False)
+            print file
+            if file == None:
+                break
+            fh = utils.create_txt_filehandle(input, file, 'r', settings.encoding)
+            data = fh.readlines()
+            fh.close()
+            data = [d.replace('\n', '') for d in data]
+            data = [d.split('\t') for d in data]
+            sorted_data = sort.mergesort(data)
+            sort.write_sorted_file(sorted_data, file, output)
+        except Empty:
+            break
+
+
 def mergesort_launcher(input, output):
     settings.verify_environment([input, output])
     files = utils.retrieve_file_list(input, 'txt')
-    mergesort_feeder(files, input=input, output=output)
+    tasks = multiprocessing.JoinableQueue()
+    consumers = [multiprocessing.Process(target=mergesort_feeder, args=(tasks, input, output)) for i in xrange(settings.number_of_processes)]
+    for file in files:
+        tasks.put(file)
+
+    for x in xrange(settings.number_of_processes):
+        tasks.put(None)
+
+    for w in consumers:
+        w.start()
+
+    tasks.join()
+    #mergesort_feeder(file, input=input, output=output)
+
+
     #chunks = utils.split_list(files, settings.number_of_processes)
     #pc.build_scaffolding(pc.load_queue, mergesort_feeder, chunks, False, False, **kwargs)
 
@@ -138,7 +156,7 @@
     input = os.path.join(settings.input_location, 'en', 'wiki', 'txt')
     output = os.path.join(settings.input_location, 'en', 'wiki', 'sorted')
     dbname = 'enwiki'
-    #mergesort_launcher(input, output)
+    mergesort_launcher(input, output)
     final_output = os.path.join(settings.input_location, 'en', 'wiki', 'dbready')
     mergesort_external_launcher(dbname, output, final_output)
-    store_editors(input, dbname, collection)
\ No newline at end of file
+    store_editors(input, dbname, collection)
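Each mergesort_feeder() call above sorts one tab-separated text file. A toy, dependency-free version of that per-file step, with the stdlib sort standing in for sort.mergesort() and invented file paths:

def sort_one_file(input_path, output_path):
    fh = open(input_path, 'r')
    data = fh.readlines()
    fh.close()
    data = [d.replace('\n', '') for d in data]
    data = [d.split('\t') for d in data]
    data.sort()                          # stands in for sort.mergesort(data)
    out = open(output_path, 'w')
    for row in data:
        out.write('\t'.join(row) + '\n')
    out.close()

sort_one_file('txt/chunk_0.txt', 'sorted/chunk_0.txt')   # illustrative paths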
Index: trunk/tools/editor_trends/configuration.py
@@ -152,13 +152,13 @@
             if not program.endswith('.exe'):
                 program = program + '.exe'
             path = self.detect_windows_program(program)
+            if path != None:
+                path = path + program
         elif self.platform == 'Linux':
             path = self.detect_linux_program(program)
-        if path != None:
-            return path + program
-        else:
-            return path
 
+        return path
+
     def determine_max_filehandles_open(self):
         if self.platform == 'Windows' and self.architecture == 'i386':
             return win32file._getmaxstdio()
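The relocated concatenation returns path + program only on Windows, which works when detect_windows_program() yields a directory ending in a separator; whether it does is not visible in this diff. A hypothetical equivalent using os.path.join, which inserts the separator only when one is missing:

import os

def join_program_path(path, program):
    # os.path.join('/usr/bin', '7z') -> '/usr/bin/7z';
    # plain 'path + program' relies on path ending in a separator.
    if path == None:
        return None
    return os.path.join(path, program)

print join_program_path('/usr/bin', '7z')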
Index: trunk/tools/editor_trends/utils/utils.py
@@ -220,10 +220,10 @@
     keys.sort()
     for key in keys:
         if write_key:
-            fh.write('%s' % key)
+            fh.write('%s\t' % key)
         if getattr(data[key], '__iter__', False):
             for d in data[key]:
-                fh.write('\t%s' % d)
+                fh.write('%s\t' % d)
         else:
             fh.write('%s\t' % (data[key]))
         if newline:
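After this change every field written by write_dict_to_csv, the key included, is followed by a tab instead of being preceded by one. A toy reproduction of just that layout; the row contents are invented:

import sys

def write_row(fh, key, values, write_key=True):
    if write_key:
        fh.write('%s\t' % key)
    for d in values:
        fh.write('%s\t' % d)
    fh.write('\n')

write_row(sys.stdout, 'ExampleBot', [12, 0.5])   # ExampleBot<TAB>12<TAB>0.5<TAB>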
Index: trunk/tools/editor_trends/utils/compression.py
@@ -18,8 +18,9 @@
 
     def __init__(self, location, file, output=None):
         self.extension = utils.determine_file_extension(file)
-        self.file = os.path.join(location, file)
+        self.file = file
         self.location = location
+        self.path = os.path.join(self.file, self.location)
         self.output = None
         self.name = None
         self.program = []
@@ -42,20 +43,20 @@
         '''
         if self.program == []:
             self.init_compression_tool(self.extension, 'compress')
-
+
         if self.program_installed == None:
             raise exceptions.CompressionNotSupportedError
-
+
         args = {'7z': ['%s' % self.program_installed, 'a', '-scsUTF-8', '-t%s' % self.compression, '%s' % self.output, '%s' % self.input],
                 }
-
+
         commands = args.get(self.name, None)
         if commands != None:
             p = subprocess.Popen(commands, shell=True).wait()
         else:
             raise exceptions.CompressionNotSupportedError
-
 
+
     def extract(self):
         '''
         @location is the directory where to store the compressed file
@@ -68,18 +69,25 @@
         if self.program_installed == None:
             raise exceptions.CompressionNotSupportedError
 
-        args = {'7z': ['%s' % self.program_installed,'e', '-o%s' % self.location, '%s' % self.file],
-                'bunzip2': ['%s' % self.program_installed, '-k', '%s' % self.file],
-                'zip': ['%s' % self.program_installed, '-o', '%s' % self.file],
-                'gz': ['%s' % self.program_installed, '-xzvf', '%s' % self.file],
-                'tar': ['%s' % self.program_installed, '-xvf', '%s' % self.file]
+        print self.location
+        print self.file
+        if not utils.check_file_exists(self.location, self.file):
+            raise exceptions.FileNotFoundException(self.location, self.file)
+
+
+
+        args = {'7z': ['%s' % self.program_installed, 'e', '-y', '-o%s' % self.location, '%s' % self.path],
+                'bunzip2': ['%s' % self.program_installed, '-k', '%s' % self.path],
+                'zip': ['%s' % self.program_installed, '-o', '%s' % self.path],
+                'gz': ['%s' % self.program_installed, '-xzvf', '%s' % self.path],
+                'tar': ['%s' % self.program_installed, '-xvf', '%s' % self.path]
                 }
         commands = args.get(self.name, None)
         if commands != None:
             p = subprocess.Popen(commands, shell=True).wait()
         else:
             raise exceptions.CompressionNotSupportedError
-
+
         # if self.name == '7z':
         #     p = subprocess.Popen(['%s' % tool.extract_installed, 'e', '-o%s' % location, '%s' % input], shell=True).wait()
         # elif tool_extract_installed.endswith('bunzip2'):
@@ -112,7 +120,7 @@
             path = settings.detect_installed_program(p)
             if path != None:
                 self.name = p
-                self.program_installed = path
+                self.program_installed = path
 
 
 if __name__ == '__main__':
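The constructor now keeps the bare file name and derives self.path with os.path.join(self.file, self.location); since os.path.join is order-sensitive, the two argument orders produce different strings, which is worth checking when extract() hands self.path to the external tool. A quick illustration with invented values:

import os

location = 'data/dumps'
file = 'enwiki-latest-stub-meta-history.xml.gz'

print os.path.join(file, location)   # enwiki-latest-stub-meta-history.xml.gz/data/dumps
print os.path.join(location, file)   # data/dumps/enwiki-latest-stub-meta-history.xml.gz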
Index: trunk/tools/editor_trends/utils/exceptions.py
@@ -23,12 +23,13 @@
     pass
 
 class FileNotFoundException(Error):
-    def __init__(self, file):
-        self.file = file
+    def __init__(self, file, path):
+        self.file = file
+        self.path = path
 
     def __str__(self):
-        print 'The file %s was not found. Please make sure your path is up-to-date.' % self.file
-
+        print 'The file %s was not found. Please make sure that the file exists and the path is correct.' % (os.path.join(self.path, self.file))
+
 class PlatformNotSupportedError(Error):
     def __init__(self, platform):
         self.platform = platform
@@ -39,6 +40,6 @@
 class CompressionNotSupportedError(Error):
     def __init__(self, extension):
         self.extension = extension
-
+
     def __str__(self):
-        print 'You have not installed a program to extract %s archives.' % self.extension
\ No newline at end of file
+        print 'You have not installed a program to extract %s archives.' % self.extension
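Two stdlib details around the updated FileNotFoundException: __str__ is expected to return a string (a print statement returns None), and os.path.join needs an os import that this hunk does not show. A minimal sketch of the exception as it appears intended to behave; the argument order here follows the call site in compression.py (location first, file second):

import os

class FileNotFoundException(Exception):
    def __init__(self, path, file):
        self.path = path
        self.file = file

    def __str__(self):
        return 'The file %s was not found. Please make sure that the file exists and the path is correct.' % os.path.join(self.path, self.file)

try:
    raise FileNotFoundException('data/dumps', 'dump.xml.gz')   # invented values
except FileNotFoundException, e:
    print e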
Index: trunk/tools/editor_trends/bots/models.py
@@ -31,12 +31,17 @@
 
 class Bot(object):
 
-    def __init__(self, name):
+    def __init__(self, name, **kwargs):
         self.name = name
         self.projects = []
         self.time = shaper.create_datacontainer(datatype='list')
         self.verified = True
+        for kw in kwargs:
+            setattr(self, kw, kwargs[kw])
 
+    def __repr__(self):
+        return self.name
+
     def hours_active(self):
         self.clock = shaper.create_clock()
         years = self.time.keys()
@@ -49,38 +54,42 @@
         hours.sort()
         for hour in hours:
             self.data.append(self.clock[hour])
-
-
 
+
+    def active(self):
+        return float(sum(self.clock.values())) / 24.0
+
+
     def avg_lag_between_edits(self):
         years = self.time.keys()
         edits = []
         for year in years:
             for x in self.time[year]:
                 edits.append(x)
-
+
         if edits != []:
             edits.sort()
             dt = datetime.timedelta()
             for i, edit in enumerate(edits):
-                if i == len(edits) -1:
+                if i == len(edits) - 1:
                     break
                 dt += edits[i + 1] - edits[i]
             dt = dt / len(edits)
             self.dt = dt
-            self.n = i
+            self.n = i
         else:
             self.dt = None
-
+
     def write_training_dataset(self, fh):
         self.data = []
         self.data.append(self.name)
         self.data.append(self.verified)
         self.add_clock_data()
+        self.active()
         self.data.append(self.dt)
         utils.write_list_to_csv(self.data, fh, recursive=False, newline=True)
-
 
 
+
 if __name__ == '__main__':
     pass
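A worked check of avg_lag_between_edits() with three invented timestamps: the loop accumulates the gaps between consecutive sorted edits, then divides by len(edits) (the number of edits, not the number of gaps):

import datetime

edits = [datetime.datetime(2010, 11, 30, 5, 0),
         datetime.datetime(2010, 11, 30, 5, 10),
         datetime.datetime(2010, 11, 30, 5, 40)]
edits.sort()
dt = datetime.timedelta()
for i, edit in enumerate(edits):
    if i == len(edits) - 1:
        break
    dt += edits[i + 1] - edits[i]
dt = dt / len(edits)
print dt   # 0:13:20 -- a 40 minute total gap divided by 3 edits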
Index: trunk/tools/editor_trends/bots/bots.py
@@ -44,13 +44,16 @@
     pass
 
 
-def read_bots_csv_file(manager, location, filename, encoding):
+def read_bots_csv_file(location, filename, encoding, manager=False):
     '''
     Constructs a dictionary from Bots.csv
     key is language
     value is a list of bot names
     '''
-    bot_dict = manager.dict()
+    if manager:
+        bot_dict = manager.dict()
+    else:
+        bot_dict = dict()
     for line in utils.read_data_from_csv(location, filename, encoding):
         line = utils.clean_string(line)
         language, bots = line.split(',')
@@ -84,7 +87,7 @@
     This file reads the results from the lookup_bot_userid function and stores
     it in a MongoDB collection.
     '''
-    bots = utils.read_dict_from_csv(settings.csv_location, 'bots_ids.csv', settings.encoding)
+    #bots = utils.read_dict_from_csv(settings.csv_location, 'bots_ids.csv', settings.encoding)
     mongo = db.init_mongo_db('bots')
     collection = mongo['ids']
     db.remove_documents_from_mongo_db(collection, None)
@@ -124,42 +127,47 @@
         if username == None or username.text == None:
             continue
         else:
-            username = username.text
+            username = username.text #encode(settings.encoding)
+            name = username.lower()
+
         #print username.encode('utf-8')
         if username in bots and bots[username].verified == True:
             id = contributor.find('id').text
             bot = bots[username]
 
             if not hasattr(bot, 'written'):
-                bot_dict = convert_object_to_dict(bot, exclude=['time', 'name', 'written'])
-                bot_dict['_username'] = username
-                bot_dict['id'] = id
+                if username == 'Robbot':
+                    print 'test'
+                bot.id = id
+                bot.name = username
+                bot_dict = convert_object_to_dict(bot, exclude=['time', 'written'])
+                #bot_dict['_username'] = username
+                #bot_dict['id'] = id
                 lock.acquire()
                 utils.write_dict_to_csv(bot_dict, fh, write_key=False)
                 lock.release()
-                bot.written = True
+                bot.written = True
+                bots[username] = bot
             #bots.pop(username)
             #if bots == {}:
             #    print 'Found id numbers for all bots.'
             #    return 'break'
         #print username.encode('utf-8')
-        name = username.lower()
+
         if name.find('bot') > -1:
-            bot = bots.get(username, botmodels.Bot(username))
-            if bot not in bots:
-                bot.verified = False
+            bot = bots.get(username, botmodels.Bot(username, verified=False))
             timestamp = revision.find('timestamp').text
             if timestamp != None:
                 timestamp = utils.convert_timestamp_to_datetime_naive(timestamp)
                 bot.time[str(timestamp.year)].append(timestamp)
             bots[username] = bot
-
+    return bots
     #bot = bots.get('PseudoBot')
     #bot.hours_active()
     #bot.avg_lag_between_edits()
 
 
-def bot_launcher(language_code, project, single=False):
+def bot_launcher(language_code, project, single=False, manager=False):
     '''
     This function sets the stage to launch bot id detection and collecting data
     to discover new bots.
@@ -171,25 +179,28 @@
     files = utils.retrieve_file_list(input, 'xml', mask=None)
     input_queue = pc.load_queue(files, poison_pill=True)
     tasks = multiprocessing.JoinableQueue()
-    manager = multiprocessing.Manager()
-    bots = manager.dict()
-    lock = manager.Lock()
-    bots = read_bots_csv_file(manager, settings.csv_location, 'Bots.csv', settings.encoding)
+    mgr = multiprocessing.Manager()
+    bots = mgr.dict()
+    lock = mgr.Lock()
+    if manager:
+        manager = mgr
+    bots = read_bots_csv_file(settings.csv_location, 'Bots.csv', settings.encoding, manager=manager)
 
     for file in files:
         tasks.put(models.XMLFile(input, settings.csv_location, file, bots, lookup_bot_userid, 'bots_ids.csv', lock=lock))
 
+    tracker = {}
     if single:
         while True:
             try:
                 print '%s files left in the queue...' % tasks.qsize()
                 task = tasks.get(block=False)
-                task()
+                bots = task(bots)
             except Empty:
                 break
     else:
         bot_launcher_multi(tasks)
-
+
     utils.store_object(bots, settings.binary_location, 'bots.bin')
     bot_training_dataset(bots)
     store_bots()
@@ -201,10 +212,10 @@
             print '%s' % key.encode(settings.encoding)
         except:
             pass
-
-
 
 
+
+
 def bot_training_dataset(bots):
     fh = utils.create_txt_filehandle(settings.csv_location, 'training_bots.csv', 'w', settings.encoding)
     keys = bots.keys()
@@ -213,10 +224,10 @@
         bot.hours_active()
         bot.avg_lag_between_edits()
         bot.write_training_dataset(fh)
-
+
     fh.close()
-
-
+
+
 def bot_launcher_multi(tasks):
     '''
     This is the launcher that uses multiprocesses.
@@ -230,9 +241,14 @@
 
     tasks.join()
 
+def debug_bots_dict():
+    bots = utils.load_object(settings.binary_location, 'bots.bin')
+    print 'done'
 
+
 if __name__ == '__main__':
     language_code = 'en'
     project = 'wiki'
-    #bot_launcher(language_code, project, single=True)
-    cProfile.run(bot_launcher(language_code, project, single=True), 'profile')
+    #debug_bots_dict()
+    bot_launcher(language_code, project, single=True)
+    #cProfile.run(bot_launcher(language_code, project, single=True), 'profile')
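The manager switch added to read_bots_csv_file() decides whether the bot dictionary is shareable across processes: a multiprocessing.Manager dict is a proxy that worker processes can read and write, while a plain dict lives in one process only. A minimal sketch of the same switch; names and contents are illustrative:

import multiprocessing

def make_bot_dict(manager=False):
    if manager:
        return manager.dict()   # proxy object, usable from worker processes
    return dict()               # ordinary dict, single process only

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    shared = make_bot_dict(manager=mgr)
    local = make_bot_dict()
    shared['en'] = ['ExampleBot']   # hypothetical bot list for one language
    print shared['en']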
