r84821 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r84820‎ | r84821 | r84822 >
Date:21:14, 26 March 2011
Author:ariel
Status:deferred
Tags:
Comment:
separate callbacks for stdout, stderr, or forced callback at intervals, use nonblocking read to get output, lose a few stray debugging messages, throw exception when table dump command fails
Modified paths:
  • /branches/ariel/xmldumps-backup/CommandManagement.py (modified) (history)
  • /branches/ariel/xmldumps-backup/worker.py (modified) (history)

Diff [purge]

Index: branches/ariel/xmldumps-backup/worker.py
@@ -102,8 +102,6 @@
103103
104104 if (self._chunksEnabled):
105105 self.Stats = PageAndEditStats(wiki,dbName)
106 - print "total",self.Stats.totalEdits
107 - print "total2",self.Stats.totalPages
108106 if (not self.Stats.totalEdits or not self.Stats.totalPages):
109107 raise BackupError("Failed to get DB stats, exiting")
110108 if (self._revsPerChunkHistory):
@@ -289,7 +287,6 @@
290288 lines = results.splitlines()
291289 if (lines and lines[1]):
292290 self.totalPages = int(lines[1])
293 - print "totalpages here is ",self.totalPages
294291 query = "select MAX(rev_id) from revision;"
295292 retries = 0
296293 results = None
@@ -304,7 +301,6 @@
305302 lines = results.splitlines()
306303 if (lines and lines[1]):
307304 self.totalEdits = int(lines[1])
308 - print "totaledits here is ",self.totalEdits
309305 return(0)
310306
311307 def getTotalPages(self):
@@ -788,13 +784,14 @@
789785 """For one pipeline of commands, redirect output to a given file."""
790786 commands[-1].extend( [ ">" , outfile ] )
791787 series = [ commands ]
792 - return self.runCommand([ series ])
 788+ return self.runCommand([ series ], callbackTimed = self.updateStatusFiles)
793789
794790 # command series list: list of (commands plus args) is one pipeline. list of pipelines = 1 series.
795791 # this function wants a list of series.
796792 # each command in a pipeline must be a list (the command name and the various args)
797793 # If the shell option is true, all pipelines will be run under the shell.
798 - def runCommand(self, commandSeriesList, callback=None, arg=None, shell = False):
 794+ # callbackinterval: how often we will call callbackTimed (in milliseconds), defaults to every 5 secs
 795+ def runCommand(self, commandSeriesList, callbackStderr=None, callbackStderrArg=None, callbackTimed=None, callbackTimedArg=None, shell = False, callbackInterval=5000):
799796 """Nonzero return code from the shell from any command in any pipeline will cause this
800797 function to print an error message and return 1, indicating error.
801798 Returns 0 on success.
@@ -806,7 +803,7 @@
807804 This function spawns multiple series of pipelines in parallel.
808805
809806 """
810 - commands = CommandsInParallel(commandSeriesList, callback=callback, arg=arg, shell=shell)
 807+ commands = CommandsInParallel(commandSeriesList, callbackStderr=callbackStderr, callbackStderrArg=callbackStderrArg, callbackTimed=callbackTimed, callbackTimedArg=callbackTimedArg, shell=shell, callbackInterval=callbackInterval)
811808 commands.runCommands()
812809 if commands.exitedSuccessfully():
813810 return 0
@@ -1366,7 +1363,8 @@
13671364 retries = retries + 1
13681365 time.sleep(5)
13691366 error = runner.saveTable(self._table, self._path(runner))
1370 - return error
 1367+ if (error):
 1368+ raise BackupError("error dumping table %s" % self._table)
13711369
13721370 def listFiles(self, runner):
13731371 return [self._file()]
@@ -1461,7 +1459,8 @@
14621460 else:
14631461 series = self.buildCommand(runner)
14641462 commands.append(series)
1465 - runner.runCommand(commands, callback=self.progressCallback, arg=runner)
 1463+ result = runner.runCommand(commands, callbackStderr=self.progressCallback, callbackStderrArg=runner)
 1464+ return result
