r106443 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r106442‎ | r106443 | r106444 >
Date:15:57, 16 December 2011
Author:ariel
Status:deferred
Tags:
Comment:
* Add "aftercheckpoint" option, which restarts a job step from immediately
after the specified checkpoint file
* When doing a dump using pageidrange, delete old files from the same date covering
that same range of pages; this also applies to the aftercheckpoint option
* Clean up old symlinks and rss feed entries when running "latestlinks" job
* "verbose" option for more ... verbosity (debugging)
* In the human-readable descriptions of the various dump files, change "image" to
"media/files" (thanks to Danny_B for that)
Modified paths:
  • /branches/ariel/xmldumps-backup/worker.py (modified) (history)

Diff [purge]

Index: branches/ariel/xmldumps-backup/worker.py
@@ -18,6 +18,7 @@
1919 import CommandManagement
2020 import Queue
2121 import thread
 22+import traceback
2223
2324 from os.path import exists
2425 from subprocess import Popen, PIPE
@@ -369,9 +370,10 @@
370371 pass
371372
372373 class RunInfoFile(object):
373 - def __init__(self, wiki, enabled):
 374+ def __init__(self, wiki, enabled, verbose = False):
374375 self.wiki = wiki
375376 self._enabled = enabled
 377+ self.verbose = verbose
376378
377379 def saveDumpRunInfoFile(self, text):
378380 """Write out a simple text file with the status for this wiki's dump."""
@@ -379,6 +381,9 @@
380382 try:
381383 self._writeDumpRunInfoFile(text)
382384 except:
 385+ if (self.verbose):
 386+ exc_type, exc_value, exc_traceback = sys.exc_info()
 387+ print repr(traceback.format_exception(exc_type, exc_value, exc_traceback))
383388 print "Couldn't save dump run info file. Continuing anyways"
384389
385390 def statusOfOldDumpIsDone(self, runner, date, jobName, jobDesc):
@@ -410,6 +415,9 @@
411416 infile.close
412417 return results
413418 except:
 419+ if (self.verbose):
 420+ exc_type, exc_value, exc_traceback = sys.exc_info()
 421+ print repr(traceback.format_exception(exc_type, exc_value, exc_traceback))
414422 return False
415423
416424 #
@@ -481,6 +489,9 @@
482490 infile.close
483491 return None
484492 except:
 493+ if (self.verbose):
 494+ exc_type, exc_value, exc_traceback = sys.exc_info()
 495+ print repr(traceback.format_exception(exc_type, exc_value, exc_traceback))
485496 return None
486497
487498 # find desc in there, look for "class='done'"
@@ -506,6 +517,9 @@
507518 infile.close
508519 return None
509520 except:
 521+ if (self.verbose):
 522+ exc_type, exc_value, exc_traceback = sys.exc_info()
 523+ print repr(traceback.format_exception(exc_type, exc_value, exc_traceback))
510524 return None
511525
512526
@@ -591,11 +605,11 @@
592606 #PrivateTable("filearchive", "filearchivetable", "Deleted image data"),
593607
594608 PublicTable("site_stats", "sitestatstable", "A few statistics such as the page count."),
595 - PublicTable("image", "imagetable", "Metadata on current versions of uploaded images."),
596 - PublicTable("oldimage", "oldimagetable", "Metadata on prior versions of uploaded images."),
 609+ PublicTable("image", "imagetable", "Metadata on current versions of uploaded media/files."),
 610+ PublicTable("oldimage", "oldimagetable", "Metadata on prior versions of uploaded media/files."),
597611 PublicTable("pagelinks", "pagelinkstable", "Wiki page-to-page link records."),
598612 PublicTable("categorylinks", "categorylinkstable", "Wiki category membership link records."),
599 - PublicTable("imagelinks", "imagelinkstable", "Wiki image usage records."),
 613+ PublicTable("imagelinks", "imagelinkstable", "Wiki media/files usage records."),
