Index: trunk/tools/editor_trends/etl/enricher.py |
— | — | @@ -23,12 +23,64 @@ |
24 | 24 | import hashlib |
25 | 25 | import codecs |
26 | 26 | import re |
27 | | -from multiprocessing import JoinableQueue, Process |
28 | | -#from xml.etree.cElementTree. import iterparse |
29 | | -from xml.etree.cElementTree import fromstring |
| 27 | +import sys |
| 28 | +import progressbar |
| 29 | +from multiprocessing import JoinableQueue, Process, cpu_count |
| 30 | +from xml.etree.cElementTree import fromstring, dump |
| 31 | +from collections import deque |
30 | 32 | |
31 | | -RE_CATEGORY= re.compile('\(.*\`\,\.\-\:\'\)') |
| 33 | +try: |
| 34 | + import pycassa |
| 35 | +except ImportError: |
| 36 | + print 'I am not going to use Cassandra today, it\'s my off day.' |
32 | 37 | |
| 38 | +if '..' not in sys.path: |
| 39 | + sys.path.append('..') |
| 40 | + |
| 41 | +from database import db |
| 42 | +from bots import detector |
| 43 | +from utils import file_utils |
| 44 | +import extracter |
| 45 | + |
| 46 | +try: |
| 47 | + import psyco |
| 48 | + psyco.full() |
| 49 | +except ImportError: |
| 50 | + print 'and psyco is having an off day as well...' |
| 51 | + |
| 52 | +RE_CATEGORY = re.compile('\(.*\`\,\.\-\:\'\)') |
| 53 | + |
| 54 | + |
| 55 | +class Buffer: |
| 56 | + def __init__(self, storage): |
| 57 | + assert storage == 'cassandra' or storage == 'mongo', \ |
| 58 | + 'Valid storage options are cassandra and mongo.' |
| 59 | + self.storage = storage |
| 60 | + self.revisions = [] |
| 61 | + self.setup_db() |
| 62 | + |
| 63 | + def setup_db(self): |
| 64 | + if self.storage == 'cassandra': |
| 65 | + pass |
| 66 | + else: |
| 67 | + self.db = db.init_mongo_db('enwiki') |
| 68 | + self.collection = self.db['kaggle'] |
| 69 | + |
| 70 | + def add(self, revision): |
| 71 | + self.revisions.append(revision) |
| 72 | + if len(self.revisions) == 100: |
| 73 | + self.store() |
| 74 | + |
| 75 | + def empty(self): |
| 76 | + self.store() |
| 77 | + |
| 78 | + def store(self): |
| 79 | + if self.storage == 'cassandra': |
| 80 | + print 'insert into cassandra' |
| 81 | + else: |
| 82 | + print 'insert into mongo' |
| 83 | + |
| 84 | + |
33 | 85 | def extract_categories(): |
34 | 86 | ''' |
35 | 87 | Field 1: page id |
— | — | @@ -36,57 +88,117 @@ |
37 | 89 | Field 3: sort key |
38 | 90 | Field 4: timestamp last change |
39 | 91 | ''' |
40 | | - filename = '/Users/diederik/Downloads/enwiki-20110115-categorylinks.sql' |
| 92 | + filename = 'C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-categorylinks.sql' |
41 | 93 | output = codecs.open('categories.csv', 'w', encoding='utf-8') |
42 | 94 | fh = codecs.open(filename, 'r', encoding='utf-8') |
43 | | - |
44 | | - for line in fh: |
45 | | - if line.startswith('INSERT INTO `categorylinks` VALUES ('): |
46 | | - line = line.replace('INSERT INTO `categorylinks` VALUES (','') |
47 | | - line = line.replace("'",'') |
48 | | - categories = line.split('),(') |
49 | | - for category in categories: |
50 | | - category = category.split(',') |
51 | | - if len(category) ==4: |
52 | | - output.write('%s\t%s\n' % (category[0], category[1])) |
53 | | - |
| 95 | + |
| 96 | + try: |
| 97 | + for line in fh: |
| 98 | + if line.startswith('INSERT INTO `categorylinks` VALUES ('): |
| 99 | + line = line.replace('INSERT INTO `categorylinks` VALUES (', '') |
| 100 | + line = line.replace("'", '') |
| 101 | + categories = line.split('),(') |
| 102 | + for category in categories: |
| 103 | + category = category.split(',') |
| 104 | + if len(category) == 4: |
| 105 | + output.write('%s\t%s\n' % (category[0], category[1])) |
| 106 | + except UnicodeDecodeError, e: |
| 107 | + print e |
| 108 | + |
54 | 109 | output.close() |
55 | 110 | fh.close() |
56 | 111 | |
| 112 | + |
57 | 113 | def extract_revision_text(revision): |
58 | 114 | rev = revision.find('text') |
59 | 115 | if rev != None: |
60 | 116 | return rev.text.encode('utf-8') |
61 | 117 | else: |
62 | 118 | return None |
63 | | - |
| 119 | + |
| 120 | + |
64 | 121 | def create_md5hash(revision): |
65 | | - if revision == None: |
66 | | - return False |
67 | 122 | rev = extract_revision_text(revision) |
| 123 | + hash = {} |
68 | 124 | if rev != None: |
69 | 125 | m = hashlib.md5() |
70 | 126 | m.update(rev) |
71 | 127 | #echo m.digest() |
72 | | - return m.hexdigest() |
| 128 | + hash['hash'] = m.hexdigest() |
73 | 129 | else: |
74 | | - return False |
| 130 | + hash['hash'] = -1 |
| 131 | + return hash |
75 | 132 | |
76 | | - |
| 133 | + |
77 | 134 | def calculate_delta_article_size(prev_size, revision): |
78 | | - if revision == None: |
79 | | - return False |
80 | | - rev= extract_revision_text(revision) |
81 | | - if rev == None: |
82 | | - return 0, prev_size |
| 135 | + rev = extract_revision_text(revision) |
| 136 | + size = {} |
| 137 | + if prev_size == None: |
| 138 | + size['prev_size'] = 0 |
| 139 | + size['cur_size'] = len(rev) |
83 | 140 | else: |
| 141 | + size['prev_size'] = prev_size |
84 | 142 | delta = len(rev) - prev_size |
85 | 143 | prev_size = len(rev) |
86 | | - return delta, prev_size |
87 | | - |
| 144 | + size['cur_size'] = delta |
| 145 | + return size |
88 | 146 | |
89 | 147 | |
90 | | -def create_variables(result_queue): |
| 148 | +def parse_contributor(contributor, bots): |
| 149 | + username = extracter.extract_username(contributor) |
| 150 | + user_id = extracter.extract_contributor_id(contributor) |
| 151 | + bot = extracter.determine_username_is_bot(contributor, bots=bots) |
| 152 | + contributor = {} |
| 153 | + contributor['username'] = username |
| 154 | + contributor['bot'] = bot |
| 155 | + if user_id != None: |
| 156 | + contributor.update(user_id) |
| 157 | + else: |
| 158 | + contributor = False |
| 159 | + return contributor |
| 160 | + |
| 161 | + |
| 162 | +def determine_namespace(title): |
| 163 | + namespaces = {'User': 2, |
| 164 | + 'Talk': 1, |
| 165 | + 'User Talk': 3, |
| 166 | + } |
| 167 | + ns = {} |
| 168 | + if title.text != None: |
| 169 | + title = title.text |
| 170 | + title = title.split(':') |
| 171 | + if len(title) == 1: |
| 172 | + ns['namespace'] = 0 |
| 173 | + elif len(title) == 2: |
| 174 | + if title[0] in namespaces: |
| 175 | + ns['namespace'] = namespaces[title[0]] |
| 176 | + else: |
| 177 | + ns = False #article is not in the main, talk, user, or user talk namespace. |
| 178 | + else: |
| 179 | + ns = False |
| 180 | + return ns |
| 181 | + |
| 182 | + |
| 183 | +def prefill_row(title, article_id, namespace): |
| 184 | + row = {} |
| 185 | + row['title'] = title.text |
| 186 | + row['article_id'] = article_id |
| 187 | + row.update(namespace) |
| 188 | + return row |
| 189 | + |
| 190 | + |
| 191 | +def is_revision_reverted(hash_cur, hashes): |
| 192 | + revert = {} |
| 193 | + if hash_cur in hashes: |
| 194 | + revert['revert'] = 1 |
| 195 | + else: |
| 196 | + revert['revert'] = 0 |
| 197 | + return revert |
| 198 | + |
| 199 | + |
| 200 | +def create_variables(result_queue, storage): |
| 201 | + bots = detector.retrieve_bots('en') |
| 202 | + buffer = Buffer(storage) |
91 | 203 | while True: |
92 | 204 | try: |
93 | 205 | article = result_queue.get(block=True) |
— | — | @@ -94,26 +206,56 @@ |
95 | 207 | if article == None: |
96 | 208 | break |
97 | 209 | article = fromstring(article) |
98 | | - prev_size = 0 |
99 | | - revisions = article.findall('revision') |
100 | | - for revision in revisions: |
101 | | - revision_id = revision.find('id').text |
102 | | - hash = create_md5hash(revision) |
103 | | - delta, prev_size = calculate_delta_article_size(prev_size, revision) |
104 | | - print revision_id, hash, delta, prev_size |
| 210 | + title = article.find('title') |
| 211 | + namespace = determine_namespace(title) |
| 212 | + if namespace != False: |
| 213 | + prev_size = None |
| 214 | + revisions = article.findall('revision') |
| 215 | + article_id = article.find('id').text |
| 216 | + hashes = deque(maxlen=100) |
| 217 | + for revision in revisions: |
| 218 | + row = prefill_row(title, article_id, namespace) |
| 219 | + if revision == None: |
| 220 | + continue |
| 221 | + |
| 222 | + contributor = revision.find('contributor') |
| 223 | + contributor = parse_contributor(contributor, bots) |
| 224 | + if not contributor: |
| 225 | + #editor is anonymous, ignore |
| 226 | + continue |
| 227 | + |
| 228 | + row.update(contributor) |
| 229 | + revision_id = revision.find('id') |
| 230 | + revision_id = extracter.extract_revision_id(revision_id) |
| 231 | + row['revision_id'] = revision_id |
| 232 | + |
| 233 | + |
| 234 | + hash = create_md5hash(revision) |
| 235 | + revert = is_revision_reverted(hash['hash'], hashes) |
| 236 | + hashes.append(hash['hash']) |
| 237 | + size = calculate_delta_article_size(prev_size, revision) |
| 238 | + |
| 239 | + row.update(hash) |
| 240 | + row.update(size) |
| 241 | + row.update(revert) |
| 242 | + print row |
| 243 | + # if row['username'] == None: |
| 244 | + # contributor = revision.find('contributor') |
| 245 | + # attrs = contributor.getchildren() |
| 246 | + # for attr in attrs: |
| 247 | + # print attr.text |
| 248 | + #print revision_id, hash, delta, prev_size\ |
| 249 | + |
| 250 | + buffer.add(row) |
| 251 | + |
105 | 252 | except ValueError, e: |
106 | | - pass |
107 | | - #print e |
108 | | - |
| 253 | + print e |
| 254 | + except UnicodeDecodeError, e: |
| 255 | + print e |
| 256 | + buffer.empty() |
109 | 257 | |
110 | 258 | |
111 | 259 | def create_article(input_queue, result_queue): |
112 | | - ''' |
113 | | - This function creates three variables: |
114 | | - 1) a MD5 hash for each revision |
115 | | - 2) the size of the current revision |
116 | | - 3) the delta size of the current revision and the previous revision |
117 | | - ''' |
118 | 260 | buffer = cStringIO.StringIO() |
119 | 261 | parsing = False |
120 | 262 | while True: |
— | — | @@ -121,29 +263,30 @@ |
122 | 264 | input_queue.task_done() |
123 | 265 | if filename == None: |
124 | 266 | break |
125 | | - |
| 267 | + filesize = file_utils.determine_filesize('', filename) |
| 268 | + pbar = progressbar.ProgressBar(maxval=filesize).start() |
126 | 269 | for data in unzip(filename): |
127 | 270 | if data.startswith('<page>'): |
128 | 271 | parsing = True |
129 | | - #print data |
130 | 272 | if parsing: |
131 | 273 | buffer.write(data) |
132 | 274 | if data == '</page>': |
133 | | - xml1 = buffer.getvalue() |
134 | | - #xml1 = xml1.decode('utf-8') |
135 | | - #xml1 = xml1.encode('utf-8') |
136 | | - #xml1 = fromstring(xml1) |
137 | | - if xml1 != None: |
138 | | - result_queue.put(xml1) |
| 275 | + xml_string = buffer.getvalue() |
| 276 | + if xml_string != None: |
| 277 | + result_queue.put(xml_string) |
139 | 278 | buffer = cStringIO.StringIO() |
140 | | - |
141 | | - result_queue.put(None) |
| 279 | + pbar.update(pbar.currval + len(data)) #is inaccurate!!! |
| 280 | + |
| 281 | + |
| 282 | + result_queue.put(None) |
142 | 283 | print 'Finished parsing bz2 archives' |
143 | 284 | |
| 285 | + |
144 | 286 | def unzip(filename): |
145 | 287 | ''' |
146 | | - Filename should be a fully qualified path to the bz2 file that will be decompressed. |
147 | | - It will iterate line by line and yield this back to create_article |
| 288 | + Filename should be a fully qualified path to the bz2 file that will be |
| 289 | + decompressed. It will iterate line by line and yield this back to |
| 290 | + create_article |
148 | 291 | ''' |
149 | 292 | fh = bz2.BZ2File(filename, 'r') |
150 | 293 | for line in fh: |
— | — | @@ -156,27 +299,29 @@ |
157 | 300 | def launcher(): |
158 | 301 | input_queue = JoinableQueue() |
159 | 302 | result_queue = JoinableQueue() |
160 | | - files = ['/Users/diederik/Downloads/enwiki-20110115-pages-articles1.xml.bz2'] |
| 303 | + storage = 'cassandra' |
| 304 | + files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2'] |
161 | 305 | for file in files: |
162 | 306 | input_queue.put(file) |
163 | | - |
164 | | - for x in xrange(2): |
| 307 | + |
| 308 | + for x in xrange(cpu_count()): |
165 | 309 | input_queue.put(None) |
166 | | - |
167 | | - extracters = [Process(target=create_article, args=[input_queue, result_queue]) for x in xrange(2)] |
| 310 | + |
| 311 | + extracters = [Process(target=create_article, args=[input_queue, result_queue]) |
| 312 | + for x in xrange(cpu_count())] |
168 | 313 | for extracter in extracters: |
169 | 314 | extracter.start() |
170 | | - |
171 | | - creators = [Process(target=create_variables, args=[result_queue]) for x in xrange(2)] |
| 315 | + |
| 316 | + creators = [Process(target=create_variables, args=[result_queue, storage]) |
| 317 | + for x in xrange(cpu_count())] |
172 | 318 | for creator in creators: |
173 | 319 | creator.start() |
174 | | - |
175 | | - |
| 320 | + |
| 321 | + |
176 | 322 | input_queue.join() |
177 | 323 | result_queue.join() |
178 | | - |
179 | | - |
180 | 324 | |
| 325 | + |
181 | 326 | if __name__ == '__main__': |
182 | | - extract_categories() |
183 | | - #launcher() |
\ No newline at end of file |
| 327 | + #extract_categories() |
| 328 | + launcher() |
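
The enricher.py changes above replace the bare per-revision print with a row dictionary that is enriched with contributor, namespace, hash, size, and revert information and then handed to a Buffer for batched storage. Reverts are detected by keeping the MD5 digests of an article's last 100 revisions in a bounded deque and flagging any revision whose digest has been seen before. The snippet below is a minimal, self-contained sketch of that revert-detection pattern using hypothetical revision texts; it is not the project's dump-parsing code.

import hashlib
from collections import deque

def md5_of(text):
    # digest of one revision's text; -1 stands in for a missing text node,
    # mirroring the convention used by create_md5hash above
    if text is None:
        return -1
    m = hashlib.md5()
    m.update(text.encode('utf-8'))
    return m.hexdigest()

hashes = deque(maxlen=100)                  # digests of the most recent 100 revisions
for text in ['foo', 'bar', 'foo']:          # hypothetical revision texts
    digest = md5_of(text)
    revert = 1 if digest in hashes else 0   # 1 = identical to a recent revision
    hashes.append(digest)
    print('%s %s' % (digest, revert))

The third revision reproduces the first one byte for byte, so its digest is already in the deque and it is flagged as a revert; anything older than the 100 most recent revisions falls out of the window and can no longer be matched.
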
Index: trunk/tools/editor_trends/etl/extracter.py |
— | — | @@ -153,6 +153,13 @@ |
154 | 154 | return None |
155 | 155 | |
156 | 156 | |
| 157 | +def extract_revision_id(revision_id, **kwargs): |
| 158 | + if revision_id != None: |
| 159 | + return revision_id.text |
| 160 | + else: |
| 161 | + return None |
| 162 | + |
| 163 | + |
157 | 164 | def extract_contributor_id(contributor, **kwargs): |
158 | 165 | ''' |
159 | 166 | @contributor is the xml contributor node containing a number of attributes |
— | — | @@ -184,18 +191,19 @@ |
185 | 192 | the variable tags determines which attributes are being parsed, the values |
186 | 193 | in this dictionary are the functions used to extract the data. |
187 | 194 | ''' |
188 | | - headers = ['id', 'date', 'article', 'username'] |
| 195 | + headers = ['id', 'date', 'article', 'username', 'revision_id'] |
189 | 196 | tags = {'contributor': {'id': extract_contributor_id, |
190 | 197 | 'bot': determine_username_is_bot, |
191 | 198 | 'username': extract_username, |
192 | 199 | }, |
193 | 200 | 'timestamp': {'date': wikitree.parser.extract_text}, |
| 201 | + 'id': {'revision_id': extract_revision_id, |
| 202 | + } |
194 | 203 | } |
195 | 204 | vars = {} |
196 | 205 | flat = [] |
197 | 206 | |
198 | 207 | for x, revision in enumerate(revisions): |
199 | | - #print len(revision.getchildren()) |
200 | 208 | vars[x] = {} |
201 | 209 | vars[x]['article'] = page |
202 | 210 | for tag in tags: |
— | — | @@ -208,7 +216,6 @@ |
209 | 217 | f = tags[tag][function] |
210 | 218 | value = f(el, bots=bots) |
211 | 219 | if isinstance(value, dict): |
212 | | - #if type(value) == type({}): |
213 | 220 | for kw in value: |
214 | 221 | vars[x][kw] = value[kw] |
215 | 222 | else: |
— | — | @@ -237,7 +244,7 @@ |
238 | 245 | return output |
239 | 246 | |
240 | 247 | |
241 | | -def parse_dumpfile(tasks, rts, filehandles, lock): |
| 248 | +def parse_dumpfile(tasks, rts, lock): |
242 | 249 | bot_ids = detector.retrieve_bots(rts.language.code) |
243 | 250 | location = os.path.join(rts.input_location, rts.language.code, rts.project.name) |
244 | 251 | output = os.path.join(rts.input_location, rts.language.code, rts.project.name, 'txt') |
— | — | @@ -351,6 +358,7 @@ |
352 | 359 | files = file_utils.retrieve_file_list(properties.location, |
353 | 360 | extension, |
354 | 361 | mask=canonical_filename) |
| 362 | + print properties.location |
355 | 363 | print 'Checking if dump file has been extracted...' |
356 | 364 | for fn in files: |
357 | 365 | file_without_ext = fn.replace('%s%s' % ('.', extension), '') |
— | — | @@ -369,7 +377,6 @@ |
370 | 378 | print 'There was an error while extracting %s, please make sure \ |
371 | 379 | that %s is valid archive.' % (fn, fn) |
372 | 380 | return False |
373 | | - print tasks.qsize() |
374 | 381 | return tasks |
375 | 382 | |
376 | 383 | |
— | — | @@ -392,14 +399,10 @@ |
393 | 400 | return result |
394 | 401 | |
395 | 402 | lock = multiprocessing.Lock() |
396 | | -# filehandles = [file_utils.create_txt_filehandle(output, '%s.csv' % fh, 'a', |
397 | | -# settings.encoding) for fh in xrange(settings.max_filehandles)] |
398 | 403 | |
399 | | - filehandles = [] |
400 | 404 | consumers = [multiprocessing.Process(target=parse_dumpfile, |
401 | 405 | args=(tasks, |
402 | 406 | rts, |
403 | | - filehandles, |
404 | 407 | lock)) |
405 | 408 | for x in xrange(rts.number_of_processes)] |
406 | 409 | |
— | — | @@ -411,11 +414,11 @@ |
412 | 415 | w.start() |
413 | 416 | |
414 | 417 | tasks.join() |
415 | | - filehandles = [fh.close() for fh in filehandles] |
416 | 418 | |
417 | | - result = sum([consumer.exitcode for consumer in consumers]) |
| 419 | + result = sum([consumer.exitcode for consumer in consumers |
| 420 | + if consumer.exitcode != None]) |
418 | 421 | |
419 | | - if restult == 0: |
| 422 | + if result == 0: |
420 | 423 | return True |
421 | 424 | else: |
422 | 425 | return False |
— | — | @@ -427,5 +430,6 @@ |
428 | 431 | filename = 'svwiki-latest-stub-meta-history.xml' |
429 | 432 | parse_dumpfile(project, filename, language_code) |
430 | 433 | |
| 434 | + |
431 | 435 | if __name__ == '__main__': |
432 | 436 | debug() |
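
The extracter.py hunk adds extract_revision_id and registers it in the tags dispatch table, so that a revision's <id> node is flattened into a revision_id field next to the contributor attributes. The sketch below illustrates that tag-to-extractor dispatch on a hypothetical revision fragment; the XML and field layout are made up for the example, and the real parsing loop carries extra arguments such as the bot list.

from xml.etree.ElementTree import fromstring

def extract_revision_id(revision_id, **kwargs):
    # same contract as the helper added above: node text, or None if absent
    return revision_id.text if revision_id is not None else None

revision = fromstring(
    '<revision><id>42</id><timestamp>2011-02-01T00:00:00Z</timestamp></revision>')

# tag name -> {output field: extraction function}, as in the tags dict above
tags = {'id': {'revision_id': extract_revision_id}}

row = {}
for tag, functions in tags.items():
    el = revision.find(tag)
    for field, func in functions.items():
        row[field] = func(el)

print(row)   # {'revision_id': '42'}
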