Index: branches/ariel/xmldumps-backup/WikiDump.py |
— | — | @@ -94,6 +94,7 @@ |
95 | 95 | "dir": "", |
96 | 96 | "forcenormal": "0", |
97 | 97 | "halt": "0", |
| 98 | + "skipdblist" : "", |
98 | 99 | #"output": { |
99 | 100 | "public": "/dumps/public", |
100 | 101 | "private": "/dumps/private", |
— | — | @@ -122,11 +123,29 @@ |
123 | 124 | "grep": "/bin/grep", |
124 | 125 | #"cleanup": { |
125 | 126 | "keep": "3", |
| 127 | + #"chunks": { |
| 128 | + # set this to True to enable running the various xml dump stages as chunks in parallel |
| 129 | + "chunksEnabled" : False, |
| 130 | + # for page history runs, number of pages for each chunk, specified separately |
| 131 | + # e.g. "1000,10000,100000,2000000,2000000,2000000,2000000,2000000,2000000,2000000" |
| 132 | + # would define 10 chunks with the specified number of pages in each; any extra pages |
| 133 | + # wind up in the last of those chunks |
| 134 | + "pagesPerChunkHistory" : False, |
| 135 | + # revs per chunk (roughly, it will be split along page lines) for history and current dumps |
| 136 | + # values: positive integer, "compute", |
| 137 | + # this field is overridden by pagesPerChunkHistory |
| 138 | + # CURRENTLY NOT COMPLETE so please don't use this. |
| 139 | + "revsPerChunkHistory" : False, |
| 140 | + # pages per chunk for abstract runs |
| 141 | + "pagesPerChunkAbstract" : False, |
126 | 142 | } |
127 | 143 | conf = ConfigParser.SafeConfigParser(defaults) |
128 | 144 | conf.read(files) |
129 | 145 | |
130 | 146 | self.dbList = dbList(conf.get("wiki", "dblist")) |
| 147 | + self.skipDbList = dbList(conf.get("wiki", "skipdblist")) |
| 148 | + self.dbList = list(set(self.dbList) - set(self.skipDbList)) |
| 149 | + |
131 | 150 | self.privateList = dbList(conf.get("wiki", "privatelist")) |
132 | 151 | biglistFile = conf.get("wiki", "biglist") |
133 | 152 | if biglistFile: |
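For reference, a config file fragment exercising the new options might look roughly like this (the section and option names follow the defaults and conf.get() calls in this commit; the paths and values are purely illustrative):

    [wiki]
    dblist=/dumps/conf/all.dblist
    skipdblist=/dumps/conf/skip.dblist

    [chunks]
    chunksEnabled=1
    pagesPerChunkHistory=1000,10000,100000,2000000,2000000,2000000,2000000,2000000,2000000,2000000
    pagesPerChunkAbstract=2000000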
— | — | @@ -169,6 +188,11 @@ |
170 | 189 | self.cat = conf.get("tools", "cat") |
171 | 190 | self.grep = conf.get("tools", "grep") |
172 | 191 | |
| 192 | + self.chunksEnabled = conf.get("chunks","chunksEnabled") |
| 193 | + self.pagesPerChunkHistory = conf.get("chunks","pagesPerChunkHistory") |
| 194 | + self.revsPerChunkHistory = conf.get("chunks","revsPerChunkHistory") |
| 195 | + self.pagesPerChunkAbstract = conf.get("chunks","pagesPerChunkAbstract") |
| 196 | + |
173 | 197 | self.keep = conf.getint("cleanup", "keep") |
174 | 198 | |
175 | 199 | def dbListByAge(self): |
— | — | @@ -185,6 +209,12 @@ |
186 | 210 | |
187 | 211 | If some error occurs checking a dump status, that dump is put last in the |
188 | 212 | list (sort value is (True, maxint) ) |
| 213 | + |
| 214 | + Note that we now sort this list by the date of the dump directory, not the |
| 215 | + last date that a dump file in that directory may have been touched. This |
| 216 | + allows us to rerun jobs to completion from older runs, for example |
| 217 | + an en pedia history run that failed in the middle, without borking the |
| 218 | + index page links. |
189 | 219 | """ |
190 | 220 | available = [] |
191 | 221 | for db in self.dbList: |
— | — | @@ -196,7 +226,9 @@ |
197 | 227 | if last: |
198 | 228 | dumpStatus = os.path.join(wiki.publicDir(), last, "status.html") |
199 | 229 | try: |
200 | | - age = fileAge(dumpStatus) |
| 230 | + # tack on the file age so that if we have multiple wikis |
| 231 | + # dumped on the same day, they get ordered properly |
| 232 | + age = last . fileAge(dumpStatus) |
201 | 233 | status = readFile(dumpStatus) |
202 | 234 | except: |
203 | 235 | print "dump dir %s corrupt?" % dumpStatus |
— | — | @@ -341,11 +373,14 @@ |
342 | 374 | link = "%s (new)" % self.dbName |
343 | 375 | return "<li>%s %s: %s</li>\n" % (stamp, link, status) |
344 | 376 | |
345 | | - def latestDump(self, index=-1): |
| 377 | + def latestDump(self, index=-1, all=False): |
346 | 378 | """Find the last (or slightly less than last) dump for a db.""" |
347 | 379 | dirs = self.dumpDirs() |
348 | 380 | if dirs: |
349 | | - return dirs[index] |
| 381 | + if all: |
| 382 | + return dirs |
| 383 | + else: |
| 384 | + return dirs[index] |
350 | 385 | else: |
351 | 386 | return None |
352 | 387 | |
Index: branches/ariel/xmldumps-backup/worker.py |
— | — | @@ -12,6 +12,7 @@ |
13 | 13 | import shutil |
14 | 14 | import stat |
15 | 15 | import signal |
| 16 | +import errno |
16 | 17 | import WikiDump |
17 | 18 | import CommandManagement |
18 | 19 | |
— | — | @@ -46,6 +47,135 @@ |
47 | 48 | def xmlEscape(text): |
48 | 49 | return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;") |
49 | 50 | |
| 51 | +# so if the pages/revsPerChunkAbstract/History are just one number it means |
| 52 | +# use that number for all the chunks, figure out yourself how many. |
| 53 | +# otherwise we get passed a list that says "here's how many for each chunk" and it's this many chunks. |
| 54 | +# extra pages/revs go in the last chunk, stuck on the end. too bad. :-P |
| 55 | +class Chunk(object): |
| 56 | + def __init__(self, wiki, dbName): |
| 57 | + |
| 58 | + self._dbName = dbName |
| 59 | + self._chunksEnabled = wiki.config.chunksEnabled |
| 60 | + self._pagesPerChunkHistory = self.convertCommaSepLineToNumbers(wiki.config.pagesPerChunkHistory) |
| 61 | + self._revsPerChunkHistory = self.convertCommaSepLineToNumbers(wiki.config.revsPerChunkHistory) |
| 62 | + self._pagesPerChunkAbstract = self.convertCommaSepLineToNumbers(wiki.config.pagesPerChunkAbstract) |
| 63 | + |
| 64 | + if (self._chunksEnabled): |
| 65 | + self.Stats = PageAndEditStats(wiki,dbName) |
| 66 | + |
| 67 | + if (self._revsPerChunkHistory): |
| 68 | + if (len(self._revsPerChunkHistory) == 1): |
| 69 | + self._numChunksHistory = self.getNumberOfChunksForXMLDumps(self.Stats.getTotalEdits(), self._revsPerChunkHistory[0]) |
| 70 | + self._revsPerChunkHistory = [ self._revsPerChunkHistory[0] for i in range(self._numChunksHistory)] |
| 71 | + else: |
| 72 | + self._numChunksHistory = len(self._revsPerChunkHistory) |
| 73 | + # here we should generate the number of pages per chunk based on number of revs. |
| 74 | + # ...next code update! FIXME |
| 75 | + # self._pagesPerChunkHistory = .... |
| 76 | + elif (self._pagesPerChunkHistory): |
| 77 | + if (len(self._pagesPerChunkHistory) == 1): |
| 78 | + self._numChunksHistory = self.getNumberOfChunksForXMLDumps(self.Stats.getTotalPages(), self._pagesPerChunkHistory[0]) |
| 79 | + self._pagesPerChunkHistory = [ self._pagesPerChunkHistory[0] for i in range(self._numChunksHistory)] |
| 80 | + else: |
| 81 | + self._numChunksHistory = len(self._pagesPerChunkHistory) |
| 82 | + else: |
| 83 | + self._numChunksHistory = 0 |
| 84 | + |
| 85 | + if (self._pagesPerChunkAbstract): |
| 86 | + if (len(self._pagesPerChunkAbstract) == 1): |
| 87 | + self._numChunksAbstract = self.getNumberOfChunksForXMLDumps(self.Stats.getTotalPages(), self._pagesPerChunkAbstract[0]) |
| 88 | + self._pagesPerChunkAbstract = [ self._pagesPerChunkAbstract[0] for i in range(self._numChunksAbstract)] |
| 89 | + else: |
| 90 | + self._numChunksAbstract = len(self._pagesPerChunkAbstract) |
| 91 | + else: |
| 92 | + self._numChunksAbstract = 0 |
| 93 | + |
| 94 | + def convertCommaSepLineToNumbers(self, line): |
| 95 | + if (line == ""): |
| 96 | + return(False) |
| 97 | + result = line.split(',') |
| 98 | + numbers = [] |
| 99 | + for field in result: |
| 100 | + field = field.strip() |
| 101 | + numbers.append(int(field)) |
| 102 | + return(numbers) |
| 103 | + |
| 104 | + def getPagesPerChunkAbstract(self): |
| 105 | + return self._pagesPerChunkAbstract |
| 106 | + |
| 107 | + def getNumChunksAbstract(self): |
| 108 | + return self._numChunksAbstract |
| 109 | + |
| 110 | + def getPagesPerChunkHistory(self): |
| 111 | + return self._pagesPerChunkHistory |
| 112 | + |
| 113 | + def getNumChunksHistory(self): |
| 114 | + return self._numChunksHistory |
| 115 | + |
| 116 | + def chunksEnabled(self): |
| 117 | + return self._chunksEnabled |
| 118 | + |
| 119 | + # args: total (pages or revs), and the number of (pages or revs) per chunk. |
| 120 | + def getNumberOfChunksForXMLDumps(self, total, perChunk): |
| 121 | + if (not total): |
| 122 | + # default: no chunking. |
| 123 | + return 0 |
| 124 | + else: |
| 125 | + chunks = int(total/perChunk) |
| 126 | + # more smaller chunks are better, we want speed |
| 127 | + if (total - (chunks * perChunk)) > 0: |
| 128 | + chunks = chunks + 1 |
| 129 | + if chunks == 1: |
| 130 | + return 0 |
| 131 | + return chunks |
| 132 | + |
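To make the single-number case above concrete, here is a small worked sketch of the arithmetic getNumberOfChunksForXMLDumps() and the list expansion perform (the page total is invented for illustration):

    # hypothetical wiki with 5,300,000 pages and pagesPerChunkHistory = "2000000"
    totalPages = 5300000
    perChunk = 2000000
    chunks = int(totalPages / perChunk)         # 2
    if (totalPages - (chunks * perChunk)) > 0:  # 1,300,000 pages left over
        chunks = chunks + 1                     # 3 chunks in all
    pagesPerChunkHistory = [perChunk for i in range(chunks)]
    # [2000000, 2000000, 2000000]; the last chunk gets no --end option,
    # so the leftover pages simply run out to the final page id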
| 133 | +class PageAndEditStats(object): |
| 134 | + def __init__(self, wiki, dbName): |
| 135 | + self.totalPages = None |
| 136 | + self.totalEdits = None |
| 137 | + self.config = wiki.config |
| 138 | + self.dbName = dbName |
| 139 | + (self.totalPages, self.totalEdits) = self.getStatistics(self.config, self.dbName) |
| 140 | + |
| 141 | + def getStatistics(self, config,dbName): |
| 142 | + """Get (cached) statistics for the wiki""" |
| 143 | + totalPages = None |
| 144 | + totalEdits = None |
| 145 | + statsCommand = """%s -q %s/maintenance/showStats.php --wiki=%s """ % shellEscape(( |
| 146 | + self.config.php, self.config.wikiDir, self.dbName)) |
| 147 | + # FIXME runAndReturn? defined somewhere else |
| 148 | + results = self.runAndReturn(statsCommand) |
| 149 | + lines = results.splitlines() |
| 150 | + if (lines): |
| 151 | + for line in lines: |
| 152 | + (name,value) = line.split(':') |
| 153 | + name = name.replace(' ','') |
| 154 | + value = value.replace(' ','') |
| 155 | + if (name == "Totalpages"): |
| 156 | + totalPages = int(value) |
| 157 | + elif (name == "Totaledits"): |
| 158 | + totalEdits = int(value) |
| 159 | + return(totalPages, totalEdits) |
| 160 | + |
| 161 | + def getTotalPages(self): |
| 162 | + return self.totalPages |
| 163 | + |
| 164 | + def getTotalEdits(self): |
| 165 | + return self.totalEdits |
| 166 | + |
| 167 | + # FIXME should rewrite this I guess and also move it elsewhere, phooey |
| 168 | + def runAndReturn(self, command): |
| 169 | + """Run a command and return the output as a string. |
| 170 | + Raises BackupError on non-zero return code.""" |
| 171 | + # FIXME convert all these calls so they just use runCommand now |
| 172 | + proc = popen2.Popen4(command, 64) |
| 173 | + output = proc.fromchild.read() |
| 174 | + retval = proc.wait() |
| 175 | + if retval: |
| 176 | + raise BackupError("Non-zero return code from '%s'" % command) |
| 177 | + else: |
| 178 | + return output |
| 179 | + |
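The getStatistics() parsing above expects showStats.php to print one colon-separated statistic per line; after spaces are stripped it looks for the Totalpages and Totaledits keys, so (hypothetically) output containing lines such as

    Total pages: 104297
    Total edits: 1403451

would yield totalPages = 104297 and totalEdits = 1403451. The numbers here are made up; any other lines in the script's output are simply ignored by the name check.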
50 | 180 | class BackupError(Exception): |
51 | 181 | pass |
52 | 182 | |
— | — | @@ -82,14 +212,15 @@ |
83 | 213 | |
84 | 214 | class DumpItemList(object): |
85 | 215 | |
86 | | - def __init__(self, wiki, prefetch, spawn, date, chunks): |
| 216 | + def __init__(self, wiki, prefetch, spawn, date, chunkInfo): |
87 | 217 | self.date = date |
88 | 218 | self.wiki = wiki |
89 | 219 | self._hasFlaggedRevs = self.wiki.hasFlaggedRevs() |
90 | 220 | self._isBig = self.wiki.isBig() |
91 | 221 | self._prefetch = prefetch |
92 | 222 | self._spawn = spawn |
93 | | - self._chunks = chunks |
| 223 | + self.chunkInfo = chunkInfo |
| 224 | + |
94 | 225 | self.dumpItems = [PrivateTable("user", "usertable", "User account data."), |
95 | 226 | PrivateTable("watchlist", "watchlisttable", "Users' watchlist settings."), |
96 | 227 | PrivateTable("ipblocks", "ipblockstable", "Data for blocks of IP addresses, ranges, and users."), |
— | — | @@ -122,36 +253,36 @@ |
123 | 254 | |
124 | 255 | TitleDump("pagetitlesdump", "List of page titles"), |
125 | 256 | |
126 | | - AbstractDump("abstractsdump","Extracted page abstracts for Yahoo", self._chunks)] |
| 257 | + AbstractDump("abstractsdump","Extracted page abstracts for Yahoo", self.chunkInfo.getPagesPerChunkAbstract())] |
127 | 258 | |
128 | | - if (self._chunks): |
129 | | - self.dumpItems.append(RecombineAbstractDump("abstractsdumprecombine", "Recombine extracted page abstracts for Yahoo", self._chunks)) |
| 259 | + if (self.chunkInfo.chunksEnabled()): |
| 260 | + self.dumpItems.append(RecombineAbstractDump("abstractsdumprecombine", "Recombine extracted page abstracts for Yahoo", self.chunkInfo.getPagesPerChunkAbstract())) |
130 | 261 | |
131 | | - self.dumpItems.append(XmlStub("xmlstubsdump", "First-pass for page XML data dumps", self._chunks)) |
132 | | - |
133 | | - if (self._chunks): |
134 | | - self.dumpItems.append(RecombineXmlStub("xmlstubsdumprecombine", "Recombine first-pass for page XML data dumps", self._chunks)) |
| 262 | + self.dumpItems.append(XmlStub("xmlstubsdump", "First-pass for page XML data dumps", self.chunkInfo.getPagesPerChunkHistory())) |
| 263 | + if (self.chunkInfo.chunksEnabled()): |
| 264 | + self.dumpItems.append(RecombineXmlStub("xmlstubsdumprecombine", "Recombine first-pass for page XML data dumps", self.chunkInfo.getPagesPerChunkHistory())) |
135 | 265 | |
| 266 | + # NOTE that the chunkInfo thing passed here is irrelevant, these get generated from the stubs which are all done in one pass |
136 | 267 | self.dumpItems.append( |
137 | 268 | XmlDump("articles", |
138 | 269 | "articlesdump", |
139 | 270 | "<big><b>Articles, templates, image descriptions, and primary meta-pages.</b></big>", |
140 | | - "This contains current versions of article content, and is the archive most mirror sites will probably want.", self._prefetch, self._spawn, self._chunks)) |
141 | | - if (self._chunks): |
142 | | - self.dumpItems.append(RecombineXmlDump("articles","articlesdumprecombine", "<big><b>Recombine articles, templates, image descriptions, and primary meta-pages.</b></big>","This contains current versions of article content, and is the archive most mirror sites will probably want.", self._chunks)) |
| 271 | + "This contains current versions of article content, and is the archive most mirror sites will probably want.", self._prefetch, self._spawn, self.chunkInfo.getPagesPerChunkHistory())) |
| 272 | + if (self.chunkInfo.chunksEnabled()): |
| 273 | + self.dumpItems.append(RecombineXmlDump("articles","articlesdumprecombine", "<big><b>Recombine articles, templates, image descriptions, and primary meta-pages.</b></big>","This contains current versions of article content, and is the archive most mirror sites will probably want.", self.chunkInfo.getPagesPerChunkHistory())) |
143 | 274 | |
144 | 275 | self.dumpItems.append( |
145 | 276 | XmlDump("meta-current", |
146 | 277 | "metacurrentdump", |
147 | 278 | "All pages, current versions only.", |
148 | | - "Discussion and user pages are included in this complete archive. Most mirrors won't want this extra material.", self._prefetch, self._spawn, self._chunks)) |
| 279 | + "Discussion and user pages are included in this complete archive. Most mirrors won't want this extra material.", self._prefetch, self._spawn, self.chunkInfo.getPagesPerChunkHistory())) |
| 280 | + |
| 281 | + if (self.chunkInfo.chunksEnabled()): |
| 282 | + self.dumpItems.append(RecombineXmlDump("meta-current","metacurrentdumprecombine", "Recombine all pages, current versions only.","Discussion and user pages are included in this complete archive. Most mirrors won't want this extra material.", self.chunkInfo.getPagesPerChunkHistory())) |
149 | 283 | |
150 | | - if (self._chunks): |
151 | | - self.dumpItems.append(RecombineXmlDump("meta-current","metacurrentdumprecombine", "Recombine all pages, current versions only.","Discussion and user pages are included in this complete archive. Most mirrors won't want this extra material.", self._chunks)) |
152 | | - |
153 | 284 | self.dumpItems.append( |
154 | 285 | XmlLogging("Log events to all pages.")) |
155 | | - |
| 286 | + |
156 | 287 | if self._hasFlaggedRevs: |
157 | 288 | self.dumpItems.append( |
158 | 289 | PublicTable( "flaggedpages", "flaggedpagestable","This contains a row for each flagged article, containing the stable revision ID, if the latest edit was flagged, and how long edits have been pending." )) |
— | — | @@ -164,20 +295,20 @@ |
165 | 296 | "metahistorybz2dump", |
166 | 297 | "All pages with complete page edit history (.bz2)", |
167 | 298 | "These dumps can be *very* large, uncompressing up to 20 times the archive download size. " + |
168 | | - "Suitable for archival and statistical use, most mirror sites won't want or need this.", self._prefetch, self._spawn, self._chunks)) |
| 299 | + "Suitable for archival and statistical use, most mirror sites won't want or need this.", self._prefetch, self._spawn, self.chunkInfo.getPagesPerChunkHistory())) |
169 | 300 | self.dumpItems.append( |
170 | 301 | XmlRecompressDump("meta-history", |
171 | 302 | "metahistory7zdump", |
172 | 303 | "All pages with complete edit history (.7z)", |
173 | 304 | "These dumps can be *very* large, uncompressing up to 100 times the archive download size. " + |
174 | | - "Suitable for archival and statistical use, most mirror sites won't want or need this.", self._chunks)) |
175 | | - if (self._chunks): |
| 305 | + "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.chunkInfo.getPagesPerChunkHistory())) |
| 306 | + if (self.chunkInfo.chunksEnabled()): |
176 | 307 | self.dumpItems.append( |
177 | 308 | RecombineXmlRecompressDump("meta-history", |
178 | 309 | "metahistory7zdumprecombine", |
179 | 310 | "Recombine all pages with complete edit history (.7z)", |
180 | 311 | "These dumps can be *very* large, uncompressing up to 100 times the archive download size. " + |
181 | | - "Suitable for archival and statistical use, most mirror sites won't want or need this.", self._chunks)) |
| 312 | + "Suitable for archival and statistical use, most mirror sites won't want or need this.", self.chunkInfo.getPagesPerChunkHistory())) |
182 | 313 | self.oldRunInfoRetrieved = self._getOldRunInfoFromFile() |
183 | 314 | |
184 | 315 | |
— | — | @@ -313,7 +444,7 @@ |
314 | 445 | if (not self.jobDoneSuccessfully("metacurrentdump")): |
315 | 446 | return False |
316 | 447 | if (job == "metahistory7zdumprecombine"): |
317 | | - if (not self.jobDoneSuccessfully("metahistory7zdumprecombine")): |
| 448 | + if (not self.jobDoneSuccessfully("metahistory7zdump")): |
318 | 449 | return False |
319 | 450 | if (job == "metahistory7zdump"): |
320 | 451 | if (not self.jobDoneSuccessfully("xmlstubsdump") or not self.jobDoneSuccessfully("metahistorybz2dump")): |
— | — | @@ -405,7 +536,7 @@ |
406 | 537 | self.dbName = wiki.dbName |
407 | 538 | self.prefetch = prefetch |
408 | 539 | self.spawn = spawn |
409 | | - self._chunks = self.setNumberOfChunksForXMLDumps(self.dbName) |
| 540 | + self.chunkInfo = Chunk(wiki, self.dbName) |
410 | 541 | |
411 | 542 | if date: |
412 | 543 | # Override, continuing a past dump? |
— | — | @@ -423,7 +554,7 @@ |
424 | 555 | self.dumpDir = DumpDir(self.wiki, self.dbName, self.date) |
425 | 556 | self.checksums = Checksummer(self.wiki, self.dumpDir) |
426 | 557 | # some or all of these dumpItems will be marked to run |
427 | | - self.dumpItemList = DumpItemList(self.wiki, self.prefetch, self.spawn, self.date, self._chunks); |
| 558 | + self.dumpItemList = DumpItemList(self.wiki, self.prefetch, self.spawn, self.date, self.chunkInfo); |
428 | 559 | |
429 | 560 | def passwordOption(self): |
430 | 561 | """If you pass '-pfoo' mysql uses the password 'foo', |
— | — | @@ -496,26 +627,6 @@ |
497 | 628 | totalEdits = int(value) |
498 | 629 | return(totalPages, totalEdits) |
499 | 630 | |
500 | | - def pagesPerChunk(self): |
501 | | - # we are gonna say that chunks are 2 mill pages, it can always be adjusted later. |
502 | | - # last one might be 3 mill pages. |
503 | | - return 2000000 |
504 | | -# return 200 |
505 | | - |
506 | | - def setNumberOfChunksForXMLDumps(self, dbName): |
507 | | - (pages,edits) = self.getStatistics(dbName) |
508 | | - if (not pages): |
509 | | - # default: no chunking. |
510 | | - return 0 |
511 | | - else: |
512 | | - chunks = int(pages/self.pagesPerChunk()) |
513 | | - # more smaller chunks are better, we want speed |
514 | | - if pages - (chunks * self.pagesPerChunk()) > 0: |
515 | | - chunks = chunks + 1 |
516 | | - if chunks == 1: |
517 | | - return 0 |
518 | | - return chunks |
519 | | - |
520 | 631 | # command series list: list of (commands plus args) is one pipeline. list of pipelines = 1 series. |
521 | 632 | # this function wants a list of series. |
522 | 633 | # be a list (the command name and the various args) |
— | — | @@ -789,18 +900,29 @@ |
790 | 901 | |
791 | 902 | def reportPreviousDump(self, done): |
792 | 903 | """Produce a link to the previous dump, if any""" |
| 904 | + # get the list of dumps for this wiki in order, find me in the list, find the one prev to me. |
| 905 | + # why? we might be rerunning a job from an older dump. we might have two |
| 906 | + # runs going at once (think en pedia, one finishing up the history, another |
| 907 | + # starting at the beginning to get the new abstracts and stubs). |
| 908 | + |
793 | 909 | try: |
794 | | - raw = self.wiki.latestDump(-2) |
| 910 | + dumpsInOrder = self.wiki.latestDump(all=True) |
| 911 | + meIndex = dumpsInOrder.index(self.date) |
| 912 | + # don't wrap around to the newest dump in the list! |
| 913 | + if (meIndex > 0): |
| 914 | + rawDate = dumpsInOrder[meIndex-1] |
| 915 | + else: |
| 916 | + raise(ValueError) |
795 | 917 | except: |
796 | 918 | return "No prior dumps of this database stored." |
797 | | - date = WikiDump.prettyDate(raw) |
| 919 | + prettyDate = WikiDump.prettyDate(rawDate) |
798 | 920 | if done: |
799 | 921 | prefix = "" |
800 | 922 | message = "Last dumped on" |
801 | 923 | else: |
802 | 924 | prefix = "This dump is in progress; see also the " |
803 | 925 | message = "previous dump from" |
804 | | - return "%s<a href=\"../%s/\">%s %s</a>" % (prefix, raw, message, date) |
| 926 | + return "%s<a href=\"../%s/\">%s %s</a>" % (prefix, rawDate, message, prettyDate) |
805 | 927 | |
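A quick illustration of the lookup above, with made-up dump dates: latestDump(all=True) returns the dump directories oldest to newest, self.date is the run being reported on, and the link points at the entry just before it.

    dumpsInOrder = ["20100901", "20100915", "20101001"]  # illustrative
    date = "20100915"
    meIndex = dumpsInOrder.index(date)     # 1
    rawDate = dumpsInOrder[meIndex - 1]    # "20100901", the previous run
    # if date were "20100901" (meIndex 0) there is no prior dump and the
    # code falls through to the "No prior dumps" message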
806 | 928 | def reportStatusSummaryLine(self, done=False): |
807 | 929 | if (done == "done"): |
— | — | @@ -925,7 +1047,7 @@ |
926 | 1048 | "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())} |
927 | 1049 | rssPath = self.dumpDir.latestPath(file + "-rss.xml") |
928 | 1050 | WikiDump.dumpFile(rssPath, rssText) |
929 | | - |
| 1051 | + |
930 | 1052 | class Dump(object): |
931 | 1053 | def __init__(self, name, desc): |
932 | 1054 | self._desc = desc |
— | — | @@ -998,6 +1120,11 @@ |
999 | 1121 | runner.updateStatusFiles() |
1000 | 1122 | runner.dumpItemList.saveDumpRunInfoFile() |
1001 | 1123 | |
| 1124 | + def timeToWait(self): |
| 1125 | + # we wait this many secs for a command to complete that |
| 1126 | + # doesn't produce output |
| 1127 | + return 5 |
| 1128 | + |
1002 | 1129 | def waitAlarmHandler(self, signum, frame): |
1003 | 1130 | pass |
1004 | 1131 | |
— | — | @@ -1049,15 +1176,14 @@ |
1050 | 1177 | recombine = " ".join(uncompressThisFile) |
1051 | 1178 | headerEndNum = int(headerEndNum) + 1 |
1052 | 1179 | if (chunkNum == 1): |
1053 | | - # skip footer |
| 1180 | + # first file, put header and contents |
1054 | 1181 | recombine = recombine + " | %s -n -1 " % headEsc |
1055 | | - elif (chunkNum == self._chunks): |
1056 | | - # skip header |
| 1182 | + elif (chunkNum == len(files)): |
| 1183 | + # last file, put footer |
1057 | 1184 | recombine = recombine + (" | %s -n +%s" % (tailEsc, headerEndNum)) |
1058 | 1185 | else: |
1059 | | - # skip header |
| 1186 | + # put contents only |
1060 | 1187 | recombine = recombine + (" | %s -n +%s" % (tailEsc, headerEndNum)) |
1061 | | - # skip footer |
1062 | 1188 | recombine = recombine + " | %s -n -1 " % head |
1063 | 1189 | recombines.append(recombine) |
1064 | 1190 | recombineCommandString = "(" + ";".join(recombines) + ")" + "|" + "%s %s" % (compressionCommand, outputFilename) |
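For a three-chunk recombine, the recombineCommandString assembled above would expand to something shaped roughly like the following (the decompressor, file names, and 27-line header are illustrative; headerEndNum comes from locating the end of the XML header in the first chunk, and compressionCommand supplies the final compression step):

    ( bzcat articles1.xml.bz2 | head -n -1 ; bzcat articles2.xml.bz2 | tail -n +28 | head -n -1 ; bzcat articles3.xml.bz2 | tail -n +28 ) | bzip2 > articles.xml.bz2

The first chunk keeps its header and drops its footer, middle chunks keep only the page content, and the last chunk keeps its footer, so the concatenation is again one well-formed XML dump.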
— | — | @@ -1105,7 +1231,7 @@ |
1106 | 1232 | A second pass will import text from prior dumps or the database to make |
1107 | 1233 | full files for the public.""" |
1108 | 1234 | |
1109 | | - def __init__(self, name, desc, chunks = None): |
| 1235 | + def __init__(self, name, desc, chunks = False): |
1110 | 1236 | Dump.__init__(self, name, desc) |
1111 | 1237 | self._chunks = chunks |
1112 | 1238 | |
— | — | @@ -1115,7 +1241,7 @@ |
1116 | 1242 | def listFiles(self, runner, unnumbered=False): |
1117 | 1243 | if (self._chunks) and not unnumbered: |
1118 | 1244 | files = [] |
1119 | | - for i in range(1, self._chunks + 1): |
| 1245 | + for i in range(1, len(self._chunks) + 1): |
1120 | 1246 | files.append("stub-meta-history%s.xml.gz" % i) |
1121 | 1247 | files.append("stub-meta-current%s.xml.gz" % i) |
1122 | 1248 | files.append("stub-articles%s.xml.gz" % i) |
— | — | @@ -1149,13 +1275,15 @@ |
1150 | 1276 | if (chunk): |
1151 | 1277 | # set up start end end pageids for this piece |
1152 | 1278 | # note there is no page id 0 I guess. so we start with 1 |
1153 | | - start = runner.pagesPerChunk()*(chunk-1) + 1 |
| 1279 | + # start = runner.pagesPerChunk()*(chunk-1) + 1 |
| 1280 | + start = sum([ self._chunks[i] for i in range(0,chunk-1)]) + 1 |
1154 | 1281 | startopt = "--start=%s" % start |
1155 | 1282 | # if we are on the last chunk, we should get up to the last pageid, |
1156 | 1283 | # whatever that is. |
1157 | 1284 | command.append(startopt) |
1158 | | - if chunk < self._chunks: |
1159 | | - end = start + runner.pagesPerChunk() |
| 1285 | + if chunk < len(self._chunks): |
| 1286 | + # end = start + runner.pagesPerChunk() |
| 1287 | + end = sum([ self._chunks[i] for i in range(0,chunk)]) +1 |
1160 | 1288 | endopt = "--end=%s" % end |
1161 | 1289 | command.append(endopt) |
1162 | 1290 | |
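A worked example of the start/end arithmetic above, assuming pagesPerChunkHistory was given as "1000,10000,100000" so that self._chunks is [1000, 10000, 100000]:

    chunks = [1000, 10000, 100000]
    # chunk 1: start = 1,     end = 1001   (pages 1..1000)
    # chunk 2: start = 1001,  end = 11001  (pages 1001..11000)
    # chunk 3: start = 11001, no --end, runs to the last page id
    for chunk in range(1, len(chunks) + 1):
        start = sum([chunks[i] for i in range(0, chunk - 1)]) + 1
        if chunk < len(chunks):
            end = sum([chunks[i] for i in range(0, chunk)]) + 1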
— | — | @@ -1166,7 +1294,7 @@ |
1167 | 1295 | def run(self, runner): |
1168 | 1296 | commands = [] |
1169 | 1297 | if self._chunks: |
1170 | | - for i in range(1, self._chunks+1): |
| 1298 | + for i in range(1, len(self._chunks)+1): |
1171 | 1299 | series = self.buildCommand(runner, i) |
1172 | 1300 | commands.append(series) |
1173 | 1301 | else: |
— | — | @@ -1210,16 +1338,22 @@ |
1211 | 1339 | # these commands don't produce any progress bar... so we can at least |
1212 | 1340 | # update the size and last update time of the file once a minute |
1213 | 1341 | signal.signal(signal.SIGALRM, self.waitAlarmHandler) |
1214 | | - signal.alarm(60) |
1215 | | - recombinePipeline._lastProcessInPipe.wait() |
| 1342 | + signal.alarm(self.timeToWait()) |
| 1343 | + try: |
| 1344 | + recombinePipeline._lastProcessInPipe.wait() |
| 1345 | + break |
| 1346 | + except OSError, e: |
| 1347 | + if e.errno == errno.EINTR: |
| 1348 | + pass |
| 1349 | + else: |
| 1350 | + raise |
1216 | 1351 | self.progressCallback(runner) |
1217 | 1352 | signal.alarm(0) |
1218 | | - break |
1219 | 1353 | |
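The wait loop introduced here (and repeated in the other recombine classes below) relies on SIGALRM interrupting the blocking wait() so that progress can be reported between alarms. A minimal standalone sketch of the pattern, with an assumed progress callback:

    import errno, signal

    def waitWithProgress(proc, interval, progressCallback):
        # no-op handler: we only want wait() to be interrupted, not the process killed
        signal.signal(signal.SIGALRM, lambda signum, frame: None)
        while True:
            signal.alarm(interval)
            try:
                proc.wait()
                break
            except OSError, e:
                # EINTR just means the alarm fired while we were waiting
                if e.errno != errno.EINTR:
                    raise
            progressCallback()
            signal.alarm(0)
        signal.alarm(0)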
1220 | 1354 | class XmlLogging(Dump): |
1221 | 1355 | """ Create a logging dump of all page activity """ |
1222 | 1356 | |
1223 | | - def __init__(self, desc, chunks = None): |
| 1357 | + def __init__(self, desc, chunks = False): |
1224 | 1358 | Dump.__init__(self, "xmlpagelogsdump", desc) |
1225 | 1359 | self._chunks = chunks |
1226 | 1360 | |
— | — | @@ -1246,7 +1380,7 @@ |
1247 | 1381 | |
1248 | 1382 | class XmlDump(Dump): |
1249 | 1383 | """Primary XML dumps, one section at a time.""" |
1250 | | - def __init__(self, subset, name, desc, detail, prefetch, spawn, chunks = None): |
| 1384 | + def __init__(self, subset, name, desc, detail, prefetch, spawn, chunks = False): |
1251 | 1385 | Dump.__init__(self, name, desc) |
1252 | 1386 | self._subset = subset |
1253 | 1387 | self._detail = detail |
— | — | @@ -1270,7 +1404,7 @@ |
1271 | 1405 | def run(self, runner): |
1272 | 1406 | commands = [] |
1273 | 1407 | if (self._chunks): |
1274 | | - for i in range(1, self._chunks+1): |
| 1408 | + for i in range(1, len(self._chunks)+1): |
1275 | 1409 | series = self.buildCommand(runner, i) |
1276 | 1410 | commands.append(series) |
1277 | 1411 | else: |
— | — | @@ -1460,7 +1594,7 @@ |
1461 | 1595 | def listFiles(self, runner, unnumbered = False): |
1462 | 1596 | if (self._chunks) and not unnumbered: |
1463 | 1597 | files = [] |
1464 | | - for i in range(1, self._chunks+1): |
| 1598 | + for i in range(1, len(self._chunks)+1): |
1465 | 1599 | files.append(self._file("bz2",i)) |
1466 | 1600 | return files |
1467 | 1601 | else: |
— | — | @@ -1470,7 +1604,7 @@ |
1471 | 1605 | return checkpoint == self.__class__.__name__ + "." + self._subset |
1472 | 1606 | |
1473 | 1607 | class RecombineXmlDump(XmlDump): |
1474 | | - def __init__(self, subset, name, desc, detail, chunks = None): |
| 1608 | + def __init__(self, subset, name, desc, detail, chunks = False): |
1475 | 1609 | # no prefetch, no spawn |
1476 | 1610 | XmlDump.__init__(self, subset, name, desc, detail, None, None, chunks) |
1477 | 1611 | # this is here only so that a callback can capture output from some commands |
— | — | @@ -1505,11 +1639,17 @@ |
1506 | 1640 | # these commands don't produce any progress bar... so we can at least |
1507 | 1641 | # update the size and last update time of the file once a minute |
1508 | 1642 | signal.signal(signal.SIGALRM, self.waitAlarmHandler) |
1509 | | - signal.alarm(60) |
1510 | | - recombinePipeline._lastProcessInPipe.wait() |
| 1643 | + signal.alarm(self.timeToWait()) |
| 1644 | + try: |
| 1645 | + recombinePipeline._lastProcessInPipe.wait() |
| 1646 | + break |
| 1647 | + except OSError, e: |
| 1648 | + if e.errno == errno.EINTR: |
| 1649 | + pass |
| 1650 | + else: |
| 1651 | + raise |
1511 | 1652 | self.progressCallback(runner) |
1512 | 1653 | signal.alarm(0) |
1513 | | - break |
1514 | 1654 | |
1515 | 1655 | class BigXmlDump(XmlDump): |
1516 | 1656 | """XML page dump for something larger, where a 7-Zip compressed copy |
— | — | @@ -1522,7 +1662,7 @@ |
1523 | 1663 | class XmlRecompressDump(Dump): |
1524 | 1664 | """Take a .bz2 and recompress it as 7-Zip.""" |
1525 | 1665 | |
1526 | | - def __init__(self, subset, name, desc, detail, chunks = None): |
| 1666 | + def __init__(self, subset, name, desc, detail, chunks = False): |
1527 | 1667 | Dump.__init__(self, name, desc) |
1528 | 1668 | self._subset = subset |
1529 | 1669 | self._detail = detail |
— | — | @@ -1572,7 +1712,7 @@ |
1573 | 1713 | raise BackupError("bz2 dump incomplete, not recompressing") |
1574 | 1714 | commands = [] |
1575 | 1715 | if (self._chunks): |
1576 | | - for i in range(1, self._chunks+1): |
| 1716 | + for i in range(1, len(self._chunks)+1): |
1577 | 1717 | series = self.buildCommand(runner, i) |
1578 | 1718 | commands.append(series) |
1579 | 1719 | else: |
— | — | @@ -1585,7 +1725,7 @@ |
1586 | 1726 | # temp hack force 644 permissions until ubuntu bug # 370618 is fixed - tomasz 5/1/2009 |
1587 | 1727 | # some hacks aren't so temporary - atg 3 sept 2010 |
1588 | 1728 | if (self._chunks): |
1589 | | - for i in range(1, self._chunks+1): |
| 1729 | + for i in range(1, len(self._chunks)+1): |
1590 | 1730 | xml7z = self.getOutputFilename(runner,i) |
1591 | 1731 | if exists(xml7z): |
1592 | 1732 | os.chmod(xml7z, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH ) |
— | — | @@ -1598,7 +1738,7 @@ |
1599 | 1739 | def listFiles(self, runner, unnumbered = False): |
1600 | 1740 | if (self._chunks) and not unnumbered: |
1601 | 1741 | files = [] |
1602 | | - for i in range(1, self._chunks+1): |
| 1742 | + for i in range(1, len(self._chunks)+1): |
1603 | 1743 | files.append(self._file("7z",i)) |
1604 | 1744 | return files |
1605 | 1745 | else: |
— | — | @@ -1645,16 +1785,22 @@ |
1646 | 1786 | # these commands don't produce any progress bar... so we can at least |
1647 | 1787 | # update the size and last update time of the file once a minute |
1648 | 1788 | signal.signal(signal.SIGALRM, self.waitAlarmHandler) |
1649 | | - signal.alarm(60) |
1650 | | - recombinePipeline._lastProcessInPipe.wait() |
| 1789 | + signal.alarm(self.timeToWait()) |
| 1790 | + try: |
| 1791 | + recombinePipeline._lastProcessInPipe.wait() |
| 1792 | + break |
| 1793 | + except OSError, e: |
| 1794 | + if e.errno == errno.EINTR: |
| 1795 | + pass |
| 1796 | + else: |
| 1797 | + raise |
1651 | 1798 | self.progressCallback(runner) |
1652 | 1799 | signal.alarm(0) |
1653 | | - break |
1654 | 1800 | |
1655 | 1801 | class AbstractDump(Dump): |
1656 | 1802 | """XML dump for Yahoo!'s Active Abstracts thingy""" |
1657 | 1803 | |
1658 | | - def __init__(self, name, desc, chunks = None): |
| 1804 | + def __init__(self, name, desc, chunks = False): |
1659 | 1805 | Dump.__init__(self, name, desc) |
1660 | 1806 | self._chunks = chunks |
1661 | 1807 | |
— | — | @@ -1672,13 +1818,15 @@ |
1673 | 1819 | if (chunk): |
1674 | 1820 | # set up start end end pageids for this piece |
1675 | 1821 | # note there is no page id 0 I guess. so we start with 1 |
1676 | | - start = runner.pagesPerChunk()*(chunk-1) + 1 |
| 1822 | + # start = runner.pagesPerChunk()*(chunk-1) + 1 |
| 1823 | + start = sum([ self._chunks[i] for i in range(0,chunk-1)]) + 1 |
1677 | 1824 | startopt = "--start=%s" % start |
1678 | 1825 | # if we are on the last chunk, we should get up to the last pageid, |
1679 | 1826 | # whatever that is. |
1680 | 1827 | command.append(startopt) |
1681 | | - if chunk < self._chunks: |
1682 | | - end = start + runner.pagesPerChunk() |
| 1828 | + if chunk < len(self._chunks): |
| 1829 | + # end = start + runner.pagesPerChunk() |
| 1830 | + end = sum([ self._chunks[i] for i in range(0,chunk)]) +1 |
1683 | 1831 | endopt = "--end=%s" % end |
1684 | 1832 | command.append(endopt) |
1685 | 1833 | pipeline = [ command ] |
— | — | @@ -1688,7 +1836,7 @@ |
1689 | 1837 | def run(self, runner): |
1690 | 1838 | commands = [] |
1691 | 1839 | if (self._chunks): |
1692 | | - for i in range(1, self._chunks+1): |
| 1840 | + for i in range(1, len(self._chunks)+1): |
1693 | 1841 | series = self.buildCommand(runner, i) |
1694 | 1842 | commands.append(series) |
1695 | 1843 | else: |
— | — | @@ -1725,7 +1873,7 @@ |
1726 | 1874 | files = [] |
1727 | 1875 | for x in self._variants(runner): |
1728 | 1876 | if (self._chunks) and not unnumbered: |
1729 | | - for i in range(1, self._chunks+1): |
| 1877 | + for i in range(1, len(self._chunks)+1): |
1730 | 1878 | files.append(self._variantFile(x, i)) |
1731 | 1879 | else: |
1732 | 1880 | files.append(self._variantFile(x)) |
— | — | @@ -1765,11 +1913,17 @@ |
1766 | 1914 | # these commands don't produce any progress bar... so we can at least |
1767 | 1915 | # update the size and last update time of the file once a minute |
1768 | 1916 | signal.signal(signal.SIGALRM, self.waitAlarmHandler) |
1769 | | - signal.alarm(60) |
1770 | | - recombinePipeline._lastProcessInPipe.wait() |
| 1917 | + signal.alarm(self.timeToWait()) |
| 1918 | + try: |
| 1919 | + recombinePipeline._lastProcessInPipe.wait() |
| 1920 | + break |
| 1921 | + except OSError, e: |
| 1922 | + if e.errno == errno.EINTR: |
| 1923 | + pass |
| 1924 | + else: |
| 1925 | + raise |
1771 | 1926 | self.progressCallback(runner) |
1772 | 1927 | signal.alarm(0) |
1773 | | - break |
1774 | 1928 | |
1775 | 1929 | class TitleDump(Dump): |
1776 | 1930 | """This is used by "wikiproxy", a program to add Wikipedia links to BBC news online""" |
Index: branches/ariel/xmldumps-backup/CommandManagement.py |
— | — | @@ -6,6 +6,8 @@ |
7 | 7 | import subprocess |
8 | 8 | import select |
9 | 9 | import signal |
| 10 | +import Queue |
| 11 | +import thread |
10 | 12 | |
11 | 13 | from os.path import dirname, exists, getsize, join, realpath |
12 | 14 | from subprocess import Popen, PIPE |
— | — | @@ -111,7 +113,9 @@ |
112 | 114 | else: |
113 | 115 | stdoutOpt = PIPE |
114 | 116 | |
115 | | - process = Popen( command, stdout=stdoutOpt, stdin=stdinOpt, |
| 117 | + stderrOpt = PIPE |
| 118 | + |
| 119 | + process = Popen( command, stdout=stdoutOpt, stdin=stdinOpt, stderr=stderrOpt, |
116 | 120 | preexec_fn=self.subprocess_setup, shell= self._shell) |
117 | 121 | |
118 | 122 | if (command == self._commands[0]): |
— | — | @@ -142,10 +146,10 @@ |
143 | 147 | # will hang forever in the wait() on them. |
144 | 148 | self._processes.reverse() |
145 | 149 | for p in self._processes: |
146 | | -# print "DEBUG: trying to get return code for %s" % p.pid |
| 150 | + print "DEBUG: trying to get return code for %s" % p.pid |
147 | 151 | self._exitValues.append(p.wait()) |
148 | 152 | retcode = p.poll() |
149 | | -# print "DEBUG: return code %s for %s" % (retcode, p.pid) |
| 153 | + print "DEBUG: return code %s for %s" % (retcode, p.pid) |
150 | 154 | self._exitValues.reverse() |
151 | 155 | self._processes.reverse() |
152 | 156 | if (self.saveFile()): |
— | — | @@ -154,8 +158,8 @@ |
155 | 159 | |
156 | 160 | def isRunning(self): |
157 | 161 | """Check if process is running.""" |
158 | | - if (not self._lastProcessInPipe.poll()): |
159 | | - return(False) |
| 162 | + # Note that poll() returns None if the process |
| 163 | + # is not completed, or some value (may be 0) otherwise |
160 | 164 | if (self._lastProcessInPipe.poll() == None): |
161 | 165 | return(True) |
162 | 166 | else: |
— | — | @@ -220,6 +224,7 @@ |
221 | 225 | if (timeout == None): |
222 | 226 | fdReady = self._poller.poll() |
223 | 227 | else: |
| 228 | + # FIXME so poll doesn't take an arg :-P ...? |
224 | 229 | fdReady = self._poller.poll(timeout) |
225 | 230 | |
226 | 231 | if (fdReady): |
— | — | @@ -240,9 +245,12 @@ |
241 | 246 | # line, let it get written to the caller anyways... |
242 | 247 | # when we poll do we get a byte count of how much is available? no. |
243 | 248 | out = self._lastProcessInPipe.stdout.readline() |
| 249 | + |
| 250 | + # DEBUG |
244 | 251 | # if (out): |
245 | 252 | # sys.stdout.write("DEBUG: got from %s out %s" % (self._lastCommandString, out)) |
246 | 253 | |
| 254 | + |
247 | 255 | signal.alarm(0) |
248 | 256 | return(out) |
249 | 257 | elif self.checkForPollErrors(): |
— | — | @@ -378,90 +386,115 @@ |
379 | 387 | pipelines), as well as a possible callback which is used to capture all output |
380 | 388 | from the various command series. If the callback takes an argument other than |
381 | 389 | the line of output, it should be passed in the arg parameter (and it will be passed |
382 | | - to the callback function first before the output line). If no callback is provided and the individual |
383 | | - pipelines are not provided with a file to save output, then output is written |
384 | | - to stderr.""" |
385 | | - def __init__(self, commandSeriesList, callback = None, arg=None, quiet = False, shell = False, forceCallback = False ): |
| 390 | + to the callback function first before the output line). If no callback is provided |
| 391 | + and the individual pipelines are not provided with a file to save output, |
| 392 | + then output is written to stderr.""" |
| 393 | + def __init__(self, commandSeriesList, callback = None, arg=None, quiet = False, shell = False ): |
386 | 394 | self._commandSeriesList = commandSeriesList |
387 | 395 | self._commandSerieses = [] |
388 | 396 | for series in self._commandSeriesList: |
389 | 397 | self._commandSerieses.append( CommandSeries(series, quiet, shell) ) |
390 | | - self._processesToPoll = [] |
391 | | - self._fdToProcess = {} |
392 | | - self._fdToSeries = {} |
| 398 | + # for each command series running in parallel, |
| 399 | + # in cases where a command pipeline in the series generates output, the callback |
| 400 | + # will be called with a line of output from the pipeline as it becomes available |
393 | 401 | self._callback = callback |
394 | 402 | self._arg = arg |
395 | | - self._poller = None |
| 403 | + self._commandSeriesQueue = Queue.Queue() |
| 404 | + |
396 | 405 | # number millisecs we will wait for select.poll() |
397 | 406 | self._defaultPollTime = 500 |
398 | | - # this number of poll cycles gives us 1 minute, use this |
399 | | - # as the amount of time to wait between forced callbacks, if forced is requested |
400 | | - self._defaultPollCycles = 120 |
401 | | - if (forceCallback and not callback): |
402 | | - raise RuntimeError("CommandsInParallel: forceCallback requires that a callback be specified") |
403 | | - self._forceCallback = forceCallback |
| 407 | + |
| 408 | + # for programs that don't generate output, wait this many seconds between |
| 409 | + # invoking callback if there is one |
| 410 | + self._defaultCallbackInterval = 20 |
404 | 411 | |
405 | 412 | def startCommands(self): |
406 | 413 | for series in self._commandSerieses: |
407 | 414 | series.startCommands() |
408 | 415 | |
409 | | - def setupOutputMonitoring(self): |
410 | | - self._poller = select.poll() |
411 | | - for series in self._commandSerieses: |
| 416 | + # one of these as a thread to monitor each command series. |
| 417 | + def seriesMonitor(self, timeout, queue): |
| 418 | + series = queue.get() |
| 419 | + poller = select.poll() |
| 420 | + while series.processProducingOutput(): |
412 | 421 | p = series.processProducingOutput() |
| 422 | + poller.register(p.stderr,select.POLLIN|select.POLLPRI) |
| 423 | + if (p.stdout): |
| 424 | + fdToStream = { p.stdout.fileno(): p.stdout, p.stderr.fileno(): p.stderr } |
| 425 | + else: |
| 426 | + fdToStream = { p.stderr.fileno(): p.stderr } |
413 | 427 | # if we have a savefile, this won't be set. |
414 | 428 | if (p.stdout): |
415 | | - self._processesToPoll.append(p) |
416 | | - self._poller.register(p.stdout,select.POLLIN|select.POLLPRI) |
417 | | - self._fdToProcess['%s' % p.stdout.fileno()] = p |
418 | | - self._fdToSeries['%s' % p.stdout.fileno()] = series |
| 429 | + poller.register(p.stdout,select.POLLIN|select.POLLPRI) |
419 | 430 | |
420 | | - # if there is a savefile don't call this, it's going to get written by itself |
421 | | - def checkForOutput(self): |
422 | | - if len(self._processesToPoll): |
423 | | - fdReady = self._poller.poll(self._defaultPollTime) |
424 | | - if (fdReady): |
425 | | - for (fd,event) in fdReady: |
426 | | - series = self._fdToSeries["%s" % fd] |
427 | | - series.inProgressPipeline().setPollState(event) |
428 | | - if series.inProgressPipeline().checkPollReadyForRead(): |
429 | | - out = self._fdToProcess['%s' % fd].stdout.readline() |
430 | | - if out: |
431 | | - if self._callback: |
432 | | - if (self._arg): |
433 | | - self._callback(self._arg, out) |
| 431 | + commandCompleted = False |
| 432 | + |
| 433 | + while not commandCompleted: |
| 434 | + waiting = poller.poll(self._defaultPollTime) |
| 435 | + if (waiting): |
| 436 | + for (fd,event) in waiting: |
| 437 | + series.inProgressPipeline().setPollState(event) |
| 438 | + if series.inProgressPipeline().checkPollReadyForRead(): |
| 439 | + |
| 440 | + # so what happens if we get more than one line of output |
| 441 | + # in the poll interval? it will sit there waiting... |
| 442 | + # could have accumulation. FIXME. for our purposes we want |
| 443 | + # one line only, the latest. but for other uses of this |
| 444 | + # module? really we should read whatever is available only, |
| 445 | + # pass it to callback, let callback handle multiple lines, |
| 446 | + # partial lines etc. |
| 447 | + # out = p.stdout.readline() |
| 448 | + out = fdToStream[fd].readline() |
| 449 | + if out: |
| 450 | + if self._callback: |
| 451 | + if (self._arg): |
| 452 | + self._callback(self._arg, out) |
| 453 | + else: |
| 454 | + self._callback(out) |
| 455 | + else: |
| 456 | + # fixme this behavior is different, do we want it? |
| 457 | + sys.stderr.write(out) |
434 | 458 | else: |
435 | | - self._callback(out) |
436 | | - else: |
437 | | - # fixme this behavior is different, do we want it? |
438 | | - sys.stderr.write(out) |
| 459 | + # possible eof? (empty string from readline) |
| 460 | + pass |
| 461 | + elif series.inProgressPipeline().checkForPollErrors(): |
| 462 | + poller.unregister(fd) |
| 463 | + p.wait() |
| 464 | + # FIXME put the returncode someplace? |
| 465 | + print "returned from %s with %s" % (p.pid, p.returncode) |
| 466 | + commandCompleted = True |
| 467 | + if commandCompleted: |
| 468 | + break |
| 469 | + |
| 470 | + |
| 471 | + # run next command in series, if any |
| 472 | + series.continueCommands() |
| 473 | + |
| 474 | + else: |
| 475 | + # no output from this process, just wait for it and do callback if there is one |
| 476 | + waited = 0 |
| 477 | + while p.poll() == None: |
| 478 | + if waited > self._defaultCallbackInterval and self._callback: |
| 479 | + if (self._arg): |
| 480 | + self._callback(self._arg) |
439 | 481 | else: |
440 | | - # possible eof? (empty string from readline) |
441 | | - pass |
442 | | - elif series.inProgressPipeline().checkForPollErrors(): |
443 | | - self._poller.unregister(fd) |
444 | | - self._processesToPoll.remove(self._fdToProcess['%s' % fd]) |
445 | | - else: |
446 | | - pass |
447 | | - else: |
448 | | - # suppose there is nothing left to poll. |
449 | | - # but whatever state we have left around then is... False (= we are trying to get state from None obj)? |
450 | | - # then we will never know we are done? ugh |
451 | | - pass |
| 482 | + self._callback() |
| 483 | + waited = 0 |
| 484 | + time.sleep(1) |
| 485 | + waited = waited + 1 |
452 | 486 | |
453 | | - def continueCommands(self): |
454 | | - """Start new commands if needed, updating the poller if there are changes""" |
| 487 | + print "returned from %s with %s" % (p.pid, p.returncode) |
| 488 | + series.continueCommands() |
| 489 | + |
| 490 | + # completed the whole series. time to go home. |
| 491 | + queue.task_done() |
| 492 | + |
| 493 | + |
| 494 | + def setupOutputMonitoring(self): |
455 | 495 | for series in self._commandSerieses: |
456 | | - oldInProgress = series.inProgressPipeline() |
457 | | - series.continueCommands() |
458 | | - if series.inProgressPipeline() != oldInProgress: |
459 | | - if (series.processProducingOutput()): |
460 | | - # Note that we don't remove the old process, that will happen when we check for output. |
461 | | - self._poller.register(series.processProducingOutput().stdout,select.POLLIN|select.POLLPRI) |
462 | | - self._processesToPoll.append(series.processProducingOutput()) |
463 | | - self._fdToProcess['%s' % series.processProducingOutput().stdout.fileno()] = series.processProducingOutput() |
464 | | - self._fdToSeries['%s' % series.processProducingOutput().stdout.fileno()] = series |
465 | | - |
| 496 | + self._commandSeriesQueue.put(series) |
| 497 | + thread.start_new_thread(self.seriesMonitor, (500, self._commandSeriesQueue)) |
| 498 | + |
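The new monitoring design replaces the single select() loop with one monitor thread per command series, coordinated through a Queue; runCommands() then just blocks on queue.join(). A stripped-down sketch of that pattern (the per-series work is stubbed out):

    import Queue, thread, time

    def monitor(queue):
        series = queue.get()
        # ... poll the series' output and run its follow-up commands here ...
        time.sleep(1)          # stand-in for the real monitoring work
        queue.task_done()      # join() returns once every series is marked done

    queue = Queue.Queue()
    for series in ["series1", "series2", "series3"]:
        queue.put(series)
        thread.start_new_thread(monitor, (queue,))
    queue.join()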
466 | 499 | def allCommandsCompleted(self): |
467 | 500 | """Check if all series have run to completion.""" |
468 | 501 | for series in self._commandSerieses: |
— | — | @@ -486,19 +519,17 @@ |
487 | 520 | def runCommands(self): |
488 | 521 | self.startCommands() |
489 | 522 | self.setupOutputMonitoring() |
490 | | - done = False |
491 | | - while not done: |
492 | | - for i in range(self._defaultPollCycles): |
493 | | - self.checkForOutput() |
494 | | - self.continueCommands() |
495 | | - if (self.allCommandsCompleted() and not len(self._processesToPoll)): |
496 | | - done = True |
497 | | - break |
498 | | - if (self._forceCallback): |
499 | | - # forced call to callback every so often |
500 | | - self.progressCallback(runner, "") |
| 523 | + self._commandSeriesQueue.join() |
501 | 524 | |
502 | 525 | |
| 526 | +def testcallback(output = None): |
| 527 | + outputFile = open("/home/ariel/src/mediawiki/testing/outputsaved.txt","a") |
| 528 | + if (output == None): |
| 529 | + outputFile.write( "no output for me.\n" ) |
| 530 | + else: |
| 531 | + outputFile.write(output) |
| 532 | + outputFile.close() |
| 533 | + |
503 | 534 | if __name__ == "__main__": |
504 | 535 | command1 = [ "/usr/bin/vmstat", "1", "10" ] |
505 | 536 | command2 = [ "/usr/sbin/lnstat", "-i", "7", "-c", "5", "-k", "arp_cache:entries,rt_cache:in_hit,arp_cache:destroys", ">", "/home/ariel/src/mediawiki/testing/savelnstat.txt" ] |
— | — | @@ -520,7 +551,7 @@ |
521 | 552 | series4 = [ pipeline5 ] |
522 | 553 | series5 = [ pipeline6 ] |
523 | 554 | parallel = [ series1, series2, series3, series4, series5 ] |
524 | | - commands = CommandsInParallel(parallel) |
| 555 | + commands = CommandsInParallel(parallel, callback=testcallback) |
525 | 556 | commands.runCommands() |
526 | 557 | if commands.exitedSuccessfully(): |
527 | 558 | print "w00t!" |