Index: branches/ariel/xmldumps-backup/WikiDump.py |
— | — | @@ -20,7 +20,9 @@ |
21 | 21 | return os.fdopen(fd, mode) |
22 | 22 | |
23 | 23 | def writeFile(dirname, filename, text, perms = 0): |
24 | | - """Write text to a file, as atomically as possible, via a temporary file in the same directory.""" |
| 24 | + """Write text to a file, as atomically as possible, via a temporary file in a specified directory. |
| 25 | + Arguments: dirname = where temp file is created, filename = full path to actual file, text = contents |
| 26 | + to write to file, perms = permissions that the file will have after creation""" |
25 | 27 | |
26 | 28 | (fd, tempFilename ) = tempfile.mkstemp("_txt","wikidump_",dirname); |
27 | 29 | os.write(fd,text) |
— | — | @@ -31,6 +33,19 @@ |
32 | 34 | # Of course nothing else will work on Windows. ;) |
33 | 35 | os.rename(tempFilename, filename) |
34 | 36 | |
| 37 | + def writeFileInPlace(filename, text, perms = 0): |
| 38 | + """Write text to a file, after opening it for write with truncation. |
| 39 | + This assumes that only one process or thread accesses the given file at a time. |
| 40 | + Arguments: filename = full path to actual file, text = contents |
| 41 | + to write to file, perms = permissions that the file will have after creation, |
| 42 | + if it did not exist already""" |
| 43 | + |
| 44 | + file = open(filename, "wt") |
| 45 | + file.write(text) |
| 46 | + file.close() |
| 47 | + if (perms): |
| 48 | + os.chmod(filename,perms) |
| 49 | + |
35 | 50 | def readFile(filename): |
36 | 51 | """Read text from a file in one fell swoop.""" |
37 | 52 | file = open(filename, "r") |
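Note: the new writeFileInPlace helper trades writeFile's temp-file-plus-rename approach for a plain truncating open, which is only safe when a single writer ever touches the target file. A minimal standalone sketch of the two patterns (illustrative only, not the FileUtils code itself):

    import os
    import tempfile

    def write_file_atomic(dirname, filename, text, perms=0):
        # write to a temp file in dirname, then rename over the target;
        # on POSIX the rename is atomic when both paths share a filesystem
        (fd, temp_filename) = tempfile.mkstemp("_txt", "wikidump_", dirname)
        os.write(fd, text)
        os.close(fd)
        if perms:
            os.chmod(temp_filename, perms)
        os.rename(temp_filename, filename)

    def write_file_in_place(filename, text, perms=0):
        # truncate and rewrite directly; assumes only one writer, so readers
        # may briefly see a partially written copy
        out = open(filename, "wt")
        out.write(text)
        out.close()
        if perms:
            os.chmod(filename, perms)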
— | — | @@ -75,6 +90,7 @@ |
76 | 91 | fileAge = staticmethod(fileAge) |
77 | 92 | atomicCreate = staticmethod(atomicCreate) |
78 | 93 | writeFile = staticmethod(writeFile) |
| 94 | + writeFileInPlace = staticmethod(writeFileInPlace) |
79 | 95 | readFile = staticmethod(readFile) |
80 | 96 | splitPath = staticmethod(splitPath) |
81 | 97 | relativePath = staticmethod(relativePath) |
— | — | @@ -140,7 +156,6 @@ |
141 | 157 | #"wiki": { |
142 | 158 | "dblist": "", |
143 | 159 | "privatelist": "", |
144 | | - "biglist": "", |
145 | 160 | "flaggedrevslist": "", |
146 | 161 | # "dir": "", |
147 | 162 | "forcenormal": "0", |
— | — | @@ -149,6 +164,7 @@ |
150 | 165 | #"output": { |
151 | 166 | "public": "/dumps/public", |
152 | 167 | "private": "/dumps/private", |
| 168 | + "temp":"/dumps/temp", |
153 | 169 | "webroot": "http://localhost/dumps", |
154 | 170 | "index": "index.html", |
155 | 171 | "templatedir": home, |
— | — | @@ -194,6 +210,9 @@ |
195 | 211 | "pagesPerChunkAbstract" : False, |
196 | 212 | # whether or not to recombine the history pieces |
197 | 213 | "recombineHistory" : "1", |
| 214 | + # do we write out checkpoint files at regular intervals? (article/metacurrent/metahistory |
| 215 | + # dumps only.) |
| 216 | + "checkpointTime" : "0", |
198 | 217 | } |
199 | 218 | self.conf = ConfigParser.SafeConfigParser(defaults) |
200 | 219 | self.conf.read(self.files) |
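For reference, a rough sketch of how the two new settings could be supplied and read back; the section and option names ("output"/"temp", "chunks"/"checkpointTime") come from this change, while the override values below are invented for illustration:

    import ConfigParser

    defaults = {"temp": "/dumps/temp", "checkpointTime": "0"}
    conf = ConfigParser.SafeConfigParser(defaults)
    conf.add_section("output")
    conf.add_section("chunks")
    # hypothetical overrides that a dump config file might provide
    conf.set("output", "temp", "/space/dumps/temp")
    conf.set("chunks", "checkpointTime", "60")

    print(conf.get("output", "temp"))               # /space/dumps/temp
    print(conf.getint("chunks", "checkpointTime"))  # 60; the default "0" leaves checkpointing off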
— | — | @@ -213,7 +232,6 @@ |
214 | 233 | self.dbList = MiscUtils.dbList(self.conf.get("wiki", "dblist")) |
215 | 234 | self.skipDbList = MiscUtils.dbList(self.conf.get("wiki", "skipdblist")) |
216 | 235 | self.privateList = MiscUtils.dbList(self.conf.get("wiki", "privatelist")) |
217 | | - self.bigList = MiscUtils.dbList(self.conf.get("wiki", "biglist")) |
218 | 236 | self.flaggedRevsList = MiscUtils.dbList(self.conf.get("wiki", "flaggedrevslist")) |
219 | 237 | self.wikiDir = self.conf.get("wiki", "dir") |
220 | 238 | self.forceNormal = self.conf.getint("wiki", "forceNormal") |
— | — | @@ -225,6 +243,7 @@ |
226 | 244 | self.conf.add_section('output') |
227 | 245 | self.publicDir = self.conf.get("output", "public") |
228 | 246 | self.privateDir = self.conf.get("output", "private") |
| 247 | + self.tempDir = self.conf.get("output", "temp") |
229 | 248 | self.webRoot = self.conf.get("output", "webroot") |
230 | 249 | self.index = self.conf.get("output", "index") |
231 | 250 | self.templateDir = self.conf.get("output", "templateDir") |
— | — | @@ -279,6 +298,7 @@ |
280 | 299 | self.revsPerChunkHistory = self.getOptionForProjectOrDefault(conf, "chunks","revsPerChunkHistory",0) |
281 | 300 | self.pagesPerChunkAbstract = self.getOptionForProjectOrDefault(conf, "chunks","pagesPerChunkAbstract",0) |
282 | 301 | self.recombineHistory = self.getOptionForProjectOrDefault(conf, "chunks","recombineHistory",1) |
| 302 | + self.checkpointTime = self.getOptionForProjectOrDefault(conf, "chunks","checkpointTime",1) |
283 | 303 | |
284 | 304 | def getOptionForProjectOrDefault(self, conf, sectionName, itemName, isInt): |
285 | 305 | if (conf.has_section(self.projectName)): |
— | — | @@ -366,9 +386,6 @@ |
367 | 387 | def isPrivate(self): |
368 | 388 | return self.dbName in self.config.privateList |
369 | 389 | |
370 | | - def isBig(self): |
371 | | - return self.dbName in self.config.bigList |
372 | | - |
373 | 390 | def hasFlaggedRevs(self): |
374 | 391 | return self.dbName in self.config.flaggedRevsList |
375 | 392 | |
— | — | @@ -395,7 +412,7 @@ |
396 | 413 | |
397 | 414 | def privateDir(self): |
398 | 415 | return os.path.join(self.config.privateDir, self.dbName) |
399 | | - |
| 416 | + |
400 | 417 | def webDir(self): |
401 | 418 | return "/".join((self.config.webRoot, self.dbName)) |
402 | 419 | |
— | — | @@ -437,7 +454,7 @@ |
438 | 455 | def writePerDumpIndex(self, html): |
439 | 456 | directory = os.path.join(self.publicDir(), self.date) |
440 | 457 | index = os.path.join(self.publicDir(), self.date, self.config.perDumpIndex) |
441 | | - FileUtils.writeFile(directory, index, html, self.config.fileperms) |
| 458 | + FileUtils.writeFileInPlace(index, html, self.config.fileperms) |
442 | 459 | |
443 | 460 | def existsPerDumpIndex(self): |
444 | 461 | index = os.path.join(self.publicDir(), self.date, self.config.perDumpIndex) |
— | — | @@ -446,7 +463,7 @@ |
447 | 464 | def writeStatus(self, message): |
448 | 465 | directory = os.path.join(self.publicDir(), self.date) |
449 | 466 | index = os.path.join(self.publicDir(), self.date, "status.html") |
450 | | - FileUtils.writeFile(directory, index, message, self.config.fileperms) |
| 467 | + FileUtils.writeFileInPlace(index, message, self.config.fileperms) |
451 | 468 | |
452 | 469 | def statusLine(self): |
453 | 470 | date = self.latestDump() |
Index: branches/ariel/xmldumps-backup/worker.py |
— | — | @@ -19,7 +19,7 @@ |
20 | 20 | import Queue |
21 | 21 | import thread |
22 | 22 | |
23 | | -from os.path import dirname, exists, getsize, join, realpath |
| 23 | +from os.path import exists |
24 | 24 | from subprocess import Popen, PIPE |
25 | 25 | from WikiDump import FileUtils, MiscUtils, TimeUtils |
26 | 26 | from CommandManagement import CommandPipeline, CommandSeries, CommandsInParallel |
— | — | @@ -164,7 +164,7 @@ |
165 | 165 | def defaultServer(self): |
166 | 166 | # if this fails what do we do about it? Not a bleeping thing. *ugh* FIXME!! |
167 | 167 | if (not exists( self.wiki.config.php ) ): |
168 | | - raise BackupError("php command %s not found" % self.wiki.config.php); |
| 168 | + raise BackupError("php command %s not found" % self.wiki.config.php) |
169 | 169 | command = "%s -q %s/maintenance/getSlaveServer.php --wiki=%s --group=dump" % MiscUtils.shellEscape(( |
170 | 170 | self.wiki.config.php, self.wiki.config.wikiDir, self.dbName)) |
171 | 171 | return RunSimpleCommand.runAndReturn(command, self.errorCallback).strip() |
— | — | @@ -175,7 +175,7 @@ |
176 | 176 | def buildSqlCommand(self, query, pipeto = None): |
177 | 177 | """Put together a command to execute an sql query to the server for this DB.""" |
178 | 178 | if (not exists( self.wiki.config.mysql ) ): |
179 | | - raise BackupError("mysql command %s not found" % self.wiki.config.mysql); |
| 179 | + raise BackupError("mysql command %s not found" % self.wiki.config.mysql) |
180 | 180 | command = [ [ "/bin/echo", "%s" % query ], |
181 | 181 | [ "%s" % self.wiki.config.mysql, "-h", |
182 | 182 | "%s" % self.dbServer, |
— | — | @@ -191,7 +191,7 @@ |
192 | 192 | """Put together a command to dump a table from the current DB with mysqldump |
193 | 193 | and save to a gzipped sql file.""" |
194 | 194 | if (not exists( self.wiki.config.mysqldump ) ): |
195 | | - raise BackupError("mysqldump command %s not found" % self.wiki.config.mysqldump); |
| 195 | + raise BackupError("mysqldump command %s not found" % self.wiki.config.mysqldump) |
196 | 196 | command = [ [ "%s" % self.wiki.config.mysqldump, "-h", |
197 | 197 | "%s" % self.dbServer, "-u", |
198 | 198 | "%s" % self.wiki.config.dbUser, |
— | — | @@ -225,7 +225,7 @@ |
226 | 226 | """Get the prefix for all tables for the specific wiki ($wgDBprefix)""" |
227 | 227 | # FIXME later full path |
228 | 228 | if (not exists( self.wiki.config.php ) ): |
229 | | - raise BackupError("php command %s not found" % self.wiki.config.php); |
| 229 | + raise BackupError("php command %s not found" % self.wiki.config.php) |
230 | 230 | command = "echo 'print $wgDBprefix; ' | %s -q %s/maintenance/eval.php --wiki=%s" % MiscUtils.shellEscape(( |
231 | 231 | self.wiki.config.php, self.wiki.config.wikiDir, self.dbName)) |
232 | 232 | return RunSimpleCommand.runAndReturn(command, self.errorCallback).strip() |
— | — | @@ -369,16 +369,16 @@ |
370 | 370 | |
371 | 371 | def _getDumpRunInfoDirName(self, date=None): |
372 | 372 | if (date): |
373 | | - return os.path.join(self.wiki.publicDir(), date); |
| 373 | + return os.path.join(self.wiki.publicDir(), date) |
374 | 374 | else: |
375 | | - return os.path.join(self.wiki.publicDir(), self.wiki.date); |
| 375 | + return os.path.join(self.wiki.publicDir(), self.wiki.date) |
376 | 376 | |
377 | 377 | # format: name:%; updated:%; status:% |
378 | 378 | def _getOldRunInfoFromLine(self, line): |
379 | 379 | # get rid of leading/trailing/blanks |
380 | 380 | line = line.strip(" ") |
381 | 381 | line = line.replace("\n","") |
382 | | - fields = line.split(';',3) |
| 382 | + fields = line.split(';',2) |
383 | 383 | dumpRunInfo = RunInfo() |
384 | 384 | for field in fields: |
385 | 385 | field = field.strip(" ") |
— | — | @@ -394,14 +394,15 @@ |
395 | 395 | def _writeDumpRunInfoFile(self, text): |
396 | 396 | directory = self._getDumpRunInfoDirName() |
397 | 397 | dumpRunInfoFilename = self._getDumpRunInfoFileName() |
398 | | - FileUtils.writeFile(directory, dumpRunInfoFilename, text, self.wiki.config.fileperms) |
| 398 | +# FileUtils.writeFile(directory, dumpRunInfoFilename, text, self.wiki.config.fileperms) |
| 399 | + FileUtils.writeFile(self.wiki.config.tempDir, dumpRunInfoFilename, text, self.wiki.config.fileperms) |
399 | 400 | |
400 | 401 | # format: name:%; updated:%; status:% |
401 | 402 | def _getStatusForJobFromRunInfoFileLine(self, line, jobName): |
402 | 403 | # get rid of leading/trailing/embedded blanks |
403 | 404 | line = line.replace(" ","") |
404 | 405 | line = line.replace("\n","") |
405 | | - fields = line.split(';',3) |
| 406 | + fields = line.split(';',2) |
406 | 407 | for field in fields: |
407 | 408 | (fieldName, separator, fieldValue) = field.partition(':') |
408 | 409 | if (fieldName == "name"): |
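The maxsplit change (3 to 2) still yields exactly three fields for a well-formed "name:...; updated:...; status:..." line, but any extra semicolon inside the status text now stays attached to the status field instead of producing a fourth fragment. A quick illustration with made-up field values:

    line = "name:articlesdump; updated:2011-07-29 10:11:12; status:done; output truncated"

    print(line.split(';', 3))
    # ['name:articlesdump', ' updated:2011-07-29 10:11:12', ' status:done', ' output truncated']

    print(line.split(';', 2))
    # ['name:articlesdump', ' updated:2011-07-29 10:11:12', ' status:done; output truncated']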
— | — | @@ -484,18 +485,21 @@ |
485 | 486 | self._toBeRun = toBeRun |
486 | 487 | |
487 | 488 | class DumpItemList(object): |
488 | | - def __init__(self, wiki, prefetch, spawn, date, chunkToDo, singleJob, chunkInfo, runInfoFile): |
489 | | - self.date = date |
| 489 | + def __init__(self, wiki, prefetch, spawn, chunkToDo, singleJob, chunkInfo, runInfoFile, dumpDir): |
490 | 490 | self.wiki = wiki |
491 | 491 | self._hasFlaggedRevs = self.wiki.hasFlaggedRevs() |
492 | | - self._isBig = self.wiki.isBig() |
493 | 492 | self._prefetch = prefetch |
494 | 493 | self._spawn = spawn |
495 | 494 | self.chunkInfo = chunkInfo |
496 | 495 | self._chunkToDo = chunkToDo |
497 | 496 | self._singleJob = singleJob |
498 | 497 | self._runInfoFile = runInfoFile |
499 | | - |
| 498 | + self.dumpDir = dumpDir |
| 499 | + if self.wiki.config.checkpointTime: |
| 500 | + checkpoints = True |
| 501 | + else: |
| 502 | + checkpoints = False |
| 503 | + |
500 | 504 | if (self._singleJob and self._chunkToDo): |
501 | 505 | if (self._singleJob[-5:] == 'table' or |
502 | 506 | self._singleJob[-9:] == 'recombine' or |
— | — | @@ -540,29 +544,29 @@ |
541 | 545 | AbstractDump("abstractsdump","Extracted page abstracts for Yahoo", self._getChunkToDo("abstractsdump"), self.chunkInfo.getPagesPerChunkAbstract())] |
542 | 546 | |
543 | 547 | if (self.chunkInfo.chunksEnabled()): |
544 | | - self.dumpItems.append(RecombineAbstractDump("abstractsdumprecombine", "Recombine extracted page abstracts for Yahoo", self.chunkInfo.getPagesPerChunkAbstract())) |
| 548 | + self.dumpItems.append(RecombineAbstractDump("abstractsdumprecombine", "Recombine extracted page abstracts for Yahoo", self.findItemByName('abstractsdump'))) |
545 | 549 | |
546 | 550 | self.dumpItems.append(XmlStub("xmlstubsdump", "First-pass for page XML data dumps", self._getChunkToDo("xmlstubsdump"), self.chunkInfo.getPagesPerChunkHistory())) |
547 | 551 | if (self.chunkInfo.chunksEnabled()): |
548 | | - self.dumpItems.append(RecombineXmlStub("xmlstubsdumprecombine", "Recombine first-pass for page XML data dumps", self.chunkInfo.getPagesPerChunkHistory())) |
| 552 | + self.dumpItems.append(RecombineXmlStub("xmlstubsdumprecombine", "Recombine first-pass for page XML data dumps",self.findItemByName('xmlstubsdump'))) |
549 | 553 | |
550 | 554 | # NOTE that the chunkInfo thing passed here is irrelevant, these get generated from the stubs which are all done in one pass |
551 | 555 | self.dumpItems.append( |
552 | 556 | XmlDump("articles", |
553 | 557 | "articlesdump", |
554 | 558 | "<big><b>Articles, templates, image descriptions, and primary meta-pages.</b></big>", |
555 | | - "This contains current versions of article content, and is the archive most mirror sites will probably want.", self._prefetch, self._spawn, self._getChunkToDo("articlesdump"), self.chunkInfo.getPagesPerChunkHistory())) |
| 559 | + "This contains current versions of article content, and is the archive most mirror sites will probably want.", self.findItemByName('xmlstubsdump'), self._prefetch, self._spawn, self.wiki, self._getChunkToDo("articlesdump"), self.chunkInfo.getPagesPerChunkHistory(), checkpoints)) |
556 | 560 | if (self.chunkInfo.chunksEnabled()): |
557 | | - self.dumpItems.append(RecombineXmlDump("articles","articlesdumprecombine", "<big><b>Recombine articles, templates, image descriptions, and primary meta-pages.</b></big>","This contains current versions of article content, and is the archive most mirror sites will probably want.", self.chunkInfo.getPagesPerChunkHistory())) |
558 | | - |
| 561 | + self.dumpItems.append(RecombineXmlDump("articlesdumprecombine", "<big><b>Recombine articles, templates, image descriptions, and primary meta-pages.</b></big>","This contains current versions of article content, and is the archive most mirror sites will probably want.", self.findItemByName('articlesdump'))) |
| 562 | + |
559 | 563 | self.dumpItems.append( |
560 | 564 | XmlDump("meta-current", |
561 | 565 | "metacurrentdump", |
562 | 566 | "All pages, current versions only.", |
563 | | - "Discussion and user pages are included in this complete archive. Most mirrors won't want this extra material.", self._prefetch, self._spawn, self._getChunkToDo("metacurrentdump"), self.chunkInfo.getPagesPerChunkHistory())) |
| 567 | + "Discussion and user pages are included in this complete archive. Most mirrors won't want this extra material.", self.findItemByName('xmlstubsdump'), self._prefetch, self._spawn, self.wiki, self._getChunkToDo("metacurrentdump"), self.chunkInfo.getPagesPerChunkHistory(), checkpoints)) |
564 | 568 | |
565 | 569 | if (self.chunkInfo.chunksEnabled()): |
566 | | - self.dumpItems.append(RecombineXmlDump("meta-current","metacurrentdumprecombine", "Recombine all pages, current versions only.","Discussion and user pages are included in this complete archive. Most mirrors won't want this extra material.", self.chunkInfo.getPagesPerChunkHistory())) |
| 570 | + self.dumpItems.append(RecombineXmlDump("metacurrentdumprecombine", "Recombine all pages, current versions only.","Discussion and user pages are included in this complete archive. Most mirrors won't want this extra material.", self.findItemByName('metacurrentdump'))) |
567 | 571 | |
568 | 572 | self.dumpItems.append( |
569 | 573 | XmlLogging("Log events to all pages.")) |
— | — | @@ -572,34 +576,31 @@ |
573 | 577 | PublicTable( "flaggedpages", "flaggedpagestable","This contains a row for each flagged article, containing the stable revision ID, if the lastest edit was flagged, and how long edits have been pending." )) |
574 | 578 | self.dumpItems.append( |
575 | 579 | PublicTable( "flaggedrevs", "flaggedrevstable","This contains a row for each flagged revision, containing who flagged it, when it was flagged, reviewer comments, the flag values, and the quality tier those flags fall under." )) |
576 | | - |
577 | | - if not self._isBig: |
| 580 | + |
| 581 | + self.dumpItems.append( |
| 582 | + BigXmlDump("meta-history", |
| 583 | + "metahistorybz2dump", |
| 584 | + "All pages with complete page edit history (.bz2)", |
| 585 | + "These dumps can be *very* large, uncompressing up to 20 times the archive download size. " + |
| 586 | + "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.findItemByName('xmlstubsdump'), self._prefetch, self._spawn, self.wiki, self._getChunkToDo("metahistorybz2dump"), self.chunkInfo.getPagesPerChunkHistory(), checkpoints)) |
| 587 | + if (self.chunkInfo.chunksEnabled() and self.chunkInfo.recombineHistory()): |
578 | 588 | self.dumpItems.append( |
579 | | - BigXmlDump("meta-history", |
580 | | - "metahistorybz2dump", |
581 | | - "All pages with complete page edit history (.bz2)", |
582 | | - "These dumps can be *very* large, uncompressing up to 20 times the archive download size. " + |
583 | | - "Suitable for archival and statistical use, most mirror sites won't want or need this.", self._prefetch, self._spawn, self._getChunkToDo("metahistorybz2dump"), self.chunkInfo.getPagesPerChunkHistory())) |
584 | | - if (self.chunkInfo.chunksEnabled() and self.chunkInfo.recombineHistory()): |
585 | | - self.dumpItems.append( |
586 | | - RecombineXmlDump("meta-history", |
587 | | - "metahistorybz2dumprecombine", |
588 | | - "Recombine all pages with complete edit history (.bz2)", |
589 | | - "These dumps can be *very* large, uncompressing up to 100 times the archive download size. " + |
590 | | - "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.chunkInfo.getPagesPerChunkHistory())) |
| 589 | + RecombineXmlDump("metahistorybz2dumprecombine", |
| 590 | + "Recombine all pages with complete edit history (.bz2)", |
| 591 | + "These dumps can be *very* large, uncompressing up to 100 times the archive download size. " + |
| 592 | + "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.findItemByName('metahistorybz2dump'))) |
| 593 | + self.dumpItems.append( |
| 594 | + XmlRecompressDump("meta-history", |
| 595 | + "metahistory7zdump", |
| 596 | + "All pages with complete edit history (.7z)", |
| 597 | + "These dumps can be *very* large, uncompressing up to 100 times the archive download size. " + |
| 598 | + "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.findItemByName('metahistorybz2dump'), self.wiki, self._getChunkToDo("metahistory7zdump"), self.chunkInfo.getPagesPerChunkHistory())) |
| 599 | + if (self.chunkInfo.chunksEnabled() and self.chunkInfo.recombineHistory()): |
591 | 600 | self.dumpItems.append( |
592 | | - XmlRecompressDump("meta-history", |
593 | | - "metahistory7zdump", |
594 | | - "All pages with complete edit history (.7z)", |
595 | | - "These dumps can be *very* large, uncompressing up to 100 times the archive download size. " + |
596 | | - "Suitable for archival and statistical use, most mirror sites won't want or need this.", self._getChunkToDo("metahistory7zdump"), self.chunkInfo.getPagesPerChunkHistory())) |
597 | | - if (self.chunkInfo.chunksEnabled() and self.chunkInfo.recombineHistory()): |
598 | | - self.dumpItems.append( |
599 | | - RecombineXmlRecompressDump("meta-history", |
600 | | - "metahistory7zdumprecombine", |
601 | | - "Recombine all pages with complete edit history (.7z)", |
602 | | - "These dumps can be *very* large, uncompressing up to 100 times the archive download size. " + |
603 | | - "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.chunkInfo.getPagesPerChunkHistory())) |
| 601 | + RecombineXmlRecompressDump("metahistory7zdumprecombine", |
| 602 | + "Recombine all pages with complete edit history (.7z)", |
| 603 | + "These dumps can be *very* large, uncompressing up to 100 times the archive download size. " + |
| 604 | + "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.findItemByName('metahistory7zdump'), self.wiki)) |
604 | 605 | results = self._runInfoFile.getOldRunInfoFromFile() |
605 | 606 | if (results): |
606 | 607 | for runInfoObj in results: |
— | — | @@ -655,6 +656,12 @@ |
656 | 657 | for j in range(i,len(self.dumpItems)): |
657 | 658 | self.dumpItems[j].setToBeRun(True) |
658 | 659 | break |
| 660 | + |
| 661 | + def findItemByName(self, name): |
| 662 | + for item in self.dumpItems: |
| 663 | + if (item.name() == name): |
| 664 | + return item |
| 665 | + return None |
659 | 666 | |
660 | 667 | # see whether job needs previous jobs that have not completed successfully |
661 | 668 | def jobDoneSuccessfully(self, job): |
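findItemByName is the hook that lets the recombine and recompress jobs above hold a reference to the job whose output they consume (for example, RecombineXmlStub now receives the 'xmlstubsdump' item rather than a pages-per-chunk count). A stripped-down sketch of the pattern, using stand-in item classes:

    class FakeDumpItem(object):
        # stand-in for XmlStub/XmlDump etc., just enough for the lookup
        def __init__(self, name):
            self._name = name
        def name(self):
            return self._name

    def find_item_by_name(items, name):
        # linear scan over the ordered job list; None means no such job
        for item in items:
            if item.name() == name:
                return item
        return None

    dump_items = [FakeDumpItem("xmlstubsdump"), FakeDumpItem("articlesdump")]
    stubs_item = find_item_by_name(dump_items, "xmlstubsdump")
    # a recombine job would then be constructed with stubs_item as its input job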
— | — | @@ -734,12 +741,16 @@ |
735 | 742 | checksumFileName = self._getChecksumFileNameTmp() |
736 | 743 | output = file(checksumFileName, "w") |
737 | 744 | |
738 | | - def checksum(self, filename, runner): |
| 745 | + def checksum(self, fileObj, runner): |
739 | 746 | """Run checksum for an output file, and append to the list.""" |
740 | 747 | if (self._enabled): |
741 | 748 | checksumFileName = self._getChecksumFileNameTmp() |
742 | 749 | output = file(checksumFileName, "a") |
743 | | - self._saveChecksum(filename, output, runner) |
| 750 | + runner.debug("Checksumming %s" % fileObj.filename) |
| 751 | + dumpfile = DumpFile(self.wiki, runner.dumpDir.filenamePublicPath(fileObj)) |
| 752 | + checksum = dumpfile.md5Sum() |
| 753 | + if checksum != None: |
| 754 | + output.write( "%s %s\n" % (checksum, fileObj.filename)) |
744 | 755 | output.close() |
745 | 756 | |
746 | 757 | def moveMd5FileIntoPlace(self): |
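The checksum step now leans on DumpFile.md5Sum() (defined further down in this diff), which hashes the file in fixed-size blocks so that multi-gigabyte dump files never have to be read into memory at once. A standalone version of the chunked-md5 idea; the example path is hypothetical:

    import hashlib

    def md5_of_file(path, bufsize=4192 * 32):
        # read and hash the file one block at a time, as md5Sum() does
        summer = hashlib.md5()
        infile = open(path, "rb")
        data = infile.read(bufsize)
        while data:
            summer.update(data)
            data = infile.read(bufsize)
        infile.close()
        return summer.hexdigest()

    # a line of md5sums.txt pairs the checksum with the file's basename, as above:
    # print("%s %s" % (md5_of_file("/dumps/temp/example.gz"), "example.gz"))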
— | — | @@ -753,7 +764,7 @@ |
754 | 765 | tmpFileName = self._getChecksumFileNameTmp() |
755 | 766 | realFileName = self._getChecksumFileName() |
756 | 767 | text = FileUtils.readFile(tmpFileName) |
757 | | - FileUtils.writeFile(self._getMd5FileDirName(), realFileName, text, self.wiki.config.fileperms) |
| 768 | + FileUtils.writeFile(self.wiki.config.tempDir, realFileName, text, self.wiki.config.fileperms) |
758 | 769 | |
759 | 770 | def getChecksumFileNameBasename(self): |
760 | 771 | return ("md5sums.txt") |
— | — | @@ -762,14 +773,382 @@ |
763 | 774 | # functions internal to the class |
764 | 775 | # |
765 | 776 | def _getChecksumFileName(self): |
766 | | - return (self.dumpDir.publicPath(self.getChecksumFileNameBasename())) |
| 777 | + fileObj = DumpFilename(self.wiki, None, self.getChecksumFileNameBasename()) |
| 778 | + return (self.dumpDir.filenamePublicPath(fileObj)) |
767 | 779 | |
768 | 780 | def _getChecksumFileNameTmp(self): |
769 | | - return (self.dumpDir.publicPath(self.getChecksumFileNameBasename() + "." + self.timestamp + ".tmp")) |
| 781 | + fileObj = DumpFilename(self.wiki, None, self.getChecksumFileNameBasename() + "." + self.timestamp + ".tmp") |
| 782 | + return (self.dumpDir.filenamePublicPath(fileObj)) |
770 | 783 | |
771 | | - def _md5File(self, filename): |
| 784 | + def _getMd5FileDirName(self): |
| 785 | + return os.path.join(self.wiki.publicDir(), self.wiki.date) |
| 786 | + |
| 787 | +class DumpDir(object): |
| 788 | + def __init__(self, wiki, dbName): |
| 789 | + self._wiki = wiki |
| 790 | + self._dbName = dbName |
| 791 | + self._dirCache = {} |
| 792 | + self._dirCacheTime = {} |
| 793 | + self._chunkFileCache = {} |
| 794 | + self._checkpointFileCache = {} |
| 795 | + |
| 796 | + def filenamePrivatePath(self, dumpFile, dateString = None): |
| 797 | + """Given a DumpFilename object, produce the full path to the filename in the date subdir |
| 798 | + of the the private dump dir for the selected database. |
| 800 | + of the private dump dir for the selected database.
| 800 | + if (not dateString): |
| 801 | + dateString = self._wiki.date |
| 802 | + return os.path.join(self._wiki.privateDir(), dateString, dumpFile.filename) |
| 803 | + |
| 804 | + def filenamePublicPath(self, dumpFile, dateString = None): |
| 805 | + """Given a DumpFilename object produce the full path to the filename in the date subdir |
| 806 | + of the public dump dir for the selected database. |
| 807 | + If this database is marked as private, use the private dir instead. |
| 808 | + If a different date is specified, use that instead""" |
| 809 | + if (not dateString): |
| 810 | + dateString = self._wiki.date |
| 811 | + return os.path.join(self._wiki.publicDir(), dateString, dumpFile.filename) |
| 812 | + |
| 813 | + def latestDir(self): |
| 814 | + """Return 'latest' directory for the current project being dumped, e.g. |
| 815 | + if the current project is enwiki, this would return something like |
| 816 | + /mnt/data/xmldatadumps/public/enwiki/latest (if the directory /mnt/data/xmldatadumps/public |
| 817 | + is the path to the directory for public dumps).""" |
| 818 | + return os.path.join(self._wiki.publicDir(), "latest") |
| 819 | + |
| 820 | + def webPath(self, dumpFile, dateString = None): |
| 821 | + """Given a DumpFilename object produce the full url to the filename for the date of |
| 822 | + the dump for the selected database.""" |
| 823 | + if (not dateString): |
| 824 | + dateString = self._wiki.date |
| 825 | + return os.path.join(self._wiki.webDir(), dateString, dumpFile.filename) |
| 826 | + |
| 827 | + def dirCacheOutdated(self, date): |
| 828 | + if not date: |
| 829 | + date = self._wiki.date |
| 830 | + directory = os.path.join(self._wiki.publicDir(), date) |
| 831 | + dirTimeStamp = os.stat(directory).st_mtime |
| 832 | + if (not date in self._dirCache or dirTimeStamp > self._dirCacheTime[date]): |
| 833 | + return True |
| 834 | + else: |
| 835 | + return False |
| 836 | + |
| 837 | + # warning: date can also be "latest" |
| 838 | + def getFilesInDir(self, date = None): |
| 839 | + if not date: |
| 840 | + date = self._wiki.date |
| 841 | + if (self.dirCacheOutdated(date)): |
| 842 | + directory = os.path.join(self._wiki.publicDir(),date) |
| 843 | + dirTimeStamp = os.stat(directory).st_mtime |
| 844 | + files = os.listdir(directory) |
| 845 | + fileObjs = [] |
| 846 | + for f in files: |
| 847 | + fileObj = DumpFilename(self._wiki) |
| 848 | + fileObj.newFromFilename(f) |
| 849 | + fileObjs.append(fileObj) |
| 850 | + self._dirCache[date] = fileObjs |
| 851 | + self._dirCacheTime[date] = dirTimeStamp |
| 852 | + return(self._dirCache[date]) |
| 853 | + |
| 854 | + # list all files that exist, filtering by the given args. |
| 855 | + # if we get None for an arg then we accept all values for that arg in the filename, including missing |
| 856 | + # if we get False for an arg (chunk, temp, checkpoint), we reject any filename which contains a value for that arg |
| 857 | + # if we get True for an arg (chunk, temp, checkpoint), we include only filenames which contain a value for that arg |
| 858 | + # chunks should be a list of value(s) or True / False / None |
| 859 | + # |
| 860 | + # note that we ignore files with ".truncated". these are known to be bad. |
| 861 | + def _getFilesFiltered(self, date = None, dumpName = None, fileType = None, fileExt = None, chunks = None, temp = None, checkpoint = None ): |
| 862 | + if not date: |
| 863 | + date = self._wiki.date |
| 864 | + fileObjs = self.getFilesInDir(date) |
| 865 | + filesMatched = [] |
| 866 | + for f in fileObjs: |
| 867 | + # fixme this is a bit hackish |
| 868 | + if f.filename.endswith("truncated"): |
| 869 | + continue |
| 870 | + |
| 871 | + if dumpName and f.dumpName != dumpName: |
| 872 | + continue |
| 873 | + if fileType != None and f.fileType != fileType: |
| 874 | + continue |
| 875 | + if fileExt != None and f.fileExt != fileExt: |
| 876 | + continue |
| 877 | + if (chunks == False and f.isChunkFile): |
| 878 | + continue |
| 879 | + if (chunks == True and not f.isChunkFile): |
| 880 | + continue |
| 881 | + # chunks is a list... |
| 882 | + if (chunks and chunks != True and not f.chunkInt in chunks): |
| 883 | + continue |
| 884 | + if (temp == False and f.isTempFile) or (temp and not f.isTempFile): |
| 885 | + continue |
| 886 | + if (checkpoint == False and f.isCheckpointFile) or (checkpoint and not f.isCheckpointFile): |
| 887 | + continue |
| 888 | + filesMatched.append(f) |
| 889 | + self.sort_fileobjs(filesMatched) |
| 890 | + return filesMatched |
| 891 | + |
| 892 | + # taken from a comment by user "Toothy" on Ned Batchelder's blog (no longer on the net) |
| 893 | + def sort_fileobjs(self, l): |
| 894 | + """ Sort the given list in the way that humans expect. |
| 895 | + """ |
| 896 | + convert = lambda text: int(text) if text.isdigit() else text |
| 897 | + alphanum_key = lambda key: [ convert(c) for c in re.split('([0-9]+)', key.filename) ] |
| 898 | + l.sort( key=alphanum_key ) |
| 899 | + |
| 900 | + # list all checkpoint files that exist, filtering by the given args. |
| 901 | + # if we get None for an arg then we accept all values for that arg in the filename |
| 902 | + # if we get False for an arg (chunks, temp), we reject any filename which contains a value for that arg |
| 903 | + # if we get True for an arg (chunk, temp), we accept only filenames which contain a value for the arg |
| 904 | + # chunks should be a list of value(s), or True / False / None |
| 905 | + def getCheckpointFilesExisting(self, date = None, dumpName = None, fileType = None, fileExt = None, chunks = False, temp = False ): |
| 906 | + return self._getFilesFiltered(date, dumpName, fileType, fileExt, chunks, temp, checkpoint = True) |
| 907 | + |
| 908 | + # list all non-checkpoint files that exist, filtering by the given args. |
| 909 | + # if we get None for an arg then we accept all values for that arg in the filename |
| 910 | + # if we get False for an arg (chunk, temp), we reject any filename which contains a value for that arg |
| 911 | + # if we get True for an arg (chunk, temp), we accept only filenames which contain a value for the arg |
| 912 | + # chunks should be a list of value(s), or True / False / None |
| 913 | + def getRegularFilesExisting(self, date = None, dumpName = None, fileType = None, fileExt = None, chunks = False, temp = False ): |
| 914 | + return self._getFilesFiltered(date, dumpName, fileType, fileExt, chunks, temp, checkpoint = False) |
| 915 | + |
| 916 | + |
| 917 | +class DumpFilename(object): |
| 918 | + """ |
| 919 | + filename without directory name, and the methods that go with it, |
| 920 | + primarily for filenames that follow the standard naming convention, i.e. |
| 921 | + projectname-date-dumpName.sql/xml.gz/bz2/7z (possibly with a chunk |
| 922 | + number, possibly with start/end page id information embedded in the name). |
| 923 | + |
| 924 | + Constructor: |
| 925 | + DumpFilename(wiki, date = None, dumpName = None, filetype = None, ext = None, chunk = None, checkpoint = None, temp = False) -- pass in dumpName and
| 926 | + filetype/extension at least. filetype is one of xml or sql, extension is one of
| 927 | + bz2/gz/7z. Or you can pass in the entire string without project name and date, |
| 928 | + e.g. pages-meta-history5.xml.bz2 |
| 929 | + If dumpName is not passed, no member variables will be initialized, and |
| 930 | + the caller is expected to invoke newFromFilename as an alternate |
| 931 | + constructor before doing anything else with the object. |
| 932 | + |
| 933 | + newFromFilename(filename) -- pass in full filename. This is called by the regular constructor and is |
| 934 | + what sets all attributes |
| 935 | + |
| 936 | + attributes: |
| 937 | + |
| 938 | + isCheckpointFile filename of form dbname-date-dumpName-pxxxxpxxxx.xml.bz2 |
| 939 | + isChunkFile filename of form dbname-date-dumpNamex.xml.gz/bz2/7z |
| 940 | + isTempFile filename of form dbname-date-dumpName.xml.gz/bz2/7z-tmp |
| 941 | + firstPageID for checkpoint files, taken from value in filename |
| 942 | + lastPageID for checkpoint files, value taken from filename |
| 943 | + filename full filename |
| 944 | + basename part of the filename after the project name and date (for |
| 945 | + "enwiki-20110722-pages-meta-history12.xml.bz2" this would be |
| 946 | + "pages-meta-history12.xml.bz2") |
| 947 | + fileExt extension (everything after the last ".") of the file |
| 948 | + date date embedded in filename |
| 949 | + dumpName dump name embedded in filename (eg "pages-meta-history"), if any |
| 950 | + chunk chunk number of file as string (for "pages-meta-history5.xml.bz2" this would be "5") |
| 951 | + chunkInt chunk number as int
| 952 | + """ |
| 953 | + |
| 954 | + def __init__(self, wiki, date = None, dumpName = None, filetype = None, ext = None, chunk = None, checkpoint = None, temp = False): |
| 955 | + """Constructor. Arguments: the dump name as it should appear in the filename, |
| 956 | + the date if different than the date of the dump run, the chunk number |
| 957 | + if there is one, and temp which is true if this is a temp file (ending in "-tmp") |
| 958 | + Alternatively, one can leave off all the other arguments and just pass the entire
| 959 | + filename minus the dbname and the date; newFromFilename returns True on success, False otherwise.
| 960 | + self.wiki = wiki |
| 961 | + # if dumpName is not set, the caller can call newFromFilename to initialize various values instead |
| 962 | + if dumpName: |
| 963 | + filename = self.newFilename(dumpName, filetype, ext, date, chunk, checkpoint, temp) |
| 964 | + self.newFromFilename(filename) |
| 965 | + |
| 966 | + def isExt(self,ext): |
| 967 | + if ext == "gz" or ext == "bz2" or ext == "7z" or ext == "html" or ext == "txt": |
| 968 | + return True |
| 969 | + else: |
| 970 | + return False |
| 971 | + |
| 972 | + # returns True if successful, False otherwise (filename is not in the canonical form that we manage) |
| 973 | + def newFromFilename(self, filename): |
| 974 | + """Constructor. Arguments: the full file name including the chunk, the extension, etc BUT NOT the dir name. """ |
| 975 | + self.filename = filename |
| 976 | + |
| 977 | + # example filenames: |
| 978 | + # elwikidb-20110729-all-titles-in-ns0.gz |
| 979 | + # elwikidb-20110729-abstract.xml |
| 980 | + # elwikidb-20110727-pages-meta-history2.xml-p000048534p000051561.bz2 |
| 981 | + |
| 982 | + # we need to handle cases without the projectname-date stuff in them too, as this gets used |
| 983 | + # for all files now |
| 984 | + if self.filename.endswith("-tmp"): |
| 985 | + self.isTempFile = True |
| 986 | + self.temp = "-tmp" |
| 987 | + else: |
| 988 | + self.isTempFile = False |
| 989 | + self.temp = None |
| 990 | + |
| 991 | + if ('.' in self.filename): |
| 992 | + (fileBase, self.fileExt) = self.filename.rsplit('.',1) |
| 993 | + if (self.temp): |
| 994 | + self.fileExt = self.fileExt[:-4]
| 995 | + else: |
| 996 | + self.dbName = None |
| 997 | + self.date = None |
| 998 | + self.dumpName = None |
| 999 | + self.filePrefix = "" |
| 1000 | + self.filePrefixLength = 0 |
| 1001 | + self.isChunkFile = False |
| 1002 | + self.isCheckpointFile = False |
| 1003 | + self.checkpoint = None |
| 1004 | + self.firstPageID = None |
| 1005 | + self.lastPageID = None |
| 1006 | + self.isTempFile = False |
| 1007 | + self.fileExt = None |
| 1008 | + self.fileType = None |
| 1009 | + return False |
| 1010 | + |
| 1011 | + # FIXME could have -tmp at the end, when do we look for that?? |
| 1012 | + |
| 1013 | + if not self.isExt(self.fileExt): |
| 1014 | + self.fileType = self.fileExt |
| 1015 | +# self.fileExt = None |
| 1016 | + self.fileExt = "" |
| 1017 | + else: |
| 1018 | + if '.' in fileBase: |
| 1019 | + (fileBase, self.fileType) = fileBase.split('.',1) |
| 1020 | + |
| 1021 | + # some files are not of this form, we skip them |
| 1022 | + if not '-' in fileBase: |
| 1023 | + self.dbName = None |
| 1024 | + self.date = None |
| 1025 | + self.dumpName = None |
| 1026 | + self.filePrefix = "" |
| 1027 | + self.filePrefixLength = 0 |
| 1028 | + self.isChunkFile = False |
| 1029 | + self.isCheckpointFile = False |
| 1030 | + self.checkpoint = None |
| 1031 | + self.firstPageID = None |
| 1032 | + self.lastPageID = None |
| 1033 | + self.isTempFile = False |
| 1034 | + self.temp = None |
| 1035 | + return False |
| 1036 | + |
| 1037 | + (self.dbName, self.date, self.dumpName) = fileBase.split('-',2) |
| 1038 | + if not self.date or not self.dumpName: |
| 1039 | + self.dbName = None |
| 1040 | + self.date = None |
| 1041 | + self.dumpName = fileBase |
| 1042 | + self.filePrefix = "" |
| 1043 | + self.filePrefixLength = 0 |
| 1044 | + else: |
| 1045 | + self.filePrefix = "%s-%s-" % (self.dbName, self.date) |
| 1046 | + self.filePrefixLength = len(self.filePrefix) |
| 1047 | + |
| 1048 | + if self.filename.startswith(self.filePrefix): |
| 1049 | + self.basename = self.filename[self.filePrefixLength:] |
| 1050 | + else: |
| 1051 | + self.basename = None |
| 1052 | + |
| 1053 | + self.checkpointPattern = "-p(?P<first>[0-9]+)p(?P<last>[0-9]+)\." + self.fileExt + "$" |
| 1054 | + self.compiledCheckpointPattern = re.compile(self.checkpointPattern) |
| 1055 | + result = self.compiledCheckpointPattern.search(self.filename) |
| 1056 | + |
| 1057 | + if result: |
| 1058 | + self.isCheckpointFile = True |
| 1059 | + self.firstPageID = result.group('first') |
| 1060 | + self.lastPageID = result.group('last') |
| 1061 | + self.checkpoint = "p" + self.firstPageID + "p" + self.lastPageID |
| 1062 | + if self.fileType and self.fileType.endswith("-" + self.checkpoint): |
| 1063 | + self.fileType = self.fileType[:-1 * ( len(self.checkpoint) + 1 ) ] |
| 1064 | + else: |
| 1065 | + self.isCheckpointFile = False |
| 1066 | + self.checkpoint = None |
| 1067 | + self.firstPageID = None |
| 1068 | + self.lastPageID = None |
| 1069 | + |
| 1070 | + self.chunkPattern = "(?P<chunk>[0-9]+)$" |
| 1071 | + self.compiledChunkPattern = re.compile(self.chunkPattern) |
| 1072 | + result = self.compiledChunkPattern.search(self.dumpName) |
| 1073 | + if result: |
| 1074 | + self.isChunkFile = True |
| 1075 | + self.chunk = result.group('chunk') |
| 1076 | + self.chunkInt = int(self.chunk) |
| 1077 | + # the dumpName has the chunk in it so lose it |
| 1078 | + self.dumpName = self.dumpName.rstrip('0123456789') |
| 1079 | + else: |
| 1080 | + self.isChunkFile = False |
| 1081 | + self.chunk = None |
| 1082 | + self.chunkInt = 0 |
| 1083 | + |
| 1084 | + return True |
| 1085 | + |
| 1086 | + def newFilename(self, dumpName, filetype, ext, date = None, chunk = None, checkpoint = None, temp = None): |
| 1087 | + if not chunk: |
| 1088 | + chunk = "" |
| 1089 | + if not date: |
| 1090 | + date = self.wiki.date |
| 1091 | + # fixme do the right thing in case no filetype or no ext |
| 1092 | + parts = [] |
| 1093 | + parts.append(self.wiki.dbName + "-" + date + "-" + dumpName + "%s" % chunk) |
| 1094 | + if checkpoint: |
| 1095 | + filetype = filetype + "-" + checkpoint |
| 1096 | + if filetype: |
| 1097 | + parts.append(filetype) |
| 1098 | + if ext: |
| 1099 | + parts.append(ext) |
| 1100 | + filename = ".".join(parts) |
| 1101 | + if temp: |
| 1102 | + filename = filename + "-tmp" |
| 1103 | + return filename |
| 1104 | + |
| 1105 | +class DumpFile(file): |
| 1106 | + """File containing output created by any job of a dump run. This includes
| 1107 | + any file that follows the standard naming convention, i.e. |
| 1108 | + projectname-date-dumpName.sql/xml.gz/bz2/7z (possibly with a chunk |
| 1109 | + number, possibly with start/end page id information embedded in the name). |
| 1110 | + |
| 1111 | + Methods: |
| 1112 | + |
| 1113 | + md5Sum(): return md5sum of the file contents. |
| 1114 | + checkIfTruncated(): for compressed files, check if the file is truncated (stops |
| 1115 | + abruptly before the end of the compressed data) or not, and set and return |
| 1116 | + self.isTruncated accordingly. This is fast for bzip2 files |
| 1117 | + and slow for gz and 7z files, since for the latter two types it must serially
| 1118 | + read through the file to determine if it is truncated or not. |
| 1119 | + getSize(): returns the current size of the file in bytes |
| 1120 | + rename(newname): rename the file. Arguments: the new name of the file without |
| 1121 | + the directory. |
| 1122 | + findFirstPageIDInFile(): set self.firstPageID by examining the file contents, |
| 1123 | + returning the value, or None if there is no pageID. We uncompress the file |
| 1124 | + if needed and look through the first 500 lines. |
| 1125 | + |
| 1126 | + plus the usual file methods (read, write, open, close) |
| 1127 | + |
| 1128 | + useful variables: |
| 1129 | + |
| 1130 | + firstPageID Determined by examining the first few hundred lines of the contents, |
| 1131 | + looking for page and id tags, without other tags in between. (hmm)
| 1132 | + filename full filename with directory |
| 1133 | + """ |
| 1134 | + def __init__(self, wiki, filename, fileObj = None): |
| 1135 | + """takes full filename including path""" |
| 1136 | + self._wiki = wiki |
| 1137 | + self.filename = filename |
| 1138 | + self.firstLines = None |
| 1139 | + self.isTruncated = None |
| 1140 | + self.firstPageID = None |
| 1141 | + self.dirname = os.path.dirname(filename) |
| 1142 | + if fileObj: |
| 1143 | + self.fileObj = fileObj |
| 1144 | + else: |
| 1145 | + self.fileObj = DumpFilename(wiki) |
| 1146 | + self.fileObj.newFromFilename(os.path.basename(filename)) |
| 1147 | + |
| 1148 | + def md5Sum(self): |
| 1149 | + if not self.filename: |
| 1150 | + return None |
772 | 1151 | summer = hashlib.md5() |
773 | | - infile = file(filename, "rb") |
| 1152 | + infile = file(self.filename, "rb") |
774 | 1153 | bufsize = 4192 * 32 |
775 | 1154 | buffer = infile.read(bufsize) |
776 | 1155 | while buffer: |
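Most of the new DumpFilename.newFromFilename logic boils down to splitting off the extension and the "dbname-date-" prefix, then applying two regular expressions: one for the trailing -pNNNNpNNNN checkpoint (page-range) marker and one for a trailing chunk number on the dump name. A self-contained sketch of that parsing, using the example filename quoted in the comments above; the variable names here are illustrative:

    import re

    filename = "elwikidb-20110727-pages-meta-history2.xml-p000048534p000051561.bz2"

    (file_base, file_ext) = filename.rsplit('.', 1)        # file_ext == "bz2"
    (file_base, file_type) = file_base.split('.', 1)       # file_type == "xml-p000048534p000051561"
    (db_name, date, dump_name) = file_base.split('-', 2)   # "elwikidb", "20110727", "pages-meta-history2"

    # checkpoint files carry the first and last page id just before the extension
    checkpoint_pattern = re.compile(r"-p(?P<first>[0-9]+)p(?P<last>[0-9]+)\." + file_ext + "$")
    match = checkpoint_pattern.search(filename)
    if match:
        first_page_id = match.group('first')               # "000048534"
        last_page_id = match.group('last')                 # "000051561"

    # chunk files carry a trailing number on the dump name
    chunk_pattern = re.compile("(?P<chunk>[0-9]+)$")
    match = chunk_pattern.search(dump_name)
    if match:
        chunk_int = int(match.group('chunk'))              # 2
        dump_name = dump_name.rstrip('0123456789')         # "pages-meta-history"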
— | — | @@ -778,61 +1157,133 @@ |
779 | 1158 | infile.close() |
780 | 1159 | return summer.hexdigest() |
781 | 1160 | |
782 | | - def _md5FileLine(self, filename): |
783 | | - return "%s %s\n" % (self._md5File(filename), os.path.basename(filename)) |
| 1161 | + def getFirst500Lines(self): |
| 1162 | + if self.firstLines: |
| 1163 | + return(self.firstLines) |
784 | 1164 | |
785 | | - def _saveChecksum(self, file, output, runner): |
786 | | - runner.debug("Checksumming %s" % file) |
787 | | - path = self.dumpDir.publicPath(file) |
788 | | - if os.path.exists(path): |
789 | | - checksum = self._md5FileLine(path) |
790 | | - output.write(checksum) |
| 1165 | + if not self.filename or not exists(self.filename): |
| 1166 | + return None |
791 | 1167 | |
792 | | - def _getMd5FileDirName(self): |
793 | | - return os.path.join(self.wiki.publicDir(), self.wiki.date); |
| 1168 | + pipeline = self.setupUncompressionCommand() |
794 | 1169 | |
795 | | -class DumpDir(object): |
796 | | - def __init__(self, wiki, dbName, date): |
797 | | - self._wiki = wiki |
798 | | - self._dbName = dbName |
799 | | - self._date = date |
| 1170 | + if (not exists( self._wiki.config.head ) ): |
| 1171 | + raise BackupError("head command %s not found" % self._wiki.config.head) |
| 1172 | + head = self._wiki.config.head |
| 1173 | + headEsc = MiscUtils.shellEscape(head) |
| 1174 | + pipeline.append([ head, "-500" ]) |
| 1175 | + # without shell |
| 1176 | + p = CommandPipeline(pipeline, quiet=True) |
| 1177 | + p.runPipelineAndGetOutput() |
| 1178 | + self.firstLines = p.output() |
| 1179 | + return(self.firstLines) |
800 | 1180 | |
801 | | - def buildDir(self, base, version): |
802 | | - return join(base, version) |
| 1181 | + # unused |
| 1182 | + # xml, sql, text |
| 1183 | + def determineFileContentsType(self): |
| 1184 | + output = self.getFirst500Lines() |
| 1185 | + if (output): |
| 1186 | + pageData = output |
| 1187 | + if (pageData.startswith('<mediawiki')): |
| 1188 | + return('xml') |
| 1189 | + if (pageData.startswith('-- MySQL dump')): |
| 1190 | + return('sql') |
| 1191 | + return('txt') |
| 1192 | + return(None) |
803 | 1193 | |
804 | | - def buildPath(self, base, version, filename): |
805 | | - return join(base, version, "%s-%s-%s" % (self._dbName, version, filename)) |
| 1194 | + def setupUncompressionCommand(self): |
| 1195 | + if not self.filename or not exists(self.filename): |
| 1196 | + return None |
| 1197 | + pipeline = [] |
| 1198 | + if self.fileObj.fileExt == 'bz2': |
| 1199 | + command = [ self._wiki.config.bzip2, '-dc' ] |
| 1200 | + elif self.fileObj.fileExt == 'gz': |
| 1201 | + command = [ self._wiki.config.gzip, '-dc' ] |
| 1202 | + elif self.fileObj.fileExt == '7z': |
| 1203 | + command = [ self._wiki.config.sevenzip, "e", "-so" ] |
| 1204 | + else: |
| 1205 | + command = [ self._wiki.config.cat ] |
806 | 1206 | |
807 | | - def privatePath(self, filename): |
808 | | - """Take a given filename in the private dump dir for the selected database.""" |
809 | | - return self.buildPath(self._wiki.privateDir(), self._date, filename) |
| 1207 | + if (not exists( command[0] ) ): |
| 1208 | + raise BackupError( "command %s to uncompress/read file not found" % command[0] ) |
| 1209 | + command.append( self.filename ) |
| 1210 | + pipeline.append(command) |
| 1211 | + return(pipeline) |
810 | 1212 | |
811 | | - def publicPath(self, filename): |
812 | | - """Take a given filename in the public dump dir for the selected database. |
813 | | - If this database is marked as private, will use the private dir instead. |
814 | | - """ |
815 | | - return self.buildPath(self._wiki.publicDir(), self._date, filename) |
| 1213 | + # unused |
| 1214 | + # return its first and last page ids from name or from contents, depending |
| 1215 | + # return its date |
| 1216 | + |
| 1217 | + # fixme what happens if this is not an xml dump? errr. must detect and bail immediately? |
| 1218 | + # maybe instead of all that we should just open the file ourselves, read a few lines... oh. |
| 1219 | + # right. stupid compressed files. um.... do we have stream wrappers? no. this is python |
| 1220 | + # what's the easy way to read *some* compressed data into a buffer?
| 1221 | + def findFirstPageIDInFile(self): |
| 1222 | + if (self.firstPageID): |
| 1223 | + return(self.firstPageID) |
| 1224 | + output = self.getFirst500Lines() |
| 1225 | + if (output): |
| 1226 | + pageData = output |
| 1227 | + titleAndIDPattern = re.compile('<title>(?P<title>.+?)</title>\s*' + '<id>(?P<pageid>\d+?)</id>') |
| 1228 | + result = titleAndIDPattern.search(pageData) |
| 1229 | + if (result): |
| 1230 | + self.firstPageID = result.group('pageid') |
| 1231 | + return(self.firstPageID) |
816 | 1232 | |
817 | | - def latestDir(self): |
818 | | - return self.buildDir(self._wiki.publicDir(), "latest") |
| 1233 | + def checkIfTruncated(self): |
| 1234 | + if self.isTruncated: |
| 1235 | + return self.isTruncated |
| 1236 | + if self.fileObj.fileExt == "bz2": |
| 1237 | + if (not exists( self._wiki.config.checkforbz2footer ) ): |
| 1238 | + raise BackupError("checkforbz2footer command %s not found" % self._wiki.config.checkforbz2footer)
| 1239 | + checkforbz2footer = self._wiki.config.checkforbz2footer |
| 1240 | + pipeline = [] |
| 1241 | + pipeline.append([ checkforbz2footer, self.filename ]) |
| 1242 | + p = CommandPipeline(pipeline, quiet=True) |
| 1243 | + p.runPipelineAndGetOutput() |
| 1244 | + if not p.exitedSuccessfully(): |
| 1245 | + self.isTruncated = True |
| 1246 | + else: |
| 1247 | + self.isTruncated = False |
| 1248 | + else: |
| 1249 | + if self.fileObj.fileExt == 'gz': |
| 1250 | + command = [ "%s -dc %s > /dev/null" % (self._wiki.config.gzip, self.filename ) ] |
| 1251 | + elif self.fileObj.fileExt == '7z': |
| 1252 | + command = [ "%s e -so %s > /dev/null" % (self._wiki.config.sevenzip, self.filename ) ] |
| 1253 | + else: |
| 1254 | + # we don't know how to handle this type of file.
| 1255 | + return self.isTruncated |
| 1256 | + p = CommandPipeline([ command ], quiet=True)
| 1257 | + p.runPipelineAndGetOutput() |
| 1258 | + if not p.exitedSuccessfully(): |
| 1259 | + self.isTruncated = False |
| 1260 | + else: |
| 1261 | + self.isTruncated = True |
819 | 1262 | |
820 | | - def latestPath(self, filename): |
821 | | - return self.buildPath(self._wiki.publicDir(), "latest", filename) |
| 1263 | + return self.isTruncated |
822 | 1264 | |
823 | | - def webPath(self, filename): |
824 | | - return self.buildPath(self._wiki.webDir(), self._date, filename) |
825 | | - |
| 1265 | + def getSize(self): |
| 1266 | + if (exists(self.filename)): |
| 1267 | + return os.path.getsize(self.filename) |
| 1268 | + else: |
| 1269 | + return None |
| 1270 | + |
| 1271 | + def rename(self, newname): |
| 1272 | + try: |
| 1273 | + os.rename(self.filename, os.path.join(self.dirname,newname)) |
| 1274 | + except: |
| 1275 | + raise BackupError("failed to rename file %s" % self.filename) |
| 1276 | + |
| 1277 | + self.filename = os.path.join(self.dirname,newname) |
| 1278 | + |
826 | 1279 | # everything that has to do with reporting the status of a piece |
827 | 1280 | # of a dump is collected here |
828 | 1281 | class Status(object): |
829 | | - def __init__(self, wiki, dumpDir, date, items, checksums, enabled, noticeFile = None, errorCallback=None): |
| 1282 | + def __init__(self, wiki, dumpDir, items, checksums, enabled, noticeFile = None, errorCallback=None): |
830 | 1283 | self.wiki = wiki |
831 | 1284 | self.dbName = wiki.dbName |
832 | 1285 | self.dumpDir = dumpDir |
833 | 1286 | self.items = items |
834 | 1287 | self.checksums = checksums |
835 | | - self.date = date |
836 | | - # this is just a glorified name for "give me a logging facility" |
837 | 1288 | self.noticeFile = noticeFile |
838 | 1289 | self.errorCallback = errorCallback |
839 | 1290 | self.failCount = 0 |
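findFirstPageIDInFile pipes the first 500 decompressed lines through a regular expression that looks for a <title> element followed (modulo whitespace) by an <id> element, and treats that id as the first page ID in the file. A minimal demonstration of the same pattern on an invented fragment of MediaWiki export XML:

    import re

    # invented snippet shaped like the start of a page element in an XML dump
    page_data = ("<page>\n"
                 "  <title>Example page</title>\n"
                 "  <id>48534</id>\n"
                 "  <revision>\n"
                 "    <id>123456</id>\n"
                 "  </revision>\n")

    title_and_id_pattern = re.compile(r'<title>(?P<title>.+?)</title>\s*' + r'<id>(?P<pageid>\d+?)</id>')
    result = title_and_id_pattern.search(page_data)
    if result:
        first_page_id = result.group('pageid')   # "48534", the page id rather than the revision id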
— | — | @@ -848,24 +1299,28 @@ |
849 | 1300 | subject = "Dump failure for " + self.dbName |
850 | 1301 | message = self.wiki.config.readTemplate("errormail.txt") % { |
851 | 1302 | "db": self.dbName, |
852 | | - "date": self.date, |
| 1303 | + "date": self.wiki.date, |
853 | 1304 | "time": TimeUtils.prettyTime(), |
854 | | - "url": "/".join((self.wiki.config.webRoot, self.dbName, self.date, ''))} |
| 1305 | + "url": "/".join((self.wiki.config.webRoot, self.dbName, self.wiki.date, ''))} |
855 | 1306 | self.wiki.config.mail(subject, message) |
856 | 1307 | |
857 | | - # this is a per-dump-item report (well per file generated by the item) |
| 1308 | + # this is a per-dump-item report (well, per file generated by the item) |
858 | 1309 | # Report on the file size & item status of the current output and output a link if we are done |
859 | | - def reportFile(self, file, itemStatus): |
860 | | - filepath = self.dumpDir.publicPath(file) |
861 | | - if itemStatus == "in-progress" and exists (filepath): |
862 | | - size = FileUtils.prettySize(getsize(filepath)) |
863 | | - return "<li class='file'>%s %s (written) </li>" % (file, size) |
864 | | - elif itemStatus == "done" and exists(filepath): |
865 | | - size = FileUtils.prettySize(getsize(filepath)) |
866 | | - webpath = self.dumpDir.webPath(file) |
867 | | - return "<li class='file'><a href=\"%s\">%s</a> %s</li>" % (webpath, file, size) |
| 1310 | + def reportFile(self, fileObj, itemStatus): |
| 1311 | + filename = self.dumpDir.filenamePublicPath(fileObj) |
| 1312 | + if (exists(filename)): |
| 1313 | + size = os.path.getsize(filename) |
868 | 1314 | else: |
869 | | - return "<li class='missing'>%s</li>" % file |
| 1315 | + itemStatus = "missing" |
| 1316 | + size = 0 |
| 1317 | + size = FileUtils.prettySize(size) |
| 1318 | + if itemStatus == "in-progress": |
| 1319 | + return "<li class='file'>%s %s (written) </li>" % (fileObj.filename, size) |
| 1320 | + elif itemStatus == "done": |
| 1321 | + webpath = self.dumpDir.webPath(fileObj) |
| 1322 | + return "<li class='file'><a href=\"%s\">%s</a> %s</li>" % (webpath, fileObj.filename, size) |
| 1323 | + else: |
| 1324 | + return "<li class='missing'>%s</li>" % fileObj.filename |
870 | 1325 | |
871 | 1326 | # |
872 | 1327 | # functions internal to the class |
— | — | @@ -902,14 +1357,15 @@ |
903 | 1358 | statusItems = [self._reportItem(item) for item in self.items] |
904 | 1359 | statusItems.reverse() |
905 | 1360 | html = "\n".join(statusItems) |
| 1361 | + f = DumpFilename(self.wiki, None, self.checksums.getChecksumFileNameBasename()) |
906 | 1362 | return self.wiki.config.readTemplate("report.html") % { |
907 | 1363 | "db": self.dbName, |
908 | | - "date": self.date, |
| 1364 | + "date": self.wiki.date, |
909 | 1365 | "notice": self.noticeFile.notice, |
910 | 1366 | "status": self._reportStatusSummaryLine(done), |
911 | 1367 | "previous": self._reportPreviousDump(done), |
912 | 1368 | "items": html, |
913 | | - "checksum": self.dumpDir.webPath(self.checksums.getChecksumFileNameBasename()), |
| 1369 | + "checksum": self.dumpDir.webPath(f), |
914 | 1370 | "index": self.wiki.config.index} |
915 | 1371 | |
916 | 1372 | def _reportPreviousDump(self, done): |
— | — | @@ -920,7 +1376,7 @@ |
921 | 1377 | # starting at the beginning to get the new abstracts and stubs). |
922 | 1378 | try: |
923 | 1379 | dumpsInOrder = self.wiki.latestDump(all=True) |
924 | | - meIndex = dumpsInOrder.index(self.date) |
| 1380 | + meIndex = dumpsInOrder.index(self.wiki.date) |
925 | 1381 | # don't wrap around to the newest dump in the list! |
926 | 1382 | if (meIndex > 0): |
927 | 1383 | rawDate = dumpsInOrder[meIndex-1] |
— | — | @@ -964,9 +1420,9 @@ |
965 | 1421 | html = "<li class='%s'><span class='updates'>%s</span> <span class='status'>%s</span> <span class='title'>%s</span>" % (item.status(), item.updated(), item.status(), item.description()) |
966 | 1422 | if item.progress: |
967 | 1423 | html += "<div class='progress'>%s</div>\n" % item.progress |
968 | | - files = item.listOutputFiles(self) |
969 | | - if files: |
970 | | - listItems = [self.reportFile(file, item.status()) for file in files] |
| 1424 | + fileObjs = item.listOutputFilesToPublish(self.dumpDir) |
| 1425 | + if fileObjs: |
| 1426 | + listItems = [self.reportFile(fileObj, item.status()) for fileObj in fileObjs] |
971 | 1427 | html += "<ul>" |
972 | 1428 | detail = item.detail() |
973 | 1429 | if detail: |
— | — | @@ -977,9 +1433,8 @@ |
978 | 1434 | return html |
979 | 1435 | |
980 | 1436 | class NoticeFile(object): |
981 | | - def __init__(self, wiki, date, notice, enabled): |
| 1437 | + def __init__(self, wiki, notice, enabled): |
982 | 1438 | self.wiki = wiki |
983 | | - self.date = date |
984 | 1439 | self.notice = notice |
985 | 1440 | self._enabled = enabled |
986 | 1441 | self.writeNoticeFile() |
— | — | @@ -995,7 +1450,7 @@ |
996 | 1451 | # addnotice, stuff notice in a file for other jobs etc |
997 | 1452 | elif self.notice != "": |
998 | 1453 | noticeDir = self._getNoticeDir() |
999 | | - FileUtils.writeFile(noticeDir, noticeFile, self.notice, self.wiki.config.fileperms) |
| 1454 | + FileUtils.writeFile(self.wiki.config.tempDir, noticeFile, self.notice, self.wiki.config.fileperms) |
1000 | 1455 | # default case. if there is a file get the contents, otherwise |
1001 | 1456 | # we have empty contents, all good |
1002 | 1457 | else: |
— | — | @@ -1015,13 +1470,13 @@ |
1016 | 1471 | # functions internal to class |
1017 | 1472 | # |
1018 | 1473 | def _getNoticeFilename(self): |
1019 | | - return os.path.join(self.wiki.publicDir(), self.date, "notice.txt") |
| 1474 | + return os.path.join(self.wiki.publicDir(), self.wiki.date, "notice.txt") |
1020 | 1475 | |
1021 | 1476 | def _getNoticeDir(self): |
1022 | | - return os.path.join(self.wiki.publicDir(), self.date); |
| 1477 | + return os.path.join(self.wiki.publicDir(), self.wiki.date) |
1023 | 1478 | |
1024 | 1479 | class Runner(object): |
1025 | | - def __init__(self, wiki, date=None, prefetch=True, spawn=True, job=None, restart=False, notice="", dryrun = False, loggingEnabled=False, chunkToDo = False): |
| 1480 | + def __init__(self, wiki, prefetch=True, spawn=True, job=None, restart=False, notice="", dryrun = False, loggingEnabled=False, chunkToDo = False): |
1026 | 1481 | self.wiki = wiki |
1027 | 1482 | self.dbName = wiki.dbName |
1028 | 1483 | self.prefetch = prefetch |
— | — | @@ -1042,7 +1497,7 @@ |
1043 | 1498 | self._noticeFileEnabled = True |
1044 | 1499 | self._makeDirEnabled = True |
1045 | 1500 | self._cleanOldDumpsEnabled = True |
1046 | | - self._cleanupOldFilesEnabled = False |
| 1501 | + self._cleanupOldFilesEnabled = True |
1047 | 1502 | self._checkForTruncatedFilesEnabled = True |
1048 | 1503 | |
1049 | 1504 | if self.dryrun or self._chunkToDo: |
— | — | @@ -1054,40 +1509,35 @@ |
1055 | 1510 | self._noticeFileEnabled = False |
1056 | 1511 | self._makeDirEnabled = False |
1057 | 1512 | self._cleanOldDumpsEnabled = False |
1058 | | - self._cleanupOldFilesEnables = False |
1059 | 1513 | |
1060 | 1514 | if self.dryrun: |
1061 | 1515 | self._loggingEnabled = False |
1062 | 1516 | self._checkForTruncatedFilesEnabled = False |
| 1517 | + self._cleanupOldFilesEnabled = False |
1063 | 1518 | |
1064 | | - if date: |
1065 | | - # Override, continuing a past dump? |
1066 | | - self.date = date |
1067 | | - else: |
1068 | | - self.date = TimeUtils.today() |
1069 | | - wiki.setDate(self.date) |
1070 | | - |
1071 | 1519 | self.jobRequested = job |
1072 | 1520 | self.dbServerInfo = DbServerInfo(self.wiki, self.dbName, self.logAndPrint) |
1073 | | - self.dumpDir = DumpDir(self.wiki, self.dbName, self.date) |
| 1521 | + self.dumpDir = DumpDir(self.wiki, self.dbName) |
1074 | 1522 | |
1075 | 1523 | self.lastFailed = False |
1076 | 1524 | |
1077 | 1525 | # these must come after the dumpdir setup so we know which directory we are in |
1078 | 1526 | if (self._loggingEnabled and self._makeDirEnabled): |
1079 | | - self.logFileName = self.dumpDir.publicPath(self.wiki.config.logFile) |
1080 | | - self.makeDir(join(self.wiki.publicDir(), self.date)) |
| 1527 | + fileObj = DumpFilename(self.wiki) |
| 1528 | + fileObj.newFromfilename(self.wiki.config.logFile) |
| 1529 | + self.logFileName = self.dumpDir.filenamePublicPath(fileObj) |
| 1530 | + self.makeDir(os.path.join(self.wiki.publicDir(), self.wiki.date)) |
1081 | 1531 | self.log = Logger(self.logFileName) |
1082 | 1532 | thread.start_new_thread(self.logQueueReader,(self.log,)) |
1083 | 1533 | self.runInfoFile = RunInfoFile(wiki,self._runInfoFileEnabled) |
1084 | | - self.symLinks = SymLinks(self.wiki, self.dumpDir, self. date, self.logAndPrint, self.debug, self._symLinksEnabled) |
| 1534 | + self.symLinks = SymLinks(self.wiki, self.dumpDir, self.logAndPrint, self.debug, self._symLinksEnabled) |
1085 | 1535 | self.feeds = Feeds(self.wiki,self.dumpDir, self.dbName, self.debug, self._feedsEnabled) |
1086 | | - self.htmlNoticeFile = NoticeFile(self.wiki, self.date, notice, self._noticeFileEnabled) |
| 1536 | + self.htmlNoticeFile = NoticeFile(self.wiki, notice, self._noticeFileEnabled) |
1087 | 1537 | self.checksums = Checksummer(self.wiki, self.dumpDir, self._checksummerEnabled) |
1088 | 1538 | |
1089 | 1539 | # some or all of these dumpItems will be marked to run |
1090 | | - self.dumpItemList = DumpItemList(self.wiki, self.prefetch, self.spawn, self.date, self._chunkToDo, self.jobRequested, self.chunkInfo, self.runInfoFile); |
1091 | | - self.status = Status(self.wiki, self.dumpDir, self.date, self.dumpItemList.dumpItems, self.checksums, self._statusEnabled, self.htmlNoticeFile, self.logAndPrint) |
| 1540 | + self.dumpItemList = DumpItemList(self.wiki, self.prefetch, self.spawn, self._chunkToDo, self.jobRequested, self.chunkInfo, self.runInfoFile, self.dumpDir) |
| 1541 | + self.status = Status(self.wiki, self.dumpDir, self.dumpItemList.dumpItems, self.checksums, self._statusEnabled, self.htmlNoticeFile, self.logAndPrint) |
1092 | 1542 | |
1093 | 1543 | def logQueueReader(self,log): |
1094 | 1544 | if not log: |
— | — | @@ -1107,25 +1557,7 @@ |
1108 | 1558 | else: |
1109 | 1559 | return "" |
1110 | 1560 | |
1111 | | - def removeFile(self, filename): |
1112 | | - os.remove(filename) |
1113 | | - |
1114 | 1561 | # returns 0 on success, 1 on error |
1115 | | - def saveTable(self, table, outfile): |
1116 | | - """Dump a table from the current DB with mysqldump, save to a gzipped sql file.""" |
1117 | | - if (not exists( self.wiki.config.gzip ) ): |
1118 | | - raise BackupError("gzip command %s not found" % self.wiki.config.gzip); |
1119 | | - commands = self.dbServerInfo.buildSqlDumpCommand(table, self.wiki.config.gzip) |
1120 | | - return self.saveCommand(commands, outfile) |
1121 | | - |
1122 | | - def saveSql(self, query, outfile): |
1123 | | - """Pass some SQL commands to the server for this DB and save output to a gzipped file.""" |
1124 | | - if (not exists( self.wiki.config.gzip ) ): |
1125 | | - raise BackupError("gzip command %s not found" % self.wiki.config.gzip); |
1126 | | - command = self.dbServerInfo.buildSqlCommand(query, self.wiki.config.gzip) |
1127 | | - return self.saveCommand(command, outfile) |
1128 | | - |
1129 | | - # returns 0 on success, 1 on error |
1130 | 1562 | def saveCommand(self, commands, outfile): |
1131 | 1563 | """For one pipeline of commands, redirect output to a given file.""" |
1132 | 1564 | commands[-1].extend( [ ">" , outfile ] ) |
— | — | @@ -1177,12 +1609,10 @@ |
1178 | 1610 | for cmd in problemCommands: |
1179 | 1611 | errorString = errorString + "%s " % cmd |
1180 | 1612 | self.logAndPrint(errorString) |
1181 | | - # raise BackupError(errorString) |
1182 | | - return 1 |
| 1613 | + return 1 |
1183 | 1614 | |
1184 | 1615 | def debug(self, stuff): |
1185 | 1616 | self.logAndPrint("%s: %s %s" % (TimeUtils.prettyTime(), self.dbName, stuff)) |
1186 | | -# print "%s: %s %s" % (MiscUtils.prettyTime(), self.dbName, stuff) |
1187 | 1617 | |
1188 | 1618 | def runHandleFailure(self): |
1189 | 1619 | if self.status.failCount < 1: |
— | — | @@ -1192,15 +1622,17 @@ |
1193 | 1623 | self.lastFailed = True |
1194 | 1624 | |
1195 | 1625 | def runUpdateItemFileInfo(self, item): |
1196 | | - for f in item.listOutputFiles(self): |
1197 | | - print f |
1198 | | - if exists(self.dumpDir.publicPath(f)): |
| 1626 | + # this will include checkpoint files if they are enabled. |
| 1627 | + for fileObj in item.listOutputFilesToPublish(self.dumpDir): |
| 1628 | + if exists(self.dumpDir.filenamePublicPath(fileObj)): |
1199 | 1629 | # why would the file not exist? because we changed chunk numbers in the |
1200 | 1630 | # middle of a run, and now we list more files for the next stage than there |
1201 | 1631 | # were for earlier ones |
1202 | | - self.symLinks.saveSymlink(f) |
1203 | | - self.feeds.saveFeed(f) |
1204 | | - self.checksums.checksum(f, self) |
| 1632 | + self.symLinks.saveSymlink(fileObj) |
| 1633 | + self.feeds.saveFeed(fileObj) |
| 1634 | + self.checksums.checksum(fileObj, self) |
| 1635 | + self.symLinks.cleanupSymLinks() |
| 1636 | + self.feeds.cleanupFeeds() |
1205 | 1637 | |
1206 | 1638 | def run(self): |
1207 | 1639 | if (self.jobRequested): |
— | — | @@ -1232,8 +1664,8 @@ |
1233 | 1665 | # mark all the following jobs to run as well |
1234 | 1666 | self.dumpItemList.markFollowingJobsToRun() |
1235 | 1667 | |
1236 | | - self.makeDir(join(self.wiki.publicDir(), self.date)) |
1237 | | - self.makeDir(join(self.wiki.privateDir(), self.date)) |
| 1668 | + self.makeDir(os.path.join(self.wiki.publicDir(), self.wiki.date)) |
| 1669 | + self.makeDir(os.path.join(self.wiki.privateDir(), self.wiki.date)) |
1238 | 1670 | |
1239 | 1671 | if (self.restart): |
1240 | 1672 | self.logAndPrint("Preparing for restart from job %s of %s" % (self.jobRequested, self.dbName)) |
— | — | @@ -1309,7 +1741,7 @@ |
1310 | 1742 | if self._cleanOldDumpsEnabled: |
1311 | 1743 | old = self.wiki.dumpDirs() |
1312 | 1744 | if old: |
1313 | | - if old[-1] == self.date: |
| 1745 | + if old[-1] == self.wiki.date: |
1314 | 1746 | # If we're re-running today's (or jobs from a given day's) dump, don't count it as one |
1315 | 1747 | # of the old dumps to keep... or delete it halfway through! |
1316 | 1748 | old = old[:-1] |
— | — | @@ -1336,7 +1768,10 @@ |
1337 | 1769 | # will have accurate checksums for the run for which it was |
1338 | 1770 | # produced, but not the other files. FIXME |
1339 | 1771 | self.checksums.moveMd5FileIntoPlace() |
1340 | | - self.symLinks.saveSymlink(self.checksums.getChecksumFileNameBasename()) |
| 1772 | + dumpFile = DumpFilename(self.wiki, None, self.checksums.getChecksumFileNameBasename()) |
| 1773 | + self.symLinks.saveSymlink(dumpFile) |
| 1774 | + self.symLinks.cleanupSymLinks() |
| 1775 | + self.feeds.cleanupFeeds() |
1341 | 1776 | |
1342 | 1777 | def makeDir(self, dir): |
1343 | 1778 | if self._makeDirEnabled: |
— | — | @@ -1347,10 +1782,9 @@ |
1348 | 1783 | os.makedirs(dir) |
1349 | 1784 | |
1350 | 1785 | class SymLinks(object): |
1351 | | - def __init__(self, wiki, dumpDir, date, logfn, debugfn, enabled): |
| 1786 | + def __init__(self, wiki, dumpDir, logfn, debugfn, enabled): |
1352 | 1787 | self.wiki = wiki |
1353 | 1788 | self.dumpDir = dumpDir |
1354 | | - self.date = date |
1355 | 1789 | self._enabled = enabled |
1356 | 1790 | self.logfn = logfn |
1357 | 1791 | self.debugfn = debugfn |
— | — | @@ -1363,35 +1797,41 @@ |
1364 | 1798 | self.debugfn("Creating %s ..." % dir) |
1365 | 1799 | os.makedirs(dir) |
1366 | 1800 | |
1367 | | - def saveSymlink(self, file): |
| 1801 | + def saveSymlink(self, dumpFile): |
1368 | 1802 | if (self._enabled): |
1369 | | - self.makeDir(join(self.wiki.publicDir(), 'latest')) |
1370 | | - real = self.dumpDir.publicPath(file) |
1371 | | - link = self.dumpDir.latestPath(file) |
| 1803 | + self.makeDir(self.dumpDir.latestDir()) |
| 1804 | + realfile = self.dumpDir.filenamePublicPath(dumpFile) |
| 1805 | + link = os.path.join(self.dumpDir.latestDir(), dumpFile.filename) |
1372 | 1806 | if exists(link) or os.path.islink(link): |
1373 | 1807 | if os.path.islink(link): |
1374 | 1808 | realfile = os.readlink(link) |
1375 | 1809 | # format of these links should be... ../20110228/elwikidb-20110228-templatelinks.sql.gz |
1376 | | - rellinkpattern = re.compile('^\.\./(20[0-9]+)/'); |
1377 | | - dateinlink = rellinkpattern.search(realfile) |
1378 | | - if (dateinlink): |
1379 | | - dateoflinkedfile = dateinlink.group(1) |
1380 | | - dateinterval = int(self.date) - int(dateoflinkedfile) |
1381 | | - else: |
1382 | | - dateinterval = 0 |
| 1810 | + rellinkpattern = re.compile('^\.\./(20[0-9]+)/') |
| 1811 | + dateinterval = int(self.wiki.date) - int(dumpFile.date) |
1383 | 1812 | # no file or it's older than ours... *then* remove the link |
1384 | 1813 | if not exists(os.path.realpath(link)) or dateinterval > 0: |
1385 | | - self.debug("Removing old symlink %s" % link) |
1386 | | - runner.removeFile(link) |
| 1814 | + self.debugfn("Removing old symlink %s" % link) |
| 1815 | + os.remove(link) |
1387 | 1816 | else: |
1388 | 1817 | self.logfn("What the hell dude, %s is not a symlink" % link) |
1389 | 1818 | raise BackupError("What the hell dude, %s is not a symlink" % link) |
1390 | | - relative = FileUtils.relativePath(real, dirname(link)) |
| 1819 | + relative = FileUtils.relativePath(realfile, os.path.dirname(link)) |
1391 | 1820 | # if we removed the link cause it's obsolete, make the new one |
1392 | | - if exists(real) and not exists(link): |
| 1821 | + if exists(realfile) and not exists(link): |
1393 | 1822 | self.debugfn("Adding symlink %s -> %s" % (link, relative)) |
1394 | 1823 | os.symlink(relative, link) |
1395 | 1824 | |
| 1825 | + def cleanupSymLinks(self): |
| 1826 | + if (self._enabled): |
| 1827 | + latestDir = self.dumpDir.latestDir() |
| 1828 | + files = os.listdir(latestDir) |
| 1829 | + for f in files: |
| 1830 | + link = os.path.join(latestDir,f) |
| 1831 | + if os.path.islink(link): |
| 1832 | + realfile = os.readlink(link) |
| 1833 | + if not exists(os.path.join(latestDir, realfile)):
| 1834 | + os.remove(link) |
| 1835 | + |
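# Illustrative note (not part of the patch): os.readlink() on these "latest" links returns a
# relative target of the form ../20110228/<file>, so the existence check above resolves it
# against latestDir rather than against the current working directory.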
1396 | 1836 | class Feeds(object): |
1397 | 1837 | def __init__(self, wiki, dumpDir, dbName, debugfn, enabled): |
1398 | 1838 | self.wiki = wiki |
— | — | @@ -1400,37 +1840,61 @@ |
1401 | 1841 | self.debugfn = debugfn |
1402 | 1842 | self._enabled = enabled |
1403 | 1843 | |
1404 | | - def makeDir(self, dir): |
| 1844 | + def makeDir(self, dirname): |
1405 | 1845 | if (self._enabled): |
1406 | | - if exists(dir): |
1407 | | - self.debugfn("Checkdir dir %s ..." % dir) |
| 1846 | + if exists(dirname): |
| 1847 | + self.debugfn("Checkdir dir %s ..." % dirname) |
1408 | 1848 | else: |
1409 | | - self.debugfn("Creating %s ..." % dir) |
1410 | | - os.makedirs(dir) |
| 1849 | + self.debugfn("Creating %s ..." % dirname) |
| 1850 | + os.makedirs(dirname) |
1411 | 1851 | |
1412 | | - def saveFeed(self, file): |
| 1852 | + def saveFeed(self, fileObj): |
1413 | 1853 | if (self._enabled): |
1414 | | - self.makeDir(join(self.wiki.publicDir(), 'latest')) |
1415 | | - filePath = self.dumpDir.webPath(file) |
1416 | | - fileName = os.path.basename(filePath) |
1417 | | - webPath = os.path.dirname(filePath) |
| 1854 | + self.makeDir(self.dumpDir.latestDir()) |
| 1855 | + filenameAndPath = self.dumpDir.webPath(fileObj) |
| 1856 | + webPath = os.path.dirname(filenameAndPath) |
1418 | 1857 | rssText = self.wiki.config.readTemplate("feed.xml") % { |
1419 | | - "chantitle": file, |
| 1858 | + "chantitle": fileObj.basename, |
1420 | 1859 | "chanlink": webPath, |
1421 | 1860 | "chandesc": "Wikimedia dump updates for %s" % self.dbName, |
1422 | 1861 | "title": webPath, |
1423 | 1862 | "link": webPath, |
1424 | | - "description": xmlEscape("<a href=\"%s\">%s</a>" % (filePath, fileName)), |
1425 | | - "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())} |
| 1863 | + "description": xmlEscape("<a href=\"%s\">%s</a>" % (filenameAndPath, fileObj.filename)), |
| 1864 | + "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()) } |
1426 | 1865 | directory = self.dumpDir.latestDir() |
1427 | | - rssPath = self.dumpDir.latestPath(file + "-rss.xml") |
1428 | | - FileUtils.writeFile(directory, rssPath, rssText, self.wiki.config.fileperms) |
| 1866 | + rssPath = os.path.join(self.dumpDir.latestDir(), fileObj.basename + "-rss.xml") |
| 1867 | + FileUtils.writeFile(self.wiki.config.tempDir, rssPath, rssText, self.wiki.config.fileperms) |
1429 | 1868 | |
| 1869 | + def cleanupFeeds(self): |
| 1870 | + # Call this only after the symlinks in this directory have been cleaned up.
| 1871 | + # Ideally there would be no such ordering dependency, but removing it would
| 1872 | + # mean parsing the contents of the rss file.
| 1873 | + if (self._enabled): |
| 1874 | + latestDir = self.dumpDir.latestDir() |
| 1875 | + files = os.listdir(latestDir) |
| 1876 | + for f in files: |
| 1877 | + if f.endswith("-rss.xml"): |
| 1878 | + filename = f[:-8]
| 1879 | + link = os.path.join(latestDir,filename) |
| 1880 | + if not exists(link): |
| 1881 | + os.remove(os.path.join(latestDir,f)) |
| 1882 | + |
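# Illustrative example (the filenames here are hypothetical): for a feed file named
# "examplewiki-latest-pages-articles.xml.bz2-rss.xml" in the latest dir, cleanupFeeds()
# strips the "-rss.xml" suffix and removes the feed once no corresponding
# "examplewiki-latest-pages-articles.xml.bz2" entry remains in that directory.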
1430 | 1883 | class Dump(object): |
1431 | 1884 | def __init__(self, name, desc): |
1432 | 1885 | self._desc = desc |
1433 | 1886 | self.progress = "" |
1434 | 1887 | self.runInfo = RunInfo(name,"waiting","") |
| 1888 | + self.dumpName = self.getDumpName() |
| 1889 | + self.fileType = self.getFileType() |
| 1890 | + self.fileExt = self.getFileExt() |
| 1891 | + # Set these only if a derived class hasn't defined them already. (Dump.__init__
| 1892 | + # is called last by child class constructors, so that their overrides of
| 1893 | + # methods like getDumpName() are in place before we use them to set
| 1894 | + # class attributes.)
| 1895 | + if not hasattr(self, '_chunksEnabled'): |
| 1896 | + self._chunksEnabled = False |
| 1897 | + if not hasattr(self, '_checkpointsEnabled'): |
| 1898 | + self._checkpointsEnabled = False |
1435 | 1899 | |
1436 | 1900 | def name(self): |
1437 | 1901 | return self.runInfo.name() |
— | — | @@ -1468,10 +1932,21 @@ |
1469 | 1933 | """Optionally return additional text to appear under the heading.""" |
1470 | 1934 | return None |
1471 | 1935 | |
1472 | | - def listOutputFiles(self, runner): |
1473 | | - """Return a list of filenames which should be exported and checksummed""" |
1474 | | - return [] |
| 1936 | + def getDumpName(self): |
| 1937 | + """Return the dumpName as it appears in output files for this phase of the dump |
| 1938 | + e.g. pages-meta-history, all-titles-in-ns0, etc""" |
| 1939 | + return "" |
1475 | 1940 | |
| 1941 | + def getFileExt(self): |
| 1942 | + """Return the extension of output files for this phase of the dump |
| 1943 | + e.g. bz2 7z etc""" |
| 1944 | + return "" |
| 1945 | + |
| 1946 | + def getFileType(self): |
| 1947 | + """Return the type of output files for this phase of the dump |
| 1948 | + e.g. sql xml etc""" |
| 1949 | + return "" |
| 1950 | + |
1476 | 1951 | def start(self, runner): |
1477 | 1952 | """Set the 'in progress' flag so we can output status.""" |
1478 | 1953 | self.setStatus("in-progress") |
— | — | @@ -1497,6 +1972,7 @@ |
1498 | 1973 | runner.log.addToLogQueue(line) |
1499 | 1974 | sys.stderr.write(line) |
1500 | 1975 | self.progress = line.strip() |
| 1976 | + # FIXME: needs thorough testing; verify that updateStatusFiles() works as expected here
1501 | 1977 | runner.status.updateStatusFiles() |
1502 | 1978 | runner.runInfoFile.saveDumpRunInfoFile(runner.dumpItemList.reportDumpRunInfo()) |
1503 | 1979 | |
— | — | @@ -1508,19 +1984,18 @@ |
1509 | 1985 | def waitAlarmHandler(self, signum, frame): |
1510 | 1986 | pass |
1511 | 1987 | |
1512 | | - def buildRecombineCommandString(self, runner, files, outputFileBasename, compressionCommand, uncompressionCommand, endHeaderMarker="</siteinfo>"): |
1513 | | -# outputFilename = self.buildOutputFilename(runner, outputFileBasename) |
1514 | | - outputFilename = runner.dumpDir.publicPath(outputFileBasename) |
| 1988 | + def buildRecombineCommandString(self, runner, files, outputFile, compressionCommand, uncompressionCommand, endHeaderMarker="</siteinfo>"): |
| 1989 | + outputFilename = runner.dumpDir.filenamePublicPath(outputFile) |
1515 | 1990 | chunkNum = 0 |
1516 | 1991 | recombines = [] |
1517 | 1992 | if (not exists( runner.wiki.config.head ) ): |
1518 | | - raise BackupError("head command %s not found" % runner.wiki.config.head); |
| 1993 | + raise BackupError("head command %s not found" % runner.wiki.config.head) |
1519 | 1994 | head = runner.wiki.config.head |
1520 | 1995 | if (not exists( runner.wiki.config.tail ) ): |
1521 | | - raise BackupError("tail command %s not found" % runner.wiki.config.tail); |
| 1996 | + raise BackupError("tail command %s not found" % runner.wiki.config.tail) |
1522 | 1997 | tail = runner.wiki.config.tail |
1523 | 1998 | if (not exists( runner.wiki.config.grep ) ): |
1524 | | - raise BackupError("grep command %s not found" % runner.wiki.config.grep); |
| 1999 | + raise BackupError("grep command %s not found" % runner.wiki.config.grep) |
1525 | 2000 | grep = runner.wiki.config.grep |
1526 | 2001 | |
1527 | 2002 | # we assume the result is always going to be run in a subshell. |
— | — | @@ -1536,11 +2011,11 @@ |
1537 | 2012 | u = MiscUtils.shellEscape(u) |
1538 | 2013 | for u in compressionCommand: |
1539 | 2014 | u = MiscUtils.shellEscape(u) |
1540 | | - for f in files: |
1541 | | - f = MiscUtils.shellEscape(f) |
1542 | 2015 | |
1543 | | - for f in files: |
1544 | | - f = runner.dumpDir.publicPath(f) |
| 2016 | + for fileObj in files: |
| 2017 | + # FIXME: this filename should be shell-escaped before use, e.g.
| 2018 | +# f = MiscUtils.shellEscape(fileObj.filename) |
| 2019 | + f = runner.dumpDir.filenamePublicPath(fileObj) |
1545 | 2020 | chunkNum = chunkNum + 1 |
1546 | 2021 | pipeline = [] |
1547 | 2022 | uncompressThisFile = uncompressionCommand[:] |
— | — | @@ -1571,42 +2046,369 @@ |
1572 | 2047 | recombineCommandString = "(" + ";".join(recombines) + ")" + "|" + "%s %s" % (compressionCommand, outputFilename) |
1573 | 2048 | return(recombineCommandString) |
1574 | 2049 | |
1575 | | - def cleanupOldFiles(self, runner, outputFileBasename): |
| 2050 | + def cleanupOldFiles(self, dumpDir, chunks = False): |
1576 | 2051 | if (runner._cleanupOldFilesEnabled): |
1577 | | - outputFilename = self.buildOutputFilename(runner, outputFileBasename) |
1578 | | - if exists(outputFilename): |
1579 | | - runner.removeFile(outputFilename) |
| 2052 | + files = self.listOutputFilesForCleanup(dumpDir) |
| 2053 | + for f in files: |
| 2054 | + if exists(dumpDir.filenamePublicPath(f)): |
| 2055 | + os.remove(dumpDir.filenamePublicPath(f)) |
1580 | 2056 | |
1581 | | - def buildOutputFilename(self, runner, outputFileBasename): |
1582 | | - return outputFilename |
| 2057 | + def getChunkList(self): |
| 2058 | + if self._chunksEnabled: |
| 2059 | + if self._chunkToDo: |
| 2060 | + return [ self._chunkToDo ] |
| 2061 | + else: |
| 2062 | + return range(1, len(self._chunks)+1) |
| 2063 | + else: |
| 2064 | + return False |
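# Example of the return values above (the per-chunk page counts are hypothetical):
# with self._chunksEnabled and self._chunks = [ 10000, 10000, 5000 ], getChunkList()
# returns [1, 2, 3]; with self._chunkToDo = 2 it returns [ 2 ]; with chunks disabled
# it returns False.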
1583 | 2065 | |
| 2066 | + # list all regular output files that exist |
| 2067 | + def listRegularFilesExisting(self, dumpDir, dumpNames = None, date = None, chunks = None): |
| 2068 | + files = [] |
| 2069 | + if not dumpNames: |
| 2070 | + dumpNames = [ self.dumpName ] |
| 2071 | + for d in dumpNames: |
| 2072 | + files.extend( dumpDir.getRegularFilesExisting(date, d, self.fileType, self.fileExt, chunks, temp = False)) |
| 2073 | + return files |
| 2074 | + |
| 2075 | + # list all checkpoint files that exist |
| 2076 | + def listCheckpointFilesExisting(self, dumpDir, dumpNames = None, date = None, chunks = None): |
| 2077 | + files = [] |
| 2078 | + if not dumpNames: |
| 2079 | + dumpNames = [ self.dumpName ] |
| 2080 | + for d in dumpNames: |
| 2081 | + files.extend(dumpDir.getCheckpointFilesExisting(date, d, self.fileType, self.fileExt, chunks, temp = False)) |
| 2082 | + return files |
| 2083 | + |
| 2084 | + # unused |
| 2085 | + # list all temp output files that exist |
| 2086 | + def listTempFilesExisting(self, dumpDir, dumpNames = None, date = None, chunks = None): |
| 2087 | + files = [] |
| 2088 | + if not dumpNames: |
| 2089 | + dumpNames = [ self.dumpName ] |
| 2090 | + for d in dumpNames: |
| 2091 | + files.extend( dumpDir.getCheckpointFilesExisting(None, d, self.fileType, self.fileExt, chunks = None, temp = True) ) |
| 2092 | + files.extend( dumpDir.getRegularFilesExisting(None, d, self.fileType, self.fileExt, chunks = None, temp = True) ) |
| 2093 | + return files |
| 2094 | + |
| 2095 | + # list checkpoint files that have been produced for specified chunk(s) |
| 2096 | + def listCheckpointFilesPerChunkExisting(self, dumpDir, chunks, dumpNames = None): |
| 2097 | + files = [] |
| 2098 | + if not dumpNames: |
| 2099 | + dumpNames = [ self.dumpName ] |
| 2100 | + for d in dumpNames: |
| 2101 | + files.extend(dumpDir.getCheckpointFilesExisting(None, d, self.fileType, self.fileExt, chunks, temp = False)) |
| 2102 | + return files |
| 2103 | + |
| 2104 | + # list noncheckpoint files that have been produced for specified chunk(s) |
| 2105 | + def listRegularFilesPerChunkExisting(self, dumpDir, chunks, dumpNames = None): |
| 2106 | + files = [] |
| 2107 | + if not dumpNames: |
| 2108 | + dumpNames = [ self.dumpName ] |
| 2109 | + for d in dumpNames: |
| 2110 | + files.extend( dumpDir.getRegularFilesExisting(None, d, self.fileType, self.fileExt, chunks, temp = False)) |
| 2111 | + return files |
| 2112 | + |
| 2113 | + # list temp output files that have been produced for specified chunk(s) |
| 2114 | + def listTempFilesPerChunkExisting(self, dumpDir, chunks, dumpNames = None): |
| 2115 | + files = [] |
| 2116 | + if not dumpNames: |
| 2117 | + dumpNames = [ self.dumpName ] |
| 2118 | + for d in dumpNames: |
| 2119 | + files.extend( dumpDir.getCheckpointFilesExisting(None, d, self.fileType, self.fileExt, chunks, temp = True) )
| 2120 | + files.extend( dumpDir.getRegularFilesExisting(None, d, self.fileType, self.fileExt, chunks, temp = True) )
| 2121 | + return files |
| 2122 | + |
| 2123 | + |
| 2124 | + # unused |
| 2125 | + # list noncheckpoint chunk files that have been produced |
| 2126 | + def listRegularFilesChunkedExisting(self, dumpDir, dumpNames = None, date = None): |
| 2127 | + files = [] |
| 2128 | + if not dumpNames: |
| 2129 | + dumpNames = [ self.dumpName ] |
| 2130 | + for d in dumpNames: |
| 2131 | + files.extend(dumpDir.getRegularFilesExisting(None, d, self.fileType, self.fileExt, chunks = self.getChunkList(), temp = False))
| 2132 | + return files |
| 2133 | + |
| 2134 | + # unused |
| 2135 | + # list temp output chunk files that have been produced |
| 2136 | + def listTempFilesChunkedExisting(self, runner, dumpNames = None): |
| 2137 | + files = [] |
| 2138 | + if not dumpNames: |
| 2139 | + dumpNames = [ self.dumpName ] |
| 2140 | + for d in dumpNames: |
| 2141 | + files.extend( runner.dumpDir.getCheckpointFilesExisting(None, d, self.fileType, self.fileExt, chunks = self.getChunkList(), temp = True) ) |
| 2142 | + files.extend( runner.dumpDir.getRegularFilesExisting(None, d, self.fileType, self.fileExt, chunks = self.getChunkList(), temp = True) ) |
| 2143 | + return files |
| 2144 | + |
| 2145 | + # unused |
| 2146 | + # list checkpoint files that have been produced for chunkless run |
| 2147 | + def listCheckpointFilesChunklessExisting(self, runner, dumpNames = None): |
| 2148 | + if not dumpNames: |
| 2149 | + dumpNames = [ self.dumpName ] |
| 2150 | + files = [] |
| 2151 | + for d in dumpNames: |
| 2152 | + files.extend(runner.dumpDir.getCheckpointFilesExisting(None, d, self.fileType, self.fileExt, chunks = False, temp = False)) |
| 2153 | + return files |
| 2154 | + |
| 2155 | + # unused |
| 2156 | + # list non checkpoint files that have been produced for chunkless run |
| 2157 | + def listRegularFilesChunklessExisting(self, runner, dumpNames = None): |
| 2158 | + if not dumpNames: |
| 2159 | + dumpNames = [ self.dumpName ] |
| 2160 | + files = [] |
| 2161 | + for d in dumpNames: |
| 2162 | + files.extend(runner.dumpDir.getRegularFilesExisting(None, d, self.fileType, self.fileExt, chunks = False, temp = False)) |
| 2163 | + return files |
| 2164 | + |
| 2165 | + # unused |
| 2166 | + # list non checkpoint files that have been produced for chunkless run |
| 2167 | + def listTempFilesChunklessExisting(self, runner, dumpNames = None): |
| 2168 | + if not dumpNames: |
| 2169 | + dumpNames = [ self.dumpName ] |
| 2170 | + files = [] |
| 2171 | + for d in dumpNames: |
| 2172 | + files.extend(runner.dumpDir.getCheckpointFilesExisting(None, d, self.fileType, self.fileExt, chunks = False, temp = True)) |
| 2173 | + files.extend(runner.dumpDir.getRegularFilesExisting(None, d, self.fileType, self.fileExt, chunks = False, temp = True)) |
| 2174 | + return files |
| 2175 | + |
| 2176 | + |
| 2177 | + # internal function which all the public get*Possible functions call |
| 2178 | + # list all files that could be created for the given dumpName, filtering by the given args. |
| 2179 | + # by definition, checkpoint files are never returned in such a list, as we don't |
| 2180 | + # know where a checkpoint might be taken (which pageId start/end). |
| 2181 | + # |
| 2182 | + # if we get None for an arg then we accept all values for that arg in the filename |
| 2183 | + # if we get False for an arg (chunk, temp), we reject any filename which contains a value for that arg |
| 2184 | + # if we get True for an arg (temp), we accept only filenames which contain a value for the arg |
| 2185 | + # chunks should be a list of value(s), or True / False / None |
| 2186 | + def _getFilesPossible(self, dumpDir, date = None, dumpName = None, fileType = None, fileExt = None, chunks = None, temp = False ): |
| 2187 | + files = [] |
| 2188 | + if dumpName == None: |
| 2189 | + dumpName = self.dumpName
| 2190 | + if chunks == None or chunks == False: |
| 2191 | + files.append(DumpFilename(dumpDir._wiki, date, dumpName, fileType, fileExt, None, None, temp)) |
| 2192 | + if chunks == True or chunks == None: |
| 2193 | + chunks = self.getChunkList()
| 2194 | + if chunks: |
| 2195 | + for i in chunks: |
| 2196 | + files.append(DumpFilename(dumpDir._wiki, date, dumpName, fileType, fileExt, i, None, temp)) |
| 2197 | + return files |
| 2198 | + |
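# Illustrative usage (the dumpName and extensions here are only examples):
# _getFilesPossible(dumpDir, None, "pages-articles", "xml", "bz2", chunks = False) yields
# only the chunkless DumpFilename; chunks = None yields the chunkless file plus one file
# per configured chunk; chunks = [ 2 ] yields only the file for chunk 2.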
| 2199 | + # unused |
| 2200 | + # based on dump name, get all the output files we expect to generate except for temp files |
| 2201 | + def getRegularFilesPossible(self, dumpDir, dumpNames = None): |
| 2202 | + if not dumpNames: |
| 2203 | + dumpNames = [ self.dumpName ] |
| 2204 | + files = [] |
| 2205 | + for d in dumpNames: |
| 2206 | + files.extend( self._getFilesPossible(dumpDir, None, d, self.fileType, self.fileExt, chunks = None, temp = False) ) |
| 2207 | + return files |
| 2208 | + |
| 2209 | + # unused |
| 2210 | + # based on dump name, get all the temp output files we expect to generate |
| 2211 | + def getTempFilesPossible(self, dumpDir, dumpNames = None): |
| 2212 | + if not dumpNames: |
| 2213 | + dumpNames = [ self.dumpName ] |
| 2214 | + files = [] |
| 2215 | + for d in dumpNames: |
| 2216 | + files.extend( self._getFilesPossible(dumpDir, None, d, self.fileType, self.fileExt, chunks = None, temp = True ) ) |
| 2217 | + return files |
| 2218 | + |
| 2219 | + # based on dump name, chunks, etc. get all the output files we expect to generate for these chunks |
| 2220 | + def getRegularFilesPerChunkPossible(self, dumpDir, chunks, dumpNames = None): |
| 2221 | + if not dumpNames: |
| 2222 | + dumpNames = [ self.dumpName ] |
| 2223 | + files = [] |
| 2224 | + for d in dumpNames: |
| 2225 | + files.extend( self._getFilesPossible(dumpDir, None, d, self.fileType, self.fileExt, chunks, temp = False) ) |
| 2226 | + return files |
| 2227 | + |
| 2228 | + # unused |
| 2229 | + # based on dump name, chunks, etc. get all the temp files we expect to generate for these chunks |
| 2230 | + def getTempFilesPerChunkPossible(self, dumpDir, chunks, dumpNames = None): |
| 2231 | + if not dumpNames: |
| 2232 | + dumpNames = [ self.dumpName ] |
| 2233 | + files = [] |
| 2234 | + for d in dumpNames: |
| 2235 | + files.extend(self._getFilesPossible(dumpDir, None, d, self.fileType, self.fileExt, chunks, temp = True)) |
| 2236 | + return files |
| 2237 | + |
| 2238 | + |
| 2239 | + # unused |
| 2240 | + # based on dump name, chunks, etc. get all the output files we expect to generate for these chunks |
| 2241 | + def getRegularFilesChunkedPossible(self, dumpDir, dumpNames = None): |
| 2242 | + if not dumpNames: |
| 2243 | + dumpNames = [ self.dumpName ] |
| 2244 | + files = [] |
| 2245 | + for d in dumpNames: |
| 2246 | + files.extend( self._getFilesPossible(dumpDir, None, d, self.fileType, self.fileExt, chunks = True, temp = False) ) |
| 2247 | + return files |
| 2248 | + |
| 2249 | + # unused |
| 2250 | + # based on dump name, chunks, etc. get all the temp files we expect to generate for these chunks |
| 2251 | + def getTempFilesPerChunkedPossible(self, dumpDir, dumpNames = None): |
| 2252 | + if not dumpNames: |
| 2253 | + dumpNames = [ self.dumpName ] |
| 2254 | + files = [] |
| 2255 | + for d in dumpNames: |
| 2256 | + files.extend(self._getFilesPossible(dumpDir, None, d, self.fileType, self.fileExt, chunks = True, temp = True)) |
| 2257 | + return files |
| 2258 | + |
| 2259 | + # unused |
| 2260 | + # list noncheckpoint files that should be produced for chunkless run |
| 2261 | + def getRegularFilesChunklessPossible(self, dumpDir, dumpNames = None): |
| 2262 | + if not dumpNames: |
| 2263 | + dumpNames = [ self.dumpName ] |
| 2264 | + files = [] |
| 2265 | + for d in dumpNames: |
| 2266 | + files.extend(self._getFilesPossible(dumpDir, None, d, self.fileType, self.fileExt, chunks = False, temp = False)) |
| 2267 | + return files |
| 2268 | + |
| 2269 | + # unused |
| 2270 | + # list temp output files that should be produced for chunkless run |
| 2271 | + def getTempFilesChunklessPossible(self, dumpDir, dumpNames = None): |
| 2272 | + if not dumpNames: |
| 2273 | + dumpNames = [ self.dumpName ] |
| 2274 | + files = [] |
| 2275 | + for d in dumpNames: |
| 2276 | + files.extend(self._getFilesPossible(dumpDir, None, d, self.fileType, self.fileExt, chunks = False, temp = True)) |
| 2277 | + return files |
| 2278 | + |
| 2279 | +################################ |
| 2280 | +# |
| 2281 | +# these routines are all used for listing output files for various purposes... |
| 2282 | +# |
| 2283 | +# |
| 2284 | + # Used for updating md5 lists, index.html |
| 2285 | + # Includes: checkpoints, chunks, chunkless, temp files if they exist. At end of run temp files must be gone. |
| 2286 | + # This is *all* output files for the dumpName, regardless of what's being re-run. |
| 2287 | + def listOutputFilesToPublish(self, dumpDir, dumpNames = None): |
| 2288 | + # some stages (eg XmlStub) call this for several different dumpNames
| 2289 | + if (dumpNames == None): |
| 2290 | + dumpNames = [ self.dumpName ] |
| 2291 | + files = [] |
| 2292 | + if (self._checkpointsEnabled): |
| 2293 | + # we will pass list of chunks or chunkToDo, or False, depending on the job setup. |
| 2294 | + files.extend(self.listCheckpointFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames)) |
| 2295 | + files.extend(self.listTempFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames)) |
| 2296 | + else: |
| 2297 | + # we will pass list of chunks or chunkToDo, or False, depending on the job setup. |
| 2298 | + files.extend(self.getRegularFilesPerChunkPossible(dumpDir, self.getChunkList(), dumpNames)) |
| 2299 | + return files |
| 2300 | + |
| 2301 | + # called at end of job run to see if results are intact or are garbage and must be tossed/rerun. |
| 2302 | + # Includes: checkpoints, chunks, chunkless. Not included: temp files. |
| 2303 | + # This is only the files that should be produced from this run. So it is limited to a specific |
| 2304 | + # chunk if that's being redone, or to all chunks if the whole job is being redone, or to the chunkless |
| 2305 | + # files if there are no chunks enabled. |
| 2306 | + def listOutputFilesToCheckForTruncation(self, dumpDir, dumpNames = None): |
| 2307 | + # some stages (eg XmlStub) call this for several different dumpNames
| 2308 | + if (dumpNames == None): |
| 2309 | + dumpNames = [ self.dumpName ] |
| 2310 | + files = [] |
| 2311 | + if (self._checkpointsEnabled): |
| 2312 | + # we will pass list of chunks or chunkToDo, or False, depending on the job setup. |
| 2313 | + files.extend(self.listCheckpointFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames)) |
| 2314 | + else: |
| 2315 | + # we will pass list of chunks or chunkToDo, or False, depending on the job setup. |
| 2316 | + files.extend(self.getRegularFilesPerChunkPossible(dumpDir, self.getChunkList(), dumpNames)) |
| 2317 | + return files |
| 2318 | + |
| 2319 | + # called when putting together commands to produce output for the job. |
| 2320 | + # Includes: chunks, chunkless, temp files. Not included: checkpoint files. |
| 2321 | + # This is only the files that should be produced from this run. So it is limited to a specific |
| 2322 | + # chunk if that's being redone, or to all chunks if the whole job is being redone, or to the chunkless |
| 2323 | + # files if there are no chunks enabled. |
| 2324 | + def listOutputFilesForBuildCommand(self, dumpDir, dumpNames = None): |
| 2325 | + # some stages (eg XmlStub) call this for several different dumpNames
| 2326 | + if (dumpNames == None): |
| 2327 | + dumpNames = [ self.dumpName ] |
| 2328 | + files = [] |
| 2329 | + if (self._checkpointsEnabled): |
| 2330 | + # we will pass list of chunks or chunkToDo, or False, depending on the job setup. |
| 2331 | + files.extend(self.listTempFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames)) |
| 2332 | + else: |
| 2333 | + # we will pass list of chunks or chunkToDo, or False, depending on the job setup. |
| 2334 | + files.extend(self.getRegularFilesPerChunkPossible(dumpDir, self.getChunkList(), dumpNames)) |
| 2335 | + return files |
| 2336 | + |
| 2337 | + # called before job run to cleanup old files left around from any previous run(s) |
| 2338 | + # Includes: checkpoints, chunks, chunkless, temp files if they exist. |
| 2339 | + # This is only the files that should be produced from this run. So it is limited to a specific |
| 2340 | + # chunk if that's being redone, or to all chunks if the whole job is being redone, or to the chunkless |
| 2341 | + # files if there are no chunks enabled. |
| 2342 | + def listOutputFilesForCleanup(self, dumpDir, dumpNames = None): |
| 2343 | + # some stages (eg XmlStub) call this for several different dumpNames
| 2344 | + if (dumpNames == None): |
| 2345 | + dumpNames = [ self.dumpName ] |
| 2346 | + files = [] |
| 2347 | + if (self._checkpointsEnabled): |
| 2348 | + # we will pass list of chunks or chunkToDo, or False, depending on the job setup. |
| 2349 | + files.extend(self.listCheckpointFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames)) |
| 2350 | + files.extend(self.listTempFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames)) |
| 2351 | + else: |
| 2352 | + # fixme this should be a list |
| 2353 | + # we will pass list of chunks or chunkToDo, or False, depending on the job setup. |
| 2354 | + files.extend(self.listRegularFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames)) |
| 2355 | + return files |
| 2356 | + |
| 2357 | + # used to generate list of input files for other phase (e.g. recombine, recompress) |
| 2358 | + # Includes: checkpoints, chunks/chunkless files depending on whether chunks are enabled. Not included: temp files. |
| 2359 | + # This is *all* output files for the job, regardless of what's being re-run. The caller can sort out which |
| 2360 | + # files go to which chunk, in case input is needed on a per chunk basis. (That is a little
| 2361 | + # awkward, but we only do it once per job so the cost is negligible.)
| 2362 | + def listOutputFilesForInput(self, dumpDir, dumpNames = None): |
| 2363 | + # some stages (eg XmlStub) call this for several different dumpNames
| 2364 | + if (dumpNames == None): |
| 2365 | + dumpNames = [ self.dumpName ] |
| 2366 | + files = [] |
| 2367 | + if (self._checkpointsEnabled): |
| 2368 | + files.extend(self.listCheckpointFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames)) |
| 2369 | + else: |
| 2370 | + files.extend(self.listRegularFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames)) |
| 2371 | + return files |
| 2372 | + |
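# Illustrative sketch, not part of the patch: the minimal shape of a job class under the new
# naming scheme. The class, dumpName and the placeholder command below are hypothetical;
# PublicTable and XmlStub that follow are the real examples. Note that flags are set before
# calling Dump.__init__, which runs last so that the getDumpName()/getFileType()/getFileExt()
# overrides are already in place when the base constructor uses them.
class ExampleDump(Dump):
    def __init__(self, name, desc):
        self._chunksEnabled = False
        Dump.__init__(self, name, desc)

    def getDumpName(self):
        return "example"

    def getFileType(self):
        return "xml"

    def getFileExt(self):
        return "gz"

    def run(self, runner):
        self.cleanupOldFiles(runner.dumpDir)
        commands = []
        for f in self.listOutputFilesForBuildCommand(runner.dumpDir):
            outPath = runner.dumpDir.filenamePublicPath(f)
            # each series is a list of pipelines, each pipeline a list of commands
            commands.append([ [ [ "/bin/true", outPath ] ] ])
        error = runner.runCommand(commands, callbackStderr=self.progressCallback, callbackStderrArg=runner)
        if (error):
            raise BackupError("error producing example dump files")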
1584 | 2373 | class PublicTable(Dump): |
1585 | 2374 | """Dump of a table using MySQL's mysqldump utility.""" |
1586 | 2375 | |
1587 | 2376 | def __init__(self, table, name, desc): |
| 2377 | + self._table = table |
| 2378 | + self._chunksEnabled = False |
1588 | 2379 | Dump.__init__(self, name, desc) |
1589 | | - self._table = table |
1590 | 2380 | |
1591 | | - def _file(self): |
1592 | | - return self._table + ".sql.gz" |
| 2381 | + def getDumpName(self): |
| 2382 | + return(self._table) |
1593 | 2383 | |
1594 | | - def _path(self, runner): |
1595 | | - return runner.dumpDir.publicPath(self._file()) |
| 2384 | + def getFileType(self): |
| 2385 | + return "sql" |
1596 | 2386 | |
| 2387 | + def getFileExt(self): |
| 2388 | + return "gz" |
| 2389 | + |
1597 | 2390 | def run(self, runner): |
1598 | 2391 | retries = 0 |
1599 | 2392 | # try this initially and see how it goes |
1600 | 2393 | maxretries = 3 |
1601 | | - error = runner.saveTable(self._table, self._path(runner)) |
| 2394 | + files = self.listOutputFilesForBuildCommand(runner.dumpDir) |
| 2395 | + if (len(files) > 1): |
| 2396 | + raise BackupError("table dump %s trying to produce more than one file" % self.dumpName) |
| 2397 | + outputFile = files[0] |
| 2398 | + error = self.saveTable(self._table, runner.dumpDir.filenamePublicPath(outputFile), runner) |
1602 | 2399 | while (error and retries < maxretries): |
1603 | 2400 | retries = retries + 1 |
1604 | 2401 | time.sleep(5) |
1605 | | - error = runner.saveTable(self._table, self._path(runner)) |
| 2402 | + error = self.saveTable(self._table, runner.dumpDir.filenamePublicPath(outputFile), runner) |
1606 | 2403 | if (error): |
1607 | 2404 | raise BackupError("error dumping table %s" % self._table) |
1608 | 2405 | |
1609 | | - def listOutputFiles(self, runner): |
1610 | | - return [self._file()] |
| 2406 | + # returns 0 on success, 1 on error |
| 2407 | + def saveTable(self, table, outfile, runner): |
| 2408 | + """Dump a table from the current DB with mysqldump, save to a gzipped sql file.""" |
| 2409 | + if (not exists( runner.wiki.config.gzip ) ): |
| 2410 | + raise BackupError("gzip command %s not found" % runner.wiki.config.gzip) |
| 2411 | + commands = runner.dbServerInfo.buildSqlDumpCommand(table, runner.wiki.config.gzip) |
| 2412 | + return runner.saveCommand(commands, outfile) |
1611 | 2413 | |
1612 | 2414 | class PrivateTable(PublicTable): |
1613 | 2415 | """Hidden table dumps for private data.""" |
— | — | @@ -1614,77 +2416,120 @@ |
1615 | 2417 | def description(self): |
1616 | 2418 | return self._desc + " (private)" |
1617 | 2419 | |
1618 | | - def _path(self, runner): |
1619 | | - return runner.dumpDir.privatePath(self._file()) |
| 2420 | + def run(self, runner): |
| 2421 | + retries = 0 |
| 2422 | + # try this initially and see how it goes |
| 2423 | + maxretries = 3 |
| 2424 | + files = self.listOutputFilesForBuildCommand(runner.dumpDir) |
| 2425 | + if (len(files) > 1): |
| 2426 | + raise BackupError("table dump %s trying to produce more than one file" % self.dumpName) |
| 2427 | + outputFile = files[0] |
| 2428 | + error = self.saveTable(self._table, runner.dumpDir.filenamePrivatePath(outputFile), runner) |
| 2429 | + while (error and retries < maxretries): |
| 2430 | + retries = retries + 1 |
| 2431 | + time.sleep(5) |
| 2432 | + error = self.saveTable(self._table, runner.dumpDir.filenamePrivatePath(outputFile), runner) |
| 2433 | + if (error): |
| 2434 | + raise BackupError("error dumping table %s" % self._table) |
1620 | 2435 | |
1621 | | - def listOutputFiles(self, runner): |
| 2436 | + def listOutputFilesToPublish(self, dumpDir): |
1622 | 2437 | """Private table won't have public files to list.""" |
1623 | 2438 | return [] |
1624 | 2439 | |
1625 | | - |
1626 | 2440 | class XmlStub(Dump): |
1627 | 2441 | """Create lightweight skeleton dumps, minus bulk text. |
1628 | 2442 | A second pass will import text from prior dumps or the database to make |
1629 | 2443 | full files for the public.""" |
1630 | 2444 | |
1631 | | - def __init__(self, name, desc, chunkToDo, chunks = False): |
1632 | | - Dump.__init__(self, name, desc) |
| 2445 | + def __init__(self, name, desc, chunkToDo, chunks = False, checkpoints = False): |
1633 | 2446 | self._chunkToDo = chunkToDo |
1634 | 2447 | self._chunks = chunks |
| 2448 | + if self._chunks: |
| 2449 | + self._chunksEnabled = True |
| 2450 | + self.historyDumpName = "stub-meta-history" |
| 2451 | + self.currentDumpName = "stub-meta-current" |
| 2452 | + self.articlesDumpName = "stub-articles" |
| 2453 | + if checkpoints: |
| 2454 | + self._checkpointsEnabled = True |
| 2455 | + Dump.__init__(self, name, desc) |
1635 | 2456 | |
1636 | 2457 | def detail(self): |
1637 | 2458 | return "These files contain no page text, only revision metadata." |
1638 | 2459 | |
1639 | | - def listOutputFiles(self, runner, unnumbered=False): |
1640 | | - if (self._chunks): |
1641 | | - if (self._chunkToDo): |
1642 | | - if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)): |
1643 | | - raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks))) |
1644 | | - files = [] |
1645 | | - files.append("stub-meta-history%s.xml.gz" % self._chunkToDo) |
1646 | | - files.append("stub-meta-current%s.xml.gz" % self._chunkToDo) |
1647 | | - files.append("stub-articles%s.xml.gz" % self._chunkToDo) |
1648 | | - return files |
1649 | | - else: |
1650 | | - files = [] |
1651 | | - for i in range(1, len(self._chunks) + 1): |
1652 | | - files.append("stub-meta-history%s.xml.gz" % i) |
1653 | | - files.append("stub-meta-current%s.xml.gz" % i) |
1654 | | - files.append("stub-articles%s.xml.gz" % i) |
1655 | | - return files |
1656 | | - else: |
1657 | | - return ["stub-meta-history.xml.gz", |
1658 | | - "stub-meta-current.xml.gz", |
1659 | | - "stub-articles.xml.gz"] |
| 2460 | + def getFileType(self): |
| 2461 | + return "xml" |
1660 | 2462 | |
1661 | | - def buildCommand(self, runner, chunk = 0): |
1662 | | - history = self.buildHistoryOutputFilename(runner, chunk) |
1663 | | - current = self.buildCurrentOutputFilename(runner, chunk) |
1664 | | - articles = self.buildArticlesOutputFilename(runner, chunk) |
| 2463 | + def getFileExt(self): |
| 2464 | + return "gz" |
1665 | 2465 | |
| 2466 | + def getDumpName(self): |
| 2467 | + return 'stub' |
| 2468 | + |
| 2469 | + def listDumpNames(self): |
| 2470 | + dumpNames = [ self.historyDumpName, self.currentDumpName, self.articlesDumpName ] |
| 2471 | + return dumpNames |
| 2472 | + |
| 2473 | + def listOutputFilesToPublish(self, dumpDir): |
| 2474 | + dumpNames = self.listDumpNames() |
| 2475 | + files = [] |
| 2476 | + files.extend(Dump.listOutputFilesToPublish(self, dumpDir, dumpNames)) |
| 2477 | + return files |
| 2478 | + |
| 2479 | + def listOutputFilesToCheckForTruncation(self, dumpDir): |
| 2480 | + dumpNames = self.listDumpNames() |
| 2481 | + files = [] |
| 2482 | + files.extend(Dump.listOutputFilesToCheckForTruncation(self, dumpDir, dumpNames)) |
| 2483 | + return files |
| 2484 | + |
| 2485 | + def listOutputFilesForBuildCommand(self, dumpDir): |
| 2486 | + dumpNames = self.listDumpNames() |
| 2487 | + files = [] |
| 2488 | + files.extend(Dump.listOutputFilesForBuildCommand(self, dumpDir, dumpNames)) |
| 2489 | + return files |
| 2490 | + |
| 2491 | + def listOutputFilesForCleanup(self, dumpDir): |
| 2492 | + # fixme should this pass a list instead of one item? |
| 2493 | + dumpNames = self.listDumpNames() |
| 2494 | + files = [] |
| 2495 | + files.extend(Dump.listOutputFilesForCleanup(self, dumpDir, dumpNames)) |
| 2496 | + return files |
| 2497 | + |
| 2498 | + def listOutputFilesForInput(self, dumpDir, dumpNames = None): |
| 2499 | + if dumpNames == None: |
| 2500 | + dumpNames = self.listDumpNames() |
| 2501 | + files = [] |
| 2502 | + files.extend(Dump.listOutputFilesForInput(self, dumpDir, dumpNames)) |
| 2503 | + return files |
| 2504 | + |
| 2505 | + def buildCommand(self, runner, f): |
1666 | 2506 | if (not exists( runner.wiki.config.php ) ): |
1667 | | - raise BackupError("php command %s not found" % runner.wiki.config.php); |
| 2507 | + raise BackupError("php command %s not found" % runner.wiki.config.php) |
| 2508 | + |
| 2509 | + # FIXME: we have a list of all the files for all three dumpNames; they need to be split up by dumpName here.
| 2510 | + articlesFile = runner.dumpDir.filenamePublicPath(f) |
| 2511 | + historyFile = runner.dumpDir.filenamePublicPath(DumpFilename(runner.wiki, f.date, self.historyDumpName, f.fileType, f.fileExt, f.chunk, f.checkpoint, f.temp)) |
| 2512 | + currentFile = runner.dumpDir.filenamePublicPath(DumpFilename(runner.wiki, f.date, self.currentDumpName, f.fileType, f.fileExt, f.chunk, f.checkpoint, f.temp)) |
1668 | 2513 | command = [ "%s" % runner.wiki.config.php, |
1669 | 2514 | "-q", "%s/maintenance/dumpBackup.php" % runner.wiki.config.wikiDir, |
1670 | 2515 | "--wiki=%s" % runner.dbName, |
1671 | 2516 | "--full", "--stub", "--report=10000", |
1672 | 2517 | "%s" % runner.forceNormalOption(), |
1673 | | - "--output=gzip:%s" % history, |
1674 | | - "--output=gzip:%s" % current, |
1675 | | - "--filter=latest", "--output=gzip:%s" % articles, |
| 2518 | + "--output=gzip:%s" % historyFile, |
| 2519 | + "--output=gzip:%s" % currentFile, |
| 2520 | + "--filter=latest", "--output=gzip:%s" % articlesFile, |
1676 | 2521 | "--filter=latest", "--filter=notalk", "--filter=namespace:!NS_USER" ] |
1677 | | - if (chunk): |
| 2522 | + |
| 2523 | + if (f.chunk): |
1678 | 2524 | # set up start end end pageids for this piece |
1679 | 2525 | # note there is no page id 0 I guess. so we start with 1 |
1680 | 2526 | # start = runner.pagesPerChunk()*(chunk-1) + 1 |
1681 | | - start = sum([ self._chunks[i] for i in range(0,chunk-1)]) + 1 |
| 2527 | + start = sum([ self._chunks[i] for i in range(0,f.chunkInt-1)]) + 1 |
1682 | 2528 | startopt = "--start=%s" % start |
1683 | 2529 | # if we are on the last chunk, we should get up to the last pageid, |
1684 | 2530 | # whatever that is. |
1685 | 2531 | command.append(startopt) |
1686 | | - if chunk < len(self._chunks): |
1687 | | - # end = start + runner.pagesPerChunk() |
1688 | | - end = sum([ self._chunks[i] for i in range(0,chunk)]) +1 |
| 2532 | + if f.chunkInt < len(self._chunks): |
| 2533 | + end = sum([ self._chunks[i] for i in range(0,f.chunkInt)]) +1 |
1689 | 2534 | endopt = "--end=%s" % end |
1690 | 2535 | command.append(endopt) |
1691 | 2536 | |
— | — | @@ -1692,107 +2537,66 @@ |
1693 | 2538 | series = [ pipeline ] |
1694 | 2539 | return(series) |
1695 | 2540 | |
1696 | | - def cleanupOldFiles(self, runner, chunk = 0): |
1697 | | - if (runner._cleanupOldFilesEnabled): |
1698 | | - fileList = self.buildOutputFilenames(runner, chunk) |
1699 | | - for filename in fileList: |
1700 | | - if exists(filename): |
1701 | | - runner.removeFile(filename) |
1702 | | - |
1703 | | - def buildHistoryOutputFilename(self, runner, chunk = 0): |
1704 | | - if (chunk): |
1705 | | - chunkinfo = "%s" % chunk |
1706 | | - else: |
1707 | | - chunkinfo = "" |
1708 | | - history = runner.dumpDir.publicPath("stub-meta-history" + chunkinfo + ".xml.gz") |
1709 | | - return history |
1710 | | - |
1711 | | - def buildCurrentOutputFilename(self, runner, chunk = 0): |
1712 | | - if (chunk): |
1713 | | - chunkinfo = "%s" % chunk |
1714 | | - else: |
1715 | | - chunkinfo = "" |
1716 | | - current = runner.dumpDir.publicPath("stub-meta-current" + chunkinfo + ".xml.gz") |
1717 | | - return current |
1718 | | - |
1719 | | - def buildArticlesOutputFilename(self, runner, chunk = 0): |
1720 | | - if (chunk): |
1721 | | - chunkinfo = "%s" % chunk |
1722 | | - else: |
1723 | | - chunkinfo = "" |
1724 | | - articles = runner.dumpDir.publicPath("stub-articles" + chunkinfo + ".xml.gz") |
1725 | | - return articles |
1726 | | - |
1727 | | - def buildOutputFilenames(self, runner, chunk = 0): |
1728 | | - history = self.buildHistoryOutputFilename(runner, chunk) |
1729 | | - current = self.buildCurrentOutputFilename(runner, chunk) |
1730 | | - articles = self.buildArticlesOutputFilename(runner, chunk) |
1731 | | - return([ history, current, articles ]) |
1732 | | - |
1733 | 2541 | def run(self, runner): |
1734 | 2542 | commands = [] |
1735 | | - if self._chunks: |
1736 | | - if (self._chunkToDo): |
1737 | | - if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)): |
1738 | | - raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks))) |
1739 | | - self.cleanupOldFiles(runner,self._chunkToDo) |
1740 | | - series = self.buildCommand(runner, self._chunkToDo) |
| 2543 | + self.cleanupOldFiles(runner.dumpDir) |
| 2544 | + files = self.listOutputFilesForBuildCommand(runner.dumpDir) |
| 2545 | + for f in files: |
| 2546 | + # arbitrarily choose one of the dumpNames we produce (= articlesDumpName);
| 2547 | + # buildCommand will figure out the files for the rest
| 2548 | + if (f.dumpName == self.articlesDumpName): |
| 2549 | + series = self.buildCommand(runner, f) |
1741 | 2550 | commands.append(series) |
1742 | | - else: |
1743 | | - for i in range(1, len(self._chunks)+1): |
1744 | | - self.cleanupOldFiles(runner,i) |
1745 | | - series = self.buildCommand(runner, i) |
1746 | | - commands.append(series) |
1747 | | - else: |
1748 | | - self.cleanupOldFiles(runner) |
1749 | | - series = self.buildCommand(runner) |
1750 | | - commands.append(series) |
1751 | 2551 | error = runner.runCommand(commands, callbackStderr=self.progressCallback, callbackStderrArg=runner) |
1752 | 2552 | if (error): |
1753 | 2553 | raise BackupError("error producing stub files" % self._subset) |
1754 | 2554 | |
1755 | | -class RecombineXmlStub(XmlStub): |
1756 | | - def __init__(self, name, desc, chunks): |
1757 | | - XmlStub.__init__(self, name, desc, False, chunks) |
1758 | | - # this is here only so that a callback can capture output from some commands |
1759 | | - # related to recombining files if we did parallel runs of the recompression |
1760 | | - self._output = None |
| 2555 | +class RecombineXmlStub(Dump): |
| 2556 | + def __init__(self, name, desc, itemForXmlStubs): |
| 2557 | + self.itemForXmlStubs = itemForXmlStubs |
| 2558 | + Dump.__init__(self, name, desc) |
| 2559 | + # the input may have checkpoints but the output will not. |
| 2560 | + self._checkpointsEnabled = False |
1761 | 2561 | |
1762 | | - def listInputFiles(self, runner): |
1763 | | - return(XmlStub.listOutputFiles(self, runner)) |
| 2562 | + def detail(self): |
| 2563 | + return "These files contain no page text, only revision metadata." |
1764 | 2564 | |
1765 | | - def listOutputFiles(self, runner): |
1766 | | - return ["stub-meta-history.xml.gz", |
1767 | | - "stub-meta-current.xml.gz", |
1768 | | - "stub-articles.xml.gz"] |
| 2565 | + def listDumpNames(self): |
| 2566 | + return self.itemForXmlStubs.listDumpNames() |
1769 | 2567 | |
| 2568 | + def getFileType(self): |
| 2569 | + return self.itemForXmlStubs.getFileType() |
| 2570 | + |
| 2571 | + def getFileExt(self): |
| 2572 | + return self.itemForXmlStubs.getFileExt() |
| 2573 | + |
| 2574 | + def getDumpName(self): |
| 2575 | + return self.itemForXmlStubs.getDumpName() |
| 2576 | + |
1770 | 2577 | def run(self, runner): |
1771 | 2578 | error=0 |
1772 | | - if (self._chunks): |
1773 | | - files = self.listInputFiles(runner) |
1774 | | - outputFileList = self.listOutputFiles(runner) |
1775 | | - for outputFile in outputFileList: |
1776 | | - inputFiles = [] |
1777 | | - for inFile in files: |
1778 | | - (base, rest) = inFile.split('.',1) |
1779 | | - base = re.sub("\d+$", "", base) |
1780 | | - if base + "." + rest == outputFile: |
1781 | | - inputFiles.append(inFile) |
1782 | | - if not len(inputFiles): |
1783 | | - self.setStatus("failed") |
1784 | | - raise BackupError("No input files for %s found" % self.name) |
1785 | | - if (not exists( runner.wiki.config.gzip ) ): |
1786 | | - raise BackupError("gzip command %s not found" % runner.wiki.config.gzip); |
1787 | | - compressionCommand = runner.wiki.config.gzip |
1788 | | - compressionCommand = "%s > " % runner.wiki.config.gzip |
1789 | | - uncompressionCommand = [ "%s" % runner.wiki.config.gzip, "-dc" ] |
1790 | | - recombineCommandString = self.buildRecombineCommandString(runner, inputFiles, outputFile, compressionCommand, uncompressionCommand ) |
1791 | | - recombineCommand = [ recombineCommandString ] |
1792 | | - recombinePipeline = [ recombineCommand ] |
1793 | | - series = [ recombinePipeline ] |
1794 | | - result = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True) |
1795 | | - if result: |
1796 | | - error = result |
| 2579 | + files = self.itemForXmlStubs.listOutputFilesForInput(runner.dumpDir) |
| 2580 | + outputFileList = self.listOutputFilesForBuildCommand(runner.dumpDir, self.listDumpNames()) |
| 2581 | + for outputFileObj in outputFileList: |
| 2582 | + inputFiles = [] |
| 2583 | + for inFile in files: |
| 2584 | + if inFile.dumpName == outputFileObj.dumpName: |
| 2585 | + inputFiles.append(inFile) |
| 2586 | + if not len(inputFiles): |
| 2587 | + self.setStatus("failed") |
| 2588 | + raise BackupError("No input files for %s found" % self.name()) |
| 2589 | + if (not exists( runner.wiki.config.gzip ) ): |
| 2590 | + raise BackupError("gzip command %s not found" % runner.wiki.config.gzip) |
| 2591 | + compressionCommand = runner.wiki.config.gzip |
| 2592 | + compressionCommand = "%s > " % runner.wiki.config.gzip |
| 2593 | + uncompressionCommand = [ "%s" % runner.wiki.config.gzip, "-dc" ] |
| 2594 | + recombineCommandString = self.buildRecombineCommandString(runner, inputFiles, outputFileObj, compressionCommand, uncompressionCommand ) |
| 2595 | + recombineCommand = [ recombineCommandString ] |
| 2596 | + recombinePipeline = [ recombineCommand ] |
| 2597 | + series = [ recombinePipeline ] |
| 2598 | + result = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True) |
| 2599 | + if result: |
| 2600 | + error = result |
1797 | 2601 | if (error): |
1798 | 2602 | raise BackupError("error recombining stub files") |
1799 | 2603 | |
— | — | @@ -1801,29 +2605,27 @@ |
1802 | 2606 | |
1803 | 2607 | def __init__(self, desc, chunks = False): |
1804 | 2608 | Dump.__init__(self, "xmlpagelogsdump", desc) |
1805 | | - self._chunks = chunks |
1806 | 2609 | |
1807 | 2610 | def detail(self): |
1808 | 2611 | return "This contains the log of actions performed on pages." |
1809 | 2612 | |
1810 | | - def listOutputFiles(self, runner): |
1811 | | - return ["pages-logging.xml.gz"] |
| 2613 | + def getDumpName(self): |
| 2614 | + return("pages-logging") |
1812 | 2615 | |
1813 | | - def cleanupOldFiles(self, runner): |
1814 | | - if (runner._cleanupOldFilesEnabled): |
1815 | | - logging = self.buildOutputFilename(runner) |
1816 | | - if exists(logging): |
1817 | | - runner.removeFile(logging) |
| 2616 | + def getFileType(self): |
| 2617 | + return "xml" |
1818 | 2618 | |
1819 | | - def buildOutputFilename(self, runner): |
1820 | | - logging = runner.dumpDir.publicPath("pages-logging.xml.gz") |
1821 | | - return logging |
| 2619 | + def getFileExt(self): |
| 2620 | + return "gz" |
1822 | 2621 | |
1823 | 2622 | def run(self, runner): |
1824 | | - self.cleanupOldFiles(runner) |
1825 | | - logging = self.buildOutputFilename(runner) |
| 2623 | + self.cleanupOldFiles(runner.dumpDir) |
| 2624 | + files = self.listOutputFilesForBuildCommand(runner.dumpDir) |
| 2625 | + if (len(files) > 1): |
| 2626 | + raise BackupError("logging table job wants to produce more than one output file") |
| 2627 | + logging = runner.dumpDir.filenamePublicPath(files[0]) |
1826 | 2628 | if (not exists( runner.wiki.config.php ) ): |
1827 | | - raise BackupError("php command %s not found" % runner.wiki.config.php); |
| 2629 | + raise BackupError("php command %s not found" % runner.wiki.config.php) |
1828 | 2630 | command = [ "%s" % runner.wiki.config.php, |
1829 | 2631 | "-q", "%s/maintenance/dumpBackup.php" % runner.wiki.config.wikiDir, |
1830 | 2632 | "--wiki=%s" % runner.dbName, |
— | — | @@ -1838,120 +2640,132 @@ |
1839 | 2641 | |
1840 | 2642 | class XmlDump(Dump): |
1841 | 2643 | """Primary XML dumps, one section at a time.""" |
1842 | | - def __init__(self, subset, name, desc, detail, prefetch, spawn, chunkToDo, chunks = False): |
1843 | | - Dump.__init__(self, name, desc) |
| 2644 | + def __init__(self, subset, name, desc, detail, itemForStubs, prefetch, spawn, wiki, chunkToDo, chunks = False, checkpoints = False): |
1844 | 2645 | self._subset = subset |
1845 | 2646 | self._detail = detail |
1846 | 2647 | self._desc = desc |
1847 | 2648 | self._prefetch = prefetch |
1848 | 2649 | self._spawn = spawn |
1849 | 2650 | self._chunks = chunks |
| 2651 | + if self._chunks: |
| 2652 | + self._chunksEnabled = True |
1850 | 2653 | self._pageID = {} |
1851 | 2654 | self._chunkToDo = chunkToDo |
1852 | 2655 | |
1853 | | - def detail(self): |
1854 | | - """Optionally return additional text to appear under the heading.""" |
1855 | | - return self._detail |
| 2656 | + self.wiki = wiki |
| 2657 | + self.itemForStubs = itemForStubs |
| 2658 | + if checkpoints: |
| 2659 | + self._checkpointsEnabled = True |
| 2660 | + Dump.__init__(self, name, desc) |
1856 | 2661 | |
1857 | | - def _file(self, ext, chunk=0): |
1858 | | - if (chunk): |
1859 | | - return "pages-" + self._subset + ("%s.xml." % chunk) + ext |
1860 | | - else: |
1861 | | - return "pages-" + self._subset + ".xml." + ext |
| 2662 | + def getDumpNameBase(self): |
| 2663 | + return('pages-') |
1862 | 2664 | |
1863 | | - def _path(self, runner, ext, chunk = 0): |
1864 | | - return runner.dumpDir.publicPath(self._file(ext, chunk)) |
| 2665 | + def getDumpName(self): |
| 2666 | + return(self.getDumpNameBase() + self._subset) |
1865 | 2667 | |
| 2668 | + def getFileType(self): |
| 2669 | + return "xml" |
| 2670 | + |
| 2671 | + def getFileExt(self): |
| 2672 | + return "bz2" |
| 2673 | + |
1866 | 2674 | def run(self, runner): |
1867 | 2675 | commands = [] |
1868 | | - if (self._chunks): |
1869 | | - if (self._chunkToDo): |
1870 | | - if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)): |
1871 | | - raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks))) |
1872 | | - series = self.buildCommand(runner, self._chunkToDo) |
1873 | | - commands.append(series) |
1874 | | - else: |
1875 | | - for i in range(1, len(self._chunks)+1): |
1876 | | - series = self.buildCommand(runner, i) |
1877 | | - commands.append(series) |
1878 | | - else: |
1879 | | - series = self.buildCommand(runner) |
| 2676 | + self.cleanupOldFiles(runner.dumpDir) |
| 2677 | + # just get the files pertaining to our dumpName, which is *one* of articles, pages-current, pages-history. |
| 2678 | + # stubs include all of them together. |
| 2679 | + # FIXME this needs some other job name here; it's not clear how this job would know what that is.
| 2680 | + if not self.dumpName.startswith(self.getDumpNameBase()): |
| 2681 | + raise BackupError("dumpName %s of unknown form for this job" % self.dumpName) |
| 2682 | + dumpName = self.dumpName[len(self.getDumpNameBase()):] |
| 2683 | + stubDumpNames = self.itemForStubs.listDumpNames() |
| 2684 | + for s in stubDumpNames: |
| 2685 | + if s.endswith(dumpName): |
| 2686 | + stubDumpName = s |
| 2687 | + inputFiles = self.itemForStubs.listOutputFilesForInput(runner.dumpDir, [ stubDumpName ]) |
| 2688 | + if self._chunksEnabled and self._chunkToDo: |
| 2689 | + # reset inputfiles to just have the one we want. |
| 2690 | + for f in inputFiles: |
| 2691 | + if f.chunkInt == self._chunkToDo: |
| 2692 | + inputFiles = [ f ] |
| 2693 | + break |
| 2694 | + if len(inputFiles) > 1: |
| 2695 | + raise BackupError("Trouble finding stub files for xml dump run") |
| 2696 | + for f in inputFiles: |
| 2697 | + # convert the input (stub) file to the corresponding output file
| 2698 | + # we write regular (non-checkpoint) files here
| 2699 | + outputFile = DumpFilename(runner.wiki, f.date, f.dumpName, f.fileType, self.fileExt) |
| 2700 | + series = self.buildCommand(runner, f) |
1880 | 2701 | commands.append(series) |
| 2702 | + |
1881 | 2703 | error = runner.runCommand(commands, callbackStderr=self.progressCallback, callbackStderrArg=runner) |
1882 | | - |
1883 | 2704 | truncationError = self.checkForTruncatedFiles(runner) |
1884 | | - |
1885 | 2705 | if (error or truncationError): |
1886 | | - raise BackupError("error producing xml bz2 file(s) %s" % self._subset) |
| 2706 | + raise BackupError("error producing xml file(s) %s" % self.dumpName) |
1887 | 2707 | |
1888 | 2708 | def checkForTruncatedFiles(self, runner): |
1889 | 2709 | if runner._checkForTruncatedFilesEnabled: |
1890 | | - if (not exists( runner.wiki.config.checkforbz2footer ) ): |
1891 | | - raise BackupError("checkforbz2footer command %s not found" % runner.wiki.config.checkforbz2footer); |
1892 | | - checkforbz2footer = "%s" % runner.wiki.config.checkforbz2footer |
1893 | | - if exists(checkforbz2footer): |
1894 | | - # check to see if any of the output files are truncated |
1895 | | - files = [] |
1896 | | - if (self._chunks): |
1897 | | - if (self._chunkToDo): |
1898 | | - if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)): |
1899 | | - raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks))) |
1900 | | - files.append( self._path(runner, 'bz2', self._chunkToDo ) ) |
1901 | | - else: |
1902 | | - for i in range(1, len(self._chunks)+1): |
1903 | | - files.append( self._path(runner, 'bz2', i ) ) |
1904 | | - |
1905 | | - for f in files: |
1906 | | - pipeline = [] |
1907 | | - pipeline.append([ checkforbz2footer, f ]) |
1908 | | - p = CommandPipeline(pipeline, quiet=True) |
1909 | | - p.runPipelineAndGetOutput() |
1910 | | - if not p.exitedSuccessfully(): |
1911 | | - runner.logAndPrint("file %s is truncated, moving out of the way" %f ) |
1912 | | - os.rename( f, f + ".truncated" ) |
1913 | | - return 1 |
| 2710 | + # check to see if any of the output files are truncated |
| 2711 | + if self._checkpointsEnabled: |
| 2712 | + files = self.listCheckpointFilesPerChunkExisting(runner.dumpDir, self.getChunkList(), [ self.dumpName ]) |
| 2713 | + else: |
| 2714 | + files = self.listRegularFilesPerChunkExisting(runner.dumpDir, self.getChunkList(), [ self.dumpName ]) |
| 2715 | + for f in files: |
| 2716 | + f = DumpFile(runner.wiki,runner.dumpDir.filenamePublicPath(f)) |
| 2717 | + if (f.checkIfTruncated()): |
| 2718 | + runner.logAndPrint("file %s is truncated, moving out of the way" % f.filename ) |
| 2719 | + f.rename( f.filename + ".truncated" ) |
| 2720 | + return 1 |
1914 | 2721 | return 0 |
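
Note on the truncation test above: DumpFile.checkIfTruncated() takes over the job of the removed checkforbz2footer invocation; a complete bz2 stream ends with an end-of-stream marker, so a file cut off mid-write can be recognized from the data alone. A minimal pure-Python sketch of the same idea (an assumed equivalent, and far slower than a footer check, since it decompresses the whole file):

    import bz2

    def looks_truncated(path, blocksize=1 << 20):
        """Return True if the bz2 stream at path ends before its end-of-stream marker."""
        try:
            f = bz2.BZ2File(path, "r")
            try:
                while f.read(blocksize):
                    pass
            finally:
                f.close()
            return False
        except EOFError:
            return True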
1915 | 2722 | |
1916 | 2723 | def buildEta(self, runner): |
1917 | 2724 | """Tell the dumper script whether to make ETA estimate on page or revision count.""" |
1918 | 2725 | return "--current" |
1919 | 2726 | |
1920 | | - def buildFilters(self, runner, chunk = 0): |
| 2727 | + # takes name of the output file |
| 2728 | + def buildFilters(self, runner, f): |
1921 | 2729 | """Construct the output filter options for dumpTextPass.php""" |
1922 | | - xmlbz2 = self._path(runner, "bz2", chunk) |
| 2730 | + # do we need checkpoints? ummm |
| 2731 | + xmlbz2 = runner.dumpDir.filenamePublicPath(f) |
| 2732 | + |
1923 | 2733 | if (not exists( runner.wiki.config.bzip2 ) ): |
1924 | | - raise BackupError("bzip2 command %s not found" % runner.wiki.config.bzip2); |
| 2734 | + raise BackupError("bzip2 command %s not found" % runner.wiki.config.bzip2) |
1925 | 2735 | if runner.wiki.config.bzip2[-6:] == "dbzip2": |
1926 | 2736 | bz2mode = "dbzip2" |
1927 | 2737 | else: |
1928 | 2738 | bz2mode = "bzip2" |
1929 | 2739 | return "--output=%s:%s" % (bz2mode, xmlbz2) |
1930 | 2740 | |
1931 | | - def buildCommand(self, runner, chunk=0): |
| 2741 | + def buildCommand(self, runner, f): |
1932 | 2742 | """Build the command line for the dump, minus output and filter options""" |
1933 | 2743 | |
1934 | | - if (chunk): |
1935 | | - chunkinfo = "%s" % chunk |
| 2744 | + if (self._checkpointsEnabled): |
| 2745 | + # we write a temp file, it will be checkpointed every so often. |
| 2746 | + outputFile = DumpFilename(runner.wiki, f.date, self.dumpName, f.fileType, self.fileExt, f.chunk, f.checkpoint, temp = True) |
1936 | 2747 | else: |
1937 | | - chunkinfo ="" |
| 2748 | + # we write regular files |
| 2749 | + outputFile = DumpFilename(runner.wiki, f.date, self.dumpName, f.fileType, self.fileExt, f.chunk, checkpoint = False, temp = False) |
1938 | 2750 | |
1939 | 2751 | # Page and revision data pulled from this skeleton dump... |
1940 | | - stub = "stub-%s" % self._subset |
1941 | | - stub = stub + "%s.xml.gz" % chunkinfo |
1942 | | - stub = runner.dumpDir.publicPath(stub), |
1943 | | - stubOption = "--stub=gzip:%s" % stub |
| 2752 | + stubOption = "--stub=gzip:%s" % runner.dumpDir.filenamePublicPath(f) |
1944 | 2753 | |
1945 | 2754 | # Try to pull text from the previous run; most stuff hasn't changed |
1946 | 2755 | #Source=$OutputDir/pages_$section.xml.bz2 |
1947 | 2756 | sources = [] |
1948 | 2757 | possibleSources = None |
1949 | 2758 | if self._prefetch: |
1950 | | - possibleSources = self._findPreviousDump(runner, chunk) |
| 2759 | + possibleSources = self._findPreviousDump(runner, f.chunk) |
1951 | 2760 | # if we have a list of more than one then we need to check existence for each and put them together in a string |
1952 | 2761 | if possibleSources: |
1953 | 2762 | for sourceFile in possibleSources: |
1954 | | - if exists(sourceFile): |
1955 | | - sources.append(sourceFile) |
| 2763 | + s = runner.dumpDir.filenamePublicPath(sourceFile, sourceFile.date) |
| 2764 | + if exists(s): |
| 2765 | + sources.append(s) |
| 2766 | + if (f.chunk): |
| 2767 | + chunkinfo = "%s" % f.chunk |
| 2768 | + else: |
| 2769 | + chunkinfo ="" |
1956 | 2770 | if (len(sources) > 0): |
1957 | 2771 | source = "bzip2:%s" % (";".join(sources) ) |
1958 | 2772 | runner.showRunnerState("... building %s %s XML dump, with text prefetch from %s..." % (self._subset, chunkinfo, source)) |
— | — | @@ -1966,59 +2780,33 @@ |
1967 | 2781 | spawn = None |
1968 | 2782 | |
1969 | 2783 | if (not exists( runner.wiki.config.php ) ): |
1970 | | - raise BackupError("php command %s not found" % runner.wiki.config.php); |
| 2784 | + raise BackupError("php command %s not found" % runner.wiki.config.php) |
| 2785 | + |
| 2786 | + if (self.wiki.config.checkpointTime): |
| 2787 | + checkpointTime = "--maxtime=%s" % (runner.wiki.config.checkpointTime) |
| 2788 | + checkpointFile = "--checkpointfile=%s" % outputFile.newFilename(outputFile.dumpName, outputFile.fileType, outputFile.fileExt, outputFile.date, outputFile.chunk, "p%sp%s", None) |
| 2789 | + else: |
| 2790 | + checkpointTime = None |
| 2791 | + checkpointFile = None |
1971 | 2792 | dumpCommand = [ "%s" % runner.wiki.config.php, |
1972 | 2793 | "-q", "%s/maintenance/dumpTextPass.php" % runner.wiki.config.wikiDir, |
1973 | 2794 | "--wiki=%s" % runner.dbName, |
1974 | 2795 | "%s" % stubOption, |
1975 | 2796 | "%s" % prefetch, |
1976 | 2797 | "%s" % runner.forceNormalOption(), |
| 2798 | + "%s" % checkpointTime, |
| 2799 | + "%s" % checkpointFile, |
1977 | 2800 | "--report=1000", |
1978 | 2801 | "%s" % spawn ] |
| 2802 | + |
1979 | 2803 | command = dumpCommand |
1980 | | - filters = self.buildFilters(runner, chunk) |
| 2804 | + filters = self.buildFilters(runner, outputFile) |
1981 | 2805 | eta = self.buildEta(runner) |
1982 | 2806 | command.extend([ filters, eta ]) |
1983 | 2807 | pipeline = [ command ] |
1984 | 2808 | series = [ pipeline ] |
1985 | 2809 | return series |
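
The nesting built here follows the convention used throughout these jobs: a command is an argv list for one process, a pipeline is a list of commands (joined by pipes when there is more than one), a series is a list of pipelines run one after another, and run() collects one series per stub input file into the list handed to runner.runCommand(). A sketch of that shape, with hypothetical values:

    command  = ["/usr/bin/php", "-q", "maintenance/dumpTextPass.php", "--wiki=somewiki"]  # one process
    pipeline = [command]           # commands in a pipeline are piped together
    series   = [pipeline]          # pipelines in a series run one after another
    commands = [series, series]    # e.g. one series per stub file, passed to runner.runCommand()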
1986 | 2810 | |
1987 | | - # given filename, (assume bz2 compression) dig out the first page id in that file |
1988 | | - def findFirstPageIDInFile(self, runner, fileName): |
1989 | | - if (fileName in self._pageID): |
1990 | | - return self._pageID[fileName] |
1991 | | - pageID = None |
1992 | | - pipeline = [] |
1993 | | - if (not exists( runner.wiki.config.bzip2 ) ): |
1994 | | - raise BackupError("bzip2 command %s not found" % runner.wiki.config.bzip2); |
1995 | | - uncompressionCommand = [ "%s" % runner.wiki.config.bzip2, "-dc", fileName ] |
1996 | | - pipeline.append(uncompressionCommand) |
1997 | | - # warning: we figure any header (<siteinfo>...</siteinfo>) is going to be less than 2000 lines! |
1998 | | - if (not exists( runner.wiki.config.head ) ): |
1999 | | - raise BackupError("head command %s not found" % runner.wiki.config.head); |
2000 | | - head = runner.wiki.config.head |
2001 | | - headEsc = MiscUtils.shellEscape(head) |
2002 | | - pipeline.append([ head, "-2000"]) |
2003 | | - # without shell |
2004 | | - p = CommandPipeline(pipeline, quiet=True) |
2005 | | - p.runPipelineAndGetOutput() |
2006 | | - if (p.output()): |
2007 | | - pageData = p.output() |
2008 | | - titleAndIDPattern = re.compile('<title>(?P<title>.+?)</title>\s*' + '<id>(?P<pageid>\d+?)</id>') |
2009 | | - result = titleAndIDPattern.search(pageData) |
2010 | | - if (result): |
2011 | | - pageID = result.group('pageid') |
2012 | | - self._pageID[fileName] = pageID |
2013 | | - return(pageID) |
2014 | | - |
2015 | | - |
2016 | | - def filenameHasChunk(self, filename, ext): |
2017 | | - fileNamePattern = re.compile('.*pages-' + self._subset + '[0-9]+.xml.' + ext +'$') |
2018 | | - if (fileNamePattern.match(filename)): |
2019 | | - return True |
2020 | | - else: |
2021 | | - return False |
2022 | | - |
2023 | 2811 | # taken from a comment by user "Toothy" on Ned Batchelder's blog (no longer on the net) |
2024 | 2812 | def sort_nicely(self, l): |
2025 | 2813 | """ Sort the given list in the way that humans expect. |
— | — | @@ -2027,149 +2815,149 @@ |
2028 | 2816 | alphanum_key = lambda key: [ convert(c) for c in re.split('([0-9]+)', key) ] |
2029 | 2817 | l.sort( key=alphanum_key ) |
2030 | 2818 | |
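
sort_nicely gives a human-friendly ("natural") ordering, so that for instance a chunk 2 filename sorts before a chunk 10 filename instead of after it. The convert helper it relies on lies outside this hunk; a self-contained sketch of the same technique (an illustration, not a quote of the elided line):

    import re

    def sort_nicely(l):
        """Sort l in place so that embedded numbers compare numerically."""
        convert = lambda text: int(text) if text.isdigit() else text
        alphanum_key = lambda key: [convert(c) for c in re.split('([0-9]+)', key)]
        l.sort(key=alphanum_key)
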
| 2819 | + def getRelevantPrefetchFiles(self, fileList, startPageID, endPageID, date, runner): |
| 2820 | + possibles = [] |
| 2821 | + if (len(fileList)): |
| 2822 | + # (a) nasty hack, see below (b) |
| 2823 | + maxchunks = 0 |
| 2824 | + for fileObj in fileList: |
| 2825 | + if fileObj.isChunkFile and fileObj.chunkInt > maxchunks: |
| 2826 | + maxchunks = fileObj.chunkInt |
| 2827 | + if not fileObj.firstPageID: |
| 2828 | + f = DumpFile(self.wiki, runner.dumpDir.filenamePublicPath(fileObj, date), fileObj) |
| 2829 | + fileObj.firstPageID = f.findFirstPageIDInFile() |
| 2830 | + |
| 2831 | + # get the files that cover our range |
| 2832 | + for fileObj in fileList: |
| 2833 | + firstPageIdInFile = int(fileObj.firstPageID) |
| 2834 | + |
| 2835 | + # fixme what do we do here? this could be very expensive. is that |
| 2836 | + # worth it?? |
| 2837 | + if not fileObj.lastPageID: |
| 2838 | + # (b) nasty hack, see (a) |
| 2839 | + # it's not a checkpoint file or we'd have the pageid in the filename
| 2840 | + # so... temporary hack which will give expensive results |
| 2841 | + # if chunk file, and it's the last chunk, put none |
| 2842 | + # if it's not the last chunk, get the first pageid in the next chunk and subtract 1 |
| 2843 | + # if not chunk, put none. |
| 2844 | + if fileObj.isChunkFile and fileObj.chunkInt < maxchunks: |
| 2845 | + for f in fileList: |
| 2846 | + if f.chunkInt == fileObj.chunkInt + 1: |
| 2847 | + # not true! this could be a few past where it really is |
| 2848 | + # (because of deleted pages that aren't included at all) |
| 2849 | + fileObj.lastPageID = str(int(f.firstPageID) - 1) |
| 2850 | + if fileObj.lastPageID: |
| 2851 | + lastPageIdInFile = int(fileObj.lastPageID) |
| 2852 | + else: |
| 2853 | + lastPageIdInFile = None |
| 2854 | + |
| 2855 | + # FIXME there is no point in including a file that has only a few of the rev ids
| 2856 | + # we need, forcing us to read through the whole file... that could take
| 2857 | + # hours or days (after a rewrite it won't matter, but until then this is important).
| 2858 | + # Also be sure that we don't get hosed if a critical page has been deleted
| 2859 | + # by the time we try to figure out the ranges.
| 2860 | + if ( firstPageIdInFile <= int(startPageID) and (lastPageIdInFile == None or lastPageIdInFile >= int(startPageID)) ) or ( firstPageIdInFile >= int(startPageID) and ( endPageID == None or firstPageIdInFile <= int(endPageID) ) ): |
| 2861 | + possibles.append(fileObj) |
| 2862 | + return possibles |
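
The long condition just above is an interval-overlap test on page id ranges, where a lastPageID or endPageID of None means "through the last page". An equivalent standalone predicate (names here are illustrative only):

    def covers_needed_range(first_in_file, last_in_file, start_page, end_page):
        """True if [first_in_file, last_in_file] overlaps [start_page, end_page];
        last_in_file or end_page may be None, meaning open-ended."""
        if last_in_file is not None and last_in_file < start_page:
            return False  # file ends before the pages we need
        if end_page is not None and first_in_file > end_page:
            return False  # file starts after the pages we need
        return True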
2031 | 2863 | |
2032 | 2864 | # this finds the content file or files from the first previous successful dump |
2033 | 2865 | # to be used as input ("prefetch") for this run. |
2034 | | - def _findPreviousDump(self, runner, chunk = 0): |
| 2866 | + def _findPreviousDump(self, runner, chunk = None): |
2035 | 2867 | """The previously-linked previous successful dump.""" |
2036 | | - bzfile = self._file("bz2") |
2037 | 2868 | if (chunk): |
2038 | | - startPageID = sum([ self._chunks[i] for i in range(0,chunk-1)]) + 1 |
2039 | | - if (len(self._chunks) > chunk): |
2040 | | - endPageID = sum([ self._chunks[i] for i in range(0,chunk)]) |
| 2869 | + startPageID = sum([ self._chunks[i] for i in range(0,int(chunk)-1)]) + 1 |
| 2870 | + if (len(self._chunks) > int(chunk)): |
| 2871 | + endPageID = sum([ self._chunks[i] for i in range(0,int(chunk))]) |
2041 | 2872 | else: |
2042 | 2873 | endPageID = None |
2043 | | - # we will look for the first chunk file, if it's there and the |
2044 | | - # status of the job is ok then we will get the rest of the info |
2045 | | - bzfileChunk = self._file("bz2", 1) |
2046 | | - bzfileGlob = self._file("bz2", '[1-9]*') |
2047 | | - currentChunk = realpath(runner.dumpDir.publicPath(bzfile)) |
2048 | | - current = realpath(runner.dumpDir.publicPath(bzfile)) |
| 2874 | + else: |
| 2875 | + startPageID = 1 |
| 2876 | + endPageID = None |
| 2877 | + |
2049 | 2878 | dumps = runner.wiki.dumpDirs() |
2050 | 2879 | dumps.sort() |
2051 | 2880 | dumps.reverse() |
2052 | 2881 | for date in dumps: |
2053 | | - base = runner.wiki.publicDir() |
2054 | | - # first see if a "chunk" file is there, if not we will accept |
2055 | | - # using the the single file dump although it will be slower |
2056 | | - possibles = [] |
2057 | | - oldChunk = None |
2058 | | - # scan all the existing chunk files and get the first page ID from each |
2059 | | - if (chunk): |
2060 | | - oldChunk = runner.dumpDir.buildPath(base, date, bzfileChunk) |
2061 | | - oldGlob = runner.dumpDir.buildPath(base, date, bzfileGlob) |
2062 | | - pageIDs = [] |
2063 | | - bzfileChunks = glob.glob(oldGlob) |
2064 | | - self.sort_nicely(bzfileChunks) |
2065 | | - if (bzfileChunks): |
2066 | | - for fileName in bzfileChunks: |
2067 | | - pageID = self.findFirstPageIDInFile(runner, fileName ) |
2068 | | - if (pageID): |
2069 | | - pageIDs.append(pageID) |
| 2882 | + if (date == self.wiki.date): |
| 2883 | + runner.debug("skipping current dump for prefetch of job %s, date %s" % (self.name(), self.wiki.date)) |
| 2884 | + continue |
2070 | 2885 | |
2071 | | - old = runner.dumpDir.buildPath(base, date, bzfile) |
2072 | | - if (oldChunk): |
2073 | | - if exists(oldChunk): |
2074 | | - possibles.append(oldChunk) |
2075 | | - if (old): |
2076 | | - if exists(old): |
2077 | | - possibles.append(old) |
| 2886 | + # see if this job from that date was successful |
| 2887 | + if not runner.runInfoFile.statusOfOldDumpIsDone(runner, date, self.name(), self._desc): |
| 2888 | + runner.debug("skipping incomplete or failed dump for prefetch date %s" % date) |
| 2889 | + continue |
2078 | 2890 | |
2079 | | - for possible in possibles: |
2080 | | - if exists(possible): |
2081 | | - size = getsize(possible) |
2082 | | - if size < 70000: |
2083 | | - runner.debug("small %d-byte prefetch dump at %s, skipping" % (size, possible)) |
2084 | | - continue |
2085 | | - if realpath(old) == current: |
2086 | | - runner.debug("skipping current dump for prefetch %s" % possible) |
2087 | | - continue |
2088 | | - if not runner.runInfoFile.statusOfOldDumpIsDone(runner, date, self.name, self._desc): |
2089 | | - runner.debug("skipping incomplete or failed dump for prefetch %s" % possible) |
2090 | | - continue |
2091 | | - if (chunk) and (self.filenameHasChunk(possible, "bz2")): |
2092 | | - runner.debug("Prefetchable %s etc." % possible) |
2093 | | - else: |
2094 | | - runner.debug("Prefetchable %s" % possible) |
2095 | | - # found something workable, now check the chunk situation |
2096 | | - if (chunk): |
2097 | | - if (self.filenameHasChunk(possible, "bz2")): |
2098 | | - if len(pageIDs) > 0: |
2099 | | - possibleStartNum = None |
2100 | | - for i in range(len(pageIDs)): |
2101 | | - if int(pageIDs[i]) <= int(startPageID): |
2102 | | - # chunk number of file starts at 1. |
2103 | | - possibleStartNum = i+1 |
2104 | | - else: |
2105 | | - break; |
2106 | | - if possibleStartNum: |
2107 | | - possibleEndNum = possibleStartNum |
2108 | | - for j in range(i,len(pageIDs)): |
2109 | | - if (not endPageID) or (int(pageIDs[j]) <= int(endPageID)): |
2110 | | - # chunk number of file starts at 1. |
2111 | | - possibleEndNum = j + 1 |
2112 | | - else: |
2113 | | - break |
2114 | | - # now we have the range of the relevant files, put together the list. |
2115 | | - possible = [ runner.dumpDir.buildPath(base, date, self._file("bz2", k)) for k in range(possibleStartNum,possibleEndNum+1) ] |
2116 | | - return possible |
2117 | | - else: |
2118 | | - continue |
2119 | | - |
2120 | | - return [ possible ] |
| 2891 | + # first check if there are checkpoint files from this run we can use |
| 2892 | + files = self.listCheckpointFilesExisting(runner.dumpDir, [ self.dumpName ], date, chunks = None) |
| 2893 | + possiblePrefetchList = self.getRelevantPrefetchFiles(files, startPageID, endPageID, date, runner) |
| 2894 | + if (len(possiblePrefetchList)): |
| 2895 | + return(possiblePrefetchList) |
| 2896 | + |
| 2897 | + # ok, let's check for chunk files instead, from any run (may not conform to our numbering |
| 2898 | + # for this job) |
| 2899 | + files = self.listRegularFilesExisting(runner.dumpDir,[ self.dumpName ], date, chunks = True) |
| 2900 | + possiblePrefetchList = self.getRelevantPrefetchFiles(files, startPageID, endPageID, date, runner) |
| 2901 | + if (len(possiblePrefetchList)): |
| 2902 | + return(possiblePrefetchList) |
| 2903 | + |
| 2904 | + # last shot, get output file that contains all the pages, if there is one |
| 2905 | + files = self.listRegularFilesExisting(runner.dumpDir, [ self.dumpName ], date, chunks = False) |
| 2906 | + # there is only one, don't bother to check for relevance :-P |
| 2907 | + possiblePrefetchList = files |
| 2908 | + files = [] |
| 2909 | + for p in possiblePrefetchList: |
| 2910 | + possible = runner.dumpDir.filenamePublicPath(p, date) |
| 2911 | + size = os.path.getsize(possible) |
| 2912 | + if size < 70000: |
| 2913 | + runner.debug("small %d-byte prefetch dump at %s, skipping" % (size, possible)) |
| 2914 | + continue |
| 2915 | + else: |
| 2916 | + files.append(p) |
| 2917 | + if (len(files)): |
| 2918 | + return(files) |
| 2919 | + |
2121 | 2920 | runner.debug("Could not locate a prefetchable dump.") |
2122 | 2921 | return None |
2123 | 2922 | |
2124 | | - def listOutputFiles(self, runner): |
2125 | | - if (self._chunks): |
2126 | | - files = [] |
2127 | | - for i in range(1, len(self._chunks)+1): |
2128 | | - files.append(self._file("bz2",i)) |
2129 | | - return files |
2130 | | - else: |
2131 | | - return [ self._file("bz2",0) ] |
2132 | | - |
2133 | 2923 | class RecombineXmlDump(XmlDump): |
2134 | | - def __init__(self, subset, name, desc, detail, chunks = False): |
| 2924 | + def __init__(self, name, desc, detail, itemForXmlDumps): |
2135 | 2925 | # no prefetch, no spawn |
2136 | | - XmlDump.__init__(self, subset, name, desc, detail, None, None, False, chunks) |
2137 | | - # this is here only so that a callback can capture output from some commands |
2138 | | - # related to recombining files if we did parallel runs of the recompression |
2139 | | - self._output = None |
| 2926 | + self.itemForXmlDumps = itemForXmlDumps |
| 2927 | + self._detail = detail |
| 2928 | + Dump.__init__(self, name, desc) |
| 2929 | + # the input may have checkpoints but the output will not. |
| 2930 | + self._checkpointsEnabled = False |
2140 | 2931 | |
2141 | | - def listInputFiles(self, runner): |
2142 | | - return XmlDump.listOutputFiles(self,runner) |
| 2932 | + def listDumpNames(self): |
| 2933 | + return self.itemForXmlDumps.listDumpNames() |
2143 | 2934 | |
2144 | | - def listOutputFiles(self, runner): |
2145 | | - return [ self._file("bz2",0) ] |
| 2935 | + def getFileType(self): |
| 2936 | + return self.itemForXmlDumps.getFileType() |
2146 | 2937 | |
| 2938 | + def getFileExt(self): |
| 2939 | + return self.itemForXmlDumps.getFileExt() |
| 2940 | + |
| 2941 | + def getDumpName(self): |
| 2942 | + return self.itemForXmlDumps.getDumpName() |
| 2943 | + |
2147 | 2944 | def run(self, runner): |
| 2945 | + files = self.itemForXmlDumps.listOutputFilesForInput(runner.dumpDir) |
| 2946 | + outputFiles = self.listOutputFilesForBuildCommand(runner.dumpDir) |
| 2947 | + if (len(outputFiles) > 1): |
| 2948 | + raise BackupError("recombine XML Dump trying to produce more than one output file") |
| 2949 | + |
2148 | 2950 | error=0 |
2149 | | - if (self._chunks): |
2150 | | - files = self.listInputFiles(runner) |
2151 | | - outputFileList = self.listOutputFiles(runner) |
2152 | | - for outputFile in outputFileList: |
2153 | | - inputFiles = [] |
2154 | | - for inFile in files: |
2155 | | - (base, rest) = inFile.split('.',1) |
2156 | | - base = re.sub("\d+$", "", base) |
2157 | | - if base + "." + rest == outputFile: |
2158 | | - inputFiles.append(inFile) |
2159 | | - if not len(inputFiles): |
2160 | | - self.setStatus("failed") |
2161 | | - raise BackupError("No input files for %s found" % self.name) |
2162 | | - if (not exists( runner.wiki.config.bzip2 ) ): |
2163 | | - raise BackupError("bzip2 command %s not found" % runner.wiki.config.bzip2); |
2164 | | - compressionCommand = runner.wiki.config.bzip2 |
2165 | | - compressionCommand = "%s > " % runner.wiki.config.bzip2 |
2166 | | - uncompressionCommand = [ "%s" % runner.wiki.config.bzip2, "-dc" ] |
2167 | | - recombineCommandString = self.buildRecombineCommandString(runner, inputFiles, outputFile, compressionCommand, uncompressionCommand ) |
2168 | | - recombineCommand = [ recombineCommandString ] |
2169 | | - recombinePipeline = [ recombineCommand ] |
2170 | | - series = [ recombinePipeline ] |
2171 | | - result = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True) |
2172 | | - if result: |
2173 | | - error = result |
| 2951 | + if (not exists( runner.wiki.config.bzip2 ) ): |
| 2952 | + raise BackupError("bzip2 command %s not found" % runner.wiki.config.bzip2) |
| 2953 | + compressionCommand = runner.wiki.config.bzip2 |
| 2954 | + compressionCommand = "%s > " % runner.wiki.config.bzip2 |
| 2955 | + uncompressionCommand = [ "%s" % runner.wiki.config.bzip2, "-dc" ] |
| 2956 | + recombineCommandString = self.buildRecombineCommandString(runner, files, outputFiles[0], compressionCommand, uncompressionCommand ) |
| 2957 | + recombineCommand = [ recombineCommandString ] |
| 2958 | + recombinePipeline = [ recombineCommand ] |
| 2959 | + series = [ recombinePipeline ] |
| 2960 | + error = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True) |
| 2961 | + |
2174 | 2962 | if (error): |
2175 | 2963 | raise BackupError("error recombining xml bz2 files") |
2176 | 2964 | |
— | — | @@ -2184,171 +2972,174 @@ |
2185 | 2973 | class XmlRecompressDump(Dump): |
2186 | 2974 | """Take a .bz2 and recompress it as 7-Zip.""" |
2187 | 2975 | |
2188 | | - def __init__(self, subset, name, desc, detail, chunkToDo, chunks = False): |
2189 | | - Dump.__init__(self, name, desc) |
| 2976 | + def __init__(self, subset, name, desc, detail, itemForRecompression, wiki, chunkToDo, chunks = False, checkpoints = False): |
2190 | 2977 | self._subset = subset |
2191 | 2978 | self._detail = detail |
2192 | 2979 | self._chunks = chunks |
| 2980 | + if self._chunks: |
| 2981 | + self._chunksEnabled = True |
2193 | 2982 | self._chunkToDo = chunkToDo |
| 2983 | + self.wiki = wiki |
| 2984 | + self.itemForRecompression = itemForRecompression |
| 2985 | + if checkpoints: |
| 2986 | + self._checkpointsEnabled = True |
| 2987 | + Dump.__init__(self, name, desc) |
2194 | 2988 | |
2195 | | - def detail(self): |
2196 | | - """Optionally return additional text to appear under the heading.""" |
2197 | | - return self._detail |
| 2989 | + def getDumpName(self): |
| 2990 | + return "pages-" + self._subset |
2198 | 2991 | |
2199 | | - def _file(self, ext, chunk = 0): |
2200 | | - if (chunk): |
2201 | | - return "pages-" + self._subset + ("%s.xml." % chunk) + ext |
2202 | | - else: |
2203 | | - return "pages-" + self._subset + ".xml." + ext |
| 2992 | + def getFileType(self): |
| 2993 | + return "xml" |
2204 | 2994 | |
2205 | | - def _path(self, runner, ext, chunk=0): |
2206 | | - return runner.dumpDir.publicPath(self._file(ext,chunk)) |
| 2995 | + def getFileExt(self): |
| 2996 | + return "7z" |
2207 | 2997 | |
2208 | | - def buildOutputFilename(self, runner, chunk=0): |
2209 | | - if (chunk): |
2210 | | - xml7z = self._path(runner, "7z", chunk) |
2211 | | - else: |
2212 | | - xml7z = self._path(runner, "7z") |
2213 | | - return(xml7z) |
2214 | | - |
2215 | | - def getInputFilename(self, runner, chunk): |
2216 | | - if (chunk): |
2217 | | - xmlbz2 = self._path(runner, "bz2", chunk) |
2218 | | - else: |
2219 | | - xmlbz2 = self._path(runner, "bz2") |
2220 | | - return(xmlbz2) |
2221 | | - |
2222 | | - def buildCommand(self, runner, chunk = 0): |
2223 | | - xmlbz2 = self.getInputFilename(runner, chunk) |
2224 | | - xml7z = self.buildOutputFilename(runner, chunk) |
2225 | | - |
| 2998 | + # outputFiles is a list of checkpoint files if checkpointing is enabled; otherwise it is a list of one file.
| 2999 | + # checkpoint files get done one at a time; we can't realistically run parallel recompression jobs
| 3000 | + # over 200 files at once.
| 3001 | + def buildCommand(self, runner, outputFiles): |
2226 | 3002 | # FIXME need shell escape |
2227 | 3003 | if (not exists( runner.wiki.config.bzip2 ) ): |
2228 | | - raise BackupError("bzip2 command %s not found" % runner.wiki.config.bzip2); |
| 3004 | + raise BackupError("bzip2 command %s not found" % runner.wiki.config.bzip2) |
2229 | 3005 | if (not exists( runner.wiki.config.sevenzip ) ): |
2230 | | - raise BackupError("7zip command %s not found" % runner.wiki.config.sevenzip); |
2231 | | - commandPipe = [ [ "%s -dc %s | %s a -si %s" % (runner.wiki.config.bzip2, xmlbz2, runner.wiki.config.sevenzip, xml7z) ] ] |
2232 | | - commandSeries = [ commandPipe ] |
| 3006 | + raise BackupError("7zip command %s not found" % runner.wiki.config.sevenzip) |
| 3007 | + |
| 3008 | + commandSeries = [] |
| 3009 | + for f in outputFiles: |
| 3010 | + inputFile = DumpFilename(runner.wiki, None, f.dumpName, f.fileType, self.itemForRecompression.fileExt, f.chunk, f.checkpoint) |
| 3011 | + outfile = runner.dumpDir.filenamePublicPath(f) |
| 3012 | + infile = runner.dumpDir.filenamePublicPath(inputFile) |
| 3013 | + commandPipe = [ [ "%s -dc %s | %s a -si %s" % (runner.wiki.config.bzip2, infile, runner.wiki.config.sevenzip, outfile) ] ] |
| 3014 | + commandSeries.append(commandPipe) |
2233 | 3015 | return(commandSeries) |
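
Assuming, purely for illustration, that config.bzip2 is /usr/bin/bzip2 and config.sevenzip is /usr/bin/7za, each command string built above comes out as a shell pipeline along the lines of

    /usr/bin/bzip2 -dc .../pages-meta-history2.xml.bz2 | /usr/bin/7za a -si .../pages-meta-history2.xml.7z

that is, the bz2 content is decompressed to stdout and 7zip reads it from stdin (-si) into the new archive. Since 7zip appends to an existing archive rather than replacing it, run() below clears any stale .7z output first via cleanupOldFiles().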
2234 | 3016 | |
2235 | | - def cleanupOldFiles(self, runner, chunk = 0): |
2236 | | - if (runner._cleanupOldFilesEnabled): |
2237 | | - xml7z = self.buildOutputFilename(runner, chunk) |
2238 | | - if exists(xml7z): |
2239 | | - runner.removeFile(xml7z) |
2240 | | - |
2241 | 3017 | def run(self, runner): |
2242 | 3018 | if runner.lastFailed: |
2243 | 3019 | raise BackupError("bz2 dump incomplete, not recompressing") |
2244 | 3020 | commands = [] |
2245 | | - if (self._chunks): |
2246 | | - if (self._chunkToDo): |
2247 | | - if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)): |
2248 | | - raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks))) |
2249 | | - self.cleanupOldFiles(runner, self._chunkToDo) |
2250 | | - series = self.buildCommand(runner, self._chunkToDo) |
| 3021 | + # Remove prior 7zip attempts; 7zip will try to append to an existing archive |
| 3022 | + self.cleanupOldFiles(runner.dumpDir) |
| 3023 | + if self._chunksEnabled and not self._chunkToDo: |
| 3024 | + # must set up each parallel job separately; they may have checkpoint files that
| 3025 | + # need to be processed in series, which is a special case
| 3026 | + for i in range(1, len(self._chunks)+1): |
| 3027 | + outputFiles = self.listOutputFilesForBuildCommand(runner.dumpDir, i) |
| 3028 | + series = self.buildCommand(runner, outputFiles) |
2251 | 3029 | commands.append(series) |
2252 | | - else: |
2253 | | - for i in range(1, len(self._chunks)+1): |
2254 | | - # Clear prior 7zip attempts; 7zip will try to append an existing archive |
2255 | | - self.cleanupOldFiles(runner, i) |
2256 | | - series = self.buildCommand(runner, i) |
2257 | | - commands.append(series) |
2258 | 3030 | else: |
2259 | | - # Clear prior 7zip attempts; 7zip will try to append an existing archive |
2260 | | - self.cleanupOldFiles(runner) |
2261 | | - series = self.buildCommand(runner) |
| 3031 | + outputFiles = self.listOutputFilesForBuildCommand(runner.dumpDir) |
| 3032 | + series = self.buildCommand(runner, outputFiles) |
2262 | 3033 | commands.append(series) |
| 3034 | + |
2263 | 3035 | error = runner.runCommand(commands, callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True) |
2264 | | - # temp hack force 644 permissions until ubuntu bug # 370618 is fixed - tomasz 5/1/2009 |
2265 | | - # some hacks aren't so temporary - atg 3 sept 2010 |
2266 | | - if (self._chunks): |
2267 | | - if (self._chunkToDo): |
2268 | | - if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)): |
2269 | | - raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks))) |
2270 | | - xml7z = self.buildOutputFilename(runner,self._chunkToDo) |
2271 | | - if exists(xml7z): |
2272 | | - os.chmod(xml7z, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH ) |
2273 | | - else: |
2274 | | - for i in range(1, len(self._chunks)+1): |
2275 | | - xml7z = self.buildOutputFilename(runner,i) |
2276 | | - if exists(xml7z): |
2277 | | - os.chmod(xml7z, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH ) |
2278 | | - else: |
2279 | | - xml7z = self.buildOutputFilename(runner) |
2280 | | - if exists(xml7z): |
2281 | | - os.chmod(xml7z, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH ) |
2282 | 3036 | if (error): |
2283 | 3037 | raise BackupError("error recompressing bz2 file(s)") |
2284 | | - |
2285 | | - def listOutputFiles(self, runner): |
2286 | | - if (self._chunks): |
2287 | | - if (self._chunkToDo): |
2288 | | - if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)): |
2289 | | - raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks))) |
2290 | | - files = [] |
2291 | | - files.append(self._file("7z",self._chunkToDo)) |
2292 | | - return files |
2293 | | - else: |
2294 | | - files = [] |
2295 | | - for i in range(1, len(self._chunks)+1): |
2296 | | - files.append(self._file("7z",i)) |
2297 | | - return files |
| 3038 | + |
| 3039 | + # shows all files possible if we don't have checkpoint files. without temp files of course |
| 3040 | + def listOutputFilesToPublish(self, dumpDir): |
| 3041 | + files = [] |
| 3042 | + inputFiles = self.itemForRecompression.listOutputFilesForInput(dumpDir) |
| 3043 | + for f in inputFiles: |
| 3044 | + files.append(DumpFilename(self.wiki, f.date, f.dumpName, f.fileType, self.fileExt, f.chunk, f.checkpoint, f.temp))
| 3045 | + return files |
| 3046 | + |
| 3047 | + # shows all files possible if we don't have checkpoint files. without temp files of course |
| 3048 | + # only the chunks we are actually supposed to do (if there is a limit) |
| 3049 | + def listOutputFilesToCheckForTruncation(self, dumpDir): |
| 3050 | + files = [] |
| 3051 | + inputFiles = self.itemForRecompression.listOutputFilesForInput(dumpDir) |
| 3052 | + for f in inputFiles: |
| 3053 | + if self._chunkToDo and f.chunkInt != self._chunkToDo: |
| 3054 | + continue |
| 3055 | + files.append(DumpFilename(self.wiki, f.date, f.dumpName, f.fileType, self.fileExt, f.chunk, f.checkpoint, f.temp))
| 3056 | + return files |
| 3057 | + |
| 3058 | + # shows all files possible if we don't have checkpoint files. no temp files. |
| 3059 | + # only the chunks we are actually supposed to do (if there is a limit) |
| 3060 | + def listOutputFilesForBuildCommand(self, dumpDir, chunk = None): |
| 3061 | + files = [] |
| 3062 | + inputFiles = self.itemForRecompression.listOutputFilesForInput(dumpDir) |
| 3063 | + for f in inputFiles: |
| 3064 | + # if this param is set it takes priority |
| 3065 | + if chunk and f.chunkInt != chunk: |
| 3066 | + continue |
| 3067 | + elif self._chunkToDo and f.chunkInt != self._chunkToDo: |
| 3068 | + continue |
| 3069 | + files.append(DumpFilename(self.wiki, f.date, f.dumpName, f.fileType, self.fileExt, f.chunk, f.checkpoint, f.temp))
| 3070 | + return files |
| 3071 | + |
| 3072 | + # shows all files possible if we don't have checkpoint files. should include temp files |
| 3073 | + # does just the chunks we do if there is a limit |
| 3074 | + def listOutputFilesForCleanup(self, dumpDir, dumpNames = None): |
| 3075 | + # some stages (e.g. XmlStubs) call this for several different dumpNames
| 3076 | + if (dumpNames == None): |
| 3077 | + dumpNames = [ self.dumpName ] |
| 3078 | + files = [] |
| 3079 | + if (self.itemForRecompression._checkpointsEnabled): |
| 3080 | + # we will pass list of chunks or chunkToDo, or False, depending on the job setup. |
| 3081 | + files.extend(self.listCheckpointFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames)) |
| 3082 | + files.extend(self.listTempFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames)) |
2298 | 3083 | else: |
2299 | | - return [ self._file("7z",0) ] |
| 3084 | + # fixme this should be a list |
| 3085 | + # we will pass list of chunks or chunkToDo, or False, depending on the job setup. |
| 3086 | + files.extend(self.listRegularFilesPerChunkExisting(dumpDir, self.getChunkList(), dumpNames)) |
| 3087 | + return files |
2300 | 3088 | |
2301 | | - def getCommandOutputCallback(self, line): |
2302 | | - self._output = line |
| 3089 | + # must return all output files that could be produced by a full run of this stage, |
| 3090 | + # not just whatever we happened to produce (if run for one chunk, say) |
| 3091 | + def listOutputFilesForInput(self, dumpDir): |
| 3092 | + files = [] |
| 3093 | + inputFiles = self.itemForRecompression.listOutputFilesForInput(dumpDir) |
| 3094 | + for f in inputFiles: |
| 3095 | + files.append(DumpFilename(self.wiki, f.date, f.dumpName, f.fileType, self.fileExt, f.chunk, f.checkpoint, f.temp))
| 3096 | + return files |
2303 | 3097 | |
2304 | | -class RecombineXmlRecompressDump(XmlRecompressDump): |
2305 | | - def __init__(self, subset, name, desc, detail, chunks): |
2306 | | - XmlRecompressDump.__init__(self, subset, name, desc, detail, False, chunks) |
2307 | | - # this is here only so that a callback can capture output from some commands |
2308 | | - # related to recombining files if we did parallel runs of the recompression |
2309 | | - self._output = None |
2310 | 3098 | |
2311 | | - def listInputFiles(self, runner): |
2312 | | - return XmlRecompressDump.listOutputFiles(self,runner) |
| 3099 | +class RecombineXmlRecompressDump(Dump): |
| 3100 | + def __init__(self, name, desc, detail, itemForRecombine, wiki): |
| 3101 | + self._detail = detail |
| 3102 | + self._desc = desc |
| 3103 | + self.wiki = wiki |
| 3104 | + self.itemForRecombine = itemForRecombine |
| 3105 | + Dump.__init__(self, name, desc) |
| 3106 | + # the input may have checkpoints but the output will not. |
| 3107 | + self._checkpointsEnabled = False |
| 3108 | + self._chunksEnabled = False |
2313 | 3109 | |
2314 | | - def listOutputFiles(self, runner): |
2315 | | - return [ self._file("7z",0) ] |
| 3110 | + def getFileType(self): |
| 3111 | + return self.itemForRecombine.getFileType() |
2316 | 3112 | |
2317 | | - def cleanupOldFiles(self, runner): |
2318 | | - if (runner._cleanupOldFilesEnabled): |
2319 | | - files = self.listOutputFiles(runner) |
2320 | | - for filename in files: |
2321 | | - filename = runner.dumpDir.publicPath(filename) |
2322 | | - if exists(filename): |
2323 | | - runner.removeFile(filename) |
| 3113 | + def getFileExt(self): |
| 3114 | + return self.itemForRecombine.getFileExt() |
2324 | 3115 | |
| 3116 | + def getDumpName(self): |
| 3117 | + return self.itemForRecombine.getDumpName() |
| 3118 | + |
2325 | 3119 | def run(self, runner): |
2326 | 3120 | error = 0 |
2327 | | - if (self._chunks): |
2328 | | - self.cleanupOldFiles(runner) |
2329 | | - files = self.listInputFiles(runner) |
2330 | | - outputFileList = self.listOutputFiles(runner) |
2331 | | - for outputFile in outputFileList: |
2332 | | - inputFiles = [] |
2333 | | - for inFile in files: |
2334 | | - (base, rest) = inFile.split('.',1) |
2335 | | - base = re.sub("\d+$", "", base) |
2336 | | - if base + "." + rest == outputFile: |
2337 | | - inputFiles.append(inFile) |
2338 | | - if not len(inputFiles): |
2339 | | - self.setStatus("failed") |
2340 | | - raise BackupError("No input files for %s found" % self.name) |
2341 | | - if (not exists( runner.wiki.config.sevenzip ) ): |
2342 | | - raise BackupError("sevenzip command %s not found" % runner.wiki.config.sevenzip); |
2343 | | - compressionCommand = "%s a -si" % runner.wiki.config.sevenzip |
2344 | | - uncompressionCommand = [ "%s" % runner.wiki.config.sevenzip, "e", "-so" ] |
| 3121 | + self.cleanupOldFiles(runner.dumpDir) |
| 3122 | + outputFileList = self.listOutputFilesForBuildCommand(runner.dumpDir) |
| 3123 | + for outputFile in outputFileList: |
| 3124 | + inputFiles = [] |
| 3125 | + files = self.itemForRecombine.listOutputFilesForInput(runner.dumpDir) |
| 3126 | + for inFile in files: |
| 3127 | + if inFile.dumpName == outputFile.dumpName: |
| 3128 | + inputFiles.append(inFile) |
| 3129 | + if not len(inputFiles): |
| 3130 | + self.setStatus("failed") |
| 3131 | + raise BackupError("No input files for %s found" % self.name()) |
| 3132 | + if (not exists( runner.wiki.config.sevenzip ) ): |
| 3133 | + raise BackupError("sevenzip command %s not found" % runner.wiki.config.sevenzip) |
| 3134 | + compressionCommand = "%s a -si" % runner.wiki.config.sevenzip |
| 3135 | + uncompressionCommand = [ "%s" % runner.wiki.config.sevenzip, "e", "-so" ] |
2345 | 3136 | |
2346 | | - recombineCommandString = self.buildRecombineCommandString(runner, files, outputFile, compressionCommand, uncompressionCommand ) |
2347 | | - recombineCommand = [ recombineCommandString ] |
2348 | | - recombinePipeline = [ recombineCommand ] |
2349 | | - series = [ recombinePipeline ] |
2350 | | - result = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True) |
2351 | | - if result: |
2352 | | - error = result |
| 3137 | + recombineCommandString = self.buildRecombineCommandString(runner, files, outputFile, compressionCommand, uncompressionCommand ) |
| 3138 | + recombineCommand = [ recombineCommandString ] |
| 3139 | + recombinePipeline = [ recombineCommand ] |
| 3140 | + series = [ recombinePipeline ] |
| 3141 | + result = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True) |
| 3142 | + if result: |
| 3143 | + error = result |
2353 | 3144 | if (error): |
2354 | 3145 | raise BackupError("error recombining xml bz2 file(s)") |
2355 | 3146 | |
— | — | @@ -2356,69 +3147,79 @@ |
2357 | 3148 | """XML dump for Yahoo!'s Active Abstracts thingy""" |
2358 | 3149 | |
2359 | 3150 | def __init__(self, name, desc, chunkToDo, chunks = False): |
2360 | | - Dump.__init__(self, name, desc) |
2361 | 3151 | self._chunkToDo = chunkToDo |
2362 | 3152 | self._chunks = chunks |
| 3153 | + if self._chunks: |
| 3154 | + self._chunksEnabled = True |
| 3155 | + Dump.__init__(self, name, desc) |
2363 | 3156 | |
2364 | | - def buildCommand(self, runner, chunk = 0): |
| 3157 | + def getDumpName(self): |
| 3158 | + return "abstract" |
| 3159 | + |
| 3160 | + def getFileType(self): |
| 3161 | + return "xml" |
| 3162 | + |
| 3163 | + def getFileExt(self): |
| 3164 | + # fixme no file extension, see what this means for everything |
| 3165 | + return "" |
| 3166 | + |
| 3167 | + def buildCommand(self, runner, f): |
2365 | 3168 | if (not exists( runner.wiki.config.php ) ): |
2366 | | - raise BackupError("php command %s not found" % runner.wiki.config.php); |
| 3169 | + raise BackupError("php command %s not found" % runner.wiki.config.php) |
2367 | 3170 | command = [ "%s" % runner.wiki.config.php, |
2368 | 3171 | "-q", "%s/maintenance/dumpBackup.php" % runner.wiki.config.wikiDir, |
2369 | 3172 | "--wiki=%s" % runner.dbName, |
2370 | 3173 | "--plugin=AbstractFilter:%s/extensions/ActiveAbstract/AbstractFilter.php" % runner.wiki.config.wikiDir, |
2371 | 3174 | "--current", "--report=1000", "%s" % runner.forceNormalOption(), |
2372 | 3175 | ] |
2373 | | - for variant in self._variants(runner): |
2374 | | - command.extend( [ "--output=file:%s" % runner.dumpDir.publicPath(self._variantFile(variant, chunk)), |
| 3176 | + |
| 3177 | + for v in self._variants(): |
| 3178 | + variantOption = self._variantOption(v) |
| 3179 | + dumpName = self.dumpNameFromVariant(v) |
| 3180 | + fileObj = DumpFilename(runner.wiki, f.date, dumpName, f.fileType, f.fileExt, f.chunk, f.checkpoint) |
| 3181 | + command.extend( [ "--output=file:%s" % runner.dumpDir.filenamePublicPath(fileObj), |
2375 | 3182 | "--filter=namespace:NS_MAIN", "--filter=noredirect", |
2376 | | - "--filter=abstract%s" % self._variantOption(variant) ] ) |
2377 | | - if (chunk): |
2378 | | - # set up start end end pageids for this piece |
2379 | | - # note there is no page id 0 I guess. so we start with 1 |
2380 | | - # start = runner.pagesPerChunk()*(chunk-1) + 1 |
2381 | | - start = sum([ self._chunks[i] for i in range(0,chunk-1)]) + 1 |
2382 | | - startopt = "--start=%s" % start |
2383 | | - # if we are on the last chunk, we should get up to the last pageid, |
2384 | | - # whatever that is. |
2385 | | - command.append(startopt) |
2386 | | - if chunk < len(self._chunks): |
2387 | | - # end = start + runner.pagesPerChunk() |
2388 | | - end = sum([ self._chunks[i] for i in range(0,chunk)]) +1 |
2389 | | - endopt = "--end=%s" % end |
2390 | | - command.append(endopt) |
| 3183 | + "--filter=abstract%s" % variantOption ] ) |
| 3184 | + if (f.chunk): |
| 3185 | + # set up start and end pageids for this piece
| 3186 | + # note there is no page id 0, so we start with 1
| 3187 | + # start = runner.pagesPerChunk()*(chunk-1) + 1 |
| 3188 | + start = sum([ self._chunks[i] for i in range(0,f.chunkInt-1)]) + 1 |
| 3189 | + startopt = "--start=%s" % start |
| 3190 | + # if we are on the last chunk, we should get up to the last pageid, |
| 3191 | + # whatever that is. |
| 3192 | + command.append(startopt) |
| 3193 | + if f.chunkInt < len(self._chunks): |
| 3194 | + # end = start + runner.pagesPerChunk() |
| 3195 | + end = sum([ self._chunks[i] for i in range(0,f.chunkInt)]) +1 |
| 3196 | + endopt = "--end=%s" % end |
| 3197 | + command.append(endopt) |
2391 | 3198 | pipeline = [ command ] |
2392 | 3199 | series = [ pipeline ] |
2393 | 3200 | return(series) |
2394 | 3201 | |
2395 | 3202 | def run(self, runner): |
2396 | 3203 | commands = [] |
2397 | | - if (self._chunks): |
2398 | | - if (self._chunkToDo): |
2399 | | - if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)): |
2400 | | - raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks))) |
2401 | | - series = self.buildCommand(runner, self._chunkToDo) |
| 3204 | + # choose the empty variant to pass to buildCommand; it will fill in the rest if needed
| 3205 | + outputFiles = self.listOutputFilesForBuildCommand(runner.dumpDir) |
| 3206 | + dumpName0 = self.listDumpNames()[0] |
| 3207 | + for f in outputFiles: |
| 3208 | + if f.dumpName == dumpName0: |
| 3209 | + series = self.buildCommand(runner, f) |
2402 | 3210 | commands.append(series) |
2403 | | - else: |
2404 | | - for i in range(1, len(self._chunks)+1): |
2405 | | - series = self.buildCommand(runner, i) |
2406 | | - commands.append(series) |
2407 | | - else: |
2408 | | - series = self.buildCommand(runner) |
2409 | | - commands.append(series) |
2410 | 3211 | error = runner.runCommand(commands, callbackStderr=self.progressCallback, callbackStderrArg=runner) |
2411 | 3212 | if (error): |
2412 | 3213 | raise BackupError("error producing abstract dump") |
2413 | 3214 | |
2414 | | - |
2415 | | - def _variants(self, runner): |
2416 | | - # If the database name looks like it's marked as Chinese language, |
2417 | | - # return a list including Simplified and Traditional versions, so |
2418 | | - # we can build separate files normalized to each orthography. |
| 3215 | + # If the database name looks like it's marked as Chinese language, |
| 3216 | + # return a list including Simplified and Traditional versions, so |
| 3217 | + # we can build separate files normalized to each orthography. |
| 3218 | + def _variants(self): |
2419 | 3219 | if runner.dbName[0:2] == "zh" and runner.dbName[2:3] != "_": |
2420 | | - return ("", "zh-cn", "zh-tw") |
| 3220 | + variants = [ "", "zh-cn", "zh-tw"] |
2421 | 3221 | else: |
2422 | | - return ("",) |
| 3222 | + variants = [ "" ] |
| 3223 | + return variants |
2423 | 3224 | |
2424 | 3225 | def _variantOption(self, variant): |
2425 | 3226 | if variant == "": |
— | — | @@ -2426,95 +3227,133 @@ |
2427 | 3228 | else: |
2428 | 3229 | return ":variant=%s" % variant |
2429 | 3230 | |
2430 | | - def _variantFile(self, variant, chunk = 0): |
2431 | | - if chunk: |
2432 | | - chunkInfo = "%s" % chunk |
| 3231 | + def dumpNameFromVariant(self, v): |
| 3232 | + dumpNameBase = 'abstract' |
| 3233 | + if v == "": |
| 3234 | + return dumpNameBase |
2433 | 3235 | else: |
2434 | | - chunkInfo = "" |
2435 | | - if variant == "": |
2436 | | - return( "abstract"+chunkInfo + ".xml") |
2437 | | - else: |
2438 | | - return( "abstract-%s%s.xml" % (variant, chunkInfo) ) |
| 3236 | + return dumpNameBase + "-" + v |
2439 | 3237 | |
2440 | | - def listOutputFiles(self, runner): |
| 3238 | + def listDumpNames(self): |
| 3239 | + # needed first by buildCommand and other such methods
| 3240 | + dumpNames = [ ] |
| 3241 | + variants = self._variants() |
| 3242 | + for v in variants: |
| 3243 | + dumpNames.append(self.dumpNameFromVariant(v)) |
| 3244 | + return dumpNames |
| 3245 | + |
| 3246 | + def listOutputFilesToPublish(self, dumpDir): |
| 3247 | + dumpNames = self.listDumpNames() |
2441 | 3248 | files = [] |
2442 | | - for x in self._variants(runner): |
2443 | | - if (self._chunks): |
2444 | | - if (self._chunkToDo): |
2445 | | - if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)): |
2446 | | - raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks))) |
2447 | | - files.append(self._variantFile(x, self._chunkToDo)) |
2448 | | - else: |
2449 | | - for i in range(1, len(self._chunks)+1): |
2450 | | - files.append(self._variantFile(x, i)) |
2451 | | - else: |
2452 | | - files.append(self._variantFile(x)) |
2453 | | - return files |
| 3249 | + files.extend(Dump.listOutputFilesToPublish(self, dumpDir, dumpNames)) |
| 3250 | + return files |
2454 | 3251 | |
2455 | | -class RecombineAbstractDump(AbstractDump): |
2456 | | - def __init__(self, name, desc, chunks): |
2457 | | - AbstractDump.__init__(self, name, desc, False, chunks) |
2458 | | - # this is here only so that a callback can capture output from some commands |
2459 | | - # related to recombining files if we did parallel runs of the recompression |
2460 | | - self._output = None |
| 3252 | + def listOutputFilesToCheckForTruncation(self, dumpDir): |
| 3253 | + dumpNames = self.listDumpNames() |
| 3254 | + files = [] |
| 3255 | + files.extend(Dump.listOutputFilesToCheckForTruncation(self, dumpDir, dumpNames)) |
| 3256 | + return files |
2461 | 3257 | |
2462 | | - def listOutputFiles(self, runner): |
| 3258 | + def listOutputFilesForBuildCommand(self, dumpDir): |
| 3259 | + dumpNames = self.listDumpNames() |
2463 | 3260 | files = [] |
2464 | | - for x in self._variants(runner): |
2465 | | - files.append(self._variantFile(x)) |
2466 | | - return files |
| 3261 | + files.extend(Dump.listOutputFilesForBuildCommand(self, dumpDir, dumpNames)) |
| 3262 | + return files |
2467 | 3263 | |
2468 | | - def listInputFiles(self, runner): |
2469 | | - return(AbstractDump.listOutputFiles(self,runner)) |
| 3264 | + def listOutputFilesForCleanup(self, dumpDir): |
| 3265 | + dumpNames = self.listDumpNames() |
| 3266 | + files = [] |
| 3267 | + files.extend(Dump.listOutputFilesForCleanup(self, dumpDir, dumpNames)) |
| 3268 | + return files |
2470 | 3269 | |
| 3270 | + def listOutputFilesForInput(self, dumpDir): |
| 3271 | + dumpNames = self.listDumpNames() |
| 3272 | + files = [] |
| 3273 | + files.extend(Dump.listOutputFilesForInput(self, dumpDir, dumpNames)) |
| 3274 | + return files |
| 3275 | + |
| 3276 | + |
| 3277 | +class RecombineAbstractDump(Dump): |
| 3278 | + def __init__(self, name, desc, itemForRecombine): |
| 3279 | + # no chunkToDo, no chunks generally (False, False), even though input may have it |
| 3280 | + self.itemForRecombine = itemForRecombine |
| 3281 | + Dump.__init__(self, name, desc) |
| 3282 | + # the input may have checkpoints but the output will not. |
| 3283 | + self._checkpointsEnabled = False |
| 3284 | + |
| 3285 | + def getFileType(self): |
| 3286 | + return self.itemForRecombine.getFileType() |
| 3287 | + |
| 3288 | + def getFileExt(self): |
| 3289 | + return self.itemForRecombine.getFileExt() |
| 3290 | + |
| 3291 | + def getDumpName(self): |
| 3292 | + return self.itemForRecombine.getDumpName() |
| 3293 | + |
2471 | 3294 | def run(self, runner): |
2472 | 3295 | error = 0 |
2473 | | - if (self._chunks): |
2474 | | - files = AbstractDump.listOutputFiles(self,runner) |
2475 | | - outputFileList = self.listOutputFiles(runner) |
2476 | | - for outputFile in outputFileList: |
2477 | | - inputFiles = [] |
2478 | | - for inFile in files: |
2479 | | - (base, rest) = inFile.split('.',1) |
2480 | | - base = re.sub("\d+$", "", base) |
2481 | | - if base + "." + rest == outputFile: |
2482 | | - inputFiles.append(inFile) |
2483 | | - if not len(inputFiles): |
2484 | | - self.setStatus("failed") |
2485 | | - raise BackupError("No input files for %s found" % self.name) |
2486 | | - if (not exists( runner.wiki.config.cat ) ): |
2487 | | - raise BackupError("cat command %s not found" % runner.wiki.config.cat); |
2488 | | - compressionCommand = "%s > " % runner.wiki.config.cat |
2489 | | - uncompressionCommand = [ "%s" % runner.wiki.config.cat ] |
2490 | | - recombineCommandString = self.buildRecombineCommandString(runner, inputFiles, outputFile, compressionCommand, uncompressionCommand, "<feed>" ) |
2491 | | - recombineCommand = [ recombineCommandString ] |
2492 | | - recombinePipeline = [ recombineCommand ] |
2493 | | - series = [ recombinePipeline ] |
2494 | | - result = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True) |
2495 | | - if result: |
2496 | | - error = result |
| 3296 | + # FIXME check this code |
| 3297 | + files = self.itemForRecombine.listOutputFilesForInput(runner.dumpDir) |
| 3298 | + outputFileList = self.listOutputFilesForBuildCommand(runner.dumpDir) |
| 3299 | + for outputFile in outputFileList: |
| 3300 | + inputFiles = [] |
| 3301 | + for inFile in files: |
| 3302 | + if inFile.dumpName == outputFile.dumpName: |
| 3303 | + inputFiles.append(inFile) |
| 3304 | + if not len(inputFiles): |
| 3305 | + self.setStatus("failed") |
| 3306 | + raise BackupError("No input files for %s found" % self.name()) |
| 3307 | + if (not exists( runner.wiki.config.cat ) ): |
| 3308 | + raise BackupError("cat command %s not found" % runner.wiki.config.cat) |
| 3309 | + compressionCommand = "%s > " % runner.wiki.config.cat |
| 3310 | + uncompressionCommand = [ "%s" % runner.wiki.config.cat ] |
| 3311 | + recombineCommandString = self.buildRecombineCommandString(runner, inputFiles, outputFile, compressionCommand, uncompressionCommand, "<feed>" ) |
| 3312 | + recombineCommand = [ recombineCommandString ] |
| 3313 | + recombinePipeline = [ recombineCommand ] |
| 3314 | + series = [ recombinePipeline ] |
| 3315 | + result = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True) |
| 3316 | + if result: |
| 3317 | + error = result |
2497 | 3318 | if (error): |
2498 | 3319 | raise BackupError("error recombining abstract dump files") |
2499 | 3320 | |
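Note: the four levels of list nesting built at the end of run() above are easy to misread. A minimal sketch of the structure handed to runner.runCommand() (the shell string is hypothetical; with shell=True each command is a one-element list holding the whole command line):

    recombineCommand  = [ "cat chunk1 chunk2 > outfile" ]    # a command: one shell string
    recombinePipeline = [ recombineCommand ]                  # a pipeline: a list of commands
    series            = [ recombinePipeline ]                 # a series: a list of pipelines
    runner.runCommand([ series ], shell = True)               # runCommand takes a list of series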
2500 | 3321 | class TitleDump(Dump): |
2501 | 3322 | """This is used by "wikiproxy", a program to add Wikipedia links to BBC news online""" |
| 3323 | + |
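| | +    # with an empty file type these yield the familiar "all-titles-in-ns0.gz" output filename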
| 3324 | + def getDumpName(self): |
| 3325 | + return "all-titles-in-ns0" |
| 3326 | + |
| 3327 | + def getFileType(self): |
| 3328 | + return "" |
| 3329 | + |
| 3330 | + def getFileExt(self): |
| 3331 | + return "gz" |
| 3332 | + |
2502 | 3333 | def run(self, runner): |
2503 | 3334 | retries = 0 |
2504 | 3335 | # try this initially and see how it goes |
2505 | 3336 | maxretries = 3 |
2506 | 3337 | query="select page_title from page where page_namespace=0;" |
2507 | | - error = runner.saveSql(query, runner.dumpDir.publicPath("all-titles-in-ns0.gz")) |
| 3338 | + files = self.listOutputFilesForBuildCommand(runner.dumpDir) |
| 3339 | + if len(files) > 1: |
| 3340 | + raise BackupError("page title dump trying to produce more than one output file") |
| 3341 | + fileObj = files[0] |
| 3342 | + outFilename = runner.dumpDir.filenamePublicPath(fileObj) |
| 3343 | + error = self.saveSql(query, outFilename, runner) |
2508 | 3344 | while (error and retries < maxretries): |
2509 | 3345 | retries = retries + 1 |
2510 | 3346 | time.sleep(5) |
2511 | | - error = runner.saveSql(query, runner.dumpDir.publicPath("all-titles-in-ns0.gz")) |
| 3347 | + error = self.saveSql(query, outFilename, runner) |
2512 | 3348 | if (error): |
2513 | 3349 | raise BackupError("error dumping titles list") |
2514 | 3350 | |
2515 | | - def listOutputFiles(self, runner): |
2516 | | - return ["all-titles-in-ns0.gz"] |
| 3351 | + def saveSql(self, query, outfile, runner): |
| 3352 | + """Pass some SQL commands to the server for this DB and save output to a gzipped file.""" |
| 3353 | + if (not exists( runner.wiki.config.gzip ) ): |
| 3354 | + raise BackupError("gzip command %s not found" % runner.wiki.config.gzip) |
| 3355 | + command = runner.dbServerInfo.buildSqlCommand(query, runner.wiki.config.gzip) |
| 3356 | + return runner.saveCommand(command, outfile) |
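runner.dbServerInfo.buildSqlCommand is defined elsewhere in this codebase and is not shown in this hunk, so the following is a rough, hypothetical sketch only: the pipeline that saveSql sets up is equivalent to something like the code below, with server, credentials and most flags omitted.

    import subprocess

    def save_sql_sketch(query, dbname, outfile):
        # roughly: mysql --skip-column-names -e '<query>' <dbname> | gzip > outfile
        out = open(outfile, "wb")
        mysql = subprocess.Popen(["mysql", "--skip-column-names", "-e", query, dbname],
                                 stdout=subprocess.PIPE)
        gz = subprocess.Popen(["gzip"], stdin=mysql.stdout, stdout=out)
        mysql.stdout.close()
        result = gz.wait() or mysql.wait()
        out.close()
        return result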
2517 | 3357 | |
2518 | | - |
2519 | 3358 | def findAndLockNextWiki(config, locksEnabled): |
2520 | 3359 | if config.halt: |
2521 | 3360 | print "Dump process halted by config." |
— | — | @@ -2537,13 +3376,15 @@ |
2538 | 3377 | return None |
2539 | 3378 | |
2540 | 3379 | def xmlEscape(text): |
2541 | | -	return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
| 3380 | +	return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
2542 | 3381 | |
2543 | 3382 | def usage(message = None): |
2544 | 3383 | if message: |
2545 | 3384 | print message |
2546 | 3385 | print "Usage: python worker.py [options] [wikidbname]" |
2547 | | - print "Options: --configfile, --date, --job, --addnotice, --delnotice, --force, --noprefetch, --nospawn, --restartfrom, --log" |
| 3386 | + print "Options: --chunk, --configfile, --date, --job, --addnotice, --delnotice, --force, --noprefetch, --nospawn, --restartfrom, --log" |
| 3387 | + print "--chunk: Specify the number of the chunk to rerun (use with a specific job" |
| 3388 | + print " to rerun, only if parallel jobs (chunks) are enabled)." |
2548 | 3389 | print "--configfile: Specify an alternative configuration file to read." |
2549 | 3390 | print " Default config file name: wikidump.conf" |
2550 | 3391 | print "--date: Rerun dump of a given date (probably unwise)" |
— | — | @@ -2664,7 +3505,11 @@ |
2665 | 3506 | # process any per-project configuration options |
2666 | 3507 | config.parseConfFilePerProject(wiki.dbName) |
2667 | 3508 | |
2668 | | - runner = Runner(wiki, date, prefetch, spawn, jobRequested, restart, htmlNotice, dryrun, enableLogging, chunkToDo) |
| 3509 | + if not date: |
| 3510 | + date = TimeUtils.today() |
| 3511 | + wiki.setDate(date) |
| 3512 | + |
| 3513 | + runner = Runner(wiki, prefetch, spawn, jobRequested, restart, htmlNotice, dryrun, enableLogging, chunkToDo) |
2669 | 3514 | if (restart): |
2670 | 3515 | print "Running %s, restarting from job %s..." % (wiki.dbName, jobRequested) |
2671 | 3516 | elif (jobRequested): |
Index: branches/ariel/xmldumps-backup/CommandManagement.py |
— | — | @@ -9,6 +9,7 @@ |
10 | 10 | import Queue |
11 | 11 | import thread |
12 | 12 | import fcntl |
| 13 | +import threading |
13 | 14 | |
14 | 15 | from os.path import dirname, exists, getsize, join, realpath |
15 | 16 | from subprocess import Popen, PIPE |
— | — | @@ -380,47 +381,25 @@ |
381 | 382 | if (self.allCommandsCompleted() and not len(self._processesToPoll)): |
382 | 383 | break |
383 | 384 | |
384 | | -class CommandsInParallel(object): |
385 | | - """Run a pile of commandSeries in parallel ( e.g. dump articles 1 to 100K, |
386 | | - dump articles 100K+1 to 200K, ...). This takes as arguments: a list of series |
387 | | - of pipelines (each pipeline is a list of commands, each series is a list of |
388 | | - pipelines), as well as a possible callback which is used to capture all output |
389 | | - from the various commmand series. If the callback takes an argument other than |
390 | | - the line of output, it should be passed in the arg parameter (and it will be passed |
391 | | - to the callback function first before the output line). If no callback is provided |
392 | | - and the individual pipelines are not provided with a file to save output, |
393 | | - then output is written to stderr. |
394 | | - Callbackinterval is in milliseconds, defaults is 20 seconds""" |
395 | | - def __init__(self, commandSeriesList, callbackStderr = None, callbackStdout = None, callbackTimed = None, callbackStderrArg=None, callbackStdoutArg = None, callbackTimedArg = None, quiet = False, shell = False, callbackInterval = 20000 ): |
396 | | - self._commandSeriesList = commandSeriesList |
397 | | - self._commandSerieses = [] |
398 | | - for series in self._commandSeriesList: |
399 | | - self._commandSerieses.append( CommandSeries(series, quiet, shell) ) |
400 | | - # for each command series running in parallel, |
401 | | - # in cases where a command pipeline in the series generates output, the callback |
402 | | - # will be called with a line of output from the pipeline as it becomes available |
| 385 | +class ProcessMonitor(threading.Thread): |
| 386 | + def __init__(self, timeout, queue, outputQueue, defaultCallbackInterval, |
| 387 | + callbackStderr, callbackStdout, callbackTimed, |
| 388 | + callbackStderrArg, callbackStdoutArg, callbackTimedArg): |
| 389 | + threading.Thread.__init__(self) |
| 390 | + self.timeout = timeout |
| 391 | + self.queue = queue |
| 392 | + self.outputQueue = outputQueue |
| 393 | + self._defaultCallbackInterval = defaultCallbackInterval |
403 | 394 | self._callbackStderr = callbackStderr |
404 | 395 | self._callbackStdout = callbackStdout |
405 | 396 | self._callbackTimed = callbackTimed |
406 | 397 | self._callbackStderrArg = callbackStderrArg |
407 | 398 | self._callbackStdoutArg = callbackStdoutArg |
408 | 399 | self._callbackTimedArg = callbackTimedArg |
409 | | - self._commandSeriesQueue = Queue.Queue() |
410 | 400 | |
411 | | - # number millisecs we will wait for select.poll() |
412 | | - self._defaultPollTime = 500 |
413 | | - |
414 | | - # for programs that don't generate output, wait this many milliseconds between |
415 | | - # invoking callback if there is one |
416 | | - self._defaultCallbackInterval = callbackInterval |
417 | | - |
418 | | - def startCommands(self): |
419 | | - for series in self._commandSerieses: |
420 | | - series.startCommands() |
421 | | - |
422 | 401 | # one of these as a thread to monitor each command series. |
423 | | - def seriesMonitor(self, timeout, queue): |
424 | | - series = queue.get() |
| 402 | + def run(self): |
| 403 | + series = self.queue.get() |
425 | 404 | while series.processProducingOutput(): |
426 | 405 | p = series.processProducingOutput() |
427 | 406 | poller = select.poll() |
— | — | @@ -441,7 +420,7 @@ |
442 | 421 | |
443 | 422 | waited = 0 |
444 | 423 | while not commandCompleted: |
445 | | - waiting = poller.poll(self._defaultPollTime) |
| 424 | + waiting = poller.poll(self.timeout) |
446 | 425 | if (waiting): |
447 | 426 | for (fd,event) in waiting: |
448 | 427 | series.inProgressPipeline().setPollState(event) |
— | — | @@ -449,21 +428,9 @@ |
450 | 429 | out = os.read(fd,1024) |
451 | 430 | if out: |
452 | 431 | if fd == p.stderr.fileno(): |
453 | | - if self._callbackStderr: |
454 | | - if (self._callbackStderrArg): |
455 | | - self._callbackStderr(self._callbackStderrArg, out) |
456 | | - else: |
457 | | - self._callbackStderr(out) |
458 | | - else: |
459 | | - sys.stderr.write(out) |
| 432 | + self.outputQueue.put(OutputQueueItem(OutputQueueItem.getStderrChannel(),out)) |
460 | 433 | elif fd == p.stdout.fileno(): |
461 | | - if self._callbackStdout: |
462 | | - if (self._callbackStdoutArg): |
463 | | - self._callbackStdout(self._callbackStdoutArg, out) |
464 | | - else: |
465 | | - self._callbackStdout(out) |
466 | | - else: |
467 | | - sys.stderr.write(out) |
| 434 | + self.outputQueue.put(OutputQueueItem(OutputQueueItem.getStdoutChannel(),out)) |
468 | 435 | else: |
469 | 436 | # possible eof? what would cause this? |
470 | 437 | pass |
— | — | @@ -476,7 +443,7 @@ |
477 | 444 | print "returned from %s with %s" % (p.pid, p.returncode) |
478 | 445 | commandCompleted = True |
479 | 446 | |
480 | | - waited = waited + self._defaultPollTime |
| 447 | + waited = waited + self.timeout |
481 | 448 | if waited > self._defaultCallbackInterval and self._callbackTimed: |
482 | 449 | if (self._callbackTimedArg): |
483 | 450 | self._callbackTimed(self._callbackTimedArg) |
— | — | @@ -488,13 +455,69 @@ |
489 | 456 | series.continueCommands() |
490 | 457 | |
491 | 458 | # completed the whole series. time to go home. |
492 | | - queue.task_done() |
| 459 | + self.queue.task_done() |
493 | 460 | |
| 461 | +class OutputQueueItem(object): |
| 462 | + def __init__(self, channel, contents): |
| 463 | + self.channel = channel |
| 464 | + self.contents = contents |
| 465 | + self.stdoutChannel = OutputQueueItem.getStdoutChannel() |
| 466 | + self.stderrChannel = OutputQueueItem.getStderrChannel() |
494 | 467 | |
| 468 | + def getStdoutChannel(): |
| 469 | + return 1 |
| 470 | + |
| 471 | + def getStderrChannel(): |
| 472 | + return 2 |
| 473 | + |
| 474 | + getStdoutChannel = staticmethod(getStdoutChannel) |
| 475 | + getStderrChannel = staticmethod(getStderrChannel) |
| 476 | + |
| 477 | +class CommandsInParallel(object): |
| 478 | + """Run a pile of commandSeries in parallel ( e.g. dump articles 1 to 100K, |
| 479 | + dump articles 100K+1 to 200K, ...). This takes as arguments: a list of series |
| 480 | + of pipelines (each pipeline is a list of commands, each series is a list of |
| 481 | + pipelines), as well as a possible callback which is used to capture all output |
| 482 | + from the various command series. If the callback takes an argument other than
| 483 | + the line of output, it should be passed in the arg parameter (and it will be passed |
| 484 | + to the callback function first before the output line). If no callback is provided |
| 485 | + and the individual pipelines are not provided with a file to save output, |
| 486 | + then output is written to stderr. |
| 487 | + callbackInterval is in milliseconds; the default is 20000 (20 seconds)."""
| 488 | + def __init__(self, commandSeriesList, callbackStderr = None, callbackStdout = None, callbackTimed = None, callbackStderrArg=None, callbackStdoutArg = None, callbackTimedArg = None, quiet = False, shell = False, callbackInterval = 20000 ): |
| 489 | + self._commandSeriesList = commandSeriesList |
| 490 | + self._commandSerieses = [] |
| 491 | + for series in self._commandSeriesList: |
| 492 | + self._commandSerieses.append( CommandSeries(series, quiet, shell) ) |
| 493 | + # for each command series running in parallel, |
| 494 | + # in cases where a command pipeline in the series generates output, the callback |
| 495 | + # will be called with a line of output from the pipeline as it becomes available |
| 496 | + self._callbackStderr = callbackStderr |
| 497 | + self._callbackStdout = callbackStdout |
| 498 | + self._callbackTimed = callbackTimed |
| 499 | + self._callbackStderrArg = callbackStderrArg |
| 500 | + self._callbackStdoutArg = callbackStdoutArg |
| 501 | + self._callbackTimedArg = callbackTimedArg |
| 502 | + self._commandSeriesQueue = Queue.Queue() |
| 503 | + self._outputQueue = Queue.Queue() |
| 504 | + self._normalThreadCount = threading.activeCount() |
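| | +        # every ProcessMonitor thread pushes command output onto _outputQueue;
| | +        # watchOutputQueue() drains it from the thread that called runCommands() and
| | +        # invokes the stdout/stderr callbacks there (the timed progress callback still
| | +        # fires from within the monitor threads). _normalThreadCount records the
| | +        # baseline so we can tell when every monitor thread has exited.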
| 505 | + |
| 506 | + # number millisecs we will wait for select.poll() |
| 507 | + self._defaultPollTime = 500 |
| 508 | + |
| 509 | + # for programs that don't generate output, wait this many milliseconds between |
| 510 | + # invoking callback if there is one |
| 511 | + self._defaultCallbackInterval = callbackInterval |
| 512 | + |
| 513 | + def startCommands(self): |
| 514 | + for series in self._commandSerieses: |
| 515 | + series.startCommands() |
| 516 | + |
495 | 517 | def setupOutputMonitoring(self): |
496 | 518 | for series in self._commandSerieses: |
497 | 519 | self._commandSeriesQueue.put(series) |
498 | | - thread.start_new_thread(self.seriesMonitor, (500, self._commandSeriesQueue)) |
| 520 | + t = ProcessMonitor(500, self._commandSeriesQueue, self._outputQueue, self._defaultCallbackInterval,self._callbackStderr,self._callbackStdout,self._callbackTimed,self._callbackStderrArg,self._callbackStdoutArg,self._callbackTimedArg) |
| 521 | + t.start() |
499 | 522 | |
500 | 523 | def allCommandsCompleted(self): |
501 | 524 | """Check if all series have run to completion.""" |
— | — | @@ -517,10 +540,40 @@ |
518 | 541 | commands.extend(series.exitedWithErrors()) |
519 | 542 | return(commands) |
520 | 543 | |
| 544 | + def watchOutputQueue(self): |
| 545 | + done = False |
| 546 | + while not done: |
| 547 | +            # check the number of active threads; once only the original (non-monitor) threads remain, we are done
| 548 | + if threading.activeCount() == self._normalThreadCount: |
| 549 | + done = True |
| 550 | + output = None |
| 551 | + try: |
| 552 | + output = self._outputQueue.get(True, 1) |
| 553 | +            except Queue.Empty:    # nothing arrived within the timeout; loop and re-check
| 554 | + pass |
| 555 | + if output: |
| 556 | + if output.channel == OutputQueueItem.getStdoutChannel(): |
| 557 | + if self._callbackStdout: |
| 558 | + if (self._callbackStdoutArg): |
| 559 | + self._callbackStdout(self._callbackStdoutArg, output.contents) |
| 560 | + else: |
| 561 | + self._callbackStdout(output.contents) |
| 562 | + else: |
| 563 | + sys.stderr.write(output.contents) |
| 564 | + else: # output channel is stderr |
| 565 | + if self._callbackStderr: |
| 566 | + if (self._callbackStderrArg): |
| 567 | + self._callbackStderr(self._callbackStderrArg, output.contents) |
| 568 | + else: |
| 569 | + self._callbackStderr(output.contents) |
| 570 | + else: |
| 571 | + sys.stderr.write(output.contents) |
| 572 | + |
521 | 573 | def runCommands(self): |
522 | 574 | self.startCommands() |
523 | 575 | self.setupOutputMonitoring() |
524 | | - self._commandSeriesQueue.join() |
| 576 | + self.watchOutputQueue() |
| 577 | +# self._commandSeriesQueue.join() |
525 | 578 | |
526 | 579 | |
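A minimal usage sketch of CommandsInParallel as restructured above (the command is arbitrary and the callback is the test helper defined just below):

    # one series, containing one pipeline, containing one command (a list of args)
    cip = CommandsInParallel([ [ [ [ "/bin/ls", "-l" ] ] ] ], callbackStdout = testcallback)
    cip.runCommands()   # output is queued by the monitor thread and handed to testcallback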
527 | 580 | def testcallback(output = None): |
— | — | @@ -558,5 +611,3 @@ |
559 | 612 | print "w00t!" |
560 | 613 | else: |
561 | 614 | print "big bummer!" |
562 | | - |
563 | | - |