r109787 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r109786‎ | r109787 | r109788 >
Date:01:13, 23 January 2012
Author:diederik
Status:deferred
Tags:
Comment:
Multiprocessing launcher no longer causes memory leaks. Fixes a lot of edge cases.
Modified paths:
  • /trunk/analytics/pipeliner/dataset.py (modified) (history)
  • /trunk/analytics/pipeliner/launcher.py (modified) (history)
  • /trunk/analytics/pipeliner/pipeline.py (modified) (history)
  • /trunk/analytics/pipeliner/user_agent.py (modified) (history)

Diff [purge]

Index: trunk/analytics/pipeliner/user_agent.py
@@ -1,25 +1,82 @@
 2+# -*- coding: utf-8 -*-
 3+'''
 4+Copyright (C) 2012 by Diederik van Liere (dvanliere@wikimedia.org)
 5+This program is free software; you can redistribute it and/or
 6+modify it under the terms of the GNU General Public License version 2
 7+as published by the Free Software Foundation.
 8+This program is distributed in the hope that it will be useful,
 9+but WITHOUT ANY WARRANTY; without even the implied warranty of
 10+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 11+See the GNU General Public License for more details, at
 12+http://www.fsf.org/licenses/gpl.html
 13+'''
 14+
 15+__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@wikimedia.org)', ])
 16+__email__ = 'dvanliere at wikimedia dot org'
 17+__date__ = '2012-01-22'
 18+__version__ = '0.1'
 19+
 20+import gc
 21+from datetime import datetime
222 from urlparse import urlparse
3 -from datetime import datetime
423 from wurfl import devices
524 from pywurfl.algorithms import TwoStepAnalysis
625 import GeoIP
726
827 from pipeline import DataPipeline
9 -from dataset import UserAgentObservation
1028
1129 search_algorithm = TwoStepAnalysis(devices)
1230
1331 class Variable:
 32+ '''
 33+ This class defines the variables that we want to collect,
 34+ which function we need to call to construct the variable and
 35+ the position of the raw data in a line of data.
 36+ '''
1437 def __init__(self, name, func_name, location, store=True):
1538 self.name = name
1639 self.func_name = func_name
1740 self.location = location
1841 self.store = store
1942
 43+class UserAgentObservation:
 44+ '''
 45+ This class aggregates observations for a particular combination of
 46+ project, language, browser, country, timestamp.
 47+ '''
 48+ def __init__(self, **kwargs):
 49+ self.count = 0
 50+ for key, value in kwargs.iteritems():
 51+ setattr(self, key, value)
 52+
 53+ def __str__(self):
 54+ if self.device:
 55+ return '%s observations using %s:%s in %s for %s%s on %s' % (self.count, self.device, self.geography, self.language_code, self.project, self.timestamp)
 56+ else:
 57+ return '%s observations in %s for %s%s on %s' % (self.count, self.geography, self.language_code, self.project, self.timestamp)
 58+
 59+
2060 class UserAgentPipeline(DataPipeline):
21 - def __init__(self, observation_class, filename, process_id, number_of_fields=144):
 61+ '''
 62+ This class does all the heavy lifting of aggregating observations, writing output
 63+ to a file, and doing some simple logging
 64+ '''
 65+ def __init__(self, observation_class, filename, process_id, number_of_fields=14):
2266 self.start = datetime.now()
23 - self.output_counter = 0
 67+ self.skipped_records = 0
 68+ self.http_valid_codes = {'200':1,'304':1, '302':1}
 69+ self.observations = {} #weakref.WeakValueDictionary()
 70+ self.http_invalid_codes = {}
 71+ self.invalid_user_agents = {}
 72+ self.keys = set()
 73+ self.url = None
 74+ self.filename = filename
 75+ self.process_id = process_id
 76+ self.number_of_fields = number_of_fields
 77+ self.observation_class = observation_class
 78+ self.gi = GeoIP.new(GeoIP.GEOIP_MEMORY_CACHE)
 79+ self.error_vars = ['http_invalid_codes', 'invalid_user_agents', 'skipped_records']
 80+
2481 self.variables = {
2582 'language_code': ['language_code', '_determine_language_code', 8],
2683 'project': ['project', '_determine_project', 8],
@@ -27,36 +84,40 @@
2885 'timestamp': ['timestamp', '_determine_date', 2],
2986 'user_agent': ['user_agent', '_convert_to_unicode', 13],
3087 }
31 - if self.variables == {}:
32 - raise Exception('''You have to define both a mapping of your
33 - properties and the functions that generate them and a mapping
34 - of your raw input data and their position in a raw data line.''')
35 - self.vars= {}
 88+ self.props= {}
