r85688 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: < r85687 | r85688 | r85689 >
Date: 20:45, 8 April 2011
Author: diederik
Status: deferred
Tags:
Comment:
Significantly reduced the processing time needed to write output to files.
Modified paths:
  • /trunk/tools/editor_trends/classes/runtime_settings.py (modified) (history)
  • /trunk/tools/editor_trends/classes/settings.py (modified) (history)
  • /trunk/tools/editor_trends/etl/enricher.py (modified) (history)
  • /trunk/tools/editor_trends/manage.py (modified) (history)
  • /trunk/tools/editor_trends/utils/file_utils.py (modified) (history)
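
The speedup comes from three changes, all in the enricher.py diff below: each worker now opens its output file handles once (in Buffer.__init__) instead of once per write, revisions are grouped by target file before flushing, and the three coarse RLocks are replaced by a CustomLock that tracks in-use handles in a Manager-backed shared list. A minimal sketch of the persistent-handle idea (class and constant names here are illustrative, not from the commit):

    import codecs
    import os

    MAX_FILEHANDLES = 500  #assumption: derived from the OS open-file limit

    class HandlePool:
        def __init__(self, directory):
            #one append-mode handle per output shard, opened once per process
            self.handles = [codecs.open(os.path.join(directory, '%s.csv' % i),
                            'a', 'utf-8') for i in xrange(MAX_FILEHANDLES)]

        def handle_for(self, editor_id):
            #same sharding idea as Buffer.get_hash in the diff below
            try:
                return self.handles[int(editor_id) % MAX_FILEHANDLES]
            except ValueError:
                return self.handles[sum(ord(c) for c in editor_id)
                                    % MAX_FILEHANDLES]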

Diff

Index: trunk/tools/editor_trends/manage.py
@@ -203,7 +203,7 @@
     valid_storage = ['mongo', 'cassandra']
     working_directory = raw_input('Please indicate where you installed Wikilytics.\nCurrent location is %s\nPress Enter to accept default.\n' % os.getcwd())
     input_location = raw_input('Please indicate where the Wikipedia dump files are or will be located.\nDefault is: %s\nPress Enter to accept default.\n' % rts.input_location)
-    output_location = raw_input('Please indicate where to store all Wikilytics project files.\nDefault is: %s\nPress Enter to accept default.\n' % rts.output_location)
+    base_location = raw_input('Please indicate where to store all Wikilytics project files.\nDefault is: %s\nPress Enter to accept default.\n' % rts.base_location)

     while db not in valid_storage:
         db = raw_input('Please indicate what database you are using for storage. \nDefault is: Mongo\n')
@@ -224,14 +224,14 @@
     language = language if language in rts.project.valid_languages else rts.language.default

     input_location = input_location if len(input_location) > 0 else rts.input_location
-    output_location = output_location if len(output_location) > 0 else rts.output_location
+    base_location = base_location if len(base_location) > 0 else rts.base_location
     working_directory = working_directory if len(working_directory) > 0 else os.getcwd()

     config = ConfigParser.RawConfigParser()
     config.add_section('file_locations')
     config.set('file_locations', 'working_directory', working_directory)
     config.set('file_locations', 'input_location', input_location)
-    config.set('file_locations', 'output_location', output_location)
+    config.set('file_locations', 'base_location', base_location)
     config.add_section('wiki')
     config.set('wiki', 'project', project)
     config.set('wiki', 'language', language)
@@ -246,7 +246,7 @@
         config_launcher,
         working_directory=working_directory,
         input_location=input_location,
-        output_location=output_location,
+        base_location=base_location,
         project=project,
         language=language,)

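
The manage.py hunks above rename the output_location setting to base_location wherever wiki.cfg is written and read. A wiki.cfg generated before this revision still carries the old key, so loading it with the new settings.py would raise ConfigParser.NoOptionError. A hedged sketch of a one-off migration (hypothetical helper, not part of this commit):

    import ConfigParser

    def migrate_config(path='wiki.cfg'):
        #rename the old output_location key to base_location in place
        config = ConfigParser.RawConfigParser()
        config.read(path)
        if config.has_option('file_locations', 'output_location'):
            value = config.get('file_locations', 'output_location')
            config.remove_option('file_locations', 'output_location')
            config.set('file_locations', 'base_location', value)
            fh = open(path, 'w')
            config.write(fh)
            fh.close()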
Index: trunk/tools/editor_trends/etl/enricher.py
@@ -22,9 +23,10 @@
 import hashlib
 import codecs
 import sys
+import itertools
 import datetime
 import progressbar
-from multiprocessing import JoinableQueue, Process, cpu_count, current_process, RLock
+from multiprocessing import JoinableQueue, Process, cpu_count, RLock, Manager
 from xml.etree.cElementTree import iterparse, dump
 from collections import deque

@@ -77,33 +78,50 @@
 108:'Book',
 }

-class Statistics:
-    def __init__(self, process_id):
-        self.process_id = process_id
-        self.count_articles = 0
-        self.count_revisions = 0

-    def summary(self):
-        print 'Worker %s: Number of articles: %s' % (self.process_id, self.count_articles)
-        print 'Worker %s: Number of revisions: %s' % (self.process_id, self.count_revisions)
+class CustomLock:
+    def __init__(self, lock, open_handles):
+        self.lock = lock
+        self.open_handles = open_handles

+    def available(self, handle):
+        self.lock.acquire()
+        try:
+            self.open_handles.index(handle)
+            #print 'RETRIEVED FILEHANDLE %s' % handle
+            return False
+        except (ValueError, Exception), error:
+            self.open_handles.append(handle)
+            #print 'ADDED FILEHANDLE %s' % handle
+            return True
+        finally:
+            #print 'FIles locked: %s' % len(self.open_handles)
+            self.lock.release()

+    def release(self, handle):
+        #print 'RELEASED FILEHANDLE %s' % handle
+        self.open_handles.remove(handle)
+
+
 class Buffer:
-    def __init__(self, storage, process_id, rts=None, locks=None):
-        self.storage = storage
+    def __init__(self, process_id, rts, lock):
+        self.rts = rts
+        self.lock = lock
         self.revisions = {}
         self.comments = {}
-        self.titles = {}
+        self.articles = {}
         self.process_id = process_id
+        self.count_articles = 0
+        self.count_revisions = 0
+        self.filehandles = [file_utils.create_txt_filehandle(self.rts.txt,
+            file_id, 'a', 'utf-8') for file_id in xrange(self.rts.max_filehandles)]
         self.keys = ['revision_id', 'article_id', 'id', 'username', 'namespace',
                      'title', 'timestamp', 'hash', 'revert', 'bot', 'cur_size',
                      'delta']
-        self.stats = Statistics(self.process_id)
-        if locks != None:
-            self.rts = rts
-            self.lock1 = locks[0] #lock for generic data
-            self.lock2 = locks[1] #lock for comment data
-            self.lock3 = locks[2] #lock for article titles
+        self.fh_articles = file_utils.create_txt_filehandle(self.rts.txt,
+            'articles_%s' % self.process_id, 'w', 'utf-8')
+        self.fh_comments = file_utils.create_txt_filehandle(self.rts.txt,
+            'comments_%s' % self.process_id, 'w', 'utf-8')

     def get_hash(self, id):
         '''
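
The new CustomLock above is a test-and-set guard over a Manager-backed shared list: available(file_id) atomically claims a handle if no other worker holds it, and release(file_id) gives it back. Note that except (ValueError, Exception) catches every exception, not only the ValueError raised by list.index when the handle is absent. A minimal usage sketch (the worker body is illustrative, not code from this revision):

    from multiprocessing import Manager, Process, RLock

    def worker(lock, file_id):
        #try to claim the shard; skip the write if another process holds it
        if lock.available(file_id):
            try:
                pass  #write to the shard's persistent file handle here
            finally:
                lock.release(file_id)

    if __name__ == '__main__':
        mgr = Manager()
        #mirrors the construction in multiprocessor_launcher further down
        lock = CustomLock(RLock(), mgr.list())
        workers = [Process(target=worker, args=(lock, 7)) for x in xrange(4)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()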
@@ -116,19 +134,38 @@
         except ValueError:
             return sum([ord(i) for i in id]) % self.rts.max_filehandles

-    def group_observations(self, revisions):
+    def invert_dictionary(self, editors):
+        hashes = {}
+        for editor, file_id in editors.iteritems():
+            hashes.setdefault(file_id, [])
+            hashes[file_id].append(editor)
+        return hashes
+
+    def group_revisions_by_fileid(self, revisions):
         '''
-        This function groups observation by editor id, this way we have to make
-        fewer fileopening calls.
+        This function groups observations by editor id and then by file_id;
+        this way we have to make fewer file-opening calls and should reduce
+        processing time.
         '''
         data = {}
+        editors = {}
+        #first, we group all revisions by editor
         for revision in revisions:
             id = revision[0]
             if id not in data:
                 data[id] = []
+                editors[id] = self.get_hash(id)
             data[id].append(revision)
-        self.revisions = data

+        #now, we are going to group all editors by file_id
+        file_ids = self.invert_dictionary(editors)
+        revisions = {}
+        for editors in file_ids.values():
+            for editor in editors:
+                revisions.setdefault(editor, [])
+                revisions[editor].extend(data[editor])
+        self.revisions = revisions
+
     def add(self, revision):
         self.stringify(revision)
         id = revision['revision_id']
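
The payoff of this grouping is ordering: editors that hash to the same output file become adjacent when write_revisions iterates the result, so each file handle is claimed once per flush rather than once per editor. A toy illustration of the two steps (data invented for the example):

    #editor -> file_id, as computed by get_hash
    editors = {'alice': 3, 'bob': 7, 'carol': 3}

    #invert_dictionary: file_id -> list of editors
    hashes = {}
    for editor, file_id in editors.iteritems():
        hashes.setdefault(file_id, [])
        hashes[file_id].append(editor)
    #hashes == {3: ['alice', 'carol'], 7: ['bob']}

    #iterating hashes.values() now visits alice and carol back to back,
    #so file 3 is claimed once for both editors' buffered revisions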
@@ -136,8 +173,8 @@
         if len(self.revisions) > 10000:
             #print '%s: Emptying buffer %s - buffer size %s' % (datetime.datetime.now(), self.id, len(self.revisions))
             self.store()
-            self.clear()

+
     def stringify(self, revision):
         for key, value in revision.iteritems():
             try:
@@ -146,14 +183,10 @@
                 value = value.encode('utf-8')
             revision[key] = value

-    def empty(self):
-        self.store()
-        self.clear()

-    def clear(self):
-        self.revisions = {}
-        self.comments = {}
-        self.titles = {}
+    def summary(self):
+        print 'Worker %s: Number of articles: %s' % (self.process_id, self.count_articles)
+        print 'Worker %s: Number of revisions: %s' % (self.process_id, self.count_revisions)

     def store(self):
         rows = []
@@ -162,71 +195,73 @@
             for key in self.keys:
                 values.append(revision[key].decode('utf-8'))
             rows.append(values)
-        self.write_output(rows)
+        self.write_revisions(rows)
+        self.write_articles()
+        self.write_comments()

-        if self.comments:
-            self.lock2.acquire()
+
+    def write_comments(self):
+        rows = []
+        try:
+            for revision_id, comment in self.comments.iteritems():
+                #comment = comment.decode('utf-8')
+                #row = '\t'.join([revision_id, comment]) + '\n'
+                rows.append([revision_id, comment])
+            file_utils.write_list_to_csv(rows, self.fh_comments)
+            self.comments = {}
+        except Exception, error:
+            print '''Encountered the following error while writing comment data
+            to %s: %s''' % (self.fh_comments, error)
+
+    def write_articles(self):
+        #t0 = datetime.datetime.now()
+        if len(self.articles.keys()) > 10000:
+            rows = []
             try:
-                fh = file_utils.create_txt_filehandle(self.rts.txt,
-                    'comments.csv', 'a', 'utf-8')
-                rows = []
-                for revision_id, comment in self.comments.iteritems():
-                    #comment = comment.decode('utf-8')
-                    #row = '\t'.join([revision_id, comment]) + '\n'
-                    rows.append([revision_id, comment])
-                file_utils.write_list_to_csv(row, fh)
-            except Exception, error:
-                print 'Encountered the following error while writing data to %s: %s' % (fh, error)
-            finally:
-                fh.close()
-                self.lock2.release()
+                for article_id, data in self.articles.iteritems():
+                    keys = data.keys()
+                    keys.insert(0, 'id')

-        elif self.titles:
-            self.lock3.acquire()
-            try:
-                fh = file_utils.create_txt_filehandle(self.rts.txt,
-                    'titles.csv', 'a', 'utf-8')
-                rows = []
-                for article_id, dict in self.titles.iteritems():
-                    keys = dict.keys()
-                    value = []
-                    for key in keys:
-                        value.append(key)
-                        value.append(dict[key])
-                    value.insert(0, article_id)
-                    value.insert(0, 'id')
+                    values = data.values()
+                    values.insert(0, article_id)
+
+                    row = zip(keys, values)
+                    row = list(itertools.chain(*row))
                     #title = title.encode('ascii')
                     #row = '\t'.join([article_id, title]) + '\n'
-                    rows.append(value)
-                file_utils.write_list_to_csv(rows, fh, newline=False)
+                    rows.append(row)
+                file_utils.write_list_to_csv(rows, self.fh_articles, newline=False)
+                self.articles = {}
             except Exception, error:
-                print 'Encountered the following error while writing data to %s: %s' % (fh, error)
-            finally:
-                fh.close()
-                self.lock3.release()
+                print '''Encountered the following error while writing article
+                data to %s: %s''' % (self.fh_articles, error)
+        #t1 = datetime.datetime.now()
+        #print '%s articles took %s' % (len(self.articles.keys()), (t1 - t0))

-
-    def write_output(self, data):
-        self.group_observations(data)
-        for editor in self.revisions:
+    def write_revisions(self, data):
+        #t0 = datetime.datetime.now()
+        self.group_revisions_by_fileid(data)
+        editors = self.revisions.keys()
+        for editor in editors:
             #lock the write around all edits of an editor for a particular page
-            self.lock1.acquire()
-            try:
-                for i, revision in enumerate(self.revisions[editor]):
-                    if i == 0:
-                        id = self.get_hash(revision[2])
-                        fh = file_utils.create_txt_filehandle(self.rts.txt,
-                            '%s.csv' % id, 'a', 'utf-8')
+            for i, revision in enumerate(self.revisions[editor]):
+                if i == 0:
+                    file_id = self.get_hash(revision[2])
+                    if self.lock.available(file_id):
+                        fh = self.filehandles[file_id]
+                        #print editor, file_id, fh
+                    else:
+                        break
                 try:
-                    file_utils.write_list_to_csv(revision, fh, lock=self.lock1)
+                    file_utils.write_list_to_csv(revision, fh)
+                    self.lock.release(file_id)
+                    del self.revisions[editor]
                 except Exception, error:
-                    print 'Encountered the following error while writing data to %s: %s' % (fh, error)
-                finally:
-                    fh.close()
-                    self.lock1.release()
+                    print '''Encountered the following error while writing
+                    revision data to %s: %s''' % (fh, error)
+        #t1 = datetime.datetime.now()
+        #print '%s revisions took %s' % (len(self.revisions), (t1 - t0))

-
-
 def extract_categories():
     '''
     Field 1: page id
@@ -559,15 +594,15 @@
     namespace = determine_namespace(title, namespaces, include_ns, EXCLUDE_NAMESPACE)
     title_meta = parse_title_meta_data(title, namespace)
     if namespace != False:
-        cache.stats.count_articles += 1
+        cache.count_articles += 1
         article_id = article['id'].text
         article['id'].clear()
-        cache.titles[article_id] = title_meta
+        cache.articles[article_id] = title_meta
         hashes = deque()
         size = {}
         revisions = article['revisions']
         for revision in revisions:
-            cache.stats.count_revisions += 1
+            cache.count_revisions += 1
             if revision == None:
                 #the entire revision is empty, weird.
                 continue
@@ -639,20 +674,23 @@
                 article['namespaces'] = namespaces
                 id = False
             #elif event == 'end' and ns == True:
-            #    elem.clear()
+            #        elem.clear()
     except SyntaxError, error:
         print 'Encountered invalid XML tag. Error message: %s' % error
         dump(elem)
         sys.exit(-1)
+    except IOError, error:
+        print '''Archive file is possibly corrupted. Please delete this archive
+        and retry downloading. Error message: %s''' % error
+        sys.exit(-1)

-
-def stream_raw_xml(input_queue, storage, process_id, function, dataset, locks, rts):
+def stream_raw_xml(input_queue, process_id, function, dataset, lock, rts):
     bots = bot_detector.retrieve_bots(rts.language.code)

     t0 = datetime.datetime.now()
     i = 0
     if dataset == 'training':
-        cache = Buffer(storage, process_id, rts, locks)
+        cache = Buffer(process_id, rts, lock)
     else:
         counts = {}

@@ -682,8 +720,8 @@
             t0 = t1

     if dataset == 'training':
-        cache.empty()
-        cache.stats.summary()
+        cache.store()
+        cache.summary()
     else:
         location = os.getcwd()
         keys = counts.keys()
@@ -698,21 +736,26 @@
     print 'Finished parsing Wikipedia dump files.'


-def setup(storage, rts=None):
+def setup(rts):
     '''
     Depending on the storage system selected (cassandra, csv or mongo) some
     preparations are made including setting up namespaces and cleaning up old
     files.
     '''
-    if storage == 'csv':
-        res = file_utils.delete_file(rts.txt, None, directory=True)
-        if res:
-            res = file_utils.create_directory(rts.txt)
+    res = file_utils.delete_file(rts.txt, None, directory=True)
+    if res:
+        res = file_utils.create_directory(rts.txt)


-def multiprocessor_launcher(function, dataset, storage, locks, rts):
+def multiprocessor_launcher(function, dataset, lock, rts):
+    mgr = Manager()
+    open_handles = []
+    open_handles = mgr.list(open_handles)
+    clock = CustomLock(lock, open_handles)
     input_queue = JoinableQueue()
+
     files = file_utils.retrieve_file_list(rts.input_location)
+
     if len(files) > cpu_count():
         processors = cpu_count() - 1
     else:
@@ -727,27 +770,26 @@
         print 'Inserting poison pill %s...' % x
         input_queue.put(None)

-    extracters = [Process(target=stream_raw_xml, args=[input_queue, storage,
+    extracters = [Process(target=stream_raw_xml, args=[input_queue,
                                                        process_id, function,
-                                                       dataset, locks, rts])
+                                                       dataset, clock, rts])
                   for process_id in xrange(processors)]
     for extracter in extracters:
         extracter.start()

     input_queue.join()
+    filehandles = [fh.close() for fh in filehandles]

-
 def launcher_training():
     '''
     Launcher for creating training dataset for data competition
     '''
     path = '/mnt/wikipedia_dumps/batch2/'
     function = create_variables
-    storage = 'csv'
     dataset = 'training'
     rts = DummyRTS(path)
     locks = []
-    multiprocessor_launcher(function, dataset, storage, locks, rts)
+    multiprocessor_launcher(function, dataset, locks, rts)


 def launcher_prediction():
@@ -756,11 +798,10 @@
     '''
     path = '/mnt/wikipedia_dumps/batch1/'
     function = count_edits
-    storage = 'csv'
     dataset = 'prediction'
     rts = DummyRTS(path)
     locks = []
-    multiprocessor_launcher(function, dataset, storage, locks, rts)
+    multiprocessor_launcher(function, dataset, locks, rts)


 def launcher(rts):
@@ -769,14 +810,10 @@
     '''
     # launcher for creating regular mongo dataset
     function = create_variables
-    storage = 'csv'
     dataset = 'training'
-    lock1 = RLock()
-    lock2 = RLock()
-    lock3 = RLock()
-    locks = [lock1, lock2, lock3]
-    setup(storage, rts)
-    multiprocessor_launcher(function, dataset, storage, locks, rts)
+    lock = RLock()
+    setup(rts)
+    multiprocessor_launcher(function, dataset, lock, rts)


 if __name__ == '__main__':
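
Two loose ends in these launcher hunks are worth flagging. launcher_training and launcher_prediction still pass an empty locks list where multiprocessor_launcher now expects a single RLock, and after input_queue.join() the line filehandles = [fh.close() for fh in filehandles] references a name never defined in that scope; the handles it presumably targets live on each worker's Buffer, so as committed this line would raise a NameError. A sketch of where the cleanup would need to happen instead (hypothetical helper, not part of r85688, called at the end of stream_raw_xml in each worker):

    def close_handles(cache):
        #each worker closes the handles its own Buffer opened in __init__
        for fh in cache.filehandles:
            fh.close()
        cache.fh_articles.close()
        cache.fh_comments.close()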
Index: trunk/tools/editor_trends/classes/settings.py
@@ -61,7 +61,7 @@
         result = self.load_configuration()
         if not result:
             self.input_location = os.path.join(self.root, 'wikimedia')
-            self.output_location = os.path.join(self.root, 'wikimedia')
+            self.base_location = os.path.join(self.root, 'wikimedia')

         #Date format as used by Erik Zachte
         self.date_format = '%Y-%m-%d'
@@ -79,11 +79,9 @@

         self.wp_dump_location = 'http://dumps.wikimedia.org'

-
         self.architecture = platform.machine()
         self.tab_width = 4 if self.platform == 'Windows' else 8

-
         self.update_python_path()

         self.max_filehandles = self.determine_max_filehandles_open()
@@ -105,7 +103,7 @@
         config.read(os.path.join(self.working_directory, 'wiki.cfg'))
         self.working_directory = config.get('file_locations', 'working_directory')
         self.input_location = config.get('file_locations', 'input_location')
-        self.output_location = config.get('file_locations', 'output_location')
+        self.base_location = config.get('file_locations', 'base_location')
         self.default_project = config.get('wiki', 'project')
         self.default_language = config.get('wiki', 'language')
         self.storage = config.get('storage', 'db')
Index: trunk/tools/editor_trends/classes/runtime_settings.py
@@ -62,7 +62,6 @@
         self.input_location = self.set_input_location()
         self.output_location = self.set_output_location()

-
         self.charts = self.determine_chart()
         self.keywords = self.split_keywords()
         self.namespaces = self.get_namespaces()
@@ -149,19 +148,20 @@
     def set_input_location(self):
         files = os.listdir(self.input_location)
         extensions = ['.gz', '.7z', '.bz2']
+        project = '%s%s' % (self.language.code, self.project.name)
         for file in files:
             basename, ext = os.path.splitext(file)
-            if ext in extensions:
+            if ext in extensions and file.startswith(project):
                 #ABS path case: check if files are stored here
                 return self.input_location
-        return os.path.join(self.input_location, self.language.code,
+        return os.path.join(self.base_location, self.language.code,
                             self.project.name)

     def set_output_location(self):
         '''
         Construct the full project location
         '''
-        return os.path.join(self.output_location, self.language.code,
+        return os.path.join(self.base_location, self.language.code,
                             self.project.name)

     def show_settings(self):
@@ -221,6 +221,7 @@
             default = lnc.languages[lnc.default]
             if lang != default.name:
                 lang = lnc.get_language(lang, code=False)
+                language = lang
             return lang
         else:
             return default
@@ -234,6 +235,7 @@
         if proj != 'wiki':
             pc = projects.ProjectContainer()
             proj = pc.get_project(proj)
+            project = proj
             return proj
         else:
             return default
Index: trunk/tools/editor_trends/utils/file_utils.py
@@ -234,7 +234,7 @@
     if hasattr(name, '__call__'):
         return '%s%s' % (name.func_name, extension)
    else:
-        return name
+        return '%s%s' % (name, extension)


 def delete_file(location, filename, directory=False):
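
This one-line fix matters for the new Buffer above: plain string or integer names passed through this filename helper now get the extension appended, matching the callable branch. The effect in isolation (function name below is illustrative; the enclosing function is not shown in the hunk):

    def build_filename(name, extension='.csv'):
        #mirrors the fixed branch: append the extension to plain names too
        if hasattr(name, '__call__'):
            return '%s%s' % (name.func_name, extension)
        else:
            return '%s%s' % (name, extension)

    #build_filename('512') -> '512.csv'; before this fix the helper
    #returned the bare name '512' with no extension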