r80865 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: < r80864 | r80865 | r80866 >
Date: 16:43, 24 January 2011
Author: diederik
Status: deferred
Tags:
Comment:
Added multiprocessing support for extracting data from the dump files. This is achieved by downloading partial dump files (these are available for large Wikimedia projects such as the English Wikipedia) and processing them in parallel. I still need benchmarks to see how big the improvement is.
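The core pattern behind this change is a standard multiprocessing producer/consumer setup: the filenames of the partial dump files go into a JoinableQueue, a fixed number of worker processes pull filenames until they receive a None sentinel, and the main process waits for the queue to drain. The sketch below illustrates that pattern in isolation; parse_chunk, launch and the example filenames are placeholders for illustration, not part of this revision.

# Minimal sketch of the producer/consumer pattern used in this revision.
# parse_chunk and the filenames below are placeholders, not real code paths.
import multiprocessing

def parse_chunk(tasks):
    # Placeholder worker: pull filenames until the None sentinel arrives.
    while True:
        filename = tasks.get()
        tasks.task_done()
        if filename is None:
            break
        print 'processing %s' % filename  # real code would parse the dump here

def launch(filenames, number_of_processes=2):
    tasks = multiprocessing.JoinableQueue()
    for filename in filenames:
        tasks.put(filename)
    # One sentinel per worker so that every process terminates.
    for _ in xrange(number_of_processes):
        tasks.put(None)
    workers = [multiprocessing.Process(target=parse_chunk, args=(tasks,))
               for _ in xrange(number_of_processes)]
    for w in workers:
        w.start()
    tasks.join()

if __name__ == '__main__':
    launch(['enwiki-latest-stub-meta-history1.xml',
            'enwiki-latest-stub-meta-history2.xml'])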
Modified paths:
  • /trunk/tools/editor_trends/etl/extracter.py (modified) (history)

Diff

Index: trunk/tools/editor_trends/etl/extracter.py
@@ -13,17 +13,15 @@
1414 '''
1515
1616 __author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
17 -__author__email = 'dvanliere at gmail dot com'
 17+__email__ = 'dvanliere at gmail dot com'
1818 __date__ = '2010-12-13'
1919 __version__ = '0.1'
2020
2121 import sys
2222 import re
23 -import json
2423 import os
25 -import xml.etree.cElementTree as cElementTree
2624 import multiprocessing
27 -from Queue import Empty
 25+import progressbar
2826
2927 sys.path.append('..')
3028 import configuration
@@ -31,7 +29,9 @@
3230
3331 import wikitree.parser
3432 from bots import bots
35 -from utils import utils
 33+from utils import file_utils
 34+from utils import compression
 35+from utils import log
3636
3737 try:
3838 import psyco
@@ -60,34 +60,15 @@
6161 return ''
6262
6363
64 -def remove_namespace(element, namespace):
65 - '''Remove namespace from the XML document.'''
66 - ns = u'{%s}' % namespace
67 - nsl = len(ns)
68 - for elem in element.getiterator():
69 - if elem.tag.startswith(ns):
70 - elem.tag = elem.tag[nsl:]
71 - return element
72 -
73 -
74 -def load_namespace(language):
75 - file = '%s_ns.json' % language
76 - fh = utils.create_txt_filehandle(settings.namespace_location, file, 'r', settings.encoding)
77 - ns = json.load(fh)
78 - fh.close()
79 - ns = ns['query']['namespaces']
80 - return ns
81 -
82 -
8364 def build_namespaces_locale(namespaces, include=['0']):
8465 '''
8566 @include is a list of namespace keys that should not be ignored, the default
8667 setting is to ignore all namespaces except the main namespace.
8768 '''
8869 ns = []
89 - for namespace in namespaces:
90 - if namespace not in include:
91 - value = namespaces[namespace].get(u'*', None)
 70+ for key, value in namespaces.iteritems():
 71+ if key not in include:
 72+ #value = namespaces[namespace].get(u'*', None)
9273 ns.append(value)
9374 return ns
9475
@@ -95,7 +76,6 @@
9677 def parse_comments(revisions, function):
9778 for revision in revisions:
9879 comment = revision.find('{%s}comment' % settings.xml_namespace)
99 - #timestamp = revision.find('{%s}timestamp' % settings.xml_namespace).text
10080 if comment != None and comment.text != None:
10181 comment.text = function(comment.text)
10282 return revisions
@@ -104,7 +84,8 @@
10585 def verify_article_belongs_namespace(elem, namespaces):
10686 '''
10787 @namespaces is a list of namespaces that should be ignored, hence if the
108 - title of article starts with the namespace then return False else return True
 88+ title of article starts with the namespace then return False else return
 89+ True
10990 '''
11091 title = elem.text
11192 if title == None:
@@ -114,11 +95,13 @@
11596 return False
11697 return True
11798
 99+
118100 def validate_hostname(address):
119101 '''
120 - This is not a foolproof solution at all. The problem is that it's really hard
121 - to determine whether a string is a hostname or not **reliably**. This is a
122 - very fast rule of thumb. Will lead to false positives, but that's life :)
 102+ This is not a foolproof solution at all. The problem is that it's really
 103+ hard to determine whether a string is a hostname or not **reliably**. This
 104+ is a very fast rule of thumb. Will lead to false positives,
 105+ but that's life :)
123106 '''
124107 parts = address.split(".")
125108 if len(parts) > 2:
@@ -174,13 +157,16 @@
175158 ignore anonymous editors.
176159 '''
177160 if contributor.get('deleted'):
178 - return None # ASK: Not sure if this is the best way to code deleted contributors.
 161+ # ASK: Not sure if this is the best way to code deleted contributors.
 162+ return None
179163 elem = contributor.find('id')
180164 if elem != None:
181165 return {'id':elem.text}
182166 else:
183167 elem = contributor.find('ip')
184 - if elem != None and elem.text != None and validate_ip(elem.text) == False and validate_hostname(elem.text) == False:
 168+ if elem != None and elem.text != None \
 169+ and validate_ip(elem.text) == False \
 170+ and validate_hostname(elem.text) == False:
185171 return {'username':elem.text, 'id': elem.text}
186172 else:
187173 return None
@@ -192,8 +178,8 @@
193179 @output is where to store the data, a filehandle
194180 @**kwargs contains extra information
195181
196 - the variable tags determines which attributes are being parsed, the values in
197 - this dictionary are the functions used to extract the data.
 182+ the variable tags determines which attributes are being parsed, the values
 183+ in this dictionary are the functions used to extract the data.
198184 '''
199185 headers = ['id', 'date', 'article', 'username']
200186 tags = {'contributor': {'id': extract_contributor_id,
@@ -218,17 +204,20 @@
219205 for function in tags[tag].keys():
220206 f = tags[tag][function]
221207 value = f(el, bots=bots)
222 - if type(value) == type({}):
 208+ if isinstance(value, dict):
 209+ #if type(value) == type({}):
223210 for kw in value:
224211 vars[x][kw] = value[kw]
225212 else:
226213 vars[x][function] = value
227214
228215 '''
229 - This loop determines for each observation whether it should be stored or not.
 216+ This loop determines for each observation whether it should be stored
 217+ or not.
230218 '''
231219 for x in vars:
232 - if vars[x]['bot'] == 1 or vars[x]['id'] == None or vars[x]['username'] == None:
 220+ if vars[x]['bot'] == 1 or vars[x]['id'] == None \
 221+ or vars[x]['username'] == None:
233222 continue
234223 else:
235224 f = []
@@ -238,38 +227,50 @@
239228 return flat
240229
241230
242 -def parse_dumpfile(project, file, language_code, namespaces=['0']):
 231+def parse_dumpfile(tasks, project, language_code, filehandles, lock, namespaces=['0']):
243232 bot_ids = bots.retrieve_bots(language_code)
244 - ns = load_namespace(language_code)
245 - ns = build_namespaces_locale(ns, namespaces)
246 -
247233 location = os.path.join(settings.input_location, language_code, project)
248234 output = os.path.join(settings.input_location, language_code, project, 'txt')
249 - filehandles = [utils.create_txt_filehandle(output, '%s.csv' % fh, 'a', settings.encoding) for fh in xrange(settings.max_filehandles)]
 235+ widgets = log.init_progressbar_widgets('Extracting data')
250236
251 - fh = utils.create_txt_filehandle(location, file, 'r', settings.encoding)
252 - #fh = utils.create_txt_filehandle(location, '%s%s-latest-stub-meta-history.xml' % (language_code, project), 'r', settings.encoding)
253 - total_pages, processed_pages = 0.0, 0.0
254 - for page in wikitree.parser.read_input(fh):
255 - title = page.find('title')
256 - total_pages += 1
257 - if verify_article_belongs_namespace(title, ns):
258 - #cElementTree.dump(page)
259 - article_id = page.find('id').text
260 - revisions = page.findall('revision')
261 - revisions = parse_comments(revisions, remove_numeric_character_references)
262 - output = output_editor_information(revisions, article_id, bot_ids)
263 - write_output(output, filehandles)
264 - processed_pages += 1
265 - print processed_pages
266 - page.clear()
267 - fh.close()
268 - print 'Total pages: %s' % total_pages
269 - print 'Pages processed: %s (%s)' % (processed_pages, processed_pages / total_pages)
270 - filehandles = [file.close() for file in filehandles]
 237+ while True:
 238+ total, processed = 0.0, 0.0
 239+ filename = tasks.get(block=False)
 240+ tasks.task_done()
 241+ if filename == None:
 242+ print 'There are no more jobs left in the queue.'
 243+ break
271244
 245+ filesize = file_utils.determine_filesize(location, filename)
 246+ fh = file_utils.create_txt_filehandle(location, filename, 'r', settings.encoding)
 247+ ns, xml_namespace = wikitree.parser.extract_meta_information(fh)
 248+ ns = build_namespaces_locale(ns, namespaces)
 249+ settings.xml_namespace = xml_namespace
272250
 251+ pbar = progressbar.ProgressBar(widgets=widgets, maxval=filesize).start()
 252+ for page, article_size in wikitree.parser.read_input(fh):
 253+ title = page.find('title')
 254+ total += 1
 255+ if verify_article_belongs_namespace(title, ns):
 256+ article_id = page.find('id').text
 257+ revisions = page.findall('revision')
 258+ revisions = parse_comments(revisions, remove_numeric_character_references)
 259+ output = output_editor_information(revisions, article_id, bot_ids)
 260+ write_output(output, filehandles, lock)
 261+ processed += 1
 262+ page.clear()
 263+ pbar.update(pbar.currval + article_size)
 264+ fh.close()
 265+ print 'Total pages: %s' % total
 266+ print 'Pages processed: %s (%s)' % (processed, processed / total)
 267+
 268+ return True
 269+
 270+
273271 def group_observations(obs):
 272+ '''
 273+ Group the observations by their id (the first element of each observation).
 274+ '''
274275 d = {}
275276 for o in obs:
276277 id = o[0]
@@ -279,14 +280,16 @@
280281 return d
281282
282283
283 -def write_output(observations, filehandles):
 284+def write_output(observations, filehandles, lock):
284285 observations = group_observations(observations)
285286 for obs in observations:
286287 for i, o in enumerate(observations[obs]):
287288 if i == 0:
288289 fh = filehandles[hash(obs)]
289290 try:
290 - utils.write_list_to_csv(o, fh)
 291+ lock.acquire()
 292+ file_utils.write_list_to_csv(o, fh)
 293+ lock.release()
291294 except Exception, error:
292295 print error
293296
@@ -303,7 +306,88 @@
304307 return sum([ord(i) for i in id]) % settings.max_filehandles
305308
306309
 310+def prepare(output):
 311+ file_utils.delete_file(output, None, directory=True)
 312+ file_utils.create_directory(output)
 313+
 314+
 315+def unzip(properties):
 316+ tasks = multiprocessing.JoinableQueue()
 317+ canonical_filename = file_utils.determine_canonical_name(properties.filename)
 318+ extension = file_utils.determine_file_extension(properties.filename)
 319+ files = file_utils.retrieve_file_list(properties.location,
 320+ extension,
 321+ mask=canonical_filename)
 322+ print 'Checking if dump file has been extracted...'
 323+ for fn in files:
 324+ file_without_ext = fn.replace('%s%s' % ('.', extension), '')
 325+ result = file_utils.check_file_exists(properties.location, file_without_ext)
 326+ if not result:
 327+ print 'Dump file %s has not yet been extracted...' % fn
 328+ retcode = compression.launch_zip_extractor(properties.location,
 329+ fn,
 330+ properties)
 331+ else:
 332+ print 'Dump file has already been extracted...'
 333+ retcode = 0
 334+ if retcode == 0:
 335+ tasks.put(file_without_ext)
 336+ elif retcode != 0:
 337+ print 'There was an error while extracting %s, please make sure \
 338+ that %s is a valid archive.' % (fn, fn)
 339+ return False
 340+
 341+ return tasks
 342+
 343+def launcher(properties):
 344+ '''
 345+ This is the main entry point for the extract phase of the data processing
 346+ chain. First, it will put the files that need to be extracted in a queue
 347+ called tasks, then it will remove some old files to make sure that there is
 348+ no data pollution and finally it will start the parser to actually extract
 349+ the variables from the different dump files.
 350+ '''
 351+ result = True
 352+ tasks = unzip(properties)
 353+ output = os.path.join(settings.input_location, properties.language_code,
 354+ properties.project, 'txt')
 355+ prepare(output)
 356+ lock = multiprocessing.Lock()
 357+ filehandles = [file_utils.create_txt_filehandle(output, '%s.csv' % fh, 'a',
 358+ settings.encoding) for fh in xrange(settings.max_filehandles)]
 359+
 360+ consumers = [multiprocessing.Process(target=parse_dumpfile,
 361+ args=(tasks,
 362+ properties.project,
 363+ properties.language_code,
 364+ filehandles,
 365+ lock,
 366+ properties.namespaces))
 367+ for x in xrange(settings.number_of_processes)]
 368+
 369+ for x in xrange(settings.number_of_processes):
 370+ tasks.put(None)
 371+
 372+ for w in consumers:
 373+ w.start()
 374+
 375+ tasks.join()
 376+ filehandles = [fh.close() for fh in filehandles]
 377+# result = parse_dumpfile(properties.project, file_without_ext,
 378+# properties.language_code,
 379+# namespaces=['0'])
 380+
 381+
 382+ result = all([consumer.exitcode == 0 for consumer in consumers])
 383+ return result
 384+
 385+
 386+def debug():
 387+ project = 'wiki'
 388+ language_code = 'sv'
 389+ filename = 'svwiki-latest-stub-meta-history.xml'
 390+ #parse_dumpfile(project, filename, language_code)
 391+ launcher()
 392+
307393 if __name__ == '__main__':
308 - project = 'wiki'
309 - language_code = 'en'
310 - parse_dumpfile(project, language_code)
 394+ debug()
