Index: trunk/tools/editor_trends/analyses/analyzer.py |
— | — | @@ -17,13 +17,14 @@ |
18 | 18 | __date__ = '2010-12-10' |
19 | 19 | __version__ = '0.1' |
20 | 20 | |
21 | | -from multiprocessing import JoinableQueue, Manager, RLock, Process |
| 21 | +from multiprocessing import JoinableQueue, Queue, Manager, RLock, Process |
22 | 22 | from multiprocessing.managers import BaseManager |
23 | 23 | from Queue import Empty |
24 | 24 | |
25 | 25 | import types |
26 | 26 | import sys |
27 | 27 | import cPickle |
| 28 | +import gc |
28 | 29 | import os |
29 | 30 | import progressbar |
30 | 31 | import datetime |
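The extra Queue import matters because JoinableQueue and Queue have different contracts: join() on a JoinableQueue only returns after task_done() has been called for every put() item, while a plain Queue does no such bookkeeping. A minimal standalone sketch of that contract (generic code, not from this tree):

    from multiprocessing import JoinableQueue, Process

    def worker(q):
        while True:
            item = q.get()
            try:
                if item is None:  # poison pill
                    break
                # ... process item ...
            finally:
                q.task_done()  # without this, q.join() below blocks forever

    if __name__ == '__main__':
        q = JoinableQueue()
        for item in range(10):
            q.put(item)
        q.put(None)
        p = Process(target=worker, args=(q,))
        p.start()
        q.join()  # returns once every put() item has been marked done
        p.join()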
— | — | @@ -61,14 +62,6 @@ |
62 | 63 | return var |
63 | 64 | |
64 | 65 | |
65 | | -def retrieve_plugin(func): |
66 | | - functions = inventory.available_analyses() |
67 | | - try: |
68 | | - return functions[func] |
69 | | - except KeyError: |
70 | | - return False |
71 | | - |
72 | | - |
73 | 66 | def feedback(plugin, rts): |
74 | 67 | print 'Exporting data for chart: %s' % plugin |
75 | 68 | print 'Project: %s' % rts.dbname |
— | — | @@ -86,16 +79,26 @@ |
87 | 80 | #log.log_to_mongo(rts, 'chart', 'storing', stopwatch, event='finish') |
88 | 81 | |
89 | 82 | |
| 83 | +def retrieve_plugin(func): |
| 84 | + functions = inventory.available_analyses() |
| 85 | + try: |
| 86 | + return functions[func] |
| 87 | + except KeyError: |
| 88 | + return False |
| 89 | + |
| 90 | + |
90 | 91 | def generate_chart_data(rts, func, **kwargs): |
91 | 92 | ''' |
92 | 93 | This is the entry function to be called to generate data for creating |
93 | 94 | charts. |
94 | 95 | ''' |
| 96 | + |
95 | 97 | stopwatch = timer.Timer() |
96 | 98 | plugin = retrieve_plugin(func) |
97 | | - available_plugins = inventory.available_analyses() |
| 99 | + |
98 | 100 | if not plugin: |
99 | | - raise exceptions.UnknownPluginError(plugin, available_plugins) |
| 101 | + raise exceptions.UnknownPluginError(func, inventory.available_analyses()) |
| 102 | + plugin = getattr(plugin, func) |
100 | 103 | |
101 | 104 | feedback(func, rts) |
102 | 105 | |
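Since retrieve_plugin returns False for an unknown name, the error path should hand the caller the requested name together with the valid choices from the inventory; there is no self at module level here. A sketch of the intended lookup (UnknownPluginError's signature inferred from this call site):

    plugin = retrieve_plugin(func)
    if not plugin:
        # report the bad name plus everything that would have been accepted
        raise exceptions.UnknownPluginError(func, inventory.available_analyses())
    plugin = getattr(plugin, func)  # module -> function of the same name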
— | — | @@ -109,7 +112,7 @@ |
110 | 113 | |
111 | 114 | db = storage.init_database(rts.storage, rts.dbname, rts.editors_dataset) |
112 | 115 | editors = db.retrieve_distinct_keys('editor') |
113 | | - editors = editors[:500] |
| 116 | + #editors = editors[:500] |
114 | 117 | min_year, max_year = determine_project_year_range(db, 'new_wikipedian') |
115 | 118 | |
116 | 119 | fmt = kwargs.pop('format', 'long') |
— | — | @@ -130,35 +133,37 @@ |
131 | 134 | finally: |
132 | 135 | print 'Finished preloading data.' |
133 | 136 | |
134 | | - plugin = getattr(plugin, func) |
135 | | - for editor in editors: |
136 | | - tasks.put(analytics.Task(plugin, editor)) |
137 | 137 | |
138 | | - analyzers = [analytics.Analyzer(rts, tasks, result, var, data) for |
| 138 | + for editor_id in editors: |
| 139 | + #tasks.put({'plugin':func, 'editor_id':editor_id}) |
| 140 | + tasks.put(editor_id) |
| 141 | + n = len(editors) |
| 142 | + del editors |
| 143 | + |
| 144 | + analyzers = [analytics.Analyzer(rts, tasks, result, var, data, plugin, func) for |
139 | 145 | x in xrange(rts.number_of_processes)] |
140 | 146 | |
141 | 147 | |
142 | 148 | for x in xrange(rts.number_of_processes): |
143 | 149 | tasks.put(None) |
144 | 150 | |
145 | | - pbar = progressbar.ProgressBar(maxval=len(editors)).start() |
| 151 | + pbar = progressbar.ProgressBar(maxval=n).start() |
146 | 152 | for analyzer in analyzers: |
147 | 153 | analyzer.start() |
148 | 154 | |
149 | | - editors = None |
| 155 | + |
150 | 156 | ppills = rts.number_of_processes |
151 | | - while True: |
152 | | - while ppills > 0: |
153 | | - try: |
154 | | - res = result.get(block=True) |
155 | | - if res == True: |
156 | | - pbar.update(pbar.currval + 1) |
157 | | - else: |
158 | | - ppills -= 1 |
159 | | - var = res |
160 | | - except Empty: |
161 | | - pass |
162 | | - break |
| 157 | + while ppills > 0: |
| 158 | + try: |
| 159 | + res = result.get() |
| 160 | + if res == True: |
| 161 | + pbar.update(pbar.currval + 1) |
| 162 | + else: |
| 163 | + ppills -= 1 |
| 164 | + var = res |
| 165 | + print 'ppills: %s' % ppills |
| 166 | + except Empty: |
| 167 | + pass |
163 | 168 | |
164 | 169 | tasks.join() |
165 | 170 | |
Index: trunk/tools/editor_trends/classes/storage.py |
— | — | @@ -186,13 +186,16 @@ |
187 | 187 | TODO: figure out how big the index is and then take appropriate action, |
188 | 188 | index < 4mb just do a distinct query, index > 4mb do a map reduce. |
189 | 189 | ''' |
| 190 | + print 'Check if distinct keys have previously been saved...' |
190 | 191 | if force_new == False and \ |
191 | 192 | file_utils.check_file_exists(settings.binary_location, '%s_%s_%s.bin' |
192 | 193 | % (self.dbname, self.collection, key)): |
| 194 | + print 'Loading distinct keys from previous session...' |
193 | 195 | ids = file_utils.load_object(settings.binary_location, '%s_%s_%s.bin' |
194 | 196 | % (self.dbname, self.collection, key)) |
195 | 197 | else: |
196 | 198 | #TODO this is a bit arbitrary, should check if index > 4Mb. |
| 199 | + print 'Determining distinct keys...' |
197 | 200 | if self.db[self.collection].count() < 200000: |
198 | 201 | ids = self.db[self.collection].distinct(key) |
199 | 202 | else: |
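The new print statements trace a load-or-compute cache for the distinct editor keys: a binary dump from a previous session is unpickled if present, otherwise the keys are recomputed (and, further down in this method, saved). A generic standard-library sketch of the same pattern, with placeholder names:

    import os
    import cPickle

    def load_or_compute(cache_path, compute, force_new=False):
        '''Return the value cached at cache_path, or compute and cache it.'''
        if not force_new and os.path.exists(cache_path):
            with open(cache_path, 'rb') as fh:
                return cPickle.load(fh)
        value = compute()
        with open(cache_path, 'wb') as fh:
            cPickle.dump(value, fh, cPickle.HIGHEST_PROTOCOL)
        return value

    # e.g. ids = load_or_compute('enwiki_editors_dataset_editor.bin',
    #                            lambda: collection.distinct('editor'))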
Index: trunk/tools/editor_trends/classes/dataset.py |
— | — | @@ -17,6 +17,8 @@ |
18 | 18 | __date__ = '2011-01-14' |
19 | 19 | __version__ = '0.1' |
20 | 20 | |
| 21 | +import gc |
| 22 | +import random |
21 | 23 | import calendar |
22 | 24 | import datetime |
23 | 25 | import time |
— | — | @@ -171,7 +173,7 @@ |
172 | 174 | def __init__(self, date, time_unit, id, meta): |
173 | 175 | assert isinstance(date, datetime.datetime), '''Date variable should be |
174 | 176 | a datetime.datetime instance.''' |
175 | | - self.date = date |
| 177 | + #self.date = date |
176 | 178 | self.data = 0 |
177 | 179 | self.time_unit = time_unit |
178 | 180 | self.t1, self.t0 = self.set_date_range(date) |
— | — | @@ -278,7 +280,11 @@ |
279 | 281 | |
280 | 282 | def get_observation(self, key, date, meta): |
281 | 283 | '''Get a single observation based on a date key and possibly meta data''' |
282 | | - return self.obs.get(key, Observation(date, self.time_unit, key, meta)) |
| 284 | + if key in self.obs: |
| 285 | + return self.obs.get(key) |
| 286 | + else: |
| 287 | + obs = Observation(date, self.time_unit, key, meta) |
| 288 | + return obs |
283 | 289 | |
284 | 290 | def add(self, date, value, meta={}): |
285 | 291 | ''' |
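The get_observation rewrite avoids a real cost of dict.get(key, default): Python evaluates the default expression before the lookup, so the one-liner built a throwaway Observation on every call, hit or miss. A quick demonstration of the difference:

    class Noisy(object):
        instances = 0
        def __init__(self):
            Noisy.instances += 1

    cache = {'k': 'cached'}
    cache.get('k', Noisy())    # key exists, yet Noisy() is still constructed
    print Noisy.instances      # 1

    if 'k' in cache:           # the patched form constructs only on a miss
        value = cache['k']
    else:
        value = Noisy()
    print Noisy.instances      # still 1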
— | — | @@ -509,7 +515,7 @@ |
510 | 516 | variable.max = get_max(data) |
511 | 517 | variable.num_obs = variable.number_of_obs() |
512 | 518 | variable.num_dates = len(variable) |
513 | | - variable.first_obs, variable.last_obs = variable.get_date_range() |
| 519 | + #variable.first_obs, variable.last_obs = variable.get_date_range() |
514 | 520 | |
515 | 521 | def summary(self): |
516 | 522 | ''' |
— | — | @@ -533,6 +539,7 @@ |
534 | 540 | print self |
535 | 541 | print self.details() |
536 | 542 | |
| 543 | + |
537 | 544 | def get_standard_deviation(number_list): |
538 | 545 | '''Given a list of numbers, calculate the standard deviation of the list''' |
539 | 546 | mean = get_mean(number_list) |
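For reference, the population form of the standard deviation the docstring describes is sqrt(sum((x - mean)**2) / n); a standalone version (whether this module divides by n or n - 1 is not visible in the hunk):

    import math

    def standard_deviation(numbers):
        '''Population standard deviation of a list of numbers.'''
        n = len(numbers)
        mean = sum(numbers) / float(n)
        variance = sum((x - mean) ** 2 for x in numbers) / n
        return math.sqrt(variance)

    assert abs(standard_deviation([2, 4, 4, 4, 5, 5, 7, 9]) - 2.0) < 1e-9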
— | — | @@ -584,8 +591,20 @@ |
585 | 592 | db = storage.init_database('mongo', 'wikilytics', 'enwiki_charts') |
586 | 593 | #db.add_son_manipulator(Transform()) |
587 | 594 | |
588 | | - d1 = datetime.datetime.today() |
589 | | - d2 = datetime.datetime(2007, 6, 7) |
| 595 | + lock = RLock() |
| 596 | + v = Variable('test', 'year', lock, {}) |
| 597 | + |
| 598 | + for x in xrange(100000): |
| 599 | + year = random.randrange(2005, 2010) |
| 600 | + month = random.randrange(1, 12) |
| 601 | + day = random.randrange(1, 28) |
| 602 | + d = datetime.datetime(year, month, day) |
| 603 | + x = random.randrange(1, 10000) |
| 604 | + v.add(d, x, {'username': 'diederik'}) |
| 605 | + gc.collect() |
| 606 | + |
| 607 | +# d1 = datetime.datetime.today() |
| 608 | +# d2 = datetime.datetime(2007, 6, 7) |
590 | 609 | # ds = Dataset('histogram', rts, [{'name': 'count', 'time_unit': 'year'}, |
591 | 610 | # #{'name': 'testest', 'time_unit': 'year'} |
592 | 611 | # ]) |
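One caveat in the new stress-test loop: random.randrange excludes its upper bound, so randrange(1, 12) never yields December and randrange(1, 28) never yields day 28 (and the loop variable x is reused as the observation value). If the full ranges are intended, the bounds need a +1:

    import random
    import datetime

    year = random.randrange(2005, 2011)   # 2005..2010 inclusive
    month = random.randrange(1, 13)       # 1..12 inclusive
    day = random.randrange(1, 29)         # 1..28 inclusive, valid in every month
    d = datetime.datetime(year, month, day)
    value = random.randrange(1, 10001)    # avoids shadowing the loop variable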
— | — | @@ -599,18 +618,16 @@ |
600 | 619 | # |
601 | 620 | # ds.encode() |
602 | 621 | #name, time_unit, lock, **kwargs |
603 | | - lock = RLock() |
604 | | - v = Variable('test', 'year', lock, {}) |
605 | | - v.add(d1, 10, {'exp': 3, 'test': 10}) |
606 | | - v.add(d1, 135, {'exp': 3, 'test': 10}) |
607 | | - v.add(d2, 1, {'exp': 4, 'test': 10}) |
608 | | - v.add(d2, 1, {'exp': 4, 'test': 10}) |
609 | | - v.add(d2 , 1, {'exp': 3, 'test': 8}) |
610 | | - v.add(d2 , 1, {'exp': 2, 'test': 10}) |
611 | | - v.add(d2 , 1, {'exp': 4, 'test': 11}) |
612 | | - v.add(d2 , 1, {'exp': 8, 'test': 13}) |
613 | | - v.add(d2 , 1, {'exp': 9, 'test': 12}) |
614 | 622 | |
| 623 | + |
| 624 | +# v.add(d, 135, {'exp': 3, 'test': 10}) |
| 625 | +# v.add(d, 1, {'exp': 4, 'test': 10}) |
| 626 | +# v.add(d, 1, {'exp': 4, 'test': 10}) |
| 627 | +# v.add(d , 1, {'exp': 3, 'test': 8}) |
| 628 | +# v.add(d , 1, {'exp': 2, 'test': 10}) |
| 629 | +# v.add(d , 1, {'exp': 4, 'test': 11}) |
| 630 | +# v.add(d , 1, {'exp': 8, 'test': 13}) |
| 631 | +# v.add(d , 1, {'exp': 9, 'test': 12}) |
615 | 632 | #mem = get_refcounts() |
616 | 633 | |
617 | 634 | # v.add(d2 + timedelta(days=400), 1, {'exp': 4, 'test': 10}) |
— | — | @@ -624,4 +641,5 @@ |
625 | 642 | |
626 | 643 | # mongo.test.insert({'variables': ds}) |
627 | 644 | if __name__ == '__main__': |
628 | | - cProfile.run('debug()') |
| 645 | + #cProfile.run('debug()') |
| 646 | + debug() |
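Instead of swapping the commented cProfile line in and out by hand, both entry points can coexist behind a flag; a small sketch (the --profile switch is illustrative, not an existing option of this tool):

    import sys
    import cProfile

    if __name__ == '__main__':
        if '--profile' in sys.argv:
            cProfile.run('debug()')
        else:
            debug()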
Index: trunk/tools/editor_trends/classes/analytics.py |
— | — | @@ -18,13 +18,17 @@ |
19 | 19 | __version__ = '0.1' |
20 | 20 | |
21 | 21 | import sys |
| 22 | +import gc |
22 | 23 | from Queue import Empty |
| 24 | +from multiprocessing import Process |
23 | 25 | |
24 | 26 | if '..' not in sys.path: |
25 | 27 | sys.path.append('..') |
26 | 28 | |
27 | 29 | from classes import consumers |
28 | 30 | from classes import storage |
| 31 | +from classes import exceptions |
| 32 | +from analyses import inventory |
29 | 33 | |
30 | 34 | class Replicator: |
31 | 35 | ''' |
— | — | @@ -76,10 +80,11 @@ |
77 | 81 | |
78 | 82 | |
79 | 83 | class Analyzer(consumers.BaseConsumer): |
80 | | - def __init__(self, rts, tasks, result, var, data): |
| 84 | + def __init__(self, rts, tasks, result, var, data, plugin_module, plugin_name): |
81 | 85 | super(Analyzer, self).__init__(rts, tasks, result) |
82 | 86 | self.var = var |
83 | 87 | self.data = data |
| 88 | + self.plugin = getattr(plugin_module, plugin_name) |
84 | 89 | |
85 | 90 | def run(self): |
86 | 91 | ''' |
— | — | @@ -87,21 +92,15 @@ |
88 | 93 | project and then calls the plugin that does the actual mapping. |
89 | 94 | ''' |
90 | 95 | db = storage.init_database(self.rts.storage, self.rts.dbname, self.rts.editors_dataset) |
| 96 | + x = 0 |
91 | 97 | while True: |
92 | 98 | try: |
93 | | - task = self.tasks.get(block=False) |
94 | | - self.tasks.task_done() |
95 | | - if task == None: |
| 99 | + editor_id = self.tasks.get(block=False) |
| 100 | + if editor_id == None: |
96 | 101 | self.result.put(self.var) |
97 | 102 | break |
98 | | - editor = db.find_one('editor', task.editor) |
99 | | - |
100 | | - task.plugin(self.var, editor, dbname=self.rts.dbname, data=self.data) |
| 103 | + editor = db.find_one('editor', editor_id) |
| 104 | + self.plugin(self.var, editor, dbname=self.rts.dbname, data=self.data) |
101 | 105 | self.result.put(True) |
102 | 106 | except Empty: |
103 | 107 | pass |
104 | | - |
105 | | -class Task: |
106 | | - def __init__(self, plugin, editor): |
107 | | - self.plugin = plugin |
108 | | - self.editor = editor |
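The removed Task wrapper pickled the plugin function into every queue item; the new constructor ships only the module and the plugin's name, resolves it once per worker with getattr, and queues bare editor ids. A condensed sketch of that shape (BaseConsumer internals assumed; the blocking get is a simplification of the get(block=False)/Empty loop above):

    class Analyzer(consumers.BaseConsumer):
        def __init__(self, rts, tasks, result, var, data,
                     plugin_module, plugin_name):
            super(Analyzer, self).__init__(rts, tasks, result)
            self.var = var
            self.data = data
            # resolve once per process instead of pickling a function per task
            self.plugin = getattr(plugin_module, plugin_name)

        def run(self):
            db = storage.init_database(self.rts.storage, self.rts.dbname,
                                       self.rts.editors_dataset)
            while True:
                editor_id = self.tasks.get()
                if editor_id is None:
                    self.result.put(self.var)  # poison pill: hand back state
                    break
                editor = db.find_one('editor', editor_id)
                self.plugin(self.var, editor, dbname=self.rts.dbname,
                            data=self.data)
                self.result.put(True)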