Index: trunk/tools/editor_trends/manage.py |
— | — | @@ -39,6 +39,7 @@ |
40 | 40 | from utils import ordered_dict |
41 | 41 | from utils import exceptions |
42 | 42 | from utils import log |
| 43 | +from utils import timer |
43 | 44 | from database import db |
44 | 45 | from etl import chunker |
45 | 46 | from etl import extracter |
— | — | @@ -65,21 +66,6 @@ |
66 | 67 | for item in self.__dict__: |
67 | 68 | yield item |
68 | 69 | |
69 | | -class Timer(object): |
70 | | - def __init__(self): |
71 | | - self.t0 = datetime.datetime.now() |
72 | | - |
73 | | - def __str__(self): |
74 | | - return 'Timer started: %s' % self.t0 |
75 | | - |
76 | | - def stop(self): |
77 | | - self.t1 = datetime.datetime.now() |
78 | | - |
79 | | - def elapsed(self): |
80 | | - self.stop() |
81 | | - print 'Processing time: %s' % (self.t1 - self.t0) |
82 | | - |
83 | | - |
84 | 70 | def get_value(args, key): |
85 | 71 | return getattr(args, key, None) |
86 | 72 | |
— | — | @@ -213,44 +199,30 @@ |
214 | 200 | |
215 | 201 | def dump_downloader_launcher(args, logger, config): |
216 | 202 | print 'Start downloading' |
217 | | - timer = Timer() |
| 203 | + stopwatch = timer.Timer() |
218 | 204 | write_message_to_log(logger, args, config) |
| 205 | + log.log_to_mongo(config.full_project, 'download', stopwatch, type='start') |
| 206 | + dump_downloader.launcher(config) |
| 207 | + stopwatch.elapsed() |
| 208 | + log.log_to_mongo(config.full_project, 'download', stopwatch, type='finish') |
219 | 209 | |
220 | | - extension = utils.determine_file_extension(config.filename) |
221 | | - filemode = utils.determine_file_mode(extension) |
222 | | - log.log_to_mongo(config.full_project, 'download', timer, type='start') |
223 | | - task_queue, result = dump_downloader.create_list_dumpfiles(settings.wp_dump_location, config.path, config.filename, extension) |
224 | 210 | |
225 | | - if result: |
226 | | - while True: |
227 | | - filename = task_queue.get(block=False) |
228 | | - if filename == None: |
229 | | - break |
230 | | - task_queue.task_done() |
231 | | - dump_downloader.download_wiki_file(settings.wp_dump_location, config.path, filename, config.location, filemode) |
232 | | - else: |
233 | | - dump_downloader.download_wiki_file(settings.wp_dump_location, config.path, config.filename, config.location, filemode) |
234 | | - |
235 | | - timer.elapsed() |
236 | | - log.log_to_mongo(config.full_project, 'download', timer, type='finish') |
237 | | - |
238 | | - |
239 | 211 | def launch_zip_extractor(args, logger, location, file, config): |
240 | 212 | print 'Unzipping zip file' |
241 | | - timer = Timer() |
242 | | - log.log_to_mongo(config.full_project, 'unpack', timer, type='start') |
| 213 | + stopwatch = timer.Timer() |
| 214 | + log.log_to_mongo(config.full_project, 'unpack', stopwatch, type='start') |
243 | 215 | write_message_to_log(logger, args, None, message=None, verb=None, location=location, file=file) |
244 | 216 | compressor = compression.Compressor(location, file) |
245 | 217 | retcode = compressor.extract() |
246 | | - timer.elapsed() |
247 | | - log.log_to_mongo(config.full_project, 'unpack', timer, type='finish') |
| 218 | + stopwatch.elapsed() |
| 219 | + log.log_to_mongo(config.full_project, 'unpack', stopwatch, type='finish') |
248 | 220 | return retcode |
249 | 221 | |
250 | 222 | |
251 | 223 | def extract_launcher(args, logger, config): |
252 | 224 | print 'Extracting data from XML' |
253 | | - timer = Timer() |
254 | | - log.log_to_mongo(config.full_project, 'extract', timer, type='start') |
| 225 | + stopwatch = timer.Timer() |
| 226 | + log.log_to_mongo(config.full_project, 'extract', stopwatch, type='start') |
255 | 227 | write_message_to_log(logger, args, None, message=None, verb=None, location=config.location, language_code=config.language_code, project=config.project) |
256 | 228 | '''make sure that the file exists, if it doesn't then expand it first''' |
257 | 229 | print 'Checking if dump file has been extracted...' |
— | — | @@ -269,59 +241,59 @@ |
270 | 242 | print 'Dump file has already been extracted...' |
271 | 243 | retcode = 0 |
272 | 244 | if retcode != 0: |
| 245 | + print 'There was an error while extracting %s, please make sure that %s is a valid archive.' % (file, file)
273 | 246 | sys.exit(retcode) |
274 | 247 | extracter.parse_dumpfile(config.project, file_without_ext, config.language_code, namespaces=['0']) |
275 | | - timer.elapsed() |
276 | | - log.log_to_mongo(config.full_project, 'extract', timer, type='finish') |
| 248 | + stopwatch.elapsed() |
| 249 | + log.log_to_mongo(config.full_project, 'extract', stopwatch, type='finish') |
277 | 250 | |
278 | 251 | |
279 | 252 | def sort_launcher(args, logger, config): |
280 | 253 | print 'Start sorting data' |
281 | | - timer = Timer() |
282 | | - log.log_to_mongo(config.full_project, 'sort', timer, type='start') |
| 254 | + stopwatch = timer.Timer() |
| 255 | + log.log_to_mongo(config.full_project, 'sort', stopwatch, type='start') |
283 | 256 | write_message_to_log(logger, args, None, message=None, verb=None, location=config.location, input=config.txt, output=config.sorted) |
284 | 257 | sort.mergesort_launcher(config.txt, config.sorted) |
285 | | - timer.elapsed() |
286 | | - log.log_to_mongo(config.full_project, 'sort', timer, type='finish') |
| 258 | + stopwatch.elapsed() |
| 259 | + log.log_to_mongo(config.full_project, 'sort', stopwatch, type='finish') |
287 | 260 | |
288 | 261 | |
289 | 262 | def store_launcher(args, logger, config): |
290 | 263 | print 'Start storing data in MongoDB' |
291 | | - timer = Timer() |
292 | | - log.log_to_mongo(config.full_project, 'store', timer, type='start') |
| 264 | + stopwatch = timer.Timer() |
| 265 | + log.log_to_mongo(config.full_project, 'store', stopwatch, type='start') |
293 | 266 | db.cleanup_database(config.project, logger) |
294 | 267 | write_message_to_log(logger, args, None, message=None, verb='Storing', location=config.location, input=config.sorted, project=config.full_project, collection=config.collection) |
295 | 268 | store.launcher(config.sorted, config.full_project, config.collection) |
296 | | - timer.elapsed() |
297 | | - log.log_to_mongo(config.full_project, 'store', timer, type='finish') |
| 269 | + stopwatch.elapsed() |
| 270 | + log.log_to_mongo(config.full_project, 'store', stopwatch, type='finish') |
298 | 271 | |
299 | 272 | |
300 | 273 | def transformer_launcher(args, logger, **kwargs): |
301 | 274 | print 'Start transforming dataset' |
302 | | - timer = Timer() |
303 | | - log.log_to_mongo(config.full_project, 'transform', timer, type='start') |
| 275 | + stopwatch = timer.Timer() |
| 276 | + log.log_to_mongo(config.full_project, 'transform', stopwatch, type='start') |
304 | 277 | db.cleanup_database(config.project, logger, 'dataset') |
305 | 278 | write_message_to_log(logger, args, None, message=None, verb='Transforming', project=config.project, collection=config.collection) |
306 | 279 | transformer.transform_editors_single_launcher(config.project, config.collection) |
307 | | - timer.elapsed() |
308 | | - log.log_to_mongo(full_project, 'transform', timer, type='finish') |
| 280 | + stopwatch.elapsed() |
| 281 | + log.log_to_mongo(config.full_project, 'transform', stopwatch, type='finish')
309 | 282 | |
310 | 283 | |
311 | 284 | def exporter_launcher(args, logger, config): |
312 | 285 | print 'Start exporting dataset' |
313 | | - timer = Timer() |
314 | | - log.log_to_mongo(config.full_project, 'export', timer, type='start') |
| 286 | + stopwatch = timer.Timer() |
| 287 | + log.log_to_mongo(config.full_project, 'export', stopwatch, type='start') |
315 | 288 | for target in config.targets: |
316 | 289 | write_message_to_log(logger, args, None, message=None, verb='Exporting', target=target, dbname=config.full_project, collection=config.collection) |
317 | 290 | target = datasets[target] |
318 | 291 | print 'Dataset is created by: %s' % target |
319 | 292 | exporter.dataset_launcher(config.full_project, config.collection, target) |
320 | | - timer.elapsed() |
321 | | - log.log_to_mongo(config.full_project, 'export', timer, type='finish') |
| 293 | + stopwatch.elapsed() |
| 294 | + log.log_to_mongo(config.full_project, 'export', stopwatch, type='finish') |
322 | 295 | |
323 | 296 | |
324 | 297 | def cleanup(logger, args, config): |
325 | | - #dirs = kwargs.get('directories')[1:] |
326 | 298 | for dir in config.directories[1:]: |
327 | 299 | write_message_to_log(logger, args, None, message=None, verb='Deleting', dir=dir) |
328 | 300 | utils.delete_file(dir, '', directory=True) |
— | — | @@ -330,15 +302,14 @@ |
331 | 303 | settings.verify_environment(dirs) |
332 | 304 | |
333 | 305 | file = '%s%s' % (config.full_project, '_editor.bin') |
334 | | - #file = kwargs.get('full_project') + '_editor.bin' |
335 | 306 | write_message_to_log(logger, args, None, message=None, verb='Deleting', file=file) |
336 | 307 | utils.delete_file(settings.binary_location, file) |
337 | 308 | |
338 | 309 | |
339 | 310 | def all_launcher(args, logger, config): |
340 | 311 | print 'The entire data processing chain has been called, this will take a couple of hours (at least) to complete.' |
341 | | - timer = Timer() |
342 | | - log.log_to_mongo(config.full_project, 'all', timer, type='start') |
| 312 | + stopwatch = timer.Timer() |
| 313 | + log.log_to_mongo(config.full_project, 'all', stopwatch, type='start') |
343 | 314 | message = 'Start of building %s dataset.' % config.full_project |
344 | 315 | |
345 | 316 | write_message_to_log(logger, args, None, message=message, verb=None, full_project=config.full_project, ignore=config.ignore, clean=config.clean) |
— | — | @@ -356,8 +327,8 @@ |
357 | 328 | for function, callname in functions.iteritems(): |
358 | 329 | if callname not in config.ignore: |
359 | 330 | function(args, logger, config) |
360 | | - timer.elapsed() |
361 | | - log.log_to_mongo(full_project, 'all', timer, type='finish') |
| 331 | + stopwatch.elapsed() |
| 332 | + log.log_to_mongo(config.full_project, 'all', stopwatch, type='finish')
362 | 333 | |
363 | 334 | |
364 | 335 | def supported_languages(): |
Index: trunk/tools/editor_trends/utils/dump_downloader.py |
— | — | @@ -29,27 +29,37 @@ |
30 | 30 | import utils |
31 | 31 | |
32 | 32 | |
33 | | -def create_list_dumpfiles(domain, path, filename, ext): |
| 33 | +def launcher(config): |
| 34 | + tasks = create_list_dumpfiles(settings.wp_dump_location, config.path, config.filename) |
| 35 | + consumers = [multiprocessing.Process(target=download_wiki_file, args=(tasks, config)) for i in xrange(settings.number_of_processes)] |
| 36 | + for w in consumers: |
| 37 | + w.start() |
| 38 | + |
| 39 | + tasks.join() |
| 40 | + |
| 41 | + |
| 42 | +def create_list_dumpfiles(domain, path, filename): |
34 | 43 | ''' |
35 | 44 | Wikipedia offers the option to download one dump file in separate batches. |
36 | 45 | This function determines how many files there are for a giving dump and puts |
37 | 46 | them in a queue. |
38 | 47 | ''' |
39 | 48 | task_queue = multiprocessing.JoinableQueue() |
| 49 | + ext = utils.determine_file_extension(filename) |
40 | 50 | canonical_filename = utils.determine_canonical_name(filename) |
41 | | - result = False |
42 | 51 | for x in xrange(1, 100): |
43 | 52 | f = '%s%s.xml.%s' % (canonical_filename, x, ext) |
44 | 53 | res = check_remote_file_exists(domain, path, f) |
45 | 54 | if res == None or res.status != 200: |
| 55 | + if x == 1: |
| 56 | + task_queue.put(filename) |
46 | 57 | break |
47 | 58 | else: |
48 | 59 | print 'Added chunk to download: %s' % f |
49 | 60 | task_queue.put(f) |
50 | | - result = True |
51 | 61 | for x in xrange(settings.number_of_processes): |
52 | 62 | task_queue.put(None) |
53 | | - return task_queue, result |
| 63 | + return task_queue |
54 | 64 | |
55 | 65 | |
56 | 66 | def check_remote_file_exists(domain, path, filename): |
— | — | @@ -79,58 +89,59 @@ |
80 | 90 | return - 1 |
81 | 91 | |
82 | 92 | |
83 | | -def download_wiki_file(domain, path, filename, location, filemode): |
| 93 | +def download_wiki_file(task_queue, config): |
84 | 94 | ''' |
85 | 95 | This is a very simple replacement for wget and curl because Windows does |
86 | 96 | not have these tools installed by default |
87 | | - @domain of the website where dump file is located |
88 | | - @path location of the dumpfile |
89 | | - @filename name of the file to be downloaded |
90 | | - @location indicates where to store the file locally |
91 | | - @filemode indicates whether we are downloading a binary or ascii file. (zip, |
92 | | - 7z,gz are binary, json is ascii) |
93 | | - @pbar is an instance of progressbar.ProgressBar() |
94 | 97 | ''' |
95 | 98 | chunk = 4096 |
96 | | - filesize = determine_remote_filesize(domain, path, filename) |
97 | | - if filemode == 'w': |
98 | | - fh = utils.create_txt_filehandle(location, filename, filemode, settings.encoding) |
99 | | - else: |
100 | | - fh = utils.create_binary_filehandle(location, filename, 'wb') |
| 99 | + while True: |
| 100 | + filename = task_queue.get(block=False) |
| 101 | + task_queue.task_done() |
| 102 | + if filename is None:
| 103 | + print 'Swallowed a poison pill' |
| 104 | + break |
| 105 | + # use the filename taken from the task queue (debug override removed)
| 106 | + extension = utils.determine_file_extension(filename) |
| 107 | + filemode = utils.determine_file_mode(extension) |
| 108 | + filesize = determine_remote_filesize(settings.wp_dump_location, config.path, filename) |
| 109 | + if filemode == 'w': |
| 110 | + fh = utils.create_txt_filehandle(config.location, filename, filemode, settings.encoding) |
| 111 | + else: |
| 112 | + fh = utils.create_binary_filehandle(config.location, filename, 'wb') |
101 | 113 | |
102 | | - if filesize != -1: |
103 | | - widgets = ['%s: ' % filename, progressbar.Percentage(), ' ', |
104 | | - progressbar.Bar(marker=progressbar.RotatingMarker()), ' ', |
105 | | - progressbar.ETA(), ' ', progressbar.FileTransferSpeed()] |
| 114 | + if filesize != -1: |
| 115 | + widgets = ['%s: ' % filename, progressbar.Percentage(), ' ', |
| 116 | + progressbar.Bar(marker=progressbar.RotatingMarker()), ' ', |
| 117 | + progressbar.ETA(), ' ', progressbar.FileTransferSpeed()] |
106 | 118 | |
107 | | - pbar = progressbar.ProgressBar(widgets=widgets, maxval=filesize).start() |
| 119 | + pbar = progressbar.ProgressBar(widgets=widgets, maxval=filesize).start() |
108 | 120 | |
109 | | - try: |
110 | | - if filename.endswith('json'): |
111 | | - req = urllib2.Request(domain + path) |
112 | | - else: |
113 | | - req = urllib2.Request(domain + path + filename) |
114 | | - response = urllib2.urlopen(req) |
115 | | - while True: |
116 | | - data = response.read(chunk) |
117 | | - if not data: |
118 | | - print 'Finished downloading %s%s%s.' % (domain, path, filename) |
119 | | - break |
120 | | - fh.write(data) |
| 121 | + try: |
| 122 | + if filename.endswith('json'): |
| 123 | + req = urllib2.Request(settings.wp_dump_location + config.path) |
| 124 | + else: |
| 125 | + req = urllib2.Request(settings.wp_dump_location + config.path + filename) |
| 126 | + response = urllib2.urlopen(req) |
| 127 | + while True: |
| 128 | + data = response.read(chunk) |
| 129 | + if not data: |
| 130 | + print 'Finished downloading %s%s%s.' % (settings.wp_dump_location, config.path, filename) |
| 131 | + break |
| 132 | + fh.write(data) |
121 | 133 | |
122 | | - filesize -= chunk |
123 | | - if filesize < 0: |
124 | | - chunk = chunk + filesize |
125 | | - pbar.update(pbar.currval + chunk) |
| 134 | + filesize -= chunk |
| 135 | + if filesize < 0: |
| 136 | + chunk = chunk + filesize |
| 137 | + pbar.update(pbar.currval + chunk) |
126 | 138 | |
127 | | - except urllib2.URLError, error: |
128 | | - print 'Reason: %s' % error |
129 | | - except urllib2.HTTPError, error: |
130 | | - print 'Error: %s' % error |
131 | | - finally: |
132 | | - fh.close() |
| 139 | + except urllib2.URLError, error: |
| 140 | + print 'Reason: %s' % error |
| 141 | + except urllib2.HTTPError, error: |
| 142 | + print 'Error: %s' % error |
| 143 | + finally: |
| 144 | + fh.close() |
133 | 145 | |
134 | 146 | |
135 | 147 | if __name__ == '__main__': |
136 | | - pbar = progressbar.ProgressBar() |
137 | | - download_wp_dump('http://download.wikimedia.org/enwiki/latest', 'bla.xml', settings.input_location, pbar) |
| 148 | + download_wp_dump('http://download.wikimedia.org/enwiki/latest', 'enwiki-latest-page_props.sql.gz', settings.input_location) # FIXME: download_wp_dump is undefined; launcher(config) is the new entry point