r80310 MediaWiki - Code Review archive

Repository:MediaWiki
Revision: < r80309 | r80310 | r80311 >
Date:20:20, 14 January 2011
Author:diederik
Status:deferred
Tags:
Comment:
Dump downloader uses multiple processes if possible
Modified paths:
  • /trunk/tools/editor_trends/manage.py (modified) (history)
  • /trunk/tools/editor_trends/utils/dump_downloader.py (modified) (history)

Diff [purge]

Index: trunk/tools/editor_trends/manage.py
@@ -39,6 +39,7 @@
4040 from utils import ordered_dict
4141 from utils import exceptions
4242 from utils import log
 43+from utils import timer
4344 from database import db
4445 from etl import chunker
4546 from etl import extracter
@@ -65,21 +66,6 @@
6667 for item in self.__dict__:
6768 yield item
6869
69 -class Timer(object):
70 - def __init__(self):
71 - self.t0 = datetime.datetime.now()
72 -
73 - def __str__(self):
74 - return 'Timer started: %s' % self.t0
75 -
76 - def stop(self):
77 - self.t1 = datetime.datetime.now()
78 -
79 - def elapsed(self):
80 - self.stop()
81 - print 'Processing time: %s' % (self.t1 - self.t0)
82 -
83 -
8470 def get_value(args, key):
8571 return getattr(args, key, None)
8672
@@ -213,44 +199,30 @@
214200
215201 def dump_downloader_launcher(args, logger, config):
216202 print 'Start downloading'
217 - timer = Timer()
 203+ stopwatch = timer.Timer()
218204 write_message_to_log(logger, args, config)
 205+ log.log_to_mongo(config.full_project, 'download', stopwatch, type='start')
 206+ dump_downloader.launcher(config)
 207+ stopwatch.elapsed()
 208+ log.log_to_mongo(config.full_project, 'download', stopwatch, type='finish')
219209
220 - extension = utils.determine_file_extension(config.filename)
221 - filemode = utils.determine_file_mode(extension)
222 - log.log_to_mongo(config.full_project, 'download', timer, type='start')
223 - task_queue, result = dump_downloader.create_list_dumpfiles(settings.wp_dump_location, config.path, config.filename, extension)
224210
225 - if result:
226 - while True:
227 - filename = task_queue.get(block=False)
228 - if filename == None:
229 - break
230 - task_queue.task_done()
231 - dump_downloader.download_wiki_file(settings.wp_dump_location, config.path, filename, config.location, filemode)
232 - else:
233 - dump_downloader.download_wiki_file(settings.wp_dump_location, config.path, config.filename, config.location, filemode)
234 -
235 - timer.elapsed()
236 - log.log_to_mongo(config.full_project, 'download', timer, type='finish')
237 -
238 -
239211 def launch_zip_extractor(args, logger, location, file, config):
240212 print 'Unzipping zip file'
241 - timer = Timer()
242 - log.log_to_mongo(config.full_project, 'unpack', timer, type='start')
 213+ stopwatch = timer.Timer()
 214+ log.log_to_mongo(config.full_project, 'unpack', stopwatch, type='start')
243215 write_message_to_log(logger, args, None, message=None, verb=None, location=location, file=file)
244216 compressor = compression.Compressor(location, file)
245217 retcode = compressor.extract()
246 - timer.elapsed()
247 - log.log_to_mongo(config.full_project, 'unpack', timer, type='finish')
 218+ stopwatch.elapsed()
 219+ log.log_to_mongo(config.full_project, 'unpack', stopwatch, type='finish')
248220 return retcode
249221
250222
251223 def extract_launcher(args, logger, config):
252224 print 'Extracting data from XML'
253 - timer = Timer()
254 - log.log_to_mongo(config.full_project, 'extract', timer, type='start')
 225+ stopwatch = timer.Timer()
 226+ log.log_to_mongo(config.full_project, 'extract', stopwatch, type='start')
