Index: trunk/tools/editor_trends/etl/store.py |
— | — | @@ -19,7 +19,6 @@ |
20 | 20 | |
21 | 21 | from Queue import Empty |
22 | 22 | import multiprocessing |
23 | | -import os |
24 | 23 | import progressbar |
25 | 24 | |
26 | 25 | from utils import file_utils |
— | — | @@ -38,7 +37,8 @@ |
39 | 38 | The treshold is currently more than 9 edits and is not yet configurable. |
40 | 39 | ''' |
41 | 40 | def run(self): |
42 | | - db = storage.Database(self.rts.storage, self.rts.dbname, self.rts.editors_raw) |
| 41 | + db = storage.init_database(self.rts.storage, self.rts.dbname, |
| 42 | + self.rts.editors_raw) |
43 | 43 | editor_cache = cache.EditorCache(db) |
44 | 44 | prev_editor = -1 |
45 | 45 | while True: |
— | — | @@ -61,59 +61,49 @@ |
62 | 62 | if prev_editor != editor and prev_editor != -1: |
63 | 63 | editor_cache.add(prev_editor, 'NEXT') |
64 | 64 | |
65 | | - data = self.prepare_data(line) |
| 65 | + data = prepare_data(line) |
66 | 66 | #print editor, data['username'] |
67 | 67 | editor_cache.add(editor, data) |
68 | 68 | prev_editor = editor |
69 | 69 | fh.close() |
70 | 70 | self.result.put(True) |
71 | 71 | |
72 | | - def prepare_data(self, line): |
73 | | - article_id = int(line[1]) |
74 | | - username = line[3].encode('utf-8') |
75 | | - ns = int(line[4]) |
76 | | - date = text_utils.convert_timestamp_to_datetime_utc(line[6]) |
77 | | - md5 = line[7] |
78 | | - revert = int(line[8]) |
79 | | - bot = int(line[9]) |
80 | | - cur_size = int(line[10]) |
81 | | - delta = int(line[11]) |
82 | 72 | |
83 | | - data = {'date': date, |
84 | | - 'article': article_id, |
85 | | - 'username': username, |
86 | | - 'ns': ns, |
87 | | - 'hash': md5, |
88 | | - 'revert':revert, |
89 | | - 'cur_size':cur_size, |
90 | | - 'delta':delta, |
91 | | - 'bot':bot |
92 | | - } |
93 | | - return data |
94 | | - |
95 | | - |
96 | | -def store_articles(rts): |
| 73 | +def prepare_data(line): |
97 | 74 | ''' |
98 | | - This function reads titles.csv and stores it in a separate collection. |
99 | | - Besides containing the title of an article, it also includes: |
100 | | - * namespace |
101 | | - * category (if any) |
102 | | - * article id |
| 75 | + Prepare a single line to store in the database, this entails converting |
| 76 | + to proper variable and taking care of the encoding. |
103 | 77 | ''' |
104 | | - db = storage.Database(rts.storage, rts.dbname, rts.articles_raw) |
105 | | - db.drop_collection() |
106 | | - db.add_index('id') |
107 | | - db.add_index('title') |
108 | | - db.add_index('ns') |
109 | | - db.add_index('category') |
| 78 | + article_id = int(line[1]) |
| 79 | + username = line[3].encode('utf-8') |
| 80 | + ns = int(line[4]) |
| 81 | + date = text_utils.convert_timestamp_to_datetime_utc(line[6]) |
| 82 | + md5 = line[7] |
| 83 | + revert = int(line[8]) |
| 84 | + bot = int(line[9]) |
| 85 | + cur_size = int(line[10]) |
| 86 | + delta = int(line[11]) |
110 | 87 | |
111 | | - location = os.path.join(rts.input_location, rts.language.code, rts.project.name, 'txt') |
112 | | - files = file_utils.retrieve_file_list(rts.txt, extension='csv', mask='titles') |
| 88 | + data = {'date': date, |
| 89 | + 'article': article_id, |
| 90 | + 'username': username, |
| 91 | + 'ns': ns, |
| 92 | + 'hash': md5, |
| 93 | + 'revert':revert, |
| 94 | + 'cur_size':cur_size, |
| 95 | + 'delta':delta, |
| 96 | + 'bot':bot |
| 97 | + } |
| 98 | + return data |
113 | 99 | |
114 | | - print 'Storing articles...' |
115 | | - for file in files: |
116 | | - print 'Processing %s...' % file |
117 | | - fh = file_utils.create_txt_filehandle(rts.txt, file, 'r', 'utf-8') |
| 100 | + |
| 101 | +def store_articles(tasks, rts): |
| 102 | + while True: |
| 103 | + filename = tasks.get()
| 104 | + if filename is None:
| 105 | + break |
| 106 | + print 'Processing %s...' % filename |
| 107 | + fh = file_utils.create_txt_filehandle(rts.txt, filename, 'r', 'utf-8') |
118 | 108 | for line in fh: |
119 | 109 | line = line.strip() |
120 | 110 | line = line.split('\t') |
— | — | @@ -132,25 +122,26 @@ |
133 | 123 | print 'Done storing articles...' |
134 | 124 | |
135 | 125 | |
136 | | -def launcher(rts): |
| 126 | +def launcher_articles(rts): |
137 | 127 | ''' |
138 | | - This is the main entry point and creates a number of workers and launches |
139 | | - them. |
| 128 | + This function reads titles.csv and stores it in a separate collection. |
| 129 | + Besides containing the title of an article, it also includes: |
| 130 | + * namespace |
| 131 | + * category (if any) |
| 132 | + * article id |
140 | 133 | ''' |
141 | | - store_articles(rts) |
142 | | - print 'Input directory is: %s ' % rts.sorted |
143 | | - db = storage.Database(rts.storage, rts.dbname, rts.editors_raw) |
| 134 | + db = storage.init_database(rts.storage, rts.dbname, rts.articles_raw) |
144 | 135 | db.drop_collection() |
145 | | - db.add_index('editor') |
| 136 | + db.add_index('id') |
| 137 | + db.add_index('title') |
| 138 | + db.add_index('ns') |
| 139 | + db.add_index('category') |
146 | 140 | |
147 | | - files = file_utils.retrieve_file_list(rts.sorted, 'csv') |
148 | | - pbar = progressbar.ProgressBar(maxval=len(files)).start() |
149 | | - |
| 141 | + files = file_utils.retrieve_file_list(rts.txt, extension='csv', |
| 142 | + mask='articles') |
150 | 143 | tasks = multiprocessing.JoinableQueue() |
151 | | - result = multiprocessing.JoinableQueue() |
152 | 144 | |
153 | | - consumers = [Storer(rts, tasks, result) for |
154 | | - x in xrange(rts.number_of_processes)] |
| 145 | + print 'Storing articles...' |
155 | 146 | |
156 | 147 | for filename in files: |
157 | 148 | tasks.put(filename) |
— | — | @@ -158,24 +149,59 @@ |
159 | 150 | for x in xrange(rts.number_of_processes): |
160 | 151 | tasks.put(None) |
161 | 152 | |
162 | | - for w in consumers: |
163 | | - w.start() |
| 153 | + storers = [multiprocessing.Process(target=store_articles,
| 154 | + args=(tasks, rts)) for x in xrange(rts.number_of_processes)]
164 | 155 | |
165 | | - ppills = rts.number_of_processes |
166 | | - while True: |
167 | | - while ppills > 0: |
168 | | - try: |
169 | | - res = result.get(block=False) |
170 | | - if res == True: |
171 | | - pbar.update(pbar.currval + 1) |
172 | | - else: |
173 | | - ppills -= 1 |
174 | | - except Empty: |
175 | | - pass |
176 | | - break |
| 156 | + for storer in storers: |
| 157 | + storer.start() |
177 | 158 | |
178 | 159 | tasks.join() |
179 | 160 | |
180 | 161 | |
| 162 | +def launcher(rts): |
| 163 | + ''' |
| 164 | + This is the main entry point and creates a number of workers and launches |
| 165 | + them. |
| 166 | + ''' |
| 167 | + launcher_articles(rts) |
| 168 | +# print 'Input directory is: %s ' % rts.sorted |
| 169 | +# db = storage.init_database(rts.storage, rts.dbname, rts.editors_raw) |
| 170 | +# db.drop_collection() |
| 171 | +# db.add_index('editor') |
| 172 | +# |
| 173 | +# files = file_utils.retrieve_file_list(rts.sorted, 'csv') |
| 174 | +# pbar = progressbar.ProgressBar(maxval=len(files)).start() |
| 175 | +# |
| 176 | +# tasks = multiprocessing.JoinableQueue() |
| 177 | +# result = multiprocessing.JoinableQueue() |
| 178 | +# |
| 179 | +# storers = [Storer(rts, tasks, result) for |
| 180 | +# x in xrange(rts.number_of_processes)] |
| 181 | +# |
| 182 | +# for filename in files: |
| 183 | +# tasks.put(filename) |
| 184 | +# |
| 185 | +# for x in xrange(rts.number_of_processes): |
| 186 | +# tasks.put(None) |
| 187 | +# |
| 188 | +# for storer in storers: |
| 189 | +# storer.start() |
| 190 | +# |
| 191 | +# ppills = rts.number_of_processes |
| 192 | +# while True: |
| 193 | +# while ppills > 0: |
| 194 | +# try: |
| 195 | +# res = result.get(block=False) |
| 196 | +# if res == True: |
| 197 | +# pbar.update(pbar.currval + 1) |
| 198 | +# else: |
| 199 | +# ppills -= 1 |
| 200 | +# except Empty: |
| 201 | +# pass |
| 202 | +# break |
| 203 | +# |
| 204 | +# tasks.join() |
| 205 | + |
| 206 | + |
181 | 207 | if __name__ == '__main__': |
182 | 208 | pass |