r109687 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r109686 | r109687 | r109688 >
Date:19:07, 21 January 2012
Author:diederik
Status:deferred
Tags:
Comment:
Massive performance improvement: decompress the input by piping it through an external gunzip process instead of using Python's zlib library.
Modified paths:
  • /trunk/analytics/pipeliner (modified) (history)
  • /trunk/analytics/pipeliner/pipeline.py (modified) (history)
  • /trunk/analytics/pipeliner/user_agent.py (modified) (history)

Diff [purge]

Index: trunk/analytics/pipeliner/user_agent.py
@@ -17,7 +17,7 @@
1818 self.store = store
1919
2020 class UserAgentPipeline(DataPipeline):
21 - def __init__(self, observation_class, filename, process_id):
 21+ def __init__(self, observation_class, filename, process_id, number_of_fields=144):
2222 self.start = datetime.now()
2323 self.output_counter = 0
2424 self.variables = {
@@ -40,9 +40,10 @@
4141 self.filename = filename
4242 self.gi = GeoIP.new(GeoIP.GEOIP_MEMORY_CACHE)
4343 self.process_id = process_id
 44+ self.number_of_fields = number_of_fields
4445
4546 def _prepare_obs(self, obs):
46 - return obs.split(' ')
 47+ return obs.strip().split(' ')
4748
4849 def _generate_key(self, vars):
4950 value = '_'.join(vars.values())
@@ -116,7 +117,7 @@
117118
118119 def pre_processing(self, line):
119120 count = line.count(' ')
120 - if count == 14:
 121+ if count == self.number_of_fields:
121122 return line
122123 else:
123124 return None
@@ -138,11 +139,10 @@
139140 print 'Total processing time: %s' % (datetime.now() - self.start)
140141
141142 def run(self):
142 - for lines in self.decompress():
143 - for line in lines:
144 - line = self.pre_processing(line)
145 - if line:
146 - self.aggregate(line)
 143+ for line in self.decompress():
 144+ line = self.pre_processing(line)
 145+ if line:
 146+ self.aggregate(line)
147147
148148 self.post_processing()
149149 self.load()
@@ -155,7 +155,7 @@
156156
157157
158158 def debug():
159 - pl = UserAgentPipeline('mobile.log-20110826.gz', UserAgentObservation)
 159+ pl = UserAgentPipeline(UserAgentObservation, 'mobile.log-20110826.gz', 0, 13)
160160 pl.run()
161161
162162 if __name__ == '__main__':
Index: trunk/analytics/pipeliner/pipeline.py
@@ -1,30 +1,14 @@
22 from abc import ABCMeta
3 -import zlib
 3+import subprocess
44 import sys
55
6 -READ_BLOCK_SIZE = 2**20
7 -
8 -
96 class DataPipeline(object):
107 __metaclass__ = ABCMeta
11 -
 8+
129 def decompress(self):
13 - fh = open(self.filename, 'rb')
14 - d = zlib.decompressobj(16+zlib.MAX_WBITS)
15 - data = ''
16 - while True:
17 - raw_data = fh.read(READ_BLOCK_SIZE)
18 - data += d.decompress(raw_data)
19 - if not data:
20 - break
21 - elif not data.endswith('\n'):
22 - position = data.rfind('\n') +1
23 - lines = data[:position]
24 - lines = lines.split('\n')
25 - data=data[position:]
26 - else:
27 - lines = data
28 - yield lines
 10+ p = subprocess.Popen(['gunzip','-c', self.filename], stdout=subprocess.PIPE, shell=False)
 11+ for line in iter(p.stdout.readline, ""):
 12+ yield line
2913
3014 def extract(self):
3115 while True:
Property changes on: trunk/analytics/pipeliner
___________________________________________________________________
Modified: svn:ignore
3216 - wurfl-lib
3317 + wurfl-lib
mobile.log-20110826.gz
output

Status & tagging log