r86673 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r86672 | r86673 | r86674 >
Date:22:15, 21 April 2011
Author:diederik
Status:deferred
Tags:
Comment:
Fixed sorting bug and implemented pylint suggestions
Modified paths:
  • /trunk/tools/editor_trends/etl/downloader.py (modified) (history)
  • /trunk/tools/editor_trends/etl/extracter.py (modified) (history)
  • /trunk/tools/editor_trends/etl/sort.py (modified) (history)
  • /trunk/tools/editor_trends/etl/store.py (modified) (history)
  • /trunk/tools/editor_trends/etl/transformer.py (modified) (history)
  • /trunk/tools/editor_trends/etl/variables.py (modified) (history)

Diff

Index: trunk/tools/editor_trends/etl/variables.py
@@ -14,7 +14,7 @@
1515
1616
1717 __author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
18 -__author__email = 'dvanliere at gmail dot com'
 18+__email__ = 'dvanliere at gmail dot com'
1919 __date__ = '2011-04-10'
2020 __version__ = '0.1'
2121
@@ -37,6 +37,9 @@
3838
3939
4040 def validate_ip(address):
 41+ '''
 42+ Determine whether a username is an IPv4 address.
 43+ '''
4144 parts = address.split(".")
4245 if len(parts) != 4:
4346 return False
@@ -51,6 +54,9 @@
5255
5356
5457 def extract_revision_text(revision, xml_namespace):
 58+ '''
 59+ Extract the actual text from a revision.
 60+ '''
5561 rev_text = revision.find('%s%s' % (xml_namespace, 'text'))
5662 if rev_text.text == None:
5763 rev_text.text = fix_revision_text(revision)
@@ -58,6 +64,7 @@
5965
6066
6167 def parse_title(title):
 68+ '''Extract the text of the title of an article'''
6269 return title.text
6370
6471
@@ -102,6 +109,7 @@
103110
104111
105112 def extract_username(contributor, xml_namespace):
 113+ '''Extract the username of the contributor'''
106114 contributor = contributor.find('%s%s' % (xml_namespace, 'username'))
107115 if contributor != None:
108116 return contributor.text
@@ -149,11 +157,18 @@
150158
151159
152160 def fix_revision_text(revision):
 161+ '''
 162+ If the revision text is None then replace it with an empty string so other
 163+ functions still work.
 164+ '''
153165 if revision.text == None:
154166 return ''
155167
156168
157169 def create_md5hash(text):
 170+ '''
 171+ Calculate md5 hash based on revision text.
 172+ '''
158173 hash = {}
159174 if text != None:
160175 m = hashlib.md5()
@@ -166,6 +181,10 @@
167182
168183
169184 def calculate_delta_article_size(size, text):
 185+ '''
 186+ Determine how many characters were added / removed compared to previous
 187+ version of text.
 188+ '''
170189 if text == None:
171190 text = ''
172191 if 'prev_size' not in size:
@@ -181,6 +200,9 @@
182201
183202
184203 def parse_contributor(revision, bots, xml_namespace):
 204+ '''
 205+ Function that takes care of all contributor related variables.
 206+ '''
185207 username = extract_username(revision, xml_namespace)
186208 user_id = extract_contributor_id(revision, xml_namespace)
187209 bot = determine_username_is_bot(revision, bots, xml_namespace)
@@ -214,6 +236,9 @@
215237
216238
217239 def is_revision_reverted(hash_cur, hashes):
 240+ '''
 241+ Determine whether an edit was reverted or not based on md5 hashes
 242+ '''
218243 revert = {}
219244 if hash_cur in hashes and hash_cur != -1:
220245 revert['revert'] = 1
@@ -223,6 +248,9 @@
224249
225250
226251 def extract_revision_id(revision_id):
 252+ '''
 253+ Determine the id of a revision
 254+ '''
227255 if revision_id != None:
228256 return revision_id.text
229257 else:
@@ -230,6 +258,9 @@
231259
232260
233261 def extract_comment_text(revision_id, revision):
 262+ '''
 263+ Extract the comment associated with an edit.
 264+ '''
234265 comment = {}
235266 text = revision.find('comment')
236267 if text != None and text.text != None:
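
The md5-based revert detection described by several of these new docstrings amounts to hashing each revision's text and checking whether that hash has already been seen in the article's history. A minimal Python 2 sketch assembled from the fragments visible in this hunk; the encode('utf-8') call and the exact return shapes are assumptions, and the per-article caching of hashes is omitted:

    import hashlib

    def create_md5hash(text):
        # Hash the revision text; a missing text maps to the sentinel -1.
        hash = {}
        if text is not None:
            m = hashlib.md5()
            m.update(text.encode('utf-8'))
            hash['hash'] = m.hexdigest()
        else:
            hash['hash'] = -1
        return hash

    def is_revision_reverted(hash_cur, hashes):
        # An edit counts as a revert when its text hash was already observed
        # earlier in the same article's history.
        revert = {}
        if hash_cur in hashes and hash_cur != -1:
            revert['revert'] = 1
        else:
            revert['revert'] = 0
        return revert
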
Index: trunk/tools/editor_trends/etl/store.py
@@ -103,7 +103,7 @@
104104 try:
105105 filename = tasks.get(block=False)
106106 except Empty:
107 - pass
 107+ continue
108108
109109 if filename == None:
110110 break
@@ -168,45 +168,46 @@
169169 This is the main entry point and creates a number of workers and launches
170170 them.
171171 '''
172 - launcher_articles(rts)
173 -# print 'Input directory is: %s ' % rts.sorted
174 -# db = storage.init_database(rts.storage, rts.dbname, rts.editors_raw)
175 -# db.drop_collection()
176 -# db.add_index('editor')
177 -#
178 -# files = file_utils.retrieve_file_list(rts.sorted, 'csv')
179 -# pbar = progressbar.ProgressBar(maxval=len(files)).start()
180 -#
181 -# tasks = multiprocessing.JoinableQueue()
182 -# result = multiprocessing.JoinableQueue()
183 -#
184 -# storers = [Storer(rts, tasks, result) for
185 -# x in xrange(rts.number_of_processes)]
186 -#
187 -# for filename in files:
188 -# tasks.put(filename)
189 -#
190 -# for x in xrange(rts.number_of_processes):
191 -# tasks.put(None)
192 -#
193 -# for storer in storers:
194 -# storer.start()
195 -#
196 -# ppills = rts.number_of_processes
197 -# while True:
198 -# while ppills > 0:
199 -# try:
200 -# res = result.get(block=False)
201 -# if res == True:
202 -# pbar.update(pbar.currval + 1)
203 -# else:
204 -# ppills -= 1
205 -# except Empty:
206 -# pass
207 -# break
208 -#
209 -# tasks.join()
 172+ #launcher_articles(rts)
 173+ print 'Input directory is: %s ' % rts.sorted
 174+ db = storage.init_database(rts.storage, rts.dbname, rts.editors_raw)
 175+ db.drop_collection()
 176+ db.add_index('editor')
 177+ db.add_index('username')
210178
 179+ files = file_utils.retrieve_file_list(rts.sorted, 'csv')
 180+ pbar = progressbar.ProgressBar(maxval=len(files)).start()
211181
 182+ tasks = multiprocessing.JoinableQueue()
 183+ result = multiprocessing.JoinableQueue()
 184+
 185+ storers = [Storer(rts, tasks, result) for
 186+ x in xrange(rts.number_of_processes)]
 187+
 188+ for filename in files:
 189+ tasks.put(filename)
 190+
 191+ for x in xrange(rts.number_of_processes):
 192+ tasks.put(None)
 193+
 194+ for storer in storers:
 195+ storer.start()
 196+
 197+ ppills = rts.number_of_processes
 198+ while True:
 199+ while ppills > 0:
 200+ try:
 201+ res = result.get(block=False)
 202+ if res == True:
 203+ pbar.update(pbar.currval + 1)
 204+ else:
 205+ ppills -= 1
 206+ except Empty:
 207+ pass
 208+ break
 209+
 210+ tasks.join()
 211+
 212+
212213 if __name__ == '__main__':
213214 pass
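
The two changes in store.py belong together: the worker's Empty handler now uses continue instead of pass, so the loop retries instead of falling through to the poison-pill check with an undefined or stale filename, and the previously commented-out launcher is re-enabled (now also indexing on username). A stripped-down Python 2 sketch of that worker loop and shutdown protocol; the Storer class itself is not shown in this diff, so the processing step is only a placeholder:

    import multiprocessing
    from Queue import Empty  # Python 2 stdlib, as used by the tool

    def store_worker(tasks, result):
        # Consume filenames until a poison pill (None) arrives.
        while True:
            try:
                filename = tasks.get(block=False)
            except Empty:
                continue  # queue momentarily empty; retry instead of falling through
            if filename is None:
                break  # poison pill: no more work for this worker
            # ... parse the csv file and write editors to the database ...
            result.put(True)   # lets the launcher advance the progress bar
            tasks.task_done()
        tasks.task_done()      # account for the poison pill
        result.put(False)      # signals that this worker has finished

    def launch(files, number_of_processes):
        tasks = multiprocessing.JoinableQueue()
        result = multiprocessing.JoinableQueue()
        for filename in files:
            tasks.put(filename)
        for x in xrange(number_of_processes):
            tasks.put(None)    # one poison pill per worker
        workers = [multiprocessing.Process(target=store_worker,
                                           args=(tasks, result))
                   for x in xrange(number_of_processes)]
        for worker in workers:
            worker.start()
        tasks.join()
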
Index: trunk/tools/editor_trends/etl/downloader.py
@@ -21,6 +21,7 @@
2222 import progressbar
2323 import multiprocessing
2424 import sys
 25+import os
2526
2627 from utils import file_utils
2728 from utils import http_utils
@@ -43,7 +44,7 @@
4445 print 'Swallowed a poison pill'
4546 break
4647 widgets = log.init_progressbar_widgets(filename)
47 - extension = file_utils.determine_file_extension(filename)
 48+ extension = os.path.splitext(filename)[1]
4849 filemode = file_utils.determine_file_mode(extension)
4950 filesize = http_utils.determine_remote_filesize(properties.wp_dump_location,
5051 properties.dump_relative_path,
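
The one functional change in downloader.py swaps a home-grown helper for the standard library. os.path.splitext returns the last extension including its leading dot, which determine_file_mode must therefore accept (not visible in this diff); the dump filename below is made up for the illustration:

    import os

    filename = 'enwiki-20110405-pages-meta-history.xml.bz2'
    extension = os.path.splitext(filename)[1]
    # extension == '.bz2'; only the final suffix is split off, so '.xml'
    # stays attached to the base name.
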
Index: trunk/tools/editor_trends/etl/extracter.py
@@ -12,12 +12,21 @@
1313 http://www.fsf.org/licenses/gpl.html
1414 '''
1515
16 -
1716 __author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ])
18 -__author__email = 'dvanliere at gmail dot com'
 17+__email__ = 'dvanliere at gmail dot com'
1918 __date__ = '2011-04-10'
2019 __version__ = '0.1'
2120
 21+'''
 22+The extracter module takes care of decompressing a Wikipedia XML dumpfile,
 23+parsing the XML on the fly and extracting & constructing the variables that are
 24+needed for subsequent analysis. The extracter module is initialized using an
 25+instance of RunTimeSettings and the most important parameters are:
 26+The name of the project\n
 27+The language of the project\n
 28+The location where the dump files are stored
 29+'''
 30+
2231 from collections import deque
2332 import sys
2433 import os
@@ -25,6 +34,7 @@
2635 from xml.etree.cElementTree import iterparse, dump
2736 from multiprocessing import JoinableQueue, Process, cpu_count
2837
 38+
2939 if '..' not in sys.path:
3040 sys.path.append('..')
3141
@@ -36,8 +46,8 @@
3747 def parse_revision(revision, article, xml_namespace, cache, bots, md5hashes, size):
3848 '''
3949 This function has as input a single revision from a Wikipedia dump file,
40 - article information it belongs to, the xml_namespace of the Wikipedia dump
41 - file, the cache object that collects parsed revisions, a list of md5hashes
 50+ article id it belongs to, the xml_namespace of the Wikipedia dump file,
 51+ the cache object that collects parsed revisions, a list of md5hashes
4252 to determine whether an edit was reverted and a size dictionary to determine
4353 how many characters were added and removed compared to the previous revision.
4454 '''
@@ -80,31 +90,11 @@
8191 return md5hashes, size
8292
8393
84 -def datacompetition_parse_revision(revision, xml_namespace, bots, counts):
 94+def parse_xml(fh, rts, cache, process_id, file_id):
8595 '''
86 - This function has as input a single revision from a Wikipedia dump file,
87 - article information it belongs to, the xml_namespace of the Wikipedia dump
88 - file, the cache object that collects parsed revisions, a list of md5hashes
89 - to determine whether an edit was reverted and a size dictionary to determine
90 - how many characters were added and removed compared to the previous revision.
 96+ This function initializes the XML parser and calls the appropriate function
 97+ to extract / construct the variables from the XML stream.
9198 '''
92 - if revision == None:
93 - #the entire revision is empty, weird.
94 - #dump(revision)
95 - return counts
96 -
97 - contributor = revision.find('%s%s' % (xml_namespace, 'contributor'))
98 - contributor = variables.parse_contributor(contributor, bots, xml_namespace)
99 - if not contributor:
100 - #editor is anonymous, ignore
101 - return counts
102 - else:
103 - counts.setdefault(contributor['id'], 0)
104 - counts[contributor['id']] += 1
105 - return counts
106 -
107 -
108 -def parse_xml(fh, rts, cache, process_id, file_id):
10999 bots = bot_detector.retrieve_bots(rts.storage, rts.language.code)
110100 include_ns = {3: 'User Talk',
111101 5: 'Wikipedia Talk',
@@ -125,12 +115,23 @@
126116 try:
127117 for event, elem in context:
128118 if event is end and elem.tag.endswith('siteinfo'):
 119+ '''
 120+ This event happens once for every dump file and is used to
 121+ determine the version of the generator used to generate the XML
 122+ file.
 123+ '''
129124 xml_namespace = variables.determine_xml_namespace(elem)
130125 namespaces = variables.create_namespace_dict(elem, xml_namespace)
131126 ns = True
132127 elem.clear()
133128
134129 elif event is end and elem.tag.endswith('title'):
 130+ '''
 131+ This block determines the title of an article and the
 132+ namespace to which it belongs. Then, if the namespace is one
 133+ in which we are interested, parse is set to True so that we
 134+ start parsing this article; otherwise this article is skipped.
 135+ '''
135136 title = variables.parse_title(elem)
136137 article['title'] = title
137138 current_namespace = variables.determine_namespace(title, namespaces, include_ns)
@@ -145,6 +146,11 @@
146147 elem.clear()
147148
148149 elif elem.tag.endswith('revision'):
 150+ '''
 151+ This block does the actual analysis of an individual revision,
 152+ calculating the size difference between this and the previous revision
 153+ and calculating the md5 hash to determine whether this edit was reverted.
 154+ '''
149155 if parse:
150156 if event is start:
151157 clear = False
@@ -158,6 +164,9 @@
159165 elem.clear()
160166
161167 elif event is end and elem.tag.endswith('id') and id == False:
 168+ '''
 169+ Determine the id of the article.
 170+ '''
162171 article['article_id'] = elem.text
163172 if isinstance(current_namespace, int):
164173 cache.articles[article['article_id']] = title_meta
@@ -165,6 +174,10 @@
166175 elem.clear()
167176
168177 elif event is end and elem.tag.endswith('page'):
 178+ '''
 179+ We have reached the end of an article; reset all variables and free
 180+ memory.
 181+ '''
169182 elem.clear()
170183 #Reset all variables for next article
171184 article = {}
@@ -185,6 +198,9 @@
186199
187200
188201 def stream_raw_xml(input_queue, process_id, fhd, rts):
 202+ '''
 203+ This function fetches an XML file from the queue and launches the processor.
 204+ '''
189205 t0 = datetime.now()
190206 file_id = 0
191207 cache = buffer.CSVBuffer(process_id, rts, fhd)
@@ -216,6 +232,10 @@
217233
218234
219235 def launcher(rts):
 236+ '''
 237+ This function initializes the multiprocessing workers and loads the queue with
 238+ the compressed XML files.
 239+ '''
220240 input_queue = JoinableQueue()
221241
222242 files = file_utils.retrieve_file_list(rts.input_location)
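
The comments added to parse_xml describe the usual cElementTree streaming pattern: iterate over start/end events, branch on the element tag, and clear each element once it has been handled so a multi-gigabyte dump never needs to fit in memory. A reduced sketch of that pattern with the actual variable construction left as placeholders (in the real code parse is only set when the article's namespace is of interest):

    from xml.etree.cElementTree import iterparse

    def stream_dump(fh):
        # 'end' fires once an element and all of its children are complete.
        parse = False
        for event, elem in iterparse(fh, events=('start', 'end')):
            if event == 'end' and elem.tag.endswith('siteinfo'):
                # happens once per dump: determine the xml namespace here
                elem.clear()
            elif event == 'end' and elem.tag.endswith('title'):
                # decide from the title whether this article should be parsed
                parse = True
                elem.clear()
            elif event == 'end' and elem.tag.endswith('revision'):
                if parse:
                    pass  # size delta, md5 hash and contributor extraction
                elem.clear()
            elif event == 'end' and elem.tag.endswith('page'):
                # end of the article: reset per-article state and free memory
                parse = False
                elem.clear()
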
Index: trunk/tools/editor_trends/etl/transformer.py
@@ -234,11 +234,11 @@
235235 month = edit['date'].month
236236 ns = edit['ns']
237237 dc[year][month].setdefault(ns, {})
238 - dc[year][month][ns].setdefault('added', 0)
239 - dc[year][month][ns].setdefault('removed', 0)
240238 if edit['delta'] < 0:
 239+ dc[year][month][ns].setdefault('added', 0)
241240 dc[year][month][ns]['removed'] += edit['delta']
242241 elif edit['delta'] > 0:
 242+ dc[year][month][ns].setdefault('removed', 0)
243243 dc[year][month][ns]['added'] += edit['delta']
244244 dc = cleanup_datacontainer(dc, {})
245245 return dc
@@ -281,6 +281,7 @@
282282 for month in articles_edited[year]:
283283 for ns in articles_edited[year][month]:
284284 dc[year][month][ns] = len(articles_edited[year][month][ns])
 285+ dc = cleanup_datacontainer(dc, {})
285286 return dc
286287
287288
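
These two hunks move the 'added'/'removed' defaults inside the delta branches and re-apply cleanup_datacontainer after counting articles edited. Whichever way the defaults are arranged, the key that is about to be incremented must exist first; a dict.get based formulation sidesteps the ordering question altogether. A sketch with the data container setup simplified to plain nested dicts (field names follow the diff):

    def accumulate_delta(dc, edit):
        # Tally characters added and removed per year, month and namespace.
        year = edit['date'].year
        month = edit['date'].month
        ns = edit['ns']
        bucket = dc.setdefault(year, {}).setdefault(month, {}).setdefault(ns, {})
        if edit['delta'] < 0:
            bucket['removed'] = bucket.get('removed', 0) + edit['delta']
        elif edit['delta'] > 0:
            bucket['added'] = bucket.get('added', 0) + edit['delta']
        return dc
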
Index: trunk/tools/editor_trends/etl/sort.py
@@ -55,6 +55,11 @@
5656 fh.close()
5757 for x, d in enumerate(data):
5858 d = d.strip().split('\t')
 59+ #TEMP FIX:
 60+ editor = d[2]
 61+ d[2] = d[0]
 62+ d[0] = editor
 63+ #END TEMP FIX
5964 data[x] = d
6065 #data = [d.strip() for d in data]
6166 #data = [d.split('\t') for d in data]
@@ -113,21 +118,18 @@
114119 return result
115120
116121
117 -def merge_sorted_files(target, files, iteration, rts):
 122+def merge_sorted_files(target, files):
118123 '''
119124 Merges smaller sorted files into one big file. Only used for creating the
120125 data competition file.
121126 '''
122 - fh = file_utils.create_txt_filehandle(target,
123 - 'merged_%s.txt' % iteration,
124 - 'w',
125 - 'utf-8')
 127+ fh = file_utils.create_txt_filehandle(target, 'kaggle.csv', 'w', 'utf-8')
126128 lines = 0
127129 for line in heapq.merge(*[readline(filename) for filename in files]):
128130 file_utils.write_list_to_csv(line, fh)
129131 lines += 1
130132 fh.close()
131 - print lines
 133+ print 'Total number of edits: %s ' % lines
132134 return fh.name
133135
134136
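
The sorting bug from the commit message is what the TEMP FIX above works around: the editor id is swapped into the first column so that rows sort on it. After the chunks are sorted individually, merge_sorted_files streams them through heapq.merge into the single kaggle.csv. A minimal Python 2 sketch of that merge step, merging plain text lines instead of the parsed rows the real readline() helper yields:

    import heapq

    def merge_sorted_chunks(target_path, filenames):
        # k-way merge of already individually sorted chunk files into one
        # output file; heapq.merge streams its inputs, so memory stays flat.
        handles = [open(filename, 'r') for filename in filenames]
        out = open(target_path, 'w')
        lines = 0
        for line in heapq.merge(*handles):
            out.write(line)
            lines += 1
        out.close()
        for handle in handles:
            handle.close()
        print 'Total number of edits: %s ' % lines
        return target_path
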
@@ -151,20 +153,19 @@
152154 pbar = progressbar.ProgressBar(maxval=len(files)).start()
153155 tasks = multiprocessing.JoinableQueue()
154156 result = multiprocessing.JoinableQueue()
 157+ number_of_processes = 3
 158+ sorters = [Sorter(rts, tasks, result) for x in xrange(number_of_processes)]
155159
156 - consumers = [Sorter(rts, tasks, result) for
157 - x in xrange(rts.number_of_processes)]
158 -
159160 for filename in files:
160161 tasks.put(filename)
161162
162 - for x in xrange(rts.number_of_processes):
 163+ for x in xrange(number_of_processes):
163164 tasks.put(None)
164165
165 - for w in consumers:
166 - w.start()
 166+ for sorter in sorters:
 167+ sorter.start()
167168
168 - ppills = rts.number_of_processes
 169+ ppills = number_of_processes
169170 while True:
170171 while ppills > 0:
171172 try: