Index: trunk/phase3/maintenance/storage/compressOld.php |
— | — | @@ -18,8 +18,6 @@ |
19 | 19 | * -b <begin-date> earliest date to check for uncompressed revisions |
20 | 20 | * -e <end-date> latest revision date to compress |
21 | 21 | * -s <start-id> the old_id to start from |
22 | | - * -f <max-factor> the maximum ratio of compressed chunk bytes to uncompressed avg. revision bytes |
23 | | - * -h <threshold> is a minimum number of KB, where <max-factor> cuts in |
24 | 22 | * --extdb <cluster> store specified revisions in an external cluster (untested) |
25 | 23 | * |
26 | 24 | * @file |
— | — | @@ -40,8 +38,6 @@ |
41 | 39 | 't' => 'concat', |
42 | 40 | 'c' => 20, |
43 | 41 | 's' => 0, |
44 | | - 'f' => 5, |
45 | | - 'h' => 100, |
46 | 42 | 'b' => '', |
47 | 43 | 'e' => '', |
48 | 44 | 'extdb' => '', |
— | — | @@ -62,7 +58,7 @@ |
63 | 59 | |
64 | 60 | $success = true; |
65 | 61 | if ( $options['t'] == 'concat' ) { |
66 | | - $success = compressWithConcat( $options['s'], $options['c'], $options['f'], $options['h'], $options['b'], |
| 62 | + $success = compressWithConcat( $options['s'], $options['c'], $options['b'], |
67 | 63 | $options['e'], $options['extdb'], $options['endid'] ); |
68 | 64 | } else { |
69 | 65 | compressOldPages( $options['s'], $options['extdb'] ); |
Index: trunk/phase3/maintenance/storage/compressOld.inc |
— | — | @@ -66,7 +66,7 @@ |
67 | 67 | define( 'LS_CHUNKED', 1 ); |
68 | 68 | |
69 | 69 | /** @todo document */ |
70 | | -function compressWithConcat( $startId, $maxChunkSize, $maxChunkFactor, $factorThreshold, $beginDate, |
| 70 | +function compressWithConcat( $startId, $maxChunkSize, $beginDate, |
71 | 71 | $endDate, $extdb="", $maxPageId = false ) |
72 | 72 | { |
73 | 73 | $fname = 'compressWithConcat'; |
— | — | @@ -194,7 +194,7 @@ |
195 | 195 | $primaryOldid = $revs[$i]->rev_text_id; |
196 | 196 | |
197 | 197 | # Get the text of each revision and add it to the object |
198 | | - for ( $j = 0; $j < $thisChunkSize && $chunk->isHappy( $maxChunkFactor, $factorThreshold ); $j++ ) { |
| 198 | + for ( $j = 0; $j < $thisChunkSize && $chunk->isHappy(); $j++ ) { |
199 | 199 | $oldid = $revs[$i + $j]->rev_text_id; |
200 | 200 | |
201 | 201 | # Get text |
Index: trunk/phase3/maintenance/storage/recompressTracked.php |
— | — | @@ -0,0 +1,624 @@ |
<?php
/**
 * Entry point: move blobs indexed by trackBlobs.php to the external clusters
 * named on the command line, recompressing them along the way.
 *
 * Usage: php recompressTracked.php [options] <cluster> [... <cluster>...]
 *
 * Options (see RecompressTracked::$cmdLineOptionMap):
 *   --procs <n>      number of worker processes to start
 *   --class <class>  blob class used to build the new blobs
 *   --copy-only      copy to the new clusters without updating the text table
 *   --child          internal: run as a worker process
 */

// Must be defined before commandLine.inc is loaded so that options which
// take a value are parsed with their argument.
$optionsWithArgs = RecompressTracked::getOptionsWithArgs();
require( dirname( __FILE__ ) . '/../commandLine.inc' );

if ( !count( $args ) ) {
	// No destination clusters given: print help and fail
	echo "Usage: php recompressTracked.php <cluster> [... <cluster>...]\n";
	echo "Moves blobs indexed by trackBlobs.php to a specified list of destination
clusters, and recompresses them in the process. Restartable.\n";
	exit( 1 );
}

$job = RecompressTracked::newFromCommandLine( $args, $options );
$job->execute();
/**
 * Job that moves the text blobs listed in the blob_tracking table to a list
 * of destination external clusters, recompressing them with $blobClass.
 *
 * The parent process walks blob_tracking and dispatches commands ("doPage",
 * "doOrphanList", "quit") over stdin to a pool of child processes, which
 * re-run this script with --child and do the loading, compression and writing.
 */
class RecompressTracked {
	/** Names of the destination clusters, used round-robin by getTargetCluster() */
	var $destClusters;
	/** Number of rows fetched per query */
	var $batchSize = 1000;
	/** Number of batches between progress reports */
	var $reportingInterval = 10;
	/** Batches processed since the last progress report (see report()) */
	var $numBatches = 0;
	/** Number of child processes to start */
	var $numProcs = 8;
	/** Child stdin pipes, child process handles, and index of the last child used */
	var $slavePipes, $slaveProcs, $prevSlaveId;
	/** Class used to build the new blobs */
	var $blobClass = 'DiffHistoryBlob';
	/** If true, copy to the new clusters but do not update the text table */
	var $copyOnly = false;
	/** True when this process is a child (worker) */
	var $isChild = false;
	/** Index of this child process, or false in the parent */
	var $slaveId = false;
	/** ExternalStoreDB used to write the new blobs */
	var $store;

	/**
	 * Command-line options that take a value. 'slave-id' must be listed so
	 * that "--slave-id N" does not leave N behind as a cluster argument.
	 */
	static $optionsWithArgs = array( 'procs', 'class', 'slave-id' );

	/** Map of command-line option name => member variable name */
	static $cmdLineOptionMap = array(
		'procs' => 'numProcs',
		'class' => 'blobClass',
		'copy-only' => 'copyOnly',
		'child' => 'isChild',
		'slave-id' => 'slaveId',
	);

	/**
	 * Options that take an argument, for use as $optionsWithArgs by commandLine.inc
	 * @return array
	 */
	static function getOptionsWithArgs() {
		return self::$optionsWithArgs;
	}

	/**
	 * Create a job from parsed command-line input.
	 * @param $args array Positional arguments: destination cluster names
	 * @param $options array Parsed options, translated via $cmdLineOptionMap
	 * @return RecompressTracked
	 */
	static function newFromCommandLine( $args, $options ) {
		$jobOptions = array( 'destClusters' => $args );
		foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
			if ( isset( $options[$cmdOption] ) ) {
				$jobOptions[$classOption] = $options[$cmdOption];
			}
		}
		return new self( $jobOptions );
	}

	/**
	 * @param $options array Map of member variable name => value
	 */
	function __construct( $options ) {
		foreach ( $options as $name => $value ) {
			$this->$name = $value;
		}
		$this->store = new ExternalStoreDB;
	}

	/**
	 * Write a line to the debug log, prefixed with the child index if known.
	 */
	function debug( $msg ) {
		if ( $this->slaveId !== false ) {
			$msg = "{$this->slaveId}: $msg";
		}
		$msg .= "\n";
		wfDebug( $msg );
	}

	/**
	 * Wait until the selected slave has caught up to the master.
	 * This allows us to use the slave for things that were committed in a
	 * previous part of this batch process.
	 */
	function syncDBs() {
		$dbw = wfGetDB( DB_MASTER );
		$dbr = wfGetDB( DB_SLAVE );
		$pos = $dbw->getMasterPos();
		$dbr->masterPosWait( $pos, 100000 );
	}

	/**
	 * Execute parent or child depending on the isChild option
	 */
	function execute() {
		if ( $this->isChild ) {
			$this->executeChild();
		} else {
			$this->executeParent();
		}
	}

	/**
	 * Execute the parent process
	 */
	function executeParent() {
		if ( !$this->checkTrackingTable() ) {
			return;
		}

		$this->syncDBs();
		$this->startSlaveProcs();
		$this->doAllPages();
		$this->doAllOrphans();
		$this->killSlaveProcs();
	}

	/**
	 * Make sure the tracking table exists and isn't empty
	 * @return bool
	 */
	function checkTrackingTable() {
		$dbr = wfGetDB( DB_SLAVE );
		if ( !$dbr->tableExists( 'blob_tracking' ) ) {
			echo "Error: blob_tracking table does not exist\n";
			return false;
		}
		$row = $dbr->selectRow( 'blob_tracking', '*', false, __METHOD__ );
		if ( !$row ) {
			echo "Warning: blob_tracking table contains no rows, skipping this wiki.\n";
			return false;
		}
		return true;
	}

	/**
	 * Start the worker processes.
	 * These processes will listen on stdin for commands.
	 * This is necessary because text recompression is slow: loading,
	 * compressing and writing are all slow.
	 */
	function startSlaveProcs() {
		$cmd = 'php ' . wfEscapeShellArg( __FILE__ );
		foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
			if ( $cmdOption == 'slave-id' ) {
				// Each child gets its own index, appended per process below
				continue;
			} elseif ( in_array( $cmdOption, self::$optionsWithArgs ) ) {
				$cmd .= " --$cmdOption " . wfEscapeShellArg( $this->$classOption );
			} elseif ( $this->$classOption ) {
				$cmd .= " --$cmdOption";
			}
		}
		$cmd .= ' --child' .
			' --wiki ' . wfEscapeShellArg( wfWikiID() );
		// Positional arguments are placed after all options
		$clusterArgs = ' ' . call_user_func_array( 'wfEscapeShellArg', $this->destClusters );

		$this->slavePipes = $this->slaveProcs = array();
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			$pipes = array();
			$spec = array(
				array( 'pipe', 'r' ),
				array( 'file', '/dev/stdout', 'w' ),
				array( 'file', '/dev/stderr', 'w' )
			);
			wfSuppressWarnings();
			$proc = proc_open( "$cmd --slave-id $i$clusterArgs", $spec, $pipes );
			wfRestoreWarnings();
			if ( !$proc ) {
				echo "Error opening slave process\n";
				exit( 1 );
			}
			$this->slaveProcs[$i] = $proc;
			$this->slavePipes[$i] = $pipes[0];
		}
		$this->prevSlaveId = -1;
	}

	/**
	 * Gracefully terminate the child processes
	 */
	function killSlaveProcs() {
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			$this->dispatchToSlave( $i, 'quit' );
		}
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			// Close our end of the pipe before waiting, so proc_close() cannot deadlock
			fclose( $this->slavePipes[$i] );
			proc_close( $this->slaveProcs[$i] );
		}
	}

	/**
	 * Dispatch a command to the next available slave.
	 * This may block until a slave finishes its work and becomes available.
	 * Arguments are joined with spaces to form the command line.
	 */
	function dispatch( /*...*/ ) {
		$args = func_get_args();
		// stream_select() takes its arrays by reference, so they must be variables
		$read = array();
		$write = $this->slavePipes;
		$except = array();
		$numPipes = stream_select( $read, $write, $except, 3600 );
		if ( !$numPipes ) {
			echo "Error waiting to write to slaves. Aborting\n";
			exit( 1 );
		}
		// Round-robin, starting after the last slave used
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			$slaveId = ( $i + $this->prevSlaveId + 1 ) % $this->numProcs;
			if ( isset( $write[$slaveId] ) ) {
				$this->prevSlaveId = $slaveId;
				$this->dispatchToSlave( $slaveId, $args );
				return;
			}
		}
		echo "Unreachable\n";
		exit( 1 );
	}

	/**
	 * Dispatch a command to a specified slave
	 * @param $slaveId int Index into $slavePipes
	 * @param $args string|array Command and arguments, joined with spaces
	 */
	function dispatchToSlave( $slaveId, $args ) {
		$args = (array)$args;
		$cmd = implode( ' ', $args );
		fwrite( $this->slavePipes[$slaveId], "$cmd\n" );
	}

	/**
	 * Move all tracked pages to the new clusters
	 */
	function doAllPages() {
		$dbr = wfGetDB( DB_SLAVE );
		$startId = 0;
		$endId = $dbr->selectField( 'blob_tracking', 'MAX(bt_page)',
			# A condition is required so that this query uses the index
			array( 'bt_moved' => 0 ),
			__METHOD__ );
		echo "Moving pages...\n";
		while ( true ) {
			$res = $dbr->select( 'blob_tracking',
				array( 'bt_page' ),
				array(
					'bt_moved' => 0,
					'bt_page > ' . $dbr->addQuotes( $startId )
				),
				__METHOD__,
				array(
					'DISTINCT',
					'ORDER BY' => 'bt_page',
					'LIMIT' => $this->batchSize,
				)
			);
			if ( !$res->numRows() ) {
				break;
			}
			foreach ( $res as $row ) {
				$this->dispatch( 'doPage', $row->bt_page );
			}
			$startId = $row->bt_page;
			$this->report( $startId, $endId );
		}
		echo "Done moving pages.\n";
	}

	/**
	 * Display a progress report every $reportingInterval batches
	 */
	function report( $start, $end ) {
		$this->numBatches++;
		if ( $this->numBatches >= $this->reportingInterval ) {
			$this->numBatches = 0;
			echo "$start / $end\n";
			wfWaitForSlaves( 5 );
		}
	}

	/**
	 * Move all orphan text to the new clusters
	 */
	function doAllOrphans() {
		$dbr = wfGetDB( DB_SLAVE );
		$startId = 0;
		$endId = $dbr->selectField( 'blob_tracking', 'MAX(bt_text_id)',
			array( 'bt_moved' => 0, 'bt_page' => 0 ),
			__METHOD__ );
		if ( !$endId ) {
			return;
		}
		echo "Moving orphans...\n";

		while ( true ) {
			$res = $dbr->select( 'blob_tracking',
				array( 'bt_text_id' ),
				array(
					'bt_moved' => 0,
					'bt_page' => 0,
					'bt_text_id > ' . $dbr->addQuotes( $startId )
				),
				__METHOD__,
				array(
					'DISTINCT',
					'ORDER BY' => 'bt_text_id',
					'LIMIT' => $this->batchSize
				)
			);
			if ( !$res->numRows() ) {
				break;
			}
			$args = array( 'doOrphanList' );
			foreach ( $res as $row ) {
				$args[] = $row->bt_text_id;
			}
			call_user_func_array( array( $this, 'dispatch' ), $args );
			$startId = $row->bt_text_id;
			$this->report( $startId, $endId );
		}
		echo "Done moving orphans.\n";
	}

	/**
	 * Main entry point for worker processes: read commands from stdin until
	 * "quit" or end of input.
	 */
	function executeChild() {
		$this->syncDBs();

		while ( !feof( STDIN ) ) {
			$line = rtrim( fgets( STDIN ) );
			if ( $line == '' ) {
				continue;
			}
			$args = explode( ' ', $line );
			$cmd = array_shift( $args );
			switch ( $cmd ) {
			case 'doPage':
				$this->doPage( intval( $args[0] ) );
				break;
			case 'doOrphanList':
				$this->doOrphanList( array_map( 'intval', $args ) );
				break;
			case 'quit':
				return;
			}
		}
	}

	/**
	 * Move tracked text in a given page
	 * @param $pageId int
	 */
	function doPage( $pageId ) {
		$dbr = wfGetDB( DB_SLAVE );

		// Finish any incomplete transactions for this page. Restricting to the
		// page keeps each call cheap and avoids workers racing on other pages.
		if ( !$this->copyOnly ) {
			$this->finishIncompleteMoves( array( 'bt_page' => $pageId ) );
		}

		$startId = 0;
		$trx = new CgzCopyTransaction( $this );

		while ( true ) {
			$res = $dbr->select(
				array( 'blob_tracking', 'text' ),
				'*',
				array(
					'bt_page' => $pageId,
					'bt_text_id > ' . $dbr->addQuotes( $startId ),
					'bt_moved' => 0,
					'bt_new_url' => '',
					'bt_text_id=old_id',
				),
				__METHOD__,
				array(
					'ORDER BY' => 'bt_text_id',
					'LIMIT' => $this->batchSize
				)
			);
			if ( !$res->numRows() ) {
				break;
			}

			$lastTextId = 0;
			foreach ( $res as $row ) {
				if ( $lastTextId == $row->bt_text_id ) {
					// Duplicate (null edit)
					continue;
				}
				$lastTextId = $row->bt_text_id;
				// Load the text
				$text = Revision::getRevisionText( $row );
				if ( $text === false ) {
					echo "Error loading {$row->bt_rev_id}/{$row->bt_text_id}\n";
					continue;
				}

				// Queue it; addItem() returns false once the blob is full
				if ( !$trx->addItem( $text, $row->bt_text_id ) ) {
					$trx->commit();
					$trx = new CgzCopyTransaction( $this );
				}
			}
			$startId = $row->bt_text_id;
		}
		$trx->commit();
	}

	/**
	 * Atomic move operation.
	 *
	 * Write the new URL to the text table and set the bt_moved flag.
	 *
	 * This is done in a single transaction to provide restartable behaviour
	 * without data loss.
	 *
	 * The transaction is kept short to reduce locking.
	 */
	function moveTextRow( $textId, $url ) {
		$dbw = wfGetDB( DB_MASTER );
		$dbw->begin();
		$dbw->update( 'text',
			array( // set
				'old_text' => $url,
				'old_flags' => 'external,utf8',
			),
			array( // where
				'old_id' => $textId
			),
			__METHOD__
		);
		$dbw->update( 'blob_tracking',
			array( 'bt_moved' => 1 ),
			array( 'bt_text_id' => $textId ),
			__METHOD__
		);
		$dbw->commit();
	}

	/**
	 * Moves are done in two phases: bt_new_url and then bt_moved.
	 * - bt_new_url indicates that the text has been copied to the new cluster.
	 * - bt_moved indicates that the text table has been updated.
	 *
	 * This function completes any moves that only have done bt_new_url. This
	 * can happen when the script is interrupted, or when --copy-only is used.
	 *
	 * @param $conds array Extra conditions limiting which blob_tracking rows
	 *   are examined, e.g. array( 'bt_page' => $pageId ). Defaults to none,
	 *   which examines the whole table.
	 */
	function finishIncompleteMoves( $conds = array() ) {
		$dbr = wfGetDB( DB_SLAVE );

		$conds = array_merge( $conds, array(
			'bt_moved' => 0,
			"bt_new_url <> ''",
		) );
		$startId = 0;
		while ( true ) {
			$res = $dbr->select( 'blob_tracking',
				'*',
				array_merge( $conds, array(
					'bt_text_id > ' . $dbr->addQuotes( $startId ),
				) ),
				__METHOD__,
				array(
					'ORDER BY' => 'bt_text_id',
					'LIMIT' => $this->batchSize,
				)
			);
			if ( !$res->numRows() ) {
				break;
			}
			foreach ( $res as $row ) {
				$this->moveTextRow( $row->bt_text_id, $row->bt_new_url );
			}
			$startId = $row->bt_text_id;
		}
	}

	/**
	 * Returns the name of the next target cluster, cycling through $destClusters
	 */
	function getTargetCluster() {
		$cluster = next( $this->destClusters );
		if ( $cluster === false ) {
			$cluster = reset( $this->destClusters );
		}
		return $cluster;
	}

	/**
	 * Gets a DB master connection for the given external cluster name
	 */
	function getExtDB( $cluster ) {
		$lb = wfGetLBFactory()->getExternalLB( $cluster );
		return $lb->getConnection( DB_MASTER );
	}

	/**
	 * Move a list of orphan text_ids to the new clusters
	 * @param $textIds array of int
	 */
	function doOrphanList( $textIds ) {
		if ( !count( $textIds ) ) {
			return;
		}

		// Finish any incomplete transactions for these rows. doPage() only
		// handles its own page, so orphans must be finished here.
		if ( !$this->copyOnly ) {
			$this->finishIncompleteMoves( array( 'bt_text_id' => $textIds ) );
		}

		$dbr = wfGetDB( DB_SLAVE );
		$trx = new CgzCopyTransaction( $this );
		foreach ( $textIds as $textId ) {
			$row = $dbr->selectRow( 'text', array( 'old_text', 'old_flags' ),
				array( 'old_id' => $textId ), __METHOD__ );
			$text = $row ? Revision::getRevisionText( $row ) : false;
			if ( $text === false ) {
				echo "Error: cannot load revision text for $textId\n";
				continue;
			}

			if ( !$trx->addItem( $text, $textId ) ) {
				$trx->commit();
				$trx = new CgzCopyTransaction( $this );
			}
		}
		// Store the final, partially filled blob
		$trx->commit();
	}
}
| 492 | + |
/**
 * Class to represent a recompression operation for a single CGZ blob.
 *
 * Items are accumulated with addItem() until the blob is full, then commit()
 * writes the blob to a destination cluster, records the new URLs in
 * blob_tracking and (unless --copy-only) points the text rows at them.
 */
