r76546 MediaWiki - Code Review archive

Repository:MediaWiki
Revision: < r76545 | r76546 | r76547 >
Date:23:07, 11 November 2010
Author:diederik
Status:deferred
Tags:
Comment:
Ironing out platform inconsistencies.
Modified paths:
  • /trunk/tools/editor_trends/config.py (modified) (history)
  • /trunk/tools/editor_trends/construct_datasets.py (modified) (history)
  • /trunk/tools/editor_trends/database/cache.py (modified) (history)
  • /trunk/tools/editor_trends/manage.py (modified) (history)
  • /trunk/tools/editor_trends/optimize_editors.py (modified) (history)
  • /trunk/tools/editor_trends/run.py (modified) (history)
  • /trunk/tools/editor_trends/settings.py (modified) (history)
  • /trunk/tools/editor_trends/utils/dump_downloader.py (modified) (history)
  • /trunk/tools/editor_trends/utils/models.py (modified) (history)
  • /trunk/tools/editor_trends/utils/process_constructor.py (modified) (history)
  • /trunk/tools/editor_trends/utils/sort.py (modified) (history)
  • /trunk/tools/editor_trends/utils/utils.py (modified) (history)

Diff [purge]

Index: trunk/tools/editor_trends/manage.py
@@ -41,15 +41,15 @@
4242 return getattr(args, key, None)
4343
4444
45 -def config_launcher(args, location, filename, project, language_code):
 45+def config_launcher(args, location, filename, project, full_project, language_code):
4646 config.load_configuration(args)
4747
4848
4949 def determine_default_language():
5050 language_code = locale.getdefaultlocale()[0]
5151 return language_code.split('_')[0]
52 -
53 -
 52+
 53+
5454 def retrieve_projectname(args):
5555 language_code = retrieve_language(args)
5656 if language_code == None:
@@ -90,33 +90,39 @@
9191 language_code = retrieve_language(args)
9292 locations['language_code'] = language_code
9393 locations['location'] = os.path.join(location, language_code, project)
94 - locations['project'] = retrieve_projectname(args)
 94+ locations['project'] = project
 95+ locations['full_project'] = retrieve_projectname(args)
9596 locations['filename'] = generate_wikidump_filename(project, args)
9697 return locations
9798
9899
99 -def show_settings(args, location, filename, project, language_code):
 100+def prepare_file_locations(location):
 101+ result = utils.check_file_exists(location, '')
 102+ if result == False:
 103+ utils.create_directory(os.path.join(location))
 104+
 105+
 106+def show_settings(args, location, filename, project, full_project, language_code):
100107 project = settings.WIKIMEDIA_PROJECTS.get(project, 'wiki')
101108 project = project.title()
102109 language_map = utils.invert_dict(languages.MAPPING)
103110 print 'Project: %s' % (project)
104111 print 'Language: %s' % language_map[language_code].decode('utf-8')
105112 print 'Input directory: %s' % location
106 - print 'Output directory: TODO'
107 -
 113+ print 'Output directory: %s and subdirectories' % location
108114
109 -def dump_downloader_launcher(args, location, filename, project, language_code):
 115+
 116+def dump_downloader_launcher(args, location, filename, project, full_project, language_code):
110117 print 'dump downloader'
111118 pbar = get_value(args, 'progress')
112119 domain = settings.WP_DUMP_LOCATION
113120 path = '/%s/latest/' % project
114121 extension = utils.determine_file_extension(filename)
115122 filemode = utils.determine_file_mode(extension)
116 -
117123 dump_downloader.download_wiki_file(domain, path, filename, location, filemode, pbar)
118124
119125
120 -def split_xml_file_launcher(args, location, filename, project, language_code):
 126+def split_xml_file_launcher(args, location, filename, project, full_project, language_code):
121127 print 'split_xml_file_launcher'
122128 ext = utils.determine_file_extension(filename)
123129 if ext in settings.COMPRESSION_EXTENSIONS:
@@ -136,7 +142,7 @@
137143 path = config.detect_installed_program('7zip')
138144 source = os.path.join(location, file)
139145 p = None
140 -
 146+
