r87102 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:< r87101 | r87102 | r87103 >
Date:21:55, 28 April 2011
Author:diederik
Status:deferred
Tags:
Comment:
Fixed memory leak in querying part with thanks to Giovanni.
Modified paths:
  • /trunk/tools/editor_trends/analyses/analyzer.py (modified) (history)
  • /trunk/tools/editor_trends/classes/analytics.py (modified) (history)
  • /trunk/tools/editor_trends/classes/dataset.py (modified) (history)
  • /trunk/tools/editor_trends/classes/storage.py (modified) (history)

Diff [purge]

Index: trunk/tools/editor_trends/analyses/analyzer.py
@@ -17,13 +17,14 @@
1818 __date__ = '2010-12-10'
1919 __version__ = '0.1'
2020
21 -from multiprocessing import JoinableQueue, Manager, RLock, Process
 21+from multiprocessing import JoinableQueue, Queue, Manager, RLock, Process
2222 from multiprocessing.managers import BaseManager
2323 from Queue import Empty
2424
2525 import types
2626 import sys
2727 import cPickle
 28+import gc
2829 import os
2930 import progressbar
3031 import datetime
@@ -61,14 +62,6 @@
6263 return var
6364
6465
65 -def retrieve_plugin(func):
66 - functions = inventory.available_analyses()
67 - try:
68 - return functions[func]
69 - except KeyError:
70 - return False
71 -
72 -
7366 def feedback(plugin, rts):
7467 print 'Exporting data for chart: %s' % plugin
7568 print 'Project: %s' % rts.dbname
@@ -86,16 +79,26 @@
8780 #log.log_to_mongo(rts, 'chart', 'storing', stopwatch, event='finish')
8881
8982
 83+def retrieve_plugin(func):
 84+ functions = inventory.available_analyses()
 85+ try:
 86+ return functions[func]
 87+ except KeyError:
 88+ return False
 89+
 90+
9091 def generate_chart_data(rts, func, **kwargs):
9192 '''
9293 This is the entry function to be called to generate data for creating
9394 charts.
9495 '''
 96+
9597 stopwatch = timer.Timer()
9698 plugin = retrieve_plugin(func)
97 - available_plugins = inventory.available_analyses()
 99+
98100 if not plugin:
99 - raise exceptions.UnknownPluginError(plugin, available_plugins)
 101+ raise exceptions.UnknownPluginError(plugin, self.available_plugins)
 102+ plugin = getattr(plugin, func)
100103
101104 feedback(func, rts)
102105
@@ -109,7 +112,7 @@
110113
111114 db = storage.init_database(rts.storage, rts.dbname, rts.editors_dataset)
112115 editors = db.retrieve_distinct_keys('editor')
113 - editors = editors[:500]
 116+ #editors = editors[:500]