3689 for key, value in self.variables.iteritems():
37 - self.vars[key]= Variable(*value)
38 -
39 - self.observations = {}
40 - self.observation_class = observation_class
41 - self.filename = filename
42 - self.gi = GeoIP.new(GeoIP.GEOIP_MEMORY_CACHE)
43 - self.process_id = process_id
44 - self.number_of_fields = number_of_fields
 90+ self.props[key]= Variable(*value)
4591
4692 def _prepare_obs(self, obs):
47 - return obs.strip().split(' ')
 93+ '''
 94+ Obs is the raw untouched input from stdin. This function prepares it for
 95+ subsequent analysis.
 96+ '''
 97+ if type(obs) != type([]):
 98+ return obs.strip().split(' ')
 99+ else:
 100+ return obs
48101
49 - def _generate_key(self, vars):
50 - value = '_'.join(vars.values())
 102+ def _parse_url(self, url):
 103+ self.url = urlparse(url)
 104+
 105+ def _generate_key(self, props):
 106+ value = '_'.join(props.values())
51107 return hash(value)
52108
53109 def _determine_language_code(self, url):
54 - url = urlparse(url)
55 - return url.netloc.split('.')[0]
 110+ domain = self.url.netloc.split('.')
 111+ if len(domain) != 2:
 112+ return domain[0]
 113+ else:
 114+ return 'en'
56115
57116 def _determine_project(self, url):
58 - url = urlparse(url)
59 - return url.netloc.split('.')[-2]
60 -
 117+ try:
 118+ return self.url.netloc.split('.')[-2]
 119+ except IndexError:
 120+ return 'unknown_project'
 121+
61122 def _determine_geography(self, ip):
62123 geography = self.gi.country_code_by_addr(ip)
63124 if not geography:
@@ -70,15 +131,16 @@
71132 '''
72133 return timestamp[:10]
73134
74 - def _add_observation(self, key, vars):
 135+ def _add_observation(self, key, props):
75136 '''
76 - Vars should be a dictionary
 137+ props should be a dictionary and here the actual aggregation happens.
