r85571 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: < r85570 | r85571 | r85572 >
Date: 20:38, 6 April 2011
Author: diederik
Status: deferred
Tags:
Comment:
Major refactoring of code:
1) Abstracted all database code into a separate database class that is agnostic to the underlying database
2) Simplified directory structure
3) Removed obsolete configuration options
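
In practice, the refactoring replaces the module-level helpers of database/db.py with a storage.Database facade. A minimal before/after sketch, using only names that appear in the diff below (the call site is illustrative):

    # before (r85570): helper functions hard-wired to MongoDB
    from database import db
    editors = db.retrieve_distinct_keys(rts.dbname, rts.editors_dataset, 'editor')

    # after (r85571): one facade object, backend chosen by the first argument
    from classes import storage
    db = storage.Database('mongo', rts.dbname, rts.editors_dataset)
    editors = db.retrieve_distinct_keys('editor')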
Modified paths:
  • /trunk/tools/editor_trends/analyses/adhoc/bot_detector.py (added) (history)
  • /trunk/tools/editor_trends/analyses/adhoc/community_graph.py (modified) (history)
  • /trunk/tools/editor_trends/analyses/analyzer.py (modified) (history)
  • /trunk/tools/editor_trends/analyses/plugins/edit_patterns.py (modified) (history)
  • /trunk/tools/editor_trends/bots/detector.py (deleted) (history)
  • /trunk/tools/editor_trends/classes/analytics.py (modified) (history)
  • /trunk/tools/editor_trends/classes/dataset.py (modified) (history)
  • /trunk/tools/editor_trends/classes/exceptions.py (modified) (history)
  • /trunk/tools/editor_trends/classes/runtime_settings.py (modified) (history)
  • /trunk/tools/editor_trends/classes/settings.py (modified) (history)
  • /trunk/tools/editor_trends/classes/storage.py (added) (history)
  • /trunk/tools/editor_trends/code-snippets/cassandra.py (added) (history)
  • /trunk/tools/editor_trends/code-snippets/cohort_confidence_intervals.py (modified) (history)
  • /trunk/tools/editor_trends/code-snippets/exporter.py (modified) (history)
  • /trunk/tools/editor_trends/code-snippets/mongo.py (added) (history)
  • /trunk/tools/editor_trends/code-snippets/mongodb/store.py (modified) (history)
  • /trunk/tools/editor_trends/cronjobs.py (modified) (history)
  • /trunk/tools/editor_trends/database/cache.py (modified) (history)
  • /trunk/tools/editor_trends/database/cassandra.py (deleted) (history)
  • /trunk/tools/editor_trends/database/db.py (deleted) (history)
  • /trunk/tools/editor_trends/database/launcher.py (modified) (history)
  • /trunk/tools/editor_trends/etl/downloader.py (modified) (history)
  • /trunk/tools/editor_trends/etl/enricher.py (modified) (history)
  • /trunk/tools/editor_trends/etl/store.py (modified) (history)
  • /trunk/tools/editor_trends/etl/transformer.py (modified) (history)
  • /trunk/tools/editor_trends/manage.py (modified) (history)
  • /trunk/tools/editor_trends/utils/compression.py (modified) (history)
  • /trunk/tools/editor_trends/utils/inventory.py (modified) (history)
  • /trunk/tools/editor_trends/utils/log.py (modified) (history)

Diff

Index: trunk/tools/editor_trends/analyses/plugins/edit_patterns.py
@@ -37,11 +37,11 @@
3838 for year in years:
3939 #for year in xrange(new_wikipedian.year, new_wikipedian.year + 2):
4040 obs = [False for x in xrange(13)]
41 - months = edit[year].keys()
42 - for month in xrange(13):
 41+ months = edits[year].keys()
 42+ for month in xrange(1, 13):
4343 count = edits[year].get(month, {}).get('0', 0)
44 - date = datetime(int(year), int(month), 1)
4544 if count >= var.cutoff:
4645 obs[month] = True
 46+ date = datetime(int(year), int(month), 1)
4747 var.add(date, obs)
4848 return var
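
Two defects are fixed in the hunk above: edits was misspelled as edit (a NameError at runtime), and the month loop started at 0, so datetime(int(year), 0, 1) raised ValueError because month keys are 1-based. A self-contained sketch of the corrected loop, with hypothetical data and cutoff:

    from datetime import datetime

    edits = {'2010': {1: {'0': 12}, 3: {'0': 4}}}  # hypothetical per-month counts
    cutoff = 5
    for year in edits:
        obs = [False for x in xrange(13)]  # index 0 is unused padding
        for month in xrange(1, 13):        # months run 1..12
            count = edits[year].get(month, {}).get('0', 0)
            if count >= cutoff:
                obs[month] = True
            date = datetime(int(year), int(month), 1)
        # after the loop, date is December 1 of the year; obs flags active months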
Index: trunk/tools/editor_trends/analyses/analyzer.py
@@ -36,7 +36,7 @@
3737 from classes import consumers
3838 from classes import exceptions
3939 from classes import analytics
40 -from database import db
 40+from classes import storage
4141 from utils import timer
4242 from utils import log
4343
@@ -104,10 +104,10 @@
105105 lock = mgr.RLock()
106106 obs_proxy = mgr.dict(obs)
107107
108 - editors = db.retrieve_distinct_keys(rts.dbname, rts.editors_dataset, 'editor')
109 - min_year, max_year = determine_project_year_range(rts.dbname,
110 - rts.editors_dataset,
111 - 'new_wikipedian')
 108+ db = storage.Database('mongo', rts.dbname, rts.editors_dataset)
 109+ editors = db.retrieve_distinct_keys('editor')
 110+ min_year, max_year = determine_project_year_range(db, 'new_wikipedian')
 111+
112112 fmt = kwargs.pop('format', 'long')
113113 time_unit = kwargs.pop('time_unit', 'year')
114114 kwargs['min_year'] = min_year
@@ -155,19 +155,17 @@
156156 write_output(ds, rts, stopwatch)
157157
158158 ds.summary()
159 - #return True
160159
161160
162 -def determine_project_year_range(dbname, collection, var):
 161+def determine_project_year_range(db, var):
163162 '''
164163 Determine the first and final year for the observed data
165164 '''
166 - print dbname, collection, var
167165 try:
168 - max_year = db.run_query(dbname, collection, var, 'max')
169 - max_year = max_year[var].year + 1
170 - min_year = db.run_query(dbname, collection, var, 'min')
171 - min_year = min_year[var].year
 166+ obs = db.find(var, 'max')
 167+ max_year = obs[var].year + 1
 168+ obs = db.find(var, 'min')
 169+ min_year = obs[var].year
172170 except KeyError:
173171 min_year = 2001
174172 max_year = datetime.datetime.today().year + 1
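
determine_project_year_range now receives a ready database handle instead of dbname/collection strings. Per the Mongo backend in classes/storage.py (added below), db.find(var, 'max') boils down to a sort-and-limit query; a hedged pymongo sketch, with hypothetical database and collection names:

    import pymongo

    coll = pymongo.Connection()['enwiki']['editors_dataset']
    obs = coll.find({'new_wikipedian': {'$ne': False}}) \
              .sort('new_wikipedian', pymongo.DESCENDING).limit(1)[0]
    max_year = obs['new_wikipedian'].year + 1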
Index: trunk/tools/editor_trends/analyses/adhoc/bot_detector.py
@@ -0,0 +1,274 @@
 2+#!/usr/bin/python
 3+# -*- coding: utf-8 -*-
 4+'''
 5+Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
 6+This program is free software; you can redistribute it and/or
 7+modify it under the terms of the GNU General Public License version 2
 8+as published by the Free Software Foundation.
 9+This program is distributed in the hope that it will be useful,
 10+but WITHOUT ANY WARRANTY; without even the implied warranty of
 11+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 12+See the GNU General Public License for more details, at
 13+http://www.fsf.org/licenses/gpl.html
 14+'''
 15+
 16+__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
 17+__email__ = 'dvanliere at gmail dot com'
 18+__date__ = '2010-11-25'
 19+__version__ = '0.1'
 20+
 21+
 22+import os
 23+import cStringIO
 24+import multiprocessing
 25+import xml.etree.cElementTree as cElementTree
 26+import sys
 27+from Queue import Empty
 28+
 29+if '..' not in sys.path:
 30+ sys.path.append('..')
 31+
 32+from classes import settings
 33+settings = settings.Settings()
 34+
 35+
 36+from classes import storage
 37+from utils import file_utils
 38+from utils import messages
 39+
 40+from classes import consumers
 41+from classes import bots
 42+
 43+
 44+def read_bots_csv_file(location, filename, encoding, manager=False):
 45+ '''
 46+ Constructs a dictionary from Bots.csv
 47+ key is language
 48+ value is a list of bot names
 49+ '''
 50+ if manager:
 51+ bot_dict = manager.dict()
 52+ else:
 53+ bot_dict = dict()
 54+ for line in file_utils.read_data_from_csv(location, filename, encoding):
 55+ line = line.strip()
 56+ language, bots = line.split(',')
 57+ bots = bots.split('|')
 58+ for bot in bots:
 59+ if bot not in bot_dict:
 60+ b = botconsumers.Bot(bot)
 61+ b.id = None
 62+ else:
 63+ b = bot_dict[bot]
 64+ b.projects.append(language)
 65+ bot_dict[bot] = b
 66+ return bot_dict
 67+
 68+
 69+def retrieve_bots(language_code):
 70+ '''
 71+ Loader function to retrieve a list of ids of known Wikipedia bots.
 72+ '''
 73+ ids = []
 74+ db = storage.Database('mongo', 'bots', 'ids')
 75+ cursor = db.find()
 76+ for bot in cursor:
 77+ if bot['verified'] == 'True' and language_code in bot['projects']:
 78+ ids.append(bot['name'])
 79+ return ids
 80+
 81+
 82+def store_bots():
 83+ '''
 84+ This function reads the results from the lookup_bot_userid function and
 85+ stores them in a MongoDB collection.
 86+ '''
 87+ keys = ['name', 'verified', 'projects']
 88+ bots = file_utils.create_dict_from_csv_file(settings.csv_location,
 89+ 'bots_ids.csv',
 90+ 'utf-8',
 91+ keys)
 92+ db = storage.Database('mongo', 'wikilytics', 'bots')
 93+ db.drop_collection()
 94+ for id in bots:
 95+ bot = bots[id]
 96+ data = dict([(k, bot[k]) for k in keys])
 97+ data['id'] = id
 98+ db.insert(data)
 99+
 100+ print 'Stored %s bots' % db.count()
 101+
 102+
 103+def convert_object_to_dict(obj, exclude=[]):
 104+ '''
 105+@obj is an arbitrary object whose properties need to be translated to
 106+ keys and values to ease writing to a csv file.
 107+ '''
 108+ d = {}
 109+ for kw in obj.__dict__.keys():
 110+ if kw not in exclude:
 111+ d[kw] = getattr(obj, kw)
 112+ return d
 113+
 114+
 115+def write_bot_list_to_csv(bots, keys):
 116+ fh = file_utils.create_txt_filehandle(settings.csv_location, 'bots_ids.csv',
 117+ 'w', 'utf-8')
 118+ bot_dict = convert_object_to_dict(bots, exclude=['time', 'written'])
 119+ for bot in bot_dict:
 120+ bot = bot_dict[bot]
 121+ file_utils.write_dict_to_csv(bot, fh, keys, write_key=False,
 122+ newline=True)
 123+ fh.close()
 124+
 125+
 126+def lookup_bot_userid(data, fh, bots, keys):
 127+ '''
 128+ This function is used to find the ids belonging to the different bots that
 129+ are patrolling the Wikipedia sites.
 130+ @xml_nodes is a list of xml elements that need to be parsed
 131+ @bots is a dictionary containing the names of the bots to lookup
 132+ '''
 133+ username = data[3]
 134+ if username in bots:
 135+ bot = bots.pop(username)
 136+ setattr(bot, 'id', data[0])
 137+ setattr(bot, 'verified', True)
 138+ bot = convert_object_to_dict(bot, exclude=['time'])
 139+ file_utils.write_dict_to_csv(bot, fh, keys, write_key=False, newline=True)
 140+ return bots
 141+
 142+
 143+def create_bot_validation_dataset(xml_nodes, fh, bots):
 144+ revisions = xml_nodes.findall('revision')
 145+ for revision in revisions:
 146+ contributor = revision.find('contributor')
 147+ #contributor = xml.retrieve_xml_node(revision, 'contributor')
 148+ username = contributor.find('username')
 149+ if username == None or username.text == None:
 150+ continue
 151+ else:
 152+ username = username.text.lower()
 153+
 154+ #print username.encode('utf-8')
 155+ if username.find('bot') > -1 or username.find('script') > -1:
 156+ bot = bots.get(username, botconsumers.Bot(username, verified=False))
 157+ bot.id = contributor.find('id').text
 158+ timestamp = revision.find('timestamp').text
 159+ if timestamp != None:
 160+ timestamp = file_utils.convert_timestamp_to_datetime_naive(timestamp, settings.timestamp_format)
 161+ bot.time[str(timestamp.year)].append(timestamp)
 162+ bots[username] = bot
 163+
 164+ return bots
 165+
 166+ #bot = bots.get('PseudoBot')
 167+ #bot.hours_active()
 168+ #bot.avg_lag_between_edits()
 169+
 170+
 171+def bot_launcher(language_code, project, target, action, single=False, manager=False):
 172+ '''
 173+ This function sets the stage for launching bot id detection and for
 174+ collecting data to discover new bots.
 175+ '''
 176+ file_utils.delete_file(settings.csv_location, 'bots_ids.csv')
 177+ location = os.path.join(settings.input_location, language_code, project)
 178+ input_xml = os.path.join(location, 'chunks')
 179+ input_txt = os.path.join(location, 'txt')
 180+
 181+ tasks = multiprocessing.JoinableQueue()
 182+ mgr = multiprocessing.Manager()
 183+ keys = ['id', 'name', 'verified', 'projects']
 184+
 185+ if action == 'lookup':
 186+ output_file = 'bots_ids.csv'
 187+ files = file_utils.retrieve_file_list(input_txt, 'txt', mask=None)
 188+ input_queue = pc.load_queue(files, poison_pill=True)
 189+ bots = read_bots_csv_file(settings.csv_location, 'Bots.csv', 'utf-8', manager=manager)
 190+ for file in files:
 191+ tasks.put(consumers.TXTFile(file, input_txt, settings.csv_location, output_file, target, bots=bots, keys=keys))
 192+
 193+ else:
 194+ output_file = 'bots_predictionset.csv'
 195+ files = file_utils.retrieve_file_list(input_xml, 'xml', mask=None)
 196+ bots = {}
 197+ for file in files:
 198+ tasks.put(consumers.XMLFile(file, input_xml, settings.csv_location, output_file, target, bots=bots, keys=keys))
 199+
 200+ #lock = mgr.Lock()
 201+ if manager:
 202+ manager = mgr
 203+
 204+ tracker = {}
 205+ if single:
 206+ while True:
 207+ try:
 208+ print '%s files left in the queue...' % messages.show(tasks.qsize)
 209+ task = tasks.get(block=False)
 210+ bots = task(bots)
 211+ except Empty:
 212+ break
 213+ else:
 214+ bot_launcher_multi(tasks)
 215+
 216+ file_utils.store_object(bots, settings.binary_location, 'bots.bin')
 217+ if action == 'lookup':
 218+ store_bots()
 219+ if bots != {}:
 220+ print 'The script was unable to retrieve the user ids for the following %s bots:\n' % len(bots)
 221+ keys = bots.keys()
 222+ for key in keys:
 223+ try:
 224+ print '%s' % key.encode('utf-8')
 225+ except:
 226+ pass
 227+ else:
 228+ bot_training_dataset(bots)
 229+ #write_bot_list_to_csv(bots, keys)
 230+
 231+
 232+def bot_training_dataset(bots):
 233+ fh = file_utils.create_txt_filehandle(settings.csv_location, 'training_bots.csv', 'w', 'utf-8')
 234+ keys = bots.keys()
 235+ for key in keys:
 236+ bot = bots.get(key)
 237+ bot.hours_active()
 238+ bot.avg_lag_between_edits()
 239+ bot.write_training_dataset(fh)
 240+
 241+ fh.close()
 242+
 243+
 244+def bot_launcher_multi(tasks):
 245+ '''
 246+ This is the launcher that uses multiple processes.
 247+ '''
 248+ consumers = [consumers.XMLFileConsumer(tasks, None) for i in xrange(settings.number_of_processes)]
 249+ for x in xrange(settings.number_of_processes):
 250+ tasks.put(None)
 251+
 252+ for w in consumers:
 253+ w.start()
 254+
 255+ tasks.join()
 256+
 257+
 258+def debug_bots_dict():
 259+ bots = file_utils.load_object(settings.binary_location, 'bots.bin')
 260+ for bot in bots:
 261+ bot = bots[bot]
 262+ edits = sum([len(bot.time[t]) for t in bot.time])
 263+ print bot.name, bot.verified, edits
 264+ print 'done'
 265+ return bots
 266+
 267+
 268+if __name__ == '__main__':
 269+ language_code = 'en'
 270+ project = 'wiki'
 271+ #store_bots()
 272+ #bots = debug_bots_dict()
 273+ #write_bot_list_to_csv(bots)
 274+ #language_code, project, lookup_bot_userid, single = False, manager = False
 275+ bot_launcher(language_code, project, create_bot_validation_dataset, action='training', single=True, manager=False)
Property changes on: trunk/tools/editor_trends/analyses/adhoc/bot_detector.py
___________________________________________________________________
Added: svn:eol-style
+ native
Index: trunk/tools/editor_trends/analyses/adhoc/community_graph.py
@@ -23,7 +23,7 @@
2424
2525 from classes import settings
2626 settings = settings.Settings()
27 -from database import db
 27+from classes import storage
2828 from utils import file_utils
2929
3030 try:
@@ -42,8 +42,8 @@
4343
4444
4545 def create_edgelist(project, collection):
46 - ids = db.retrieve_distinct_keys(project, collection, 'editor')
47 - conn = db.init_mongo_db(project)
 46+ db = storage.Database('mongo', project, collection)
 47+ ids = db.retrieve_distinct_keys('editor')
4848 ids.sort()
4949 fh = file_utils.create_txt_filehandle(settings.dataset_location, '%s_edgelist.csv' % project, 'w', 'utf-8')
5050 for i in ids:
Index: trunk/tools/editor_trends/manage.py
@@ -32,7 +32,7 @@
3333 from utils import ordered_dict
3434 from utils import log
3535 from utils import timer
36 -from database import db
 36+from classes import storage
3737 from etl import downloader
3838 from etl import enricher
3939 from etl import store
@@ -62,6 +62,12 @@
6363 pjc = projects.ProjectContainer()
6464 rts = runtime_settings.RunTimeSettings(project, language)
6565
 66+ file_choices = {'meta-full': 'stub-meta-history.xml.gz',
 67+ 'meta-current': 'stub-meta-current.xml.gz',
 68+ 'history-full': 'pages-meta-history.xml.7z',
 69+ 'history-current': 'pages-meta-current.xml.bz2'
 70+ }
 71+
6672 #Init Argument Parser
6773 parser = ArgumentParser(prog='manage', formatter_class=RawTextHelpFormatter)
6874 subparsers = parser.add_subparsers(help='sub - command help')
@@ -131,7 +137,7 @@
132138
133139 #ALL
134140 parser_all = subparsers.add_parser('all',
135 - help='The all sub command runs the download, split, store and dataset \
 141+ help='The all sub command runs the download, extract, store and dataset \
136142 commands.\n\nWARNING: THIS COULD TAKE DAYS DEPENDING ON THE \
137143 CONFIGURATION OF YOUR MACHINE AND THE SIZE OF THE WIKIMEDIA DUMP FILE.')
138144 parser_all.set_defaults(func=all_launcher)
@@ -141,12 +147,6 @@
142148 executing all.',
143149 default=[])
144150
145 - parser_all.add_argument('-n', '--new',
146 - action='store_true',
147 - help='This will delete all previous output and starts from scratch. \
148 - Mostly useful for debugging purposes.',
149 - default=False)
150 -
151151 #DJANGO
152152 parser_django = subparsers.add_parser('django')
153153 parser_django.add_argument('-e', '--except',
@@ -179,23 +179,23 @@
180180 help='Name of MongoDB collection',
181181 default='editors_raw')
182182
183 - parser.add_argument('-o', '--location',
184 - action='store',
185 - help='Indicate where you want to store the downloaded file.',
186 - #default=settings.input_location)
187 - default=rts.input_location)
188183
189184 parser.add_argument('-ns', '--namespace',
190185 action='store',
191186 help='A list of namespaces to include for analysis.',
192187 default='0')
193188
 189+ parser.add_argument('-db', '--database',
 190+ action='store',
 191+ help='Specify the database that you want to use. Valid choices are mongo and cassandra.',
 192+ default='mongo')
 193+
194194 parser.add_argument('-f', '--file',
195195 action='store',
196 - choices=rts.file_choices,
 196+ choices=file_choices,
197197 help='Indicate which dump you want to download. Valid choices are:\n \
198 - %s' % ''.join([f + ',\n' for f in rts.file_choices]),
199 - default='stub-meta-history.xml.gz')
 198+ %s' % ''.join([f + ',\n' for f in file_choices]),
 199+ default=file_choices['meta-full'])
200200
201201 return project, language, parser
202202
@@ -247,7 +247,7 @@
248248 rts.input_location = config.get('file_locations', 'input_location')
249249 rts.output_location = config.get('file_locations', 'output_location')
250250
251 - log.log_to_csv(logger, rts, 'New configuration', 'Creating',
 251+ log.to_csv(logger, rts, 'New configuration', 'Creating',
252252 config_launcher,
253253 working_directory=working_directory,
254254 input_location=input_location,
@@ -262,10 +262,10 @@
263263 '''
264264 print 'Start downloading'
265265 stopwatch = timer.Timer()
266 - log.log_to_mongo(rts, 'dataset', 'download', stopwatch, event='start')
 266+ log.to_db(rts, 'dataset', 'download', stopwatch, event='start')
267267 downloader.launcher(rts, logger)
268268 stopwatch.elapsed()
269 - log.log_to_mongo(rts, 'dataset', 'download', stopwatch, event='finish')
 269+ log.to_db(rts, 'dataset', 'download', stopwatch, event='finish')
270270
271271
272272 def extract_launcher(rts, logger):
@@ -276,12 +276,12 @@
277277 '''
278278 print 'Extracting data from XML'
279279 stopwatch = timer.Timer()
280 - log.log_to_mongo(rts, 'dataset', 'extract', stopwatch, event='start')
281 - log.log_to_csv(logger, rts, 'Start', 'Extract', extract_launcher)
 280+ log.to_db(rts, 'dataset', 'extract', stopwatch, event='start')
 281+ log.to_csv(logger, rts, 'Start', 'Extract', extract_launcher)
282282 enricher.launcher(rts)
283283 stopwatch.elapsed()
284 - log.log_to_mongo(rts, 'dataset', 'extract', stopwatch, event='finish')
285 - log.log_to_csv(logger, rts, 'Finish', 'Extract', extract_launcher)
 284+ log.to_db(rts, 'dataset', 'extract', stopwatch, event='finish')
 285+ log.to_csv(logger, rts, 'Finish', 'Extract', extract_launcher)
286286
287287
288288 def sort_launcher(rts, logger):
@@ -291,12 +291,12 @@
292292 '''
293293 print 'Start sorting data'
294294 stopwatch = timer.Timer()
295 - log.log_to_mongo(rts, 'dataset', 'sort', stopwatch, event='start')
296 - log.log_to_csv(logger, rts, 'Start', 'Sort', sort_launcher)
 295+ log.to_db(rts, 'dataset', 'sort', stopwatch, event='start')
 296+ log.to_csv(logger, rts, 'Start', 'Sort', sort_launcher)
297297 sort.launcher(rts)
298298 stopwatch.elapsed()
299 - log.log_to_mongo(rts, 'dataset', 'sort', stopwatch, event='finish')
300 - log.log_to_csv(logger, rts, 'Finish', 'Sort', sort_launcher)
 299+ log.to_db(rts, 'dataset', 'sort', stopwatch, event='finish')
 300+ log.to_csv(logger, rts, 'Finish', 'Sort', sort_launcher)
301301
302302
303303 def store_launcher(rts, logger):
@@ -306,13 +306,12 @@
307307 '''
308308 print 'Start storing data in MongoDB'
309309 stopwatch = timer.Timer()
310 - log.log_to_mongo(rts, 'dataset', 'store', stopwatch, event='start')
311 - log.log_to_csv(logger, rts, 'Start', 'Store', store_launcher)
312 - db.cleanup_database(rts.dbname, logger)
 310+ log.to_db(rts, 'dataset', 'store', stopwatch, event='start')
 311+ log.to_csv(logger, rts, 'Start', 'Store', store_launcher)
313312 store.launcher(rts)
314313 stopwatch.elapsed()
315 - log.log_to_mongo(rts, 'dataset', 'store', stopwatch, event='finish')
316 - log.log_to_csv(logger, rts, 'Finish', 'Store', store_launcher)
 314+ log.to_db(rts, 'dataset', 'store', stopwatch, event='finish')
 315+ log.to_csv(logger, rts, 'Finish', 'Store', store_launcher)
317316
318317
319318 def transformer_launcher(rts, logger):
@@ -322,13 +321,12 @@
323322 '''
324323 print 'Start transforming dataset'
325324 stopwatch = timer.Timer()
326 - log.log_to_mongo(rts, 'dataset', 'transform', stopwatch, event='start')
327 - log.log_to_csv(logger, rts, 'Start', 'Transform', transformer_launcher)
328 - db.cleanup_database(rts.dbname, logger, 'dataset')
 325+ log.to_db(rts, 'dataset', 'transform', stopwatch, event='start')
 326+ log.to_csv(logger, rts, 'Start', 'Transform', transformer_launcher)
329327 transformer.transform_editors_single_launcher(rts)
330328 stopwatch.elapsed()
331 - log.log_to_mongo(rts, 'dataset', 'transform', stopwatch, event='finish')
332 - log.log_to_csv(logger, rts, 'Finish', 'Transform', transformer_launcher)
 329+ log.to_db(rts, 'dataset', 'transform', stopwatch, event='finish')
 330+ log.to_csv(logger, rts, 'Finish', 'Transform', transformer_launcher)
333331
334332
335333 def dataset_launcher(rts, logger):
@@ -338,17 +336,17 @@
339337 '''
340338 print 'Start generating dataset'
341339 stopwatch = timer.Timer()
342 - log.log_to_mongo(rts, 'dataset', 'export', stopwatch, event='start')
 340+ log.to_db(rts, 'dataset', 'export', stopwatch, event='start')
343341
344342 for chart in rts.charts:
345343 analyzer.generate_chart_data(rts, chart, **rts.keywords)
346 - log.log_to_csv(logger, rts, 'Start', 'Dataset', dataset_launcher,
 344+ log.to_csv(logger, rts, 'Start', 'Dataset', dataset_launcher,
347345 chart=chart,
348346 dbname=rts.dbname,
349347 collection=rts.editors_dataset)
350348 stopwatch.elapsed()
351 - log.log_to_mongo(rts, 'dataset', 'export', stopwatch, event='finish')
352 - log.log_to_csv(logger, rts, 'Finish', 'Dataset', dataset_launcher)
 349+ log.to_db(rts, 'dataset', 'export', stopwatch, event='finish')
 350+ log.to_csv(logger, rts, 'Finish', 'Dataset', dataset_launcher)
353351
354352
355353 def cleanup(rts, logger):
@@ -360,20 +358,20 @@
361359 #remove directories
362360 for directory in directories:
363361 file_utils.delete_file(directory, '', directory=True)
364 - log.log_to_csv(logger, rts,
 362+ log.to_csv(logger, rts,
365363 message='Deleting %s' % directory,
366364 verb='Deleting',
367365 function=cleanup)
368366
369367 #create directories
370368 rts.verify_environment(directories)
371 - log.log_to_csv(logger, rts, message='Deleting %s' % directory,
 369+ log.to_csv(logger, rts, message='Deleting %s' % directory,
372370 verb='Creating', function=rts.verify_environment)
373371
374372 #remove binary files
375373 filename = '%s%s' % (rts.full_project, '_editor.bin')
376374 file_utils.delete_file(rts.binary_location, filename)
377 - log.log_to_csv(logger, rts, message='Deleting %s' % filename,
 375+ log.to_csv(logger, rts, message='Deleting %s' % filename,
378376 verb='Deleting',
379377 function=file_utils.delete_file)
380378
@@ -386,13 +384,9 @@
387385 '''
388386
389387 stopwatch = timer.Timer()
390 - log.log_to_mongo(rts, 'dataset', 'all', stopwatch, event='start')
 388+ log.to_db(rts, 'dataset', 'all', stopwatch, event='start')
391389 print 'Start of building %s %s dataset.' % (rts.language.name, rts.project)
392390
393 - if rts.clean:
394 - print 'Removing previous datasets...'
395 - cleanup(rts, logger)
396 -
397391 functions = ordered_dict.OrderedDict(((downloader_launcher, 'download'),
398392 (extract_launcher, 'extract'),
399393 (sort_launcher, 'sort'),
@@ -408,23 +402,9 @@
409403 elif res == None:
410404 pass
411405 stopwatch.elapsed()
412 - log.log_to_mongo(rts, 'dataset', 'all', stopwatch, event='finish')
 406+ log.to_db(rts, 'dataset', 'all', stopwatch, event='finish')
413407
414408
415 -
416 -def about_statement():
417 - '''
418 - prints generic version information.
419 - '''
420 - print ''
421 - print 'Wikilytics is (c) 2010-2011 by the Wikimedia Foundation.'
422 - print 'Written by Diederik van Liere (dvanliere@gmail.com).'
423 - print '''This software comes with ABSOLUTELY NO WARRANTY. This is free
424 - software, and you are welcome to distribute it under certain conditions.'''
425 - print 'See the README.1ST file for more information.'
426 - print ''
427 -
428 -
429409 def main():
430410 '''
431411 This function initializes the command line parser.
@@ -449,10 +429,6 @@
450430 logger.debug('Chosen language: \t%s' % rts.language)
451431
452432 #start manager
453 - #detect_python_version(logger)
454 - about_statement()
455 - #config.create_configuration(settings, args)
456 -
457433 rts.show_settings()
458434 args.func(rts, logger)
459435
Index: trunk/tools/editor_trends/etl/store.py
@@ -25,7 +25,7 @@
2626 from utils import file_utils
2727 from utils import text_utils
2828 from database import cache
29 -from database import db
 29+from classes import storage
3030 from classes import consumers
3131
3232
@@ -38,10 +38,8 @@
3939 The threshold is currently more than 9 edits and is not yet configurable.
4040 '''
4141 def run(self):
42 - mongo = db.init_mongo_db(self.rts.dbname)
43 - collection = mongo[self.rts.editors_raw]
44 -
45 - editor_cache = cache.EditorCache(collection)
 42+ db = storage.Database('mongo', self.rts.dbname, self.rts.editors_raw)
 43+ editor_cache = cache.EditorCache(db)
4644 prev_editor = -1
4745 while True:
4846 try:
@@ -102,17 +100,12 @@
103101 * category (if any)
104102 * article id
105103 '''
106 - mongo = db.init_mongo_db(rts.dbname)
107 - db.drop_collection(rts.dbname, rts.articles_raw)
108 - collection = mongo[rts.articles_raw]
109 - collection.create_index('id')
110 - collection.create_index('title')
111 - collection.create_index('ns')
112 - collection.create_index('category')
113 - collection.ensure_index('id')
114 - collection.ensure_index('title')
115 - collection.ensure_index('ns')
116 - collection.ensure_index('category')
 104+ db = storage.Database('mongo', rts.dbname, rts.articles_raw)
 105+ db.drop_collection()
 106+ db.add_index('id')
 107+ db.add_index('title')
 108+ db.add_index('ns')
 109+ db.add_index('category')
117110
118111 location = os.path.join(rts.input_location, rts.language.code, rts.project.name, 'txt')
119112 fh = file_utils.create_txt_filehandle(location, 'titles.csv', 'r', 'utf-8')
@@ -128,7 +121,7 @@
129122 data[key] = value
130123 x += 2
131124 y += 2
132 - collection.insert(data)
 125+ db.insert(data)
133126 fh.close()
134127 print 'Done...'
135128
@@ -140,11 +133,9 @@
141134 '''
142135 store_articles(rts)
143136 print 'Input directory is: %s ' % rts.sorted
144 - db.drop_collection(rts.dbname, rts.editors_dataset)
145 - mongo = db.init_mongo_db(rts.dbname)
146 - coll = mongo[rts.editors_raw]
147 - coll.ensure_index('editor')
148 - coll.create_index('editor')
 137+ db = storage.Database('mongo', rts.dbname, rts.editors_raw)
 138+ db.drop_collection()
 139+ db.add_index('editor')
149140
150141 files = file_utils.retrieve_file_list(rts.sorted, 'csv')
151142 pbar = progressbar.ProgressBar(maxval=len(files)).start()
Index: trunk/tools/editor_trends/etl/downloader.py
@@ -117,7 +117,4 @@
118118 w.start()
119119
120120 tasks.join()
121 -# for consumer in consumers:
122 -# if consumer.exitcode != 0:
123 -# result = False
124121
Index: trunk/tools/editor_trends/etl/enricher.py
@@ -32,13 +32,7 @@
3333 if '..' not in sys.path:
3434 sys.path.append('..')
3535
36 -try:
37 - from database import cassandra
38 - import pycassa
39 -except ImportError:
40 - pass
41 -
42 -from database import db
 36+from classes import storage
4337 from bots import detector
4438 from utils import file_utils
4539
@@ -759,14 +753,14 @@
760754 def multiprocessor_launcher(function, dataset, storage, locks, rts):
761755 input_queue = JoinableQueue()
762756
763 - files = file_utils.retrieve_file_list(rts.location)
 757+ files = file_utils.retrieve_file_list(rts.input_location)
764758 if len(files) > cpu_count():
765759 processors = cpu_count() - 1
766760 else:
767761 processors = len(files)
768762
769763 for filename in files:
770 - filename = os.path.join(rts.location, filename)
 764+ filename = os.path.join(rts.input_location, filename)
771765 print filename
772766 input_queue.put(filename)
773767
@@ -818,7 +812,6 @@
819813 This is the generic entry point for regular Wikilytics usage.
820814 '''
821815 # launcher for creating regular mongo dataset
822 - path = rts.location
823816 function = create_variables
824817 storage = 'csv'
825818 dataset = 'training'
Index: trunk/tools/editor_trends/etl/transformer.py
@@ -17,28 +17,20 @@
1818 __date__ = '2010-11-02'
1919 __version__ = '0.1'
2020
21 -import progressbar
 21+import sys
 22+import datetime
2223 import multiprocessing
2324 from Queue import Empty
2425 from operator import itemgetter
25 -import datetime
26 -import sys
2726 from copy import deepcopy
2827
29 -from database import db
 28+import progressbar
 29+from classes import storage
3030 from utils import file_utils
3131 from utils import messages
3232 from utils import data_converter
3333 from classes import consumers
3434
35 -
36 -try:
37 - import psyco
38 - psyco.full()
39 -except ImportError:
40 - pass
41 -
42 -
4335 class EditorConsumer(consumers.BaseConsumer):
4436
4537 def run(self):
@@ -52,10 +44,10 @@
5345
5446
5547 class Editor(object):
56 - def __init__(self, id, input_db, output_db, **kwargs):
 48+ def __init__(self, id, db_raw, db_dataset, **kwargs):
5749 self.id = id
58 - self.input_db = input_db
59 - self.output_db = output_db
 50+ self.db_raw = db_raw
 51+ self.db_dataset = db_dataset
6052 for kw in kwargs:
6153 setattr(self, kw, kwargs[kw])
6254
@@ -64,7 +56,7 @@
6557
6658 def __call__(self):
6759 cutoff = 9
68 - editor = self.input_db.find_one({'editor': self.id})
 60+ editor = self.db_raw.find_one('editor', self.id)
6961 if editor == None:
7062 return
7163 edits = editor['edits']
@@ -75,7 +67,6 @@
7668 last_edit_by_year = determine_last_edit_by_year(edits, first_year, final_year)
7769 articles_edited = determine_articles_workedon(edits, first_year, final_year)
7870 article_count = determine_article_count(articles_edited, first_year, final_year)
79 - articles_edited = db.stringify_keys(articles_edited)
8071
8172 namespaces_edited = determine_namespaces_workedon(edits, first_year, final_year)
8273 character_count = determine_edit_volume(edits, first_year, final_year)
@@ -90,7 +81,6 @@
9182 totals = calculate_totals(totals, counts, revert_count, 'revert_count')
9283 totals = calculate_totals(totals, counts, article_count, 'article_count')
9384 totals = calculate_totals(totals, counts, edit_count, 'edit_count')
94 - totals = db.stringify_keys(totals)
9585
9686 if len(edits) > cutoff:
9787 new_wikipedian = edits[cutoff]['date']
@@ -100,24 +90,23 @@
10191 first_edit = edits[0]['date']
10292 final_edit = edits[-1]['date']
10393
104 - self.output_db.insert({'editor': self.id,
105 - 'username': username,
106 - 'new_wikipedian': new_wikipedian,
107 - 'cum_edit_count': cum_edit_count,
108 - 'final_edit': final_edit,
109 - 'first_edit': first_edit,
110 - 'last_edit_by_year': last_edit_by_year,
111 - 'articles_edited': articles_edited,
112 - 'edit_count': edit_count,
113 - 'namespaces_edited': namespaces_edited,
114 - 'article_count': article_count,
115 - 'character_count': character_count,
116 - 'revert_count': revert_count,
117 - 'totals': totals,
118 - },
119 - safe=True)
 94+ data = {'editor': self.id,
 95+ 'username': username,
 96+ 'new_wikipedian': new_wikipedian,
 97+ 'cum_edit_count': cum_edit_count,
 98+ 'final_edit': final_edit,
 99+ 'first_edit': first_edit,
 100+ 'last_edit_by_year': last_edit_by_year,
 101+ 'articles_edited': articles_edited,
 102+ 'edit_count': edit_count,
 103+ 'namespaces_edited': namespaces_edited,
 104+ 'article_count': article_count,
 105+ 'character_count': character_count,
 106+ 'revert_count': revert_count,
 107+ 'totals': totals,
 108+ }
 109+ self.db_dataset.insert(data)
120110
121 -
122111 def cleanup_datacontainer(dc, variable_type):
123112 '''
124113 valid variable_type are either a {}, a [] or 0.
@@ -159,7 +148,6 @@
160149 dc[year][month].setdefault(ns, 0)
161150 dc[year][month][ns] += 1
162151 dc = cleanup_datacontainer(dc, {})
163 - dc = db.stringify_keys(dc)
164152 return dc
165153
166154
@@ -192,7 +180,6 @@
193181 for month in dc[year]:
194182 dc[year][month] = list(dc[year][month])
195183 dc = cleanup_datacontainer(dc, [])
196 - dc = db.stringify_keys(dc)
197184 return dc
198185
199186
@@ -207,7 +194,6 @@
208195 dc[year][month].setdefault(ns, 0)
209196 dc[year][month][ns] += 1
210197 dc = cleanup_datacontainer(dc, {})
211 - dc = db.stringify_keys(dc)
212198 return dc
213199
214200
@@ -225,13 +211,11 @@
226212 dc[year][month].setdefault(ns, {})
227213 dc[year][month][ns].setdefault('added', 0)
228214 dc[year][month][ns].setdefault('removed', 0)
229 - print edit
230215 if edit['delta'] < 0:
231216 dc[year][month][ns]['removed'] += edit['delta']
232217 elif edit['delta'] > 0:
233218 dc[year][month][ns]['added'] += edit['delta']
234219 dc = cleanup_datacontainer(dc, {})
235 - dc = db.stringify_keys(dc)
236220 return dc
237221
238222
@@ -251,7 +235,6 @@
252236 dc[date] = edit
253237 elif dc[date] < edit:
254238 dc[date] = edit
255 - dc = db.stringify_keys(dc)
256239 return dc
257240
258241
@@ -266,7 +249,6 @@
267250 for month in articles_edited[year]:
268251 for ns in articles_edited[year][month]:
269252 dc[year][month][ns] = len(articles_edited[year][month][ns])
270 - dc = db.stringify_keys(dc)
271253 return dc
272254
273255
@@ -276,7 +258,6 @@
277259
278260
279261 def transform_editors_multi_launcher(rts):
280 - ids = db.retrieve_distinct_keys(rts.dbname, rts.editors_raw, 'editor')
281262 tasks = multiprocessing.JoinableQueue()
282263 consumers = [EditorConsumer(tasks, None) for i in xrange(rts.number_of_processes)]
283264
@@ -293,23 +274,19 @@
294275
295276
296277 def setup_database(rts):
297 - mongo = db.init_mongo_db(rts.dbname)
298 - input_db = mongo[rts.editors_raw]
299 - db.drop_collection(rts.dbname, rts.editors_dataset)
300 - output_db = mongo[rts.editors_dataset]
 278+ db_raw = storage.Database('mongo', rts.dbname, rts.editors_raw)
 279+ db_dataset = storage.Database('mongo', rts.dbname, rts.editors_dataset)
 280+ db_dataset.drop_collection()
 281+ ids = db_dataset.retrieve_distinct_keys('editor')
 282+ db_dataset.add_index('editor')
 283+ db_dataset.add_index('new_wikipedian')
301284
302 - output_db.ensure_index('editor')
303 - output_db.create_index('editor')
304 - output_db.ensure_index('new_wikipedian')
305 - output_db.create_index('new_wikipedian')
306 - return input_db, output_db
 285+ return db_raw, db_dataset, ids
307286
308287
309288 def transform_editors_single_launcher(rts):
310289 print rts.dbname, rts.editors_raw
311 - ids = db.retrieve_distinct_keys(rts.dbname, rts.editors_raw, 'editor')
312 - print len(ids)
313 - input_db, output_db = setup_database(rts)
 290+ input_db, output_db, ids = setup_database(rts)
314291 pbar = progressbar.ProgressBar(maxval=len(ids)).start()
315292 for x, id in enumerate(ids):
316293 editor = Editor(id, input_db, output_db)
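
A pattern visible throughout this file: the scattered db.stringify_keys calls disappear because the Mongo backend now stringifies keys on every insert (BSON documents only accept string keys). The helper, as defined in classes/storage.py below (shown here as a standalone function), plus a toy example:

    def stringify_keys(data):
        '''Recursively convert dictionary keys to str, as BSON requires.'''
        new_data = {}
        for key, value in data.iteritems():
            if isinstance(value, dict):
                new_data[str(key)] = stringify_keys(value)
            else:
                new_data[str(key)] = value
        return new_data

    print stringify_keys({2010: {1: 5}})  # {'2010': {'1': 5}}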
Index: trunk/tools/editor_trends/classes/settings.py
@@ -97,13 +97,6 @@
9898 self.binary_location = os.path.join(self.working_directory,
9999 'data', 'objects')
100100
101 - self.chart_location = os.path.join(self.working_directory, 'statistics',
102 - 'charts')
103 - self.file_choices = ('stub-meta-history.xml.gz',
104 - 'stub-meta-current.xml.gz',
105 - 'pages-meta-history.xml.7z',
106 - 'pages-meta-current.xml.bz2',)
107 -
108101 def load_configuration(self):
109102 if os.path.exists(os.path.join(self.working_directory, 'wiki.cfg')):
110103 config = ConfigParser.RawConfigParser()
@@ -117,7 +110,6 @@
118111 else:
119112 return False
120113
121 -
122114 def determine_working_directory(self):
123115 cwd = os.getcwd()
124116 slashes = cwd.count(os.sep)
Index: trunk/tools/editor_trends/classes/exceptions.py
@@ -71,14 +71,12 @@
7272 return 'There is no JSON encoder called %s, please make sure that you \
7373 entered the right name' % self.func
7474
75 -class UnknownChartError(Error):
76 - def __init__(self, chart, charts):
77 - self.chart = chart
78 - self.charts = charts
 75+class NoDatabaseProviderInstalled(Error):
 76+ def __init__(self):
 77+ pass
7978
80 - def __str__(self):
81 - return 'Currently, chart type %s is not supported. Please choose one of \
82 - the following charts: %s' % (self.chart, self.charts)
 79+ def __str__(self):
 80+ return 'You need to install either MongoDB or Cassandra to use Wikilytics.'
8381
8482 class UnknownPluginError(Error):
8583 def __init__(self, plugin, plugins):
Index: trunk/tools/editor_trends/classes/runtime_settings.py
@@ -66,7 +66,6 @@
6767 self.function = self.get_value('func')
6868
6969 self.ignore = self.get_value('except')
70 - self.clean = self.get_value('new')
7170 self.force = self.get_value('force')
7271 self.location = self.get_project_location()
7372 self.filename = self.generate_wikidump_filename()
@@ -92,8 +91,8 @@
9392 self.verify_environment(self.directories)
9493
9594 def __str__(self):
96 - return 'Runtime Settings for project %s%s' % (self.language.name,
97 - self.project.name)
 95+ return 'Runtime Settings for project %s %s' % (self.language.name,
 96+ self.project.full_name)
9897
9998 def __iter__(self):
10099 for item in self.__dict__:
@@ -144,12 +143,8 @@
145144 '''
146145 Construct the full project location
147146 '''
148 - if self.kaggle:
149 - return os.path.join(self.input_location, self.language.code,
 147+ return os.path.join(self.output_location, self.language.code,
150148 self.project.name)
151 - else:
152 - return os.path.join(self.input_location, self.language.code,
153 - self.project.name)
154149
155150 def show_settings(self):
156151 '''
@@ -164,6 +159,15 @@
165160 about['Output directory'] = '%s and subdirectories' % self.location
166161
167162 max_length_key = max([len(key) for key in about.keys()])
 163+
 164+ print ''
 165+ print 'Wikilytics is (c) 2010-2011 by the Wikimedia Foundation.'
 166+ print 'Written by Diederik van Liere (dvanliere@gmail.com).'
 167+ print '''This software comes with ABSOLUTELY NO WARRANTY. This is free
 168+ software, and you are welcome to distribute it under certain conditions.'''
 169+ print 'See the README.1ST file for more information.'
 170+ print '\nPlatform: %s' % self.platform
 171+
168172 print 'Final settings after parsing command line arguments:'
169173 for ab in about:
170174 print '%s: %s' % (ab.rjust(max_length_key), about[ab].encode('utf-8'))
Index: trunk/tools/editor_trends/classes/storage.py
@@ -0,0 +1,243 @@
 2+#!/usr/bin/python
 3+# -*- coding: utf-8 -*-
 4+'''
 5+Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
 6+This program is free software; you can redistribute it and/or
 7+modify it under the terms of the GNU General Public License version 2
 8+as published by the Free Software Foundation.
 9+This program is distributed in the hope that it will be useful,
 10+but WITHOUT ANY WARRANTY; without even the implied warranty of
 11+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 12+See the GNU General Public License for more details, at
 13+http://www.fsf.org/licenses/gpl.html
 14+'''
 15+
 16+__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)'])
 17+__author__email = 'dvanliere at gmail dot com'
 18+__date__ = '2011-04-05'
 19+__version__ = '0.1'
 20+
 21+import sys
 22+from abc import ABCMeta, abstractmethod
 23+if '..' not in sys.path:
 24+ sys.path.append('..')
 25+
 26+from classes import settings
 27+from classes import exceptions
 28+settings = settings.Settings()
 29+import file_utils
 30+
 31+error = 0
 32+try:
 33+ import pymongo
 34+ import bson
 35+ from bson.code import Code
 36+ from pymongo.errors import OperationFailure
 37+except ImportError:
 38+ error += 1
 39+try:
 40+ import pycassa
 41+except ImportError:
 42+ error += 1
 43+if error == 2:
 44+ raise exceptions.NoDatabaseProviderInstalled()
 45+
 46+
 47+class AbstractDatabase:
 48+ __metaclass__ = ABCMeta
 49+
 50+ def __init__(self, dbname, collection):
 51+ self.dbname = dbname
 52+ self.collection = collection
 53+ self.db = self.connect()
 54+
 55+ def __repr__(self):
 56+ return '%s:%s' % (self.dbname, self.collection)
 57+
 58+ @abstractmethod
 59+ def connect(self):
 60+ '''Initializes a connection to a specific collection in a database'''
 61+
 62+ @abstractmethod
 63+ def drop_collection(self):
 64+ '''Deletes a collection from a database'''
 65+
 66+ @abstractmethod
 67+ def add_index(self, key):
 68+ '''Add index to a collection'''
 69+
 70+ @abstractmethod
 71+ def insert(self, data):
 72+ '''Insert observation to a collection'''
 73+
 74+ @abstractmethod
 75+ def update(self, key, data):
 76+ '''Update an observation in a collection'''
 77+
 78+ @abstractmethod
 79+ def find(self, key, qualifier=None):
 80+ '''Find an observation in a collection'''
 81+
 82+ @abstractmethod
 83+ def save(self, data):
 84+ '''Saves an observation and returns the id from the db'''
 85+
 86+ @abstractmethod
 87+ def count(self):
 88+ '''Counts the number of observations in a collection'''
 89+
 90+class Mongo(AbstractDatabase):
 91+ @classmethod
 92+ def is_registrar_for(cls, storage):
 93+ return storage == 'mongo'
 94+
 95+ def connect(self):
 96+ db = pymongo.Connection()
 97+ return db[self.dbname]
 98+
 99+ def save(self, data):
 100+ assert isinstance(data, dict), 'You need to feed me dictionaries.'
 101+ return self.db[self.collection].save(data)
 102+
 103+ def insert(self, data, qualifiers=None):
 104+ assert isinstance(data, dict), 'You need to feed me dictionaries.'
 105+ data = self.stringify_keys(data)
 106+ try:
 107+ if qualifiers:
 108+ self.db[self.collection].insert(data, qualifiers, safe=True)
 109+ else:
 110+ self.db[self.collection].insert(data, safe=True)
 111+ except bson.errors.InvalidDocument, error:
 112+ print error
 113+ print 'BSON document too large, unable to store %s' % \
 114+ (data.keys()[0])
 115+ except OperationFailure, error:
 116+ print 'It seems that you are running out of disk space. \
 117+ Error message: %s' % error
 118+ sys.exit(-1)
 119+
 120+ def update(self, key, value, data):
 121+ assert isinstance(data, dict), 'You need to feed me dictionaries.'
 122+ self.db[self.collection].update({key: value}, data, upsert=True)
 123+
 124+ def find(self, key=None, qualifier=None):
 125+ if qualifier == 'min':
 126+ return self.db[self.collection].find({
 127+ key : {'$ne' : False}}).sort(key, pymongo.ASCENDING).limit(1)[0]
 128+ elif qualifier == 'max':
 129+ return self.db[self.collection].find({
 130+ key : {'$ne' : False}}).sort(key, pymongo.DESCENDING).limit(1)[0]
 131+ elif key != None:
 132+ return self.db[self.collection].find({key: 1})
 133+ else:
 134+ return self.db[self.collection].find()
 135+
 136+ def find_one(self, key, value):
 137+ return self.db[self.collection].find_one({key: value})
 138+
 139+ def drop_collection(self):
 140+ self.db.drop_collection(self.collection)
 141+
 142+ def add_index(self, key):
 143+ self.db[self.collection].create_index(key)
 144+ self.db[self.collection].ensure_index(key)
 145+
 146+ def count(self):
 147+ return self.db[self.collection].count()
 148+
 149+ def retrieve_distinct_keys(self, key, force_new=False):
 150+ '''
 151+ TODO: figure out how big the index is and then take appropriate action,
 152+ index < 4mb just do a distinct query, index > 4mb do a map reduce.
 153+ '''
 154+ if force_new == False and \
 155+ file_utils.check_file_exists(settings.binary_location, '%s_%s_%s.bin'
 156+ % (self.dbname, self.collection, key)):
 157+ ids = file_utils.load_object(settings.binary_location, '%s_%s_%s.bin'
 158+ % (self.dbname, self.collection, key))
 159+ else:
 160+ #TODO this is a bit arbitrary, should check if index > 4Mb.
 161+ if self.db[self.collection].count() < 200000:
 162+ ids = self.db[self.collection].distinct(key)
 163+ else:
 164+ ids = self.retrieve_distinct_keys_mapreduce(key)
 165+ file_utils.store_object(ids, settings.binary_location, \
 166+ '%s_%s_%s.bin' % (self.dbname,
 167+ self.collection, key))
 168+ return ids
 169+
 170+ def retrieve_distinct_keys_mapreduce(self, key):
 171+ emit = 'function () { emit(this.%s, 1)};' % key
 172+ map = Code(emit)
 173+ reduce = Code("function()")
 174+
 175+ ids = []
 176+ cursor = self.db[self.collection].map_reduce(map, reduce)
 177+ for c in cursor.find():
 178+ ids.append(c['_id'])
 179+ return ids
 180+
 181+ def stringify_keys(self, data):
 182+ '''
 183+ @data should be a dictionary where the keys are not yet strings. This
 184+ function is called just prior to any insert / update query in mongo
 185+ because mongo only accepts strings as keys.
 186+ '''
 187+ new_data = {}
 188+ for key, value in data.iteritems():
 189+ if isinstance(value, dict):
 190+ new_data[str(key)] = self.stringify_keys(value)
 191+ else:
 192+ new_data[str(key)] = value
 193+ return new_data
 194+
 195+ def start_server(self, port, path):
 196+ default_port = 27017
 197+ port = default_port + port
 198+ if settings.platform == 'Windows':
 199+ p = subprocess.Popen([path, '--port %s', port, '--dbpath',
 200+ 'c:\data\db', '--logpath', 'c:\mongodb\logs'])
 201+ elif settings.platform == 'Linux':
 202+ subprocess.Popen([path, '--port %s' % port])
 203+ elif settings.platform == 'OSX':
 204+ raise exceptions.NotImplementedError
 205+ else:
 206+ raise exceptions.PlatformNotSupportedError(platform)
 207+
 208+
 209+class Cassandra(AbstractDatabase):
 210+ @classmethod
 211+ def is_registrar_for(cls, storage):
 212+ return storage == 'cassandra'
 213+
 214+ def install_schema(self, drop_first=False):
 215+ sm = pycassa.system_manager.SystemManager('127.0.0.1:9160')
 216+ if drop_first:
 217+ sm.drop_keyspace(keyspace_name)
 218+
 219+ sm.create_keyspace(keyspace_name, replication_factor=1)
 220+
 221+ sm.create_column_family(self.dbname, self.collection,
 222+ comparator_type=pycassa.system_manager.UTF8_TYPE,
 223+ default_validation_class=pycassa.system_manager.UTF8_TYPE)
 224+
 225+ sm.create_index(self.dbname, self.collection, 'article', pycassa.system_manager.UTF8_TYPE)
 226+ sm.create_index(self.dbname, self.collection, 'username', pycassa.system_manager.UTF8_TYPE)
 227+ sm.create_index(self.dbname, self.collection, 'user_id', pycassa.system_manager.LONG_TYPE)
 228+
 229+
 230+
 231+def Database(storage, dbname, collection):
 232+ for cls in AbstractDatabase.__subclasses__():
 233+ if cls.is_registrar_for(storage):
 234+ return cls(dbname, collection)
 235+ raise ValueError
 236+
 237+
 238+if __name__ == '__main__':
 239+ db = Database('mongo', 'wikilytics', 'zhwiki_editors_raw')
 240+ ids = db.retrieve_distinct_keys('editor', force_new=True)
 241+ #db.insert({'foo':'bar'})
 242+ #db.update('foo', 'bar', {})
 243+ #db.drop_collection()
 244+ print db
Property changes on: trunk/tools/editor_trends/classes/storage.py
___________________________________________________________________
Added: svn:eol-style
+ native
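
The Database() function at the bottom of storage.py is a small registry-based factory: it walks AbstractDatabase.__subclasses__() and asks each backend whether it handles the requested storage string via is_registrar_for. A standalone toy sketch of the pattern (Base, Foo and make are illustrative names, not part of the codebase):

    class Base(object):
        pass

    class Foo(Base):
        @classmethod
        def is_registrar_for(cls, name):
            return name == 'foo'

    def make(name):
        for cls in Base.__subclasses__():
            if cls.is_registrar_for(name):
                return cls()
        raise ValueError('no provider registered for %s' % name)

    print make('foo')  # <__main__.Foo object at 0x...>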
Index: trunk/tools/editor_trends/classes/dataset.py
@@ -39,7 +39,7 @@
4040
4141 from utils import file_utils
4242 from utils import data_converter
43 -from database import db
 43+from classes import storage
4444 from analyses import json_encoders
4545 from classes import exceptions
4646
@@ -445,12 +445,11 @@
446446
447447 def to_mongo(self):
448448 dbname = '%s%s' % (self.language_code, self.project)
449 - mongo = db.init_mongo_db(dbname)
450 - coll = mongo['%s_%s' % (dbname, 'charts')]
451 - mongo.add_son_manipulator(Transform())
452 - coll.remove({'hash':self.hash, 'project':self.project,
 449+ db = storage.Database('mongo', dbname, 'charts')
 450+ db.add_son_manipulator(Transform())
 451+ db.remove({'hash':self.hash, 'project':self.project,
453452 'language_code':self.language_code})
454 - coll.insert({'variables': self})
 453+ db.insert({'variables': self})
455454
456455 def to_csv(self):
457456 data = data_converter.convert_dataset_to_lists(self, 'manage')
@@ -542,8 +541,7 @@
543542
544543
545544 def debug():
546 - mongo = db.init_mongo_db('enwiki')
547 - rawdata = mongo['enwiki_charts']
 545+ db = storage.Database('mongo', 'wikilytics', 'enwiki_charts')
548546 mongo.add_son_manipulator(Transform())
549547
550548 d1 = datetime.datetime.today()
Index: trunk/tools/editor_trends/classes/analytics.py
@@ -24,7 +24,7 @@
2525 sys.path.append('..')
2626
2727 from classes import consumers
28 -from database import db
 28+from classes import storage
2929
3030 class Replicator:
3131 def __init__(self, plugin, time_unit, cutoff=None, cum_cutoff=None, **kwargs):
@@ -56,13 +56,9 @@
5757
5858 def __call__(self):
5959 project = 'wiki'
60 -
6160 #rts = runtime_settings.init_environment('wiki', 'en', args)
62 -
6361 for lang in self.languages:
6462 self.rts = runtime_settings.init_environment(project, lang, self.args)
65 - #TEMP FIX, REMOVE
66 - #rts.dbname = 'enwiki'
6763 self.rts.editors_dataset = 'editors_dataset'
6864
6965 self.rts.dbname = '%s%s' % (lang, project)
@@ -70,7 +66,8 @@
7167 for cutoff in self.cutoff:
7268 generate_chart_data(self.rts, self.plugin,
7369 time_unit=self.time_unit,
74 - cutoff=cutoff, cum_cutoff=cum_cutoff,
 70+ cutoff=cutoff,
 71+ cum_cutoff=cum_cutoff,
7572 **self.kwargs)
7673
7774
@@ -84,8 +81,7 @@
8582 Generic loop function that loops over all the editors of a Wikipedia
8683 project and then calls the plugin that does the actual mapping.
8784 '''
88 - mongo = db.init_mongo_db(self.rts.dbname)
89 - coll = mongo[self.rts.editors_dataset]
 85+ db = storage.Database('mongo', self.rts.dbname, self.rts.editors_dataset)
9086 while True:
9187 try:
9288 task = self.tasks.get(block=False)
@@ -93,7 +89,7 @@
9490 if task == None:
9591 self.result.put(self.var)
9692 break
97 - editor = coll.find_one({'editor': task.editor})
 93+ editor = db.find_one('editor', task.editor)
9894
9995 task.plugin(self.var, editor, dbname=self.rts.dbname)
10096 self.result.put(True)
Index: trunk/tools/editor_trends/utils/inventory.py
@@ -27,7 +27,7 @@
2828 from classes import settings
2929 settings = settings.Settings()
3030
31 -from database import db
 31+from classes import storage
3232 from utils import http_utils
3333 from classes import runtime_settings
3434 from classes import languages
@@ -65,11 +65,9 @@
6666
6767
6868 def store_available_dumps(self):
69 - mongo = db.init_mongo_db('wikilytics')
70 - coll = mongo['available_dumps']
 69+ db = storage.Database('mongo', 'wikilytics', 'available_dumps')
 70+ db.save({'project': self.project, 'dumps': self.data})
7171
72 - coll.save({'project': self.project, 'dumps': self.data})
73 -
7472 def run(self):
7573 project = self.props.projects[self.project]
7674 langs = self.props.project_supports_language(project)
Index: trunk/tools/editor_trends/utils/compression.py
@@ -136,11 +136,11 @@
137137 '''
138138 print 'Unzipping zip file'
139139 stopwatch = timer.Timer()
140 - log.log_to_mongo(properties, 'dataset', 'unpack', stopwatch, event='start')
 140+ log.to_db(properties, 'dataset', 'unpack', stopwatch, event='start')
141141 compressor = Compressor(location, filename)
142142 retcode = compressor.extract()
143143 stopwatch.elapsed()
144 - log.log_to_mongo(properties, 'dataset', 'unpack', stopwatch, event='finish')
 144+ log.to_db(properties, 'dataset', 'unpack', stopwatch, event='finish')
145145 return retcode
146146
147147
Index: trunk/tools/editor_trends/utils/log.py
@@ -26,35 +26,33 @@
2727 from classes import settings
2828 settings = settings.Settings()
2929
30 -from database import db
 30+from classes import storage
3131
32 -def log_to_mongo(rts, jobtype, task, timer, event='start'):
33 - conn = db.init_mongo_db(rts.dbname)
 32+def to_db(rts, jobtype, task, timer, event='start'):
 33+ db = storage.Database('mongo', rts.dbname, 'jobs')
3434 created = datetime.datetime.now()
3535 hash = '%s_%s' % (rts.project, rts.hash)
36 - coll = conn['jobs']
3736
38 - job = coll.find_one({'hash': hash})
 37+ job = db.find_one('hash', hash)
3938
 39+ data = {'hash': hash,
 40+ 'created': created,
 41+ 'jobtype': jobtype,
 42+ 'in_progress': True,
 43+ 'language_code': rts.language.code,
 44+ 'project': rts.project.name,
 45+ 'tasks': {},
 46+ }
 47+
4048 if job == None:
4149 if jobtype == 'dataset':
42 - _id = coll.save({'hash': hash, 'created': created, 'finished': False,
43 - 'language_code': rts.language.code,
44 - 'project': rts.project.name,
45 - 'in_progress': True, 'jobtype': jobtype,
46 - 'tasks': {}})
47 -
48 -
 50+ data['finished'] = False
 51+ _id = db.save(data)
4952 elif jobtype == 'chart':
50 - _id = coll.save({'hash': hash, 'created': created,
51 - 'jobtype': jobtype,
52 - 'finished': True,
53 - 'in_progress': True,
54 - 'project': rts.project.name,
55 - 'language_code': rts.language.code,
56 - 'tasks': {}})
 53+ data['finished'] = True
 54+ _id = db.save(data)
5755
58 - job = coll.find_one({'_id': _id})
 56+ job = db.find_one('_id', _id)
5957
6058 tasks = job['tasks']
6159 t = tasks.get(task, {})
@@ -62,20 +60,22 @@
6361 t['start'] = timer.t0
6462 t['in_progress'] = True
6563 tasks[task] = t
66 - coll.update({'hash': hash}, {'$set': {'tasks': tasks}})
 64+ db.update('hash', hash, {'$set': {'tasks': tasks}})
 65+ #coll.update({'hash': hash}, {'$set': {'tasks': tasks}})
6766 elif event == 'finish':
6867 t['finish'] = timer.t1
6968 t['in_progress'] = False
7069 tasks[task] = t
71 - if task == 'transform' or jobtype == 'chart': #final task, set entire task to finished
72 - coll.update({'hash': hash}, {'$set': {'tasks': tasks,
73 - 'in_progress': False,
74 - 'finished': True}})
 70+ if task == 'transform' or jobtype == 'chart':
 71+ #final task, set entire task to finished
 72+ db.update('hash', hash, {'$set': {'tasks': tasks,
 73+ 'in_progress': False,
 74+ 'finished': True}})
7575 else:
76 - coll.update({'hash': hash}, {'$set': {'tasks': tasks}})
 76+ db.update('hash', hash, {'$set': {'tasks': tasks}})
7777
7878
79 -def log_to_csv(logger, settings, message, verb, function, **kwargs):
 79+def to_csv(logger, settings, message, verb, function, **kwargs):
8080 '''
8181 Writes detailed log information to logs / projectname_date.csv
8282 '''
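
to_db keeps a single MongoDB document per job and folds per-task timing into its 'tasks' subdocument; db.update('hash', hash, ...) maps to a $set with upsert=True in the Mongo backend. A hedged pymongo sketch of the resulting query (database name and hash value are hypothetical):

    import pymongo

    jobs = pymongo.Connection()['enwiki']['jobs']
    jobs.update({'hash': 'wiki_abc123'},
                {'$set': {'tasks': {'download': {'in_progress': True}}}},
                upsert=True)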
Index: trunk/tools/editor_trends/database/cassandra.py
@@ -1,36 +0,0 @@
2 -#!/usr/bin/python
3 -# -*- coding: utf-8 -*-
4 -'''
5 -Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
6 -This program is free software; you can redistribute it and/or
7 -modify it under the terms of the GNU General Public License version 2
8 -as published by the Free Software Foundation.
9 -This program is distributed in the hope that it will be useful,
10 -but WITHOUT ANY WARRANTY; without even the implied warranty of
11 -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
12 -See the GNU General Public License for more details, at
13 -http://www.fsf.org/licenses/gpl.html
14 -'''
15 -
16 -__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
17 -__author__email = 'dvanliere at gmail dot com'
18 -__date__ = '2011-02-25'
19 -__version__ = '0.1'
20 -
21 -import pycassa
22 -
23 -def install_schema(keyspace_name, drop_first=False):
24 -
25 - sm = pycassa.system_manager.SystemManager('127.0.0.1:9160')
26 - if drop_first:
27 - sm.drop_keyspace(keyspace_name)
28 -
29 - sm.create_keyspace(keyspace_name, replication_factor=1)
30 -
31 - sm.create_column_family(keyspace_name, 'revisions',
32 - comparator_type=pycassa.system_manager.UTF8_TYPE,
33 - default_validation_class=pycassa.system_manager.UTF8_TYPE)
34 -
35 - sm.create_index(keyspace_name, 'revisions', 'article', pycassa.system_manager.UTF8_TYPE)
36 - sm.create_index(keyspace_name, 'revisions', 'username', pycassa.system_manager.UTF8_TYPE)
37 - sm.create_index(keyspace_name, 'revisions', 'user_id', pycassa.system_manager.LONG_TYPE)
Index: trunk/tools/editor_trends/database/db.py
@@ -1,168 +0,0 @@
2 -#!/usr/bin/python
3 -# -*- coding: utf-8 -*-
4 -'''
5 -Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
6 -This program is free software; you can redistribute it and/or
7 -modify it under the terms of the GNU General Public License version 2
8 -as published by the Free Software Foundation.
9 -This program is distributed in the hope that it will be useful,
10 -but WITHOUT ANY WARRANTY; without even the implied warranty of
11 -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
12 -See the GNU General Public License for more details, at
13 -http://www.fsf.org/licenses/gpl.html
14 -'''
15 -
16 -__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
17 -__email__ = 'dvanliere at gmail dot com'
18 -__date__ = '2010-10-21'
19 -__version__ = '0.1'
20 -
21 -import sys
22 -import pymongo
23 -from bson.code import Code
24 -
25 -if '..' not in sys.path:
26 - sys.path.append('..')
27 -
28 -from classes import settings
29 -settings = settings.Settings()
30 -import file_utils
31 -
32 -
33 -def init_mongo_db(dbname):
34 - connection = pymongo.Connection()
35 - db = connection[dbname]
36 - return db
37 -
38 -
39 -def drop_collection(dbname, collection):
40 - db = init_mongo_db(dbname)
41 - db.drop_collection(collection)
42 -
43 -
44 -def get_collections(dbname):
45 - db = init_mongo_db(dbname)
46 - return db.collection_names()
47 -
48 -
49 -def count_records(dbname, collection):
50 - db = init_mongo_db(dbname)
51 - return db[collection].count()
52 -
53 -
54 -def cleanup_database(dbname, logger, endswith=None):
55 - coll = get_collections(dbname)
56 - for c in coll:
57 - if not c.startswith('system'):
58 - if endswith != None and c.endswith(endswith):
59 - drop_collection(dbname, c)
60 - logger.debug('Deleting collection %s from database %s.' % (c, dbname))
61 -
62 -
63 -def remove_documents_from_mongo_db(collection, ids):
64 - collection.remove(ids)
65 -
66 -
67 -def add_index_to_collection(db, collection, key):
68 - '''
69 - @db is the name of the mongodb
70 - @collection is the name of the 'table' in mongodb
71 - @key name of the field to create the index
72 - '''
73 -
74 - mongo = init_mongo_db(db)
75 - collection = mongo[collection]
76 - mongo.collection.create_index(key)
77 - mongo.collection.ensure_index(key)
78 -
79 -
80 -def run_query(dbname, collection, var, qualifier=None):
81 - mongo = init_mongo_db(dbname)
82 - collection = mongo[collection]
83 - if qualifier == 'min':
84 - return collection.find({var : {'$ne' : False}}).sort(var, pymongo.ASCENDING).limit(1)[0]
85 - elif qualifier == 'max':
86 - return collection.find({var : {'$ne' : False}}).sort(var, pymongo.DESCENDING).limit(1)[0]
87 - else:
88 - return collection.find({var: 1})
89 -
90 -
91 -def stringify_keys(obj):
92 - '''
93 - @obj should be a dictionary where the keys are not yet strings. this function
94 - is called just prior any insert / update query in mongo because mongo only
95 - accepts strings as keys.
96 - '''
97 - d = {}
98 - for o in obj:
99 - if isinstance(obj[o], dict):
100 - #if type(obj[o]) == type({}):
101 - obj[o] = stringify_keys(obj[o])
102 - d[str(o)] = obj[o]
103 - return d
104 -
105 -
106 -def retrieve_min_value(dbname, collection, var):
107 - mongo = init_mongo_db(dbname)
108 - coll = mongo[collection]
109 - emit = 'function () {emit(this.editor, this.edit_count);}'
110 - map = Code(emit)
111 - reduce = Code("function()")
112 -# reduce = Code("""reduce = function (key, value) {
113 -# return Math.max(value);
114 -# }
115 -# """)
116 - cursor = coll.map_reduce(map, reduce)
117 -
118 - data = []
119 - for c in cursor.find():
120 - data.append(c)
121 - return data
122 -
123 -
124 -def retrieve_distinct_keys(dbname, collection, field, force_new=False):
125 - #mongo = init_mongo_db(dbname)
126 - #editors = mongo[collection]
127 - #ids = retrieve_distinct_keys_mapreduce(editors, field)
128 - '''
129 - TODO: figure how big the index is and then take appropriate action, index
130 - < 4mb just do a distinct query, index > 4mb do a map reduce.
131 - '''
132 - if force_new == False and file_utils.check_file_exists(settings.binary_location,
133 - '%s_%s_%s.bin' % (dbname, collection, field)):
134 - ids = file_utils.load_object(settings.binary_location, '%s_%s_%s.bin' % (dbname, collection, field))
135 - else:
136 - mongo = init_mongo_db(dbname)
137 - editors = mongo[collection]
138 - #index_size = mongo.stats()
139 - if editors.count () < 200000: #TODO this is a bit arbitrary, should check if index > 4Mb.
140 - ids = editors.distinct(field)
141 - else:
142 - #params = {}
143 - #params['size'] = 'size'
144 - #size = editors.find_one({'size': 1})
145 - ids = retrieve_distinct_keys_mapreduce(editors, field)
146 - file_utils.store_object(ids, settings.binary_location, '%s_%s_%s.bin' % (dbname, collection, field))
147 - return ids
148 -
149 -
150 -def retrieve_distinct_keys_mapreduce(collection, field):
151 - emit = 'function () { emit(this.%s, 1)};' % field
152 - map = Code(emit)
153 -
154 - reduce = Code("function()")
155 -
156 - ids = []
157 - cursor = collection.map_reduce(map, reduce)
158 - for c in cursor.find():
159 - ids.append(c['_id'])
160 - return ids
161 -
162 -
163 -def debug():
164 - #retrieve_distinct_keys('enwiki', 'editors_dataset', 'editor')
165 - retrieve_min_value('enwiki', 'editors_dataset', 'new_wikipedian')
166 -
167 -
168 -if __name__ == '__main__':
169 - debug()
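database/db.py disappears as an importable module, but its pymongo helpers survive verbatim as code-snippets/mongo.py at the end of this diff; call sites now go through classes/storage instead. One detail worth spelling out: retrieve_distinct_keys_mapreduce passes Code("function()") as the reduce step. That works because MongoDB only invokes reduce for keys emitted more than once, and the caller reads nothing but the _id of each result, so an empty function returning undefined is enough when all you want is the set of distinct keys. Below the 200,000-document cutoff the helper skips map/reduce entirely:

    # Under the cutoff a plain distinct query returns the same set of
    # editor ids in a single round trip.
    ids = editors.distinct('editor')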
Index: trunk/tools/editor_trends/database/launcher.py
@@ -28,15 +28,15 @@
2929 from utils import file_utils
3030
3131
32 -def start_mongodb_server(x, path):
 32+def start_mongodb_server(offset, path):
3333 default_port = 27017
34 - port = default_port + x
 34+ port = default_port + offset
3535 if settings.platform == 'Windows':
3636 p = subprocess.Popen([path, '--port', str(port), '--dbpath', 'c:\data\db', '--logpath', 'c:\mongodb\logs'])
3737 elif settings.platform == 'Linux':
3838 subprocess.Popen([path, '--port %s' % port])
3939 elif settings.platform == 'OSX':
40 - raise NotImplementedError
 40+ raise exceptions.NotImplementedError
4141 else:
4242 raise exceptions.PlatformNotSupportedError(platform)
4343
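Two notes on the launcher hunk. The first argument is an offset added to MongoDB's default port, so start_mongodb_server(1, path) targets 27018. Also, the untouched Linux branch passes '--port %s' % port as a single argv element, which mongod's option parser will likely reject as one unknown token; splitting flag and value, as the Windows branch already does, is the safer form. A sketch of that fix:

    # Pass the flag and its value as separate argv elements so mongod
    # parses them; the single-string form '--port 27018' is one token.
    import subprocess
    subprocess.Popen([path, '--port', str(port)])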
Index: trunk/tools/editor_trends/database/cache.py
@@ -20,8 +20,6 @@
2121
2222 import datetime
2323 import sys
24 -import bson
25 -from pymongo.errors import OperationFailure
2624
2725 if '..' not in sys.path:
2826 sys.path.append('..')
@@ -29,13 +27,12 @@
3028 from classes import settings
3129 settings = settings.Settings()
3230
33 -import db
3431 from utils import file_utils
3532 from utils import data_converter
3633
37 -class EditorCache(object):
38 - def __init__(self, collection):
39 - self.collection = collection
 34+class EditorCache:
 35+ def __init__(self, db):
 36+ self.db = db
4037 self.editors = {}
4138 self.final_year = datetime.datetime.now().year + 1
4239 self.n = 0
@@ -57,8 +54,7 @@
5855 def add(self, key, value):
5956 if value == 'NEXT':
6057 self.n += 1
61 - edits = db.stringify_keys(self.editors[key]['edits'])
62 - edits = self.drop_years_no_obs(edits)
 58+ edits = self.drop_years_no_obs(self.editors[key]['edits'])
6359 self.insert(key, edits, self.editors[key]['username'])
6460 del self.editors[key]
6561 else:
@@ -74,22 +70,21 @@
7571 self.editors[key]['edits'][year].append(value)
7672 self.editors[key]['obs'] += 1
7773
78 - def update(self, editor, values):
79 - self.collection.update({'editor': editor}, {'$pushAll': {'edits': values}}, upsert=True)
80 -
8174 def insert(self, editor, values, username):
82 - '''
83 - Adding the safe=True statement slows down the insert process but this
84 - assures that all data will be written.
85 - '''
86 - try:
87 - self.collection.insert({'editor': editor, 'edits': values, 'username': username}, safe=True)
88 - except bson.errors.InvalidDocument:
89 - print 'BSON document too large, unable to store %s' % (username)
90 - except OperationFailure, error:
91 - print error
92 - print 'It seems that you are running out of disk space.'
93 - sys.exit(-1)
 75+ data = {'editor': editor, 'edits': values, 'username': username}
 76+ self.db.insert(data)
 77+# '''
 78+# Adding the safe=True statement slows down the insert process but this
 79+# assures that all data will be written.
 80+# '''
 81+# try:
 82+# self.collection.insert({'editor': editor, 'edits': values, 'username': username}, safe=True)
 83+# except bson.errors.InvalidDocument:
 84+# print 'BSON document too large, unable to store %s' % (username)
 85+# except OperationFailure, error:
 86+# print error
 87+# print 'It seems that you are running out of disk space.'
 88+# sys.exit(-1)
9489
9590 def store(self):
9691 file_utils.store_object(self, settings.binary_location, self.__repr__())
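EditorCache now delegates persistence to the injected db wrapper, and the old stringify_keys call disappears along with the safe=True error handling that is left commented out above; presumably both concerns move into the storage layer. (Incidentally, dropping the explicit object base makes EditorCache an old-style class under Python 2.) A hedged sketch of a wrapper-side insert that keeps the old safeguards, reconstructed from the commented block rather than taken from this diff:

    import sys
    import bson
    from pymongo.errors import OperationFailure

    def stringify_keys(obj):
        # Coerce all dict keys to str, recursively (lifted from the old
        # db.py); Mongo only accepts string keys.
        d = {}
        for key in obj:
            if isinstance(obj[key], dict):
                obj[key] = stringify_keys(obj[key])
            d[str(key)] = obj[key]
        return d

    class Database:  # continuing the wrapper sketch from the log.py hunk
        def insert(self, data, safe=True):
            # safe=True slows inserts down but guarantees the write is
            # acknowledged before returning.
            try:
                self.collection.insert(stringify_keys(data), safe=safe)
            except bson.errors.InvalidDocument:
                print('BSON document too large, unable to store %s' % data.get('username'))
            except OperationFailure as error:
                print(error)
                print('It seems that you are running out of disk space.')
                sys.exit(-1)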
Index: trunk/tools/editor_trends/cronjobs.py
@@ -22,7 +22,7 @@
2323
2424 import manage as manager
2525
26 -from database import db
 26+from classes import storage
2727 from classes import languages
2828 from classes import projects
2929 from classes import runtime_settings
@@ -86,12 +86,11 @@
8787 This is the main entry point, it creates a queue with jobs and determines
8888 the type of job and fires it off
8989 '''
90 - mongo = db.init_mongo_db('wikilytics')
91 - coll = mongo['jobs']
 90+ db = storage.Database('mongo', 'wikilytics', 'jobs')
9291 tasks = []
9392 project, language, parser = manager.init_args_parser()
9493 args = parser.parse_args(['django'])
95 - jobs = coll.find({'finished': False, 'in_progress': False, 'error': False})
 94+ jobs = db.find({'finished': False, 'in_progress': False, 'error': False})
9695 for job in jobs:
9796 tasks.append(job)
9897
@@ -99,20 +98,19 @@
10099 if task['jobtype'] == 'dataset':
101100 print 'Launching the Editor Trends Analytics Toolkit.'
102101 res = launch_editor_trends_toolkit(task, args)
103 - #res = False
104102 else:
105103 print 'Launching %s.' % task['jobtype']
106104 res = launch_chart(task, args)
107105
108106 if res:
109 - coll.update({'_id': task['_id']}, {'$set': {'finished': True}})
 107+ db.update('_id', task['_id'], {'$set': {'finished': True}})
110108 else:
111109 '''
112110 To prevent jobs from recurring non-stop, set error to True. These
113111 jobs will be excluded and need to be investigated to see what's
114112 happening.
115113 '''
116 - coll.update({'_id': task['_id']}, {'$set': {'error': True}})
 114+ db.update('_id', task['_id'], {'$set': {'error': True}})
117115
118116
119117 def debug():
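The cron driver now reads and flips job state through the same wrapper. For orientation, a hypothetical job document carrying the fields this loop touches (field names come from the queries above; all values are made up):

    # Illustrative only: the fields cronjobs.py queries and updates.
    job = {
        'hash': 'a1b2c3',      # made-up value
        'jobtype': 'dataset',  # or 'chart'
        'tasks': {'transform': {'start': None, 'finish': None,
                                'in_progress': False}},
        'finished': False,
        'in_progress': False,
        'error': False,
    }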
Index: trunk/tools/editor_trends/bots/detector.py
@@ -1,283 +0,0 @@
2 -#!/usr/bin/python
3 -# -*- coding: utf-8 -*-
4 -'''
5 -Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
6 -This program is free software; you can redistribute it and/or
7 -modify it under the terms of the GNU General Public License version 2
8 -as published by the Free Software Foundation.
9 -This program is distributed in the hope that it will be useful,
10 -but WITHOUT ANY WARRANTY; without even the implied warranty of
11 -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
12 -See the GNU General Public License for more details, at
13 -http://www.fsf.org/licenses/gpl.html
14 -'''
15 -
16 -__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
17 -__email__ = 'dvanliere at gmail dot com'
18 -__date__ = '2010-11-25'
19 -__version__ = '0.1'
20 -
21 -
22 -import os
23 -import cStringIO
24 -import multiprocessing
25 -import xml.etree.cElementTree as cElementTree
26 -import sys
27 -from Queue import Empty
28 -
29 -if '..' not in sys.path:
30 - sys.path.append('..')
31 -
32 -from classes import settings
33 -settings = settings.Settings()
34 -
35 -
36 -from database import db
37 -from utils import file_utils
38 -from utils import messages
39 -
40 -from classes import consumers
41 -from classes import bots
42 -
43 -try:
44 - import psyco
45 - psyco.full()
46 -except ImportError:
47 - pass
48 -
49 -
50 -def read_bots_csv_file(location, filename, encoding, manager=False):
51 - '''
52 - Constructs a dictionary from Bots.csv
53 - key is language
54 - value is a list of bot names
55 - '''
56 - if manager:
57 - bot_dict = manager.dict()
58 - else:
59 - bot_dict = dict()
60 - for line in file_utils.read_data_from_csv(location, filename, encoding):
61 - line = line.strip()
62 - language, bots = line.split(',')
63 - bots = bots.split('|')
64 - for bot in bots:
65 - if bot not in bot_dict:
66 - b = botconsumers.Bot(bot)
67 - b.id = None
68 - else:
69 - b = bot_dict[bot]
70 - b.projects.append(language)
71 - bot_dict[bot] = b
72 - return bot_dict
73 -
74 -
75 -def retrieve_bots(language_code):
76 - '''
77 - Loader function to retrieve list of id's of known Wikipedia bots.
78 - '''
79 - ids = []
80 - mongo = db.init_mongo_db('bots')
81 - bots = mongo['ids']
82 - cursor = bots.find()
83 - for bot in cursor:
84 - if bot['verified'] == 'True' and language_code in bot['projects']:
85 - ids.append(bot['name'])
86 - return ids
87 -
88 -
89 -def store_bots():
90 - '''
91 - This file reads the results from the lookup_bot_userid function and stores
92 - it in a MongoDB collection.
93 - '''
94 - keys = ['name', 'verified', 'projects']
95 - bots = file_utils.create_dict_from_csv_file(settings.csv_location,
96 - 'bots_ids.csv',
97 - 'utf-8',
98 - keys)
99 - mongo = db.init_mongo_db('bots')
100 - collection = mongo['ids']
101 - db.remove_documents_from_mongo_db(collection, None)
102 -
103 - for id in bots:
104 - bot = bots[id]
105 - data = dict([(k, bot[k]) for k in keys])
106 - data['id'] = id
107 - collection.insert(data)
108 -
109 - print 'Stored %s bots' % collection.count()
110 -
111 -
112 -def convert_object_to_dict(obj, exclude=[]):
113 - '''
114 - @obj is an arbitray object where the properties need to be translated to
115 - keys and values to ease writing to a csv file.
116 - '''
117 - d = {}
118 - for kw in obj.__dict__.keys():
119 - if kw not in exclude:
120 - d[kw] = getattr(obj, kw)
121 - return d
122 -
123 -
124 -def write_bot_list_to_csv(bots, keys):
125 - fh = file_utils.create_txt_filehandle(settings.csv_location, 'bots_ids.csv',
126 - 'w', 'utf-8')
127 - bot_dict = convert_object_to_dict(bots, exclude=['time', 'written'])
128 - for bot in bot_dict:
129 - bot = bot_dict[bot]
130 - file_utils.write_dict_to_csv(bot, fh, keys, write_key=False,
131 - newline=True)
132 - fh.close()
133 -
134 -
135 -def lookup_bot_userid(data, fh, bots, keys):
136 - '''
137 - This function is used to find the id's belonging to the different bots that
138 - are patrolling the Wikipedia sites.
139 - @xml_nodes is a list of xml elements that need to be parsed
140 - @bots is a dictionary containing the names of the bots to lookup
141 - '''
142 - username = data[3]
143 - if username in bots:
144 - bot = bots.pop(username)
145 - setattr(bot, 'id', data[0])
146 - setattr(bot, 'verified', True)
147 - bot = convert_object_to_dict(bot, exclude=['time'])
148 - file_utils.write_dict_to_csv(bot, fh, keys, write_key=False, newline=True)
149 - return bots
150 -
151 -
152 -def create_bot_validation_dataset(xml_nodes, fh, bots):
153 - revisions = xml_nodes.findall('revision')
154 - for revision in revisions:
155 - contributor = revision.find('contributor')
156 - #contributor = xml.retrieve_xml_node(revision, 'contributor')
157 - username = contributor.find('username')
158 - if username == None or username.text == None:
159 - continue
160 - else:
161 - username = username.text.lower()
162 -
163 - #print username.encode('utf-8')
164 - if username.find('bot') > -1 or username.find('script') > -1:
165 - bot = bots.get(username, botconsumers.Bot(username, verified=False))
166 - bot.id = contributor.find('id').text
167 - timestamp = revision.find('timestamp').text
168 - if timestamp != None:
169 - timestamp = file_utils.convert_timestamp_to_datetime_naive(timestamp, settings.timestamp_format)
170 - bot.time[str(timestamp.year)].append(timestamp)
171 - bots[username] = bot
172 -
173 - return bots
174 -
175 - #bot = bots.get('PseudoBot')
176 - #bot.hours_active()
177 - #bot.avg_lag_between_edits()
178 -
179 -
180 -def bot_launcher(language_code, project, target, action, single=False, manager=False):
181 - '''
182 - This function sets the stage to launch bot id detection and collecting data
183 - to discover new bots.
184 - '''
185 - file_utils.delete_file(settings.csv_location, 'bots_ids.csv')
186 - location = os.path.join(settings.input_location, language_code, project)
187 - input_xml = os.path.join(location, 'chunks')
188 - input_txt = os.path.join(location, 'txt')
189 -
190 - tasks = multiprocessing.JoinableQueue()
191 - mgr = multiprocessing.Manager()
192 - keys = ['id', 'name', 'verified', 'projects']
193 -
194 - if action == 'lookup':
195 - output_file = 'bots_ids.csv'
196 - files = file_utils.retrieve_file_list(input_txt, 'txt', mask=None)
197 - input_queue = pc.load_queue(files, poison_pill=True)
198 - bots = read_bots_csv_file(settings.csv_location, 'Bots.csv', 'utf-8', manager=manager)
199 - for file in files:
200 - tasks.put(consumers.TXTFile(file, input_txt, settings.csv_location, output_file, target, bots=bots, keys=keys))
201 -
202 - else:
203 - output_file = 'bots_predictionset.csv'
204 - files = file_utils.retrieve_file_list(input_xml, 'xml', mask=None)
205 - bots = {}
206 - for file in files:
207 - tasks.put(consumers.XMLFile(file, input_xml, settings.csv_location, output_file, target, bots=bots, keys=keys))
208 -
209 - #lock = mgr.Lock()
210 - if manager:
211 - manager = mgr
212 -
213 - tracker = {}
214 - if single:
215 - while True:
216 - try:
217 - print '%s files left in the queue...' % messages.show(tasks.qsize)
218 - task = tasks.get(block=False)
219 - bots = task(bots)
220 - except Empty:
221 - break
222 - else:
223 - bot_launcher_multi(tasks)
224 -
225 - file_utils.store_object(bots, settings.binary_location, 'bots.bin')
226 - if action == 'lookup':
227 - store_bots()
228 - if bots != {}:
229 - print 'The script was unable to retrieve the user id\s for the following %s bots:\n' % len(bots)
230 - keys = bots.keys()
231 - for key in keys:
232 - try:
233 - print '%s' % key.encode('utf-8')
234 - except:
235 - pass
236 - else:
237 - bot_training_dataset(bots)
238 - #write_bot_list_to_csv(bots, keys)
239 -
240 -
241 -def bot_training_dataset(bots):
242 - fh = file_utils.create_txt_filehandle(settings.csv_location, 'training_bots.csv', 'w', 'utf-8')
243 - keys = bots.keys()
244 - for key in keys:
245 - bot = bots.get(key)
246 - bot.hours_active()
247 - bot.avg_lag_between_edits()
248 - bot.write_training_dataset(fh)
249 -
250 - fh.close()
251 -
252 -
253 -def bot_launcher_multi(tasks):
254 - '''
255 - This is the launcher that uses multiprocesses.
256 - '''
257 - consumers = [consumers.XMLFileConsumer(tasks, None) for i in xrange(settings.number_of_processes)]
258 - for x in xrange(settings.number_of_processes):
259 - tasks.put(None)
260 -
261 - for w in consumers:
262 - w.start()
263 -
264 - tasks.join()
265 -
266 -
267 -def debug_bots_dict():
268 - bots = file_utils.load_object(settings.binary_location, 'bots.bin')
269 - for bot in bots:
270 - bot = bots[bot]
271 - edits = sum([len(bot.time[t]) for t in bot.time])
272 - print bot.name, bot.verified, edits
273 - print 'done'
274 - return bots
275 -
276 -
277 -if __name__ == '__main__':
278 - language_code = 'en'
279 - project = 'wiki'
280 - #store_bots()
281 - #bots = debug_bots_dict()
282 - #write_bot_list_to_csv(bots)
283 - #language_code, project, lookup_bot_userid, single = False, manager = False
284 - bot_launcher(language_code, project, create_bot_validation_dataset, action='training', single=True, manager=False)
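bots/detector.py goes away wholesale, and most of it depended on the deleted database/db.py. As an illustration of how its Mongo-facing pieces map onto the new layer, here is retrieve_bots resketched against the wrapper, assuming find() accepts a query dict as cronjobs.py above uses it:

    from classes import storage

    def retrieve_bots(language_code):
        '''Return the names of verified bots active on the given project.'''
        db = storage.Database('mongo', 'bots', 'ids')
        ids = []
        for bot in db.find({'verified': 'True'}):
            if language_code in bot['projects']:
                ids.append(bot['name'])
        return ids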
Index: trunk/tools/editor_trends/code-snippets/mongodb/store.py
@@ -6,7 +6,7 @@
77 import datetime
88 import calendar
99 import time
10 -from database import db
 10+from classes import storage
1111
1212
1313 def test_date():
Index: trunk/tools/editor_trends/code-snippets/cassandra.py
@@ -0,0 +1,36 @@
 2+#!/usr/bin/python
 3+# -*- coding: utf-8 -*-
 4+'''
 5+Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
 6+This program is free software; you can redistribute it and/or
 7+modify it under the terms of the GNU General Public License version 2
 8+as published by the Free Software Foundation.
 9+This program is distributed in the hope that it will be useful,
 10+but WITHOUT ANY WARRANTY; without even the implied warranty of
 11+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 12+See the GNU General Public License for more details, at
 13+http://www.fsf.org/licenses/gpl.html
 14+'''
 15+
 16+__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
 17+__email__ = 'dvanliere at gmail dot com'
 18+__date__ = '2011-02-25'
 19+__version__ = '0.1'
 20+
 21+import pycassa
 22+
 23+def install_schema(keyspace_name, drop_first=False):
 24+
 25+ sm = pycassa.system_manager.SystemManager('127.0.0.1:9160')
 26+ if drop_first:
 27+ sm.drop_keyspace(keyspace_name)
 28+
 29+ sm.create_keyspace(keyspace_name, replication_factor=1)
 30+
 31+ sm.create_column_family(keyspace_name, 'revisions',
 32+ comparator_type=pycassa.system_manager.UTF8_TYPE,
 33+ default_validation_class=pycassa.system_manager.UTF8_TYPE)
 34+
 35+ sm.create_index(keyspace_name, 'revisions', 'article', pycassa.system_manager.UTF8_TYPE)
 36+ sm.create_index(keyspace_name, 'revisions', 'username', pycassa.system_manager.UTF8_TYPE)
 37+ sm.create_index(keyspace_name, 'revisions', 'user_id', pycassa.system_manager.LONG_TYPE)
Property changes on: trunk/tools/editor_trends/code-snippets/cassandra.py
___________________________________________________________________
Added: svn:eol-style
138 + native
Index: trunk/tools/editor_trends/code-snippets/cohort_confidence_intervals.py
@@ -24,7 +24,7 @@
2525 settings = configuration.Settings()
2626 from utils import file_utils
2727 from utils import messages
28 -from database import db
 28+from classes import storage
2929
3030
3131 #def dataset_edits_by_month(dbname, **kwargs):
Index: trunk/tools/editor_trends/code-snippets/exporter.py
@@ -31,7 +31,7 @@
3232 settings = configuration.Settings()
3333 from utils import file_utils
3434 from utils import messages
35 -from database import db
 35+from classes import storage
3636 from etl import shaper
3737 from analyses import cohort_charts
3838
Index: trunk/tools/editor_trends/code-snippets/mongo.py
@@ -0,0 +1,171 @@
 2+#!/usr/bin/python
 3+# -*- coding: utf-8 -*-
 4+'''
 5+Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
 6+This program is free software; you can redistribute it and/or
 7+modify it under the terms of the GNU General Public License version 2
 8+as published by the Free Software Foundation.
 9+This program is distributed in the hope that it will be useful,
 10+but WITHOUT ANY WARRANTY; without even the implied warranty of
 11+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 12+See the GNU General Public License for more details, at
 13+http://www.fsf.org/licenses/gpl.html
 14+'''
 15+
 16+__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
 17+__email__ = 'dvanliere at gmail dot com'
 18+__date__ = '2010-10-21'
 19+__version__ = '0.1'
 20+
 21+
 22+import sys
 23+import pymongo
 24+from bson.code import Code
 25+
 26+if '..' not in sys.path:
 27+ sys.path.append('..')
 28+
 29+from classes import settings
 30+settings = settings.Settings()
 31+import file_utils
 32+
 33+
 34+
 35+
 36+def init_mongo_db(dbname):
 37+ connection = pymongo.Connection()
 38+ db = connection[dbname]
 39+ return db
 40+
 41+
 42+def drop_collection(dbname, collection):
 43+ db = init_mongo_db(dbname)
 44+ db.drop_collection(collection)
 45+
 46+
 47+def get_collections(dbname):
 48+ db = init_mongo_db(dbname)
 49+ return db.collection_names()
 50+
 51+
 52+def count_records(dbname, collection):
 53+ db = init_mongo_db(dbname)
 54+ return db[collection].count()
 55+
 56+
 57+def cleanup_database(dbname, logger, endswith=None):
 58+ coll = get_collections(dbname)
 59+ for c in coll:
 60+ if not c.startswith('system'):
 61+ if endswith != None and c.endswith(endswith):
 62+ drop_collection(dbname, c)
 63+ logger.debug('Deleting collection %s from database %s.' % (c, dbname))
 64+
 65+
 66+def remove_documents_from_mongo_db(collection, ids):
 67+ collection.remove(ids)
 68+
 69+
 70+def add_index_to_collection(db, collection, key):
 71+ '''
 72+ @db is the name of the mongodb
 73+ @collection is the name of the 'table' in mongodb
 74+ @key name of the field to create the index
 75+ '''
 76+
 77+ mongo = init_mongo_db(db)
 78+ collection = mongo[collection]
 79+ mongo.collection.create_index(key)
 80+ mongo.collection.ensure_index(key)
 81+
 82+
 83+def run_query(dbname, collection, var, qualifier=None):
 84+ mongo = init_mongo_db(dbname)
 85+ collection = mongo[collection]
 86+ if qualifier == 'min':
 87+ return collection.find({var : {'$ne' : False}}).sort(var, pymongo.ASCENDING).limit(1)[0]
 88+ elif qualifier == 'max':
 89+ return collection.find({var : {'$ne' : False}}).sort(var, pymongo.DESCENDING).limit(1)[0]
 90+ else:
 91+ return collection.find({var: 1})
 92+
 93+
 94+def stringify_keys(obj):
 95+ '''
 96+ @obj should be a dictionary where the keys are not yet strings. this function
 97+ is called just prior any insert / update query in mongo because mongo only
 98+ accepts strings as keys.
 99+ '''
 100+ d = {}
 101+ for o in obj:
 102+ if isinstance(obj[o], dict):
 103+ #if type(obj[o]) == type({}):
 104+ obj[o] = stringify_keys(obj[o])
 105+ d[str(o)] = obj[o]
 106+ return d
 107+
 108+
 109+def retrieve_min_value(dbname, collection, var):
 110+ mongo = init_mongo_db(dbname)
 111+ coll = mongo[collection]
 112+ emit = 'function () {emit(this.editor, this.edit_count);}'
 113+ map = Code(emit)
 114+ reduce = Code("function()")
 115+# reduce = Code("""reduce = function (key, value) {
 116+# return Math.max(value);
 117+# }
 118+# """)
 119+ cursor = coll.map_reduce(map, reduce)
 120+
 121+ data = []
 122+ for c in cursor.find():
 123+ data.append(c)
 124+ return data
 125+
 126+
 127+def retrieve_distinct_keys(dbname, collection, field, force_new=False):
 128+ #mongo = init_mongo_db(dbname)
 129+ #editors = mongo[collection]
 130+ #ids = retrieve_distinct_keys_mapreduce(editors, field)
 131+ '''
 132+ TODO: figure how big the index is and then take appropriate action, index
 133+ < 4mb just do a distinct query, index > 4mb do a map reduce.
 134+ '''
 135+ if force_new == False and file_utils.check_file_exists(settings.binary_location,
 136+ '%s_%s_%s.bin' % (dbname, collection, field)):
 137+ ids = file_utils.load_object(settings.binary_location, '%s_%s_%s.bin' % (dbname, collection, field))
 138+ else:
 139+ mongo = init_mongo_db(dbname)
 140+ editors = mongo[collection]
 141+ #index_size = mongo.stats()
 142+ if editors.count() < 200000: #TODO this is a bit arbitrary, should check if index > 4Mb.
 143+ ids = editors.distinct(field)
 144+ else:
 145+ #params = {}
 146+ #params['size'] = 'size'
 147+ #size = editors.find_one({'size': 1})
 148+ ids = retrieve_distinct_keys_mapreduce(editors, field)
 149+ file_utils.store_object(ids, settings.binary_location, '%s_%s_%s.bin' % (dbname, collection, field))
 150+ return ids
 151+
 152+
 153+def retrieve_distinct_keys_mapreduce(collection, field):
 154+ emit = 'function () { emit(this.%s, 1)};' % field
 155+ map = Code(emit)
 156+
 157+ reduce = Code("function()")
 158+
 159+ ids = []
 160+ cursor = collection.map_reduce(map, reduce)
 161+ for c in cursor.find():
 162+ ids.append(c['_id'])
 163+ return ids
 164+
 165+
 166+def debug():
 167+ #retrieve_distinct_keys('enwiki', 'editors_dataset', 'editor')
 168+ retrieve_min_value('enwiki', 'editors_dataset', 'new_wikipedian')
 169+
 170+
 171+if __name__ == '__main__':
 172+ debug()
Property changes on: trunk/tools/editor_trends/code-snippets/mongo.py
___________________________________________________________________
Added: svn:eol-style
1173 + native
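The re-added mongo.py preserves the old module-level helpers as a reference snippet only; production code paths in this commit import classes/storage instead. A final illustrative before/after for one helper, with the wrapper method name being an assumption:

    # before: count_records('wikilytics', 'editors_dataset')
    db = storage.Database('mongo', 'wikilytics', 'editors_dataset')
    n = db.count()  # hypothetical wrapper equivalent of collection.count()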