r85650 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r85649 | r85650 | r85651 >
Date:22:46, 7 April 2011
Author:diederik
Status:deferred
Tags:
Comment:
A handful of fixes, including an alternative storage location for dump files, configuring the database through the command prompt, and fixing a "too many open files" problem.
Modified paths:
  • /trunk/tools/editor_trends/classes/exceptions.py (modified) (history)
  • /trunk/tools/editor_trends/classes/runtime_settings.py (modified) (history)
  • /trunk/tools/editor_trends/classes/settings.py (modified) (history)
  • /trunk/tools/editor_trends/classes/storage.py (modified) (history)
  • /trunk/tools/editor_trends/etl/enricher.py (modified) (history)
  • /trunk/tools/editor_trends/manage.py (modified) (history)
  • /trunk/tools/editor_trends/utils/text_utils.py (modified) (history)

Diff [purge]

Index: trunk/tools/editor_trends/manage.py
@@ -199,12 +199,12 @@
200200 config = ConfigParser.RawConfigParser()
201201 project = None
202202 language = None
 203+ db = None
203204 valid_storage = ['mongo', 'cassandra']
204205 working_directory = raw_input('Please indicate where you installed Wikilytics.\nCurrent location is %s\nPress Enter to accept default.\n' % os.getcwd())
205206 input_location = raw_input('Please indicate where the Wikipedia dump files are or will be located.\nDefault is: %s\nPress Enter to accept default.\n' % rts.input_location)
206207 output_location = raw_input('Please indicate where to store all Wikilytics project files.\nDefault is: %s\nPress Enter to accept default.\n' % rts.output_location)
207208
208 -
209209 while db not in valid_storage:
210210 db = raw_input('Please indicate what database you are using for storage. \nDefault is: Mongo\n')
211211 db = 'mongo' if len(db) == 0 else db.lower()
@@ -236,7 +236,7 @@
237237 config.set('wiki', 'project', project)
238238 config.set('wiki', 'language', language)
239239 config.add_section('storage')
240 - config.set('db', 'type', db)
 240+ config.set('storage', 'db', db)
241241
242242 fh = file_utils.create_binary_filehandle(working_directory, 'wiki.cfg', 'wb')
243243 config.write(fh)
Index: trunk/tools/editor_trends/etl/enricher.py
@@ -89,38 +89,22 @@
9090
9191
9292 class Buffer:
93 - def __init__(self, storage, process_id, rts=None, filehandles=None, locks=None):
94 - assert storage == 'cassandra' or storage == 'mongo' or storage == 'csv', \
95 - 'Valid storage options are cassandra and mongo.'
 93+ def __init__(self, storage, process_id, rts=None, locks=None):
9694 self.storage = storage
9795 self.revisions = {}
9896 self.comments = {}
9997 self.titles = {}
10098 self.process_id = process_id
101 - self.keyspace_name = 'enwiki'
10299 self.keys = ['revision_id', 'article_id', 'id', 'username', 'namespace',
103100 'title', 'timestamp', 'hash', 'revert', 'bot', 'cur_size',
104101 'delta']
105 - self.setup_storage()
106102 self.stats = Statistics(self.process_id)
107 - if storage == 'csv' and locks != None:
 103+ if locks != None:
108104 self.rts = rts
109105 self.lock1 = locks[0] #lock for generic data
110106 self.lock2 = locks[1] #lock for comment data
111107 self.lock3 = locks[2] #lock for article titles
112 - self.filehandles = filehandles[0]
113 - self.fh_titles = filehandles[1]
114 - self.fh_comments = filehandles[2]
115108
116 - def setup_storage(self):
117 - if self.storage == 'cassandra':
118 - self.db = pycassa.connect(self.keyspace_name)
119 - self.collection = pycassa.ColumnFamily(self.db, 'revisions')
120 -
121 - elif self.storage == 'mongo':
122 - self.db = db.init_mongo_db(self.keyspace_name)
123 - self.collection = self.db['kaggle']
124 -
125109 def get_hash(self, id):
126110 '''
127111 A very simple hash function based on modulo. The except clause has been
@@ -166,61 +150,60 @@
167151 self.store()
168152 self.clear()
169153
170 -
171154 def clear(self):
172155 self.revisions = {}
173156 self.comments = {}
174157 self.titles = {}
175158
176159 def store(self):
177 - if self.storage == 'cassandra':
178 - self.collection.batch_insert(self.revisions)
179 - elif self.storage == 'mongo':
180 - print 'insert into mongo'
181 - else:
182 - rows = []
183 - for id, revision in self.revisions.iteritems():
184 - values = []
185 - for key in self.keys:
186 - values.append(revision[key].decode('utf-8'))
187 - #values.insert(0, id)
188 - rows.append(values)
189 - self.write_output(rows)
 160+ rows = []
 161+ for id, revision in self.revisions.iteritems():
 162+ values = []
 163+ for key in self.keys:
 164+ values.append(revision[key].decode('utf-8'))
 165+ rows.append(values)
 166+ self.write_output(rows)
190167
191 - if self.comments:
192 - self.lock2.acquire()
193 - try:
194 - rows = []
195 - for revision_id, comment in self.comments.iteritems():
196 - #comment = comment.decode('utf-8')
197 - #row = '\t'.join([revision_id, comment]) + '\n'
198 - rows.append([revision_id, comment])
199 - file_utils.write_list_to_csv(row, self.fh_comments)
200 - except Exception, error:
201 - print error
202 - finally:
203 - self.lock2.release()
 168+ if self.comments:
 169+ self.lock2.acquire()
 170+ try:
 171+ fh = file_utils.create_txt_filehandle(self.rts.txt,
 172+ 'comments.csv', 'a', 'utf-8')
 173+ rows = []
 174+ for revision_id, comment in self.comments.iteritems():
 175+ #comment = comment.decode('utf-8')
 176+ #row = '\t'.join([revision_id, comment]) + '\n'
 177+ rows.append([revision_id, comment])
 178+ file_utils.write_list_to_csv(row, fh)
 179+ except Exception, error:
 180+ print 'Encountered the following error while writing data to %s: %s' % (fh, error)
 181+ finally:
 182+ fh.close()
 183+ self.lock2.release()
204184
205 - elif self.titles:
206 - self.lock3.acquire()
207 - try:
208 - rows = []
209 - for article_id, dict in self.titles.iteritems():
210 - keys = dict.keys()
211 - value = []
212 - for key in keys:
213 - value.append(key)
214 - value.append(dict[key])
215 - value.insert(0, article_id)
216 - value.insert(0, 'id')
217 - #title = title.encode('ascii')
218 - #row = '\t'.join([article_id, title]) + '\n'
219 - rows.append(value)
220 - file_utils.write_list_to_csv(rows, self.fh_titles, newline=False)
221 - except Exception, error:
222 - print error
223 - finally:
224 - self.lock3.release()
 185+ elif self.titles:
 186+ self.lock3.acquire()
 187+ try:
 188+ fh = file_utils.create_txt_filehandle(self.rts.txt,
 189+ 'titles.csv', 'a', 'utf-8')
 190+ rows = []
 191+ for article_id, dict in self.titles.iteritems():
 192+ keys = dict.keys()
 193+ value = []
 194+ for key in keys:
 195+ value.append(key)
 196+ value.append(dict[key])
 197+ value.insert(0, article_id)
 198+ value.insert(0, 'id')
 199+ #title = title.encode('ascii')
 200+ #row = '\t'.join([article_id, title]) + '\n'
 201+ rows.append(value)
 202+ file_utils.write_list_to_csv(rows, fh, newline=False)
 203+ except Exception, error:
 204+ print 'Encountered the following error while writing data to %s: %s' % (fh, error)
 205+ finally:
 206+ fh.close()
 207+ self.lock3.release()
225208
226209
227210 def write_output(self, data):
@@ -232,12 +215,14 @@
233216 for i, revision in enumerate(self.revisions[editor]):
234217 if i == 0:
235218 id = self.get_hash(revision[2])
236 - fh = self.filehandles[id]
 219+ fh = file_utils.create_txt_filehandle(self.rts.txt,
 220+ '%s.csv' % id, 'a', 'utf-8')
237221 try:
238222 file_utils.write_list_to_csv(revision, fh, lock=self.lock1)
239223 except Exception, error:
240 - print 'Encountered the following error while writing data to %s: %s' % (error, fh)
 224+ print 'Encountered the following error while writing data to %s: %s' % (fh, error)
241225 finally:
 226+ fh.close()
242227 self.lock1.release()
243228
244229
@@ -663,24 +648,11 @@
664649
665650 def stream_raw_xml(input_queue, storage, process_id, function, dataset, locks, rts):
666651 bots = bot_detector.retrieve_bots(rts.language.code)
667 - path = os.path.join(rts.output_location, 'txt')
668652
669 - filehandles = [file_utils.create_txt_filehandle(path, '%s.csv' % fh, 'a',
670 - 'utf-8') for fh in xrange(rts.max_filehandles)]
671 -
672 - title_file = os.path.join(path, 'titles.csv')
673 - comment_file = os.path.join(path, 'comments.csv')
674 - #file_utils.delete_file(path, title_file, directory=False)
675 - #file_utils.delete_file(path, comment_file, directory=False)
676 - fh_titles = codecs.open(title_file, 'a', 'utf-8')
677 - fh_comments = codecs.open(comment_file, 'a', 'utf-8')
678 - handles = [filehandles, fh_titles, fh_comments]
679 - wikilytics = False
680 -
681653 t0 = datetime.datetime.now()
682654 i = 0
683655 if dataset == 'training':
684 - cache = Buffer(storage, process_id, rts, handles, locks)
 656+ cache = Buffer(storage, process_id, rts, locks)
685657 else:
686658 counts = {}
687659
@@ -732,10 +704,7 @@
733705 preparations are made including setting up namespaces and cleaning up old
734706 files.
735707 '''
736 - if storage == 'cassandra':
737 - keyspace_name = 'enwiki'
738 - cassandra.install_schema(keyspace_name, drop_first=True)
739 - elif storage == 'csv':
 708+ if storage == 'csv':
