r82943 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: r82942 | r82943 | r82944 >
Date: 18:31, 28 February 2011
Author: diederik
Status: deferred
Tags:
Comment:
Cassandra support (ALPHA)
Modified paths:
  • /trunk/tools/editor_trends/database/cassandra.py (added) (history)
  • /trunk/tools/editor_trends/etl/enricher.py (modified) (history)

Diff [purge]

Index: trunk/tools/editor_trends/etl/enricher.py
@@ -29,14 +29,17 @@
3030 from xml.etree.cElementTree import fromstring, dump
3131 from collections import deque
3232
 33+if '..' not in sys.path:
 34+ sys.path.append('..')
 35+
3336 try:
3437 import pycassa
 38+ from database import cassandra
3539 except ImportError:
3640 print 'I am not going to use Cassandra today, it\'s my off day.'
3741
38 -if '..' not in sys.path:
39 - sys.path.append('..')
4042
 43+
4144 from database import db
4245 from bots import detector
4346 from utils import file_utils
@@ -61,7 +64,15 @@
6265
6366 def setup_db(self):
6467 if self.storage == 'cassandra':
65 - pass
 68+ self.keyspace_name = 'enwiki'
 69+ try:
 70+ self.db = pycassa.connect(self.keyspace_name)
 71+ cassandra.install_schema(self.keyspace_name)
 72+ self.collection = pycassa.ColumnFamily(self.db, 'Standard1')
 73+
 74+ except NameError, e:
 75+ pass
 76+
6677 else:
6778 self.db = db.init_mongo_db('enwiki')
6879 self.collection = self.db['kaggle']
@@ -70,14 +81,19 @@
7182 self.revisions.append(revision)
7283 if len(self.revisions) == 100:
7384 self.store()
74 - self.revisions = []
 85+ self.clear()
7586
7687 def empty(self):
7788 self.store()
 89+ self.clear()
7890
 91+ def clear(self):
 92+ self.revisions = []
 93+
7994 def store(self):
8095 if self.storage == 'cassandra':
8196 print 'insert into cassandra'
 97+ self.collection.batch_insert(self.revisions)
8298 else:
8399 print 'insert into mongo'
84100
@@ -230,6 +246,8 @@
231247 revision_id = extracter.extract_revision_id(revision_id)
232248 row['revision_id'] = revision_id
233249
 250+ timestamp = revision.find('timestamp').text
 251+ row['timestamp'] = timestamp
234252
235253 hash = create_md5hash(revision)
236254 revert = is_revision_reverted(hash['hash'], hashes)
@@ -239,7 +257,7 @@
240258 row.update(hash)
241259 row.update(size)
242260 row.update(revert)
243 - print row
 261+ #print row
244262 # if row['username'] == None:
245263 # contributor = revision.find('contributor')
246264 # attrs = contributor.getchildren()
@@ -301,20 +319,22 @@
302320 input_queue = JoinableQueue()
303321 result_queue = JoinableQueue()
304322 storage = 'cassandra'
305 - files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2']
 323+ #files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2']
 324+ files = ['/home/diederik/kaggle/enwiki-20100904-pages-meta-history2.xml.bz2']
 325+
306326 for file in files:
307327 input_queue.put(file)
308328
309 - for x in xrange(cpu_count):
 329+ for x in xrange(cpu_count()):
310330 input_queue.put(None)
311331
312332 extracters = [Process(target=create_article, args=[input_queue, result_queue])
313 - for x in xrange(cpu_count)]
 333+ for x in xrange(cpu_count())]
314334 for extracter in extracters:
315335 extracter.start()
316336
317337 creators = [Process(target=create_variables, args=[result_queue, storage])
318 - for x in xrange(cpu_count)]
 338+ for x in xrange(cpu_count())]
319339 for creator in creators:
320340 creator.start()
321341
Index: trunk/tools/editor_trends/database/cassandra.py
@@ -0,0 +1,32 @@
 2+#!/usr/bin/python
 3+# -*- coding: utf-8 -*-
 4+'''
 5+Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
 6+This program is free software; you can redistribute it and/or
 7+modify it under the terms of the GNU General Public License version 2
 8+as published by the Free Software Foundation.
 9+This program is distributed in the hope that it will be useful,
 10+but WITHOUT ANY WARRANTY; without even the implied warranty of
 11+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 12+See the GNU General Public License for more details, at
 13+http://www.fsf.org/licenses/gpl.html
 14+'''
 15+
 16+__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
 17+__author__email = 'dvanliere at gmail dot com'
 18+__date__ = '2011-02-25'
 19+__version__ = '0.1'
 20+
 21+def install_schema(keyspace_name, drop_first=False):
 22+
 23+ sm = pycassa.system_manager.SystemManager('127.0.0.1:9160')
 24+
 25+ sm.create_keyspace(keyspace_name, replication_factor=1) # TODO: Change
 26+
 27+ sm.create_column_family(keyspace_name, 'revisions',
 28+ comparator_type=pycassa.system_manager.UTF8_TYPE,
 29+ default_validation_class=pycassa.system_manager.UTF8_TYPE)
 30+
 31+ sm.create_index(keyspace_name, 'revisions', 'article', pycassa.system_manager.UTF8_TYPE)
 32+ sm.create_index(keyspace_name, 'revisions', 'username', pycassa.system_manager.UTF8_TYPE)
 33+ sm.create_index(keyspace_name, 'revisions', 'user_id', pycassa.system_manager.LONG_TYPE)
Property changes on: trunk/tools/editor_trends/database/cassandra.py
___________________________________________________________________
Added: svn:eol-style
134 + native

Status & tagging log