Index: trunk/tools/editor_trends/map_wiki_editors.py
===================================================================
@@ -202,7 +202,7 @@
             print 'Still sleeping, queue is %s items long' % output.qsize()
 
         else:
-            output.close()
+            fh.close()
 
     if pbar:
         print file, xml_queue.qsize()
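The one-line fix above closes the worker's own file handle rather than calling close() on the shared output queue when the loop reaches its shutdown branch. As a hedged illustration of that pattern (the surrounding consumer in map_wiki_editors.py is not shown in full by this hunk, so consume, xml_queue and fh are stand-in names and the control flow is simplified):

from Queue import Empty


def consume(xml_queue, fh):
    '''Drain a multiprocessing queue into an already opened file handle.'''
    while True:
        try:
            data = xml_queue.get(block=False)
            fh.write('%s\n' % data)
        except Empty:
            # Close the handle this worker owns; Queue.close() would only
            # signal that this process puts no more data on the queue,
            # which is not what the shutdown branch needs here.
            fh.close()
            break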
Index: trunk/tools/editor_trends/utils/sort.py
===================================================================
@@ -25,9 +25,12 @@
 '''
 
 import heapq
+from multiprocessing import Queue
+from Queue import Empty
 
 import settings
 import utils
+import process_constructor as pc
 from database import cache
 
 def quick_sort(obs):
@@ -110,7 +113,7 @@
     fh.close()
 
 
-def debug_merge_sorted_files(input, output):
+def merge_sorted_files(input, output):
     files = utils.retrieve_file_list(input, 'txt', mask='')
     filehandles = [utils.create_txt_filehandle(input, file, 'r', settings.ENCODING) for file in files]
     lines = merge_sorted_files(output, filehandles)
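The renamed wrapper opens one read handle per previously sorted chunk and passes them to a merge step whose body falls outside this diff. Purely as an assumption about what such a step looks like, a generic k-way merge over individually sorted, line-oriented files can be written with heapq.merge (Python 2.6+); the helper name merge_file_handles is made up for the sketch:

import heapq


def merge_file_handles(filehandles, target_fh):
    '''k-way merge of individually sorted text files into target_fh.'''
    # heapq.merge interleaves the sorted iterables lazily, so no file has
    # to be read into memory in one piece.
    for line in heapq.merge(*filehandles):
        target_fh.write(line)
    for fh in filehandles:
        fh.close()
    target_fh.close()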
@@ -121,17 +124,50 @@
 def debug_mergesort(input, output):
     files = utils.retrieve_file_list(input, 'txt', mask='((?!_sorted)\d)')
     for file in files:
-        fh = utils.create_txt_filehandle(input, file, 'r', settings.ENCODING)
-        data = fh.readlines()
-        fh.close()
-        data = [d.replace('\n', '') for d in data]
-        data = [d.split('\t') for d in data]
-        sorted_data = mergesort(data)
-        write_sorted_file(sorted_data, file, output)
+        pass
+
+def mergesort_feeder(input_queue, **kwargs):
+    input = kwargs.get('input', None)
+    output = kwargs.get('output', None)
+    while True:
+        try:
+            file = input_queue.get(block=False)
+            fh = utils.create_txt_filehandle(input, file, 'r', settings.ENCODING)
+            data = fh.readlines()
+            fh.close()
+            data = [d.replace('\n', '') for d in data]
+            data = [d.split('\t') for d in data]
+            sorted_data = mergesort(data)
+            write_sorted_file(sorted_data, file, output)
+        except Empty:
+            break
+
 
 
+def mergesort_launcher(input, output):
+    kwargs = {'pbar': True,
+              'nr_input_processors': settings.NUMBER_OF_PROCESSES,
+              'nr_output_processors': settings.NUMBER_OF_PROCESSES,
+              'input': input,
+              'output': output,
+              }
+    chunks = {}
+
+    files = utils.retrieve_file_list(input, 'txt')
+    parts = int(round(float(len(files)) / settings.NUMBER_OF_PROCESSES, 0))
+    a = 0
+
+    for x in xrange(settings.NUMBER_OF_PROCESSES):
+        b = a + parts
+        chunks[x] = files[a:b]
+        a = (x + 1) * parts
+
+    pc.build_scaffolding(pc.load_queue, mergesort_feeder, chunks, False, False, **kwargs)
+    merge_sorted_files(input, output)
+
 if __name__ == '__main__':
-    input = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki')
+    input = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'txt')
     output = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'sorted')
-    debug_mergesort(input, output)
+    mergesort_launcher(input, output)
+    #debug_mergesort(input, output)
     #debug_merge_sorted_files(input, output)
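mergesort_launcher hands every worker its own slice of the file list before the per-file sort starts. A standalone illustration of that chunking arithmetic, with the loop copied from the hunk above and a made-up ten-entry file list plus a hypothetical process count of four:

NUMBER_OF_PROCESSES = 4
files = ['%s.txt' % x for x in xrange(10)]

chunks = {}
parts = int(round(float(len(files)) / NUMBER_OF_PROCESSES, 0))
a = 0
for x in xrange(NUMBER_OF_PROCESSES):
    b = a + parts
    chunks[x] = files[a:b]
    a = (x + 1) * parts

for x in xrange(NUMBER_OF_PROCESSES):
    print x, chunks[x]

build_scaffolding is expected to run mergesort_feeder over those chunks, after which merge_sorted_files combines the per-file results.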