Index: branches/ariel/xmldumps-backup/WikiDump.py |
— | — | @@ -203,6 +203,7 @@ |
204 | 204 | "grep": "/bin/grep", |
205 | 205 | "checkforbz2footer": "/usr/local/bin/checkforbz2footer", |
206 | 206 | "writeuptopageid": "/usr/local/bin/writeuptopageid", |
| 207 | + "recompressxml": "/usr/local/bin/recompressxml", |
207 | 208 | #"cleanup": { |
208 | 209 | "keep": "3", |
209 | 210 | #"chunks": { |
— | — | @@ -284,6 +285,7 @@ |
285 | 286 | self.grep = self.conf.get("tools", "grep") |
286 | 287 | self.checkforbz2footer = self.conf.get("tools","checkforbz2footer") |
287 | 288 | self.writeuptopageid = self.conf.get("tools","writeuptopageid") |
| 289 | + self.recompressxml = self.conf.get("tools","recompressxml") |
288 | 290 | |
289 | 291 | if not self.conf.has_section('cleanup'): |
290 | 292 | self.conf.add_section('cleanup') |
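Note: for installs that don't rely on the defaults above, the new binary also needs an entry in the dumps config file. A minimal sketch of the relevant [tools] section, as read by the self.conf.get("tools", ...) calls in this hunk; the paths simply echo the shipped defaults and a local setup may differ:

    [tools]
    bzip2=/usr/bin/bzip2
    checkforbz2footer=/usr/local/bin/checkforbz2footer
    writeuptopageid=/usr/local/bin/writeuptopageid
    recompressxml=/usr/local/bin/recompressxml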
Index: branches/ariel/xmldumps-backup/worker.py |
— | — | @@ -688,6 +688,19 @@ |
689 | 689 | "Recombine all pages with complete edit history (.7z)", |
690 | 690 | "These dumps can be *very* large, uncompressing up to 100 times the archive download size. " + |
691 | 691 | "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.findItemByName('metahistory7zdump'), self.wiki)) |
 | 692 | + # build the multistream dump from the recombined articles dump, or from the plain articles dump if chunks are disabled
| 693 | + if (self.chunkInfo.chunksEnabled()): |
| 694 | + inputForMultistream = "articlesdumprecombine" |
| 695 | + else: |
| 696 | + inputForMultistream = "articlesdump" |
| 697 | + self.dumpItems.append( |
| 698 | + XmlMultiStreamDump("articles", |
| 699 | + "articlesmultistreamdump", |
| 700 | + "Articles, templates, media/file descriptions, and primary meta-pages, in multiple bz2 streams, 100 pages per stream", |
| 701 | + "This contains current versions of article content, in concatenated bz2 streams, 100 pages per stream, plus a separate" + |
| 702 | + "index of page titles/ids and offsets into the file. Useful for offline readers, or for parallel processing of pages.", |
| 703 | + self.findItemByName(inputForMultistream), self.wiki, None)) |
| 704 | + |
692 | 705 | results = self._runInfoFile.getOldRunInfoFromFile() |
693 | 706 | if (results): |
694 | 707 | for runInfoObj in results: |
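To illustrate why the new layout is "useful for offline readers, or for parallel processing of pages": each stream is an independent bz2 member, so a reader can seek straight to a byte offset taken from the index and decompress only that stream. A minimal sketch using Python 3's bz2 module; the filename, the offset, and the index line format (offset:pageid:title) are assumptions here, only the seek-and-decompress pattern is the point:

    import bz2

    def read_stream(path, offset):
        """Decompress the single bz2 stream starting at byte `offset`."""
        decomp = bz2.BZ2Decompressor()
        parts = []
        with open(path, "rb") as f:
            f.seek(offset)
            while not decomp.eof:       # stop at the end of this one stream
                chunk = f.read(64 * 1024)
                if not chunk:
                    break
                parts.append(decomp.decompress(chunk))
        return b"".join(parts)          # XML for up to 100 <page> elements

    # pages = read_stream("enwiki-20120101-pages-articles-multistream.xml.bz2", 600)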
— | — | @@ -3326,6 +3339,161 @@ |
3327 | 3340 | if (error): |
3328 | 3341 | raise BackupError("error recombining xml bz2 files") |
3329 | 3342 | |
| 3343 | +class XmlMultiStreamDump(XmlDump): |
| 3344 | +#class XmlRecompressDump(Dump): |
| 3345 | + """Take a .bz2 and recompress it as multistream bz2, 100 pages per stream.""" |
| 3346 | + |
| 3347 | + def __init__(self, subset, name, desc, detail, itemForRecompression, wiki, chunkToDo, chunks = False, checkpoints = False, checkpointFile = None): |
| 3348 | + self._subset = subset |
| 3349 | + self._detail = detail |
| 3350 | + self._chunks = chunks |
| 3351 | + if self._chunks: |
| 3352 | + self._chunksEnabled = True |
| 3353 | + self._chunkToDo = chunkToDo |
| 3354 | + self.wiki = wiki |
| 3355 | + self.itemForRecompression = itemForRecompression |
| 3356 | + if checkpoints: |
| 3357 | + self._checkpointsEnabled = True |
| 3358 | + self.checkpointFile = checkpointFile |
| 3359 | + Dump.__init__(self, name, desc) |
| 3360 | + |
| 3361 | + def getDumpName(self): |
| 3362 | + return "pages-" + self._subset |
| 3363 | + |
| 3364 | + def getFileType(self): |
| 3365 | + return "xml" |
| 3366 | + |
| 3367 | + def getFileExt(self): |
| 3368 | + return "bz2" |
| 3369 | + |
| 3370 | + def getDumpNameMultistream(self, name): |
| 3371 | + return name + "-multistream" |
| 3372 | + |
| 3373 | + def getDumpNameMultistreamIndex(self, name): |
| 3374 | + return self.getDumpNameMultistream(name) + "-index" |
| 3375 | + |
| 3376 | + def getFileMultistreamName(self, f): |
| 3377 | + """assuming that f is the name of an input file, |
| 3378 | + return the name of the associated multistream output file""" |
| 3379 | + return DumpFilename(self.wiki, f.date, self.getDumpNameMultistream(f.dumpName), f.fileType, self.fileExt, f.chunk, f.checkpoint, f.temp) |
| 3380 | + |
| 3381 | + def getFileMultistreamIndexName(self, f): |
| 3382 | + """assuming that f is the name of a multistream output file, |
| 3383 | + return the name of the associated index file""" |
| 3384 | + return DumpFilename(self.wiki, f.date, self.getDumpNameMultistreamIndex(f.dumpName), f.fileType, self.fileExt, f.chunk, f.checkpoint, f.temp) |
| 3385 | + |
 | 3386 | + # outputFiles is a list of checkpoint files if checkpointing is in use;
 | 3387 | + # otherwise it is a list of one file. checkpoint files get done one at a time;
 | 3388 | + # we can't realistically run parallel recompression jobs for 200 files.
| 3389 | + def buildCommand(self, runner, outputFiles): |
| 3390 | + # FIXME need shell escape |
| 3391 | + if (not exists( self.wiki.config.bzip2 ) ): |
| 3392 | + raise BackupError("bzip2 command %s not found" % self.wiki.config.bzip2) |
| 3393 | + if (not exists( self.wiki.config.recompressxml ) ): |
| 3394 | + raise BackupError("recompressxml command %s not found" % self.wiki.config.recompressxml) |
| 3395 | + |
| 3396 | + commandSeries = [] |
| 3397 | + for f in outputFiles: |
| 3398 | + inputFile = DumpFilename(self.wiki, None, f.dumpName, f.fileType, self.itemForRecompression.fileExt, f.chunk, f.checkpoint) |
| 3399 | + outfile = runner.dumpDir.filenamePublicPath(self.getFileMultistreamName(f)) |
| 3400 | + outfileIndex = runner.dumpDir.filenamePublicPath(self.getFileMultistreamIndexName(f)) |
| 3401 | + infile = runner.dumpDir.filenamePublicPath(inputFile) |
| 3402 | + commandPipe = [ [ "%s -dc %s | %s --pagesperstream 100 --buildindex %s > %s" % (self.wiki.config.bzip2, infile, self.wiki.config.recompressxml, outfileIndex, outfile) ] ] |
| 3403 | + commandSeries.append(commandPipe) |
| 3404 | + return(commandSeries) |
| 3405 | + |
| 3406 | + def run(self, runner): |
| 3407 | + if runner.lastFailed: |
| 3408 | + raise BackupError("bz2 dump incomplete, not recompressing") |
| 3409 | + commands = [] |
| 3410 | + self.cleanupOldFiles(runner.dumpDir) |
| 3411 | + if self.checkpointFile: |
| 3412 | + outputFile = DumpFilename(self.wiki, None, self.checkpointFile.dumpName, self.checkpointFile.fileType, self.fileExt, self.checkpointFile.chunk, self.checkpointFile.checkpoint) |
| 3413 | + series = self.buildCommand(runner, [ outputFile ]) |
| 3414 | + commands.append(series) |
| 3415 | + elif self._chunksEnabled and not self._chunkToDo: |
 | 3416 | + # each parallel job must be set up separately: it may have checkpoint files
 | 3417 | + # that need to be processed in series, which is a special case
| 3418 | + for i in range(1, len(self._chunks)+1): |
| 3419 | + outputFiles = self.listOutputFilesForBuildCommand(runner.dumpDir, i) |
| 3420 | + series = self.buildCommand(runner, outputFiles) |
| 3421 | + commands.append(series) |
| 3422 | + else: |
| 3423 | + outputFiles = self.listOutputFilesForBuildCommand(runner.dumpDir) |
| 3424 | + series = self.buildCommand(runner, outputFiles) |
| 3425 | + commands.append(series) |
| 3426 | + |
| 3427 | + error = runner.runCommand(commands, callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True) |
| 3428 | + if (error): |
| 3429 | + raise BackupError("error recompressing bz2 file(s)") |
| 3430 | + |
 | 3431 | + # lists all files that can be produced when there are no checkpoint files; temp files excluded
| 3432 | + def listOutputFilesToPublish(self, dumpDir): |
| 3433 | + files = [] |
| 3434 | + inputFiles = self.itemForRecompression.listOutputFilesForInput(dumpDir) |
| 3435 | + for f in inputFiles: |
| 3436 | + files.append(self.getFileMultistreamName(f)) |
| 3437 | + files.append(self.getFileMultistreamIndexName(f)) |
| 3438 | + return files |
| 3439 | + |
 | 3440 | + # lists all files that can be produced when there are no checkpoint files; temp files excluded;
 | 3441 | + # covers only the chunks we are actually supposed to do (if there is a limit)
| 3442 | + def listOutputFilesToCheckForTruncation(self, dumpDir): |
| 3443 | + files = [] |
| 3444 | + inputFiles = self.itemForRecompression.listOutputFilesForInput(dumpDir) |
| 3445 | + for f in inputFiles: |
| 3446 | + if self._chunkToDo and f.chunkInt != self._chunkToDo: |
| 3447 | + continue |
| 3448 | + files.append(self.getFileMultistreamName(f)) |
| 3449 | + files.append(self.getFileMultistreamIndexName(f)) |
| 3450 | + return files |
| 3451 | + |
 | 3452 | + # lists all files that can be produced when there are no checkpoint files; no temp files;
 | 3453 | + # covers only the chunks we are actually supposed to do (if there is a limit)
| 3454 | + def listOutputFilesForBuildCommand(self, dumpDir, chunk = None): |
| 3455 | + files = [] |
| 3456 | + inputFiles = self.itemForRecompression.listOutputFilesForInput(dumpDir) |
| 3457 | + for f in inputFiles: |
| 3458 | + # if this param is set it takes priority |
| 3459 | + if chunk and f.chunkInt != chunk: |
| 3460 | + continue |
| 3461 | + elif self._chunkToDo and f.chunkInt != self._chunkToDo: |
| 3462 | + continue |
 | 3463 | + # we don't convert these names to the final output form here; the build command
 | 3464 | + # does that (i.e. adds "multistream" and "index" to them)
| 3465 | + files.append(DumpFilename(self.wiki, f.date, f.dumpName, f.fileType, self.fileExt, f.chunk, f.checkpoint, f.temp)) |
| 3466 | + return files |
| 3467 | + |
 | 3468 | + # lists all files that can be produced when there are no checkpoint files; temp files included;
 | 3469 | + # covers just the chunks we do if there is a limit
| 3470 | + def listOutputFilesForCleanup(self, dumpDir, dumpNames = None): |
 | 3471 | + # some stages (e.g. XmlStubs) call this for several different dumpNames
| 3472 | + if (dumpNames == None): |
| 3473 | + dumpNames = [ self.dumpName ] |
| 3474 | + multistreamNames = [] |
| 3475 | + for d in dumpNames: |
| 3476 | + multistreamNames.extend( [ self.getDumpNameMultistream(d), self.getDumpNameMultistreamIndex(d) ] ) |
| 3477 | + |
| 3478 | + files = [] |
| 3479 | + if (self.itemForRecompression._checkpointsEnabled): |
| 3480 | + # we will pass list of chunks or chunkToDo, or False, depending on the job setup. |
| 3481 | + files.extend(self.listCheckpointFilesPerChunkExisting(dumpDir, self.getChunkList(), multistreamNames)) |
| 3482 | + files.extend(self.listTempFilesPerChunkExisting(dumpDir, self.getChunkList(), multistreamNames)) |
| 3483 | + else: |
| 3484 | + # we will pass list of chunks or chunkToDo, or False, depending on the job setup. |
| 3485 | + files.extend(self.listRegularFilesPerChunkExisting(dumpDir, self.getChunkList(), multistreamNames)) |
| 3486 | + return files |
| 3487 | + |
| 3488 | + # must return all output files that could be produced by a full run of this stage, |
| 3489 | + # not just whatever we happened to produce (if run for one chunk, say) |
| 3490 | + def listOutputFilesForInput(self, dumpDir): |
| 3491 | + files = [] |
| 3492 | + inputFiles = self.itemForRecompression.listOutputFilesForInput(dumpDir) |
| 3493 | + for f in inputFiles: |
| 3494 | + files.append(self.getFileMultistreamName(f)) |
| 3495 | + files.append(self.getFileMultistreamIndexName(f)) |
| 3496 | + return files |
| 3497 | + |
3330 | 3498 | class BigXmlDump(XmlDump): |
3331 | 3499 | """XML page dump for something larger, where a 7-Zip compressed copy |
3332 | 3500 | could save 75% of download time for some users.""" |
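To make the new recompression stage concrete: buildCommand above assembles one shell pipeline per input file from its format string. For a hypothetical enwiki run dated 20120101 (directory layout elided; the -multistream and -multistream-index name parts come from getDumpNameMultistream and getDumpNameMultistreamIndex), the generated command would look roughly like:

    /usr/bin/bzip2 -dc .../enwiki-20120101-pages-articles.xml.bz2 \
      | /usr/local/bin/recompressxml --pagesperstream 100 \
          --buildindex .../enwiki-20120101-pages-articles-multistream-index.xml.bz2 \
      > .../enwiki-20120101-pages-articles-multistream.xml.bz2

Since runner.runCommand is invoked with shell=True, the filenames are interpolated unquoted into the shell command line, which is what the "FIXME need shell escape" in buildCommand refers to.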