class CgzCopyTransaction {
	/** RecompressTracked job that created this transaction */
	var $parent;
	/** Class name used to build $cgz */
	var $blobClass;
	/** Blob under construction, or false until the first item is added */
	var $cgz;
	/** Map of text_id => hash of that text within $cgz */
	var $referrers;
	/** Map of text_id => text, kept so the blob can be rebuilt by recompress() */
	var $texts;
	/** Set to true by signalHandler() */
	var $signalled = false;

	/**
	 * Create a transaction from a RecompressTracked object
	 * @param $parent RecompressTracked
	 */
	function __construct( $parent ) {
		$this->parent = $parent;
		$this->blobClass = $parent->blobClass;
		$this->cgz = false;
		$this->referrers = array();
		$this->texts = array();
	}

	/**
	 * Add text.
	 * Returns false if it's ready to commit, i.e. the blob's isHappy()
	 * returned false after adding this item.
	 */
	function addItem( $text, $textId ) {
		if ( !$this->cgz ) {
			$class = $this->blobClass;
			$this->cgz = new $class;
		}
		$hash = $this->cgz->addItem( $text );
		$this->referrers[$textId] = $hash;
		$this->texts[$textId] = $text;
		return $this->cgz->isHappy();
	}

	/**
	 * Rebuild the blob from $texts, after items have been removed from it
	 */
	function recompress() {
		$class = $this->blobClass;
		$this->cgz = new $class;
		$this->referrers = array();
		foreach ( $this->texts as $textId => $text ) {
			$hash = $this->cgz->addItem( $text );
			$this->referrers[$textId] = $hash;
		}
	}

