r109501 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: r109501
Date: 05:30, 19 January 2012
Author: diederik
Status: deferred
Tags:
Comment: Fixes for stat1
Modified paths:
  • /trunk/analytics/pipeliner/launcher.py (modified) (history)
  • /trunk/analytics/pipeliner/pipeline.py (modified) (history)
  • /trunk/analytics/pipeliner/user_agent.py (modified) (history)

Diff

Index: trunk/analytics/pipeliner/user_agent.py
@@ -1,9 +1,7 @@
-from abc import ABCMeta
 from urlparse import urlparse
 from datetime import datetime
 from wurfl import devices
 from pywurfl.algorithms import TwoStepAnalysis
-import sys
 import GeoIP
 
 from pipeline import DataPipeline
@@ -18,11 +16,10 @@
         self.location = location
         self.store = store
 
-class UserAgentPipeline:
-    __metaclass__ = ABCMeta
-
-    def __init__(self, queue, observation_class):
+class UserAgentPipeline(DataPipeline):
+    def __init__(self, observation_class, filename, process_id):
         self.start = datetime.now()
+        self.output_counter = 0
         self.variables = {
             'language_code': ['language_code', '_determine_language_code', 8],
             'project': ['project', '_determine_project', 8],
@@ -40,20 +37,12 @@
 
         self.observations = {}
         self.observation_class = observation_class
-        self.queue = queue
+        self.filename = filename
         self.gi = GeoIP.new(GeoIP.GEOIP_MEMORY_CACHE)
-
-
-    def extract(self):
-        while True:
-            line = sys.stdin.readline()
-            if not line:
-                break
-            self.aggregate(line)
+        self.process_id = process_id
 
     def _prepare_obs(self, obs):
-        obs = obs.split(' ')
-        return obs
+        return obs.split(' ')
 
     def _generate_key(self, vars):
         value = '_'.join(vars.values())
@@ -100,6 +89,7 @@
             pageviews += obs.count
         return pageviews
 
+
     def transform(self, obs):
         vars = {}
         for key in self.vars.keys():
@@ -123,29 +113,51 @@
     def post_processing(self):
         for obs in self.observations.values():
             obs.device = devices.select_ua(obs.user_agent, search=search_algorithm)
-
 
+    def pre_processing(self, line):
+        count = line.count(' ')
+        if count == 14:
+            return line
+        else:
+            return None
+
     def load(self):
-        obs = self.observations.values()
-        for o in obs:
-            print o
+        fh = open('output/chunk-%s_process-%s.tsv' % (self.output_counter, self.process_id), 'w')
+        observations = self.observations.values()
+        for obs in observations:
+            try:
+                fh.write('%s\t%s\t%s\t%s\t%s\t%s\t%s\n' % (obs.count, obs.device.brand_name, obs.device.model_name, obs.geography, obs.language_code, obs.project, obs.timestamp))
+            except:
+                pass
+        fh.close()
+        self.output_counter += 1
 
     def shutdown(self):
-        print 'Total number of Observation instances: %s' % (len(self.observations.keys()))
+        print 'Total number of observation instances: %s' % (len(self.observations.keys()))
         print 'Total number of pageviews: %s' % self._total_pageviews()
         print 'Total processing time: %s' % (datetime.now() - self.start)
 
     def run(self):
-        self.extract()
+        for lines in self.decompress():
+            for line in lines:
+                line = self.pre_processing(line)
+                if line:
+                    self.aggregate(line)
+
         self.post_processing()
        self.load()
         self.shutdown()
 
-UserAgentPipeline.register(DataPipeline)
 
-def main(queue):
-    pipeline = UserAgentPipeline(queue, UserAgentObservation)
+def main(filename, process_id):
+    pipeline = UserAgentPipeline(UserAgentObservation, filename, process_id)
     pipeline.run()
+
+
+def debug():
+    pl = UserAgentPipeline('mobile.log-20110826.gz', UserAgentObservation)
+    pl.run()
 
 if __name__ == '__main__':
-    main()
\ No newline at end of file
+    #main()
+    debug()
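
The reworked pipeline reads gzipped log files directly instead of stdin: run() pulls blocks of lines from DataPipeline.decompress() (see the pipeline.py diff below), filters each line through pre_processing(), and aggregates the survivors. pre_processing() keeps only lines whose space count matches the expected field layout. A minimal sketch of that contract, assuming the 15-field layout implied by the count == 14 check (the sample line is purely illustrative):

    # Hypothetical sample: 15 space-separated fields = exactly 14 spaces.
    sample = ' '.join('field%d' % i for i in range(15))
    assert sample.count(' ') == 14               # kept: pre_processing() returns the line
    assert (sample + ' extra').count(' ') == 15  # dropped: pre_processing() returns None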
Index: trunk/analytics/pipeliner/launcher.py
@@ -1,5 +1,6 @@
 import os
-from multiprocessing import Process, Queue, cpu_count
+from multiprocessing import Process, JoinableQueue, cpu_count
+from datetime import datetime
 
 import argparse
 
@@ -19,14 +20,28 @@
     return func
 
 
+def start_pipeline(queue, target, process_id):
+    while True:
+        filename = queue.get()
+        queue.task_done()
+        if not filename:
+            print '%s files left in the queue' % queue.qsize()
+            break
+        t0 = datetime.now()
+        target(filename, process_id)
+        t1 = datetime.now()
+        print 'Worker %s: Processing of %s took %s' % (process_id, filename, (t1 - t0))
+        print 'There are %s files left in the queue' % (queue.qsize())
 
+
+
 def main(args):
     '''
     This function initializes the multiprocessor, and loading the queue with
     files
     '''
 
-    queue = Queue()
+    queue = JoinableQueue()
     target = _prepare_target(args.pipeline)
 
     files = os.listdir(args.source)
@@ -48,15 +63,16 @@
         print 'Inserting poison pill %s...' % x
         queue.put(None)
 
-    pipelines = [Process(target=target, args=[queue])
+    pipelines = [Process(target=start_pipeline, args=[queue, target, process_id])
                  for process_id in xrange(processors)]
 
-    queue.close()
+
     for pipeline in pipelines:
         pipeline.start()
+
+    queue.join()
 
 
-
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(description='Generic DataPipeline Cruncher')
 
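The launcher now hands each worker the shared JoinableQueue plus a process_id, and shuts down with the poison-pill pattern: one None per worker is enqueued after the filenames, each worker calls task_done() for every item it takes, and queue.join() blocks until all items are accounted for. A minimal self-contained sketch of the pattern, assuming a single worker and made-up filenames:

    from multiprocessing import Process, JoinableQueue

    def worker(queue):
        while True:
            item = queue.get()
            queue.task_done()          # account for the item, pill included
            if item is None:           # poison pill: no more work is coming
                break
            print 'processing %s' % item

    if __name__ == '__main__':
        queue = JoinableQueue()
        for f in ['a.gz', 'b.gz']:     # illustrative filenames
            queue.put(f)
        queue.put(None)                # one pill for the one worker
        p = Process(target=worker, args=[queue])
        p.start()
        queue.join()   # returns once every get() was matched by task_done()
        p.join()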
Index: trunk/analytics/pipeliner/pipeline.py
@@ -1,7 +1,39 @@
-class DataPipeline(object):
+from abc import ABCMeta
+import zlib
+import sys
+
+READ_BLOCK_SIZE = 1024 * 8
+
+
+class DataPipeline(object):
+    __metaclass__ = ABCMeta
+
+    def decompress(self):
+        fh = open(self.filename, 'rb')
+        d = zlib.decompressobj(16 + zlib.MAX_WBITS)
+        data = ''
+        while True:
+            raw_data = fh.read(READ_BLOCK_SIZE)
+            data += d.decompress(raw_data)
+            if not data:
+                break
+            elif not data.endswith('\n'):
+                position = data.rfind('\n') + 1
+                lines = data[:position]
+                lines = lines.split('\n')
+                data = data[position:]
+            else:
+                lines = data
+            yield lines
+
     def extract(self):
-        return
+        while True:
+            line = sys.stdin.readline()
+            if not line:
+                break
+            self.aggregate(line)
 
+
     def aggregate(self, obs):
         return
 
@@ -24,3 +56,4 @@
         return
 
 
+
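decompress() streams a gzip file through zlib.decompressobj(16 + zlib.MAX_WBITS); the 16 added to MAX_WBITS tells zlib to expect gzip framing rather than a raw zlib stream. Each block is split at the last newline so only complete lines are passed on, with the partial tail carried into the next block. Note that as committed the generator yields a list of lines from the mid-stream branch but the raw string from the final branch, so run()'s inner loop would iterate character by character over the last chunk. A minimal sketch, with illustrative names, that yields one complete line at a time instead:

    import zlib

    def stream_lines(path, block_size=1024 * 8):
        # 16 + MAX_WBITS makes zlib expect a gzip header and trailer
        d = zlib.decompressobj(16 + zlib.MAX_WBITS)
        fh = open(path, 'rb')
        buf = ''
        while True:
            raw = fh.read(block_size)
            if not raw:
                break
            buf += d.decompress(raw)
            pos = buf.rfind('\n') + 1   # pass on complete lines only
            if pos:
                for line in buf[:pos].splitlines():
                    yield line
                buf = buf[pos:]
        fh.close()
        buf += d.flush()                # drain whatever zlib still buffers
        for line in buf.splitlines():   # trailing partial line, if any
            yield line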
