r86559 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: r86558 | r86559 | r86560
Date: 22:38, 20 April 2011
Author: diederik
Status: deferred
Tags:
Comment: Storing of article information now uses multiprocessing as well.
Modified paths:
  • /trunk/tools/editor_trends/etl/store.py (modified)
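
The diff below replaces the single-process store_articles(rts) loop with a launcher_articles/store_articles pair: filenames go onto a multiprocessing.JoinableQueue, one None "poison pill" per process marks the end of the work, and tasks.join() waits for the workers. Below is a minimal, self-contained sketch of that pattern, assuming a blocking get() paired with task_done(); the worker body, file list and process count are illustrative stand-ins, not the repository's file_utils/storage code.

import multiprocessing


def store_worker(tasks):
    # Illustrative worker: pull filenames off the queue until a None
    # poison pill arrives, then exit. The real store_articles() parses
    # each CSV line and writes it to the database.
    while True:
        filename = tasks.get()
        try:
            if filename is None:   # poison pill: no more work
                break
            print('Processing %s...' % filename)
            # ... parse the file and store each line here ...
        finally:
            tasks.task_done()      # lets tasks.join() return


if __name__ == '__main__':
    number_of_processes = 4                        # stand-in for rts.number_of_processes
    files = ['articles_1.csv', 'articles_2.csv']   # stand-in file list

    tasks = multiprocessing.JoinableQueue()
    for filename in files:
        tasks.put(filename)
    for _ in range(number_of_processes):
        tasks.put(None)                            # one poison pill per worker

    workers = [multiprocessing.Process(target=store_worker, args=(tasks,))
               for _ in range(number_of_processes)]
    for worker in workers:
        worker.start()

    tasks.join()                                   # blocks until every queue item is processed

The blocking get()/task_done() pairing is what lets tasks.join() return in this sketch; the revision's store_articles() uses tasks.get(block=False) instead.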

Diff

Index: trunk/tools/editor_trends/etl/store.py
@@ -19,7 +19,6 @@
 
 from Queue import Empty
 import multiprocessing
-import os
 import progressbar
 
 from utils import file_utils
@@ -38,7 +37,8 @@
     The treshold is currently more than 9 edits and is not yet configurable.
     '''
     def run(self):
-        db = storage.Database(self.rts.storage, self.rts.dbname, self.rts.editors_raw)
+        db = storage.init_database(self.rts.storage, self.rts.dbname,
+                                   self.rts.editors_raw)
         editor_cache = cache.EditorCache(db)
         prev_editor = -1
         while True:
@@ -61,59 +61,49 @@
                 if prev_editor != editor and prev_editor != -1:
                     editor_cache.add(prev_editor, 'NEXT')
 
-                data = self.prepare_data(line)
+                data = prepare_data(line)
                 #print editor, data['username']
                 editor_cache.add(editor, data)
                 prev_editor = editor
             fh.close()
             self.result.put(True)
 
-    def prepare_data(self, line):
-        article_id = int(line[1])
-        username = line[3].encode('utf-8')
-        ns = int(line[4])
-        date = text_utils.convert_timestamp_to_datetime_utc(line[6])
-        md5 = line[7]
-        revert = int(line[8])
-        bot = int(line[9])
-        cur_size = int(line[10])
-        delta = int(line[11])
 
-        data = {'date': date,
-                'article': article_id,
-                'username': username,
-                'ns': ns,
-                'hash': md5,
-                'revert':revert,
-                'cur_size':cur_size,
-                'delta':delta,
-                'bot':bot
-                }
-        return data
-
-
-def store_articles(rts):
+def prepare_data(line):
     '''
-    This function reads titles.csv and stores it in a separate collection.
-    Besides containing the title of an article, it also includes:
-    * namespace
-    * category (if any)
-    * article id
+    Prepare a single line to store in the database, this entails converting
+    to proper variable and taking care of the encoding.
     '''
-    db = storage.Database(rts.storage, rts.dbname, rts.articles_raw)
-    db.drop_collection()
-    db.add_index('id')
-    db.add_index('title')
-    db.add_index('ns')
-    db.add_index('category')
+    article_id = int(line[1])
+    username = line[3].encode('utf-8')
+    ns = int(line[4])
+    date = text_utils.convert_timestamp_to_datetime_utc(line[6])
+    md5 = line[7]
+    revert = int(line[8])
+    bot = int(line[9])
+    cur_size = int(line[10])
+    delta = int(line[11])
 
-    location = os.path.join(rts.input_location, rts.language.code, rts.project.name, 'txt')
-    files = file_utils.retrieve_file_list(rts.txt, extension='csv', mask='titles')
+    data = {'date': date,
+            'article': article_id,
+            'username': username,
+            'ns': ns,
+            'hash': md5,
+            'revert':revert,
+            'cur_size':cur_size,
+            'delta':delta,
+            'bot':bot
+            }
+    return data
 
-    print 'Storing articles...'
-    for file in files:
-        print 'Processing %s...' % file
-        fh = file_utils.create_txt_filehandle(rts.txt, file, 'r', 'utf-8')
+
+def store_articles(tasks, rts):
+    while True:
+        filename = tasks.get(block=False)
+        if filename == None:
+            break
+        print 'Processing %s...' % filename
+        fh = file_utils.create_txt_filehandle(rts.txt, filename, 'r', 'utf-8')
         for line in fh:
             line = line.strip()
             line = line.split('\t')
@@ -132,25 +122,26 @@
     print 'Done storing articles...'
 
 
-def launcher(rts):
+def launcher_articles(rts):
     '''
-    This is the main entry point and creates a number of workers and launches
-    them.
+    This function reads titles.csv and stores it in a separate collection.
+    Besides containing the title of an article, it also includes:
+    * namespace
+    * category (if any)
+    * article id
     '''
-    store_articles(rts)
-    print 'Input directory is: %s ' % rts.sorted
-    db = storage.Database(rts.storage, rts.dbname, rts.editors_raw)
+    db = storage.init_database(rts.storage, rts.dbname, rts.articles_raw)
     db.drop_collection()
-    db.add_index('editor')
+    db.add_index('id')
+    db.add_index('title')
+    db.add_index('ns')
+    db.add_index('category')
 
-    files = file_utils.retrieve_file_list(rts.sorted, 'csv')
-    pbar = progressbar.ProgressBar(maxval=len(files)).start()
-
+    files = file_utils.retrieve_file_list(rts.txt, extension='csv',
+                                          mask='articles')
    tasks = multiprocessing.JoinableQueue()
-    result = multiprocessing.JoinableQueue()
 
-    consumers = [Storer(rts, tasks, result) for
-                 x in xrange(rts.number_of_processes)]
+    print 'Storing articles...'
 
     for filename in files:
         tasks.put(filename)
@@ -158,24 +149,59 @@
     for x in xrange(rts.number_of_processes):
         tasks.put(None)
 
-    for w in consumers:
-        w.start()
+    storers = Process(target=store_articles, args=[tasks, rts] for
+                      x in xrange(rts.number_of_processes))
 
-    ppills = rts.number_of_processes
-    while True:
-        while ppills > 0:
-            try:
-                res = result.get(block=False)
-                if res == True:
-                    pbar.update(pbar.currval + 1)
-                else:
-                    ppills -= 1
-            except Empty:
-                pass
-        break
+    for storer in storers:
+        storer.start()
 
     tasks.join()
 
 
+def launcher(rts):
+    '''
+    This is the main entry point and creates a number of workers and launches
+    them.
+    '''
+    launcher_articles(rts)
+#    print 'Input directory is: %s ' % rts.sorted
+#    db = storage.init_database(rts.storage, rts.dbname, rts.editors_raw)
+#    db.drop_collection()
+#    db.add_index('editor')
+#
+#    files = file_utils.retrieve_file_list(rts.sorted, 'csv')
+#    pbar = progressbar.ProgressBar(maxval=len(files)).start()
+#
+#    tasks = multiprocessing.JoinableQueue()
+#    result = multiprocessing.JoinableQueue()
+#
+#    storers = [Storer(rts, tasks, result) for
+#               x in xrange(rts.number_of_processes)]
+#
+#    for filename in files:
+#        tasks.put(filename)
+#
+#    for x in xrange(rts.number_of_processes):
+#        tasks.put(None)
+#
+#    for storer in storers:
+#        storer.start()
+#
+#    ppills = rts.number_of_processes
+#    while True:
+#        while ppills > 0:
+#            try:
+#                res = result.get(block=False)
+#                if res == True:
+#                    pbar.update(pbar.currval + 1)
+#                else:
+#                    ppills -= 1
+#            except Empty:
+#                pass
+#        break
+#
+#    tasks.join()
+
+
 if __name__ == '__main__':
     pass
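
For reference, the new module-level prepare_data() maps one tab-separated row onto the per-edit document that Storer caches for each editor. A small runnable sketch of that mapping; the column values, the timestamp format and the plain datetime parse are made-up stand-ins for the real CSV data and text_utils.convert_timestamp_to_datetime_utc, and the username utf-8 encoding step is omitted.

from datetime import datetime


def prepare_data(line):
    # Same field layout as the revision's prepare_data(); the timestamp
    # parsing below is only a stand-in for
    # text_utils.convert_timestamp_to_datetime_utc.
    return {'date': datetime.strptime(line[6], '%Y-%m-%dT%H:%M:%SZ'),
            'article': int(line[1]),
            'username': line[3],
            'ns': int(line[4]),
            'hash': line[7],
            'revert': int(line[8]),
            'cur_size': int(line[10]),
            'delta': int(line[11]),
            'bot': int(line[9])}


# A made-up row in the column order prepare_data() expects
# (indices 0, 2 and 5 are not read).
row = '1\t12\t0\tExampleUser\t0\t0\t2011-04-20T22:38:00Z\tabc123\t0\t0\t2048\t150'
print(prepare_data(row.split('\t')))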