14661465
14671466 class RecombineXmlStub(XmlStub):
14681467 def __init__(self, name, desc, chunks):
@@ -1493,23 +1492,10 @@
14941493 uncompressionCommand = [ "%s" % runner.config.gzip, "-dc" ]
14951494 recombineCommandString = self.buildRecombineCommandString(runner, inputFiles, outputFile, compressionCommand, uncompressionCommand )
14961495 recombineCommand = [ recombineCommandString ]
1497 - recombinePipeline = CommandPipeline([ recombineCommand ], shell = True)
1498 - recombinePipeline.startCommands()
1499 - while True:
1500 - # these commands don't produce any progress bar... so we can at least
1501 - # update the size and last update time of the file once a minute
1502 - signal.signal(signal.SIGALRM, self.waitAlarmHandler)
1503 - signal.alarm(self.timeToWait())
1504 - try:
1505 - recombinePipeline._lastProcessInPipe.wait()
1506 - break
1507 - except Exception, e:
1508 - if e.errno == errno.EINTR:
1509 - pass
1510 - else:
1511 - raise
1512 - self.progressCallback(runner)
1513 - signal.alarm(0)
 1496+ recombinePipeline = [ recombineCommand ]
 1497+ series = [ recombinePipeline ]
 1498+ result = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True)
 1499+ return result
15141500
15151501 class XmlLogging(Dump):
15161502 """ Create a logging dump of all page activity """
@@ -1536,7 +1522,8 @@
15371523 "--output=gzip:%s" % logging ]
15381524 pipeline = [ command ]
15391525 series = [ pipeline ]
1540 - runner.runCommand([ series ], callback=self.progressCallback, arg=runner)
 1526+ result = runner.runCommand([ series ], callbackStderr=self.progressCallback, callbackStderrArg=runner)
 1527+ return result
15411528
15421529 class XmlDump(Dump):
15431530 """Primary XML dumps, one section at a time."""
@@ -1571,7 +1558,8 @@
15721559 else:
15731560 series = self.buildCommand(runner)
15741561 commands.append(series)
1575 - return runner.runCommand(commands, callback=self.progressCallback, arg=runner)
 1562+ result = runner.runCommand(commands, callbackStderr=self.progressCallback, callbackStderrArg=runner)
 1563+ return result
15761564
15771565 def buildEta(self, runner):
15781566 """Tell the dumper script whether to make ETA estimate on page or revision count."""
@@ -1886,23 +1874,10 @@
18871875 uncompressionCommand = [ "%s" % runner.config.bzip2, "-dc" ]
18881876 recombineCommandString = self.buildRecombineCommandString(runner, inputFiles, outputFile, compressionCommand, uncompressionCommand )
18891877 recombineCommand = [ recombineCommandString ]
1890 - recombinePipeline = CommandPipeline([ recombineCommand ], shell = True)
1891 - recombinePipeline.startCommands()
1892 - while True:
1893 - # these commands don't produce any progress bar... so we can at least
1894 - # update the size and last update time of the file once a minute
1895 - signal.signal(signal.SIGALRM, self.waitAlarmHandler)
1896 - signal.alarm(self.timeToWait())
1897 - try:
1898 - recombinePipeline._lastProcessInPipe.wait()
1899 - break
1900 - except Exception, e:
1901 - if e.errno == errno.EINTR:
1902 - pass
1903 - else:
1904 - raise
1905 - self.progressCallback(runner)
1906 - signal.alarm(0)
 1878+ recombinePipeline = [ recombineCommand ]
 1879+ series = [ recombinePipeline ]
 1880+ result = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True)
 1881+ return result
