r86323 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:< r86322 | r86323 | r86324 >
Date:17:06, 18 April 2011
Author:diederik
Status:deferred
Tags:
Comment:
Fixed another pesky XML parsing memory leak.
Modified paths:
  • /trunk/tools/editor_trends/etl/extracter.py (modified) (history)

Diff [purge]

Index: trunk/tools/editor_trends/etl/extracter.py
@@ -23,7 +23,7 @@
2424 import os
2525 from datetime import datetime
2626 from xml.etree.cElementTree import iterparse, dump
27 -from multiprocessing import JoinableQueue, Process, cpu_count, Lock, Manager
 27+from multiprocessing import JoinableQueue, Process, cpu_count
2828
2929 if '..' not in sys.path:
3030 sys.path.append('..')
@@ -104,80 +104,6 @@
105105 return counts
106106
107107
108 -#def datacompetition_count_edits(fh, rts, process_id, file_id):
109 -# '''
110 -# This function counts for every editor the total number of edits that person
111 -# made. It follows the same logic as the parse_xml function although it
112 -# skips a bunch of extraction phases that are not relevant for counting
113 -# edits. This function is only to be used to create the prediction dataset
114 -# for the datacompetition.
115 -# '''
116 -# bots = bot_detector.retrieve_bots(rts.storage, rts.language.code)
117 -# include_ns = {}
118 -#
119 -# start = 'start'; end = 'end'
120 -# context = iterparse(fh, events=(start, end))
121 -# context = iter(context)
122 -#
123 -# counts = {}
124 -# id = False
125 -# ns = False
126 -# parse = False
127 -# count_articles = 0
128 -#
129 -# try:
130 -# for event, elem in context:
131 -# if event is end and elem.tag.endswith('siteinfo'):
132 -# xml_namespace = variables.determine_xml_namespace(elem)
133 -# namespaces = variables.create_namespace_dict(elem, xml_namespace)
134 -# ns = True
135 -# elem.clear()
136 -#
137 -# elif event is end and elem.tag.endswith('title'):
138 -# title = variables.parse_title(elem)
139 -# current_namespace = variables.determine_namespace(title, namespaces, include_ns)
140 -# if isinstance(current_namespace, int):
141 -# parse = True
142 -# count_articles += 1
143 -# if count_articles % 10000 == 0:
144 -# print 'Worker %s parsed %s articles' % (process_id, count_articles)
145 -#
146 -# elem.clear()
147 -#
148 -# elif elem.tag.endswith('revision') and parse == True:
149 -# if event is start:
150 -# clear = False
151 -# else:
152 -# counts = datacompetition_parse_revision(elem, xml_namespace, bots, counts)
153 -# clear = True
154 -# if clear:
155 -# elem.clear()
156 -#
157 -# elif event is end and elem.tag.endswith('page'):
158 -# elem.clear()
159 -# #Reset all variables for next article
160 -# id = False
161 -# parse = False
162 -#
163 -# except SyntaxError, error:
164 -# print 'Encountered invalid XML tag. Error message: %s' % error
165 -# dump(elem)
166 -# sys.exit(-1)
167 -# except IOError, error:
168 -# print '''Archive file is possibly corrupted. Please delete this archive
169 -# and retry downloading. Error message: %s''' % error
170 -# sys.exit(-1)
171 -# except Exception, error:
172 -# print error
173 -#
174 -# filename = 'counts_kaggle_%s.csv' % file_id
175 -# keys = counts.keys()
176 -# fh = file_utils.create_txt_filehandle(rts.txt, filename, 'w', 'utf-8')
177 -# file_utils.write_dict_to_csv(counts, fh, keys)
178 -# fh.close()
179 -# counts = {}
180 -
181 -
182108 def parse_xml(fh, rts, cache, process_id, file_id):
183109 bots = bot_detector.retrieve_bots(rts.storage, rts.language.code)
184110 include_ns = {3: 'User Talk',
@@ -218,14 +144,17 @@
219145 md5hashes = deque()
220146 elem.clear()
221147
222 - elif elem.tag.endswith('revision') and parse == True:
223 - if event is start:
224 - clear = False
 148+ elif elem.tag.endswith('revision'):
 149+ if parse:
 150+ if event is start:
 151+ clear = False
 152+ else:
 153+ md5hashes, size = parse_revision(elem, article, xml_namespace, cache, bots, md5hashes, size)
 154+ cache.count_revisions += 1
 155+ clear = True
 156+ if clear:
 157+ elem.clear()
225158 else:
226 - md5hashes, size = parse_revision(elem, article, xml_namespace, cache, bots, md5hashes, size)
227 - cache.count_revisions += 1
228 - clear = True
229 - if clear:
230159 elem.clear()
231160
232161 elif event is end and elem.tag.endswith('id') and id == False:
@@ -244,7 +173,6 @@
245174 id = False
246175 parse = False
247176
248 -
249177 except SyntaxError, error:
250178 print 'Encountered invalid XML tag. Error message: %s' % error
251179 dump(elem)
@@ -259,8 +187,7 @@
260188 def stream_raw_xml(input_queue, process_id, fhd, rts):
261189 t0 = datetime.now()
262190 file_id = 0
263 - if not rts.kaggle:
264 - cache = buffer.CSVBuffer(process_id, rts, fhd)
 191+ cache = buffer.CSVBuffer(process_id, rts, fhd)
265192
266193 while True:
267194 filename = input_queue.get()
@@ -272,12 +199,7 @@
273200
274201 print filename
275202 fh = file_utils.create_streaming_buffer(filename)
276 -
277 - if rts.kaggle:
278 - datacompetition_count_edits(fh, rts, process_id, file_id)
279 - else:
280 - parse_xml(fh, rts, cache, process_id, file_id)
281 -
 203+ parse_xml(fh, rts, cache, process_id, file_id)
282204 fh.close()
283205
284206 t1 = datetime.now()
@@ -285,9 +207,8 @@
286208 print 'There are %s files left in the queue' % (input_queue.qsize())
287209 t0 = t1
288210
289 - if not rts.kaggle:
290 - cache.close()
291 - cache.summary()
 211+ cache.close()
 212+ cache.summary()
292213
293214
294215 def debug():
@@ -300,7 +221,7 @@
301222 files = file_utils.retrieve_file_list(rts.input_location)
302223
303224 if len(files) > cpu_count():
304 - processors = cpu_count()
 225+ processors = cpu_count() - 1
305226 else:
306227 processors = len(files)
307228