Index: trunk/analytics/pipeliner/user_agent.py |
— | — | @@ -1,25 +1,82 @@ |
| 2 | +# -*- coding: utf-8 -*- |
| 3 | +''' |
| 4 | +Copyright (C) 2012 by Diederik van Liere (dvanliere@wikimedia.org) |
| 5 | +This program is free software; you can redistribute it and/or |
| 6 | +modify it under the terms of the GNU General Public License version 2 |
| 7 | +as published by the Free Software Foundation. |
| 8 | +This program is distributed in the hope that it will be useful, |
| 9 | +but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 10 | +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
| 11 | +See the GNU General Public License for more details, at |
| 12 | +http://www.fsf.org/licenses/gpl.html |
| 13 | +''' |
| 14 | + |
| 15 | +__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@wikimedia.org)', ]) |
| 16 | +__email__ = 'dvanliere at wikimedia dot org' |
| 17 | +__date__ = '2012-01-22' |
| 18 | +__version__ = '0.1' |
| 19 | + |
| 20 | +import gc |
| 21 | +from datetime import datetime |
2 | 22 | from urlparse import urlparse |
3 | | -from datetime import datetime |
4 | 23 | from wurfl import devices |
5 | 24 | from pywurfl.algorithms import TwoStepAnalysis |
6 | 25 | import GeoIP |
7 | 26 | |
8 | 27 | from pipeline import DataPipeline |
9 | | -from dataset import UserAgentObservation |
10 | 28 | |
11 | 29 | search_algorithm = TwoStepAnalysis(devices) |
12 | 30 | |
13 | 31 | class Variable: |
| 32 | + ''' |
| 33 | + This class defines the variables that we want to collect, |
| 34 | + which function we need to call to construct the variable and |
| 35 | + the position of the raw data in a line of data. |
| 36 | + ''' |
14 | 37 | def __init__(self, name, func_name, location, store=True): |
15 | 38 | self.name = name |
16 | 39 | self.func_name = func_name |
17 | 40 | self.location = location |
18 | 41 | self.store = store |
19 | 42 | |
| 43 | +class UserAgentObservation: |
| 44 | + ''' |
| 45 | + This class aggregates observations for a particular combination of |
| 46 | + project, language, browser, country, timestamp. |
| 47 | + ''' |
| 48 | + def __init__(self, **kwargs): |
| 49 | + self.count = 0 |
| 50 | + for key, value in kwargs.iteritems(): |
| 51 | + setattr(self, key, value) |
| 52 | + |
| 53 | + def __str__(self): |
| 54 | + if self.device: |
| 55 | + return '%s observations using %s in %s for %s%s on %s' % (self.count, self.device, self.geography, self.language_code, self.project, self.timestamp) |
| 56 | + else: |
| 57 | + return '%s observations in %s for %s%s on %s' % (self.count, self.geography, self.language_code, self.project, self.timestamp) |
| 58 | + |
| 59 | + |
20 | 60 | class UserAgentPipeline(DataPipeline): |
21 | | - def __init__(self, observation_class, filename, process_id, number_of_fields=144): |
| 61 | + ''' |
| 62 | + This class does all the heavy lifting of aggregating observations, writing output |
| 63 | + to a file, and do some simple logging |
| 64 | + ''' |
| 65 | + def __init__(self, observation_class, filename, process_id, number_of_fields=14): |
22 | 66 | self.start = datetime.now() |
23 | | - self.output_counter = 0 |
| 67 | + self.skipped_records = 0 |
| 68 | + self.http_valid_codes = {'200':1,'304':1, '302':1} |
| 69 | + self.observations = {} #weakref.WeakValueDictionary() |
| 70 | + self.http_invalid_codes = {} |
| 71 | + self.invalid_user_agents = {} |
| 72 | + self.keys = set() |
| 73 | + self.url = None |
| 74 | + self.filename = filename |
| 75 | + self.process_id = process_id |
| 76 | + self.number_of_fields = number_of_fields |
| 77 | + self.observation_class = observation_class |
| 78 | + self.gi = GeoIP.new(GeoIP.GEOIP_MEMORY_CACHE) |
| 79 | + self.error_vars = ['http_invalid_codes', 'invalid_user_agents', 'skipped_records'] |
| 80 | + |
24 | 81 | self.variables = { |
25 | 82 | 'language_code': ['language_code', '_determine_language_code', 8], |
26 | 83 | 'project': ['project', '_determine_project', 8], |
— | — | @@ -27,36 +84,40 @@ |
28 | 85 | 'timestamp': ['timestamp', '_determine_date', 2], |
29 | 86 | 'user_agent': ['user_agent', '_convert_to_unicode', 13], |
30 | 87 | } |
31 | | - if self.variables == {}: |
32 | | - raise Exception('''You have to define both a mapping of your |
33 | | - properties and the functions that generate them and a mapping |
34 | | - of your raw input data and their position in a raw data line.''') |
35 | | - self.vars= {} |
| 88 | + self.props= {} |
36 | 89 | for key, value in self.variables.iteritems(): |
37 | | - self.vars[key]= Variable(*value) |
38 | | - |
39 | | - self.observations = {} |
40 | | - self.observation_class = observation_class |
41 | | - self.filename = filename |
42 | | - self.gi = GeoIP.new(GeoIP.GEOIP_MEMORY_CACHE) |
43 | | - self.process_id = process_id |
44 | | - self.number_of_fields = number_of_fields |
| 90 | + self.props[key]= Variable(*value) |
45 | 91 | |
46 | 92 | def _prepare_obs(self, obs): |
47 | | - return obs.strip().split(' ') |
| 93 | + ''' |
| 94 | + Obs is the raw untouched input from stdin. This function prepares it for |
| 95 | + subsequent analysis. |
| 96 | + ''' |
| 97 | + if type(obs) != type([]): |
| 98 | + return obs.strip().split(' ') |
| 99 | + else: |
| 100 | + return obs |
48 | 101 | |
49 | | - def _generate_key(self, vars): |
50 | | - value = '_'.join(vars.values()) |
| 102 | + def _parse_url(self, url): |
| 103 | + self.url = urlparse(url) |
| 104 | + |
| 105 | + def _generate_key(self, props): |
| 106 | + value = '_'.join(props.values()) |
51 | 107 | return hash(value) |
52 | 108 | |
53 | 109 | def _determine_language_code(self, url): |
54 | | - url = urlparse(url) |
55 | | - return url.netloc.split('.')[0] |
| 110 | + domain = self.url.netloc.split('.') |
| 111 | + if len(domain) != 2: |
| 112 | + return domain[0] |
| 113 | + else: |
| 114 | + return 'en' |
56 | 115 | |
57 | 116 | def _determine_project(self, url): |
58 | | - url = urlparse(url) |
59 | | - return url.netloc.split('.')[-2] |
60 | | - |
| 117 | + try: |
| 118 | + return self.url.netloc.split('.')[-2] |
| 119 | + except IndexError: |
| 120 | + return 'unknown_project' |
| 121 | + |
61 | 122 | def _determine_geography(self, ip): |
62 | 123 | geography = self.gi.country_code_by_addr(ip) |
63 | 124 | if not geography: |
— | — | @@ -70,15 +131,16 @@ |
71 | 132 | ''' |
72 | 133 | return timestamp[:10] |
73 | 134 | |
74 | | - def _add_observation(self, key, vars): |
| 135 | + def _add_observation(self, key, props): |
75 | 136 | ''' |
76 | | - Vars should be a dictionary |
| 137 | + props should be a dictionary and here the actual aggregation happens. |
77 | 138 | ''' |
78 | | - if not isinstance(vars, dict): |
| 139 | + if not isinstance(props, dict): |
79 | 140 | raise Exception('You have to feed an instance of a Datamodel a dictionary.') |
80 | 141 | |
81 | | - obs = self.observations.get(key, UserAgentObservation(**vars)) |
| 142 | + obs = self.observations.get(key, UserAgentObservation(**props)) |
82 | 143 | obs.count +=1 |
| 144 | + self.keys.add(key) |
83 | 145 | self.observations[key] =obs |
84 | 146 | |
85 | 147 | def _convert_to_unicode(self, obs): |
— | — | @@ -90,62 +152,120 @@ |
91 | 153 | pageviews += obs.count |
92 | 154 | return pageviews |
93 | 155 | |
| 156 | + def _determine_http_code(self, line): |
| 157 | + #filter out all invalid http requests |
| 158 | + status_code = line[5][-3:] |
| 159 | + res = self.http_valid_codes.get(status_code, None) |
| 160 | + if res: |
| 161 | + return line |
| 162 | + else: |
| 163 | + self.http_invalid_codes.setdefault(status_code, 0) |
| 164 | + self.http_invalid_codes[status_code] +=1 |
| 165 | + return None |
94 | 166 | |
| 167 | + def _free_memory(self): |
| 168 | + ''' |
| 169 | + This function is no longer used. |
| 170 | + ''' |
| 171 | + for value in self.observations.values(): |
| 172 | + del value |
| 173 | + self.observations = {} |
| 174 | + gc.collect() |
| 175 | + |
| 176 | + def _write_error_log(self): |
| 177 | + for prop_name in self.error_vars: |
| 178 | + fh = open('logs/diag-%s-%s.tsv' % (self.process_id, prop_name), 'w') |
| 179 | + prop = getattr(self, prop_name) |
| 180 | + if type(prop) == type({}): |
| 181 | + if prop != {}: #it doesn't make sense to write an empty dictionary to a file. |
| 182 | + for key, value in prop.iteritems(): |
| 183 | + fh.write('%s\t%s\n' %(key, value)) |
| 184 | + else: |
| 185 | + fh.write('%s\t%s\n' % (prop_name, prop)) |
| 186 | + fh.close() |
| 187 | + |
| 188 | + def _is_device_mobile(self, device): |
| 189 | + return device.is_wireless_device |
| 190 | + |
95 | 191 | def transform(self, obs): |
96 | | - vars = {} |
97 | | - for key in self.vars.keys(): |
| 192 | + props = {} |
| 193 | + for key in self.props.keys(): |
98 | 194 | try: |
99 | | - func_name = getattr(self.vars[key], 'func_name') |
100 | | - location = getattr(self.vars[key], 'location') |
| 195 | + func_name = getattr(self.props[key], 'func_name') |
| 196 | + location = getattr(self.props[key], 'location') |
| 197 | + if location == 8: |
| 198 | + res = self._parse_url(obs[location]) |
| 199 | + if res == False: |
| 200 | + print obs |
101 | 201 | if func_name: |
102 | 202 | func = getattr(self, func_name) |
103 | | - except: |
104 | | - raise Exception('You have not defined function %s' % (func_name)) |
105 | | - #print key, obs[location] |
106 | | - vars[key] = func(obs[location]) |
107 | | - return vars |
| 203 | + props[key] = func(obs[location]) |
| 204 | + except Exception, e: |
| 205 | + print len(obs), location |
| 206 | + print func_name |
| 207 | + print obs |
| 208 | + print props |
| 209 | + raise Exception(e) |
| 210 | + return props |
108 | 211 | |
109 | 212 | def aggregate(self, obs): |
110 | 213 | obs = self._prepare_obs(obs) |
111 | | - vars = self.transform(obs) |
112 | | - key = self._generate_key(vars) |
113 | | - self._add_observation(key, vars) |
| 214 | + props = self.transform(obs) |
| 215 | + key = self._generate_key(props) |
| 216 | + self._add_observation(key, props) |
114 | 217 | |
115 | 218 | def post_processing(self): |
116 | 219 | for obs in self.observations.values(): |
117 | | - obs.device = devices.select_ua(obs.user_agent, search=search_algorithm) |
| 220 | + try: |
| 221 | + device = devices.select_ua(obs.user_agent, search=search_algorithm) |
| 222 | + obs.device = '%s:%s' % (device.brand_name, device.model_name) |
| 223 | + obs.mobile = self._is_device_mobile(device) |
| 224 | + except (ValueError, UnicodeDecodeError): |
| 225 | + obs.device = '%s:%s' % ('generic', 'unknown_device') |
| 226 | + obs.mobile = True |
| 227 | + self.invalid_user_agents.setdefault(obs.user_agent,0) |
| 228 | + self.invalid_user_agents[obs.user_agent] +=1 |
118 | 229 | |
119 | 230 | def pre_processing(self, line): |
120 | | - count = line.count(' ') |
121 | | - if count == self.number_of_fields: |
122 | | - return line |
| 231 | + line = self._prepare_obs(line) |
| 232 | + if len(line) == (self.number_of_fields): |
| 233 | + return self._determine_http_code(line) |
123 | 234 | else: |
124 | 235 | return None |
125 | 236 | |
126 | 237 | def load(self): |
127 | | - fh = open('output/chunk-%s_process-%s.tsv' % (self.output_counter,self.process_id), 'w') |
| 238 | + fh = open('output/chunk-%s.tsv' % (self.process_id), 'w') |
128 | 239 | observations = self.observations.values() |
129 | 240 | for obs in observations: |
130 | 241 | try: |
131 | | - fh.write('%s\t%s\t%s\t%s\t%s\t%s\t%s\n' % (obs.count, obs.device.brand_name, obs.device.model_name, obs.geography, obs.language_code, obs.project, obs.timestamp)) |
| 242 | + fh.write('%s\t%s\t%s\t%s\t%s\t%s\n' % (obs.count, obs.device, obs.geography, obs.language_code, obs.project, obs.timestamp)) |
132 | 243 | except: |
133 | 244 | pass |
134 | 245 | fh.close() |
135 | | - self.output_counter+=1 |
136 | 246 | |
| 247 | + def diagnostics(self): |
| 248 | + self._write_error_log() |
| 249 | + print 'Worker %s: Processing of %s took %s' % (self.process_id, self.filename, (datetime.now() - self.start)) |
| 250 | + print 'Total number of skipped records: %s' % self.skipped_records |
| 251 | + print 'Total number of invalid http requests: %s' % sum(self.http_invalid_codes.values()) |
| 252 | + print 'Total number of invalid user agents: %s' % sum(self.invalid_user_agents.values()) |
| 253 | + |
137 | 254 | def shutdown(self): |
138 | 255 | print 'Total number of observation instances: %s' % (len(self.observations.keys())) |
139 | 256 | print 'Total number of pageviews: %s' % self._total_pageviews() |
140 | | - print 'Total processing time: %s' % (datetime.now() - self.start) |
141 | | - |
| 257 | + #self._free_memory() |
| 258 | + |
142 | 259 | def run(self): |
143 | 260 | for line in self.decompress(): |
144 | 261 | line = self.pre_processing(line) |
145 | 262 | if line: |
146 | 263 | self.aggregate(line) |
| 264 | + else: |
| 265 | + self.skipped_records +=1 |
147 | 266 | |
148 | 267 | self.post_processing() |
149 | 268 | self.load() |
| 269 | + self.diagnostics() |
150 | 270 | self.shutdown() |
151 | 271 | |
152 | 272 | |
— | — | @@ -155,8 +275,8 @@ |
156 | 276 | |
157 | 277 | |
158 | 278 | def debug(): |
159 | | - pl = UserAgentPipeline(UserAgentObservation, 'mobile.log-20110826.gz', 0, 13) |
160 | | - pl.run() |
| 279 | + pipeline = UserAgentPipeline(UserAgentObservation, 'mobile.log-20110826.gz', 0, 14) |
| 280 | + pipeline.run() |
161 | 281 | |
162 | 282 | if __name__ == '__main__': |
163 | 283 | #main() |
Index: trunk/analytics/pipeliner/launcher.py |
— | — | @@ -1,16 +1,36 @@ |
| 2 | +# -*- coding: utf-8 -*- |
| 3 | + |
| 4 | +''' |
| 5 | +Copyright (C) 2012 by Diederik van Liere (dvanliere@wikimedia.org) |
| 6 | +This program is free software; you can redistribute it and/or |
| 7 | +modify it under the terms of the GNU General Public License version 2 |
| 8 | +as published by the Free Software Foundation. |
| 9 | +This program is distributed in the hope that it will be useful, |
| 10 | +but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
| 12 | +See the GNU General Public License for more details, at |
| 13 | +http://www.fsf.org/licenses/gpl.html |
| 14 | +''' |
| 15 | + |
| 16 | +__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@wikimedia.org)', ]) |
| 17 | +__email__ = 'dvanliere at wikimedia dot org' |
| 18 | +__date__ = '2012-01-22' |
| 19 | +__version__ = '0.1' |
| 20 | + |
| 21 | + |
2 | 22 | import os |
3 | 23 | from multiprocessing import Process, JoinableQueue, cpu_count |
4 | | -from datetime import datetime |
| 24 | +from time import sleep |
5 | 25 | |
6 | 26 | import argparse |
7 | 27 | |
8 | | - |
9 | 28 | def _check_folder_exists(path): |
10 | 29 | if os.path.exists(path): |
11 | 30 | return path |
12 | 31 | else: |
13 | 32 | raise Exception('Please enter a valid source.') |
14 | 33 | |
| 34 | + |
15 | 35 | def _prepare_target(pipeline): |
16 | 36 | try: |
17 | 37 | module = __import__('%s' % pipeline) |
— | — | @@ -20,59 +40,81 @@ |
21 | 41 | return func |
22 | 42 | |
23 | 43 | |
24 | | -def start_pipeline(queue, target, process_id): |
25 | | - while True: |
26 | | - filename = queue.get() |
27 | | - queue.task_done() |
28 | | - if not filename: |
29 | | - print '%s files left in the queue' % queue.qsize() |
30 | | - break |
31 | | - t0 = datetime.now() |
32 | | - target(filename, process_id) |
33 | | - t1 = datetime.now() |
34 | | - print 'Worker %s: Processing of %s took %s' % (process_id, filename, (t1 - t0)) |
35 | | - print 'There are %s files left in the queue' % (queue.qsize()) |
| 44 | +def launch_pipeline(target, filename, pid): |
| 45 | + return Process(target=target, args=[filename, pid]) |
36 | 46 | |
37 | | - |
38 | | - |
39 | | -def main(args): |
| 47 | + |
| 48 | +def determine_number_of_files(args, ext): |
40 | 49 | ''' |
41 | | - This function initializes the multiprocessor, and loading the queue with |
42 | | - files |
| 50 | + Count the total number of files that need to be processed. |
43 | 51 | ''' |
| 52 | + files = os.listdir(args.source) |
| 53 | + nr_files = 0 |
| 54 | + for filename in files: |
| 55 | + if filename.endswith(ext): |
| 56 | + nr_files +=1 |
| 57 | + return nr_files |
44 | 58 | |
| 59 | + |
| 60 | +def determine_number_of_processors(nr_files): |
| 61 | + ''' |
| 62 | + Determine the maximum number of processes that can be launched. |
| 63 | + To prevent hammering of the server, it always leaves 1 core |
| 64 | + unused. |
| 65 | + ''' |
| 66 | + if nr_files > cpu_count(): |
| 67 | + return cpu_count() - 1 |
| 68 | + else: |
| 69 | + return nr_files |
| 70 | + |
| 71 | + |
| 72 | +def fill_queue(args, nr_processors, ext): |
| 73 | + ''' |
| 74 | + Fill the queue with the full path to the files that need to be processed |
| 75 | + ''' |
45 | 76 | queue = JoinableQueue() |
46 | | - target = _prepare_target(args.pipeline) |
| 77 | + files = os.listdir(args.source) |
47 | 78 | |
48 | | - files = os.listdir(args.source) |
49 | | - |
50 | 79 | count_files=0 |
51 | 80 | for filename in files: |
52 | 81 | filename = os.path.join(args.source, filename) |
53 | | - if filename.endswith('gz'): |
| 82 | + if filename.endswith(ext): |
54 | 83 | print filename |
55 | 84 | count_files+=1 |
56 | 85 | queue.put(filename) |
57 | | - |
58 | | - if count_files > cpu_count(): |
59 | | - processors = cpu_count() - 1 |
60 | | - else: |
61 | | - processors = count_files |
62 | | - |
63 | | - for x in xrange(processors): |
| 86 | + for x in xrange(nr_processors): |
64 | 87 | print 'Inserting poison pill %s...' % x |
65 | 88 | queue.put(None) |
66 | 89 | |
67 | | - pipelines = [Process(target=start_pipeline, args=[queue, target, process_id]) |
68 | | - for process_id in xrange(processors)] |
| 90 | + return queue |
| 91 | + |
69 | 92 | |
70 | | - |
71 | | - for pipeline in pipelines: |
72 | | - pipeline.start() |
73 | | - |
74 | | - queue.join() |
| 93 | +def main(args): |
| 94 | + ''' |
| 95 | + Main function that will fill the queue and launch the processes |
| 96 | + ''' |
| 97 | + ext= 'gz' |
| 98 | + target = _prepare_target(args.pipeline) |
| 99 | + pipelines = [] |
| 100 | + pid=0 |
| 101 | + nr_files = determine_number_of_files(args, ext) |
| 102 | + nr_processors = determine_number_of_processors(nr_files) |
| 103 | + queue= fill_queue(args, nr_processors, ext) |
| 104 | + while nr_files != 0: |
| 105 | + while sum(pipeline.is_alive() for pipeline in pipelines) != nr_processors: |
| 106 | + filename = queue.get() |
| 107 | + queue.task_done() |
| 108 | + if filename: |
| 109 | + p =launch_pipeline(target, filename, pid) |
| 110 | + p.start() |
| 111 | + pipelines.append(p) |
| 112 | + pid+=1 |
| 113 | + nr_files -=1 |
| 114 | + print 'There are %s files left in the queue' % (nr_files) |
| 115 | + sleep(30) |
75 | 116 | |
76 | 117 | |
| 118 | + |
77 | 119 | if __name__ == '__main__': |
78 | 120 | parser = argparse.ArgumentParser(description='Generic DataPipeline Cruncher') |
79 | 121 | |
Index: trunk/analytics/pipeliner/dataset.py |
— | — | @@ -1,52 +1,39 @@ |
2 | | -from abc import ABCMeta |
| 2 | +# -*- coding: utf-8 -*- |
3 | 3 | |
| 4 | +''' |
| 5 | +Copyright (C) 2012 by Diederik van Liere (dvanliere@wikimedia.org) |
| 6 | +This program is free software; you can redistribute it and/or |
| 7 | +modify it under the terms of the GNU General Public License version 2 |
| 8 | +as published by the Free Software Foundation. |
| 9 | +This program is distributed in the hope that it will be useful, |
| 10 | +but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
| 12 | +See the GNU General Public License for more details, at |
| 13 | +http://www.fsf.org/licenses/gpl.html |
| 14 | +''' |
| 15 | + |
| 16 | +__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@wikimedia.org)', ]) |
| 17 | +__email__ = 'dvanliere at wikimedia dot org' |
| 18 | +__date__ = '2012-01-22' |
| 19 | +__version__ = '0.1' |
| 20 | + |
| 21 | + |
4 | 22 | class Observation(object): |
5 | 23 | def __init__(self, *args): |
6 | 24 | self.count = 0 |
7 | 25 | |
8 | | -# class Datamodel(object): |
9 | | -# def __init__(self, *args): |
10 | | -# self.obs = {} |
11 | | -# |
12 | | -# def add_observation(self, key, obs): |
13 | | -# return |
14 | | - |
15 | 26 | |
16 | | -class UserAgentObservation: |
17 | | - __metaclass__ = ABCMeta |
18 | | - |
| 27 | +class UserAgentObservation(object): |
| 28 | + |
19 | 29 | def __init__(self, **kwargs): |
20 | | - self.count=0 |
| 30 | + self.count = 0 |
21 | 31 | for key, value in kwargs.iteritems(): |
22 | 32 | setattr(self, key, value) |
23 | 33 | |
24 | 34 | def __str__(self): |
25 | 35 | if self.device: |
26 | | - return '%s observations using %s:%s in %s for %s%s on %s' % (self.count, self.device.brand_name, self.device.model_name, self.geography, self.language_code, self.project, self.timestamp) |
| 36 | + return '%s observations using %s in %s for %s%s on %s' % (self.count, self.device, self.geography, self.language_code, self.project, self.timestamp) |
27 | 37 | else: |
28 | 38 | return '%s observations in %s for %s%s on %s' % (self.count, self.geography, self.language_code, self.project, self.timestamp) |
29 | 39 | |
30 | | -# class UserAgentDatamodel(object): |
31 | | -# __metaclass__ = ABCMeta |
32 | | -# |
33 | | -# def __init__(self, full_string, key): |
34 | | -# self.full_string = full_string |
35 | | -# self.key = key |
36 | | -# |
37 | | -# def __str__(self): |
38 | | -# return self.full_string |
39 | | -# |
40 | | -# def add_observation(self, key, vars): |
41 | | -# ''' |
42 | | -# Vars should be a dictionary |
43 | | -# ''' |
44 | | -# if not isinstance(vars, dict): |
45 | | -# raise Exception('You have to feed an instance of a Datamodel a dictionary.') |
46 | | -# |
47 | | -# obs = self.obs.get(key, UserAgentObservation(vars)) |
48 | | -# obs.count +=1 |
49 | | -# self.obs[key] =obs |
50 | | - |
51 | | -UserAgentObservation.register(Observation) |
52 | | - |
53 | 40 | |
Index: trunk/analytics/pipeliner/pipeline.py |
— | — | @@ -1,3 +1,23 @@ |
| 2 | +# -*- coding: utf-8 -*- |
| 3 | + |
| 4 | +''' |
| 5 | +Copyright (C) 2012 by Diederik van Liere (dvanliere@wikimedia.org) |
| 6 | +This program is free software; you can redistribute it and/or |
| 7 | +modify it under the terms of the GNU General Public License version 2 |
| 8 | +as published by the Free Software Foundation. |
| 9 | +This program is distributed in the hope that it will be useful, |
| 10 | +but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
| 12 | +See the GNU General Public License for more details, at |
| 13 | +http://www.fsf.org/licenses/gpl.html |
| 14 | +''' |
| 15 | + |
| 16 | +__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@wikimedia.org)', ]) |
| 17 | +__email__ = 'dvanliere at wikimedia dot org' |
| 18 | +__date__ = '2012-01-22' |
| 19 | +__version__ = '0.1' |
| 20 | + |
| 21 | + |
2 | 22 | from abc import ABCMeta |
3 | 23 | import subprocess |
4 | 24 | import sys |
— | — | @@ -6,6 +26,10 @@ |
7 | 27 | __metaclass__ = ABCMeta |
8 | 28 | |
9 | 29 | def decompress(self): |
| 30 | + ''' |
| 31 | + Don't use the internal zlib library from Python, it's super slow. |
| 32 | + This might make this script Linux/OSX only. |
| 33 | + ''' |
10 | 34 | p = subprocess.Popen(['gunzip','-c', self.filename], stdout=subprocess.PIPE, shell=False) |
11 | 35 | for line in iter(p.stdout.readline, ""): |
12 | 36 | yield line |