r86238 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:< r86237‎ | r86238 | r86239 >
Date:21:36, 16 April 2011
Author:diederik
Status:deferred
Tags:
Comment:
A 'lockless' lock class that makes sure that only one process is writing to a file object without locking the file.
Modified paths:
  • /trunk/tools/editor_trends/classes/buffer.py (modified) (history)

Diff [purge]

Index: trunk/tools/editor_trends/classes/buffer.py
@@ -19,40 +19,58 @@
2020 __version__ = '0.1'
2121
2222 import sys
 23+from collections import deque
2324 import itertools
2425 if '..' not in sys.path:
2526 sys.path.append('..')
2627
2728 from utils import file_utils
2829
29 -class CustomLock:
30 - def __init__(self, lock, open_handles):
31 - self.lock = lock
32 - self.open_handles = open_handles
3330
34 - def available(self, handle):
35 - self.lock.acquire()
36 - try:
37 - self.open_handles.index(handle)
38 - #print 'RETRIEVED FILEHANDLE %s' % handle
39 - return False
40 - except (ValueError, Exception), error:
41 - self.open_handles.append(handle)
42 - #print 'ADDED FILEHANDLE %s' % handle
43 - return True
44 - finally:
45 - #print 'FIles locked: %s' % len(self.open_handles)
46 - self.lock.release()
 31+class FileHandleDistributor:
 32+ '''
 33+ Locking a file object is an expensive operation. This class is a lockless
 34+ lock that is very fast and still makes sure that only one process is
 35+ accessing a file object at a time. The logic is as follows: create a deque
 36+ object with the same number of items as you have open filehandles. When a
 37+ process wants to write something to a file then the FileHandleDistributor
 38+ pops an id from the deque. It sees whether it has already issued this id
 39+ to the calling process. If not then it returns this id and the process can
 40+ use the matching file object to this id to write stuff. When finished,
 41+ the process returns the id and it's inserted in the deque again.
 42+ If the id has already been assigned to a process then it puts it straight
 43+ back into the deque and gets the next id.
 44+ '''
 45+ def __init__(self, nr_filehandles, nr_processors):
 46+ self.nr_filehandles = nr_filehandles
 47+ self.nr_processors = nr_processors
 48+ self.x = [i for i in xrange(self.nr_filehandles)]
 49+ self.deque = deque(self.x)
 50+ self.tracker = {}
 51+ for process_id in xrange(self.nr_processors):
 52+ self.tracker[process_id] = {}
4753
48 - def release(self, handle):
49 - #print 'RELEASED FILEHANDLE %s' % handle
50 - self.open_handles.remove(handle)
 54+ def assign_filehandle(self, process_id):
 55+ while True:
 56+ fh = self.deque.popleft()
 57+ processed = self.tracker[process_id].get(fh)
 58+ if processed:
 59+ self.return_filehandle(fh)
 60+ else:
 61+ self.tracker[process_id][fh] = 1
 62+ return fh
5163
 64+ def return_filehandle(self, fh):
 65+ self.deque.append(fh)
5266
 67+ def reset_tracker(self, process_id):
 68+ self.tracker[process_id] = {}
 69+
 70+
5371 class CSVBuffer:
54 - def __init__(self, process_id, rts, lock):
 72+ def __init__(self, process_id, rts, fhd):
5573 self.rts = rts
56 - self.lock = lock
 74+ self.fhd = fhd
5775 self.revisions = {}
5876 self.comments = {}
5977 self.articles = {}
@@ -140,10 +158,11 @@
141159 #row = '\t'.join([revision_id, comment]) + '\n'
142160 rows.append([revision_id, comment])
143161 file_utils.write_list_to_csv(rows, self.fh_comments)
144 - self.comments = {}
145162 except Exception, error:
146163 print '''Encountered the following error while writing comment data
147164 to %s: %s''' % (self.fh_comments, error)
 165+ self.comments = {}
 166+ self.fh_comments.flush()
148167
149168 def write_articles(self):
150169 #t0 = datetime.datetime.now()
@@ -166,34 +185,33 @@
167186 print '''Encountered the following error while writing article
168187 data to %s: %s''' % (self.fh_articles, error)
169188 self.articles = {}
 189+ self.fh_articles.flush()
170190 #t1 = datetime.datetime.now()
171191 #print '%s articles took %s' % (len(self.articles.keys()), (t1 - t0))
172192
173193 def write_revisions(self):
174194 #t0 = datetime.datetime.now()
175195 file_ids = self.revisions.keys()
176 - while len(self.revisions.keys()) != 0:
177 - for file_id in file_ids:
178 - #wait = True
179 - for i, revision in enumerate(self.revisions[file_id]):
180 - if i == 0:
181 - #while wait:
182 - #print file_id, self.lock
183 - if self.lock.available(file_id):
184 - fh = self.filehandles[file_id]
185 - #wait = False
186 - else:
187 - break
188 - try:
189 - file_utils.write_list_to_csv(revision, fh)
190 - except Exception, error:
191 - print '''Encountered the following error while writing
192 - revision data to %s: %s''' % (fh, error)
 196+ while len(file_ids) > 0:
 197+ fh_id = self.fhd.assign_filehandle(self.process_id)
 198+ revisions = self.revisions.get(fh_id, [])
 199+ fh = self.filehandles[fh_id]
 200+ for revision in revisions:
 201+ try:
 202+ file_utils.write_list_to_csv(revision, fh)
 203+ except Exception, error:
 204+ print '''Encountered the following error while writing
 205+ revision data to %s: %s''' % (fh, error)
 206+ fh.flush()
 207+ self.fhd.return_filehandle(fh_id)
 208+ try:
 209+ del self.revisions[fh_id]
 210+ file_ids.remove(fh_id)
 211+ except KeyError:
 212+ pass
193213
194 - self.lock.release(file_id)
195 - del self.revisions[file_id]
196 - #wait = True
197 - print 'Buffer size: %s' % len(self.revisions.keys())
 214+ self.fhd.reset_tracker(self.process_id)
 215+
198216 # t1 = datetime.datetime.now()
199217 # print 'Worker %s: %s revisions took %s' % (self.process_id,
200218 # len([1]),