141147 if settings.OS == 'Windows':
142148 p = subprocess.Popen(['%s%s' % (path, '7z.exe'), 'e', '-o%s\\' % location, '%s' % (source,)], shell=True).wait()
143149 elif settings.OS == 'Linux':
@@ -148,18 +154,22 @@
149155 return p
150156
151157
152 -def mongodb_script_launcher(args, location, filename, project, language_code):
 158+def mongodb_script_launcher(args, location, filename, project, full_project, language_code):
153159 print 'mongodb_script_launcher'
154160 map_wiki_editors.run_parse_editors(project, language_code, location)
155161
156162
157 -def dataset_launcher(args, project):
 163+def sort_launcher(args, location, filename, project, full_project, language_code):
 164+ raise NotImplementedError
 165+
 166+
 167+def dataset_launcher(args, full_project):
158168 print 'dataset launcher'
159169 optimize_editors.run_optimize_editors(project)
160170 construct_datasets.generate_editor_dataset_launcher(project)
161171
162172
163 -def all_launcher(args, location, filename, project, language_code):
 173+def all_launcher(args, location, filename, project, full_project, language_code):
164174 print 'all_launcher'
165175 dump_downloader_launcher(args, location, filename, project, language_code)
166176 split_xml_file_launcher(args, location, filename, project, language_code)
@@ -173,7 +183,7 @@
174184 return tuple(choices)
175185
176186
177 -def show_languages(args, location, filename, project, language_code):
 187+def show_languages(args, location, filename, project, full_project, language_code):
178188 first = get_value(args, 'startswith')
179189 if first != None:
180190 first = first.title()
@@ -193,18 +203,18 @@
194204
195205
196206 def detect_python_version():
197 - version = ''.join(sys.version_info[0:2])
 207+ version = sys.version_info[0:2]
198208 if version < settings.MINIMUM_PYTHON_VERSION:
199 - raise 'Please upgrade to Python 2.6 or higher (but not Python 3.x).'
 209+ raise 'Please upgrade to Python 2.6 or higher (but not Python 3.x).'
200210
201211 def about():
202212 print 'Editor Trends Software is (c) 2010 by the Wikimedia Foundation.'
203213 print 'Written by Diederik van Liere (dvanliere@gmail.com).'
204214 print 'This software comes with ABSOLUTELY NO WARRANTY. This is free software, and you are welcome to distribute it under certain conditions.'
205215 print 'See the README.1ST file for more information.'
206 - print ''
207 -
 216+ print '\n'
208217
 218+