19071882
19081883 class BigXmlDump(XmlDump):
19091884 """XML page dump for something larger, where a 7-Zip compressed copy
@@ -1971,10 +1946,7 @@
19721947 else:
19731948 series = self.buildCommand(runner)
19741949 commands.append(series)
1975 - # FIXME don't we want callback? yes we do. *sigh* on each one of these, right? bleah
1976 - # this means we have the alarm loop in here (while we do what, poll a lot?) and um
1977 - # write out a progress bar regardless after 60 secs by looking at all the files etc. bleah
1978 - result = runner.runCommand(commands, callback=self.progressCallback, arg=runner, shell = True)
 1950+ result = runner.runCommand(commands, callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True)
19791951 # temp hack force 644 permissions until ubuntu bug # 370618 is fixed - tomasz 5/1/2009
19801952 # some hacks aren't so temporary - atg 3 sept 2010
19811953 if (self._chunks):
@@ -2032,23 +2004,10 @@
20332005
20342006 recombineCommandString = self.buildRecombineCommandString(runner, files, outputFile, compressionCommand, uncompressionCommand )
20352007 recombineCommand = [ recombineCommandString ]
2036 - recombinePipeline = CommandPipeline([ recombineCommand ], shell = True)
2037 - recombinePipeline.startCommands()
2038 - while True:
2039 - # these commands don't produce any progress bar... so we can at least
2040 - # update the size and last update time of the file once a minute
2041 - signal.signal(signal.SIGALRM, self.waitAlarmHandler)
2042 - signal.alarm(self.timeToWait())
2043 - try:
2044 - recombinePipeline._lastProcessInPipe.wait()
2045 - break
2046 - except Exception, e:
2047 - if e.errno == errno.EINTR:
2048 - pass
2049 - else:
2050 - raise
2051 - self.progressCallback(runner)
2052 - signal.alarm(0)
 2008+ recombinePipeline = [ recombineCommand ]
 2009+ series = [ recombinePipeline ]
 2010+ result = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True)
 2011+ return result
20532012
20542013 class AbstractDump(Dump):
20552014 """XML dump for Yahoo!'s Active Abstracts thingy"""
@@ -2095,7 +2054,7 @@
20962055 else:
20972056 series = self.buildCommand(runner)
20982057 commands.append(series)
2099 - runner.runCommand(commands, callback=self.progressCallback, arg=runner)
 2058+ runner.runCommand(commands, callbackStderr=self.progressCallback, callbackStderrArg=runner)
21002059
21012060 def _variants(self, runner):
21022061 # If the database name looks like it's marked as Chinese language,
@@ -2160,23 +2119,10 @@
21612120 uncompressionCommand = [ "%s" % runner.config.cat ]
21622121 recombineCommandString = self.buildRecombineCommandString(runner, inputFiles, outputFile, compressionCommand, uncompressionCommand, "<feed>" )
21632122 recombineCommand = [ recombineCommandString ]
2164 - recombinePipeline = CommandPipeline([ recombineCommand ], shell = True)
2165 - recombinePipeline.startCommands()
2166 - while True:
2167 - # these commands don't produce any progress bar... so we can at least
2168 - # update the size and last update time of the file once a minute
2169 - signal.signal(signal.SIGALRM, self.waitAlarmHandler)
2170 - signal.alarm(self.timeToWait())
2171 - try:
2172 - recombinePipeline._lastProcessInPipe.wait()
2173 - break
2174 - except Exception, e:
2175 - if e.errno == errno.EINTR:
2176 - pass
2177 - else:
2178 - raise
2179 - self.progressCallback(runner)
2180 - signal.alarm(0)
 2123+ recombinePipeline = [ recombineCommand ]
 2124+ series = [ recombinePipeline ]
 2125+ result = runner.runCommand([ series ], callbackTimed=self.progressCallback, callbackTimedArg=runner, shell = True)
 2126+ return result
21812127
21822128 class TitleDump(Dump):
21832129 """This is used by "wikiproxy", a program to add Wikipedia links to BBC news online"""
Index: branches/ariel/xmldumps-backup/CommandManagement.py
@@ -8,6 +8,7 @@
99 import signal
1010 import Queue
1111 import thread
 12+import fcntl
