r84992 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: r84991 | r84992 | r84993
Date: 23:57, 29 March 2011
Author: diederik
Status: deferred
Tags:
Comment:
Added support for the following variables:
1) md5 hash
2) character count
3) delta character count
4) article title
Also removed unzipping as a separate stage; the stream now reads straight from the gz or bz2 file, which should reduce processing time quite substantially.
This is still part of a larger addition of functionality, so it probably won't run smoothly (yet).
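
For reference, the new per-revision variables are derived in etl/enricher.py from the revision text; the diff calls helpers such as create_md5hash and stores 'cur_size' and 'delta' fields in each row. A minimal sketch of what those helpers plausibly look like (the function bodies below are illustrative assumptions, not the committed code):

import hashlib

def create_md5hash(text):
    # md5 of the revision text; identical texts (e.g. reverts) share a hash
    hash = {}
    if text is None:
        text = ''
    if isinstance(text, unicode):
        text = text.encode('utf-8')
    hash['hash'] = hashlib.md5(text).hexdigest()
    return hash

def calculate_size(text, prev_size):
    # character count of this revision and the delta against the previous one
    size = {}
    size['cur_size'] = len(text) if text else 0
    size['delta'] = size['cur_size'] - prev_size
    return size

Keeping a hash per revision is presumably also what drives the revert detection that the diff folds into each row.
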
Modified paths:
  • /trunk/tools/editor_trends/etl/enricher.py (modified)
  • /trunk/tools/editor_trends/etl/extracter.py (modified)
  • /trunk/tools/editor_trends/manage.py (modified)
  • /trunk/tools/editor_trends/utils/file_utils.py (modified)

Diff

Index: trunk/tools/editor_trends/manage.py
@@ -34,7 +34,7 @@
3535 from utils import timer
3636 from database import db
3737 from etl import downloader
38 -from etl import extracter
 38+from etl import enricher
3939 from etl import store
4040 from etl import sort
4141 from etl import transformer
@@ -187,7 +187,6 @@
188188 %s' % ''.join([f + ',\n' for f in rts.file_choices]),
189189 default='stub-meta-history.xml.gz')
190190
191 -
192191 return project, language, parser
193192
194193
@@ -236,22 +235,20 @@
237236 rts.input_location = config.get('file_locations', 'input_location')
238237
239238
240 -
241 -
242 -def downloader_launcher(properties, logger):
 239+def downloader_launcher(rts, logger):
243240 '''
244241 This launcher calls the dump downloader to download a Wikimedia dump file.
245242 '''
246243 print 'Start downloading'
247244 stopwatch = timer.Timer()
248 - log.log_to_mongo(properties, 'dataset', 'download', stopwatch, event='start')
249 - res = downloader.launcher(properties, logger)
 245+ log.log_to_mongo(rts, 'dataset', 'download', stopwatch, event='start')
 246+ res = downloader.launcher(rts, logger)
250247 stopwatch.elapsed()
251 - log.log_to_mongo(properties, 'dataset', 'download', stopwatch, event='finish')
 248+ log.log_to_mongo(rts, 'dataset', 'download', stopwatch, event='finish')
252249 return res
253250
254251
255 -def extract_launcher(properties, logger):
 252+def extract_launcher(rts, logger):
256253 '''
257254 The extract launcher is used to extract the required variables from a dump
258255 file. If the zip file is a known archive then it will first launch the
@@ -259,10 +256,10 @@
260257 '''
261258 print 'Extracting data from XML'
262259 stopwatch = timer.Timer()
263 - log.log_to_mongo(properties, 'dataset', 'extract', stopwatch, event='start')
264 - extracter.launcher(properties)
 260+ log.log_to_mongo(rts, 'dataset', 'extract', stopwatch, event='start')
 261+ enricher.launcher(rts)
265262 stopwatch.elapsed()
266 - log.log_to_mongo(properties, 'dataset', 'extract', stopwatch, event='finish')
 263+ log.log_to_mongo(rts, 'dataset', 'extract', stopwatch, event='finish')
267264
268265
269266 def sort_launcher(rts, logger):
Index: trunk/tools/editor_trends/etl/enricher.py
@@ -18,21 +18,20 @@
1919 __version__ = '0.1'
2020
2121
22 -import bz2
2322 import os
2423 import hashlib
2524 import codecs
2625 import sys
2726 import datetime
2827 import progressbar
29 -from multiprocessing import JoinableQueue, Process, cpu_count, current_process
 28+from multiprocessing import JoinableQueue, Process, cpu_count, current_process, RLock
3029 from xml.etree.cElementTree import iterparse, dump
3130 from collections import deque
3231
 32+
3333 if '..' not in sys.path:
3434 sys.path.append('..')
3535
36 -
3736 try:
3837 from database import cassandra
3938 import pycassa
@@ -44,7 +43,6 @@
4544 from database import db
4645 from bots import detector
4746 from utils import file_utils
48 -import extracter
4947
5048 NAMESPACE = {
5149 #0:'Main',
@@ -84,12 +82,13 @@
8583
8684
8785 class Buffer:
88 - def __init__(self, storage, id):
 86+ def __init__(self, storage, id, rts=None, filehandles=None, locks=None):
8987 assert storage == 'cassandra' or storage == 'mongo' or storage == 'csv', \
9088 'Valid storage options are cassandra and mongo.'
9189 self.storage = storage
9290 self.revisions = {}
9391 self.comments = {}
 92+ self.titles = {}
9493 self.id = id
9594 self.keyspace_name = 'enwiki'
9695 self.keys = ['revision_id', 'article_id', 'id', 'namespace',
@@ -97,6 +96,12 @@
9897 'cur_size', 'delta']
9998 self.setup_storage()
10099 self.stats = Statistics()
 100+ if storage == 'csv':
 101+ self.rts = rts
 102+ self.lock1 = locks[0] #lock for generic data
 103+ self.lock2 = locks[1] #lock for comment data
 104+ self.lock3 = locks[2] #lock for article titles
 105+ self.filehandles = filehandles
101106
102107 def setup_storage(self):
103108 if self.storage == 'cassandra':
@@ -108,19 +113,43 @@
109114 self.collection = self.db['kaggle']
110115
111116 else:
112 - kaggle_file = 'kaggle_%s.csv' % self.id
113 - comment_file = 'kaggle_comments_%s.csv' % self.id
114 - file_utils.delete_file('', kaggle_file, directory=False)
 117+ title_file = 'titles.csv'
 118+ comment_file = 'comments.csv'
 119+ file_utils.delete_file('', title_file, directory=False)
115120 file_utils.delete_file('', comment_file, directory=False)
116 - self.fh_main = codecs.open(kaggle_file, 'a', 'utf-8')
117 - self.fh_extra = codecs.open(comment_file, 'a', 'utf-8')
 121+ self.fh_titles = codecs.open(title_file, 'a', 'utf-8')
 122+ self.fh_comments = codecs.open(comment_file, 'a', 'utf-8')
118123
 124+ def get_hash(self, id):
 125+ '''
 126+ A very simple hash function based on modulo. The except clause has been
 127+ added because there are instances where the username is stored in userid
 128+ tag and hence that's a string and not an integer.
 129+ '''
 130+ try:
 131+ return int(id) % self.rts.max_filehandles
 132+ except ValueError:
 133+ return sum([ord(i) for i in id]) % self.rts.max_filehandles
 134+
 135+ def group_observations(self, revisions):
 136+ '''
 137+ This function groups observation by editor id, this way we have to make
 138+ fewer fileopening calls.
 139+ '''
 140+ data = {}
 141+ for revision in revisions:
 142+ id = revision[0]
 143+ if id not in data:
 144+ data[id] = []
 145+ data[id].append(revision)
 146+ self.revisions = data
 147+
119148 def add(self, revision):
120149 self.stringify(revision)
121150 id = revision['revision_id']
122151 self.revisions[id] = revision
123152 if len(self.revisions) > 10000:
124 - print '%s: Emptying buffer %s - buffer size %s' % (datetime.datetime.now(), self.id, len(self.revisions))
 153+ #print '%s: Emptying buffer %s - buffer size %s' % (datetime.datetime.now(), self.id, len(self.revisions))
125154 self.store()
126155 self.clear()
127156
@@ -142,6 +171,7 @@
143172 def clear(self):
144173 self.revisions = {}
145174 self.comments = {}
 175+ self.titles = {}
146176
147177 def store(self):
148178 if self.storage == 'cassandra':
@@ -149,21 +179,59 @@
150180 elif self.storage == 'mongo':
151181 print 'insert into mongo'
152182 else:
153 - for revision in self.revisions.itervalues():
 183+ rows = []
 184+ for id, revision in self.revisions.iteritems():
154185 values = []
155186 for key in self.keys:
156187 values.append(revision[key].decode('utf-8'))
 188+ values.insert(0, id)
 189+ rows.append(values)
 190+ self.write_output(rows)
157191
158 - value = '\t'.join(values) + '\n'
159 - row = '\t'.join([key, value])
160 - self.fh_main.write(row)
 192+ if self.comments:
 193+ self.lock2.acquire()
 194+ try:
 195+ for revision_id, comment in self.comments.iteritems():
 196+ comment = comment.decode('utf-8')
 197+ row = '\t'.join([revision_id, comment]) + '\n'
 198+ file_utils.write_list_to_csv(row, fh_comments, lock=self.lock2)
 199+ except:
 200+ pass
 201+ finally:
 202+ self.lock2.release()
161203
162 - for revision_id, comment in self.comments.iteritems():
163 - comment = comment.decode('utf-8')
164 - row = '\t'.join([revision_id, comment]) + '\n'
165 - self.fh_extra.write(row)
 204+ elif self.titles:
 205+ self.lock3.acquire()
 206+ try:
 207+ for article_id, title in self.titles.iteritems():
 208+ title = title.decode('utf-8')
 209+ row = '\t'.join([article_id, title]) + '\n'
 210+ file_utils.write_list_to_csv(row, fh_titles, lock=self.lock3)
 211+ except:
 212+ pass
 213+ finally:
 214+ self.lock3.release()
166215
167216
 217+ def write_output(self, data):
 218+ self.group_observations(data)
 219+ for editor in self.revisions:
 220+ #lock the write around all edits of an editor for a particular page
 221+ self.lock1.acquire()
 222+ try:
 223+ for i, revision in enumerate(self.revisions[editor]):
 224+ if i == 0:
 225+ id = self.get_hash(revision[2])
 226+ fh = self.filehandles[id]
 227+ try:
 228+ file_utils.write_list_to_csv(revision, fh, lock=self.lock1)
 229+ except Exception, error:
 230+ print 'Encountered the following error while writing data to %s: %s' % (error, fh)
 231+ finally:
 232+ self.lock1.release()
 233+
 234+
 235+
168236 def extract_categories():
169237 '''
170238 Field 1: page id
@@ -192,6 +260,34 @@
193261 fh.close()
194262
195263
 264+def validate_hostname(address):
 265+ '''
 266+ This is not a foolproof solution at all. The problem is that it's really
 267+ hard to determine whether a string is a hostname or not **reliably**. This
 268+ is a very fast rule of thumb. Will lead to false positives,
 269+ but that's life :)
 270+ '''
 271+ parts = address.split(".")
 272+ if len(parts) > 2:
 273+ return True
 274+ else:
 275+ return False
 276+
 277+
 278+def validate_ip(address):
 279+ parts = address.split(".")
 280+ if len(parts) != 4:
 281+ return False
 282+ parts = parts[:3]
 283+ for item in parts:
 284+ try:
 285+ if not 0 <= int(item) <= 255:
 286+ return False
 287+ except ValueError:
 288+ return False
 289+ return True
 290+
 291+
196292 def extract_revision_text(revision):
197293 rev = revision.find('ns0:text')
198294 if rev != None:
@@ -202,22 +298,22 @@
203299 return ''
204300
205301
206 -def extract_username(contributor):
207 - contributor = contributor.find('ns0:username')
 302+def extract_username(contributor, xml_namespace):
 303+ contributor = contributor.find('%s%s' % (xml_namespace, 'username'))
208304 if contributor != None:
209305 return contributor.text
210306 else:
211307 return None
212308
213309
214 -def determine_username_is_bot(contributor, bots):
 310+def determine_username_is_bot(contributor, bots, xml_namespace):
215311 '''
216312 #contributor is an xml element containing the id of the contributor
217313 @bots should have a dict with all the bot ids and bot names
218314 @Return False if username id is not in bot dict id or True if username id
219315 is a bot id.
220316 '''
221 - username = contributor.find('ns0:username')
 317+ username = contributor.find('%s%s' % (xml_namespace, 'username'))
222318 if username == None:
223319 return 0
224320 else:
@@ -227,23 +323,23 @@
228324 return 0
229325
230326
231 -def extract_contributor_id(contributor):
 327+def extract_contributor_id(revision, xml_namespace):
232328 '''
233329 @contributor is the xml contributor node containing a number of attributes
234330 Currently, we are only interested in registered contributors, hence we
235331 ignore anonymous editors.
236332 '''
237 - if contributor.get('deleted'):
 333+ if revision.get('deleted'):
238334 # ASK: Not sure if this is the best way to code deleted contributors.
239335 return None
240 - elem = contributor.find('ns0:id')
 336+ elem = revision.find('%s%s' % (xml_namespace, 'id'))
241337 if elem != None:
242338 return {'id':elem.text}
243339 else:
244 - elem = contributor.find('ns0:ip')
245 - if elem != None and elem.text != None \
246 - and validate_ip(elem.text) == False \
247 - and validate_hostname(elem.text) == False:
 340+ elem = revision.find('%s%s' % (xml_namespace, 'ip'))
 341+ if elem == None or elem.text == None:
 342+ return None
 343+ elif validate_ip(elem.text) == False and validate_hostname(elem.text) == False:
248344 return {'username':elem.text, 'id': elem.text}
249345 else:
250346 return None
@@ -280,11 +376,11 @@
281377 return size
282378
283379
284 -def parse_contributor(contributor, bots):
285 - username = extract_username(contributor)
286 - user_id = extract_contributor_id(contributor)
287 - bot = determine_username_is_bot(contributor, bots)
288 - contributor.clear()
 380+def parse_contributor(revision, bots, xml_namespace):
 381+ username = extract_username(revision, xml_namespace)
 382+ user_id = extract_contributor_id(revision, xml_namespace)
 383+ bot = determine_username_is_bot(revision, bots, xml_namespace)
 384+ revision.clear()
289385 editor = {}
290386 editor['username'] = username
291387 editor['bot'] = bot
@@ -333,6 +429,13 @@
334430 return revert
335431
336432
 433+def extract_revision_id(revision_id):
 434+ if revision_id != None:
 435+ return revision_id.text
 436+ else:
 437+ return None
 438+
 439+
337440 def extract_comment_text(revision_id, revision):
338441 comment = {}
339442 text = revision.find('comment')
@@ -341,11 +444,10 @@
342445 return comment
343446
344447
345 -def count_edits(article, counts, bots):
 448+def count_edits(article, counts, bots, xml_namespace):
346449 namespaces = {}
347450 title = article['title'].text
348451 namespace = determine_namespace(title, namespaces)
349 - xml_namespace = '{http://www.mediawiki.org/xml/export-0.4/}'
350452 if namespace != False:
351453 article_id = article['id'].text
352454 revisions = article['revisions']
@@ -353,6 +455,7 @@
354456 if revision == None:
355457 #the entire revision is empty, weird.
356458 continue
 459+
357460 contributor = revision.find('%s%s' % (xml_namespace, 'contributor'))
358461 contributor = parse_contributor(contributor, bots)
359462 if not contributor:
@@ -366,12 +469,12 @@
367470 return counts
368471
369472
370 -def create_variables(article, cache, bots):
 473+def create_variables(article, cache, bots, xml_namespace):
371474 namespaces = {'User': 2,
372475 'Talk': 1,
373476 'User Talk': 3,
374477 }
375 - title = article['title']
 478+ title = article['title'].text
376479 namespace = determine_namespace(title, namespaces)
377480
378481 if namespace != False:
@@ -386,14 +489,15 @@
387490 if revision == None:
388491 #the entire revision is empty, weird.
389492 continue
390 - contributor = revision.find('ns0:contributor')
391 - contributor = parse_contributor(contributor, bots)
 493+ #dump(revision)
 494+ contributor = revision.find('%s%s' % (xml_namespace, 'contributor'))
 495+ contributor = parse_contributor(contributor, bots, xml_namespace)
392496 if not contributor:
393497 #editor is anonymous, ignore
394498 continue
395499
396 - revision_id = revision.find('ns0:id')
397 - revision_id = extracter.extract_revision_id(revision_id)
 500+ revision_id = revision.find('%s%s' % (xml_namespace, 'id'))
 501+ revision_id = extract_revision_id(revision_id)
398502 if revision_id == None:
399503 #revision_id is missing, which is weird
400504 continue
@@ -406,7 +510,7 @@
407511 comment = extract_comment_text(revision_id, revision)
408512 cache.comments.update(comment)
409513
410 - timestamp = revision.find('ns0:timestamp').text
 514+ timestamp = revision.find('%s%s' % (xml_namespace, 'timestamp')).text
411515 row['timestamp'] = timestamp
412516
413517 hash = create_md5hash(text)
@@ -417,43 +521,55 @@
418522 row.update(hash)
419523 row.update(size)
420524 row.update(revert)
 525+ cache.add(row)
421526 revision.clear()
422 - cache.add(row)
423527
424528
425 -def parse_xml(fh):
 529+
 530+def parse_xml(fh, xml_namespace):
426531 context = iterparse(fh, events=('end',))
427532 context = iter(context)
428533
429534 article = {}
430535 article['revisions'] = []
431536 id = False
432 - namespace = '{http://www.mediawiki.org/xml/export-0.4/}'
433537
434538 for event, elem in context:
435 - if event == 'end' and elem.tag == '%s%s' % (namespace, 'title'):
 539+ if event == 'end' and elem.tag == '%s%s' % (xml_namespace, 'title'):
436540 article['title'] = elem
437 - elif event == 'end' and elem.tag == '%s%s' % (namespace, 'revision'):
 541+ elif event == 'end' and elem.tag == '%s%s' % (xml_namespace, 'revision'):
438542 article['revisions'].append(elem)
439 - elif event == 'end' and elem.tag == '%s%s' % (namespace, 'id') and id == False:
 543+ elif event == 'end' and elem.tag == '%s%s' % (xml_namespace, 'id') and id == False:
440544 article['id'] = elem
441545 id = True
442 - elif event == 'end' and elem.tag == '%s%s' % (namespace, 'page'):
 546+ elif event == 'end' and elem.tag == '%s%s' % (xml_namespace, 'page'):
443547 yield article
444548 elem.clear()
445549 article = {}
446550 article['revisions'] = []
447551 id = False
448 - elif event == 'end':
449 - elem.clear()
 552+ #elif event == 'end':
 553+ # elem.clear()
450554
451555
452 -def stream_raw_xml(input_queue, storage, id, function, dataset):
 556+def delayediter(iterable):
 557+ iterable = iter(iterable)
 558+ prev = iterable.next()
 559+ for item in iterable:
 560+ yield prev
 561+ prev = item
 562+ yield prev
 563+
 564+def stream_raw_xml(input_queue, storage, id, function, dataset, locks, rts):
453565 bots = detector.retrieve_bots('en')
 566+ xml_namespace = '{http://www.mediawiki.org/xml/export-0.4/}'
 567+ path = os.path.join(rts.location, 'txt')
 568+ filehandles = [file_utils.create_txt_filehandle(path, '%s.csv' % fh, 'a',
 569+ rts.encoding) for fh in xrange(rts.max_filehandles)]
454570 t0 = datetime.datetime.now()
455571 i = 0
456572 if dataset == 'training':
457 - cache = Buffer(storage, id)
 573+ cache = Buffer(storage, id, rts, filehandles, locks)
458574 else:
459575 counts = {}
460576
@@ -463,12 +579,12 @@
464580 if filename == None:
465581 break
466582
467 - fh = bz2.BZ2File(filename, 'rb')
468 - for article in parse_xml(fh):
 583+ fh = file_utils.create_streaming_buffer(filename)
 584+ for article in parse_xml(fh, xml_namespace):
469585 if dataset == 'training':
470 - function(article, cache, bots)
471 - else:
472 - counts = function(article, counts, bots)
 586+ function(article, cache, bots, xml_namespace)
 587+ elif dataset == 'prediction':
 588+ counts = function(article, counts, bots, xml_namespace)
473589 i += 1
474590 if i % 10000 == 0:
475591 print 'Worker %s parsed %s articles' % (id, i)
@@ -488,19 +604,27 @@
489605 file_utils.store_object(counts, location, filename)
490606
491607
492 -def setup(storage):
 608+def setup(storage, rts=None):
493609 keyspace_name = 'enwiki'
494610 if storage == 'cassandra':
495611 cassandra.install_schema(keyspace_name, drop_first=True)
 612+ elif storage == 'csv':
 613+ output_articles = os.path.join(rts.input_location, rts.language.code,
 614+ rts.project.name)
 615+ output_txt = os.path.join(rts.input_location, rts.language.code,
 616+ rts.project.name, 'txt')
 617+ res = file_utils.delete_file(output_articles, 'articles.csv')
 618+ res = file_utils.delete_file(output_txt, None, directory=True)
 619+ if res:
 620+ res = file_utils.create_directory(output_txt)
496621
497622
498 -def launcher(function, path, dataset, storage, processors):
499 - setup(storage)
 623+def multiprocessor_launcher(function, path, dataset, storage, processors, extension, locks=None, rts=None):
500624 input_queue = JoinableQueue()
501625 #files = ['C:\\Users\\diederik.vanliere\\Downloads\\enwiki-latest-pages-articles1.xml.bz2']
502626 #files = ['/home/diederik/kaggle/enwiki-20100904-pages-meta-history2.xml.bz2']
503627
504 - files = file_utils.retrieve_file_list(path, 'bz2')
 628+ files = file_utils.retrieve_file_list(path, extension)
505629
506630 for file in files:
507631 filename = os.path.join(path, file)
@@ -510,7 +634,7 @@
511635 for x in xrange(processors):
512636 input_queue.put(None)
513637
514 - extracters = [Process(target=stream_raw_xml, args=[input_queue, storage, id, function, dataset])
 638+ extracters = [Process(target=stream_raw_xml, args=[input_queue, storage, id, function, dataset, locks, rts])
515639 for id in xrange(processors)]
516640 for extracter in extracters:
517641 extracter.start()
@@ -534,19 +658,40 @@
535659 storage = 'csv'
536660 dataset = 'training'
537661 processors = cpu_count()
538 - launcher(function, path, dataset, storage, processors)
 662+ extension = 'bz2'
 663+ setup(storage)
 664+ multiprocessor_launcher(function, path, dataset, storage, processors, extension)
539665
540666
541667 def launcher_prediction():
542668 # launcher for creating test data
543 - path = '/media/wikipedia_dumps/batch1/'
 669+ path = '/mnt/wikipedia_dumps/batch1/'
544670 function = count_edits
545671 storage = 'csv'
546672 dataset = 'prediction'
547673 processors = 7
548 - launcher(function, path, dataset, storage, processors)
 674+ extension = 'bz2'
 675+ setup(storage)
 676+ multiprocessor_launcher(function, path, dataset, storage, processors, extension)
549677
550678
 679+def launcher(rts):
 680+ # launcher for creating regular mongo dataset
 681+ path = rts.location
 682+ function = create_variables
 683+ storage = 'csv'
 684+ dataset = 'training'
 685+ processors = 1
 686+ extension = 'gz'
 687+ lock1 = RLock()
 688+ lock2 = RLock()
 689+ lock3 = RLock()
 690+ locks = [lock1, lock2, lock3]
 691+ setup(storage, rts)
 692+ multiprocessor_launcher(function, path, dataset, storage, processors, extension, locks, rts)
 693+
 694+
551695 if __name__ == '__main__':
552696 #launcher_training()
553 - launcher_prediction()
 697+ #launcher_prediction()
 698+ launcher()
Index: trunk/tools/editor_trends/etl/extracter.py
@@ -188,10 +188,21 @@
189189
190190 def parse_title(title):
191191 title_data = {}
192 - if type(title.text) == type('str'):
193 - title_data['title'] = title.text.decode('utf-8')
 192+ t1 = type(title.text)
 193+ if type(title.text) != type('str'):
 194+ print 'encodign'
 195+ print title.text.encode('utf-8')
 196+ title = title.text.encode('utf-8')
 197+ title = title.decode('utf-8', 'ignore')
 198+ print title
194199 else:
195 - title_data['title'] = title.text
 200+ title = title.text
 201+
 202+ #title = title.encode('utf-8')
 203+ title_data['title'] = title #.decode('utf-8')
 204+ t2 = type(title_data['title'])
 205+ print t1, t2
 206+ #title_data['title'] = title
196207 if title_data['title'].startswith('List of'):
197208 title_data['list'] = True
198209 else:
@@ -268,7 +279,6 @@
269280 widgets = log.init_progressbar_widgets('Extracting data')
270281 filehandles = [file_utils.create_txt_filehandle(output, '%s.csv' % fh, 'a',
271282 rts.encoding) for fh in xrange(rts.max_filehandles)]
272 -
273283 while True:
274284 total, processed = 0.0, 0.0
275285 try:
@@ -286,7 +296,7 @@
287297 filesize = file_utils.determine_filesize(location, filename)
288298 print 'Opening %s...' % (os.path.join(location, filename))
289299 print 'Filesize: %s' % filesize
290 - fh1 = file_utils.create_txt_filehandle(location, filename, 'r', rts.encoding)
 300+ fh1 = file_utils.create_txt_filehandle(location, filename, 'r', 'ascii')
291301 fh2 = file_utils.create_txt_filehandle(location, 'articles.csv', 'a', rts.encoding)
292302 ns, xml_namespace = wikitree.parser.extract_meta_information(fh1)
293303 ns = build_namespaces_locale(ns, rts.namespaces)
@@ -305,7 +315,7 @@
306316 output = output_editor_information(revisions, article_id, bot_ids, rts)
307317 output = add_namespace_to_output(output, namespace)
308318 write_output(output, filehandles, lock, rts)
309 - file_utils.write_list_to_csv([article_id, title.values()], fh2)
 319+ #file_utils.write_list_to_csv([article_id, title.values()], fh2, newline=False, lock=lock)
310320 processed += 1
311321 page.clear()
312322 pbar.update(pbar.currval + article_size)
@@ -366,11 +376,15 @@
367377 return sum([ord(i) for i in id]) % rts.max_filehandles
368378
369379
370 -def prepare(output):
371 - res = file_utils.delete_file(output, 'articles.csv')
372 - res = file_utils.delete_file(output, None, directory=True)
 380+def prepare(rts):
 381+ output_articles = os.path.join(rts.input_location, rts.language.code,
 382+ rts.project.name)
 383+ output_txt = os.path.join(rts.input_location, rts.language.code,
 384+ rts.project.name, 'txt')
 385+ res = file_utils.delete_file(output_articles, 'articles.csv')
 386+ res = file_utils.delete_file(output_txt, None, directory=True)
373387 if res:
374 - res = file_utils.create_directory(output)
 388+ res = file_utils.create_directory(output_txt)
375389 return res
376390
377391
@@ -415,9 +429,7 @@
416430 if not tasks:
417431 return False
418432
419 - output = os.path.join(rts.input_location, rts.language.code,
420 - rts.project.name, 'txt')
421 - result = prepare(output)
 433+ result = prepare(rts)
422434 if not result:
423435 return result
424436
@@ -448,10 +460,11 @@
449461
450462
451463 def debug():
452 - project = 'wiki'
453 - language_code = 'sv'
454 - filename = 'svwiki-latest-stub-meta-history.xml'
455 - parse_dumpfile(project, filename, language_code)
 464+ pass
 465+ #project = 'wiki'
 466+ #language_code = 'sv'
 467+ #filename = 'svwiki-latest-stub-meta-history.xml'
 468+ #parse_dumpfile(project, filename, language_code)
456469
457470
458471 if __name__ == '__main__':
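
The parse_title changes earlier in this diff still carry debug prints (the 'encodign' message and the t1/t2 type prints). What the function appears to be converging on is: coerce the title to unicode and flag "List of" articles. A cleaned-up sketch of that reading (an assumption, not the committed version):

def parse_title(title):
    # title is an ElementTree element; make sure we hand back unicode text
    title_data = {}
    text = title.text
    if not isinstance(text, unicode):
        text = text.decode('utf-8', 'ignore')
    title_data['title'] = text
    title_data['list'] = text.startswith('List of')
    return title_data
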
Index: trunk/tools/editor_trends/utils/file_utils.py
@@ -23,6 +23,8 @@
2424 and track error messages.
2525 '''
2626
 27+import bz2
 28+import gzip
2729 import re
2830 import htmlentitydefs
2931 import time
@@ -142,8 +144,6 @@
143145 The calling function is responsible for:
144146 1) closing the filehandle
145147 '''
146 -
147 -
148148 tab = False
149149 wrote_newline = None
150150 if recursive:
@@ -207,6 +207,16 @@
208208 return codecs.open(path, mode, encoding=encoding)
209209
210210
 211+def create_streaming_buffer(path):
 212+ extension = determine_file_extension(path)
 213+ if extension == 'gz':
 214+ fh = gzip.GzipFile(path, 'rb')
 215+ elif extension == 'bz':
 216+ fh = bz2.BZ2File(path, 'rb')
 217+ else:
 218+ raise exceptions.CompressedFileNotSupported(extension)
 219+ return fh
 220+
211221 def create_binary_filehandle(location, filename, mode):
212222 path = os.path.join(location, filename)
213223 return open(path, mode)
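
Finally, a hypothetical usage of the new create_streaming_buffer helper, which is what lets the enricher drop the separate unzip stage: iterparse reads straight from the compressed dump. The dump filename below is only an example, not a file referenced by this revision, and the snippet assumes the editor_trends directory is on sys.path:

from xml.etree.cElementTree import iterparse
from utils import file_utils

# open the compressed dump directly; gzip/bz2 is picked by file extension
fh = file_utils.create_streaming_buffer('dumps/enwiki-latest-stub-meta-history.xml.gz')
for event, elem in iterparse(fh, events=('end',)):
    elem.clear()  # real handling happens in etl/enricher.py's parse_xml()
fh.close()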