209219 def main():
210220 default_language = determine_default_language()
211221 file_choices = ('stub-meta-history.xml.gz',
@@ -216,8 +226,8 @@
217227 subparsers = parser.add_subparsers(help='sub-command help')
218228
219229 parser_languages = subparsers.add_parser('show_languages', help='Overview of all valid languages.')
220 - parser_languages.add_argument('-s', '--startswith',
221 - action='store',
 230+ parser_languages.add_argument('-s', '--startswith',
 231+ action='store',
222232 help='Enter the first letter of a language to see which languages are available.')
223233 parser_languages.set_defaults(func=show_languages)
224234
@@ -230,12 +240,15 @@
231241 parser_split = subparsers.add_parser('split', help='The split sub command splits the downloaded file in smaller chunks to parallelize extracting information.')
232242 parser_split.set_defaults(func=split_xml_file_launcher)
233243
 244+ parser_sort = subparsers.add_parser('sort', help='By presorting the data, significant processing time reducations are achieved.')
 245+ parser_sort.set_defaults(func=sort_launcher)
 246+
234247 parser_create = subparsers.add_parser('store', help='The store sub command parsers the XML chunk files, extracts the information and stores it in a MongoDB.')
235248 parser_create.set_defaults(func=mongodb_script_launcher)
236249
237250 parser_dataset = subparsers.add_parser('dataset', help='Create a dataset from the MongoDB and write it to a csv file.')
238251 parser_dataset.set_defaults(func=dataset_launcher)
239 -
 252+
240253 parser_all = subparsers.add_parser('all', help='The all sub command runs the download, split, store and dataset commands.\n\nWARNING: THIS COULD TAKE DAYS DEPENDING ON THE CONFIGURATION OF YOUR MACHINE AND THE SIZE OF THE WIKIMEDIA DUMP FILE.')
241254 parser_all.set_defaults(func=all_launcher)
242255
@@ -265,6 +278,7 @@
266279 args = parser.parse_args()
267280 config.load_configuration(args)
268281 locations = determine_file_locations(args)
 282+ prepare_file_locations(locations['location'])
269283 about()
270284 show_settings(args, **locations)
271285 args.func(args, **locations)
Index: trunk/tools/editor_trends/optimize_editors.py
@@ -29,6 +29,13 @@
3030 import construct_datasets
3131
3232
 33+try:
 34+ import psyco
 35+ psyco.full()
 36+except ImportError:
 37+ pass
 38+
 39+
3340 def create_datacontainer(init_value=0):
3441 '''
3542 This function initializes an empty dictionary with as key the year (starting
@@ -43,6 +50,29 @@
4451 return data
4552
4653
 54+def add_months_to_datacontainer(datacontainer):
 55+ for dc in datacontainer:
 56+ datacontainer[dc] = {}
 57+ for x in xrange(1, 13):
 58+ datacontainer[dc][str(x)] = 0
 59+ return datacontainer
 60+
 61+
 62+def determine_edits_by_month(edits):
 63+ datacontainer = create_datacontainer(init_value=0)
 64+ datacontainer = add_months_to_datacontainer(datacontainer)
 65+ for year in edits:
 66+ months = set()
 67+ for edit in edits[year]:
 68+ m = str(edit['date'].month)
 69+ if m not in months:
 70+ datacontainer[year][m] = 1
 71+ months.add(m)
 72+ if len(months) == 12:
 73+ break
 74+ return datacontainer
 75+
 76+
4777 def determine_edits_by_year(dates):
4878 '''
4979 This function counts the number of edits by year made by a particular editor.
@@ -64,19 +94,19 @@
6595 year = str(date['date'].year)
6696 articles[year].add(date['article'])
6797 for article in articles:
68 - articles[article] = len(article)
 98+ articles[article] = len(articles[article])
6999 return articles
70100
71101
72102 def sort_edits(edits):
73103 edits = utils.merge_list(edits)
74104 return sorted(edits, key=itemgetter('date'))
75 -
76105
 106+
77107 def optimize_editors(input_queue, result_queue, pbar, **kwargs):
78108 dbname = kwargs.pop('dbname')
79109 mongo = db.init_mongo_db(dbname)
80 - input = mongo['editors']
 110+ input = mongo['test']
81111 output = mongo['dataset']
82112 output.ensure_index('editor')
83113 output.ensure_index('year_joined')
@@ -85,7 +115,10 @@
86116 try:
87117 id = input_queue.get(block=False)
88118 editor = input.find_one({'editor': id})
 119+ if editor == None:
 120+ continue
89121 edits = editor['edits']
 122+ monthly_edits = determine_edits_by_month(edits)
90123 edits = sort_edits(edits)
91124 edit_count = len(edits)
92125 new_wikipedian = edits[9]['date']
@@ -93,6 +126,7 @@
94127 final_edit = edits[-1]['date']
95128 edits_by_year = determine_edits_by_year(edits)
96129 articles_by_year = determine_articles_by_year(edits)
 130+
97131 edits = edits[:10]
98132
99133 output.insert({'editor': id, 'edits': edits,
@@ -101,7 +135,8 @@
102136 'edit_count': edit_count,
103137 'final_edit': final_edit,
104138 'first_edit': first_edit,
105 - 'articles_by_year': articles_by_year})
 139+ 'articles_by_year': articles_by_year,
 140+ 'monthly_edits': monthly_edits})
106141 print 'Items left: %s' % input_queue.qsize()
107142 except Empty:
108143 break
@@ -114,20 +149,11 @@
115150 'dbname': 'enwiki',
116151 'nr_input_processors': 1,
117152 'nr_output_processors': 0,
 153+ 'poison_pill': False
118154 }
119155 print len(ids)
120156 ids = list(ids)
121 - chunks = utils.split_list(ids, settings.NUMBER_OF_PROCESSES)
122 -# chunks = {}
123 -# parts = int(round(float(len(ids)) / 1, 0))
124 -# a = 0
125 -# for x in xrange(settings.NUMBER_OF_PROCESSES):
126 -# b = a + parts
127 -# chunks[x] = ids[a:b]
128 -# a = (x + 1) * parts
129 -# if a >= len(ids):
130 -# break
131 -
 157+ chunks = dict(0, ids)
132158 pc.build_scaffolding(pc.load_queue, optimize_editors, chunks, False, False, **kwargs)
133159
134160
@@ -142,4 +168,4 @@
143169
144170 if __name__ == '__main__':
145171 #debug_optimize_editors('test')
146 - run_optimize_editors('enwiki')
 172+ run_optimize_editors('enwiki')
\ No newline at end of file
Index: trunk/tools/editor_trends/settings.py
@@ -54,7 +54,7 @@
5555 IGNORE_DIRS = ['wikistats', 'zips']
5656 ROOT = '/' if OS != 'Windows' else 'c:\\'
5757
58 -MINIMUM_PYTHON_VERSION = 2.6
 58+MINIMUM_PYTHON_VERSION = (2, 6)
5959
6060 dirs = [name for name in os.listdir(WORKING_DIRECTORY) if
6161 os.path.isdir(os.path.join(WORKING_DIRECTORY, name))]
@@ -111,7 +111,7 @@
112112 MAX_XML_FILE_SIZE = 67108864
113113
114114 if OS == 'Windows' and ARCH == 'i386':
115 - MAX_FILES_OPEN = win32file._getmaxstdio()
 115+ MAX_FILES_OPEN = win32file._getmaxstdio()
116116 elif OS != 'Windows':
117117 MAX_FILES_OPEN = resource.getrlimit(resource.RLIMIT_NOFILE)
118118 else:
Index: trunk/tools/editor_trends/run.py
@@ -1,5 +1,48 @@
22 import os
 3+import settings
 4+#from utils import namespace_downloader as nd
 5+#nd.launch_downloader()
36
 7+
 8+#def which(program):
 9+# import os
 10+# def is_exe(fpath):
 11+# return os.path.exists(fpath) and os.access(fpath, os.X_OK)
 12+#
 13+# fpath, fname = os.path.split(program)
 14+# if fpath:
 15+# if is_exe(program):
 16+# return program
 17+# else:
 18+# for path in os.environ["PATH"].split(os.pathsep):
 19+# exe_file = os.path.join(path, program)
 20+# if is_exe(exe_file):
 21+# return exe_file
 22+#
 23+# return None
 24+#
 25+#
 26+#result = which('7z.exe')
 27+#print result
 28+
 29+#from database import launcher
 30+#launcher.launcher()
 31+from utils import sort
 32+input = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'txt')
 33+output = os.path.join(settings.XML_FILE_LOCATION, 'en', 'wiki', 'sorted')
 34+dbname = 'enwiki'
 35+#sort.debug_mergesort_feeder(input, output)
 36+#sort.mergesort_launcher(input, output)
 37+#sort.mergesort_external_launcher(dbname, output, output)
 38+
 39+
 40+
 41+
 42+
 43+from analyses import cohort_charts
 44+cohort_charts.prepare_cohort_dataset()
 45+import os
 46+
447 import settings
548 #from utils import namespace_downloader as nd
649 #nd.launch_downloader()
Index: trunk/tools/editor_trends/config.py
@@ -24,10 +24,13 @@
2525
2626 import settings
2727 from utils import utils
 28+try:
 29+ from _winreg import *
 30+except ImportError:
 31+ pass
2832
29 -
3033 def detect_windows_program(program):
31 - from _winreg import *
 34+
3235 entry = settings.WINDOWS_REGISTER[program]
3336 try:
3437 key = OpenKey(HKEY_CURRENT_USER, entry, 0, KEY_READ)
Index: trunk/tools/editor_trends/utils/utils.py
@@ -171,7 +171,7 @@
172172 return 'wb'
173173
174174
175 -def write_list_to_csv(data, fh, recursive=False):
 175+def write_list_to_csv(data, fh, recursive=False, newline=True):
176176 '''
177177 @data is a list which can contain other lists that will be written as a
178178 single line to a textfile
@@ -188,27 +188,32 @@
189189 if tab:
190190 fh.write('\t')
191191 if type(d) == type([]):
192 - recursive = write_list_to_csv(d, fh, True)
 192+ recursive = write_list_to_csv(d, fh, recursive=True, newline=False)
 193+ elif type(d) == type({}):
 194+ tab = write_dict_to_csv(d, fh, write_key=False, newline=newline)
193195 else:
194196 fh.write('%s' % d)
195197 tab = True
196198 if recursive:
197199 tab = False
198200 return True
199 - fh.write('\n')
 201+ if newline:
 202+ fh.write('\n')
200203
201204
202 -def write_dict_to_csv(data, fh):
 205+def write_dict_to_csv(data, fh, write_key=True, newline=True):
203206 keys = data.keys()
204207 for key in keys:
205 - fh.write('%s' % key)
206 - for obs in data[key]:
207 - if getattr(obs, '__iter__', False):
208 - for o in obs:
209 - fh.write('\t%s' % o)
210 - else:
211 - fh.write('\t%s' % (obs))
 208+ if write_key:
 209+ fh.write('%s' % key)
 210+ if getattr(data[key], '__iter__', False):
 211+ for d in data[key]:
 212+ fh.write('\t%s' % d)
 213+ else:
 214+ fh.write('%s\t' % (data[key]))
 215+ if newline:
212216 fh.write('\n')
 217+ return False #this prevents the calling function from writing \t
213218
214219
215220 def create_txt_filehandle(location, name, mode, encoding):
@@ -326,6 +331,7 @@
327332 files.append('.'.join(file))
328333 return files
329334
 335+
330336 def merge_list(datalist):
331337 merged = []
332338 for d in datalist:
@@ -333,6 +339,7 @@
334340 merged.append(x)
335341 return merged
336342
 343+
337344 def split_list(datalist, maxval):
338345 chunks = {}
339346 a = 0
Index: trunk/tools/editor_trends/utils/dump_downloader.py
@@ -29,21 +29,25 @@
3030
3131
3232
33 -def determine_remote_filesize(url, filename):
 33+def determine_remote_filesize(domain, filename):
3434 '''
35 - @url is the full path of the file to be downloaded
 35+ @domain is the full path of the file to be downloaded
3636 @filename is the name of the file to be downloaded
3737 '''
38 - if url.startswith('http://'):
39 - url = url[7:]
40 - conn = httplib.HTTPConnection(url, 80)
41 - conn.request('HEAD', filename)
42 - res = conn.getresponse()
43 - conn.close()
44 - if res.status == 200:
45 - return int(res.getheader('content-length', -1))
46 - else:
47 - return - 1
 38+ try:
 39+ if domain.startswith('http://'):
 40+ domain = domain[7:]
 41+ conn = httplib.HTTPConnection(domain)
 42+ conn.request('HEAD', filename)
 43+ res = conn.getresponse()
 44+ conn.close()
 45+ if res.status == 200:
 46+ return int(res.getheader('content-length', -1))
 47+ else:
 48+ return - 1
 49+ except httplib.socket.error:
 50+ #print 'It seemst that %s is temporarily unavailable, please try again later.' % url
 51+ raise httplib.NotConnected('It seems that %s is temporarily unavailable, please try again later.' % url)
4852
4953
5054 def download_wiki_file(domain, path, filename, location, filemode, pbar):
@@ -57,17 +61,12 @@
5862 @pbar is an instance of progressbar.ProgressBar()
5963 '''
6064 chunk = 4096
61 - result = utils.check_file_exists(location, '')
62 - if result == False:
63 - utils.create_directory(os.path.join(location))
 65+ filesize = determine_remote_filesize(domain, path + filename)
6466 if filemode == 'w':
6567 fh = utils.create_txt_filehandle(location, filename, filemode, settings.ENCODING)
6668 else:
6769 fh = utils.create_binary_filehandle(location, filename, 'wb')
6870
69 - filesize = determine_remote_filesize(domain, path + filename)
70 -
71 -
7271 if filesize != -1 and pbar:
7372 widgets = ['%s: ' % filename, progressbar.Percentage(), ' ',
7473 progressbar.Bar(marker=progressbar.RotatingMarker()),' ',
Index: trunk/tools/editor_trends/utils/models.py
@@ -30,13 +30,14 @@
3131 for kw in kwargs:
3232 setattr(self, kw, kwargs[kw])
3333
34 - def run(self):
 34+ def start(self):
3535 proc_name = self.name
3636 kwargs = {}
3737 IGNORE = ['input_queue', 'result_queue', 'target']
3838 for kw in self.__dict__:
3939 if kw not in IGNORE and not kw.startswith('_'):
4040 kwargs[kw] = getattr(self, kw)
 41+ self._popen = True
4142 self.target(self.input_queue, self.result_queue, **kwargs)
4243
4344
@@ -50,11 +51,12 @@
5152 setattr(self, kw, kwargs[kw])
5253
5354
54 - def run(self):
 55+ def start(self):
5556 proc_name = self.name
5657 kwargs = {}
5758 IGNORE = ['result_queue', 'target']
5859 for kw in self.__dict__:
5960 if kw not in IGNORE and not kw.startswith('_'):
6061 kwargs[kw] = getattr(self, kw)
 62+ self._popen = True
6163 self.target(self.result_queue, **kwargs)
Index: trunk/tools/editor_trends/utils/process_constructor.py
@@ -81,7 +81,7 @@
8282 **kwargs) for i in xrange(nr_input_processors)]
8383
8484 for input_process in input_processes:
85 - input_process.run()
 85+ input_process.start()
8686 pids = [p.pid for p in input_processes]
8787 kwargs['pids'] = pids
8888
Index: trunk/tools/editor_trends/utils/sort.py
@@ -105,6 +105,7 @@
106106 mongo = db.init_mongo_db(dbname)
107107 collection = mongo['test']
108108 mongo.collection.ensure_index('editor')
 109+ mongo.collection.create_index('editor')
109110 editor_cache = cache.EditorCache(collection)
110111 prev_contributor = -1
111112 x = 0
@@ -113,12 +114,15 @@
114115 for line in readline(fh):
115116 if len(line) == 0:
116117 continue
117 - contributor = int(line[0])
 118+ contributor = int(line[0])
 119+ if contributor == 5767932:
 120+ print 'debug'
118121 if prev_contributor != contributor:
119122 if edits >= 10:
120123 result = editor_cache.add(prev_contributor, 'NEXT')
121124 if result:
122 - editors.add(contributor)
 125+ editors.add(prev_contributor)
 126+ result = None
123127 x += 1
124128 print 'Stored %s editors' % x
125129 else:
Index: trunk/tools/editor_trends/construct_datasets.py
@@ -19,6 +19,8 @@
2020
2121 from multiprocessing import Queue
2222 from Queue import Empty
 23+import datetime
 24+from dateutil.relativedelta import *
2325
2426 import progressbar
2527
@@ -67,7 +69,18 @@
6870 obs[var] = edits
6971 return obs
7072
 73+def write_longitudinal_data(id, edits, fh):
 74+ years = edits.keys()
 75+ years.sort()
 76+ for year in years:
 77+ months = edits[year].keys()
 78+ months = [int(m) for m in months]
 79+ months.sort()
 80+ for m in months:
 81+ date = datetime.date(int(year), int(m), 1)
 82+ fh.write('%s\t%s\t%s\n' % (id, date, edits[year][str(m)]))
7183
 84+
7285 def expand_headers(headers, vars_to_expand, obs):
7386 for var in vars_to_expand:
7487 l = len(obs[var])
@@ -77,29 +90,105 @@
7891 suffix = 2001 + i
7992 elif var.endswith('edits'):
8093 suffix = 1 + i
81 - headers.insert(pos+i, '%s_%s' % (var, suffix))
 94+ headers.insert(pos + i, '%s_%s' % (var, suffix))
