r76355 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: r76354 | r76355 | r76356 >
Date: 23:52, 8 November 2010
Author: diederik
Status: deferred
Tags:
Comment:
Parallelized mergesort by distributing tasks over multiple cores.
Modified paths:
  • /trunk/tools/editor_trends/map_wiki_editors.py (modified) (history)
  • /trunk/tools/editor_trends/utils/sort.py (modified) (history)

Diff [purge]

Index: trunk/tools/editor_trends/map_wiki_editors.py
@@ -202,7 +202,7 @@
203203 print 'Still sleeping, queue is %s items long' % output.qsize()
204204
205205 else:
206 - output.close()
 206+ fh.close()
207207
208208 if pbar:
209209 print file, xml_queue.qsize()
Index: trunk/tools/editor_trends/utils/sort.py
@@ -25,9 +25,12 @@
2626 '''
2727
2828 import heapq
 29+from multiprocessing import Queue
 30+from Queue import Empty
2931
3032 import settings
3133 import utils
 34+import process_constructor as pc
3235 from database import cache
3336
3437 def quick_sort(obs):
@@ -110,7 +113,7 @@
111114 fh.close()
112115
113116
114 -def debug_merge_sorted_files(input, output):
 117+def merge_sorted_files(input, output):
115118 files = utils.retrieve_file_list(input, 'txt', mask='')
116119 filehandles = [utils.create_txt_filehandle(input, file, 'r', settings.ENCODING) for file in files]
117120 lines = merge_sorted_files(output, filehandles)
@@ -121,17 +124,50 @@
122125 def debug_mergesort(input, output):
123126 files = utils.retrieve_file_list(input, 'txt', mask='((?!_sorted)\d)')
124127 for file in files:
125 - fh = utils.create_txt_filehandle(input, file, 'r', settings.ENCODING)
126 - data = fh.readlines()
127 - fh.close()
128 - data = [d.replace('\n', '') for d in data]
129 - data = [d.split('\t') for d in data]
130 - sorted_data = mergesort(data)
131 - write_sorted_file(sorted_data, file, output)
 128+ pass
 129+
 130+def mergesort_feeder(input_queue, **kwargs):
 131+ input = kwargs.get('input', None)
 132+ output = kwargs.get('output', None)
 133+ while True:
 134+ try:
 135+ file = input_queue.get(block=False)
 136+ fh = utils.create_txt_filehandle(input, file, 'r', settings.ENCODING)
 137+ data = fh.readlines()
 138+ fh.close()
 139+ data = [d.replace('\n', '') for d in data]
 140+ data = [d.split('\t') for d in data]
 141+ sorted_data = mergesort(data)
 142+ write_sorted_file(sorted_data, file, output)
 143+ except Empty:
 144+ break
 145+
132146
133147
 148+def mergesort_launcher(input, output):
 149+ kwargs = {'pbar': True,
 150+ 'nr_input_processors': settings.NUMBER_OF_PROCESSES,
 151+ 'nr_output_processors': settings.NUMBER_OF_PROCESSES,
 152+ 'input': input,
 153+ 'output': output,
 154+ }
 155+ chunks = {}
 156+
 157+ files = utils.retrieve_file_list(input, 'txt')
 158+ parts = int(round(float(len(files)) / settings.NUMBER_OF_PROCESSES, 0))
 159+ a = 0
 160+
 161+ for x in xrange(settings.NUMBER_OF_PROCESSES):
 162+ b = a + parts
 163+ chunks[x] = files[a:b]
 164+ a = (x + 1) * parts
 165+
 166+ pc.build_scaffolding(pc.load_queue, mergesort_feeder, chunks, False, False, **kwargs)
 167+ merge_sorted_files(input, output)
 168+
134169 if __name__ == '__main__':
135 - input = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki')
 170+ input = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'txt')
136171 output = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'sorted')
137 - debug_mergesort(input, output)
 172+ mergesort_launcher(input, output)
 173+ #debug_mergesort(input, output)
138174 #debug_merge_sorted_files(input, output)

Status & tagging log