Index: branches/ariel/xmldumps-backup/worker.py
— | — | @@ -13,6 +13,7 @@ |
14 | 14 | import stat |
15 | 15 | import signal |
16 | 16 | import errno |
| 17 | +import glob |
17 | 18 | import WikiDump |
18 | 19 | import CommandManagement |
19 | 20 | |
— | — | @@ -1397,6 +1398,7 @@ |
1398 | 1399 | self._prefetch = prefetch |
1399 | 1400 | self._spawn = spawn |
1400 | 1401 | self._chunks = chunks |
| 1402 | + self._pageID = {} |
1401 | 1403 | |
1402 | 1404 | def detail(self): |
1403 | 1405 | """Optionally return additional text to appear under the heading.""" |
— | — | @@ -1451,13 +1453,17 @@ |
1452 | 1454 | |
1453 | 1455 | # Try to pull text from the previous run; most stuff hasn't changed |
1454 | 1456 | #Source=$OutputDir/pages_$section.xml.bz2 |
| 1457 | + sources = [] |
1455 | 1458 | if self._prefetch: |
1456 | | - source = self._findPreviousDump(runner, chunk) |
1457 | | - else: |
1458 | | - source = None |
1459 | | - if source and exists(source): |
| 1459 | + possibleSources = self._findPreviousDump(runner, chunk) |
| 1460 | + # we may get back a list of several files; check that each one exists and join the survivors into a single source string
| 1461 | + for sourceFile in possibleSources: |
| 1462 | + if exists(sourceFile): |
| 1463 | + sources.append(sourceFile) |
| 1464 | + if (len(sources) > 0): |
| 1465 | + source = "bzip2:%s" % (";".join(sources) ) |
1460 | 1466 | runner.showRunnerState("... building %s %s XML dump, with text prefetch from %s..." % (self._subset, chunkinfo, source)) |
1461 | | - prefetch = "--prefetch=bzip2:%s" % (source) |
| 1467 | + prefetch = "--prefetch=%s" % (source) |
1462 | 1468 | else: |
1463 | 1469 | runner.showRunnerState("... building %s %s XML dump, no text prefetch..." % (self._subset, chunkinfo)) |
1464 | 1470 | prefetch = None |
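A minimal sketch of the source-assembly step above, in isolation: candidate files from the previous run are filtered for existence and joined into the single prefetch argument. The helper name and its argument are illustrative, not part of worker.py.

import os

def build_prefetch_arg(candidates):
    # keep only the candidate dump files that actually exist on disk
    sources = [f for f in candidates if os.path.exists(f)]
    if sources:
        # several bz2 files become one semicolon-separated "bzip2:" specifier
        return "--prefetch=bzip2:%s" % ";".join(sources)
    # nothing usable: the caller falls back to "no text prefetch"
    return None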
— | — | @@ -1483,12 +1489,62 @@ |
1484 | 1490 | series = [ pipeline ] |
1485 | 1491 | return series |
1486 | 1492 | |
| 1493 | + # given a filename (bz2 compression is assumed), dig out the first page ID in that file
| 1494 | + def findFirstPageIDInFile(self, runner, fileName): |
| 1495 | + if (fileName in self._pageID): |
| 1496 | + return self._pageID[fileName] |
| 1497 | + pageID = None |
| 1498 | + pipeline = [] |
| 1499 | + uncompressionCommand = [ "%s" % runner.config.bzip2, "-dc", fileName ] |
| 1500 | + pipeline.append(uncompressionCommand) |
| 1501 | + # warning: we figure any header (<siteinfo>...</siteinfo>) is going to be less than 2000 lines! |
| 1502 | + head = runner.config.head
| 1503 | + pipeline.append([ head, "-2000"])
| 1504 | + # the pipeline is handed argument lists directly and run without a
| 1505 | + # shell, so no shell escaping of the commands is needed
| 1506 | + p = CommandPipeline(pipeline, quiet=True) |
| 1507 | + p.runPipelineAndGetOutput() |
| 1508 | + pageData = p.output()
| 1509 | + if (pageData):
| 1510 | + titleAndIDPattern = re.compile(r'<title>(?P<title>.+?)</title>\s*' + r'<id>(?P<pageid>\d+?)</id>')
| 1511 | + result = titleAndIDPattern.search(pageData) |
| 1512 | + if (result): |
| 1513 | + pageID = result.group('pageid') |
| 1514 | + self._pageID[fileName] = pageID |
| 1515 | + return pageID
| 1516 | + |
| 1517 | + |
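For illustration, a rough standalone equivalent of findFirstPageIDInFile, assuming subprocess in place of the project's CommandPipeline helper and omitting the self._pageID cache; the default tool paths are assumptions.

import re
import subprocess

def first_page_id(bz2_path, bzip2="/usr/bin/bzip2", head="/usr/bin/head"):
    # decompress only the start of the file; the <siteinfo> header is assumed
    # to fit well within the first 2000 lines
    bz = subprocess.Popen([bzip2, "-dc", bz2_path], stdout=subprocess.PIPE)
    hd = subprocess.Popen([head, "-2000"], stdin=bz.stdout, stdout=subprocess.PIPE)
    bz.stdout.close()
    text = hd.communicate()[0].decode("utf-8", "replace")
    match = re.search(r'<title>(?P<title>.+?)</title>\s*<id>(?P<pageid>\d+?)</id>', text)
    return match.group('pageid') if match else None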
| 1518 | + def filenameHasChunk(self, filename, ext): |
| 1519 | + fileNamePattern = re.compile('.*pages-' + self._subset + r'[0-9]+\.xml\.' + ext + '$')
| 1520 | + if (fileNamePattern.match(filename)): |
| 1521 | + return True |
| 1522 | + else: |
| 1523 | + return False |
| 1524 | + |
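As a hypothetical example (assuming self._subset is "articles" and ext is "bz2"; the filenames are made up), the pattern is meant to accept numbered chunk files and reject the single-file dump:

import re

pattern = re.compile('.*pages-' + 'articles' + r'[0-9]+\.xml\.' + 'bz2' + '$')
print(bool(pattern.match("enwiki-20100130-pages-articles4.xml.bz2")))  # True: chunk file
print(bool(pattern.match("enwiki-20100130-pages-articles.xml.bz2")))   # False: single-file dump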
| 1525 | + # taken from a comment by user "Toothy" on Ned Batchelder's blog (no longer on the net) |
| 1526 | + def sort_nicely(self, l): |
| 1527 | + """ Sort the given list in the way that humans expect. |
| 1528 | + """ |
| 1529 | + convert = lambda text: int(text) if text.isdigit() else text |
| 1530 | + alphanum_key = lambda key: [ convert(c) for c in re.split('([0-9]+)', key) ] |
| 1531 | + l.sort( key=alphanum_key ) |
| 1532 | + |
| 1533 | + |
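A quick demonstration, with made-up filenames, of why the natural sort matters here: a plain string sort would place chunk 10 between chunks 1 and 2, which would scramble the per-chunk page ID list built later.

import re

convert = lambda text: int(text) if text.isdigit() else text
alphanum_key = lambda key: [convert(c) for c in re.split('([0-9]+)', key)]

files = ["pages-articles10.xml.bz2", "pages-articles2.xml.bz2", "pages-articles1.xml.bz2"]
print(sorted(files))                    # chunk order 1, 10, 2 (wrong for page ranges)
print(sorted(files, key=alphanum_key))  # chunk order 1, 2, 10 (what sort_nicely gives)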
| 1534 | + # this finds the content file or files from the most recent previous successful dump,
| 1535 | + # to be used as input ("prefetch") for this run.
1487 | 1536 | def _findPreviousDump(self, runner, chunk = 0): |
1488 | 1537 | """The previously-linked previous successful dump.""" |
1489 | | - if (chunk): |
1490 | | - bzfileChunk = self._file("bz2", chunk) |
1491 | 1538 | bzfile = self._file("bz2") |
1492 | 1539 | if (chunk): |
| 1540 | + startPageID = sum([ self._chunks[i] for i in range(0,chunk-1)]) + 1 |
| 1541 | + if (len(self._chunks) > chunk): |
| 1542 | + endPageID = sum([ self._chunks[i] for i in range(0,chunk)]) |
| 1543 | + else: |
| 1544 | + endPageID = None |
| 1545 | + # we will look for the first chunk file; if it's there and the
| 1546 | + # status of the job is ok, then we will gather the rest of the info
| 1547 | + bzfileChunk = self._file("bz2", 1) |
| 1548 | + bzfileGlob = self._file("bz2", '[1-9]*') |
1493 | 1549 | currentChunk = realpath(runner.dumpDir.publicPath(bzfile)) |
1494 | 1550 | current = realpath(runner.dumpDir.publicPath(bzfile)) |
1495 | 1551 | dumps = runner.wiki.dumpDirs() |
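A worked example of the startPageID/endPageID computation above, with made-up chunk sizes; self._chunks holds the pages-per-chunk counts and chunk numbers are 1-based:

chunks = [100000, 100000, 50000]   # hypothetical pages-per-chunk counts (self._chunks)
chunk = 2                          # the chunk being dumped, numbered from 1
start_page_id = sum(chunks[:chunk - 1]) + 1                           # 100001
end_page_id = sum(chunks[:chunk]) if len(chunks) > chunk else None    # 200000 (None for the last chunk)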
— | — | @@ -1496,12 +1552,23 @@ |
1497 | 1553 | dumps.reverse() |
1498 | 1554 | for date in dumps: |
1499 | 1555 | base = runner.wiki.publicDir() |
1500 | | - # first see if the corresponding "chunk" file is there, if not we will accept |
1501 | | - # the whole dump as one file though it will be slower |
| 1556 | + # first see if a "chunk" file is there; if not, we will accept
| 1557 | + # using the single-file dump, although it will be slower
1502 | 1558 | possibles = [] |
1503 | 1559 | oldChunk = None |
| 1560 | + # scan all the existing chunk files and get the first page ID from each |
1504 | 1561 | if (chunk): |
1505 | 1562 | oldChunk = runner.dumpDir.buildPath(base, date, bzfileChunk) |
| 1563 | + oldGlob = runner.dumpDir.buildPath(base, date, bzfileGlob) |
| 1564 | + pageIDs = [] |
| 1565 | + bzfileChunks = glob.glob(oldGlob) |
| 1566 | + self.sort_nicely(bzfileChunks) |
| 1567 | + if (bzfileChunks): |
| 1568 | + for fileName in bzfileChunks: |
| 1569 | + pageID = self.findFirstPageIDInFile(runner, fileName ) |
| 1570 | + if (pageID): |
| 1571 | + pageIDs.append(pageID) |
| 1572 | + |
1506 | 1573 | old = runner.dumpDir.buildPath(base, date, bzfile) |
1507 | 1574 | if (oldChunk): |
1508 | 1575 | if exists(oldChunk): |
— | — | @@ -1523,7 +1590,31 @@ |
1524 | 1591 | runner.debug("skipping incomplete or failed dump for prefetch %s" % possible) |
1525 | 1592 | continue |
1526 | 1593 | runner.debug("Prefetchable %s" % possible) |
1527 | | - return possible |
| 1594 | + # found something workable, now check the chunk situation |
| 1595 | + if (chunk): |
| 1596 | + if (self.filenameHasChunk(possible, "bz2")): |
| 1597 | + if len(pageIDs) > 0:
| 1598 | + possibleStartNum = None
| 1599 | + for i in range(len(pageIDs)):
| 1600 | + if int(pageIDs[i]) <= int(startPageID):
| 1601 | + possibleStartNum = i + 1 # chunk numbering of files starts at 1
| 1602 | + else:
| 1603 | + break
| 1604 | + if possibleStartNum:
| 1605 | + possibleEndNum = possibleStartNum |
| 1606 | + for j in range(i,len(pageIDs)): |
| 1607 | + if (not endPageID) or (int(pageIDs[j]) <= int(endPageID)): |
| 1608 | + # chunk number of file starts at 1. |
| 1609 | + possibleEndNum = j + 1 |
| 1610 | + else: |
| 1611 | + break |
| 1612 | + # now we have the range of the relevant files, put together the list. |
| 1613 | + possible = [ runner.dumpDir.buildPath(base, date, self._file("bz2", k)) for k in range(possibleStartNum,possibleEndNum+1) ] |
| 1614 | + return possible |
| 1615 | + else: |
| 1616 | + continue |
| 1617 | + |
| 1618 | + return [ possible ] |
1528 | 1619 | runner.debug("Could not locate a prefetchable dump.") |
1529 | 1620 | return None |
1530 | 1621 | |
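To summarize the chunk-selection logic in this hunk, here is a condensed, self-contained sketch with made-up page IDs; pick_chunks and its inputs are illustrative names, not worker.py code:

def pick_chunks(page_ids, start_page_id, end_page_id):
    """page_ids[i] is the first page ID found in old chunk file i+1, in ascending order."""
    start_num = None
    for i, pid in enumerate(page_ids):
        if int(pid) <= int(start_page_id):
            start_num = i + 1            # chunk numbering starts at 1
        else:
            break
    if not start_num:
        return None                      # no old chunk covers the start of our range
    end_num = start_num
    for j in range(start_num - 1, len(page_ids)):
        if end_page_id is None or int(page_ids[j]) <= int(end_page_id):
            end_num = j + 1
        else:
            break
    return list(range(start_num, end_num + 1))

# The old run had chunks starting at pages 1, 90001, 180001 and 270001; the new
# chunk covers pages 100001..200000, so old chunks 2 and 3 are used for prefetch.
print(pick_chunks(["1", "90001", "180001", "270001"], 100001, 200000))   # [2, 3]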