1213
1314 from os.path import dirname, exists, getsize, join, realpath
1415 from subprocess import Popen, PIPE
@@ -388,8 +389,9 @@
389390 the line of output, it should be passed in the arg parameter (and it will be passed
390391 to the callback function first before the output line). If no callback is provided
391392 and the individual pipelines are not provided with a file to save output,
392 - then output is written to stderr."""
393 - def __init__(self, commandSeriesList, callback = None, arg=None, quiet = False, shell = False ):
 393+ then output is written to stderr.
 394+ callbackInterval is in milliseconds; the default is 20 seconds"""
 395+ def __init__(self, commandSeriesList, callbackStderr = None, callbackStdout = None, callbackTimed = None, callbackStderrArg=None, callbackStdoutArg = None, callbackTimedArg = None, quiet = False, shell = False, callbackInterval = 20000 ):
394396 self._commandSeriesList = commandSeriesList
395397 self._commandSerieses = []
396398 for series in self._commandSeriesList:
@@ -397,16 +399,20 @@
398400 # for each command series running in parallel,
399401 # in cases where a command pipeline in the series generates output, the callback
400402 # will be called with a line of output from the pipeline as it becomes available
401 - self._callback = callback
402 - self._arg = arg
 403+ self._callbackStderr = callbackStderr
 404+ self._callbackStdout = callbackStdout
 405+ self._callbackTimed = callbackTimed
 406+ self._callbackStderrArg = callbackStderrArg
 407+ self._callbackStdoutArg = callbackStdoutArg
 408+ self._callbackTimedArg = callbackTimedArg
403409 self._commandSeriesQueue = Queue.Queue()
404410
405411 # number millisecs we will wait for select.poll()
406412 self._defaultPollTime = 500
407413
408 - # for programs that don't generate output, wait this many seconds between
 414+ # for programs that don't generate output, wait this many milliseconds between
409415 # invoking callback if there is one
410 - self._defaultCallbackInterval = 20
 416+ self._defaultCallbackInterval = callbackInterval
411417
412418 def startCommands(self):
413419 for series in self._commandSerieses:
@@ -415,77 +421,72 @@
416422 # one of these as a thread to monitor each command series.
417423 def seriesMonitor(self, timeout, queue):
418424 series = queue.get()
419 - poller = select.poll()
420425 while series.processProducingOutput():
421426 p = series.processProducingOutput()
 427+ poller = select.poll()
422428 poller.register(p.stderr,select.POLLIN|select.POLLPRI)
 429+ fderr = p.stderr.fileno()
 430+ flerr = fcntl.fcntl(fderr, fcntl.F_GETFL)
 431+ fcntl.fcntl(fderr, fcntl.F_SETFL, flerr | os.O_NONBLOCK)
423432 if (p.stdout):
 433+ poller.register(p.stdout,select.POLLIN|select.POLLPRI)
424434 fdToStream = { p.stdout.fileno(): p.stdout, p.stderr.fileno(): p.stderr }
 435+ fdout = p.stdout.fileno()
 436+ flout = fcntl.fcntl(fdout, fcntl.F_GETFL)
 437+ fcntl.fcntl(fdout, fcntl.F_SETFL, flout | os.O_NONBLOCK)
425438 else:
426439 fdToStream = { p.stderr.fileno(): p.stderr }
427 - # if we have a savefile, this won't be set.
428 - if (p.stdout):
429 - poller.register(p.stdout,select.POLLIN|select.POLLPRI)
430440
431 - commandCompleted = False
 441+ commandCompleted = False
