Index: trunk/tools/editor_trends/manage.py |
— | — | @@ -34,7 +34,7 @@ |
35 | 35 | from utils import timer |
36 | 36 | from database import db |
37 | 37 | from etl import downloader |
38 | | -from etl import extracter |
| 38 | +from etl import enricher |
39 | 39 | from etl import store |
40 | 40 | from etl import sort |
41 | 41 | from etl import transformer |
— | — | @@ -187,7 +187,6 @@ |
188 | 188 | %s' % ''.join([f + ',\n' for f in rts.file_choices]), |
189 | 189 | default='stub-meta-history.xml.gz') |
190 | 190 | |
191 | | - |
192 | 191 | return project, language, parser |
193 | 192 | |
194 | 193 | |
— | — | @@ -236,22 +235,20 @@ |
237 | 236 | rts.input_location = config.get('file_locations', 'input_location') |
238 | 237 | |
239 | 238 | |
240 | | - |
241 | | - |
242 | | -def downloader_launcher(properties, logger): |
| 239 | +def downloader_launcher(rts, logger): |
243 | 240 | ''' |
244 | 241 | This launcher calls the dump downloader to download a Wikimedia dump file. |
245 | 242 | ''' |
246 | 243 | print 'Start downloading' |
247 | 244 | stopwatch = timer.Timer() |
248 | | - log.log_to_mongo(properties, 'dataset', 'download', stopwatch, event='start') |
249 | | - res = downloader.launcher(properties, logger) |
| 245 | + log.log_to_mongo(rts, 'dataset', 'download', stopwatch, event='start') |
| 246 | + res = downloader.launcher(rts, logger) |
250 | 247 | stopwatch.elapsed() |
251 | | - log.log_to_mongo(properties, 'dataset', 'download', stopwatch, event='finish') |
| 248 | + log.log_to_mongo(rts, 'dataset', 'download', stopwatch, event='finish') |
252 | 249 | return res |
253 | 250 | |
254 | 251 | |
255 | | -def extract_launcher(properties, logger): |
| 252 | +def extract_launcher(rts, logger): |
256 | 253 | ''' |
257 | 254 | The extract launcher is used to extract the required variables from a dump |
258 | 255 | file. If the zip file is a known archive then it will first launch the |
— | — | @@ -259,10 +256,10 @@ |
260 | 257 | ''' |
261 | 258 | print 'Extracting data from XML' |
262 | 259 | stopwatch = timer.Timer() |
263 | | - log.log_to_mongo(properties, 'dataset', 'extract', stopwatch, event='start') |
264 | | - extracter.launcher(properties) |
| 260 | + log.log_to_mongo(rts, 'dataset', 'extract', stopwatch, event='start') |
| 261 | + enricher.launcher(rts) |
265 | 262 | stopwatch.elapsed() |
266 | | - log.log_to_mongo(properties, 'dataset', 'extract', stopwatch, event='finish') |
| 263 | + log.log_to_mongo(rts, 'dataset', 'extract', stopwatch, event='finish') |
267 | 264 | |
268 | 265 | |
269 | 266 | def sort_launcher(rts, logger): |
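Both launchers above follow the same timing-and-logging shape around an ETL step. A minimal sketch of that shared pattern, reusing the timer and log modules manage.py already imports; the wrapper name and the run_step callable are illustrative, not part of this change:

    # Sketch of the shared launcher shape; timed_launcher and run_step are
    # illustrative names, while timer and log refer to the modules imported
    # by manage.py above.
    def timed_launcher(rts, logger, step, run_step):
        '''Run one ETL step and record start/finish events in MongoDB.'''
        print 'Starting %s' % step
        stopwatch = timer.Timer()
        log.log_to_mongo(rts, 'dataset', step, stopwatch, event='start')
        result = run_step(rts, logger)          # e.g. downloader.launcher
        stopwatch.elapsed()
        log.log_to_mongo(rts, 'dataset', step, stopwatch, event='finish')
        return result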
Index: trunk/tools/editor_trends/etl/enricher.py |
— | — | @@ -18,21 +18,20 @@ |
19 | 19 | __version__ = '0.1' |
20 | 20 | |
21 | 21 | |
22 | | -import bz2 |
23 | 22 | import os |
24 | 23 | import hashlib |
25 | 24 | import codecs |
26 | 25 | import sys |
27 | 26 | import datetime |
28 | 27 | import progressbar |
29 | | -from multiprocessing import JoinableQueue, Process, cpu_count, current_process |
| 28 | +from multiprocessing import JoinableQueue, Process, cpu_count, current_process, RLock |
30 | 29 | from xml.etree.cElementTree import iterparse, dump |
31 | 30 | from collections import deque |
32 | 31 | |
| 32 | + |
33 | 33 | if '..' not in sys.path: |
34 | 34 | sys.path.append('..') |
35 | 35 | |
36 | | - |
37 | 36 | try: |
38 | 37 | from database import cassandra |
39 | 38 | import pycassa |
— | — | @@ -44,7 +43,6 @@ |
45 | 44 | from database import db |
46 | 45 | from bots import detector |
47 | 46 | from utils import file_utils |
48 | | -import extracter |
49 | 47 | |
50 | 48 | NAMESPACE = { |
51 | 49 | #0:'Main', |
— | — | @@ -84,12 +82,13 @@ |
85 | 83 | |
86 | 84 | |
87 | 85 | class Buffer: |
88 | | - def __init__(self, storage, id): |
| 86 | + def __init__(self, storage, id, rts=None, filehandles=None, locks=None): |
89 | 87 | assert storage == 'cassandra' or storage == 'mongo' or storage == 'csv', \ |
90 | 88 | 'Valid storage options are cassandra and mongo.' |
91 | 89 | self.storage = storage |
92 | 90 | self.revisions = {} |
93 | 91 | self.comments = {} |
| 92 | + self.titles = {} |
94 | 93 | self.id = id |
95 | 94 | self.keyspace_name = 'enwiki' |
96 | 95 | self.keys = ['revision_id', 'article_id', 'id', 'namespace', |
— | — | @@ -97,6 +96,12 @@ |
98 | 97 | 'cur_size', 'delta'] |
99 | 98 | self.setup_storage() |
100 | 99 | self.stats = Statistics() |
| 100 | + if storage == 'csv': |
| 101 | + self.rts = rts |
| 102 | + self.lock1 = locks[0] #lock for generic data |
| 103 | + self.lock2 = locks[1] #lock for comment data |
| 104 | + self.lock3 = locks[2] #lock for article titles |
| 105 | + self.filehandles = filehandles |
101 | 106 | |
102 | 107 | def setup_storage(self): |
103 | 108 | if self.storage == 'cassandra': |
— | — | @@ -108,19 +113,43 @@ |
109 | 114 | self.collection = self.db['kaggle'] |
110 | 115 | |
111 | 116 | else: |
112 | | - kaggle_file = 'kaggle_%s.csv' % self.id |
113 | | - comment_file = 'kaggle_comments_%s.csv' % self.id |
114 | | - file_utils.delete_file('', kaggle_file, directory=False) |
| 117 | + title_file = 'titles.csv' |
| 118 | + comment_file = 'comments.csv' |
| 119 | + file_utils.delete_file('', title_file, directory=False) |
115 | 120 | file_utils.delete_file('', comment_file, directory=False) |
116 | | - self.fh_main = codecs.open(kaggle_file, 'a', 'utf-8') |
117 | | - self.fh_extra = codecs.open(comment_file, 'a', 'utf-8') |
| 121 | + self.fh_titles = codecs.open(title_file, 'a', 'utf-8') |
| 122 | + self.fh_comments = codecs.open(comment_file, 'a', 'utf-8') |
118 | 123 | |
| 124 | + def get_hash(self, id): |
| 125 | + ''' |
| 126 | + A very simple hash function based on modulo. The except clause has been |
| 127 | + added because there are instances where the username is stored in the |
| 128 | + userid tag and hence is a string rather than an integer. |
| 129 | + ''' |
| 130 | + try: |
| 131 | + return int(id) % self.rts.max_filehandles |
| 132 | + except ValueError: |
| 133 | + return sum([ord(i) for i in id]) % self.rts.max_filehandles |
| 134 | + |
| 135 | + def group_observations(self, revisions): |
| 136 | + ''' |
| 137 | + This function groups observations by editor id; this way we have to make |
| 138 | + fewer file-opening calls. |
| 139 | + ''' |
| 140 | + data = {} |
| 141 | + for revision in revisions: |
| 142 | + id = revision[0] |
| 143 | + if id not in data: |
| 144 | + data[id] = [] |
| 145 | + data[id].append(revision) |
| 146 | + self.revisions = data |
| 147 | + |
119 | 148 | def add(self, revision): |
120 | 149 | self.stringify(revision) |
121 | 150 | id = revision['revision_id'] |
122 | 151 | self.revisions[id] = revision |
123 | 152 | if len(self.revisions) > 10000: |
124 | | - print '%s: Emptying buffer %s - buffer size %s' % (datetime.datetime.now(), self.id, len(self.revisions)) |
| 153 | + #print '%s: Emptying buffer %s - buffer size %s' % (datetime.datetime.now(), self.id, len(self.revisions)) |
125 | 154 | self.store() |
126 | 155 | self.clear() |
127 | 156 | |
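get_hash above spreads editors over a fixed pool of output files by taking the id modulo rts.max_filehandles, with a character-sum fallback for ids that are really usernames. A self-contained sketch of that bucketing rule; max_filehandles is a made-up stand-in for rts.max_filehandles:

    # Standalone sketch of the modulo bucketing in Buffer.get_hash;
    # max_filehandles is a made-up stand-in for rts.max_filehandles.
    max_filehandles = 500

    def bucket(editor_id):
        '''Map a numeric or string editor id to one of max_filehandles buckets.'''
        try:
            return int(editor_id) % max_filehandles
        except ValueError:
            # some ids are actually usernames; hash their character codes
            return sum(ord(ch) for ch in editor_id) % max_filehandles

    print bucket('12345')    # 345: plain modulo on the numeric id
    print bucket('SomeBot')  # character-sum modulo max_filehandles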
— | — | @@ -142,6 +171,7 @@ |
143 | 172 | def clear(self): |
144 | 173 | self.revisions = {} |
145 | 174 | self.comments = {} |
| 175 | + self.titles = {} |
146 | 176 | |
147 | 177 | def store(self): |
148 | 178 | if self.storage == 'cassandra': |
— | — | @@ -149,21 +179,59 @@ |
150 | 180 | elif self.storage == 'mongo': |
151 | 181 | print 'insert into mongo' |
152 | 182 | else: |
153 | | - for revision in self.revisions.itervalues(): |
| 183 | + rows = [] |
| 184 | + for id, revision in self.revisions.iteritems(): |
154 | 185 | values = [] |
155 | 186 | for key in self.keys: |
156 | 187 | values.append(revision[key].decode('utf-8')) |
| 188 | + values.insert(0, id) |
| 189 | + rows.append(values) |
| 190 | + self.write_output(rows) |
157 | 191 | |
158 | | - value = '\t'.join(values) + '\n' |
159 | | - row = '\t'.join([key, value]) |
160 | | - self.fh_main.write(row) |
| 192 | + if self.comments: |
| 193 | + self.lock2.acquire() |
| 194 | + try: |
| 195 | + for revision_id, comment in self.comments.iteritems(): |
| 196 | + comment = comment.decode('utf-8') |
| 197 | + row = '\t'.join([revision_id, comment]) + '\n' |
| 198 | + file_utils.write_list_to_csv(row, self.fh_comments, lock=self.lock2) |
| 199 | + except: |
| 200 | + pass |
| 201 | + finally: |
| 202 | + self.lock2.release() |
161 | 203 | |
162 | | - for revision_id, comment in self.comments.iteritems(): |
163 | | - comment = comment.decode('utf-8') |
164 | | - row = '\t'.join([revision_id, comment]) + '\n' |
165 | | - self.fh_extra.write(row) |
| 204 | + elif self.titles: |
| 205 | + self.lock3.acquire() |
| 206 | + try: |
| 207 | + for article_id, title in self.titles.iteritems(): |
| 208 | + title = title.decode('utf-8') |
| 209 | + row = '\t'.join([article_id, title]) + '\n' |
| 210 | + file_utils.write_list_to_csv(row, self.fh_titles, lock=self.lock3) |
| 211 | + except: |
| 212 | + pass |
| 213 | + finally: |
| 214 | + self.lock3.release() |
166 | 215 | |
167 | 216 | |
| 217 | + def write_output(self, data): |
| 218 | + self.group_observations(data) |
| 219 | + for editor in self.revisions: |
| 220 | + #lock the write around all edits of an editor for a particular page |
| 221 | + self.lock1.acquire() |
| 222 | + try: |
| 223 | + for i, revision in enumerate(self.revisions[editor]): |
| 224 | + if i == 0: |
| 225 | + id = self.get_hash(revision[2]) |
| 226 | + fh = self.filehandles[id] |
| 227 | + try: |
| 228 | + file_utils.write_list_to_csv(revision, fh, lock=self.lock1) |
| 229 | + except Exception, error: |
| 230 | + print 'Encountered the following error while writing data to %s: %s' % (fh, error) |
| 231 | + finally: |
| 232 | + self.lock1.release() |
| 233 | + |
| 234 | + |
| 235 | + |
168 | 236 | def extract_categories(): |
169 | 237 | ''' |
170 | 238 | Field 1: page id |
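write_output above wraps each editor's rows in lock1 so parallel workers cannot interleave lines inside a shared bucket file, and releases the lock in a finally clause even when a write fails. A minimal standalone sketch of that acquire/try/finally append pattern; the file name and the rows being written are placeholders:

    # Minimal sketch of lock-guarded appends from several processes;
    # the file name and the rows being written are placeholders.
    import codecs
    from multiprocessing import Process, RLock

    def append_rows(lock, rows):
        lock.acquire()
        try:
            fh = codecs.open('bucket_0.csv', 'a', 'utf-8')
            try:
                for row in rows:
                    fh.write('\t'.join(row) + '\n')
            finally:
                fh.close()
        finally:
            lock.release()    # always release, even if the write fails

    if __name__ == '__main__':
        lock = RLock()
        workers = [Process(target=append_rows,
                           args=(lock, [[u'editor_%d' % i, u'2011-01-01']]))
                   for i in xrange(4)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()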
— | — | @@ -192,6 +260,34 @@ |
193 | 261 | fh.close() |
194 | 262 | |
195 | 263 | |
| 264 | +def validate_hostname(address): |
| 265 | + ''' |
| 266 | + This is not a foolproof solution at all. The problem is that it's really |
| 267 | + hard to determine reliably whether a string is a hostname. This is |
| 268 | + a very fast rule of thumb that will lead to false positives, |
| 269 | + but that's life :) |
| 270 | + ''' |
| 271 | + parts = address.split(".") |
| 272 | + if len(parts) > 2: |
| 273 | + return True |
| 274 | + else: |
| 275 | + return False |
| 276 | + |
| 277 | + |
| 278 | +def validate_ip(address): |
| 279 | + parts = address.split(".") |
| 280 | + if len(parts) != 4: |
| 281 | + return False |
| 282 | + parts = parts[:3] |
| 283 | + for item in parts: |
| 284 | + try: |
| 285 | + if not 0 <= int(item) <= 255: |
| 286 | + return False |
| 287 | + except ValueError: |
| 288 | + return False |
| 289 | + return True |
| 290 | + |
| 291 | + |
196 | 292 | def extract_revision_text(revision): |
197 | 293 | rev = revision.find('ns0:text') |
198 | 294 | if rev != None: |
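The two validators above are intentionally rough: validate_ip only range-checks the first three octets, and validate_hostname treats any string with more than two dot-separated parts as a hostname (so dotted IPs also pass it, which is why extract_contributor_id calls both). A few illustrative calls showing what these rules accept:

    # Illustrative calls against the validators defined above.
    print validate_ip('192.168.0.1')             # True: four parts, first three in range
    print validate_ip('300.1.2.3')               # False: 300 falls outside 0-255
    print validate_ip('not.an.ip')               # False: not four parts
    print validate_hostname('en.wikipedia.org')  # True: more than two parts
    print validate_hostname('localhost')         # False: no dots at all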
— | — | @@ -202,22 +298,22 @@ |
203 | 299 | return '' |
204 | 300 | |
205 | 301 | |
206 | | -def extract_username(contributor): |
207 | | - contributor = contributor.find('ns0:username') |
| 302 | +def extract_username(contributor, xml_namespace): |
| 303 | + contributor = contributor.find('%s%s' % (xml_namespace, 'username')) |
208 | 304 | if contributor != None: |
209 | 305 | return contributor.text |
210 | 306 | else: |
211 | 307 | return None |
212 | 308 | |
213 | 309 | |
214 | | -def determine_username_is_bot(contributor, bots): |
| 310 | +def determine_username_is_bot(contributor, bots, xml_namespace): |
215 | 311 | ''' |
216 | 312 | #contributor is an xml element containing the id of the contributor |
217 | 313 | @bots should have a dict with all the bot ids and bot names |
218 | 314 | @Return False if username id is not in bot dict id or True if username id |
219 | 315 | is a bot id. |
220 | 316 | ''' |
221 | | - username = contributor.find('ns0:username') |
| 317 | + username = contributor.find('%s%s' % (xml_namespace, 'username')) |
222 | 318 | if username == None: |
223 | 319 | return 0 |
224 | 320 | else: |
— | — | @@ -227,23 +323,23 @@ |
228 | 324 | return 0 |
229 | 325 | |
230 | 326 | |
231 | | -def extract_contributor_id(contributor): |
| 327 | +def extract_contributor_id(revision, xml_namespace): |
232 | 328 | ''' |
233 | 329 | @contributor is the xml contributor node containing a number of attributes |
234 | 330 | Currently, we are only interested in registered contributors, hence we |
235 | 331 | ignore anonymous editors. |
236 | 332 | ''' |
237 | | - if contributor.get('deleted'): |
| 333 | + if revision.get('deleted'): |
238 | 334 | # ASK: Not sure if this is the best way to code deleted contributors. |
239 | 335 | return None |
240 | | - elem = contributor.find('ns0:id') |
| 336 | + elem = revision.find('%s%s' % (xml_namespace, 'id')) |
241 | 337 | if elem != None: |
242 | 338 | return {'id':elem.text} |
243 | 339 | else: |
244 | | - elem = contributor.find('ns0:ip') |
245 | | - if elem != None and elem.text != None \ |
246 | | - and validate_ip(elem.text) == False \ |
247 | | - and validate_hostname(elem.text) == False: |
| 340 | + elem = revision.find('%s%s' % (xml_namespace, 'ip')) |
| 341 | + if elem == None or elem.text == None: |
| 342 | + return None |
| 343 | + elif validate_ip(elem.text) == False and validate_hostname(elem.text) == False: |
248 | 344 | return {'username':elem.text, 'id': elem.text} |
249 | 345 | else: |
250 | 346 | return None |
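The rewritten lookups above prepend the xml_namespace string (for example '{http://www.mediawiki.org/xml/export-0.4/}') to every tag name, because ElementTree stores namespaced tags in '{uri}local' form. A small self-contained example of that find pattern; the XML fragment is invented for illustration:

    # Self-contained example of the namespaced find pattern used above;
    # the XML fragment is invented for illustration.
    from xml.etree.cElementTree import fromstring

    xml_namespace = '{http://www.mediawiki.org/xml/export-0.4/}'
    fragment = '''<contributor xmlns="http://www.mediawiki.org/xml/export-0.4/">
        <username>ExampleEditor</username>
        <id>42</id>
    </contributor>'''

    contributor = fromstring(fragment)
    print contributor.find('id')                                  # None: tag is namespaced
    print contributor.find('%s%s' % (xml_namespace, 'id')).text   # '42'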
— | — | @@ -280,11 +376,11 @@ |
281 | 377 | return size |
282 | 378 | |
283 | 379 | |
284 | | -def parse_contributor(contributor, bots): |
285 | | - username = extract_username(contributor) |
286 | | - user_id = extract_contributor_id(contributor) |
287 | | - bot = determine_username_is_bot(contributor, bots) |
288 | | - contributor.clear() |
| 380 | +def parse_contributor(revision, bots, xml_namespace): |
| 381 | + username = extract_username(revision, xml_namespace) |
| 382 | + user_id = extract_contributor_id(revision, xml_namespace) |
| 383 | + bot = determine_username_is_bot(revision, bots, xml_namespace) |
| 384 | + revision.clear() |
289 | 385 | editor = {} |
290 | 386 | editor['username'] = username |
291 | 387 | editor['bot'] = bot |
— | — | @@ -333,6 +429,13 @@ |
334 | 430 | return revert |
335 | 431 | |
336 | 432 | |
| 433 | +def extract_revision_id(revision_id): |
| 434 | + if revision_id != None: |
| 435 | + return revision_id.text |
| 436 | + else: |
| 437 | + return None |
| 438 | + |
| 439 | + |
337 | 440 | def extract_comment_text(revision_id, revision): |
338 | 441 | comment = {} |
339 | 442 | text = revision.find('comment') |
— | — | @@ -341,11 +444,10 @@ |
342 | 445 | return comment |
343 | 446 | |
344 | 447 | |
345 | | -def count_edits(article, counts, bots): |
| 448 | +def count_edits(article, counts, bots, xml_namespace): |
346 | 449 | namespaces = {} |
347 | 450 | title = article['title'].text |
348 | 451 | namespace = determine_namespace(title, namespaces) |
349 | | - xml_namespace = '{http://www.mediawiki.org/xml/export-0.4/}' |
350 | 452 | if namespace != False: |
351 | 453 | article_id = article['id'].text |
352 | 454 | revisions = article['revisions'] |
— | — | @@ -353,6 +455,7 @@ |
354 | 456 | if revision == None: |
355 | 457 | #the entire revision is empty, weird. |
356 | 458 | continue |
| 459 | + |
357 | 460 | contributor = revision.find('%s%s' % (xml_namespace, 'contributor')) |
358 | 461 | contributor = parse_contributor(contributor, bots) |
359 | 462 | if not contributor: |
— | — | @@ -366,12 +469,12 @@ |
367 | 470 | return counts |
368 | 471 | |
369 | 472 | |
370 | | -def create_variables(article, cache, bots): |
| 473 | +def create_variables(article, cache, bots, xml_namespace): |
371 | 474 | namespaces = {'User': 2, |
372 | 475 | 'Talk': 1, |
373 | 476 | 'User Talk': 3, |
374 | 477 | } |
375 | | - title = article['title'] |
| 478 | + title = article['title'].text |
376 | 479 | namespace = determine_namespace(title, namespaces) |
377 | 480 | |
378 | 481 | if namespace != False: |
— | — | @@ -386,14 +489,15 @@ |
387 | 490 | if revision == None: |
388 | 491 | #the entire revision is empty, weird. |
389 | 492 | continue |
390 | | - contributor = revision.find('ns0:contributor') |
391 | | - contributor = parse_contributor(contributor, bots) |
| 493 | + #dump(revision) |
| 494 | + contributor = revision.find('%s%s' % (xml_namespace, 'contributor')) |
| 495 | + contributor = parse_contributor(contributor, bots, xml_namespace) |
392 | 496 | if not contributor: |
393 | 497 | #editor is anonymous, ignore |
394 | 498 | continue |
395 | 499 | |
396 | | - revision_id = revision.find('ns0:id') |
397 | | - revision_id = extracter.extract_revision_id(revision_id) |
| 500 | + revision_id = revision.find('%s%s' % (xml_namespace, 'id')) |
| 501 | + revision_id = extract_revision_id(revision_id) |
398 | 502 | if revision_id == None: |
399 | 503 | #revision_id is missing, which is weird |
400 | 504 | continue |
— | — | @@ -406,7 +510,7 @@ |
407 | 511 | comment = extract_comment_text(revision_id, revision) |
408 | 512 | cache.comments.update(comment) |
409 | 513 | |
410 | | - timestamp = revision.find('ns0:timestamp').text |
| 514 | + timestamp = revision.find('%s%s' % (xml_namespace, 'timestamp')).text |
411 | 515 | row['timestamp'] = timestamp |
412 | 516 | |
413 | 517 | hash = create_md5hash(text) |
— | — | @@ -417,43 +521,55 @@ |
418 | 522 | row.update(hash) |
419 | 523 | row.update(size) |
420 | 524 | row.update(revert) |
| 525 | + cache.add(row) |
421 | 526 | revision.clear() |
422 | | - cache.add(row) |
423 | 527 | |
424 | 528 | |
425 | | -def parse_xml(fh): |
| 529 | + |
| 530 | +def parse_xml(fh, xml_namespace): |
426 | 531 | context = iterparse(fh, events=('end',)) |
427 | 532 | context = iter(context) |
428 | 533 | |
429 | 534 | article = {} |
430 | 535 | article['revisions'] = [] |
431 | 536 | id = False |
432 | | - namespace = '{http://www.mediawiki.org/xml/export-0.4/}' |
433 | 537 | |
434 | 538 | for event, elem in context: |
435 | | - if event == 'end' and elem.tag == '%s%s' % (namespace, 'title'): |
| 539 | + if event == 'end' and elem.tag == '%s%s' % (xml_namespace, 'title'): |
436 | 540 | article['title'] = elem |
437 | | - elif event == 'end' and elem.tag == '%s%s' % (namespace, 'revision'): |
| 541 | + elif event == 'end' and elem.tag == '%s%s' % (xml_namespace, 'revision'): |
438 | 542 | article['revisions'].append(elem) |
439 | | - elif event == 'end' and elem.tag == '%s%s' % (namespace, 'id') and id == False: |
| 543 | + elif event == 'end' and elem.tag == '%s%s' % (xml_namespace, 'id') and id == False: |
440 | 544 | article['id'] = elem |
441 | 545 | id = True |
442 | | - elif event == 'end' and elem.tag == '%s%s' % (namespace, 'page'): |
| 546 | + elif event == 'end' and elem.tag == '%s%s' % (xml_namespace, 'page'): |
443 | 547 | yield article |
444 | 548 | elem.clear() |
445 | 549 | article = {} |
446 | 550 | article['revisions'] = [] |
447 | 551 | id = False |
448 | | - elif event == 'end': |
449 | | - elem.clear() |
| 552 | + #elif event == 'end': |
| 553 | + # elem.clear() |
450 | 554 | |
451 | 555 | |
452 | | -def stream_raw_xml(input_queue, storage, id, function, dataset): |
| 556 | +def delayediter(iterable): |
| 557 | + iterable = iter(iterable) |
| 558 | + prev = iterable.next() |
| 559 | + for item in iterable: |
| 560 | + yield prev |
| 561 | + prev = item |
| 562 | + yield prev |
| 563 | + |
| 564 | +def stream_raw_xml(input_queue, storage, id, function, dataset, locks, rts): |
453 | 565 | bots = detector.retrieve_bots('en') |
| 566 | + xml_namespace = '{http://www.mediawiki.org/xml/export-0.4/}' |
| 567 | + path = os.path.join(rts.location, 'txt') |
| 568 | + filehandles = [file_utils.create_txt_filehandle(path, '%s.csv' % fh, 'a', |
| 569 | + rts.encoding) for fh in xrange(rts.max_filehandles)] |
454 | 570 | t0 = datetime.datetime.now() |
455 | 571 | i = 0 |
456 | 572 | if dataset == 'training': |
457 | | - cache = Buffer(storage, id) |
| 573 | + cache = Buffer(storage, id, rts, filehandles, locks) |
458 | 574 | else: |
459 | 575 | counts = {} |
460 | 576 | |
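parse_xml above walks the dump with iterparse, collects the title, first id and revision elements of each page, and yields one article per closing page tag; the streaming worker then consumes those articles. A compact self-contained version of that accumulate-and-yield pattern on an in-memory document; the XML and its namespace are invented:

    # Compact illustration of the accumulate-and-yield pattern in parse_xml;
    # the document and namespace below are invented.
    from StringIO import StringIO
    from xml.etree.cElementTree import iterparse

    ns = '{http://example.org/export/}'
    doc = StringIO('''<mediawiki xmlns="http://example.org/export/">
      <page><title>Alpha</title><id>1</id><revision><id>10</id></revision></page>
      <page><title>Beta</title><id>2</id><revision><id>20</id></revision></page>
    </mediawiki>''')

    def pages(fh):
        article, seen_id = {'revisions': []}, False
        for event, elem in iterparse(fh, events=('end',)):
            if elem.tag == ns + 'title':
                article['title'] = elem.text
            elif elem.tag == ns + 'id' and not seen_id:
                article['id'], seen_id = elem.text, True
            elif elem.tag == ns + 'revision':
                article['revisions'].append(elem)
            elif elem.tag == ns + 'page':
                yield article
                elem.clear()    # release the finished page to keep memory flat
                article, seen_id = {'revisions': []}, False

    for page in pages(doc):
        print page['title'], page['id'], len(page['revisions'])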
— | — | @@ -463,12 +579,12 @@ |
464 | 580 | if filename == None: |
465 | 581 | break |
466 | 582 | |
467 | | - fh = bz2.BZ2File(filename, 'rb') |
468 | | - for article in parse_xml(fh): |
| 583 | + fh = file_utils.create_streaming_buffer(filename) |
| 584 | + for article in parse_xml(fh, xml_namespace): |
469 | 585 | if dataset == 'training': |
470 | | - function(article, cache, bots) |
471 | | - else: |
472 | | - counts = function(article, counts, bots) |
| 586 | + function(article, cache, bots, xml_namespace) |
| 587 | + elif dataset == 'prediction': |
| 588 | + counts = function(article, counts, bots, xml_namespace) |
473 | 589 | i += 1 |
474 | 590 | if i % 10000 == 0: |
475 | 591 | print 'Worker %s parsed %s articles' % (id, i) |
— | — | @@ -488,19 +604,27 @@ |
489 | 605 | file_utils.store_object(counts, location, filename) |
490 | 606 | |
491 | 607 | |
492 | | -def setup(storage): |
| 608 | +def setup(storage, rts=None): |
493 | 609 | keyspace_name = 'enwiki' |
494 | 610 | if storage == 'cassandra': |
495 | 611 | cassandra.install_schema(keyspace_name, drop_first=True) |
| 612 | + elif storage == 'csv': |
| 613 | + output_articles = os.path.join(rts.input_location, rts.language.code, |
| 614 | + rts.project.name) |
| 615 | + output_txt = os.path.join(rts.input_location, rts.language.code, |
| 616 | + rts.project.name, 'txt') |
| 617 | + res = file_utils.delete_file(output_articles, 'articles.csv') |
| 618 | + res = file_utils.delete_file(output_txt, None, directory=True) |
| 619 | + if res: |
| 620 | + res = file_utils.create_directory(output_txt) |
496 | 621 | |
497 | 622 | |
498 | | -def launcher(function, path, dataset, storage, processors): |
499 | | - setup(storage) |
| 623 | +def multiprocessor_launcher(function, path, dataset, storage, processors, extension, locks=None, rts=None): |
500 | 624 | input_queue = JoinableQueue() |
501 | 625 | #files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2'] |
502 | 626 | #files = ['/home/diederik/kaggle/enwiki-20100904-pages-meta-history2.xml.bz2'] |
503 | 627 | |
504 | | - files = file_utils.retrieve_file_list(path, 'bz2') |
| 628 | + files = file_utils.retrieve_file_list(path, extension) |
505 | 629 | |
506 | 630 | for file in files: |
507 | 631 | filename = os.path.join(path, file) |
— | — | @@ -510,7 +634,7 @@ |
511 | 635 | for x in xrange(processors): |
512 | 636 | input_queue.put(None) |
513 | 637 | |
514 | | - extracters = [Process(target=stream_raw_xml, args=[input_queue, storage, id, function, dataset]) |
| 638 | + extracters = [Process(target=stream_raw_xml, args=[input_queue, storage, id, function, dataset, locks, rts]) |
515 | 639 | for id in xrange(processors)] |
516 | 640 | for extracter in extracters: |
517 | 641 | extracter.start() |
— | — | @@ -534,19 +658,40 @@ |
535 | 659 | storage = 'csv' |
536 | 660 | dataset = 'training' |
537 | 661 | processors = cpu_count() |
538 | | - launcher(function, path, dataset, storage, processors) |
| 662 | + extension = 'bz2' |
| 663 | + setup(storage) |
| 664 | + multiprocessor_launcher(function, path, dataset, storage, processors, extension) |
539 | 665 | |
540 | 666 | |
541 | 667 | def launcher_prediction(): |
542 | 668 | # launcher for creating test data |
543 | | - path = '/media/wikipedia_dumps/batch1/' |
| 669 | + path = '/mnt/wikipedia_dumps/batch1/' |
544 | 670 | function = count_edits |
545 | 671 | storage = 'csv' |
546 | 672 | dataset = 'prediction' |
547 | 673 | processors = 7 |
548 | | - launcher(function, path, dataset, storage, processors) |
| 674 | + extension = 'bz2' |
| 675 | + setup(storage) |
| 676 | + multiprocessor_launcher(function, path, dataset, storage, processors, extension) |
549 | 677 | |
550 | 678 | |
| 679 | +def launcher(rts): |
| 680 | + # launcher for creating regular mongo dataset |
| 681 | + path = rts.location |
| 682 | + function = create_variables |
| 683 | + storage = 'csv' |
| 684 | + dataset = 'training' |
| 685 | + processors = 1 |
| 686 | + extension = 'gz' |
| 687 | + lock1 = RLock() |
| 688 | + lock2 = RLock() |
| 689 | + lock3 = RLock() |
| 690 | + locks = [lock1, lock2, lock3] |
| 691 | + setup(storage, rts) |
| 692 | + multiprocessor_launcher(function, path, dataset, storage, processors, extension, locks, rts) |
| 693 | + |
| 694 | + |
551 | 695 | if __name__ == '__main__': |
552 | 696 | #launcher_training() |
553 | | - launcher_prediction() |
| 697 | + #launcher_prediction() |
| 698 | + launcher() |
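multiprocessor_launcher above uses the usual JoinableQueue fan-out: push every dump file onto the queue, add one None sentinel per worker so each process knows when to stop, then start one stream_raw_xml process per processor. A stripped-down sketch of that wiring; the worker body and file names are placeholders:

    # Stripped-down sketch of the queue/sentinel fan-out in
    # multiprocessor_launcher; the worker body and file names are placeholders.
    from multiprocessing import JoinableQueue, Process, cpu_count

    def worker(input_queue, worker_id):
        while True:
            filename = input_queue.get()
            input_queue.task_done()
            if filename is None:        # sentinel: no more files to parse
                break
            print 'Worker %s would parse %s' % (worker_id, filename)

    if __name__ == '__main__':
        input_queue = JoinableQueue()
        for filename in ['dump-part1.xml.gz', 'dump-part2.xml.gz']:
            input_queue.put(filename)
        processors = cpu_count()
        for x in xrange(processors):
            input_queue.put(None)       # one sentinel per worker
        workers = [Process(target=worker, args=[input_queue, worker_id])
                   for worker_id in xrange(processors)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()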
Index: trunk/tools/editor_trends/etl/extracter.py |
— | — | @@ -188,10 +188,21 @@ |
189 | 189 | |
190 | 190 | def parse_title(title): |
191 | 191 | title_data = {} |
192 | | - if type(title.text) == type('str'): |
193 | | - title_data['title'] = title.text.decode('utf-8') |
| 192 | + t1 = type(title.text) |
| 193 | + if type(title.text) != type('str'): |
| 194 | + print 'encoding' |
| 195 | + print title.text.encode('utf-8') |
| 196 | + title = title.text.encode('utf-8') |
| 197 | + title = title.decode('utf-8', 'ignore') |
| 198 | + print title |
194 | 199 | else: |
195 | | - title_data['title'] = title.text |
| 200 | + title = title.text |
| 201 | + |
| 202 | + #title = title.encode('utf-8') |
| 203 | + title_data['title'] = title #.decode('utf-8') |
| 204 | + t2 = type(title_data['title']) |
| 205 | + print t1, t2 |
| 206 | + #title_data['title'] = title |
196 | 207 | if title_data['title'].startswith('List of'): |
197 | 208 | title_data['list'] = True |
198 | 209 | else: |
— | — | @@ -268,7 +279,6 @@ |
269 | 280 | widgets = log.init_progressbar_widgets('Extracting data') |
270 | 281 | filehandles = [file_utils.create_txt_filehandle(output, '%s.csv' % fh, 'a', |
271 | 282 | rts.encoding) for fh in xrange(rts.max_filehandles)] |
272 | | - |
273 | 283 | while True: |
274 | 284 | total, processed = 0.0, 0.0 |
275 | 285 | try: |
— | — | @@ -286,7 +296,7 @@ |
287 | 297 | filesize = file_utils.determine_filesize(location, filename) |
288 | 298 | print 'Opening %s...' % (os.path.join(location, filename)) |
289 | 299 | print 'Filesize: %s' % filesize |
290 | | - fh1 = file_utils.create_txt_filehandle(location, filename, 'r', rts.encoding) |
| 300 | + fh1 = file_utils.create_txt_filehandle(location, filename, 'r', 'ascii') |
291 | 301 | fh2 = file_utils.create_txt_filehandle(location, 'articles.csv', 'a', rts.encoding) |
292 | 302 | ns, xml_namespace = wikitree.parser.extract_meta_information(fh1) |
293 | 303 | ns = build_namespaces_locale(ns, rts.namespaces) |
— | — | @@ -305,7 +315,7 @@ |
306 | 316 | output = output_editor_information(revisions, article_id, bot_ids, rts) |
307 | 317 | output = add_namespace_to_output(output, namespace) |
308 | 318 | write_output(output, filehandles, lock, rts) |
309 | | - file_utils.write_list_to_csv([article_id, title.values()], fh2) |
| 319 | + #file_utils.write_list_to_csv([article_id, title.values()], fh2, newline=False, lock=lock) |
310 | 320 | processed += 1 |
311 | 321 | page.clear() |
312 | 322 | pbar.update(pbar.currval + article_size) |
— | — | @@ -366,11 +376,15 @@ |
367 | 377 | return sum([ord(i) for i in id]) % rts.max_filehandles |
368 | 378 | |
369 | 379 | |
370 | | -def prepare(output): |
371 | | - res = file_utils.delete_file(output, 'articles.csv') |
372 | | - res = file_utils.delete_file(output, None, directory=True) |
| 380 | +def prepare(rts): |
| 381 | + output_articles = os.path.join(rts.input_location, rts.language.code, |
| 382 | + rts.project.name) |
| 383 | + output_txt = os.path.join(rts.input_location, rts.language.code, |
| 384 | + rts.project.name, 'txt') |
| 385 | + res = file_utils.delete_file(output_articles, 'articles.csv') |
| 386 | + res = file_utils.delete_file(output_txt, None, directory=True) |
373 | 387 | if res: |
374 | | - res = file_utils.create_directory(output) |
| 388 | + res = file_utils.create_directory(output_txt) |
375 | 389 | return res |
376 | 390 | |
377 | 391 | |
— | — | @@ -415,9 +429,7 @@ |
416 | 430 | if not tasks: |
417 | 431 | return False |
418 | 432 | |
419 | | - output = os.path.join(rts.input_location, rts.language.code, |
420 | | - rts.project.name, 'txt') |
421 | | - result = prepare(output) |
| 433 | + result = prepare(rts) |
422 | 434 | if not result: |
423 | 435 | return result |
424 | 436 | |
— | — | @@ -448,10 +460,11 @@ |
449 | 461 | |
450 | 462 | |
451 | 463 | def debug(): |
452 | | - project = 'wiki' |
453 | | - language_code = 'sv' |
454 | | - filename = 'svwiki-latest-stub-meta-history.xml' |
455 | | - parse_dumpfile(project, filename, language_code) |
| 464 | + pass |
| 465 | + #project = 'wiki' |
| 466 | + #language_code = 'sv' |
| 467 | + #filename = 'svwiki-latest-stub-meta-history.xml' |
| 468 | + #parse_dumpfile(project, filename, language_code) |
456 | 469 | |
457 | 470 | |
458 | 471 | if __name__ == '__main__': |
Index: trunk/tools/editor_trends/utils/file_utils.py |
— | — | @@ -23,6 +23,8 @@ |
24 | 24 | and track error messages. |
25 | 25 | ''' |
26 | 26 | |
| 27 | +import bz2 |
| 28 | +import gzip |
27 | 29 | import re |
28 | 30 | import htmlentitydefs |
29 | 31 | import time |
— | — | @@ -142,8 +144,6 @@ |
143 | 145 | The calling function is responsible for: |
144 | 146 | 1) closing the filehandle |
145 | 147 | ''' |
146 | | - |
147 | | - |
148 | 148 | tab = False |
149 | 149 | wrote_newline = None |
150 | 150 | if recursive: |
— | — | @@ -207,6 +207,16 @@ |
208 | 208 | return codecs.open(path, mode, encoding=encoding) |
209 | 209 | |
210 | 210 | |
| 211 | +def create_streaming_buffer(path): |
| 212 | + extension = determine_file_extension(path) |
| 213 | + if extension == 'gz': |
| 214 | + fh = gzip.GzipFile(path, 'rb') |
| 215 | + elif extension in ('bz', 'bz2'): |
| 216 | + fh = bz2.BZ2File(path, 'rb') |
| 217 | + else: |
| 218 | + raise exceptions.CompressedFileNotSupported(extension) |
| 219 | + return fh |
| 220 | + |
211 | 221 | def create_binary_filehandle(location, filename, mode): |
212 | 222 | path = os.path.join(location, filename) |
213 | 223 | return open(path, mode) |
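create_streaming_buffer above is what lets stream_raw_xml in enricher.py treat gzip and bzip2 dumps uniformly: it picks a decompressing file object by extension, and anything else raises CompressedFileNotSupported. A hedged usage sketch; the dump file name is a placeholder and the import assumes the editor_trends root is on sys.path:

    # Hedged usage sketch; the dump file name is a placeholder and the import
    # assumes the editor_trends package root is on sys.path.
    from xml.etree.cElementTree import iterparse
    from utils import file_utils

    path = 'enwiki-latest-stub-meta-history.xml.gz'   # placeholder dump file
    fh = file_utils.create_streaming_buffer(path)     # gzip.GzipFile for a .gz dump
    try:
        for event, elem in iterparse(fh, events=('end',)):
            elem.clear()    # enricher.parse_xml does the real per-page work here
    finally:
        fh.close()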