Index: branches/ariel/xmldumps-backup/WikiDump.py |
— | — | @@ -191,6 +191,7 @@ |
192 | 192 | "cat": "/bin/cat", |
193 | 193 | "grep": "/bin/grep", |
194 | 194 | "checkforbz2footer": "/usr/local/bin/checkforbz2footer", |
| 195 | + "writeuptopageid": "/usr/local/bin/writeuptopageid", |
195 | 196 | #"cleanup": { |
196 | 197 | "keep": "3", |
197 | 198 | #"chunks": { |
— | — | @@ -271,6 +272,7 @@ |
272 | 273 | self.cat = self.conf.get("tools", "cat") |
273 | 274 | self.grep = self.conf.get("tools", "grep") |
274 | 275 | self.checkforbz2footer = self.conf.get("tools","checkforbz2footer") |
| 276 | + self.writeuptopageid = self.conf.get("tools","writeuptopageid") |
275 | 277 | |
276 | 278 | if not self.conf.has_section('cleanup'): |
277 | 279 | self.conf.add_section('cleanup') |
Index: branches/ariel/xmldumps-backup/worker.py |
— | — | @@ -485,12 +485,13 @@ |
486 | 486 | self._toBeRun = toBeRun |
487 | 487 | |
488 | 488 | class DumpItemList(object): |
489 | | - def __init__(self, wiki, prefetch, spawn, chunkToDo, singleJob, chunkInfo, runInfoFile, dumpDir): |
| 489 | + def __init__(self, wiki, prefetch, spawn, chunkToDo, checkpointFile, singleJob, chunkInfo, runInfoFile, dumpDir): |
490 | 490 | self.wiki = wiki |
491 | 491 | self._hasFlaggedRevs = self.wiki.hasFlaggedRevs() |
492 | 492 | self._prefetch = prefetch |
493 | 493 | self._spawn = spawn |
494 | 494 | self.chunkInfo = chunkInfo |
| 495 | + self.checkpointFile = checkpointFile |
495 | 496 | self._chunkToDo = chunkToDo |
496 | 497 | self._singleJob = singleJob |
497 | 498 | self._runInfoFile = runInfoFile |
— | — | @@ -505,9 +506,21 @@ |
506 | 507 | self._singleJob[-9:] == 'recombine' or |
507 | 508 | self._singleJob == 'noop' or |
508 | 509 | self._singleJob == 'xmlpagelogsdump' or |
509 | | - self._singleJob == 'pagetitlesdump'): |
| 510 | + self._singleJob == 'pagetitlesdump' or |
| 511 | + self._singleJob.endswith('recombine')): |
510 | 512 | raise BackupError("You cannot specify a chunk with the job %s, exiting.\n" % self._singleJob) |
511 | 513 | |
| 514 | + if (self._singleJob and self.checkpointFile): |
| 515 | + if (self._singleJob[-5:] == 'table' or |
| 516 | + self._singleJob[-9:] == 'recombine' or |
| 517 | + self._singleJob == 'noop' or |
| 518 | + self._singleJob == 'xmlpagelogsdump' or |
| 519 | + self._singleJob == 'pagetitlesdump' or |
| 520 | + self._singleJob == 'abstractsdump' or |
| 521 | + self._singleJob == 'xmlstubsdump' or |
| 522 | + self._singleJob.endswith('recombine')): |
| 523 | + raise BackupError("You cannot specify a checkpoint file with the job %s, exiting.\n" % self._singleJob) |
| 524 | + |
512 | 525 | self.dumpItems = [PrivateTable("user", "usertable", "User account data."), |
513 | 526 | PrivateTable("watchlist", "watchlisttable", "Users' watchlist settings."), |
514 | 527 | PrivateTable("ipblocks", "ipblockstable", "Data for blocks of IP addresses, ranges, and users."), |
— | — | @@ -555,7 +568,7 @@ |
556 | 569 | XmlDump("articles", |
557 | 570 | "articlesdump", |
558 | 571 | "<big><b>Articles, templates, image descriptions, and primary meta-pages.</b></big>", |
559 | | - "This contains current versions of article content, and is the archive most mirror sites will probably want.", self.findItemByName('xmlstubsdump'), self._prefetch, self._spawn, self.wiki, self._getChunkToDo("articlesdump"), self.chunkInfo.getPagesPerChunkHistory(), checkpoints)) |
| 572 | + "This contains current versions of article content, and is the archive most mirror sites will probably want.", self.findItemByName('xmlstubsdump'), self._prefetch, self._spawn, self.wiki, self._getChunkToDo("articlesdump"), self.chunkInfo.getPagesPerChunkHistory(), checkpoints, self.checkpointFile)) |
560 | 573 | if (self.chunkInfo.chunksEnabled()): |
561 | 574 | self.dumpItems.append(RecombineXmlDump("articlesdumprecombine", "<big><b>Recombine articles, templates, image descriptions, and primary meta-pages.</b></big>","This contains current versions of article content, and is the archive most mirror sites will probably want.", self.findItemByName('articlesdump'))) |
562 | 575 | |
— | — | @@ -563,7 +576,7 @@ |
564 | 577 | XmlDump("meta-current", |
565 | 578 | "metacurrentdump", |
566 | 579 | "All pages, current versions only.", |
567 | | - "Discussion and user pages are included in this complete archive. Most mirrors won't want this extra material.", self.findItemByName('xmlstubsdump'), self._prefetch, self._spawn, self.wiki, self._getChunkToDo("metacurrentdump"), self.chunkInfo.getPagesPerChunkHistory(), checkpoints)) |
| 580 | + "Discussion and user pages are included in this complete archive. Most mirrors won't want this extra material.", self.findItemByName('xmlstubsdump'), self._prefetch, self._spawn, self.wiki, self._getChunkToDo("metacurrentdump"), self.chunkInfo.getPagesPerChunkHistory(), checkpoints, self.checkpointFile)) |
568 | 581 | |
569 | 582 | if (self.chunkInfo.chunksEnabled()): |
570 | 583 | self.dumpItems.append(RecombineXmlDump("metacurrentdumprecombine", "Recombine all pages, current versions only.","Discussion and user pages are included in this complete archive. Most mirrors won't want this extra material.", self.findItemByName('metacurrentdump'))) |
— | — | @@ -582,7 +595,7 @@ |
583 | 596 | "metahistorybz2dump", |
584 | 597 | "All pages with complete page edit history (.bz2)", |
585 | 598 | "These dumps can be *very* large, uncompressing up to 20 times the archive download size. " + |
586 | | - "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.findItemByName('xmlstubsdump'), self._prefetch, self._spawn, self.wiki, self._getChunkToDo("metahistorybz2dump"), self.chunkInfo.getPagesPerChunkHistory(), checkpoints)) |
| 599 | + "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.findItemByName('xmlstubsdump'), self._prefetch, self._spawn, self.wiki, self._getChunkToDo("metahistorybz2dump"), self.chunkInfo.getPagesPerChunkHistory(), checkpoints, self.checkpointFile)) |
587 | 600 | if (self.chunkInfo.chunksEnabled() and self.chunkInfo.recombineHistory()): |
588 | 601 | self.dumpItems.append( |
589 | 602 | RecombineXmlDump("metahistorybz2dumprecombine", |
— | — | @@ -594,7 +607,7 @@ |
595 | 608 | "metahistory7zdump", |
596 | 609 | "All pages with complete edit history (.7z)", |
597 | 610 | "These dumps can be *very* large, uncompressing up to 100 times the archive download size. " + |
598 | | - "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.findItemByName('metahistorybz2dump'), self.wiki, self._getChunkToDo("metahistory7zdump"), self.chunkInfo.getPagesPerChunkHistory())) |
| 611 | + "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.findItemByName('metahistorybz2dump'), self.wiki, self._getChunkToDo("metahistory7zdump"), self.chunkInfo.getPagesPerChunkHistory(), self.checkpointFile)) |
599 | 612 | if (self.chunkInfo.chunksEnabled() and self.chunkInfo.recombineHistory()): |
600 | 613 | self.dumpItems.append( |
601 | 614 | RecombineXmlRecompressDump("metahistory7zdumprecombine", |
— | — | @@ -1454,7 +1467,7 @@ |
1455 | 1468 | return os.path.join(self.wiki.publicDir(), self.wiki.date) |
1456 | 1469 | |
1457 | 1470 | class Runner(object): |
1458 | | - def __init__(self, wiki, prefetch=True, spawn=True, job=None, restart=False, notice="", dryrun = False, loggingEnabled=False, chunkToDo = False): |
| 1471 | + def __init__(self, wiki, prefetch=True, spawn=True, job=None, restart=False, notice="", dryrun = False, loggingEnabled=False, chunkToDo = False, checkpointFile = None): |
1459 | 1472 | self.wiki = wiki |
1460 | 1473 | self.dbName = wiki.dbName |
1461 | 1474 | self.prefetch = prefetch |
— | — | @@ -1465,7 +1478,18 @@ |
1466 | 1479 | self.log = None |
1467 | 1480 | self.dryrun = dryrun |
1468 | 1481 | self._chunkToDo = chunkToDo |
| 1482 | + self.checkpointFile = None |
1469 | 1483 | |
| 1484 | + if (checkpointFile): |
| 1485 | + f = DumpFilename(self.wiki) |
| 1486 | + f.newFromFilename(checkpointFile) |
| 1487 | + # we should get chunk if any |
| 1488 | + if not self._chunkToDo and f.chunkInt: |
| 1489 | + self._chunkToDo = f.chunkInt |
| 1490 | + elif self._chunkToDo and f.chunkInt and self._chunkToDo != f.chunkInt: |
| 1491 | + raise BackupError("specified chunk to do does not match chunk of checkpoint file %s to redo" % checkpointFile) |
| 1492 | + self.checkpointFile = f |
| 1493 | + |
1470 | 1494 | self._loggingEnabled = loggingEnabled |
1471 | 1495 | self._statusEnabled = True |
1472 | 1496 | self._checksummerEnabled = True |
— | — | @@ -1514,7 +1538,7 @@ |
1515 | 1539 | self.checksums = Checksummer(self.wiki, self.dumpDir, self._checksummerEnabled) |
1516 | 1540 | |
1517 | 1541 | # some or all of these dumpItems will be marked to run |
1518 | | - self.dumpItemList = DumpItemList(self.wiki, self.prefetch, self.spawn, self._chunkToDo, self.jobRequested, self.chunkInfo, self.runInfoFile, self.dumpDir) |
| 1542 | + self.dumpItemList = DumpItemList(self.wiki, self.prefetch, self.spawn, self._chunkToDo, self.checkpointFile, self.jobRequested, self.chunkInfo, self.runInfoFile, self.dumpDir) |
1519 | 1543 | self.status = Status(self.wiki, self.dumpDir, self.dumpItemList.dumpItems, self.checksums, self._statusEnabled, self.htmlNoticeFile, self.logAndPrint) |
1520 | 1544 | |
1521 | 1545 | def logQueueReader(self,log): |
— | — | @@ -1662,11 +1686,11 @@ |
1663 | 1687 | item.start(self) |
1664 | 1688 | self.status.updateStatusFiles() |
1665 | 1689 | self.runInfoFile.saveDumpRunInfoFile(self.dumpItemList.reportDumpRunInfo()) |
1666 | | - try: |
1667 | | - item.dump(self) |
1668 | | - except Exception, ex: |
1669 | | - self.debug("*** exception! " + str(ex)) |
1670 | | - item.setStatus("failed") |
| 1690 | + try: |
| 1691 | + item.dump(self) |
| 1692 | + except Exception, ex: |
| 1693 | + self.debug("*** exception! " + str(ex)) |
| 1694 | + item.setStatus("failed") |
1671 | 1695 | if item.status() == "failed": |
1672 | 1696 | self.runHandleFailure() |
1673 | 1697 | else: |
— | — | @@ -1696,11 +1720,11 @@ |
1697 | 1721 | item.start(self) |
1698 | 1722 | self.status.updateStatusFiles() |
1699 | 1723 | self.runInfoFile.saveDumpRunInfoFile(self.dumpItemList.reportDumpRunInfo()) |
1700 | | - try: |
1701 | | - item.dump(self) |
1702 | | - except Exception, ex: |
1703 | | - self.debug("*** exception! " + str(ex)) |
1704 | | - item.setStatus("failed") |
| 1724 | + try: |
| 1725 | + item.dump(self) |
| 1726 | + except Exception, ex: |
| 1727 | + self.debug("*** exception! " + str(ex)) |
| 1728 | + item.setStatus("failed") |
1705 | 1729 | if item.status() == "failed": |
1706 | 1730 | self.runHandleFailure() |
1707 | 1731 | else: |
— | — | @@ -1873,6 +1897,8 @@ |
1874 | 1898 | self._chunksEnabled = False |
1875 | 1899 | if not hasattr(self, '_checkpointsEnabled'): |
1876 | 1900 | self._checkpointsEnabled = False |
| 1901 | + if not hasattr(self, 'checkpointFile'): |
| 1902 | + self.checkpointFile = False |
1877 | 1903 | |
1878 | 1904 | def name(self): |
1879 | 1905 | return self.runInfo.name() |
— | — | @@ -1931,11 +1957,11 @@ |
1932 | 1958 | |
1933 | 1959 | def dump(self, runner): |
1934 | 1960 | """Attempt to run the operation, updating progress/status info.""" |
1935 | | - try: |
1936 | | - self.run(runner) |
1937 | | - except Exception, ex: |
1938 | | - self.setStatus("failed") |
1939 | | - raise ex |
| 1961 | + try: |
| 1962 | + self.run(runner) |
| 1963 | + except Exception, ex: |
| 1964 | + self.setStatus("failed") |
| 1965 | + raise ex |
1940 | 1966 | self.setStatus("done") |
1941 | 1967 | |
1942 | 1968 | def run(self, runner): |
— | — | @@ -2026,6 +2052,10 @@ |
2027 | 2053 | |
2028 | 2054 | def cleanupOldFiles(self, dumpDir, chunks = False): |
2029 | 2055 | if (runner._cleanupOldFilesEnabled): |
| 2056 | + if (self.checkpointFile): |
| 2057 | + # we only rerun this one, so just remove this one |
| 2058 | + if exists(dumpDir.filenamePublicPath(self.checkpointFile)): |
| 2059 | + os.remove(dumpDir.filenamePublicPath(self.checkpointFile)) |
2030 | 2060 | files = self.listOutputFilesForCleanup(dumpDir) |
2031 | 2061 | for f in files: |
2032 | 2062 | if exists(dumpDir.filenamePublicPath(f)): |
— | — | @@ -2266,6 +2296,10 @@ |
2267 | 2297 | if (dumpNames == None): |
2268 | 2298 | dumpNames = [ self.dumpName ] |
2269 | 2299 | files = [] |
| 2300 | + if (self.checkpointFile): |
| 2301 | + files.append(self.checkpointFile) |
| 2302 | + return files |
| 2303 | + |
2270 | 2304 | if (self._checkpointsEnabled): |
2271 | 2305 | # we will pass list of chunks or chunkToDo, or False, depending on the job setup. |
2272 | 2306 | files.extend(self.listCheckpointFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames)) |
— | — | @@ -2285,6 +2319,10 @@ |
2286 | 2320 | if (dumpNames == None): |
2287 | 2321 | dumpNames = [ self.dumpName ] |
2288 | 2322 | files = [] |
| 2323 | + if (self.checkpointFile): |
| 2324 | + files.append(self.checkpointFile) |
| 2325 | + return files |
| 2326 | + |
2289 | 2327 | if (self._checkpointsEnabled): |
2290 | 2328 | # we will pass list of chunks or chunkToDo, or False, depending on the job setup. |
2291 | 2329 | files.extend(self.listCheckpointFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames)) |
— | — | @@ -2303,6 +2341,10 @@ |
2304 | 2342 | if (dumpNames == None): |
2305 | 2343 | dumpNames = [ self.dumpName ] |
2306 | 2344 | files = [] |
| 2345 | + if (self.checkpointFile): |
| 2346 | + files.append(self.checkpointFile) |
| 2347 | + return files |
| 2348 | + |
2307 | 2349 | if (self._checkpointsEnabled): |
2308 | 2350 | # we will pass list of chunks or chunkToDo, or False, depending on the job setup. |
2309 | 2351 | files.extend(self.listTempFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames)) |
— | — | @@ -2321,7 +2363,11 @@ |
2322 | 2364 | if (dumpNames == None): |
2323 | 2365 | dumpNames = [ self.dumpName ] |
2324 | 2366 | files = [] |
2325 | | - if (self._checkpointsEnabled): |
| 2367 | + if (self.checkpointFile): |
| 2368 | + files.append(self.checkpointFile) |
| 2369 | + return files |
| 2370 | + |
| 2371 | + if (self._checkpointsEnabled): |
2326 | 2372 | # we will pass list of chunks or chunkToDo, or False, depending on the job setup. |
2327 | 2373 | files.extend(self.listCheckpointFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames)) |
2328 | 2374 | files.extend(self.listTempFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames)) |
— | — | @@ -2618,7 +2664,7 @@ |
2619 | 2665 | |
2620 | 2666 | class XmlDump(Dump): |
2621 | 2667 | """Primary XML dumps, one section at a time.""" |
2622 | | - def __init__(self, subset, name, desc, detail, itemForStubs, prefetch, spawn, wiki, chunkToDo, chunks = False, checkpoints = False): |
| 2668 | + def __init__(self, subset, name, desc, detail, itemForStubs, prefetch, spawn, wiki, chunkToDo, chunks = False, checkpoints = False, checkpointFile = None): |
2623 | 2669 | self._subset = subset |
2624 | 2670 | self._detail = detail |
2625 | 2671 | self._desc = desc |
— | — | @@ -2634,6 +2680,11 @@ |
2635 | 2681 | self.itemForStubs = itemForStubs |
2636 | 2682 | if checkpoints: |
2637 | 2683 | self._checkpointsEnabled = True |
| 2684 | + self.checkpointFile = checkpointFile |
| 2685 | + if self.checkpointFile: |
| 2686 | + # we don't checkpoint the checkpoint file. |
| 2687 | + self._checkpointsEnabled = False |
| 2688 | + |
2638 | 2689 | Dump.__init__(self, name, desc) |
2639 | 2690 | |
2640 | 2691 | def getDumpNameBase(self): |
— | — | @@ -2670,12 +2721,17 @@ |
2671 | 2722 | break |
2672 | 2723 | if len(inputFiles) > 1: |
2673 | 2724 | raise BackupError("Trouble finding stub files for xml dump run") |
2674 | | - for f in inputFiles: |
2675 | | - # we should convert the input file to an output file I guess |
2676 | | - # we write regular files |
2677 | | - outputFile = DumpFilename(runner.wiki, f.date, f.dumpName, f.fileType, self.fileExt) |
2678 | | - series = self.buildCommand(runner, f) |
| 2725 | + |
| 2726 | + if self.checkpointFile: |
| 2727 | + series = self.buildCommand(runner, self.checkpointFile) |
2679 | 2728 | commands.append(series) |
| 2729 | + else: |
| 2730 | + for f in inputFiles: |
| 2731 | + # we should convert the input file to an output file I guess |
| 2732 | + # we write regular files |
| 2733 | + outputFile = DumpFilename(self.wiki, f.date, f.dumpName, f.fileType, self.fileExt) |
| 2734 | + series = self.buildCommand(runner, f) |
| 2735 | + commands.append(series) |
2680 | 2736 | |
2681 | 2737 | error = runner.runCommand(commands, callbackStderr=self.progressCallback, callbackStderrArg=runner) |
2682 | 2738 | truncationError = self.checkForTruncatedFiles(runner) |
— | — | @@ -2690,7 +2746,7 @@ |
2691 | 2747 | else: |
2692 | 2748 | files = self.listRegularFilesPerChunkExisting(runner.dumpDir, self.getChunkList(), [ self.dumpName ]) |
2693 | 2749 | for f in files: |
2694 | | - f = DumpFile(runner.wiki,runner.dumpDir.filenamePublicPath(f)) |
| 2750 | + f = DumpFile(self.wiki,runner.dumpDir.filenamePublicPath(f)) |
2695 | 2751 | if (f.checkIfTruncated()): |
2696 | 2752 | runner.logAndPrint("file %s is truncated, moving out of the way" % f.filename ) |
2697 | 2753 | f.rename( f.filename + ".truncated" ) |
— | — | @@ -2707,27 +2763,76 @@ |
2708 | 2764 | # do we need checkpoints? ummm |
2709 | 2765 | xmlbz2 = runner.dumpDir.filenamePublicPath(f) |
2710 | 2766 | |
2711 | | - if (not exists( runner.wiki.config.bzip2 ) ): |
2712 | | - raise BackupError("bzip2 command %s not found" % runner.wiki.config.bzip2) |
2713 | | - if runner.wiki.config.bzip2[-6:] == "dbzip2": |
| 2767 | + if (not exists( self.wiki.config.bzip2 ) ): |
| 2768 | + raise BackupError("bzip2 command %s not found" % self.wiki.config.bzip2) |
| 2769 | + if self.wiki.config.bzip2[-6:] == "dbzip2": |
2714 | 2770 | bz2mode = "dbzip2" |
2715 | 2771 | else: |
2716 | 2772 | bz2mode = "bzip2" |
2717 | 2773 | return "--output=%s:%s" % (bz2mode, xmlbz2) |
2718 | 2774 | |
| 2775 | + def writePartialStub(self, inputFile, outputFile, startPageID, endPageID): |
| 2776 | + if (not exists( self.wiki.config.writeuptopageid ) ): |
| 2777 | + raise BackupError("writeuptopageid command %s not found" % self.wiki.config.writeuptopageid) |
| 2778 | + writeuptopageid = self.wiki.config.writeuptopageid |
| 2779 | + |
| 2780 | + inputFilePath = runner.dumpDir.filenamePublicPath(inputFile) |
| 2781 | + outputFilePath = os.path.join(self.wiki.config.tempDir,outputFile.filename) |
| 2782 | + if inputFile.fileExt == "gz": |
| 2783 | + command1 = "%s -dc %s" % (self.wiki.config.gzip, inputFilePath ) |
| 2784 | + command2 = "%s > %s" % (self.wiki.config.gzip, outputFilePath ) |
| 2785 | + elif inputFile.fileExt == '7z': |
| 2786 | + command1 = "%s e -so %s" % (self.wiki.config.sevenzip, inputFilePath ) |
| 2787 | + command2 = "%s a -si %s" % (self.wiki.config.sevenzip, outputFilePath ) |
| 2788 | + elif inputFile.fileExt == 'bz2': |
| 2789 | + command1 = "%s -dc %s" % (self.wiki.config.bzip2, inputFilePath ) |
| 2790 | + command2 = "%s > %s" % (self.wiki.config.bzip2, outputFilePath ) |
| 2791 | + else: |
| 2792 | + raise BackupError("unknown stub file extension %s" % inputFile.fileExt) |
| 2793 | + command = [ command1 + ( "| %s %s %s |" % (self.wiki.config.writeuptopageid, startPageID, endPageID) ) + command2 ] |
| 2794 | + |
| 2795 | + pipeline = [ command ] |
| 2796 | + series = [ pipeline ] |
| 2797 | + error = runner.runCommand([ series ], shell = True) |
| 2798 | + if (error): |
| 2799 | + raise BackupError("failed to write partial stub file %s" % outputFile.filename) |
| 2800 | + |
2719 | 2801 | def buildCommand(self, runner, f): |
2720 | 2802 | """Build the command line for the dump, minus output and filter options""" |
2721 | 2803 | |
2722 | | - if (self._checkpointsEnabled): |
| 2804 | + if (self.checkpointFile): |
| 2805 | + outputFile = f |
| 2806 | + print "outputFile is ", outputFile.filename |
| 2807 | + elif (self._checkpointsEnabled): |
2723 | 2808 | # we write a temp file, it will be checkpointed every so often. |
2724 | | - outputFile = DumpFilename(runner.wiki, f.date, self.dumpName, f.fileType, self.fileExt, f.chunk, f.checkpoint, temp = True) |
| 2809 | + outputFile = DumpFilename(self.wiki, f.date, self.dumpName, f.fileType, self.fileExt, f.chunk, f.checkpoint, temp = True) |
2725 | 2810 | else: |
2726 | 2811 | # we write regular files |
2727 | | - outputFile = DumpFilename(runner.wiki, f.date, self.dumpName, f.fileType, self.fileExt, f.chunk, checkpoint = False, temp = False) |
| 2812 | + outputFile = DumpFilename(self.wiki, f.date, self.dumpName, f.fileType, self.fileExt, f.chunk, checkpoint = False, temp = False) |
2728 | 2813 | |
2729 | 2814 | # Page and revision data pulled from this skeleton dump... |
2730 | | - stubOption = "--stub=gzip:%s" % runner.dumpDir.filenamePublicPath(f) |
| 2815 | + # FIXME we need the stream wrappers for proper use of writeupto. this is a hack. |
| 2816 | + if (self.checkpointFile): |
| 2817 | + # fixme I now have this code in a couple places, make it a function. |
| 2818 | + if not self.dumpName.startswith(self.getDumpNameBase()): |
| 2819 | + raise BackupError("dumpName %s of unknown form for this job" % self.dumpName) |
| 2820 | + dumpName = self.dumpName[len(self.getDumpNameBase()):] |
| 2821 | + stubDumpNames = self.itemForStubs.listDumpNames() |
| 2822 | + for s in stubDumpNames: |
| 2823 | + if s.endswith(dumpName): |
| 2824 | + stubDumpName = s |
2731 | 2825 | |
| 2826 | + stubInputFilename = self.checkpointFile.newFilename(stubDumpName, self.itemForStubs.getFileType(), self.itemForStubs.getFileExt(), self.checkpointFile.date, self.checkpointFile.chunk) |
| 2827 | + stubInputFile = DumpFilename(self.wiki) |
| 2828 | + stubInputFile.newFromFilename(stubInputFilename) |
| 2829 | + stubOutputFilename = self.checkpointFile.newFilename(stubDumpName, self.itemForStubs.getFileType(), self.itemForStubs.getFileExt(), self.checkpointFile.date, self.checkpointFile.chunk, self.checkpointFile.checkpoint) |
| 2830 | + stubOutputFile = DumpFilename(self.wiki) |
| 2831 | + stubOutputFile.newFromFilename(stubOutputFilename) |
| 2832 | + self.writePartialStub(stubInputFile, stubOutputFile, self.checkpointFile.firstPageID, str(int(self.checkpointFile.lastPageID) + 1)) |
| 2833 | + stubOption = "--stub=gzip:%s" % os.path.join(self.wiki.config.tempDir, stubOutputFile.filename) |
| 2834 | + else: |
| 2835 | + stubOption = "--stub=gzip:%s" % runner.dumpDir.filenamePublicPath(f) |
| 2836 | + |
2732 | 2837 | # Try to pull text from the previous run; most stuff hasn't changed |
2733 | 2838 | #Source=$OutputDir/pages_$section.xml.bz2 |
2734 | 2839 | sources = [] |
— | — | @@ -2753,21 +2858,21 @@ |
2754 | 2859 | prefetch = "" |
2755 | 2860 | |
2756 | 2861 | if self._spawn: |
2757 | | - spawn = "--spawn=%s" % (runner.wiki.config.php) |
| 2862 | + spawn = "--spawn=%s" % (self.wiki.config.php) |
2758 | 2863 | else: |
2759 | 2864 | spawn = "" |
2760 | 2865 | |
2761 | | - if (not exists( runner.wiki.config.php ) ): |
2762 | | - raise BackupError("php command %s not found" % runner.wiki.config.php) |
| 2866 | + if (not exists( self.wiki.config.php ) ): |
| 2867 | + raise BackupError("php command %s not found" % self.wiki.config.php) |
2763 | 2868 | |
2764 | | - if (self.wiki.config.checkpointTime): |
2765 | | - checkpointTime = "--maxtime=%s" % (runner.wiki.config.checkpointTime) |
| 2869 | + if (self._checkpointsEnabled): |
| 2870 | + checkpointTime = "--maxtime=%s" % (self.wiki.config.checkpointTime) |
2766 | 2871 | checkpointFile = "--checkpointfile=%s" % outputFile.newFilename(outputFile.dumpName, outputFile.fileType, outputFile.fileExt, outputFile.date, outputFile.chunk, "p%sp%s", None) |
2767 | 2872 | else: |
2768 | 2873 | checkpointTime = "" |
2769 | 2874 | checkpointFile = "" |
2770 | | - dumpCommand = [ "%s" % runner.wiki.config.php, |
2771 | | - "-q", "%s/maintenance/dumpTextPass.php" % runner.wiki.config.wikiDir, |
| 2875 | + dumpCommand = [ "%s" % self.wiki.config.php, |
| 2876 | + "-q", "%s/maintenance/dumpTextPass.php" % self.wiki.config.wikiDir, |
2772 | 2877 | "--wiki=%s" % runner.dbName, |
2773 | 2878 | "%s" % stubOption, |
2774 | 2879 | "%s" % prefetch, |
— | — | @@ -2853,7 +2958,7 @@ |
2854 | 2959 | startPageID = 1 |
2855 | 2960 | endPageID = None |
2856 | 2961 | |
2857 | | - dumps = runner.wiki.dumpDirs() |
| 2962 | + dumps = self.wiki.dumpDirs() |
2858 | 2963 | dumps.sort() |
2859 | 2964 | dumps.reverse() |
2860 | 2965 | for date in dumps: |
— | — | @@ -2978,17 +3083,17 @@ |
2979 | 3084 | # 200 files, right? |
2980 | 3085 | def buildCommand(self, runner, outputFiles): |
2981 | 3086 | # FIXME need shell escape |
2982 | | - if (not exists( runner.wiki.config.bzip2 ) ): |
2983 | | - raise BackupError("bzip2 command %s not found" % runner.wiki.config.bzip2) |
2984 | | - if (not exists( runner.wiki.config.sevenzip ) ): |
2985 | | - raise BackupError("7zip command %s not found" % runner.wiki.config.sevenzip) |
| 3087 | + if (not exists( self.wiki.config.bzip2 ) ): |
| 3088 | + raise BackupError("bzip2 command %s not found" % self.wiki.config.bzip2) |
| 3089 | + if (not exists( self.wiki.config.sevenzip ) ): |
| 3090 | + raise BackupError("7zip command %s not found" % self.wiki.config.sevenzip) |
2986 | 3091 | |
2987 | 3092 | commandSeries = [] |
2988 | 3093 | for f in outputFiles: |
2989 | | - inputFile = DumpFilename(runner.wiki, None, f.dumpName, f.fileType, self.itemForRecompression.fileExt, f.chunk, f.checkpoint) |
| 3094 | + inputFile = DumpFilename(self.wiki, None, f.dumpName, f.fileType, self.itemForRecompression.fileExt, f.chunk, f.checkpoint) |
2990 | 3095 | outfile = runner.dumpDir.filenamePublicPath(f) |
2991 | 3096 | infile = runner.dumpDir.filenamePublicPath(inputFile) |
2992 | | - commandPipe = [ [ "%s -dc %s | %s a -si %s" % (runner.wiki.config.bzip2, infile, runner.wiki.config.sevenzip, outfile) ] ] |
| 3097 | + commandPipe = [ [ "%s -dc %s | %s a -si %s" % (self.wiki.config.bzip2, infile, self.wiki.config.sevenzip, outfile) ] ] |
2993 | 3098 | commandSeries.append(commandPipe) |
2994 | 3099 | return(commandSeries) |
2995 | 3100 | |
— | — | @@ -3107,10 +3212,10 @@ |
3108 | 3213 | if not len(inputFiles): |
3109 | 3214 | self.setStatus("failed") |
3110 | 3215 | raise BackupError("No input files for %s found" % self.name()) |
3111 | | - if (not exists( runner.wiki.config.sevenzip ) ): |
3112 | | - raise BackupError("sevenzip command %s not found" % runner.wiki.config.sevenzip) |
3113 | | - compressionCommand = "%s a -si" % runner.wiki.config.sevenzip |
3114 | | - uncompressionCommand = [ "%s" % runner.wiki.config.sevenzip, "e", "-so" ] |
| 3216 | + if (not exists( self.wiki.config.sevenzip ) ): |
| 3217 | + raise BackupError("sevenzip command %s not found" % self.wiki.config.sevenzip) |
| 3218 | + compressionCommand = "%s a -si" % self.wiki.config.sevenzip |
| 3219 | + uncompressionCommand = [ "%s" % self.wiki.config.sevenzip, "e", "-so" ] |
3115 | 3220 | |
3116 | 3221 | recombineCommandString = self.buildRecombineCommandString(runner, files, outputFile, compressionCommand, uncompressionCommand ) |
3117 | 3222 | recombineCommand = [ recombineCommandString ] |
— | — | @@ -3361,7 +3466,9 @@ |
3362 | 3467 | if message: |
3363 | 3468 | print message |
3364 | 3469 | print "Usage: python worker.py [options] [wikidbname]" |
3365 | | - print "Options: --chunk, --configfile, --date, --job, --addnotice, --delnotice, --force, --noprefetch, --nospawn, --restartfrom, --log" |
| 3470 | + print "Options: --checkpoint, --chunk, --configfile, --date, --job, --addnotice, --delnotice, --force, --noprefetch, --nospawn, --restartfrom, --log" |
| 3471 | + print "--checkpoint: Specify the name of the checkpoint file to rerun (requires --job," |
| 3472 | + print " depending on the file this may imply --chunk)" |
3366 | 3473 | print "--chunk: Specify the number of the chunk to rerun (use with a specific job" |
3367 | 3474 | print " to rerun, only if parallel jobs (chunks) are enabled)." |
3368 | 3475 | print "--configfile: Specify an alternative configuration file to read." |
— | — | @@ -3407,10 +3514,11 @@ |
3408 | 3515 | htmlNotice = "" |
3409 | 3516 | dryrun = False |
3410 | 3517 | chunkToDo = False |
| 3518 | + checkpointFile = None |
3411 | 3519 | |
3412 | 3520 | try: |
3413 | 3521 | (options, remainder) = getopt.gnu_getopt(sys.argv[1:], "", |
3414 | | - ['date=', 'job=', 'configfile=', 'addnotice=', 'delnotice', 'force', 'dryrun', 'noprefetch', 'nospawn', 'restartfrom', 'log', 'chunk=' ]) |
| 3522 | + ['date=', 'job=', 'configfile=', 'addnotice=', 'delnotice', 'force', 'dryrun', 'noprefetch', 'nospawn', 'restartfrom', 'log', 'chunk=', 'checkpoint=' ]) |
3415 | 3523 | except: |
3416 | 3524 | usage("Unknown option specified") |
3417 | 3525 | |
— | — | @@ -3419,6 +3527,8 @@ |
3420 | 3528 | date = val |
3421 | 3529 | elif opt == "--configfile": |
3422 | 3530 | configFile = val |
| 3531 | + elif opt == '--checkpoint': |
| 3532 | + checkpointFile = val |
3423 | 3533 | elif opt == '--chunk': |
3424 | 3534 | chunkToDo = int(val) |
3425 | 3535 | elif opt == "--force": |
— | — | @@ -3452,6 +3562,10 @@ |
3453 | 3563 | usage("--chunk option requires a specific job for which to rerun that chunk") |
3454 | 3564 | if (chunkToDo and restart): |
3455 | 3565 | usage("--chunk option can be specified only for one specific job") |
| 3566 | + if checkpointFile and (len(remainder) == 0): |
| 3567 | + usage("--checkpoint option requires the name of a wikidb to be specified") |
| 3568 | + if checkpointFile and not jobRequested: |
| 3569 | + usage("--checkpoint option requires --job and the job from which to restart") |
3456 | 3570 | |
3457 | 3571 | # allow alternate config file |
3458 | 3572 | if (configFile): |
— | — | @@ -3488,7 +3602,7 @@ |
3489 | 3603 | date = TimeUtils.today() |
3490 | 3604 | wiki.setDate(date) |
3491 | 3605 | |
3492 | | - runner = Runner(wiki, prefetch, spawn, jobRequested, restart, htmlNotice, dryrun, enableLogging, chunkToDo) |
| 3606 | + runner = Runner(wiki, prefetch, spawn, jobRequested, restart, htmlNotice, dryrun, enableLogging, chunkToDo, checkpointFile) |
3493 | 3607 | if (restart): |
3494 | 3608 | print "Running %s, restarting from job %s..." % (wiki.dbName, jobRequested) |
3495 | 3609 | elif (jobRequested): |