Index: trunk/analytics/pipeliner/user_agent.py |
— | — | @@ -1,9 +1,7 @@ |
2 | | -from abc import ABCMeta |
3 | 2 | from urlparse import urlparse |
4 | 3 | from datetime import datetime |
5 | 4 | from wurfl import devices |
6 | 5 | from pywurfl.algorithms import TwoStepAnalysis |
7 | | -import sys |
8 | 6 | import GeoIP |
9 | 7 | |
10 | 8 | from pipeline import DataPipeline |
— | — | @@ -18,11 +16,10 @@ |
19 | 17 | self.location = location |
20 | 18 | self.store = store |
21 | 19 | |
22 | | -class UserAgentPipeline: |
23 | | - __metaclass__ = ABCMeta |
24 | | - |
25 | | - def __init__(self, queue, observation_class): |
| 20 | +class UserAgentPipeline(DataPipeline): |
| 21 | + def __init__(self, observation_class, filename, process_id): |
26 | 22 | self.start = datetime.now() |
| 23 | + self.output_counter = 0 |
27 | 24 | self.variables = { |
28 | 25 | 'language_code': ['language_code', '_determine_language_code', 8], |
29 | 26 | 'project': ['project', '_determine_project', 8], |
— | — | @@ -40,20 +37,12 @@ |
41 | 38 | |
42 | 39 | self.observations = {} |
43 | 40 | self.observation_class = observation_class |
44 | | - self.queue = queue |
| 41 | + self.filename = filename |
45 | 42 | self.gi = GeoIP.new(GeoIP.GEOIP_MEMORY_CACHE) |
46 | | - |
47 | | - |
48 | | - def extract(self): |
49 | | - while True: |
50 | | - line = sys.stdin.readline() |
51 | | - if not line: |
52 | | - break |
53 | | - self.aggregate(line) |
| 43 | + self.process_id = process_id |
54 | 44 | |
55 | 45 | def _prepare_obs(self, obs): |
56 | | - obs = obs.split(' ') |
57 | | - return obs |
| 46 | + return obs.split(' ') |
58 | 47 | |
59 | 48 | def _generate_key(self, vars): |
60 | 49 | value = '_'.join(vars.values()) |
— | — | @@ -100,6 +89,7 @@ |
101 | 90 | pageviews += obs.count |
102 | 91 | return pageviews |
103 | 92 | |
| 93 | + |
104 | 94 | def transform(self, obs): |
105 | 95 | vars = {} |
106 | 96 | for key in self.vars.keys(): |
— | — | @@ -123,29 +113,51 @@ |
124 | 114 | def post_processing(self): |
125 | 115 | for obs in self.observations.values(): |
126 | 116 | obs.device = devices.select_ua(obs.user_agent, search=search_algorithm) |
127 | | - |
128 | 117 | |
| 118 | + def pre_processing(self, line): |
| 119 | + # a complete log line has exactly 15 space-separated fields |
| 120 | + if line.count(' ') == 14: |
| 121 | + return line |
| 122 | + else: |
| 123 | + return None |
| 124 | + |
129 | 125 | def load(self): |
130 | | - obs = self.observations.values() |
131 | | - for o in obs: |
132 | | - print o |
| 126 | + fh = open('output/chunk-%s_process-%s.tsv' % (self.output_counter, self.process_id), 'w') |
| 127 | + observations = self.observations.values() |
| 128 | + for obs in observations: |
| 129 | + try: |
| 130 | + fh.write('%s\t%s\t%s\t%s\t%s\t%s\t%s\n' % (obs.count, obs.device.brand_name, obs.device.model_name, obs.geography, obs.language_code, obs.project, obs.timestamp)) |
| 131 | + except (AttributeError, UnicodeEncodeError): # skip rows the device DB cannot describe or encode |
| 132 | + pass |
| 133 | + fh.close() |
| 134 | + self.output_counter += 1 |
133 | 135 | |
134 | 136 | def shutdown(self): |
135 | | - print 'Total number of Observation instances: %s' % (len(self.observations.keys())) |
| 137 | +print 'Total number of observation instances: %s' % len(self.observations) |
136 | 138 | print 'Total number of pageviews: %s' % self._total_pageviews() |
137 | 139 | print 'Total processing time: %s' % (datetime.now() - self.start) |
138 | 140 | |
139 | 141 | def run(self): |
140 | | - self.extract() |
| 142 | + for lines in self.decompress(): |
| 143 | + for line in lines: |
| 144 | + line = self.pre_processing(line) |
| 145 | + if line: |
| 146 | + self.aggregate(line) |
| 147 | + |
141 | 148 | self.post_processing() |
142 | 149 | self.load() |
143 | 150 | self.shutdown() |
144 | 151 | |
145 | | -UserAgentPipeline.register(DataPipeline) |
146 | 152 | |
147 | | -def main(queue): |
148 | | - pipeline = UserAgentPipeline(queue, UserAgentObservation) |
| 153 | +def main(filename, process_id): |
| 154 | + pipeline = UserAgentPipeline(UserAgentObservation, filename, process_id) |
149 | 155 | pipeline.run() |
| 156 | + |
| 157 | + |
| 158 | +def debug(): |
| 159 | + pl = UserAgentPipeline(UserAgentObservation, 'mobile.log-20110826.gz', 0) # args match __init__; 0 = single debug worker |
| 160 | + pl.run() |
150 | 161 | |
151 | 162 | if __name__ == '__main__': |
152 | | - main() |
\ No newline at end of file |
| 163 | + #main() |
| 164 | + debug() |
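
To exercise the new file-based entry point without going through the launcher, a minimal sketch along the lines of debug() above (it assumes the gzipped log sits in the working directory, the wurfl device database and GeoIP data are installed, and an output/ directory exists, since load() writes its chunks there):

import user_agent

# worker id 0; aggregated rows land in output/chunk-0_process-0.tsv
pipeline = user_agent.UserAgentPipeline(user_agent.UserAgentObservation,
                                        'mobile.log-20110826.gz', 0)
pipeline.run()
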
Index: trunk/analytics/pipeliner/launcher.py |
— | — | @@ -1,5 +1,6 @@ |
2 | 2 | import os |
3 | | -from multiprocessing import Process, Queue, cpu_count |
| 3 | +from multiprocessing import Process, JoinableQueue, cpu_count |
| 4 | +from datetime import datetime |
4 | 5 | |
5 | 6 | import argparse |
6 | 7 | |
— | — | @@ -19,14 +20,28 @@ |
20 | 21 | return func |
21 | 22 | |
22 | 23 | |
| 24 | +def start_pipeline(queue, target, process_id): |
| 25 | + while True: |
| 26 | + filename = queue.get() |
| 27 | + if not filename: |
| 28 | + queue.task_done() # acknowledge the poison pill before exiting |
| 29 | + break |
| 30 | + t0 = datetime.now() |
| 31 | + target(filename, process_id) |
| 32 | + t1 = datetime.now() |
| 33 | + queue.task_done() # mark the file done only after it has been processed |
| 34 | + print 'Worker %s: Processing of %s took %s' % (process_id, filename, (t1 - t0)) |
| 35 | + print 'There are %s files left in the queue' % (queue.qsize()) |
23 | 36 | |
| 37 | + |
| 38 | + |
24 | 39 | def main(args): |
25 | 40 | ''' |
26 | 41 | This function initializes the worker processes and loads the queue |
27 | 42 | with the files to process |
28 | 43 | ''' |
29 | 44 | |
30 | | - queue = Queue() |
| 45 | + queue = JoinableQueue() |
31 | 46 | target = _prepare_target(args.pipeline) |
32 | 47 | |
33 | 48 | files = os.listdir(args.source) |
— | — | @@ -48,15 +63,16 @@ |
49 | 64 | print 'Inserting poison pill %s...' % x |
50 | 65 | queue.put(None) |
51 | 66 | |
52 | | - pipelines = [Process(target=target, args=[queue]) |
| 67 | + pipelines = [Process(target=start_pipeline, args=[queue, target, process_id]) |
53 | 68 | for process_id in xrange(processors)] |
54 | 69 | |
55 | | - queue.close() |
| 70 | + |
56 | 71 | for pipeline in pipelines: |
57 | 72 | pipeline.start() |
| 73 | + |
| 74 | + queue.join() |
58 | 75 | |
59 | 76 | |
60 | | - |
61 | 77 | if __name__ == '__main__': |
62 | 78 | parser = argparse.ArgumentParser(description='Generic DataPipeline Cruncher') |
63 | 79 | |
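
For reference, start_pipeline() follows the standard JoinableQueue poison-pill pattern: one None pill per worker, and task_done() called exactly once per put() and only after the work has finished, otherwise queue.join() in main() returns while workers are still busy. Stripped of the pipeline specifics, a self-contained sketch of the same pattern (the filenames are placeholders):

from multiprocessing import Process, JoinableQueue

def worker(queue, process_id):
    while True:
        item = queue.get()
        if item is None:
            queue.task_done()   # acknowledge the pill, then exit
            break
        print 'Worker %s handling %s' % (process_id, item)
        queue.task_done()       # only after the item is fully processed

if __name__ == '__main__':
    queue = JoinableQueue()
    for filename in ['a.gz', 'b.gz', 'c.gz']:
        queue.put(filename)
    workers = [Process(target=worker, args=(queue, pid)) for pid in range(2)]
    for w in workers:
        w.start()
    for _ in workers:
        queue.put(None)         # one poison pill per worker
    queue.join()                # unblocks once every put() was task_done()'d
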
Index: trunk/analytics/pipeliner/pipeline.py |
— | — | @@ -1,7 +1,39 @@ |
2 | | -class DataPipeline(object): |
| 2 | +from abc import ABCMeta |
| 3 | +import zlib |
| 4 | +import sys |
| 5 | + |
| 6 | +READ_BLOCK_SIZE = 1024*8 |
| 7 | + |
| 8 | + |
| 9 | +class DataPipeline(object): |
| 10 | + __metaclass__ = ABCMeta |
| 11 | + |
| 12 | + def decompress(self): |
| 13 | + fh = open(self.filename, 'rb') |
| 14 | + d = zlib.decompressobj(16 + zlib.MAX_WBITS) # 16+MAX_WBITS tells zlib to expect a gzip header |
| 15 | + data = '' |
| 16 | + while True: |
| 17 | + raw_data = fh.read(READ_BLOCK_SIZE) |
| 18 | + if not raw_data: |
| 19 | + break |
| 20 | + data += d.decompress(raw_data) |
| 21 | + position = data.rfind('\n') + 1 |
| 22 | + lines = data[:position].splitlines() # complete lines only |
| 23 | + data = data[position:] # keep the partial tail for the next block |
| 24 | + yield lines |
| 25 | + data += d.flush() |
| 26 | + if data: # final line without a trailing newline |
| 27 | + yield data.splitlines() |
| 28 | + fh.close() |
| 29 | + |
3 | 30 | def extract(self): |
4 | | - return |
| 31 | + while True: |
| 32 | + line = sys.stdin.readline() |
| 33 | + if not line: |
| 34 | + break |
| 35 | + self.aggregate(line) |
5 | 36 | |
| 37 | + |
6 | 38 | def aggregate(self, obs): |
7 | 39 | return |
8 | 40 | |
— | — | @@ -24,3 +56,4 @@ |
25 | 57 | return |
26 | 58 | |
27 | 59 | |
| 60 | + |
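
A note on decompress(): the wbits argument 16 + zlib.MAX_WBITS tells zlib to expect gzip framing, which is what lets a worker stream a .gz log block by block instead of holding the whole file in memory. The same technique in isolation, as a standalone generator sketch (stream_gzip_lines and block_size are illustrative names, not part of the patch):

import zlib

def stream_gzip_lines(filename, block_size=1024 * 8):
    d = zlib.decompressobj(16 + zlib.MAX_WBITS)  # wbits > 15: gzip header expected
    fh = open(filename, 'rb')
    buf = ''
    while True:
        block = fh.read(block_size)
        if not block:
            break
        buf += d.decompress(block)
        pos = buf.rfind('\n') + 1
        for line in buf[:pos].splitlines():      # complete lines only
            yield line
        buf = buf[pos:]                          # carry the partial tail forward
    buf += d.flush()                             # drain whatever zlib still buffers
    if buf:
        yield buf                                # last line had no trailing newline
    fh.close()
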