255227 write_message_to_log(logger, args, None, message=None, verb=None, location=config.location, language_code=config.language_code, project=config.project)
256228 '''make sure that the file exists, if it doesn't then expand it first'''
257229 print 'Checking if dump file has been extracted...'
@@ -269,59 +241,59 @@
270242 print 'Dump file has already been extracted...'
271243 retcode = 0
272244 if retcode != 0:
 245+ print 'There was an error while extracting %s, please make sure that %s is valid archive.' % (file, file)
273246 sys.exit(retcode)
274247 extracter.parse_dumpfile(config.project, file_without_ext, config.language_code, namespaces=['0'])
275 - timer.elapsed()
276 - log.log_to_mongo(config.full_project, 'extract', timer, type='finish')
 248+ stopwatch.elapsed()
 249+ log.log_to_mongo(config.full_project, 'extract', stopwatch, type='finish')
277250
278251
279252 def sort_launcher(args, logger, config):
280253 print 'Start sorting data'
281 - timer = Timer()
282 - log.log_to_mongo(config.full_project, 'sort', timer, type='start')
 254+ stopwatch = timer.Timer()
 255+ log.log_to_mongo(config.full_project, 'sort', stopwatch, type='start')
283256 write_message_to_log(logger, args, None, message=None, verb=None, location=config.location, input=config.txt, output=config.sorted)
284257 sort.mergesort_launcher(config.txt, config.sorted)
285 - timer.elapsed()
286 - log.log_to_mongo(config.full_project, 'sort', timer, type='finish')
 258+ stopwatch.elapsed()
 259+ log.log_to_mongo(config.full_project, 'sort', stopwatch, type='finish')
287260
288261
289262 def store_launcher(args, logger, config):
290263 print 'Start storing data in MongoDB'
291 - timer = Timer()
292 - log.log_to_mongo(config.full_project, 'store', timer, type='start')
 264+ stopwatch = timer.Timer()
 265+ log.log_to_mongo(config.full_project, 'store', stopwatch, type='start')
293266 db.cleanup_database(config.project, logger)
294267 write_message_to_log(logger, args, None, message=None, verb='Storing', location=config.location, input=config.sorted, project=config.full_project, collection=config.collection)
295268 store.launcher(config.sorted, config.full_project, config.collection)
296 - timer.elapsed()
297 - log.log_to_mongo(config.full_project, 'store', timer, type='finish')
 269+ stopwatch.elapsed()
 270+ log.log_to_mongo(config.full_project, 'store', stopwatch, type='finish')
298271
299272
300273 def transformer_launcher(args, logger, **kwargs):
301274 print 'Start transforming dataset'
302 - timer = Timer()
303 - log.log_to_mongo(config.full_project, 'transform', timer, type='start')
 275+ stopwatch = timer.Timer()
 276+ log.log_to_mongo(config.full_project, 'transform', stopwatch, type='start')
304277 db.cleanup_database(config.project, logger, 'dataset')
305278 write_message_to_log(logger, args, None, message=None, verb='Transforming', project=config.project, collection=config.collection)
306279 transformer.transform_editors_single_launcher(config.project, config.collection)
307 - timer.elapsed()
308 - log.log_to_mongo(full_project, 'transform', timer, type='finish')
 280+ stopwatch.elapsed()
 281+ log.log_to_mongo(full_project, 'transform', stopwatch, type='finish')
309282
310283
311284 def exporter_launcher(args, logger, config):
312285 print 'Start exporting dataset'
313 - timer = Timer()
314 - log.log_to_mongo(config.full_project, 'export', timer, type='start')
 286+ stopwatch = timer.Timer()
 287+ log.log_to_mongo(config.full_project, 'export', stopwatch, type='start')
315288 for target in config.targets:
316289 write_message_to_log(logger, args, None, message=None, verb='Exporting', target=target, dbname=config.full_project, collection=config.collection)
317290 target = datasets[target]
318291 print 'Dataset is created by: %s' % target
319292 exporter.dataset_launcher(config.full_project, config.collection, target)
320 - timer.elapsed()
321 - log.log_to_mongo(config.full_project, 'export', timer, type='finish')
 293+ stopwatch.elapsed()
 294+ log.log_to_mongo(config.full_project, 'export', stopwatch, type='finish')
