Index: trunk/tools/editor_trends/map_wiki_editors.py |
— | — | @@ -120,7 +120,7 @@ |
121 | 121 | output.put(vars) |
122 | 122 | vars['date'] = utils.convert_timestamp_to_date(vars['date']) |
123 | 123 | elif destination == 'file': |
124 | | - data =[] |
| 124 | + data = [] |
125 | 125 | for head in headers: |
126 | 126 | data.append(vars[head]) |
127 | 127 | utils.write_list_to_csv(data, output) |
— | — | @@ -207,10 +207,10 @@ |
208 | 208 | if pbar: |
209 | 209 | print file, xml_queue.qsize() |
210 | 210 | #utils.update_progressbar(pbar, xml_queue) |
211 | | - |
| 211 | + |
212 | 212 | if debug: |
213 | 213 | break |
214 | | - |
| 214 | + |
215 | 215 | except Empty: |
216 | 216 | break |
217 | 217 | |
— | — | @@ -234,7 +234,7 @@ |
235 | 235 | collection = mongo['editors'] |
236 | 236 | mongo.collection.ensure_index('editor') |
237 | 237 | editor_cache = cache.EditorCache(collection) |
238 | | - |
| 238 | + |
239 | 239 | while True: |
240 | 240 | try: |
241 | 241 | edit = data_queue.get(block=False) |
— | — | @@ -305,7 +305,7 @@ |
306 | 306 | ids = load_bot_ids() |
307 | 307 | input = os.path.join(location, language, project) |
308 | 308 | output = os.path.join(input, 'txt') |
309 | | - |
| 309 | + |
310 | 310 | kwargs = {'bots': ids, |
311 | 311 | 'dbname': language + project, |
312 | 312 | 'language': language, |
— | — | @@ -317,22 +317,15 @@ |
318 | 318 | 'input': input, |
319 | 319 | 'output': output, |
320 | 320 | } |
321 | | -# chunks = {} |
322 | 321 | source = os.path.join(location, language, project) |
323 | 322 | files = utils.retrieve_file_list(source, 'xml') |
324 | | -# parts = int(round(float(len(files)) / settings.NUMBER_OF_PROCESSES, 0)) |
325 | | -# a = 0 |
326 | | - |
| 323 | + |
327 | 324 | if not os.path.exists(input): |
328 | 325 | utils.create_directory(input) |
329 | 326 | if not os.path.exists(output): |
330 | 327 | utils.create_directory(output) |
331 | | - |
332 | | -# for x in xrange(settings.NUMBER_OF_PROCESSES): |
333 | | -# b = a + parts |
334 | | -# chunks[x] = files[a:b] |
335 | | -# a = (x + 1) * parts |
336 | | - chunks = utils.split_list(files ,settings.NUMBER_OF_PROCESSES) |
| 328 | + |
| 329 | + chunks = utils.split_list(files, settings.NUMBER_OF_PROCESSES) |
337 | 330 | pc.build_scaffolding(pc.load_queue, parse_editors, chunks, False, False, **kwargs) |
338 | 331 | |
339 | 332 | |
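The commented-out chunking loop above is superseded by utils.split_list. The helper itself is not part of this diff; a minimal sketch of what it plausibly does, assuming it partitions a list into n roughly equal chunks keyed by index (consistent with the chunks[chunk] lookups in sort.py below):

    def split_list(items, n):
        '''Partition items into n roughly equal chunks, keyed 0..n-1.'''
        chunks = {}
        size = -(-len(items) // n)  # ceiling division
        for x in xrange(n):
            chunks[x] = items[x * size:(x + 1) * size]
        return chunks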
Index: trunk/tools/editor_trends/settings.py |
— | — | @@ -27,13 +27,14 @@ |
28 | 28 | import os |
29 | 29 | import sys |
30 | 30 | import platform |
| 31 | +#try: |
| 32 | +# from pywin import win32file |
| 33 | +# '''increase the maximum number of open files on Windows to 1024''' |
| 34 | +# win32file._setmaxstdio(1024) |
| 35 | +#except ImportError: |
| 36 | +# pass |
| 37 | + |
31 | 38 | try: |
32 | | - from pywin import win32file |
33 | | - '''increase the maximum number of open files on Windows to 1024''' |
34 | | - win32file._setmaxstdio(1024) |
35 | | -except ImportError: |
36 | | - pass |
37 | | -try: |
38 | 39 | import resource |
39 | 40 | except ImportError: |
40 | 41 | pass |
— | — | @@ -47,6 +48,8 @@ |
48 | 49 | if op() != ('', '', '') and op() != ('', ('', '', ''), ''): |
49 | 50 | OS = ops[op] |
50 | 51 | |
| 52 | +ARCH = platform.machine() |
| 53 | + |
51 | 54 | WORKING_DIRECTORY = os.getcwd() |
52 | 55 | IGNORE_DIRS = ['wikistats', 'zips'] |
53 | 56 | ROOT = '/' if OS != 'Windows' else 'c:\\' |
— | — | @@ -107,7 +110,12 @@ |
108 | 111 | # ==64Mb, see http://hadoop.apache.org/common/docs/r0.20.0/hdfs_design.html#Large+Data+Sets for reason |
109 | 112 | MAX_XML_FILE_SIZE = 67108864 |
110 | 113 | |
111 | | -MAX_FILES_OPEN = win32file._getmaxstdio() if OS == 'Windows' else resource.getrlimit('RLIMIT_NOFILE') |
| 114 | +if OS == 'Windows' and ARCH == 'i386': |
| 115 | + MAX_FILES_OPEN = win32file._getmaxstdio() |
| 116 | +elif OS != 'Windows': |
| 117 | + MAX_FILES_OPEN = resource.getrlimit(resource.RLIMIT_NOFILE)[0] |
| 118 | +else: |
| 119 | + MAX_FILES_OPEN = 500 |
112 | 120 | |
113 | 121 | ENCODING = 'utf-8' |
114 | 122 | |
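resource.getrlimit(resource.RLIMIT_NOFILE) returns a (soft, hard) tuple, hence the [0] index for the soft limit; note also that the i386 branch still calls win32file._getmaxstdio() even though the pywin import is now commented out at the top of the file. A hedged sketch of an equivalent fallback chain that degrades gracefully when either module is missing (the 500 default and the pywin calls are taken from this diff, the structure is an assumption):

    try:
        # POSIX: soft limit on open file descriptors.
        import resource
        MAX_FILES_OPEN = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
    except ImportError:
        try:
            # Windows with pywin installed: query the C runtime stdio limit.
            from pywin import win32file
            MAX_FILES_OPEN = win32file._getmaxstdio()
        except ImportError:
            # Conservative default when neither API is available.
            MAX_FILES_OPEN = 500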
Index: trunk/tools/editor_trends/run.py |
— | — | @@ -31,7 +31,7 @@ |
32 | 32 | from utils import sort |
33 | 33 | input = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'txt') |
34 | 34 | output = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'sorted') |
35 | | -#sort.mergesort_launcher(input, output) |
36 | | - |
37 | | -#sort.debug_mergesort(input,output) |
38 | | -sort.merge_sorted_files_launcher(output, output) |
\ No newline at end of file |
| 35 | +dbname = 'enwiki' |
| 36 | +#sort.debug_mergesort_feeder(input, output) |
| 37 | +sort.mergesort_launcher(input, output) |
| 38 | +#sort.mergesort_external_launcher(dbname, output, output) |
\ No newline at end of file |
Index: trunk/tools/editor_trends/utils/models.py |
— | — | @@ -13,6 +13,9 @@ |
14 | 14 | ''' |
15 | 15 | |
16 | 16 | __author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ]) |
| 17 | +__author__email__ = 'dvanliere at gmail dot com' |
| 18 | +__date__ = '2010-11-09' |
| 19 | +__version__ = '0.1' |
17 | 20 | |
18 | 21 | import multiprocessing |
19 | 22 | |
— | — | @@ -30,11 +33,10 @@ |
31 | 34 | def run(self): |
32 | 35 | proc_name = self.name |
33 | 36 | kwargs = {} |
34 | | - IGNORE = [self.input_queue, self.result_queue, self.target] |
| 37 | + IGNORE = ['input_queue', 'result_queue', 'target'] |
35 | 38 | for kw in self.__dict__: |
36 | 39 | if kw not in IGNORE and not kw.startswith('_'): |
37 | 40 | kwargs[kw] = getattr(self, kw) |
38 | | - |
39 | 41 | self.target(self.input_queue, self.result_queue, **kwargs) |
40 | 42 | |
41 | 43 | |
— | — | @@ -50,10 +52,9 @@ |
51 | 53 | |
52 | 54 | def run(self): |
53 | 55 | proc_name = self.name |
54 | | - kwargs= {} |
55 | | - IGNORE = [self.result_queue, self.target] |
| 56 | + kwargs = {} |
| 57 | + IGNORE = ['result_queue', 'target'] |
56 | 58 | for kw in self.__dict__: |
57 | 59 | if kw not in IGNORE and not kw.startswith('_'): |
58 | 60 | kwargs[kw] = getattr(self, kw) |
59 | | - |
60 | 61 | self.target(self.result_queue, **kwargs) |
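The fix in both run() methods is that IGNORE now lists attribute names, which is what the `kw not in IGNORE` test over self.__dict__ keys requires; the old version compared key strings against the attribute values themselves, so the queues were never filtered out and ended up in kwargs as duplicate arguments to the target. A minimal sketch of the pattern with a hypothetical worker class:

    import multiprocessing

    class Worker(multiprocessing.Process):
        '''Forwards every public constructor attribute to target as a kwarg.'''
        def __init__(self, target, input_queue, result_queue, **kwargs):
            multiprocessing.Process.__init__(self)
            self.target = target
            self.input_queue = input_queue
            self.result_queue = result_queue
            for key, value in kwargs.iteritems():
                setattr(self, key, value)

        def run(self):
            # Compare names (strings) against __dict__ keys; the private
            # multiprocessing attributes all start with '_' and are skipped.
            IGNORE = ['input_queue', 'result_queue', 'target']
            kwargs = dict((kw, getattr(self, kw)) for kw in self.__dict__
                          if kw not in IGNORE and not kw.startswith('_'))
            self.target(self.input_queue, self.result_queue, **kwargs)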
Index: trunk/tools/editor_trends/utils/process_constructor.py |
— | — | @@ -80,7 +80,7 @@ |
81 | 81 | **kwargs) for i in xrange(nr_input_processors)] |
82 | 82 | |
83 | 83 | for input_process in input_processes: |
84 | | - input_process.start() |
| 84 | + input_process.run() |
85 | 85 | pids = [p.pid for p in input_processes] |
86 | 86 | kwargs['pids'] = pids |
87 | 87 | |
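Calling run() instead of start() executes the target synchronously in the parent process rather than spawning a child; that makes tracebacks and debuggers usable, but the workers no longer run in parallel, and .pid on a never-started Process is None, so the pids list built two lines later will hold only None values. A small illustration of the difference, with a hypothetical worker function:

    import multiprocessing

    def work(n):
        print 'working on', n

    p = multiprocessing.Process(target=work, args=(1,))
    p.run()     # runs work(1) in this process; p.pid stays None
    p = multiprocessing.Process(target=work, args=(2,))
    p.start()   # spawns a child process; p.pid is set
    p.join()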
Index: trunk/tools/editor_trends/utils/sort.py |
— | — | @@ -99,7 +99,7 @@ |
100 | 100 | fh.close() |
101 | 101 | |
102 | 102 | |
103 | | -def store_editors(input, dbname): |
| 103 | +def store_editors(input, filename, dbname): |
104 | 104 | fh = utils.create_txt_filehandle(input, filename, 'r', settings.ENCODING) |
105 | 105 | mongo = db.init_mongo_db(dbname) |
106 | 106 | collection = mongo['editors'] |
— | — | @@ -116,34 +116,29 @@ |
117 | 117 | fh.close() |
118 | 118 | |
119 | 119 | |
120 | | -def merge_sorted_files_launcher(dbname, input, output): |
| 120 | +def mergesort_external_launcher(dbname, input, output): |
121 | 121 | files = utils.retrieve_file_list(input, 'txt', mask='') |
122 | 122 | x = 0 |
| 123 | + maxval = 99999 |
123 | 124 | while maxval >= settings.MAX_FILES_OPEN: |
124 | 125 | x += 1.0 |
125 | 126 | maxval = round(len(files) / x) |
126 | 127 | chunks = utils.split_list(files, int(x)) |
127 | 128 | '''1st iteration external mergesort''' |
128 | 129 | for chunk in chunks: |
129 | | - #filehandles = [utils.create_txt_filehandle(input, file, 'r', settings.ENCODING) for file in chunks[chunk]] |
130 | | - #filename = merge_sorted_files(output, filehandles, chunk) |
131 | | - #filehandles = [fh.close() for fh in filehandles] |
132 | | - pass |
| 130 | + filehandles = [utils.create_txt_filehandle(input, file, 'r', settings.ENCODING) for file in chunks[chunk]] |
| 131 | + filename = merge_sorted_files(output, filehandles, chunk) |
| 132 | + filehandles = [fh.close() for fh in filehandles] |
133 | 133 | '''2nd iteration external mergesort, if necessary''' |
134 | 134 | if len(chunks) > 1: |
135 | 135 | files = utils.retrieve_file_list(output, 'txt', mask='[merged]') |
136 | 136 | filehandles = [utils.create_txt_filehandle(output, file, 'r', settings.ENCODING) for file in files] |
137 | | - filename = merge_sorted_files(output, filehandles, chunk) |
| 137 | + filename = merge_sorted_files(output, filehandles, 'final') |
138 | 138 | filehandles = [fh.close() for fh in filehandles] |
139 | 139 | store_editors(output, filename, dbname) |
140 | 140 | |
141 | 141 | |
142 | | -def debug_mergesort(input, output): |
143 | | - files = utils.retrieve_file_list(input, 'txt', mask='((?!_sorted)\d)') |
144 | | - for file in files: |
145 | | - pass |
146 | | - |
147 | | -def mergesort_feeder(input_queue, **kwargs): |
| 142 | +def mergesort_feeder(input_queue, result_queue, **kwargs): |
148 | 143 | input = kwargs.get('input', None) |
149 | 144 | output = kwargs.get('output', None) |
150 | 145 | while True: |
— | — | @@ -160,7 +155,6 @@ |
161 | 156 | break |
162 | 157 | |
163 | 158 | |
164 | | - |
165 | 159 | def mergesort_launcher(input, output): |
166 | 160 | kwargs = {'pbar': True, |
167 | 161 | 'nr_input_processors': settings.NUMBER_OF_PROCESSES, |
— | — | @@ -168,23 +162,23 @@ |
169 | 163 | 'input': input, |
170 | 164 | 'output': output, |
171 | 165 | } |
172 | | - chunks = {} |
173 | | - |
174 | 166 | files = utils.retrieve_file_list(input, 'txt') |
175 | | - parts = int(round(float(len(files)) / settings.NUMBER_OF_PROCESSES, 0)) |
176 | | - a = 0 |
177 | | - |
178 | | - for x in xrange(settings.NUMBER_OF_PROCESSES): |
179 | | - b = a + parts |
180 | | - chunks[x] = files[a:b] |
181 | | - a = (x + 1) * parts |
182 | | - |
| 167 | + chunks = utils.split_list(files, settings.NUMBER_OF_PROCESSES) |
183 | 168 | pc.build_scaffolding(pc.load_queue, mergesort_feeder, chunks, False, False, **kwargs) |
184 | | - merge_sorted_files(input, output) |
185 | 169 | |
| 170 | +def debug_mergesort_feeder(input, output): |
| 171 | + kwargs = { |
| 172 | + 'input': input, |
| 173 | + 'output': output, |
| 174 | + } |
| 175 | + files = utils.retrieve_file_list(input, 'txt') |
| 176 | + chunks = utils.split_list(files, settings.NUMBER_OF_PROCESSES) |
| 177 | + q = pc.load_queue(chunks[0]) |
| 178 | + mergesort_feeder(q, False, **kwargs) |
| 179 | + |
186 | 180 | if __name__ == '__main__': |
187 | 181 | input = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'txt') |
188 | 182 | output = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'sorted') |
| 183 | + dbname = 'enwiki' |
189 | 184 | mergesort_launcher(input, output) |
190 | | - #debug_mergesort(input, output) |
191 | | - #debug_merge_sorted_files(input, output) |
| 185 | + mergesort_external_launcher(dbname, output, output) |
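merge_sorted_files itself is outside this diff; a minimal sketch of how the k-way merge over the open filehandles could look, assuming every input file is already sorted line by line and that the '[merged]' mask above matches the generated filenames (the naming scheme here is an assumption):

    import heapq
    import os

    def merge_sorted_files(output, filehandles, part):
        '''K-way merge of pre-sorted text files into a single sorted file.'''
        filename = 'merged_%s.txt' % part
        fh = open(os.path.join(output, filename), 'w')
        # heapq.merge lazily interleaves the already-sorted line iterators,
        # so only one line per input file is held in memory at a time.
        for line in heapq.merge(*filehandles):
            fh.write(line)
        fh.close()
        return filename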