r85808 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: r85807 | r85808 | r85809
Date: 18:28, 11 April 2011
Author: diederik
Status: deferred
Tags:
Comment:
Fixed some missing imports
Modified paths:
  • /trunk/tools/editor_trends/classes/buffer.py (modified)
  • /trunk/tools/editor_trends/etl/extracter.py (modified)
  • /trunk/tools/editor_trends/manage.py (modified)
  • /trunk/tools/editor_trends/utils/file_utils.py (modified)

Diff

Index: trunk/tools/editor_trends/manage.py
@@ -34,7 +34,7 @@
 from utils import timer
 from classes import storage
 from etl import downloader
-from etl import enricher
+from etl import extracter
 from etl import store
 from etl import sort
 from etl import transformer
@@ -273,7 +273,10 @@
     stopwatch = timer.Timer()
     log.to_db(rts, 'dataset', 'extract', stopwatch, event='start')
     log.to_csv(logger, rts, 'Start', 'Extract', extract_launcher)
-    enricher.launcher(rts)
+    res = file_utils.delete_file(rts.txt, None, directory=True)
+    if res:
+        res = file_utils.create_directory(rts.txt)
+    extracter.launcher(rts)
     stopwatch.elapsed()
     log.to_db(rts, 'dataset', 'extract', stopwatch, event='finish')
     log.to_csv(logger, rts, 'Finish', 'Extract', extract_launcher)
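
The rewritten extract step now wipes and recreates the plain-text output directory before launching the extracter, so stale output from an earlier run cannot leak into a new one. A minimal sketch of that flow (run_extract is a hypothetical wrapper; the assumption is that file_utils.delete_file returns a truthy value on success and that rts.txt holds the output directory path):

from utils import file_utils
from etl import extracter

def run_extract(rts):
    # Remove stale output from a previous run, then start with a clean slate.
    res = file_utils.delete_file(rts.txt, None, directory=True)
    if res:
        res = file_utils.create_directory(rts.txt)
    # Hand off to the multiprocess extracter.
    extracter.launcher(rts)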
Index: trunk/tools/editor_trends/etl/extracter.py
@@ -1,288 +1,297 @@
-#!/usr/bin/python
-# -*- coding: utf-8 -*-
-'''
-Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
-This program is free software; you can redistribute it and/or
-modify it under the terms of the GNU General Public License version 2
-as published by the Free Software Foundation.
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
-See the GNU General Public License for more details, at
-http://www.fsf.org/licenses/gpl.html
-'''
-
-
-__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
-__author__email = 'dvanliere at gmail dot com'
-__date__ = '2011-04-10'
-__version__ = '0.1'
-
-
-from collections import deque
-import sys
-from xml.etree.cElementTree import iterparse, dump
-from multiprocessing import JoinableQueue, Process, cpu_count, RLock, Manager
-
-if '..' not in sys.path:
-    sys.path.append('..')
-
-from etl import variables
-from classes import buffer
-from analyses.adhoc import bot_detector
-
-def parse_revision(revision, article, xml_namespace, cache, bots, md5hashes, size):
-    if revision == None:
-        #the entire revision is empty, weird.
-        #dump(revision)
-        return md5hashes, size
-
-    contributor = revision.find('%s%s' % (xml_namespace, 'contributor'))
-    contributor = variables.parse_contributor(contributor, bots, xml_namespace)
-    if not contributor:
-        #editor is anonymous, ignore
-        return md5hashes, size
-
-    revision_id = revision.find('%s%s' % (xml_namespace, 'id'))
-    revision_id = variables.extract_revision_id(revision_id)
-    if revision_id == None:
-        #revision_id is missing, which is weird
-        return md5hashes, size
-
-    article['revision_id'] = revision_id
-    text = variables.extract_revision_text(revision)
-    article.update(contributor)
-
-    comment = variables.extract_comment_text(revision_id, revision)
-    cache.comments.update(comment)
-
-    timestamp = revision.find('%s%s' % (xml_namespace, 'timestamp')).text
-    article['timestamp'] = timestamp
-
-    hash = variables.create_md5hash(text)
-    revert = variables.is_revision_reverted(hash['hash'], md5hashes)
-    md5hashes.append(hash['hash'])
-    size = variables.calculate_delta_article_size(size, text)
-
-    article.update(hash)
-    article.update(size)
-    article.update(revert)
-    cache.add(article)
-    return md5hashes, size
-
-def setup_parser(rts):
-    bots = '' #bot_detector.retrieve_bots(rts.language.code)
-    start = 'start'; end = 'end'
-    context = iterparse(fh, events=(start, end))
-    context = iter(context)
-
-    include_ns = {3: 'User Talk',
-                  5: 'Wikipedia Talk',
-                  1: 'Talk',
-                  2: 'User',
-                  4: 'Wikipedia'}
-
-    return bots, context, include_ns
-
-
-def datacompetition_count_edits(rts, file_id):
-    bots, context, include_ns = setup_parser(rts)
-    counts = {}
-    id = False
-    ns = False
-    parse = False
-
-    try:
-        for event, elem in context:
-            if event is end and elem.tag.endswith('siteinfo'):
-                xml_namespace = variables.determine_xml_namespace(elem)
-                namespaces = variables.create_namespace_dict(elem, xml_namespace)
-                ns = True
-                elem.clear()
-
-            elif event is end and elem.tag.endswith('title'):
-                title = variables.parse_title(elem)
-                article['title'] = title
-                current_namespace = variables.determine_namespace(title, namespaces, include_ns)
-                if current_namespace != False:
-                    parse = True
-                    article['namespace'] = current_namespace
-                    cache.count_articles += 1
-                elem.clear()
-
-            elif elem.tag.endswith('revision') and parse == True:
-                if event is start:
-                    clear = False
-                else:
-                    print 'IMPLEMENT'
-                    #md5hashes, size = parse_revision(elem, article, xml_namespace, cache, bots, md5hashes, size)
-                    cache.count_revisions += 1
-                    clear = True
-                if clear:
-                    elem.clear()
-
-            elif event is end and elem.tag.endswith('page'):
-                elem.clear()
-                #Reset all variables for next article
-                id = False
-                parse = False
-
-    except SyntaxError, error:
-        print 'Encountered invalid XML tag. Error message: %s' % error
-        dump(elem)
-        sys.exit(-1)
-    except IOError, error:
-        print '''Archive file is possibly corrupted. Please delete this archive
-        and retry downloading. Error message: %s''' % error
-        sys.exit(-1)
-
-    filename = 'counts_kaggle_%s.csv' % file_id
-    fh = file_utils.create_txt_filehandle(rts.txt, filename, 'w', 'utf-8')
-    file_utils.write_dict_to_csv(counts, fh, keys)
-    fh.close()
-
-    filename = 'counts_kaggle_%s.bin' % file_id
-    file_utils.store_object(counts, location, filename)
-
-
-def parse_xml(fh, cache, rts):
-    bots, context, include_ns = setup_parser(rts)
-    article = {}
-    size = {}
-    id = False
-    ns = False
-    parse = False
-
-    try:
-        for event, elem in context:
-            if event is end and elem.tag.endswith('siteinfo'):
-                xml_namespace = variables.determine_xml_namespace(elem)
-                namespaces = variables.create_namespace_dict(elem, xml_namespace)
-                ns = True
-                elem.clear()
-
-            elif event is end and elem.tag.endswith('title'):
-                title = variables.parse_title(elem)
-                article['title'] = title
-                current_namespace = variables.determine_namespace(title, namespaces, include_ns)
-                title_meta = variables.parse_title_meta_data(title, current_namespace)
-                if current_namespace != False:
-                    parse = True
-                    article['namespace'] = current_namespace
-                    cache.count_articles += 1
-                    md5hashes = deque()
-                elem.clear()
-
-            elif elem.tag.endswith('revision') and parse == True:
-                if event is start:
-                    clear = False
-                else:
-                    md5hashes, size = parse_revision(elem, article, xml_namespace, cache, bots, md5hashes, size)
-                    cache.count_revisions += 1
-                    clear = True
-                if clear:
-                    elem.clear()
-
-            elif event is end and elem.tag.endswith('id') and id == False:
-                article['article_id'] = elem.text
-                if current_namespace:
-                    cache.articles[article['article_id']] = title_meta
-                id = True
-                elem.clear()
-
-            elif event is end and elem.tag.endswith('page'):
-                elem.clear()
-                #Reset all variables for next article
-                article = {}
-                size = {}
-                md5hashes = deque()
-                id = False
-                parse = False
-
-
-    except SyntaxError, error:
-        print 'Encountered invalid XML tag. Error message: %s' % error
-        dump(elem)
-        sys.exit(-1)
-    except IOError, error:
-        print '''Archive file is possibly corrupted. Please delete this archive
-        and retry downloading. Error message: %s''' % error
-        sys.exit(-1)
-
-
-def stream_raw_xml(input_queue, process_id, lock, rts):
-    t0 = datetime.datetime.now()
-    i = 0
-    file_id = 0
-    cache = buffer.CSVBuffer(process_id, rts, lock)
-
-    while True:
-        filename = input_queue.get()
-        input_queue.task_done()
-        file_id += 1
-        if filename == None:
-            print '%s files left in the queue' % input_queue.qsize()
-            break
-
-        fh = file_utils.create_streaming_buffer(filename)
-
-        if rts.kaggle:
-            datacompetition_count_edits(rts, file_id)
-        else:
-            parse_xml(fh, cache, rts)
-
-        i += 1
-        if i % 10000 == 0:
-            print 'Worker %s parsed %s articles' % (process_id, i)
-        fh.close()
-
-        t1 = datetime.datetime.now()
-        print 'Worker %s: Processing of %s took %s' % (process_id, filename, (t1 - t0))
-        print 'There are %s files left in the queue' % (input_queue.qsize())
-        t0 = t1
-
-    if rts.kaggle:
-        cache.close()
-        cache.summary()
-
-
-def debug():
-    fh = 'c:\\wikimedia\sv\wiki\svwiki-latest-stub-meta-history.xml'
-
-
-def launcher(rts):
-    lock = RLock()
-    mgr = Manager()
-    open_handles = []
-    open_handles = mgr.list(open_handles)
-    clock = CustomLock(lock, open_handles)
-    input_queue = JoinableQueue()
-
-    files = file_utils.retrieve_file_list(rts.input_location)
-
-    if len(files) > cpu_count():
-        processors = cpu_count() - 1
-    else:
-        processors = len(files)
-
-    for filename in files:
-        filename = os.path.join(rts.input_location, filename)
-        print filename
-        input_queue.put(filename)
-
-    for x in xrange(processors):
-        print 'Inserting poison pill %s...' % x
-        input_queue.put(None)
-
-    extracters = [Process(target=stream_raw_xml, args=[input_queue, process_id,
-                                                       clock, rts])
-                  for process_id in xrange(processors)]
-    for extracter in extracters:
-        extracter.start()
-
-    input_queue.join()
-    print 'Finished parsing Wikipedia dump files.'
-
-
-if __name__ == '__main__':
-    debug()
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License version 2
+as published by the Free Software Foundation.
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+See the GNU General Public License for more details, at
+http://www.fsf.org/licenses/gpl.html
+'''
+
+
+__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
+__author__email = 'dvanliere at gmail dot com'
+__date__ = '2011-04-10'
+__version__ = '0.1'
+
+
+from collections import deque
+import sys
+import os
+from datetime import datetime
+from xml.etree.cElementTree import iterparse, dump
+from multiprocessing import JoinableQueue, Process, cpu_count, RLock, Manager
+
+if '..' not in sys.path:
+    sys.path.append('..')
+
+from etl import variables
+from utils import file_utils
+from classes import buffer
+from analyses.adhoc import bot_detector
+
+def parse_revision(revision, article, xml_namespace, cache, bots, md5hashes, size):
+    if revision == None:
+        #the entire revision is empty, weird.
+        #dump(revision)
+        return md5hashes, size
+
+    contributor = revision.find('%s%s' % (xml_namespace, 'contributor'))
+    contributor = variables.parse_contributor(contributor, bots, xml_namespace)
+    if not contributor:
+        #editor is anonymous, ignore
+        return md5hashes, size
+
+    revision_id = revision.find('%s%s' % (xml_namespace, 'id'))
+    revision_id = variables.extract_revision_id(revision_id)
+    if revision_id == None:
+        #revision_id is missing, which is weird
+        return md5hashes, size
+
+    article['revision_id'] = revision_id
+    text = variables.extract_revision_text(revision)
+    article.update(contributor)
+
+    comment = variables.extract_comment_text(revision_id, revision)
+    cache.comments.update(comment)
+
+    timestamp = revision.find('%s%s' % (xml_namespace, 'timestamp')).text
+    article['timestamp'] = timestamp
+
+    hash = variables.create_md5hash(text)
+    revert = variables.is_revision_reverted(hash['hash'], md5hashes)
+    md5hashes.append(hash['hash'])
+    size = variables.calculate_delta_article_size(size, text)
+
+    article.update(hash)
+    article.update(size)
+    article.update(revert)
+    cache.add(article)
+    return md5hashes, size
+
+def setup_parser(rts):
+    bots = bot_detector.retrieve_bots(rts.language.code)
+    include_ns = {3: 'User Talk',
+                  5: 'Wikipedia Talk',
+                  1: 'Talk',
+                  2: 'User',
+                  4: 'Wikipedia'}
+
+    return bots, include_ns
+
+
+def datacompetition_count_edits(fh, rts, file_id):
+    bots, include_ns = setup_parser(rts)
+
+    start = 'start'; end = 'end'
+    context = iterparse(fh, events=(start, end))
+    context = iter(context)
+
+    counts = {}
+    id = False
+    ns = False
+    parse = False
+
+    try:
+        for event, elem in context:
+            if event is end and elem.tag.endswith('siteinfo'):
+                xml_namespace = variables.determine_xml_namespace(elem)
+                namespaces = variables.create_namespace_dict(elem, xml_namespace)
+                ns = True
+                elem.clear()
+
+            elif event is end and elem.tag.endswith('title'):
+                title = variables.parse_title(elem)
+                article['title'] = title
+                current_namespace = variables.determine_namespace(title, namespaces, include_ns)
+                if current_namespace != False:
+                    parse = True
+                    article['namespace'] = current_namespace
+                    cache.count_articles += 1
+                elem.clear()
+
+            elif elem.tag.endswith('revision') and parse == True:
+                if event is start:
+                    clear = False
+                else:
+                    print 'IMPLEMENT'
+                    #md5hashes, size = parse_revision(elem, article, xml_namespace, cache, bots, md5hashes, size)
+                    cache.count_revisions += 1
+                    clear = True
+                if clear:
+                    elem.clear()
+
+            elif event is end and elem.tag.endswith('page'):
+                elem.clear()
+                #Reset all variables for next article
+                id = False
+                parse = False
+
+    except SyntaxError, error:
+        print 'Encountered invalid XML tag. Error message: %s' % error
+        dump(elem)
+        sys.exit(-1)
+    except IOError, error:
+        print '''Archive file is possibly corrupted. Please delete this archive
+        and retry downloading. Error message: %s''' % error
+        sys.exit(-1)
+
+    filename = 'counts_kaggle_%s.csv' % file_id
+    fh = file_utils.create_txt_filehandle(rts.txt, filename, 'w', 'utf-8')
+    file_utils.write_dict_to_csv(counts, fh, keys)
+    fh.close()
+
+    filename = 'counts_kaggle_%s.bin' % file_id
+    file_utils.store_object(counts, location, filename)
+
+
+def parse_xml(fh, rts, cache, file_id):
+    bots, include_ns = setup_parser(rts)
+
+    start = 'start'; end = 'end'
+    context = iterparse(fh, events=(start, end))
+    context = iter(context)
+
+    article = {}
+    size = {}
+    id = False
+    ns = False
+    parse = False
+
+    try:
+        for event, elem in context:
+            if event is end and elem.tag.endswith('siteinfo'):
+                xml_namespace = variables.determine_xml_namespace(elem)
+                namespaces = variables.create_namespace_dict(elem, xml_namespace)
+                ns = True
+                elem.clear()
+
+            elif event is end and elem.tag.endswith('title'):
+                title = variables.parse_title(elem)
+                article['title'] = title
+                current_namespace = variables.determine_namespace(title, namespaces, include_ns)
+                title_meta = variables.parse_title_meta_data(title, current_namespace)
+                if isinstance(current_namespace, int):
+                    parse = True
+                    article['namespace'] = current_namespace
+                    cache.count_articles += 1
+                    if cache.count_articles % 10000 == 0:
+                        print 'Worker %s parsed %s articles' % (file_id, cache.count_articles)
+                    md5hashes = deque()
+                elem.clear()
+
+            elif elem.tag.endswith('revision') and parse == True:
+                if event is start:
+                    clear = False
+                else:
+                    md5hashes, size = parse_revision(elem, article, xml_namespace, cache, bots, md5hashes, size)
+                    cache.count_revisions += 1
+                    clear = True
+                if clear:
+                    elem.clear()
+
+            elif event is end and elem.tag.endswith('id') and id == False:
+                article['article_id'] = elem.text
+                if current_namespace:
+                    cache.articles[article['article_id']] = title_meta
+                id = True
+                elem.clear()
+
+            elif event is end and elem.tag.endswith('page'):
+                elem.clear()
+                #Reset all variables for next article
+                article = {}
+                size = {}
+                md5hashes = deque()
+                id = False
+                parse = False
+
+
+    except SyntaxError, error:
+        print 'Encountered invalid XML tag. Error message: %s' % error
+        dump(elem)
+        sys.exit(-1)
+    except IOError, error:
+        print '''Archive file is possibly corrupted. Please delete this archive
+        and retry downloading. Error message: %s''' % error
+        sys.exit(-1)
+    print 'Finished parsing Wikipedia dump file.'
+
+
+def stream_raw_xml(input_queue, process_id, lock, rts):
+    t0 = datetime.now()
+    file_id = 0
+    cache = buffer.CSVBuffer(process_id, rts, lock)
+
+    while True:
+        filename = input_queue.get()
+        input_queue.task_done()
+        file_id += 1
+        if filename == None:
+            print '%s files left in the queue' % input_queue.qsize()
+            break
+
+        print filename
+        fh = file_utils.create_streaming_buffer(filename)
+
+        if rts.kaggle:
+            datacompetition_count_edits(fh, rts, file_id)
+        else:
+            parse_xml(fh, rts, cache, file_id)
+
+        fh.close()
+
+        t1 = datetime.now()
+        print 'Worker %s: Processing of %s took %s' % (process_id, filename, (t1 - t0))
+        print 'There are %s files left in the queue' % (input_queue.qsize())
+        t0 = t1
+
+    if rts.kaggle:
+        cache.close()
+        cache.summary()
+
+
+def debug():
+    fh = 'c:\\wikimedia\sv\wiki\svwiki-latest-stub-meta-history.xml'
+
+
+def launcher(rts):
+    lock = RLock()
+    mgr = Manager()
+    open_handles = []
+    open_handles = mgr.list(open_handles)
+    clock = buffer.CustomLock(lock, open_handles)
+    input_queue = JoinableQueue()
+
+    files = file_utils.retrieve_file_list(rts.input_location)
+
+    if len(files) > cpu_count():
+        processors = cpu_count() - 1
+    else:
+        processors = len(files)
+
+    for filename in files:
+        filename = os.path.join(rts.input_location, filename)
+        print filename
+        input_queue.put(filename)
+
+    for x in xrange(processors):
+        print 'Inserting poison pill %s...' % x
+        input_queue.put(None)
+
+    extracters = [Process(target=stream_raw_xml, args=[input_queue, process_id,
+                                                       clock, rts])
+                  for process_id in xrange(processors)]
+    for extracter in extracters:
+        extracter.start()
+
+    input_queue.join()
+
+
+
+if __name__ == '__main__':
+    debug()
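
launcher() and stream_raw_xml() together implement the classic poison-pill pattern: a single JoinableQueue feeds dump filenames to the worker processes, and one None sentinel per worker tells it to shut down, after which input_queue.join() unblocks. A self-contained sketch of the pattern (standard library only; the worker body is a stand-in for stream_raw_xml):

from multiprocessing import JoinableQueue, Process, cpu_count

def worker(queue, process_id):
    while True:
        item = queue.get()
        queue.task_done()
        if item == None:
            # Poison pill: no more work for this worker.
            break
        print 'Worker %s processing %s' % (process_id, item)

def run(items):
    queue = JoinableQueue()
    # Leave one core free, but never start more workers than there are items.
    processors = min(len(items), max(cpu_count() - 1, 1))
    for item in items:
        queue.put(item)
    for x in xrange(processors):
        queue.put(None)  # one pill per worker
    workers = [Process(target=worker, args=[queue, pid])
               for pid in xrange(processors)]
    for w in workers:
        w.start()
    queue.join()  # returns once every put() has a matching task_done()

if __name__ == '__main__':
    run(['dump-1.xml.bz2', 'dump-2.xml.bz2'])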
Index: trunk/tools/editor_trends/classes/buffer.py
@@ -1,3 +1,29 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+'''
+Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com)
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License version 2
+as published by the Free Software Foundation.
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+See the GNU General Public License for more details, at
+http://www.fsf.org/licenses/gpl.html
+'''
+
+
+__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
+__author__email = 'dvanliere at gmail dot com'
+__date__ = '2011-04-11'
+__version__ = '0.1'
+
+import sys
+if '..' not in sys.path:
+    sys.path.append('..')
+
+from utils import file_utils
+
 class CustomLock:
     def __init__(self, lock, open_handles):
         self.lock = lock
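
The header added to buffer.py repeats the project's sys.path bootstrap so the module can resolve its sibling packages when scripts are run directly from within the editor_trends tree; without it, the new "from utils import file_utils" would fail. The idiom in isolation:

import sys
if '..' not in sys.path:
    # Make sibling packages (utils, classes, etl) importable when a module
    # is executed directly from a subdirectory of the checkout.
    sys.path.append('..')

from utils import file_utils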
Index: trunk/tools/editor_trends/utils/file_utils.py
@@ -202,8 +202,10 @@
     fh.write('\n')
 
 
-def create_txt_filehandle(location, name, mode, encoding):
-    filename = construct_filename(name, '.csv')
+def create_txt_filehandle(location, filename, mode, encoding):
+    filename = str(filename)
+    if not filename.endswith('.csv'):
+        filename = construct_filename(filename, '.csv')
     path = os.path.join(location, filename)
     return codecs.open(path, mode, encoding=encoding)
 
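
create_txt_filehandle no longer assumes its second argument is a bare name: it coerces non-strings (such as integer file ids) to str and only calls construct_filename when the '.csv' extension is missing, so the suffix is not applied twice. A usage sketch, assuming construct_filename simply appends the extension to the name:

from utils import file_utils

# Both calls open the same file: the first gets '.csv' appended, the second
# already carries the extension and is left untouched.
fh = file_utils.create_txt_filehandle('/tmp', 'counts_kaggle_1', 'w', 'utf-8')
fh.close()
fh = file_utils.create_txt_filehandle('/tmp', 'counts_kaggle_1.csv', 'w', 'utf-8')
fh.close()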