Index: trunk/tools/editor_trends/etl/enricher.py |
— | — | @@ -150,21 +150,25 @@ |
151 | 151 | data = {} |
152 | 152 | editors = {} |
153 | 153 | #first, we group all revisions by editor |
| 154 | + |
154 | 155 | for revision in revisions: |
155 | | - id = revision[0] |
156 | | - if id not in data: |
157 | | - data[id] = [] |
158 | | - editors[id] = self.get_hash(id) |
159 | | - data[id].append(revision) |
| 156 | + row = [] |
| 157 | + #strip away the keys and make sure that the values are always in the same sequence |
| 158 | + for key in self.keys: |
| 159 | + row.append(revision[key].decode('utf-8')) |
| 160 | + editor_id = row[0] |
| 161 | + data.setdefault(editor_id, []) |
| 162 | + data[editor_id].append(row) |
| 163 | + editors.setdefault(editor_id, self.get_hash(editor_id)) |
160 | 164 | |
161 | 165 | #now, we are going to group all editors by file_id |
162 | 166 | file_ids = self.invert_dictionary(editors) |
163 | 167 | revisions = {} |
164 | | - for editors in file_ids.values(): |
| 168 | + for file_id, editors in file_ids: |
165 | 169 | for editor in editors: |
166 | | - revisions.setdefault(editor, []) |
167 | | - revisions[editor].extend(data[editor]) |
168 | | - self.revisions = revisions |
| 170 | + revisions.setdefault(file_id, []) |
| 171 | + revisions[file_id].extend(data[editor]) |
| 172 | + return revisions |
169 | 173 | |
170 | 174 | def add(self, revision): |
171 | 175 | self.stringify(revision) |
— | — | @@ -189,13 +193,7 @@ |
190 | 194 | print 'Worker %s: Number of revisions: %s' % (self.process_id, self.count_revisions) |
191 | 195 | |
192 | 196 | def store(self): |
193 | | - rows = [] |
194 | | - for id, revision in self.revisions.iteritems(): |
195 | | - values = [] |
196 | | - for key in self.keys: |
197 | | - values.append(revision[key].decode('utf-8')) |
198 | | - rows.append(values) |
199 | | - self.write_revisions(rows) |
| 197 | + self.write_revisions() |
200 | 198 | self.write_articles() |
201 | 199 | self.write_comments() |
202 | 200 | |
— | — | @@ -230,7 +228,6 @@ |
231 | 229 | |
232 | 230 | row = zip(keys, values) |
233 | 231 | row = list(itertools.chain(*row)) |
234 | | - #title = title.encode('ascii') |
235 | 232 | #row = '\t'.join([article_id, title]) + '\n' |
236 | 233 | rows.append(row) |
237 | 234 | file_utils.write_list_to_csv(rows, self.fh_articles, newline=False) |
— | — | @@ -241,17 +238,16 @@ |
242 | 239 | #t1 = datetime.datetime.now() |
243 | 240 | #print '%s articles took %s' % (len(self.articles.keys()), (t1 - t0)) |
244 | 241 | |
245 | | - def write_revisions(self, data): |
| 242 | + def write_revisions(self): |
246 | 243 | #t0 = datetime.datetime.now() |
247 | | - self.group_revisions_by_fileid(data) |
248 | | - editors = self.revisions.keys() |
249 | | - while len(self.revision.keys()) > 0: |
250 | | - print len(self.revision.keys()) |
251 | | - for editor in editors: |
252 | | - #lock the write around all edits of an editor for a particular page |
253 | | - for i, revision in enumerate(self.revisions[editor]): |
| 244 | + revisions = self.group_revisions_by_fileid() |
| 245 | + file_ids = self.revisions.keys() |
| 246 | + while len(self.revisions.keys()) > 0: |
| 247 | + print len(self.revisions.keys()) |
| 248 | + for file_id in file_ids: |
| 249 | + for i, revision in enumerate(self.revisions[file_id]): |
254 | 250 | if i == 0: |
255 | | - file_id = self.get_hash(revision[2]) |
| 251 | + #file_id = self.get_hash(revision[2]) |
256 | 252 | if self.lock.available(file_id): |
257 | 253 | fh = self.filehandles[file_id] |
258 | 254 | #print editor, file_id, fh |
— | — | @@ -260,7 +256,7 @@ |
261 | 257 | try: |
262 | 258 | file_utils.write_list_to_csv(revision, fh) |
263 | 259 | self.lock.release(file_id) |
264 | | - del self.revisions[editor] |
| 260 | + del self.revisions[file_id] |
265 | 261 | except Exception, error: |
266 | 262 | print '''Encountered the following error while writing |
267 | 263 | revision data to %s: %s''' % (fh, error) |