Index: branches/ariel/xmldumps-backup/worker.py |
— | — | @@ -102,8 +102,6 @@ |
103 | 103 | |
104 | 104 | if (self._chunksEnabled): |
105 | 105 | self.Stats = PageAndEditStats(wiki,dbName) |
106 | | - print "total",self.Stats.totalEdits |
107 | | - print "total2",self.Stats.totalPages |
108 | 106 | if (not self.Stats.totalEdits or not self.Stats.totalPages): |
109 | 107 | raise BackupError("Failed to get DB stats, exiting") |
110 | 108 | if (self._revsPerChunkHistory): |
— | — | @@ -289,7 +287,6 @@ |
290 | 288 | lines = results.splitlines() |
291 | 289 | if (lines and lines[1]): |
292 | 290 | self.totalPages = int(lines[1]) |
293 | | - print "totalpages here is ",self.totalPages |
294 | 291 | query = "select MAX(rev_id) from revision;" |
295 | 292 | retries = 0 |
296 | 293 | results = None |
— | — | @@ -304,7 +301,6 @@ |
305 | 302 | lines = results.splitlines() |
306 | 303 | if (lines and lines[1]): |
307 | 304 | self.totalEdits = int(lines[1]) |
308 | | - print "totaledits here is ",self.totalEdits |
309 | 305 | return(0) |
310 | 306 | |
311 | 307 | def getTotalPages(self): |
— | — | @@ -788,13 +784,14 @@ |
789 | 785 | """For one pipeline of commands, redirect output to a given file.""" |
790 | 786 | commands[-1].extend( [ ">" , outfile ] ) |
791 | 787 | series = [ commands ] |
792 | | - return self.runCommand([ series ]) |
| 788 | + return self.runCommand([ series ], callbackTimed = self.updateStatusFiles) |
793 | 789 | |
794 | 790 | # command series list: list of (commands plus args) is one pipeline. list of pipelines = 1 series. |
795 | 791 | # this function wants a list of series. |
796 | 792 | # each command in a pipeline must itself be a list (the command name and the various args)
797 | 793 | # If the shell option is true, all pipelines will be run under the shell. |
798 | | - def runCommand(self, commandSeriesList, callback=None, arg=None, shell = False): |
| 794 | + # callbackInterval: how often we will call callbackTimed (in milliseconds); defaults to every 5 seconds
| 795 | + def runCommand(self, commandSeriesList, callbackStderr=None, callbackStderrArg=None, callbackTimed=None, callbackTimedArg=None, shell = False, callbackInterval=5000): |
799 | 796 | """Nonzero return code from the shell from any command in any pipeline will cause this |
800 | 797 | function to print an error message and return 1, indicating error.
801 | 798 | Returns 0 on success. |
— | — | @@ -806,7 +803,7 @@ |
807 | 804 | This function spawns multiple series of pipelines in parallel. |
808 | 805 | |
809 | 806 | """ |
810 | | - commands = CommandsInParallel(commandSeriesList, callback=callback, arg=arg, shell=shell) |
| 807 | + commands = CommandsInParallel(commandSeriesList, callbackStderr=callbackStderr, callbackStderrArg=callbackStderrArg, callbackTimed=callbackTimed, callbackTimedArg=callbackTimedArg, shell=shell, callbackInterval=callbackInterval) |
811 | 808 | commands.runCommands() |
812 | 809 | if commands.exitedSuccessfully(): |
813 | 810 | return 0 |
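
For illustration only, not part of the patch: a minimal sketch of how a dump step might call the reworked runCommand. The file name and the logProgress helper are made up; runner stands for an existing Runner instance.

    import sys

    def logProgress(runner, line):
        # callbackStderr gets callbackStderrArg first, then the chunk of stderr output
        sys.stderr.write(line)

    command = [ runner.config.gzip, "-v", "-9", "somefile.xml" ]  # one command: program plus args ("somefile.xml" is made up)
    pipeline = [ command ]                                        # a pipeline is a list of commands
    series = [ pipeline ]                                         # a series is a list of pipelines, run one after another
    # runCommand takes a list of series; the series are run in parallel
    result = runner.runCommand([ series ], callbackStderr=logProgress, callbackStderrArg=runner)
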
— | — | @@ -1366,7 +1363,8 @@ |
1367 | 1364 | retries = retries + 1 |
1368 | 1365 | time.sleep(5) |
1369 | 1366 | error = runner.saveTable(self._table, self._path(runner)) |
1370 | | - return error |
| 1367 | + if (error): |
| 1368 | + raise BackupError("error dumping table %s" % self._table) |
1371 | 1369 | |
1372 | 1370 | def listFiles(self, runner): |
1373 | 1371 | return [self._file()] |
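
For illustration only, a hypothetical caller (the item and method names are made up): with the change above, a failed table dump surfaces as a BackupError exception rather than a silently ignored return value, so whatever iterates over the dump items can handle all failures in one place.

    import sys

    try:
        tableDumpItem.run(runner)     # hypothetical dump item and runner
    except BackupError, e:
        sys.stderr.write("%s\n" % e)
        # record the failure for this item and carry on with the remaining dump items
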
— | — | @@ -1461,7 +1459,8 @@ |
1462 | 1460 | else: |
1463 | 1461 | series = self.buildCommand(runner) |
1464 | 1462 | commands.append(series) |
1465 | | - runner.runCommand(commands, callback=self.progressCallback, arg=runner) |
| 1463 | + result = runner.runCommand(commands, callbackStderr=self.progressCallback, callbackStderrArg=runner) |
| 1464 | + return result |
1466 | 1465 | |
1467 | 1466 | class RecombineXmlStub(XmlStub): |
1468 | 1467 | def __init__(self, name, desc, chunks): |
— | — | @@ -1493,23 +1492,10 @@ |
1494 | 1493 | uncompressionCommand = [ "%s" % runner.config.gzip, "-dc" ] |
1495 | 1494 | recombineCommandString = self.buildRecombineCommandString(runner, inputFiles, outputFile, compressionCommand, uncompressionCommand ) |
1496 | 1495 | recombineCommand = [ recombineCommandString ] |
1497 | | - recombinePipeline = CommandPipeline([ recombineCommand ], shell = True) |
1498 | | - recombinePipeline.startCommands() |
1499 | | - while True: |
1500 | | - # these commands don't produce any progress bar... so we can at least |
1501 | | - # update the size and last update time of the file once a minute |
1502 | | - signal.signal(signal.SIGALRM, self.waitAlarmHandler) |
1503 | | - signal.alarm(self.timeToWait()) |
1504 | | - try: |
1505 | | - recombinePipeline._lastProcessInPipe.wait() |
1506 | | - break |
1507 | | - except Exception, e: |
1508 | | - if e.errno == errno.EINTR: |
1509 | | - pass |
1510 | | - else: |
1511 | | - raise |
1512 | | - self.progressCallback(runner) |
1513 | | - signal.alarm(0) |
| 1496 | + recombinePipeline = [ recombineCommand ] |
| 1497 | + series = [ recombinePipeline ] |
| 1498 | + result = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True) |
| 1499 | + return result |
1514 | 1500 | |
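
For illustration only, not part of the patch: the deleted SIGALRM/EINTR wait loop is replaced by the generic timed callback, so a shell pipeline that prints no progress can still trigger periodic status updates. The shell command string and the touchStatus helper are made up; runner is an existing Runner instance.

    import sys

    def touchStatus(runner):
        # callbackTimed gets only callbackTimedArg, no output line; it fires roughly every callbackInterval ms
        sys.stderr.write("recombine still running...\n")

    command = [ "gzip -dc chunk1.xml.gz chunk2.xml.gz | gzip > full.xml.gz" ]  # whole shell string as a one-element command (file names made up)
    pipeline = [ command ]
    series = [ pipeline ]
    result = runner.runCommand([ series ], callbackTimed=touchStatus, callbackTimedArg=runner, shell=True)
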
1515 | 1501 | class XmlLogging(Dump): |
1516 | 1502 | """ Create a logging dump of all page activity """ |
— | — | @@ -1536,7 +1522,8 @@ |
1537 | 1523 | "--output=gzip:%s" % logging ] |
1538 | 1524 | pipeline = [ command ] |
1539 | 1525 | series = [ pipeline ] |
1540 | | - runner.runCommand([ series ], callback=self.progressCallback, arg=runner) |
| 1526 | + result = runner.runCommand([ series ], callbackStderr=self.progressCallback, callbackStderrArg=runner) |
| 1527 | + return result |
1541 | 1528 | |
1542 | 1529 | class XmlDump(Dump): |
1543 | 1530 | """Primary XML dumps, one section at a time.""" |
— | — | @@ -1571,7 +1558,8 @@ |
1572 | 1559 | else: |
1573 | 1560 | series = self.buildCommand(runner) |
1574 | 1561 | commands.append(series) |
1575 | | - return runner.runCommand(commands, callback=self.progressCallback, arg=runner) |
| 1562 | + result = runner.runCommand(commands, callbackStderr=self.progressCallback, callbackStderrArg=runner) |
| 1563 | + return result |
1576 | 1564 | |
1577 | 1565 | def buildEta(self, runner): |
1578 | 1566 | """Tell the dumper script whether to make ETA estimate on page or revision count.""" |
— | — | @@ -1886,23 +1874,10 @@ |
1887 | 1875 | uncompressionCommand = [ "%s" % runner.config.bzip2, "-dc" ] |
1888 | 1876 | recombineCommandString = self.buildRecombineCommandString(runner, inputFiles, outputFile, compressionCommand, uncompressionCommand ) |
1889 | 1877 | recombineCommand = [ recombineCommandString ] |
1890 | | - recombinePipeline = CommandPipeline([ recombineCommand ], shell = True) |
1891 | | - recombinePipeline.startCommands() |
1892 | | - while True: |
1893 | | - # these commands don't produce any progress bar... so we can at least |
1894 | | - # update the size and last update time of the file once a minute |
1895 | | - signal.signal(signal.SIGALRM, self.waitAlarmHandler) |
1896 | | - signal.alarm(self.timeToWait()) |
1897 | | - try: |
1898 | | - recombinePipeline._lastProcessInPipe.wait() |
1899 | | - break |
1900 | | - except Exception, e: |
1901 | | - if e.errno == errno.EINTR: |
1902 | | - pass |
1903 | | - else: |
1904 | | - raise |
1905 | | - self.progressCallback(runner) |
1906 | | - signal.alarm(0) |
| 1878 | + recombinePipeline = [ recombineCommand ] |
| 1879 | + series = [ recombinePipeline ] |
| 1880 | + result = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True) |
| 1881 | + return result |
1907 | 1882 | |
1908 | 1883 | class BigXmlDump(XmlDump): |
1909 | 1884 | """XML page dump for something larger, where a 7-Zip compressed copy |
— | — | @@ -1971,10 +1946,7 @@ |
1972 | 1947 | else: |
1973 | 1948 | series = self.buildCommand(runner) |
1974 | 1949 | commands.append(series) |
1975 | | - # FIXME don't we want callback? yes we do. *sigh* on each one of these, right? bleah |
1976 | | - # this means we have the alarm loop in here (while we do what, poll a lot?) and um |
1977 | | - # write out a progress bar regardless after 60 secs by looking at all the files etc. bleah |
1978 | | - result = runner.runCommand(commands, callback=self.progressCallback, arg=runner, shell = True) |
| 1950 | + result = runner.runCommand(commands, callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True) |
1979 | 1951 | # temp hack force 644 permissions until ubuntu bug # 370618 is fixed - tomasz 5/1/2009 |
1980 | 1952 | # some hacks aren't so temporary - atg 3 sept 2010 |
1981 | 1953 | if (self._chunks): |
— | — | @@ -2032,23 +2004,10 @@ |
2033 | 2005 | |
2034 | 2006 | recombineCommandString = self.buildRecombineCommandString(runner, files, outputFile, compressionCommand, uncompressionCommand ) |
2035 | 2007 | recombineCommand = [ recombineCommandString ] |
2036 | | - recombinePipeline = CommandPipeline([ recombineCommand ], shell = True) |
2037 | | - recombinePipeline.startCommands() |
2038 | | - while True: |
2039 | | - # these commands don't produce any progress bar... so we can at least |
2040 | | - # update the size and last update time of the file once a minute |
2041 | | - signal.signal(signal.SIGALRM, self.waitAlarmHandler) |
2042 | | - signal.alarm(self.timeToWait()) |
2043 | | - try: |
2044 | | - recombinePipeline._lastProcessInPipe.wait() |
2045 | | - break |
2046 | | - except Exception, e: |
2047 | | - if e.errno == errno.EINTR: |
2048 | | - pass |
2049 | | - else: |
2050 | | - raise |
2051 | | - self.progressCallback(runner) |
2052 | | - signal.alarm(0) |
| 2008 | + recombinePipeline = [ recombineCommand ] |
| 2009 | + series = [ recombinePipeline ] |
| 2010 | + result = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True) |
| 2011 | + return result |
2053 | 2012 | |
2054 | 2013 | class AbstractDump(Dump): |
2055 | 2014 | """XML dump for Yahoo!'s Active Abstracts thingy""" |
— | — | @@ -2095,7 +2054,7 @@ |
2096 | 2055 | else: |
2097 | 2056 | series = self.buildCommand(runner) |
2098 | 2057 | commands.append(series) |
2099 | | - runner.runCommand(commands, callback=self.progressCallback, arg=runner) |
| 2058 | + runner.runCommand(commands, callbackStderr=self.progressCallback, callbackStderrArg=runner) |
2100 | 2059 | |
2101 | 2060 | def _variants(self, runner): |
2102 | 2061 | # If the database name looks like it's marked as Chinese language, |
— | — | @@ -2160,23 +2119,10 @@ |
2161 | 2120 | uncompressionCommand = [ "%s" % runner.config.cat ] |
2162 | 2121 | recombineCommandString = self.buildRecombineCommandString(runner, inputFiles, outputFile, compressionCommand, uncompressionCommand, "<feed>" ) |
2163 | 2122 | recombineCommand = [ recombineCommandString ] |
2164 | | - recombinePipeline = CommandPipeline([ recombineCommand ], shell = True) |
2165 | | - recombinePipeline.startCommands() |
2166 | | - while True: |
2167 | | - # these commands don't produce any progress bar... so we can at least |
2168 | | - # update the size and last update time of the file once a minute |
2169 | | - signal.signal(signal.SIGALRM, self.waitAlarmHandler) |
2170 | | - signal.alarm(self.timeToWait()) |
2171 | | - try: |
2172 | | - recombinePipeline._lastProcessInPipe.wait() |
2173 | | - break |
2174 | | - except Exception, e: |
2175 | | - if e.errno == errno.EINTR: |
2176 | | - pass |
2177 | | - else: |
2178 | | - raise |
2179 | | - self.progressCallback(runner) |
2180 | | - signal.alarm(0) |
| 2123 | + recombinePipeline = [ recombineCommand ] |
| 2124 | + series = [ recombinePipeline ] |
| 2125 | + result = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True) |
| 2126 | + return result |
2181 | 2127 | |
2182 | 2128 | class TitleDump(Dump): |
2183 | 2129 | """This is used by "wikiproxy", a program to add Wikipedia links to BBC news online""" |
Index: branches/ariel/xmldumps-backup/CommandManagement.py |
— | — | @@ -8,6 +8,7 @@ |
9 | 9 | import signal |
10 | 10 | import Queue |
11 | 11 | import thread |
| 12 | +import fcntl |
12 | 13 | |
13 | 14 | from os.path import dirname, exists, getsize, join, realpath |
14 | 15 | from subprocess import Popen, PIPE |
— | — | @@ -388,8 +389,9 @@ |
389 | 390 | the line of output, it should be passed in the arg parameter (and it will be passed |
390 | 391 | to the callback function first before the output line). If no callback is provided |
391 | 392 | and the individual pipelines are not provided with a file to save output, |
392 | | - then output is written to stderr.""" |
393 | | - def __init__(self, commandSeriesList, callback = None, arg=None, quiet = False, shell = False ): |
| 393 | + then output is written to stderr. |
| 394 | + callbackInterval is in milliseconds; the default is 20 seconds."""
| 395 | + def __init__(self, commandSeriesList, callbackStderr = None, callbackStdout = None, callbackTimed = None, callbackStderrArg=None, callbackStdoutArg = None, callbackTimedArg = None, quiet = False, shell = False, callbackInterval = 20000 ): |
394 | 396 | self._commandSeriesList = commandSeriesList |
395 | 397 | self._commandSerieses = [] |
396 | 398 | for series in self._commandSeriesList: |
— | — | @@ -397,16 +399,20 @@ |
398 | 400 | # for each command series running in parallel, |
399 | 401 | # in cases where a command pipeline in the series generates output, the callback |
400 | 402 | # will be called with a line of output from the pipeline as it becomes available |
401 | | - self._callback = callback |
402 | | - self._arg = arg |
| 403 | + self._callbackStderr = callbackStderr |
| 404 | + self._callbackStdout = callbackStdout |
| 405 | + self._callbackTimed = callbackTimed |
| 406 | + self._callbackStderrArg = callbackStderrArg |
| 407 | + self._callbackStdoutArg = callbackStdoutArg |
| 408 | + self._callbackTimedArg = callbackTimedArg |
403 | 409 | self._commandSeriesQueue = Queue.Queue() |
404 | 410 | |
405 | 411 | # number millisecs we will wait for select.poll() |
406 | 412 | self._defaultPollTime = 500 |
407 | 413 | |
408 | | - # for programs that don't generate output, wait this many seconds between |
| 414 | + # for programs that don't generate output, wait this many milliseconds between |
409 | 415 | # invoking callback if there is one |
410 | | - self._defaultCallbackInterval = 20 |
| 416 | + self._defaultCallbackInterval = callbackInterval |
411 | 417 | |
412 | 418 | def startCommands(self): |
413 | 419 | for series in self._commandSerieses: |
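
For illustration only, not part of the patch: a minimal sketch of driving CommandsInParallel directly with the expanded constructor. The sleep command and both callbacks are made up; note that callbackInterval is given in milliseconds.

    import sys
    from CommandManagement import CommandsInParallel

    def onStderr(line):
        # with no callbackStderrArg given, the callback receives just the output chunk
        sys.stderr.write(line)

    def onTick():
        # with no callbackTimedArg given, the timed callback is called with no arguments
        sys.stderr.write("still waiting...\n")

    series = [ [ [ "/bin/sleep", "30" ] ] ]   # one series: one pipeline holding one command
    commands = CommandsInParallel([ series ], callbackStderr=onStderr,
                                  callbackTimed=onTick, callbackInterval=10000)  # tick about every 10 seconds
    commands.runCommands()
    if not commands.exitedSuccessfully():
        sys.stderr.write("sleep pipeline failed\n")
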
— | — | @@ -415,77 +421,72 @@ |
416 | 422 | # one of these as a thread to monitor each command series. |
417 | 423 | def seriesMonitor(self, timeout, queue): |
418 | 424 | series = queue.get() |
419 | | - poller = select.poll() |
420 | 425 | while series.processProducingOutput(): |
421 | 426 | p = series.processProducingOutput() |
| 427 | + poller = select.poll() |
422 | 428 | poller.register(p.stderr,select.POLLIN|select.POLLPRI) |
| 429 | + fderr = p.stderr.fileno() |
| 430 | + flerr = fcntl.fcntl(fderr, fcntl.F_GETFL) |
| 431 | + fcntl.fcntl(fderr, fcntl.F_SETFL, flerr | os.O_NONBLOCK) |
423 | 432 | if (p.stdout): |
| 433 | + poller.register(p.stdout,select.POLLIN|select.POLLPRI) |
424 | 434 | fdToStream = { p.stdout.fileno(): p.stdout, p.stderr.fileno(): p.stderr } |
| 435 | + fdout = p.stdout.fileno() |
| 436 | + flout = fcntl.fcntl(fdout, fcntl.F_GETFL) |
| 437 | + fcntl.fcntl(fdout, fcntl.F_SETFL, flout | os.O_NONBLOCK) |
425 | 438 | else: |
426 | 439 | fdToStream = { p.stderr.fileno(): p.stderr } |
427 | | - # if we have a savefile, this won't be set. |
428 | | - if (p.stdout): |
429 | | - poller.register(p.stdout,select.POLLIN|select.POLLPRI) |
430 | 440 | |
431 | | - commandCompleted = False |
| 441 | + commandCompleted = False |
432 | 442 | |
433 | | - while not commandCompleted: |
434 | | - waiting = poller.poll(self._defaultPollTime) |
435 | | - if (waiting): |
436 | | - for (fd,event) in waiting: |
437 | | - series.inProgressPipeline().setPollState(event) |
438 | | - if series.inProgressPipeline().checkPollReadyForRead(): |
439 | | - |
440 | | - # so what happens if we get more than one line of output |
441 | | - # in the poll interval? it will sit there waiting... |
442 | | - # could have accumulation. FIXME. for our purposes we want |
443 | | - # one line only, the latest. but for other uses of this |
444 | | - # module? really we should read whatever is available only, |
445 | | - # pass it to callback, let callback handle multiple lines, |
446 | | - # partial lines etc. |
447 | | - # out = p.stdout.readline() |
448 | | - out = fdToStream[fd].readline() |
449 | | - if out: |
450 | | - if self._callback: |
451 | | - if (self._arg): |
452 | | - self._callback(self._arg, out) |
| 443 | + waited = 0 |
| 444 | + while not commandCompleted: |
| 445 | + waiting = poller.poll(self._defaultPollTime) |
| 446 | + if (waiting): |
| 447 | + for (fd,event) in waiting: |
| 448 | + series.inProgressPipeline().setPollState(event) |
| 449 | + if series.inProgressPipeline().checkPollReadyForRead(): |
| 450 | + out = os.read(fd,1024) |
| 451 | + if out: |
| 452 | + if fd == p.stderr.fileno(): |
| 453 | + if self._callbackStderr: |
| 454 | + if (self._callbackStderrArg): |
| 455 | + self._callbackStderr(self._callbackStderrArg, out) |
453 | 456 | else: |
454 | | - self._callback(out) |
| 457 | + self._callbackStderr(out) |
455 | 458 | else: |
456 | | - # fixme this behavior is different, do we want it? |
457 | 459 | sys.stderr.write(out) |
458 | | - else: |
459 | | - # possible eof? (empty string from readline) |
460 | | - pass |
461 | | - elif series.inProgressPipeline().checkForPollErrors(): |
462 | | - poller.unregister(fd) |
463 | | - p.wait() |
464 | | - # FIXME put the returncode someplace? |
465 | | - print "returned from %s with %s" % (p.pid, p.returncode) |
466 | | - commandCompleted = True |
467 | | - if commandCompleted: |
468 | | - break |
| 460 | + elif fd == p.stdout.fileno(): |
| 461 | + if self._callbackStdout: |
| 462 | + if (self._callbackStdoutArg): |
| 463 | + self._callbackStdout(self._callbackStdoutArg, out) |
| 464 | + else: |
| 465 | + self._callbackStdout(out) |
| 466 | + else: |
| 467 | + sys.stderr.write(out) |
| 468 | + else: |
| 469 | + # possible eof? what would cause this? |
| 470 | + pass |
| 471 | + elif series.inProgressPipeline().checkForPollErrors(): |
| 472 | + poller.unregister(fd) |
| 473 | + # FIXME if it closed prematurely and then runs for hours to completion |
| 474 | + # we will get no updates here... |
| 475 | + p.wait() |
| 476 | + # FIXME put the returncode someplace? |
| 477 | + print "returned from %s with %s" % (p.pid, p.returncode) |
| 478 | + commandCompleted = True |
469 | 479 | |
| 480 | + waited = waited + self._defaultPollTime |
| 481 | + if waited > self._defaultCallbackInterval and self._callbackTimed: |
| 482 | + if (self._callbackTimedArg): |
| 483 | + self._callbackTimed(self._callbackTimedArg) |
| 484 | + else: |
| 485 | + self._callbackTimed() |
| 486 | + waited = 0 |
470 | 487 | |
471 | | - # run next command in series, if any |
472 | | - series.continueCommands() |
| 488 | + # run next command in series, if any |
| 489 | + series.continueCommands() |
473 | 490 | |
474 | | - else: |
475 | | - # no output from this process, just wait for it and do callback if there is one |
476 | | - waited = 0 |
477 | | - while p.poll() == None: |
478 | | - if waited > self._defaultCallbackInterval and self._callback: |
479 | | - if (self._arg): |
480 | | - self._callback(self._arg) |
481 | | - else: |
482 | | - self._callback() |
483 | | - waited = 0 |
484 | | - time.sleep(1) |
485 | | - waited = waited + 1 |
486 | | - |
487 | | - print "returned from %s with %s" % (p.pid, p.returncode) |
488 | | - series.continueCommands() |
489 | | - |
490 | 491 | # completed the whole series. time to go home. |
491 | 492 | queue.task_done() |
492 | 493 | |
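
For illustration only, not part of the patch: a self-contained sketch of the non-blocking poll-and-read pattern the rewritten monitor relies on (fcntl sets O_NONBLOCK on the pipe, select.poll waits up to 500 ms, os.read returns whatever chunk is available rather than blocking for a full line). The ls command here is arbitrary.

    import fcntl, os, select, sys
    from subprocess import Popen, PIPE

    p = Popen([ "/bin/ls", "-l", "/etc" ], stdout=PIPE)
    fd = p.stdout.fileno()
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)   # reads now return whatever is buffered, never block
    poller = select.poll()
    poller.register(fd, select.POLLIN | select.POLLPRI)

    finished = False
    while not finished:
        for (readyFd, event) in poller.poll(500):           # wait at most 500 ms, like _defaultPollTime
            if event & (select.POLLIN | select.POLLPRI):
                chunk = os.read(readyFd, 1024)              # a chunk, not necessarily a whole line
                if chunk:
                    sys.stderr.write(chunk)
                else:
                    finished = True                         # empty read means end of file
            elif event & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
                poller.unregister(readyFd)
                finished = True
    p.wait()
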
— | — | @@ -551,7 +552,7 @@ |
552 | 553 | series4 = [ pipeline5 ] |
553 | 554 | series5 = [ pipeline6 ] |
554 | 555 | parallel = [ series1, series2, series3, series4, series5 ] |
555 | | - commands = CommandsInParallel(parallel, callback=testcallback) |
| 556 | + commands = CommandsInParallel(parallel, callbackStdout=testcallback) |
556 | 557 | commands.runCommands() |
557 | 558 | if commands.exitedSuccessfully(): |
558 | 559 | print "w00t!" |