Index: trunk/tools/editor_trends/manage.py |
— | — | @@ -34,7 +34,7 @@ |
35 | 35 | from utils import timer |
36 | 36 | from classes import storage |
37 | 37 | from etl import downloader |
38 | | -from etl import enricher |
| 38 | +from etl import extracter |
39 | 39 | from etl import store |
40 | 40 | from etl import sort |
41 | 41 | from etl import transformer |
— | — | @@ -273,7 +273,10 @@ |
274 | 274 | stopwatch = timer.Timer() |
275 | 275 | log.to_db(rts, 'dataset', 'extract', stopwatch, event='start') |
276 | 276 | log.to_csv(logger, rts, 'Start', 'Extract', extract_launcher) |
277 | | - enricher.launcher(rts) |
| 277 | + res = file_utils.delete_file(rts.txt, None, directory=True) |
| 278 | + if res: |
| 279 | + res = file_utils.create_directory(rts.txt) |
| 280 | + extracter.launcher(rts) |
278 | 281 | stopwatch.elapsed() |
279 | 282 | log.to_db(rts, 'dataset', 'extract', stopwatch, event='finish') |
280 | 283 | log.to_csv(logger, rts, 'Finish', 'Extract', extract_launcher) |
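
Note: the extract step now wipes and recreates the rts.txt output directory before launching the extracter, so CSV chunks left over from an earlier run cannot contaminate a fresh extraction. file_utils.delete_file and file_utils.create_directory are this repo's helpers; below is a minimal stdlib sketch of the same reset step (the function name is illustrative, not the repo's API):

    import os
    import shutil

    def reset_output_directory(path):
        # Drop whatever a previous extraction run left behind, then
        # start with an empty directory for the new CSV chunks.
        if os.path.isdir(path):
            shutil.rmtree(path)
        os.makedirs(path)
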
Index: trunk/tools/editor_trends/etl/extracter.py |
— | — | @@ -1,288 +1,297 @@ |
2 | | -#!/usr/bin/python |
3 | | -# -*- coding: utf-8 -*- |
4 | | -''' |
5 | | -Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com) |
6 | | -This program is free software; you can redistribute it and/or |
7 | | -modify it under the terms of the GNU General Public License version 2 |
8 | | -as published by the Free Software Foundation. |
9 | | -This program is distributed in the hope that it will be useful, |
10 | | -but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | | -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
12 | | -See the GNU General Public License for more details, at |
13 | | -http://www.fsf.org/licenses/gpl.html |
14 | | -''' |
15 | | - |
16 | | - |
17 | | -__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ]) |
18 | | -__author__email = 'dvanliere at gmail dot com' |
19 | | -__date__ = '2011-04-10' |
20 | | -__version__ = '0.1' |
21 | | - |
22 | | - |
23 | | -from collections import deque |
24 | | -import sys |
25 | | -from xml.etree.cElementTree import iterparse, dump |
26 | | -from multiprocessing import JoinableQueue, Process, cpu_count, RLock, Manager |
27 | | - |
28 | | -if '..' not in sys.path: |
29 | | - sys.path.append('..') |
30 | | - |
31 | | -from etl import variables |
32 | | -from classes import buffer |
33 | | -from analyses.adhoc import bot_detector |
34 | | - |
35 | | -def parse_revision(revision, article, xml_namespace, cache, bots, md5hashes, size): |
36 | | - if revision == None: |
37 | | - #the entire revision is empty, weird. |
38 | | - #dump(revision) |
39 | | - return md5hashes, size |
40 | | - |
41 | | - contributor = revision.find('%s%s' % (xml_namespace, 'contributor')) |
42 | | - contributor = variables.parse_contributor(contributor, bots, xml_namespace) |
43 | | - if not contributor: |
44 | | - #editor is anonymous, ignore |
45 | | - return md5hashes, size |
46 | | - |
47 | | - revision_id = revision.find('%s%s' % (xml_namespace, 'id')) |
48 | | - revision_id = variables.extract_revision_id(revision_id) |
49 | | - if revision_id == None: |
50 | | - #revision_id is missing, which is weird |
51 | | - return md5hashes, size |
52 | | - |
53 | | - article['revision_id'] = revision_id |
54 | | - text = variables.extract_revision_text(revision) |
55 | | - article.update(contributor) |
56 | | - |
57 | | - comment = variables.extract_comment_text(revision_id, revision) |
58 | | - cache.comments.update(comment) |
59 | | - |
60 | | - timestamp = revision.find('%s%s' % (xml_namespace, 'timestamp')).text |
61 | | - article['timestamp'] = timestamp |
62 | | - |
63 | | - hash = variables.create_md5hash(text) |
64 | | - revert = variables.is_revision_reverted(hash['hash'], md5hashes) |
65 | | - md5hashes.append(hash['hash']) |
66 | | - size = variables.calculate_delta_article_size(size, text) |
67 | | - |
68 | | - article.update(hash) |
69 | | - article.update(size) |
70 | | - article.update(revert) |
71 | | - cache.add(article) |
72 | | - return md5hashes, size |
73 | | - |
74 | | -def setup_parser(rts): |
75 | | - bots = '' #bot_detector.retrieve_bots(rts.language.code) |
76 | | - start = 'start'; end = 'end' |
77 | | - context = iterparse(fh, events=(start, end)) |
78 | | - context = iter(context) |
79 | | - |
80 | | - include_ns = {3: 'User Talk', |
81 | | - 5: 'Wikipedia Talk', |
82 | | - 1: 'Talk', |
83 | | - 2: 'User', |
84 | | - 4: 'Wikipedia'} |
85 | | - |
86 | | - return bots, context, include_ns |
87 | | - |
88 | | - |
89 | | -def datacompetition_count_edits(rts, file_id): |
90 | | - bots, context, include_ns = setup_parser(rts) |
91 | | - counts = {} |
92 | | - id = False |
93 | | - ns = False |
94 | | - parse = False |
95 | | - |
96 | | - try: |
97 | | - for event, elem in context: |
98 | | - if event is end and elem.tag.endswith('siteinfo'): |
99 | | - xml_namespace = variables.determine_xml_namespace(elem) |
100 | | - namespaces = variables.create_namespace_dict(elem, xml_namespace) |
101 | | - ns = True |
102 | | - elem.clear() |
103 | | - |
104 | | - elif event is end and elem.tag.endswith('title'): |
105 | | - title = variables.parse_title(elem) |
106 | | - article['title'] = title |
107 | | - current_namespace = variables.determine_namespace(title, namespaces, include_ns) |
108 | | - if current_namespace != False: |
109 | | - parse = True |
110 | | - article['namespace'] = current_namespace |
111 | | - cache.count_articles += 1 |
112 | | - elem.clear() |
113 | | - |
114 | | - elif elem.tag.endswith('revision') and parse == True: |
115 | | - if event is start: |
116 | | - clear = False |
117 | | - else: |
118 | | - print 'IMPLEMENT' |
119 | | - #md5hashes, size = parse_revision(elem, article, xml_namespace, cache, bots, md5hashes, size) |
120 | | - cache.count_revisions += 1 |
121 | | - clear = True |
122 | | - if clear: |
123 | | - elem.clear() |
124 | | - |
125 | | - elif event is end and elem.tag.endswith('page'): |
126 | | - elem.clear() |
127 | | - #Reset all variables for next article |
128 | | - id = False |
129 | | - parse = False |
130 | | - |
131 | | - except SyntaxError, error: |
132 | | - print 'Encountered invalid XML tag. Error message: %s' % error |
133 | | - dump(elem) |
134 | | - sys.exit(-1) |
135 | | - except IOError, error: |
136 | | - print '''Archive file is possibly corrupted. Please delete this archive |
137 | | - and retry downloading. Error message: %s''' % error |
138 | | - sys.exit(-1) |
139 | | - |
140 | | - filename = 'counts_kaggle_%s.csv' % file_id |
141 | | - fh = file_utils.create_txt_filehandle(rts.txt, filename, 'w', 'utf-8') |
142 | | - file_utils.write_dict_to_csv(counts, fh, keys) |
143 | | - fh.close() |
144 | | - |
145 | | - filename = 'counts_kaggle_%s.bin' % file_id |
146 | | - file_utils.store_object(counts, location, filename) |
147 | | - |
148 | | - |
149 | | -def parse_xml(fh, cache, rts): |
150 | | - bots, context, include_ns = setup_parser(rts) |
151 | | - article = {} |
152 | | - size = {} |
153 | | - id = False |
154 | | - ns = False |
155 | | - parse = False |
156 | | - |
157 | | - try: |
158 | | - for event, elem in context: |
159 | | - if event is end and elem.tag.endswith('siteinfo'): |
160 | | - xml_namespace = variables.determine_xml_namespace(elem) |
161 | | - namespaces = variables.create_namespace_dict(elem, xml_namespace) |
162 | | - ns = True |
163 | | - elem.clear() |
164 | | - |
165 | | - elif event is end and elem.tag.endswith('title'): |
166 | | - title = variables.parse_title(elem) |
167 | | - article['title'] = title |
168 | | - current_namespace = variables.determine_namespace(title, namespaces, include_ns) |
169 | | - title_meta = variables.parse_title_meta_data(title, current_namespace) |
170 | | - if current_namespace != False: |
171 | | - parse = True |
172 | | - article['namespace'] = current_namespace |
173 | | - cache.count_articles += 1 |
174 | | - md5hashes = deque() |
175 | | - elem.clear() |
176 | | - |
177 | | - elif elem.tag.endswith('revision') and parse == True: |
178 | | - if event is start: |
179 | | - clear = False |
180 | | - else: |
181 | | - md5hashes, size = parse_revision(elem, article, xml_namespace, cache, bots, md5hashes, size) |
182 | | - cache.count_revisions += 1 |
183 | | - clear = True |
184 | | - if clear: |
185 | | - elem.clear() |
186 | | - |
187 | | - elif event is end and elem.tag.endswith('id') and id == False: |
188 | | - article['article_id'] = elem.text |
189 | | - if current_namespace: |
190 | | - cache.articles[article['article_id']] = title_meta |
191 | | - id = True |
192 | | - elem.clear() |
193 | | - |
194 | | - elif event is end and elem.tag.endswith('page'): |
195 | | - elem.clear() |
196 | | - #Reset all variables for next article |
197 | | - article = {} |
198 | | - size = {} |
199 | | - md5hashes = deque() |
200 | | - id = False |
201 | | - parse = False |
202 | | - |
203 | | - |
204 | | - except SyntaxError, error: |
205 | | - print 'Encountered invalid XML tag. Error message: %s' % error |
206 | | - dump(elem) |
207 | | - sys.exit(-1) |
208 | | - except IOError, error: |
209 | | - print '''Archive file is possibly corrupted. Please delete this archive |
210 | | - and retry downloading. Error message: %s''' % error |
211 | | - sys.exit(-1) |
212 | | - |
213 | | - |
214 | | -def stream_raw_xml(input_queue, process_id, lock, rts): |
215 | | - t0 = datetime.datetime.now() |
216 | | - i = 0 |
217 | | - file_id = 0 |
218 | | - cache = buffer.CSVBuffer(process_id, rts, lock) |
219 | | - |
220 | | - while True: |
221 | | - filename = input_queue.get() |
222 | | - input_queue.task_done() |
223 | | - file_id += 1 |
224 | | - if filename == None: |
225 | | - print '%s files left in the queue' % input_queue.qsize() |
226 | | - break |
227 | | - |
228 | | - fh = file_utils.create_streaming_buffer(filename) |
229 | | - |
230 | | - if rts.kaggle: |
231 | | - datacompetition_count_edits(rts, file_id) |
232 | | - else: |
233 | | - parse_xml(fh, cache, rts) |
234 | | - |
235 | | - i += 1 |
236 | | - if i % 10000 == 0: |
237 | | - print 'Worker %s parsed %s articles' % (process_id, i) |
238 | | - fh.close() |
239 | | - |
240 | | - t1 = datetime.datetime.now() |
241 | | - print 'Worker %s: Processing of %s took %s' % (process_id, filename, (t1 - t0)) |
242 | | - print 'There are %s files left in the queue' % (input_queue.qsize()) |
243 | | - t0 = t1 |
244 | | - |
245 | | - if rts.kaggle: |
246 | | - cache.close() |
247 | | - cache.summary() |
248 | | - |
249 | | - |
250 | | -def debug(): |
251 | | - fh = 'c:\\wikimedia\sv\wiki\svwiki-latest-stub-meta-history.xml' |
252 | | - |
253 | | - |
254 | | -def launcher(rts): |
255 | | - lock = RLock() |
256 | | - mgr = Manager() |
257 | | - open_handles = [] |
258 | | - open_handles = mgr.list(open_handles) |
259 | | - clock = CustomLock(lock, open_handles) |
260 | | - input_queue = JoinableQueue() |
261 | | - |
262 | | - files = file_utils.retrieve_file_list(rts.input_location) |
263 | | - |
264 | | - if len(files) > cpu_count(): |
265 | | - processors = cpu_count() - 1 |
266 | | - else: |
267 | | - processors = len(files) |
268 | | - |
269 | | - for filename in files: |
270 | | - filename = os.path.join(rts.input_location, filename) |
271 | | - print filename |
272 | | - input_queue.put(filename) |
273 | | - |
274 | | - for x in xrange(processors): |
275 | | - print 'Inserting poison pill %s...' % x |
276 | | - input_queue.put(None) |
277 | | - |
278 | | - extracters = [Process(target=stream_raw_xml, args=[input_queue, process_id, |
279 | | - clock, rts]) |
280 | | - for process_id in xrange(processors)] |
281 | | - for extracter in extracters: |
282 | | - extracter.start() |
283 | | - |
284 | | - input_queue.join() |
285 | | - print 'Finished parsing Wikipedia dump files.' |
286 | | - |
287 | | - |
288 | | -if __name__ == '__main__': |
289 | | - debug() |
 | 2 | +#!/usr/bin/python |
 | 3 | +# -*- coding: utf-8 -*- |
 | 4 | +''' |
 | 5 | +Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com) |
 | 6 | +This program is free software; you can redistribute it and/or |
 | 7 | +modify it under the terms of the GNU General Public License version 2 |
 | 8 | +as published by the Free Software Foundation. |
 | 9 | +This program is distributed in the hope that it will be useful, |
 | 10 | +but WITHOUT ANY WARRANTY; without even the implied warranty of |
 | 11 | +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
 | 12 | +See the GNU General Public License for more details, at |
 | 13 | +http://www.fsf.org/licenses/gpl.html |
 | 14 | +''' |
 | 15 | + |
 | 16 | + |
 | 17 | +__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ]) |
 | 18 | +__author__email = 'dvanliere at gmail dot com' |
 | 19 | +__date__ = '2011-04-10' |
 | 20 | +__version__ = '0.1' |
 | 21 | + |
 | 22 | + |
 | 23 | +from collections import deque |
 | 24 | +import sys |
 | 25 | +import os |
 | 26 | +from datetime import datetime |
 | 27 | +from xml.etree.cElementTree import iterparse, dump |
 | 28 | +from multiprocessing import JoinableQueue, Process, cpu_count, RLock, Manager |
 | 29 | + |
 | 30 | +if '..' not in sys.path: |
 | 31 | +    sys.path.append('..') |
 | 32 | + |
 | 33 | +from etl import variables |
 | 34 | +from utils import file_utils |
 | 35 | +from classes import buffer |
 | 36 | +from analyses.adhoc import bot_detector |
 | 37 | + |
 | 38 | +def parse_revision(revision, article, xml_namespace, cache, bots, md5hashes, size): |
 | 39 | +    if revision == None: |
 | 40 | +        #the entire revision is empty, weird. |
 | 41 | +        #dump(revision) |
 | 42 | +        return md5hashes, size |
 | 43 | + |
 | 44 | +    contributor = revision.find('%s%s' % (xml_namespace, 'contributor')) |
 | 45 | +    contributor = variables.parse_contributor(contributor, bots, xml_namespace) |
 | 46 | +    if not contributor: |
 | 47 | +        #editor is anonymous, ignore |
 | 48 | +        return md5hashes, size |
 | 49 | + |
 | 50 | +    revision_id = revision.find('%s%s' % (xml_namespace, 'id')) |
 | 51 | +    revision_id = variables.extract_revision_id(revision_id) |
 | 52 | +    if revision_id == None: |
 | 53 | +        #revision_id is missing, which is weird |
 | 54 | +        return md5hashes, size |
 | 55 | + |
 | 56 | +    article['revision_id'] = revision_id |
 | 57 | +    text = variables.extract_revision_text(revision) |
 | 58 | +    article.update(contributor) |
 | 59 | + |
 | 60 | +    comment = variables.extract_comment_text(revision_id, revision) |
 | 61 | +    cache.comments.update(comment) |
 | 62 | + |
 | 63 | +    timestamp = revision.find('%s%s' % (xml_namespace, 'timestamp')).text |
 | 64 | +    article['timestamp'] = timestamp |
 | 65 | + |
 | 66 | +    hash = variables.create_md5hash(text) |
 | 67 | +    revert = variables.is_revision_reverted(hash['hash'], md5hashes) |
 | 68 | +    md5hashes.append(hash['hash']) |
 | 69 | +    size = variables.calculate_delta_article_size(size, text) |
 | 70 | + |
 | 71 | +    article.update(hash) |
 | 72 | +    article.update(size) |
 | 73 | +    article.update(revert) |
 | 74 | +    cache.add(article) |
 | 75 | +    return md5hashes, size |
 | 76 | + |
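
Note: parse_revision detects identity reverts by hashing each revision's text and checking the digest against the hashes of earlier revisions of the same page (md5hashes is reset per page in parse_xml below). The bodies of variables.create_md5hash and variables.is_revision_reverted are not part of this diff; the sketch below is a plausible reading of that scheme, and keeping an unbounded per-page history is an assumption:

    import hashlib
    from collections import deque

    def is_identity_revert(text, seen_hashes):
        # A revision whose text hashes to an already-seen digest restored
        # an earlier state of the page, i.e. it is an identity revert.
        digest = hashlib.md5((text or '').encode('utf-8')).hexdigest()
        reverted = digest in seen_hashes
        seen_hashes.append(digest)
        return reverted

    seen_hashes = deque()  # one per page, like md5hashes above
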
 | 77 | +def setup_parser(rts): |
 | 78 | +    bots = bot_detector.retrieve_bots(rts.language.code) |
 | 79 | +    include_ns = {3: 'User Talk', |
 | 80 | +                  5: 'Wikipedia Talk', |
 | 81 | +                  1: 'Talk', |
 | 82 | +                  2: 'User', |
 | 83 | +                  4: 'Wikipedia'} |
 | 84 | + |
 | 85 | +    return bots, include_ns |
 | 86 | + |
 | 87 | + |
 | 88 | +def datacompetition_count_edits(fh, rts, file_id): |
 | 89 | +    bots, include_ns = setup_parser(rts) |
 | 90 | + |
 | 91 | +    start = 'start'; end = 'end' |
 | 92 | +    context = iterparse(fh, events=(start, end)) |
 | 93 | +    context = iter(context) |
 | 94 | + |
 | 95 | +    counts = {} |
 | 96 | +    id = False |
 | 97 | +    ns = False |
 | 98 | +    parse = False |
 | 99 | + |
 | 100 | +    try: |
 | 101 | +        for event, elem in context: |
 | 102 | +            if event is end and elem.tag.endswith('siteinfo'): |
 | 103 | +                xml_namespace = variables.determine_xml_namespace(elem) |
 | 104 | +                namespaces = variables.create_namespace_dict(elem, xml_namespace) |
 | 105 | +                ns = True |
 | 106 | +                elem.clear() |
 | 107 | + |
 | 108 | +            elif event is end and elem.tag.endswith('title'): |
 | 109 | +                title = variables.parse_title(elem) |
 | 110 | +                article['title'] = title |
 | 111 | +                current_namespace = variables.determine_namespace(title, namespaces, include_ns) |
 | 112 | +                if current_namespace != False: |
 | 113 | +                    parse = True |
 | 114 | +                    article['namespace'] = current_namespace |
 | 115 | +                    cache.count_articles += 1 |
 | 116 | +                elem.clear() |
 | 117 | + |
 | 118 | +            elif elem.tag.endswith('revision') and parse == True: |
 | 119 | +                if event is start: |
 | 120 | +                    clear = False |
 | 121 | +                else: |
 | 122 | +                    print 'IMPLEMENT' |
 | 123 | +                    #md5hashes, size = parse_revision(elem, article, xml_namespace, cache, bots, md5hashes, size) |
 | 124 | +                    cache.count_revisions += 1 |
 | 125 | +                    clear = True |
 | 126 | +                if clear: |
 | 127 | +                    elem.clear() |
 | 128 | + |
 | 129 | +            elif event is end and elem.tag.endswith('page'): |
 | 130 | +                elem.clear() |
 | 131 | +                #Reset all variables for next article |
 | 132 | +                id = False |
 | 133 | +                parse = False |
 | 134 | + |
 | 135 | +    except SyntaxError, error: |
 | 136 | +        print 'Encountered invalid XML tag. Error message: %s' % error |
 | 137 | +        dump(elem) |
 | 138 | +        sys.exit(-1) |
 | 139 | +    except IOError, error: |
 | 140 | +        print '''Archive file is possibly corrupted. Please delete this archive |
 | 141 | +            and retry downloading. Error message: %s''' % error |
 | 142 | +        sys.exit(-1) |
 | 143 | + |
 | 144 | +    filename = 'counts_kaggle_%s.csv' % file_id |
 | 145 | +    fh = file_utils.create_txt_filehandle(rts.txt, filename, 'w', 'utf-8') |
 | 146 | +    file_utils.write_dict_to_csv(counts, fh, keys) |
 | 147 | +    fh.close() |
 | 148 | + |
 | 149 | +    filename = 'counts_kaggle_%s.bin' % file_id |
 | 150 | +    file_utils.store_object(counts, location, filename) |
 | 151 | + |
 | 152 | + |
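
Note: datacompetition_count_edits is still a stub in this commit: the revision branch prints 'IMPLEMENT', and article, cache, keys, and location are referenced without ever being defined, so the Kaggle path will raise NameError as committed. The closing lines persist the counts dict twice, as CSV and as a serialized object; below is a stdlib sketch of that double write (assuming file_utils.store_object pickles, which this diff does not show):

    import csv
    import pickle

    def persist_counts(counts, csv_path, bin_path):
        # Human-readable copy for inspection ...
        with open(csv_path, 'w') as fh:
            writer = csv.writer(fh)
            for editor, n in sorted(counts.items()):
                writer.writerow([editor, n])
        # ... and a binary copy for fast reloading by later pipeline stages.
        with open(bin_path, 'wb') as fh:
            pickle.dump(counts, fh)
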
 | 153 | +def parse_xml(fh, rts, cache, file_id): |
 | 154 | +    bots, include_ns = setup_parser(rts) |
 | 155 | + |
 | 156 | +    start = 'start'; end = 'end' |
 | 157 | +    context = iterparse(fh, events=(start, end)) |
 | 158 | +    context = iter(context) |
 | 159 | + |
 | 160 | +    article = {} |
 | 161 | +    size = {} |
 | 162 | +    id = False |
 | 163 | +    ns = False |
 | 164 | +    parse = False |
 | 165 | + |
 | 166 | +    try: |
 | 167 | +        for event, elem in context: |
 | 168 | +            if event is end and elem.tag.endswith('siteinfo'): |
 | 169 | +                xml_namespace = variables.determine_xml_namespace(elem) |
 | 170 | +                namespaces = variables.create_namespace_dict(elem, xml_namespace) |
 | 171 | +                ns = True |
 | 172 | +                elem.clear() |
 | 173 | + |
 | 174 | +            elif event is end and elem.tag.endswith('title'): |
 | 175 | +                title = variables.parse_title(elem) |
 | 176 | +                article['title'] = title |
 | 177 | +                current_namespace = variables.determine_namespace(title, namespaces, include_ns) |
 | 178 | +                title_meta = variables.parse_title_meta_data(title, current_namespace) |
 | 179 | +                if isinstance(current_namespace, int): |
 | 180 | +                    parse = True |
 | 181 | +                    article['namespace'] = current_namespace |
 | 182 | +                    cache.count_articles += 1 |
 | 183 | +                    if cache.count_articles % 10000 == 0: |
 | 184 | +                        print 'Worker %s parsed %s articles' % (file_id, cache.count_articles) |
 | 185 | +                    md5hashes = deque() |
 | 186 | +                elem.clear() |
 | 187 | + |
 | 188 | +            elif elem.tag.endswith('revision') and parse == True: |
 | 189 | +                if event is start: |
 | 190 | +                    clear = False |
 | 191 | +                else: |
 | 192 | +                    md5hashes, size = parse_revision(elem, article, xml_namespace, cache, bots, md5hashes, size) |
 | 193 | +                    cache.count_revisions += 1 |
 | 194 | +                    clear = True |
 | 195 | +                if clear: |
 | 196 | +                    elem.clear() |
 | 197 | + |
 | 198 | +            elif event is end and elem.tag.endswith('id') and id == False: |
 | 199 | +                article['article_id'] = elem.text |
 | 200 | +                if current_namespace: |
 | 201 | +                    cache.articles[article['article_id']] = title_meta |
 | 202 | +                id = True |
 | 203 | +                elem.clear() |
 | 204 | + |
 | 205 | +            elif event is end and elem.tag.endswith('page'): |
 | 206 | +                elem.clear() |
 | 207 | +                #Reset all variables for next article |
 | 208 | +                article = {} |
 | 209 | +                size = {} |
 | 210 | +                md5hashes = deque() |
 | 211 | +                id = False |
 | 212 | +                parse = False |
 | 213 | + |
 | 214 | + |
 | 215 | +    except SyntaxError, error: |
 | 216 | +        print 'Encountered invalid XML tag. Error message: %s' % error |
 | 217 | +        dump(elem) |
 | 218 | +        sys.exit(-1) |
 | 219 | +    except IOError, error: |
 | 220 | +        print '''Archive file is possibly corrupted. Please delete this archive |
 | 221 | +            and retry downloading. Error message: %s''' % error |
 | 222 | +        sys.exit(-1) |
 | 223 | +    print 'Finished parsing Wikipedia dump file.' |
 | 224 | + |
 | 225 | + |
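
Note: parse_xml keeps memory flat on multi-gigabyte dumps by iterating iterparse events and calling elem.clear() as soon as a subtree has been consumed; it listens to start events only so it can avoid clearing a <revision> element that is not yet complete. A self-contained sketch of the same streaming pattern (Python 3 syntax here, unlike the Python 2 code above):

    from xml.etree.ElementTree import iterparse  # the diff uses cElementTree

    def stream_pages(fh):
        # Yield (title, revision_count) per <page> without building the tree.
        title, revisions = None, 0
        for event, elem in iterparse(fh, events=('end',)):
            if elem.tag.endswith('title'):
                title = elem.text
            elif elem.tag.endswith('revision'):
                revisions += 1
                elem.clear()   # subtree fully consumed; release it
            elif elem.tag.endswith('page'):
                yield title, revisions
                title, revisions = None, 0
                elem.clear()   # also drops the cleared children above
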
 | 226 | +def stream_raw_xml(input_queue, process_id, lock, rts): |
 | 227 | +    t0 = datetime.now() |
 | 228 | +    file_id = 0 |
 | 229 | +    cache = buffer.CSVBuffer(process_id, rts, lock) |
 | 230 | + |
 | 231 | +    while True: |
 | 232 | +        filename = input_queue.get() |
 | 233 | +        input_queue.task_done() |
 | 234 | +        file_id += 1 |
 | 235 | +        if filename == None: |
 | 236 | +            print '%s files left in the queue' % input_queue.qsize() |
 | 237 | +            break |
 | 238 | + |
 | 239 | +        print filename |
 | 240 | +        fh = file_utils.create_streaming_buffer(filename) |
 | 241 | + |
 | 242 | +        if rts.kaggle: |
 | 243 | +            datacompetition_count_edits(fh, rts, file_id) |
 | 244 | +        else: |
 | 245 | +            parse_xml(fh, rts, cache, file_id) |
 | 246 | + |
 | 247 | +        fh.close() |
 | 248 | + |
 | 249 | +        t1 = datetime.now() |
 | 250 | +        print 'Worker %s: Processing of %s took %s' % (process_id, filename, (t1 - t0)) |
 | 251 | +        print 'There are %s files left in the queue' % (input_queue.qsize()) |
 | 252 | +        t0 = t1 |
 | 253 | + |
 | 254 | +    if rts.kaggle: |
 | 255 | +        cache.close() |
 | 256 | +        cache.summary() |
 | 257 | + |
 | 258 | + |
 | 259 | +def debug(): |
 | 260 | +    fh = 'c:\\wikimedia\sv\wiki\svwiki-latest-stub-meta-history.xml' |
 | 261 | + |
 | 262 | + |
 | 263 | +def launcher(rts): |
 | 264 | +    lock = RLock() |
 | 265 | +    mgr = Manager() |
 | 266 | +    open_handles = [] |
 | 267 | +    open_handles = mgr.list(open_handles) |
 | 268 | +    clock = buffer.CustomLock(lock, open_handles) |
 | 269 | +    input_queue = JoinableQueue() |
 | 270 | + |
 | 271 | +    files = file_utils.retrieve_file_list(rts.input_location) |
 | 272 | + |
 | 273 | +    if len(files) > cpu_count(): |
 | 274 | +        processors = cpu_count() - 1 |
 | 275 | +    else: |
 | 276 | +        processors = len(files) |
 | 277 | + |
 | 278 | +    for filename in files: |
 | 279 | +        filename = os.path.join(rts.input_location, filename) |
 | 280 | +        print filename |
 | 281 | +        input_queue.put(filename) |
 | 282 | + |
 | 283 | +    for x in xrange(processors): |
 | 284 | +        print 'Inserting poison pill %s...' % x |
 | 285 | +        input_queue.put(None) |
 | 286 | + |
 | 287 | +    extracters = [Process(target=stream_raw_xml, args=[input_queue, process_id, |
 | 288 | +                                                       clock, rts]) |
 | 289 | +                  for process_id in xrange(processors)] |
 | 290 | +    for extracter in extracters: |
 | 291 | +        extracter.start() |
 | 292 | + |
 | 293 | +    input_queue.join() |
 | 294 | + |
 | 295 | + |
 | 296 | + |
 | 297 | +if __name__ == '__main__': |
 | 298 | +    debug() |
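
Note: launcher and stream_raw_xml together implement the classic poison-pill pattern: one filename per dump chunk goes onto a JoinableQueue, one None per worker follows, each worker exits when it pulls a None, and input_queue.join() returns once every item has been acknowledged with task_done(). One subtlety worth flagging: stream_raw_xml calls task_done() immediately after get(), so join() can return while the last files are still being parsed. A condensed, runnable sketch of the pattern (Python 3 syntax):

    from multiprocessing import JoinableQueue, Process, cpu_count

    def worker(queue):
        while True:
            item = queue.get()
            queue.task_done()      # acked before processing, as above
            if item is None:       # poison pill: no more work
                break
            # ... parse the file named by `item` ...

    def run(items):
        queue = JoinableQueue()
        n = min(len(items), max(cpu_count() - 1, 1))
        for item in items:
            queue.put(item)
        for _ in range(n):         # exactly one pill per worker
            queue.put(None)
        workers = [Process(target=worker, args=(queue,)) for _ in range(n)]
        for w in workers:
            w.start()
        queue.join()

Call run() under an if __name__ == '__main__' guard so the sketch also works on platforms that spawn rather than fork worker processes.
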
Index: trunk/tools/editor_trends/classes/buffer.py |
— | — | @@ -1,3 +1,29 @@ |
| 2 | +#!/usr/bin/python |
| 3 | +# -*- coding: utf-8 -*- |
| 4 | +''' |
| 5 | +Copyright (C) 2010 by Diederik van Liere (dvanliere@gmail.com) |
| 6 | +This program is free software; you can redistribute it and/or |
| 7 | +modify it under the terms of the GNU General Public License version 2 |
| 8 | +as published by the Free Software Foundation. |
| 9 | +This program is distributed in the hope that it will be useful, |
| 10 | +but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
| 12 | +See the GNU General Public License for more details, at |
| 13 | +http://www.fsf.org/licenses/gpl.html |
| 14 | +''' |
| 15 | + |
| 16 | + |
| 17 | +__author__ = '''\n'''.join(['Diederik van Liere (dvanliere@gmail.com)', ]) |
| 18 | +__author__email = 'dvanliere at gmail dot com' |
| 19 | +__date__ = '2011-04-11' |
| 20 | +__version__ = '0.1' |
| 21 | + |
| 22 | +import sys |
| 23 | +if '..' not in sys.path: |
| 24 | + sys.path.append('..') |
| 25 | + |
| 26 | +from utils import file_utils |
| 27 | + |
2 | 28 | class CustomLock: |
3 | 29 | def __init__(self, lock, open_handles): |
4 | 30 | self.lock = lock |
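
Note: this hunk only adds the standard repo header and a file_utils import above CustomLock; the class body beyond __init__ is unchanged and not shown. As the launcher in extracter.py illustrates, CustomLock pairs a process-shared RLock with a Manager-backed list of open file handles:

    from multiprocessing import Manager, RLock
    from classes import buffer

    lock = RLock()
    mgr = Manager()
    open_handles = mgr.list()     # visible to every worker process
    clock = buffer.CustomLock(lock, open_handles)
    # `clock` is handed to each worker so CSVBuffer instances can
    # coordinate writes to shared output files (an inference; the
    # acquire/release logic is outside this diff).
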
Index: trunk/tools/editor_trends/utils/file_utils.py |
— | — | @@ -202,8 +202,10 @@ |
203 | 203 | fh.write('\n') |
204 | 204 | |
205 | 205 | |
206 | | -def create_txt_filehandle(location, name, mode, encoding): |
207 | | - filename = construct_filename(name, '.csv') |
| 206 | +def create_txt_filehandle(location, filename, mode, encoding): |
| 207 | + filename = str(filename) |
| 208 | + if not filename.endswith('.csv'): |
| 209 | + filename = construct_filename(filename, '.csv') |
208 | 210 | path = os.path.join(location, filename) |
209 | 211 | return codecs.open(path, mode, encoding=encoding) |
210 | 212 | |
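
Note: previously create_txt_filehandle always pushed the name through construct_filename, so a caller passing a complete filename such as 'counts_kaggle_1.csv' (as datacompetition_count_edits now does) would end up with a doubled extension. The fix coerces the argument to str and only constructs a '.csv' name when the extension is absent:

    # assuming construct_filename simply appends its suffix (its body is
    # not part of this diff):
    fh = create_txt_filehandle(rts.txt, 'counts_kaggle_1.csv', 'w', 'utf-8')
    # before the fix: opens <rts.txt>/counts_kaggle_1.csv.csv
    # after the fix:  opens <rts.txt>/counts_kaggle_1.csv
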