Index: trunk/tools/editor_trends/etl/extracter.py |
— | — | @@ -23,7 +23,7 @@ |
24 | 24 | import os
|
25 | 25 | from datetime import datetime
|
26 | 26 | from xml.etree.cElementTree import iterparse, dump
|
27 | | -from multiprocessing import JoinableQueue, Process, cpu_count, Lock, Manager
|
| 27 | +from multiprocessing import JoinableQueue, Process, cpu_count
|
28 | 28 |
|
29 | 29 | if '..' not in sys.path:
|
30 | 30 | sys.path.append('..')
|
— | — | @@ -104,80 +104,6 @@ |
105 | 105 | return counts
|
106 | 106 |
|
107 | 107 |
|
108 | | -#def datacompetition_count_edits(fh, rts, process_id, file_id):
|
109 | | -# '''
|
110 | | -# This function counts for every editor the total number of edits that person
|
111 | | -# made. It follows the same logic as the parse_xml function although it
|
112 | | -# skips a bunch of extraction phases that are not relevant for counting
|
113 | | -# edits. This function is only to be used to create the prediction dataset
|
114 | | -# for the datacompetition.
|
115 | | -# '''
|
116 | | -# bots = bot_detector.retrieve_bots(rts.storage, rts.language.code)
|
117 | | -# include_ns = {}
|
118 | | -#
|
119 | | -# start = 'start'; end = 'end'
|
120 | | -# context = iterparse(fh, events=(start, end))
|
121 | | -# context = iter(context)
|
122 | | -#
|
123 | | -# counts = {}
|
124 | | -# id = False
|
125 | | -# ns = False
|
126 | | -# parse = False
|
127 | | -# count_articles = 0
|
128 | | -#
|
129 | | -# try:
|
130 | | -# for event, elem in context:
|
131 | | -# if event is end and elem.tag.endswith('siteinfo'):
|
132 | | -# xml_namespace = variables.determine_xml_namespace(elem)
|
133 | | -# namespaces = variables.create_namespace_dict(elem, xml_namespace)
|
134 | | -# ns = True
|
135 | | -# elem.clear()
|
136 | | -#
|
137 | | -# elif event is end and elem.tag.endswith('title'):
|
138 | | -# title = variables.parse_title(elem)
|
139 | | -# current_namespace = variables.determine_namespace(title, namespaces, include_ns)
|
140 | | -# if isinstance(current_namespace, int):
|
141 | | -# parse = True
|
142 | | -# count_articles += 1
|
143 | | -# if count_articles % 10000 == 0:
|
144 | | -# print 'Worker %s parsed %s articles' % (process_id, count_articles)
|
145 | | -#
|
146 | | -# elem.clear()
|
147 | | -#
|
148 | | -# elif elem.tag.endswith('revision') and parse == True:
|
149 | | -# if event is start:
|
150 | | -# clear = False
|
151 | | -# else:
|
152 | | -# counts = datacompetition_parse_revision(elem, xml_namespace, bots, counts)
|
153 | | -# clear = True
|
154 | | -# if clear:
|
155 | | -# elem.clear()
|
156 | | -#
|
157 | | -# elif event is end and elem.tag.endswith('page'):
|
158 | | -# elem.clear()
|
159 | | -# #Reset all variables for next article
|
160 | | -# id = False
|
161 | | -# parse = False
|
162 | | -#
|
163 | | -# except SyntaxError, error:
|
164 | | -# print 'Encountered invalid XML tag. Error message: %s' % error
|
165 | | -# dump(elem)
|
166 | | -# sys.exit(-1)
|
167 | | -# except IOError, error:
|
168 | | -# print '''Archive file is possibly corrupted. Please delete this archive
|
169 | | -# and retry downloading. Error message: %s''' % error
|
170 | | -# sys.exit(-1)
|
171 | | -# except Exception, error:
|
172 | | -# print error
|
173 | | -#
|
174 | | -# filename = 'counts_kaggle_%s.csv' % file_id
|
175 | | -# keys = counts.keys()
|
176 | | -# fh = file_utils.create_txt_filehandle(rts.txt, filename, 'w', 'utf-8')
|
177 | | -# file_utils.write_dict_to_csv(counts, fh, keys)
|
178 | | -# fh.close()
|
179 | | -# counts = {}
|
180 | | -
|
181 | | -
|
182 | 108 | def parse_xml(fh, rts, cache, process_id, file_id):
|
183 | 109 | bots = bot_detector.retrieve_bots(rts.storage, rts.language.code)
|
184 | 110 | include_ns = {3: 'User Talk',
|
— | — | @@ -218,14 +144,17 @@ |
219 | 145 | md5hashes = deque()
|
220 | 146 | elem.clear()
|
221 | 147 |
|
222 | | - elif elem.tag.endswith('revision') and parse == True:
|
223 | | - if event is start:
|
224 | | - clear = False
|
| 148 | + elif elem.tag.endswith('revision'):
|
| 149 | + if parse:
|
| 150 | + if event is start:
|
| 151 | + clear = False
|
| 152 | + else:
|
| 153 | + md5hashes, size = parse_revision(elem, article, xml_namespace, cache, bots, md5hashes, size)
|
| 154 | + cache.count_revisions += 1
|
| 155 | + clear = True
|
| 156 | + if clear:
|
| 157 | + elem.clear()
|
225 | 158 | else:
|
226 | | - md5hashes, size = parse_revision(elem, article, xml_namespace, cache, bots, md5hashes, size)
|
227 | | - cache.count_revisions += 1
|
228 | | - clear = True
|
229 | | - if clear:
|
230 | 159 | elem.clear()
|
231 | 160 |
|
232 | 161 | elif event is end and elem.tag.endswith('id') and id == False:
|
— | — | @@ -244,7 +173,6 @@ |
245 | 174 | id = False
|
246 | 175 | parse = False
|
247 | 176 |
|
248 | | -
|
249 | 177 | except SyntaxError, error:
|
250 | 178 | print 'Encountered invalid XML tag. Error message: %s' % error
|
251 | 179 | dump(elem)
|
— | — | @@ -259,8 +187,7 @@ |
260 | 188 | def stream_raw_xml(input_queue, process_id, fhd, rts):
|
261 | 189 | t0 = datetime.now()
|
262 | 190 | file_id = 0
|
263 | | - if not rts.kaggle:
|
264 | | - cache = buffer.CSVBuffer(process_id, rts, fhd)
|
| 191 | + cache = buffer.CSVBuffer(process_id, rts, fhd)
|
265 | 192 |
|
266 | 193 | while True:
|
267 | 194 | filename = input_queue.get()
|
— | — | @@ -272,12 +199,7 @@ |
273 | 200 |
|
274 | 201 | print filename
|
275 | 202 | fh = file_utils.create_streaming_buffer(filename)
|
276 | | -
|
277 | | - if rts.kaggle:
|
278 | | - datacompetition_count_edits(fh, rts, process_id, file_id)
|
279 | | - else:
|
280 | | - parse_xml(fh, rts, cache, process_id, file_id)
|
281 | | -
|
| 203 | + parse_xml(fh, rts, cache, process_id, file_id)
|
282 | 204 | fh.close()
|
283 | 205 |
|
284 | 206 | t1 = datetime.now()
|
— | — | @@ -285,9 +207,8 @@ |
286 | 208 | print 'There are %s files left in the queue' % (input_queue.qsize())
|
287 | 209 | t0 = t1
|
288 | 210 |
|
289 | | - if not rts.kaggle:
|
290 | | - cache.close()
|
291 | | - cache.summary()
|
| 211 | + cache.close()
|
| 212 | + cache.summary()
|
292 | 213 |
|
293 | 214 |
|
294 | 215 | def debug():
|
— | — | @@ -300,7 +221,7 @@ |
301 | 222 | files = file_utils.retrieve_file_list(rts.input_location)
|
302 | 223 |
|
303 | 224 | if len(files) > cpu_count():
|
304 | | - processors = cpu_count()
|
| 225 | + processors = cpu_count() - 1
|
305 | 226 | else:
|
306 | 227 | processors = len(files)
|
307 | 228 |
|