8295 headers.remove(var)
8396 return headers
84 -
85 -
86 -def generate_editor_dataset(input_queue, data_queue, pbar, **kwargs):
 97+
 98+
 99+def generate_long_editor_dataset(input_queue, data_queue, pbar, **kwargs):
87100 debug = kwargs.pop('debug')
88101 dbname = kwargs.pop('dbname')
89102 mongo = db.init_mongo_db(dbname)
90103 editors = mongo['dataset']
91 - name = dbname + '_editors.csv'
 104+ name = dbname + '_long_editors.csv'
92105 fh = utils.create_txt_filehandle(settings.DATASETS_FILE_LOCATION, name, 'a', settings.ENCODING)
93106 x = 0
94 - vars_to_expand = ['edits', 'edits_by_year']
 107+ vars_to_expand = []
95108 while True:
96109 try:
 110+ id = input_queue.get(block=False)
 111+ obs = editors.find_one({'editor': id}, {'monthly_edits': 1})
 112+ if x == 0:
 113+ headers = obs.keys()
 114+ headers.sort()
 115+ headers = expand_headers(headers, vars_to_expand, obs)
 116+ utils.write_list_to_csv(headers, fh)
 117+ write_longitudinal_data(id, obs['monthly_edits'], fh)
 118+ #utils.write_list_to_csv(data, fh)
 119+ x += 1
 120+ except Empty:
 121+ break
 122+
 123+
 124+def generate_cohort_analysis(input_queue, data_queue, pbar, **kwargs):
 125+ dbname = kwargs.get('dbname')
 126+ pbar = kwargs.get('pbar')
 127+ mongo = db.init_mongo_db(dbname)
 128+ editors = mongo['dataset']
 129+ year = datetime.datetime.now().year + 1
 130+ begin = year - 2001
 131+ p = [3, 6, 9]
 132+ periods = [y * 12 for y in xrange(1, begin)]
 133+ periods = p + periods
 134+ data = {}
 135+ while True:
 136+ try:
 137+ id = input_queue.get(block=False)
 138+ obs = editors.find_one({'editor': id}, {'first_edit': 1, 'final_edit': 1})
 139+ first_edit = obs['first_edit']
 140+ last_edit = obs['final_edit']
 141+ for y in xrange(2001, year):
 142+ if y == 2010 and first_edit > datetime.datetime(2010, 1, 1):
 143+ print 'debug'
 144+ if y not in data:
 145+ data[y] = {}
 146+ data[y]['n'] = 0
 147+ window_end = datetime.datetime(y, 12, 31)
 148+ if window_end > datetime.datetime.now():
 149+ now = datetime.datetime.now()
 150+ m = now.month - 1 #Dump files are always lagging at least one month....
 151+ d = now.day
 152+ window_end = datetime.datetime(y, m, d)
 153+ edits = []
 154+ for period in periods:
 155+ if period not in data[y]:
 156+ data[y][period] = 0
 157+ window_start = datetime.datetime(y, 12, 31) - relativedelta(months=period)
 158+ if window_start < datetime.datetime(2001, 1, 1):
 159+ window_start = datetime.datetime(2001, 1, 1)
 160+ if date_falls_in_window(window_start, window_end, first_edit, last_edit):
 161+ edits.append(period)
 162+ if edits != []:
 163+ p = min(edits)
 164+ data[y]['n'] += 1
 165+ data[y][p] += 1
 166+ #pbar.update(+1)
 167+ except Empty:
 168+ break
 169+ utils.store_object(data, settings.BINARY_OBJECT_FILE_LOCATION, 'cohort_data')
 170+
 171+def date_falls_in_window(window_start, window_end, first_edit, last_edit):
 172+ if first_edit >= window_start and first_edit <= window_end:
 173+ return True
 174+ else:
 175+ return False
 176+
 177+
 178+def generate_wide_editor_dataset(input_queue, data_queue, pbar, **kwargs):
 179+ dbname = kwargs.pop('dbname')
 180+ mongo = db.init_mongo_db(dbname)
 181+ editors = mongo['dataset']
 182+ name = dbname + '_wide_editors.csv'
 183+ fh = utils.create_txt_filehandle(settings.DATASETS_FILE_LOCATION, name, 'a', settings.ENCODING)
 184+ x = 0
 185+ vars_to_expand = ['edits', 'edits_by_year', 'articles_by_year']
 186+ while True:
 187+ try:
