r86538 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: < r86537 | r86538 | r86539 >
Date: 19:55, 20 April 2011
Author: diederik
Status: deferred
Tags:
Comment:
Use multiprocessing for the transform phase as well.
Modified paths:
  • /trunk/tools/editor_trends/etl/transformer.py (modified) (history)
  • /trunk/tools/editor_trends/manage.py (modified) (history)

Diff [purge]

Index: trunk/tools/editor_trends/manage.py
@@ -24,6 +24,7 @@
2525 import datetime
2626 import ConfigParser
2727 from argparse import ArgumentParser, RawTextHelpFormatter
 28+import cProfile
2829
2930 from classes import languages
3031 from classes import projects
@@ -32,7 +33,6 @@
3334 from utils import ordered_dict
3435 from utils import log
3536 from utils import timer
36 -from classes import storage
3737 from etl import downloader
3838 from etl import extracter
3939 from etl import store
@@ -337,7 +337,7 @@
338338 stopwatch = timer.Timer()
339339 log.to_db(rts, 'dataset', 'transform', stopwatch, event='start')
340340 log.to_csv(logger, rts, 'Start', 'Transform', transformer_launcher)
341 - transformer.transform_editors_single_launcher(rts)
 341+ transformer.transform_editors_multi_launcher(rts)
342342 stopwatch.elapsed()
343343 log.to_db(rts, 'dataset', 'transform', stopwatch, event='finish')
344344 log.to_csv(logger, rts, 'Finish', 'Transform', transformer_launcher)
@@ -363,34 +363,6 @@
364364 log.to_csv(logger, rts, 'Finish', 'Dataset', dataset_launcher)
365365
366366
367 -def cleanup(rts, logger):
368 - '''
369 - This function deletes all files of a previous Wikilytics run.
370 - '''
371 - directories = rts.directories[1:]
372 -
373 - #remove directories
374 - for directory in directories:
375 - file_utils.delete_file(directory, '', directory=True)
376 - log.to_csv(logger, rts,
377 - message='Deleting %s' % directory,
378 - verb='Deleting',
379 - function=cleanup)
380 -
381 - #create directories
382 - rts.verify_environment(directories)
383 - log.to_csv(logger, rts, message='Deleting %s' % directory,
384 - verb='Creating', function=rts.verify_environment)
385 -
386 - #remove binary files
387 - filename = '%s%s' % (rts.full_project, '_editor.bin')
388 - file_utils.delete_file(rts.binary_location, filename)
389 - log.to_csv(logger, rts, message='Deleting %s' % filename,
390 - verb='Deleting',
391 - function=file_utils.delete_file)
392 -
393 -
394 -
395367 def all_launcher(rts, logger):
396368 '''
397369 The entire data processing chain has been called, this will take a
@@ -448,4 +420,5 @@
449421
450422
451423 if __name__ == '__main__':
452 - main()
 424+ #main()
 425+ cProfile.run('main()')
Index: trunk/tools/editor_trends/etl/transformer.py
@@ -34,8 +34,8 @@
3535 A simple class takes care of fetching an editor from the queue and start
3636 processing its edits.
3737 '''
38 - def __init__(self):
39 - super(EditorConsumer, self).__init__()
 38+ def __init__(self, rts, tasks):
 39+ super(EditorConsumer, self).__init__(rts, tasks)
4040
4141 def run(self):
4242 while True:
@@ -294,7 +294,7 @@
295295 def transform_editors_multi_launcher(rts):
296296 tasks = multiprocessing.JoinableQueue()
297297 input_db, output_db, editors = setup_database(rts)
298 - transformers = [EditorConsumer(tasks, None) for i in xrange(rts.number_of_processes)]
 298+ transformers = [EditorConsumer(rts, tasks) for i in xrange(rts.number_of_processes)]
299299
300300 for editor in editors:
301301 tasks.put(Editor(rts.dbname, rts.editors_raw, editor))