r76401 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: r76401 (previous: r76400 | next: r76402)
Date: 18:01, 9 November 2010
Author: diederik
Status: deferred
Tags:
Comment: Fixed a nasty bug where, in rare cases, the process builder would spawn unlimited child processes.
Modified paths:
  • /trunk/tools/editor_trends/map_wiki_editors.py (modified)
  • /trunk/tools/editor_trends/run.py (modified)
  • /trunk/tools/editor_trends/settings.py (modified)
  • /trunk/tools/editor_trends/utils/models.py (modified)
  • /trunk/tools/editor_trends/utils/process_constructor.py (modified)
  • /trunk/tools/editor_trends/utils/sort.py (modified)

Diff

Index: trunk/tools/editor_trends/map_wiki_editors.py
@@ -120,7 +120,7 @@
        output.put(vars)
        vars['date'] = utils.convert_timestamp_to_date(vars['date'])
    elif destination == 'file':
-        data =[]
+        data = []
        for head in headers:
            data.append(vars[head])
        utils.write_list_to_csv(data, output)
@@ -207,10 +207,10 @@
        if pbar:
            print file, xml_queue.qsize()
            #utils.update_progressbar(pbar, xml_queue)
-
+
        if debug:
            break
-
+
    except Empty:
        break

@@ -234,7 +234,7 @@
    collection = mongo['editors']
    mongo.collection.ensure_index('editor')
    editor_cache = cache.EditorCache(collection)
-
+
    while True:
        try:
            edit = data_queue.get(block=False)
@@ -305,7 +305,7 @@
    ids = load_bot_ids()
    input = os.path.join(location, language, project)
    output = os.path.join(input, 'txt')
-
+
    kwargs = {'bots': ids,
              'dbname': language + project,
              'language': language,
@@ -317,22 +317,15 @@
              'input': input,
              'output': output,
              }
-#    chunks = {}
    source = os.path.join(location, language, project)
    files = utils.retrieve_file_list(source, 'xml')
-#    parts = int(round(float(len(files)) / settings.NUMBER_OF_PROCESSES, 0))
-#    a = 0
-
+
    if not os.path.exists(input):
        utils.create_directory(input)
    if not os.path.exists(output):
        utils.create_directory(output)
-
-#    for x in xrange(settings.NUMBER_OF_PROCESSES):
-#        b = a + parts
-#        chunks[x] = files[a:b]
-#        a = (x + 1) * parts
-    chunks = utils.split_list(files ,settings.NUMBER_OF_PROCESSES)
+
+    chunks = utils.split_list(files , settings.NUMBER_OF_PROCESSES)
    pc.build_scaffolding(pc.load_queue, parse_editors, chunks, False, False, **kwargs)

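Note on the change above: the hand-rolled chunking (visible in the deleted commented-out lines) is replaced by utils.split_list, whose implementation is not part of this diff. The sketch below is a hypothetical reconstruction, assuming it returns a dict of numbered chunks, since callers elsewhere in this revision index it as chunks[chunk] and chunks[0].

def split_list(files, parts):
    '''Hypothetical sketch of utils.split_list (not shown in this diff):
    divide `files` into `parts` roughly equal chunks keyed 0..parts-1.'''
    chunks = {}
    size = max(1, int(round(float(len(files)) / parts)))
    for x in xrange(parts):
        chunks[x] = files[x * size:(x + 1) * size]
    # Put any remainder in the last chunk so no file is dropped.
    chunks[parts - 1].extend(files[parts * size:])
    return chunks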
Index: trunk/tools/editor_trends/settings.py
@@ -27,13 +27,14 @@
import os
import sys
import platform
+#try:
+#    from pywin import win32file
+#    '''increase the maximum number of open files on Windows to 1024'''
+#    win32file._setmaxstdio(1024)
+#except ImportError:
+#    pass
+
try:
-    from pywin import win32file
-    '''increase the maximum number of open files on Windows to 1024'''
-    win32file._setmaxstdio(1024)
-except ImportError:
-    pass
-try:
    import resource
except ImportError:
    pass
@@ -47,6 +48,8 @@
if op() != ('', '', '') and op() != ('', ('', '', ''), ''):
    OS = ops[op]

+ARCH = platform.machine()
+
WORKING_DIRECTORY = os.getcwd()
IGNORE_DIRS = ['wikistats', 'zips']
ROOT = '/' if OS != 'Windows' else 'c:\\'
@@ -107,7 +110,12 @@
# ==64Mb, see http://hadoop.apache.org/common/docs/r0.20.0/hdfs_design.html#Large+Data+Sets for reason
MAX_XML_FILE_SIZE = 67108864

-MAX_FILES_OPEN = win32file._getmaxstdio() if OS == 'Windows' else resource.getrlimit('RLIMIT_NOFILE')
+if OS == 'Windows' and ARCH == 'i386':
+    MAX_FILES_OPEN = win32file._getmaxstdio()
+elif OS != 'Windows':
+    MAX_FILES_OPEN = resource.getrlimit(resource.RLIMIT_NOFILE)
+else:
+    MAX_FILES_OPEN = 500

ENCODING = 'utf-8'

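Two fixes are bundled in the last hunk: the old one-liner consulted win32file on every Windows box even when pywin was not importable, and it passed the string 'RLIMIT_NOFILE' to resource.getrlimit, which expects the integer constant resource.RLIMIT_NOFILE and raises otherwise. One caveat remains: resource.getrlimit returns a (soft, hard) tuple rather than an int. A minimal sketch of a variant that yields a plain integer (the pywin import mirrors the commented-out block above):

def max_files_open(default=500):
    '''Sketch: per-process open-file limit as a plain int, assuming the
    OS constant from this module.'''
    if OS != 'Windows':
        # getrlimit returns (soft, hard); the soft limit is the one enforced.
        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        return soft
    try:
        from pywin import win32file
        return win32file._getmaxstdio()
    except ImportError:
        return default  # conservative fallback, as in this revision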
Index: trunk/tools/editor_trends/run.py
@@ -31,7 +31,7 @@
from utils import sort
input = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'txt')
output = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'sorted')
-#sort.mergesort_launcher(input, output)
-
-#sort.debug_mergesort(input,output)
-sort.merge_sorted_files_launcher(output, output)
\ No newline at end of file
+dbname = 'enwiki'
+#sort.debug_mergesort_feeder(input, output)
+sort.mergesort_launcher(input, output)
+#sort.mergesort_external_launcher(dbname, output, output)
\ No newline at end of file
Index: trunk/tools/editor_trends/utils/models.py
@@ -13,6 +13,9 @@
'''

__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
+__author__email = 'dvanliere at gmail dot com'
+__date__ = '2010-11-09'
+__version__ = '0.1'

import multiprocessing

@@ -30,11 +33,10 @@
    def run(self):
        proc_name = self.name
        kwargs = {}
-        IGNORE = [self.input_queue, self.result_queue, self.target]
+        IGNORE = ['input_queue', 'result_queue', 'target']
        for kw in self.__dict__:
            if kw not in IGNORE and not kw.startswith('_'):
                kwargs[kw] = getattr(self, kw)
-
        self.target(self.input_queue, self.result_queue, **kwargs)


@@ -50,10 +52,9 @@

    def run(self):
        proc_name = self.name
-        kwargs= {}
-        IGNORE = [self.result_queue, self.target]
+        kwargs = {}
+        IGNORE = ['result_queue', 'target']
        for kw in self.__dict__:
            if kw not in IGNORE and not kw.startswith('_'):
                kwargs[kw] = getattr(self, kw)
-
        self.target(self.result_queue, **kwargs)
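The IGNORE change above is the subtle one: iterating over self.__dict__ yields attribute names (strings), so the old blacklist of attribute values ([self.input_queue, ...]) never matched and the queues leaked into kwargs. A standalone illustration of the corrected pattern, with hypothetical attribute names:

class Worker(object):
    def __init__(self):
        self.input_queue = object()   # stand-in for a real queue
        self.result_queue = object()  # stand-in for a real queue
        self.language = 'en'          # hypothetical plain attribute

w = Worker()
IGNORE = ['input_queue', 'result_queue']  # names, not values
kwargs = {}
for kw in w.__dict__:                     # iterates attribute NAMES
    if kw not in IGNORE and not kw.startswith('_'):
        kwargs[kw] = getattr(w, kw)
print kwargs                              # {'language': 'en'}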
Index: trunk/tools/editor_trends/utils/process_constructor.py
@@ -80,7 +80,7 @@
                        **kwargs) for i in xrange(nr_input_processors)]

    for input_process in input_processes:
-        input_process.start()
+        input_process.run()
    pids = [p.pid for p in input_processes]
    kwargs['pids'] = pids

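This change is presumably the heart of the fix named in the commit message: Process.start() forks a child and invokes run() inside it, while calling run() directly executes the target synchronously in the calling process, so no children are spawned at all. A side effect worth noting is that a never-started Process has pid None, so the pids list built on the next line will contain None values. A minimal demonstration of the difference:

import multiprocessing

def work(label):
    print 'hello from', label

if __name__ == '__main__':
    spawned = multiprocessing.Process(target=work, args=('child',))
    spawned.start()            # forks; run() executes in the child
    spawned.join()
    print spawned.pid          # set: a real child process existed

    inline = multiprocessing.Process(target=work, args=('parent',))
    inline.run()               # no fork; work() runs right here
    print inline.pid           # None: no child was ever spawned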
Index: trunk/tools/editor_trends/utils/sort.py
@@ -99,7 +99,7 @@
    fh.close()


-def store_editors(input, dbname):
+def store_editors(input, filename, dbname):
    fh = utils.create_txt_filehandle(input, filename, 'r', settings.ENCODING)
    mongo = db.init_mongo_db(dbname)
    collection = mongo['editors']
@@ -116,34 +116,29 @@
    fh.close()


-def merge_sorted_files_launcher(dbname, input, output):
+def mergesort_external_launcher(dbname, input, output):
    files = utils.retrieve_file_list(input, 'txt', mask='')
    x = 0
+    maxval = 99999
    while maxval >= settings.MAX_FILES_OPEN:
        x += 1.0
        maxval = round(len(files) / x)
    chunks = utils.split_list(files, int(x))
    '''1st iteration external mergesort'''
    for chunk in chunks:
-        #filehandles = [utils.create_txt_filehandle(input, file, 'r', settings.ENCODING) for file in chunks[chunk]]
-        #filename = merge_sorted_files(output, filehandles, chunk)
-        #filehandles = [fh.close() for fh in filehandles]
-        pass
+        filehandles = [utils.create_txt_filehandle(input, file, 'r', settings.ENCODING) for file in chunks[chunk]]
+        filename = merge_sorted_files(output, filehandles, chunk)
+        filehandles = [fh.close() for fh in filehandles]
    '''2nd iteration external mergesort, if necessary'''
    if len(chunks) > 1:
        files = utils.retrieve_file_list(output, 'txt', mask='[merged]')
        filehandles = [utils.create_txt_filehandle(output, file, 'r', settings.ENCODING) for file in files]
-        filename = merge_sorted_files(output, filehandles, chunk)
+        filename = merge_sorted_files(output, filehandles, 'final')
        filehandles = [fh.close() for fh in filehandles]
    store_editors(output, filename, dbname)


-def debug_mergesort(input, output):
-    files = utils.retrieve_file_list(input, 'txt', mask='((?!_sorted)\d)')
-    for file in files:
-        pass
-
-def mergesort_feeder(input_queue, **kwargs):
+def mergesort_feeder(input_queue, result_queue, **kwargs):
    input = kwargs.get('input', None)
    output = kwargs.get('output', None)
    while True:
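The seed maxval = 99999 added above fixes a NameError: the while condition read maxval before any assignment. The loop itself searches for the smallest number of chunks x such that an average chunk of round(len(files) / x) file handles stays below settings.MAX_FILES_OPEN, so a single merge pass never exceeds the OS limit. Extracted as a standalone, runnable helper (hypothetical name, assuming the limit is a plain int):

def number_of_chunks(n_files, max_open):
    '''Smallest x with round(n_files / x) < max_open, mirroring the loop above.'''
    x = 0
    maxval = 99999  # note: returns 0 if max_open > 99999, as in the original
    while maxval >= max_open:
        x += 1.0
        maxval = round(n_files / x)
    return int(x)

print number_of_chunks(1200, 500)  # -> 3: three passes of ~400 handles each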
@@ -160,7 +155,6 @@
            break


-
def mergesort_launcher(input, output):
    kwargs = {'pbar': True,
              'nr_input_processors': settings.NUMBER_OF_PROCESSES,
@@ -168,23 +162,23 @@
              'input': input,
              'output': output,
              }
-    chunks = {}
-
    files = utils.retrieve_file_list(input, 'txt')
-    parts = int(round(float(len(files)) / settings.NUMBER_OF_PROCESSES, 0))
-    a = 0
-
-    for x in xrange(settings.NUMBER_OF_PROCESSES):
-        b = a + parts
-        chunks[x] = files[a:b]
-        a = (x + 1) * parts
-
+    chunks = utils.split_list(files, settings.NUMBER_OF_PROCESSES)
    pc.build_scaffolding(pc.load_queue, mergesort_feeder, chunks, False, False, **kwargs)
-    merge_sorted_files(input, output)

+def debug_mergesort_feeder(input, output):
+    kwargs = {
+        'input': input,
+        'output': output,
+    }
+    files = utils.retrieve_file_list(input, 'txt')
+    chunks = utils.split_list(files, settings.NUMBER_OF_PROCESSES)
+    q = pc.load_queue(chunks[0])
+    mergesort_feeder(q, False, **kwargs)
+
if __name__ == '__main__':
    input = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'txt')
    output = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'sorted')
+    dbname = 'enwiki'
    mergesort_launcher(input, output)
-    #debug_mergesort(input, output)
-    #debug_merge_sorted_files(input, output)
+    mergesort_external_launcher(dbname, output, output)
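Combined with the run.py change above, sorting is now a two-stage pipeline: mergesort_launcher sorts the raw txt files across NUMBER_OF_PROCESSES workers, then mergesort_external_launcher merges the sorted pieces in one or two passes (bounded by MAX_FILES_OPEN) and hands the merged file to store_editors for loading into MongoDB. A usage sketch mirroring the __main__ block, assuming settings is importable on the path:

import os
import settings
from utils import sort

input = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'txt')
output = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'sorted')

sort.mergesort_launcher(input, output)                      # stage 1: parallel sort
sort.mergesort_external_launcher('enwiki', output, output)  # stage 2: merge + store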