114117 min_year, max_year = determine_project_year_range(db, 'new_wikipedian')
115118
116119 fmt = kwargs.pop('format', 'long')
@@ -130,35 +133,37 @@
131134 finally:
132135 print 'Finished preloading data.'
133136
134 - plugin = getattr(plugin, func)
135 - for editor in editors:
136 - tasks.put(analytics.Task(plugin, editor))
137137
138 - analyzers = [analytics.Analyzer(rts, tasks, result, var, data) for
 138+ for editor_id in editors:
 139+ #tasks.put({'plugin':func, 'editor_id':editor_id})
 140+ tasks.put(editor_id)
 141+ n = len(editors)
 142+ del editors
 143+
 144+ analyzers = [analytics.Analyzer(rts, tasks, result, var, data, plugin, func) for
139145 x in xrange(rts.number_of_processes)]
140146
141147
142148 for x in xrange(rts.number_of_processes):
143149 tasks.put(None)
144150
145 - pbar = progressbar.ProgressBar(maxval=len(editors)).start()
 151+ pbar = progressbar.ProgressBar(maxval=n).start()
146152 for analyzer in analyzers:
147153 analyzer.start()
148154
149 - editors = None
 155+
150156 ppills = rts.number_of_processes
151 - while True:
152 - while ppills > 0:
153 - try:
154 - res = result.get(block=True)
155 - if res == True:
156 - pbar.update(pbar.currval + 1)
157 - else:
158 - ppills -= 1
159 - var = res
160 - except Empty:
161 - pass
162 - break
 157+ while ppills > 0:
 158+ try:
 159+ res = result.get()
 160+ if res == True:
 161+ pbar.update(pbar.currval + 1)
 162+ else:
 163+ ppills -= 1
 164+ var = res
 165+ print 'ppills: %s' % ppills
 166+ except Empty:
 167+ pass
163168
164169 tasks.join()
165170
Index: trunk/tools/editor_trends/classes/storage.py
@@ -186,13 +186,16 @@
187187 TODO: figure out how big the index is and then take appropriate action,
188188 index < 4mb just do a distinct query, index > 4mb do a map reduce.
189189 '''
 190+ print 'Check if distinct keys have previously been saved...'
190191 if force_new == False and \
191192 file_utils.check_file_exists(settings.binary_location, '%s_%s_%s.bin'
192193 % (self.dbname, self.collection, key)):
 194+ print 'Loading distinct keys from previous session...'
193195 ids = file_utils.load_object(settings.binary_location, '%s_%s_%s.bin'
194196 % (self.dbname, self.collection, key))
195197 else:
196198 #TODO this is a bit arbitrary, should check if index > 4Mb.
 199+ print 'Determining distinct keys...'
197200 if self.db[self.collection].count() < 200000:
198201 ids = self.db[self.collection].distinct(key)
199202 else:
Index: trunk/tools/editor_trends/classes/dataset.py
@@ -17,6 +17,8 @@
1818 __date__ = '2011-01-14'
1919 __version__ = '0.1'
2020
 21+import gc
 22+import random
2123 import calendar
2224 import datetime
2325 import time
@@ -171,7 +173,7 @@
172174 def __init__(self, date, time_unit, id, meta):
173175 assert isinstance(date, datetime.datetime), '''Date variable should be
174176 a datetime.datetime instance.'''
175 - self.date = date
 177+ #self.date = date
176178 self.data = 0
177179 self.time_unit = time_unit
178180 self.t1, self.t0 = self.set_date_range(date)
@@ -278,7 +280,11 @@
279281
280282 def get_observation(self, key, date, meta):
281283 '''Get a single observation based on a date key and possibly meta data'''
282 - return self.obs.get(key, Observation(date, self.time_unit, key, meta))
 284+ if key in self.obs:
 285+ return self.obs.get(key)
 286+ else:
 287+ obs = Observation(date, self.time_unit, key, meta)
 288+ return obs
283289
284290 def add(self, date, value, meta={}):
285291 '''
@@ -509,7 +515,7 @@
510516 variable.max = get_max(data)
511517 variable.num_obs = variable.number_of_obs()
512518 variable.num_dates = len(variable)
513 - variable.first_obs, variable.last_obs = variable.get_date_range()
 519+ #variable.first_obs, variable.last_obs = variable.get_date_range()
514520
515521 def summary(self):
516522 '''
@@ -533,6 +539,7 @@
534540 print self
535541 print self.details()
536542
 543+
537544 def get_standard_deviation(number_list):
538545 '''Given a list of numbers, calculate the standard deviation of the list'''
539546 mean = get_mean(number_list)
@@ -584,8 +591,20 @@
585592 db = storage.init_database('mongo', 'wikilytics', 'enwiki_charts')
586593 #db.add_son_manipulator(Transform())
587594
588 - d1 = datetime.datetime.today()
589 - d2 = datetime.datetime(2007, 6, 7)
 595+ lock = RLock()
 596+ v = Variable('test', 'year', lock, {})
 597+
 598+ for x in xrange(100000):
 599+ year = random.randrange(2005, 2010)
 600+ month = random.randrange(1, 12)
 601+ day = random.randrange(1, 28)
 602+ d = datetime.datetime(year, month, day)
 603+ x = random.randrange(1, 10000)
 604+ v.add(d, x, {'username': 'diederik'})
 605+ gc.collect()
 606+
 607+# d1 = datetime.datetime.today()
 608+# d2 = datetime.datetime(2007, 6, 7)
590609 # ds = Dataset('histogram', rts, [{'name': 'count', 'time_unit': 'year'},
591610 # #{'name': 'testest', 'time_unit': 'year'}
592611 # ])
@@ -599,18 +618,16 @@
600619 #
601620 # ds.encode()
602621 #name, time_unit, lock, **kwargs
603 - lock = RLock()
604 - v = Variable('test', 'year', lock, {})
605 - v.add(d1, 10, {'exp': 3, 'test': 10})
606 - v.add(d1, 135, {'exp': 3, 'test': 10})
607 - v.add(d2, 1, {'exp': 4, 'test': 10})
608 - v.add(d2, 1, {'exp': 4, 'test': 10})
609 - v.add(d2 , 1, {'exp': 3, 'test': 8})
610 - v.add(d2 , 1, {'exp': 2, 'test': 10})
611 - v.add(d2 , 1, {'exp': 4, 'test': 11})
612 - v.add(d2 , 1, {'exp': 8, 'test': 13})
613 - v.add(d2 , 1, {'exp': 9, 'test': 12})
614622
 623+
 624+# v.add(d, 135, {'exp': 3, 'test': 10})
 625+# v.add(d, 1, {'exp': 4, 'test': 10})
 626+# v.add(d, 1, {'exp': 4, 'test': 10})
 627+# v.add(d , 1, {'exp': 3, 'test': 8})
 628+# v.add(d , 1, {'exp': 2, 'test': 10})
 629+# v.add(d , 1, {'exp': 4, 'test': 11})
 630+# v.add(d , 1, {'exp': 8, 'test': 13})
 631+# v.add(d , 1, {'exp': 9, 'test': 12})
615632 #mem = get_refcounts()
616633
617634 # v.add(d2 + timedelta(days=400), 1, {'exp': 4, 'test': 10})
@@ -624,4 +641,5 @@
625642
626643 # mongo.test.insert({'variables': ds})
627644 if __name__ == '__main__':
628 - cProfile.run('debug()')
 645+ #cProfile.run('debug()')
 646+ debug()
Index: trunk/tools/editor_trends/classes/analytics.py
@@ -18,13 +18,17 @@
1919 __version__ = '0.1'
2020
2121 import sys
 22+import gc
2223 from Queue import Empty
 24+from multiprocessing import Process
2325
2426 if '..' not in sys.path:
2527 sys.path.append('..')
2628
2729 from classes import consumers
2830 from classes import storage
 31+from classes import exceptions
 32+from analyses import inventory
2933
3034 class Replicator:
3135 '''
@@ -76,10 +80,11 @@
7781
7882
7983 class Analyzer(consumers.BaseConsumer):
80 - def __init__(self, rts, tasks, result, var, data):
 84+ def __init__(self, rts, tasks, result, var, data, plugin_module, plugin_name):
8185 super(Analyzer, self).__init__(rts, tasks, result)
8286 self.var = var
8387 self.data = data
 88+ self.plugin = getattr(plugin_module, plugin_name)
8489
8590 def run(self):
8691 '''
@@ -87,21 +92,15 @@
8893 project and then calls the plugin that does the actual mapping.
8994 '''
9095 db = storage.init_database(self.rts.storage, self.rts.dbname, self.rts.editors_dataset)
 96+ x = 0
9197 while True:
9298 try:
93 - task = self.tasks.get(block=False)
94 - self.tasks.task_done()
95 - if task == None:
 99+ editor_id = self.tasks.get(block=False)
 100+ if editor_id == None:
96101 self.result.put(self.var)
97102 break
98 - editor = db.find_one('editor', task.editor)
99 -
100 - task.plugin(self.var, editor, dbname=self.rts.dbname, data=self.data)
 103+ editor = db.find_one('editor', editor_id)
 104+ self.plugin(self.var, editor, dbname=self.rts.dbname, data=self.data)
101105 self.result.put(True)
102106 except Empty:
103107 pass
104 -
105 -class Task:
106 - def __init__(self, plugin, editor):
107 - self.plugin = plugin
108 - self.editor = editor