740709 res = file_utils.delete_file(rts.txt, None, directory=True)
741710 if res:
742711 res = file_utils.create_directory(rts.txt)
@@ -743,7 +712,6 @@
744713
745714 def multiprocessor_launcher(function, dataset, storage, locks, rts):
746715 input_queue = JoinableQueue()
747 -
748716 files = file_utils.retrieve_file_list(rts.input_location)
749717 if len(files) > cpu_count():
750718 processors = cpu_count() - 1
@@ -767,9 +735,6 @@
768736 extracter.start()
769737
770738 input_queue.join()
771 - #filehandles = [fh.close() for fh in filehandles]
772 - #fh_titles.close()
773 - #fh_comments.close()
774739
775740
776741 def launcher_training():
Index: trunk/tools/editor_trends/classes/settings.py
@@ -12,7 +12,7 @@
1313 http://www.fsf.org/licenses/gpl.html
1414 '''
1515
16 -__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
 16+__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)'])
1717 __email__ = 'dvanliere at gmail dot com'
1818 __date__ = '2010-10-21'
1919 __version__ = '0.1'
@@ -53,61 +53,65 @@
5454 self.minimum_python_version = (2, 6)
5555 self.detect_python_version()
5656
 57+ self.platform = self.determine_platform()
 58+ self.root = os.path.expanduser('~') \
 59+ if self.platform != 'Windows' else 'c:\\'
 60+ self.working_directory = self.determine_working_directory()
 61+
 62+ result = self.load_configuration()
 63+ if not result:
 64+ self.input_location = os.path.join(self.root, 'wikimedia')
 65+ self.output_location = os.path.join(self.root, 'wikimedia')
 66+
5767 #Date format as used by Erik Zachte
5868 self.date_format = '%Y-%m-%d'
5969 # Timestamp format as generated by the MediaWiki dumps
6070 self.timestamp_format = '%Y-%m-%dT%H:%M:%SZ'
6171 self.timestamp_server = '%a, %d %b %Y %H:%M:%S %Z'
 72+
6273 #67108864 # ==64Mb, see http://hadoop.apache.org/common/docs/r0.20.0/hdfs_design.html#Large+Data+Setsfor reason
6374 self.max_xmlfile_size = 4096 * 1024
6475
6576 #Change this to match your computers configuration (RAM / CPU)
 77+ # I want to get rid off these two variables.
6678 self.number_of_processes = cpu_count()
 79+ self.windows_register = {'7z.exe': 'Software\\7-Zip'}
6780
6881 self.wp_dump_location = 'http://dumps.wikimedia.org'
69 - self.ascii_extensions = ['txt', 'csv', 'xml', 'sql', 'json']
70 - self.windows_register = {'7z.exe': 'Software\\7-Zip', }
71 - #Extensions of ascii files, this is used to determine the filemode to use
72 - self.platform = self.determine_platform()
7382
 83+
7484 self.architecture = platform.machine()
75 - self.working_directory = self.determine_working_directory()
 85+ self.tab_width = 4 if self.platform == 'Windows' else 8
 86+
 87+
7688 self.update_python_path()
7789
78 - self.root = os.path.expanduser('~') if self.platform != 'Windows' else 'c:\\'
7990 self.max_filehandles = self.determine_max_filehandles_open()
80 - self.tab_width = 4 if self.platform == 'Windows' else 8
8191
 92+ # This is the place where log files are stored for debugging purposes
 93+ self.log_location = os.path.join(self.working_directory, 'logs')
 94+ self.csv_location = os.path.join(self.working_directory, 'data', 'csv')
 95+ self.dataset_location = os.path.join(self.working_directory, 'datasets')
 96+ self.binary_location = os.path.join(self.working_directory, 'data', 'objects')
8297
83 - result = self.load_configuration()
84 - if not result:
85 - self.input_location = os.path.join(self.root, 'wikimedia')
86 - self.output_location = os.path.join(self.root, 'wikimedia')
 98+ def detect_config(self):
 99+ if not os.path.exists(os.path.join(self.working_directory, 'wiki.cfg')):
 100+ raise exceptions.GenericMessage('not_configured')
 101+ sys.exit(-1)
87102
88 - # Default Input file
89 - self.input_filename = os.path.join(self.input_location, 'en',
90 - 'wiki',
91 - 'enwiki-20100916-stub-meta-history.xml')
92 - # This is the place where error messages are stored for debugging purposes
93 - self.log_location = os.path.join(self.working_directory,
94 - 'logs')
95 - self.csv_location = os.path.join(self.working_directory,
96 - 'data', 'csv')
97 - self.dataset_location = os.path.join(self.working_directory, 'datasets')
98 - self.binary_location = os.path.join(self.working_directory,
99 - 'data', 'objects')
100 -
101103 def load_configuration(self):
102 - if os.path.exists(os.path.join(self.working_directory, 'wiki.cfg')):
103 - config = ConfigParser.RawConfigParser()
 104+ config = ConfigParser.RawConfigParser()
 105+ try:
104106 config.read(os.path.join(self.working_directory, 'wiki.cfg'))
105107 self.working_directory = config.get('file_locations', 'working_directory')
106108 self.input_location = config.get('file_locations', 'input_location')
107109 self.output_location = config.get('file_locations', 'output_location')
108110 self.default_project = config.get('wiki', 'project')
109111 self.default_language = config.get('wiki', 'language')
 112+ self.storage = config.get('storage', 'db')
110113 return True
111 - else:
 114+ except Exception, error:
 115+ #raise exceptions.GenericMessage('corrupted_config')
112116 return False
113117
114118 def determine_working_directory(self):
Index: trunk/tools/editor_trends/classes/exceptions.py
@@ -97,7 +97,6 @@
9898 subversion or contact Diederik van Liere.''' % self.func.func_name
9999
100100
101 -
102101 class GenericMessage(Error):
103102 def __init__(self, caller):
104103 self.caller = caller
@@ -108,5 +107,8 @@
109108 elif self.caller == 'corrupted_install':
110109 return 'I could not determine the location of manage.py, \
111110 please reinstall Wikilytics.'
 111+ elif self.caller == 'corrupted_config':
 112+ return 'Please delete wiki.cfg and run python manage.py config'
 113+ elif self.caller == 'not_configured':
 114+ return 'Please run first python manage.py config'
