r93712 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r93711‎ | r93712 | r93713 >
Date:09:01, 2 August 2011
Author:rfaulk
Status:deferred
Tags:
Comment:
built dataloader to process category hierarchies
Modified paths:
  • /trunk/tools/wsor/scripts/classes/WSORSlaveDataLoader.py (modified) (history)

Diff [purge]

Index: trunk/tools/wsor/scripts/classes/WSORSlaveDataLoader.py
@@ -1,6 +1,6 @@
22 """
33
4 -WSOR dataloader class for the MySQL slave of enwiki
 4+WSOR dataloader class for the MySQL slave of enwiki and user dbs
55
66 """
77
@@ -13,11 +13,17 @@
1414
1515 """ Import python base modules """
1616 import sys, getopt, re, datetime, logging, MySQLdb, settings
 17+import networkx as nx
1718
1819 """ Import Analytics modules """
1920 from Fundraiser_Tools.classes.DataLoader import DataLoader
2021
 22+""" Configure the logger """
 23+LOGGING_STREAM = sys.stderr
 24+logging.basicConfig(level=logging.DEBUG, stream=LOGGING_STREAM, format='%(asctime)s %(levelname)-8s %(message)s', datefmt='%b-%d %H:%M:%S')
2125
 26+
 27+
2228 """
2329 Inherits DataLoader
2430
@@ -27,10 +33,12 @@
2834 class WSORSlaveDataLoader(DataLoader):
2935
3036 def __init__(self):
31 -
32 - """ Configure the logger """
33 - LOGGING_STREAM = sys.stderr
34 - logging.basicConfig(level=logging.DEBUG, stream=LOGGING_STREAM, format='%(asctime)s %(levelname)-8s %(message)s', datefmt='%b-%d %H:%M:%S')
 37+
 38+ self.init_db()
 39+
 40+ def __del__(self):
 41+
 42+ self.close_db()
3543
3644
3745 """
@@ -42,9 +50,9 @@
4351
4452 """ Establish connection """
4553 try:
46 - self._db_ = MySQLdb.connect(host=settings.__db_server__, user=settings.__user__, db=settings.__db__, port=settings.__db_port__, passwd=settings.__pass__)
 54+ self._db_ = MySQLdb.connect(host=settings.__db_server__, user=settings.__user__, db=settings.__db__, port=settings.__db_port__, passwd=settings.__pass__)
4755 self._db_enwiki_ = MySQLdb.connect(host=settings.__db_server__, user=settings.__user__, db=settings.__db_enwikislave__, port=settings.__db_port__, passwd=settings.__pass__)
48 - logging.info('Successfully connected.\n')
 56+ logging.info('Successfully connected.')
