Index: trunk/phase3/maintenance/storage/recompressTracked.php |
— | — | @@ -8,8 +8,11 @@ |
9 | 9 | Moves blobs indexed by trackBlobs.php to a specified list of destination clusters, and recompresses them in the process. Restartable. |
10 | 10 | |
11 | 11 | Options: |
12 | | - --procs <procs> Set the number of child processes (default 8) |
13 | | - --copy-only Copy only, do not update the text table. Restart without this option to complete. |
| 12 | + --procs <procs> Set the number of child processes (default 1) |
| 13 | + --copy-only Copy only, do not update the text table. Restart without this option to complete. |
| 14 | + --debug-log <file> Log debugging data to the specified file |
| 15 | + --info-log <file> Log progress messages to the specified file |
| 16 | + --critical-log <file> Log error messages to the specified file |
14 | 17 | "; |
15 | 18 | exit( 1 ); |
16 | 19 | } |
— | — | @@ -20,21 +23,26 @@ |
21 | 24 | class RecompressTracked { |
22 | 25 | var $destClusters; |
23 | 26 | var $batchSize = 1000; |
| 27 | + var $orphanBatchSize = 1000; |
24 | 28 | var $reportingInterval = 10; |
25 | | - var $numProcs = 8; |
| 29 | + var $numProcs = 1; |
26 | 30 | var $useDiff, $pageBlobClass, $orphanBlobClass; |
27 | 31 | var $slavePipes, $slaveProcs, $prevSlaveId; |
28 | 32 | var $copyOnly = false; |
29 | 33 | var $isChild = false; |
30 | 34 | var $slaveId = false; |
| 35 | + var $debugLog, $infoLog, $criticalLog; |
31 | 36 | var $store; |
32 | 37 | |
33 | | - static $optionsWithArgs = array( 'procs', 'slave-id' ); |
| 38 | + static $optionsWithArgs = array( 'procs', 'slave-id', 'debug-log', 'info-log', 'critical-log' ); |
34 | 39 | static $cmdLineOptionMap = array( |
35 | 40 | 'procs' => 'numProcs', |
36 | 41 | 'copy-only' => 'copyOnly', |
37 | 42 | 'child' => 'isChild', |
38 | 43 | 'slave-id' => 'slaveId', |
| 44 | + 'debug-log' => 'debugLog', |
| 45 | + 'info-log' => 'infoLog', |
| 46 | + 'critical-log' => 'criticalLog', |
39 | 47 | ); |
40 | 48 | |
41 | 49 | static function getOptionsWithArgs() { |
— | — | @@ -68,8 +76,35 @@ |
69 | 77 | |
/**
 * Emit a debugging message through wfDebug() and, when a --debug-log
 * file was configured, append it to that file as well.
 *
 * @param string $msg Message text, without a trailing newline
 */
function debug( $msg ) {
	$line = "$msg\n";
	wfDebug( $line );
	if ( $this->debugLog ) {
		$this->logToFile( $msg, $this->debugLog );
	}
}
73 | 85 | |
/**
 * Print a progress message on standard output and, when an --info-log
 * file was configured, append it to that file as well.
 *
 * @param string $msg Message text, without a trailing newline
 */
function info( $msg ) {
	print $msg . "\n";
	$logFile = $this->infoLog;
	if ( !$logFile ) {
		return;
	}
	$this->logToFile( $msg, $logFile );
}
| 92 | + |
/**
 * Print an error or warning message on standard output and, when a
 * --critical-log file was configured, append it to that file as well.
 *
 * @param string $msg Message text, without a trailing newline
 */
function critical( $msg ) {
	print $msg . "\n";
	$logFile = $this->criticalLog;
	if ( !$logFile ) {
		return;
	}
	$this->logToFile( $msg, $logFile );
}
| 99 | + |
/**
 * Append one line to a log file, prefixed with an identifying header.
 *
 * The header has the form
 *   "[<day-of-month>T<H:i:s>] <hostname> <pid>[(<slave id>)] <wiki id>"
 * and is left-justified to 50 columns so that messages line up.
 *
 * @param string $msg Message text, without a trailing newline
 * @param string $file Log destination, passed through to wfErrorLog()
 */
function logToFile( $msg, $file ) {
	// getmypid() is part of core PHP, whereas posix_getpid() requires the
	// optional posix extension; both return the current process ID.
	$header = '[' . date( 'd\TH:i:s' ) . '] ' . wfHostname() . ' ' . getmypid();
	if ( $this->slaveId !== false ) {
		// Child processes also tag lines with their slave index.
		$header .= "({$this->slaveId})";
	}
	$header .= ' ' . wfWikiID();
	wfErrorLog( sprintf( "%-50s %s\n", $header, $msg ), $file );
}
| 108 | + |
74 | 109 | /** |
75 | 110 | * Wait until the selected slave has caught up to the master. |
76 | 111 | * This allows us to use the slave for things that were committed in a |
— | — | @@ -114,12 +149,12 @@ |
115 | 150 | function checkTrackingTable() { |
116 | 151 | $dbr = wfGetDB( DB_SLAVE ); |
117 | 152 | if ( !$dbr->tableExists( 'blob_tracking' ) ) { |
118 | | - echo "Error: blob_tracking table does not exist\n"; |
| 153 | + $this->critical( "Error: blob_tracking table does not exist" ); |
119 | 154 | return false; |
120 | 155 | } |
121 | 156 | $row = $dbr->selectRow( 'blob_tracking', '*', false, __METHOD__ ); |
122 | 157 | if ( !$row ) { |
123 | | - echo "Warning: blob_tracking table contains no rows, skipping this wiki.\n"; |
| 158 | + $this->info( "Warning: blob_tracking table contains no rows, skipping this wiki." ); |
124 | 159 | return false; |
125 | 160 | } |
126 | 161 | return true; |
— | — | @@ -156,7 +191,7 @@ |
157 | 192 | $proc = proc_open( "$cmd --slave-id $i", $spec, $pipes ); |
158 | 193 | wfRestoreWarnings(); |
159 | 194 | if ( !$proc ) { |
160 | | - echo "Error opening slave process\n"; |
| 195 | + $this->critical( "Error opening slave process" ); |
161 | 196 | exit( 1 ); |
162 | 197 | } |
163 | 198 | $this->slaveProcs[$i] = $proc; |
— | — | @@ -169,12 +204,17 @@ |
170 | 205 | * Gracefully terminate the child processes |
171 | 206 | */ |
function killSlaveProcs() {
	// Ask every slave to quit, then close our end of its command pipe and
	// wait for it to exit. A non-zero proc_close() result (the child's exit
	// status, or -1 on error) is reported through critical().
	$this->info( "Waiting for slave processes to finish..." );
	for ( $i = 0; $i < $this->numProcs; $i++ ) {
		$this->dispatchToSlave( $i, 'quit' );
	}
	for ( $i = 0; $i < $this->numProcs; $i++ ) {
		// proc_close() blocks until the child exits. The PHP manual advises
		// closing any open pipes to the child first; otherwise the child may
		// be unable to exit and we deadlock. The "quit" command was already
		// sent above; fclose() flushes it before the pipe is closed.
		if ( isset( $this->slavePipes[$i] ) && is_resource( $this->slavePipes[$i] ) ) {
			fclose( $this->slavePipes[$i] );
		}
		$status = proc_close( $this->slaveProcs[$i] );
		if ( $status ) {
			$this->critical( "Warning: child #$i exited with status $status" );
		}
	}
	$this->info( "Done." );
}
180 | 220 | |
181 | 221 | /** |
— | — | @@ -186,7 +226,7 @@ |
187 | 227 | $pipes = $this->slavePipes; |
188 | 228 | $numPipes = stream_select( $x=array(), $pipes, $y=array(), 3600 ); |
189 | 229 | if ( !$numPipes ) { |
190 | | - echo "Error waiting to write to slaves. Aborting\n"; |
| 230 | + $this->critical( "Error waiting to write to slaves. Aborting" ); |
191 | 231 | exit( 1 ); |
192 | 232 | } |
193 | 233 | for ( $i = 0; $i < $this->numProcs; $i++ ) { |
— | — | @@ -197,7 +237,7 @@ |
198 | 238 | return; |
199 | 239 | } |
200 | 240 | } |
201 | | - echo "Unreachable\n"; |
| 241 | + $this->critical( "Unreachable" ); |
202 | 242 | exit( 1 ); |
203 | 243 | } |
204 | 244 | |
— | — | @@ -215,12 +255,19 @@ |
216 | 256 | */ |
217 | 257 | function doAllPages() { |
218 | 258 | $dbr = wfGetDB( DB_SLAVE ); |
| 259 | + $i = 0; |
219 | 260 | $startId = 0; |
220 | | - $endId = $dbr->selectField( 'blob_tracking', 'MAX(bt_page)', |
| 261 | + $numPages = $dbr->selectField( 'blob_tracking', |
| 262 | + 'COUNT(DISTINCT bt_page)', |
221 | 263 | # A condition is required so that this query uses the index |
222 | 264 | array( 'bt_moved' => 0 ), |
223 | | - __METHOD__ ); |
224 | | - echo "Moving pages...\n"; |
| 265 | + __METHOD__ |
| 266 | + ); |
| 267 | + if ( $this->copyOnly ) { |
| 268 | + $this->info( "Copying pages..." ); |
| 269 | + } else { |
| 270 | + $this->info( "Moving pages..." ); |
| 271 | + } |
225 | 272 | while ( true ) { |
226 | 273 | $res = $dbr->select( 'blob_tracking', |
227 | 274 | array( 'bt_page' ), |
— | — | @@ -240,21 +287,27 @@ |
241 | 288 | } |
242 | 289 | foreach ( $res as $row ) { |
243 | 290 | $this->dispatch( 'doPage', $row->bt_page ); |
| 291 | + $i++; |
244 | 292 | } |
245 | 293 | $startId = $row->bt_page; |
246 | | - $this->report( $startId, $endId ); |
| 294 | + $this->report( 'pages', $i, $numPages ); |
247 | 295 | } |
248 | | - echo "Done moving pages.\n"; |
| 296 | + $this->report( 'pages', $i, $numPages ); |
| 297 | + if ( $this->copyOnly ) { |
| 298 | + $this->info( "All page copies queued." ); |
| 299 | + } else { |
| 300 | + $this->info( "All page moves queued." ); |
| 301 | + } |
249 | 302 | } |
250 | 303 | |
/**
 * Display a progress report.
 *
 * Prints "<label>: <current> / <end>" via info() once every
 * $reportingInterval calls, and additionally whenever $current equals
 * $end. After printing it resets the batch counter and calls
 * wfWaitForSlaves( 5 ) (presumably to let replicas catch up before
 * more work is queued).
 *
 * @param string $label Name of what is being counted, e.g. 'pages' or 'orphans'
 * @param int $current Number of items dispatched so far
 * @param int|string $end Expected total number of items
 */
function report( $label, $current, $end ) {
	$this->numBatches++;
	// Loose comparison on purpose: callers pass $end straight from a
	// selectField() result, which may be a string, while $current is an
	// integer counter.
	$reachedEnd = ( $current == $end );
	if ( !$reachedEnd && $this->numBatches < $this->reportingInterval ) {
		return;
	}
	$this->numBatches = 0;
	$this->info( "$label: $current / $end" );
	wfWaitForSlaves( 5 );
}
— | — | @@ -265,13 +318,20 @@ |
266 | 319 | function doAllOrphans() { |
267 | 320 | $dbr = wfGetDB( DB_SLAVE ); |
268 | 321 | $startId = 0; |
269 | | - $endId = $dbr->selectField( 'blob_tracking', 'MAX(bt_text_id)', |
| 322 | + $i = 0; |
| 323 | + $numOrphans = $dbr->selectField( 'blob_tracking', |
| 324 | + 'COUNT(DISTINCT bt_text_id)', |
270 | 325 | array( 'bt_moved' => 0, 'bt_page' => 0 ), |
271 | 326 | __METHOD__ ); |
272 | | - if ( !$endId ) { |
| 327 | + if ( !$numOrphans ) { |
273 | 328 | return; |
274 | 329 | } |
275 | | - echo "Moving orphans...\n"; |
| 330 | + if ( $this->copyOnly ) { |
| 331 | + $this->info( "Copying orphans..." ); |
| 332 | + } else { |
| 333 | + $this->info( "Moving orphans..." ); |
| 334 | + } |
| 335 | + $ids = array(); |
276 | 336 | |
277 | 337 | while ( true ) { |
278 | 338 | $res = $dbr->select( 'blob_tracking', |
— | — | @@ -291,15 +351,24 @@ |
292 | 352 | if ( !$res->numRows() ) { |
293 | 353 | break; |
294 | 354 | } |
295 | | - $args = array( 'doOrphanList' ); |
296 | 355 | foreach ( $res as $row ) { |
297 | | - $args[] = $row->bt_text_id; |
| 356 | + $ids[] = $row->bt_text_id; |
| 357 | + $i++; |
298 | 358 | } |
299 | | - call_user_func_array( array( $this, 'dispatch' ), $args ); |
| 359 | + // Need to send enough orphan IDs to the child at a time to fill a blob, |
| 360 | + // so orphanBatchSize needs to be at least ~100. |
| 361 | + // batchSize can be smaller or larger. |
| 362 | + while ( count( $ids ) > $this->orphanBatchSize ) { |
| 363 | + $args = array_slice( $ids, 0, $this->orphanBatchSize ); |
| 364 | + $ids = array_slice( $ids, $this->orphanBatchSize ); |
| 365 | + array_unshift( $args, 'doOrphanList' ); |
| 366 | + call_user_func_array( array( $this, 'dispatch' ), $args ); |
| 367 | + } |
300 | 368 | $startId = $row->bt_text_id; |
301 | | - $this->report( $startId, $endId ); |
| 369 | + $this->report( 'orphans', $i, $numOrphans ); |
302 | 370 | } |
303 | | - echo "Done moving orphans.\n"; |
| 371 | + $this->report( 'orphans', $i, $numOrphans ); |
| 372 | + $this->info( "All orphans queued." ); |
304 | 373 | } |
305 | 374 | |
306 | 375 | /** |
— | — | @@ -345,6 +414,7 @@ |
346 | 415 | // Finish any incomplete transactions |
347 | 416 | if ( !$this->copyOnly ) { |
348 | 417 | $this->finishIncompleteMoves( array( 'bt_page' => $pageId ) ); |
| 418 | + $this->syncDBs(); |
349 | 419 | } |
350 | 420 | |
351 | 421 | $startId = 0; |
— | — | @@ -381,7 +451,7 @@ |
382 | 452 | // Load the text |
383 | 453 | $text = Revision::getRevisionText( $row ); |
384 | 454 | if ( $text === false ) { |
385 | | - echo "Error loading {$row->bt_rev_id}/{$row->bt_text_id}\n"; |
| 455 | + $this->critical( "Error loading {$row->bt_rev_id}/{$row->bt_text_id}" ); |
386 | 456 | continue; |
387 | 457 | } |
388 | 458 | |
— | — | @@ -410,6 +480,10 @@ |
411 | 481 | * The transaction is kept short to reduce locking. |
412 | 482 | */ |
413 | 483 | function moveTextRow( $textId, $url ) { |
| 484 | + if ( $this->copyOnly ) { |
| 485 | + $this->critical( "Internal error: can't call moveTextRow() in --copy-only mode" ); |
| 486 | + exit( 1 ); |
| 487 | + } |
414 | 488 | $dbw = wfGetDB( DB_MASTER ); |
415 | 489 | $dbw->begin(); |
416 | 490 | $dbw->update( 'text', |
— | — | @@ -491,19 +565,33 @@ |
492 | 566 | */ |
493 | 567 | function doOrphanList( $textIds ) { |
494 | 568 | // Finish incomplete moves |
495 | | - $this->finishIncompleteMoves( array( 'bt_text_id' => $textIds ) ); |
| 569 | + if ( !$this->copyOnly ) { |
| 570 | + $this->finishIncompleteMoves( array( 'bt_text_id' => $textIds ) ); |
| 571 | + $this->syncDBs(); |
| 572 | + } |
496 | 573 | |
497 | 574 | $trx = new CgzCopyTransaction( $this, $this->orphanBlobClass ); |
498 | | - foreach ( $textIds as $textId ) { |
499 | | - $row = wfGetDB( DB_SLAVE )->selectRow( 'text', array( 'old_text', 'old_flags' ), |
500 | | - array( 'old_id' => $textId ), __METHOD__ ); |
| 575 | + |
| 576 | + $res = wfGetDB( DB_SLAVE )->select( |
| 577 | + array( 'text', 'blob_tracking' ), |
| 578 | + array( 'old_id', 'old_text', 'old_flags' ), |
| 579 | + array( |
| 580 | + 'old_id' => $textIds, |
| 581 | + 'bt_text_id=old_id', |
| 582 | + 'bt_moved' => 0, |
| 583 | + ), |
| 584 | + __METHOD__, |
| 585 | + array( 'DISTINCT' ) |
| 586 | + ); |
| 587 | + |
| 588 | + foreach ( $res as $row ) { |
501 | 589 | $text = Revision::getRevisionText( $row ); |
502 | 590 | if ( $text === false ) { |
503 | | - echo "Error: cannot load revision text for $textId\n"; |
| 591 | + $this->critical( "Error: cannot load revision text for old_id=$textId" ); |
504 | 592 | continue; |
505 | 593 | } |
506 | 594 | |
507 | | - if ( !$trx->addItem( $text, $textId ) ) { |
| 595 | + if ( !$trx->addItem( $text, $row->old_id ) ) { |
508 | 596 | $this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" ); |
509 | 597 | $trx->commit(); |
510 | 598 | $trx = new CgzCopyTransaction( $this, $this->orphanBlobClass ); |
— | — | @@ -605,8 +693,8 @@ |
606 | 694 | // All have been moved already |
607 | 695 | if ( $originalCount > 1 ) { |
608 | 696 | // This is suspcious, make noise |
609 | | - echo "Warning: concurrent operation detected, are there two conflicting " . |
610 | | - "processes running, doing the same job?\n"; |
| 697 | + $this->critical( "Warning: concurrent operation detected, are there two conflicting " . |
| 698 | + "processes running, doing the same job?" ); |
611 | 699 | } |
612 | 700 | return; |
613 | 701 | } |