Index: trunk/phase3/maintenance/nextJobDB.php |
— | — | @@ -33,20 +33,71 @@ |
34 | 34 | public function execute() { |
35 | 35 | global $wgMemc; |
36 | 36 | $type = $this->getOption( 'type', false ); |
37 | | - $mckey = $type === false |
38 | | - ? "jobqueue:dbs" |
39 | | - : "jobqueue:dbs:$type"; |
40 | | - $pendingDBs = $wgMemc->get( $mckey ); |
41 | 37 | |
42 | | - # If we didn't get it from the cache |
| 38 | + $memcKey = 'jobqueue:dbs:v2'; |
| 39 | + $pendingDBs = $wgMemc->get( $memcKey ); |
| 40 | + |
| 41 | + // If the cache entry wasn't present, or in 1% of cases otherwise, |
| 42 | + // regenerate the cache. |
| 43 | + if ( !$pendingDBs || mt_rand( 0, 100 ) == 0 ) { |
| 44 | + $pendingDBs = $this->getPendingDbs(); |
| 45 | + $wgMemc->set( $memcKey, $pendingDBs, 300 ); |
| 46 | + } |
| 47 | + |
43 | 48 | if ( !$pendingDBs ) { |
44 | | - $pendingDBs = $this->getPendingDbs( $type ); |
45 | | - $wgMemc->set( $mckey, $pendingDBs, 300 ); |
| 49 | + return; |
46 | 50 | } |
47 | | - # If we've got a pending job in a db, display it. |
48 | | - if ( $pendingDBs ) { |
49 | | - $this->output( $pendingDBs[mt_rand( 0, count( $pendingDBs ) - 1 )] ); |
| 51 | + |
| 52 | + do { |
| 53 | + $again = false; |
| 54 | + |
| 55 | + if ( $type === false ) { |
| 56 | + $candidates = call_user_func_array( 'array_merge', $pendingDBs ); |
| 57 | + } elseif ( isset( $pendingDBs[$type] ) ) { |
| 58 | + $candidates = $pendingDBs[$type]; |
| 59 | + } else { |
| 60 | + $candidates = array(); |
| 61 | + } |
| 62 | + if ( !$candidates ) { |
| 63 | + return; |
| 64 | + } |
| 65 | + |
| 66 | + $db = $candidates[ mt_rand( 0, count( $candidates ) - 1 ) ]; |
| 67 | + if ( !$this->checkJob( $type, $db ) ) { |
| 68 | + // This job is not available in the current database. Remove it from |
| 69 | + // the cache. |
| 70 | + if ( $type === false ) { |
| 71 | + foreach ( $pendingDBs as $type2 => $dbs ) { |
| 72 | + $pendingDBs[$type2] = array_diff( $pendingDBs[$type2], array( $db ) ); |
| 73 | + } |
| 74 | + } else { |
| 75 | + $pendingDBs[$type] = array_diff( $pendingDBs[$type], array( $db ) ); |
| 76 | + } |
| 77 | + |
| 78 | + $wgMemc->set( $memcKey, $pendingDBs, 300 ); |
| 79 | + $again = true; |
| 80 | + } |
| 81 | + } while ( $again ); |
| 82 | + |
| 83 | + $this->output( $db . "\n" ); |
| 84 | + } |
| 85 | + |
| 86 | + /** |
| 87 | + * Check if the specified database has a job of the specified type in it. |
| 88 | + * The type may be false to indicate "all". |
| 89 | + */ |
| 90 | + function checkJob( $type, $db ) { |
| 91 | + $lb = wfGetLB( $db ); |
| 92 | + $db = $lb->getConnection( DB_MASTER ); |
| 93 | + if ( $type === false ) { |
| 94 | + $conds = array(); |
| 95 | + } else { |
| 96 | + $conds = array( 'job_cmd' => $type ); |
50 | 97 | } |
| 98 | + |
| 99 | + $exists = (bool) $db->selectField( 'job', '1', $conds, __METHOD__ ); |
| 100 | + $lb->reuseConnection( $db ); |
| 101 | + return $exists; |
51 | 102 | } |
52 | 103 | |
53 | 104 | /** |
— | — | @@ -54,7 +105,7 @@ |
55 | 106 | * @param $type String Job type |
56 | 107 | * @return array |
57 | 108 | */ |
58 | | - private function getPendingDbs( $type ) { |
| 109 | + private function getPendingDbs() { |
59 | 110 | global $wgLocalDatabases; |
60 | 111 | $pendingDBs = array(); |
61 | 112 | # Cross-reference DBs by master DB server |
— | — | @@ -66,10 +117,10 @@ |
67 | 118 | |
68 | 119 | foreach ( $dbsByMaster as $dbs ) { |
69 | 120 | $dbConn = wfGetDB( DB_MASTER, array(), $dbs[0] ); |
70 | | - $stype = $dbConn->addQuotes( $type ); |
71 | 121 | |
72 | 122 | # Padding row for MySQL bug |
73 | | - $sql = "(SELECT '-------------------------------------------' as db)"; |
| 123 | + $pad = str_repeat( '-', 40 ); |
| 124 | + $sql = "(SELECT '$pad' as db, '$pad' as job_cmd)"; |
74 | 125 | foreach ( $dbs as $wikiId ) { |
75 | 126 | if ( $sql != '' ) { |
76 | 127 | $sql .= ' UNION '; |
— | — | @@ -79,10 +130,7 @@ |
80 | 131 | $dbConn->tablePrefix( $tablePrefix ); |
81 | 132 | $jobTable = $dbConn->tableName( 'job' ); |
82 | 133 | |
83 | | - if ( $type === false ) |
84 | | - $sql .= "(SELECT '$wikiId' as db FROM $dbName.$jobTable LIMIT 1)"; |
85 | | - else |
86 | | - $sql .= "(SELECT '$wikiId' as db FROM $dbName.$jobTable WHERE job_cmd=$stype LIMIT 1)"; |
| 134 | + $sql .= "(SELECT DISTINCT '$wikiId' as db, job_cmd FROM $dbName.$jobTable GROUP BY job_cmd)"; |
87 | 135 | } |
88 | 136 | $res = $dbConn->query( $sql, __METHOD__ ); |
89 | 137 | $first = true; |
— | — | @@ -92,7 +140,7 @@ |
93 | 141 | $first = false; |
94 | 142 | continue; |
95 | 143 | } |
96 | | - $pendingDBs[] = $row->db; |
| 144 | + $pendingDBs[$row->job_cmd][] = $row->db; |
97 | 145 | } |
98 | 146 | } |
99 | 147 | return $pendingDBs; |