Index: trunk/tools/editor_trends/etl/variables.py |
— | — | @@ -14,7 +14,7 @@ |
15 | 15 | |
16 | 16 | |
17 | 17 | __author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ]) |
18 | | -__author__email = 'dvanliere at gmail dot com' |
| 18 | +__email__ = 'dvanliere at gmail dot com' |
19 | 19 | __date__ = '2011-04-10' |
20 | 20 | __version__ = '0.1' |
21 | 21 | |
— | — | @@ -37,6 +37,9 @@ |
38 | 38 | |
39 | 39 | |
40 | 40 | def validate_ip(address): |
| 41 | + ''' |
| 42 | + Determine whether a username is an IPv4 address. 
| 43 | + ''' |
41 | 44 | parts = address.split(".") |
42 | 45 | if len(parts) != 4: |
43 | 46 | return False |
— | — | @@ -51,6 +54,9 @@ |
52 | 55 | |
53 | 56 | |
54 | 57 | def extract_revision_text(revision, xml_namespace): |
| 58 | + ''' |
| 59 | + Extract the actual text from a revision. |
| 60 | + ''' |
55 | 61 | rev_text = revision.find('%s%s' % (xml_namespace, 'text')) |
56 | 62 | if rev_text.text == None: |
57 | 63 | rev_text.text = fix_revision_text(revision) |
— | — | @@ -58,6 +64,7 @@ |
59 | 65 | |
60 | 66 | |
61 | 67 | def parse_title(title): |
| 68 | + ''' Extract the text of an article's title ''' 
62 | 69 | return title.text |
63 | 70 | |
64 | 71 | |
— | — | @@ -102,6 +109,7 @@ |
103 | 110 | |
104 | 111 | |
105 | 112 | def extract_username(contributor, xml_namespace): |
| 113 | + '''Extract the username of the contributor''' |
106 | 114 | contributor = contributor.find('%s%s' % (xml_namespace, 'username')) |
107 | 115 | if contributor != None: |
108 | 116 | return contributor.text |
— | — | @@ -149,11 +157,18 @@ |
150 | 158 | |
151 | 159 | |
152 | 160 | def fix_revision_text(revision): |
| 161 | + ''' |
| 162 | + If the revision text is None, replace it with an empty string so other 
| 163 | + functions still work. 
| 164 | + ''' |
153 | 165 | if revision.text == None: |
154 | 166 | return '' |
155 | 167 | |
156 | 168 | |
157 | 169 | def create_md5hash(text): |
| 170 | + ''' |
| 171 | + Calculate the md5 hash of the revision text. 
| 172 | + ''' |
158 | 173 | hash = {} |
159 | 174 | if text != None: |
160 | 175 | m = hashlib.md5() |
— | — | @@ -166,6 +181,10 @@ |
167 | 182 | |
168 | 183 | |
169 | 184 | def calculate_delta_article_size(size, text): |
| 185 | + ''' |
| 186 | + Determine how many characters were added / removed compared to the 
| 187 | + previous version of the text. 
| 188 | + ''' |
170 | 189 | if text == None: |
171 | 190 | text = '' |
172 | 191 | if 'prev_size' not in size: |
— | — | @@ -181,6 +200,9 @@ |
182 | 201 | |
183 | 202 | |
184 | 203 | def parse_contributor(revision, bots, xml_namespace): |
| 204 | + ''' |
| 205 | + Function that takes care of all contributor-related variables. 
| 206 | + ''' |
185 | 207 | username = extract_username(revision, xml_namespace) |
186 | 208 | user_id = extract_contributor_id(revision, xml_namespace) |
187 | 209 | bot = determine_username_is_bot(revision, bots, xml_namespace) |
— | — | @@ -214,6 +236,9 @@ |
215 | 237 | |
216 | 238 | |
217 | 239 | def is_revision_reverted(hash_cur, hashes): |
| 240 | + ''' |
| 241 | + Determine whether an edit was reverted, based on md5 hashes. 
| 242 | + ''' |
218 | 243 | revert = {} |
219 | 244 | if hash_cur in hashes and hash_cur != -1: |
220 | 245 | revert['revert'] = 1 |
— | — | @@ -223,6 +248,9 @@ |
224 | 249 | |
225 | 250 | |
226 | 251 | def extract_revision_id(revision_id): |
| 252 | + ''' |
| 253 | + Determine the id of a revision |
| 254 | + ''' |
227 | 255 | if revision_id != None: |
228 | 256 | return revision_id.text |
229 | 257 | else: |
— | — | @@ -230,6 +258,9 @@ |
231 | 259 | |
232 | 260 | |
233 | 261 | def extract_comment_text(revision_id, revision): |
| 262 | + ''' |
| 263 | + Extract the comment associated with an edit. |
| 264 | + ''' |
234 | 265 | comment = {} |
235 | 266 | text = revision.find('comment') |
236 | 267 | if text != None and text.text != None: |
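For illustration, here is a minimal sketch of the two helpers whose docstrings are added above (validate_ip and create_md5hash), assuming only the Python standard library; the trunk implementations may differ in detail, for example in the exact hash dictionary returned:

    import hashlib

    def validate_ip(address):
        '''Determine whether a username is an IPv4 address.'''
        parts = address.split('.')
        if len(parts) != 4:
            return False
        try:
            return all(0 <= int(part) <= 255 for part in parts)
        except ValueError:
            return False

    def create_md5hash(text):
        '''Calculate the md5 hash of the revision text; -1 signals missing text.'''
        hash = {}
        if text is not None:
            m = hashlib.md5()
            m.update(text.encode('utf-8'))
            hash['hash'] = m.hexdigest()
        else:
            hash['hash'] = -1
        return hash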
Index: trunk/tools/editor_trends/etl/store.py |
— | — | @@ -103,7 +103,7 @@ |
104 | 104 | try: |
105 | 105 | filename = tasks.get(block=False) |
106 | 106 | except Empty: |
107 | | - pass 
| 107 | + continue 
108 | 108 | |
109 | 109 | if filename == None: |
110 | 110 | break |
— | — | @@ -168,45 +168,46 @@ |
169 | 169 | This is the main entry point and creates a number of workers and launches |
170 | 170 | them. |
171 | 171 | ''' |
172 | | - launcher_articles(rts) |
173 | | -# print 'Input directory is: %s ' % rts.sorted |
174 | | -# db = storage.init_database(rts.storage, rts.dbname, rts.editors_raw) |
175 | | -# db.drop_collection() |
176 | | -# db.add_index('editor') |
177 | | -# |
178 | | -# files = file_utils.retrieve_file_list(rts.sorted, 'csv') |
179 | | -# pbar = progressbar.ProgressBar(maxval=len(files)).start() |
180 | | -# |
181 | | -# tasks = multiprocessing.JoinableQueue() |
182 | | -# result = multiprocessing.JoinableQueue() |
183 | | -# |
184 | | -# storers = [Storer(rts, tasks, result) for |
185 | | -# x in xrange(rts.number_of_processes)] |
186 | | -# |
187 | | -# for filename in files: |
188 | | -# tasks.put(filename) |
189 | | -# |
190 | | -# for x in xrange(rts.number_of_processes): |
191 | | -# tasks.put(None) |
192 | | -# |
193 | | -# for storer in storers: |
194 | | -# storer.start() |
195 | | -# |
196 | | -# ppills = rts.number_of_processes |
197 | | -# while True: |
198 | | -# while ppills > 0: |
199 | | -# try: |
200 | | -# res = result.get(block=False) |
201 | | -# if res == True: |
202 | | -# pbar.update(pbar.currval + 1) |
203 | | -# else: |
204 | | -# ppills -= 1 |
205 | | -# except Empty: |
206 | | -# pass |
207 | | -# break |
208 | | -# |
209 | | -# tasks.join() |
| 172 | + #launcher_articles(rts) |
| 173 | + print 'Input directory is: %s ' % rts.sorted |
| 174 | + db = storage.init_database(rts.storage, rts.dbname, rts.editors_raw) |
| 175 | + db.drop_collection() |
| 176 | + db.add_index('editor') |
| 177 | + db.add_index('username') |
210 | 178 | |
| 179 | + files = file_utils.retrieve_file_list(rts.sorted, 'csv') |
| 180 | + pbar = progressbar.ProgressBar(maxval=len(files)).start() |
211 | 181 | |
| 182 | + tasks = multiprocessing.JoinableQueue() |
| 183 | + result = multiprocessing.JoinableQueue() |
| 184 | + |
| 185 | + storers = [Storer(rts, tasks, result) for |
| 186 | + x in xrange(rts.number_of_processes)] |
| 187 | + |
| 188 | + for filename in files: |
| 189 | + tasks.put(filename) |
| 190 | + |
| 191 | + for x in xrange(rts.number_of_processes): |
| 192 | + tasks.put(None) |
| 193 | + |
| 194 | + for storer in storers: |
| 195 | + storer.start() |
| 196 | + |
| 197 | + ppills = rts.number_of_processes |
| 198 | + while True: |
| 199 | + while ppills > 0: |
| 200 | + try: |
| 201 | + res = result.get(block=False) |
| 202 | + if res == True: |
| 203 | + pbar.update(pbar.currval + 1) |
| 204 | + else: |
| 205 | + ppills -= 1 |
| 206 | + except Empty: |
| 207 | + pass |
| 208 | + break |
| 209 | + |
| 210 | + tasks.join() |
| 211 | + |
| 212 | + |
212 | 213 | if __name__ == '__main__': |
213 | 214 | pass |
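The restored launcher above relies on the JoinableQueue / poison-pill pattern, and the earlier hunk changes the Empty handler from pass to continue so a worker polls the queue again instead of falling through with a stale filename. A hedged sketch of that consumer loop follows; process() and the exact bookkeeping are illustrative, not the trunk code:

    from Queue import Empty          # Python 2; use 'queue' on Python 3

    def run(tasks, result):
        while True:
            try:
                filename = tasks.get(block=False)
            except Empty:
                continue             # queue momentarily empty: poll again
            if filename is None:     # poison pill, one is queued per worker
                tasks.task_done()
                result.put(False)
                break
            process(filename)        # illustrative per-file work
            tasks.task_done()
            result.put(True)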
Index: trunk/tools/editor_trends/etl/downloader.py |
— | — | @@ -21,6 +21,7 @@ |
22 | 22 | import progressbar |
23 | 23 | import multiprocessing |
24 | 24 | import sys |
| 25 | +import os |
25 | 26 | |
26 | 27 | from utils import file_utils |
27 | 28 | from utils import http_utils |
— | — | @@ -43,7 +44,7 @@ |
44 | 45 | print 'Swallowed a poison pill' |
45 | 46 | break |
46 | 47 | widgets = log.init_progressbar_widgets(filename) |
47 | | - extension = file_utils.determine_file_extension(filename) |
| 48 | + extension = os.path.splitext(filename)[1] |
48 | 49 | filemode = file_utils.determine_file_mode(extension) |
49 | 50 | filesize = http_utils.determine_remote_filesize(properties.wp_dump_location, |
50 | 51 | properties.dump_relative_path, |
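Note that os.path.splitext, used above in place of file_utils.determine_file_extension, returns the extension with the leading dot, so determine_file_mode is assumed to accept the dotted form. A small illustration with a made-up filename:

    import os
    filename = 'enwiki-latest-pages-meta-history.xml.gz'   # illustrative name
    extension = os.path.splitext(filename)[1]               # '.gz', dot included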
Index: trunk/tools/editor_trends/etl/extracter.py |
— | — | @@ -12,12 +12,21 @@ |
13 | 13 | http://www.fsf.org/licenses/gpl.html 
14 | 14 | ''' 
15 | 15 | 
16 | | - 
17 | 16 | __author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ]) 
18 | | -__author__email = 'dvanliere at gmail dot com' 
| 17 | +__email__ = 'dvanliere at gmail dot com' 
19 | 18 | __date__ = '2011-04-10' 
20 | 19 | __version__ = '0.1' 
21 | 20 | 
| 21 | +''' 
| 22 | +The extracter module takes care of decompressing a Wikipedia XML dump file, 
| 23 | +parsing the XML on the fly and extracting & constructing the variables that are 
| 24 | +needed for subsequent analysis. The extracter module is initialized using an 
| 25 | +instance of RunTimeSettings and the most important parameters are: 
| 26 | +The name of the project\n 
| 27 | +The language of the project\n 
| 28 | +The location where the dump files are stored 
| 29 | +''' 
| 30 | + 
22 | 31 | from collections import deque 
23 | 32 | import sys 
24 | 33 | import os 
— | — | @@ -25,6 +34,7 @@ |
26 | 35 | from xml.etree.cElementTree import iterparse, dump 
27 | 36 | from multiprocessing import JoinableQueue, Process, cpu_count 
28 | 37 | 
| 38 | + 
29 | 39 | if '..' not in sys.path: 
30 | 40 | sys.path.append('..') 
31 | 41 | 
— | — | @@ -36,8 +46,8 @@ |
37 | 47 | def parse_revision(revision, article, xml_namespace, cache, bots, md5hashes, size): 
38 | 48 | ''' 
39 | 49 | This function has as input a single revision from a Wikipedia dump file, 
40 | | - article information it belongs to, the xml_namespace of the Wikipedia dump 
41 | | - file, the cache object that collects parsed revisions, a list of md5hashes 
| 50 | + the article id it belongs to, the xml_namespace of the Wikipedia dump file, 
| 51 | + the cache object that collects parsed revisions, a list of md5hashes 
42 | 52 | to determine whether an edit was reverted and a size dictionary to determine 
43 | 53 | how many characters were added and removed compared to the previous revision. 
44 | 54 | ''' 
— | — | @@ -80,31 +90,11 @@ |
81 | 91 | return md5hashes, size 
82 | 92 | 
83 | 93 | 
84 | | -def datacompetition_parse_revision(revision, xml_namespace, bots, counts): 
| 94 | +def parse_xml(fh, rts, cache, process_id, file_id): 
85 | 95 | ''' 
86 | | - This function has as input a single revision from a Wikipedia dump file, 
87 | | - article information it belongs to, the xml_namespace of the Wikipedia dump 
88 | | - file, the cache object that collects parsed revisions, a list of md5hashes 
89 | | - to determine whether an edit was reverted and a size dictionary to determine 
90 | | - how many characters were added and removed compared to the previous revision. 
| 96 | + This function initializes the XML parser and calls the appropriate function 
| 97 | + to extract / construct the variables from the XML stream. 
91 | 98 | ''' 
92 | | - if revision == None: 
93 | | - #the entire revision is empty, weird. 
94 | | - #dump(revision) 
95 | | - return counts 
96 | | - 
97 | | - contributor = revision.find('%s%s' % (xml_namespace, 'contributor')) 
98 | | - contributor = variables.parse_contributor(contributor, bots, xml_namespace) 
99 | | - if not contributor: 
100 | | - #editor is anonymous, ignore 
101 | | - return counts 
102 | | - else: 
103 | | - counts.setdefault(contributor['id'], 0) 
104 | | - counts[contributor['id']] += 1 
105 | | - return counts 
106 | | - 
107 | | - 
108 | | -def parse_xml(fh, rts, cache, process_id, file_id): 
109 | 99 | bots = bot_detector.retrieve_bots(rts.storage, rts.language.code) 
110 | 100 | include_ns = {3: 'User Talk', 
111 | 101 | 5: 'Wikipedia Talk', 
— | — | @@ -125,12 +115,23 @@ |
126 | 116 | try: 
127 | 117 | for event, elem in context: 
128 | 118 | if event is end and elem.tag.endswith('siteinfo'): 
| 119 | + ''' 
| 120 | + This event happens once for every dump file and is used to 
| 121 | + determine the version of the generator that created the XML 
| 122 | + file. 
| 123 | + ''' 
129 | 124 | xml_namespace = variables.determine_xml_namespace(elem) 
130 | 125 | namespaces = variables.create_namespace_dict(elem, xml_namespace) 
131 | 126 | ns = True 
132 | 127 | elem.clear() 
133 | 128 | 
134 | 129 | elif event is end and elem.tag.endswith('title'): 
| 130 | + ''' 
| 131 | + This block determines the title of an article and the 
| 132 | + namespace to which it belongs. Then, if the namespace is one 
| 133 | + we are interested in, parse is set to True so that we start 
| 134 | + parsing this article; otherwise the article is skipped. 
| 135 | + ''' 
135 | 136 | title = variables.parse_title(elem) 
136 | 137 | article['title'] = title 
137 | 138 | current_namespace = variables.determine_namespace(title, namespaces, include_ns) 
— | — | @@ -145,6 +146,11 @@ |
146 | 147 | elem.clear() 
147 | 148 | 
148 | 149 | elif elem.tag.endswith('revision'): 
| 150 | + ''' 
| 151 | + This block does the actual analysis of an individual revision, 
| 152 | + calculating the size difference between this and the previous revision 
| 153 | + and calculating the md5 hash to determine whether this edit was reverted. 
| 154 | + ''' 
149 | 155 | if parse: 
150 | 156 | if event is start: 
151 | 157 | clear = False 
— | — | @@ -158,6 +164,9 @@ |
159 | 165 | elem.clear() 
160 | 166 | 
161 | 167 | elif event is end and elem.tag.endswith('id') and id == False: 
| 168 | + ''' 
| 169 | + Determine the id of the article. 
| 170 | + ''' 
162 | 171 | article['article_id'] = elem.text 
163 | 172 | if isinstance(current_namespace, int): 
164 | 173 | cache.articles[article['article_id']] = title_meta 
— | — | @@ -165,6 +174,10 @@ |
166 | 175 | elem.clear() 
167 | 176 | 
168 | 177 | elif event is end and elem.tag.endswith('page'): 
| 178 | + ''' 
| 179 | + We have reached the end of an article; reset all variables and free 
| 180 | + memory. 
| 181 | + ''' 
169 | 182 | elem.clear() 
170 | 183 | #Reset all variables for next article 
171 | 184 | article = {} 
— | — | @@ -185,6 +198,9 @@ |
186 | 199 | 
187 | 200 | 
188 | 201 | def stream_raw_xml(input_queue, process_id, fhd, rts): 
| 202 | + ''' 
| 203 | + This function fetches an XML file from the queue and launches the processor. 
| 204 | + ''' 
189 | 205 | t0 = datetime.now() 
190 | 206 | file_id = 0 
191 | 207 | cache = buffer.CSVBuffer(process_id, rts, fhd) 
— | — | @@ -216,6 +232,10 @@ |
217 | 233 | 
218 | 234 | 
219 | 235 | def launcher(rts): 
| 236 | + ''' 
| 237 | + This function initializes the multiprocessing workers and loads the queue 
| 238 | + with the compressed XML files. 
| 239 | + ''' 
220 | 240 | input_queue = JoinableQueue() 
221 | 241 | 
222 | 242 | files = file_utils.retrieve_file_list(rts.input_location) 
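The event handling documented in parse_xml above follows the usual cElementTree streaming idiom: iterate over start/end events and clear elements once they have been processed so memory use stays flat. A minimal hedged sketch of that idiom, not the trunk function itself:

    from xml.etree.cElementTree import iterparse

    def stream_pages(fh):
        '''Yield <page> elements one at a time from an uncompressed dump.'''
        for event, elem in iterparse(fh, events=('start', 'end')):
            if event == 'end' and elem.tag.endswith('page'):
                yield elem       # caller reads the title / revisions here
                elem.clear()     # free the subtree before parsing the next page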
Index: trunk/tools/editor_trends/etl/transformer.py |
— | — | @@ -234,11 +234,11 @@ |
235 | 235 | month = edit['date'].month |
236 | 236 | ns = edit['ns'] |
237 | 237 | dc[year][month].setdefault(ns, {}) |
238 | | - dc[year][month][ns].setdefault('added', 0) |
239 | | - dc[year][month][ns].setdefault('removed', 0) |
240 | 238 | if edit['delta'] < 0: |
| 239 | + dc[year][month][ns].setdefault('removed', 0) 
241 | 240 | dc[year][month][ns]['removed'] += edit['delta'] |
242 | 241 | elif edit['delta'] > 0: |
| 242 | + dc[year][month][ns].setdefault('added', 0) 
243 | 243 | dc[year][month][ns]['added'] += edit['delta'] |
244 | 244 | dc = cleanup_datacontainer(dc, {}) |
245 | 245 | return dc |
— | — | @@ -281,6 +281,7 @@ |
282 | 282 | for month in articles_edited[year]: |
283 | 283 | for ns in articles_edited[year][month]: |
284 | 284 | dc[year][month][ns] = len(articles_edited[year][month][ns]) |
| 285 | + dc = cleanup_datacontainer(dc, {}) |
285 | 286 | return dc |
286 | 287 | |
287 | 288 | |
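The reordered setdefault calls above initialise only the counter that the branch is about to increment. A small hedged illustration of the corrected pattern with made-up values:

    dc = {}
    ns, delta = 0, -42                       # illustrative namespace and size delta
    dc.setdefault(ns, {})
    if delta < 0:
        dc[ns].setdefault('removed', 0)
        dc[ns]['removed'] += delta
    elif delta > 0:
        dc[ns].setdefault('added', 0)
        dc[ns]['added'] += delta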
Index: trunk/tools/editor_trends/etl/sort.py |
— | — | @@ -55,6 +55,11 @@ |
56 | 56 | fh.close() |
57 | 57 | for x, d in enumerate(data): |
58 | 58 | d = d.strip().split('\t') |
| 59 | + #TEMP FIX: |
| 60 | + editor = d[2] |
| 61 | + d[2] = d[0] |
| 62 | + d[0] = editor |
| 63 | + #END TEMP FIX |
59 | 64 | data[x] = d |
60 | 65 | #data = [d.strip() for d in data] |
61 | 66 | #data = [d.split('\t') for d in data] |
— | — | @@ -113,21 +118,18 @@ |
114 | 119 | return result |
115 | 120 | |
116 | 121 | |
117 | | -def merge_sorted_files(target, files, iteration, rts): |
| 122 | +def merge_sorted_files(target, files): |
118 | 123 | ''' |
119 | 124 | Merges smaller sorted files into one big file; only used for creating 
120 | 125 | the data competition file. 
121 | 126 | ''' |
122 | | - fh = file_utils.create_txt_filehandle(target, |
123 | | - 'merged_%s.txt' % iteration, |
124 | | - 'w', |
125 | | - 'utf-8') |
| 127 | + fh = file_utils.create_txt_filehandle(target, 'kaggle.csv', 'w', 'utf-8') |
126 | 128 | lines = 0 |
127 | 129 | for line in heapq.merge(*[readline(filename) for filename in files]): |
128 | 130 | file_utils.write_list_to_csv(line, fh) |
129 | 131 | lines += 1 |
130 | 132 | fh.close() |
131 | | - print lines |
| 133 | + print 'Total number of edits: %s ' % lines |
132 | 134 | return fh.name |
133 | 135 | |
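merge_sorted_files above leans on heapq.merge, which combines already-sorted inputs lazily so the merged kaggle.csv never has to be held in memory at once. A hedged sketch of that step; readline here is illustrative, not the trunk helper:

    import heapq

    def readline(filename):
        with open(filename) as fh:
            for line in fh:
                yield line.rstrip('\n').split('\t')

    def merge(filenames, out_fh):
        for fields in heapq.merge(*[readline(f) for f in filenames]):
            out_fh.write('\t'.join(fields) + '\n')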
134 | 136 | |
— | — | @@ -151,20 +153,19 @@ |
152 | 154 | pbar = progressbar.ProgressBar(maxval=len(files)).start() |
153 | 155 | tasks = multiprocessing.JoinableQueue() |
154 | 156 | result = multiprocessing.JoinableQueue() |
| 157 | + number_of_processes = 3 |
| 158 | + sorters = [Sorter(rts, tasks, result) for x in xrange(number_of_processes)] |
155 | 159 | |
156 | | - consumers = [Sorter(rts, tasks, result) for |
157 | | - x in xrange(rts.number_of_processes)] |
158 | | - |
159 | 160 | for filename in files: |
160 | 161 | tasks.put(filename) |
161 | 162 | |
162 | | - for x in xrange(rts.number_of_processes): |
| 163 | + for x in xrange(number_of_processes): |
163 | 164 | tasks.put(None) |
164 | 165 | |
165 | | - for w in consumers: |
166 | | - w.start() |
| 166 | + for sorter in sorters: |
| 167 | + sorter.start() |
167 | 168 | |
168 | | - ppills = rts.number_of_processes |
| 169 | + ppills = number_of_processes |
169 | 170 | while True: |
170 | 171 | while ppills > 0: |
171 | 172 | try: |