432442
433 - while not commandCompleted:
434 - waiting = poller.poll(self._defaultPollTime)
435 - if (waiting):
436 - for (fd,event) in waiting:
437 - series.inProgressPipeline().setPollState(event)
438 - if series.inProgressPipeline().checkPollReadyForRead():
439 -
440 - # so what happens if we get more than one line of output
441 - # in the poll interval? it will sit there waiting...
442 - # could have accumulation. FIXME. for our purposes we want
443 - # one line only, the latest. but for other uses of this
444 - # module? really we should read whatever is available only,
445 - # pass it to callback, let callback handle multiple lines,
446 - # partial lines etc.
447 - # out = p.stdout.readline()
448 - out = fdToStream[fd].readline()
449 - if out:
450 - if self._callback:
451 - if (self._arg):
452 - self._callback(self._arg, out)
 443+ waited = 0
 444+ while not commandCompleted:
 445+ waiting = poller.poll(self._defaultPollTime)
 446+ if (waiting):
 447+ for (fd,event) in waiting:
 448+ series.inProgressPipeline().setPollState(event)
 449+ if series.inProgressPipeline().checkPollReadyForRead():
 450+ out = os.read(fd,1024)
 451+ if out:
 452+ if fd == p.stderr.fileno():
 453+ if self._callbackStderr:
 454+ if (self._callbackStderrArg):
 455+ self._callbackStderr(self._callbackStderrArg, out)
453456 else:
454 - self._callback(out)
 457+ self._callbackStderr(out)
455458 else:
456 - # fixme this behavior is different, do we want it?
457459 sys.stderr.write(out)
458 - else:
459 - # possible eof? (empty string from readline)
460 - pass
461 - elif series.inProgressPipeline().checkForPollErrors():
462 - poller.unregister(fd)
463 - p.wait()
464 - # FIXME put the returncode someplace?
465 - print "returned from %s with %s" % (p.pid, p.returncode)
466 - commandCompleted = True
467 - if commandCompleted:
468 - break
 460+ elif fd == p.stdout.fileno():
 461+ if self._callbackStdout:
 462+ if (self._callbackStdoutArg):
 463+ self._callbackStdout(self._callbackStdoutArg, out)
 464+ else:
 465+ self._callbackStdout(out)
 466+ else:
 467+ sys.stderr.write(out)
 468+ else:
 469+ # possible eof? what would cause this?
 470+ pass
 471+ elif series.inProgressPipeline().checkForPollErrors():
 472+ poller.unregister(fd)
 473+ # FIXME if it closed prematurely and then runs for hours to completion
 474+ # we will get no updates here...
 475+ p.wait()
 476+ # FIXME put the returncode someplace?
 477+ print "returned from %s with %s" % (p.pid, p.returncode)
 478+ commandCompleted = True
469479
 480+ waited = waited + self._defaultPollTime
 481+ if waited > self._defaultCallbackInterval and self._callbackTimed:
 482+ if (self._callbackTimedArg):
 483+ self._callbackTimed(self._callbackTimedArg)
 484+ else:
 485+ self._callbackTimed()
 486+ waited = 0
470487
471 - # run next command in series, if any
472 - series.continueCommands()
 488+ # run next command in series, if any
 489+ series.continueCommands()
473490
474 - else:
475 - # no output from this process, just wait for it and do callback if there is one
476 - waited = 0
477 - while p.poll() == None:
478 - if waited > self._defaultCallbackInterval and self._callback:
479 - if (self._arg):
480 - self._callback(self._arg)
481 - else:
482 - self._callback()
483 - waited = 0
484 - time.sleep(1)
485 - waited = waited + 1
486 -
487 - print "returned from %s with %s" % (p.pid, p.returncode)
488 - series.continueCommands()
489 -
490491 # completed the whole series. time to go home.
491492 queue.task_done()
492493
@@ -551,7 +552,7 @@
552553 series4 = [ pipeline5 ]
553554 series5 = [ pipeline6 ]
554555 parallel = [ series1, series2, series3, series4, series5 ]
555 - commands = CommandsInParallel(parallel, callback=testcallback)
 556+ commands = CommandsInParallel(parallel, callbackStdout=testcallback)
556557 commands.runCommands()
557558 if commands.exitedSuccessfully():
558559 print "w00t!"

Status & tagging log