r107870 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: < r107869 | r107870 | r107871 >
Date: 09:06, 3 January 2012
Author: ariel
Status: deferred
Tags:
Comment:
add support for creation of a multiple bz2 stream dump of articles
Modified paths:
  • /branches/ariel/xmldumps-backup/WikiDump.py (modified) (history)
  • /branches/ariel/xmldumps-backup/worker.py (modified) (history)
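Background for the comment above: a "multistream" bz2 file is several complete bz2 streams concatenated into one file. Stock bzip2 tools decompress the whole thing as usual, but a reader that knows the byte offset of a stream can decompress just that stream. A minimal Python sketch of the idea, not the recompressxml implementation (page data and group size invented for illustration):

import bz2

# Compress pages in groups, one complete bz2 stream per group, and
# concatenate the streams into one file. recompressxml does the same
# job with 100 pages per stream; the inputs here are made up.
pages = ["<page>1</page>", "<page>2</page>",
         "<page>3</page>", "<page>4</page>"]
PAGES_PER_STREAM = 2

offsets = []
with open("articles-multistream.xml.bz2", "wb") as out:
    for i in range(0, len(pages), PAGES_PER_STREAM):
        offsets.append(out.tell())  # stream start offset, for the index
        group = "".join(pages[i:i + PAGES_PER_STREAM])
        out.write(bz2.compress(group.encode("utf-8")))

# offsets now holds one entry per stream; stock bzip2 tools read the
# concatenated file exactly as if it were a single-stream archive.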

Diff

Index: branches/ariel/xmldumps-backup/WikiDump.py
@@ -203,6 +203,7 @@
         "grep": "/bin/grep",
         "checkforbz2footer": "/usr/local/bin/checkforbz2footer",
         "writeuptopageid": "/usr/local/bin/writeuptopageid",
+        "recompressxml": "/usr/local/bin/recompressxml",
         #"cleanup": {
         "keep": "3",
         #"chunks": {
@@ -284,6 +285,7 @@
         self.grep = self.conf.get("tools", "grep")
         self.checkforbz2footer = self.conf.get("tools","checkforbz2footer")
         self.writeuptopageid = self.conf.get("tools","writeuptopageid")
+        self.recompressxml = self.conf.get("tools","recompressxml")

         if not self.conf.has_section('cleanup'):
             self.conf.add_section('cleanup')
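The new entry follows the existing tools pattern: a built-in default path that can be overridden in the dump configuration file read via ConfigParser. A hypothetical fragment of such a file (paths are examples only):

[tools]
bzip2=/usr/bin/bzip2
recompressxml=/usr/local/bin/recompressxml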
Index: branches/ariel/xmldumps-backup/worker.py
@@ -688,6 +688,19 @@
             "Recombine all pages with complete edit history (.7z)",
             "These dumps can be *very* large, uncompressing up to 100 times the archive download size. " +
             "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.findItemByName('metahistory7zdump'), self.wiki))
+        # do this only for the recombined/full articles dump
+        if (self.chunkInfo.chunksEnabled()):
+            inputForMultistream = "articlesdumprecombine"
+        else:
+            inputForMultistream = "articlesdump"
+        self.dumpItems.append(
+            XmlMultiStreamDump("articles",
+                "articlesmultistreamdump",
+                "Articles, templates, media/file descriptions, and primary meta-pages, in multiple bz2 streams, 100 pages per stream",
+                "This contains current versions of article content, in concatenated bz2 streams, 100 pages per stream, plus a separate " +
+                "index of page titles/ids and offsets into the file. Useful for offline readers, or for parallel processing of pages.",
+                self.findItemByName(inputForMultistream), self.wiki, None))
+
         results = self._runInfoFile.getOldRunInfoFromFile()
         if (results):
             for runInfoObj in results:
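The index mentioned in the detail text maps each page to the byte offset of the bz2 stream containing it. Assuming an offset:pageid:title line format (the actual layout is whatever recompressxml writes; treat this as an illustration), loading it is straightforward:

# Hypothetical index lines: byte offset of the stream, page id, page title.
index_lines = [
    "0:1:Some Page",
    "0:2:Another Page",
    "568:3:Title:With:Colons",
]

stream_offset = {}  # page title -> offset of the stream holding it
for line in index_lines:
    offset, pageid, title = line.split(":", 2)  # titles may contain ':'
    stream_offset[title] = int(offset)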
@@ -3326,6 +3339,161 @@
         if (error):
             raise BackupError("error recombining xml bz2 files")

+class XmlMultiStreamDump(XmlDump):
+    """Take a .bz2 and recompress it as multistream bz2, 100 pages per stream."""
+
+    def __init__(self, subset, name, desc, detail, itemForRecompression, wiki, chunkToDo, chunks = False, checkpoints = False, checkpointFile = None):
+        self._subset = subset
+        self._detail = detail
+        self._chunks = chunks
+        if self._chunks:
+            self._chunksEnabled = True
+        self._chunkToDo = chunkToDo
+        self.wiki = wiki
+        self.itemForRecompression = itemForRecompression
+        if checkpoints:
+            self._checkpointsEnabled = True
+        self.checkpointFile = checkpointFile
+        Dump.__init__(self, name, desc)
+
+    def getDumpName(self):
+        return "pages-" + self._subset
+
+    def getFileType(self):
+        return "xml"
+
+    def getFileExt(self):
+        return "bz2"
+
+    def getDumpNameMultistream(self, name):
+        return name + "-multistream"
+
+    def getDumpNameMultistreamIndex(self, name):
+        return self.getDumpNameMultistream(name) + "-index"
+
+    def getFileMultistreamName(self, f):
+        """Given the DumpFilename of an input file, return the DumpFilename
+        of the associated multistream output file."""
+        return DumpFilename(self.wiki, f.date, self.getDumpNameMultistream(f.dumpName), f.fileType, self.fileExt, f.chunk, f.checkpoint, f.temp)
+
+    def getFileMultistreamIndexName(self, f):
+        """Given the DumpFilename of a multistream output file, return the
+        DumpFilename of the associated index file."""
+        return DumpFilename(self.wiki, f.date, self.getDumpNameMultistreamIndex(f.dumpName), f.fileType, self.fileExt, f.chunk, f.checkpoint, f.temp)
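Assuming DumpFilename composes names as wiki-date-dumpName.fileType.fileExt (consistent with the public dump layout), the two helpers above turn a hypothetical input into names like:

somewiki-20120101-pages-articles.xml.bz2                    (input)
somewiki-20120101-pages-articles-multistream.xml.bz2        (recompressed output)
somewiki-20120101-pages-articles-multistream-index.xml.bz2  (index)

Note that the index keeps fileType "xml" and extension "bz2" here, since getFileMultistreamIndexName() passes f.fileType and self.fileExt through unchanged.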
+
+    # outputFiles is a list of checkpoint files when checkpointing is enabled;
+    # otherwise it is a list containing one file. Checkpoint files are processed
+    # one at a time; we can't realistically run parallel recompression jobs of
+    # 200 files.
+    def buildCommand(self, runner, outputFiles):
+        # FIXME need shell escape
+        if (not exists( self.wiki.config.bzip2 )):
+            raise BackupError("bzip2 command %s not found" % self.wiki.config.bzip2)
+        if (not exists( self.wiki.config.recompressxml )):
+            raise BackupError("recompressxml command %s not found" % self.wiki.config.recompressxml)
+
+        commandSeries = []
+        for f in outputFiles:
+            inputFile = DumpFilename(self.wiki, None, f.dumpName, f.fileType, self.itemForRecompression.fileExt, f.chunk, f.checkpoint)
+            outfile = runner.dumpDir.filenamePublicPath(self.getFileMultistreamName(f))
+            outfileIndex = runner.dumpDir.filenamePublicPath(self.getFileMultistreamIndexName(f))
+            infile = runner.dumpDir.filenamePublicPath(inputFile)
+            commandPipe = [ [ "%s -dc %s | %s --pagesperstream 100 --buildindex %s > %s" % (self.wiki.config.bzip2, infile, self.wiki.config.recompressxml, outfileIndex, outfile) ] ]
+            commandSeries.append(commandPipe)
+        return commandSeries
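With the hypothetical filenames from above, the format string in buildCommand() expands to one shell pipeline per output file, roughly:

/usr/bin/bzip2 -dc .../somewiki-20120101-pages-articles.xml.bz2 \
    | /usr/local/bin/recompressxml --pagesperstream 100 \
      --buildindex .../somewiki-20120101-pages-articles-multistream-index.xml.bz2 \
    > .../somewiki-20120101-pages-articles-multistream.xml.bz2

Since this runs with shell = True and the paths are interpolated unquoted, the FIXME about shell escaping is worth taking seriously.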
+
+    def run(self, runner):
+        if runner.lastFailed:
+            raise BackupError("bz2 dump incomplete, not recompressing")
+        commands = []
+        self.cleanupOldFiles(runner.dumpDir)
+        if self.checkpointFile:
+            outputFile = DumpFilename(self.wiki, None, self.checkpointFile.dumpName, self.checkpointFile.fileType, self.fileExt, self.checkpointFile.chunk, self.checkpointFile.checkpoint)
+            series = self.buildCommand(runner, [ outputFile ])
+            commands.append(series)
+        elif self._chunksEnabled and not self._chunkToDo:
+            # set up each parallel job separately; they may have checkpoint files
+            # that need to be processed in series, which is a special case
+            for i in range(1, len(self._chunks) + 1):
+                outputFiles = self.listOutputFilesForBuildCommand(runner.dumpDir, i)
+                series = self.buildCommand(runner, outputFiles)
+                commands.append(series)
+        else:
+            outputFiles = self.listOutputFilesForBuildCommand(runner.dumpDir)
+            series = self.buildCommand(runner, outputFiles)
+            commands.append(series)
+
+        error = runner.runCommand(commands, callbackTimed = self.progressCallback, callbackTimedArg = runner, shell = True)
+        if (error):
+            raise BackupError("error recompressing bz2 file(s)")
+
+    # Lists all files possible if we don't have checkpoint files; no temp files.
+    def listOutputFilesToPublish(self, dumpDir):
+        files = []
+        inputFiles = self.itemForRecompression.listOutputFilesForInput(dumpDir)
+        for f in inputFiles:
+            files.append(self.getFileMultistreamName(f))
+            files.append(self.getFileMultistreamIndexName(f))
+        return files
+
+    # Lists all files possible if we don't have checkpoint files; no temp files.
+    # Only the chunks we are actually supposed to do (if there is a limit).
+    def listOutputFilesToCheckForTruncation(self, dumpDir):
+        files = []
+        inputFiles = self.itemForRecompression.listOutputFilesForInput(dumpDir)
+        for f in inputFiles:
+            if self._chunkToDo and f.chunkInt != self._chunkToDo:
+                continue
+            files.append(self.getFileMultistreamName(f))
+            files.append(self.getFileMultistreamIndexName(f))
+        return files
+
+    # Lists all files possible if we don't have checkpoint files; no temp files.
+    # Only the chunks we are actually supposed to do (if there is a limit).
+    def listOutputFilesForBuildCommand(self, dumpDir, chunk = None):
+        files = []
+        inputFiles = self.itemForRecompression.listOutputFilesForInput(dumpDir)
+        for f in inputFiles:
+            # if the chunk param is set it takes priority
+            if chunk and f.chunkInt != chunk:
+                continue
+            elif self._chunkToDo and f.chunkInt != self._chunkToDo:
+                continue
+            # we don't convert these names to the final output form here (i.e.
+            # add "multistream" and "index" to them); buildCommand does that
+            files.append(DumpFilename(self.wiki, f.date, f.dumpName, f.fileType, self.fileExt, f.chunk, f.checkpoint, f.temp))
+        return files
+
+    # Lists all files possible if we don't have checkpoint files; should include
+    # temp files. Does just the chunks we do if there is a limit.
+    def listOutputFilesForCleanup(self, dumpDir, dumpNames = None):
+        # some stages (e.g. XmlStubs) call this for several different dumpNames
+        if (dumpNames == None):
+            dumpNames = [ self.dumpName ]
+        multistreamNames = []
+        for d in dumpNames:
+            multistreamNames.extend( [ self.getDumpNameMultistream(d), self.getDumpNameMultistreamIndex(d) ] )
+
+        files = []
+        if (self.itemForRecompression._checkpointsEnabled):
+            # we pass the list of chunks, or chunkToDo, or False, depending on the job setup
+            files.extend(self.listCheckpointFilesPerChunkExisting(dumpDir, self.getChunkList(), multistreamNames))
+            files.extend(self.listTempFilesPerChunkExisting(dumpDir, self.getChunkList(), multistreamNames))
+        else:
+            # we pass the list of chunks, or chunkToDo, or False, depending on the job setup
+            files.extend(self.listRegularFilesPerChunkExisting(dumpDir, self.getChunkList(), multistreamNames))
+        return files
+
+    # Must return all output files that could be produced by a full run of this
+    # stage, not just whatever we happened to produce (if run for one chunk, say).
+    def listOutputFilesForInput(self, dumpDir):
+        files = []
+        inputFiles = self.itemForRecompression.listOutputFilesForInput(dumpDir)
+        for f in inputFiles:
+            files.append(self.getFileMultistreamName(f))
+            files.append(self.getFileMultistreamIndexName(f))
+        return files
+
 class BigXmlDump(XmlDump):
     """XML page dump for something larger, where a 7-Zip compressed copy
     could save 75% of download time for some users."""
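To round out the "useful for offline readers" point in the item description: given an offset from the index, a reader can extract a single stream without touching the rest of the file. A Python 3 sketch (file name and offset hypothetical; bz2.BZ2Decompressor stops at the end of one stream, which is what makes per-stream offsets usable):

import bz2

def read_stream(path, offset, chunk_size = 65536):
    """Decompress the single bz2 stream starting at byte offset `offset`."""
    decomp = bz2.BZ2Decompressor()
    parts = []
    with open(path, "rb") as f:
        f.seek(offset)
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            parts.append(decomp.decompress(chunk))
            if decomp.eof:  # reached the end of this stream; stop reading
                break
    return b"".join(parts)

# xml_fragment = read_stream("somewiki-20120101-pages-articles-multistream.xml.bz2", 568)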
