Index: branches/ariel/xmldumps-backup/pagerange.py |
— | — | @@ -0,0 +1,351 @@ |
| 2 | +import getopt |
| 3 | +import os |
| 4 | +import re |
| 5 | +import sys |
| 6 | +import time |
| 7 | +import WikiDump |
| 8 | +import bz2 |
| 9 | +import worker |
| 10 | +import CommandManagement |
| 11 | + |
| 12 | +from CommandManagement import CommandPipeline, CommandSeries, CommandsInParallel |
| 13 | +from worker import Runner |
| 14 | + |
class PageRange(object):
    """Estimate revision counts over ranges of page IDs.

    Provides methods for getting the number of revisions per page,
    estimating how many revisions a consecutive run of page IDs contains
    from a given starting page ID, and finding an end page ID so that a
    range holds roughly a requested number of revisions.

    We use this for splitting up history runs into small chunks to be run
    in parallel, with each job taking roughly the same length of time.
    Note that doing a straight log curve fit doesn't work; it's got to be
    done by approximation.
    Speed of retrieval of revisions depends on revision size and whether
    it's prefetchable (in the previous dump's file) or must be retrieved
    from the external store (database query).

    All database access goes through MediaWiki's maintenance/eval.php,
    fed with PHP source on stdin via a CommandPipeline.
    """

    def __init__(self, dbname, config):
        """Arguments:
        dbname -- the name of the database we are dumping
        config -- this is the general config context used by the runner
                  class; its 'php' and 'wikiDir' attributes are read here.

        Queries the database once for the highest page ID (see
        getNumPagesInDB)."""
        self._dbname = dbname
        self._config = config
        self._totalPages = None
        # this is the number of pages we typically poll when we get
        # estimated revs per page at some pageID
        self._sampleSize = 500
        self.getNumPagesInDB()

    def _evalPhpCommand(self):
        """Return the argv list that runs eval.php against our database."""
        return ['%s' % self._config.php,
                '%s/maintenance/eval.php' % self._config.wikiDir,
                '%s' % self._dbname]

    def getNumPagesInDB(self):
        """Fetch max(page_id) from the page table and cache it as an int
        in self._totalPages.

        Returns the raw output string of the query (possibly empty), or
        None if the command failed or printed something non-numeric."""
        pipeline = [
            ["echo", '$dbr = wfGetDB( DB_SLAVE ); $count = $dbr->selectField( "page", "max(page_id)", false ); if ( intval($count) > 0 ) { echo intval($count); }'],
            # was built from the global 'config', which only exists when
            # this file is run as a script
            self._evalPhpCommand(),
        ]
        p = CommandPipeline(pipeline, quiet=True)
        p.runPipelineAndGetOutput()
        if not p.exitedSuccessfully():
            print("DEBUG: serious error encountered (1)")
            return None
        output = p.output().rstrip('\n')
        if output != '':
            try:
                self._totalPages = int(output)
            except ValueError:
                # e.g. a PHP warning got mixed into the output
                print("DEBUG: serious error encountered (1)")
                return None
        return output

    def runDBQueryAndGetOutput(self, query):
        """Pipe the given command (an argv list, normally an 'echo' of PHP
        source) into eval.php and return its output as a string with
        trailing newlines removed.

        Returns the empty string if the pipeline did not exit
        successfully, so that callers treat it as 'no data'."""
        p = CommandPipeline([query, self._evalPhpCommand()], quiet=True)
        p.runPipelineAndGetOutput()
        if not p.exitedSuccessfully():
            print("DEBUG: serious error encountered (2)")
            return ''
        return p.output().rstrip('\n')

    def estimateNumRevsPerPage(self, pageID):
        """Get the number of revisions for self._sampleSize page IDs
        starting at a given pageID.

        Returns a tuple (revsPerPage, pagesFound): the average number of
        revisions per page in the sample (at least 1), and the number of
        distinct pages in the sample that have revisions.  Returns
        (None, None) if the sample holds no pages or the query output
        could not be parsed.
        This assumes that the older pages (lower page ID) have
        generally more revisions."""
        # callers may hand us a float (from round()); str() of that would
        # put e.g. "1234.0" into the SQL text
        pageID = int(pageID)
        pageRangeStart = str(pageID)
        pageRangeEnd = str(pageID + self._sampleSize)

        query = ["echo", '$dbr = wfGetDB( DB_SLAVE ); $count = $dbr->selectField( "revision", "COUNT(distinct(rev_page))", array( "rev_page < ' + pageRangeEnd + '", "rev_page >= ' + pageRangeStart + '" ) ); echo $count;']
        print(query)
        try:
            rowCount = int(self.runDBQueryAndGetOutput(query))
        except ValueError:
            return (None, None)
        if rowCount <= 0:
            return (None, None)

        # the limit currently covers every row; lowering it would drop the
        # pages with the most revisions from the average
        limit = str(rowCount)
        print("there were %s many rows in the request, getting %s" % (rowCount, limit))
        queryString = "select avg(a.cnt) as avgcnt from (select count(rev_page) as cnt from revision where rev_page >=" + pageRangeStart + " and rev_page < " + pageRangeEnd + " group by rev_page order by cnt asc limit " + limit + ") as a;"
        query = ["echo", '$dbr = wfGetDB( DB_SLAVE ); $res = $dbr->query( "' + queryString + '" ); if ($res && $dbr->numRows( $res ) > 0) { while( $row = $dbr->fetchObject( $res ) ) { echo $row->avgcnt; } }']
        average = self.runDBQueryAndGetOutput(query)
        try:
            revsPerPage = int(round(float(average)))
        except ValueError:
            # empty or garbled output
            return (None, None)
        if revsPerPage == 0:
            revsPerPage = 1
        print("got average of %s revs per page at pageid %s" % (revsPerPage, pageID))
        print("that amounts to  %s" % (revsPerPage * self._sampleSize))
        return (revsPerPage, rowCount)

    def getEstimatedRevsForIntervalFromEndpoints(self, revsPage0, revsPage1, pageID0, pageID1, pagesP0, pagesP1):
        """given the revs per page estimate at each endpoint, and the
        number of undeleted pages out of the sample size
        (self._sampleSize is the sample size), figure out the estimated
        revs for the interval"""
        estimatedPagesInInterval = self.getEstimatedUndeletedPagesInInterval(pageID0, pageID1, pagesP0, pagesP1)
        # FIXME is this always what we want, is the average? or do we want
        # it from the greater endpoint? or what?
        return abs((revsPage0 + revsPage1) * estimatedPagesInInterval // 2)

    def getEstimatedUndeletedPagesInInterval(self, pageID0, pageID1, pagesP0, pagesP1):
        """given the number of undeleted pages at both P0 and P1 for our
        standard sample size self._sampleSize, guesstimate the number of
        undeleted pages for the whole interval"""
        # interval length times the mean undeleted fraction of the two
        # samples; float division so round() has something to round
        estimatedPagesInInterval = int(round((pageID1 - pageID0) * (pagesP0 + pagesP1) / (2.0 * self._sampleSize)))
        print("estimatedPagesInInterval %s" % estimatedPagesInInterval)
        return estimatedPagesInInterval

    def checkEstimatedRevsAgainstErrorMargin(self, revsPage0, revsPage1, pageID0, pageID1, pagesP0, pagesP1, errorMargin):
        """Decide if the estimated number of revs is within our
        margin of error.

        Returns True when the difference between the two endpoint
        revs-per-page estimates, scaled by the estimated number of
        undeleted pages in the interval, is below errorMargin."""
        # shared helper instead of a second copy of the formula (the copy
        # had lost the factor of 2 and so doubled the page estimate)
        estimatedPagesInInterval = self.getEstimatedUndeletedPagesInInterval(pageID0, pageID1, pagesP0, pagesP1)
        return abs((revsPage0 - revsPage1) * estimatedPagesInInterval) < errorMargin

    def getErrorMarginForInterval(self, pageIDStart, pageIDEnd, maxRevs=None):
        """Return the error margin to use for estimates over the interval:
        5% of the interval length, but never less than self._sampleSize.

        If maxRevs is given, three samples near the start of the interval
        are used to guess how long an interval holding maxRevs revisions
        could be, and the margin is based on the longer of that and the
        given interval."""
        intervalSize = pageIDEnd - pageIDStart
        marginEnd = pageIDEnd
        if maxRevs:
            # get three samples, take the min revs per page,
            # guess based on that where pageIDEnd ought to be as a max
            # cutoff, set error margin from that
            samplePoints = [pageIDStart,
                            pageIDStart + intervalSize // 10,
                            pageIDStart + intervalSize // 5]
            print("%s %s %s" % tuple(samplePoints))
            samples = [self.estimateNumRevsPerPage(point) for point in samplePoints]
            print("%s/%s, %s/%s, %s/%s" % (samples[0][0], samples[0][1],
                                           samples[1][0], samples[1][1],
                                           samples[2][0], samples[2][1]))
            if all(revs for (revs, pages) in samples):
                revsPerPage = min(revs for (revs, pages) in samples)
                undeletedPagesPerSample = min(pages for (revs, pages) in samples)
                # float arithmetic: integer division truncated the
                # undeleted fraction to 0 in nearly every case
                # NOTE(review): multiplying by the undeleted fraction
                # shortens the interval; dividing may have been intended
                newIntervalSize = int((float(maxRevs) / revsPerPage) * (float(undeletedPagesPerSample) / self._sampleSize))
                marginEnd = max(pageIDEnd, pageIDStart + newIntervalSize)
        errorMargin = round((marginEnd - pageIDStart) * 5 / 100.0)
        if errorMargin < self._sampleSize:
            errorMargin = self._sampleSize
        return errorMargin

    def estimateNumRevsForPageRange(self, pageIDStart, pageIDEnd, maxRevs=None):
        """estimate the cumulative number of revisions for a given page
        interval.
        if the parameter maxRevs is supplied, stop when we get to that
        point (within margin of error of it anyways).
        return (revisions, page id of upper end of interval), or
        (None, None) if a needed sample could not be retrieved."""
        if not self._totalPages and not self.getNumPagesInDB():
            return (None, None)

        # error margin has to make sense given the interval size; too
        # small and we will never get an estimate that meets it, too large
        # and our estimate will have no value
        errorMargin = self.getErrorMarginForInterval(pageIDStart, pageIDEnd, maxRevs)

        print("pageIDEnd is %s" % pageIDEnd)

        # leave room for a full sample starting at the upper endpoint
        if pageIDEnd + self._sampleSize > self._totalPages:
            pageIDEnd = self._totalPages - self._sampleSize
        else:
            print("pageIDEnd + self._sampleSize < self._totalPages %s %s" % (pageIDEnd + self._sampleSize, self._totalPages))
        if pageIDEnd < pageIDStart:
            pageIDEnd = pageIDStart

        print("estimateNumRevsForPageRange: %s %s %s" % (pageIDStart, pageIDEnd, self._totalPages))

        # the sample at pageIDStart is needed on every path below and does
        # not change, so fetch it exactly once
        (estimateP0, pagesP0) = self.estimateNumRevsPerPage(pageIDStart)
        if not estimateP0:
            return (None, None)

        if (pageIDEnd - pageIDStart) < self._sampleSize:
            # just take the estimate for revs at pageIDStart, call it good
            print("estimateNumRevsForPageRange: initial pageend is close enough to pagestart to quit")
            return (estimateP0 * pagesP0, pageIDStart + self._sampleSize)

        if maxRevs:
            if estimateP0 * pagesP0 > maxRevs:
                # we're already over. too bad, report it back,
                # we just don't do fine grained enough estimates for this
                # case whatever it is
                return (estimateP0 * pagesP0, pageIDStart + self._sampleSize)
            print("estimateP0 %s is less than maxRevs %s" % (estimateP0, maxRevs))

        (estimatePN, pagesPN) = self.estimateNumRevsPerPage(pageIDEnd)
        if not estimatePN:
            return (None, None)

        # on the one hand we want revs per page; on the other hand we want
        # pages not deleted out of the sample, these are both useful numbers
        if self.checkEstimatedRevsAgainstErrorMargin(estimateP0, estimatePN, pageIDStart, pageIDEnd, pagesP0, pagesPN, errorMargin):
            print("estimateNumRevsForPageRange: our first two estimates are close enough together to quit")
            print("they are %s and %s for page ids %s and %s respectively" % (estimateP0, estimatePN, pageIDStart, pageIDEnd))
            # NOTE(review): this treats every page ID in the interval as an
            # existing page, unlike the endpoint-based estimates below
            return (estimateP0 * (pageIDEnd - pageIDStart), pageIDEnd)

        # main loop, here's where we have to do real work: keep halving
        # the first sub-interval (and the margin) until the estimates at
        # its two ends agree, then walk the whole range in steps that size
        pageIDTemp = pageIDEnd
        tempMargin = errorMargin
        iterations = 0  # debug
        while True:
            iterations += 1
            # FIXME this means that our final estimate may be outside the
            # error margin
            tempMargin = max(tempMargin / 2.0, 1)
            pageIDTemp = (pageIDTemp - pageIDStart) // 2 + pageIDStart
            if pageIDTemp + self._sampleSize > self._totalPages:
                pageIDTemp = self._totalPages - self._sampleSize
            if pageIDTemp < pageIDStart:
                # FIXME we really need to do something more with this case...
                pageIDTemp = pageIDStart

            (estimatePN, pagesPN) = self.estimateNumRevsPerPage(pageIDTemp)
            if not estimatePN:
                return (None, None)

            # the "distance less than self._sampleSize" clause is just a
            # catchall in case the slope of the curve is so steep that we
            # can't get a good estimate within the margin of error...
            # in which case we have the absolute number (revs for one
            # sample of pages at p0) and we use it
            if self.checkEstimatedRevsAgainstErrorMargin(estimateP0, estimatePN, pageIDStart, pageIDTemp, pagesP0, pagesPN, tempMargin) or (pageIDTemp - pageIDStart < self._sampleSize):
                print("estimateNumRevsForPageRange: estimate of 1st interval close enough on %sth iteration for estimates %s and %s at %s and %s, tempmargin %s" % (iterations, estimateP0, estimatePN, pageIDStart, pageIDTemp, tempMargin))
                step = max(pageIDTemp - pageIDStart, self._sampleSize)
                print("have estimate %s at %s (pageIDStart)" % (estimateP0, pageIDStart))
                return self._sumEstimatesByStep(pageIDStart, pageIDEnd, step, tempMargin, maxRevs, estimateP0, pagesP0)

    def _sumEstimatesByStep(self, pageIDStart, pageIDEnd, step, tempMargin, maxRevs, estimatePI1, pagesPI1):
        """Walk from pageIDStart to pageIDEnd in (growing) steps, adding up
        the estimated revisions of each sub-interval.

        estimatePI1/pagesPI1 are the sample results at pageIDStart.
        Returns (revisions, end page id), stopping early once the running
        total would exceed maxRevs (if given); (None, None) if a sample
        could not be retrieved."""
        totalEstimate = 0
        pageI = pageIDStart
        while pageI <= pageIDEnd:
            (estimatePI2, pagesPI2) = self.estimateNumRevsPerPage(pageI + step)
            if not estimatePI2:
                # always a 2-tuple: callers unpack the result
                return (None, None)
            print("have estimate %s at %s, step is %s and we added it to %s" % (estimatePI2, pageI + step, step, pageI))
            print("*******estimatePI1, estimatePI2, pageI, pageI+step, pagesPI1, pagesPI2: %s %s %s %s %s %s" % (estimatePI1, estimatePI2, pageI, pageI + step, pagesPI1, pagesPI2))
            estimate = self.getEstimatedRevsForIntervalFromEndpoints(estimatePI1, estimatePI2, pageI, pageI + step, pagesPI1, pagesPI2)

            # FIXME do we know we are within the margin of error? ummmm
            if maxRevs and totalEstimate + estimate > maxRevs:
                print("about to return with totalEstimate %s + estimate %s > maxRevs %s" % (totalEstimate, estimate, maxRevs))
                if totalEstimate:
                    return (totalEstimate, pageI)
                # the very first step is already over the limit; report
                # its far end so that the caller still moves forward
                return (estimate, pageI + step)

            # since the number of revs decreases as the page id increases,
            # eventually our interval size can get larger too and still
            # keep us within the margin of error
            undeletedPagesInInterval = self.getEstimatedUndeletedPagesInInterval(pageI, pageI + step, pagesPI1, pagesPI2)
            revsDifference = abs(estimatePI2 - estimatePI1) * undeletedPagesInInterval
            if revsDifference == 0:
                multiplier = 2
            else:
                multiplier = int(float(tempMargin) / revsDifference)

            # check if this makes sense with the multiplier, is that
            # really how we get it?
            if multiplier > 1:
                print("got multiplier %s from tempMargin %s and estimateP1 %s, estimateP2 %s, undelPagesInInterval %s, step currently %s" % (multiplier, tempMargin, estimatePI1, estimatePI2, undeletedPagesInInterval, step))
                step = step * multiplier
                print("step now adjusted to %s" % step)
                # FIXME is this right?
                tempMargin = tempMargin * multiplier
                # redo the estimate so it matches the new step size I guess
                estimate = self.getEstimatedRevsForIntervalFromEndpoints(estimatePI1, estimatePI2, pageI, pageI + step, pagesPI1, pagesPI2)

            print("*******added %s to totalestimate %s for %s" % (estimate, totalEstimate, totalEstimate + estimate))
            totalEstimate = totalEstimate + estimate

            estimatePI1 = estimatePI2
            pagesPI1 = pagesPI2
            pageI = pageI + step

        return (totalEstimate, pageIDEnd)

    def getPageEndForNumRevsFromPageStart(self, pageIDStart, numRevs):
        """given a starting pageID and the number of revs we want
        in the page range, find an ending page ID so that the cumulative
        number of revs for the pages in that interval is around (and less
        than) numRevs.

        Returns the ending page ID, or None if the page count or a
        needed estimate could not be retrieved."""
        if not self._totalPages:
            print("getPageEndForNumRevsFromPageStart: calling getNumPagesInDB")
            if not self.getNumPagesInDB():
                # something wrong with this db or the server or...
                # anyways, we bail
                return None

        # can't have more revs than pages. well actually
        # we can since some pages might have been deleted and then
        # their ids are no longer in the page table. But how are
        # the odds that there will be a bunch of those *and* that there
        # will be less pages than that with more than one revision?
        # so we ignore that case.
        pageIDMax = pageIDStart + numRevs
        print("getPageEndForNumRevsFromPageStart: got total pages %s" % self._totalPages)
        if pageIDMax > self._totalPages:
            pageIDMax = self._totalPages
        (estimatedRevs, pageIDEnd) = self.estimateNumRevsForPageRange(pageIDStart, pageIDMax, numRevs)
        return pageIDEnd
| 314 | + |
if __name__ == "__main__":
    # Ad-hoc test driver: starting from page 1, repeatedly ask for the end
    # of a chunk holding about maxNumRevs revisions and print the chunk
    # boundaries until the highest page ID is reached.
    dbname = 'enwiki'
    config = WikiDump.Config()
    testchunks = PageRange(dbname, config)

    numPages = testchunks.getNumPagesInDB()
    if not numPages:
        print(">>>>>>>>>failed to retrieve number of pages for db %s" % dbname)
        sys.exit(1)
    print(">>>>>>>>>total number of pages: %s" % numPages)
    # getNumPagesInDB hands back the query output as text; the loop below
    # needs a number to compare page IDs against
    numPages = int(numPages)

    maxNumRevs = 30000000
    pageIDStart = 1
    endpageid = 0
    while endpageid < numPages:
        endpageid = testchunks.getPageEndForNumRevsFromPageStart(pageIDStart, maxNumRevs)
        if endpageid is None or endpageid <= pageIDStart:
            # no estimate, or no forward progress: stop instead of looping
            # forever on the same starting page
            print(">>>>>>>>>could not find an end page id starting from %s, giving up" % pageIDStart)
            break
        print(">>>>>>>>>we think that starting from %s if you go to about  %s you get around  %s revisions and not more than that (not more? really?)" % (pageIDStart, endpageid, maxNumRevs))
        pageIDStart = endpageid
| 352 | + |
Property changes on: branches/ariel/xmldumps-backup/pagerange.py |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 353 | + native |