Index: branches/ariel/xmldumps-backup/worker.py |
— | — | @@ -24,9 +24,6 @@ |
25 | 25 | from WikiDump import FileUtils, MiscUtils, TimeUtils |
26 | 26 | from CommandManagement import CommandPipeline, CommandSeries, CommandsInParallel |
27 | 27 | |
28 | | -def xmlEscape(text): |
29 | | - return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;"); |
30 | | - |
31 | 28 | class Logger(object): |
32 | 29 | |
33 | 30 | def __init__(self, logFileName=None): |
— | — | @@ -234,7 +231,6 @@ |
235 | 232 | |
236 | 233 | |
237 | 234 | class RunSimpleCommand(object): |
238 | | - |
239 | 235 | # FIXME rewrite to not use popen2 |
240 | 236 | def runAndReturn(command, logCallback = None): |
241 | 237 | """Run a command and return the output as a string. |
— | — | @@ -315,6 +311,146 @@ |
316 | 312 | class BackupError(Exception): |
317 | 313 | pass |
318 | 314 | |
| 315 | +class RunInfoFile(object): |
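| | + # reads and writes dumpruninfo.txt, which records the name, status and update time of each dump job |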
| 316 | + def __init__(self, wiki, enabled): |
| 317 | + self.wiki = wiki |
| 318 | + self._enabled = enabled |
| 319 | + |
| 320 | + def saveDumpRunInfoFile(self, text): |
| 321 | + """Write out a simple text file with the status for this wiki's dump.""" |
| 322 | + if (self._enabled): |
| 323 | + try: |
| 324 | + self._writeDumpRunInfoFile(text) |
| 325 | + except: |
| 326 | + print "Couldn't save dump run info file. Continuing anyways" |
| 327 | + |
| 328 | + def statusOfOldDumpIsDone(self, runner, date, jobName, jobDesc): |
| 329 | + oldDumpRunInfoFilename=self._getDumpRunInfoFileName(date) |
| 330 | + status = self._getStatusForJobFromRunInfoFile(oldDumpRunInfoFilename, jobName) |
| 331 | + if (status == "done"): |
| 332 | + return 1 |
| 333 | + elif (not status == None): |
| 334 | + # failure, in progress, some other useless thing |
| 335 | + return 0 |
| 336 | + |
| 337 | + # ok, there was no info there to be had, try the index file. yuck. |
| 338 | + indexFilename = os.path.join(runner.wiki.publicDir(), date, runner.config.perDumpIndex) |
| 339 | + status = self._getStatusForJobFromIndexFile(indexFilename, jobDesc) |
| 340 | + if (status == "done"): |
| 341 | + return 1 |
| 342 | + else: |
| 343 | + return 0 |
| 344 | + |
| 345 | + def getOldRunInfoFromFile(self): |
| 346 | + # read the dump run info file in, if there is one, and get info about which dumps |
| 347 | + # have already been run and whether they were successful |
| 348 | + dumpRunInfoFileName = self._getDumpRunInfoFileName() |
| 349 | + results = [] |
| 350 | + try: |
| 351 | + infile = open(dumpRunInfoFileName,"r") |
| 352 | + for line in infile: |
| 353 | + results.append(self._getOldRunInfoFromLine(line)) |
| 354 | + infile.close() |
| 355 | + return results |
| 356 | + except: |
| 357 | + return False |
| 358 | + |
| 359 | + # |
| 360 | + # functions internal to the class |
| 361 | + # |
| 362 | + def _getDumpRunInfoFileName(self, date = None): |
| 363 | + # sometimes need to get this info for an older run to check status of a file for |
| 364 | + # possible prefetch |
| 365 | + if (date): |
| 366 | + return os.path.join(self.wiki.publicDir(), date, "dumpruninfo.txt") |
| 367 | + else: |
| 368 | + return os.path.join(self.wiki.publicDir(), self.wiki.date, "dumpruninfo.txt") |
| 369 | + |
| 370 | + def _getDumpRunInfoDirName(self, date=None): |
| 371 | + if (date): |
| 372 | + return os.path.join(self.wiki.publicDir(), date) |
| 373 | + else: |
| 374 | + return os.path.join(self.wiki.publicDir(), self.wiki.date) |
| 375 | + |
| 376 | + # format: name:%; updated:%; status:% |
| 377 | + def _getOldRunInfoFromLine(self, line): |
| 378 | + # get rid of leading/trailing blanks |
| 379 | + line = line.strip(" ") |
| 380 | + line = line.replace("\n","") |
| 381 | + fields = line.split(';',3) |
| 382 | + dumpRunInfo = RunInfo() |
| 383 | + for field in fields: |
| 384 | + field = field.strip(" ") |
| 385 | + (fieldName, separator, fieldValue) = field.partition(':') |
| 386 | + if (fieldName == "name"): |
| 387 | + dumpRunInfo.setName(fieldValue) |
| 388 | + elif (fieldName == "status"): |
| 389 | + dumpRunInfo.setStatus(fieldValue,False) |
| 390 | + elif (fieldName == "updated"): |
| 391 | + dumpRunInfo.setUpdated(fieldValue) |
| 392 | + return(dumpRunInfo) |
| 393 | + |
| 394 | + def _writeDumpRunInfoFile(self, text): |
| 395 | + directory = self._getDumpRunInfoDirName() |
| 396 | + dumpRunInfoFilename = self._getDumpRunInfoFileName() |
| 397 | + FileUtils.writeFile(directory, dumpRunInfoFilename, text, self.wiki.config.fileperms) |
| 398 | + |
| 399 | + # format: name:%; updated:%; status:% |
| 400 | + def _getStatusForJobFromRunInfoFileLine(self, line, jobName): |
| 401 | + # get rid of leading/trailing/embedded blanks |
| 402 | + line = line.replace(" ","") |
| 403 | + line = line.replace("\n","") |
| 404 | + fields = line.split(';',3) |
| 405 | + for field in fields: |
| 406 | + (fieldName, separator, fieldValue) = field.partition(':') |
| 407 | + if (fieldName == "name"): |
| 408 | + if (not fieldValue == jobName): |
| 409 | + return None |
| 410 | + elif (fieldName == "status"): |
| 411 | + return fieldValue |
| 412 | + |
| 413 | + def _getStatusForJobFromRunInfoFile(self, filename, jobName = ""): |
| 414 | + # read the dump run info file in, if there is one, and find out whether |
| 415 | + # a particular job (one step only, not a multiple piece job) has been |
| 416 | + # already run and whether it was successful (use to examine status |
| 417 | + # of step from some previous run) |
| 418 | + try: |
| 419 | + infile = open(filename,"r") |
| 420 | + for line in infile: |
| 421 | + result = self._getStatusForJobFromRunInfoFileLine(line, jobName) |
| 422 | + if (not result == None): |
| 423 | + return result |
| 424 | + infile.close() |
| 425 | + return None |
| 426 | + except: |
| 427 | + return None |
| 428 | + |
| 429 | + # find desc in there, look for "class='done'" |
| 430 | + def _getStatusForJobFromIndexFileLine(self, line, desc): |
| 431 | + if not(">"+desc+"<" in line): |
| 432 | + return None |
| 433 | + if "<li class='done'>" in line: |
| 434 | + return "done" |
| 435 | + else: |
| 436 | + return "other" |
| 437 | + |
| 438 | + def _getStatusForJobFromIndexFile(self, filename, desc): |
| 439 | + # read the index file in, if there is one, and find out whether |
| 440 | + # a particular job (one step only, not a multiple piece job) has been |
| 441 | + # already run and whether it was successful (use to examine status |
| 442 | + # of step from some previous run) |
| 443 | + try: |
| 444 | + infile = open(filename,"r") |
| 445 | + for line in infile: |
| 446 | + result = self._getStatusForJobFromIndexFileLine(line, desc) |
| 447 | + if (not result == None): |
| 448 | + return result |
| 449 | + infile.close() |
| 450 | + return None |
| 451 | + except: |
| 452 | + return None |
| 453 | + |
| 454 | + |
319 | 455 | class RunInfo(object): |
320 | 456 | def __init__(self, name="", status="", updated="", toBeRun = False): |
321 | 457 | self._name = name |
— | — | @@ -347,8 +483,7 @@ |
348 | 484 | self._toBeRun = toBeRun |
349 | 485 | |
350 | 486 | class DumpItemList(object): |
351 | | - |
352 | | - def __init__(self, wiki, prefetch, spawn, date, chunkInfo): |
| 487 | + def __init__(self, wiki, prefetch, spawn, date, chunkToDo, singleJob, chunkInfo, runInfoFile): |
353 | 488 | self.date = date |
354 | 489 | self.wiki = wiki |
355 | 490 | self._hasFlaggedRevs = self.wiki.hasFlaggedRevs() |
— | — | @@ -356,7 +491,18 @@ |
357 | 492 | self._prefetch = prefetch |
358 | 493 | self._spawn = spawn |
359 | 494 | self.chunkInfo = chunkInfo |
| 495 | + self._chunkToDo = chunkToDo |
| 496 | + self._singleJob = singleJob |
| 497 | + self._runInfoFile = runInfoFile |
360 | 498 | |
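| | + # these jobs don't split into chunks, so requesting a chunk number with them is an error |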
| 499 | + if (self._singleJob and self._chunkToDo): |
| 500 | + if (self._singleJob[-5:] == 'table' or |
| 501 | + self._singleJob[-9:] == 'recombine' or |
| 502 | + self._singleJob == 'noop' or |
| 503 | + self._singleJob == 'xmlpagelogsdump' or |
| 504 | + self._singleJob == 'pagetitlesdump'): |
| 505 | + raise BackupError("You cannot specify a chunk with the job %s, exiting.\n" % self._singleJob) |
| 506 | + |
361 | 507 | self.dumpItems = [PrivateTable("user", "usertable", "User account data."), |
362 | 508 | PrivateTable("watchlist", "watchlisttable", "Users' watchlist settings."), |
363 | 509 | PrivateTable("ipblocks", "ipblockstable", "Data for blocks of IP addresses, ranges, and users."), |
— | — | @@ -390,12 +536,12 @@ |
391 | 537 | |
392 | 538 | TitleDump("pagetitlesdump", "List of page titles"), |
393 | 539 | |
394 | | - AbstractDump("abstractsdump","Extracted page abstracts for Yahoo", self.chunkInfo.getPagesPerChunkAbstract())] |
| 540 | + AbstractDump("abstractsdump","Extracted page abstracts for Yahoo", self._getChunkToDo("abstractsdump"), self.chunkInfo.getPagesPerChunkAbstract())] |
395 | 541 | |
396 | 542 | if (self.chunkInfo.chunksEnabled()): |
397 | 543 | self.dumpItems.append(RecombineAbstractDump("abstractsdumprecombine", "Recombine extracted page abstracts for Yahoo", self.chunkInfo.getPagesPerChunkAbstract())) |
398 | 544 | |
399 | | - self.dumpItems.append(XmlStub("xmlstubsdump", "First-pass for page XML data dumps", self.chunkInfo.getPagesPerChunkHistory())) |
| 545 | + self.dumpItems.append(XmlStub("xmlstubsdump", "First-pass for page XML data dumps", self._getChunkToDo("xmlstubsdump"), self.chunkInfo.getPagesPerChunkHistory())) |
400 | 546 | if (self.chunkInfo.chunksEnabled()): |
401 | 547 | self.dumpItems.append(RecombineXmlStub("xmlstubsdumprecombine", "Recombine first-pass for page XML data dumps", self.chunkInfo.getPagesPerChunkHistory())) |
402 | 548 | |
— | — | @@ -404,7 +550,7 @@ |
405 | 551 | XmlDump("articles", |
406 | 552 | "articlesdump", |
407 | 553 | "<big><b>Articles, templates, image descriptions, and primary meta-pages.</b></big>", |
408 | | - "This contains current versions of article content, and is the archive most mirror sites will probably want.", self._prefetch, self._spawn, self.chunkInfo.getPagesPerChunkHistory())) |
| 554 | + "This contains current versions of article content, and is the archive most mirror sites will probably want.", self._prefetch, self._spawn, self._getChunkToDo("articlesdump"), self.chunkInfo.getPagesPerChunkHistory())) |
409 | 555 | if (self.chunkInfo.chunksEnabled()): |
410 | 556 | self.dumpItems.append(RecombineXmlDump("articles","articlesdumprecombine", "<big><b>Recombine articles, templates, image descriptions, and primary meta-pages.</b></big>","This contains current versions of article content, and is the archive most mirror sites will probably want.", self.chunkInfo.getPagesPerChunkHistory())) |
411 | 557 | |
— | — | @@ -412,7 +558,7 @@ |
413 | 559 | XmlDump("meta-current", |
414 | 560 | "metacurrentdump", |
415 | 561 | "All pages, current versions only.", |
416 | | - "Discussion and user pages are included in this complete archive. Most mirrors won't want this extra material.", self._prefetch, self._spawn, self.chunkInfo.getPagesPerChunkHistory())) |
| 562 | + "Discussion and user pages are included in this complete archive. Most mirrors won't want this extra material.", self._prefetch, self._spawn, self._getChunkToDo("metacurrentdump"), self.chunkInfo.getPagesPerChunkHistory())) |
417 | 563 | |
418 | 564 | if (self.chunkInfo.chunksEnabled()): |
419 | 565 | self.dumpItems.append(RecombineXmlDump("meta-current","metacurrentdumprecombine", "Recombine all pages, current versions only.","Discussion and user pages are included in this complete archive. Most mirrors won't want this extra material.", self.chunkInfo.getPagesPerChunkHistory())) |
— | — | @@ -432,7 +578,7 @@ |
433 | 579 | "metahistorybz2dump", |
434 | 580 | "All pages with complete page edit history (.bz2)", |
435 | 581 | "These dumps can be *very* large, uncompressing up to 20 times the archive download size. " + |
436 | | - "Suitable for archival and statistical use, most mirror sites won't want or need this.", self._prefetch, self._spawn, self.chunkInfo.getPagesPerChunkHistory())) |
| 582 | + "Suitable for archival and statistical use, most mirror sites won't want or need this.", self._prefetch, self._spawn, self._getChunkToDo("metahistorybz2dump"), self.chunkInfo.getPagesPerChunkHistory())) |
437 | 583 | if (self.chunkInfo.chunksEnabled() and self.chunkInfo.recombineHistory()): |
438 | 584 | self.dumpItems.append( |
439 | 585 | RecombineXmlDump("meta-history", |
— | — | @@ -445,7 +591,7 @@ |
446 | 592 | "metahistory7zdump", |
447 | 593 | "All pages with complete edit history (.7z)", |
448 | 594 | "These dumps can be *very* large, uncompressing up to 100 times the archive download size. " + |
449 | | - "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.chunkInfo.getPagesPerChunkHistory())) |
| 595 | + "Suitable for archival and statistical use, most mirror sites won't want or need this.", self._getChunkToDo("metahistory7zdump"), self.chunkInfo.getPagesPerChunkHistory())) |
450 | 596 | if (self.chunkInfo.chunksEnabled() and self.chunkInfo.recombineHistory()): |
451 | 597 | self.dumpItems.append( |
452 | 598 | RecombineXmlRecompressDump("meta-history", |
— | — | @@ -453,76 +599,15 @@ |
454 | 600 | "Recombine all pages with complete edit history (.7z)", |
455 | 601 | "These dumps can be *very* large, uncompressing up to 100 times the archive download size. " + |
456 | 602 | "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.chunkInfo.getPagesPerChunkHistory())) |
457 | | - self.oldRunInfoRetrieved = self._getOldRunInfoFromFile() |
458 | | - |
459 | | - |
460 | | - |
461 | | - # read in contents from dump run info file and stuff into dumpItems for later reference |
462 | | - |
463 | | - # sometimes need to get this info for an older run to check status of a file for |
464 | | - # possible prefetch |
465 | | - def _getDumpRunInfoFileName(self, date=None): |
466 | | - if (date): |
467 | | - return os.path.join(self.wiki.publicDir(), date, "dumpruninfo.txt") |
| 603 | + results = self._runInfoFile.getOldRunInfoFromFile() |
| 604 | + if (results): |
| 605 | + for runInfoObj in results: |
| 606 | + self._setDumpItemRunInfo(runInfoObj) |
| 607 | + self.oldRunInfoRetrieved = True |
468 | 608 | else: |
469 | | - return os.path.join(self.wiki.publicDir(), self.date, "dumpruninfo.txt") |
470 | | - |
471 | | - def _getDumpRunInfoDirName(self, date=None): |
472 | | - if (date): |
473 | | - return os.path.join(self.wiki.publicDir(), date); |
474 | | - else: |
475 | | - return os.path.join(self.wiki.publicDir(), self.date); |
476 | | - |
477 | | - def _setDumpItemRunInfo(self, runInfo): |
478 | | - if (not runInfo.name()): |
479 | | - return False |
480 | | - for item in self.dumpItems: |
481 | | - if (item.name() == runInfo.name()): |
482 | | - item.setStatus(runInfo.status(),False) |
483 | | - item.setUpdated(runInfo.updated()) |
484 | | - item.setToBeRun(runInfo.toBeRun()) |
485 | | - return True |
486 | | - return False |
487 | | - |
488 | | - # format: name:%; updated:%; status:% |
489 | | - def _getOldRunInfoFromLine(self, line): |
490 | | - # get rid of leading/trailing/blanks |
491 | | - line = line.strip(" ") |
492 | | - line = line.replace("\n","") |
493 | | - fields = line.split(';',3) |
494 | | - dumpRunInfo = RunInfo() |
495 | | - for field in fields: |
496 | | - field = field.strip(" ") |
497 | | - (fieldName, separator, fieldValue) = field.partition(':') |
498 | | - if (fieldName == "name"): |
499 | | - dumpRunInfo.setName(fieldValue) |
500 | | - elif (fieldName == "status"): |
501 | | - dumpRunInfo.setStatus(fieldValue,False) |
502 | | - elif (fieldName == "updated"): |
503 | | - dumpRunInfo.setUpdated(fieldValue) |
504 | | - self._setDumpItemRunInfo(dumpRunInfo) |
505 | | - |
506 | | - def _getOldRunInfoFromFile(self): |
507 | | - # read the dump run info file in, if there is one, and get info about which dumps |
508 | | - # have already been run and whether they were successful |
509 | | - dumpRunInfoFileName = self._getDumpRunInfoFileName() |
510 | | - try: |
511 | | - infile = open(dumpRunInfoFileName,"r") |
512 | | - for line in infile: |
513 | | - self._getOldRunInfoFromLine(line) |
514 | | - infile.close |
515 | | - return True |
516 | | - except: |
517 | | - return False |
518 | | - |
519 | | - # write dump run info file |
520 | | - # (this file is rewritten with updates after each dumpItem completes) |
521 | | - |
522 | | - def _reportDumpRunInfoLine(self, item): |
523 | | - # even if the item has never been run we will at least have "waiting" in the status |
524 | | - return "name:%s; status:%s; updated:%s" % (item.name(), item.status(), item.updated()) |
525 | | - |
526 | | - def _reportDumpRunInfo(self, done=False): |
| 609 | + self.oldRunInfoRetrieved = False |
| 610 | + |
| 611 | + def reportDumpRunInfo(self, done=False): |
527 | 612 | """Put together a dump run info listing for this database, with all its component dumps.""" |
528 | 613 | runInfoLines = [self._reportDumpRunInfoLine(item) for item in self.dumpItems] |
529 | 614 | runInfoLines.reverse() |
— | — | @@ -530,18 +615,6 @@ |
531 | 616 | text = text + "\n" |
532 | 617 | return text |
533 | 618 | |
534 | | - def writeDumpRunInfoFile(self, text): |
535 | | - directory = self._getDumpRunInfoDirName() |
536 | | - dumpRunInfoFilename = self._getDumpRunInfoFileName() |
537 | | - FileUtils.writeFile(directory, dumpRunInfoFilename, text, self.wiki.config.fileperms) |
538 | | - |
539 | | - def saveDumpRunInfoFile(self, done=False): |
540 | | - """Write out a simple text file with the status for this wiki's dump.""" |
541 | | - try: |
542 | | - self.writeDumpRunInfoFile(self._reportDumpRunInfo(done)) |
543 | | - except: |
544 | | - print "Couldn't save dump run info file. Continuing anyways" |
545 | | - |
546 | 619 | def allPossibleJobsDone(self): |
547 | 620 | for item in self.dumpItems: |
548 | 621 | if (item.status() != "done" and item.status() != "failed"): |
— | — | @@ -583,7 +656,6 @@ |
584 | 657 | break |
585 | 658 | |
586 | 659 | # see whether job needs previous jobs that have not completed successfully |
587 | | - |
588 | 660 | def jobDoneSuccessfully(self, job): |
589 | 661 | for item in self.dumpItems: |
590 | 662 | if (item.name() == job): |
— | — | @@ -622,29 +694,72 @@ |
623 | 695 | return False |
624 | 696 | return True |
625 | 697 | |
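| | + # if a single job was requested along with a chunk number, return that chunk for the matching job; otherwise False |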
| 698 | + def _getChunkToDo(self, jobName): |
| 699 | + if (self._singleJob): |
| 700 | + if (self._singleJob == jobName): |
| 701 | + return(self._chunkToDo) |
| 702 | + return(False) |
| 703 | + |
| 704 | + # read in contents from dump run info file and stuff into dumpItems for later reference |
| 705 | + def _setDumpItemRunInfo(self, runInfo): |
| 706 | + if (not runInfo.name()): |
| 707 | + return False |
| 708 | + for item in self.dumpItems: |
| 709 | + if (item.name() == runInfo.name()): |
| 710 | + item.setStatus(runInfo.status(),False) |
| 711 | + item.setUpdated(runInfo.updated()) |
| 712 | + item.setToBeRun(runInfo.toBeRun()) |
| 713 | + return True |
| 714 | + return False |
| 715 | + |
| 716 | + # compose one line of the dump run info file |
| 717 | + # (the file is rewritten with updates after each dumpItem completes) |
| 718 | + def _reportDumpRunInfoLine(self, item): |
| 719 | + # even if the item has never been run we will at least have "waiting" in the status |
| 720 | + return "name:%s; status:%s; updated:%s" % (item.name(), item.status(), item.updated()) |
| 721 | + |
626 | 722 | class Checksummer(object): |
627 | | - def __init__(self,wiki,dumpDir): |
| 723 | + def __init__(self, wiki, dumpDir, enabled = True): |
628 | 724 | self.wiki = wiki |
629 | 725 | self.dumpDir = dumpDir |
630 | 726 | self.timestamp = time.strftime("%Y%m%d%H%M%S", time.gmtime()) |
631 | | - |
| 727 | + self._enabled = enabled |
| 728 | + |
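| | + # the file-writing methods below are no-ops when checksumming is disabled (e.g. for dry runs) |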
| 729 | + def prepareChecksums(self): |
| 730 | + """Create a temporary md5 checksum file. |
| 731 | + Call this at the start of the dump run, and move the file |
| 732 | + into the final location at the completion of the dump run.""" |
| 733 | + if (self._enabled): |
| 734 | + checksumFileName = self._getChecksumFileNameTmp() |
| 735 | + output = file(checksumFileName, "w") |
| | + output.close() |
| 736 | + |
| 737 | + def checksum(self, filename, runner): |
| 738 | + """Run checksum for an output file, and append to the list.""" |
| 739 | + if (self._enabled): |
| 740 | + checksumFileName = self._getChecksumFileNameTmp() |
| 741 | + output = file(checksumFileName, "a") |
| 742 | + self._saveChecksum(filename, output, runner) |
| 743 | + output.close() |
| 744 | + |
| 745 | + def moveMd5FileIntoPlace(self): |
| 746 | + if (self._enabled): |
| 747 | + tmpFileName = self._getChecksumFileNameTmp() |
| 748 | + realFileName = self._getChecksumFileName() |
| 749 | + os.rename(tmpFileName, realFileName) |
| 750 | + |
632 | 751 | def getChecksumFileNameBasename(self): |
633 | 752 | return ("md5sums.txt") |
634 | 753 | |
635 | | - def getChecksumFileName(self): |
| 754 | + # |
| 755 | + # functions internal to the class |
| 756 | + # |
| 757 | + def _getChecksumFileName(self): |
636 | 758 | return (self.dumpDir.publicPath(self.getChecksumFileNameBasename())) |
637 | 759 | |
638 | | - def getChecksumFileNameTmp(self): |
| 760 | + def _getChecksumFileNameTmp(self): |
639 | 761 | return (self.dumpDir.publicPath(self.getChecksumFileNameBasename() + "." + self.timestamp + ".tmp")) |
640 | 762 | |
641 | | - def prepareChecksums(self): |
642 | | - """Create a temporary md5 checksum file. |
643 | | - Call this at the start of the dump run, and move the file |
644 | | - into the final location at the completion of the dump run.""" |
645 | | - checksumFileName = self.getChecksumFileNameTmp() |
646 | | - output = file(checksumFileName, "w") |
647 | | - |
648 | | - def md5File(self, filename): |
| 763 | + def _md5File(self, filename): |
649 | 764 | summer = md5.new() |
650 | 765 | infile = file(filename, "rb") |
651 | 766 | bufsize = 4192 * 32 |
— | — | @@ -655,28 +770,16 @@ |
656 | 771 | infile.close() |
657 | 772 | return summer.hexdigest() |
658 | 773 | |
659 | | - def md5FileLine(self, filename): |
660 | | - return "%s %s\n" % (self.md5File(filename), os.path.basename(filename)) |
| 774 | + def _md5FileLine(self, filename): |
| 775 | + return "%s %s\n" % (self._md5File(filename), os.path.basename(filename)) |
661 | 776 | |
662 | | - def saveChecksum(self, file, output, runner): |
| 777 | + def _saveChecksum(self, file, output, runner): |
663 | 778 | runner.debug("Checksumming %s" % file) |
664 | 779 | path = self.dumpDir.publicPath(file) |
665 | 780 | if os.path.exists(path): |
666 | | - checksum = self.md5FileLine(path) |
| 781 | + checksum = self._md5FileLine(path) |
667 | 782 | output.write(checksum) |
668 | 783 | |
669 | | - def checksum(self, filename, runner): |
670 | | - """Run checksum for an output file, and append to the list.""" |
671 | | - checksumFileName = self.getChecksumFileNameTmp() |
672 | | - output = file(checksumFileName, "a") |
673 | | - self.saveChecksum(filename, output, runner) |
674 | | - output.close() |
675 | | - |
676 | | - def moveMd5FileIntoPlace(self): |
677 | | - tmpFileName = self.getChecksumFileNameTmp() |
678 | | - realFileName = self.getChecksumFileName() |
679 | | - os.rename(tmpFileName, realFileName) |
680 | | - |
681 | 784 | class DumpDir(object): |
682 | 785 | def __init__(self, wiki, dbName, date): |
683 | 786 | self._wiki = wiki |
— | — | @@ -711,7 +814,7 @@ |
712 | 815 | # everything that has to do with reporting the status of a piece |
713 | 816 | # of a dump is collected here |
714 | 817 | class Status(object): |
715 | | - def __init__(self, wiki, dumpDir, date, items, checksums, noticeFile = None, errorCallback=None): |
| 818 | + def __init__(self, wiki, dumpDir, date, items, checksums, enabled, noticeFile = None, errorCallback=None): |
716 | 819 | self.wiki = wiki |
717 | 820 | self.config = wiki.config |
718 | 821 | self.dbName = wiki.dbName |
— | — | @@ -723,16 +826,48 @@ |
724 | 827 | self.noticeFile = noticeFile |
725 | 828 | self.errorCallback = errorCallback |
726 | 829 | self.failCount = 0 |
| 830 | + self._enabled = enabled |
727 | 831 | |
728 | | - def saveStatusSummaryAndDetail(self, done=False): |
| 832 | + def updateStatusFiles(self, done=False): |
| 833 | + if self._enabled: |
| 834 | + self._saveStatusSummaryAndDetail(done) |
| 835 | + |
| 836 | + def reportFailure(self): |
| 837 | + if self._enabled: |
| 838 | + if self.config.adminMail: |
| 839 | + subject = "Dump failure for " + self.dbName |
| 840 | + message = self.config.readTemplate("errormail.txt") % { |
| 841 | + "db": self.dbName, |
| 842 | + "date": self.date, |
| 843 | + "time": TimeUtils.prettyTime(), |
| 844 | + "url": "/".join((self.config.webRoot, self.dbName, self.date, ''))} |
| 845 | + self.config.mail(subject, message) |
| 846 | + |
| 847 | + # this is a per-dump-item report (well per file generated by the item) |
| 848 | + # Report on the file size & item status of the current output and output a link if we are done |
| 849 | + def reportFile(self, file, itemStatus): |
| 850 | + filepath = self.dumpDir.publicPath(file) |
| 851 | + if itemStatus == "in-progress" and exists (filepath): |
| 852 | + size = FileUtils.prettySize(getsize(filepath)) |
| 853 | + return "<li class='file'>%s %s (written) </li>" % (file, size) |
| 854 | + elif itemStatus == "done" and exists(filepath): |
| 855 | + size = FileUtils.prettySize(getsize(filepath)) |
| 856 | + webpath = self.dumpDir.webPath(file) |
| 857 | + return "<li class='file'><a href=\"%s\">%s</a> %s</li>" % (webpath, file, size) |
| 858 | + else: |
| 859 | + return "<li class='missing'>%s</li>" % file |
| 860 | + |
| 861 | + # |
| 862 | + # functions internal to the class |
| 863 | + # |
| 864 | + def _saveStatusSummaryAndDetail(self, done=False): |
729 | 865 | """Write out an HTML file with the status for this wiki's dump |
730 | 866 | and links to completed files, as well as a summary status in a separate file.""" |
731 | 867 | try: |
732 | 868 | # Comprehensive report goes here |
733 | | - self.wiki.writePerDumpIndex(self.reportDatabaseStatusDetailed(done)) |
734 | | - |
| 869 | + self.wiki.writePerDumpIndex(self._reportDatabaseStatusDetailed(done)) |
735 | 870 | # Short line for report extraction goes here |
736 | | - self.wiki.writeStatus(self.reportDatabaseStatusSummary(done)) |
| 871 | + self.wiki.writeStatus(self._reportDatabaseStatusSummary(done)) |
737 | 872 | except: |
738 | 873 | message = "Couldn't update status files. Continuing anyways" |
739 | 874 | if self.errorCallback: |
— | — | @@ -740,47 +875,34 @@ |
741 | 876 | else: |
742 | 877 | print(message) |
743 | 878 | |
744 | | - def updateStatusFiles(self, done=False): |
745 | | - self.saveStatusSummaryAndDetail(done) |
746 | | - |
747 | | - def reportDatabaseStatusSummary(self, done=False): |
| 879 | + def _reportDatabaseStatusSummary(self, done = False): |
748 | 880 | """Put together a brief status summary and link for the current database.""" |
749 | | - status = self.reportStatusSummaryLine(done) |
| 881 | + status = self._reportStatusSummaryLine(done) |
750 | 882 | html = self.wiki.reportStatusLine(status) |
751 | 883 | |
752 | 884 | activeItems = [x for x in self.items if x.status() == "in-progress"] |
753 | 885 | if activeItems: |
754 | | - return html + "<ul>" + "\n".join([self.reportItem(x) for x in activeItems]) + "</ul>" |
| 886 | + return html + "<ul>" + "\n".join([self._reportItem(x) for x in activeItems]) + "</ul>" |
755 | 887 | else: |
756 | 888 | return html |
757 | 889 | |
758 | | - def reportDatabaseStatusDetailed(self, done=False): |
| 890 | + def _reportDatabaseStatusDetailed(self, done = False): |
759 | 891 | """Put together a status page for this database, with all its component dumps.""" |
760 | 892 | self.noticeFile.refreshNotice() |
761 | | - statusItems = [self.reportItem(item) for item in self.items] |
| 893 | + statusItems = [self._reportItem(item) for item in self.items] |
762 | 894 | statusItems.reverse() |
763 | 895 | html = "\n".join(statusItems) |
764 | 896 | return self.config.readTemplate("report.html") % { |
765 | 897 | "db": self.dbName, |
766 | 898 | "date": self.date, |
767 | 899 | "notice": self.noticeFile.notice, |
768 | | - "status": self.reportStatusSummaryLine(done), |
769 | | - "previous": self.reportPreviousDump(done), |
| 900 | + "status": self._reportStatusSummaryLine(done), |
| 901 | + "previous": self._reportPreviousDump(done), |
770 | 902 | "items": html, |
771 | 903 | "checksum": self.dumpDir.webPath(self.checksums.getChecksumFileNameBasename()), |
772 | 904 | "index": self.config.index} |
773 | 905 | |
774 | | - def reportFailure(self): |
775 | | - if self.config.adminMail: |
776 | | - subject = "Dump failure for " + self.dbName |
777 | | - message = self.config.readTemplate("errormail.txt") % { |
778 | | - "db": self.dbName, |
779 | | - "date": self.date, |
780 | | - "time": TimeUtils.prettyTime(), |
781 | | - "url": "/".join((self.config.webRoot, self.dbName, self.date, ''))} |
782 | | - config.mail(subject, message) |
783 | | - |
784 | | - def reportPreviousDump(self, done): |
| 906 | + def _reportPreviousDump(self, done): |
785 | 907 | """Produce a link to the previous dump, if any""" |
786 | 908 | # get the list of dumps for this wiki in order, find me in the list, find the one prev to me. |
787 | 909 | # why? we might be rerunning a job from an older dumps. we might have two |
— | — | @@ -805,7 +927,7 @@ |
806 | 928 | message = "previous dump from" |
807 | 929 | return "%s<a href=\"../%s/\">%s %s</a>" % (prefix, rawDate, message, prettyDate) |
808 | 930 | |
809 | | - def reportStatusSummaryLine(self, done=False): |
| 931 | + def _reportStatusSummaryLine(self, done=False): |
810 | 932 | if (done == "done"): |
811 | 933 | classes = "done" |
812 | 934 | text = "Dump complete" |
— | — | @@ -824,12 +946,15 @@ |
825 | 947 | text += ", %d item%s failed" % (self.failCount, ess) |
826 | 948 | return "<span class='%s'>%s</span>" % (classes, text) |
827 | 949 | |
828 | | - def reportItem(self, item): |
| 950 | + def _reportItem(self, item): |
829 | 951 | """Return an HTML fragment with info on the progress of this item.""" |
830 | 955 | html = "<li class='%s'><span class='updates'>%s</span> <span class='status'>%s</span> <span class='title'>%s</span>" % (item.status(), item.updated(), item.status(), item.description()) |
831 | 956 | if item.progress: |
832 | 957 | html += "<div class='progress'>%s</div>\n" % item.progress |
833 | | - files = item.listFiles(self) |
| 958 | + files = item.listOutputFiles(self) |
834 | 959 | if files: |
835 | 960 | listItems = [self.reportFile(file, item.status()) for file in files] |
836 | 961 | html += "<ul>" |
— | — | @@ -841,59 +966,53 @@ |
842 | 967 | html += "</li>" |
843 | 968 | return html |
844 | 969 | |
845 | | - # this is a per-dump-item report (well per file generated by the item) |
846 | | - # Report on the file size & item status of the current output and output a link if we are done |
847 | | - def reportFile(self, file, itemStatus): |
848 | | - filepath = self.dumpDir.publicPath(file) |
849 | | - if itemStatus == "in-progress" and exists (filepath): |
850 | | - size = FileUtils.prettySize(getsize(filepath)) |
851 | | - return "<li class='file'>%s %s (written) </li>" % (file, size) |
852 | | - elif itemStatus == "done" and exists(filepath): |
853 | | - size = FileUtils.prettySize(getsize(filepath)) |
854 | | - webpath = self.dumpDir.webPath(file) |
855 | | - return "<li class='file'><a href=\"%s\">%s</a> %s</li>" % (webpath, file, size) |
856 | | - else: |
857 | | - return "<li class='missing'>%s</li>" % file |
858 | | - |
859 | 970 | class NoticeFile(object): |
860 | | - def __init__(self, wiki, date, notice): |
| 971 | + def __init__(self, wiki, date, notice, enabled): |
861 | 972 | self.wiki = wiki |
862 | 973 | self.date = date |
863 | 974 | self.notice = notice |
| 975 | + self._enabled = enabled |
| 976 | + self.writeNoticeFile() |
864 | 977 | |
865 | | - noticeFile = self.getNoticeFilename() |
866 | | - # delnotice. toss any existing file |
867 | | - if self.notice == False: |
868 | | - if exists(noticeFile): |
869 | | - os.remove(noticeFile) |
870 | | - self.notice = "" |
871 | | - # addnotice, stuff notice in a file for other jobs etc |
872 | | - elif self.notice != "": |
873 | | - noticeDir = self.getNoticeDir() |
874 | | - FileUtils.writeFile(noticeDir, noticeFile, self.notice, self.wiki.config.fileperms) |
875 | | - # default case. if there is a file get the contents, otherwise |
876 | | - # we have empty contents, all good |
877 | | - else: |
878 | | - if exists(noticeFile): |
879 | | - self.notice = FileUtils.readFile(noticeFile) |
880 | | - |
881 | | - def getNoticeFilename(self): |
882 | | - return os.path.join(self.wiki.publicDir(), self.date, "notice.txt") |
| 978 | + def writeNoticeFile(self): |
| 979 | + if (self._enabled): |
| 980 | + noticeFile = self._getNoticeFilename() |
| 981 | + # delnotice. toss any existing file |
| 982 | + if self.notice == False: |
| 983 | + if exists(noticeFile): |
| 984 | + os.remove(noticeFile) |
| 985 | + self.notice = "" |
| 986 | + # addnotice, stuff notice in a file for other jobs etc |
| 987 | + elif self.notice != "": |
| 988 | + noticeDir = self._getNoticeDir() |
| 989 | + FileUtils.writeFile(noticeDir, noticeFile, self.notice, self.wiki.config.fileperms) |
| 990 | + # default case. if there is a file get the contents, otherwise |
| 991 | + # we have empty contents, all good |
| 992 | + else: |
| 993 | + if exists(noticeFile): |
| 994 | + self.notice = FileUtils.readFile(noticeFile) |
883 | 995 | |
884 | | - def getNoticeDir(self): |
885 | | - return os.path.join(self.wiki.publicDir(), self.date); |
886 | | - |
887 | 996 | def refreshNotice(self): |
888 | 997 | # if the notice file has changed or gone away, we comply. |
889 | | - noticeFile = self.getNoticeFilename() |
| 998 | + noticeFile = self._getNoticeFilename() |
890 | 999 | if exists(noticeFile): |
891 | 1000 | self.notice = FileUtils.readFile(noticeFile) |
892 | 1001 | else: |
893 | 1002 | self.notice = "" |
894 | 1003 | |
| 1004 | + |
| 1005 | + # |
| 1006 | + # functions internal to the class |
| 1007 | + # |
| 1008 | + def _getNoticeFilename(self): |
| 1009 | + return os.path.join(self.wiki.publicDir(), self.date, "notice.txt") |
| 1010 | + |
| 1011 | + def _getNoticeDir(self): |
| 1012 | + return os.path.join(self.wiki.publicDir(), self.date) |
| 1013 | + |
895 | 1014 | class Runner(object): |
896 | 1015 | |
897 | | - def __init__(self, wiki, date=None, prefetch=True, spawn=True, job=None, restart=False, notice="", dryrun = False, loggingEnabled=False): |
| 1016 | + def __init__(self, wiki, date=None, prefetch=True, spawn=True, job=None, restart=False, notice="", dryrun = False, loggingEnabled=False, chunkToDo = False): |
898 | 1017 | self.wiki = wiki |
899 | 1018 | self.config = wiki.config |
900 | 1019 | self.dbName = wiki.dbName |
— | — | @@ -905,7 +1024,24 @@ |
906 | 1025 | self.htmlNoticeFile = None |
907 | 1026 | self.log = None |
908 | 1027 | self.dryrun = dryrun |
| 1028 | + self._chunkToDo = chunkToDo |
| 1029 | + self._statusEnabled = True |
| 1030 | + self._checksummerEnabled = True |
| 1031 | + self._runInfoFileEnabled = True |
| 1032 | + self._symLinksEnabled = True |
| 1033 | + self._feedsEnabled = True |
| 1034 | + self._noticeFileEnabled = True |
909 | 1035 | |
| 1036 | + if self.dryrun or self._chunkToDo: |
| 1037 | + self._statusEnabled = False |
| 1038 | + self._checksummerEnabled = False |
| 1039 | + self._runInfoFileEnabled = False |
| 1040 | + self._symLinksEnabled = False |
| 1041 | + self._feedsEnabled = False |
| 1042 | + self._noticeFileEnabled = False |
| 1043 | + if self.dryrun: |
| 1044 | + loggingEnabled = False |
| 1045 | + |
910 | 1046 | if date: |
911 | 1047 | # Override, continuing a past dump? |
912 | 1048 | self.date = date |
— | — | @@ -913,34 +1049,28 @@ |
914 | 1050 | self.date = TimeUtils.today() |
915 | 1051 | wiki.setDate(self.date) |
916 | 1052 | |
917 | | - self.lastFailed = False |
918 | | - |
919 | 1053 | self.jobRequested = job |
920 | 1054 | self.dbServerInfo = DbServerInfo(self.wiki, self.dbName, self.logAndPrint) |
921 | | - |
922 | 1055 | self.dumpDir = DumpDir(self.wiki, self.dbName, self.date) |
923 | 1056 | |
924 | | - # this must come after the dumpdir setup so we know which directory we are in |
925 | | - # for the log file. |
926 | | - if (loggingEnabled and not self.dryrun): |
| 1057 | + self.lastFailed = False |
| 1058 | + |
| 1059 | + # these must come after the dumpdir setup so we know which directory we are in |
| 1060 | + if (loggingEnabled): |
927 | 1061 | self.logFileName = self.dumpDir.publicPath(config.logFile) |
928 | 1062 | self.makeDir(join(self.wiki.publicDir(), self.date)) |
929 | 1063 | self.log = Logger(self.logFileName) |
930 | 1064 | thread.start_new_thread(self.logQueueReader,(self.log,)) |
| 1065 | + self.runInfoFile = RunInfoFile(wiki, self._runInfoFileEnabled) |
| 1066 | + self.symLinks = SymLinks(self.wiki, self.dumpDir, self.date, self.logAndPrint, self.debug, self._symLinksEnabled) |
| 1067 | + self.feeds = Feeds(self.wiki, self.dumpDir, self.config, self.dbName, self.debug, self._feedsEnabled) |
| 1068 | + self.htmlNoticeFile = NoticeFile(self.wiki, self.date, notice, self._noticeFileEnabled) |
| 1069 | + self.checksums = Checksummer(self.wiki, self.dumpDir, self._checksummerEnabled) |
931 | 1070 | |
932 | | - # have to handle the notice file here instead of main, it goes in the per-run directory |
933 | | - if not dryrun: |
934 | | - self.htmlNoticeFile = NoticeFile(self.wiki, self.date, notice) |
935 | | - |
936 | | - if not dryrun: |
937 | | - self.checksums = Checksummer(self.wiki, self.dumpDir) |
938 | | - |
939 | 1071 | # some or all of these dumpItems will be marked to run |
940 | | - self.dumpItemList = DumpItemList(self.wiki, self.prefetch, self.spawn, self.date, self.chunkInfo); |
| 1072 | + self.dumpItemList = DumpItemList(self.wiki, self.prefetch, self.spawn, self.date, self._chunkToDo, self.jobRequested, self.chunkInfo, self.runInfoFile) |
| 1073 | + self.status = Status(self.wiki, self.dumpDir, self.date, self.dumpItemList.dumpItems, self.checksums, self._statusEnabled, self.htmlNoticeFile, self.logAndPrint) |
941 | 1074 | |
942 | | - if not self.dryrun: |
943 | | - self.status = Status(self.wiki, self.dumpDir, self.date, self.dumpItemList.dumpItems, self.checksums, self.htmlNoticeFile, self.logAndPrint) |
944 | | - |
945 | 1075 | def logQueueReader(self,log): |
946 | 1076 | if not log: |
947 | 1077 | return |
— | — | @@ -1045,19 +1175,20 @@ |
1046 | 1176 | self.lastFailed = True |
1047 | 1177 | |
1048 | 1178 | def runUpdateItemFileInfo(self, item): |
1049 | | - for f in item.listFiles(self): |
| 1179 | + for f in item.listOutputFiles(self): |
1050 | 1180 | print f |
1051 | 1181 | if exists(self.dumpDir.publicPath(f)): |
1052 | 1182 | # why would the file not exist? because we changed chunk numbers in the |
1053 | 1183 | # middle of a run, and now we list more files for the next stage than there |
1054 | 1184 | # were for earlier ones |
1055 | | - self.saveSymlink(f) |
1056 | | - self.saveFeed(f) |
| 1185 | + self.symLinks.saveSymlink(f) |
| 1186 | + self.feeds.saveFeed(f) |
1057 | 1187 | self.checksums.checksum(f, self) |
1058 | 1188 | |
1059 | 1189 | def run(self): |
1060 | 1190 | if (self.jobRequested): |
1061 | 1191 | if ((not self.dumpItemList.oldRunInfoRetrieved) and (self.wiki.existsPerDumpIndex())): |
| 1192 | + |
1062 | 1193 | # There was a previous run of all or part of this date, but... |
1063 | 1194 | # There was no old RunInfo to be had (or an error was encountered getting it) |
1064 | 1195 | # so we can't rerun a step and keep all the status information about the old run around. |
— | — | @@ -1097,41 +1228,36 @@ |
1098 | 1229 | self.cleanOldDumps() |
1099 | 1230 | self.showRunnerState("Starting backup of %s" % self.dbName) |
1100 | 1231 | |
1101 | | - files = self.listFilesFor(self.dumpItemList.dumpItems) |
1102 | | - |
1103 | 1232 | if (self.jobRequested): |
1104 | | - if not self.dryrun: |
1105 | | - self.checksums.prepareChecksums() |
| 1233 | + self.checksums.prepareChecksums() |
1106 | 1234 | |
1107 | 1235 | for item in self.dumpItemList.dumpItems: |
1108 | 1236 | if (item.toBeRun()): |
1109 | 1237 | item.start(self) |
1110 | | - if not self.dryrun: |
1111 | | - self.status.updateStatusFiles() |
1112 | | - self.dumpItemList.saveDumpRunInfoFile() |
| 1238 | + self.status.updateStatusFiles() |
| 1239 | + self.runInfoFile.saveDumpRunInfoFile(self.dumpItemList.reportDumpRunInfo()) |
1113 | 1240 | try: |
1114 | 1241 | item.dump(self) |
1115 | 1242 | except Exception, ex: |
1116 | 1243 | self.debug("*** exception! " + str(ex)) |
1117 | 1244 | item.setStatus("failed") |
1118 | | - if item.status() == "failed" and not self.dryrun: |
| 1245 | + if item.status() == "failed" and not self.dryrun and not self._chunkToDo: |
1119 | 1246 | self.runHandleFailure() |
1120 | 1247 | else: |
1121 | 1248 | self.lastFailed = False |
1122 | 1249 | # this ensures that, previous run or new one, the old or new md5sums go to the file |
1123 | | - if item.status() == "done" and not self.dryrun: |
| 1250 | + if item.status() == "done" and not self.dryrun and not self._chunkToDo: |
1124 | 1251 | self.runUpdateItemFileInfo(item) |
1125 | 1252 | |
1126 | | - if not self.dryrun: |
1127 | | - if (self.dumpItemList.allPossibleJobsDone()): |
1128 | | - self.status.updateStatusFiles("done") |
1129 | | - else: |
1130 | | - self.status.updateStatusFiles("partialdone") |
1131 | | - self.dumpItemList.saveDumpRunInfoFile() |
1132 | | - |
| 1253 | + if (self.dumpItemList.allPossibleJobsDone()): |
| 1254 | + self.status.updateStatusFiles("done") |
| 1255 | + else: |
| 1256 | + self.status.updateStatusFiles("partialdone") |
| 1257 | + self.runInfoFile.saveDumpRunInfoFile(self.dumpItemList.reportDumpRunInfo()) |
| 1258 | + if not self.dryrun and not self._chunkToDo: |
1133 | 1259 | # if any job succeeds we might as well make the sym link |
1134 | 1260 | if (self.status.failCount < 1): |
1135 | | - self.completeDump(files) |
| 1261 | + self.completeDump() |
1136 | 1262 | |
1137 | 1263 | if (self.restart): |
1138 | 1264 | self.showRunnerState("Completed run restarting from job %s for %s" % (self.jobRequested, self.dbName)) |
— | — | @@ -1139,31 +1265,29 @@ |
1140 | 1266 | self.showRunnerState("Completed job %s for %s" % (self.jobRequested, self.dbName)) |
1141 | 1267 | |
1142 | 1268 | else: |
1143 | | - if not self.dryrun: |
1144 | | - self.checksums.prepareChecksums() |
| 1269 | + self.checksums.prepareChecksums() |
1145 | 1270 | |
1146 | 1271 | for item in self.dumpItemList.dumpItems: |
1147 | 1272 | item.start(self) |
1148 | | - if not self.dryrun: |
1149 | | - self.status.updateStatusFiles() |
1150 | | - self.dumpItemList.saveDumpRunInfoFile() |
| 1273 | + self.status.updateStatusFiles() |
| 1274 | + self.runInfoFile.saveDumpRunInfoFile(self.dumpItemList.reportDumpRunInfo()) |
1151 | 1275 | try: |
1152 | 1276 | item.dump(self) |
1153 | 1277 | except Exception, ex: |
1154 | 1278 | self.debug("*** exception! " + str(ex)) |
1155 | 1279 | item.setStatus("failed") |
1156 | | - if item.status() == "failed" and not self.dryrun: |
| 1280 | + if item.status() == "failed" and not self.dryrun and not self._chunkToDo: |
1157 | 1281 | self.runHandleFailure() |
1158 | 1282 | else: |
1159 | | - if not self.dryrun: |
| 1283 | + if not self.dryrun and not self._chunkToDo: |
1160 | 1284 | self.runUpdateItemFileInfo(item) |
1161 | 1285 | self.lastFailed = False |
1162 | 1286 | |
1163 | | - if not self.dryrun: |
1164 | | - self.status.updateStatusFiles("done") |
1165 | | - self.dumpItemList.saveDumpRunInfoFile() |
| 1287 | + self.status.updateStatusFiles("done") |
| 1288 | + if not self.dryrun and not self._chunkToDo: |
| 1289 | + self.runInfoFile.saveDumpRunInfoFile(self.dumpItemList.reportDumpRunInfo()) |
1166 | 1290 | if self.status.failCount < 1: |
1167 | | - self.completeDump(files) |
| 1291 | + self.completeDump() |
1168 | 1292 | |
1169 | 1293 | self.showRunnerStateComplete() |
1170 | 1294 | |
— | — | @@ -1180,57 +1304,25 @@ |
1181 | 1305 | if old: |
1182 | 1306 | for dump in old: |
1183 | 1307 | self.showRunnerState("Purging old dump %s for %s" % (dump, self.dbName)) |
1184 | | - if not self.dryrun: |
| 1308 | + if not self.dryrun and not self._chunkToDo: |
1185 | 1309 | base = os.path.join(self.wiki.publicDir(), dump) |
1186 | 1310 | shutil.rmtree("%s" % base) |
1187 | 1311 | else: |
1188 | 1312 | self.showRunnerState("No old dumps to purge.") |
1189 | 1313 | |
1190 | | - def listFilesFor(self, items): |
1191 | | - files = [] |
1192 | | - for item in items: |
1193 | | - for file in item.listFiles(self): |
1194 | | - files.append(file) |
1195 | | - return files |
1196 | | - |
1197 | | - |
1198 | | - def lockFileName(self): |
1199 | | - return self.dumpDir.publicPath("lock") |
1200 | | - |
1201 | | - def doneFileName(self): |
1202 | | - return self.dumpDir.publicPath("done") |
1203 | | - |
1204 | | - def lock(self): |
1205 | | - self.showRunnerState("Creating lock file.") |
1206 | | - lockfile = self.lockFileName() |
1207 | | - donefile = self.doneFileName() |
1208 | | - if exists(lockfile): |
1209 | | - raise BackupError("Lock file %s already exists" % lockfile) |
1210 | | - if exists(donefile): |
1211 | | - self.showRunnerState("Removing completion marker %s" % donefile) |
1212 | | - os.remove(donefile) |
1213 | | - try: |
1214 | | - os.remove(lockfile) |
1215 | | - except: |
1216 | | - # failure? let it die |
1217 | | - pass |
1218 | | - |
1219 | | - def unlock(self): |
1220 | | - self.showRunnerState("Marking complete.") |
1221 | | - |
1222 | 1314 | def showRunnerState(self, message): |
1223 | 1315 | self.debug(message) |
1224 | 1316 | |
1225 | 1317 | def showRunnerStateComplete(self): |
1226 | 1318 | self.debug("SUCCESS: done.") |
1227 | 1319 | |
1228 | | - def completeDump(self, files): |
| 1320 | + def completeDump(self): |
1229 | 1321 | # note that it's possible for links in "latest" to point to |
1230 | 1322 | # files from different runs, in which case the md5sums file |
1231 | 1323 | # will have accurate checksums for the run for which it was |
1232 | 1324 | # produced, but not the other files. FIXME |
1233 | 1325 | self.checksums.moveMd5FileIntoPlace() |
1234 | | - self.saveSymlink(self.checksums.getChecksumFileNameBasename()) |
| 1326 | + self.symLinks.saveSymlink(self.checksums.getChecksumFileNameBasename()) |
1235 | 1327 | |
1236 | 1328 | def makeDir(self, dir): |
1237 | 1329 | if exists(dir): |
— | — | @@ -1239,34 +1331,67 @@ |
1240 | 1332 | self.debug("Creating %s ..." % dir) |
1241 | 1333 | os.makedirs(dir) |
1242 | 1334 | |
| 1335 | +class SymLinks(object): |
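| | + # manages the symlinks in the 'latest' directory that point at the most recent dump output files |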
| 1336 | + def __init__(self, wiki, dumpDir, date, logfn, debugfn, enabled): |
| 1337 | + self.wiki = wiki |
| 1338 | + self.dumpDir = dumpDir |
| 1339 | + self.date = date |
| 1340 | + self._enabled = enabled |
| 1341 | + self.logfn = logfn |
| 1342 | + self.debugfn = debugfn |
| 1343 | + |
| 1344 | + def makeDir(self, dir): |
| 1345 | + if exists(dir): |
| 1346 | + self.debugfn("Checkdir dir %s ..." % dir) |
| 1347 | + else: |
| 1348 | + self.debugfn("Creating %s ..." % dir) |
| 1349 | + os.makedirs(dir) |
| 1350 | + |
1243 | 1351 | def saveSymlink(self, file): |
1244 | | - self.makeDir(join(self.wiki.publicDir(), 'latest')) |
1245 | | - real = self.dumpDir.publicPath(file) |
1246 | | - link = self.dumpDir.latestPath(file) |
1247 | | - if exists(link) or os.path.islink(link): |
1248 | | - if os.path.islink(link): |
1249 | | - realfile = os.readlink(link) |
1250 | | - # format of these links should be... ../20110228/elwikidb-20110228-templatelinks.sql.gz |
1251 | | - rellinkpattern = re.compile('^\.\./(20[0-9]+)/'); |
1252 | | - dateinlink = rellinkpattern.search(realfile) |
1253 | | - if (dateinlink): |
1254 | | - dateoflinkedfile = dateinlink.group(1) |
1255 | | - dateinterval = int(self.date) - int(dateoflinkedfile) |
| 1352 | + if (self._enabled): |
| 1353 | + self.makeDir(join(self.wiki.publicDir(), 'latest')) |
| 1354 | + real = self.dumpDir.publicPath(file) |
| 1355 | + link = self.dumpDir.latestPath(file) |
| 1356 | + if exists(link) or os.path.islink(link): |
| 1357 | + if os.path.islink(link): |
| 1358 | + realfile = os.readlink(link) |
| 1359 | + # format of these links should be... ../20110228/elwikidb-20110228-templatelinks.sql.gz |
| 1360 | + rellinkpattern = re.compile('^\.\./(20[0-9]+)/') |
| 1361 | + dateinlink = rellinkpattern.search(realfile) |
| 1362 | + if (dateinlink): |
| 1363 | + dateoflinkedfile = dateinlink.group(1) |
| 1364 | + dateinterval = int(self.date) - int(dateoflinkedfile) |
| 1365 | + else: |
| 1366 | + dateinterval = 0 |
| 1367 | + # no file or it's older than ours... *then* remove the link |
| 1368 | + if not exists(os.path.realpath(link)) or dateinterval > 0: |
| 1369 | + self.debugfn("Removing old symlink %s" % link) |
| 1370 | + os.remove(link) |
1256 | 1371 | else: |
1257 | | - dateinterval = 0 |
1258 | | - # no file or it's older than ours... *then* remove the link |
1259 | | - if not exists(os.path.realpath(link)) or dateinterval > 0: |
1260 | | - self.debug("Removing old symlink %s" % link) |
1261 | | - os.remove(link) |
1262 | | - else: |
1263 | | - self.logAndPrint("What the hell dude, %s is not a symlink" % link) |
1264 | | - raise BackupError("What the hell dude, %s is not a symlink" % link) |
1265 | | - relative = FileUtils.relativePath(real, dirname(link)) |
1266 | | - # if we removed the link cause it's obsolete, make the new one |
1267 | | - if exists(real) and not exists(link): |
1268 | | - self.debug("Adding symlink %s -> %s" % (link, relative)) |
1269 | | - os.symlink(relative, link) |
| 1372 | + self.logfn("What the hell dude, %s is not a symlink" % link) |
| 1373 | + raise BackupError("What the hell dude, %s is not a symlink" % link) |
| 1374 | + relative = FileUtils.relativePath(real, dirname(link)) |
| 1375 | + # if we removed the link cause it's obsolete, make the new one |
| 1376 | + if exists(real) and not exists(link): |
| 1377 | + self.debugfn("Adding symlink %s -> %s" % (link, relative)) |
| 1378 | + os.symlink(relative, link) |
1270 | 1379 | |
| 1380 | +class Feeds(object): |
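| | + # writes the small per-file RSS feeds in the 'latest' directory |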
| 1381 | + def __init__(self, wiki, dumpDir, config, dbName, debugfn, enabled): |
| 1382 | + self.wiki = wiki |
| 1383 | + self.dumpDir = dumpDir |
| 1384 | + self.config = config |
| 1385 | + self.dbName = dbName |
| 1386 | + self.debugfn = debugfn |
| 1387 | + self._enabled = enabled |
| 1388 | + |
| 1389 | + def makeDir(self, dir): |
| 1390 | + if exists(dir): |
| 1391 | + self.debugfn("Checkdir dir %s ..." % dir) |
| 1392 | + else: |
| 1393 | + self.debugfn("Creating %s ..." % dir) |
| 1394 | + os.makedirs(dir) |
| 1395 | + |
1271 | 1396 | def saveFeed(self, file): |
1272 | 1397 | self.makeDir(join(self.wiki.publicDir(), 'latest')) |
1273 | 1398 | filePath = self.dumpDir.webPath(file) |
— | — | @@ -1284,6 +1409,7 @@ |
1285 | 1410 | rssPath = self.dumpDir.latestPath(file + "-rss.xml") |
1286 | 1411 | FileUtils.writeFile(directory, rssPath, rssText, self.config.fileperms) |
1287 | 1412 | |
| 1413 | + |
1288 | 1414 | class Dump(object): |
1289 | 1415 | def __init__(self, name, desc): |
1290 | 1416 | self._desc = desc |
— | — | @@ -1326,7 +1452,7 @@ |
1327 | 1453 | """Optionally return additional text to appear under the heading.""" |
1328 | 1454 | return None |
1329 | 1455 | |
1330 | | - def listFiles(self, runner): |
| 1456 | + def listOutputFiles(self, runner): |
1331 | 1457 | """Return a list of filenames which should be exported and checksummed""" |
1332 | 1458 | return [] |
1333 | 1459 | |
— | — | @@ -1356,7 +1482,7 @@ |
1357 | 1483 | sys.stderr.write(line) |
1358 | 1484 | self.progress = line.strip() |
1359 | 1485 | runner.status.updateStatusFiles() |
1360 | | - runner.dumpItemList.saveDumpRunInfoFile() |
| 1486 | + runner.runInfoFile.saveDumpRunInfoFile(runner.dumpItemList.reportDumpRunInfo()) |
1361 | 1487 | |
1362 | 1488 | def timeToWait(self): |
1363 | 1489 | # we use wait this many secs for a command to complete that |
— | — | @@ -1462,7 +1588,7 @@ |
1463 | 1589 | if (error): |
1464 | 1590 | raise BackupError("error dumping table %s" % self._table) |
1465 | 1591 | |
1466 | | - def listFiles(self, runner): |
| 1592 | + def listOutputFiles(self, runner): |
1467 | 1593 | return [self._file()] |
1468 | 1594 | |
1469 | 1595 | class PrivateTable(PublicTable): |
— | — | @@ -1474,7 +1600,7 @@ |
1475 | 1601 | def _path(self, runner): |
1476 | 1602 | return runner.dumpDir.privatePath(self._file()) |
1477 | 1603 | |
1478 | | - def listFiles(self, runner): |
| 1604 | + def listOutputFiles(self, runner): |
1479 | 1605 | """Private table won't have public files to list.""" |
1480 | 1606 | return [] |
1481 | 1607 | |
— | — | @@ -1484,21 +1610,31 @@ |
1485 | 1611 | A second pass will import text from prior dumps or the database to make |
1486 | 1612 | full files for the public.""" |
1487 | 1613 | |
1488 | | - def __init__(self, name, desc, chunks = False): |
| 1614 | + def __init__(self, name, desc, chunkToDo, chunks = False): |
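| | + # chunkToDo is the single chunk number to rerun, or False to run all chunks |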
1489 | 1615 | Dump.__init__(self, name, desc) |
| 1616 | + self._chunkToDo = chunkToDo |
1490 | 1617 | self._chunks = chunks |
1491 | 1618 | |
1492 | 1619 | def detail(self): |
1493 | 1620 | return "These files contain no page text, only revision metadata." |
1494 | 1621 | |
1495 | | - def listFiles(self, runner, unnumbered=False): |
1496 | | - if (self._chunks) and not unnumbered: |
1497 | | - files = [] |
1498 | | - for i in range(1, len(self._chunks) + 1): |
1499 | | - files.append("stub-meta-history%s.xml.gz" % i) |
1500 | | - files.append("stub-meta-current%s.xml.gz" % i) |
1501 | | - files.append("stub-articles%s.xml.gz" % i) |
1502 | | - return files |
| 1622 | + def listOutputFiles(self, runner, unnumbered=False): |
| 1623 | + if (self._chunks): |
| 1624 | + if (self._chunkToDo): |
| 1625 | + if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)): |
| 1626 | + raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks))) |
| 1627 | + files = [] |
| 1628 | + files.append("stub-meta-history%s.xml.gz" % self._chunkToDo) |
| 1629 | + files.append("stub-meta-current%s.xml.gz" % self._chunkToDo) |
| 1630 | + files.append("stub-articles%s.xml.gz" % self._chunkToDo) |
| 1631 | + return files |
| 1632 | + else: |
| 1633 | + files = [] |
| 1634 | + for i in range(1, len(self._chunks) + 1): |
| 1635 | + files.append("stub-meta-history%s.xml.gz" % i) |
| 1636 | + files.append("stub-meta-current%s.xml.gz" % i) |
| 1637 | + files.append("stub-articles%s.xml.gz" % i) |
| 1638 | + return files |
1503 | 1639 | else: |
1504 | 1640 | return ["stub-meta-history.xml.gz", |
1505 | 1641 | "stub-meta-current.xml.gz", |
— | — | @@ -1578,10 +1714,17 @@ |
1579 | 1715 | def run(self, runner): |
1580 | 1716 | commands = [] |
1581 | 1717 | if self._chunks: |
1582 | | - for i in range(1, len(self._chunks)+1): |
1583 | | - self.cleanupOldFiles(runner,i) |
1584 | | - series = self.buildCommand(runner, i) |
| 1718 | + if (self._chunkToDo): |
| 1719 | + if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)): |
| 1720 | + raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks))) |
| 1721 | + self.cleanupOldFiles(runner,self._chunkToDo) |
| 1722 | + series = self.buildCommand(runner, self._chunkToDo) |
1585 | 1723 | commands.append(series) |
| 1724 | + else: |
| 1725 | + for i in range(1, len(self._chunks)+1): |
| 1726 | + self.cleanupOldFiles(runner,i) |
| 1727 | + series = self.buildCommand(runner, i) |
| 1728 | + commands.append(series) |
1586 | 1729 | else: |
1587 | 1730 | self.cleanupOldFiles(runner) |
1588 | 1731 | series = self.buildCommand(runner) |
— | — | @@ -1592,20 +1735,24 @@ |
1593 | 1736 | |
1594 | 1737 | class RecombineXmlStub(XmlStub): |
1595 | 1738 | def __init__(self, name, desc, chunks): |
1596 | | - XmlStub.__init__(self, name, desc, chunks) |
| 1739 | + XmlStub.__init__(self, name, desc, False, chunks) |
1597 | 1740 | # this is here only so that a callback can capture output from some commands |
1598 | 1741 | # related to recombining files if we did parallel runs of the recompression |
1599 | 1742 | self._output = None |
1600 | 1743 | |
1601 | | - # oh crap we need to be able to produce a list of output files, what else? |
1602 | | - def listFiles(self, runner): |
1603 | | - return(XmlStub.listFiles(self, runner, unnumbered=True)) |
| 1744 | + def listInputFiles(self, runner): |
| 1745 | + return(XmlStub.listOutputFiles(self, runner)) |
1604 | 1746 | |
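| | + # the recombine step reads the per-chunk stub files and writes the three unnumbered combined files |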
| 1747 | + def listOutputFiles(self, runner): |
| 1748 | + return ["stub-meta-history.xml.gz", |
| 1749 | + "stub-meta-current.xml.gz", |
| 1750 | + "stub-articles.xml.gz"] |
| 1751 | + |
1605 | 1752 | def run(self, runner): |
1606 | 1753 | error=0 |
1607 | 1754 | if (self._chunks): |
1608 | | - files = XmlStub.listFiles(self,runner) |
1609 | | - outputFileList = self.listFiles(runner) |
| 1755 | + files = self.listInputFiles(runner) |
| 1756 | + outputFileList = self.listOutputFiles(runner) |
1610 | 1757 | for outputFile in outputFileList: |
1611 | 1758 | inputFiles = [] |
1612 | 1759 | for inFile in files: |
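The input/output split makes the recombine data flow explicit: listInputFiles() yields the numbered per-chunk stubs (RecombineXmlStub passes chunkToDo=False to the parent, so all chunks are always listed), and listOutputFiles() the three unnumbered targets. The pairing of inputs to each output happens in the loop above; sketched here under the assumption that it matches on the filename prefix:

    outputs = ["stub-meta-history.xml.gz", "stub-meta-current.xml.gz", "stub-articles.xml.gz"]
    inputs  = ["stub-meta-history1.xml.gz", "stub-meta-history2.xml.gz",
               "stub-meta-current1.xml.gz", "stub-meta-current2.xml.gz",
               "stub-articles1.xml.gz", "stub-articles2.xml.gz"]
    for outputFile in outputs:
        base = outputFile.replace(".xml.gz", "")
        inputFiles = [f for f in inputs if f.startswith(base)]
        # stub-meta-history.xml.gz <- [stub-meta-history1.xml.gz, stub-meta-history2.xml.gz], etc.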
— | — | @@ -1641,7 +1788,7 @@ |
1642 | 1789 | def detail(self): |
1643 | 1790 | return "This contains the log of actions performed on pages." |
1644 | 1791 | |
1645 | | - def listFiles(self, runner): |
| 1792 | + def listOutputFiles(self, runner): |
1646 | 1793 | return ["pages-logging.xml.gz"] |
1647 | 1794 | |
1648 | 1795 | def cleanupOldFiles(self, runner): |
— | — | @@ -1672,14 +1819,16 @@ |
1673 | 1820 | |
1674 | 1821 | class XmlDump(Dump): |
1675 | 1822 | """Primary XML dumps, one section at a time.""" |
1676 | | - def __init__(self, subset, name, desc, detail, prefetch, spawn, chunks = False): |
| 1823 | + def __init__(self, subset, name, desc, detail, prefetch, spawn, chunkToDo, chunks = False): |
1677 | 1824 | Dump.__init__(self, name, desc) |
1678 | 1825 | self._subset = subset |
1679 | 1826 | self._detail = detail |
| 1827 | + self._desc = desc |
1680 | 1828 | self._prefetch = prefetch |
1681 | 1829 | self._spawn = spawn |
1682 | 1830 | self._chunks = chunks |
1683 | 1831 | self._pageID = {} |
| 1832 | + self._chunkToDo = chunkToDo |
1684 | 1833 | |
1685 | 1834 | def detail(self): |
1686 | 1835 | """Optionally return additional text to appear under the heading.""" |
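Note that desc is now also kept on the instance: the prefetch code further down passes it to runner.runInfoFile.statusOfOldDumpIsDone() as the fallback key for scraping the old per-dump index file. A hypothetical construction matching the new signature (the real call site, in the dump item list, is outside this diff):

    articles = XmlDump("articles", "articlesdump",
                       "hypothetical short description",
                       "hypothetical detail text",
                       prefetch, spawn, chunkToDo, chunks)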
— | — | @@ -1697,9 +1846,15 @@ |
1698 | 1847 | def run(self, runner): |
1699 | 1848 | commands = [] |
1700 | 1849 | if (self._chunks): |
1701 | | - for i in range(1, len(self._chunks)+1): |
1702 | | - series = self.buildCommand(runner, i) |
| 1850 | + if (self._chunkToDo): |
| 1851 | + if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)): |
| 1852 | + raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks))) |
| 1853 | + series = self.buildCommand(runner, self._chunkToDo) |
1703 | 1854 | commands.append(series) |
| 1855 | + else: |
| 1856 | + for i in range(1, len(self._chunks)+1): |
| 1857 | + series = self.buildCommand(runner, i) |
| 1858 | + commands.append(series) |
1704 | 1859 | else: |
1705 | 1860 | series = self.buildCommand(runner) |
1706 | 1861 | commands.append(series) |
— | — | @@ -1712,9 +1867,13 @@ |
1713 | 1868 | # check to see if any of the output files are truncated |
1714 | 1869 | files = [] |
1715 | 1870 | if (self._chunks): |
1716 | | - for i in range(1, len(self._chunks)+1): |
1717 | | - files.append( self._path(runner, 'bz2', i ) ) |
1718 | | - files.append( self._path(runner, 'bz2', i ) ) |
| 1871 | + if (self._chunkToDo): |
| 1872 | + if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)): |
| 1873 | + raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks))) |
| 1874 | + files.append( self._path(runner, 'bz2', self._chunkToDo ) ) |
| 1875 | + else: |
| 1876 | + for i in range(1, len(self._chunks)+1): |
| 1877 | + files.append( self._path(runner, 'bz2', i ) ) |
1719 | 1878 | |
1720 | 1879 | for f in files: |
1721 | 1880 | pipeline = [] |
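Besides honoring --chunk, this hunk also drops the doubled append from the old loop (each bz2 path was previously added to the truncation-check list twice). The resulting list, sketched with a stand-in for _path():

    def _path(i):
        return "pages-articles%d.xml.bz2" % i    # stand-in; real paths come via the runner
    chunks, chunkToDo = [5000, 10000, 20000], False
    files = []
    if chunkToDo:
        files.append(_path(chunkToDo))
    else:
        for i in range(1, len(chunks) + 1):
            files.append(_path(i))    # each path exactly once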
— | — | @@ -1900,7 +2059,7 @@ |
1901 | 2060 | if realpath(old) == current: |
1902 | 2061 | runner.debug("skipping current dump for prefetch %s" % possible) |
1903 | 2062 | continue |
1904 | | - if not self.statusOfOldDumpIsDone(runner, date): |
| 2063 | + if not runner.runInfoFile.statusOfOldDumpIsDone(runner, date, self.name(), self._desc): 
1905 | 2064 | runner.debug("skipping incomplete or failed dump for prefetch %s" % possible) |
1906 | 2065 | continue |
1907 | 2066 | if (chunk) and (self.filenameHasChunk(possible, "bz2")): |
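The status lookup for prefetch candidates now delegates to the RunInfoFile object hung off the runner, passing the job's name and description; as the removed statusOfOldDumpIsDone() below shows, the logic first consults dumpruninfo.txt for the named job and only falls back to scraping the per-dump HTML index by description. A sketch of the call, with a hypothetical date:

    # runner.runInfoFile is assumed to be the RunInfoFile instance the Runner creates
    if not runner.runInfoFile.statusOfOldDumpIsDone(runner, "20100901", self.name(), self._desc):
        runner.debug("skipping incomplete or failed dump for prefetch")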
— | — | @@ -1936,82 +2095,8 @@ |
1937 | 2096 | runner.debug("Could not locate a prefetchable dump.") |
1938 | 2097 | return None |
1939 | 2098 | |
1940 | | - # find desc in there, look for "class='done'" |
1941 | | - def _getStatusForJobFromIndexFileLine(self, line, desc): |
1942 | | - if not(">"+desc+"<" in line): |
1943 | | - return None |
1944 | | - if "<li class='done'>" in line: |
1945 | | - return "done" |
1946 | | - else: |
1947 | | - return "other" |
1948 | | - |
1949 | | - def _getStatusForJobFromIndexFile(self, filename, desc): |
1950 | | - # read the index file in, if there is one, and find out whether |
1951 | | - # a particular job (one step only, not a multiple piece job) has been |
1952 | | - # already run and whether it was successful (use to examine status |
1953 | | - # of step from some previous run) |
1954 | | - try: |
1955 | | - infile = open(filename,"r") |
1956 | | - for line in infile: |
1957 | | - result = self._getStatusForJobFromIndexFileLine(line, desc) |
1958 | | - if (not result == None): |
1959 | | - return result |
1960 | | - infile.close |
1961 | | - return None |
1962 | | - except: |
1963 | | - return None |
1964 | | - |
1965 | | - # format: name:%; updated:%; status:% |
1966 | | - def _getStatusForJobFromRunInfoFileLine(self, line, jobName): |
1967 | | - # get rid of leading/trailing/embedded blanks |
1968 | | - line = line.replace(" ","") |
1969 | | - line = line.replace("\n","") |
1970 | | - fields = line.split(';',3) |
1971 | | - for field in fields: |
1972 | | - (fieldName, separator, fieldValue) = field.partition(':') |
1973 | | - if (fieldName == "name"): |
1974 | | - if (not fieldValue == jobName): |
1975 | | - return None |
1976 | | - elif (fieldName == "status"): |
1977 | | - return fieldValue |
1978 | | - |
1979 | | - def _getStatusForJobFromRunInfoFile(self, filename, jobName = ""): |
1980 | | - # read the dump run info file in, if there is one, and find out whether |
1981 | | - # a particular job (one step only, not a multiple piece job) has been |
1982 | | - # already run and whether it was successful (use to examine status |
1983 | | - # of step from some previous run) |
1984 | | - try: |
1985 | | - infile = open(filename,"r") |
1986 | | - for line in infile: |
1987 | | - result = self._getStatusForJobFromRunInfoFileLine(line, jobName) |
1988 | | - if (not result == None): |
1989 | | - return result |
1990 | | - infile.close |
1991 | | - return None |
1992 | | - except: |
1993 | | - return None |
1994 | | - |
1995 | | - def statusOfOldDumpIsDone(self, runner, date): |
1996 | | - oldDumpRunInfoFilename=runner.dumpItemList._getDumpRunInfoFileName(date) |
1997 | | - jobName = self.name() |
1998 | | - status = self._getStatusForJobFromRunInfoFile(oldDumpRunInfoFilename, jobName) |
1999 | | - if (status == "done"): |
2000 | | - return 1 |
2001 | | - elif (not status == None): |
2002 | | - # failure, in progress, some other useless thing |
2003 | | - return 0 |
2004 | | - |
2005 | | - # ok, there was no info there to be had, try the index file. yuck. |
2006 | | - indexFilename = os.path.join(runner.wiki.publicDir(), date, runner.config.perDumpIndex) |
2007 | | - desc = self._desc |
2008 | | - status = self._getStatusForJobFromIndexFile(indexFilename, desc) |
2009 | | - if (status == "done"): |
2010 | | - return 1 |
2011 | | - else: |
2012 | | - return 0 |
2013 | | - |
2014 | | - def listFiles(self, runner, unnumbered = False): |
2015 | | - if (self._chunks) and not unnumbered: |
| 2099 | + def listOutputFiles(self, runner): |
| 2100 | + if (self._chunks): |
2016 | 2101 | files = [] |
2017 | 2102 | for i in range(1, len(self._chunks)+1): |
2018 | 2103 | files.append(self._file("bz2",i)) |
— | — | @@ -2022,19 +2107,22 @@ |
2023 | 2108 | class RecombineXmlDump(XmlDump): |
2024 | 2109 | def __init__(self, subset, name, desc, detail, chunks = False): |
2025 | 2110 | # no prefetch, no spawn |
2026 | | - XmlDump.__init__(self, subset, name, desc, detail, None, None, chunks) |
| 2111 | + XmlDump.__init__(self, subset, name, desc, detail, None, None, False, chunks) |
2027 | 2112 | # this is here only so that a callback can capture output from some commands |
2028 | 2113 | # related to recombining files if we did parallel runs of the recompression |
2029 | 2114 | self._output = None |
2030 | 2115 | |
2031 | | - def listFiles(self, runner): |
2032 | | - return(XmlDump.listFiles(self, runner, unnumbered=True)) |
| 2116 | + def listInputFiles(self, runner): |
| 2117 | + return XmlDump.listOutputFiles(self,runner) |
2033 | 2118 | |
| 2119 | + def listOutputFiles(self, runner): |
| 2120 | + return [ self._file("bz2",0) ] |
| 2121 | + |
2034 | 2122 | def run(self, runner): |
2035 | 2123 | error=0 |
2036 | 2124 | if (self._chunks): |
2037 | | - files = XmlDump.listFiles(self,runner) |
2038 | | - outputFileList = self.listFiles(runner) |
| 2125 | + files = self.listInputFiles(runner) |
| 2126 | + outputFileList = self.listOutputFiles(runner) |
2039 | 2127 | for outputFile in outputFileList: |
2040 | 2128 | inputFiles = [] |
2041 | 2129 | for inFile in files: |
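Passing 0 as the chunk number presumably makes _file() omit the chunk suffix, yielding the single recombined filename; the same convention is used for the 7z variants below. A stand-in illustrating that assumption (_file() itself is outside this diff):

    def _file(ext, chunk):
        suffix = str(chunk) if chunk else ""      # assumption: chunk 0 -> no suffix
        return "pages-articles%s.xml.%s" % (suffix, ext)

    _file("bz2", 0)    # -> 'pages-articles.xml.bz2'
    _file("bz2", 2)    # -> 'pages-articles2.xml.bz2'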
— | — | @@ -2071,11 +2159,12 @@ |
2072 | 2160 | class XmlRecompressDump(Dump): |
2073 | 2161 | """Take a .bz2 and recompress it as 7-Zip.""" |
2074 | 2162 | |
2075 | | - def __init__(self, subset, name, desc, detail, chunks = False): |
| 2163 | + def __init__(self, subset, name, desc, detail, chunkToDo, chunks = False): |
2076 | 2164 | Dump.__init__(self, name, desc) |
2077 | 2165 | self._subset = subset |
2078 | 2166 | self._detail = detail |
2079 | 2167 | self._chunks = chunks |
| 2168 | + self._chunkToDo = chunkToDo |
2080 | 2169 | |
2081 | 2170 | def detail(self): |
2082 | 2171 | """Optionally return additional text to appear under the heading.""" |
— | — | @@ -2127,11 +2216,18 @@ |
2128 | 2217 | raise BackupError("bz2 dump incomplete, not recompressing") |
2129 | 2218 | commands = [] |
2130 | 2219 | if (self._chunks): |
2131 | | - for i in range(1, len(self._chunks)+1): |
2132 | | - # Clear prior 7zip attempts; 7zip will try to append to an existing archive 
2133 | | - self.cleanupOldFiles(runner, i) |
2134 | | - series = self.buildCommand(runner, i) |
| 2220 | + if (self._chunkToDo): |
| 2221 | + if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)): |
| 2222 | + raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks))) |
| 2223 | + self.cleanupOldFiles(runner, self._chunkToDo) |
| 2224 | + series = self.buildCommand(runner, self._chunkToDo) |
2135 | 2225 | commands.append(series) |
| 2226 | + else: |
| 2227 | + for i in range(1, len(self._chunks)+1): |
| 2228 | + # Clear prior 7zip attempts; 7zip will try to append to an existing archive 
| 2229 | + self.cleanupOldFiles(runner, i) |
| 2230 | + series = self.buildCommand(runner, i) |
| 2231 | + commands.append(series) |
2136 | 2232 | else: |
2137 | 2233 | # Clear prior 7zip attempts; 7zip will try to append an existing archive |
2138 | 2234 | self.cleanupOldFiles(runner) |
— | — | @@ -2141,10 +2237,17 @@ |
2142 | 2238 | # temp hack force 644 permissions until ubuntu bug # 370618 is fixed - tomasz 5/1/2009 |
2143 | 2239 | # some hacks aren't so temporary - atg 3 sept 2010 |
2144 | 2240 | if (self._chunks): |
2145 | | - for i in range(1, len(self._chunks)+1): |
2146 | | - xml7z = self.buildOutputFilename(runner,i) |
| 2241 | + if (self._chunkToDo): |
| 2242 | + if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)): |
| 2243 | + raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks))) |
| 2244 | + xml7z = self.buildOutputFilename(runner,self._chunkToDo) |
2147 | 2245 | if exists(xml7z): |
2148 | 2246 | os.chmod(xml7z, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH ) |
| 2247 | + else: |
| 2248 | + for i in range(1, len(self._chunks)+1): |
| 2249 | + xml7z = self.buildOutputFilename(runner,i) |
| 2250 | + if exists(xml7z): |
| 2251 | + os.chmod(xml7z, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH ) |
2149 | 2252 | else: |
2150 | 2253 | xml7z = self.buildOutputFilename(runner) |
2151 | 2254 | if exists(xml7z): |
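For reference, the mode being forced here is 0644 (owner read/write, group and world read):

    import stat
    mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
    print oct(mode)    # -> 0644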
— | — | @@ -2152,12 +2255,19 @@ |
2153 | 2256 | if (error): |
2154 | 2257 | raise BackupError("error recompressing bz2 file(s)") |
2155 | 2258 | |
2156 | | - def listFiles(self, runner, unnumbered = False): |
2157 | | - if (self._chunks) and not unnumbered: |
2158 | | - files = [] |
2159 | | - for i in range(1, len(self._chunks)+1): |
2160 | | - files.append(self._file("7z",i)) |
2161 | | - return files |
| 2259 | + def listOutputFiles(self, runner): |
| 2260 | + if (self._chunks): |
| 2261 | + if (self._chunkToDo): |
| 2262 | + if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)): |
| 2263 | + raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks))) |
| 2264 | + files = [] |
| 2265 | + files.append(self._file("7z",self._chunkToDo)) |
| 2266 | + return files |
| 2267 | + else: |
| 2268 | + files = [] |
| 2269 | + for i in range(1, len(self._chunks)+1): |
| 2270 | + files.append(self._file("7z",i)) |
| 2271 | + return files |
2162 | 2272 | else: |
2163 | 2273 | return [ self._file("7z",0) ] |
2164 | 2274 | |
— | — | @@ -2166,17 +2276,19 @@ |
2167 | 2277 | |
2168 | 2278 | class RecombineXmlRecompressDump(XmlRecompressDump): |
2169 | 2279 | def __init__(self, subset, name, desc, detail, chunks): |
2170 | | - XmlRecompressDump.__init__(self, subset, name, desc, detail, chunks) |
| 2280 | + XmlRecompressDump.__init__(self, subset, name, desc, detail, False, chunks) |
2171 | 2281 | # this is here only so that a callback can capture output from some commands |
2172 | 2282 | # related to recombining files if we did parallel runs of the recompression |
2173 | 2283 | self._output = None |
2174 | 2284 | |
2175 | | - def listFiles(self, runner): |
2176 | | - return(XmlRecompressDump.listFiles(self, runner, unnumbered=True)) |
| 2285 | + def listInputFiles(self, runner): |
| 2286 | + return XmlRecompressDump.listOutputFiles(self,runner) |
2177 | 2287 | |
| 2288 | + def listOutputFiles(self, runner): |
| 2289 | + return [ self._file("7z",0) ] |
| 2290 | + |
2178 | 2291 | def cleanupOldFiles(self, runner): |
2179 | | - files = self.listFiles(runner) |
2180 | | - print "here is cleanup" |
| 2292 | + files = self.listOutputFiles(runner) |
2181 | 2293 | for filename in files: |
2182 | 2294 | filename = runner.dumpDir.publicPath(filename) |
2183 | 2295 | if exists(filename): |
— | — | @@ -2186,8 +2298,8 @@ |
2187 | 2299 | error = 0 |
2188 | 2300 | if (self._chunks): |
2189 | 2301 | self.cleanupOldFiles(runner) |
2190 | | - files = XmlRecompressDump.listFiles(self,runner) |
2191 | | - outputFileList = self.listFiles(runner) |
| 2302 | + files = self.listInputFiles(runner) |
| 2303 | + outputFileList = self.listOutputFiles(runner) |
2192 | 2304 | for outputFile in outputFileList: |
2193 | 2305 | inputFiles = [] |
2194 | 2306 | for inFile in files: |
— | — | @@ -2216,8 +2328,9 @@ |
2217 | 2329 | class AbstractDump(Dump): |
2218 | 2330 | """XML dump for Yahoo!'s Active Abstracts thingy""" |
2219 | 2331 | |
2220 | | - def __init__(self, name, desc, chunks = False): |
| 2332 | + def __init__(self, name, desc, chunkToDo, chunks = False): |
2221 | 2333 | Dump.__init__(self, name, desc) |
| 2334 | + self._chunkToDo = chunkToDo |
2222 | 2335 | self._chunks = chunks |
2223 | 2336 | |
2224 | 2337 | def buildCommand(self, runner, chunk = 0): |
— | — | @@ -2254,9 +2367,15 @@ |
2255 | 2368 | def run(self, runner): |
2256 | 2369 | commands = [] |
2257 | 2370 | if (self._chunks): |
2258 | | - for i in range(1, len(self._chunks)+1): |
2259 | | - series = self.buildCommand(runner, i) |
| 2371 | + if (self._chunkToDo): |
| 2372 | + if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)): |
| 2373 | + raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks))) |
| 2374 | + series = self.buildCommand(runner, self._chunkToDo) |
2260 | 2375 | commands.append(series) |
| 2376 | + else: |
| 2377 | + for i in range(1, len(self._chunks)+1): |
| 2378 | + series = self.buildCommand(runner, i) |
| 2379 | + commands.append(series) |
2261 | 2380 | else: |
2262 | 2381 | series = self.buildCommand(runner) |
2263 | 2382 | commands.append(series) |
— | — | @@ -2290,31 +2409,42 @@ |
2291 | 2410 | else: |
2292 | 2411 | return( "abstract-%s%s.xml" % (variant, chunkInfo) ) |
2293 | 2412 | |
2294 | | - def listFiles(self, runner, unnumbered = False): |
| 2413 | + def listOutputFiles(self, runner): |
2295 | 2414 | files = [] |
2296 | 2415 | for x in self._variants(runner): |
2297 | | - if (self._chunks) and not unnumbered: |
2298 | | - for i in range(1, len(self._chunks)+1): |
2299 | | - files.append(self._variantFile(x, i)) |
| 2416 | + if (self._chunks): |
| 2417 | + if (self._chunkToDo): |
| 2418 | + if (self._chunkToDo < 1 or self._chunkToDo > len(self._chunks)): |
| 2419 | + raise BackupError("chunk option must be in range of available chunks to rerun, 1 through %s\n" % str(len(self._chunks))) |
| 2420 | + files.append(self._variantFile(x, self._chunkToDo)) |
| 2421 | + else: |
| 2422 | + for i in range(1, len(self._chunks)+1): |
| 2423 | + files.append(self._variantFile(x, i)) |
2300 | 2424 | else: |
2301 | 2425 | files.append(self._variantFile(x)) |
2302 | 2426 | return files |
2303 | 2427 | |
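Output here is the cross product of the abstract variants and the chunks; with --chunk, only that chunk's variant files are listed. Sketched with a stand-in for _variantFile() (the real one appears just above) and a hypothetical variant list:

    def variantFile(variant, chunk=None):
        chunkInfo = str(chunk) if chunk else ""
        if variant == "":
            return "abstract%s.xml" % chunkInfo
        return "abstract-%s%s.xml" % (variant, chunkInfo)

    files = []
    for x in ["", "nt"]:              # hypothetical variants
        for i in range(1, 3):         # two chunks
            files.append(variantFile(x, i))
    # -> abstract1.xml, abstract-nt1.xml, abstract2.xml, ... per variant and chunk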
2304 | 2428 | class RecombineAbstractDump(AbstractDump): |
2305 | 2429 | def __init__(self, name, desc, chunks): |
2306 | | - AbstractDump.__init__(self, name, desc, chunks) |
| 2430 | + AbstractDump.__init__(self, name, desc, False, chunks) |
2307 | 2431 | # this is here only so that a callback can capture output from some commands |
2308 | 2432 | # related to recombining files if we did parallel runs of the recompression |
2309 | 2433 | self._output = None |
2310 | 2434 | |
2311 | | - def listFiles(self, runner): |
2312 | | - return(AbstractDump.listFiles(self,runner, unnumbered = True)) |
| 2435 | + def listOutputFiles(self, runner): |
| 2436 | + files = [] |
| 2437 | + for x in self._variants(runner): |
| 2438 | + files.append(self._variantFile(x)) |
| 2439 | + return files |
2313 | 2440 | |
| 2441 | + def listInputFiles(self, runner): |
| 2442 | + return(AbstractDump.listOutputFiles(self,runner)) |
| 2443 | + |
2314 | 2444 | def run(self, runner): |
2315 | 2445 | error = 0 |
2316 | 2446 | if (self._chunks): |
2317 | | - files = AbstractDump.listFiles(self,runner) |
2318 | | - outputFileList = self.listFiles(runner) |
| 2447 | + files = AbstractDump.listOutputFiles(self,runner) |
| 2448 | + outputFileList = self.listOutputFiles(runner) |
2319 | 2449 | for outputFile in outputFileList: |
2320 | 2450 | inputFiles = [] |
2321 | 2451 | for inFile in files: |
— | — | @@ -2354,11 +2484,11 @@ |
2355 | 2485 | if (error): |
2356 | 2486 | raise BackupError("error dumping titles list") |
2357 | 2487 | |
2358 | | - def listFiles(self, runner): |
| 2488 | + def listOutputFiles(self, runner): |
2359 | 2489 | return ["all-titles-in-ns0.gz"] |
2360 | 2490 | |
2361 | 2491 | |
2362 | | -def findAndLockNextWiki(config): |
| 2492 | +def findAndLockNextWiki(config, locksEnabled): |
2363 | 2493 | if config.halt: |
2364 | 2494 | print "Dump process halted by config." |
2365 | 2495 | return None |
— | — | @@ -2371,13 +2501,17 @@ |
2372 | 2502 | for db in next: |
2373 | 2503 | wiki = WikiDump.Wiki(config, db) |
2374 | 2504 | try: |
2375 | | - wiki.lock() |
| 2505 | + if (locksEnabled): |
| 2506 | + wiki.lock() |
2376 | 2507 | return wiki |
2377 | 2508 | except: |
2378 | 2509 | print "Couldn't lock %s, someone else must have got it..." % db |
2379 | 2510 | continue |
2380 | 2511 | return None |
2381 | 2512 | |
| 2513 | +def xmlEscape(text): |
| 2514 | + return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;"); 
| 2515 | + |
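For reference, the escaping this helper performs:

    xmlEscape('AT&T <b>')    # -> 'AT&amp;T &lt;b&gt;'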
2382 | 2516 | def usage(message = None): |
2383 | 2517 | if message: |
2384 | 2518 | print message |
— | — | @@ -2425,10 +2559,11 @@ |
2426 | 2560 | log = None |
2427 | 2561 | htmlNotice = "" |
2428 | 2562 | dryrun = False |
| 2563 | + chunkToDo = False |
2429 | 2564 | |
2430 | 2565 | try: |
2431 | 2566 | (options, remainder) = getopt.gnu_getopt(sys.argv[1:], "", |
2432 | | - ['date=', 'job=', 'configfile=', 'addnotice=', 'delnotice', 'force', 'dryrun', 'noprefetch', 'nospawn', 'restartfrom', 'log']) |
| 2567 | + ['date=', 'job=', 'configfile=', 'addnotice=', 'delnotice', 'force', 'dryrun', 'noprefetch', 'nospawn', 'restartfrom', 'log', 'chunk=' ]) |
2433 | 2568 | except: |
2434 | 2569 | usage("Unknown option specified") |
2435 | 2570 | |
— | — | @@ -2437,6 +2572,8 @@ |
2438 | 2573 | date = val |
2439 | 2574 | elif opt == "--configfile": |
2440 | 2575 | configFile = val |
| 2576 | + elif opt == '--chunk': |
| 2577 | + chunkToDo = int(val) |
2441 | 2578 | elif opt == "--force": |
2442 | 2579 | forceLock = True |
2443 | 2580 | elif opt == "--noprefetch": |
— | — | @@ -2455,7 +2592,7 @@ |
2456 | 2593 | htmlNotice = val |
2457 | 2594 | elif opt == "--delnotice": |
2458 | 2595 | htmlNotice = False |
2459 | | - |
| 2596 | + |
2460 | 2597 | if dryrun and (len(remainder) == 0): |
2461 | 2598 | usage("--dryrun requires the name of a wikidb to be specified") |
2462 | 2599 | if jobRequested and (len(remainder) == 0): |
— | — | @@ -2464,6 +2601,10 @@ |
2465 | 2602 | usage("--force cannot be used with --job option") |
2466 | 2603 | if (restart and not jobRequested): |
2467 | 2604 | usage("--restartfrom requires --job and the job from which to restart") |
| 2605 | + if (chunkToDo and not jobRequested): |
| 2606 | + usage("--chunk option requires a specific job for which to rerun that chunk") |
| 2607 | + if (chunkToDo and restart): |
| 2608 | + usage("--chunk option can be specified only for one specific job") |
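Putting the new option checks together: --chunk requires --job and is incompatible with --restartfrom. A hypothetical invocation rerunning one chunk of one job (job name, date, config path, and wiki are illustrative):

    python worker.py --configfile /etc/wikidump.conf --date 20100901 \
        --job articlesdump --chunk 2 enwiki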
2468 | 2609 | |
2469 | 2610 | # allow alternate config file |
2470 | 2611 | if (configFile): |
— | — | @@ -2471,6 +2612,11 @@ |
2472 | 2613 | else: |
2473 | 2614 | config = WikiDump.Config() |
2474 | 2615 | |
| 2616 | + if dryrun or chunkToDo or (jobRequested and not restart): |
| 2617 | + locksEnabled = False |
| 2618 | + else: |
| 2619 | + locksEnabled = True |
| 2620 | + |
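The lock policy, gathered in one place: locking is skipped for dry runs, for single-chunk reruns, and for a single job unless restarting from it. The block above is equivalent to:

    locksEnabled = not (dryrun or chunkToDo or (jobRequested and not restart))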
2475 | 2621 | if dryrun: |
2476 | 2622 | print "***" |
2477 | 2623 | print "Dry run only, no files will be updated." |
— | — | @@ -2478,22 +2624,20 @@ |
2479 | 2625 | |
2480 | 2626 | if len(remainder) > 0: |
2481 | 2627 | wiki = WikiDump.Wiki(config, remainder[0]) |
2482 | | - if not dryrun: |
2483 | | - # if we are doing one piece only of the dump, we don't try to grab a lock |
2484 | | - # unless told to. |
| 2628 | + if locksEnabled: |
2485 | 2629 | if forceLock and wiki.isLocked(): |
2486 | 2630 | wiki.unlock() |
2487 | | - if restart or not jobRequested: |
| 2631 | + if locksEnabled: |
2488 | 2632 | wiki.lock() |
2489 | 2633 | |
2490 | 2634 | else: |
2491 | | - wiki = findAndLockNextWiki(config) |
| 2635 | + wiki = findAndLockNextWiki(config, locksEnabled) |
2492 | 2636 | |
2493 | 2637 | if wiki: |
2494 | 2638 | # process any per-project configuration options |
2495 | 2639 | config.parseConfFilePerProject(wiki.dbName) |
2496 | 2640 | |
2497 | | - runner = Runner(wiki, date, prefetch, spawn, jobRequested, restart, htmlNotice, dryrun, enableLogging) |
| 2641 | + runner = Runner(wiki, date, prefetch, spawn, jobRequested, restart, htmlNotice, dryrun, enableLogging, chunkToDo) |
2498 | 2642 | if (restart): |
2499 | 2643 | print "Running %s, restarting from job %s..." % (wiki.dbName, jobRequested) |
2500 | 2644 | elif (jobRequested): |
— | — | @@ -2502,9 +2646,8 @@ |
2503 | 2647 | print "Running %s..." % wiki.dbName |
2504 | 2648 | runner.run() |
2505 | 2649 | # if we are doing one piece only of the dump, we don't unlock either |
2506 | | - if not dryrun: |
2507 | | - if restart or not jobRequested: |
2508 | | - wiki.unlock() |
| 2650 | + if locksEnabled: |
| 2651 | + wiki.unlock() |
2509 | 2652 | else: |
2510 | 2653 | print "No wikis available to run." |
2511 | 2654 | finally: |