	/**
	 * Commit the blob.
	 * Does nothing if no text items have been added.
	 * May skip the move if --copy-only is set.
	 */
	function commit() {
		$originalCount = count( $this->texts );
		if ( !$originalCount ) {
			return;
		}

		// Check to see if the target text_ids have been moved already.
		//
		// We originally read from the slave, so this can happen when a single
		// text_id is shared between multiple pages. It's rare, but possible
		// if a delete/move/undelete cycle splits up a null edit.
		//
		// We do a locking read to prevent closer-run race conditions.
		$dbw = wfGetDB( DB_MASTER );
		$dbw->begin();
		$dirty = false;
		foreach ( $this->referrers as $textId => $hash ) {
			$moved = $dbw->selectField( 'blob_tracking', 'bt_moved',
				array( 'bt_text_id' => $textId ),
				__METHOD__,
				array( 'FOR UPDATE' )
			);
			if ( $moved ) {
				# This row has already been moved, remove it
				unset( $this->texts[$textId] );
				$dirty = true;
			}
		}

		// Recompress the blob if necessary
		if ( $dirty ) {
			if ( !count( $this->texts ) ) {
				// All have been moved already. Nothing was written, so end the
				// transaction to release the row locks taken above.
				$dbw->commit();
				if ( $originalCount > 1 ) {
					// This is suspicious, make noise
					echo "Warning: concurrent operation detected, are there two conflicting\n" .
						"processes running, doing the same job?\n";
				}
				return;
			}
			$this->recompress();
		}

		// Insert the data into the destination cluster
		$targetCluster = $this->parent->getTargetCluster();
		$store = $this->parent->store;
		$targetDB = $store->getMaster( $targetCluster );
		$targetDB->clearFlag( DBO_TRX ); // we manage the transactions
		$targetDB->begin();
		$baseUrl = $this->parent->store->store( $targetCluster, serialize( $this->cgz ) );

		// Write the new URLs to the blob_tracking table
		foreach ( $this->referrers as $textId => $hash ) {
			$url = $baseUrl . '/' . $hash;
			$dbw->update( 'blob_tracking',
				array( 'bt_new_url' => $url ),
				array(
					'bt_text_id' => $textId,
					'bt_moved' => 0, # Check for concurrent conflicting update
				),
				__METHOD__
			);
		}

		$targetDB->commit();
		// Critical section here: interruption at this point causes blob duplication
		// Reversing the order of the commits would cause data loss instead
		$dbw->commit();

		// Write the new URLs to the text table and set the moved flag
		if ( !$this->parent->copyOnly ) {
			foreach ( $this->referrers as $textId => $hash ) {
				$url = $baseUrl . '/' . $hash;
				$this->parent->moveTextRow( $textId, $url );
			}
		}
	}