600614 PublicTable("templatelinks", "templatelinkstable", "Wiki template inclusion link records."),
601615 PublicTable("externallinks", "externallinkstable", "Wiki external URL link records."),
602616 PublicTable("langlinks", "langlinkstable", "Wiki interlanguage link records."),
@@ -627,10 +641,10 @@
628642 self.dumpItems.append(
629643 XmlDump("articles",
630644 "articlesdump",
631 - "<big><b>Articles, templates, image descriptions, and primary meta-pages.</b></big>",
 645+ "<big><b>Articles, templates, media/file descriptions, and primary meta-pages.</b></big>",
632646 "This contains current versions of article content, and is the archive most mirror sites will probably want.", self.findItemByName('xmlstubsdump'), self._prefetch, self._spawn, self.wiki, self._getChunkToDo("articlesdump"), self.chunkInfo.getPagesPerChunkHistory(), checkpoints, self.checkpointFile, self.pageIDRange))
633647 if (self.chunkInfo.chunksEnabled()):
634 - self.dumpItems.append(RecombineXmlDump("articlesdumprecombine", "<big><b>Recombine articles, templates, image descriptions, and primary meta-pages.</b></big>","This contains current versions of article content, and is the archive most mirror sites will probably want.", self.findItemByName('articlesdump')))
 648+ self.dumpItems.append(RecombineXmlDump("articlesdumprecombine", "<big><b>Recombine articles, templates, media/file descriptions, and primary meta-pages.</b></big>","This contains current versions of article content, and is the archive most mirror sites will probably want.", self.findItemByName('articlesdump')))
