r42767 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: < r42766 | r42767 | r42768 >
Date: 15:15, 29 October 2008
Author: tstarling
Status: old (Comments)
Tags:
Comment:
* Added recompressTracked.php, the second part of the recompression project. Uses the sorted list of blobs compiled by trackBlobs.php to recompress the entire contents of a set of external storage clusters to a new set of clusters, and updates the text table accordingly.
* Tweaked blob_tracking indexes now that I know what I'm doing with them
* Standardised isHappy() on no arguments. Use an uncompressed size limit instead of the weird things I did originally.
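A minimal invocation sketch for the pipeline described above, run from the phase3 directory (cluster names and option values are illustrative; blob_tracking must already have been populated by trackBlobs.php):

  php maintenance/storage/recompressTracked.php --procs 8 --class DiffHistoryBlob rct1 rct2
  php maintenance/storage/recompressTracked.php --copy-only rct1 rct2

The first form recompresses tracked blobs into the destination clusters and updates the text table; the second copies and recompresses the blobs but leaves the text table untouched, relying on a later non-copy-only run to finish the moves via finishIncompleteMoves().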
Modified paths:
  • /trunk/phase3/includes/HistoryBlob.php (modified) (history)
  • /trunk/phase3/maintenance/storage/blob_tracking.sql (modified) (history)
  • /trunk/phase3/maintenance/storage/compressOld.inc (modified) (history)
  • /trunk/phase3/maintenance/storage/compressOld.php (modified) (history)
  • /trunk/phase3/maintenance/storage/recompressTracked.php (added) (history)

Diff

Index: trunk/phase3/maintenance/storage/compressOld.php
@@ -18,8 +18,6 @@
1919 * -b <begin-date> earliest date to check for uncompressed revisions
2020 * -e <end-date> latest revision date to compress
2121 * -s <start-id> the old_id to start from
22 - * -f <max-factor> the maximum ratio of compressed chunk bytes to uncompressed avg. revision bytes
23 - * -h <threshold> is a minimum number of KB, where <max-factor> cuts in
2422 * --extdb <cluster> store specified revisions in an external cluster (untested)
2523 *
2624 * @file
@@ -40,8 +38,6 @@
4139 't' => 'concat',
4240 'c' => 20,
4341 's' => 0,
44 - 'f' => 5,
45 - 'h' => 100,
4642 'b' => '',
4743 'e' => '',
4844 'extdb' => '',
@@ -62,7 +58,7 @@
6359
6460 $success = true;
6561 if ( $options['t'] == 'concat' ) {
66 - $success = compressWithConcat( $options['s'], $options['c'], $options['f'], $options['h'], $options['b'],
 62+ $success = compressWithConcat( $options['s'], $options['c'], $options['b'],
6763 $options['e'], $options['extdb'], $options['endid'] );
6864 } else {
6965 compressOldPages( $options['s'], $options['extdb'] );
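With -f and -h removed, a typical compressOld.php run now needs only the remaining options. A hedged example (option values and timestamp format are illustrative; --extdb remains marked untested above):

  php maintenance/storage/compressOld.php -t concat -c 20 -s 0 -e 20081029000000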
Index: trunk/phase3/maintenance/storage/compressOld.inc
@@ -66,7 +66,7 @@
6767 define( 'LS_CHUNKED', 1 );
6868
6969 /** @todo document */
70 -function compressWithConcat( $startId, $maxChunkSize, $maxChunkFactor, $factorThreshold, $beginDate,
 70+function compressWithConcat( $startId, $maxChunkSize, $beginDate,
7171 $endDate, $extdb="", $maxPageId = false )
7272 {
7373 $fname = 'compressWithConcat';
@@ -194,7 +194,7 @@
195195 $primaryOldid = $revs[$i]->rev_text_id;
196196
197197 # Get the text of each revision and add it to the object
198 - for ( $j = 0; $j < $thisChunkSize && $chunk->isHappy( $maxChunkFactor, $factorThreshold ); $j++ ) {
 198+ for ( $j = 0; $j < $thisChunkSize && $chunk->isHappy(); $j++ ) {
199199 $oldid = $revs[$i + $j]->rev_text_id;
200200
201201 # Get text
Index: trunk/phase3/maintenance/storage/recompressTracked.php
@@ -0,0 +1,624 @@
 2+<?php
 3+
 4+$optionsWithArgs = RecompressTracked::getOptionsWithArgs();
 5+require( dirname( __FILE__ ) .'/../commandLine.inc' );
 6+
 7+if ( count( $args ) < 1 ) {
 8+ echo "Usage: php recompressTracked.php <cluster> [... <cluster>...]\n";
 9+ echo "Moves blobs indexed by trackBlobs.php to a specified list of destination
 10+clusters, and recompresses them in the process. Restartable.\n";
 11+ exit( 1 );
 12+}
 13+
 14+$job = RecompressTracked::newFromCommandLine( $args, $options );
 15+$job->execute();
 16+
 17+class RecompressTracked {
 18+ var $destClusters;
 19+ var $batchSize = 1000;
 20+ var $reportingInterval = 10;
 21+ var $numProcs = 8;
 22+ var $slavePipes, $slaveProcs, $prevSlaveId;
 23+ var $blobClass = 'DiffHistoryBlob';
 24+ var $copyOnly = false;
 25+ var $isChild = false;
 26+ var $slaveId = false;
 27+ var $store;
 28+
 29+ static $optionsWithArgs = array( 'procs', 'class' );
 30+ static $cmdLineOptionMap = array(
 31+ 'procs' => 'numProcs',
 32+ 'class' => 'blobClass',
 33+ 'copy-only' => 'copyOnly',
 34+ 'child' => 'isChild',
 35+ 'slave-id' => 'slaveId',
 36+ );
 37+
 38+ static function getOptionsWithArgs() {
 39+ return self::$optionsWithArgs;
 40+ }
 41+
 42+ static function newFromCommandLine( $args, $options ) {
 43+ $jobOptions = array( 'destClusters' => $args );
 44+ foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
 45+ if ( isset( $options[$cmdOption] ) ) {
 46+ $jobOptions[$classOption] = $options[$cmdOption];
 47+ }
 48+ }
 49+ return new self( $jobOptions );
 50+ }
 51+
 52+ function __construct( $options ) {
 53+ foreach ( $options as $name => $value ) {
 54+ $this->$name = $value;
 55+ }
 56+ $this->store = new ExternalStoreDB;
 57+ }
 58+
 59+ function debug( $msg ) {
 60+ if ( $this->slaveId !== false ) {
 61+ $msg = "{$this->slaveId}: $msg";
 62+ }
 63+ $msg .= "\n";
 64+ wfDebug( $msg );
 65+ }
 66+
 67+ /**
 68+ * Wait until the selected slave has caught up to the master.
 69+ * This allows us to use the slave for things that were committed in a
 70+ * previous part of this batch process.
 71+ */
 72+ function syncDBs() {
 73+ $dbw = wfGetDB( DB_MASTER );
 74+ $dbr = wfGetDB( DB_SLAVE );
 75+ $pos = $dbw->getMasterPos();
 76+ $dbr->masterPosWait( $pos, 100000 );
 77+ }
 78+
 79+ /**
 80+ * Execute parent or child depending on the isChild option
 81+ */
 82+ function execute() {
 83+ if ( $this->isChild ) {
 84+ $this->executeChild();
 85+ } else {
 86+ $this->executeParent();
 87+ }
 88+ }
 89+
 90+ /**
 91+ * Execute the parent process
 92+ */
 93+ function executeParent() {
 94+ if ( !$this->checkTrackingTable() ) {
 95+ return;
 96+ }
 97+
 98+ $this->syncDBs();
 99+ $this->startSlaveProcs();
 100+ $this->doAllPages();
 101+ $this->doAllOrphans();
 102+ $this->killSlaveProcs();
 103+ }
 104+
 105+ /**
 106+ * Make sure the tracking table exists and isn't empty
 107+ */
 108+ function checkTrackingTable() {
 109+ $dbr = wfGetDB( DB_SLAVE );
 110+ if ( !$dbr->tableExists( 'blob_tracking' ) ) {
 111+ echo "Error: blob_tracking table does not exist\n";
 112+ return false;
 113+ }
 114+ $row = $dbr->selectRow( 'blob_tracking', '*', false, __METHOD__ );
 115+ if ( !$row ) {
 116+ echo "Warning: blob_tracking table contains no rows, skipping this wiki.\n";
 117+ return false;
 118+ }
 119+ return true;
 120+ }
 121+
 122+ /**
 123+ * Start the worker processes.
 124+ * These processes will listen on stdin for commands.
 125+ * This is necessary because text recompression is slow: loading, compressing and
 126+ * writing are all slow.
 127+ */
 128+ function startSlaveProcs() {
 129+ $cmd = 'php ' . wfEscapeShellArg( __FILE__ );
 130+ foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
 131+ if ( in_array( $cmdOption, self::$optionsWithArgs ) ) {
 132+ $cmd .= " --$cmdOption " . wfEscapeShellArg( $this->$classOption );
 133+ } elseif ( $this->$classOption ) {
 134+ $cmd .= " --$cmdOption";
 135+ }
 136+ }
 137+ $cmd .= ' --child' .
 138+ ' --wiki ' . wfEscapeShellArg( wfWikiID() ) .
 139+ ' ' . call_user_func_array( 'wfEscapeShellArg', $this->destClusters );
 140+
 141+ $this->slavePipes = $this->slaveProcs = array();
 142+ for ( $i = 0; $i < $this->numProcs; $i++ ) {
 143+ $pipes = false;
 144+ $spec = array(
 145+ array( 'pipe', 'r' ),
 146+ array( 'file', '/dev/stdout', 'w' ),
 147+ array( 'file', '/dev/stderr', 'w' )
 148+ );
 149+ wfSuppressWarnings();
 150+ $proc = proc_open( $cmd, $spec, $pipes );
 151+ wfRestoreWarnings();
 152+ if ( !$proc ) {
 153+ echo "Error opening slave process\n";
 154+ exit( 1 );
 155+ }
 156+ $this->slaveProcs[$i] = $proc;
 157+ $this->slavePipes[$i] = $pipes[0];
 158+ }
 159+ $this->prevSlaveId = -1;
 160+ }
 161+
 162+ /**
 163+ * Gracefully terminate the child processes
 164+ */
 165+ function killSlaveProcs() {
 166+ for ( $i = 0; $i < $this->numProcs; $i++ ) {
 167+ $this->dispatchToSlave( $i, 'quit' );
 168+ }
 169+ for ( $i = 0; $i < $this->numProcs; $i++ ) {
 170+ proc_close( $this->slaveProcs[$i] );
 171+ }
 172+ }
 173+
 174+ /**
 175+ * Dispatch a command to the next available slave.
 176+ * This may block until a slave finishes its work and becomes available.
 177+ */
 178+ function dispatch( /*...*/ ) {
 179+ $args = func_get_args();
 180+ $pipes = $this->slavePipes;
 181+ $numPipes = stream_select( $x=array(), $pipes, $y=array(), 3600 );
 182+ if ( !$numPipes ) {
 183+ echo "Error waiting to write to slaves. Aborting\n";
 184+ exit( 1 );
 185+ }
 186+ for ( $i = 0; $i < $this->numProcs; $i++ ) {
 187+ $slaveId = ( $i + $this->prevSlaveId + 1 ) % $this->numProcs;
 188+ if ( isset( $pipes[$slaveId] ) ) {
 189+ $this->prevSlaveId = $slaveId;
 190+ $this->dispatchToSlave( $slaveId, $args );
 191+ return;
 192+ }
 193+ }
 194+ echo "Unreachable\n";
 195+ exit( 1 );
 196+ }
 197+
 198+ /**
 199+ * Dispatch a command to a specified slave
 200+ */
 201+ function dispatchToSlave( $slaveId, $args ) {
 202+ $args = (array)$args;
 203+ $cmd = implode( ' ', $args );
 204+ fwrite( $this->slavePipes[$slaveId], "$cmd\n" );
 205+ }
 206+
 207+ /**
 208+ * Move all tracked pages to the new clusters
 209+ */
 210+ function doAllPages() {
 211+ $dbr = wfGetDB( DB_SLAVE );
 212+ $startId = 0;
 213+ $endId = $dbr->selectField( 'blob_tracking', 'MAX(bt_page)',
 214+ # A condition is required so that this query uses the index
 215+ array( 'bt_moved' => 0 ),
 216+ __METHOD__ );
 217+ echo "Moving pages...\n";
 218+ while ( true ) {
 219+ $res = $dbr->select( 'blob_tracking',
 220+ array( 'bt_page' ),
 221+ array(
 222+ 'bt_moved' => 0,
 223+ 'bt_page > ' . $dbr->addQuotes( $startId )
 224+ ),
 225+ __METHOD__,
 226+ array(
 227+ 'DISTINCT',
 228+ 'ORDER BY' => 'bt_page',
 229+ 'LIMIT' => $this->batchSize,
 230+ )
 231+ );
 232+ if ( !$res->numRows() ) {
 233+ break;
 234+ }
 235+ foreach ( $res as $row ) {
 236+ $this->dispatch( 'doPage', $row->bt_page );
 237+ }
 238+ $startId = $row->bt_page;
 239+ $this->report( $startId, $endId );
 240+ }
 241+ echo "Done moving pages.\n";
 242+ }
 243+
 244+ /**
 245+ * Display a progress report
 246+ */
 247+ function report( $start, $end ) {
 248+ $this->numBatches++;
 249+ if ( $this->numBatches >= $this->reportingInterval ) {
 250+ $this->numBatches = 0;
 251+ echo "$start / $end\n";
 252+ wfWaitForSlaves( 5 );
 253+ }
 254+ }
 255+
 256+ /**
 257+ * Move all orphan text to the new clusters
 258+ */
 259+ function doAllOrphans() {
 260+ $dbr = wfGetDB( DB_SLAVE );
 261+ $startId = 0;
 262+ $endId = $dbr->selectField( 'blob_tracking', 'MAX(bt_text_id)',
 263+ array( 'bt_moved' => 0, 'bt_page' => 0 ),
 264+ __METHOD__ );
 265+ if ( !$endId ) {
 266+ return;
 267+ }
 268+ echo "Moving orphans...\n";
 269+
 270+ while ( true ) {
 271+ $res = $dbr->select( 'blob_tracking',
 272+ array( 'bt_text_id' ),
 273+ array(
 274+ 'bt_moved' => 0,
 275+ 'bt_page' => 0,
 276+ 'bt_text_id > ' . $dbr->addQuotes( $startId )
 277+ ),
 278+ __METHOD__,
 279+ array(
 280+ 'DISTINCT',
 281+ 'ORDER BY' => 'bt_text_id',
 282+ 'LIMIT' => $this->batchSize
 283+ )
 284+ );
 285+ if ( !$res->numRows() ) {
 286+ break;
 287+ }
 288+ $args = array( 'doOrphanList' );
 289+ foreach ( $res as $row ) {
 290+ $args[] = $row->bt_text_id;
 291+ }
 292+ call_user_func_array( array( $this, 'dispatch' ), $args );
 293+ $startId = $row->bt_text_id;
 294+ $this->report( $startId, $endId );
 295+ }
 296+ echo "Done moving orphans.\n";
 297+ }
 298+
 299+ /**
 300+ * Main entry point for worker processes
 301+ */
 302+ function executeChild() {
 303+ $this->syncDBs();
 304+
 305+ while ( !feof( STDIN ) ) {
 306+ $line = rtrim( fgets( STDIN ) );
 307+ if ( $line == '' ) {
 308+ continue;
 309+ }
 310+ $args = explode( ' ', $line );
 311+ $cmd = array_shift( $args );
 312+ switch ( $cmd ) {
 313+ case 'doPage':
 314+ $this->doPage( intval( $args[0] ) );
 315+ break;
 316+ case 'doOrphanList':
 317+ $this->doOrphanList( array_map( 'intval', $args ) );
 318+ break;
 319+ case 'quit':
 320+ return;
 321+ }
 322+ }
 323+ }
 324+
 325+ /**
 326+ * Move tracked text in a given page
 327+ */
 328+ function doPage( $pageId ) {
 329+ $dbr = wfGetDB( DB_SLAVE );
 330+
 331+ // Finish any incomplete transactions
 332+ if ( !$this->copyOnly ) {
 333+ $this->finishIncompleteMoves();
 334+ }
 335+
 336+ $startId = 0;
 337+ $trx = new CgzCopyTransaction( $this );
 338+
 339+ while ( true ) {
 340+ $res = $dbr->select(
 341+ array( 'blob_tracking', 'text' ),
 342+ '*',
 343+ array(
 344+ 'bt_page' => $pageId,
 345+ 'bt_text_id > ' . $dbr->addQuotes( $startId ),
 346+ 'bt_moved' => 0,
 347+ 'bt_new_url' => '',
 348+ 'bt_text_id=old_id',
 349+ ),
 350+ __METHOD__,
 351+ array(
 352+ 'ORDER BY' => 'bt_text_id',
 353+ 'LIMIT' => $this->batchSize
 354+ )
 355+ );
 356+ if ( !$res->numRows() ) {
 357+ break;
 358+ }
 359+
 360+ $lastTextId = 0;
 361+ foreach ( $res as $row ) {
 362+ if ( $lastTextId == $row->bt_text_id ) {
 363+ // Duplicate (null edit)
 364+ continue;
 365+ }
 366+ $lastTextId = $row->bt_text_id;
 367+ // Load the text
 368+ $text = Revision::getRevisionText( $row );
 369+ if ( $text === false ) {
 370+ echo "Error loading {$row->bt_rev_id}/{$row->bt_text_id}\n";
 371+ continue;
 372+ }
 373+
 374+ // Queue it
 375+ if ( !$trx->addItem( $text, $row->bt_text_id ) ) {
 376+ $trx->commit();
 377+ $trx = new CgzCopyTransaction( $this );
 378+ }
 379+ }
 380+ $startId = $row->bt_text_id;
 381+ }
 382+ $trx->commit();
 383+ }
 384+
 385+ /**
 386+ * Atomic move operation.
 387+ *
 388+ * Write the new URL to the text table and set the bt_moved flag.
 389+ *
 390+ * This is done in a single transaction to provide restartable behaviour
 391+ * without data loss.
 392+ *
 393+ * The transaction is kept short to reduce locking.
 394+ */
 395+ function moveTextRow( $textId, $url ) {
 396+ $dbw = wfGetDB( DB_MASTER );
 397+ $dbw->begin();
 398+ $dbw->update( 'text',
 399+ array( // set
 400+ 'old_text' => $url,
 401+ 'old_flags' => 'external,utf8',
 402+ ),
 403+ array( // where
 404+ 'old_id' => $textId
 405+ ),
 406+ __METHOD__
 407+ );
 408+ $dbw->update( 'blob_tracking',
 409+ array( 'bt_moved' => 1 ),
 410+ array( 'bt_text_id' => $textId ),
 411+ __METHOD__
 412+ );
 413+ $dbw->commit();
 414+ }
 415+
 416+ /**
 417+ * Moves are done in two phases: bt_new_url and then bt_moved.
 418+ * - bt_new_url indicates that the text has been copied to the new cluster.
 419+ * - bt_moved indicates that the text table has been updated.
 420+ *
 421+ * This function completes any moves that only have done bt_new_url. This
 422+ * can happen when the script is interrupted, or when --copy-only is used.
 423+ */
 424+ function finishIncompleteMoves() {
 425+ $dbr = wfGetDB( DB_SLAVE );
 426+
 427+ $startId = 0;
 428+ while ( true ) {
 429+ $res = $dbr->select( 'blob_tracking',
 430+ '*',
 431+ array(
 432+ 'bt_text_id > ' . $dbr->addQuotes( $startId ),
 433+ 'bt_moved' => 0,
 434+ "bt_new_url <> ''",
 435+ ),
 436+ __METHOD__,
 437+ array(
 438+ 'ORDER BY' => 'bt_text_id',
 439+ 'LIMIT' => $this->batchSize,
 440+ )
 441+ );
 442+ if ( !$res->numRows() ) {
 443+ break;
 444+ }
 445+ foreach ( $res as $row ) {
 446+ $this->moveTextRow( $row->bt_text_id, $row->bt_new_url );
 447+ }
 448+ $startId = $row->bt_text_id;
 449+ }
 450+ }
 451+
 452+ /**
 453+ * Returns the name of the next target cluster
 454+ */
 455+ function getTargetCluster() {
 456+ $cluster = next( $this->destClusters );
 457+ if ( $cluster === false ) {
 458+ $cluster = reset( $this->destClusters );
 459+ }
 460+ return $cluster;
 461+ }
 462+
 463+ /**
 464+ * Gets a DB master connection for the given external cluster name
 465+ */
 466+ function getExtDB( $cluster ) {
 467+ $lb = wfGetLBFactory()->getExternalLB( $cluster );
 468+ return $lb->getConnection( DB_MASTER );
 469+ }
 470+
 471+ /**
 472+ * Move an orphan text_id to the new cluster
 473+ */
 474+ function doOrphanList( $textIds ) {
 475+ $trx = new CgzCopyTransaction( $this );
 476+ foreach ( $textIds as $textId ) {
 477+ $row = wfGetDB( DB_SLAVE )->selectRow( 'text', array( 'old_text', 'old_flags' ),
 478+ array( 'old_id' => $textId ), __METHOD__ );
 479+ $text = Revision::getRevisionText( $row );
 480+ if ( $text === false ) {
 481+ echo "Error: cannot load revision text for $textId\n";
 482+ continue;
 483+ }
 484+
 485+ if ( !$trx->addItem( $text, $textId ) ) {
 486+ $trx->commit();
 487+ $trx = new CgzCopyTransaction( $this );
 488+ }
 489+ }
 490+ }
 491+}
 492+
 493+/**
 494+ * Class to represent a recompression operation for a single CGZ blob
 495+ */
 496+class CgzCopyTransaction {
 497+ var $blobClass;
 498+ var $cgz;
 499+ var $referrers;
 500+
 501+ /**
 502+ * Create a transaction from a RecompressTracked object
 503+ */
 504+ function __construct( $parent ) {
 505+ $this->blobClass = $parent->blobClass;
 506+ $this->cgz = false;
 507+ $this->texts = array();
 508+ }
 509+
 510+ /**
 511+ * Add text.
 512+ * Returns false if it's ready to commit.
 513+ */
 514+ function addItem( $text, $textId ) {
 515+ if ( !$this->cgz ) {
 516+ $class = $this->blobClass;
 517+ $this->cgz = new $class;
 518+ }
 519+ $hash = $this->cgz->addItem( $text );
 520+ $this->referrers[$textId] = $hash;
 521+ $this->texts[$textId] = $text;
 522+ return $this->cgz->isHappy();
 523+ }
 524+
 525+ /**
 526+ * Recompress text after some aberrant modification
 527+ */
 528+ function recompress() {
 529+ $class = $this->blobClass;
 530+ $this->cgz = new $class;
 531+ $this->referrers = array();
 532+ foreach ( $this->texts as $textId => $text ) {
 533+ $hash = $this->cgz->addItem( $text );
 534+ $this->referrers[$textId] = $hash;
 535+ }
 536+ }
 537+
 538+ /**
 539+ * Commit the blob.
 540+ * Does nothing if no text items have been added.
 541+ * May skip the move if --copy-only is set.
 542+ */
 543+ function commit() {
 544+ $originalCount = count( $this->texts );
 545+ if ( !$originalCount ) {
 546+ return;
 547+ }
 548+
 549+ // Check to see if the target text_ids have been moved already.
 550+ //
 551+ // We originally read from the slave, so this can happen when a single
 552+ // text_id is shared between multiple pages. It's rare, but possible
 553+ // if a delete/move/undelete cycle splits up a null edit.
 554+ //
 555+ // We do a locking read to prevent closer-run race conditions.
 556+ $dbw = wfGetDB( DB_MASTER );
 557+ $dbw->begin();
 558+ $dirty = false;
 559+ foreach ( $this->referrers as $textId => $hash ) {
 560+ $moved = $dbw->selectField( 'blob_tracking', 'bt_moved',
 561+ array( 'bt_text_id' => $textId ),
 562+ __METHOD__,
 563+ array( 'FOR UPDATE' )
 564+ );
 565+ if ( $moved ) {
 566+ # This row has already been moved, remove it
 567+ unset( $this->texts[$textId] );
 568+ $dirty = true;
 569+ }
 570+ }
 571+
 572+ // Recompress the blob if necessary
 573+ if ( $dirty ) {
 574+ if ( !count( $this->texts ) ) {
 575+ // All have been moved already
 576+ if ( $originalCount > 1 ) {
 577+ // This is suspicious, make noise
 578+ echo "Warning: concurrent operation detected, are there two conflicting\n" .
 579+ "processes running, doing the same job?\n";
 580+ }
 581+ return;
 582+ }
 583+ $this->recompress();
 584+ }
 585+
 586+ // Insert the data into the destination cluster
 587+ $targetCluster = $this->parent->getTargetCluster();
 588+ $store = $this->parent->store;
 589+ $targetDB = $store->getMaster( $targetCluster );
 590+ $targetDB->clearFlag( DBO_TRX ); // we manage the transactions
 591+ $targetDB->begin();
 592+ $baseUrl = $this->parent->store->store( $targetCluster, serialize( $this->cgz ) );
 593+
 594+ // Write the new URLs to the blob_tracking table
 595+ foreach ( $this->referrers as $textId => $hash ) {
 596+ $url = $baseUrl . '/' . $hash;
 597+ $dbw->update( 'blob_tracking',
 598+ array( 'bt_new_url' => $url ),
 599+ array(
 600+ 'bt_text_id' => $textId,
 601+ 'bt_moved' => 0, # Check for concurrent conflicting update
 602+ ),
 603+ __METHOD__
 604+ );
 605+ }
 606+
 607+ $targetDB->commit();
 608+ // Critical section here: interruption at this point causes blob duplication
 609+ // Reversing the order of the commits would cause data loss instead
 610+ $dbw->commit();
 611+
 612+ // Write the new URLs to the text table and set the moved flag
 613+ if ( !$this->parent->copyOnly ) {
 614+ foreach ( $this->referrers as $textId => $hash ) {
 615+ $url = $baseUrl . '/' . $hash;
 616+ $this->parent->moveTextRow( $textId, $url );
 617+ }
 618+ }
 619+ }
 620+
 621+ function signalHandler() {
 622+ $this->signalled = true;
 623+ }
 624+}
 625+
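The parent and its workers communicate over the workers' stdin using single-line commands, built by dispatchToSlave() and consumed by executeChild() above. A minimal sketch of the protocol; the $pipe handle and the IDs are illustrative:

  // Sketch only: each dispatched command is its arguments imploded with
  // spaces and terminated with a newline, exactly as dispatchToSlave() does.
  fwrite( $pipe, "doPage 12345\n" );           // recompress all tracked text rows of page 12345
  fwrite( $pipe, "doOrphanList 8001 8002\n" ); // recompress the listed orphan text rows
  fwrite( $pipe, "quit\n" );                   // worker returns from executeChild() and exits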
Property changes on: trunk/phase3/maintenance/storage/recompressTracked.php
___________________________________________________________________
Added: svn:eol-style
   + native
Index: trunk/phase3/maintenance/storage/blob_tracking.sql
@@ -28,16 +28,19 @@
2929 -- True if the text table has been updated to point to bt_new_url
3030 bt_moved bool not null default 0,
3131
32 - PRIMARY KEY (bt_rev_id, bt_text_id),
 32+ -- Primary key
 33+ -- Note that text_id is not unique due to null edits (protection, move)
 34+ -- moveTextRow(), commit(), trackOrphanText()
 35+ PRIMARY KEY (bt_text_id, bt_rev_id),
3336
3437 -- Sort by page for easy CGZ recompression
35 - KEY (bt_moved, bt_page, bt_rev_id),
 38+ -- doAllPages(), doAllOrphans(), doPage(), finishIncompleteMoves()
 39+ KEY (bt_moved, bt_page, bt_text_id),
3640
37 - -- For fast orphan searches
38 - KEY (bt_text_id),
39 -
4041 -- Key for determining the revisions using a given blob
 42+ -- Not used by any scripts yet
4143 KEY (bt_cluster, bt_blob_id, bt_cgz_hash)
 44+
4245 ) /*$wgDBTableOptions*/;
4346
4447 -- Tracking table for blob rows that aren't tracked by the text table
Index: trunk/phase3/includes/HistoryBlob.php
@@ -43,7 +43,9 @@
4444 class ConcatenatedGzipHistoryBlob implements HistoryBlob
4545 {
4646 public $mVersion = 0, $mCompressed = false, $mItems = array(), $mDefaultHash = '';
47 - public $mFast = 0, $mSize = 0;
 47+ public $mSize = 0;
 48+ public $mMaxSize = 10000000;
 49+ public $mMaxCount = 100;
4850
4951 /** Constructor */
5052 public function ConcatenatedGzipHistoryBlob() {
@@ -122,25 +124,9 @@
123125 * Helper function for compression jobs
124126 * Returns true until the object is "full" and ready to be committed
125127 */
126 - public function isHappy( $maxFactor, $factorThreshold ) {
127 - if ( count( $this->mItems ) == 0 ) {
128 - return true;
129 - }
130 - if ( !$this->mFast ) {
131 - $this->uncompress();
132 - $record = serialize( $this->mItems );
133 - $size = strlen( $record );
134 - $avgUncompressed = $size / count( $this->mItems );
135 - $compressed = strlen( gzdeflate( $record ) );
136 -
137 - if ( $compressed < $factorThreshold * 1024 ) {
138 - return true;
139 - } else {
140 - return $avgUncompressed * $maxFactor < $compressed;
141 - }
142 - } else {
143 - return count( $this->mItems ) <= 10;
144 - }
 128+ public function isHappy() {
 129+ return $this->mSize < $this->mMaxSize
 130+ && count( $this->mItems ) < $this->mMaxCount;
145131 }
146132 }
147133
@@ -313,6 +299,17 @@
314300 var $mFrozen = false;
315301
316302
 303+ /**
 304+ * The maximum uncompressed size before the object becomes sad
 305+ * Should be less than max_allowed_packet
 306+ */
 307+ var $mMaxSize = 10000000;
 308+
 309+ /**
 310+ * The maximum number of text items before the object becomes sad
 311+ */
 312+ var $mMaxCount = 100;
 313+
317314 function __construct() {
318315 if ( !function_exists( 'xdiff_string_bdiff' ) ){
319316 throw new MWException( "Need xdiff 1.5+ support to read or write DiffHistoryBlob\n" );
@@ -328,6 +325,7 @@
329326 }
330327
331328 $this->mItems[] = $text;
 329+ $this->mSize += strlen( $text );
332330 $i = count( $this->mItems ) - 1;
333331 if ( $i > 0 ) {
334332 # Need to do a null concatenation with warnings off, due to bugs in the current version of xdiff
@@ -401,4 +399,14 @@
402400 $this->mItems[0] = $info['base'];
403401 $this->mDiffs = $info['diffs'];
404402 }
 403+
 404+ /**
 405+ * Helper function for compression jobs
 406+ * Returns true until the object is "full" and ready to be committed
 407+ */
 408+ function isHappy() {
 409+ return $this->mSize < $this->mMaxSize
 410+ && count( $this->mItems ) < $this->mMaxCount;
 411+ }
 412+
405413 }
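Both blob classes now share the same isHappy() contract: keep adding items until the uncompressed size reaches mMaxSize (10,000,000 bytes, kept below max_allowed_packet) or the item count reaches mMaxCount (100). A minimal sketch of the accumulate-and-flush pattern that compressWithConcat() and CgzCopyTransaction::addItem() rely on; storeBlob() is a hypothetical stand-in for an external storage write, and DiffHistoryBlob needs the xdiff extension:

  // Sketch only: fill a blob until it reports "unhappy", then store it
  // and start a new one.
  $blob = new DiffHistoryBlob;
  $itemsInBlob = 0;
  foreach ( $texts as $text ) {
      $blob->addItem( $text );
      $itemsInBlob++;
      if ( !$blob->isHappy() ) {
          // Hit the size or count limit: flush this blob and start fresh
          storeBlob( $blob );
          $blob = new DiffHistoryBlob;
          $itemsInBlob = 0;
      }
  }
  if ( $itemsInBlob ) {
      storeBlob( $blob ); // flush the final, partially filled blob
  }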

Comments

#Comment by Brion VIBBER, 00:58, 14 November 2008

Hoping all is well :D