	/**
	 * Record that a signal was received
	 */
	function signalHandler() {
		$this->signalled = true;
	}
}
| 625 | + |
Property changes on: trunk/phase3/maintenance/storage/recompressTracked.php |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 626 | + native |
Index: trunk/phase3/maintenance/storage/blob_tracking.sql |
— | — | @@ -28,16 +28,19 @@ |
29 | 29 | -- True if the text table has been updated to point to bt_new_url |
30 | 30 | bt_moved bool not null default 0, |
31 | 31 | |
32 | | - PRIMARY KEY (bt_rev_id, bt_text_id), |
| 32 | + -- Primary key |
| 33 | + -- Note that text_id is not unique due to null edits (protection, move) |
| 34 | + -- moveTextRow(), commit(), trackOrphanText() |
| 35 | + PRIMARY KEY (bt_text_id, bt_rev_id), |
33 | 36 | |
34 | 37 | -- Sort by page for easy CGZ recompression |
35 | | - KEY (bt_moved, bt_page, bt_rev_id), |
| 38 | + -- doAllPages(), doAllOrphans(), doPage(), finishIncompleteMoves() |
| 39 | + KEY (bt_moved, bt_page, bt_text_id), |
36 | 40 | |
37 | | - -- For fast orphan searches |
38 | | - KEY (bt_text_id), |
39 | | - |
40 | 41 | -- Key for determining the revisions using a given blob |
| 42 | + -- Not used by any scripts yet |
41 | 43 | KEY (bt_cluster, bt_blob_id, bt_cgz_hash) |
| 44 | + |
42 | 45 | ) /*$wgDBTableOptions*/; |
43 | 46 | |
44 | 47 | -- Tracking table for blob rows that aren't tracked by the text table |
Index: trunk/phase3/includes/HistoryBlob.php |
— | — | @@ -43,7 +43,9 @@ |
44 | 44 | class ConcatenatedGzipHistoryBlob implements HistoryBlob |
45 | 45 | { |
46 | 46 | public $mVersion = 0, $mCompressed = false, $mItems = array(), $mDefaultHash = ''; |
47 | | - public $mFast = 0, $mSize = 0; |
| 47 | + public $mSize = 0; |
| 48 | + public $mMaxSize = 10000000; |
| 49 | + public $mMaxCount = 100; |
48 | 50 | |
49 | 51 | /** Constructor */ |
50 | 52 | public function ConcatenatedGzipHistoryBlob() { |
— | — | @@ -122,25 +124,9 @@ |
123 | 125 | * Helper function for compression jobs |
124 | 126 | * Returns true until the object is "full" and ready to be committed |
125 | 127 | */ |
126 | | - public function isHappy( $maxFactor, $factorThreshold ) { |
127 | | - if ( count( $this->mItems ) == 0 ) { |
128 | | - return true; |
129 | | - } |
130 | | - if ( !$this->mFast ) { |
131 | | - $this->uncompress(); |
132 | | - $record = serialize( $this->mItems ); |
133 | | - $size = strlen( $record ); |
134 | | - $avgUncompressed = $size / count( $this->mItems ); |
135 | | - $compressed = strlen( gzdeflate( $record ) ); |
136 | | - |
137 | | - if ( $compressed < $factorThreshold * 1024 ) { |
138 | | - return true; |
139 | | - } else { |
140 | | - return $avgUncompressed * $maxFactor < $compressed; |
141 | | - } |
142 | | - } else { |
143 | | - return count( $this->mItems ) <= 10; |
144 | | - } |
| 128 | + public function isHappy() { |
| 129 | + return $this->mSize < $this->mMaxSize |
| 130 | + && count( $this->mItems ) < $this->mMaxCount; |
145 | 131 | } |
146 | 132 | } |
147 | 133 | |
— | — | @@ -313,6 +299,17 @@ |
314 | 300 | var $mFrozen = false; |
315 | 301 | |
316 | 302 | |
| 303 | + /** |
| 304 | + * The maximum uncompressed size before the object becomes sad |
| 305 | + * Should be less than max_allowed_packet |
| 306 | + */ |
| 307 | + var $mMaxSize = 10000000; |
| 308 | + |
| 309 | + /** |
| 310 | + * The maximum number of text items before the object becomes sad |
| 311 | + */ |
| 312 | + var $mMaxCount = 100; |
| 313 | + |
317 | 314 | function __construct() { |
318 | 315 | if ( !function_exists( 'xdiff_string_bdiff' ) ){ |
319 | 316 | throw new MWException( "Need xdiff 1.5+ support to read or write DiffHistoryBlob\n" ); |
— | — | @@ -328,6 +325,7 @@ |
329 | 326 | } |
330 | 327 | |
331 | 328 | $this->mItems[] = $text; |
| 329 | + $this->mSize += strlen( $text ); |
332 | 330 | $i = count( $this->mItems ) - 1; |
333 | 331 | if ( $i > 0 ) { |
334 | 332 | # Need to do a null concatenation with warnings off, due to bugs in the current version of xdiff |
— | — | @@ -401,4 +399,14 @@ |
402 | 400 | $this->mItems[0] = $info['base']; |
403 | 401 | $this->mDiffs = $info['diffs']; |
404 | 402 | } |
| 403 | + |
/**
 * Helper function for compression jobs.
 * Reports whether more items may still be added: true while both the
 * accumulated uncompressed size (mSize) is below mMaxSize and the item
 * count is below mMaxCount; false as soon as either limit is reached.
 */
function isHappy() {
	if ( $this->mSize >= $this->mMaxSize ) {
		// Size limit reached
		return false;
	}
	return count( $this->mItems ) < $this->mMaxCount;
}
| 412 | + |
405 | 413 | } |