322295
323296
324297 def cleanup(logger, args, config):
325 - #dirs = kwargs.get('directories')[1:]
326298 for dir in config.directories[1:]:
327299 write_message_to_log(logger, args, None, message=None, verb='Deleting', dir=dir)
328300 utils.delete_file(dir, '', directory=True)
@@ -330,15 +302,14 @@
331303 settings.verify_environment(dirs)
332304
333305 file = '%s%s' % (config.full_project, '_editor.bin')
334 - #file = kwargs.get('full_project') + '_editor.bin'
335306 write_message_to_log(logger, args, None, message=None, verb='Deleting', file=file)
336307 utils.delete_file(settings.binary_location, file)
337308
338309
339310 def all_launcher(args, logger, config):
340311 print 'The entire data processing chain has been called, this will take a couple of hours (at least) to complete.'
341 - timer = Timer()
342 - log.log_to_mongo(config.full_project, 'all', timer, type='start')
 312+ stopwatch = timer.Timer()
 313+ log.log_to_mongo(config.full_project, 'all', stopwatch, type='start')
343314 message = 'Start of building %s dataset.' % config.full_project
344315
345316 write_message_to_log(logger, args, None, message=message, verb=None, full_project=config.full_project, ignore=config.ignore, clean=config.clean)
@@ -356,8 +327,8 @@
357328 for function, callname in functions.iteritems():
358329 if callname not in config.ignore:
359330 function(args, logger, config)
360 - timer.elapsed()
361 - log.log_to_mongo(full_project, 'all', timer, type='finish')
 331+ stopwatch.elapsed()
 332+ log.log_to_mongo(full_project, 'all', stopwatch, type='finish')
362333
363334
364335 def supported_languages():
Index: trunk/tools/editor_trends/utils/dump_downloader.py
@@ -29,27 +29,37 @@
3030 import utils
3131
3232
33 -def create_list_dumpfiles(domain, path, filename, ext):
 33+def launcher(config):
 34+ tasks = create_list_dumpfiles(settings.wp_dump_location, config.path, config.filename)
 35+ consumers = [multiprocessing.Process(target=download_wiki_file, args=(tasks, config)) for i in xrange(settings.number_of_processes)]
 36+ for w in consumers:
 37+ w.start()
 38+
 39+ tasks.join()
 40+
 41+
 42+def create_list_dumpfiles(domain, path, filename):
3443 '''
3544 Wikipedia offers the option to download one dump file in separate batches.
3645 This function determines how many files there are for a giving dump and puts
3746 them in a queue.
3847 '''
3948 task_queue = multiprocessing.JoinableQueue()
 49+ ext = utils.determine_file_extension(filename)
4050 canonical_filename = utils.determine_canonical_name(filename)
41 - result = False
4251 for x in xrange(1, 100):
4352 f = '%s%s.xml.%s' % (canonical_filename, x, ext)
4453 res = check_remote_file_exists(domain, path, f)
4554 if res == None or res.status != 200:
 55+ if x == 1:
 56+ task_queue.put(filename)
4657 break
4758 else:
4859 print 'Added chunk to download: %s' % f
4960 task_queue.put(f)
50 - result = True
5161 for x in xrange(settings.number_of_processes):
5262 task_queue.put(None)
53 - return task_queue, result
 63+ return task_queue
5464
5565
5666 def check_remote_file_exists(domain, path, filename):
@@ -79,58 +89,59 @@
8090 return - 1
8191
8292
83 -def download_wiki_file(domain, path, filename, location, filemode):
 93+def download_wiki_file(task_queue, config):
