Index: trunk/tools/editor_trends/etl/extracter.py |
— | — | @@ -13,17 +13,15 @@ |
14 | 14 | ''' |
15 | 15 | |
16 | 16 | __author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ]) |
17 | | -__author__email = 'dvanliere at gmail dot com' |
| 17 | +__email__ = 'dvanliere at gmail dot com' |
18 | 18 | __date__ = '2010-12-13' |
19 | 19 | __version__ = '0.1' |
20 | 20 | |
21 | 21 | import sys |
22 | 22 | import re |
23 | | -import json |
24 | 23 | import os |
25 | | -import xml.etree.cElementTree as cElementTree |
26 | 24 | import multiprocessing |
27 | | -from Queue import Empty |
| 25 | +import progressbar |
28 | 26 | |
29 | 27 | sys.path.append('..') |
30 | 28 | import configuration |
— | — | @@ -31,7 +29,9 @@ |
32 | 30 | |
33 | 31 | import wikitree.parser |
34 | 32 | from bots import bots |
35 | | -from utils import utils |
| 33 | +from utils import file_utils |
| 34 | +from utils import compression |
| 35 | +from utils import log |
36 | 36 | |
37 | 37 | try: |
38 | 38 | import psyco |
— | — | @@ -60,34 +60,15 @@ |
61 | 61 | return '' |
62 | 62 | |
63 | 63 | |
64 | | -def remove_namespace(element, namespace): |
65 | | - '''Remove namespace from the XML document.''' |
66 | | - ns = u'{%s}' % namespace |
67 | | - nsl = len(ns) |
68 | | - for elem in element.getiterator(): |
69 | | - if elem.tag.startswith(ns): |
70 | | - elem.tag = elem.tag[nsl:] |
71 | | - return element |
72 | | - |
73 | | - |
74 | | -def load_namespace(language): |
75 | | - file = '%s_ns.json' % language |
76 | | - fh = utils.create_txt_filehandle(settings.namespace_location, file, 'r', settings.encoding) |
77 | | - ns = json.load(fh) |
78 | | - fh.close() |
79 | | - ns = ns['query']['namespaces'] |
80 | | - return ns |
81 | | - |
82 | | - |
83 | 64 | def build_namespaces_locale(namespaces, include=['0']): |
84 | 65 | ''' |
85 | 66 | @include is a list of namespace keys that should not be ignored, the default |
86 | 67 | setting is to ignore all namespaces except the main namespace. |
87 | 68 | ''' |
88 | 69 | ns = [] |
89 | | - for namespace in namespaces: |
90 | | - if namespace not in include: |
91 | | - value = namespaces[namespace].get(u'*', None) |
| 70 | + for key, value in namespaces.iteritems(): |
| 71 | + if key not in include: |
| 72 | + #value = namespaces[namespace].get(u'*', None) |
92 | 73 | ns.append(value) |
93 | 74 | return ns |
94 | 75 | |
— | — | @@ -95,7 +76,6 @@ |
96 | 77 | def parse_comments(revisions, function): |
97 | 78 | for revision in revisions: |
98 | 79 | comment = revision.find('{%s}comment' % settings.xml_namespace) |
99 | | - #timestamp = revision.find('{%s}timestamp' % settings.xml_namespace).text |
100 | 80 | if comment != None and comment.text != None: |
101 | 81 | comment.text = function(comment.text) |
102 | 82 | return revisions |
— | — | @@ -104,7 +84,8 @@ |
105 | 85 | def verify_article_belongs_namespace(elem, namespaces): |
106 | 86 | ''' |
107 | 87 | @namespaces is a list of namespaces that should be ignored, hence if the |
108 | | - title of article starts with the namespace then return False else return True |
| 88 | + title of article starts with the namespace then return False else return |
| 89 | + True |
109 | 90 | ''' |
110 | 91 | title = elem.text |
111 | 92 | if title == None: |
— | — | @@ -114,11 +95,13 @@ |
115 | 96 | return False |
116 | 97 | return True |
117 | 98 | |
| 99 | + |
118 | 100 | def validate_hostname(address): |
119 | 101 | ''' |
120 | | - This is not a foolproof solution at all. The problem is that it's really hard |
121 | | - to determine whether a string is a hostname or not **reliably**. This is a |
122 | | - very fast rule of thumb. Will lead to false positives, but that's life :) |
| 102 | + This is not a foolproof solution at all. The problem is that it's really |
| 103 | + hard to determine whether a string is a hostname or not **reliably**. This |
| 104 | + is a very fast rule of thumb. Will lead to false positives, |
| 105 | + but that's life :) |
123 | 106 | ''' |
124 | 107 | parts = address.split(".") |
125 | 108 | if len(parts) > 2: |
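Only the first lines of validate_hostname are visible in this hunk, but the docstring's rule of thumb is easy to illustrate. A minimal standalone sketch, assuming the check amounts to "more than two dot-separated parts, at least one of them non-numeric" (the helper name and that second condition are assumptions, not the module's actual code):

```python
def looks_like_hostname(address):
    # Rule of thumb: a hostname tends to have more than two dot-separated
    # parts and at least one part that is not purely numeric, unlike an
    # IPv4 address. Fast, but admits false positives, as the docstring says.
    parts = address.split('.')
    if len(parts) > 2:
        return any(not part.isdigit() for part in parts)
    return False

# looks_like_hostname('dsl-123.example.isp.net')  -> True
# looks_like_hostname('145.97.39.155')            -> False
```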
— | — | @@ -174,13 +157,16 @@ |
175 | 158 | ignore anonymous editors. |
176 | 159 | ''' |
177 | 160 | if contributor.get('deleted'): |
178 | | - return None # ASK: Not sure if this is the best way to code deleted contributors. |
| 161 | + # ASK: Not sure if this is the best way to code deleted contributors. |
| 162 | + return None |
179 | 163 | elem = contributor.find('id') |
180 | 164 | if elem != None: |
181 | 165 | return {'id':elem.text} |
182 | 166 | else: |
183 | 167 | elem = contributor.find('ip') |
184 | | - if elem != None and elem.text != None and validate_ip(elem.text) == False and validate_hostname(elem.text) == False: |
| 168 | + if elem != None and elem.text != None \ |
| 169 | + and validate_ip(elem.text) == False \ |
| 170 | + and validate_hostname(elem.text) == False: |
185 | 171 | return {'username':elem.text, 'id': elem.text} |
186 | 172 | else: |
187 | 173 | return None |
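The return values of extract_contributor_id follow directly from the branches above. A small sketch of the three cases, using namespace-free tags for brevity (an assumption of this sketch; in the real dump the tags carry the MediaWiki XML namespace, and the dispatcher's extra keyword arguments are omitted here):

```python
import xml.etree.cElementTree as cElementTree

registered = cElementTree.fromstring('<contributor><id>12345</id></contributor>')
anonymous = cElementTree.fromstring('<contributor><ip>145.97.39.155</ip></contributor>')
imported = cElementTree.fromstring('<contributor><ip>SomeImportedUser</ip></contributor>')

# Expected results, per the branches shown in the hunk above:
# registered -> {'id': '12345'}                          (registered account)
# anonymous  -> None                                     (valid IP address, ignored)
# imported   -> {'username': 'SomeImportedUser',
#                'id': 'SomeImportedUser'}               (neither IP nor hostname)
```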
— | — | @@ -192,8 +178,8 @@ |
193 | 179 | @output is where to store the data, a filehandle |
194 | 180 | @**kwargs contains extra information |
195 | 181 | |
196 | | - the variable tags determines which attributes are being parsed, the values in |
197 | | - this dictionary are the functions used to extract the data. |
| 182 | + the variable tags determines which attributes are being parsed, the values |
| 183 | + in this dictionary are the functions used to extract the data. |
198 | 184 | ''' |
199 | 185 | headers = ['id', 'date', 'article', 'username'] |
200 | 186 | tags = {'contributor': {'id': extract_contributor_id, |
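A stripped-down sketch of the tag-to-extractor dispatch the docstring describes; the field names and extractor functions below are placeholders, not the module's real ones:

```python
def extract_id(element, **kwargs):
    # Extractors can return a dict when one tag yields several columns.
    return {'id': element.findtext('id')}

def extract_timestamp(element, **kwargs):
    return element.findtext('timestamp')

# Each top-level key is an XML tag to look for inside a revision; each inner
# key/value pair maps an output column to the function that extracts it.
tags = {'contributor': {'id': extract_id},
        'timestamp': {'date': extract_timestamp}}

def parse_revision(revision, bots):
    row = {}
    for tag, extractors in tags.items():
        element = revision.find(tag)
        if element is None:
            continue
        for column, extractor in extractors.items():
            value = extractor(element, bots=bots)
            if isinstance(value, dict):
                row.update(value)       # extractor returned several columns at once
            else:
                row[column] = value
    return row
```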
— | — | @@ -218,17 +204,20 @@ |
219 | 205 | for function in tags[tag].keys(): |
220 | 206 | f = tags[tag][function] |
221 | 207 | value = f(el, bots=bots) |
222 | | - if type(value) == type({}): |
 | 208 | + if isinstance(value, dict): |
| 209 | + #if type(value) == type({}): |
223 | 210 | for kw in value: |
224 | 211 | vars[x][kw] = value[kw] |
225 | 212 | else: |
226 | 213 | vars[x][function] = value |
227 | 214 | |
228 | 215 | ''' |
229 | | - This loop determines for each observation whether it should be stored or not. |
| 216 | + This loop determines for each observation whether it should be stored |
| 217 | + or not. |
230 | 218 | ''' |
231 | 219 | for x in vars: |
232 | | - if vars[x]['bot'] == 1 or vars[x]['id'] == None or vars[x]['username'] == None: |
| 220 | + if vars[x]['bot'] == 1 or vars[x]['id'] == None \ |
| 221 | + or vars[x]['username'] == None: |
233 | 222 | continue |
234 | 223 | else: |
235 | 224 | f = [] |
— | — | @@ -238,38 +227,50 @@ |
239 | 228 | return flat |
240 | 229 | |
241 | 230 | |
242 | | -def parse_dumpfile(project, file, language_code, namespaces=['0']): |
| 231 | +def parse_dumpfile(tasks, project, language_code, filehandles, lock, namespaces=['0']): |
243 | 232 | bot_ids = bots.retrieve_bots(language_code) |
244 | | - ns = load_namespace(language_code) |
245 | | - ns = build_namespaces_locale(ns, namespaces) |
246 | | - |
247 | 233 | location = os.path.join(settings.input_location, language_code, project) |
248 | 234 | output = os.path.join(settings.input_location, language_code, project, 'txt') |
249 | | - filehandles = [utils.create_txt_filehandle(output, '%s.csv' % fh, 'a', settings.encoding) for fh in xrange(settings.max_filehandles)] |
| 235 | + widgets = log.init_progressbar_widgets('Extracting data') |
250 | 236 | |
251 | | - fh = utils.create_txt_filehandle(location, file, 'r', settings.encoding) |
252 | | - #fh = utils.create_txt_filehandle(location, '%s%s-latest-stub-meta-history.xml' % (language_code, project), 'r', settings.encoding) |
253 | | - total_pages, processed_pages = 0.0, 0.0 |
254 | | - for page in wikitree.parser.read_input(fh): |
255 | | - title = page.find('title') |
256 | | - total_pages += 1 |
257 | | - if verify_article_belongs_namespace(title, ns): |
258 | | - #cElementTree.dump(page) |
259 | | - article_id = page.find('id').text |
260 | | - revisions = page.findall('revision') |
261 | | - revisions = parse_comments(revisions, remove_numeric_character_references) |
262 | | - output = output_editor_information(revisions, article_id, bot_ids) |
263 | | - write_output(output, filehandles) |
264 | | - processed_pages += 1 |
265 | | - print processed_pages |
266 | | - page.clear() |
267 | | - fh.close() |
268 | | - print 'Total pages: %s' % total_pages |
269 | | - print 'Pages processed: %s (%s)' % (processed_pages, processed_pages / total_pages) |
270 | | - filehandles = [file.close() for file in filehandles] |
| 237 | + while True: |
| 238 | + total, processed = 0.0, 0.0 |
| 239 | + filename = tasks.get(block=False) |
| 240 | + tasks.task_done() |
| 241 | + if filename == None: |
 | 242 | + print 'There are no more jobs left in the queue.' |
| 243 | + break |
271 | 244 | |
| 245 | + filesize = file_utils.determine_filesize(location, filename) |
| 246 | + fh = file_utils.create_txt_filehandle(location, filename, 'r', settings.encoding) |
| 247 | + ns, xml_namespace = wikitree.parser.extract_meta_information(fh) |
| 248 | + ns = build_namespaces_locale(ns, namespaces) |
| 249 | + settings.xml_namespace = xml_namespace |
272 | 250 | |
| 251 | + pbar = progressbar.ProgressBar(widgets=widgets, maxval=filesize).start() |
| 252 | + for page, article_size in wikitree.parser.read_input(fh): |
| 253 | + title = page.find('title') |
| 254 | + total += 1 |
| 255 | + if verify_article_belongs_namespace(title, ns): |
| 256 | + article_id = page.find('id').text |
| 257 | + revisions = page.findall('revision') |
| 258 | + revisions = parse_comments(revisions, remove_numeric_character_references) |
| 259 | + output = output_editor_information(revisions, article_id, bot_ids) |
| 260 | + write_output(output, filehandles, lock) |
| 261 | + processed += 1 |
| 262 | + page.clear() |
| 263 | + pbar.update(pbar.currval + article_size) |
| 264 | + fh.close() |
| 265 | + print 'Total pages: %s' % total |
| 266 | + print 'Pages processed: %s (%s)' % (processed, processed / total) |
| 267 | + |
| 268 | + return True |
| 269 | + |
| 270 | + |
273 | 271 | def group_observations(obs): |
| 272 | + ''' |
 | 273 | + Group observations by editor id so that all edits by the same editor are written to the same output file. |
| 274 | + ''' |
274 | 275 | d = {} |
275 | 276 | for o in obs: |
276 | 277 | id = o[0] |
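The rewritten parse_dumpfile is now a queue consumer: it keeps pulling filenames from a JoinableQueue until it hits the None sentinel that launcher enqueues once per worker process. A minimal sketch of that pattern with generic names (not the module's own):

```python
import multiprocessing

def consumer(tasks):
    while True:
        job = tasks.get()          # block until a job or a sentinel arrives
        tasks.task_done()          # always acknowledge, sentinels included
        if job is None:            # sentinel: no more work for this process
            break
        print('processing %s' % job)

if __name__ == '__main__':
    tasks = multiprocessing.JoinableQueue()
    workers = [multiprocessing.Process(target=consumer, args=(tasks,))
               for _ in range(2)]
    for job in ['dump-part-1.xml', 'dump-part-2.xml']:
        tasks.put(job)
    for _ in workers:
        tasks.put(None)            # one sentinel per worker
    for w in workers:
        w.start()
    tasks.join()                   # returns once every task_done() has been called
```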
— | — | @@ -279,14 +280,16 @@ |
280 | 281 | return d |
281 | 282 | |
282 | 283 | |
283 | | -def write_output(observations, filehandles): |
| 284 | +def write_output(observations, filehandles, lock): |
284 | 285 | observations = group_observations(observations) |
285 | 286 | for obs in observations: |
286 | 287 | for i, o in enumerate(observations[obs]): |
287 | 288 | if i == 0: |
288 | 289 | fh = filehandles[hash(obs)] |
289 | 290 | try: |
290 | | - utils.write_list_to_csv(o, fh) |
| 291 | + lock.acquire() |
| 292 | + file_utils.write_list_to_csv(o, fh) |
 | 293 | + lock.release() |
291 | 294 | except Exception, error: |
292 | 295 | print error |
293 | 296 | |
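Because every worker process writes into the same pool of filehandles, write_output now serializes the actual write with a multiprocessing.Lock. A small sketch of the same idea, using try/finally so the lock cannot stay held when a write fails (the tab-delimited csv payload is an assumption):

```python
import csv
import multiprocessing

def locked_write(row, fh, lock):
    # Serialize writes from competing worker processes; try/finally makes sure
    # the lock is released even when writerow() raises.
    lock.acquire()
    try:
        csv.writer(fh, delimiter='\t').writerow(row)
    finally:
        lock.release()
```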
— | — | @@ -303,7 +306,88 @@ |
304 | 307 | return sum([ord(i) for i in id]) % settings.max_filehandles |
305 | 308 | |
306 | 309 | |
| 310 | +def prepare(output): |
| 311 | + file_utils.delete_file(output, None, directory=True) |
| 312 | + file_utils.create_directory(output) |
| 313 | + |
| 314 | + |
| 315 | +def unzip(properties): |
| 316 | + tasks = multiprocessing.JoinableQueue() |
| 317 | + canonical_filename = file_utils.determine_canonical_name(properties.filename) |
| 318 | + extension = file_utils.determine_file_extension(properties.filename) |
| 319 | + files = file_utils.retrieve_file_list(properties.location, |
| 320 | + extension, |
| 321 | + mask=canonical_filename) |
| 322 | + print 'Checking if dump file has been extracted...' |
| 323 | + for fn in files: |
| 324 | + file_without_ext = fn.replace('%s%s' % ('.', extension), '') |
| 325 | + result = file_utils.check_file_exists(properties.location, file_without_ext) |
| 326 | + if not result: |
| 327 | + print 'Dump file %s has not yet been extracted...' % fn |
| 328 | + retcode = compression.launch_zip_extractor(properties.location, |
| 329 | + fn, |
| 330 | + properties) |
| 331 | + else: |
| 332 | + print 'Dump file has already been extracted...' |
| 333 | + retcode = 0 |
| 334 | + if retcode == 0: |
| 335 | + tasks.put(file_without_ext) |
| 336 | + elif retcode != 0: |
 | 337 | + print 'There was an error while extracting %s, please make sure \ |
 | 338 | + that %s is a valid archive.' % (fn, fn) |
| 339 | + return False |
| 340 | + |
| 341 | + return tasks |
| 342 | + |
| 343 | +def launcher(properties): |
| 344 | + ''' |
 | 345 | + This is the main entry point for the extract phase of the data processing |
 | 346 | + chain. First, it will put the files that need to be extracted in a queue |
| 347 | + called tasks, then it will remove some old files to make sure that there is |
| 348 | + no data pollution and finally it will start the parser to actually extract |
| 349 | + the variables from the different dump files. |
| 350 | + ''' |
| 351 | + result = True |
| 352 | + tasks = unzip(properties) |
 | 353 | + output = os.path.join(settings.input_location, properties.language_code, |
 | 354 | + properties.project, 'txt') |
 | 355 | + prepare(output) |
 | 356 | + lock = multiprocessing.Lock() |
 | 357 | + filehandles = [file_utils.create_txt_filehandle(output, '%s.csv' % fh, 'a', |
 | 358 | + settings.encoding) for fh in xrange(settings.max_filehandles)] |
| 359 | + |
| 360 | + consumers = [multiprocessing.Process(target=parse_dumpfile, |
| 361 | + args=(tasks, |
| 362 | + properties.project, |
| 363 | + properties.language_code, |
| 364 | + filehandles, |
| 365 | + lock, |
| 366 | + properties.namespaces)) |
| 367 | + for x in xrange(settings.number_of_processes)] |
| 368 | + |
| 369 | + for x in xrange(settings.number_of_processes): |
| 370 | + tasks.put(None) |
| 371 | + |
| 372 | + for w in consumers: |
| 373 | + w.start() |
| 374 | + |
| 375 | + tasks.join() |
| 376 | + filehandles = [fh.close() for fh in filehandles] |
| 377 | +# result = parse_dumpfile(properties.project, file_without_ext, |
| 378 | +# properties.language_code, |
| 379 | +# namespaces=['0']) |
| 380 | + |
| 381 | + |
 | 382 | + result = all([consumer.exitcode == 0 for consumer in consumers]) |
| 383 | + return result |
| 384 | + |
| 385 | + |
| 386 | +def debug(): |
| 387 | + project = 'wiki' |
| 388 | + language_code = 'sv' |
| 389 | + filename = 'svwiki-latest-stub-meta-history.xml' |
| 390 | + #parse_dumpfile(project, filename, language_code) |
| 391 | + launcher() |
| 392 | + |
307 | 393 | if __name__ == '__main__': |
308 | | - project = 'wiki' |
309 | | - language_code = 'en' |
310 | | - parse_dumpfile(project, language_code) |
| 394 | + debug() |
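The last hunk also shows, in the unchanged helper near its top, how observations are spread over the csv files: the character codes of the editor id are summed and taken modulo settings.max_filehandles, so every edit by the same editor lands in the same file. A worked illustration with a made-up value for max_filehandles:

```python
MAX_FILEHANDLES = 100   # stands in for settings.max_filehandles

def filehandle_index(editor_id):
    # Same rule as in the diff: sum the character codes and take the modulus,
    # so a given editor id always maps to the same output file.
    return sum(ord(ch) for ch in editor_id) % MAX_FILEHANDLES

print(filehandle_index('12345'))   # ord sums to 255, 255 % 100 -> 55, i.e. 55.csv
print(filehandle_index('12345'))   # deterministic: 55 again
```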