r79949 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: < r79948 | r79949 | r79950 >
Date: 19:05, 10 January 2011
Author: ariel
Status: deferred
Tags:
Comment:
still lots of work to do but this should now be production usable

* enable us to run sets of dumps from different db lists on different hosts
without having to edit the master list by hand to remove dups
* make 7z recombine phase run
* move db page stats retrieval out to a class
* fix range so that last page in a chunk is included in the dump instead of
excluded, wooooops

* chunk info moved into a class
* specify explicit page ranges for chunks as a workaround for the crappy estimation
of revs per interval (see the sketch after this list)
* separate page ranges per chunk for abstracts (can't separate out current and
history, as it's stub-based and we write all stubs in one run)
* sensible defaults and config entries for pages per chunk etc, no more hardcoded
values

* find latest dump by looking first at date in dir name and then at mtime of
status file if needed (this allows us to rerun an old step of some job that
failed, without this suddenly being considered the "latest" dump)
* fix code that finds previous dump to figure out where it is in the list based
on date and not on relative index from end of list

* make all progress callbacks work
* fix signal/wait handling
* rewrite command monitoring/output retrieval to use queues and threads, fix
poll/wait loops to reasonable values, minimal cpu hogging
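
For reference, the page range for a given chunk works out to cumulative sums over the configured per-chunk page counts. A minimal sketch of that arithmetic (numbers invented; assuming --end is an exclusive bound, which is what the +1 in the code suggests):

pagesPerChunkHistory = [ 1000, 10000, 100000 ]     # parsed from "1000,10000,100000"

def pageRange(chunk, chunks):
    start = sum(chunks[:chunk - 1]) + 1            # page ids begin at 1
    if chunk < len(chunks):
        return (start, sum(chunks[:chunk]) + 1)    # end: one past this chunk's last page
    return (start, None)                           # last chunk runs to the final page id

print pageRange(2, pagesPerChunkHistory)           # (1001, 11001): pages 1001 through 11000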
Modified paths:
  • /branches/ariel/xmldumps-backup/CommandManagement.py (modified) (history)
  • /branches/ariel/xmldumps-backup/WikiDump.py (modified) (history)
  • /branches/ariel/xmldumps-backup/worker.py (modified) (history)

Diff

Index: branches/ariel/xmldumps-backup/WikiDump.py
@@ -94,6 +94,7 @@
9595 "dir": "",
9696 "forcenormal": "0",
9797 "halt": "0",
 98+ "skipdblist" : "",
9899 #"output": {
99100 "public": "/dumps/public",
100101 "private": "/dumps/private",
@@ -122,11 +123,29 @@
123124 "grep": "/bin/grep",
124125 #"cleanup": {
125126 "keep": "3",
 127+ #"chunks": {
 128+ # set this to True to enable running the various xml dump stages as chunks in parallel
 129+ "chunksEnabled" : False,
 130+ # for page history runs, number of pages for each chunk, specified separately
 131+ # e.g. "1000,10000,100000,2000000,2000000,2000000,2000000,2000000,2000000,2000000"
 132+ # would define 10 chunks with the specified number of pages in each and any extra in
 133+ # a final 11th chunk
 134+ "pagesPerChunkHistory" : False,
 135+ # revs per chunk (roughly, it will be split along page lines) for history and current dumps
 136+ # values: positive integer, "compute",
 137+ # this field is overridden by pagesPerChunkHistory
 138+ # CURRENTLY NOT COMPLETE so please don't use this.
 139+ "revsPerChunkHistory" : False,
 140+ # pages per chunk for abstract runs
 141+ "pagesPerChunkAbstract" : False,
126142 }
127143 conf = ConfigParser.SafeConfigParser(defaults)
128144 conf.read(files)
129145
130146 self.dbList = dbList(conf.get("wiki", "dblist"))
 147+ self.skipDbList = dbList(conf.get("wiki", "skipdblist"))
 148+ self.dbList = list(set(self.dbList) - set(self.skipDbList))
 149+
131150 self.privateList = dbList(conf.get("wiki", "privatelist"))
132151 biglistFile = conf.get("wiki", "biglist")
133152 if biglistFile:
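
For illustration, a hypothetical config excerpt exercising the new settings (section and key names follow the defaults above; the paths and numbers are invented):

[wiki]
dblist=/dumps/conf/all.dblist
# wikis listed here are subtracted from dblist, so several hosts can share
# one master list without hand-editing out the duplicates
skipdblist=/dumps/conf/skip.dblist

[chunks]
chunksEnabled=True
# ten history chunks of these sizes; any leftover pages land in an 11th chunk
pagesPerChunkHistory=1000,10000,100000,2000000,2000000,2000000,2000000,2000000,2000000,2000000
pagesPerChunkAbstract=2000000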
@@ -169,6 +188,11 @@
170189 self.cat = conf.get("tools", "cat")
171190 self.grep = conf.get("tools", "grep")
172191
 192+ self.chunksEnabled = conf.get("chunks","chunksEnabled")
 193+ self.pagesPerChunkHistory = conf.get("chunks","pagesPerChunkHistory")
 194+ self.revsPerChunkHistory = conf.get("chunks","revsPerChunkHistory")
 195+ self.pagesPerChunkAbstract = conf.get("chunks","pagesPerChunkAbstract")
 196+
173197 self.keep = conf.getint("cleanup", "keep")
174198
175199 def dbListByAge(self):
@@ -185,6 +209,12 @@
186210
187211 If some error occurs checking a dump status, that dump is put last in the
188212 list (sort value is (True, maxint) )
 213+
 214+ Note that we now sort this list by the date of the dump directory, not the
 215+ last date that a dump file in that directory may have been touched. This
 216+ allows us to rerun jobs to completion from older runs, for example
 217+ an en pedia history run that failed in the middle, without borking the
 218+ index page links.
189219 """
190220 available = []
191221 for db in self.dbList:
@@ -196,7 +226,9 @@
197227 if last:
198228 dumpStatus = os.path.join(wiki.publicDir(), last, "status.html")
199229 try:
200 - age = fileAge(dumpStatus)
 230+ # tack on the file age so that if we have multiple wikis
 231+ # dumped on the same day, they get ordered properly
 232+ age = (last, fileAge(dumpStatus))
201233 status = readFile(dumpStatus)
202234 except:
203235 print "dump dir %s corrupt?" % dumpStatus
@@ -341,11 +373,14 @@
342374 link = "%s (new)" % self.dbName
343375 return "<li>%s %s: %s</li>\n" % (stamp, link, status)
344376
345 - def latestDump(self, index=-1):
 377+ def latestDump(self, index=-1, all=False):
346378 """Find the last (or slightly less than last) dump for a db."""
347379 dirs = self.dumpDirs()
348380 if dirs:
349 - return dirs[index]
 381+ if all:
 382+ return dirs
 383+ else:
 384+ return dirs[index]
350385 else:
351386 return None
352387
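
The new all flag is what the rewritten reportPreviousDump in worker.py (below) relies on; a rough usage sketch, with wiki and date standing in for the runner's attributes:

dumpsInOrder = wiki.latestDump(all=True)   # dump dirs sorted by date, e.g. [ "20101201", "20101215", "20110110" ]
meIndex = dumpsInOrder.index(date)         # where this run sits in the list
if meIndex > 0:
    previous = dumpsInOrder[meIndex - 1]   # the run just before ours by date, never a newer one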
Index: branches/ariel/xmldumps-backup/worker.py
@@ -12,6 +12,7 @@
1313 import shutil
1414 import stat
1515 import signal
 16+import errno
1617 import WikiDump
1718 import CommandManagement
1819
@@ -46,6 +47,135 @@
4748 def xmlEscape(text):
4849 return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
4950
 51+# if the pages/revsPerChunkAbstract/History value is just one number, it means
 52+# use that number for all the chunks; figure out yourself how many.
 53+# otherwise we get passed a list that says "here's how many for each chunk", and it's that many chunks.
 54+# extra pages/revs go in the last chunk, stuck on the end. too bad. :-P
 55+class Chunk(object):
 56+ def __init__(self, wiki, dbName):
 57+
 58+ self._dbName = dbName
 59+ self._chunksEnabled = wiki.config.chunksEnabled
 60+ self._pagesPerChunkHistory = self.convertCommaSepLineToNumbers(wiki.config.pagesPerChunkHistory)
 61+ self._revsPerChunkHistory = self.convertCommaSepLineToNumbers(wiki.config.revsPerChunkHistory)
 62+ self._pagesPerChunkAbstract = self.convertCommaSepLineToNumbers(wiki.config.pagesPerChunkAbstract)
 63+
 64+ if (self._chunksEnabled):
 65+ self.Stats = PageAndEditStats(wiki,dbName)
 66+
 67+ if (self._revsPerChunkHistory):
 68+ if (len(self._revsPerChunkHistory) == 1):
 69+ self._numChunksHistory = self.getNumberOfChunksForXMLDumps(self.Stats.getTotalEdits(), self._revsPerChunkHistory[0])
 70+ self._revsPerChunkHistory = [ self._revsPerChunkHistory[0] for i in range(self._numChunksHistory)]
 71+ else:
 72+ self._numChunksHistory = len(self._revsPerChunkHistory)
 73+ # here we should generate the number of pages per chunk based on number of revs.
 74+ # ...next code update! FIXME
 75+ # self._pagesPerChunkHistory = ....
 76+ elif (self._pagesPerChunkHistory):
 77+ if (len(self._pagesPerChunkHistory) == 1):
 78+ self._numChunksHistory = self.getNumberOfChunksForXMLDumps(self.Stats.getTotalPages(), self._pagesPerChunkHistory[0])
 79+ self._pagesPerChunkHistory = [ self._pagesPerChunkHistory[0] for i in range(self._numChunksHistory)]
 80+ else:
 81+ self._numChunksHistory = len(self._pagesPerChunkHistory)
 82+ else:
 83+ self._numChunksHistory = 0
 84+
 85+ if (self._pagesPerChunkAbstract):
 86+ if (len(self._pagesPerChunkAbstract) == 1):
 87+ self._numChunksAbstract = self.getNumberOfChunksForXMLDumps(self.Stats.getTotalPages(), self._pagesPerChunkAbstract[0])
 88+ self._pagesPerChunkAbstract = [ self._pagesPerChunkAbstract[0] for i in range(self._numChunksAbstract)]
 89+ else:
 90+ self._numChunksAbstract = len(self._pagesPerChunkAbstract)
 91+ else:
 92+ self._numChunksAbstract = 0
 93+
 94+ def convertCommaSepLineToNumbers(self, line):
 95+ if (line == ""):
 96+ return(False)
 97+ result = line.split(',')
 98+ numbers = []
 99+ for field in result:
 100+ field = field.strip()
 101+ numbers.append(int(field))
 102+ return(numbers)
 103+
 104+ def getPagesPerChunkAbstract(self):
 105+ return self._pagesPerChunkAbstract
 106+
 107+ def getNumChunksAbstract(self):
 108+ return self._numChunksAbstract
 109+
 110+ def getPagesPerChunkHistory(self):
 111+ return self._pagesPerChunkHistory
 112+
 113+ def getNumChunksHistory(self):
 114+ return self._numChunksHistory
 115+
 116+ def chunksEnabled(self):
 117+ return self._chunksEnabled
 118+
 119+ # args: total (pages or revs), and the number of (pages or revs) per chunk.
 120+ def getNumberOfChunksForXMLDumps(self, total, perChunk):
 121+ if (not total):
 122+ # default: no chunking.
 123+ return 0
 124+ else:
 125+ chunks = int(total/perChunk)
 126+ # more smaller chunks are better, we want speed
 127+ if (total - (chunks * perChunk)) > 0:
 128+ chunks = chunks + 1
 129+ if chunks == 1:
 130+ return 0
 131+ return chunks
 132+
 133+class PageAndEditStats(object):
 134+ def __init__(self, wiki, dbName):
 135+ self.totalPages = None
 136+ self.totalEdits = None
 137+ self.config = wiki.config
 138+ self.dbName = dbName
 139+ (self.totalPages, self.totalEdits) = self.getStatistics(self.config, self.dbName)
 140+
 141+ def getStatistics(self, config,dbName):
 142+ """Get (cached) statistics for the wiki"""
 143+ totalPages = None
 144+ totalEdits = None
 145+ statsCommand = """%s -q %s/maintenance/showStats.php --wiki=%s """ % shellEscape((
 146+ self.config.php, self.config.wikiDir, self.dbName))
 147+ # FIXME runAndReturn? defined somewhere else
 148+ results = self.runAndReturn(statsCommand)
 149+ lines = results.splitlines()
 150+ if (lines):
 151+ for line in lines:
 152+ (name,value) = line.split(':')
 153+ name = name.replace(' ','')
 154+ value = value.replace(' ','')
 155+ if (name == "Totalpages"):
 156+ totalPages = int(value)
 157+ elif (name == "Totaledits"):
 158+ totalEdits = int(value)
 159+ return(totalPages, totalEdits)
 160+
 161+ def getTotalPages(self):
 162+ return self.totalPages
 163+
 164+ def getTotalEdits(self):
 165+ return self.totalEdits
 166+
 167+ # FIXME should rewrite this I guess and also move it elsewhere, phooey
 168+ def runAndReturn(self, command):
 169+ """Run a command and return the output as a string.
 170+ Raises BackupError on non-zero return code."""
 171+ # FIXME convert all these calls so they just use runCommand now
 172+ proc = popen2.Popen4(command, 64)
 173+ output = proc.fromchild.read()
 174+ retval = proc.wait()
 175+ if retval:
 176+ raise BackupError("Non-zero return code from '%s'" % command)
 177+ else:
 178+ return output
 179+
50180 class BackupError(Exception):
51181 pass
52182
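
A worked example of the arithmetic in getNumberOfChunksForXMLDumps above (page total invented):

totalPages = 5500000                        # say, as reported by showStats.php
perChunk = 2000000                          # single-number config value
chunks = int(totalPages / perChunk)         # 2 full chunks
if (totalPages - (chunks * perChunk)) > 0:  # 1500000 pages left over
    chunks = chunks + 1                     # a third chunk picks them up
print chunks                                # 3; a result of exactly 1 is returned as 0, i.e. don't chunk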
@@ -82,14 +212,15 @@
83213
84214 class DumpItemList(object):
85215
86 - def __init__(self, wiki, prefetch, spawn, date, chunks):
 216+ def __init__(self, wiki, prefetch, spawn, date, chunkInfo):
87217 self.date = date
88218 self.wiki = wiki
89219 self._hasFlaggedRevs = self.wiki.hasFlaggedRevs()
90220 self._isBig = self.wiki.isBig()
91221 self._prefetch = prefetch
92222 self._spawn = spawn
93 - self._chunks = chunks
 223+ self.chunkInfo = chunkInfo
 224+
94225 self.dumpItems = [PrivateTable("user", "usertable", "User account data."),
95226 PrivateTable("watchlist", "watchlisttable", "Users' watchlist settings."),
96227 PrivateTable("ipblocks", "ipblockstable", "Data for blocks of IP addresses, ranges, and users."),
@@ -122,36 +253,36 @@
123254
124255 TitleDump("pagetitlesdump", "List of page titles"),
125256
126 - AbstractDump("abstractsdump","Extracted page abstracts for Yahoo", self._chunks)]
 257+ AbstractDump("abstractsdump","Extracted page abstracts for Yahoo", self.chunkInfo.getPagesPerChunkAbstract())]
127258
128 - if (self._chunks):
129 - self.dumpItems.append(RecombineAbstractDump("abstractsdumprecombine", "Recombine extracted page abstracts for Yahoo", self._chunks))
 259+ if (self.chunkInfo.chunksEnabled()):
 260+ self.dumpItems.append(RecombineAbstractDump("abstractsdumprecombine", "Recombine extracted page abstracts for Yahoo", self.chunkInfo.getPagesPerChunkAbstract()))
130261
131 - self.dumpItems.append(XmlStub("xmlstubsdump", "First-pass for page XML data dumps", self._chunks))
132 -
133 - if (self._chunks):
134 - self.dumpItems.append(RecombineXmlStub("xmlstubsdumprecombine", "Recombine first-pass for page XML data dumps", self._chunks))
 262+ self.dumpItems.append(XmlStub("xmlstubsdump", "First-pass for page XML data dumps", self.chunkInfo.getPagesPerChunkHistory()))
 263+ if (self.chunkInfo.chunksEnabled()):
 264+ self.dumpItems.append(RecombineXmlStub("xmlstubsdumprecombine", "Recombine first-pass for page XML data dumps", self.chunkInfo.getPagesPerChunkHistory()))
135265
 266+ # NOTE that the chunkInfo thing passed here is irrelevant, these get generated from the stubs which are all done in one pass
136267 self.dumpItems.append(
137268 XmlDump("articles",
138269 "articlesdump",
139270 "<big><b>Articles, templates, image descriptions, and primary meta-pages.</b></big>",
140 - "This contains current versions of article content, and is the archive most mirror sites will probably want.", self._prefetch, self._spawn, self._chunks))
141 - if (self._chunks):
142 - self.dumpItems.append(RecombineXmlDump("articles","articlesdumprecombine", "<big><b>Recombine articles, templates, image descriptions, and primary meta-pages.</b></big>","This contains current versions of article content, and is the archive most mirror sites will probably want.", self._chunks))
 271+ "This contains current versions of article content, and is the archive most mirror sites will probably want.", self._prefetch, self._spawn, self.chunkInfo.getPagesPerChunkHistory()))
 272+ if (self.chunkInfo.chunksEnabled()):
 273+ self.dumpItems.append(RecombineXmlDump("articles","articlesdumprecombine", "<big><b>Recombine articles, templates, image descriptions, and primary meta-pages.</b></big>","This contains current versions of article content, and is the archive most mirror sites will probably want.", self.chunkInfo.getPagesPerChunkHistory()))
143274
144275 self.dumpItems.append(
145276 XmlDump("meta-current",
146277 "metacurrentdump",
147278 "All pages, current versions only.",
148 - "Discussion and user pages are included in this complete archive. Most mirrors won't want this extra material.", self._prefetch, self._spawn, self._chunks))
 279+ "Discussion and user pages are included in this complete archive. Most mirrors won't want this extra material.", self._prefetch, self._spawn, self.chunkInfo.getPagesPerChunkHistory()))
 280+
 281+ if (self.chunkInfo.chunksEnabled()):
 282+ self.dumpItems.append(RecombineXmlDump("meta-current","metacurrentdumprecombine", "Recombine all pages, current versions only.","Discussion and user pages are included in this complete archive. Most mirrors won't want this extra material.", self.chunkInfo.getPagesPerChunkHistory()))
149283
150 - if (self._chunks):
151 - self.dumpItems.append(RecombineXmlDump("meta-current","metacurrentdumprecombine", "Recombine all pages, current versions only.","Discussion and user pages are included in this complete archive. Most mirrors won't want this extra material.", self._chunks))
152 -
153284 self.dumpItems.append(
154285 XmlLogging("Log events to all pages."))
155 -
 286+
156287 if self._hasFlaggedRevs:
157288 self.dumpItems.append(
158289 PublicTable( "flaggedpages", "flaggedpagestable","This contains a row for each flagged article, containing the stable revision ID, if the lastest edit was flagged, and how long edits have been pending." ))
@@ -164,20 +295,20 @@
165296 "metahistorybz2dump",
166297 "All pages with complete page edit history (.bz2)",
167298 "These dumps can be *very* large, uncompressing up to 20 times the archive download size. " +
168 - "Suitable for archival and statistical use, most mirror sites won't want or need this.", self._prefetch, self._spawn, self._chunks))
 299+ "Suitable for archival and statistical use, most mirror sites won't want or need this.", self._prefetch, self._spawn, self.chunkInfo.getPagesPerChunkHistory()))
169300 self.dumpItems.append(
170301 XmlRecompressDump("meta-history",
171302 "metahistory7zdump",
172303 "All pages with complete edit history (.7z)",
173304 "These dumps can be *very* large, uncompressing up to 100 times the archive download size. " +
174 - "Suitable for archival and statistical use, most mirror sites won't want or need this.", self._chunks))
175 - if (self._chunks):
 305+ "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.chunkInfo.getPagesPerChunkHistory()))
 306+ if (self.chunkInfo.chunksEnabled()):
176307 self.dumpItems.append(
177308 RecombineXmlRecompressDump("meta-history",
178309 "metahistory7zdumprecombine",
179310 "Recombine all pages with complete edit history (.7z)",
180311 "These dumps can be *very* large, uncompressing up to 100 times the archive download size. " +
181 - "Suitable for archival and statistical use, most mirror sites won't want or need this.", self._chunks))
 312+ "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.chunkInfo.getPagesPerChunkHistory()))
182313 self.oldRunInfoRetrieved = self._getOldRunInfoFromFile()
183314
184315
@@ -313,7 +444,7 @@
314445 if (not self.jobDoneSuccessfully("metacurrentdump")):
315446 return False
316447 if (job == "metahistory7zdumprecombine"):
317 - if (not self.jobDoneSuccessfully("metahistory7zdumprecombine")):
 448+ if (not self.jobDoneSuccessfully("metahistory7zdump")):
318449 return False
319450 if (job == "metahistory7zdump"):
320451 if (not self.jobDoneSuccessfully("xmlstubsdump") or not self.jobDoneSuccessfully("metahistorybz2dump")):
@@ -405,7 +536,7 @@
406537 self.dbName = wiki.dbName
407538 self.prefetch = prefetch
408539 self.spawn = spawn
409 - self._chunks = self.setNumberOfChunksForXMLDumps(self.dbName)
 540+ self.chunkInfo = Chunk(wiki, self.dbName)
410541
411542 if date:
412543 # Override, continuing a past dump?
@@ -423,7 +554,7 @@
424555 self.dumpDir = DumpDir(self.wiki, self.dbName, self.date)
425556 self.checksums = Checksummer(self.wiki, self.dumpDir)
426557 # some or all of these dumpItems will be marked to run
427 - self.dumpItemList = DumpItemList(self.wiki, self.prefetch, self.spawn, self.date, self._chunks);
 558+ self.dumpItemList = DumpItemList(self.wiki, self.prefetch, self.spawn, self.date, self.chunkInfo);
428559
429560 def passwordOption(self):
430561 """If you pass '-pfoo' mysql uses the password 'foo',
@@ -496,26 +627,6 @@
497628 totalEdits = int(value)
498629 return(totalPages, totalEdits)
499630
500 - def pagesPerChunk(self):
501 - # we are gonna say that chunks are 2 mill pages, it can always be adjusted later.
502 - # last one might be 3 mill pages.
503 - return 2000000
504 -# return 200
505 -
506 - def setNumberOfChunksForXMLDumps(self, dbName):
507 - (pages,edits) = self.getStatistics(dbName)
508 - if (not pages):
509 - # default: no chunking.
510 - return 0
511 - else:
512 - chunks = int(pages/self.pagesPerChunk())
513 - # more smaller chunks are better, we want speed
514 - if pages - (chunks * self.pagesPerChunk()) > 0:
515 - chunks = chunks + 1
516 - if chunks == 1:
517 - return 0
518 - return chunks
519 -
520631 # command series list: list of (commands plus args) is one pipeline. list of pipelines = 1 series.
521632 # this function wants a list of series.
522633 # be a list (the command name and the various args)
@@ -789,18 +900,29 @@
790901
791902 def reportPreviousDump(self, done):
792903 """Produce a link to the previous dump, if any"""
 904+ # get the list of dumps for this wiki in order, find me in the list, find the one prev to me.
 905+ # why? we might be rerunning a job from an older dump. we might have two
 906+ # runs going at once (think en pedia, one finishing up the history, another
 907+ # starting at the beginning to get the new abstracts and stubs).
 908+
793909 try:
794 - raw = self.wiki.latestDump(-2)
 910+ dumpsInOrder = self.wiki.latestDump(all=True)
 911+ meIndex = dumpsInOrder.index(self.date)
 912+ # don't wrap around to the newest dump in the list!
 913+ if (meIndex > 0):
 914+ rawDate = dumpsInOrder[meIndex-1]
 915+ else:
 916+ raise ValueError()
795917 except:
796918 return "No prior dumps of this database stored."
797 - date = WikiDump.prettyDate(raw)
 919+ prettyDate = WikiDump.prettyDate(rawDate)
798920 if done:
799921 prefix = ""
800922 message = "Last dumped on"
801923 else:
802924 prefix = "This dump is in progress; see also the "
803925 message = "previous dump from"
804 - return "%s<a href=\"../%s/\">%s %s</a>" % (prefix, raw, message, date)
 926+ return "%s<a href=\"../%s/\">%s %s</a>" % (prefix, rawDate, message, prettyDate)
805927
806928 def reportStatusSummaryLine(self, done=False):
807929 if (done == "done"):
@@ -925,7 +1047,7 @@
9261048 "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())}
9271049 rssPath = self.dumpDir.latestPath(file + "-rss.xml")
9281050 WikiDump.dumpFile(rssPath, rssText)
929 -
 1051+
9301052 class Dump(object):
9311053 def __init__(self, name, desc):
9321054 self._desc = desc
@@ -998,6 +1120,11 @@
9991121 runner.updateStatusFiles()
10001122 runner.dumpItemList.saveDumpRunInfoFile()
10011123
 1124+ def timeToWait(self):
 1125+ # we wait this many secs for a command to complete that
 1126+ # doesn't produce output
 1127+ return 5
 1128+
10021129 def waitAlarmHandler(self, signum, frame):
10031130 pass
10041131
@@ -1049,15 +1176,14 @@
10501177 recombine = " ".join(uncompressThisFile)
10511178 headerEndNum = int(headerEndNum) + 1
10521179 if (chunkNum == 1):
1053 - # skip footer
 1180+ # first file, put header and contents
10541181 recombine = recombine + " | %s -n -1 " % headEsc
1055 - elif (chunkNum == self._chunks):
1056 - # skip header
 1182+ elif (chunkNum == len(files)):
 1183+ # last file, put footer
10571184 recombine = recombine + (" | %s -n +%s" % (tailEsc, headerEndNum))
10581185 else:
1059 - # skip header
 1186+ # put contents only
10601187 recombine = recombine + (" | %s -n +%s" % (tailEsc, headerEndNum))
1061 - # skip footer
10621188 recombine = recombine + " | %s -n -1 " % headEsc
10631189 recombines.append(recombine)
10641190 recombineCommandString = "(" + ";".join(recombines) + ")" + "|" + "%s %s" % (compressionCommand, outputFilename)
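
Spelled out with hypothetical filenames: the first chunk keeps the XML header and drops the closing footer (head -n -1), middle chunks drop both, and the last chunk drops only the header (tail -n +headerEndNum), so the concatenation is one well-formed file. A sketch of the assembled command string, with the header occupying lines 1-40 (filenames and the compression step are invented):

pieces = [ "bzcat chunk1.bz2 | head -n -1",                 # header + pages, footer dropped
           "bzcat chunk2.bz2 | tail -n +41 | head -n -1",   # pages only
           "bzcat chunk3.bz2 | tail -n +41" ]               # pages + closing footer
recombineCommandString = "(" + ";".join(pieces) + ")" + "|" + "bzip2 > pages-articles.xml.bz2"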
@@ -1105,7 +1231,7 @@
11061232 A second pass will import text from prior dumps or the database to make
11071233 full files for the public."""
11081234
1109 - def __init__(self, name, desc, chunks = None):
 1235+ def __init__(self, name, desc, chunks = False):
11101236 Dump.__init__(self, name, desc)
11111237 self._chunks = chunks
11121238
@@ -1115,7 +1241,7 @@
11161242 def listFiles(self, runner, unnumbered=False):
11171243 if (self._chunks) and not unnumbered:
11181244 files = []
1119 - for i in range(1, self._chunks + 1):
 1245+ for i in range(1, len(self._chunks) + 1):
11201246 files.append("stub-meta-history%s.xml.gz" % i)
11211247 files.append("stub-meta-current%s.xml.gz" % i)
11221248 files.append("stub-articles%s.xml.gz" % i)
@@ -1149,13 +1275,15 @@
11501276 if (chunk):
11511277 # set up start and end pageids for this piece
11521278 # note there is no page id 0 I guess. so we start with 1
1153 - start = runner.pagesPerChunk()*(chunk-1) + 1
 1279+ # start = runner.pagesPerChunk()*(chunk-1) + 1
 1280+ start = sum([ self._chunks[i] for i in range(0,chunk-1)]) + 1
11541281 startopt = "--start=%s" % start
11551282 # if we are on the last chunk, we should get up to the last pageid,
11561283 # whatever that is.
11571284 command.append(startopt)
1158 - if chunk < self._chunks:
1159 - end = start + runner.pagesPerChunk()
 1285+ if chunk < len(self._chunks):
 1286+ # end = start + runner.pagesPerChunk()
 1287+ end = sum([ self._chunks[i] for i in range(0,chunk)]) +1
11601288 endopt = "--end=%s" % end
11611289 command.append(endopt)
11621290
@@ -1166,7 +1294,7 @@
11671295 def run(self, runner):
11681296 commands = []
11691297 if self._chunks:
1170 - for i in range(1, self._chunks+1):
 1298+ for i in range(1, len(self._chunks)+1):
11711299 series = self.buildCommand(runner, i)
11721300 commands.append(series)
11731301 else:
@@ -1210,16 +1338,22 @@
12111339 # these commands don't produce any progress bar... so we can at least
12121340 # update the size and last update time of the file once a minute
12131341 signal.signal(signal.SIGALRM, self.waitAlarmHandler)
1214 - signal.alarm(60)
1215 - recombinePipeline._lastProcessInPipe.wait()
 1342+ signal.alarm(self.timeToWait())
 1343+ try:
 1344+ recombinePipeline._lastProcessInPipe.wait()
 1345+ break
 1346+ except OSError, e:
 1347+ if e.errno == errno.EINTR:
 1348+ pass
 1349+ else:
 1350+ raise
12161351 self.progressCallback(runner)
12171352 signal.alarm(0)
1218 - break
12191353
12201354 class XmlLogging(Dump):
12211355 """ Create a logging dump of all page activity """
12221356
1223 - def __init__(self, desc, chunks = None):
 1357+ def __init__(self, desc, chunks = False):
12241358 Dump.__init__(self, "xmlpagelogsdump", desc)
12251359 self._chunks = chunks
12261360
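
The alarm-and-retry idiom used by these recombine steps, reduced to a standalone sketch (Python 2 era; newer interpreters retry EINTR inside wait() themselves, so the except branch may never fire there):

import errno, signal, subprocess

def waitAlarmHandler(signum, frame):
    pass    # exists only so SIGALRM interrupts the blocking wait()

proc = subprocess.Popen([ "sleep", "30" ])
signal.signal(signal.SIGALRM, waitAlarmHandler)
while True:
    signal.alarm(5)                  # interrupt the wait every few seconds
    try:
        proc.wait()
        break                        # child is done
    except OSError, e:
        if e.errno != errno.EINTR:
            raise                    # a real failure, not our alarm
    print "still running..."         # stands in for progressCallback(runner)
signal.alarm(0)                      # cancel any alarm still pending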
@@ -1246,7 +1380,7 @@
12471381
12481382 class XmlDump(Dump):
12491383 """Primary XML dumps, one section at a time."""
1250 - def __init__(self, subset, name, desc, detail, prefetch, spawn, chunks = None):
 1384+ def __init__(self, subset, name, desc, detail, prefetch, spawn, chunks = False):
12511385 Dump.__init__(self, name, desc)
12521386 self._subset = subset
12531387 self._detail = detail
@@ -1270,7 +1404,7 @@
12711405 def run(self, runner):
12721406 commands = []
12731407 if (self._chunks):
1274 - for i in range(1, self._chunks+1):
 1408+ for i in range(1, len(self._chunks)+1):
12751409 series = self.buildCommand(runner, i)
12761410 commands.append(series)
12771411 else:
@@ -1460,7 +1594,7 @@
14611595 def listFiles(self, runner, unnumbered = False):
14621596 if (self._chunks) and not unnumbered:
14631597 files = []
1464 - for i in range(1, self._chunks+1):
 1598+ for i in range(1, len(self._chunks)+1):
14651599 files.append(self._file("bz2",i))
14661600 return files
14671601 else:
@@ -1470,7 +1604,7 @@
14711605 return checkpoint == self.__class__.__name__ + "." + self._subset
14721606
14731607 class RecombineXmlDump(XmlDump):
1474 - def __init__(self, subset, name, desc, detail, chunks = None):
 1608+ def __init__(self, subset, name, desc, detail, chunks = False):
14751609 # no prefetch, no spawn
14761610 XmlDump.__init__(self, subset, name, desc, detail, None, None, chunks)
14771611 # this is here only so that a callback can capture output from some commands
@@ -1505,11 +1639,17 @@
15061640 # these commands don't produce any progress bar... so we can at least
15071641 # update the size and last update time of the file once a minute
15081642 signal.signal(signal.SIGALRM, self.waitAlarmHandler)
1509 - signal.alarm(60)
1510 - recombinePipeline._lastProcessInPipe.wait()
 1643+ signal.alarm(self.timeToWait())
 1644+ try:
 1645+ recombinePipeline._lastProcessInPipe.wait()
 1646+ break
 1647+ except OSError, e:
 1648+ if e.errno == errno.EINTR:
 1649+ pass
 1650+ else:
 1651+ raise
15111652 self.progressCallback(runner)
15121653 signal.alarm(0)
1513 - break
15141654
15151655 class BigXmlDump(XmlDump):
15161656 """XML page dump for something larger, where a 7-Zip compressed copy
@@ -1522,7 +1662,7 @@
15231663 class XmlRecompressDump(Dump):
15241664 """Take a .bz2 and recompress it as 7-Zip."""
15251665
1526 - def __init__(self, subset, name, desc, detail, chunks = None):
 1666+ def __init__(self, subset, name, desc, detail, chunks = False):
15271667 Dump.__init__(self, name, desc)
15281668 self._subset = subset
15291669 self._detail = detail
@@ -1572,7 +1712,7 @@
15731713 raise BackupError("bz2 dump incomplete, not recompressing")
15741714 commands = []
15751715 if (self._chunks):
1576 - for i in range(1, self._chunks+1):
 1716+ for i in range(1, len(self._chunks)+1):
15771717 series = self.buildCommand(runner, i)
15781718 commands.append(series)
15791719 else:
@@ -1585,7 +1725,7 @@
15861726 # temp hack force 644 permissions until ubuntu bug # 370618 is fixed - tomasz 5/1/2009
15871727 # some hacks aren't so temporary - atg 3 sept 2010
15881728 if (self._chunks):
1589 - for i in range(1, self._chunks+1):
 1729+ for i in range(1, len(self._chunks)+1):
15901730 xml7z = self.getOutputFilename(runner,i)
15911731 if exists(xml7z):
15921732 os.chmod(xml7z, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH )
@@ -1598,7 +1738,7 @@
15991739 def listFiles(self, runner, unnumbered = False):
16001740 if (self._chunks) and not unnumbered:
16011741 files = []
1602 - for i in range(1, self._chunks+1):
 1742+ for i in range(1, len(self._chunks)+1):
16031743 files.append(self._file("7z",i))
16041744 return files
16051745 else:
@@ -1645,16 +1785,22 @@
16461786 # these commands don't produce any progress bar... so we can at least
16471787 # update the size and last update time of the file once a minute
16481788 signal.signal(signal.SIGALRM, self.waitAlarmHandler)
1649 - signal.alarm(60)
1650 - recombinePipeline._lastProcessInPipe.wait()
 1789+ signal.alarm(self.timeToWait())
 1790+ try:
 1791+ recombinePipeline._lastProcessInPipe.wait()
 1792+ break
 1793+ except OSError, e:
 1794+ if e.errno == errno.EINTR:
 1795+ pass
 1796+ else:
 1797+ raise
16511798 self.progressCallback(runner)
16521799 signal.alarm(0)
1653 - break
16541800
16551801 class AbstractDump(Dump):
16561802 """XML dump for Yahoo!'s Active Abstracts thingy"""
16571803
1658 - def __init__(self, name, desc, chunks = None):
 1804+ def __init__(self, name, desc, chunks = False):
16591805 Dump.__init__(self, name, desc)
16601806 self._chunks = chunks
16611807
@@ -1672,13 +1818,15 @@
16731819 if (chunk):
16741820 # set up start end end pageids for this piece
16751821 # note there is no page id 0 I guess. so we start with 1
1676 - start = runner.pagesPerChunk()*(chunk-1) + 1
 1822+ # start = runner.pagesPerChunk()*(chunk-1) + 1
 1823+ start = sum([ self._chunks[i] for i in range(0,chunk-1)]) + 1
16771824 startopt = "--start=%s" % start
16781825 # if we are on the last chunk, we should get up to the last pageid,
16791826 # whatever that is.
16801827 command.append(startopt)
1681 - if chunk < self._chunks:
1682 - end = start + runner.pagesPerChunk()
 1828+ if chunk < len(self._chunks):
 1829+ # end = start + runner.pagesPerChunk()
 1830+ end = sum([ self._chunks[i] for i in range(0,chunk)]) +1
16831831 endopt = "--end=%s" % end
16841832 command.append(endopt)
16851833 pipeline = [ command ]
@@ -1688,7 +1836,7 @@
16891837 def run(self, runner):
16901838 commands = []
16911839 if (self._chunks):
1692 - for i in range(1, self._chunks+1):
 1840+ for i in range(1, len(self._chunks)+1):
16931841 series = self.buildCommand(runner, i)
16941842 commands.append(series)
16951843 else:
@@ -1725,7 +1873,7 @@
17261874 files = []
17271875 for x in self._variants(runner):
17281876 if (self._chunks) and not unnumbered:
1729 - for i in range(1, self._chunks+1):
 1877+ for i in range(1, len(self._chunks)+1):
17301878 files.append(self._variantFile(x, i))
17311879 else:
17321880 files.append(self._variantFile(x))
@@ -1765,11 +1913,17 @@
17661914 # these commands don't produce any progress bar... so we can at least
17671915 # update the size and last update time of the file once a minute
17681916 signal.signal(signal.SIGALRM, self.waitAlarmHandler)
1769 - signal.alarm(60)
1770 - recombinePipeline._lastProcessInPipe.wait()
 1917+ signal.alarm(self.timeToWait())
 1918+ try:
 1919+ recombinePipeline._lastProcessInPipe.wait()
 1920+ break
 1921+ except OSError, e:
 1922+ if e.errno == errno.EINTR:
 1923+ pass
 1924+ else:
 1925+ raise
17711926 self.progressCallback(runner)
17721927 signal.alarm(0)
1773 - break
17741928
17751929 class TitleDump(Dump):
17761930 """This is used by "wikiproxy", a program to add Wikipedia links to BBC news online"""
Index: branches/ariel/xmldumps-backup/CommandManagement.py
@@ -6,6 +6,8 @@
77 import subprocess
88 import select
99 import signal
 10+import Queue
 11+import thread
1012
1113 from os.path import dirname, exists, getsize, join, realpath
1214 from subprocess import Popen, PIPE
@@ -111,7 +113,9 @@
112114 else:
113115 stdoutOpt = PIPE
114116
115 - process = Popen( command, stdout=stdoutOpt, stdin=stdinOpt,
 117+ stderrOpt = PIPE
 118+
 119+ process = Popen( command, stdout=stdoutOpt, stdin=stdinOpt, stderr=stderrOpt,
116120 preexec_fn=self.subprocess_setup, shell= self._shell)
117121
118122 if (command == self._commands[0]):
@@ -142,10 +146,10 @@
143147 # will hang forever in the wait() on them.
144148 self._processes.reverse()
145149 for p in self._processes:
146 -# print "DEBUG: trying to get return code for %s" % p.pid
 150+ print "DEBUG: trying to get return code for %s" % p.pid
147151 self._exitValues.append(p.wait())
148152 retcode = p.poll()
149 -# print "DEBUG: return code %s for %s" % (retcode, p.pid)
 153+ print "DEBUG: return code %s for %s" % (retcode, p.pid)
150154 self._exitValues.reverse()
151155 self._processes.reverse()
152156 if (self.saveFile()):
@@ -154,8 +158,8 @@
155159
156160 def isRunning(self):
157161 """Check if process is running."""
158 - if (not self._lastProcessInPipe.poll()):
159 - return(False)
 162+ # Note that poll() returns None if the process
 163+ # is not completed, or some value (may be 0) otherwise
160164 if (self._lastProcessInPipe.poll() == None):
161165 return(True)
162166 else:
@@ -220,6 +224,7 @@
221225 if (timeout == None):
222226 fdReady = self._poller.poll()
223227 else:
 228+ # FIXME so poll doesn't take an arg :-P ...?
224229 fdReady = self._poller.poll(timeout)
225230
226231 if (fdReady):
@@ -240,9 +245,12 @@
241246 # line, let it get written to the caller anyways...
242247 # when we poll do we get a byte count of how much is available? no.
243248 out = self._lastProcessInPipe.stdout.readline()
 249+
 250+ # DEBUG
244251 # if (out):
245252 # sys.stdout.write("DEBUG: got from %s out %s" % (self._lastCommandString, out))
246253
 254+
247255 signal.alarm(0)
248256 return(out)
249257 elif self.checkForPollErrors():
@@ -378,90 +386,115 @@
379387 pipelines), as well as a possible callback which is used to capture all output
380388 from the various command series. If the callback takes an argument other than
381389 the line of output, it should be passed in the arg parameter (and it will be passed
382 - to the callback function first before the output line). If no callback is provided and the individual
383 - pipelines are not provided with a file to save output, then output is written
384 - to stderr."""
385 - def __init__(self, commandSeriesList, callback = None, arg=None, quiet = False, shell = False, forceCallback = False ):
 390+ to the callback function first before the output line). If no callback is provided
 391+ and the individual pipelines are not provided with a file to save output,
 392+ then output is written to stderr."""
 393+ def __init__(self, commandSeriesList, callback = None, arg=None, quiet = False, shell = False ):
386394 self._commandSeriesList = commandSeriesList
387395 self._commandSerieses = []
388396 for series in self._commandSeriesList:
389397 self._commandSerieses.append( CommandSeries(series, quiet, shell) )
390 - self._processesToPoll = []
391 - self._fdToProcess = {}
392 - self._fdToSeries = {}
 398+ # for each command series running in parallel,
 399+ # in cases where a command pipeline in the series generates output, the callback
 400+ # will be called with a line of output from the pipeline as it becomes available
393401 self._callback = callback
394402 self._arg = arg
395 - self._poller = None
 403+ self._commandSeriesQueue = Queue.Queue()
 404+
396405 # number millisecs we will wait for select.poll()
397406 self._defaultPollTime = 500
398 - # this number of poll cycles gives us 1 minute, use this
399 - # as the amount of time to wait between forced callbacks, if forced is requested
400 - self._defaultPollCycles = 120
401 - if (forceCallback and not callback):
402 - raise RuntimeError("CommandsInParallel: forceCallback requires that a callback be specified")
403 - self._forceCallback = forceCallback
 407+
 408+ # for programs that don't generate output, wait this many seconds between
 409+ # invoking callback if there is one
 410+ self._defaultCallbackInterval = 20
404411
405412 def startCommands(self):
406413 for series in self._commandSerieses:
407414 series.startCommands()
408415
409 - def setupOutputMonitoring(self):
410 - self._poller = select.poll()
411 - for series in self._commandSerieses:
 416+ # one of these as a thread to monitor each command series.
 417+ def seriesMonitor(self, timeout, queue):
 418+ series = queue.get()
 419+ poller = select.poll()
 420+ while series.processProducingOutput():
412421 p = series.processProducingOutput()
 422+ poller.register(p.stderr,select.POLLIN|select.POLLPRI)
 423+ if (p.stdout):
 424+ fdToStream = { p.stdout.fileno(): p.stdout, p.stderr.fileno(): p.stderr }
 425+ else:
 426+ fdToStream = { p.stderr.fileno(): p.stderr }
413427 # if we have a savefile, this won't be set.
414428 if (p.stdout):
415 - self._processesToPoll.append(p)
416 - self._poller.register(p.stdout,select.POLLIN|select.POLLPRI)
417 - self._fdToProcess['%s' % p.stdout.fileno()] = p
418 - self._fdToSeries['%s' % p.stdout.fileno()] = series
 429+ poller.register(p.stdout,select.POLLIN|select.POLLPRI)
419430
420 - # if there is a savefile don't call this, it's going to get written by itself
421 - def checkForOutput(self):
422 - if len(self._processesToPoll):
423 - fdReady = self._poller.poll(self._defaultPollTime)
424 - if (fdReady):
425 - for (fd,event) in fdReady:
426 - series = self._fdToSeries["%s" % fd]
427 - series.inProgressPipeline().setPollState(event)
428 - if series.inProgressPipeline().checkPollReadyForRead():
429 - out = self._fdToProcess['%s' % fd].stdout.readline()
430 - if out:
431 - if self._callback:
432 - if (self._arg):
433 - self._callback(self._arg, out)
 431+ commandCompleted = False
 432+
 433+ while not commandCompleted:
 434+ waiting = poller.poll(self._defaultPollTime)
 435+ if (waiting):
 436+ for (fd,event) in waiting:
 437+ series.inProgressPipeline().setPollState(event)
 438+ if series.inProgressPipeline().checkPollReadyForRead():
 439+
 440+ # so what happens if we get more than one line of output
 441+ # in the poll interval? it will sit there waiting...
 442+ # could have accumulation. FIXME. for our purposes we want
 443+ # one line only, the latest. but for other uses of this
 444+ # module? really we should read whatever is available only,
 445+ # pass it to callback, let callback handle multiple lines,
 446+ # partial lines etc.
 447+ # out = p.stdout.readline()
 448+ out = fdToStream[fd].readline()
 449+ if out:
 450+ if self._callback:
 451+ if (self._arg):
 452+ self._callback(self._arg, out)
 453+ else:
 454+ self._callback(out)
 455+ else:
 456+ # fixme this behavior is different, do we want it?
 457+ sys.stderr.write(out)
434458 else:
435 - self._callback(out)
436 - else:
437 - # fixme this behavior is different, do we want it?
438 - sys.stderr.write(out)
 459+ # possible eof? (empty string from readline)
 460+ pass
 461+ elif series.inProgressPipeline().checkForPollErrors():
 462+ poller.unregister(fd)
 463+ p.wait()
 464+ # FIXME put the returncode someplace?
 465+ print "returned from %s with %s" % (p.pid, p.returncode)
 466+ commandCompleted = True
 467+ if commandCompleted:
 468+ break
 469+
 470+
 471+ # run next command in series, if any
 472+ series.continueCommands()
 473+
 474+ else:
 475+ # no output from this process, just wait for it and do callback if there is one
 476+ waited = 0
 477+ while p.poll() == None:
 478+ if waited > self._defaultCallbackInterval and self._callback:
 479+ if (self._arg):
 480+ self._callback(self._arg)
439481 else:
440 - # possible eof? (empty string from readline)
441 - pass
442 - elif series.inProgressPipeline().checkForPollErrors():
443 - self._poller.unregister(fd)
444 - self._processesToPoll.remove(self._fdToProcess['%s' % fd])
445 - else:
446 - pass
447 - else:
448 - # suppose there is nothing left to poll.
449 - # but whatever state we have left around then is... False (= we are trying to get state from None obj)?
450 - # then we will never know we are done? ugh
451 - pass
 482+ self._callback()
 483+ waited = 0
 484+ time.sleep(1)
 485+ waited = waited + 1
452486
453 - def continueCommands(self):
454 - """Start new commands if needed, updating the poller if there are changes"""
 487+ print "returned from %s with %s" % (p.pid, p.returncode)
 488+ series.continueCommands()
 489+
 490+ # completed the whole series. time to go home.
 491+ queue.task_done()
 492+
 493+
 494+ def setupOutputMonitoring(self):
455495 for series in self._commandSerieses:
456 - oldInProgress = series.inProgressPipeline()
457 - series.continueCommands()
458 - if series.inProgressPipeline() != oldInProgress:
459 - if (series.processProducingOutput()):
460 - # Note that we don't remove the old process, that will happen when we check for output.
461 - self._poller.register(series.processProducingOutput().stdout,select.POLLIN|select.POLLPRI)
462 - self._processesToPoll.append(series.processProducingOutput())
463 - self._fdToProcess['%s' % series.processProducingOutput().stdout.fileno()] = series.processProducingOutput()
464 - self._fdToSeries['%s' % series.processProducingOutput().stdout.fileno()] = series
465 -
 496+ self._commandSeriesQueue.put(series)
 497+ thread.start_new_thread(self.seriesMonitor, (500, self._commandSeriesQueue))
 498+
466499 def allCommandsCompleted(self):
467500 """Check if all series have run to completion."""
468501 for series in self._commandSerieses:
@@ -486,19 +519,17 @@
487520 def runCommands(self):
488521 self.startCommands()
489522 self.setupOutputMonitoring()
490 - done = False
491 - while not done:
492 - for i in range(self._defaultPollCycles):
493 - self.checkForOutput()
494 - self.continueCommands()
495 - if (self.allCommandsCompleted() and not len(self._processesToPoll)):
496 - done = True
497 - break
498 - if (self._forceCallback):
499 - # forced call to callback every so often
500 - self.progressCallback(runner, "")
 523+ self._commandSeriesQueue.join()
501524
502525
 526+def testcallback(output = None):
 527+ outputFile = open("/home/ariel/src/mediawiki/testing/outputsaved.txt","a")
 528+ if (output == None):
 529+ outputFile.write( "no output for me.\n" )
 530+ else:
 531+ outputFile.write(output)
 532+ outputFile.close()
 533+
503534 if __name__ == "__main__":
504535 command1 = [ "/usr/bin/vmstat", "1", "10" ]
505536 command2 = [ "/usr/sbin/lnstat", "-i", "7", "-c", "5", "-k", "arp_cache:entries,rt_cache:in_hit,arp_cache:destroys", ">", "/home/ariel/src/mediawiki/testing/savelnstat.txt" ]
@@ -520,7 +551,7 @@
521552 series4 = [ pipeline5 ]
522553 series5 = [ pipeline6 ]
523554 parallel = [ series1, series2, series3, series4, series5 ]
524 - commands = CommandsInParallel(parallel)
 555+ commands = CommandsInParallel(parallel, callback=testcallback)
525556 commands.runCommands()
526557 if commands.exitedSuccessfully():
527558 print "w00t!"
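
The shape of the new queue-and-threads monitoring, boiled down to a hedged sketch (not the module's actual classes): one thread per command series takes it off a Queue, feeds output lines to a callback, and marks it done so the main thread's join() returns.

import subprocess, thread, Queue

queue = Queue.Queue()

def monitor(q):
    proc = q.get()
    for line in iter(proc.stdout.readline, ""):   # one line at a time, as it appears
        print "progress: %s" % line.rstrip()      # stands in for the real callback
    proc.wait()
    q.task_done()                                 # lets queue.join() below return

commands = [ [ "/usr/bin/vmstat", "1", "3" ], [ "/bin/echo", "done" ] ]
for cmd in commands:
    queue.put(subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
    thread.start_new_thread(monitor, (queue,))
queue.join()    # blocks until every series has called task_done()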
