Index: trunk/analytics/pipeliner/user_agent.py
===================================================================
@@ -17,7 +17,7 @@
         self.store = store
 
 class UserAgentPipeline(DataPipeline):
-    def __init__(self, observation_class, filename, process_id):
+    def __init__(self, observation_class, filename, process_id, number_of_fields=144):
         self.start = datetime.now()
         self.output_counter = 0
         self.variables = {
@@ -40,9 +40,10 @@
         self.filename = filename
         self.gi = GeoIP.new(GeoIP.GEOIP_MEMORY_CACHE)
         self.process_id = process_id
+        self.number_of_fields = number_of_fields
 
     def _prepare_obs(self, obs):
-        return obs.split(' ')
+        return obs.strip().split(' ')
 
     def _generate_key(self, vars):
        value = '_'.join(vars.values())
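Note on the `_prepare_obs()` change: the reworked `decompress()` in pipeline.py below yields raw lines that still carry their trailing newline, so without `strip()` the last field of every record would be polluted. A quick illustration (the sample values are made up):

    raw = 'Mozilla/5.0 200 en\n'    # trailing newline from readline()
    raw.split(' ')                   # ['Mozilla/5.0', '200', 'en\n']
    raw.strip().split(' ')           # ['Mozilla/5.0', '200', 'en']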
@@ -116,7 +117,7 @@
 
     def pre_processing(self, line):
         count = line.count(' ')
-        if count == 14:
+        if count == self.number_of_fields:
             return line
         else:
             return None
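A standalone sketch of the now-parameterised filter (function and variable names here are illustrative, not from the codebase). Note that `line.count(' ')` counts separators rather than fields, so a record with N space-delimited fields passes when the parameter equals N - 1; that is why `debug()` below passes 13 for a 14-field log format:

    def keep_if_well_formed(line, number_of_fields):
        # count(' ') returns the number of separators: N fields -> N - 1 spaces
        if line.count(' ') == number_of_fields:
            return line
        return None

    sample = ' '.join(['field%d' % i for i in range(14)])  # 14 fields, 13 spaces
    assert keep_if_well_formed(sample, 13) == sample
    assert keep_if_well_formed(sample, 14) is None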
@@ -138,11 +139,10 @@
         print 'Total processing time: %s' % (datetime.now() - self.start)
 
     def run(self):
-        for lines in self.decompress():
-            for line in lines:
-                line = self.pre_processing(line)
-                if line:
-                    self.aggregate(line)
+        for line in self.decompress():
+            line = self.pre_processing(line)
+            if line:
+                self.aggregate(line)
 
         self.post_processing()
         self.load()
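The flattened loop depends on the reworked `decompress()` in pipeline.py below, which now yields one line at a time rather than batches of lines; a minimal illustration of the contract change (generator names chosen for clarity, not from the codebase):

    def decompress_old():
        yield ['line one\n', 'line two\n']   # yielded lists of lines

    def decompress_new():
        yield 'line one\n'                    # yields individual lines
        yield 'line two\n'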
@@ -155,7 +155,7 @@
 
 
 def debug():
-    pl = UserAgentPipeline('mobile.log-20110826.gz', UserAgentObservation)
+    pl = UserAgentPipeline(UserAgentObservation, 'mobile.log-20110826.gz', 0, 13)
     pl.run()
 
 if __name__ == '__main__':
Index: trunk/analytics/pipeliner/pipeline.py
===================================================================
@@ -1,30 +1,14 @@
 from abc import ABCMeta
-import zlib
+import subprocess
 import sys
 
-READ_BLOCK_SIZE = 2**20
-
-
 class DataPipeline(object):
     __metaclass__ = ABCMeta
-    
+
     def decompress(self):
-        fh = open(self.filename, 'rb')
-        d = zlib.decompressobj(16+zlib.MAX_WBITS)
-        data = ''
-        while True:
-            raw_data = fh.read(READ_BLOCK_SIZE)
-            data += d.decompress(raw_data)
-            if not data:
-                break
-            elif not data.endswith('\n'):
-                position = data.rfind('\n') +1
-                lines = data[:position]
-                lines = lines.split('\n')
-                data=data[position:]
-            else:
-                lines = data
-            yield lines
+        p = subprocess.Popen(['gunzip','-c', self.filename], stdout=subprocess.PIPE, shell=False)
+        for line in iter(p.stdout.readline, ""):
+            yield line
 
     def extract(self):
         while True:
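For reference, a self-contained sketch of the new streaming approach (Python 2, matching the `print` statements in user_agent.py; the `close()`/`wait()` cleanup is a suggested hardening, not part of this changeset):

    import subprocess

    def stream_gzip_lines(filename):
        # 'gunzip -c' writes decompressed data to stdout; reading the pipe
        # line by line keeps memory flat regardless of the archive's size.
        p = subprocess.Popen(['gunzip', '-c', filename],
                             stdout=subprocess.PIPE, shell=False)
        try:
            # In Python 2, readline() returns '' only at EOF, so the
            # two-argument iter() form terminates when gunzip finishes.
            for line in iter(p.stdout.readline, ''):
                yield line
        finally:
            p.stdout.close()
            p.wait()  # reap the child so it does not linger as a zombie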
Property changes on: trunk/analytics/pipeliner
___________________________________________________________________
Modified: svn:ignore
   - wurfl-lib
   + wurfl-lib
mobile.log-20110826.gz
output