635649
636650 self.dumpItems.append(
637651 XmlDump("meta-current",
@@ -801,9 +815,10 @@
802816 return "name:%s; status:%s; updated:%s" % (item.name(), item.status(), item.updated())
803817
804818 class Checksummer(object):
805 - def __init__(self,wiki,dumpDir, enabled = True):
 819+ def __init__(self,wiki,dumpDir, enabled = True, verbose = False):
806820 self.wiki = wiki
807821 self.dumpDir = dumpDir
 822+ self.verbose = verbose
808823 self.timestamp = time.strftime("%Y%m%d%H%M%S", time.gmtime())
809824 self._enabled = enabled
810825
@@ -821,7 +836,7 @@
822837 checksumFileName = self._getChecksumFileNameTmp()
823838 output = file(checksumFileName, "a")
824839 runner.debug("Checksumming %s" % fileObj.filename)
825 - dumpfile = DumpFile(self.wiki, runner.dumpDir.filenamePublicPath(fileObj))
 840+ dumpfile = DumpFile(self.wiki, runner.dumpDir.filenamePublicPath(fileObj),None,self.verbose)
826841 checksum = dumpfile.md5Sum()
827842 if checksum != None:
828843 output.write( "%s %s\n" % (checksum, fileObj.filename))
@@ -1196,7 +1211,7 @@
11971212 looking for page and id tags, wihout other tags in between. (hmm)
11981213 filename full filename with directory
11991214 """
1200 - def __init__(self, wiki, filename, fileObj = None):
 1215+ def __init__(self, wiki, filename, fileObj = None, verbose = False):
12011216 """takes full filename including path"""
12021217 self._wiki = wiki
12031218 self.filename = filename
@@ -1337,6 +1352,9 @@
13381353 try:
13391354 os.rename(self.filename, os.path.join(self.dirname,newname))
13401355 except:
 1356+ if (self.verbose):
 1357+ exc_type, exc_value, exc_traceback = sys.exc_info()
 1358+ print repr(traceback.format_exception(exc_type, exc_value, exc_traceback))
13411359 raise BackupError("failed to rename file %s" % self.filename)
13421360
13431361 self.filename = os.path.join(self.dirname,newname)
@@ -1344,7 +1362,7 @@
13451363 # everything that has to do with reporting the status of a piece
13461364 # of a dump is collected here
13471365 class Status(object):
1348 - def __init__(self, wiki, dumpDir, items, checksums, enabled, noticeFile = None, errorCallback=None):
 1366+ def __init__(self, wiki, dumpDir, items, checksums, enabled, noticeFile = None, errorCallback=None, verbose = False):
13491367 self.wiki = wiki
13501368 self.dbName = wiki.dbName
13511369 self.dumpDir = dumpDir
@@ -1353,6 +1371,7 @@
13541372 self.noticeFile = noticeFile
13551373 self.errorCallback = errorCallback
13561374 self.failCount = 0
 1375+ self.verbose = verbose
13571376 self._enabled = enabled
13581377
13591378 def updateStatusFiles(self, done=False):
@@ -1400,6 +1419,9 @@
14011420 # Short line for report extraction goes here
14021421 self.wiki.writeStatus(self._reportDatabaseStatusSummary(done))
14031422 except:
 1423+ if (self.verbose):
 1424+ exc_type, exc_value, exc_traceback = sys.exc_info()
 1425+ print repr(traceback.format_exception(exc_type, exc_value, exc_traceback))
14041426 message = "Couldn't update status files. Continuing anyways"
14051427 if self.errorCallback:
14061428 self.errorCallback(message)
@@ -1449,6 +1471,9 @@
14501472 else:
14511473 raise(ValueException)
14521474 except:
 1475+ if (self.verbose):
 1476+ exc_type, exc_value, exc_traceback = sys.exc_info()
 1477+ print repr(traceback.format_exception(exc_type, exc_value, exc_traceback))
14531478 return "No prior dumps of this database stored."
14541479 prettyDate = TimeUtils.prettyDate(rawDate)
14551480 if done:
@@ -1542,7 +1567,7 @@
15431568 return os.path.join(self.wiki.publicDir(), self.wiki.date)
15441569
15451570 class Runner(object):
1546 - def __init__(self, wiki, prefetch=True, spawn=True, job=None, restart=False, notice="", dryrun = False, loggingEnabled=False, chunkToDo = False, checkpointFile = None, pageIDRange = None):
 1571+ def __init__(self, wiki, prefetch=True, spawn=True, job=None, restart=False, notice="", dryrun = False, loggingEnabled=False, chunkToDo = False, checkpointFile = None, pageIDRange = None, verbose = False):
15471572 self.wiki = wiki
15481573 self.dbName = wiki.dbName
15491574 self.prefetch = prefetch
@@ -1553,8 +1578,9 @@
15541579 self.log = None
15551580 self.dryrun = dryrun
15561581 self._chunkToDo = chunkToDo
1557 - self.checkpointFile = None
 1582+ self.checkpointFile = checkpointFile
15581583 self.pageIDRange = pageIDRange
 1584+ self.verbose = verbose
15591585
15601586 if (self.checkpointFile):
15611587 f = DumpFilename(self.wiki)
@@ -1611,8 +1637,7 @@
16121638 self._feedsEnabled = False
16131639 self._noticeFileEnabled = False
16141640 self._makeDirEnabled = False
1615 - self._cleanOldDumpsEnabled = False
1616 - self._cleanupOldFilesEnabled = False
 1641+ self._cleanupOldFilesEnabled = True
16171642
16181643 self.jobRequested = job
16191644
@@ -1644,15 +1669,15 @@
16451670 self.makeDir(os.path.join(self.wiki.publicDir(), self.wiki.date))
16461671 self.log = Logger(self.logFileName)
16471672 thread.start_new_thread(self.logQueueReader,(self.log,))
1648 - self.runInfoFile = RunInfoFile(wiki,self._runInfoFileEnabled)
 1673+ self.runInfoFile = RunInfoFile(wiki,self._runInfoFileEnabled, self.verbose)
16491674 self.symLinks = SymLinks(self.wiki, self.dumpDir, self.logAndPrint, self.debug, self._symLinksEnabled)
16501675 self.feeds = Feeds(self.wiki,self.dumpDir, self.dbName, self.debug, self._feedsEnabled)
16511676 self.htmlNoticeFile = NoticeFile(self.wiki, notice, self._noticeFileEnabled)
1652 - self.checksums = Checksummer(self.wiki, self.dumpDir, self._checksummerEnabled)
 1677+ self.checksums = Checksummer(self.wiki, self.dumpDir, self._checksummerEnabled, self.verbose)
16531678
16541679 # some or all of these dumpItems will be marked to run
16551680 self.dumpItemList = DumpItemList(self.wiki, self.prefetch, self.spawn, self._chunkToDo, self.checkpointFile, self.jobRequested, self.chunkInfo, self.pageIDRange, self.runInfoFile, self.dumpDir)
1656 - self.status = Status(self.wiki, self.dumpDir, self.dumpItemList.dumpItems, self.checksums, self._statusEnabled, self.htmlNoticeFile, self.logAndPrint)
 1681+ self.status = Status(self.wiki, self.dumpDir, self.dumpItemList.dumpItems, self.checksums, self._statusEnabled, self.htmlNoticeFile, self.logAndPrint, self.verbose)
16571682
16581683 def logQueueReader(self,log):
16591684 if not log:
@@ -1806,7 +1831,11 @@
18071832 try:
18081833 item.dump(self)
18091834 except Exception, ex:
1810 - self.debug("*** exception! " + str(ex))
 1835+ exc_type, exc_value, exc_traceback = sys.exc_info()
 1836+ if (self.verbose):
 1837+ print repr(traceback.format_exception(exc_type, exc_value, exc_traceback))
 1838+ else:
 1839+ self.debug("*** exception! " + str(ex))
18111840 item.setStatus("failed")
18121841 if item.status() == "failed":
18131842 self.runHandleFailure()
@@ -1824,7 +1853,13 @@
18251854 # if any job succeeds we might as well make the sym link
18261855 if (self.status.failCount < 1):
18271856 self.completeDump()
1828 -
 1857+
 1858+ # special case...
 1859+ if self.jobRequested == "latestlinks":
 1860+ if (self.dumpItemList.allPossibleJobsDone()):
 1861+ self.symLinks.removeSymLinksFromOldRuns(self.wiki.date)
 1862+ self.feeds.cleanupFeeds()
 1863+
18291864 if (self.restart):
18301865 self.showRunnerState("Completed run restarting from job %s for %s" % (self.jobRequested, self.dbName))
18311866 else:
@@ -1841,7 +1876,11 @@
18421877 try:
18431878 item.dump(self)
18441879 except Exception, ex:
1845 - self.debug("*** exception! " + str(ex))
 1880+ exc_type, exc_value, exc_traceback = sys.exc_info()
 1881+ if (self.verbose):
 1882+ print repr(traceback.format_exception(exc_type, exc_value, exc_traceback))
 1883+ else:
 1884+ self.debug("*** exception! " + str(ex))
18461885 item.setStatus("failed")
18471886 if item.status() == "failed":
18481887 self.runHandleFailure()
@@ -2071,8 +2110,9 @@
20722111 os.remove(os.path.join(latestDir,f))
20732112
20742113 class Dump(object):
2075 - def __init__(self, name, desc):
 2114+ def __init__(self, name, desc, verbose = False):
20762115 self._desc = desc
 2116+ self.verbose = verbose
20772117 self.progress = ""
20782118 self.runInfo = RunInfo(name,"waiting","")
20792119 self.dumpName = self.getDumpName()
@@ -2151,6 +2191,9 @@
21522192 try:
21532193 self.run(runner)
21542194 except Exception, ex:
 2195+ if (self.verbose):
 2196+ exc_type, exc_value, exc_traceback = sys.exc_info()
 2197+ print repr(traceback.format_exception(exc_type, exc_value, exc_traceback))
21552198 self.setStatus("failed")
21562199 raise ex
21572200 self.setStatus("done")
@@ -2860,7 +2903,7 @@
28612904
28622905 class XmlDump(Dump):
28632906 """Primary XML dumps, one section at a time."""
2864 - def __init__(self, subset, name, desc, detail, itemForStubs, prefetch, spawn, wiki, chunkToDo, chunks = False, checkpoints = False, checkpointFile = None, pageIDRange = None):
 2907+ def __init__(self, subset, name, desc, detail, itemForStubs, prefetch, spawn, wiki, chunkToDo, chunks = False, checkpoints = False, checkpointFile = None, pageIDRange = None, verbose = False):
28652908 self._subset = subset
28662909 self._detail = detail
28672910 self._desc = desc
@@ -2941,7 +2984,7 @@
29422985 else:
29432986 files = self.listRegularFilesPerChunkExisting(runner.dumpDir, self.getChunkList(), [ self.dumpName ])
29442987 for f in files:
2945 - f = DumpFile(self.wiki,runner.dumpDir.filenamePublicPath(f))
 2988+ f = DumpFile(self.wiki,runner.dumpDir.filenamePublicPath(f), None, self.verbose)
29462989 if (f.checkIfTruncated()):
29472990 runner.logAndPrint("file %s is truncated, moving out of the way" % f.filename )
29482991 f.rename( f.filename + ".truncated" )
@@ -3125,7 +3168,7 @@
31263169 if fileObj.isChunkFile and fileObj.chunkInt > maxchunks:
31273170 maxchunks = fileObj.chunkInt
31283171 if not fileObj.firstPageID:
3129 - f = DumpFile(self.wiki, runner.dumpDir.filenamePublicPath(fileObj, date), fileObj)
 3172+ f = DumpFile(self.wiki, runner.dumpDir.filenamePublicPath(fileObj, date), fileObj, self.verbose)
31303173 fileObj.firstPageID = f.findFirstPageIDInFile()
31313174
31323175 # get the files that cover our range
@@ -3219,6 +3262,28 @@
32203263 runner.debug("Could not locate a prefetchable dump.")
32213264 return None
32223265
 3266+ def listOutputFilesForCleanup(self, dumpDir, dumpNames = None):
 3267+ files = Dump.listOutputFilesForCleanup(self, dumpDir, dumpNames)
 3268+ filesToReturn = []
 3269+ if self.pageIDRange:
 3270+ if (',' in self.pageIDRange):
 3271+ ( firstPageID, lastPageID ) = self.pageIDRange.split(',',2)
 3272+ firstPageID = int(firstPageID)
 3273+ lastPageID = int(lastPageID)
 3274+ else:
 3275+ firstPageID = int(self.pageIDRange)
 3276+ lastPageID = None
 3277+ # filter any checkpoint files, removing from the list any with
 3278+ # page range outside of the page range this job will cover
 3279+ for f in files:
 3280+ if f.isCheckpointFile:
 3281+ if not firstPageID or (f.firstPageID and (int(f.firstPageID) >= firstPageID)):
 3282+ if not lastPageID or (f.lastPageID and (int(f.lastPageID) <= lastPageID)):
 3283+ filesToReturn.append(f)
 3284+ else:
 3285+ filesToReturn.append(f)
 3286+ return filesToReturn
 3287+
32233288 class RecombineXmlDump(XmlDump):
32243289 def __init__(self, name, desc, detail, itemForXmlDumps):
32253290 # no prefetch, no spawn
@@ -3690,7 +3755,11 @@
36913756 if message:
36923757 print message
36933758 print "Usage: python worker.py [options] [wikidbname]"
3694 - print "Options: --checkpoint, --chunk, --configfile, --date, --job, --addnotice, --delnotice, --force, --noprefetch, --nospawn, --restartfrom, --log"
 3759+ print "Options: --aftercheckpoint, --checkpoint, --chunk, --configfile, --date, --job, --addnotice, --delnotice, --force, --noprefetch, --nospawn, --restartfrom, --log"
 3760+ print "--aftercheckpoint: Restart thie job from the after specified checkpoint file, doing the"
 3761+ print " rest of the job for the appropriate chunk if chunks are configured"
 3762+ print " or for the all the rest of the revisions if no chunks are configured;"
 3763+ print " only for jobs articlesdump, metacurrentdump, metahistorybz2dump."
36953764 print "--checkpoint: Specify the name of the checkpoint file to rerun (requires --job,"
36963765 print " depending on the file this may imply --chunk)"
36973766 print "--chunk: Specify the number of the chunk to rerun (use with a specific job"
@@ -3721,6 +3790,8 @@
37223791 print "--restartfrom: Do all jobs after the one specified via --job, including that one"
37233792 print "--log: Log progress messages and other output to logfile in addition to"
37243793 print " the usual console output"
 3794+ print "--verbose: Print lots of stuff (includes printing full backtraces for any exception)"
 3795+ print " This is used primarily for debugging"
37253796
37263797 sys.exit(1)
37273798
@@ -3738,13 +3809,15 @@
37393810 htmlNotice = ""
37403811 dryrun = False
37413812 chunkToDo = False
 3813+ afterCheckpoint = False
37423814 checkpointFile = None
37433815 pageIDRange = None
37443816 result = False
 3817+ verbose = False
37453818
37463819 try:
37473820 (options, remainder) = getopt.gnu_getopt(sys.argv[1:], "",
3748 - ['date=', 'job=', 'configfile=', 'addnotice=', 'delnotice', 'force', 'dryrun', 'noprefetch', 'nospawn', 'restartfrom', 'log', 'chunk=', 'checkpoint=', 'pageidrange=' ])
 3821+ ['date=', 'job=', 'configfile=', 'addnotice=', 'delnotice', 'force', 'dryrun', 'noprefetch', 'nospawn', 'restartfrom', 'aftercheckpoint=', 'log', 'chunk=', 'checkpoint=', 'pageidrange=', 'verbose' ])
37493822 except:
37503823 usage("Unknown option specified")
37513824
@@ -3759,6 +3832,9 @@
37603833 chunkToDo = int(val)
37613834 elif opt == "--force":
37623835 forceLock = True
 3836+ elif opt == '--aftercheckpoint':
 3837+ afterCheckpoint = True
 3838+ checkpointFile = val
37633839 elif opt == "--noprefetch":
37643840 prefetch = False
37653841 elif opt == "--nospawn":
@@ -3777,6 +3853,8 @@
37783854 htmlNotice = False
37793855 elif opt == "--pageidrange":
37803856 pageIDRange = val
 3857+ elif opt == "--verbose":
 3858+ verbose = True
37813859
37823860 if dryrun and (len(remainder) == 0):
37833861 usage("--dryrun requires the name of a wikidb to be specified")
@@ -3834,7 +3912,20 @@
38353913 date = TimeUtils.today()
38363914 wiki.setDate(date)
38373915
3838 - runner = Runner(wiki, prefetch, spawn, jobRequested, restart, htmlNotice, dryrun, enableLogging, chunkToDo, checkpointFile, pageIDRange)
 3916+ if (afterCheckpoint):
 3917+ f = DumpFilename(wiki)
 3918+ f.newFromFilename(checkpointFile)
 3919+ if not f.isCheckpointFile:
 3920+ usage("--aftercheckpoint option requires the name of a checkpoint file, bad filename provided")
 3921+ pageIDRange = str( int(f.lastPageID) + 1 )
 3922+ chunkToDo = f.chunkInt
 3923+ # now we don't need this.
 3924+ checkpointFile = None
 3925+ afterCheckpointJobs = [ 'articlesdump', 'metacurrentdump', 'metahistorybz2dump' ]
 3926+ if not jobRequested or not jobRequested in [ 'articlesdump', 'metacurrentdump', 'metahistorybz2dump' ]:
 3927+ usage("--aftercheckpoint option requires --job option with one of %s" % ", ".join(afterCheckpointJobs))
 3928+
 3929+ runner = Runner(wiki, prefetch, spawn, jobRequested, restart, htmlNotice, dryrun, enableLogging, chunkToDo, checkpointFile, pageIDRange, verbose)
38393930 if (restart):
38403931 print "Running %s, restarting from job %s..." % (wiki.dbName, jobRequested)
38413932 elif (jobRequested):

Status & tagging log