77138 '''
78 - if not isinstance(vars, dict):
 139+ if not isinstance(props, dict):
79140 raise Exception('You have to feed an instance of a Datamodel a dictionary.')
80141
81 - obs = self.observations.get(key, UserAgentObservation(**vars))
 142+ obs = self.observations.get(key, UserAgentObservation(**props))
82143 obs.count +=1
 144+ self.keys.add(key)
83145 self.observations[key] =obs
84146
85147 def _convert_to_unicode(self, obs):
@@ -90,62 +152,120 @@
91153 pageviews += obs.count
92154 return pageviews
93155
 156+ def _determine_http_code(self, line):
 157+ #filter out all invalid http requests
 158+ status_code = line[5][-3:]
 159+ res = self.http_valid_codes.get(status_code, None)
 160+ if res:
 161+ return line
 162+ else:
 163+ self.http_invalid_codes.setdefault(status_code, 0)
 164+ self.http_invalid_codes[status_code] +=1
 165+ return None
94166
 167+ def _free_memory(self):
 168+ '''
 169+ This function is no longer used.
 170+ '''
 171+ for value in self.observations.values():
 172+ del value
 173+ self.observations = {}
 174+ gc.collect()
 175+
 176+ def _write_error_log(self):
 177+ for prop_name in self.error_vars:
 178+ fh = open('logs/diag-%s-%s.tsv' % (self.process_id, prop_name), 'w')
 179+ prop = getattr(self, prop_name)
 180+ if type(prop) == type({}):
 181+ if prop != {}: #it doesn't make sense to write an empty dictionary to a file.
 182+ for key, value in prop.iteritems():
 183+ fh.write('%s\t%s\n' %(key, value))
 184+ else:
 185+ fh.write('%s\t%s\n' % (prop_name, prop))
 186+ fh.close()
 187+
 188+ def _is_device_mobile(self, device):
 189+ return device.is_wireless_device
 190+
95191 def transform(self, obs):
96 - vars = {}
97 - for key in self.vars.keys():
 192+ props = {}
 193+ for key in self.props.keys():
98194 try:
99 - func_name = getattr(self.vars[key], 'func_name')
100 - location = getattr(self.vars[key], 'location')
 195+ func_name = getattr(self.props[key], 'func_name')
 196+ location = getattr(self.props[key], 'location')
 197+ if location == 8:
 198+ res = self._parse_url(obs[location])
 199+ if res == False:
 200+ print obs
101201 if func_name:
102202 func = getattr(self, func_name)
103 - except:
104 - raise Exception('You have not defined function %s' % (func_name))
105 - #print key, obs[location]
106 - vars[key] = func(obs[location])
107 - return vars
 203+ props[key] = func(obs[location])
 204+ except Exception, e:
 205+ print len(obs), location
 206+ print func_name
 207+ print obs
 208+ print props
 209+ raise Exception(e)
 210+ return props
108211
109212 def aggregate(self, obs):
110213 obs = self._prepare_obs(obs)
111 - vars = self.transform(obs)
112 - key = self._generate_key(vars)
113 - self._add_observation(key, vars)
 214+ props = self.transform(obs)
 215+ key = self._generate_key(props)
 216+ self._add_observation(key, props)
114217
115218 def post_processing(self):
116219 for obs in self.observations.values():
117 - obs.device = devices.select_ua(obs.user_agent, search=search_algorithm)
 220+ try:
 221+ device = devices.select_ua(obs.user_agent, search=search_algorithm)
 222+ obs.device = '%s:%s' % (device.brand_name, device.model_name)
 223+ obs.mobile = self._is_device_mobile(device)
 224+ except (ValueError, UnicodeDecodeError):
 225+ obs.device = '%s:%s' % ('generic', 'unknown_device')
 226+ obs.mobile = True
 227+ self.invalid_user_agents.setdefault(obs.user_agent,0)
 228+ self.invalid_user_agents[obs.user_agent] +=1
118229
119230 def pre_processing(self, line):
120 - count = line.count(' ')
121 - if count == self.number_of_fields:
122 - return line
 231+ line = self._prepare_obs(line)
 232+ if len(line) == (self.number_of_fields):
 233+ return self._determine_http_code(line)
123234 else:
124235 return None
125236
126237 def load(self):
127 - fh = open('output/chunk-%s_process-%s.tsv' % (self.output_counter,self.process_id), 'w')
 238+ fh = open('output/chunk-%s.tsv' % (self.process_id), 'w')
128239 observations = self.observations.values()
129240 for obs in observations:
130241 try:
131 - fh.write('%s\t%s\t%s\t%s\t%s\t%s\t%s\n' % (obs.count, obs.device.brand_name, obs.device.model_name, obs.geography, obs.language_code, obs.project, obs.timestamp))
 242+ fh.write('%s\t%s\t%s\t%s\t%s\t%s\n' % (obs.count, obs.device, obs.geography, obs.language_code, obs.project, obs.timestamp))
132243 except:
133244 pass
134245 fh.close()
135 - self.output_counter+=1
136246
 247+ def diagnostics(self):
 248+ self._write_error_log()
 249+ print 'Worker %s: Processing of %s took %s' % (self.process_id, self.filename, (datetime.now() - self.start))
 250+ print 'Total number of skipped records: %s' % self.skipped_records
 251+ print 'Total number of invalid http requests: %s' % sum(self.http_invalid_codes.values())
 252+ print 'Total number of invalid user agents: %s' % sum(self.invalid_user_agents.values())
 253+
137254 def shutdown(self):
138255 print 'Total number of observation instances: %s' % (len(self.observations.keys()))
139256 print 'Total number of pageviews: %s' % self._total_pageviews()
140 - print 'Total processing time: %s' % (datetime.now() - self.start)
141 -
 257+ #self._free_memory()
 258+
142259 def run(self):
143260 for line in self.decompress():
144261 line = self.pre_processing(line)
145262 if line:
146263 self.aggregate(line)
 264+ else:
 265+ self.skipped_records +=1
147266
148267 self.post_processing()
149268 self.load()
 269+ self.diagnostics()
150270 self.shutdown()
151271
152272
@@ -155,8 +275,8 @@
156276
157277
158278 def debug():
159 - pl = UserAgentPipeline(UserAgentObservation, 'mobile.log-20110826.gz', 0, 13)
160 - pl.run()
 279+ pipeline = UserAgentPipeline(UserAgentObservation, 'mobile.log-20110826.gz', 0, 14)
 280+ pipeline.run()
161281
162282 if __name__ == '__main__':
163283 #main()
Index: trunk/analytics/pipeliner/launcher.py
@@ -1,16 +1,36 @@
 2+# -*- coding: utf-8 -*-
 3+
 4+'''
 5+Copyright (C) 2012 by Diederik van Liere (dvanliere@wikimedia.org)
 6+This program is free software; you can redistribute it and/or
 7+modify it under the terms of the GNU General Public License version 2
 8+as published by the Free Software Foundation.
 9+This program is distributed in the hope that it will be useful,
 10+but WITHOUT ANY WARRANTY; without even the implied warranty of
 11+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 12+See the GNU General Public License for more details, at
 13+http://www.fsf.org/licenses/gpl.html
 14+'''
 15+
 16+__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@wikimedia.org)', ])
 17+__email__ = 'dvanliere at wikimedia dot org'
 18+__date__ = '2012-01-22'
 19+__version__ = '0.1'
 20+
 21+
222 import os
323 from multiprocessing import Process, JoinableQueue, cpu_count
4 -from datetime import datetime
 24+from time import sleep
525
626 import argparse
727
8 -
928 def _check_folder_exists(path):
1029 if os.path.exists(path):
1130 return path
1231 else:
1332 raise Exception('Please enter a valid source.')
1433
 34+
1535 def _prepare_target(pipeline):
1636 try:
1737 module = __import__('%s' % pipeline)
@@ -20,59 +40,81 @@
2141 return func
2242
2343
24 -def start_pipeline(queue, target, process_id):
25 - while True:
26 - filename = queue.get()
27 - queue.task_done()
28 - if not filename:
29 - print '%s files left in the queue' % queue.qsize()
30 - break
31 - t0 = datetime.now()
32 - target(filename, process_id)
33 - t1 = datetime.now()
34 - print 'Worker %s: Processing of %s took %s' % (process_id, filename, (t1 - t0))
35 - print 'There are %s files left in the queue' % (queue.qsize())
 44+def launch_pipeline(target, filename, pid):
 45+ return Process(target=target, args=[filename, pid])
3646
37 -
38 -
39 -def main(args):
 47+
 48+def determine_number_of_files(args, ext):
4049 '''
41 - This function initializes the multiprocessor, and loading the queue with
42 - files
 50+ Count the total number of files that need to be processed.