4957 except:
5058 logging.DEBUG('Could not establish a connection to %s @ %s : %s' % (settings.__user__, settings.__db_server__, settings.__db__))
5159 return
@@ -66,6 +74,255 @@
6775 """
6876 Inherits WSORSlaveDataLoader
6977
 78+ DataLoader class for querying category tables
 79+
 80+"""
 81+class CategoryLoader(WSORSlaveDataLoader):
 82+
 83+ def __init__(self):
 84+
 85+ self._query_names_['build_subcat_tbl'] = "CREATE TABLE rfaulk.categorylinks_cp select * from enwiki.categorylinks where cl_type = 'subcat'"
 86+ self._query_names_['drop_subcat_tbl'] = "drop table if exists rfaulk.categorylinks_cp;"
 87+ self._query_names_['get_first_rec'] = "select cl_from from categorylinks_cp limit 1"
 88+ self._query_names_['get_category_page_title'] = "select page_title from enwiki.page where page_id = %s"
 89+ self._query_names_['get_category_page_id'] = "select page_id from enwiki.page where page_title = '%s' and page_namespace = 14"
 90+ self._query_names_['get_subcategories'] = "select cl_to from categorylinks_cp where cl_from = %s"
 91+ self._query_names_['delete_from_recs'] = "delete from rfaulk.categorylinks_cp where cl_from = %s"
 92+ self._query_names_['is_empty'] = "select * from rfaulk.categorylinks_cp limit 1"
 93+
 94+ WSORSlaveDataLoader.__init__(self)
 95+ logging.info('Creating CategoryLoader')
 96+
 97+ """
 98+ Retrieves the integer page id
 99+ """
 100+ def get_page_id(self, page_title):
 101+
 102+ try:
 103+ sql = self._query_names_['get_category_page_id'] % page_title
 104+ #logging.info('Executing: ' + sql)
 105+ results = self.execute_SQL(sql)
 106+ id = int(results[0][0])
 107+
 108+ except Exception as inst:
 109+
 110+ logging.error('Could not retrieve page_id.')
 111+ return -1
 112+
 113+ return id
 114+
 115+ """
 116+ Retrieves the string page title
 117+ """
 118+ def get_page_title(self, page_id):
 119+
 120+ try:
 121+ sql = self._query_names_['get_category_page_title'] % page_id
 122+ #logging.info('Executing: ' + sql)
 123+ results = self.execute_SQL(sql)
 124+ title = str(results[0][0])
 125+
 126+ except Exception as inst:
 127+
 128+ logging.error('Could not retrieve page_title.')
 129+ return -1
 130+
 131+ return title
 132+
 133+ """
 134+ Look at the first category in categorylinks_cp
 135+ """
 136+ def get_first_record_from_category_links(self):
 137+
 138+ try:
 139+
 140+ sql = self._query_names_['get_first_rec']
 141+ #logging.info('Executing: ' + sql)
 142+ results = self.execute_SQL(sql)
 143+
 144+ #logging.info('Retrieved first row from rfaulk.categorylinks_cp.')
 145+ category_page_id = int(results[0][0])
 146+ category_page_title = self.get_page_title(category_page_id)
 147+
 148+ except Exception as inst:
 149+
 150+ logging.error('Could not retrieve page_title.')
 151+ return ''
 152+
 153+ return category_page_title
 154+
 155+
 156+ """
 157+ Finds and returns a list of sub categories
 158+ """
 159+ def get_subcategories(self, category_title):
 160+
 161+ subcategories = list()
 162+
 163+ # category_upper, category_lower, category_camel = self.normalize_field_cl_from(category)
 164+
 165+ """ Retrieve the sub categories and add to list """
 166+ category_page_id = self.get_page_id(category_title)
 167+ category_page_id_str = str(category_page_id)
 168+ sql_select = self._query_names_['get_subcategories'] % category_page_id_str
 169+ sql_delete = self._query_names_['delete_from_recs'] % category_page_id_str
 170+
 171+ try:
 172+ #logging.info('Executing: ' + sql_select)
 173+ results = self.execute_SQL(sql_select)
 174+ #logging.info('Retrieved sub-categories of %s.' % category_page_id_str)
 175+
 176+ for row in results:
 177+ subcategories.append(row[0])
 178+
 179+ #logging.info('Executing: ' + sql_delete)
 180+ self.execute_SQL(sql_delete)
 181+ #logging.info('Removed references from the category with page_id %s.' % category_page_id_str)
 182+
 183+ except Exception as inst:
 184+
 185+ logging.error('Could not retrieve sub-categories.')
 186+ return []
 187+
 188+ return subcategories
 189+
 190+
 191+
 192+ """
 193+ Recursively builds subtree of categories
 194+ """
 195+ def build_category_tree(self, directed_graph, category_title):
 196+
 197+ # logging.info('Creating directed links for category: %s' % category_title)
 198+
 199+ """ Get sub-categories """
 200+ sub_categories = self.get_subcategories(category_title)
 201+
 202+ """ Build nodes for each """
 203+ for sub_cat in sub_categories:
 204+
 205+ directed_graph.add_weighted_edges_from([(category_title, sub_cat,1)])
 206+ self.build_category_tree(directed_graph, sub_cat)
 207+
 208+ return directed_graph
 209+
 210+
 211+ """
 212+ Execution entry point of the class - builds a full category hierarchy from categorylinks
 213+ """
 214+ def extract_hierarchy(self):
 215+
 216+ #self.drop_category_links_cp_table()
 217+ #self.create_category_links_cp_table()
 218+
 219+ """ Create graph """
 220+ logging.info('Initializing directed graph...')
 221+ directed_graph = nx.DiGraph()
 222+
 223+ """ while there are rows left in categorylinks_cp """
 224+ while(not self.is_empty()):
 225+
 226+ category_title = self.get_first_record_from_category_links()
 227+ self.build_category_tree(directed_graph, category_title)
 228+ directed_graph.add_weighted_edges_from([('ALL', category_title, 1)])
 229+
 230+ logging.info('Category links finished processing.')
 231+
 232+ return directed_graph
 233+
 234+
 235+ """ drop rfaulk.categorylinks_cp """
 236+ def drop_category_links_cp_table(self):
 237+
 238+ try:
 239+ sql = self._query_names_['drop_subcat_tbl']
 240+ logging.info('Executing: ' + sql)
 241+
 242+ self._cur_.execute(sql)
 243+ logging.info('Dropped rfaulk.categorylinks_cp table.')
 244+
 245+ except Exception as inst:
 246+
 247+ logging.error('Could not execute: %s\n' % sql)
 248+ logging.error(str(type(inst))) # the exception instance
 249+ logging.error(str(inst.args)) # arguments stored in .args
 250+ logging.error(inst.__str__()) # __str__ allows args to be printed directly
 251+
 252+
 253+ """ create rfaulk.categorylinks_cp """
 254+ def create_category_links_cp_table(self):
 255+
 256+ try:
 257+ sql = self._query_names_['build_subcat_tbl']
 258+ logging.info('Executing: ' + sql)
 259+
 260+ self._cur_.execute(sql)
 261+ logging.info('Created rfaulk.categorylinks_cp table from enwiki.categorylinks_cp.')
 262+
 263+ except Exception as inst:
 264+
 265+ logging.error('Could not execute: %s\n' % sql)
 266+ logging.error(str(type(inst))) # the exception instance
 267+ logging.error(str(inst.args)) # arguments stored in .args
 268+ logging.error(inst.__str__()) # __str__ allows args to be printed directly
 269+
 270+
 271+
 272+ """
 273+ Are there any records remaining in rfaulk.categorylinks_cp ??
 274+ """
 275+ def is_empty(self):
 276+
 277+ sql = self._query_names_['is_empty']
 278+
 279+ try:
 280+ self._cur_.execute(sql)
 281+ rows = self._cur_.fetchone()
 282+
 283+ except Exception as inst:
 284+
 285+ logging.error('Could not execute: %s\n' % sql)
 286+ logging.error(str(type(inst))) # the exception instance
 287+ logging.error(str(inst.args)) # arguments stored in .args
 288+ logging.error(inst.__str__()) # __str__ allows args to be printed directly
 289+
 290+ return True
 291+
 292+ if len(rows) > 0:
 293+ return False
 294+ else:
 295+ return True
 296+
 297+ """
 298+ The cl_from key is formatted in uppercase with non-uniform whitespace
 299+
 300+ def normalize_field_cl_from(self, category):
 301+
 302+ category = category.lower()
 303+ words = category.split('\n')[0] # only keep text before a carriage return
 304+ words = words.split()
 305+ len_words = len(words)
 306+
 307+ category = ''
 308+ category_camel = ''
 309+
 310+
 311+ for i in range(len_words - 1):
 312+ category = category + words[i] + ' '
 313+ category_camel = category_camel + words[i][0].upper() + words[i][1:] + ' '
 314+
 315+ category = category + words[len_words - 1]
 316+ category_camel = category_camel + words[len_words - 1][0].upper() + words[len_words - 1][1:]
 317+
 318+ category_upper = category.upper()
 319+ category_lower = category.lower()
 320+
 321+ return category_upper, category_lower, category_camel
 322+ """
 323+
 324+"""
 325+ Inherits WSORSlaveDataLoader
 326+
70327 DataLoader class for vandal reversion related queries
71328
72329 """
@@ -74,21 +331,21 @@
75332
76333 def __init__(self, query_key):
77334
78 - DataLoader.__init__(self)
 335+ WSORSlaveDataLoader.__init__(self)
