Index: branches/ariel/xmldumps-backup/worker.py |
— | — | @@ -1021,7 +1021,6 @@ |
1022 | 1022 | return os.path.join(self.wiki.publicDir(), self.date); |
1023 | 1023 | |
1024 | 1024 | class Runner(object): |
1025 | | - |
1026 | 1025 | def __init__(self, wiki, date=None, prefetch=True, spawn=True, job=None, restart=False, notice="", dryrun = False, loggingEnabled=False, chunkToDo = False): |
1027 | 1026 | self.wiki = wiki |
1028 | 1027 | self.dbName = wiki.dbName |
— | — | @@ -1029,17 +1028,22 @@ |
1030 | 1029 | self.spawn = spawn |
1031 | 1030 | self.chunkInfo = Chunk(wiki, self.dbName, self.logAndPrint) |
1032 | 1031 | self.restart = restart |
1033 | | - self.loggingEnabled = loggingEnabled |
1034 | 1032 | self.htmlNoticeFile = None |
1035 | 1033 | self.log = None |
1036 | 1034 | self.dryrun = dryrun |
1037 | 1035 | self._chunkToDo = chunkToDo |
| 1036 | + |
| 1037 | + self._loggingEnabled = loggingEnabled |
1038 | 1038 | self._statusEnabled = True |
1039 | 1039 | self._checksummerEnabled = True |
1040 | 1040 | self._runInfoFileEnabled = True |
1041 | 1041 | self._symLinksEnabled = True |
1042 | 1042 | self._feedsEnabled = True |
1043 | 1043 | self._noticeFileEnabled = True |
| 1044 | + self._makeDirEnabled = True |
| 1045 | + self._cleanOldDumpsEnabled = True |
| 1046 | + self._cleanupOldFilesEnabled = True  # normal runs still remove stale output files, as before this change |
| 1047 | + self._checkForTruncatedFilesEnabled = True |
1044 | 1048 | |
1045 | 1049 | if self.dryrun or self._chunkToDo: |
1046 | 1050 | self._statusEnabled = False |
— | — | @@ -1048,8 +1052,13 @@ |
1049 | 1053 | self._symLinksEnabled = False |
1050 | 1054 | self._feedsEnabled = False |
1051 | 1055 | self._noticeFileEnabled = False |
| 1056 | + self._makeDirEnabled = False |
| 1057 | + self._cleanOldDumpsEnabled = False |
| 1058 | + self._cleanupOldFilesEnabled = False |
| 1059 | + |
1052 | 1060 | if self.dryrun: |
1053 | | - self.loggingEnabled = False |
| 1061 | + self._loggingEnabled = False |
| 1062 | + self._checkForTruncatedFilesEnabled = False |
1054 | 1063 | |
1055 | 1064 | if date: |
1056 | 1065 | # Override, continuing a past dump? |
— | — | @@ -1065,7 +1074,7 @@ |
1066 | 1075 | self.lastFailed = False |
1067 | 1076 | |
1068 | 1077 | # these must come after the dumpdir setup so we know which directory we are in |
1069 | | - if (loggingEnabled): |
| 1078 | + if (self._loggingEnabled and self._makeDirEnabled): |
1070 | 1079 | self.logFileName = self.dumpDir.publicPath(self.wiki.config.logFile) |
1071 | 1080 | self.makeDir(join(self.wiki.publicDir(), self.date)) |
1072 | 1081 | self.log = Logger(self.logFileName) |
— | — | @@ -1088,7 +1097,7 @@ |
1089 | 1098 | done = log.doJobOnLogQueue() |
1090 | 1099 | |
1091 | 1100 | def logAndPrint(self, message): |
1092 | | - if hasattr(self,'log') and self.log and not self.dryrun: |
| 1101 | + if hasattr(self,'log') and self.log and self._loggingEnabled: |
1093 | 1102 | self.log.addToLogQueue("%s\n" % message) |
1094 | 1103 | print message |
1095 | 1104 | |
— | — | @@ -1098,9 +1107,8 @@ |
1099 | 1108 | else: |
1100 | 1109 | return "" |
1101 | 1110 | |
1102 | | - def remove(self, filename): |
1103 | | - if not self.dryrun: |
1104 | | - os.remove(filename) |
| 1111 | + def removeFile(self, filename): |
| 1112 | + os.remove(filename) |
1105 | 1113 | |
1106 | 1114 | # returns 0 on success, 1 on error |
1107 | 1115 | def saveTable(self, table, outfile): |
— | — | @@ -1224,9 +1232,8 @@ |
1225 | 1233 | # mark all the following jobs to run as well |
1226 | 1234 | self.dumpItemList.markFollowingJobsToRun() |
1227 | 1235 | |
1228 | | - if not self.dryrun: |
1229 | | - self.makeDir(join(self.wiki.publicDir(), self.date)) |
1230 | | - self.makeDir(join(self.wiki.privateDir(), self.date)) |
| 1236 | + self.makeDir(join(self.wiki.publicDir(), self.date)) |
| 1237 | + self.makeDir(join(self.wiki.privateDir(), self.date)) |
1231 | 1238 | |
1232 | 1239 | if (self.restart): |
1233 | 1240 | self.logAndPrint("Preparing for restart from job %s of %s" % (self.jobRequested, self.dbName)) |
— | — | @@ -1250,12 +1257,12 @@ |
1251 | 1258 | except Exception, ex: |
1252 | 1259 | self.debug("*** exception! " + str(ex)) |
1253 | 1260 | item.setStatus("failed") |
1254 | | - if item.status() == "failed" and not self.dryrun and not self._chunkToDo: |
| 1261 | + if item.status() == "failed": |
1255 | 1262 | self.runHandleFailure() |
1256 | 1263 | else: |
1257 | 1264 | self.lastFailed = False |
1258 | 1265 | # this ensures that, previous run or new one, the old or new md5sums go to the file |
1259 | | - if item.status() == "done" and not self.dryrun and not self._chunkToDo: |
| 1266 | + if item.status() == "done": |
1260 | 1267 | self.runUpdateItemFileInfo(item) |
1261 | 1268 | |
1262 | 1269 | if (self.dumpItemList.allPossibleJobsDone()): |
— | — | @@ -1263,10 +1270,9 @@ |
1264 | 1271 | else: |
1265 | 1272 | self.status.updateStatusFiles("partialdone") |
1266 | 1273 | self.runInfoFile.saveDumpRunInfoFile(self.dumpItemList.reportDumpRunInfo()) |
1267 | | - if not self.dryrun and not self._chunkToDo: |
1268 | | - # if any job succeeds we might as well make the sym link |
1269 | | - if (self.status.failCount < 1): |
1270 | | - self.completeDump() |
| 1274 | + # if any job succeeds we might as well make the sym link |
| 1275 | + if (self.status.failCount < 1): |
| 1276 | + self.completeDump() |
1271 | 1277 | |
1272 | 1278 | if (self.restart): |
1273 | 1279 | self.showRunnerState("Completed run restarting from job %s for %s" % (self.jobRequested, self.dbName)) |
— | — | @@ -1285,40 +1291,38 @@ |
1286 | 1292 | except Exception, ex: |
1287 | 1293 | self.debug("*** exception! " + str(ex)) |
1288 | 1294 | item.setStatus("failed") |
1289 | | - if item.status() == "failed" and not self.dryrun and not self._chunkToDo: |
| 1295 | + if item.status() == "failed": |
1290 | 1296 | self.runHandleFailure() |
1291 | 1297 | else: |
1292 | | - if not self.dryrun and not self._chunkToDo: |
1293 | | - self.runUpdateItemFileInfo(item) |
1294 | | - self.checksums.cpMd5TmpFileToPermFile() |
| 1298 | + self.runUpdateItemFileInfo(item) |
| 1299 | + self.checksums.cpMd5TmpFileToPermFile() |
1295 | 1300 | self.lastFailed = False |
1296 | 1301 | |
1297 | 1302 | self.status.updateStatusFiles("done") |
1298 | | - if not self.dryrun and not self._chunkToDo: |
1299 | | - self.runInfoFile.saveDumpRunInfoFile(self.dumpItemList.reportDumpRunInfo()) |
1300 | | - if self.status.failCount < 1: |
1301 | | - self.completeDump() |
| 1303 | + self.runInfoFile.saveDumpRunInfoFile(self.dumpItemList.reportDumpRunInfo()) |
| 1304 | + if self.status.failCount < 1: |
| 1305 | + self.completeDump() |
1302 | 1306 | |
1303 | 1307 | self.showRunnerStateComplete() |
1304 | 1308 | |
1305 | 1309 | def cleanOldDumps(self): |
1306 | | - old = self.wiki.dumpDirs() |
1307 | | - if old: |
1308 | | - if old[-1] == self.date: |
1309 | | - # If we're re-running today's (or jobs from a given day's) dump, don't count it as one |
1310 | | - # of the old dumps to keep... or delete it halfway through! |
1311 | | - old = old[:-1] |
1312 | | - if self.wiki.config.keep > 0: |
1313 | | - # Keep the last few |
1314 | | - old = old[:-(self.wiki.config.keep)] |
1315 | | - if old: |
1316 | | - for dump in old: |
1317 | | - self.showRunnerState("Purging old dump %s for %s" % (dump, self.dbName)) |
1318 | | - if not self.dryrun and not self._chunkToDo: |
| 1310 | + if self._cleanOldDumpsEnabled: |
| 1311 | + old = self.wiki.dumpDirs() |
| 1312 | + if old: |
| 1313 | + if old[-1] == self.date: |
| 1314 | + # If we're re-running today's (or jobs from a given day's) dump, don't count it as one |
| 1315 | + # of the old dumps to keep... or delete it halfway through! |
| 1316 | + old = old[:-1] |
| 1317 | + if self.wiki.config.keep > 0: |
| 1318 | + # Keep the last few |
| 1319 | + old = old[:-(self.wiki.config.keep)] |
| 1320 | + if old: |
| 1321 | + for dump in old: |
| 1322 | + self.showRunnerState("Purging old dump %s for %s" % (dump, self.dbName)) |
1319 | 1323 | base = os.path.join(self.wiki.publicDir(), dump) |
1320 | 1324 | shutil.rmtree("%s" % base) |
1321 | | - else: |
1322 | | - self.showRunnerState("No old dumps to purge.") |
| 1325 | + else: |
| 1326 | + self.showRunnerState("No old dumps to purge.") |
1323 | 1327 | |
1324 | 1328 | def showRunnerState(self, message): |
1325 | 1329 | self.debug(message) |
— | — | @@ -1335,11 +1339,12 @@ |
1336 | 1340 | self.symLinks.saveSymlink(self.checksums.getChecksumFileNameBasename()) |
1337 | 1341 | |
1338 | 1342 | def makeDir(self, dir): |
1339 | | - if exists(dir): |
1340 | | - self.debug("Checkdir dir %s ..." % dir) |
1341 | | - else: |
1342 | | - self.debug("Creating %s ..." % dir) |
1343 | | - os.makedirs(dir) |
| 1343 | + if self._makeDirEnabled: |
| 1344 | + if exists(dir): |
| 1345 | + self.debug("Checkdir dir %s ..." % dir) |
| 1346 | + else: |
| 1347 | + self.debug("Creating %s ..." % dir) |
| 1348 | + os.makedirs(dir) |
1344 | 1349 | |
1345 | 1350 | class SymLinks(object): |
1346 | 1351 | def __init__(self, wiki, dumpDir, date, logfn, debugfn, enabled): |
— | — | @@ -1351,11 +1356,12 @@ |
1352 | 1357 | self.debugfn = debugfn |
1353 | 1358 | |
1354 | 1359 | def makeDir(self, dir): |
1355 | | - if exists(dir): |
1356 | | - self.debugfn("Checkdir dir %s ..." % dir) |
1357 | | - else: |
1358 | | - self.debugfn("Creating %s ..." % dir) |
1359 | | - os.makedirs(dir) |
| 1360 | + if (self._enabled): |
| 1361 | + if exists(dir): |
| 1362 | + self.debugfn("Checkdir dir %s ..." % dir) |
| 1363 | + else: |
| 1364 | + self.debugfn("Creating %s ..." % dir) |
| 1365 | + os.makedirs(dir) |
1360 | 1366 | |
1361 | 1367 | def saveSymlink(self, file): |
1362 | 1368 | if (self._enabled): |
— | — | @@ -1376,7 +1382,7 @@ |
1377 | 1383 | # no file or it's older than ours... *then* remove the link |
1378 | 1384 | if not exists(os.path.realpath(link)) or dateinterval > 0: |
1379 | | - self.debug("Removing old symlink %s" % link) |
| 1385 | + self.debugfn("Removing old symlink %s" % link) |
1380 | 1386 | os.remove(link) |
1381 | 1387 | else: |
1382 | 1388 | self.logfn("What the hell dude, %s is not a symlink" % link) |
1383 | 1389 | raise BackupError("What the hell dude, %s is not a symlink" % link) |
— | — | @@ -1395,30 +1401,31 @@ |
1396 | 1402 | self._enabled = enabled |
1397 | 1403 | |
1398 | 1404 | def makeDir(self, dir): |
1399 | | - if exists(dir): |
1400 | | - self.debugfn("Checkdir dir %s ..." % dir) |
1401 | | - else: |
1402 | | - self.debugfn("Creating %s ..." % dir) |
1403 | | - os.makedirs(dir) |
| 1405 | + if (self._enabled): |
| 1406 | + if exists(dir): |
| 1407 | + self.debugfn("Checkdir dir %s ..." % dir) |
| 1408 | + else: |
| 1409 | + self.debugfn("Creating %s ..." % dir) |
| 1410 | + os.makedirs(dir) |
1404 | 1411 | |
1405 | 1412 | def saveFeed(self, file): |
1406 | | - self.makeDir(join(self.wiki.publicDir(), 'latest')) |
1407 | | - filePath = self.dumpDir.webPath(file) |
1408 | | - fileName = os.path.basename(filePath) |
1409 | | - webPath = os.path.dirname(filePath) |
1410 | | - rssText = self.wiki.config.readTemplate("feed.xml") % { |
1411 | | - "chantitle": file, |
1412 | | - "chanlink": webPath, |
1413 | | - "chandesc": "Wikimedia dump updates for %s" % self.dbName, |
1414 | | - "title": webPath, |
1415 | | - "link": webPath, |
1416 | | - "description": xmlEscape("<a href=\"%s\">%s</a>" % (filePath, fileName)), |
1417 | | - "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())} |
1418 | | - directory = self.dumpDir.latestDir() |
1419 | | - rssPath = self.dumpDir.latestPath(file + "-rss.xml") |
1420 | | - FileUtils.writeFile(directory, rssPath, rssText, self.wiki.config.fileperms) |
| 1413 | + if (self._enabled): |
| 1414 | + self.makeDir(join(self.wiki.publicDir(), 'latest')) |
| 1415 | + filePath = self.dumpDir.webPath(file) |
| 1416 | + fileName = os.path.basename(filePath) |
| 1417 | + webPath = os.path.dirname(filePath) |
| 1418 | + rssText = self.wiki.config.readTemplate("feed.xml") % { |
| 1419 | + "chantitle": file, |
| 1420 | + "chanlink": webPath, |
| 1421 | + "chandesc": "Wikimedia dump updates for %s" % self.dbName, |
| 1422 | + "title": webPath, |
| 1423 | + "link": webPath, |
| 1424 | + "description": xmlEscape("<a href=\"%s\">%s</a>" % (filePath, fileName)), |
| 1425 | + "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())} |
| 1426 | + directory = self.dumpDir.latestDir() |
| 1427 | + rssPath = self.dumpDir.latestPath(file + "-rss.xml") |
| 1428 | + FileUtils.writeFile(directory, rssPath, rssText, self.wiki.config.fileperms) |
1421 | 1429 | |
1422 | | - |
1423 | 1430 | class Dump(object): |
1424 | 1431 | def __init__(self, name, desc): |
1425 | 1432 | self._desc = desc |
— | — | @@ -1565,9 +1572,10 @@ |
1566 | 1573 | return(recombineCommandString) |
1567 | 1574 | |
1568 | 1575 | def cleanupOldFiles(self, runner, outputFileBasename): |
1569 | | - outputFilename = self.buildOutputFilename(runner, outputFileBasename) |
1570 | | - if exists(outputFilename): |
1571 | | - runner.remove(outputFilename) |
| 1576 | + if (runner._cleanupOldFilesEnabled): |
| 1577 | + outputFilename = self.buildOutputFilename(runner, outputFileBasename) |
| 1578 | + if exists(outputFilename): |
| 1579 | + runner.removeFile(outputFilename) |
1572 | 1580 | |
1573 | 1581 | def buildOutputFilename(self, runner, outputFileBasename): |
1574 | 1582 | return outputFilename |
— | — | @@ -1685,10 +1693,11 @@ |
1686 | 1694 | return(series) |
1687 | 1695 | |
1688 | 1696 | def cleanupOldFiles(self, runner, chunk = 0): |
1689 | | - fileList = self.buildOutputFilenames(runner, chunk) |
1690 | | - for filename in fileList: |
1691 | | - if exists(filename): |
1692 | | - runner.remove(filename) |
| 1697 | + if (runner._cleanupOldFilesEnabled): |
| 1698 | + fileList = self.buildOutputFilenames(runner, chunk) |
| 1699 | + for filename in fileList: |
| 1700 | + if exists(filename): |
| 1701 | + runner.removeFile(filename) |
1693 | 1702 | |
1694 | 1703 | def buildHistoryOutputFilename(self, runner, chunk = 0): |
1695 | 1704 | if (chunk): |
— | — | @@ -1801,9 +1810,10 @@ |
1802 | 1811 | return ["pages-logging.xml.gz"] |
1803 | 1812 | |
1804 | 1813 | def cleanupOldFiles(self, runner): |
1805 | | - logging = self.buildOutputFilename(runner) |
1806 | | - if exists(logging): |
1807 | | - runner.remove(logging) |
| 1814 | + if (runner._cleanupOldFilesEnabled): |
| 1815 | + logging = self.buildOutputFilename(runner) |
| 1816 | + if exists(logging): |
| 1817 | + runner.removeFile(logging) |
1808 | 1818 | |
1809 | 1819 | def buildOutputFilename(self, runner): |
1810 | 1820 | logging = runner.dumpDir.publicPath("pages-logging.xml.gz") |
— | — | @@ -1869,33 +1879,39 @@ |
1870 | 1880 | commands.append(series) |
1871 | 1881 | error = runner.runCommand(commands, callbackStderr=self.progressCallback, callbackStderrArg=runner) |
1872 | 1882 | |
1873 | | - if (not exists( runner.wiki.config.checkforbz2footer ) ): |
1874 | | - raise BackupError("checkforbz2footer command %s not found" % runner.wiki.config.checkforbz2footer); |
1875 | | - checkforbz2footer = "%s" % runner.wiki.config.checkforbz2footer |
1876 | | - if exists(checkforbz2footer): |
1877 | | - # check to see if any of the output files are truncated |
1878 | | - files = [] |
1879 | | - if (self._chunks): |
1880 | | - if (self._chunkToDo): |
1881 | | - if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)): |
1882 | | - raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks))) |
1883 | | - files.append( self._path(runner, 'bz2', self._chunkToDo ) ) |
1884 | | - else: |
1885 | | - for i in range(1, len(self._chunks)+1): |
1886 | | - files.append( self._path(runner, 'bz2', i ) ) |
| 1883 | + truncationError = self.checkForTruncatedFiles(runner) |
1887 | 1884 | |
1888 | | - for f in files: |
1889 | | - pipeline = [] |
1890 | | - pipeline.append([ checkforbz2footer, f ]) |
1891 | | - p = CommandPipeline(pipeline, quiet=True) |
1892 | | - p.runPipelineAndGetOutput() |
1893 | | - if not p.exitedSuccessfully(): |
1894 | | - runner.logAndPrint("file %s is truncated, moving out of the way" %f ) |
1895 | | - os.rename( f, f + ".truncated" ) |
1896 | | - error = 1 |
1897 | | - if (error): |
| 1885 | + if (error or truncationError): |
1898 | 1886 | raise BackupError("error producing xml bz2 file(s) %s" % self._subset) |
1899 | 1887 | |
| 1888 | + def checkForTruncatedFiles(self, runner):  # returns 1 if any bz2 output file failed the footer check, else 0 |
| 1889 | + if not runner._checkForTruncatedFilesEnabled: |
| 1890 | + return 0 |
| 1891 | + if (not exists( runner.wiki.config.checkforbz2footer ) ): |
| 1892 | + raise BackupError("checkforbz2footer command %s not found" % runner.wiki.config.checkforbz2footer); |
| 1893 | + checkforbz2footer = "%s" % runner.wiki.config.checkforbz2footer |
| 1894 | + # check to see if any of the output files are truncated |
| 1895 | + files = [] |
| 1896 | + if (self._chunks): |
| 1897 | + if (self._chunkToDo): |
| 1898 | + if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)): |
| 1899 | + raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks))) |
| 1900 | + files.append( self._path(runner, 'bz2', self._chunkToDo ) ) |
| 1901 | + else: |
| 1902 | + for i in range(1, len(self._chunks)+1): |
| 1903 | + files.append( self._path(runner, 'bz2', i ) ) |
| 1904 | + truncated = 0  # only reported after every file has been checked |
| 1905 | + for f in files: |
| 1906 | + pipeline = [] |
| 1907 | + pipeline.append([ checkforbz2footer, f ]) |
| 1908 | + p = CommandPipeline(pipeline, quiet=True) |
| 1909 | + p.runPipelineAndGetOutput() |
| 1910 | + if not p.exitedSuccessfully(): |
| 1911 | + runner.logAndPrint("file %s is truncated, moving out of the way" %f ) |
| 1912 | + os.rename( f, f + ".truncated" ) |
| 1913 | + truncated = 1  # keep looping so every truncated file is moved aside |
| 1914 | + return truncated |
| 1915 | + |
1900 | 1916 | def buildEta(self, runner): |
1901 | 1917 | """Tell the dumper script whether to make ETA estimate on page or revision count.""" |
1902 | 1918 | return "--current" |
— | — | @@ -2216,9 +2232,10 @@ |
2217 | 2233 | return(commandSeries) |
2218 | 2234 | |
2219 | 2235 | def cleanupOldFiles(self, runner, chunk = 0): |
2220 | | - xml7z = self.buildOutputFilename(runner, chunk) |
2221 | | - if exists(xml7z): |
2222 | | - runner.remove(xml7z) |
| 2236 | + if (runner._cleanupOldFilesEnabled): |
| 2237 | + xml7z = self.buildOutputFilename(runner, chunk) |
| 2238 | + if exists(xml7z): |
| 2239 | + runner.removeFile(xml7z) |
2223 | 2240 | |
2224 | 2241 | def run(self, runner): |
2225 | 2242 | if runner.lastFailed: |
— | — | @@ -2297,11 +2314,12 @@ |
2298 | 2315 | return [ self._file("7z",0) ] |
2299 | 2316 | |
2300 | 2317 | def cleanupOldFiles(self, runner): |
2301 | | - files = self.listOutputFiles(runner) |
2302 | | - for filename in files: |
2303 | | - filename = runner.dumpDir.publicPath(filename) |
2304 | | - if exists(filename): |
2305 | | - runner.remove(filename) |
| 2318 | + if (runner._cleanupOldFilesEnabled): |
| 2319 | + files = self.listOutputFiles(runner) |
| 2320 | + for filename in files: |
| 2321 | + filename = runner.dumpDir.publicPath(filename) |
| 2322 | + if exists(filename): |
| 2323 | + runner.removeFile(filename) |
2306 | 2324 | |
2307 | 2325 | def run(self, runner): |
2308 | 2326 | error = 0 |