Index: trunk/tools/editor_trends/etl/enricher.py |
— | — | @@ -29,14 +29,17 @@ |
30 | 30 | from xml.etree.cElementTree import fromstring, dump |
31 | 31 | from collections import deque |
32 | 32 | |
| 33 | +if '..' not in sys.path: |
| 34 | + sys.path.append('..') |
| 35 | + |
33 | 36 | try: |
34 | 37 | import pycassa |
| 38 | + from database import cassandra |
35 | 39 | except ImportError: |
36 | 40 | print 'I am not going to use Cassandra today, it\'s my off day.' |
37 | 41 | |
38 | | -if '..' not in sys.path: |
39 | | - sys.path.append('..') |
40 | 42 | |
| 43 | + |
41 | 44 | from database import db |
42 | 45 | from bots import detector |
43 | 46 | from utils import file_utils |
— | — | @@ -61,7 +64,15 @@ |
62 | 65 | |
def setup_db(self):
    """Open the storage backend selected by ``self.storage``.

    For ``'cassandra'`` the keyspace name is fixed to ``'enwiki'``; the
    schema is installed, a connection is opened and ``self.collection`` is
    bound to the ``'revisions'`` column family. For any other value the
    Mongo database ``'enwiki'`` is opened and ``self.collection`` is bound
    to its ``'kaggle'`` collection.

    When the Cassandra modules could not be imported (the names
    ``pycassa`` / ``cassandra`` are then undefined and a NameError is
    raised), the problem is reported and ``self.db`` and
    ``self.collection`` are left as ``None`` instead of being undefined.
    """
    if self.storage == 'cassandra':
        self.keyspace_name = 'enwiki'
        # Define both attributes up front so a failed setup is detectable
        # (``is None``) rather than surfacing later as an AttributeError.
        self.db = None
        self.collection = None
        try:
            # Install the schema before connecting: the keyspace has to
            # exist before a connection can be bound to it.
            cassandra.install_schema(self.keyspace_name)
            self.db = pycassa.connect(self.keyspace_name)
            # 'revisions' is the column family install_schema creates;
            # the keyspace has no 'Standard1' column family.
            self.collection = pycassa.ColumnFamily(self.db, 'revisions')
        except NameError as e:
            # Best effort is kept, but the failure is no longer silent.
            print('Cassandra storage is not available: %s' % e)
    else:
        self.db = db.init_mongo_db('enwiki')
        self.collection = self.db['kaggle']
— | — | @@ -70,14 +81,19 @@ |
71 | 82 | self.revisions.append(revision) |
72 | 83 | if len(self.revisions) == 100: |
73 | 84 | self.store() |
74 | | - self.revisions = [] |
| 85 | + self.clear() |
75 | 86 | |
def empty(self):
    """Flush whatever is currently buffered, then reset the buffer."""
    # Order matters: store() reads the buffer that clear() discards.
    flush, reset = self.store, self.clear
    flush()
    reset()
78 | 90 | |
def clear(self):
    """Replace the revision buffer with a new, empty list."""
    # A fresh list is bound (rather than emptying in place), so any other
    # reference to the previous list keeps its contents.
    self.revisions = list()
| 93 | + |
def store(self):
    """Write the buffered revisions to the configured backend.

    With ``self.storage == 'cassandra'`` the whole buffer is handed to
    ``self.collection.batch_insert``; for any other backend only a
    message is printed. The buffer itself is not modified here.
    """
    if self.storage != 'cassandra':
        print('insert into mongo')
        return

    print('insert into cassandra')
    # NOTE(review): self.revisions is a list, while pycassa documents
    # batch_insert as taking a dict of {row_key: {column: value}} --
    # confirm the expected row shape against the caller.
    self.collection.batch_insert(self.revisions)