8494 '''
8595 This is a very simple replacement for wget and curl because Windows does
8696 not have these tools installed by default
87 - @domain of the website where dump file is located
88 - @path location of the dumpfile
89 - @filename name of the file to be downloaded
90 - @location indicates where to store the file locally
91 - @filemode indicates whether we are downloading a binary or ascii file. (zip,
92 - 7z,gz are binary, json is ascii)
93 - @pbar is an instance of progressbar.ProgressBar()
9497 '''
9598 chunk = 4096
96 - filesize = determine_remote_filesize(domain, path, filename)
97 - if filemode == 'w':
98 - fh = utils.create_txt_filehandle(location, filename, filemode, settings.encoding)
99 - else:
100 - fh = utils.create_binary_filehandle(location, filename, 'wb')
 99+ while True:
 100+ filename = task_queue.get(block=False)
 101+ task_queue.task_done()
 102+ if filename == None:
 103+ print 'Swallowed a poison pill'
 104+ break
 105+ filename = 'zhwiki-latest-page_props.sql.gz'
 106+ extension = utils.determine_file_extension(filename)
 107+ filemode = utils.determine_file_mode(extension)
 108+ filesize = determine_remote_filesize(settings.wp_dump_location, config.path, filename)
 109+ if filemode == 'w':
 110+ fh = utils.create_txt_filehandle(config.location, filename, filemode, settings.encoding)
 111+ else:
 112+ fh = utils.create_binary_filehandle(config.location, filename, 'wb')
101113
102 - if filesize != -1:
103 - widgets = ['%s: ' % filename, progressbar.Percentage(), ' ',
104 - progressbar.Bar(marker=progressbar.RotatingMarker()), ' ',
105 - progressbar.ETA(), ' ', progressbar.FileTransferSpeed()]
 114+ if filesize != -1:
 115+ widgets = ['%s: ' % filename, progressbar.Percentage(), ' ',
 116+ progressbar.Bar(marker=progressbar.RotatingMarker()), ' ',
 117+ progressbar.ETA(), ' ', progressbar.FileTransferSpeed()]
106118
107 - pbar = progressbar.ProgressBar(widgets=widgets, maxval=filesize).start()
 119+ pbar = progressbar.ProgressBar(widgets=widgets, maxval=filesize).start()
108120
109 - try:
110 - if filename.endswith('json'):
111 - req = urllib2.Request(domain + path)
112 - else:
113 - req = urllib2.Request(domain + path + filename)
114 - response = urllib2.urlopen(req)
115 - while True:
116 - data = response.read(chunk)
117 - if not data:
118 - print 'Finished downloading %s%s%s.' % (domain, path, filename)
119 - break
120 - fh.write(data)
 121+ try:
 122+ if filename.endswith('json'):
 123+ req = urllib2.Request(settings.wp_dump_location + config.path)
 124+ else:
 125+ req = urllib2.Request(settings.wp_dump_location + config.path + filename)
 126+ response = urllib2.urlopen(req)
 127+ while True:
 128+ data = response.read(chunk)
 129+ if not data:
 130+ print 'Finished downloading %s%s%s.' % (settings.wp_dump_location, config.path, filename)
 131+ break
 132+ fh.write(data)
121133
122 - filesize -= chunk
123 - if filesize < 0:
124 - chunk = chunk + filesize
125 - pbar.update(pbar.currval + chunk)
 134+ filesize -= chunk
 135+ if filesize < 0:
 136+ chunk = chunk + filesize
 137+ pbar.update(pbar.currval + chunk)
126138
127 - except urllib2.URLError, error:
128 - print 'Reason: %s' % error
129 - except urllib2.HTTPError, error:
130 - print 'Error: %s' % error
131 - finally:
132 - fh.close()
 139+ except urllib2.URLError, error:
 140+ print 'Reason: %s' % error
 141+ except urllib2.HTTPError, error:
 142+ print 'Error: %s' % error
 143+ finally:
 144+ fh.close()
133145
134146
135147 if __name__ == '__main__':
136 - pbar = progressbar.ProgressBar()
137 - download_wp_dump('http://download.wikimedia.org/enwiki/latest', 'bla.xml', settings.input_location, pbar)
 148+ download_wp_dump('http://download.wikimedia.org/enwiki/latest', 'enwiki-latest-page_props.sql.gz', settings.input_location)

Status & tagging log