4351 '''
 52+ files = os.listdir(args.source)
 53+ nr_files = 0
 54+ for filename in files:
 55+ if filename.endswith(ext):
 56+ nr_files +=1
 57+ return nr_files
4458
 59+
 60+def determine_number_of_processors(nr_files):
 61+ '''
 62+ Determine the maximum number of processes that can be launched.
 63+ To prevent hammering of the server, it always leaves 1 core
 64+ unused.
 65+ '''
 66+ if nr_files > cpu_count():
 67+ return cpu_count() - 1
 68+ else:
 69+ return nr_files
 70+
 71+
 72+def fill_queue(args, nr_processors, ext):
 73+ '''
 74+ Fill the queue with the full path to the files that need to be processed
 75+ '''
4576 queue = JoinableQueue()
46 - target = _prepare_target(args.pipeline)
 77+ files = os.listdir(args.source)
4778
48 - files = os.listdir(args.source)
49 -
5079 count_files=0
5180 for filename in files:
5281 filename = os.path.join(args.source, filename)
53 - if filename.endswith('gz'):
 82+ if filename.endswith(ext):
5483 print filename
5584 count_files+=1
5685 queue.put(filename)
57 -
58 - if count_files > cpu_count():
59 - processors = cpu_count() - 1
60 - else:
61 - processors = count_files
62 -
63 - for x in xrange(processors):
 86+ for x in xrange(nr_processors):
6487 print 'Inserting poison pill %s...' % x
6588 queue.put(None)
6689
67 - pipelines = [Process(target=start_pipeline, args=[queue, target, process_id])
68 - for process_id in xrange(processors)]
 90+ return queue
 91+
6992
70 -
71 - for pipeline in pipelines:
72 - pipeline.start()
73 -
74 - queue.join()
 93+def main(args):
 94+ '''
 95+ Main function that will fill the queue and launch the processes
 96+ '''
 97+ ext= 'gz'
 98+ target = _prepare_target(args.pipeline)
 99+ pipelines = []
 100+ pid=0
 101+ nr_files = determine_number_of_files(args, ext)
 102+ nr_processors = determine_number_of_processors(nr_files)
 103+ queue= fill_queue(args, nr_processors, ext)
 104+ while nr_files != 0:
 105+ while sum(pipeline.is_alive() for pipeline in pipelines) != nr_processors:
 106+ filename = queue.get()
 107+ queue.task_done()
 108+ if filename:
 109+ p =launch_pipeline(target, filename, pid)
 110+ p.start()
 111+ pipelines.append(p)
 112+ pid+=1
 113+ nr_files -=1
 114+ print 'There are %s files left in the queue' % (nr_files)
 115+ sleep(30)
75116
76117
 118+
77119 if __name__ == '__main__':
78120 parser = argparse.ArgumentParser(description='Generic DataPipeline Cruncher')
79121
Index: trunk/analytics/pipeliner/dataset.py
@@ -1,52 +1,39 @@
2 -from abc import ABCMeta
 2+# -*- coding: utf-8 -*-
33
 4+'''
 5+Copyright (C) 2012 by Diederik van Liere (dvanliere@wikimedia.org)
 6+This program is free software; you can redistribute it and/or
 7+modify it under the terms of the GNU General Public License version 2
 8+as published by the Free Software Foundation.
 9+This program is distributed in the hope that it will be useful,
 10+but WITHOUT ANY WARRANTY; without even the implied warranty of
 11+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 12+See the GNU General Public License for more details, at
 13+http://www.fsf.org/licenses/gpl.html
 14+'''
 15+
 16+__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@wikimedia.org)', ])
 17+__email__ = 'dvanliere at wikimedia dot org'
 18+__date__ = '2012-01-22'
 19+__version__ = '0.1'
 20+
 21+
422 class Observation(object):
523 def __init__(self, *args):
624 self.count = 0
725
8 -# class Datamodel(object):
9 -# def __init__(self, *args):
10 -# self.obs = {}
11 -#
12 -# def add_observation(self, key, obs):
13 -# return
14 -
1526
16 -class UserAgentObservation:
17 - __metaclass__ = ABCMeta
18 -
 27+class UserAgentObservation(object):
 28+
1929 def __init__(self, **kwargs):
20 - self.count=0
 30+ self.count = 0
2131 for key, value in kwargs.iteritems():
2232 setattr(self, key, value)
2333
2434 def __str__(self):
2535 if self.device:
26 - return '%s observations using %s:%s in %s for %s%s on %s' % (self.count, self.device.brand_name, self.device.model_name, self.geography, self.language_code, self.project, self.timestamp)
 36+ return '%s observations using %s:%s in %s for %s%s on %s' % (self.count, self.device, self.geography, self.language_code, self.project, self.timestamp)
2737 else:
2838 return '%s observations in %s for %s%s on %s' % (self.count, self.geography, self.language_code, self.project, self.timestamp)
2939
30 -# class UserAgentDatamodel(object):
31 -# __metaclass__ = ABCMeta
32 -#
33 -# def __init__(self, full_string, key):
34 -# self.full_string = full_string
35 -# self.key = key
36 -#
37 -# def __str__(self):
38 -# return self.full_string
39 -#
40 -# def add_observation(self, key, vars):
41 -# '''
42 -# Vars should be a dictionary
43 -# '''
44 -# if not isinstance(vars, dict):
45 -# raise Exception('You have to feed an instance of a Datamodel a dictionary.')
46 -#
47 -# obs = self.obs.get(key, UserAgentObservation(vars))
48 -# obs.count +=1
49 -# self.obs[key] =obs
50 -
51 -UserAgentObservation.register(Observation)
52 -
5340
Index: trunk/analytics/pipeliner/pipeline.py
@@ -1,3 +1,23 @@
 2+# -*- coding: utf-8 -*-
 3+
 4+'''
 5+Copyright (C) 2012 by Diederik van Liere (dvanliere@wikimedia.org)
 6+This program is free software; you can redistribute it and/or
 7+modify it under the terms of the GNU General Public License version 2
 8+as published by the Free Software Foundation.
 9+This program is distributed in the hope that it will be useful,
 10+but WITHOUT ANY WARRANTY; without even the implied warranty of
 11+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 12+See the GNU General Public License for more details, at
 13+http://www.fsf.org/licenses/gpl.html
 14+'''
 15+
 16+__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@wikimedia.org)', ])
 17+__email__ = 'dvanliere at wikimedia dot org'
 18+__date__ = '2012-01-22'
 19+__version__ = '0.1'
 20+
 21+
222 from abc import ABCMeta
323 import subprocess
424 import sys
@@ -6,6 +26,10 @@
727 __metaclass__ = ABCMeta
828
929 def decompress(self):
 30+ '''
 31+ Don't use the internal zlib library from Python; it's super slow.
 32+ This might make this script Linux/OSX only.
 33+ '''
1034 p = subprocess.Popen(['gunzip','-c', self.filename], stdout=subprocess.PIPE, shell=False)
1135 for line in iter(p.stdout.readline, ""):
1236 yield line

Status & tagging log