97188 if debug:
98189 id = u'99797'
99190 else:
100191 id = input_queue.get(block=False)
101 -
102192 print input_queue.qsize()
103 -
104193 obs = editors.find_one({'editor': id})
105194 obs = expand_observations(obs, vars_to_expand)
106195 if x == 0:
@@ -107,14 +196,12 @@
108197 headers.sort()
109198 headers = expand_headers(headers, vars_to_expand, obs)
110199 utils.write_list_to_csv(headers, fh)
111 - fh.write('\n')
112200 data = []
113201 keys = obs.keys()
114202 keys.sort()
115203 for key in keys:
116204 data.append(obs[key])
117205 utils.write_list_to_csv(data, fh)
118 - fh.write('\n')
119206
120207 x += 1
121208 except Empty:
@@ -141,20 +228,13 @@
142229 'nr_output_processors': 1,
143230 'debug': False,
144231 'dbname': dbname,
 232+ 'poison_pill':False,
 233+ 'pbar': True
145234 }
146235 ids = retrieve_editor_ids_mongo(dbname, 'editors')
147 - chunks = utils.split_list(ids, settings.NUMBER_OF_PROCESSES)
148 -# chunks = {}
149 -# parts = int(round(float(len(ids)) / 1, 0))
150 -# a = 0
151 -# for x in xrange(settings.NUMBER_OF_PROCESSES):
152 -# b = a + parts
153 -# chunks[x] = ids[a:b]
154 -# a = (x + 1) * parts
155 -# if a >= len(ids):
156 -# break
157 -#
158 - pc.build_scaffolding(pc.load_queue, generate_editor_dataset, chunks, False, False, **kwargs)
 236+ ids = list(ids)
 237+ chunks = dict({0: ids})
 238+ pc.build_scaffolding(pc.load_queue, generate_cohort_analysis, chunks, False, False, **kwargs)
159239
160240
161241 def generate_editor_dataset_debug(dbname):
Index: trunk/tools/editor_trends/database/cache.py
@@ -63,16 +63,17 @@
6464 def current_cache_size(self):
6565 return sum([self.editors[k].get('obs', 0) for k in self.editors])
6666
 67+ def clear(self, key):
 68+ if key in self.editors:
 69+ del self.editors[key]
 70+
6771 def add(self, key, value):
6872 if value == 'NEXT':
69 - for editor in self.treshold_editors:
70 - self.insert (editor, self.editors[editor]['edits'])
71 - self.n -= self.editors[editor]['obs']
72 - self.number_editors -= 1
73 - del self.editors[editor]
74 - if key in self.editors:
75 - del self.editors[key]
76 - self.treshold_editors = set()
 73+ result = self.insert(key, self.editors[key]['edits'])
 74+ self.n -= self.editors[key]['obs']
 75+ self.number_editors -= 1
 76+ del self.editors[key]
 77+ return result
7778 else:
7879 self.cumulative_n += 1
7980 self.n += 1
@@ -88,8 +89,8 @@
8990 self.editors[key]['edits'][year].append(value)
9091 self.editors[key]['obs'] += 1
9192
92 - if self.editors[key]['obs'] == self.treshold:
93 - self.treshold_editors.add(key)
 93+ #if self.editors[key]['obs'] == self.treshold:
 94+ # self.treshold_editors.add(key)
9495
9596 def add_years(self, key):
9697 now = datetime.datetime.now().year + 1
@@ -103,8 +104,9 @@
104105 def insert(self, editor, values):
105106 try:
106107 self.collection.insert({'editor': editor, 'edits': values})
 108+ return True
107109 except:
108 - pass
 110+ return False
109111
110112 def store(self):
111113 utils.store_object(self, settings.BINARY_OBJECT_FILE_LOCATION, self.__repr__())

Status & tagging log