Index: trunk/tools/editor_trends/map_wiki_editors.py |
— | — | @@ -24,6 +24,7 @@ |
25 | 25 | import xml.etree.cElementTree as cElementTree |
26 | 26 | from multiprocessing import Queue |
27 | 27 | from Queue import Empty |
| 28 | +import pymongo |
28 | 29 | |
29 | 30 | # Custom written files |
30 | 31 | import settings |
— | — | @@ -71,15 +72,15 @@ |
72 | 73 | ignore anonymous editors. If you are interested in collecting data on |
73 | 74 | anonymous editors then add the string 'ip' to the tags variable. |
74 | 75 | ''' |
75 | | - tags = ['id'] |
| 76 | + tags = ['id'] |
76 | 77 | if contributor.get('deleted'): |
77 | | - return -1 #Not sure if this is the best way to code deleted contributors. |
| 78 | + return - 1 #Not sure if this is the best way to code deleted contributors. |
78 | 79 | for elem in contributor: |
79 | 80 | if elem.tag in tags: |
80 | 81 | if elem.text != None: |
81 | 82 | return elem.text.decode('utf-8') |
82 | 83 | else: |
83 | | - return -1 |
| 84 | + return - 1 |
84 | 85 | |
85 | 86 | |
86 | 87 | def output_editor_information(elem, data_queue, **kwargs): |
— | — | @@ -104,8 +105,8 @@ |
105 | 106 | vars.pop('bot') |
106 | 107 | vars['date'] = utils.convert_timestamp_to_date(vars['date']) |
107 | 108 | data_queue.put(vars) |
108 | | - vars={} |
109 | | - |
| 109 | + vars = {} |
| 110 | + |
110 | 111 | def lookup_new_editors(xml_queue, data_queue, pbar, bots, debug=False, separator='\t'): |
111 | 112 | if settings.DEBUG: |
112 | 113 | messages = {} |
— | — | @@ -118,37 +119,42 @@ |
119 | 120 | file = xml_queue.get(block=False) |
120 | 121 | #print 'parsing %s' % file |
121 | 122 | if file == None: |
| 123 | + print 'Swallowed a poison pill' |
122 | 124 | break |
123 | | - |
124 | | - data = xml.read_input(utils.open_txt_file(settings.XML_FILE_LOCATION |
| 125 | + |
| 126 | + data = xml.read_input(utils.open_txt_file(settings.XML_FILE_LOCATION |
125 | 127 | + file, 'r', encoding=settings.ENCODING)) |
126 | 128 | #data = read_input(sys.stdin) |
127 | 129 | #print xml_queue.qsize() |
128 | 130 | for raw_data in data: |
129 | 131 | xml_buffer = cStringIO.StringIO() |
130 | 132 | raw_data.insert(0, '<?xml version="1.0" encoding="UTF-8" ?>\n') |
131 | | - raw_data = ''.join(raw_data) |
132 | | - xml_buffer.write(raw_data) |
133 | | - |
134 | 133 | try: |
| 134 | + raw_data = ''.join(raw_data) |
| 135 | + xml_buffer.write(raw_data) |
135 | 136 | elem = cElementTree.XML(xml_buffer.getvalue()) |
136 | 137 | output_editor_information(elem, data_queue, bots=bots) |
137 | 138 | except SyntaxError, error: |
138 | 139 | print error |
139 | | - #There are few cases with invalid tokens, they are fixed |
140 | | - #here and then reinserted into the XML DOM |
141 | | - #data = convert_html_entities(xml_buffer.getvalue()) |
142 | | - #elem = cElementTree.XML(data) |
143 | | - #output_editor_information(elem) |
144 | | - if settings.DEBUG: |
145 | | - utils.track_errors(xml_buffer, error, file, messages) |
| 140 | + ''' |
| 141 | + There are a few cases with invalid tokens; they are fixed |
| 142 | + here and then reinserted into the XML DOM |
| 143 | + data = convert_html_entities(xml_buffer.getvalue()) |
| 144 | + elem = cElementTree.XML(data) |
| 145 | + output_editor_information(elem) |
| 146 | + ''' |
146 | 147 | except UnicodeEncodeError, error: |
147 | 148 | print error |
| 149 | + except MemoryError, error: |
| 150 | + ''' |
| 151 | + There is one xml file causing an out of memory error, not |
| 152 | + sure which one yet. |
| 153 | + ''' |
| 154 | + print error |
| 155 | + finally: |
148 | 156 | if settings.DEBUG: |
149 | 157 | utils.track_errors(xml_buffer, error, file, messages) |
150 | | - #finally: |
151 | 158 | |
152 | | - |
153 | 159 | if pbar: |
154 | 160 | print xml_queue.qsize() |
155 | 161 | #utils.update_progressbar(pbar, xml_queue) |
— | — | @@ -171,11 +177,12 @@ |
172 | 178 | chunk = data_queue.get(block=False) |
173 | 179 | values.append(chunk) |
174 | 180 | #print chunk |
175 | | - if len(values) == 100000: |
176 | | - collection.insert(values) |
| 181 | + if len(values) == 25000: |
| 182 | + collection.insert(chunk) |
177 | 183 | values = [] |
178 | 184 | #print data_queue.qsize() |
179 | | - data_queue.task_done() |
| 185 | + |
| 186 | + |
180 | 187 | except Empty: |
181 | 188 | # The queue is empty but store the remaining values if present |
182 | 189 | if values != []: |
— | — | @@ -190,12 +197,15 @@ |
191 | 198 | are finished and this Queue is empty than break, else wait for the |
192 | 199 | Queue to fill. |
193 | 200 | ''' |
| 201 | + |
194 | 202 | if all([utils.check_if_process_is_running(pid) for pid in pids]): |
195 | 203 | pass |
| 204 | + #print 'Empty queue or not %s?' % data_queue.qsize() |
196 | 205 | else: |
197 | 206 | break |
198 | 207 | |
199 | 208 | |
| 209 | + |
200 | 210 | def store_data_db(data_queue, pids): |
201 | 211 | connection = db.init_database() |
202 | 212 | cursor = connection.cursor() |
— | — | @@ -238,8 +248,8 @@ |
239 | 249 | for bot in cursor: |
240 | 250 | ids[bot['id']] = bot['name'] |
241 | 251 | pc.build_scaffolding(pc.load_queue, lookup_new_editors, files, store_data_mongo, True, bots=ids) |
242 | | - db.add_index_to_collection('editors', 'date') |
243 | | - db.add_index_to_collection('editors', 'name') |
| 252 | + keys = [('date', pymongo.ASCENDING), ('name', pymongo.ASCENDING)] |
| 253 | + db.add_index_to_collection('editors', 'editors', keys) |
244 | 254 | |
245 | 255 | def debug_lookup_new_editors(): |
246 | 256 | q = Queue() |
— | — | @@ -247,18 +257,18 @@ |
248 | 258 | pbar = progressbar.ProgressBar().start() |
249 | 259 | edits = db.init_mongo_db('editors') |
250 | 260 | lookup_new_editors('1.xml', q, None, None, True) |
251 | | - db.add_index_to_collection('editors', 'date') |
252 | | - db.add_index_to_collection('editors', 'name') |
253 | | - |
| 261 | + keys = [('date', pymongo.ASCENDING), ('name', pymongo.ASCENDING)] |
| 262 | + db.add_index_to_collection('editors', 'editors', keys) |
254 | 263 | |
255 | 264 | |
| 265 | + |
256 | 266 | def run_hadoop(): |
257 | 267 | pass |
258 | 268 | |
259 | 269 | |
260 | 270 | if __name__ == "__main__": |
261 | 271 | #debug_lookup_new_editors() |
262 | | - |
| 272 | + |
263 | 273 | if settings.RUN_MODE == 'stand_alone': |
264 | 274 | run_stand_alone() |
265 | 275 | print 'Finished processing XML files.' |
Index: trunk/tools/editor_trends/utils/utils.py |
— | — | @@ -55,6 +55,7 @@ |
56 | 56 | if settings.OS == 'Windows': |
57 | 57 | PROCESS_TERMINATE = 1 |
58 | 58 | handle = ctypes.windll.kernel32.OpenProcess(PROCESS_TERMINATE, False, pid) |
| 59 | + ctypes.windll.kernel32.CloseHandle(handle) |
59 | 60 | if handle != 0: |
60 | 61 | return True |
61 | 62 | else: |
Index: trunk/tools/editor_trends/utils/process_constructor.py |
— | — | @@ -14,7 +14,7 @@ |
15 | 15 | |
16 | 16 | __author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ]) |
17 | 17 | |
18 | | -from multiprocessing import Process, Queue, JoinableQueue |
| 18 | +from multiprocessing import Process, Queue |
19 | 19 | from Queue import Empty |
20 | 20 | |
21 | 21 | import settings |
— | — | @@ -52,7 +52,7 @@ |
53 | 53 | |
54 | 54 | input_queue = Queue() |
55 | 55 | if result_queue: |
56 | | - result_queue = JoinableQueue() |
| 56 | + result_queue = Queue() |
57 | 57 | |
58 | 58 | load_input_queue(input_queue, obj, poison_pill=True) |
59 | 59 | |