112115
113 -
Index: trunk/tools/editor_trends/classes/runtime_settings.py
@@ -146,20 +146,16 @@
147147 requested_charts.append(chart.func_name)
148148 return requested_charts
149149
150 -
151150 def set_input_location(self):
152151 files = os.listdir(self.input_location)
153152 extensions = ['gz', '7z', 'bz2']
154 - valid = False
155 - for ext in extensions:
156 - if ext in files:
157 - valid = True
158 - if valid:
159 - #ABS path case: check if files are stored here
160 - return input_location
161 - else:
162 - return os.path.join(self.input_location, self.language.code,
163 - self.project.name)
 153+ for file in files:
 154+ basename, ext = os.path.splitext(file)
 155+ if ext in extension:
 156+ #ABS path case: check if files are stored here
 157+ return self.input_location
 158+ return os.path.join(self.input_location, self.language.code,
 159+ self.project.name)
164160
165161 def set_output_location(self):
166162 '''
Index: trunk/tools/editor_trends/classes/storage.py
@@ -193,9 +193,8 @@
194194
195195 def start_server(self, port, path):
196196 default_port = 27017
197 - port = default_port + port
198197 if settings.platform == 'Windows':
199 - p = subprocess.Popen([path, '--port %s', port, '--dbpath',
 198+ p = subprocess.Popen([path, '--port %s', self.port, '--dbpath',
200199 'c:\data\db', '--logpath', 'c:\mongodb\logs'])
201200 elif settings.platform == 'Linux':
202201 subprocess.Popen([path, '--port %s' % port])
@@ -207,11 +206,15 @@
208207
209208 class Cassandra(AbstractDatabase):
210209 @classmethod
 210+ def __init__(self):
 211+ self.port = 9160
 212+ self.host = '127.0.0.1'
 213+
211214 def is_registrar_for(cls, storage):
212215 return storage == 'cassandra'
213216
214217 def install_schema(self, drop_first=False):
215 - sm = pycassa.system_manager.SystemManager('127.0.0.1:9160')
 218+ sm = pycassa.system_manager.SystemManager('%s:%s' % (sef.host, self.port))
216219 if drop_first:
217220 sm.drop_keyspace(keyspace_name)
218221
@@ -225,8 +228,32 @@
226229 sm.create_index(self.dbname, self.collection, 'username', pycassa.system_manager.UTF8_TYPE)
227230 sm.create_index(self.dbname, self.collection, 'user_id', pycassa.system_manager.LONG_TYPE)
228231
 232+ def connect(self):
 233+ self.db = pycassa.connect(self.dbname)
 234+ self.collection = pycassa.ColumnFamily(self.dbname, self.collection)
229235
 236+ def drop_collection(self):
 237+ return
230238
 239+ def add_index(self, key):
 240+ return
 241+
 242+ def insert(self, data):
 243+ return
 244+
 245+ def update(self, key, data):
 246+ return
 247+
 248+ def find(self, key, qualifier=None):
 249+ return
 250+
 251+ def save(self, data):
 252+ return
 253+
 254+ def count(self):
 255+ return
 256+
 257+
231258 def Database(storage, dbname, collection):
232259 for cls in AbstractDatabase.__subclasses__():
233260 if cls.is_registrar_for(storage):
Index: trunk/tools/editor_trends/utils/text_utils.py
@@ -43,8 +43,6 @@
4444 return d
4545
4646
47 -
48 -
4947 def invert_dict(dictionary):
5048 '''
5149 @dictionary is a simple dictionary containing simple values, ie. no lists,
@@ -54,3 +52,39 @@
5553 return dict([[v, k] for k, v in dictionary.items()])
5654
5755
 56+def get_max_width(table, index):
 57+ '''
 58+ Get the maximum width of the given column index
 59+ Gracefully borrowed from: http://ginstrom.com/scribbles/2007/09/04/pretty-printing-a-table-in-python/
 60+ '''
 61+ return max([len(row[index]) for row in table])
 62+
 63+
 64+
 65+
 66+def pprint_table(table):
 67+ '''
 68+ Prints out a table of data, padded for alignment
 69+ @param out: Output stream (file-like object)
 70+ @param table: The table to print. A list of lists.
 71+ Each row must have the same number of columns.
 72+ Gracefully borrowed from: http://ginstrom.com/scribbles/2007/09/04/pretty-printing-a-table-in-python/
 73+ '''
 74+
 75+ col_paddings = []
 76+ text = ''
 77+ for i in range(len(table[0])):
 78+ col_paddings.append(get_max_width(table, i))
 79+
 80+ for row in table:
 81+ # left col
 82+ #print >> out, row[0].ljust(col_paddings[0] + 1),
 83+ # rest of the cols
 84+ for i in xrange(0, len(row)):
 85+ col = row[i].rjust(col_paddings[i] + 2)
 86+ text = text + col
 87+ if i == len(row) - 1:
 88+ text = text + '\n'
 89+ #print >> out, col,
 90+ #print >> out
 91+ return text