Index: trunk/tools/editor_trends/etl/variables.py
@@ -72,11 +72,10 @@
         return title_meta
 
     title_meta['title'] = title
-    ns = namespace['namespace']
-    title_meta['ns'] = ns
+    title_meta['ns'] = namespace
     if title.startswith('List of'):
         title_meta['category'] = 'List'
-    elif ns == 4 or ns == 5:
+    elif namespace == 4 or namespace == 5:
         if title.find('Articles for deletion') > -1:
             title_meta['category'] = 'Deletion'
         elif title.find('Mediation Committee') > -1:
@@ -200,22 +199,18 @@
     You can only determine whether an article belongs to the Main Namespace
     by ruling out that it does not belong to any other namepace
     '''
-    ns = {}
     if title != None:
         for key in include_ns:
-            namespace = namespaces.pop(key, None)
+            namespace = namespaces.get(key, None)
             if namespace and title.startswith(namespace):
-                ns['namespace'] = key
-        if ns == {}:
-            for namespace in namespaces.itervalues():
-                if namespace and title.startswith(namespace):
-                    '''article does not belong to any of the include_ns
-                    namespaces'''
-                    return False
-            ns = 0
+                return key
+        for key, namespace in namespaces.iteritems():
+            if namespace and title.startswith(namespace):
+                '''article does not belong to any of the include_ns namespaces'''
+                return key
+        return 0
     else:
-        ns = False
-    return ns
+        return 999
 
 
 def is_revision_reverted(hash_cur, hashes):
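Note: after this change determine_namespace() always returns an integer: the namespace key when the title matches an include_ns prefix or any other known namespace prefix, 0 for the main namespace, and 999 when no title was supplied. Because namespaces.get() replaces namespaces.pop(), the lookup also no longer mutates the shared namespace dict between calls. A minimal sketch of the new contract, using made-up namespace prefixes (the real mapping is built by create_namespace_dict() from the dump's siteinfo):

    # Illustration only; the prefixes below are hypothetical and the import
    # path is an assumption (extracter.py refers to the module as variables.*).
    from etl import variables

    namespaces = {1: 'Talk:', 2: 'User:', 3: 'User talk:'}
    include_ns = {3: 'User talk:'}

    variables.determine_namespace('User talk:Foo', namespaces, include_ns)  # -> 3   (include_ns hit)
    variables.determine_namespace('Talk:Foo', namespaces, include_ns)       # -> 1   (other known namespace)
    variables.determine_namespace('Foo', namespaces, include_ns)            # -> 0   (main namespace)
    variables.determine_namespace(None, namespaces, include_ns)             # -> 999 (no title)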
Index: trunk/tools/editor_trends/etl/extracter.py
@@ -18,13 +18,12 @@
 __date__ = '2011-04-10'
 __version__ = '0.1'
 
-import itertools
 from collections import deque
 import sys
 import os
 from datetime import datetime
 from xml.etree.cElementTree import iterparse, dump
-from multiprocessing import JoinableQueue, Process, cpu_count, RLock, Manager
+from multiprocessing import JoinableQueue, Process, cpu_count, Lock, Manager
 
 if '..' not in sys.path:
     sys.path.append('..')
@@ -105,80 +104,80 @@
     return counts
 
 
-def datacompetition_count_edits(fh, rts, process_id, file_id):
-    '''
-    This function counts for every editor the total number of edits that person
-    made. It follows the same logic as the parse_xml function although it
-    skips a bunch of extraction phases that are not relevant for counting
-    edits. This function is only to be used to create the prediction dataset
-    for the datacompetition.
-    '''
-    bots = bot_detector.retrieve_bots(rts.storage, rts.language.code)
-    include_ns = {}
+#def datacompetition_count_edits(fh, rts, process_id, file_id):
+#    '''
+#    This function counts for every editor the total number of edits that person
+#    made. It follows the same logic as the parse_xml function although it
+#    skips a bunch of extraction phases that are not relevant for counting
+#    edits. This function is only to be used to create the prediction dataset
+#    for the datacompetition.
+#    '''
+#    bots = bot_detector.retrieve_bots(rts.storage, rts.language.code)
+#    include_ns = {}
+#
+#    start = 'start'; end = 'end'
+#    context = iterparse(fh, events=(start, end))
+#    context = iter(context)
+#
+#    counts = {}
+#    id = False
+#    ns = False
+#    parse = False
+#    count_articles = 0
+#
+#    try:
+#        for event, elem in context:
+#            if event is end and elem.tag.endswith('siteinfo'):
+#                xml_namespace = variables.determine_xml_namespace(elem)
+#                namespaces = variables.create_namespace_dict(elem, xml_namespace)
+#                ns = True
+#                elem.clear()
+#
+#            elif event is end and elem.tag.endswith('title'):
+#                title = variables.parse_title(elem)
+#                current_namespace = variables.determine_namespace(title, namespaces, include_ns)
+#                if isinstance(current_namespace, int):
+#                    parse = True
+#                    count_articles += 1
+#                    if count_articles % 10000 == 0:
+#                        print 'Worker %s parsed %s articles' % (process_id, count_articles)
+#
+#                elem.clear()
+#
+#            elif elem.tag.endswith('revision') and parse == True:
+#                if event is start:
+#                    clear = False
+#                else:
+#                    counts = datacompetition_parse_revision(elem, xml_namespace, bots, counts)
+#                    clear = True
+#                if clear:
+#                    elem.clear()
+#
+#            elif event is end and elem.tag.endswith('page'):
+#                elem.clear()
+#                #Reset all variables for next article
+#                id = False
+#                parse = False
+#
+#    except SyntaxError, error:
+#        print 'Encountered invalid XML tag. Error message: %s' % error
+#        dump(elem)
+#        sys.exit(-1)
+#    except IOError, error:
+#        print '''Archive file is possibly corrupted. Please delete this archive
+#        and retry downloading. Error message: %s''' % error
+#        sys.exit(-1)
+#    except Exception, error:
+#        print error
+#
+#    filename = 'counts_kaggle_%s.csv' % file_id
+#    keys = counts.keys()
+#    fh = file_utils.create_txt_filehandle(rts.txt, filename, 'w', 'utf-8')
+#    file_utils.write_dict_to_csv(counts, fh, keys)
+#    fh.close()
+#    counts = {}
 
-    start = 'start'; end = 'end'
-    context = iterparse(fh, events=(start, end))
-    context = iter(context)
 
-    counts = {}
-    id = False
-    ns = False
-    parse = False
-    count_articles = 0
-
-    try:
-        for event, elem in context:
-            if event is end and elem.tag.endswith('siteinfo'):
-                xml_namespace = variables.determine_xml_namespace(elem)
-                namespaces = variables.create_namespace_dict(elem, xml_namespace)
-                ns = True
-                elem.clear()
-
-            elif event is end and elem.tag.endswith('title'):
-                title = variables.parse_title(elem)
-                current_namespace = variables.determine_namespace(title, namespaces, include_ns)
-                if isinstance(current_namespace, int):
-                    parse = True
-                    count_articles += 1
-                    if count_articles % 10000 == 0:
-                        print 'Worker %s parsed %s articles' % (process_id, count_articles)
-
-                elem.clear()
-
-            elif elem.tag.endswith('revision') and parse == True:
-                if event is start:
-                    clear = False
-                else:
-                    counts = datacompetition_parse_revision(elem, xml_namespace, bots, counts)
-                    clear = True
-                if clear:
-                    elem.clear()
-
-            elif event is end and elem.tag.endswith('page'):
-                elem.clear()
-                #Reset all variables for next article
-                id = False
-                parse = False
-
-    except SyntaxError, error:
-        print 'Encountered invalid XML tag. Error message: %s' % error
-        dump(elem)
-        sys.exit(-1)
-    except IOError, error:
-        print '''Archive file is possibly corrupted. Please delete this archive
-        and retry downloading. Error message: %s''' % error
-        sys.exit(-1)
-    except Exception, error:
-        print error
-
-    filename = 'counts_kaggle_%s.csv' % file_id
-    keys = counts.keys()
-    fh = file_utils.create_txt_filehandle(rts.txt, filename, 'w', 'utf-8')
-    file_utils.write_dict_to_csv(counts, fh, keys)
-    fh.close()
-    counts = {}
-
-
 def parse_xml(fh, rts, cache, process_id, file_id):
     bots = bot_detector.retrieve_bots(rts.storage, rts.language.code)
     include_ns = {3: 'User Talk',
@@ -210,7 +209,7 @@
                 article['title'] = title
                 current_namespace = variables.determine_namespace(title, namespaces, include_ns)
                 title_meta = variables.parse_title_meta_data(title, current_namespace)
-                if isinstance(current_namespace, int):
+                if current_namespace < 6:
                     parse = True
                     article['namespace'] = current_namespace
                     cache.count_articles += 1
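Note: parse_xml previously relied on isinstance(current_namespace, int) to decide whether an article should be parsed, but with determine_namespace() now always returning an int that test would accept every page, so the guard switches to the numeric ids themselves. Roughly (the exact id set is an assumption; only 3: 'User Talk' of the include_ns dict is visible in the previous hunk):

    # Sketch of the new guard; 0 = Main and ids 1-5 are assumed to be the
    # include_ns namespaces, while 999 (no title) and any other id is skipped.
    current_namespace = variables.determine_namespace(title, namespaces, include_ns)
    if current_namespace < 6:
        parse = True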
@@ -257,11 +256,11 @@
     print 'Finished parsing Wikipedia dump file.'
 
 
-def stream_raw_xml(input_queue, process_id, lock, rts):
+def stream_raw_xml(input_queue, process_id, fhd, rts):
     t0 = datetime.now()
     file_id = 0
     if not rts.kaggle:
-        cache = buffer.CSVBuffer(process_id, rts, lock)
+        cache = buffer.CSVBuffer(process_id, rts, fhd)
 
     while True:
         filename = input_queue.get()
@@ -296,22 +295,17 @@
 
 
 def launcher(rts):
-    lock = RLock()
-    mgr = Manager()
-    open_handles = []
-    open_handles = mgr.list(open_handles)
-    clock = buffer.CustomLock(lock, open_handles)
     input_queue = JoinableQueue()
 
     files = file_utils.retrieve_file_list(rts.input_location)
 
-    if rts.kaggle:
-        processors = 4
-    elif len(files) > cpu_count():
-        processors = cpu_count() - 1
+    if len(files) > cpu_count():
+        processors = cpu_count()
     else:
         processors = len(files)
 
+    fhd = buffer.FileHandleDistributor(rts.max_filehandles, processors)
+
     for filename in files:
         filename = os.path.join(rts.input_location, filename)
         print filename
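Note: buffer.FileHandleDistributor is not part of this diff, so only its constructor call is visible: it receives rts.max_filehandles and the number of worker processes, and it replaces the RLock/Manager/CustomLock plumbing that previously coordinated the open output files. Purely as an assumption about the intent, a minimal sketch of such a distributor might partition the allowed handles over the workers:

    # Hypothetical sketch only; the real class presumably lives in etl/buffer.py
    # and is not shown in this diff.
    class FileHandleDistributor(object):
        def __init__(self, max_filehandles, processors):
            self.max_filehandles = max_filehandles
            self.processors = processors

        def handles_per_process(self):
            # Give every worker an equal share of the allowed open file
            # handles, but always at least one.
            return max(1, self.max_filehandles // self.processors)

Whatever the real implementation does, the visible effect of the change is that each worker's CSVBuffer manages its own output handles instead of synchronizing through a shared lock and a Manager-backed list of open handles.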
@@ -322,7 +316,7 @@
     input_queue.put(None)
 
     extracters = [Process(target=stream_raw_xml, args=[input_queue, process_id,
-                                                       clock, rts])
+                                                       fhd, rts])
                   for process_id in xrange(processors)]
     for extracter in extracters:
         extracter.start()