84 | 100 | |
— | — | @@ -230,6 +246,8 @@ |
231 | 247 | revision_id = extracter.extract_revision_id(revision_id) |
232 | 248 | row['revision_id'] = revision_id |
233 | 249 | |
| 250 | + timestamp = revision.find('timestamp').text |
| 251 | + row['timestamp'] = timestamp |
234 | 252 | |
235 | 253 | hash = create_md5hash(revision) |
236 | 254 | revert = is_revision_reverted(hash['hash'], hashes) |
— | — | @@ -239,7 +257,7 @@ |
240 | 258 | row.update(hash) |
241 | 259 | row.update(size) |
242 | 260 | row.update(revert) |
243 | | - print row |
| 261 | + #print row |
244 | 262 | # if row['username'] == None: |
245 | 263 | # contributor = revision.find('contributor') |
246 | 264 | # attrs = contributor.getchildren() |
— | — | @@ -301,20 +319,22 @@ |
302 | 320 | input_queue = JoinableQueue() |
303 | 321 | result_queue = JoinableQueue() |
304 | 322 | storage = 'cassandra' |
305 | | - files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2'] |
| 323 | + #files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2'] |
| 324 | + files = ['/home/diederik/kaggle/enwiki-20100904-pages-meta-history2.xml.bz2'] |
| 325 | + |
306 | 326 | for file in files: |
307 | 327 | input_queue.put(file) |
308 | 328 | |
309 | | - for x in xrange(cpu_count): |
| 329 | + for x in xrange(cpu_count()): |
310 | 330 | input_queue.put(None) |
311 | 331 | |
312 | 332 | extracters = [Process(target=create_article, args=[input_queue, result_queue]) |
313 | | - for x in xrange(cpu_count)] |
| 333 | + for x in xrange(cpu_count())] |
314 | 334 | for extracter in extracters: |
315 | 335 | extracter.start() |
316 | 336 | |
317 | 337 | creators = [Process(target=create_variables, args=[result_queue, storage]) |
318 | | - for x in xrange(cpu_count)] |
| 338 | + for x in xrange(cpu_count())] |
319 | 339 | for creator in creators: |
320 | 340 | creator.start() |
321 | 341 | |
Index: trunk/tools/editor_trends/database/cassandra.py |
— | — | @@ -0,0 +1,32 @@ |
| 2 | +#!/usr/bin/python |
| 3 | +# -*- coding: utf-8 -*- |
| 4 | +''' |
| 5 | +Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com) |
| 6 | +This program is free software; you can redistribute it and/or |
| 7 | +modify it under the terms of the GNU General Public License version 2 |
| 8 | +as published by the Free Software Foundation. |
| 9 | +This program is distributed in the hope that it will be useful, |
| 10 | +but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
| 12 | +See the GNU General Public License for more details, at |
| 13 | +http://www.fsf.org/licenses/gpl.html |
| 14 | +''' |
| 15 | + |
| 16 | +__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ]) |
| 17 | +__author__email = 'dvanliere at gmail dot com' |
| 18 | +__date__ = '2011-02-25' |
| 19 | +__version__ = '0.1' |
| 20 | + |
def install_schema(keyspace_name, drop_first=False):
    """Create the keyspace, the 'revisions' column family and its indexes.

    Connects to the Cassandra node at 127.0.0.1:9160, creates
    ``keyspace_name`` with a replication factor of 1, adds a 'revisions'
    column family whose comparator and default validator are UTF8, and
    creates secondary indexes on the 'article', 'username' (UTF8) and
    'user_id' (LONG) columns.

    :param keyspace_name: name of the keyspace to create.
    :param drop_first: when true, drop ``keyspace_name`` before creating
        it. NOTE(review): presumably the keyspace must already exist for
        the drop to succeed -- confirm against the pycassa version in use.
    """
    # Imported locally: this module has no module-level import of pycassa,
    # so the bare ``pycassa.`` references raised NameError on every call.
    from pycassa import system_manager

    sm = system_manager.SystemManager('127.0.0.1:9160')
    try:
        if drop_first:
            sm.drop_keyspace(keyspace_name)

        sm.create_keyspace(keyspace_name, replication_factor=1)  # TODO: Change

        sm.create_column_family(
            keyspace_name, 'revisions',
            comparator_type=system_manager.UTF8_TYPE,
            default_validation_class=system_manager.UTF8_TYPE)

        indexed_columns = (
            ('article', system_manager.UTF8_TYPE),
            ('username', system_manager.UTF8_TYPE),
            ('user_id', system_manager.LONG_TYPE),
        )
        for column, value_type in indexed_columns:
            sm.create_index(keyspace_name, 'revisions', column, value_type)
    finally:
        # Release the management connection even when a step fails.
        sm.close()
Property changes on: trunk/tools/editor_trends/database/cassandra.py |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 34 | + native |