r95275 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: < r95274 | r95275 | r95276 >
Date: 23:16, 22 August 2011
Author: ariel
Status: deferred
Tags:
Comment:
In CommandManagement.py:
* a single monitor thread watches the parallel processes (so that the output of
these processes is received by one thread instead of by several)
In WikiDump.py:
* routine to write a file in place instead of via a temp file
* checkpoint file options
In worker.py:
* lose some extraneous semicolons
* fix some split() calls
* write various status files in place instead of using temp files,
writing from one thread so there is no issue with collisions
* support for writing checkpoint files for text pass of xml dumps,
recompression of text pass, and recombination of text pass
* support for checkpoint files in prefetch, cleanup of prefetch code
* for all recombine or other jobs that rely on output from previous jobs,
pass a reference to the previous job as an argument, so various information
can be retrieved from it
* all jobs that aren't table dumps are now directly derived from Dump class
* complete rewrite of file handling: remove a bunch of hardcoded file types
and extensions, move a bunch of path-related stuff
into the DumpDir class and add listing of filenames in the directory,
add a new DumpFilename class for dealing with all the bits and pieces of
filenames (chunk number, temp and checkpoint markers, extension, etc.; see
the usage sketch below), and change all code to use these functions and classes
* clean up old output files every time (needed anyway, now that we might have
old checkpoint files lying around)

Note that there are some unused dir listing functions in here which will
either be put into use later or tossed; there's still other related cleanup
to do, but the code works, so in it goes.
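
As a rough illustration of the new filename handling, here is a minimal
usage sketch for the DumpFilename class (Python 2, as in the diff below).
The wiki object is an assumption: some already-configured wiki instance of
the kind worker.py passes around. The example filename appears in the diff
itself, and the attribute values are traced from the parsing code in this
revision:

    # sketch only: 'wiki' is an assumed, already-configured wiki object
    fileObj = DumpFilename(wiki)
    # parse a chunked checkpoint file such as the text pass now produces
    fileObj.newFromFilename("elwikidb-20110727-pages-meta-history2.xml-p000048534p000051561.bz2")
    print fileObj.dumpName          # "pages-meta-history" (chunk digits stripped)
    print fileObj.chunkInt          # 2
    print fileObj.isCheckpointFile  # True
    print fileObj.firstPageID       # "000048534" (kept as a string)
    print fileObj.lastPageID        # "000051561"
    print fileObj.fileType          # "xml"
    print fileObj.fileExt           # "bz2"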
Modified paths:
  • /branches/ariel/xmldumps-backup/CommandManagement.py (modified) (history)
  • /branches/ariel/xmldumps-backup/WikiDump.py (modified) (history)
  • /branches/ariel/xmldumps-backup/worker.py (modified) (history)

Diff

Index: branches/ariel/xmldumps-backup/WikiDump.py
@@ -20,7 +20,9 @@
2121 return os.fdopen(fd, mode)
2222
2323 def writeFile(dirname, filename, text, perms = 0):
24 - """Write text to a file, as atomically as possible, via a temporary file in the same directory."""
 24+ """Write text to a file, as atomically as possible, via a temporary file in a specified directory.
 25+ Arguments: dirname = where temp file is created, filename = full path to actual file, text = contents
 26+ to write to file, perms = permissions that the file will have after creation"""
2527
2628 (fd, tempFilename ) = tempfile.mkstemp("_txt","wikidump_",dirname);
2729 os.write(fd,text)
@@ -31,6 +33,19 @@
3234 # Of course nothing else will work on Windows. ;)
3335 os.rename(tempFilename, filename)
3436
 37+ def writeFileInPlace(filename, text, perms = 0):
 38+ """Write text to a file, after opening it for write with truncation.
 39+ This assumes that only one process or thread accesses the given file at a time.
 40+ Arguments: filename = full path to actual file, text = contents
 41+ to write to file, perms = permissions that the file will have after creation,
 42+ if it did not exist already"""
 43+
 44+ file = open(filename, "wt")
 45+ file.write(text)
 46+ file.close()
 47+ if (perms):
 48+ os.chmod(filename,perms)
 49+
3550 def readFile(filename):
3651 """Read text from a file in one fell swoop."""
3752 file = open(filename, "r")
@@ -75,6 +90,7 @@
7691 fileAge = staticmethod(fileAge)
7792 atomicCreate = staticmethod(atomicCreate)
7893 writeFile = staticmethod(writeFile)
 94+ writeFileInPlace = staticmethod(writeFileInPlace)
7995 readFile = staticmethod(readFile)
8096 splitPath = staticmethod(splitPath)
8197 relativePath = staticmethod(relativePath)
@@ -140,7 +156,6 @@
141157 #"wiki": {
142158 "dblist": "",
143159 "privatelist": "",
144 - "biglist": "",
145160 "flaggedrevslist": "",
146161 # "dir": "",
147162 "forcenormal": "0",
@@ -149,6 +164,7 @@
150165 #"output": {
151166 "public": "/dumps/public",
152167 "private": "/dumps/private",
 168+ "temp":"/dumps/temp",
153169 "webroot": "http://localhost/dumps",
154170 "index": "index.html",
155171 "templatedir": home,
@@ -194,6 +210,9 @@
195211 "pagesPerChunkAbstract" : False,
196212 # whether or not to recombine the history pieces
197213 "recombineHistory" : "1",
 214+ # do we write out checkpoint files at regular intervals? (article/metacurrent/metahistory
 215+ # dumps only.)
 216+ "checkpointTime" : "0",
198217 }
199218 self.conf = ConfigParser.SafeConfigParser(defaults)
200219 self.conf.read(self.files)
@@ -213,7 +232,6 @@
214233 self.dbList = MiscUtils.dbList(self.conf.get("wiki", "dblist"))
215234 self.skipDbList = MiscUtils.dbList(self.conf.get("wiki", "skipdblist"))
216235 self.privateList = MiscUtils.dbList(self.conf.get("wiki", "privatelist"))
217 - self.bigList = MiscUtils.dbList(self.conf.get("wiki", "biglist"))
218236 self.flaggedRevsList = MiscUtils.dbList(self.conf.get("wiki", "flaggedrevslist"))
219237 self.wikiDir = self.conf.get("wiki", "dir")
220238 self.forceNormal = self.conf.getint("wiki", "forceNormal")
@@ -225,6 +243,7 @@
226244 self.conf.add_section('output')
227245 self.publicDir = self.conf.get("output", "public")
228246 self.privateDir = self.conf.get("output", "private")
 247+ self.tempDir = self.conf.get("output", "temp")
229248 self.webRoot = self.conf.get("output", "webroot")
230249 self.index = self.conf.get("output", "index")
231250 self.templateDir = self.conf.get("output", "templateDir")
@@ -279,6 +298,7 @@
280299 self.revsPerChunkHistory = self.getOptionForProjectOrDefault(conf, "chunks","revsPerChunkHistory",0)
281300 self.pagesPerChunkAbstract = self.getOptionForProjectOrDefault(conf, "chunks","pagesPerChunkAbstract",0)
282301 self.recombineHistory = self.getOptionForProjectOrDefault(conf, "chunks","recombineHistory",1)
 302+ self.checkpointTime = self.getOptionForProjectOrDefault(conf, "chunks","checkpointTime",1)
283303
284304 def getOptionForProjectOrDefault(self, conf, sectionName, itemName, isInt):
285305 if (conf.has_section(self.projectName)):
@@ -366,9 +386,6 @@
367387 def isPrivate(self):
368388 return self.dbName in self.config.privateList
369389
370 - def isBig(self):
371 - return self.dbName in self.config.bigList
372 -
373390 def hasFlaggedRevs(self):
374391 return self.dbName in self.config.flaggedRevsList
375392
@@ -395,7 +412,7 @@
396413
397414 def privateDir(self):
398415 return os.path.join(self.config.privateDir, self.dbName)
399 -
 416+
400417 def webDir(self):
401418 return "/".join((self.config.webRoot, self.dbName))
402419
@@ -437,7 +454,7 @@
438455 def writePerDumpIndex(self, html):
439456 directory = os.path.join(self.publicDir(), self.date)
440457 index = os.path.join(self.publicDir(), self.date, self.config.perDumpIndex)
441 - FileUtils.writeFile(directory, index, html, self.config.fileperms)
 458+ FileUtils.writeFileInPlace(index, html, self.config.fileperms)
442459
443460 def existsPerDumpIndex(self):
444461 index = os.path.join(self.publicDir(), self.date, self.config.perDumpIndex)
@@ -446,7 +463,7 @@
447464 def writeStatus(self, message):
448465 directory = os.path.join(self.publicDir(), self.date)
449466 index = os.path.join(self.publicDir(), self.date, "status.html")
450 - FileUtils.writeFile(directory, index, message, self.config.fileperms)
 467+ FileUtils.writeFileInPlace(index, message, self.config.fileperms)
451468
452469 def statusLine(self):
453470 date = self.latestDump()
Index: branches/ariel/xmldumps-backup/worker.py
@@ -19,7 +19,7 @@
2020 import Queue
2121 import thread
2222
23 -from os.path import dirname, exists, getsize, join, realpath
 23+from os.path import exists
2424 from subprocess import Popen, PIPE
2525 from WikiDump import FileUtils, MiscUtils, TimeUtils
2626 from CommandManagement import CommandPipeline, CommandSeries, CommandsInParallel
@@ -164,7 +164,7 @@
165165 def defaultServer(self):
166166 # if this fails what do we do about it? Not a bleeping thing. *ugh* FIXME!!
167167 if (not exists( self.wiki.config.php ) ):
168 - raise BackupError("php command %s not found" % self.wiki.config.php);
 168+ raise BackupError("php command %s not found" % self.wiki.config.php)
169169 command = "%s -q %s/maintenance/getSlaveServer.php --wiki=%s --group=dump" % MiscUtils.shellEscape((
170170 self.wiki.config.php, self.wiki.config.wikiDir, self.dbName))
171171 return RunSimpleCommand.runAndReturn(command, self.errorCallback).strip()
@@ -175,7 +175,7 @@
176176 def buildSqlCommand(self, query, pipeto = None):
177177 """Put together a command to execute an sql query to the server for this DB."""
178178 if (not exists( self.wiki.config.mysql ) ):
179 - raise BackupError("mysql command %s not found" % self.wiki.config.mysql);
 179+ raise BackupError("mysql command %s not found" % self.wiki.config.mysql)
180180 command = [ [ "/bin/echo", "%s" % query ],
181181 [ "%s" % self.wiki.config.mysql, "-h",
182182 "%s" % self.dbServer,
@@ -191,7 +191,7 @@
192192 """Put together a command to dump a table from the current DB with mysqldump
193193 and save to a gzipped sql file."""
194194 if (not exists( self.wiki.config.mysqldump ) ):
195 - raise BackupError("mysqldump command %s not found" % self.wiki.config.mysqldump);
 195+ raise BackupError("mysqldump command %s not found" % self.wiki.config.mysqldump)
196196 command = [ [ "%s" % self.wiki.config.mysqldump, "-h",
197197 "%s" % self.dbServer, "-u",
198198 "%s" % self.wiki.config.dbUser,
@@ -225,7 +225,7 @@
226226 """Get the prefix for all tables for the specific wiki ($wgDBprefix)"""
227227 # FIXME later full path
228228 if (not exists( self.wiki.config.php ) ):
229 - raise BackupError("php command %s not found" % self.wiki.config.php);
 229+ raise BackupError("php command %s not found" % self.wiki.config.php)
230230 command = "echo 'print $wgDBprefix; ' | %s -q %s/maintenance/eval.php --wiki=%s" % MiscUtils.shellEscape((
231231 self.wiki.config.php, self.wiki.config.wikiDir, self.dbName))
232232 return RunSimpleCommand.runAndReturn(command, self.errorCallback).strip()
@@ -369,16 +369,16 @@
370370
371371 def _getDumpRunInfoDirName(self, date=None):
372372 if (date):
373 - return os.path.join(self.wiki.publicDir(), date);
 373+ return os.path.join(self.wiki.publicDir(), date)
374374 else:
375 - return os.path.join(self.wiki.publicDir(), self.wiki.date);
 375+ return os.path.join(self.wiki.publicDir(), self.wiki.date)
376376
377377 # format: name:%; updated:%; status:%
378378 def _getOldRunInfoFromLine(self, line):
379379 # get rid of leading/trailing/blanks
380380 line = line.strip(" ")
381381 line = line.replace("\n","")
382 - fields = line.split(';',3)
 382+ fields = line.split(';',2)
383383 dumpRunInfo = RunInfo()
384384 for field in fields:
385385 field = field.strip(" ")
@@ -394,14 +394,15 @@
395395 def _writeDumpRunInfoFile(self, text):
396396 directory = self._getDumpRunInfoDirName()
397397 dumpRunInfoFilename = self._getDumpRunInfoFileName()
398 - FileUtils.writeFile(directory, dumpRunInfoFilename, text, self.wiki.config.fileperms)
 398+# FileUtils.writeFile(directory, dumpRunInfoFilename, text, self.wiki.config.fileperms)
 399+ FileUtils.writeFile(self.wiki.config.tempDir, dumpRunInfoFilename, text, self.wiki.config.fileperms)
399400
400401 # format: name:%; updated:%; status:%
401402 def _getStatusForJobFromRunInfoFileLine(self, line, jobName):
402403 # get rid of leading/trailing/embedded blanks
403404 line = line.replace(" ","")
404405 line = line.replace("\n","")
405 - fields = line.split(';',3)
 406+ fields = line.split(';',2)
406407 for field in fields:
407408 (fieldName, separator, fieldValue) = field.partition(':')
408409 if (fieldName == "name"):
@@ -484,18 +485,21 @@
485486 self._toBeRun = toBeRun
486487
487488 class DumpItemList(object):
488 - def __init__(self, wiki, prefetch, spawn, date, chunkToDo, singleJob, chunkInfo, runInfoFile):
489 - self.date = date
 489+ def __init__(self, wiki, prefetch, spawn, chunkToDo, singleJob, chunkInfo, runInfoFile, dumpDir):
490490 self.wiki = wiki
491491 self._hasFlaggedRevs = self.wiki.hasFlaggedRevs()
492 - self._isBig = self.wiki.isBig()
493492 self._prefetch = prefetch
494493 self._spawn = spawn
495494 self.chunkInfo = chunkInfo
496495 self._chunkToDo = chunkToDo
497496 self._singleJob = singleJob
498497 self._runInfoFile = runInfoFile
499 -
 498+ self.dumpDir = dumpDir
 499+ if self.wiki.config.checkpointTime:
 500+ checkpoints = True
 501+ else:
 502+ checkpoints = False
 503+
500504 if (self._singleJob and self._chunkToDo):
501505 if (self._singleJob[-5:] == 'table' or
502506 self._singleJob[-9:] == 'recombine' or
@@ -540,29 +544,29 @@
541545 AbstractDump("abstractsdump","Extracted page abstracts for Yahoo", self._getChunkToDo("abstractsdump"), self.chunkInfo.getPagesPerChunkAbstract())]
542546
543547 if (self.chunkInfo.chunksEnabled()):
544 - self.dumpItems.append(RecombineAbstractDump("abstractsdumprecombine", "Recombine extracted page abstracts for Yahoo", self.chunkInfo.getPagesPerChunkAbstract()))
 548+ self.dumpItems.append(RecombineAbstractDump("abstractsdumprecombine", "Recombine extracted page abstracts for Yahoo", self.findItemByName('abstractsdump')))
545549
546550 self.dumpItems.append(XmlStub("xmlstubsdump", "First-pass for page XML data dumps", self._getChunkToDo("xmlstubsdump"), self.chunkInfo.getPagesPerChunkHistory()))
547551 if (self.chunkInfo.chunksEnabled()):
548 - self.dumpItems.append(RecombineXmlStub("xmlstubsdumprecombine", "Recombine first-pass for page XML data dumps", self.chunkInfo.getPagesPerChunkHistory()))
 552+ self.dumpItems.append(RecombineXmlStub("xmlstubsdumprecombine", "Recombine first-pass for page XML data dumps",self.findItemByName('xmlstubsdump')))
549553
550554 # NOTE that the chunkInfo thing passed here is irrelevant, these get generated from the stubs which are all done in one pass
551555 self.dumpItems.append(
552556 XmlDump("articles",
553557 "articlesdump",
554558 "<big><b>Articles, templates, image descriptions, and primary meta-pages.</b></big>",
555 - "This contains current versions of article content, and is the archive most mirror sites will probably want.", self._prefetch, self._spawn, self._getChunkToDo("articlesdump"), self.chunkInfo.getPagesPerChunkHistory()))
 559+ "This contains current versions of article content, and is the archive most mirror sites will probably want.", self.findItemByName('xmlstubsdump'), self._prefetch, self._spawn, self.wiki, self._getChunkToDo("articlesdump"), self.chunkInfo.getPagesPerChunkHistory(), checkpoints))
556560 if (self.chunkInfo.chunksEnabled()):
557 - self.dumpItems.append(RecombineXmlDump("articles","articlesdumprecombine", "<big><b>Recombine articles, templates, image descriptions, and primary meta-pages.</b></big>","This contains current versions of article content, and is the archive most mirror sites will probably want.", self.chunkInfo.getPagesPerChunkHistory()))
558 -
 561+ self.dumpItems.append(RecombineXmlDump("articlesdumprecombine", "<big><b>Recombine articles, templates, image descriptions, and primary meta-pages.</b></big>","This contains current versions of article content, and is the archive most mirror sites will probably want.", self.findItemByName('articlesdump')))
 562+
559563 self.dumpItems.append(
560564 XmlDump("meta-current",
561565 "metacurrentdump",
562566 "All pages, current versions only.",
563 - "Discussion and user pages are included in this complete archive. Most mirrors won't want this extra material.", self._prefetch, self._spawn, self._getChunkToDo("metacurrentdump"), self.chunkInfo.getPagesPerChunkHistory()))
 567+ "Discussion and user pages are included in this complete archive. Most mirrors won't want this extra material.", self.findItemByName('xmlstubsdump'), self._prefetch, self._spawn, self.wiki, self._getChunkToDo("metacurrentdump"), self.chunkInfo.getPagesPerChunkHistory(), checkpoints))
564568
565569 if (self.chunkInfo.chunksEnabled()):
566 - self.dumpItems.append(RecombineXmlDump("meta-current","metacurrentdumprecombine", "Recombine all pages, current versions only.","Discussion and user pages are included in this complete archive. Most mirrors won't want this extra material.", self.chunkInfo.getPagesPerChunkHistory()))
 570+ self.dumpItems.append(RecombineXmlDump("metacurrentdumprecombine", "Recombine all pages, current versions only.","Discussion and user pages are included in this complete archive. Most mirrors won't want this extra material.", self.findItemByName('metacurrentdump')))
567571
568572 self.dumpItems.append(
569573 XmlLogging("Log events to all pages."))
@@ -572,34 +576,31 @@
573577 PublicTable( "flaggedpages", "flaggedpagestable","This contains a row for each flagged article, containing the stable revision ID, if the lastest edit was flagged, and how long edits have been pending." ))
574578 self.dumpItems.append(
575579 PublicTable( "flaggedrevs", "flaggedrevstable","This contains a row for each flagged revision, containing who flagged it, when it was flagged, reviewer comments, the flag values, and the quality tier those flags fall under." ))
576 -
577 - if not self._isBig:
 580+
 581+ self.dumpItems.append(
 582+ BigXmlDump("meta-history",
 583+ "metahistorybz2dump",
 584+ "All pages with complete page edit history (.bz2)",
 585+ "These dumps can be *very* large, uncompressing up to 20 times the archive download size. " +
 586+ "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.findItemByName('xmlstubsdump'), self._prefetch, self._spawn, self.wiki, self._getChunkToDo("metahistorybz2dump"), self.chunkInfo.getPagesPerChunkHistory(), checkpoints))
 587+ if (self.chunkInfo.chunksEnabled() and self.chunkInfo.recombineHistory()):
578588 self.dumpItems.append(
579 - BigXmlDump("meta-history",
580 - "metahistorybz2dump",
581 - "All pages with complete page edit history (.bz2)",
582 - "These dumps can be *very* large, uncompressing up to 20 times the archive download size. " +
583 - "Suitable for archival and statistical use, most mirror sites won't want or need this.", self._prefetch, self._spawn, self._getChunkToDo("metahistorybz2dump"), self.chunkInfo.getPagesPerChunkHistory()))
584 - if (self.chunkInfo.chunksEnabled() and self.chunkInfo.recombineHistory()):
585 - self.dumpItems.append(
586 - RecombineXmlDump("meta-history",
587 - "metahistorybz2dumprecombine",
588 - "Recombine all pages with complete edit history (.bz2)",
589 - "These dumps can be *very* large, uncompressing up to 100 times the archive download size. " +
590 - "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.chunkInfo.getPagesPerChunkHistory()))
 589+ RecombineXmlDump("metahistorybz2dumprecombine",
 590+ "Recombine all pages with complete edit history (.bz2)",
 591+ "These dumps can be *very* large, uncompressing up to 100 times the archive download size. " +
 592+ "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.findItemByName('metahistorybz2dump')))
 593+ self.dumpItems.append(
 594+ XmlRecompressDump("meta-history",
 595+ "metahistory7zdump",
 596+ "All pages with complete edit history (.7z)",
 597+ "These dumps can be *very* large, uncompressing up to 100 times the archive download size. " +
 598+ "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.findItemByName('metahistorybz2dump'), self.wiki, self._getChunkToDo("metahistory7zdump"), self.chunkInfo.getPagesPerChunkHistory()))
 599+ if (self.chunkInfo.chunksEnabled() and self.chunkInfo.recombineHistory()):
591600 self.dumpItems.append(
592 - XmlRecompressDump("meta-history",
593 - "metahistory7zdump",
594 - "All pages with complete edit history (.7z)",
595 - "These dumps can be *very* large, uncompressing up to 100 times the archive download size. " +
596 - "Suitable for archival and statistical use, most mirror sites won't want or need this.", self._getChunkToDo("metahistory7zdump"), self.chunkInfo.getPagesPerChunkHistory()))
597 - if (self.chunkInfo.chunksEnabled() and self.chunkInfo.recombineHistory()):
598 - self.dumpItems.append(
599 - RecombineXmlRecompressDump("meta-history",
600 - "metahistory7zdumprecombine",
601 - "Recombine all pages with complete edit history (.7z)",
602 - "These dumps can be *very* large, uncompressing up to 100 times the archive download size. " +
603 - "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.chunkInfo.getPagesPerChunkHistory()))
 601+ RecombineXmlRecompressDump("metahistory7zdumprecombine",
 602+ "Recombine all pages with complete edit history (.7z)",
 603+ "These dumps can be *very* large, uncompressing up to 100 times the archive download size. " +
 604+ "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.findItemByName('metahistory7zdump'), self.wiki))
604605 results = self._runInfoFile.getOldRunInfoFromFile()
605606 if (results):
606607 for runInfoObj in results:
@@ -655,6 +656,12 @@
656657 for j in range(i,len(self.dumpItems)):
657658 self.dumpItems[j].setToBeRun(True)
658659 break
 660+
 661+ def findItemByName(self, name):
 662+ for item in self.dumpItems:
 663+ if (item.name() == name):
 664+ return item
 665+ return None
659666
660667 # see whether job needs previous jobs that have not completed successfully
661668 def jobDoneSuccessfully(self, job):
@@ -734,12 +741,16 @@
735742 checksumFileName = self._getChecksumFileNameTmp()
736743 output = file(checksumFileName, "w")
737744
738 - def checksum(self, filename, runner):
 745+ def checksum(self, fileObj, runner):
739746 """Run checksum for an output file, and append to the list."""
740747 if (self._enabled):
741748 checksumFileName = self._getChecksumFileNameTmp()
742749 output = file(checksumFileName, "a")
743 - self._saveChecksum(filename, output, runner)
 750+ runner.debug("Checksumming %s" % fileObj.filename)
 751+ dumpfile = DumpFile(self.wiki, runner.dumpDir.filenamePublicPath(fileObj))
 752+ checksum = dumpfile.md5Sum()
 753+ if checksum != None:
 754+ output.write( "%s %s\n" % (checksum, fileObj.filename))
744755 output.close()
745756
746757 def moveMd5FileIntoPlace(self):
@@ -753,7 +764,7 @@
754765 tmpFileName = self._getChecksumFileNameTmp()
755766 realFileName = self._getChecksumFileName()
756767 text = FileUtils.readFile(tmpFileName)
757 - FileUtils.writeFile(self._getMd5FileDirName(), realFileName, text, self.wiki.config.fileperms)
 768+ FileUtils.writeFile(self.wiki.config.tempDir, realFileName, text, self.wiki.config.fileperms)
758769
759770 def getChecksumFileNameBasename(self):
760771 return ("md5sums.txt")
@@ -762,14 +773,382 @@
763774 # functions internal to the class
764775 #
765776 def _getChecksumFileName(self):
766 - return (self.dumpDir.publicPath(self.getChecksumFileNameBasename()))
 777+ fileObj = DumpFilename(self.wiki, None, self.getChecksumFileNameBasename())
 778+ return (self.dumpDir.filenamePublicPath(fileObj))
767779
768780 def _getChecksumFileNameTmp(self):
769 - return (self.dumpDir.publicPath(self.getChecksumFileNameBasename() + "." + self.timestamp + ".tmp"))
 781+ fileObj = DumpFilename(self.wiki, None, self.getChecksumFileNameBasename() + "." + self.timestamp + ".tmp")
 782+ return (self.dumpDir.filenamePublicPath(fileObj))
770783
771 - def _md5File(self, filename):
 784+ def _getMd5FileDirName(self):
 785+ return os.path.join(self.wiki.publicDir(), self.wiki.date)
 786+
 787+class DumpDir(object):
 788+ def __init__(self, wiki, dbName):
 789+ self._wiki = wiki
 790+ self._dbName = dbName
 791+ self._dirCache = {}
 792+ self._dirCacheTime = {}
 793+ self._chunkFileCache = {}
 794+ self._checkpointFileCache = {}
 795+
 796+ def filenamePrivatePath(self, dumpFile, dateString = None):
 797+ """Given a DumpFilename object, produce the full path to the filename in the date subdir
 798+ of the private dump dir for the selected database.
 799+ If a different date is specified, use that instead."""
 800+ if (not dateString):
 801+ dateString = self._wiki.date
 802+ return os.path.join(self._wiki.privateDir(), dateString, dumpFile.filename)
 803+
 804+ def filenamePublicPath(self, dumpFile, dateString = None):
 805+ """Given a DumpFilename object produce the full path to the filename in the date subdir
 806+ of the public dump dir for the selected database.
 807+ If this database is marked as private, use the private dir instead.
 808+ If a different date is specified, use that instead"""
 809+ if (not dateString):
 810+ dateString = self._wiki.date
 811+ return os.path.join(self._wiki.publicDir(), dateString, dumpFile.filename)
 812+
 813+ def latestDir(self):
 814+ """Return 'latest' directory for the current project being dumped, e.g.
 815+ if the current project is enwiki, this would return something like
 816+ /mnt/data/xmldatadumps/public/enwiki/latest (if the directory /mnt/data/xmldatadumps/public
 817+ is the path to the directory for public dumps)."""
 818+ return os.path.join(self._wiki.publicDir(), "latest")
 819+
 820+ def webPath(self, dumpFile, dateString = None):
 821+ """Given a DumpFilename object produce the full url to the filename for the date of
 822+ the dump for the selected database."""
 823+ if (not dateString):
 824+ dateString = self._wiki.date
 825+ return os.path.join(self._wiki.webDir(), dateString, dumpFile.filename)
 826+
 827+ def dirCacheOutdated(self, date):
 828+ if not date:
 829+ date = self._wiki.date
 830+ directory = os.path.join(self._wiki.publicDir(), date)
 831+ dirTimeStamp = os.stat(directory).st_mtime
 832+ if (not date in self._dirCache or dirTimeStamp > self._dirCacheTime[date]):
 833+ return True
 834+ else:
 835+ return False
 836+
 837+ # warning: date can also be "latest"
 838+ def getFilesInDir(self, date = None):
 839+ if not date:
 840+ date = self._wiki.date
 841+ if (self.dirCacheOutdated(date)):
 842+ directory = os.path.join(self._wiki.publicDir(),date)
 843+ dirTimeStamp = os.stat(directory).st_mtime
 844+ files = os.listdir(directory)
 845+ fileObjs = []
 846+ for f in files:
 847+ fileObj = DumpFilename(self._wiki)
 848+ fileObj.newFromFilename(f)
 849+ fileObjs.append(fileObj)
 850+ self._dirCache[date] = fileObjs
 851+ self._dirCacheTime[date] = dirTimeStamp
 852+ return(self._dirCache[date])
 853+
 854+ # list all files that exist, filtering by the given args.
 855+ # if we get None for an arg then we accept all values for that arg in the filename, including missing
 856+ # if we get False for an arg (chunk, temp, checkpoint), we reject any filename which contains a value for that arg
 857+ # if we get True for an arg (chunk, temp, checkpoint), we include only filenames which contain a value for that arg
 858+ # chunks should be a list of value(s) or True / False / None
 859+ #
 860+ # note that we ignore files with ".truncated". these are known to be bad.
 861+ def _getFilesFiltered(self, date = None, dumpName = None, fileType = None, fileExt = None, chunks = None, temp = None, checkpoint = None ):
 862+ if not date:
 863+ date = self._wiki.date
 864+ fileObjs = self.getFilesInDir(date)
 865+ filesMatched = []
 866+ for f in fileObjs:
 867+ # fixme this is a bit hackish
 868+ if f.filename.endswith("truncated"):
 869+ continue
 870+
 871+ if dumpName and f.dumpName != dumpName:
 872+ continue
 873+ if fileType != None and f.fileType != fileType:
 874+ continue
 875+ if fileExt != None and f.fileExt != fileExt:
 876+ continue
 877+ if (chunks == False and f.isChunkFile):
 878+ continue
 879+ if (chunks == True and not f.isChunkFile):
 880+ continue
 881+ # chunks is a list...
 882+ if (chunks and chunks != True and not f.chunkInt in chunks):
 883+ continue
 884+ if (temp == False and f.isTempFile) or (temp and not f.isTempFile):
 885+ continue
 886+ if (checkpoint == False and f.isCheckpointFile) or (checkpoint and not f.isCheckpointFile):
 887+ continue
 888+ filesMatched.append(f)
 889+ self.sort_fileobjs(filesMatched)
 890+ return filesMatched
 891+
 892+ # taken from a comment by user "Toothy" on Ned Batchelder's blog (no longer on the net)
 893+ def sort_fileobjs(self, l):
 894+ """ Sort the given list in the way that humans expect.
 895+ """
 896+ convert = lambda text: int(text) if text.isdigit() else text
 897+ alphanum_key = lambda key: [ convert(c) for c in re.split('([0-9]+)', key.filename) ]
 898+ l.sort( key=alphanum_key )
 899+
 900+ # list all checkpoint files that exist, filtering by the given args.
 901+ # if we get None for an arg then we accept all values for that arg in the filename
 902+ # if we get False for an arg (chunks, temp), we reject any filename which contains a value for that arg
 903+ # if we get True for an arg (chunk, temp), we accept only filenames which contain a value for the arg
 904+ # chunks should be a list of value(s), or True / False / None
 905+ def getCheckpointFilesExisting(self, date = None, dumpName = None, fileType = None, fileExt = None, chunks = False, temp = False ):
 906+ return self._getFilesFiltered(date, dumpName, fileType, fileExt, chunks, temp, checkpoint = True)
 907+
 908+ # list all non-checkpoint files that exist, filtering by the given args.
 909+ # if we get None for an arg then we accept all values for that arg in the filename
 910+ # if we get False for an arg (chunk, temp), we reject any filename which contains a value for that arg
 911+ # if we get True for an arg (chunk, temp), we accept only filenames which contain a value for the arg
 912+ # chunks should be a list of value(s), or True / False / None
 913+ def getRegularFilesExisting(self, date = None, dumpName = None, fileType = None, fileExt = None, chunks = False, temp = False ):
 914+ return self._getFilesFiltered(date, dumpName, fileType, fileExt, chunks, temp, checkpoint = False)
 915+
 916+
 917+class DumpFilename(object):
 918+ """
 919+ filename without directory name, and the methods that go with it,
 920+ primarily for filenames that follow the standard naming convention, i.e.
 921+ projectname-date-dumpName.sql/xml.gz/bz2/7z (possibly with a chunk
 922+ number, possibly with start/end page id information embedded in the name).
 923+
 924+ Constructor:
 925+ DumpFilename(wiki, date = None, dumpName = None, filetype = None, ext = None, chunk = None, checkpoint = None, temp = False) -- pass in dumpName and
 926+ filetype/extension at least. filetype is one of xml, sql; extension is one of
 927+ bz2/gz/7z. Or you can pass in the entire string without project name and date,
 928+ e.g. pages-meta-history5.xml.bz2
 929+ If dumpName is not passed, no member variables will be initialized, and
 930+ the caller is expected to invoke newFromFilename as an alternate
 931+ constructor before doing anything else with the object.
 932+
 933+ newFromFilename(filename) -- pass in full filename. This is called by the regular constructor and is
 934+ what sets all attributes
 935+
 936+ attributes:
 937+
 938+ isCheckpointFile filename of form dbname-date-dumpName-pxxxxpxxxx.xml.bz2
 939+ isChunkFile filename of form dbname-date-dumpNamex.xml.gz/bz2/7z
 940+ isTempFile filename of form dbname-date-dumpName.xml.gz/bz2/7z-tmp
 941+ firstPageID for checkpoint files, taken from value in filename
 942+ lastPageID for checkpoint files, value taken from filename
 943+ filename full filename
 944+ basename part of the filename after the project name and date (for
 945+ "enwiki-20110722-pages-meta-history12.xml.bz2" this would be
 946+ "pages-meta-history12.xml.bz2")
 947+ fileExt extension (everything after the last ".") of the file
 948+ date date embedded in filename
 949+ dumpName dump name embedded in filename (eg "pages-meta-history"), if any
 950+ chunk chunk number of file as string (for "pages-meta-history5.xml.bz2" this would be "5")
 951+ chunkInt chunk number as int
 952+ """
 953+
 954+ def __init__(self, wiki, date = None, dumpName = None, filetype = None, ext = None, chunk = None, checkpoint = None, temp = False):
 955+ """Constructor. Arguments: the dump name as it should appear in the filename,
 956+ the date if different than the date of the dump run, the chunk number
 957+ if there is one, and temp which is true if this is a temp file (ending in "-tmp").
 958+ Alternatively, one can leave off all the other arguments and just pass the entire
 959+ filename minus the dbname and the date; newFromFilename returns true on success, false otherwise."""
 960+ self.wiki = wiki
 961+ # if dumpName is not set, the caller can call newFromFilename to initialize various values instead
 962+ if dumpName:
 963+ filename = self.newFilename(dumpName, filetype, ext, date, chunk, checkpoint, temp)
 964+ self.newFromFilename(filename)
 965+
 966+ def isExt(self,ext):
 967+ if ext == "gz" or ext == "bz2" or ext == "7z" or ext == "html" or ext == "txt":
 968+ return True
 969+ else:
 970+ return False
 971+
 972+ # returns True if successful, False otherwise (filename is not in the canonical form that we manage)
 973+ def newFromFilename(self, filename):
 974+ """Constructor. Arguments: the full file name including the chunk, the extension, etc BUT NOT the dir name. """
 975+ self.filename = filename
 976+
 977+ # example filenames:
 978+ # elwikidb-20110729-all-titles-in-ns0.gz
 979+ # elwikidb-20110729-abstract.xml
 980+ # elwikidb-20110727-pages-meta-history2.xml-p000048534p000051561.bz2
 981+
 982+ # we need to handle cases without the projectname-date stuff in them too, as this gets used
 983+ # for all files now
 984+ if self.filename.endswith("-tmp"):
 985+ self.isTempFile = True
 986+ self.temp = "-tmp"
 987+ else:
 988+ self.isTempFile = False
 989+ self.temp = None
 990+
 991+ if ('.' in self.filename):
 992+ (fileBase, self.fileExt) = self.filename.rsplit('.',1)
 993+ if (self.temp):
 994+ self.fileExt = self.fileExt[:-4]
 995+ else:
 996+ self.dbName = None
 997+ self.date = None
 998+ self.dumpName = None
 999+ self.filePrefix = ""
 1000+ self.filePrefixLength = 0
 1001+ self.isChunkFile = False
 1002+ self.isCheckpointFile = False
 1003+ self.checkpoint = None
 1004+ self.firstPageID = None
 1005+ self.lastPageID = None
 1006+ self.isTempFile = False
 1007+ self.fileExt = None
 1008+ self.fileType = None
 1009+ return False
 1010+
 1011+ # FIXME could have -tmp at the end, when do we look for that??
 1012+
 1013+ if not self.isExt(self.fileExt):
 1014+ self.fileType = self.fileExt
 1015+# self.fileExt = None
 1016+ self.fileExt = ""
 1017+ else:
 1018+ if '.' in fileBase:
 1019+ (fileBase, self.fileType) = fileBase.split('.',1)
 1020+
 1021+ # some files are not of this form, we skip them
 1022+ if not '-' in fileBase:
 1023+ self.dbName = None
 1024+ self.date = None
 1025+ self.dumpName = None
 1026+ self.filePrefix = ""
 1027+ self.filePrefixLength = 0
 1028+ self.isChunkFile = False
 1029+ self.isCheckpointFile = False
 1030+ self.checkpoint = None
 1031+ self.firstPageID = None
 1032+ self.lastPageID = None
 1033+ self.isTempFile = False
 1034+ self.temp = None
 1035+ return False
 1036+
 1037+ (self.dbName, self.date, self.dumpName) = fileBase.split('-',2)
 1038+ if not self.date or not self.dumpName:
 1039+ self.dbName = None
 1040+ self.date = None
 1041+ self.dumpName = fileBase
 1042+ self.filePrefix = ""
 1043+ self.filePrefixLength = 0
 1044+ else:
 1045+ self.filePrefix = "%s-%s-" % (self.dbName, self.date)
 1046+ self.filePrefixLength = len(self.filePrefix)
 1047+
 1048+ if self.filename.startswith(self.filePrefix):
 1049+ self.basename = self.filename[self.filePrefixLength:]
 1050+ else:
 1051+ self.basename = None
 1052+
 1053+ self.checkpointPattern = "-p(?P<first>[0-9]+)p(?P<last>[0-9]+)\." + self.fileExt + "$"
 1054+ self.compiledCheckpointPattern = re.compile(self.checkpointPattern)
 1055+ result = self.compiledCheckpointPattern.search(self.filename)
 1056+
 1057+ if result:
 1058+ self.isCheckpointFile = True
 1059+ self.firstPageID = result.group('first')
 1060+ self.lastPageID = result.group('last')
 1061+ self.checkpoint = "p" + self.firstPageID + "p" + self.lastPageID
 1062+ if self.fileType and self.fileType.endswith("-" + self.checkpoint):
 1063+ self.fileType = self.fileType[:-1 * ( len(self.checkpoint) + 1 ) ]
 1064+ else:
 1065+ self.isCheckpointFile = False
 1066+ self.checkpoint = None
 1067+ self.firstPageID = None
 1068+ self.lastPageID = None
 1069+
 1070+ self.chunkPattern = "(?P<chunk>[0-9]+)$"
 1071+ self.compiledChunkPattern = re.compile(self.chunkPattern)
 1072+ result = self.compiledChunkPattern.search(self.dumpName)
 1073+ if result:
 1074+ self.isChunkFile = True
 1075+ self.chunk = result.group('chunk')
 1076+ self.chunkInt = int(self.chunk)
 1077+ # the dumpName has the chunk in it so lose it
 1078+ self.dumpName = self.dumpName.rstrip('0123456789')
 1079+ else:
 1080+ self.isChunkFile = False
 1081+ self.chunk = None
 1082+ self.chunkInt = 0
 1083+
 1084+ return True
 1085+
 1086+ def newFilename(self, dumpName, filetype, ext, date = None, chunk = None, checkpoint = None, temp = None):
 1087+ if not chunk:
 1088+ chunk = ""
 1089+ if not date:
 1090+ date = self.wiki.date
 1091+ # fixme do the right thing in case no filetype or no ext
 1092+ parts = []
 1093+ parts.append(self.wiki.dbName + "-" + date + "-" + dumpName + "%s" % chunk)
 1094+ if checkpoint:
 1095+ filetype = filetype + "-" + checkpoint
 1096+ if filetype:
 1097+ parts.append(filetype)
 1098+ if ext:
 1099+ parts.append(ext)
 1100+ filename = ".".join(parts)
 1101+ if temp:
 1102+ filename = filename + "-tmp"
 1103+ return filename
 1104+
 1105+class DumpFile(file):
 1106+ """File containing output created by any job of a jump run. This includes
 1107+ any file that follows the standard naming convention, i.e.
 1108+ projectname-date-dumpName.sql/xml.gz/bz2/7z (possibly with a chunk
 1109+ number, possibly with start/end page id information embedded in the name).
 1110+
 1111+ Methods:
 1112+
 1113+ md5Sum(): return md5sum of the file contents.
 1114+ checkIfTruncated(): for compressed files, check if the file is truncated (stops
 1115+ abruptly before the end of the compressed data) or not, and set and return
 1116+ self.isTruncated accordingly. This is fast for bzip2 files
 1117+ and slow for gz and 7z files, since for the latter two types it must serially
 1118+ read through the file to determine if it is truncated or not.
 1119+ getSize(): returns the current size of the file in bytes
 1120+ rename(newname): rename the file. Arguments: the new name of the file without
 1121+ the directory.
 1122+ findFirstPageIDInFile(): set self.firstPageID by examining the file contents,
 1123+ returning the value, or None if there is no pageID. We uncompress the file
 1124+ if needed and look through the first 500 lines.
 1125+
 1126+ plus the usual file methods (read, write, open, close)
 1127+
 1128+ useful variables:
 1129+
 1130+ firstPageID Determined by examining the first few hundred lines of the contents,
 1131+ looking for page and id tags, without other tags in between. (hmm)
 1132+ filename full filename with directory
 1133+ """
 1134+ def __init__(self, wiki, filename, fileObj = None):
 1135+ """takes full filename including path"""
 1136+ self._wiki = wiki
 1137+ self.filename = filename
 1138+ self.firstLines = None
 1139+ self.isTruncated = None
 1140+ self.firstPageID = None
 1141+ self.dirname = os.path.dirname(filename)
 1142+ if fileObj:
 1143+ self.fileObj = fileObj
 1144+ else:
 1145+ self.fileObj = DumpFilename(wiki)
 1146+ self.fileObj.newFromFilename(os.path.basename(filename))
 1147+
 1148+ def md5Sum(self):
 1149+ if not self.filename:
 1150+ return None
7721151 summer = hashlib.md5()
773 - infile = file(filename, "rb")
 1152+ infile = file(self.filename, "rb")
7741153 bufsize = 4192 * 32
7751154 buffer = infile.read(bufsize)
7761155 while buffer:
@@ -778,61 +1157,133 @@
7791158 infile.close()
7801159 return summer.hexdigest()
7811160
782 - def _md5FileLine(self, filename):
783 - return "%s %s\n" % (self._md5File(filename), os.path.basename(filename))
 1161+ def getFirst500Lines(self):
 1162+ if self.firstLines:
 1163+ return(self.firstLines)
7841164
785 - def _saveChecksum(self, file, output, runner):
786 - runner.debug("Checksumming %s" % file)
787 - path = self.dumpDir.publicPath(file)
788 - if os.path.exists(path):
789 - checksum = self._md5FileLine(path)
790 - output.write(checksum)
 1165+ if not self.filename or not exists(self.filename):
 1166+ return None
7911167
792 - def _getMd5FileDirName(self):
793 - return os.path.join(self.wiki.publicDir(), self.wiki.date);
 1168+ pipeline = self.setupUncompressionCommand()
7941169
795 -class DumpDir(object):
796 - def __init__(self, wiki, dbName, date):
797 - self._wiki = wiki
798 - self._dbName = dbName
799 - self._date = date
 1170+ if (not exists( self._wiki.config.head ) ):
 1171+ raise BackupError("head command %s not found" % self._wiki.config.head)
 1172+ head = self._wiki.config.head
 1173+ headEsc = MiscUtils.shellEscape(head)
 1174+ pipeline.append([ head, "-500" ])
 1175+ # without shell
 1176+ p = CommandPipeline(pipeline, quiet=True)
 1177+ p.runPipelineAndGetOutput()
 1178+ self.firstLines = p.output()
 1179+ return(self.firstLines)
8001180
801 - def buildDir(self, base, version):
802 - return join(base, version)
 1181+ # unused
 1182+ # xml, sql, text
 1183+ def determineFileContentsType(self):
 1184+ output = self.getFirst500Lines()
 1185+ if (output):
 1186+ pageData = output
 1187+ if (pageData.startswith('<mediawiki')):
 1188+ return('xml')
 1189+ if (pageData.startswith('-- MySQL dump')):
 1190+ return('sql')
 1191+ return('txt')
 1192+ return(None)
8031193
804 - def buildPath(self, base, version, filename):
805 - return join(base, version, "%s-%s-%s" % (self._dbName, version, filename))
 1194+ def setupUncompressionCommand(self):
 1195+ if not self.filename or not exists(self.filename):
 1196+ return None
 1197+ pipeline = []
 1198+ if self.fileObj.fileExt == 'bz2':
 1199+ command = [ self._wiki.config.bzip2, '-dc' ]
 1200+ elif self.fileObj.fileExt == 'gz':
 1201+ command = [ self._wiki.config.gzip, '-dc' ]
 1202+ elif self.fileObj.fileExt == '7z':
 1203+ command = [ self._wiki.config.sevenzip, "e", "-so" ]
 1204+ else:
 1205+ command = [ self._wiki.config.cat ]
8061206
807 - def privatePath(self, filename):
808 - """Take a given filename in the private dump dir for the selected database."""
809 - return self.buildPath(self._wiki.privateDir(), self._date, filename)
 1207+ if (not exists( command[0] ) ):
 1208+ raise BackupError( "command %s to uncompress/read file not found" % command[0] )
 1209+ command.append( self.filename )
 1210+ pipeline.append(command)
 1211+ return(pipeline)
8101212
811 - def publicPath(self, filename):
812 - """Take a given filename in the public dump dir for the selected database.
813 - If this database is marked as private, will use the private dir instead.
814 - """
815 - return self.buildPath(self._wiki.publicDir(), self._date, filename)
 1213+ # unused
 1214+ # return its first and last page ids from name or from contents, depending
 1215+ # return its date
 1216+
 1217+ # fixme what happens if this is not an xml dump? errr. must detect and bail immediately?
 1218+ # maybe instead of all that we should just open the file ourselves, read a few lines... oh.
 1219+ # right. stupid compressed files. um.... do we have stream wrappers? no. this is python
 1220+ # what's the easy was to read *some* compressed data into a buffer?
 1221+ def findFirstPageIDInFile(self):
 1222+ if (self.firstPageID):
 1223+ return(self.firstPageID)
 1224+ output = self.getFirst500Lines()
 1225+ if (output):
 1226+ pageData = output
 1227+ titleAndIDPattern = re.compile('<title>(?P<title>.+?)</title>\s*' + '<id>(?P<pageid>\d+?)</id>')
 1228+ result = titleAndIDPattern.search(pageData)
 1229+ if (result):
 1230+ self.firstPageID = result.group('pageid')
 1231+ return(self.firstPageID)
8161232
817 - def latestDir(self):
818 - return self.buildDir(self._wiki.publicDir(), "latest")
 1233+ def checkIfTruncated(self):
 1234+ if self.isTruncated:
 1235+ return self.isTruncated
 1236+ if self.fileObj.fileExt == "bz2":
 1237+ if (not exists( self._wiki.config.checkforbz2footer ) ):
 1238+ raise BackupError("checkforbz2footer command %s not found" % self._wiki.config.checkforbz2footer)
 1239+ checkforbz2footer = self._wiki.config.checkforbz2footer
 1240+ pipeline = []
 1241+ pipeline.append([ checkforbz2footer, self.filename ])
 1242+ p = CommandPipeline(pipeline, quiet=True)
 1243+ p.runPipelineAndGetOutput()
 1244+ if not p.exitedSuccessfully():
 1245+ self.isTruncated = True
 1246+ else:
 1247+ self.isTruncated = False
 1248+ else:
 1249+ if self.fileObj.fileExt == 'gz':
 1250+ command = [ "%s -dc %s > /dev/null" % (self._wiki.config.gzip, self.filename ) ]
 1251+ elif self.fileObj.fileExt == '7z':
 1252+ command = [ "%s e -so %s > /dev/null" % (self._wiki.config.sevenzip, self.filename ) ]
 1253+ else:
 1254+ # we don't know how to handle this type of file.
 1255+ return self.isTruncated
 1256+ p = CommandPipeline([ command ], quiet=True)
 1257+ p.runPipelineAndGetOutput()
 1258+ if not p.exitedSuccessfully():
 1259+ self.isTruncated = True
 1260+ else:
 1261+ self.isTruncated = False
8191262
820 - def latestPath(self, filename):
821 - return self.buildPath(self._wiki.publicDir(), "latest", filename)
 1263+ return self.isTruncated
8221264
823 - def webPath(self, filename):
824 - return self.buildPath(self._wiki.webDir(), self._date, filename)
825 -
 1265+ def getSize(self):
 1266+ if (exists(self.filename)):
 1267+ return os.path.getsize(self.filename)
 1268+ else:
 1269+ return None
 1270+
 1271+ def rename(self, newname):
 1272+ try:
 1273+ os.rename(self.filename, os.path.join(self.dirname,newname))
 1274+ except:
 1275+ raise BackupError("failed to rename file %s" % self.filename)
 1276+
 1277+ self.filename = os.path.join(self.dirname,newname)
 1278+
8261279 # everything that has to do with reporting the status of a piece
8271280 # of a dump is collected here
8281281 class Status(object):
829 - def __init__(self, wiki, dumpDir, date, items, checksums, enabled, noticeFile = None, errorCallback=None):
 1282+ def __init__(self, wiki, dumpDir, items, checksums, enabled, noticeFile = None, errorCallback=None):
8301283 self.wiki = wiki
8311284 self.dbName = wiki.dbName
8321285 self.dumpDir = dumpDir
8331286 self.items = items
8341287 self.checksums = checksums
835 - self.date = date
836 - # this is just a glorified name for "give me a logging facility"
8371288 self.noticeFile = noticeFile
8381289 self.errorCallback = errorCallback
8391290 self.failCount = 0
@@ -848,24 +1299,28 @@
8491300 subject = "Dump failure for " + self.dbName
8501301 message = self.wiki.config.readTemplate("errormail.txt") % {
8511302 "db": self.dbName,
852 - "date": self.date,
 1303+ "date": self.wiki.date,
8531304 "time": TimeUtils.prettyTime(),
854 - "url": "/".join((self.wiki.config.webRoot, self.dbName, self.date, ''))}
 1305+ "url": "/".join((self.wiki.config.webRoot, self.dbName, self.wiki.date, ''))}
8551306 self.wiki.config.mail(subject, message)
8561307
857 - # this is a per-dump-item report (well per file generated by the item)
 1308+ # this is a per-dump-item report (well, per file generated by the item)
8581309 # Report on the file size & item status of the current output and output a link if we are done
859 - def reportFile(self, file, itemStatus):
860 - filepath = self.dumpDir.publicPath(file)
861 - if itemStatus == "in-progress" and exists (filepath):
862 - size = FileUtils.prettySize(getsize(filepath))
863 - return "<li class='file'>%s %s (written) </li>" % (file, size)
864 - elif itemStatus == "done" and exists(filepath):
865 - size = FileUtils.prettySize(getsize(filepath))
866 - webpath = self.dumpDir.webPath(file)
867 - return "<li class='file'><a href=\"%s\">%s</a> %s</li>" % (webpath, file, size)
 1310+ def reportFile(self, fileObj, itemStatus):
 1311+ filename = self.dumpDir.filenamePublicPath(fileObj)
 1312+ if (exists(filename)):
 1313+ size = os.path.getsize(filename)
8681314 else:
869 - return "<li class='missing'>%s</li>" % file
 1315+ itemStatus = "missing"
 1316+ size = 0
 1317+ size = FileUtils.prettySize(size)
 1318+ if itemStatus == "in-progress":
 1319+ return "<li class='file'>%s %s (written) </li>" % (fileObj.filename, size)
 1320+ elif itemStatus == "done":
 1321+ webpath = self.dumpDir.webPath(fileObj)
 1322+ return "<li class='file'><a href=\"%s\">%s</a> %s</li>" % (webpath, fileObj.filename, size)
 1323+ else:
 1324+ return "<li class='missing'>%s</li>" % fileObj.filename
8701325
8711326 #
8721327 # functions internal to the class
@@ -902,14 +1357,15 @@
9031358 statusItems = [self._reportItem(item) for item in self.items]
9041359 statusItems.reverse()
9051360 html = "\n".join(statusItems)
 1361+ f = DumpFilename(self.wiki, None, self.checksums.getChecksumFileNameBasename())
9061362 return self.wiki.config.readTemplate("report.html") % {
9071363 "db": self.dbName,
908 - "date": self.date,
 1364+ "date": self.wiki.date,
9091365 "notice": self.noticeFile.notice,
9101366 "status": self._reportStatusSummaryLine(done),
9111367 "previous": self._reportPreviousDump(done),
9121368 "items": html,
913 - "checksum": self.dumpDir.webPath(self.checksums.getChecksumFileNameBasename()),
 1369+ "checksum": self.dumpDir.webPath(f),
9141370 "index": self.wiki.config.index}
9151371
9161372 def _reportPreviousDump(self, done):
@@ -920,7 +1376,7 @@
9211377 # starting at the beginning to get the new abstracts and stubs).
9221378 try:
9231379 dumpsInOrder = self.wiki.latestDump(all=True)
924 - meIndex = dumpsInOrder.index(self.date)
 1380+ meIndex = dumpsInOrder.index(self.wiki.date)
9251381 # don't wrap around to the newest dump in the list!
9261382 if (meIndex > 0):
9271383 rawDate = dumpsInOrder[meIndex-1]
@@ -964,9 +1420,9 @@
9651421 html = "<li class='%s'><span class='updates'>%s</span> <span class='status'>%s</span> <span class='title'>%s</span>" % (item.status(), item.updated(), item.status(), item.description())
9661422 if item.progress:
9671423 html += "<div class='progress'>%s</div>\n" % item.progress
968 - files = item.listOutputFiles(self)
969 - if files:
970 - listItems = [self.reportFile(file, item.status()) for file in files]
 1424+ fileObjs = item.listOutputFilesToPublish(self.dumpDir)
 1425+ if fileObjs:
 1426+ listItems = [self.reportFile(fileObj, item.status()) for fileObj in fileObjs]
9711427 html += "<ul>"
9721428 detail = item.detail()
9731429 if detail:
@@ -977,9 +1433,8 @@
9781434 return html
9791435
9801436 class NoticeFile(object):
981 - def __init__(self, wiki, date, notice, enabled):
 1437+ def __init__(self, wiki, notice, enabled):
9821438 self.wiki = wiki
983 - self.date = date
9841439 self.notice = notice
9851440 self._enabled = enabled
9861441 self.writeNoticeFile()
@@ -995,7 +1450,7 @@
9961451 # addnotice, stuff notice in a file for other jobs etc
9971452 elif self.notice != "":
9981453 noticeDir = self._getNoticeDir()
999 - FileUtils.writeFile(noticeDir, noticeFile, self.notice, self.wiki.config.fileperms)
 1454+ FileUtils.writeFile(self.wiki.config.tempDir, noticeFile, self.notice, self.wiki.config.fileperms)
10001455 # default case. if there is a file get the contents, otherwise
10011456 # we have empty contents, all good
10021457 else:
@@ -1015,13 +1470,13 @@
10161471 # functions internal to class
10171472 #
10181473 def _getNoticeFilename(self):
1019 - return os.path.join(self.wiki.publicDir(), self.date, "notice.txt")
 1474+ return os.path.join(self.wiki.publicDir(), self.wiki.date, "notice.txt")
10201475
10211476 def _getNoticeDir(self):
1022 - return os.path.join(self.wiki.publicDir(), self.date);
 1477+ return os.path.join(self.wiki.publicDir(), self.wiki.date)
10231478
10241479 class Runner(object):
1025 - def __init__(self, wiki, date=None, prefetch=True, spawn=True, job=None, restart=False, notice="", dryrun = False, loggingEnabled=False, chunkToDo = False):
 1480+ def __init__(self, wiki, prefetch=True, spawn=True, job=None, restart=False, notice="", dryrun = False, loggingEnabled=False, chunkToDo = False):
10261481 self.wiki = wiki
10271482 self.dbName = wiki.dbName
10281483 self.prefetch = prefetch
@@ -1042,7 +1497,7 @@
10431498 self._noticeFileEnabled = True
10441499 self._makeDirEnabled = True
10451500 self._cleanOldDumpsEnabled = True
1046 - self._cleanupOldFilesEnabled = False
 1501+ self._cleanupOldFilesEnabled = True
10471502 self._checkForTruncatedFilesEnabled = True
10481503
10491504 if self.dryrun or self._chunkToDo:
@@ -1054,40 +1509,35 @@
10551510 self._noticeFileEnabled = False
10561511 self._makeDirEnabled = False
10571512 self._cleanOldDumpsEnabled = False
1058 - self._cleanupOldFilesEnables = False
10591513
10601514 if self.dryrun:
10611515 self._loggingEnabled = False
10621516 self._checkForTruncatedFilesEnabled = False
 1517+ self._cleanupOldFilesEnabled = False
10631518
1064 - if date:
1065 - # Override, continuing a past dump?
1066 - self.date = date
1067 - else:
1068 - self.date = TimeUtils.today()
1069 - wiki.setDate(self.date)
1070 -
10711519 self.jobRequested = job
10721520 self.dbServerInfo = DbServerInfo(self.wiki, self.dbName, self.logAndPrint)
1073 - self.dumpDir = DumpDir(self.wiki, self.dbName, self.date)
 1521+ self.dumpDir = DumpDir(self.wiki, self.dbName)
10741522
10751523 self.lastFailed = False
10761524
10771525 # these must come after the dumpdir setup so we know which directory we are in
10781526 if (self._loggingEnabled and self._makeDirEnabled):
1079 - self.logFileName = self.dumpDir.publicPath(self.wiki.config.logFile)
1080 - self.makeDir(join(self.wiki.publicDir(), self.date))
 1527+ fileObj = DumpFilename(self.wiki)
 1528+ fileObj.newFromFilename(self.wiki.config.logFile)
 1529+ self.logFileName = self.dumpDir.filenamePublicPath(fileObj)
 1530+ self.makeDir(os.path.join(self.wiki.publicDir(), self.wiki.date))
10811531 self.log = Logger(self.logFileName)
10821532 thread.start_new_thread(self.logQueueReader,(self.log,))
10831533 self.runInfoFile = RunInfoFile(wiki,self._runInfoFileEnabled)
1084 - self.symLinks = SymLinks(self.wiki, self.dumpDir, self. date, self.logAndPrint, self.debug, self._symLinksEnabled)
 1534+ self.symLinks = SymLinks(self.wiki, self.dumpDir, self.logAndPrint, self.debug, self._symLinksEnabled)
10851535 self.feeds = Feeds(self.wiki,self.dumpDir, self.dbName, self.debug, self._feedsEnabled)
1086 - self.htmlNoticeFile = NoticeFile(self.wiki, self.date, notice, self._noticeFileEnabled)
 1536+ self.htmlNoticeFile = NoticeFile(self.wiki, notice, self._noticeFileEnabled)
10871537 self.checksums = Checksummer(self.wiki, self.dumpDir, self._checksummerEnabled)
10881538
10891539 # some or all of these dumpItems will be marked to run
1090 - self.dumpItemList = DumpItemList(self.wiki, self.prefetch, self.spawn, self.date, self._chunkToDo, self.jobRequested, self.chunkInfo, self.runInfoFile);
1091 - self.status = Status(self.wiki, self.dumpDir, self.date, self.dumpItemList.dumpItems, self.checksums, self._statusEnabled, self.htmlNoticeFile, self.logAndPrint)
 1540+ self.dumpItemList = DumpItemList(self.wiki, self.prefetch, self.spawn, self._chunkToDo, self.jobRequested, self.chunkInfo, self.runInfoFile, self.dumpDir)
 1541+ self.status = Status(self.wiki, self.dumpDir, self.dumpItemList.dumpItems, self.checksums, self._statusEnabled, self.htmlNoticeFile, self.logAndPrint)
10921542
10931543 def logQueueReader(self,log):
10941544 if not log:
@@ -1107,25 +1557,7 @@
11081558 else:
11091559 return ""
11101560
1111 - def removeFile(self, filename):
1112 - os.remove(filename)
1113 -
11141561 # returns 0 on success, 1 on error
1115 - def saveTable(self, table, outfile):
1116 - """Dump a table from the current DB with mysqldump, save to a gzipped sql file."""
1117 - if (not exists( self.wiki.config.gzip ) ):
1118 - raise BackupError("gzip command %s not found" % self.wiki.config.gzip);
1119 - commands = self.dbServerInfo.buildSqlDumpCommand(table, self.wiki.config.gzip)
1120 - return self.saveCommand(commands, outfile)
1121 -
1122 - def saveSql(self, query, outfile):
1123 - """Pass some SQL commands to the server for this DB and save output to a gzipped file."""
1124 - if (not exists( self.wiki.config.gzip ) ):
1125 - raise BackupError("gzip command %s not found" % self.wiki.config.gzip);
1126 - command = self.dbServerInfo.buildSqlCommand(query, self.wiki.config.gzip)
1127 - return self.saveCommand(command, outfile)
1128 -
1129 - # returns 0 on success, 1 on error
11301562 def saveCommand(self, commands, outfile):
11311563 """For one pipeline of commands, redirect output to a given file."""
11321564 commands[-1].extend( [ ">" , outfile ] )
@@ -1177,12 +1609,10 @@
11781610 for cmd in problemCommands:
11791611 errorString = errorString + "%s " % cmd
11801612 self.logAndPrint(errorString)
1181 - # raise BackupError(errorString)
1182 - return 1
 1613+ return 1
11831614
11841615 def debug(self, stuff):
11851616 self.logAndPrint("%s: %s %s" % (TimeUtils.prettyTime(), self.dbName, stuff))
1186 -# print "%s: %s %s" % (MiscUtils.prettyTime(), self.dbName, stuff)
11871617
11881618 def runHandleFailure(self):
11891619 if self.status.failCount < 1:
@@ -1192,15 +1622,17 @@
11931623 self.lastFailed = True
11941624
11951625 def runUpdateItemFileInfo(self, item):
1196 - for f in item.listOutputFiles(self):
1197 - print f
1198 - if exists(self.dumpDir.publicPath(f)):
 1626+ # this will include checkpoint files if they are enabled.
 1627+ for fileObj in item.listOutputFilesToPublish(self.dumpDir):
 1628+ if exists(self.dumpDir.filenamePublicPath(fileObj)):
11991629 # why would the file not exist? because we changed chunk numbers in the
12001630 # middle of a run, and now we list more files for the next stage than there
12011631 # were for earlier ones
1202 - self.symLinks.saveSymlink(f)
1203 - self.feeds.saveFeed(f)
1204 - self.checksums.checksum(f, self)
 1632+ self.symLinks.saveSymlink(fileObj)
 1633+ self.feeds.saveFeed(fileObj)
 1634+ self.checksums.checksum(fileObj, self)
 1635+ self.symLinks.cleanupSymLinks()
 1636+ self.feeds.cleanupFeeds()
12051637
12061638 def run(self):
12071639 if (self.jobRequested):
@@ -1232,8 +1664,8 @@
12331665 # mark all the following jobs to run as well
12341666 self.dumpItemList.markFollowingJobsToRun()
12351667
1236 - self.makeDir(join(self.wiki.publicDir(), self.date))
1237 - self.makeDir(join(self.wiki.privateDir(), self.date))
 1668+ self.makeDir(os.path.join(self.wiki.publicDir(), self.wiki.date))
 1669+ self.makeDir(os.path.join(self.wiki.privateDir(), self.wiki.date))
12381670
12391671 if (self.restart):
12401672 self.logAndPrint("Preparing for restart from job %s of %s" % (self.jobRequested, self.dbName))
@@ -1309,7 +1741,7 @@
13101742 if self._cleanOldDumpsEnabled:
13111743 old = self.wiki.dumpDirs()
13121744 if old:
1313 - if old[-1] == self.date:
 1745+ if old[-1] == self.wiki.date:
13141746 # If we're re-running today's (or jobs from a given day's) dump, don't count it as one
13151747 # of the old dumps to keep... or delete it halfway through!
13161748 old = old[:-1]
@@ -1336,7 +1768,10 @@
13371769 # will have accurate checksums for the run for which it was
13381770 # produced, but not the other files. FIXME
13391771 self.checksums.moveMd5FileIntoPlace()
1340 - self.symLinks.saveSymlink(self.checksums.getChecksumFileNameBasename())
 1772+ dumpFile = DumpFilename(self.wiki, None, self.checksums.getChecksumFileNameBasename())
 1773+ self.symLinks.saveSymlink(dumpFile)
 1774+ self.symLinks.cleanupSymLinks()
 1775+ self.feeds.cleanupFeeds()
13411776
13421777 def makeDir(self, dir):
13431778 if self._makeDirEnabled:
@@ -1347,10 +1782,9 @@
13481783 os.makedirs(dir)
13491784
13501785 class SymLinks(object):
1351 - def __init__(self, wiki, dumpDir, date, logfn, debugfn, enabled):
 1786+ def __init__(self, wiki, dumpDir, logfn, debugfn, enabled):
13521787 self.wiki = wiki
13531788 self.dumpDir = dumpDir
1354 - self.date = date
13551789 self._enabled = enabled
13561790 self.logfn = logfn
13571791 self.debugfn = debugfn
@@ -1363,35 +1797,41 @@
13641798 self.debugfn("Creating %s ..." % dir)
13651799 os.makedirs(dir)
13661800
1367 - def saveSymlink(self, file):
 1801+ def saveSymlink(self, dumpFile):
13681802 if (self._enabled):
1369 - self.makeDir(join(self.wiki.publicDir(), 'latest'))
1370 - real = self.dumpDir.publicPath(file)
1371 - link = self.dumpDir.latestPath(file)
 1803+ self.makeDir(self.dumpDir.latestDir())
 1804+ real = self.dumpDir.filenamePublicPath(dumpFile)
 1805+ link = os.path.join(self.dumpDir.latestDir(), dumpFile.filename)
13721806 if exists(link) or os.path.islink(link):
13731807 if os.path.islink(link):
13741808 realfile = os.readlink(link)
13751809 # format of these links should be... ../20110228/elwikidb-20110228-templatelinks.sql.gz
1376 - rellinkpattern = re.compile('^\.\./(20[0-9]+)/');
1377 - dateinlink = rellinkpattern.search(realfile)
1378 - if (dateinlink):
1379 - dateoflinkedfile = dateinlink.group(1)
1380 - dateinterval = int(self.date) - int(dateoflinkedfile)
1381 - else:
1382 - dateinterval = 0
 1810+ # dumpFile.date may be unset (e.g. for the md5sums file); treat that as current
 1811+ dateinterval = int(self.wiki.date) - int(dumpFile.date) if dumpFile.date else 0
13831812 # no file or it's older than ours... *then* remove the link
13841813 if not exists(os.path.realpath(link)) or dateinterval > 0:
1385 - self.debug("Removing old symlink %s" % link)
1386 - runner.removeFile(link)
 1814+ self.debugfn("Removing old symlink %s" % link)
 1815+ os.remove(link)
13871816 else:
13881817 self.logfn("What the hell dude, %s is not a symlink" % link)
13891818 raise BackupError("What the hell dude, %s is not a symlink" % link)
1390 - relative = FileUtils.relativePath(real, dirname(link))
 1819+ relative = FileUtils.relativePath(real, os.path.dirname(link))
13911820 # if we removed the link cause it's obsolete, make the new one
1392 - if exists(real) and not exists(link):
 1821+ if exists(real) and not exists(link):
13931822 self.debugfn("Adding symlink %s -> %s" % (link, relative))
13941823 os.symlink(relative, link)
13951824
 1825+ def cleanupSymLinks(self):
 1826+ if (self._enabled):
 1827+ latestDir = self.dumpDir.latestDir()
 1828+ files = os.listdir(latestDir)
 1829+ for f in files:
 1830+ link = os.path.join(latestDir,f)
 1831+ if os.path.islink(link):
 1832+ realfile = os.readlink(link)
 1833+ if not exists(os.path.join(latestDir, realfile)):
 1834+ os.remove(link)
 1835+
13961836 class Feeds(object):
13971837 def __init__(self, wiki, dumpDir, dbName, debugfn, enabled):
13981838 self.wiki = wiki
@@ -1400,37 +1840,61 @@
14011841 self.debugfn = debugfn
14021842 self._enabled = enabled
14031843
1404 - def makeDir(self, dir):
 1844+ def makeDir(self, dirname):
14051845 if (self._enabled):
1406 - if exists(dir):
1407 - self.debugfn("Checkdir dir %s ..." % dir)
 1846+ if exists(dirname):
 1847+ self.debugfn("Checkdir dir %s ..." % dirname)
14081848 else:
1409 - self.debugfn("Creating %s ..." % dir)
1410 - os.makedirs(dir)
 1849+ self.debugfn("Creating %s ..." % dirname)
 1850+ os.makedirs(dirname)
14111851
1412 - def saveFeed(self, file):
 1852+ def saveFeed(self, fileObj):
14131853 if (self._enabled):
1414 - self.makeDir(join(self.wiki.publicDir(), 'latest'))
1415 - filePath = self.dumpDir.webPath(file)
1416 - fileName = os.path.basename(filePath)
1417 - webPath = os.path.dirname(filePath)
 1854+ self.makeDir(self.dumpDir.latestDir())
 1855+ filenameAndPath = self.dumpDir.webPath(fileObj)
 1856+ webPath = os.path.dirname(filenameAndPath)
14181857 rssText = self.wiki.config.readTemplate("feed.xml") % {
1419 - "chantitle": file,
 1858+ "chantitle": fileObj.basename,
14201859 "chanlink": webPath,
14211860 "chandesc": "Wikimedia dump updates for %s" % self.dbName,
14221861 "title": webPath,
14231862 "link": webPath,
1424 - "description": xmlEscape("<a href=\"%s\">%s</a>" % (filePath, fileName)),
1425 - "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())}
 1863+ "description": xmlEscape("<a href=\"%s\">%s</a>" % (filenameAndPath, fileObj.filename)),
 1864+ "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()) }
14261865 directory = self.dumpDir.latestDir()
1427 - rssPath = self.dumpDir.latestPath(file + "-rss.xml")
1428 - FileUtils.writeFile(directory, rssPath, rssText, self.wiki.config.fileperms)
 1866+ rssPath = os.path.join(directory, fileObj.basename + "-rss.xml")
 1867+ FileUtils.writeFile(self.wiki.config.tempDir, rssPath, rssText, self.wiki.config.fileperms)
14291868
 1869+ def cleanupFeeds(self):
 1870+ # call this after sym links in this dir have been cleaned up.
 1871+ # we should probably fix this so there is no such dependency,
 1872+ # but it would mean parsing the contents of the rss file, bleah
 1873+ if (self._enabled):
 1874+ latestDir = self.dumpDir.latestDir()
 1875+ files = os.listdir(latestDir)
 1876+ for f in files:
 1877+ if f.endswith("-rss.xml"):
 1878+ filename = f[:-8]
 1879+ link = os.path.join(latestDir,filename)
 1880+ if not exists(link):
 1881+ os.remove(os.path.join(latestDir,f))
 1882+
14301883 class Dump(object):
14311884 def __init__(self, name, desc):
14321885 self._desc = desc
14331886 self.progress = ""
14341887 self.runInfo = RunInfo(name,"waiting","")
 1888+ self.dumpName = self.getDumpName()
 1889+ self.fileType = self.getFileType()
 1890+ self.fileExt = self.getFileExt()
 1891+ # Set these only if a derived class hasn't defined them already. (We get
 1892+ # called last by child classes in their constructors, so that their
 1893+ # overrides of things like getDumpName() are in place before we use
 1894+ # them to set these class attributes.)
 1895+ if not hasattr(self, '_chunksEnabled'):
 1896+ self._chunksEnabled = False
 1897+ if not hasattr(self, '_checkpointsEnabled'):
 1898+ self._checkpointsEnabled = False
14351899
14361900 def name(self):
14371901 return self.runInfo.name()
@@ -1468,10 +1932,21 @@
14691933 """Optionally return additional text to appear under the heading."""
14701934 return None
14711935
1472 - def listOutputFiles(self, runner):
1473 - """Return a list of filenames which should be exported and checksummed"""
1474 - return []
 1936+ def getDumpName(self):
 1937+ """Return the dumpName as it appears in output files for this phase of the dump
 1938+ e.g. pages-meta-history, all-titles-in-ns0, etc"""
 1939+ return ""
14751940
 1941+ def getFileExt(self):
 1942+ """Return the extension of output files for this phase of the dump
 1943+ e.g. bz2 7z etc"""
 1944+ return ""
 1945+
 1946+ def getFileType(self):
 1947+ """Return the type of output files for this phase of the dump
 1948+ e.g. sql xml etc"""
 1949+ return ""
 1950+
14761951 def start(self, runner):
14771952 """Set the 'in progress' flag so we can output status."""
14781953 self.setStatus("in-progress")
@@ -1497,6 +1972,7 @@
14981973 runner.log.addToLogQueue(line)
14991974 sys.stderr.write(line)
15001975 self.progress = line.strip()
 1976+ # FIXME test this a lot!! does the updateStatus work?
15011977 runner.status.updateStatusFiles()
15021978 runner.runInfoFile.saveDumpRunInfoFile(runner.dumpItemList.reportDumpRunInfo())
15031979
@@ -1508,19 +1984,18 @@
15091985 def waitAlarmHandler(self, signum, frame):
15101986 pass
15111987
1512 - def buildRecombineCommandString(self, runner, files, outputFileBasename, compressionCommand, uncompressionCommand, endHeaderMarker="</siteinfo>"):
1513 -# outputFilename = self.buildOutputFilename(runner, outputFileBasename)
1514 - outputFilename = runner.dumpDir.publicPath(outputFileBasename)
 1988+ def buildRecombineCommandString(self, runner, files, outputFile, compressionCommand, uncompressionCommand, endHeaderMarker="</siteinfo>"):
 1989+ outputFilename = runner.dumpDir.filenamePublicPath(outputFile)
15151990 chunkNum = 0
15161991 recombines = []
15171992 if (not exists( runner.wiki.config.head ) ):
1518 - raise BackupError("head command %s not found" % runner.wiki.config.head);
 1993+ raise BackupError("head command %s not found" % runner.wiki.config.head)
15191994 head = runner.wiki.config.head
15201995 if (not exists( runner.wiki.config.tail ) ):
1521 - raise BackupError("tail command %s not found" % runner.wiki.config.tail);
 1996+ raise BackupError("tail command %s not found" % runner.wiki.config.tail)
15221997 tail = runner.wiki.config.tail
15231998 if (not exists( runner.wiki.config.grep ) ):
1524 - raise BackupError("grep command %s not found" % runner.wiki.config.grep);
 1999+ raise BackupError("grep command %s not found" % runner.wiki.config.grep)
15252000 grep = runner.wiki.config.grep
15262001
15272002 # we assume the result is always going to be run in a subshell.
@@ -1536,11 +2011,11 @@
15372012 u = MiscUtils.shellEscape(u)
15382013 for u in compressionCommand:
15392014 u = MiscUtils.shellEscape(u)
1540 - for f in files:
1541 - f = MiscUtils.shellEscape(f)
15422015
1543 - for f in files:
1544 - f = runner.dumpDir.publicPath(f)
 2016+ for fileObj in files:
 2017+ # uh oh FIXME
 2018+# f = MiscUtils.shellEscape(fileObj.filename)
 2019+ f = runner.dumpDir.filenamePublicPath(fileObj)
15452020 chunkNum = chunkNum + 1
15462021 pipeline = []
15472022 uncompressThisFile = uncompressionCommand[:]
@@ -1571,42 +2046,369 @@
15722047 recombineCommandString = "(" + ";".join(recombines) + ")" + "|" + "%s %s" % (compressionCommand, outputFilename)
15732048 return(recombineCommandString)
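# Editor's note, not part of this revision: the returned string is meant for a
# subshell (the callers use runCommand(..., shell = True)). With hypothetical
# chunk files, its overall shape is
#
#   ( <uncompress chunk 1, drop the closing </mediawiki>> ;
#     <uncompress chunk 2, drop everything through </siteinfo> and the footer> ;
#     <uncompress last chunk, drop the header only> ) | /usr/bin/gzip >  combined.xml.gz
#
# so exactly one <siteinfo> header and one </mediawiki> footer survive.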
15742049
1575 - def cleanupOldFiles(self, runner, outputFileBasename):
 2050+ def cleanupOldFiles(self, dumpDir, runner, chunks = False):
15762051 if (runner._cleanupOldFilesEnabled):
1577 - outputFilename = self.buildOutputFilename(runner, outputFileBasename)
1578 - if exists(outputFilename):
1579 - runner.removeFile(outputFilename)
 2052+ files = self.listOutputFilesForCleanup(dumpDir)
 2053+ for f in files:
 2054+ if exists(dumpDir.filenamePublicPath(f)):
 2055+ os.remove(dumpDir.filenamePublicPath(f))
15802056
1581 - def buildOutputFilename(self, runner, outputFileBasename):
1582 - return outputFilename
 2057+ def getChunkList(self):
 2058+ if self._chunksEnabled:
 2059+ if self._chunkToDo:
 2060+ return [ self._chunkToDo ]
 2061+ else:
 2062+ return range(1, len(self._chunks)+1)
 2063+ else:
 2064+ return False
15832065
 2066+ # list all regular output files that exist
 2067+ def listRegularFilesExisting(self, dumpDir, dumpNames = None, date = None, chunks = None):
 2068+ files = []
 2069+ if not dumpNames:
 2070+ dumpNames = [ self.dumpName ]
 2071+ for d in dumpNames:
 2072+ files.extend( dumpDir.getRegularFilesExisting(date, d, self.fileType, self.fileExt, chunks, temp = False))
 2073+ return files
 2074+
 2075+ # list all checkpoint files that exist
 2076+ def listCheckpointFilesExisting(self, dumpDir, dumpNames = None, date = None, chunks = None):
 2077+ files = []
 2078+ if not dumpNames:
 2079+ dumpNames = [ self.dumpName ]
 2080+ for d in dumpNames:
 2081+ files.extend(dumpDir.getCheckpointFilesExisting(date, d, self.fileType, self.fileExt, chunks, temp = False))
 2082+ return files
 2083+
 2084+ # unused
 2085+ # list all temp output files that exist
 2086+ def listTempFilesExisting(self, dumpDir, dumpNames = None, date = None, chunks = None):
 2087+ files = []
 2088+ if not dumpNames:
 2089+ dumpNames = [ self.dumpName ]
 2090+ for d in dumpNames:
 2091+ files.extend( dumpDir.getCheckpointFilesExisting(None, d, self.fileType, self.fileExt, chunks = None, temp = True) )
 2092+ files.extend( dumpDir.getRegularFilesExisting(None, d, self.fileType, self.fileExt, chunks = None, temp = True) )
 2093+ return files
 2094+
 2095+ # list checkpoint files that have been produced for specified chunk(s)
 2096+ def listCheckpointFilesPerChunkExisting(self, dumpDir, chunks, dumpNames = None):
 2097+ files = []
 2098+ if not dumpNames:
 2099+ dumpNames = [ self.dumpName ]
 2100+ for d in dumpNames:
 2101+ files.extend(dumpDir.getCheckpointFilesExisting(None, d, self.fileType, self.fileExt, chunks, temp = False))
 2102+ return files
 2103+
 2104+ # list noncheckpoint files that have been produced for specified chunk(s)
 2105+ def listRegularFilesPerChunkExisting(self, dumpDir, chunks, dumpNames = None):
 2106+ files = []
 2107+ if not dumpNames:
 2108+ dumpNames = [ self.dumpName ]
 2109+ for d in dumpNames:
 2110+ files.extend( dumpDir.getRegularFilesExisting(None, d, self.fileType, self.fileExt, chunks, temp = False))
 2111+ return files
 2112+
 2113+ # list temp output files that have been produced for specified chunk(s)
 2114+ def listTempFilesPerChunkExisting(self, dumpDir, chunks, dumpNames = None):
 2115+ files = []
 2116+ if not dumpNames:
 2117+ dumpNames = [ self.dumpName ]
 2118+ for d in dumpNames:
 2119+ files.extend( dumpDir.getCheckpointFilesExisting(None, d, self.fileType, self.fileExt, chunks, temp = True) )
 2120+ files.extend( dumpDir.getRegularFilesExisting(None, d, self.fileType, self.fileExt, chunks, temp = True) )
 2121+ return files
 2122+
 2123+
 2124+ # unused
 2125+ # list noncheckpoint chunk files that have been produced
 2126+ def listRegularFilesChunkedExisting(self, dumpDir, dumpNames = None, date = None):
 2127+ files = []
 2128+ if not dumpNames:
 2129+ dumpNames = [ self.dumpName ]
 2130+ for d in dumpNames:
 2131+ files.extend(dumpDir.getRegularFilesExisting(None, d, self.fileType, self.fileExt, chunks = self.getChunkList(), temp = False))
 2132+ return files
 2133+
 2134+ # unused
 2135+ # list temp output chunk files that have been produced
 2136+ def listTempFilesChunkedExisting(self, runner, dumpNames = None):
 2137+ files = []
 2138+ if not dumpNames:
 2139+ dumpNames = [ self.dumpName ]
 2140+ for d in dumpNames:
 2141+ files.extend( runner.dumpDir.getCheckpointFilesExisting(None, d, self.fileType, self.fileExt, chunks = self.getChunkList(), temp = True) )
 2142+ files.extend( runner.dumpDir.getRegularFilesExisting(None, d, self.fileType, self.fileExt, chunks = self.getChunkList(), temp = True) )
 2143+ return files
 2144+
 2145+ # unused
 2146+ # list checkpoint files that have been produced for chunkless run
 2147+ def listCheckpointFilesChunklessExisting(self, runner, dumpNames = None):
 2148+ if not dumpNames:
 2149+ dumpNames = [ self.dumpName ]
 2150+ files = []
 2151+ for d in dumpNames:
 2152+ files.extend(runner.dumpDir.getCheckpointFilesExisting(None, d, self.fileType, self.fileExt, chunks = False, temp = False))
 2153+ return files
 2154+
 2155+ # unused
 2156+ # list non checkpoint files that have been produced for chunkless run
 2157+ def listRegularFilesChunklessExisting(self, runner, dumpNames = None):
 2158+ if not dumpNames:
 2159+ dumpNames = [ self.dumpName ]
 2160+ files = []
 2161+ for d in dumpNames:
 2162+ files.extend(runner.dumpDir.getRegularFilesExisting(None, d, self.fileType, self.fileExt, chunks = False, temp = False))
 2163+ return files
 2164+
 2165+ # unused
 2166+ # list temp files that have been produced for chunkless run
 2167+ def listTempFilesChunklessExisting(self, runner, dumpNames = None):
 2168+ if not dumpNames:
 2169+ dumpNames = [ self.dumpName ]
 2170+ files = []
 2171+ for d in dumpNames:
 2172+ files.extend(runner.dumpDir.getCheckpointFilesExisting(None, d, self.fileType, self.fileExt, chunks = False, temp = True))
 2173+ files.extend(runner.dumpDir.getRegularFilesExisting(None, d, self.fileType, self.fileExt, chunks = False, temp = True))
 2174+ return files
 2175+
 2176+
 2177+ # internal function which all the public get*Possible functions call
 2178+ # list all files that could be created for the given dumpName, filtering by the given args.
 2179+ # by definition, checkpoint files are never returned in such a list, as we don't
 2180+ # know where a checkpoint might be taken (which pageId start/end).
 2181+ #
 2182+ # if we get None for an arg then we accept all values for that arg in the filename
 2183+ # if we get False for an arg (chunk, temp), we reject any filename which contains a value for that arg
 2184+ # if we get True for an arg (temp), we accept only filenames which contain a value for the arg
 2185+ # chunks should be a list of value(s), or True / False / None
 2186+ def _getFilesPossible(self, dumpDir, date = None, dumpName = None, fileType = None, fileExt = None, chunks = None, temp = False ):
 2187+ files = []
 2188+ if dumpName == None:
 2189+ dumpName = self.dumpName
 2190+ if chunks == None or chunks == False:
 2191+ files.append(DumpFilename(dumpDir._wiki, date, dumpName, fileType, fileExt, None, None, temp))
 2192+ if chunks == True or chunks == None:
 2193+ chunks = self.getChunkList()
 2194+ if chunks:
 2195+ for i in chunks:
 2196+ files.append(DumpFilename(dumpDir._wiki, date, dumpName, fileType, fileExt, i, None, temp))
 2197+ return files
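# Editor's example of the semantics above, not part of this revision; assumes
# a hypothetical job with chunks [1, 2] enabled:
#
#   self._getFilesPossible(dumpDir, chunks = None)   # chunkless file plus chunk 1 and 2 files
#   self._getFilesPossible(dumpDir, chunks = False)  # the chunkless file only
#   self._getFilesPossible(dumpDir, chunks = True)   # chunk 1 and 2 files only
#   self._getFilesPossible(dumpDir, chunks = [ 2 ])  # the chunk 2 file only
#   self._getFilesPossible(dumpDir, chunks = False, temp = True)  # the temp chunkless filename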
 2198+
 2199+ # unused
 2200+ # based on dump name, get all the output files we expect to generate except for temp files
 2201+ def getRegularFilesPossible(self, dumpDir, dumpNames = None):
 2202+ if not dumpNames:
 2203+ dumpNames = [ self.dumpName ]
 2204+ files = []
 2205+ for d in dumpNames:
 2206+ files.extend( self._getFilesPossible(dumpDir, None, d, self.fileType, self.fileExt, chunks = None, temp = False) )
 2207+ return files
 2208+
 2209+ # unused
 2210+ # based on dump name, get all the temp output files we expect to generate
 2211+ def getTempFilesPossible(self, dumpDir, dumpNames = None):
 2212+ if not dumpNames:
 2213+ dumpNames = [ self.dumpName ]
 2214+ files = []
 2215+ for d in dumpNames:
 2216+ files.extend( self._getFilesPossible(dumpDir, None, d, self.fileType, self.fileExt, chunks = None, temp = True ) )
 2217+ return files
 2218+
 2219+ # based on dump name, chunks, etc. get all the output files we expect to generate for these chunks
 2220+ def getRegularFilesPerChunkPossible(self, dumpDir, chunks, dumpNames = None):
 2221+ if not dumpNames:
 2222+ dumpNames = [ self.dumpName ]
 2223+ files = []
 2224+ for d in dumpNames:
 2225+ files.extend( self._getFilesPossible(dumpDir, None, d, self.fileType, self.fileExt, chunks, temp = False) )
 2226+ return files
 2227+
 2228+ # unused
 2229+ # based on dump name, chunks, etc. get all the temp files we expect to generate for these chunks
 2230+ def getTempFilesPerChunkPossible(self, dumpDir, chunks, dumpNames = None):
 2231+ if not dumpNames:
 2232+ dumpNames = [ self.dumpName ]
 2233+ files = []
 2234+ for d in dumpNames:
 2235+ files.extend(self._getFilesPossible(dumpDir, None, d, self.fileType, self.fileExt, chunks, temp = True))
 2236+ return files
 2237+
 2238+
 2239+ # unused
 2240+ # based on dump name, chunks, etc. get all the output files we expect to generate for these chunks
 2241+ def getRegularFilesChunkedPossible(self, dumpDir, dumpNames = None):
 2242+ if not dumpNames:
 2243+ dumpNames = [ self.dumpName ]
 2244+ files = []
 2245+ for d in dumpNames:
 2246+ files.extend( self._getFilesPossible(dumpDir, None, d, self.fileType, self.fileExt, chunks = True, temp = False) )
 2247+ return files
 2248+
 2249+ # unused
 2250+ # based on dump name, chunks, etc. get all the temp files we expect to generate for these chunks
 2251+ def getTempFilesPerChunkedPossible(self, dumpDir, dumpNames = None):
 2252+ if not dumpNames:
 2253+ dumpNames = [ self.dumpName ]
 2254+ files = []
 2255+ for d in dumpNames:
 2256+ files.extend(self._getFilesPossible(dumpDir, None, d, self.fileType, self.fileExt, chunks = True, temp = True))
 2257+ return files
 2258+
 2259+ # unused
 2260+ # list noncheckpoint files that should be produced for chunkless run
 2261+ def getRegularFilesChunklessPossible(self, dumpDir, dumpNames = None):
 2262+ if not dumpNames:
 2263+ dumpNames = [ self.dumpName ]
 2264+ files = []
 2265+ for d in dumpNames:
 2266+ files.extend(self._getFilesPossible(dumpDir, None, d, self.fileType, self.fileExt, chunks = False, temp = False))
 2267+ return files
 2268+
 2269+ # unused
 2270+ # list temp output files that should be produced for chunkless run
 2271+ def getTempFilesChunklessPossible(self, dumpDir, dumpNames = None):
 2272+ if not dumpNames:
 2273+ dumpNames = [ self.dumpName ]
 2274+ files = []
 2275+ for d in dumpNames:
 2276+ files.extend(self._getFilesPossible(dumpDir, None, d, self.fileType, self.fileExt, chunks = False, temp = True))
 2277+ return files
 2278+
 2279+################################
 2280+#
 2281+# these routines are all used for listing output files for various purposes...
 2282+#
 2283+#
 2284+ # Used for updating md5 lists, index.html
 2285+ # Includes: checkpoints, chunks, chunkless, temp files if they exist. At end of run temp files must be gone.
 2286+ # This is *all* output files for the dumpName, regardless of what's being re-run.
 2287+ def listOutputFilesToPublish(self, dumpDir, dumpNames = None):
 2288+ # some stages (e.g. XmlStub) call this for several different dumpNames
 2289+ if (dumpNames == None):
 2290+ dumpNames = [ self.dumpName ]
 2291+ files = []
 2292+ if (self._checkpointsEnabled):
 2293+ # we will pass list of chunks or chunkToDo, or False, depending on the job setup.
 2294+ files.extend(self.listCheckpointFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames))
 2295+ files.extend(self.listTempFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames))
 2296+ else:
 2297+ # we will pass list of chunks or chunkToDo, or False, depending on the job setup.
 2298+ files.extend(self.getRegularFilesPerChunkPossible(dumpDir, self.getChunkList(), dumpNames))
 2299+ return files
 2300+
 2301+ # called at end of job run to see if results are intact or are garbage and must be tossed/rerun.
 2302+ # Includes: checkpoints, chunks, chunkless. Not included: temp files.
 2303+ # This is only the files that should be produced from this run. So it is limited to a specific
 2304+ # chunk if that's being redone, or to all chunks if the whole job is being redone, or to the chunkless
 2305+ # files if there are no chunks enabled.
 2306+ def listOutputFilesToCheckForTruncation(self, dumpDir, dumpNames = None):
 2307+ # some stages (e.g. XmlStub) call this for several different dumpNames
 2308+ if (dumpNames == None):
 2309+ dumpNames = [ self.dumpName ]
 2310+ files = []
 2311+ if (self._checkpointsEnabled):
 2312+ # we will pass list of chunks or chunkToDo, or False, depending on the job setup.
 2313+ files.extend(self.listCheckpointFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames))
 2314+ else:
 2315+ # we will pass list of chunks or chunkToDo, or False, depending on the job setup.
 2316+ files.extend(self.getRegularFilesPerChunkPossible(dumpDir, self.getChunkList(), dumpNames))
 2317+ return files
 2318+
 2319+ # called when putting together commands to produce output for the job.
 2320+ # Includes: chunks, chunkless, temp files. Not included: checkpoint files.
 2321+ # This is only the files that should be produced from this run. So it is limited to a specific
 2322+ # chunk if that's being redone, or to all chunks if the whole job is being redone, or to the chunkless
 2323+ # files if there are no chunks enabled.
 2324+ def listOutputFilesForBuildCommand(self, dumpDir, dumpNames = None):
 2325+ # some stages (e.g. XmlStub) call this for several different dumpNames
 2326+ if (dumpNames == None):
 2327+ dumpNames = [ self.dumpName ]
 2328+ files = []
 2329+ if (self._checkpointsEnabled):
 2330+ # we will pass list of chunks or chunkToDo, or False, depending on the job setup.
 2331+ files.extend(self.listTempFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames))
 2332+ else:
 2333+ # we will pass list of chunks or chunkToDo, or False, depending on the job setup.
 2334+ files.extend(self.getRegularFilesPerChunkPossible(dumpDir, self.getChunkList(), dumpNames))
 2335+ return files
 2336+
 2337+ # called before job run to cleanup old files left around from any previous run(s)
 2338+ # Includes: checkpoints, chunks, chunkless, temp files if they exist.
 2339+ # This is only the files that should be produced from this run. So it is limited to a specific
 2340+ # chunk if that's being redone, or to all chunks if the whole job is being redone, or to the chunkless
 2341+ # files if there are no chunks enabled.
 2342+ def listOutputFilesForCleanup(self, dumpDir, dumpNames = None):
 2343+ # some stages (e.g. XmlStub) call this for several different dumpNames
 2344+ if (dumpNames == None):
 2345+ dumpNames = [ self.dumpName ]
 2346+ files = []
 2347+ if (self._checkpointsEnabled):
 2348+ # we will pass list of chunks or chunkToDo, or False, depending on the job setup.
 2349+ files.extend(self.listCheckpointFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames))
 2350+ files.extend(self.listTempFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames))
 2351+ else:
 2352+ # fixme this should be a list
 2353+ # we will pass list of chunks or chunkToDo, or False, depending on the job setup.
 2354+ files.extend(self.listRegularFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames))
 2355+ return files
 2356+
 2357+ # used to generate list of input files for other phase (e.g. recombine, recompress)
 2358+ # Includes: checkpoints, chunks/chunkless files depending on whether chunks are enabled. Not included: temp files.
 2359+ # This is *all* output files for the job, regardless of what's being re-run. The caller can sort out which
 2360+ # files go to which chunk, in case input is needed on a per chunk basis. (Is that going to be annoying? Nah,
 2361+ # and we only do it once per job so who cares.)
 2362+ def listOutputFilesForInput(self, dumpDir, dumpNames = None):
 2363+ # some stages (e.g. XmlStub) call this for several different dumpNames
 2364+ if (dumpNames == None):
 2365+ dumpNames = [ self.dumpName ]
 2366+ files = []
 2367+ if (self._checkpointsEnabled):
 2368+ files.extend(self.listCheckpointFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames))
 2369+ else:
 2370+ files.extend(self.listRegularFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames))
 2371+ return files
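# Editor's summary of the five entry points above, per their comments:
#
#   listOutputFilesToPublish             md5 lists, index.html; temp files included
#   listOutputFilesToCheckForTruncation  post-run integrity check; no temp files
#   listOutputFilesForBuildCommand       building job commands; temp but no checkpoint files
#   listOutputFilesForCleanup            pre-run cleanup of everything this run would produce
#   listOutputFilesForInput              input for recombine/recompress; no temp files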
 2372+
15842373 class PublicTable(Dump):
15852374 """Dump of a table using MySQL's mysqldump utility."""
15862375
15872376 def __init__(self, table, name, desc):
 2377+ self._table = table
 2378+ self._chunksEnabled = False
15882379 Dump.__init__(self, name, desc)
1589 - self._table = table
15902380
1591 - def _file(self):
1592 - return self._table + ".sql.gz"
 2381+ def getDumpName(self):
 2382+ return(self._table)
15932383
1594 - def _path(self, runner):
1595 - return runner.dumpDir.publicPath(self._file())
 2384+ def getFileType(self):
 2385+ return "sql"
15962386
 2387+ def getFileExt(self):
 2388+ return "gz"
 2389+
15972390 def run(self, runner):
15982391 retries = 0
15992392 # try this initially and see how it goes
16002393 maxretries = 3
1601 - error = runner.saveTable(self._table, self._path(runner))
 2394+ files = self.listOutputFilesForBuildCommand(runner.dumpDir)
 2395+ if (len(files) > 1):
 2396+ raise BackupError("table dump %s trying to produce more than one file" % self.dumpName)
 2397+ outputFile = files[0]
 2398+ error = self.saveTable(self._table, runner.dumpDir.filenamePublicPath(outputFile), runner)
16022399 while (error and retries < maxretries):
16032400 retries = retries + 1
16042401 time.sleep(5)
1605 - error = runner.saveTable(self._table, self._path(runner))
 2402+ error = self.saveTable(self._table, runner.dumpDir.filenamePublicPath(outputFile), runner)
16062403 if (error):
16072404 raise BackupError("error dumping table %s" % self._table)
16082405
1609 - def listOutputFiles(self, runner):
1610 - return [self._file()]
 2406+ # returns 0 on success, 1 on error
 2407+ def saveTable(self, table, outfile, runner):
 2408+ """Dump a table from the current DB with mysqldump, save to a gzipped sql file."""
 2409+ if (not exists( runner.wiki.config.gzip ) ):
 2410+ raise BackupError("gzip command %s not found" % runner.wiki.config.gzip)
 2411+ commands = runner.dbServerInfo.buildSqlDumpCommand(table, runner.wiki.config.gzip)
 2412+ return runner.saveCommand(commands, outfile)
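# Editor's note, not part of this revision: the resulting pipeline is roughly
#
#   mysqldump <connection opts> <db> <table> | gzip > <db>-<date>-<table>.sql.gz
#
# where the exact arguments come from buildSqlDumpCommand (not shown in this
# diff) and saveCommand() appends the "> outfile" redirection.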
16112413
16122414 class PrivateTable(PublicTable):
16132415 """Hidden table dumps for private data."""
@@ -1614,77 +2416,120 @@
16152417 def description(self):
16162418 return self._desc + " (private)"
16172419
1618 - def _path(self, runner):
1619 - return runner.dumpDir.privatePath(self._file())
 2420+ def run(self, runner):
 2421+ retries = 0
 2422+ # try this initially and see how it goes
 2423+ maxretries = 3
 2424+ files = self.listOutputFilesForBuildCommand(runner.dumpDir)
 2425+ if (len(files) > 1):
 2426+ raise BackupError("table dump %s trying to produce more than one file" % self.dumpName)
 2427+ outputFile = files[0]
 2428+ error = self.saveTable(self._table, runner.dumpDir.filenamePrivatePath(outputFile), runner)
 2429+ while (error and retries < maxretries):
 2430+ retries = retries + 1
 2431+ time.sleep(5)
 2432+ error = self.saveTable(self._table, runner.dumpDir.filenamePrivatePath(outputFile), runner)
 2433+ if (error):
 2434+ raise BackupError("error dumping table %s" % self._table)
16202435
1621 - def listOutputFiles(self, runner):
 2436+ def listOutputFilesToPublish(self, dumpDir):
16222437 """Private table won't have public files to list."""
16232438 return []
16242439
1625 -
16262440 class XmlStub(Dump):
16272441 """Create lightweight skeleton dumps, minus bulk text.
16282442 A second pass will import text from prior dumps or the database to make
16292443 full files for the public."""
16302444
1631 - def __init__(self, name, desc, chunkToDo, chunks = False):
1632 - Dump.__init__(self, name, desc)
 2445+ def __init__(self, name, desc, chunkToDo, chunks = False, checkpoints = False):
16332446 self._chunkToDo = chunkToDo
16342447 self._chunks = chunks
 2448+ if self._chunks:
 2449+ self._chunksEnabled = True
 2450+ self.historyDumpName = "stub-meta-history"
 2451+ self.currentDumpName = "stub-meta-current"
 2452+ self.articlesDumpName = "stub-articles"
 2453+ if checkpoints:
 2454+ self._checkpointsEnabled = True
 2455+ Dump.__init__(self, name, desc)
16352456
16362457 def detail(self):
16372458 return "These files contain no page text, only revision metadata."
16382459
1639 - def listOutputFiles(self, runner, unnumbered=False):
1640 - if (self._chunks):
1641 - if (self._chunkToDo):
1642 - if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)):
1643 - raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks)))
1644 - files = []
1645 - files.append("stub-meta-history%s.xml.gz" % self._chunkToDo)
1646 - files.append("stub-meta-current%s.xml.gz" % self._chunkToDo)
1647 - files.append("stub-articles%s.xml.gz" % self._chunkToDo)
1648 - return files
1649 - else:
1650 - files = []
1651 - for i in range(1, len(self._chunks) + 1):
1652 - files.append("stub-meta-history%s.xml.gz" % i)
1653 - files.append("stub-meta-current%s.xml.gz" % i)
1654 - files.append("stub-articles%s.xml.gz" % i)
1655 - return files
1656 - else:
1657 - return ["stub-meta-history.xml.gz",
1658 - "stub-meta-current.xml.gz",
1659 - "stub-articles.xml.gz"]
 2460+ def getFileType(self):
 2461+ return "xml"
16602462
1661 - def buildCommand(self, runner, chunk = 0):
1662 - history = self.buildHistoryOutputFilename(runner, chunk)
1663 - current = self.buildCurrentOutputFilename(runner, chunk)
1664 - articles = self.buildArticlesOutputFilename(runner, chunk)
 2463+ def getFileExt(self):
 2464+ return "gz"
16652465
 2466+ def getDumpName(self):
 2467+ return 'stub'
 2468+
 2469+ def listDumpNames(self):
 2470+ dumpNames = [ self.historyDumpName, self.currentDumpName, self.articlesDumpName ]
 2471+ return dumpNames
 2472+
 2473+ def listOutputFilesToPublish(self, dumpDir):
 2474+ dumpNames = self.listDumpNames()
 2475+ files = []
 2476+ files.extend(Dump.listOutputFilesToPublish(self, dumpDir, dumpNames))
 2477+ return files
 2478+
 2479+ def listOutputFilesToCheckForTruncation(self, dumpDir):
 2480+ dumpNames = self.listDumpNames()
 2481+ files = []
 2482+ files.extend(Dump.listOutputFilesToCheckForTruncation(self, dumpDir, dumpNames))
 2483+ return files
 2484+
 2485+ def listOutputFilesForBuildCommand(self, dumpDir):
 2486+ dumpNames = self.listDumpNames()
 2487+ files = []
 2488+ files.extend(Dump.listOutputFilesForBuildCommand(self, dumpDir, dumpNames))
 2489+ return files
 2490+
 2491+ def listOutputFilesForCleanup(self, dumpDir):
 2492+ # fixme should this pass a list instead of one item?
 2493+ dumpNames = self.listDumpNames()
 2494+ files = []
 2495+ files.extend(Dump.listOutputFilesForCleanup(self, dumpDir, dumpNames))
 2496+ return files
 2497+
 2498+ def listOutputFilesForInput(self, dumpDir, dumpNames = None):
 2499+ if dumpNames == None:
 2500+ dumpNames = self.listDumpNames()
 2501+ files = []
 2502+ files.extend(Dump.listOutputFilesForInput(self, dumpDir, dumpNames))
 2503+ return files
 2504+
 2505+ def buildCommand(self, runner, f):
16662506 if (not exists( runner.wiki.config.php ) ):
1667 - raise BackupError("php command %s not found" % runner.wiki.config.php);
 2507+ raise BackupError("php command %s not found" % runner.wiki.config.php)
 2508+
 2509+ # fixme we have a list of all the files for all three dumpNames, we want to split them up by dumpName. oops.
 2510+ articlesFile = runner.dumpDir.filenamePublicPath(f)
 2511+ historyFile = runner.dumpDir.filenamePublicPath(DumpFilename(runner.wiki, f.date, self.historyDumpName, f.fileType, f.fileExt, f.chunk, f.checkpoint, f.temp))
 2512+ currentFile = runner.dumpDir.filenamePublicPath(DumpFilename(runner.wiki, f.date, self.currentDumpName, f.fileType, f.fileExt, f.chunk, f.checkpoint, f.temp))
16682513 command = [ "%s" % runner.wiki.config.php,
16692514 "-q", "%s/maintenance/dumpBackup.php" % runner.wiki.config.wikiDir,
16702515 "--wiki=%s" % runner.dbName,
16712516 "--full", "--stub", "--report=10000",
16722517 "%s" % runner.forceNormalOption(),
1673 - "--output=gzip:%s" % history,
1674 - "--output=gzip:%s" % current,
1675 - "--filter=latest", "--output=gzip:%s" % articles,
 2518+ "--output=gzip:%s" % historyFile,
 2519+ "--output=gzip:%s" % currentFile,
 2520+ "--filter=latest", "--output=gzip:%s" % articlesFile,
16762521 "--filter=latest", "--filter=notalk", "--filter=namespace:!NS_USER" ]
1677 - if (chunk):
 2522+
 2523+ if (f.chunk):
16782524 # set up start end end pageids for this piece
16792525 # note there is no page id 0 I guess. so we start with 1
16802526 # start = runner.pagesPerChunk()*(chunk-1) + 1
1681 - start = sum([ self._chunks[i] for i in range(0,chunk-1)]) + 1
 2527+ start = sum([ self._chunks[i] for i in range(0,f.chunkInt-1)]) + 1
16822528 startopt = "--start=%s" % start
16832529 # if we are on the last chunk, we should get up to the last pageid,
16842530 # whatever that is.
16852531 command.append(startopt)
1686 - if chunk < len(self._chunks):
1687 - # end = start + runner.pagesPerChunk()
1688 - end = sum([ self._chunks[i] for i in range(0,chunk)]) +1
 2532+ if f.chunkInt < len(self._chunks):
 2533+ end = sum([ self._chunks[i] for i in range(0,f.chunkInt)]) +1
16892534 endopt = "--end=%s" % end
16902535 command.append(endopt)
16912536
@@ -1692,107 +2537,66 @@
16932538 series = [ pipeline ]
16942539 return(series)
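# Editor's worked example of the --start/--end arithmetic above, with
# hypothetical per-chunk page counts self._chunks = [ 100, 150, 200 ]:
#
#   chunk 1: --start=1    --end=101
#   chunk 2: --start=101  --end=251
#   chunk 3: --start=251  (no --end; the last chunk runs to the final pageid)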
16952540
1696 - def cleanupOldFiles(self, runner, chunk = 0):
1697 - if (runner._cleanupOldFilesEnabled):
1698 - fileList = self.buildOutputFilenames(runner, chunk)
1699 - for filename in fileList:
1700 - if exists(filename):
1701 - runner.removeFile(filename)
1702 -
1703 - def buildHistoryOutputFilename(self, runner, chunk = 0):
1704 - if (chunk):
1705 - chunkinfo = "%s" % chunk
1706 - else:
1707 - chunkinfo = ""
1708 - history = runner.dumpDir.publicPath("stub-meta-history" + chunkinfo + ".xml.gz")
1709 - return history
1710 -
1711 - def buildCurrentOutputFilename(self, runner, chunk = 0):
1712 - if (chunk):
1713 - chunkinfo = "%s" % chunk
1714 - else:
1715 - chunkinfo = ""
1716 - current = runner.dumpDir.publicPath("stub-meta-current" + chunkinfo + ".xml.gz")
1717 - return current
1718 -
1719 - def buildArticlesOutputFilename(self, runner, chunk = 0):
1720 - if (chunk):
1721 - chunkinfo = "%s" % chunk
1722 - else:
1723 - chunkinfo = ""
1724 - articles = runner.dumpDir.publicPath("stub-articles" + chunkinfo + ".xml.gz")
1725 - return articles
1726 -
1727 - def buildOutputFilenames(self, runner, chunk = 0):
1728 - history = self.buildHistoryOutputFilename(runner, chunk)
1729 - current = self.buildCurrentOutputFilename(runner, chunk)
1730 - articles = self.buildArticlesOutputFilename(runner, chunk)
1731 - return([ history, current, articles ])
1732 -
17332541 def run(self, runner):
17342542 commands = []
1735 - if self._chunks:
1736 - if (self._chunkToDo):
1737 - if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)):
1738 - raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks)))
1739 - self.cleanupOldFiles(runner,self._chunkToDo)
1740 - series = self.buildCommand(runner, self._chunkToDo)
 2543+ self.cleanupOldFiles(runner.dumpDir, runner)
 2544+ files = self.listOutputFilesForBuildCommand(runner.dumpDir)
 2545+ for f in files:
 2546+ # choose arbitrarily one of the dumpNames we do (= articlesDumpName)
 2547+ # buildcommand will figure out the files for the rest
 2548+ if (f.dumpName == self.articlesDumpName):
 2549+ series = self.buildCommand(runner, f)
17412550 commands.append(series)
1742 - else:
1743 - for i in range(1, len(self._chunks)+1):
1744 - self.cleanupOldFiles(runner,i)
1745 - series = self.buildCommand(runner, i)
1746 - commands.append(series)
1747 - else:
1748 - self.cleanupOldFiles(runner)
1749 - series = self.buildCommand(runner)
1750 - commands.append(series)
17512551 error = runner.runCommand(commands, callbackStderr=self.progressCallback, callbackStderrArg=runner)
17522552 if (error):
17532553 raise BackupError("error producing stub files")
17542554
1755 -class RecombineXmlStub(XmlStub):
1756 - def __init__(self, name, desc, chunks):
1757 - XmlStub.__init__(self, name, desc, False, chunks)
1758 - # this is here only so that a callback can capture output from some commands
1759 - # related to recombining files if we did parallel runs of the recompression
1760 - self._output = None
 2555+class RecombineXmlStub(Dump):
 2556+ def __init__(self, name, desc, itemForXmlStubs):
 2557+ self.itemForXmlStubs = itemForXmlStubs
 2558+ Dump.__init__(self, name, desc)
 2559+ # the input may have checkpoints but the output will not.
 2560+ self._checkpointsEnabled = False
17612561
1762 - def listInputFiles(self, runner):
1763 - return(XmlStub.listOutputFiles(self, runner))
 2562+ def detail(self):
 2563+ return "These files contain no page text, only revision metadata."
17642564
1765 - def listOutputFiles(self, runner):
1766 - return ["stub-meta-history.xml.gz",
1767 - "stub-meta-current.xml.gz",
1768 - "stub-articles.xml.gz"]
 2565+ def listDumpNames(self):
 2566+ return self.itemForXmlStubs.listDumpNames()
17692567
 2568+ def getFileType(self):
 2569+ return self.itemForXmlStubs.getFileType()
 2570+
 2571+ def getFileExt(self):
 2572+ return self.itemForXmlStubs.getFileExt()
 2573+
 2574+ def getDumpName(self):
 2575+ return self.itemForXmlStubs.getDumpName()
 2576+
17702577 def run(self, runner):
17712578 error=0
1772 - if (self._chunks):
1773 - files = self.listInputFiles(runner)
1774 - outputFileList = self.listOutputFiles(runner)
1775 - for outputFile in outputFileList:
1776 - inputFiles = []
1777 - for inFile in files:
1778 - (base, rest) = inFile.split('.',1)
1779 - base = re.sub("\d+$", "", base)
1780 - if base + "." + rest == outputFile:
1781 - inputFiles.append(inFile)
1782 - if not len(inputFiles):
1783 - self.setStatus("failed")
1784 - raise BackupError("No input files for %s found" % self.name)
1785 - if (not exists( runner.wiki.config.gzip ) ):
1786 - raise BackupError("gzip command %s not found" % runner.wiki.config.gzip);
1787 - compressionCommand = runner.wiki.config.gzip
1788 - compressionCommand = "%s > " % runner.wiki.config.gzip
1789 - uncompressionCommand = [ "%s" % runner.wiki.config.gzip, "-dc" ]
1790 - recombineCommandString = self.buildRecombineCommandString(runner, inputFiles, outputFile, compressionCommand, uncompressionCommand )
1791 - recombineCommand = [ recombineCommandString ]
1792 - recombinePipeline = [ recombineCommand ]
1793 - series = [ recombinePipeline ]
1794 - result = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True)
1795 - if result:
1796 - error = result
 2579+ files = self.itemForXmlStubs.listOutputFilesForInput(runner.dumpDir)
 2580+ outputFileList = self.listOutputFilesForBuildCommand(runner.dumpDir, self.listDumpNames())
 2581+ for outputFileObj in outputFileList:
 2582+ inputFiles = []
 2583+ for inFile in files:
 2584+ if inFile.dumpName == outputFileObj.dumpName:
 2585+ inputFiles.append(inFile)
 2586+ if not len(inputFiles):
 2587+ self.setStatus("failed")
 2588+ raise BackupError("No input files for %s found" % self.name())
 2589+ if (not exists( runner.wiki.config.gzip ) ):
 2590+ raise BackupError("gzip command %s not found" % runner.wiki.config.gzip)
 2592+ compressionCommand = "%s > " % runner.wiki.config.gzip
 2593+ uncompressionCommand = [ "%s" % runner.wiki.config.gzip, "-dc" ]
 2594+ recombineCommandString = self.buildRecombineCommandString(runner, inputFiles, outputFileObj, compressionCommand, uncompressionCommand )
 2595+ recombineCommand = [ recombineCommandString ]
 2596+ recombinePipeline = [ recombineCommand ]
 2597+ series = [ recombinePipeline ]
 2598+ result = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True)
 2599+ if result:
 2600+ error = result
17972601 if (error):
17982602 raise BackupError("error recombining stub files")
17992603
@@ -1801,29 +2605,27 @@
18022606
18032607 def __init__(self, desc, chunks = False):
18042608 Dump.__init__(self, "xmlpagelogsdump", desc)
1805 - self._chunks = chunks
18062609
18072610 def detail(self):
18082611 return "This contains the log of actions performed on pages."
18092612
1810 - def listOutputFiles(self, runner):
1811 - return ["pages-logging.xml.gz"]
 2613+ def getDumpName(self):
 2614+ return("pages-logging")
18122615
1813 - def cleanupOldFiles(self, runner):
1814 - if (runner._cleanupOldFilesEnabled):
1815 - logging = self.buildOutputFilename(runner)
1816 - if exists(logging):
1817 - runner.removeFile(logging)
 2616+ def getFileType(self):
 2617+ return "xml"
18182618
1819 - def buildOutputFilename(self, runner):
1820 - logging = runner.dumpDir.publicPath("pages-logging.xml.gz")
1821 - return logging
 2619+ def getFileExt(self):
 2620+ return "gz"
18222621
18232622 def run(self, runner):
1824 - self.cleanupOldFiles(runner)
1825 - logging = self.buildOutputFilename(runner)
 2623+ self.cleanupOldFiles(runner.dumpDir, runner)
 2624+ files = self.listOutputFilesForBuildCommand(runner.dumpDir)
 2625+ if (len(files) > 1):
 2626+ raise BackupError("logging table job wants to produce more than one output file")
 2627+ logging = runner.dumpDir.filenamePublicPath(files[0])
18262628 if (not exists( runner.wiki.config.php ) ):
1827 - raise BackupError("php command %s not found" % runner.wiki.config.php);
 2629+ raise BackupError("php command %s not found" % runner.wiki.config.php)
18282630 command = [ "%s" % runner.wiki.config.php,
18292631 "-q", "%s/maintenance/dumpBackup.php" % runner.wiki.config.wikiDir,
18302632 "--wiki=%s" % runner.dbName,
@@ -1838,120 +2640,132 @@
18392641
18402642 class XmlDump(Dump):
18412643 """Primary XML dumps, one section at a time."""
1842 - def __init__(self, subset, name, desc, detail, prefetch, spawn, chunkToDo, chunks = False):
1843 - Dump.__init__(self, name, desc)
 2644+ def __init__(self, subset, name, desc, detail, itemForStubs, prefetch, spawn, wiki, chunkToDo, chunks = False, checkpoints = False):
18442645 self._subset = subset
18452646 self._detail = detail
18462647 self._desc = desc
18472648 self._prefetch = prefetch
18482649 self._spawn = spawn
18492650 self._chunks = chunks
 2651+ if self._chunks:
 2652+ self._chunksEnabled = True
18502653 self._pageID = {}
18512654 self._chunkToDo = chunkToDo
18522655
1853 - def detail(self):
1854 - """Optionally return additional text to appear under the heading."""
1855 - return self._detail
 2656+ self.wiki = wiki
 2657+ self.itemForStubs = itemForStubs
 2658+ if checkpoints:
 2659+ self._checkpointsEnabled = True
 2660+ Dump.__init__(self, name, desc)
18562661
1857 - def _file(self, ext, chunk=0):
1858 - if (chunk):
1859 - return "pages-" + self._subset + ("%s.xml." % chunk) + ext
1860 - else:
1861 - return "pages-" + self._subset + ".xml." + ext
 2662+ def getDumpNameBase(self):
 2663+ return('pages-')
18622664
1863 - def _path(self, runner, ext, chunk = 0):
1864 - return runner.dumpDir.publicPath(self._file(ext, chunk))
 2665+ def getDumpName(self):
 2666+ return(self.getDumpNameBase() + self._subset)
18652667
 2668+ def getFileType(self):
 2669+ return "xml"
 2670+
 2671+ def getFileExt(self):
 2672+ return "bz2"
 2673+
18662674 def run(self, runner):
18672675 commands = []
1868 - if (self._chunks):
1869 - if (self._chunkToDo):
1870 - if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)):
1871 - raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks)))
1872 - series = self.buildCommand(runner, self._chunkToDo)
1873 - commands.append(series)
1874 - else:
1875 - for i in range(1, len(self._chunks)+1):
1876 - series = self.buildCommand(runner, i)
1877 - commands.append(series)
1878 - else:
1879 - series = self.buildCommand(runner)
 2676+ self.cleanupOldFiles(runner.dumpDir, runner)
 2677+ # just get the files pertaining to our dumpName, which is *one* of articles, pages-current, pages-history.
 2678+ # stubs include all of them together.
 2679+ # FIXME this needs some other jobname here. uuuggghhh and how would this job know what that is? bah
 2680+ if not self.dumpName.startswith(self.getDumpNameBase()):
 2681+ raise BackupError("dumpName %s of unknown form for this job" % self.dumpName)
 2682+ dumpName = self.dumpName[len(self.getDumpNameBase()):]
 2683+ stubDumpNames = self.itemForStubs.listDumpNames()
 2684+ for s in stubDumpNames:
 2685+ if s.endswith(dumpName):
 2686+ stubDumpName = s
 2687+ inputFiles = self.itemForStubs.listOutputFilesForInput(runner.dumpDir, [ stubDumpName ])
 2688+ if self._chunksEnabled and self._chunkToDo:
 2689+ # reset inputfiles to just have the one we want.
 2690+ for f in inputFiles:
 2691+ if f.chunkInt == self._chunkToDo:
 2692+ inputFiles = [ f ]
 2693+ break
 2694+ if len(inputFiles) > 1:
 2695+ raise BackupError("Trouble finding stub files for xml dump run")
 2696+ for f in inputFiles:
 2697+ # we should convert the input file to an output file I guess
 2698+ # we write regular files
 2699+ outputFile = DumpFilename(runner.wiki, f.date, f.dumpName, f.fileType, self.fileExt)
 2700+ series = self.buildCommand(runner, f)
18802701 commands.append(series)
 2702+
18812703 error = runner.runCommand(commands, callbackStderr=self.progressCallback, callbackStderrArg=runner)
1882 -
18832704 truncationError = self.checkForTruncatedFiles(runner)
1884 -
18852705 if (error or truncationError):
1886 - raise BackupError("error producing xml bz2 file(s) %s" % self._subset)
 2706+ raise BackupError("error producing xml file(s) %s" % self.dumpName)
18872707
18882708 def checkForTruncatedFiles(self, runner):
18892709 if runner._checkForTruncatedFilesEnabled:
1890 - if (not exists( runner.wiki.config.checkforbz2footer ) ):
1891 - raise BackupError("checkforbz2footer command %s not found" % runner.wiki.config.checkforbz2footer);
1892 - checkforbz2footer = "%s" % runner.wiki.config.checkforbz2footer
1893 - if exists(checkforbz2footer):
1894 - # check to see if any of the output files are truncated
1895 - files = []
1896 - if (self._chunks):
1897 - if (self._chunkToDo):
1898 - if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)):
1899 - raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks)))
1900 - files.append( self._path(runner, 'bz2', self._chunkToDo ) )
1901 - else:
1902 - for i in range(1, len(self._chunks)+1):
1903 - files.append( self._path(runner, 'bz2', i ) )
1904 -
1905 - for f in files:
1906 - pipeline = []
1907 - pipeline.append([ checkforbz2footer, f ])
1908 - p = CommandPipeline(pipeline, quiet=True)
1909 - p.runPipelineAndGetOutput()
1910 - if not p.exitedSuccessfully():
1911 - runner.logAndPrint("file %s is truncated, moving out of the way" %f )
1912 - os.rename( f, f + ".truncated" )
1913 - return 1
 2710+ # check to see if any of the output files are truncated
 2711+ if self._checkpointsEnabled:
 2712+ files = self.listCheckpointFilesPerChunkExisting(runner.dumpDir, self.getChunkList(), [ self.dumpName ])
 2713+ else:
 2714+ files = self.listRegularFilesPerChunkExisting(runner.dumpDir, self.getChunkList(), [ self.dumpName ])
 2715+ for f in files:
 2716+ dfile = DumpFile(runner.wiki, runner.dumpDir.filenamePublicPath(f))
 2717+ if (dfile.checkIfTruncated()):
 2718+ runner.logAndPrint("file %s is truncated, moving out of the way" % dfile.filename )
 2719+ dfile.rename( dfile.filename + ".truncated" )
 2720+ return 1
19142721 return 0
19152722
19162723 def buildEta(self, runner):
19172724 """Tell the dumper script whether to make ETA estimate on page or revision count."""
19182725 return "--current"
19192726
1920 - def buildFilters(self, runner, chunk = 0):
 2727+ # takes name of the output file
 2728+ def buildFilters(self, runner, f):
19212729 """Construct the output filter options for dumpTextPass.php"""
1922 - xmlbz2 = self._path(runner, "bz2", chunk)
 2730+ # do we need checkpoints? ummm
 2731+ xmlbz2 = runner.dumpDir.filenamePublicPath(f)
 2732+
19232733 if (not exists( runner.wiki.config.bzip2 ) ):
1924 - raise BackupError("bzip2 command %s not found" % runner.wiki.config.bzip2);
 2734+ raise BackupError("bzip2 command %s not found" % runner.wiki.config.bzip2)
19252735 if runner.wiki.config.bzip2[-6:] == "dbzip2":
19262736 bz2mode = "dbzip2"
19272737 else:
19282738 bz2mode = "bzip2"
19292739 return "--output=%s:%s" % (bz2mode, xmlbz2)
19302740
1931 - def buildCommand(self, runner, chunk=0):
 2741+ def buildCommand(self, runner, f):
19322742 """Build the command line for the dump, minus output and filter options"""
19332743
1934 - if (chunk):
1935 - chunkinfo = "%s" % chunk
 2744+ if (self._checkpointsEnabled):
 2745+ # we write a temp file, it will be checkpointed every so often.
 2746+ outputFile = DumpFilename(runner.wiki, f.date, self.dumpName, f.fileType, self.fileExt, f.chunk, f.checkpoint, temp = True)
19362747 else:
1937 - chunkinfo =""
 2748+ # we write regular files
 2749+ outputFile = DumpFilename(runner.wiki, f.date, self.dumpName, f.fileType, self.fileExt, f.chunk, checkpoint = False, temp = False)
19382750
19392751 # Page and revision data pulled from this skeleton dump...
1940 - stub = "stub-%s" % self._subset
1941 - stub = stub + "%s.xml.gz" % chunkinfo
1942 - stub = runner.dumpDir.publicPath(stub),
1943 - stubOption = "--stub=gzip:%s" % stub
 2752+ stubOption = "--stub=gzip:%s" % runner.dumpDir.filenamePublicPath(f)
19442753
19452754 # Try to pull text from the previous run; most stuff hasn't changed
19462755 #Source=$OutputDir/pages_$section.xml.bz2
19472756 sources = []
19482757 possibleSources = None
19492758 if self._prefetch:
1950 - possibleSources = self._findPreviousDump(runner, chunk)
 2759+ possibleSources = self._findPreviousDump(runner, f.chunk)
19512760 # if we have a list of more than one then we need to check existence for each and put them together in a string
19522761 if possibleSources:
19532762 for sourceFile in possibleSources:
1954 - if exists(sourceFile):
1955 - sources.append(sourceFile)
 2763+ s = runner.dumpDir.filenamePublicPath(sourceFile, sourceFile.date)
 2764+ if exists(s):
 2765+ sources.append(s)
 2766+ if (f.chunk):
 2767+ chunkinfo = "%s" % f.chunk
 2768+ else:
 2769+ chunkinfo =""
19562770 if (len(sources) > 0):
19572771 source = "bzip2:%s" % (";".join(sources) )
19582772 runner.showRunnerState("... building %s %s XML dump, with text prefetch from %s..." % (self._subset, chunkinfo, source))
@@ -1966,59 +2780,33 @@
19672781 spawn = None
19682782
19692783 if (not exists( runner.wiki.config.php ) ):
1970 - raise BackupError("php command %s not found" % runner.wiki.config.php);
 2784+ raise BackupError("php command %s not found" % runner.wiki.config.php)
 2785+
 2786+ if (self.wiki.config.checkpointTime):
 2787+ checkpointTime = "--maxtime=%s" % (runner.wiki.config.checkpointTime)
 2788+ checkpointFile = "--checkpointfile=%s" % outputFile.newFilename(outputFile.dumpName, outputFile.fileType, outputFile.fileExt, outputFile.date, outputFile.chunk, "p%sp%s", None)
 2789+ else:
 2790+ checkpointTime = ""  # not None: these get interpolated into the command below
 2791+ checkpointFile = ""
19712792 dumpCommand = [ "%s" % runner.wiki.config.php,
19722793 "-q", "%s/maintenance/dumpTextPass.php" % runner.wiki.config.wikiDir,
19732794 "--wiki=%s" % runner.dbName,
19742795 "%s" % stubOption,
19752796 "%s" % prefetch,
19762797 "%s" % runner.forceNormalOption(),
 2798+ "%s" % checkpointTime,
 2799+ "%s" % checkpointFile,
19772800 "--report=1000",
19782801 "%s" % spawn ]
 2802+
19792803 command = dumpCommand
1980 - filters = self.buildFilters(runner, chunk)
 2804+ filters = self.buildFilters(runner, outputFile)
19812805 eta = self.buildEta(runner)
19822806 command.extend([ filters, eta ])
19832807 pipeline = [ command ]
19842808 series = [ pipeline ]
19852809 return series
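# Editor's sketch of the assembled command, not part of this revision; all
# paths and values are hypothetical, and the checkpoint options appear only
# when checkpointTime is configured:
#
#   php -q <wikiDir>/maintenance/dumpTextPass.php --wiki=<db> \
#       --stub=gzip:<stub file for this subset/chunk> \
#       --prefetch=bzip2:<prior dump file>[;<prior dump file>...] \
#       --maxtime=<checkpointTime> --checkpointfile=<name with p%sp%s pageid placeholders> \
#       --report=1000 --spawn=<php> --output=bzip2:<output file> --current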
19862810
1987 - # given filename, (assume bz2 compression) dig out the first page id in that file
1988 - def findFirstPageIDInFile(self, runner, fileName):
1989 - if (fileName in self._pageID):
1990 - return self._pageID[fileName]
1991 - pageID = None
1992 - pipeline = []
1993 - if (not exists( runner.wiki.config.bzip2 ) ):
1994 - raise BackupError("bzip2 command %s not found" % runner.wiki.config.bzip2);
1995 - uncompressionCommand = [ "%s" % runner.wiki.config.bzip2, "-dc", fileName ]
1996 - pipeline.append(uncompressionCommand)
1997 - # warning: we figure any header (<siteinfo>...</siteinfo>) is going to be less than 2000 lines!
1998 - if (not exists( runner.wiki.config.head ) ):
1999 - raise BackupError("head command %s not found" % runner.wiki.config.head);
2000 - head = runner.wiki.config.head
2001 - headEsc = MiscUtils.shellEscape(head)
2002 - pipeline.append([ head, "-2000"])
2003 - # without shell
2004 - p = CommandPipeline(pipeline, quiet=True)
2005 - p.runPipelineAndGetOutput()
2006 - if (p.output()):
2007 - pageData = p.output()
2008 - titleAndIDPattern = re.compile('<title>(?P<title>.+?)</title>\s*' + '<id>(?P<pageid>\d+?)</id>')
2009 - result = titleAndIDPattern.search(pageData)
2010 - if (result):
2011 - pageID = result.group('pageid')
2012 - self._pageID[fileName] = pageID
2013 - return(pageID)
2014 -
2015 -
2016 - def filenameHasChunk(self, filename, ext):
2017 - fileNamePattern = re.compile('.*pages-' + self._subset + '[0-9]+.xml.' + ext +'$')
2018 - if (fileNamePattern.match(filename)):
2019 - return True
2020 - else:
2021 - return False
2022 -
20232811 # taken from a comment by user "Toothy" on Ned Batchelder's blog (no longer on the net)
20242812 def sort_nicely(self, l):
20252813 """ Sort the given list in the way that humans expect.
@@ -2027,149 +2815,149 @@
20282816 alphanum_key = lambda key: [ convert(c) for c in re.split('([0-9]+)', key) ]
20292817 l.sort( key=alphanum_key )
20302818
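sort_nicely gives chunk files a natural (human) ordering, so a purely lexicographic sort does not place chunk 10 before chunk 2. A self-contained sketch, assuming the convert lambda defined just above this hunk turns digit runs into ints:

    import re

    def alphanum_key(key):
        # digits compare numerically, everything else as text
        convert = lambda text: int(text) if text.isdigit() else text
        return [convert(c) for c in re.split('([0-9]+)', key)]

    names = ["pages-articles10.xml.bz2", "pages-articles2.xml.bz2", "pages-articles1.xml.bz2"]
    names.sort(key=alphanum_key)
    print names
    # ['pages-articles1.xml.bz2', 'pages-articles2.xml.bz2', 'pages-articles10.xml.bz2']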
 2819+ def getRelevantPrefetchFiles(self, fileList, startPageID, endPageID, date, runner):
 2820+ possibles = []
 2821+ if (len(fileList)):
 2822+ # (a) nasty hack, see below (b)
 2823+ maxchunks = 0
 2824+ for fileObj in fileList:
 2825+ if fileObj.isChunkFile and fileObj.chunkInt > maxchunks:
 2826+ maxchunks = fileObj.chunkInt
 2827+ if not fileObj.firstPageID:
 2828+ f = DumpFile(self.wiki, runner.dumpDir.filenamePublicPath(fileObj, date), fileObj)
 2829+ fileObj.firstPageID = f.findFirstPageIDInFile()
 2830+
 2831+ # get the files that cover our range
 2832+ for fileObj in fileList:
 2833+ firstPageIdInFile = int(fileObj.firstPageID)
 2834+
 2835+ # fixme what do we do here? this could be very expensive. is that
 2836+ # worth it??
 2837+ if not fileObj.lastPageID:
 2838+ # (b) nasty hack, see (a)
 2839+ # it's not a checkpoint file or we'd have the pageid in the filename
 2840+ # so... temporary hack which will give expensive results
 2841+ # if chunk file, and it's the last chunk, put none
 2842+ # if it's not the last chunk, get the first pageid in the next chunk and subtract 1
 2843+ # if not chunk, put none.
 2844+ if fileObj.isChunkFile and fileObj.chunkInt < maxchunks:
 2845+ for f in fileList:
 2846+ if f.chunkInt == fileObj.chunkInt + 1:
 2847+ # not exactly true! this could be a few pages past where it really ends
 2848+ # (because of deleted pages that aren't included at all)
 2849+ fileObj.lastPageID = str(int(f.firstPageID) - 1)
 2850+ if fileObj.lastPageID:
 2851+ lastPageIdInFile = int(fileObj.lastPageID)
 2852+ else:
 2853+ lastPageIdInFile = None
 2854+
 2855+ # FIXME there is no point in including files that have just a few rev ids in them
 2856+ # that we need, and having to read through the whole file... could take
 2857+ # hours or days (later it won't matter, right? but until a rewrite, this is important)
 2858+ # also be sure that if a critical page is deleted by the time we try to figure out ranges,
 2859+ # that we don't get hosed
 2860+ if ( firstPageIdInFile <= int(startPageID) and (lastPageIdInFile == None or lastPageIdInFile >= int(startPageID)) ) or ( firstPageIdInFile >= int(startPageID) and ( endPageID == None or firstPageIdInFile <= int(endPageID) ) ):
 2861+ possibles.append(fileObj)
 2862+ return possibles
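The last test above keeps a file if its page range can contribute anything to [startPageID, endPageID]. Pulled out into a hypothetical helper (None meaning "open-ended on that side"), the predicate reads:

    # Hypothetical standalone form of the selection condition above.
    def covers_range(firstInFile, lastInFile, startPageID, endPageID):
        return ((firstInFile <= startPageID and
                 (lastInFile is None or lastInFile >= startPageID)) or
                (firstInFile >= startPageID and
                 (endPageID is None or firstInFile <= endPageID)))

    print covers_range(1, 1000, 500, 2000)     # True: file spans the start page
    print covers_range(1500, None, 500, 2000)  # True: file begins inside the range
    print covers_range(2500, None, 500, 2000)  # False: file begins past the end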
20312863
20322864 # this finds the content file or files from the first previous successful dump
20332865 # to be used as input ("prefetch") for this run.
2034 - def _findPreviousDump(self, runner, chunk = 0):
 2866+ def _findPreviousDump(self, runner, chunk = None):
20352867 """The previously-linked previous successful dump."""
2036 - bzfile = self._file("bz2")
20372868 if (chunk):
2038 - startPageID = sum([ self._chunks[i] for i in range(0,chunk-1)]) + 1
2039 - if (len(self._chunks) > chunk):
2040 - endPageID = sum([ self._chunks[i] for i in range(0,chunk)])
 2869+ startPageID = sum([ self._chunks[i] for i in range(0,int(chunk)-1)]) + 1
 2870+ if (len(self._chunks) > int(chunk)):
 2871+ endPageID = sum([ self._chunks[i] for i in range(0,int(chunk))])
20412872 else:
20422873 endPageID = None
2043 - # we will look for the first chunk file, if it's there and the
2044 - # status of the job is ok then we will get the rest of the info
2045 - bzfileChunk = self._file("bz2", 1)
2046 - bzfileGlob = self._file("bz2", '[1-9]*')
2047 - currentChunk = realpath(runner.dumpDir.publicPath(bzfile))
2048 - current = realpath(runner.dumpDir.publicPath(bzfile))
 2874+ else:
 2875+ startPageID = 1
 2876+ endPageID = None
 2877+
20492878 dumps = runner.wiki.dumpDirs()
20502879 dumps.sort()
20512880 dumps.reverse()
20522881 for date in dumps:
2053 - base = runner.wiki.publicDir()
2054 - # first see if a "chunk" file is there, if not we will accept
2055 - # using the the single file dump although it will be slower
2056 - possibles = []
2057 - oldChunk = None
2058 - # scan all the existing chunk files and get the first page ID from each
2059 - if (chunk):
2060 - oldChunk = runner.dumpDir.buildPath(base, date, bzfileChunk)
2061 - oldGlob = runner.dumpDir.buildPath(base, date, bzfileGlob)
2062 - pageIDs = []
2063 - bzfileChunks = glob.glob(oldGlob)
2064 - self.sort_nicely(bzfileChunks)
2065 - if (bzfileChunks):
2066 - for fileName in bzfileChunks:
2067 - pageID = self.findFirstPageIDInFile(runner, fileName )
2068 - if (pageID):
2069 - pageIDs.append(pageID)
 2882+ if (date == self.wiki.date):
 2883+ runner.debug("skipping current dump for prefetch of job %s, date %s" % (self.name(), self.wiki.date))
 2884+ continue
20702885
2071 - old = runner.dumpDir.buildPath(base, date, bzfile)
2072 - if (oldChunk):
2073 - if exists(oldChunk):
2074 - possibles.append(oldChunk)
2075 - if (old):
2076 - if exists(old):
2077 - possibles.append(old)
 2886+ # see if this job from that date was successful
 2887+ if not runner.runInfoFile.statusOfOldDumpIsDone(runner, date, self.name(), self._desc):
 2888+ runner.debug("skipping incomplete or failed dump for prefetch date %s" % date)
 2889+ continue
20782890
2079 - for possible in possibles:
2080 - if exists(possible):
2081 - size = getsize(possible)
2082 - if size < 70000:
2083 - runner.debug("small %d-byte prefetch dump at %s, skipping" % (size, possible))
2084 - continue
2085 - if realpath(old) == current:
2086 - runner.debug("skipping current dump for prefetch %s" % possible)
2087 - continue
2088 - if not runner.runInfoFile.statusOfOldDumpIsDone(runner, date, self.name, self._desc):
2089 - runner.debug("skipping incomplete or failed dump for prefetch %s" % possible)
2090 - continue
2091 - if (chunk) and (self.filenameHasChunk(possible, "bz2")):
2092 - runner.debug("Prefetchable %s etc." % possible)
2093 - else:
2094 - runner.debug("Prefetchable %s" % possible)
2095 - # found something workable, now check the chunk situation
2096 - if (chunk):
2097 - if (self.filenameHasChunk(possible, "bz2")):
2098 - if len(pageIDs) > 0:
2099 - possibleStartNum = None
2100 - for i in range(len(pageIDs)):
2101 - if int(pageIDs[i]) <= int(startPageID):
2102 - # chunk number of file starts at 1.
2103 - possibleStartNum = i+1
2104 - else:
2105 - break;
2106 - if possibleStartNum:
2107 - possibleEndNum = possibleStartNum
2108 - for j in range(i,len(pageIDs)):
2109 - if (not endPageID) or (int(pageIDs[j]) <= int(endPageID)):
2110 - # chunk number of file starts at 1.
2111 - possibleEndNum = j + 1
2112 - else:
2113 - break
2114 - # now we have the range of the relevant files, put together the list.
2115 - possible = [ runner.dumpDir.buildPath(base, date, self._file("bz2", k)) for k in range(possibleStartNum,possibleEndNum+1) ]
2116 - return possible
2117 - else:
2118 - continue
2119 -
2120 - return [ possible ]
 2891+ # first check if there are checkpoint files from this run we can use
 2892+ files = self.listCheckpointFilesExisting(runner.dumpDir, [ self.dumpName ], date, chunks = None)
 2893+ possiblePrefetchList = self.getRelevantPrefetchFiles(files, startPageID, endPageID, date, runner)
 2894+ if (len(possiblePrefetchList)):
 2895+ return(possiblePrefetchList)
 2896+
 2897+ # ok, let's check for chunk files instead, from any run (may not conform to our numbering
 2898+ # for this job)
 2899+ files = self.listRegularFilesExisting(runner.dumpDir,[ self.dumpName ], date, chunks = True)
 2900+ possiblePrefetchList = self.getRelevantPrefetchFiles(files, startPageID, endPageID, date, runner)
 2901+ if (len(possiblePrefetchList)):
 2902+ return(possiblePrefetchList)
 2903+
 2904+ # last shot, get output file that contains all the pages, if there is one
 2905+ files = self.listRegularFilesExisting(runner.dumpDir, [ self.dumpName ], date, chunks = False)
 2906+ # there is only one, don't bother to check for relevance :-P
 2907+ possiblePrefetchList = files
 2908+ files = []
 2909+ for p in possiblePrefetchList:
 2910+ possible = runner.dumpDir.filenamePublicPath(p, date)
 2911+ size = os.path.getsize(possible)
 2912+ if size < 70000:
 2913+ runner.debug("small %d-byte prefetch dump at %s, skipping" % (size, possible))
 2914+ continue
 2915+ else:
 2916+ files.append(p)
 2917+ if (len(files)):
 2918+ return(files)
 2919+
21212920 runner.debug("Could not locate a prefetchable dump.")
21222921 return None
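The chunk arithmetic above turns self._chunks, a list of page counts per chunk, into an inclusive page-id range, with the last chunk left open-ended. A worked example:

    # Worked example of the start/end computation in _findPreviousDump,
    # assuming self._chunks holds the number of pages per chunk.
    chunks = [100, 100, 50]
    chunk = 2                                        # 1-based chunk number
    startPageID = sum([chunks[i] for i in range(0, chunk - 1)]) + 1  # 101
    if len(chunks) > chunk:
        endPageID = sum([chunks[i] for i in range(0, chunk)])        # 200
    else:
        endPageID = None                             # last chunk: up to the final page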
21232922
2124 - def listOutputFiles(self, runner):
2125 - if (self._chunks):
2126 - files = []
2127 - for i in range(1, len(self._chunks)+1):
2128 - files.append(self._file("bz2",i))
2129 - return files
2130 - else:
2131 - return [ self._file("bz2",0) ]
2132 -
21332923 class RecombineXmlDump(XmlDump):
2134 - def __init__(self, subset, name, desc, detail, chunks = False):
 2924+ def __init__(self, name, desc, detail, itemForXmlDumps):
21352925 # no prefetch, no spawn
2136 - XmlDump.__init__(self, subset, name, desc, detail, None, None, False, chunks)
2137 - # this is here only so that a callback can capture output from some commands
2138 - # related to recombining files if we did parallel runs of the recompression
2139 - self._output = None
 2926+ self.itemForXmlDumps = itemForXmlDumps
 2927+ self._detail = detail
 2928+ Dump.__init__(self, name, desc)
 2929+ # the input may have checkpoints but the output will not.
 2930+ self._checkpointsEnabled = False
21402931
2141 - def listInputFiles(self, runner):
2142 - return XmlDump.listOutputFiles(self,runner)
 2932+ def listDumpNames(self):
 2933+ return self.itemForXmlDumps.listDumpNames()
21432934
2144 - def listOutputFiles(self, runner):
2145 - return [ self._file("bz2",0) ]
 2935+ def getFileType(self):
 2936+ return self.itemForXmlDumps.getFileType()
21462937
 2938+ def getFileExt(self):
 2939+ return self.itemForXmlDumps.getFileExt()
 2940+
 2941+ def getDumpName(self):
 2942+ return self.itemForXmlDumps.getDumpName()
 2943+
21472944 def run(self, runner):
 2945+ files = self.itemForXmlDumps.listOutputFilesForInput(runner.dumpDir)
 2946+ outputFiles = self.listOutputFilesForBuildCommand(runner.dumpDir)
 2947+ if (len(outputFiles) > 1):
 2948+ raise BackupError("recombine XML Dump trying to produce more than one output file")
 2949+
21482950 error=0
2149 - if (self._chunks):
2150 - files = self.listInputFiles(runner)
2151 - outputFileList = self.listOutputFiles(runner)
2152 - for outputFile in outputFileList:
2153 - inputFiles = []
2154 - for inFile in files:
2155 - (base, rest) = inFile.split('.',1)
2156 - base = re.sub("\d+$", "", base)
2157 - if base + "." + rest == outputFile:
2158 - inputFiles.append(inFile)
2159 - if not len(inputFiles):
2160 - self.setStatus("failed")
2161 - raise BackupError("No input files for %s found" % self.name)
2162 - if (not exists( runner.wiki.config.bzip2 ) ):
2163 - raise BackupError("bzip2 command %s not found" % runner.wiki.config.bzip2);
2164 - compressionCommand = runner.wiki.config.bzip2
2165 - compressionCommand = "%s > " % runner.wiki.config.bzip2
2166 - uncompressionCommand = [ "%s" % runner.wiki.config.bzip2, "-dc" ]
2167 - recombineCommandString = self.buildRecombineCommandString(runner, inputFiles, outputFile, compressionCommand, uncompressionCommand )
2168 - recombineCommand = [ recombineCommandString ]
2169 - recombinePipeline = [ recombineCommand ]
2170 - series = [ recombinePipeline ]
2171 - result = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True)
2172 - if result:
2173 - error = result
 2951+ if (not exists( runner.wiki.config.bzip2 ) ):
 2952+ raise BackupError("bzip2 command %s not found" % runner.wiki.config.bzip2)
 2953+ compressionCommand = runner.wiki.config.bzip2
 2954+ compressionCommand = "%s > " % runner.wiki.config.bzip2
 2955+ uncompressionCommand = [ "%s" % runner.wiki.config.bzip2, "-dc" ]
 2956+ recombineCommandString = self.buildRecombineCommandString(runner, files, outputFiles[0], compressionCommand, uncompressionCommand )
 2957+ recombineCommand = [ recombineCommandString ]
 2958+ recombinePipeline = [ recombineCommand ]
 2959+ series = [ recombinePipeline ]
 2960+ error = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True)
 2961+
21742962 if (error):
21752963 raise BackupError("error recombining xml bz2 files")
21762964
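buildRecombineCommandString (defined on the Dump base class elsewhere in worker.py) stitches the per-chunk files back into one compressed stream. The generated shell command is roughly of the following shape; this is a sketch, not the literal string the method emits, and the real version also has to drop the XML header and footer from all but the first and last input:

    # Illustrative approximation of the recombine shell command.
    inputs = ["pages-articles1.xml.bz2", "pages-articles2.xml.bz2"]
    uncompress = "/usr/bin/bzip2 -dc"
    compress = "/usr/bin/bzip2 > "
    cmd = ("(" + "; ".join(["%s %s" % (uncompress, f) for f in inputs]) + ") | "
           + compress + "pages-articles.xml.bz2")
    # (bzip2 -dc file1; bzip2 -dc file2) | bzip2 > pages-articles.xml.bz2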
@@ -2184,171 +2972,174 @@
21852973 class XmlRecompressDump(Dump):
21862974 """Take a .bz2 and recompress it as 7-Zip."""
21872975
2188 - def __init__(self, subset, name, desc, detail, chunkToDo, chunks = False):
2189 - Dump.__init__(self, name, desc)
 2976+ def __init__(self, subset, name, desc, detail, itemForRecompression, wiki, chunkToDo, chunks = False, checkpoints = False):
21902977 self._subset = subset
21912978 self._detail = detail
21922979 self._chunks = chunks
 2980+ if self._chunks:
 2981+ self._chunksEnabled = True
21932982 self._chunkToDo = chunkToDo
 2983+ self.wiki = wiki
 2984+ self.itemForRecompression = itemForRecompression
 2985+ if checkpoints:
 2986+ self._checkpointsEnabled = True
 2987+ Dump.__init__(self, name, desc)
21942988
2195 - def detail(self):
2196 - """Optionally return additional text to appear under the heading."""
2197 - return self._detail
 2989+ def getDumpName(self):
 2990+ return "pages-" + self._subset
21982991
2199 - def _file(self, ext, chunk = 0):
2200 - if (chunk):
2201 - return "pages-" + self._subset + ("%s.xml." % chunk) + ext
2202 - else:
2203 - return "pages-" + self._subset + ".xml." + ext
 2992+ def getFileType(self):
 2993+ return "xml"
22042994
2205 - def _path(self, runner, ext, chunk=0):
2206 - return runner.dumpDir.publicPath(self._file(ext,chunk))
 2995+ def getFileExt(self):
 2996+ return "7z"
22072997
2208 - def buildOutputFilename(self, runner, chunk=0):
2209 - if (chunk):
2210 - xml7z = self._path(runner, "7z", chunk)
2211 - else:
2212 - xml7z = self._path(runner, "7z")
2213 - return(xml7z)
2214 -
2215 - def getInputFilename(self, runner, chunk):
2216 - if (chunk):
2217 - xmlbz2 = self._path(runner, "bz2", chunk)
2218 - else:
2219 - xmlbz2 = self._path(runner, "bz2")
2220 - return(xmlbz2)
2221 -
2222 - def buildCommand(self, runner, chunk = 0):
2223 - xmlbz2 = self.getInputFilename(runner, chunk)
2224 - xml7z = self.buildOutputFilename(runner, chunk)
2225 -
 2998+ # if checkpointing is on, outputFiles is a list of checkpoint files, otherwise it is a list of one file.
 2999+ # checkpoint files get done one at a time. we can't really do parallel recompression jobs of
 3000+ # 200 files, right?
 3001+ def buildCommand(self, runner, outputFiles):
22263002 # FIXME need shell escape
22273003 if (not exists( runner.wiki.config.bzip2 ) ):
2228 - raise BackupError("bzip2 command %s not found" % runner.wiki.config.bzip2);
 3004+ raise BackupError("bzip2 command %s not found" % runner.wiki.config.bzip2)
22293005 if (not exists( runner.wiki.config.sevenzip ) ):
2230 - raise BackupError("7zip command %s not found" % runner.wiki.config.sevenzip);
2231 - commandPipe = [ [ "%s -dc %s | %s a -si %s" % (runner.wiki.config.bzip2, xmlbz2, runner.wiki.config.sevenzip, xml7z) ] ]
2232 - commandSeries = [ commandPipe ]
 3006+ raise BackupError("7zip command %s not found" % runner.wiki.config.sevenzip)
 3007+
 3008+ commandSeries = []
 3009+ for f in outputFiles:
 3010+ inputFile = DumpFilename(runner.wiki, None, f.dumpName, f.fileType, self.itemForRecompression.fileExt, f.chunk, f.checkpoint)
 3011+ outfile = runner.dumpDir.filenamePublicPath(f)
 3012+ infile = runner.dumpDir.filenamePublicPath(inputFile)
 3013+ commandPipe = [ [ "%s -dc %s | %s a -si %s" % (runner.wiki.config.bzip2, infile, runner.wiki.config.sevenzip, outfile) ] ]
 3014+ commandSeries.append(commandPipe)
22333015 return(commandSeries)
22343016
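Each entry in the series is a single shell pipeline that streams the bz2 data straight into 7z with no intermediate file; -si makes 7z read the archive contents from stdin. A standalone sketch (paths are examples):

    # Standalone sketch of one recompression pipeline.
    import subprocess
    bzip2, sevenzip = "/usr/bin/bzip2", "/usr/bin/7za"
    infile, outfile = "pages-meta-history1.xml.bz2", "pages-meta-history1.xml.7z"
    # note: 7z appends to an existing archive, which is why run() below
    # removes old output files before starting
    subprocess.check_call("%s -dc %s | %s a -si %s"
                          % (bzip2, infile, sevenzip, outfile), shell=True)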
2235 - def cleanupOldFiles(self, runner, chunk = 0):
2236 - if (runner._cleanupOldFilesEnabled):
2237 - xml7z = self.buildOutputFilename(runner, chunk)
2238 - if exists(xml7z):
2239 - runner.removeFile(xml7z)
2240 -
22413017 def run(self, runner):
22423018 if runner.lastFailed:
22433019 raise BackupError("bz2 dump incomplete, not recompressing")
22443020 commands = []
2245 - if (self._chunks):
2246 - if (self._chunkToDo):
2247 - if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)):
2248 - raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks)))
2249 - self.cleanupOldFiles(runner, self._chunkToDo)
2250 - series = self.buildCommand(runner, self._chunkToDo)
 3021+ # Remove prior 7zip attempts; 7zip will try to append to an existing archive
 3022+ self.cleanupOldFiles(runner.dumpDir)
 3023+ if self._chunksEnabled and not self._chunkToDo:
 3024+ # must set up each parallel job separately, they may have checkpoint files that
 3025+ # need to be processed in series, it's a special case
 3026+ for i in range(1, len(self._chunks)+1):
 3027+ outputFiles = self.listOutputFilesForBuildCommand(runner.dumpDir, i)
 3028+ series = self.buildCommand(runner, outputFiles)
22513029 commands.append(series)
2252 - else:
2253 - for i in range(1, len(self._chunks)+1):
2254 - # Clear prior 7zip attempts; 7zip will try to append an existing archive
2255 - self.cleanupOldFiles(runner, i)
2256 - series = self.buildCommand(runner, i)
2257 - commands.append(series)
22583030 else:
2259 - # Clear prior 7zip attempts; 7zip will try to append an existing archive
2260 - self.cleanupOldFiles(runner)
2261 - series = self.buildCommand(runner)
 3031+ outputFiles = self.listOutputFilesForBuildCommand(runner.dumpDir)
 3032+ series = self.buildCommand(runner, outputFiles)
22623033 commands.append(series)
 3034+
22633035 error = runner.runCommand(commands, callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True)
2264 - # temp hack force 644 permissions until ubuntu bug # 370618 is fixed - tomasz 5/1/2009
2265 - # some hacks aren't so temporary - atg 3 sept 2010
2266 - if (self._chunks):
2267 - if (self._chunkToDo):
2268 - if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)):
2269 - raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks)))
2270 - xml7z = self.buildOutputFilename(runner,self._chunkToDo)
2271 - if exists(xml7z):
2272 - os.chmod(xml7z, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH )
2273 - else:
2274 - for i in range(1, len(self._chunks)+1):
2275 - xml7z = self.buildOutputFilename(runner,i)
2276 - if exists(xml7z):
2277 - os.chmod(xml7z, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH )
2278 - else:
2279 - xml7z = self.buildOutputFilename(runner)
2280 - if exists(xml7z):
2281 - os.chmod(xml7z, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH )
22823036 if (error):
22833037 raise BackupError("error recompressing bz2 file(s)")
2284 -
2285 - def listOutputFiles(self, runner):
2286 - if (self._chunks):
2287 - if (self._chunkToDo):
2288 - if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)):
2289 - raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks)))
2290 - files = []
2291 - files.append(self._file("7z",self._chunkToDo))
2292 - return files
2293 - else:
2294 - files = []
2295 - for i in range(1, len(self._chunks)+1):
2296 - files.append(self._file("7z",i))
2297 - return files
 3038+
 3039+ # shows all files possible if we don't have checkpoint files. without temp files of course
 3040+ def listOutputFilesToPublish(self, dumpDir):
 3041+ files = []
 3042+ inputFiles = self.itemForRecompression.listOutputFilesForInput(dumpDir)
 3043+ for f in inputFiles:
 3044+ files.append(DumpFilename(self.wiki, f.date, f.dumpName, f.fileType, self.fileExt, f.chunk, f.checkpoint, f.temp))
 3045+ return files
 3046+
 3047+ # shows all files possible if we don't have checkpoint files. without temp files of course
 3048+ # only the chunks we are actually supposed to do (if there is a limit)
 3049+ def listOutputFilesToCheckForTruncation(self, dumpDir):
 3050+ files = []
 3051+ inputFiles = self.itemForRecompression.listOutputFilesForInput(dumpDir)
 3052+ for f in inputFiles:
 3053+ if self._chunkToDo and f.chunkInt != self._chunkToDo:
 3054+ continue
 3055+ files.append(DumpFilename(self.wiki, f.date, f.dumpName, f.fileType, self.fileExt, f.chunk, f.checkpoint, f.temp))
 3056+ return files
 3057+
 3058+ # shows all files possible if we don't have checkpoint files. no temp files.
 3059+ # only the chunks we are actually supposed to do (if there is a limit)
 3060+ def listOutputFilesForBuildCommand(self, dumpDir, chunk = None):
 3061+ files = []
 3062+ inputFiles = self.itemForRecompression.listOutputFilesForInput(dumpDir)
 3063+ for f in inputFiles:
 3064+ # if this param is set it takes priority
 3065+ if chunk and f.chunkInt != chunk:
 3066+ continue
 3067+ elif self._chunkToDo and f.chunkInt != self._chunkToDo:
 3068+ continue
 3069+ files.append(DumpFilename(self.wiki, f.date, f.dumpName, f.fileType, self.fileExt, f.chunk, f.checkpoint, f.temp))
 3070+ return files
 3071+
 3072+ # shows all files possible if we don't have checkpoint files. should include temp files
 3073+ # does just the chunks we do if there is a limit
 3074+ def listOutputFilesForCleanup(self, dumpDir, dumpNames = None):
 3075+ # some stages (eg XmlStubs) call this for several different dumpNames
 3076+ if (dumpNames == None):
 3077+ dumpNames = [ self.dumpName ]
 3078+ files = []
 3079+ if (self.itemForRecompression._checkpointsEnabled):
 3080+ # we will pass list of chunks or chunkToDo, or False, depending on the job setup.
 3081+ files.extend(self.listCheckpointFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames))
 3082+ files.extend(self.listTempFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames))
22983083 else:
2299 - return [ self._file("7z",0) ]
 3084+ # fixme this should be a list
 3085+ # we will pass list of chunks or chunkToDo, or False, depending on the job setup.
 3086+ files.extend(self.listRegularFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames))
 3087+ return files
23003088
2301 - def getCommandOutputCallback(self, line):
2302 - self._output = line
 3089+ # must return all output files that could be produced by a full run of this stage,
 3090+ # not just whatever we happened to produce (if run for one chunk, say)
 3091+ def listOutputFilesForInput(self, dumpDir):
 3092+ files = []
 3093+ inputFiles = self.itemForRecompression.listOutputFilesForInput(dumpDir)
 3094+ for f in inputFiles:
 3095+ files.append(DumpFilename(self.wiki, f.date, f.dumpName, f.fileType, self.fileExt, f.chunk, f.checkpoint, f.temp))
 3096+ return files
23033097
2304 -class RecombineXmlRecompressDump(XmlRecompressDump):
2305 - def __init__(self, subset, name, desc, detail, chunks):
2306 - XmlRecompressDump.__init__(self, subset, name, desc, detail, False, chunks)
2307 - # this is here only so that a callback can capture output from some commands
2308 - # related to recombining files if we did parallel runs of the recompression
2309 - self._output = None
23103098
2311 - def listInputFiles(self, runner):
2312 - return XmlRecompressDump.listOutputFiles(self,runner)
 3099+class RecombineXmlRecompressDump(Dump):
 3100+ def __init__(self, name, desc, detail, itemForRecombine, wiki):
 3101+ self._detail = detail
 3102+ self._desc = desc
 3103+ self.wiki = wiki
 3104+ self.itemForRecombine = itemForRecombine
 3105+ Dump.__init__(self, name, desc)
 3106+ # the input may have checkpoints but the output will not.
 3107+ self._checkpointsEnabled = False
 3108+ self._chunksEnabled = False
23133109
2314 - def listOutputFiles(self, runner):
2315 - return [ self._file("7z",0) ]
 3110+ def getFileType(self):
 3111+ return self.itemForRecombine.getFileType()
23163112
2317 - def cleanupOldFiles(self, runner):
2318 - if (runner._cleanupOldFilesEnabled):
2319 - files = self.listOutputFiles(runner)
2320 - for filename in files:
2321 - filename = runner.dumpDir.publicPath(filename)
2322 - if exists(filename):
2323 - runner.removeFile(filename)
 3113+ def getFileExt(self):
 3114+ return self.itemForRecombine.getFileExt()
23243115
 3116+ def getDumpName(self):
 3117+ return self.itemForRecombine.getDumpName()
 3118+
23253119 def run(self, runner):
23263120 error = 0
2327 - if (self._chunks):
2328 - self.cleanupOldFiles(runner)
2329 - files = self.listInputFiles(runner)
2330 - outputFileList = self.listOutputFiles(runner)
2331 - for outputFile in outputFileList:
2332 - inputFiles = []
2333 - for inFile in files:
2334 - (base, rest) = inFile.split('.',1)
2335 - base = re.sub("\d+$", "", base)
2336 - if base + "." + rest == outputFile:
2337 - inputFiles.append(inFile)
2338 - if not len(inputFiles):
2339 - self.setStatus("failed")
2340 - raise BackupError("No input files for %s found" % self.name)
2341 - if (not exists( runner.wiki.config.sevenzip ) ):
2342 - raise BackupError("sevenzip command %s not found" % runner.wiki.config.sevenzip);
2343 - compressionCommand = "%s a -si" % runner.wiki.config.sevenzip
2344 - uncompressionCommand = [ "%s" % runner.wiki.config.sevenzip, "e", "-so" ]
 3121+ self.cleanupOldFiles(runner.dumpDir)
 3122+ outputFileList = self.listOutputFilesForBuildCommand(runner.dumpDir)
 3123+ for outputFile in outputFileList:
 3124+ inputFiles = []
 3125+ files = self.itemForRecombine.listOutputFilesForInput(runner.dumpDir)
 3126+ for inFile in files:
 3127+ if inFile.dumpName == outputFile.dumpName:
 3128+ inputFiles.append(inFile)
 3129+ if not len(inputFiles):
 3130+ self.setStatus("failed")
 3131+ raise BackupError("No input files for %s found" % self.name())
 3132+ if (not exists( runner.wiki.config.sevenzip ) ):
 3133+ raise BackupError("sevenzip command %s not found" % runner.wiki.config.sevenzip)
 3134+ compressionCommand = "%s a -si" % runner.wiki.config.sevenzip
 3135+ uncompressionCommand = [ "%s" % runner.wiki.config.sevenzip, "e", "-so" ]
23453136
2346 - recombineCommandString = self.buildRecombineCommandString(runner, files, outputFile, compressionCommand, uncompressionCommand )
2347 - recombineCommand = [ recombineCommandString ]
2348 - recombinePipeline = [ recombineCommand ]
2349 - series = [ recombinePipeline ]
2350 - result = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True)
2351 - if result:
2352 - error = result
 3137+ recombineCommandString = self.buildRecombineCommandString(runner, inputFiles, outputFile, compressionCommand, uncompressionCommand )
 3138+ recombineCommand = [ recombineCommandString ]
 3139+ recombinePipeline = [ recombineCommand ]
 3140+ series = [ recombinePipeline ]
 3141+ result = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True)
 3142+ if result:
 3143+ error = result
23533144 if (error):
23543145 raise BackupError("error recombining xml bz2 file(s)")
23553146
@@ -2356,69 +3147,79 @@
23573148 """XML dump for Yahoo!'s Active Abstracts thingy"""
23583149
23593150 def __init__(self, name, desc, chunkToDo, chunks = False):
2360 - Dump.__init__(self, name, desc)
23613151 self._chunkToDo = chunkToDo
23623152 self._chunks = chunks
 3153+ if self._chunks:
 3154+ self._chunksEnabled = True
 3155+ Dump.__init__(self, name, desc)
23633156
2364 - def buildCommand(self, runner, chunk = 0):
 3157+ def getDumpName(self):
 3158+ return "abstract"
 3159+
 3160+ def getFileType(self):
 3161+ return "xml"
 3162+
 3163+ def getFileExt(self):
 3164+ # fixme no file extension, see what this means for everything
 3165+ return ""
 3166+
 3167+ def buildCommand(self, runner, f):
23653168 if (not exists( runner.wiki.config.php ) ):
2366 - raise BackupError("php command %s not found" % runner.wiki.config.php);
 3169+ raise BackupError("php command %s not found" % runner.wiki.config.php)
23673170 command = [ "%s" % runner.wiki.config.php,
23683171 "-q", "%s/maintenance/dumpBackup.php" % runner.wiki.config.wikiDir,
23693172 "--wiki=%s" % runner.dbName,
23703173 "--plugin=AbstractFilter:%s/extensions/ActiveAbstract/AbstractFilter.php" % runner.wiki.config.wikiDir,
23713174 "--current", "--report=1000", "%s" % runner.forceNormalOption(),
23723175 ]
2373 - for variant in self._variants(runner):
2374 - command.extend( [ "--output=file:%s" % runner.dumpDir.publicPath(self._variantFile(variant, chunk)),
 3176+
 3177+ for v in self._variants():
 3178+ variantOption = self._variantOption(v)
 3179+ dumpName = self.dumpNameFromVariant(v)
 3180+ fileObj = DumpFilename(runner.wiki, f.date, dumpName, f.fileType, f.fileExt, f.chunk, f.checkpoint)
 3181+ command.extend( [ "--output=file:%s" % runner.dumpDir.filenamePublicPath(fileObj),
23753182 "--filter=namespace:NS_MAIN", "--filter=noredirect",
2376 - "--filter=abstract%s" % self._variantOption(variant) ] )
2377 - if (chunk):
2378 - # set up start end end pageids for this piece
2379 - # note there is no page id 0 I guess. so we start with 1
2380 - # start = runner.pagesPerChunk()*(chunk-1) + 1
2381 - start = sum([ self._chunks[i] for i in range(0,chunk-1)]) + 1
2382 - startopt = "--start=%s" % start
2383 - # if we are on the last chunk, we should get up to the last pageid,
2384 - # whatever that is.
2385 - command.append(startopt)
2386 - if chunk < len(self._chunks):
2387 - # end = start + runner.pagesPerChunk()
2388 - end = sum([ self._chunks[i] for i in range(0,chunk)]) +1
2389 - endopt = "--end=%s" % end
2390 - command.append(endopt)
 3183+ "--filter=abstract%s" % variantOption ] )
 3184+ if (f.chunk):
 3185+ # set up start and end pageids for this piece
 3186+ # note there is no page id 0 I guess. so we start with 1
 3187+ # start = runner.pagesPerChunk()*(chunk-1) + 1
 3188+ start = sum([ self._chunks[i] for i in range(0,f.chunkInt-1)]) + 1
 3189+ startopt = "--start=%s" % start
 3190+ # if we are on the last chunk, we should get up to the last pageid,
 3191+ # whatever that is.
 3192+ command.append(startopt)
 3193+ if f.chunkInt < len(self._chunks):
 3194+ # end = start + runner.pagesPerChunk()
 3195+ end = sum([ self._chunks[i] for i in range(0,f.chunkInt)]) +1
 3196+ endopt = "--end=%s" % end
 3197+ command.append(endopt)
23913198 pipeline = [ command ]
23923199 series = [ pipeline ]
23933200 return(series)
23943201
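For a Chinese-language wiki the loop above extends a single dumpBackup.php invocation with one --output/--filter group per orthography variant, so all variants are produced in one pass over the data. Roughly, the accumulated command looks like this (paths and wiki name are illustrative):

    # Rough shape of the argv built by buildCommand for a zh wiki.
    command = ["php", "-q", "maintenance/dumpBackup.php", "--wiki=zhwiki",
               "--plugin=AbstractFilter:extensions/ActiveAbstract/AbstractFilter.php",
               "--current", "--report=1000"]
    for variant, opt in [("", ""), ("zh-cn", ":variant=zh-cn"), ("zh-tw", ":variant=zh-tw")]:
        dumpName = "abstract" if variant == "" else "abstract-" + variant
        command.extend(["--output=file:%s.xml" % dumpName,
                        "--filter=namespace:NS_MAIN", "--filter=noredirect",
                        "--filter=abstract%s" % opt])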
23953202 def run(self, runner):
23963203 commands = []
2397 - if (self._chunks):
2398 - if (self._chunkToDo):
2399 - if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)):
2400 - raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks)))
2401 - series = self.buildCommand(runner, self._chunkToDo)
 3204+ # choose the empty variant to pass to buildCommand; it will fill in the rest if needed
 3205+ outputFiles = self.listOutputFilesForBuildCommand(runner.dumpDir)
 3206+ dumpName0 = self.listDumpNames()[0]
 3207+ for f in outputFiles:
 3208+ if f.dumpName == dumpName0:
 3209+ series = self.buildCommand(runner, f)
24023210 commands.append(series)
2403 - else:
2404 - for i in range(1, len(self._chunks)+1):
2405 - series = self.buildCommand(runner, i)
2406 - commands.append(series)
2407 - else:
2408 - series = self.buildCommand(runner)
2409 - commands.append(series)
24103211 error = runner.runCommand(commands, callbackStderr=self.progressCallback, callbackStderrArg=runner)
24113212 if (error):
24123213 raise BackupError("error producing abstract dump")
24133214
2414 -
2415 - def _variants(self, runner):
2416 - # If the database name looks like it's marked as Chinese language,
2417 - # return a list including Simplified and Traditional versions, so
2418 - # we can build separate files normalized to each orthography.
 3215+ # If the database name looks like it's marked as Chinese language,
 3216+ # return a list including Simplified and Traditional versions, so
 3217+ # we can build separate files normalized to each orthography.
 3218+ def _variants(self):  # FIXME: runner is no longer in scope here
24193219 if runner.dbName[0:2] == "zh" and runner.dbName[2:3] != "_":
2420 - return ("", "zh-cn", "zh-tw")
 3220+ variants = [ "", "zh-cn", "zh-tw"]
24213221 else:
2422 - return ("",)
 3222+ variants = [ "" ]
 3223+ return variants
24233224
24243225 def _variantOption(self, variant):
24253226 if variant == "":
@@ -2426,95 +3227,133 @@
24273228 else:
24283229 return ":variant=%s" % variant
24293230
2430 - def _variantFile(self, variant, chunk = 0):
2431 - if chunk:
2432 - chunkInfo = "%s" % chunk
 3231+ def dumpNameFromVariant(self, v):
 3232+ dumpNameBase = 'abstract'
 3233+ if v == "":
 3234+ return dumpNameBase
24333235 else:
2434 - chunkInfo = ""
2435 - if variant == "":
2436 - return( "abstract"+chunkInfo + ".xml")
2437 - else:
2438 - return( "abstract-%s%s.xml" % (variant, chunkInfo) )
 3236+ return dumpNameBase + "-" + v
24393237
2440 - def listOutputFiles(self, runner):
 3238+ def listDumpNames(self):
 3239+ # need this first for buildCommand and other such
 3240+ dumpNames = [ ]
 3241+ variants = self._variants()
 3242+ for v in variants:
 3243+ dumpNames.append(self.dumpNameFromVariant(v))
 3244+ return dumpNames
 3245+
 3246+ def listOutputFilesToPublish(self, dumpDir):
 3247+ dumpNames = self.listDumpNames()
24413248 files = []
2442 - for x in self._variants(runner):
2443 - if (self._chunks):
2444 - if (self._chunkToDo):
2445 - if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)):
2446 - raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks)))
2447 - files.append(self._variantFile(x, self._chunkToDo))
2448 - else:
2449 - for i in range(1, len(self._chunks)+1):
2450 - files.append(self._variantFile(x, i))
2451 - else:
2452 - files.append(self._variantFile(x))
2453 - return files
 3249+ files.extend(Dump.listOutputFilesToPublish(self, dumpDir, dumpNames))
 3250+ return files
24543251
2455 -class RecombineAbstractDump(AbstractDump):
2456 - def __init__(self, name, desc, chunks):
2457 - AbstractDump.__init__(self, name, desc, False, chunks)
2458 - # this is here only so that a callback can capture output from some commands
2459 - # related to recombining files if we did parallel runs of the recompression
2460 - self._output = None
 3252+ def listOutputFilesToCheckForTruncation(self, dumpDir):
 3253+ dumpNames = self.listDumpNames()
 3254+ files = []
 3255+ files.extend(Dump.listOutputFilesToCheckForTruncation(self, dumpDir, dumpNames))
 3256+ return files
24613257
2462 - def listOutputFiles(self, runner):
 3258+ def listOutputFilesForBuildCommand(self, dumpDir):
 3259+ dumpNames = self.listDumpNames()
24633260 files = []
2464 - for x in self._variants(runner):
2465 - files.append(self._variantFile(x))
2466 - return files
 3261+ files.extend(Dump.listOutputFilesForBuildCommand(self, dumpDir, dumpNames))
 3262+ return files
24673263
2468 - def listInputFiles(self, runner):
2469 - return(AbstractDump.listOutputFiles(self,runner))
 3264+ def listOutputFilesForCleanup(self, dumpDir):
 3265+ dumpNames = self.listDumpNames()
 3266+ files = []
 3267+ files.extend(Dump.listOutputFilesForCleanup(self, dumpDir, dumpNames))
 3268+ return files
24703269
 3270+ def listOutputFilesForInput(self, dumpDir):
 3271+ dumpNames = self.listDumpNames()
 3272+ files = []
 3273+ files.extend(Dump.listOutputFilesForInput(self, dumpDir, dumpNames))
 3274+ return files
 3275+
 3276+
 3277+class RecombineAbstractDump(Dump):
 3278+ def __init__(self, name, desc, itemForRecombine):
 3279+ # no chunkToDo, no chunks generally (False, False), even though input may have it
 3280+ self.itemForRecombine = itemForRecombine
 3281+ Dump.__init__(self, name, desc)
 3282+ # the input may have checkpoints but the output will not.
 3283+ self._checkpointsEnabled = False
 3284+
 3285+ def getFileType(self):
 3286+ return self.itemForRecombine.getFileType()
 3287+
 3288+ def getFileExt(self):
 3289+ return self.itemForRecombine.getFileExt()
 3290+
 3291+ def getDumpName(self):
 3292+ return self.itemForRecombine.getDumpName()
 3293+
24713294 def run(self, runner):
24723295 error = 0
2473 - if (self._chunks):
2474 - files = AbstractDump.listOutputFiles(self,runner)
2475 - outputFileList = self.listOutputFiles(runner)
2476 - for outputFile in outputFileList:
2477 - inputFiles = []
2478 - for inFile in files:
2479 - (base, rest) = inFile.split('.',1)
2480 - base = re.sub("\d+$", "", base)
2481 - if base + "." + rest == outputFile:
2482 - inputFiles.append(inFile)
2483 - if not len(inputFiles):
2484 - self.setStatus("failed")
2485 - raise BackupError("No input files for %s found" % self.name)
2486 - if (not exists( runner.wiki.config.cat ) ):
2487 - raise BackupError("cat command %s not found" % runner.wiki.config.cat);
2488 - compressionCommand = "%s > " % runner.wiki.config.cat
2489 - uncompressionCommand = [ "%s" % runner.wiki.config.cat ]
2490 - recombineCommandString = self.buildRecombineCommandString(runner, inputFiles, outputFile, compressionCommand, uncompressionCommand, "<feed>" )
2491 - recombineCommand = [ recombineCommandString ]
2492 - recombinePipeline = [ recombineCommand ]
2493 - series = [ recombinePipeline ]
2494 - result = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True)
2495 - if result:
2496 - error = result
 3296+ # FIXME check this code
 3297+ files = self.itemForRecombine.listOutputFilesForInput(runner.dumpDir)
 3298+ outputFileList = self.listOutputFilesForBuildCommand(runner.dumpDir)
 3299+ for outputFile in outputFileList:
 3300+ inputFiles = []
 3301+ for inFile in files:
 3302+ if inFile.dumpName == outputFile.dumpName:
 3303+ inputFiles.append(inFile)
 3304+ if not len(inputFiles):
 3305+ self.setStatus("failed")
 3306+ raise BackupError("No input files for %s found" % self.name())
 3307+ if (not exists( runner.wiki.config.cat ) ):
 3308+ raise BackupError("cat command %s not found" % runner.wiki.config.cat)
 3309+ compressionCommand = "%s > " % runner.wiki.config.cat
 3310+ uncompressionCommand = [ "%s" % runner.wiki.config.cat ]
 3311+ recombineCommandString = self.buildRecombineCommandString(runner, inputFiles, outputFile, compressionCommand, uncompressionCommand, "<feed>" )
 3312+ recombineCommand = [ recombineCommandString ]
 3313+ recombinePipeline = [ recombineCommand ]
 3314+ series = [ recombinePipeline ]
 3315+ result = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True)
 3316+ if result:
 3317+ error = result
24973318 if (error):
24983319 raise BackupError("error recombining abstract dump files")
24993320
25003321 class TitleDump(Dump):
25013322 """This is used by "wikiproxy", a program to add Wikipedia links to BBC news online"""
 3323+
 3324+ def getDumpName(self):
 3325+ return "all-titles-in-ns0"
 3326+
 3327+ def getFileType(self):
 3328+ return ""
 3329+
 3330+ def getFileExt(self):
 3331+ return "gz"
 3332+
25023333 def run(self, runner):
25033334 retries = 0
25043335 # try this initially and see how it goes
25053336 maxretries = 3
25063337 query="select page_title from page where page_namespace=0;"
2507 - error = runner.saveSql(query, runner.dumpDir.publicPath("all-titles-in-ns0.gz"))
 3338+ files = self.listOutputFilesForBuildCommand(runner.dumpDir)
 3339+ if len(files) > 1:
 3340+ raise BackupError("page title dump trying to produce more than one output file")
 3341+ fileObj = files[0]
 3342+ outFilename = runner.dumpDir.filenamePublicPath(fileObj)
 3343+ error = self.saveSql(query, outFilename, runner)
25083344 while (error and retries < maxretries):
25093345 retries = retries + 1
25103346 time.sleep(5)
2511 - error = runner.saveSql(query, runner.dumpDir.publicPath("all-titles-in-ns0.gz"))
 3347+ error = self.saveSql(query, outFilename, runner)
25123348 if (error):
25133349 raise BackupError("error dumping titles list")
25143350
2515 - def listOutputFiles(self, runner):
2516 - return ["all-titles-in-ns0.gz"]
 3351+ def saveSql(self, query, outfile, runner):
 3352+ """Pass some SQL commands to the server for this DB and save output to a gzipped file."""
 3353+ if (not exists( runner.wiki.config.gzip ) ):
 3354+ raise BackupError("gzip command %s not found" % runner.wiki.config.gzip)
 3355+ command = runner.dbServerInfo.buildSqlCommand(query, runner.wiki.config.gzip)
 3356+ return runner.saveCommand(command, outfile)
25173357
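saveSql hands the query to the database server for this wiki and pipes the result through gzip into the output file. The mysql invocation itself comes from runner.dbServerInfo.buildSqlCommand, so the flags below are assumptions; the net effect is roughly:

    # Sketch of what saveSql amounts to (mysql flags are illustrative).
    import subprocess
    query = "select page_title from page where page_namespace=0;"
    cmd = "echo %r | mysql -h dbserver -u wikiuser wikidb | gzip" % query
    out = open("all-titles-in-ns0.gz", "w")
    subprocess.check_call(cmd, shell=True, stdout=out)
    out.close()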
2518 -
25193358 def findAndLockNextWiki(config, locksEnabled):
25203359 if config.halt:
25213360 print "Dump process halted by config."
@@ -2537,13 +3376,15 @@
25383377 return None
25393378
25403379 def xmlEscape(text):
2541 - return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
 3380+ return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
25423381
25433382 def usage(message = None):
25443383 if message:
25453384 print message
25463385 print "Usage: python worker.py [options] [wikidbname]"
2547 - print "Options: --configfile, --date, --job, --addnotice, --delnotice, --force, --noprefetch, --nospawn, --restartfrom, --log"
 3386+ print "Options: --chunk, --configfile, --date, --job, --addnotice, --delnotice, --force, --noprefetch, --nospawn, --restartfrom, --log"
 3387+ print "--chunk: Specify the number of the chunk to rerun (use with a specific job"
 3388+ print " to rerun, only if parallel jobs (chunks) are enabled)."
25483389 print "--configfile: Specify an alternative configuration file to read."
25493390 print " Default config file name: wikidump.conf"
25503391 print "--date: Rerun dump of a given date (probably unwise)"
@@ -2664,7 +3505,11 @@
26653506 # process any per-project configuration options
26663507 config.parseConfFilePerProject(wiki.dbName)
26673508
2668 - runner = Runner(wiki, date, prefetch, spawn, jobRequested, restart, htmlNotice, dryrun, enableLogging, chunkToDo)
 3509+ if not date:
 3510+ date = TimeUtils.today()
 3511+ wiki.setDate(date)
 3512+
 3513+ runner = Runner(wiki, prefetch, spawn, jobRequested, restart, htmlNotice, dryrun, enableLogging, chunkToDo)
26693514 if (restart):
26703515 print "Running %s, restarting from job %s..." % (wiki.dbName, jobRequested)
26713516 elif (jobRequested):
Index: branches/ariel/xmldumps-backup/CommandManagement.py
@@ -9,6 +9,7 @@
1010 import Queue
1111 import thread
1212 import fcntl
 13+import threading
1314
1415 from os.path import dirname, exists, getsize, join, realpath
1516 from subprocess import Popen, PIPE
@@ -380,47 +381,25 @@
381382 if (self.allCommandsCompleted() and not len(self._processesToPoll)):
382383 break
383384
384 -class CommandsInParallel(object):
385 - """Run a pile of commandSeries in parallel ( e.g. dump articles 1 to 100K,
386 - dump articles 100K+1 to 200K, ...). This takes as arguments: a list of series
387 - of pipelines (each pipeline is a list of commands, each series is a list of
388 - pipelines), as well as a possible callback which is used to capture all output
389 - from the various commmand series. If the callback takes an argument other than
390 - the line of output, it should be passed in the arg parameter (and it will be passed
391 - to the callback function first before the output line). If no callback is provided
392 - and the individual pipelines are not provided with a file to save output,
393 - then output is written to stderr.
394 - Callbackinterval is in milliseconds, defaults is 20 seconds"""
395 - def __init__(self, commandSeriesList, callbackStderr = None, callbackStdout = None, callbackTimed = None, callbackStderrArg=None, callbackStdoutArg = None, callbackTimedArg = None, quiet = False, shell = False, callbackInterval = 20000 ):
396 - self._commandSeriesList = commandSeriesList
397 - self._commandSerieses = []
398 - for series in self._commandSeriesList:
399 - self._commandSerieses.append( CommandSeries(series, quiet, shell) )
400 - # for each command series running in parallel,
401 - # in cases where a command pipeline in the series generates output, the callback
402 - # will be called with a line of output from the pipeline as it becomes available
 385+class ProcessMonitor(threading.Thread):
 386+ def __init__(self, timeout, queue, outputQueue, defaultCallbackInterval,
 387+ callbackStderr, callbackStdout, callbackTimed,
 388+ callbackStderrArg, callbackStdoutArg, callbackTimedArg):
 389+ threading.Thread.__init__(self)
 390+ self.timeout = timeout
 391+ self.queue = queue
 392+ self.outputQueue = outputQueue
 393+ self._defaultCallbackInterval = defaultCallbackInterval
403394 self._callbackStderr = callbackStderr
404395 self._callbackStdout = callbackStdout
405396 self._callbackTimed = callbackTimed
406397 self._callbackStderrArg = callbackStderrArg
407398 self._callbackStdoutArg = callbackStdoutArg
408399 self._callbackTimedArg = callbackTimedArg
409 - self._commandSeriesQueue = Queue.Queue()
410400
411 - # number millisecs we will wait for select.poll()
412 - self._defaultPollTime = 500
413 -
414 - # for programs that don't generate output, wait this many milliseconds between
415 - # invoking callback if there is one
416 - self._defaultCallbackInterval = callbackInterval
417 -
418 - def startCommands(self):
419 - for series in self._commandSerieses:
420 - series.startCommands()
421 -
422401 # one of these as a thread to monitor each command series.
423 - def seriesMonitor(self, timeout, queue):
424 - series = queue.get()
 402+ def run(self):
 403+ series = self.queue.get()
425404 while series.processProducingOutput():
426405 p = series.processProducingOutput()
427406 poller = select.poll()
@@ -441,7 +420,7 @@
442421
443422 waited = 0
444423 while not commandCompleted:
445 - waiting = poller.poll(self._defaultPollTime)
 424+ waiting = poller.poll(self.timeout)
446425 if (waiting):
447426 for (fd,event) in waiting:
448427 series.inProgressPipeline().setPollState(event)
@@ -449,21 +428,9 @@
450429 out = os.read(fd,1024)
451430 if out:
452431 if fd == p.stderr.fileno():
453 - if self._callbackStderr:
454 - if (self._callbackStderrArg):
455 - self._callbackStderr(self._callbackStderrArg, out)
456 - else:
457 - self._callbackStderr(out)
458 - else:
459 - sys.stderr.write(out)
 432+ self.outputQueue.put(OutputQueueItem(OutputQueueItem.getStderrChannel(),out))
460433 elif fd == p.stdout.fileno():
461 - if self._callbackStdout:
462 - if (self._callbackStdoutArg):
463 - self._callbackStdout(self._callbackStdoutArg, out)
464 - else:
465 - self._callbackStdout(out)
466 - else:
467 - sys.stderr.write(out)
 434+ self.outputQueue.put(OutputQueueItem(OutputQueueItem.getStdoutChannel(),out))
468435 else:
469436 # possible eof? what would cause this?
470437 pass
@@ -476,7 +443,7 @@
477444 print "returned from %s with %s" % (p.pid, p.returncode)
478445 commandCompleted = True
479446
480 - waited = waited + self._defaultPollTime
 447+ waited = waited + self.timeout
481448 if waited > self._defaultCallbackInterval and self._callbackTimed:
482449 if (self._callbackTimedArg):
483450 self._callbackTimed(self._callbackTimedArg)
@@ -488,13 +455,69 @@
489456 series.continueCommands()
490457
491458 # completed the whole series. time to go home.
492 - queue.task_done()
 459+ self.queue.task_done()
493460
 461+class OutputQueueItem(object):
 462+ def __init__(self, channel, contents):
 463+ self.channel = channel
 464+ self.contents = contents
 465+ self.stdoutChannel = OutputQueueItem.getStdoutChannel()
 466+ self.stderrChannel = OutputQueueItem.getStderrChannel()
494467
 468+ def getStdoutChannel():
 469+ return 1
 470+
 471+ def getStderrChannel():
 472+ return 2
 473+
 474+ getStdoutChannel = staticmethod(getStdoutChannel)
 475+ getStderrChannel = staticmethod(getStderrChannel)
 476+
 477+class CommandsInParallel(object):
 478+ """Run a pile of commandSeries in parallel ( e.g. dump articles 1 to 100K,
 479+ dump articles 100K+1 to 200K, ...). This takes as arguments: a list of series
 480+ of pipelines (each pipeline is a list of commands, each series is a list of
 481+ pipelines), as well as a possible callback which is used to capture all output
 482+ from the various command series. If the callback takes an argument other than
 483+ the line of output, it should be passed in the arg parameter (and it will be passed
 484+ to the callback function first before the output line). If no callback is provided
 485+ and the individual pipelines are not provided with a file to save output,
 486+ then output is written to stderr.
 487+ Callbackinterval is in milliseconds, default is 20 seconds"""
 488+ def __init__(self, commandSeriesList, callbackStderr = None, callbackStdout = None, callbackTimed = None, callbackStderrArg=None, callbackStdoutArg = None, callbackTimedArg = None, quiet = False, shell = False, callbackInterval = 20000 ):
 489+ self._commandSeriesList = commandSeriesList
 490+ self._commandSerieses = []
 491+ for series in self._commandSeriesList:
 492+ self._commandSerieses.append( CommandSeries(series, quiet, shell) )
 493+ # for each command series running in parallel,
 494+ # in cases where a command pipeline in the series generates output, the callback
 495+ # will be called with a line of output from the pipeline as it becomes available
 496+ self._callbackStderr = callbackStderr
 497+ self._callbackStdout = callbackStdout
 498+ self._callbackTimed = callbackTimed
 499+ self._callbackStderrArg = callbackStderrArg
 500+ self._callbackStdoutArg = callbackStdoutArg
 501+ self._callbackTimedArg = callbackTimedArg
 502+ self._commandSeriesQueue = Queue.Queue()
 503+ self._outputQueue = Queue.Queue()
 504+ self._normalThreadCount = threading.activeCount()
 505+
 506+ # number millisecs we will wait for select.poll()
 507+ self._defaultPollTime = 500
 508+
 509+ # for programs that don't generate output, wait this many milliseconds between
 510+ # invoking callback if there is one
 511+ self._defaultCallbackInterval = callbackInterval
 512+
 513+ def startCommands(self):
 514+ for series in self._commandSerieses:
 515+ series.startCommands()
 516+
495517 def setupOutputMonitoring(self):
496518 for series in self._commandSerieses:
497519 self._commandSeriesQueue.put(series)
498 - thread.start_new_thread(self.seriesMonitor, (500, self._commandSeriesQueue))
 520+ t = ProcessMonitor(500, self._commandSeriesQueue, self._outputQueue, self._defaultCallbackInterval,self._callbackStderr,self._callbackStdout,self._callbackTimed,self._callbackStderrArg,self._callbackStdoutArg,self._callbackTimedArg)
 521+ t.start()
499522
500523 def allCommandsCompleted(self):
501524 """Check if all series have run to completion."""
@@ -517,10 +540,40 @@
518541 commands.extend(series.exitedWithErrors())
519542 return(commands)
520543
 544+ def watchOutputQueue(self):
 545+ done = False
 546+ while not done:
 547+ # check the number of threads active, if they are all gone we are done
 548+ if threading.activeCount() == self._normalThreadCount:
 549+ done = True
 550+ output = None
 551+ try:
 552+ output = self._outputQueue.get(True, 1)
 553+ except Queue.Empty:
 554+ pass
 555+ if output:
 556+ if output.channel == OutputQueueItem.getStdoutChannel():
 557+ if self._callbackStdout:
 558+ if (self._callbackStdoutArg):
 559+ self._callbackStdout(self._callbackStdoutArg, output.contents)
 560+ else:
 561+ self._callbackStdout(output.contents)
 562+ else:
 563+ sys.stderr.write(output.contents)
 564+ else: # output channel is stderr
 565+ if self._callbackStderr:
 566+ if (self._callbackStderrArg):
 567+ self._callbackStderr(self._callbackStderrArg, output.contents)
 568+ else:
 569+ self._callbackStderr(output.contents)
 570+ else:
 571+ sys.stderr.write(output.contents)
 572+
521573 def runCommands(self):
522574 self.startCommands()
523575 self.setupOutputMonitoring()
524 - self._commandSeriesQueue.join()
 576+ self.watchOutputQueue()
 577+# self._commandSeriesQueue.join()
525578
526579
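The net effect of the refactoring in this file: each ProcessMonitor thread only produces OutputQueueItems, and watchOutputQueue, running in the original thread, is the sole consumer that invokes callbacks, so callback code never runs concurrently. A minimal sketch of the same pattern:

    # Minimal sketch of the one-consumer output pattern used above.
    import threading, Queue

    outq = Queue.Queue()

    def monitor(channel):
        # stand-in for ProcessMonitor.run(): workers only enqueue output
        outq.put((channel, "a line of output\n"))

    base = threading.activeCount()
    for n in range(3):
        threading.Thread(target=monitor, args=(n,)).start()
    # sole consumer: every line is handled here, in one thread
    while threading.activeCount() > base or not outq.empty():
        try:
            channel, contents = outq.get(True, 1)
            print "channel %d: %s" % (channel, contents),
        except Queue.Empty:
            pass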
527580 def testcallback(output = None):
@@ -558,5 +611,3 @@
559612 print "w00t!"
560613 else:
561614 print "big bummer!"
562 -
563 -

Status & tagging log