79336 logging.info('Creating VadalLoader')
80337
81338 """
82339 DEFINE SQL queries
83340 """
84341 self._query_names_['query_test'] = 'select count(*) from revert_20110115'
85 - self._query_names_['query_vandal_count'] = 'select revision_id, username, user_id, sum(is_vandalism) from reverted_20110115 group by 1,2,3'
86 - self._query_names_['query_total_reverts'] = 'select revision_id, username, user_id, sum(is_vandalism) from reverted_20110115'
 342+ self._query_names_['query_vandal_count'] = 'select revision_id, sum(is_vandalism) from revert_20110115 group by 1'
 343+ self._query_names_['query_total_reverts'] = ''
87344
88345 """
89346 ASSIGN query
90347 """
91348 try:
92 - _sql_ = self._query_names_[query_key]
 349+ self._sql_ = self._query_names_[query_key]
93350 except KeyError:
94351 logging.debug('Query does not exist\n')
95352
@@ -98,12 +355,11 @@
99356 """
100357 def run_query(self):
101358
102 - logging.info('Running VandalLoader')
103 -
104359 self.init_db()
105360
106361 try:
107 - self._cur_.execute(self._query_names_['query_test'])
 362+ logging.info('Running VandalLoader')
 363+ self._cur_.execute(self._sql_)
108364
109365 """ GET THE COLUMN NAMES FROM THE QUERY RESULTS """
110366 self._col_names_ = list()
@@ -116,9 +372,10 @@
117373
118374 except Exception as inst:
119375
120 - logging.debug(str(type(inst))) # the exception instance
121 - logging.debug(str(inst.args)) # arguments stored in .args
122 - logging.debug(inst.__str__()) # __str__ allows args to printed directly
 376+ logging.error('Could not execute: %s\n' % self._sql_)
 377+ logging.error(str(type(inst))) # the exception instance
 378+ logging.error(str(inst.args)) # arguments stored in .args
 379+ logging.error(inst.__str__()) # __str__ allows args to be printed directly
123380
124381 # self._db_.rollback()
125382

Status & tagging log