Index: trunk/tools/editor_trends/classes/buffer.py
===================================================================
@@ -19,40 +19,58 @@
 __version__ = '0.1'
 
 import sys
+from collections import deque
 import itertools
 if '..' not in sys.path:
     sys.path.append('..')
 
 from utils import file_utils
 
-class CustomLock:
-    def __init__(self, lock, open_handles):
-        self.lock = lock
-        self.open_handles = open_handles
 
-    def available(self, handle):
-        self.lock.acquire()
-        try:
-            self.open_handles.index(handle)
-            #print 'RETRIEVED FILEHANDLE %s' % handle
-            return False
-        except (ValueError, Exception), error:
-            self.open_handles.append(handle)
-            #print 'ADDED FILEHANDLE %s' % handle
-            return True
-        finally:
-            #print 'FIles locked: %s' % len(self.open_handles)
-            self.lock.release()
+class FileHandleDistributor:
+    '''
+    Locking a file object is an expensive operation. This class is a lockless
+    lock that is very fast and still makes sure that only one process is
+    accessing a file object at a time. The logic is as follows: create a deque
+    object with the same number of items as there are open filehandles. When a
+    process wants to write something to a file, the FileHandleDistributor
+    pops an id from the deque and checks whether it has already issued this id
+    to the calling process. If not, it returns this id and the process can
+    use the file object matching this id to write its data. When finished,
+    the process returns the id and it is appended to the deque again.
+    If the id has already been issued to the calling process, it is put
+    straight back into the deque and the next id is tried.
+    '''
+    def __init__(self, nr_filehandles, nr_processors):
+        self.nr_filehandles = nr_filehandles
+        self.nr_processors = nr_processors
+        # one id per open filehandle, all initially available
+        self.deque = deque(xrange(self.nr_filehandles))
+        self.tracker = {}
+        for process_id in xrange(self.nr_processors):
+            self.tracker[process_id] = {}
 
-    def release(self, handle):
-        #print 'RELEASED FILEHANDLE %s' % handle
-        self.open_handles.remove(handle)
+    def assign_filehandle(self, process_id):
+        while True:
+            fh = self.deque.popleft()
+            processed = self.tracker[process_id].get(fh)
+            if processed:
+                self.return_filehandle(fh)  # already issued to this process, try the next id
+            else:
+                self.tracker[process_id][fh] = 1
+                return fh
 
+    def return_filehandle(self, fh):
+        self.deque.append(fh)
 
+    def reset_tracker(self, process_id):
+        self.tracker[process_id] = {}
+
+
 class CSVBuffer:
-    def __init__(self, process_id, rts, lock):
+    def __init__(self, process_id, rts, fhd):
         self.rts = rts
-        self.lock = lock
+        self.fhd = fhd
         self.revisions = {}
         self.comments = {}
         self.articles = {}
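
The hunk above contains the whole distributor. A minimal driving sketch, assuming a single process and hypothetical sizes (4 filehandles, 2 processes; only FileHandleDistributor itself comes from the patch):

    fhd = FileHandleDistributor(4, 2)

    fh_id = fhd.assign_filehandle(0)    # process 0 claims an id: 0, the leftmost
    # ... write via the file object matching fh_id ...
    fhd.return_filehandle(fh_id)        # the id re-enters the deque

    # The tracker prevents process 0 from being issued the same id twice:
    others = [fhd.assign_filehandle(0) for _ in xrange(3)]
    assert sorted([fh_id] + others) == [0, 1, 2, 3]

    for i in others:
        fhd.return_filehandle(i)
    fhd.reset_tracker(0)                # process 0 may be issued any id again

Note that a fifth assign_filehandle(0) call before reset_tracker(0) would spin forever, since every id is already marked as issued to process 0.
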
@@ -140,10 +158,11 @@
                 #row = '\t'.join([revision_id, comment]) + '\n'
                 rows.append([revision_id, comment])
             file_utils.write_list_to_csv(rows, self.fh_comments)
-            self.comments = {}
         except Exception, error:
             print '''Encountered the following error while writing comment data
             to %s: %s''' % (self.fh_comments, error)
+        self.comments = {}
+        self.fh_comments.flush()
 
     def write_articles(self):
         #t0 = datetime.datetime.now()
@@ -166,34 +185,33 @@
             print '''Encountered the following error while writing article
             data to %s: %s''' % (self.fh_articles, error)
         self.articles = {}
+        self.fh_articles.flush()
         #t1 = datetime.datetime.now()
         #print '%s articles took %s' % (len(self.articles.keys()), (t1 - t0))
 
     def write_revisions(self):
         #t0 = datetime.datetime.now()
         file_ids = self.revisions.keys()
-        while len(self.revisions.keys()) != 0:
-            for file_id in file_ids:
-                #wait = True
-                for i, revision in enumerate(self.revisions[file_id]):
-                    if i == 0:
-                        #while wait:
-                        #print file_id, self.lock
-                        if self.lock.available(file_id):
-                            fh = self.filehandles[file_id]
-                            #wait = False
-                        else:
-                            break
-                    try:
-                        file_utils.write_list_to_csv(revision, fh)
-                    except Exception, error:
-                        print '''Encountered the following error while writing
-                        revision data to %s: %s''' % (fh, error)
+        while len(file_ids) > 0:
+            fh_id = self.fhd.assign_filehandle(self.process_id)
+            revisions = self.revisions.get(fh_id, [])
+            fh = self.filehandles[fh_id]
+            for revision in revisions:
+                try:
+                    file_utils.write_list_to_csv(revision, fh)
+                except Exception, error:
+                    print '''Encountered the following error while writing
+                    revision data to %s: %s''' % (fh, error)
+            fh.flush()
+            self.fhd.return_filehandle(fh_id)
+            try:
+                del self.revisions[fh_id]
+                file_ids.remove(fh_id)
+            except KeyError:
+                pass  # this process had nothing buffered for fh_id
 
-            self.lock.release(file_id)
-            del self.revisions[file_id]
-            #wait = True
-            print 'Buffer size: %s' % len(self.revisions.keys())
+        self.fhd.reset_tracker(self.process_id)
+
         # t1 = datetime.datetime.now()
         # print 'Worker %s: %s revisions took %s' % (self.process_id,
         # len([1]),
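
The new write_revisions follows a simple drain pattern on top of the distributor. A standalone restatement, where drain, buffer_by_id and filehandles are hypothetical stand-ins for CSVBuffer state, and file_utils.write_list_to_csv is simplified to a plain tab-separated write:

    def drain(buffer_by_id, filehandles, fhd, process_id):
        # Cycle through ids until every buffered file_id has been flushed.
        pending = buffer_by_id.keys()
        while len(pending) > 0:
            fh_id = fhd.assign_filehandle(process_id)
            for row in buffer_by_id.get(fh_id, []):
                filehandles[fh_id].write('\t'.join(row) + '\n')
            filehandles[fh_id].flush()
            fhd.return_filehandle(fh_id)
            if fh_id in buffer_by_id:
                del buffer_by_id[fh_id]
                pending.remove(fh_id)
        # Allow this process to be issued every id again on the next call.
        fhd.reset_tracker(process_id)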