r62005 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r62004‎ | r62005 | r62006 >
Date:03:36, 5 February 2010
Author:tstarling
Status:deferred (Comments)
Tags:
Comment:
* (bug 21551) Rewrote the Squid purge HTTP client to provide a more robust and general implementation of HTTP, allowing it to purge non-Squid caches such as Varnish. Tested against Squid (keep-alive on or off), Varnish, various kinds of network error, benchmarked at ~3k req/s.
* Reverted what was left of r59178
* Removed $wgDebugSquid, didn't do anything anyway
Modified paths:
  • /trunk/phase3/CREDITS (modified) (history)
  • /trunk/phase3/RELEASE-NOTES (modified) (history)
  • /trunk/phase3/includes/AutoLoader.php (modified) (history)
  • /trunk/phase3/includes/DefaultSettings.php (modified) (history)
  • /trunk/phase3/includes/SquidPurgeClient.php (added) (history)
  • /trunk/phase3/includes/SquidUpdate.php (modified) (history)

Diff [purge]

Index: trunk/phase3/CREDITS
@@ -112,7 +112,6 @@
113113 * René Kijewski
114114 * Robert Treat
115115 * RockMFR
116 -* Roi Avinoam
117116 * ST47
118117 * Scott Colcord
119118 * Simon Walker
Index: trunk/phase3/includes/SquidPurgeClient.php
@@ -0,0 +1,380 @@
 2+<?php
 3+/**
 4+ * An HTTP 1.0 client built for the purposes of purging Squid and Varnish.
 5+ * Uses asynchronous I/O, allowing purges to be done in a highly parallel
 6+ * manner.
 7+ *
 8+ * Could be replaced by curl_multi_exec() or some such.
 9+ */
 10+class SquidPurgeClient {
 11+ var $host, $port, $ip;
 12+
 13+ var $readState = 'idle';
 14+ var $writeBuffer = '';
 15+ var $requests = array();
 16+ var $currentRequestIndex;
 17+
 18+ const EINTR = 4;
 19+ const EAGAIN = 11;
 20+ const EINPROGRESS = 115;
 21+ const BUFFER_SIZE = 8192;
 22+
 23+ /**
 24+ * The socket resource, or null for unconnected, or false for disabled due to error
 25+ */
 26+ var $socket;
 27+
 28+ public function __construct( $server, $options = array() ) {
 29+ $parts = explode( ':', $server, 2 );
 30+ $this->host = $parts[0];
 31+ $this->port = isset( $parts[1] ) ? $parts[1] : 80;
 32+ }
 33+
 34+ /**
 35+ * Open a socket if there isn't one open already, return it.
 36+ * Returns false on error.
 37+ */
 38+ protected function getSocket() {
 39+ if ( $this->socket !== null ) {
 40+ return $this->socket;
 41+ }
 42+
 43+ $ip = $this->getIP();
 44+ if ( !$ip ) {
 45+ $this->log( "DNS error" );
 46+ $this->markDown();
 47+ return false;
 48+ }
 49+ $this->socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
 50+ socket_set_nonblock( $this->socket );
 51+ wfSuppressWarnings();
 52+ $ok = socket_connect( $this->socket, $ip, $this->port );
 53+ wfRestoreWarnings();
 54+ if ( !$ok ) {
 55+ $error = socket_last_error( $this->socket );
 56+ if ( $error !== self::EINPROGRESS ) {
 57+ $this->log( "connection error: " . socket_strerror( $error ) );
 58+ $this->markDown();
 59+ return false;
 60+ }
 61+ }
 62+
 63+ return $this->socket;
 64+ }
 65+
 66+ /**
 67+ * Get read socket array for select()
 68+ */
 69+ public function getReadSocketsForSelect() {
 70+ if ( $this->readState == 'idle' ) {
 71+ return array();
 72+ }
 73+ $socket = $this->getSocket();
 74+ if ( $socket === false ) {
 75+ return array();
 76+ }
 77+ return array( $socket );
 78+ }
 79+
 80+ /**
 81+ * Get write socket array for select()
 82+ */
 83+ public function getWriteSocketsForSelect() {
 84+ if ( !strlen( $this->writeBuffer ) ) {
 85+ return array();
 86+ }
 87+ $socket = $this->getSocket();
 88+ if ( $socket === false ) {
 89+ return array();
 90+ }
 91+ return array( $socket );
 92+ }
 93+
 94+ /**
 95+ * Get the host's IP address.
 96+ * Does not support IPv6 at present due to the lack of a convenient interface in PHP.
 97+ */
 98+ protected function getIP() {
 99+ if ( $this->ip === null ) {
 100+ if ( IP::isIPv4( $this->host ) ) {
 101+ $this->ip = $this->host;
 102+ } elseif ( IP::isIPv6( $this->host ) ) {
 103+ throw new MWException( '$wgSquidServers does not support IPv6' );
 104+ } else {
 105+ wfSuppressWarnings();
 106+ $this->ip = gethostbyname( $this->host );
 107+ if ( $this->ip === $this->host ) {
 108+ $this->ip = false;
 109+ }
 110+ wfRestoreWarnings();
 111+ }
 112+ }
 113+ return $this->ip;
 114+ }
 115+
 116+ /**
 117+ * Close the socket and ignore any future purge requests.
 118+ * This is called if there is a protocol error.
 119+ */
 120+ protected function markDown() {
 121+ $this->close();
 122+ $this->socket = false;
 123+ }
 124+
 125+ /**
 126+ * Close the socket but allow it to be reopened for future purge requests
 127+ */
 128+ public function close() {
 129+ if ( $this->socket ) {
 130+ wfSuppressWarnings();
 131+ socket_set_block( $this->socket );
 132+ socket_shutdown( $this->socket );
 133+ socket_close( $this->socket );
 134+ wfRestoreWarnings();
 135+ }
 136+ $this->socket = null;
 137+ $this->readBuffer = '';
 138+ // Write buffer is kept since it may contain a request for the next socket
 139+ }
 140+
 141+ /**
 142+ * Queue a purge operation
 143+ */
 144+ public function queuePurge( $url ) {
 145+ $url = str_replace( "\n", '', $url );
 146+ $this->requests[] = "PURGE $url HTTP/1.0\r\n" .
 147+ "Connection: Keep-Alive\r\n" .
 148+ "Proxy-Connection: Keep-Alive\r\n" .
 149+ "User-Agent: " . Http::userAgent() . ' ' . __CLASS__ . "\r\n\r\n";
 150+ if ( $this->currentRequestIndex === null ) {
 151+ $this->nextRequest();
 152+ }
 153+ }
 154+
 155+ public function isIdle() {
 156+ return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle';
 157+ }
 158+
 159+ /**
 160+ * Perform pending writes. Call this when socket_select() indicates that writing will not block.
 161+ */
 162+ public function doWrites() {
 163+ if ( !strlen( $this->writeBuffer ) ) {
 164+ return;
 165+ }
 166+ $socket = $this->getSocket();
 167+ if ( !$socket ) {
 168+ return;
 169+ }
 170+
 171+ if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) {
 172+ $buf = $this->writeBuffer;
 173+ $flags = MSG_EOR;
 174+ } else {
 175+ $buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE );
 176+ $flags = 0;
 177+ }
 178+ wfSuppressWarnings();
 179+ $bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags );
 180+ wfRestoreWarnings();
 181+
 182+ if ( $bytesSent === false ) {
 183+ $error = socket_last_error( $socket );
 184+ if ( $error != self::EAGAIN && $error != self::EINTR ) {
 185+ $this->log( 'write error: ' . socket_strerror( $error ) );
 186+ $this->markDown();
 187+ }
 188+ return;
 189+ }
 190+
 191+ $this->writeBuffer = substr( $this->writeBuffer, $bytesSent );
 192+ }
 193+
 194+ /**
 195+ * Read some data. Call this when socket_select() indicates that the read buffer is non-empty.
 196+ */
 197+ public function doReads() {
 198+ $socket = $this->getSocket();
 199+ if ( !$socket ) {
 200+ return;
 201+ }
 202+
 203+ $buf = '';
 204+ wfSuppressWarnings();
 205+ $bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 );
 206+ wfRestoreWarnings();
 207+ if ( $bytesRead === false ) {
 208+ $error = socket_last_error( $socket );
 209+ if ( $error != self::EAGAIN && $error != self::EINTR ) {
 210+ $this->log( 'read error: ' . socket_strerror( $error ) );
 211+ $this->markDown();
 212+ return;
 213+ }
 214+ } elseif ( $bytesRead === 0 ) {
 215+ // Assume EOF
 216+ $this->close();
 217+ return;
 218+ }
 219+
 220+ $this->readBuffer .= $buf;
 221+ while ( $this->socket && $this->processReadBuffer() === 'continue' );
 222+ }
 223+
 224+ protected function processReadBuffer() {
 225+ switch ( $this->readState ) {
 226+ case 'idle':
 227+ return 'done';
 228+ case 'status':
 229+ case 'header':
 230+ $lines = explode( "\r\n", $this->readBuffer, 2 );
 231+ if ( count( $lines ) < 2 ) {
 232+ return 'done';
 233+ }
 234+ if ( $this->readState == 'status' ) {
 235+ $this->processStatusLine( $lines[0] );
 236+ } else { // header
 237+ $this->processHeaderLine( $lines[0] );
 238+ }
 239+ $this->readBuffer = $lines[1];
 240+ return 'continue';
 241+ case 'body':
 242+ if ( $this->bodyRemaining !== null ) {
 243+ if ( $this->bodyRemaining > strlen( $this->readBuffer ) ) {
 244+ $this->bodyRemaining -= strlen( $this->readBuffer );
 245+ $this->readBuffer = '';
 246+ return 'done';
 247+ } else {
 248+ $this->readBuffer = substr( $this->readBuffer, $this->bodyRemaining );
 249+ $this->bodyRemaining = 0;
 250+ $this->nextRequest();
 251+ return 'continue';
 252+ }
 253+ } else {
 254+ // No content length, read all data to EOF
 255+ $this->readBuffer = '';
 256+ return 'done';
 257+ }
 258+ default:
 259+ throw new MWException( __METHOD__.': unexpected state' );
 260+ }
 261+ }
 262+
 263+ protected function processStatusLine( $line ) {
 264+ if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) {
 265+ $this->log( 'invalid status line' );
 266+ $this->markDown();
 267+ return;
 268+ }
 269+ list( $all, $major, $minor, $status, $reason ) = $m;
 270+ $status = intval( $status );
 271+ if ( $status !== 200 && $status !== 404 ) {
 272+ $this->log( "unexpected status code: $status $reason" );
 273+ $this->markDown();
 274+ return;
 275+ }
 276+ $this->readState = 'header';
 277+ }
 278+
 279+ protected function processHeaderLine( $line ) {
 280+ if ( preg_match( '/^Content-Length: (\d+)$/i', $line, $m ) ) {
 281+ $this->bodyRemaining = intval( $m[1] );
 282+ } elseif ( $line === '' ) {
 283+ $this->readState = 'body';
 284+ }
 285+ }
 286+
 287+ protected function nextRequest() {
 288+ if ( $this->currentRequestIndex !== null ) {
 289+ unset( $this->requests[$this->currentRequestIndex] );
 290+ }
 291+ if ( count( $this->requests ) ) {
 292+ $this->readState = 'status';
 293+ $this->currentRequestIndex = key( $this->requests );
 294+ $this->writeBuffer = $this->requests[$this->currentRequestIndex];
 295+ } else {
 296+ $this->readState = 'idle';
 297+ $this->currentRequestIndex = null;
 298+ $this->writeBuffer = '';
 299+ }
 300+ $this->bodyRemaining = null;
 301+ }
 302+
 303+ protected function log( $msg ) {
 304+ wfDebugLog( 'squid', __CLASS__." ($this->host): $msg\n" );
 305+ }
 306+}
 307+
 308+class SquidPurgeClientPool {
 309+ var $clients = array();
 310+ var $timeout = 5;
 311+
 312+ function __construct( $options = array() ) {
 313+ if ( isset( $options['timeout'] ) ) {
 314+ $this->timeout = $options['timeout'];
 315+ }
 316+ }
 317+
 318+ public function addClient( $client ) {
 319+ $this->clients[] = $client;
 320+ }
 321+
 322+ public function run() {
 323+ $done = false;
 324+ $startTime = microtime( true );
 325+ while ( !$done ) {
 326+ $readSockets = $writeSockets = array();
 327+ foreach ( $this->clients as $clientIndex => $client ) {
 328+ $sockets = $client->getReadSocketsForSelect();
 329+ foreach ( $sockets as $i => $socket ) {
 330+ $readSockets["$clientIndex/$i"] = $socket;
 331+ }
 332+ $sockets = $client->getWriteSocketsForSelect();
 333+ foreach ( $sockets as $i => $socket ) {
 334+ $writeSockets["$clientIndex/$i"] = $socket;
 335+ }
 336+ }
 337+ if ( !count( $readSockets ) && !count( $writeSockets ) ) {
 338+ break;
 339+ }
 340+ $exceptSockets = null;
 341+ $timeout = min( $startTime + $this->timeout - microtime( true ), 1 );
 342+ wfSuppressWarnings();
 343+ $numReady = socket_select( $readSockets, $writeSockets, $exceptSockets, $timeout );
 344+ wfRestoreWarnings();
 345+ if ( $numReady === false ) {
 346+ wfDebugLog( 'squid', __METHOD__.': Error in stream_select: ' .
 347+ socket_strerror( socket_last_error() ) . "\n" );
 348+ break;
 349+ }
 350+ // Check for timeout, use 1% tolerance since we aimed at having socket_select()
 351+ // exit at precisely the overall timeout
 352+ if ( microtime( true ) - $startTime > $this->timeout * 0.99 ) {
 353+ wfDebugLog( 'squid', __CLASS__.": timeout ({$this->timeout}s)\n" );
 354+ break;
 355+ } elseif ( !$numReady ) {
 356+ continue;
 357+ }
 358+
 359+ foreach ( $readSockets as $key => $socket ) {
 360+ list( $clientIndex, $i ) = explode( '/', $key );
 361+ $client = $this->clients[$clientIndex];
 362+ $client->doReads();
 363+ }
 364+ foreach ( $writeSockets as $key => $socket ) {
 365+ list( $clientIndex, $i ) = explode( '/', $key );
 366+ $client = $this->clients[$clientIndex];
 367+ $client->doWrites();
 368+ }
 369+
 370+ $done = true;
 371+ foreach ( $this->clients as $client ) {
 372+ if ( !$client->isIdle() ) {
 373+ $done = false;
 374+ }
 375+ }
 376+ }
 377+ foreach ( $this->clients as $client ) {
 378+ $client->close();
 379+ }
 380+ }
 381+}
Property changes on: trunk/phase3/includes/SquidPurgeClient.php
___________________________________________________________________
Name: svn:eol-style
1382 + native
Index: trunk/phase3/includes/AutoLoader.php
@@ -214,6 +214,8 @@
215215 'SpecialRedirectToSpecial' => 'includes/SpecialPage.php',
216216 'SqlBagOStuff' => 'includes/BagOStuff.php',
217217 'SquidUpdate' => 'includes/SquidUpdate.php',
 218+ 'SquidPurgeClient' => 'includes/SquidPurgeClient.php',
 219+ 'SquidPurgeClientPool' => 'includes/SquidPurgeClient.php',
218220 'Status' => 'includes/Status.php',
219221 'StubContLang' => 'includes/StubObject.php',
220222 'StubUser' => 'includes/StubObject.php',
Index: trunk/phase3/includes/DefaultSettings.php
@@ -1834,11 +1834,6 @@
18351835 */
18361836 $wgSquidServersNoPurge = array();
18371837
1838 -/**
1839 - * Default character limit for squid purge responses
1840 - */
1841 -$wgSquidResponseLimit = 250;
1842 -
18431838 /** Maximum number of titles to purge in any one client operation */
18441839 $wgMaxSquidPurgeTitles = 400;
18451840
@@ -2011,8 +2006,6 @@
20122007 $wgDebugProfiling = false;
20132008 /** Output debug message on every wfProfileIn/wfProfileOut */
20142009 $wgDebugFunctionEntry = 0;
2015 -/** Lots of debugging output from SquidUpdate.php */
2016 -$wgDebugSquid = false;
20172010
20182011 /*
20192012 * Destination for wfIncrStats() data...
Index: trunk/phase3/includes/SquidUpdate.php
@@ -81,14 +81,14 @@
8282 XXX report broken Squids per mail or log */
8383
8484 static function purge( $urlArr ) {
85 - global $wgSquidServers, $wgHTCPMulticastAddress, $wgHTCPPort, $wgSquidResponseLimit;
 85+ global $wgSquidServers, $wgHTCPMulticastAddress, $wgHTCPPort;
8686
8787 /*if ( (@$wgSquidServers[0]) == 'echo' ) {
8888 echo implode("<br />\n", $urlArr) . "<br />\n";
8989 return;
9090 }*/
9191
92 - if( empty( $urlArr ) ) {
 92+ if( !$urlArr ) {
9393 return;
9494 }
9595
@@ -98,105 +98,26 @@
9999
100100 wfProfileIn( __METHOD__ );
101101
102 - $maxsocketspersquid = 8; // socket cap per Squid
103 - $urlspersocket = 400; // 400 seems to be a good tradeoff, opening a socket takes a while
104 - $firsturl = SquidUpdate::expand( $urlArr[0] );
105 - unset($urlArr[0]);
106 - $urlArr = array_values($urlArr);
107 - $sockspersq = max(ceil(count($urlArr) / $urlspersocket ),1);
108 - if ($sockspersq == 1) {
109 - /* the most common case */
110 - $urlspersocket = count($urlArr);
111 - } else if ($sockspersq > $maxsocketspersquid ) {
112 - $urlspersocket = ceil(count($urlArr) / $maxsocketspersquid);
113 - $sockspersq = $maxsocketspersquid;
 102+ $maxSocketsPerSquid = 8; // socket cap per Squid
 103+ $urlsPerSocket = 400; // 400 seems to be a good tradeoff, opening a socket takes a while
 104+ $socketsPerSquid = ceil( count( $urlArr ) / $urlsPerSocket );
 105+ if ( $socketsPerSquid > $maxSocketsPerSquid ) {
 106+ $socketsPerSquid = $maxSocketsPerSquid;
114107 }
115 - $totalsockets = count($wgSquidServers) * $sockspersq;
116 - $sockets = Array();
117108
118 - /* this sets up the sockets and tests the first socket for each server. */
119 - for ($ss=0;$ss < count($wgSquidServers);$ss++) {
120 - $failed = false;
121 - $so = 0;
122 - while ($so < $sockspersq && !$failed) {
123 - if ($so == 0) {
124 - /* first socket for this server, do the tests */
125 - @list($server, $port) = explode(':', $wgSquidServers[$ss]);
126 - if(!isset($port)) $port = 80;
127 - #$this->debug("Opening socket to $server:$port");
128 - $error = $errstr = false;
129 - $socket = @fsockopen($server, $port, $error, $errstr, 3);
130 - #$this->debug("\n");
131 - if (!$socket) {
132 - $failed = true;
133 - $totalsockets -= $sockspersq;
134 - } else {
135 - $msg = 'PURGE ' . $firsturl . " HTTP/1.0\r\n".
136 - "Connection: Keep-Alive\r\n\r\n";
137 - #$this->debug($msg);
138 - @fputs($socket,$msg);
139 - #$this->debug("...");
140 - $res = @fread($socket,512);
141 - #$this->debug("\n");
142 - /* Squid only returns http headers with 200 or 404 status,
143 - if there's more returned something's wrong */
144 - if (strlen($res) > $wgSquidResponseLimit) {
145 - fclose($socket);
146 - $failed = true;
147 - $totalsockets -= $sockspersq;
148 - } else {
149 - @stream_set_blocking($socket,false);
150 - $sockets[] = $socket;
151 - }
152 - }
153 - } else {
154 - /* open the remaining sockets for this server */
155 - list($server, $port) = explode(':', $wgSquidServers[$ss]);
156 - if(!isset($port)) $port = 80;
157 - $socket = @fsockopen($server, $port, $error, $errstr, 2);
158 - @stream_set_blocking($socket,false);
159 - $sockets[] = $socket;
 109+ $pool = new SquidPurgeClientPool;
 110+ $chunks = array_chunk( $urlArr, ceil( count( $urlArr ) / $socketsPerSquid ) );
 111+ foreach ( $wgSquidServers as $server ) {
 112+ foreach ( $chunks as $chunk ) {
 113+ $client = new SquidPurgeClient( $server );
 114+ foreach ( $chunk as $url ) {
 115+ $client->queuePurge( $url );
160116 }
161 - $so++;
 117+ $pool->addClient( $client );
162118 }
163119 }
 120+ $pool->run();
164121
165 - if ($urlspersocket > 0) {
166 - /* now do the heavy lifting. The fread() relies on Squid returning only the headers */
167 - for ($r=0;$r < $urlspersocket;$r++) {
168 - for ($s=0;$s < $totalsockets;$s++) {
169 - if($r != 0) {
170 - $res = '';
171 - $esc = 0;
172 - while (strlen($res) < 100 && $esc < 200 ) {
173 - $res .= @fread($sockets[$s],512);
174 - $esc++;
175 - usleep(20);
176 - }
177 - }
178 - $urindex = $r + $urlspersocket * ($s - $sockspersq * floor($s / $sockspersq));
179 - $url = SquidUpdate::expand( $urlArr[$urindex] );
180 - $msg = 'PURGE ' . $url . " HTTP/1.0\r\n".
181 - "Connection: Keep-Alive\r\n\r\n";
182 - #$this->debug($msg);
183 - @fputs($sockets[$s],$msg);
184 - #$this->debug("\n");
185 - }
186 - }
187 - }
188 - #$this->debug("Reading response...");
189 - foreach ($sockets as $socket) {
190 - $res = '';
191 - $esc = 0;
192 - while (strlen($res) < 100 && $esc < 200 ) {
193 - $res .= @fread($socket,1024);
194 - $esc++;
195 - usleep(20);
196 - }
197 -
198 - @fclose($socket);
199 - }
200 - #$this->debug("\n");
201122 wfProfileOut( __METHOD__ );
202123 }
203124
@@ -259,13 +180,6 @@
260181 wfProfileOut( __METHOD__ );
261182 }
262183
263 - function debug( $text ) {
264 - global $wgDebugSquid;
265 - if ( $wgDebugSquid ) {
266 - wfDebug( $text );
267 - }
268 - }
269 -
270184 /**
271185 * Expand local URLs to fully-qualified URLs using the internal protocol
272186 * and host defined in $wgInternalServer. Input that's already fully-
Index: trunk/phase3/RELEASE-NOTES
@@ -671,7 +671,9 @@
672672 * (bug 19391) Fix caching for Recent ChangesFeed.
673673 * (bug 21455) Fixed "Watch this page" checkbox appearing on some special pages
674674 even to non-logged in users
675 -* (bug 21551) Make Squid reponse limit configurable
 675+* (bug 21551) Rewrote the Squid purge HTTP client to provide a more robust and
 676+ general implementation of HTTP, allowing it to purge non-Squid caches such as
 677+ Varnish.
676678 * Fixed corruption of long UDP debug log messages by using socket_sendto()
677679 instead of fsockopen() with fwrite().
678680 * (bug 16884) Fixed feed links in sidebar not complying with URL parameters

Past revisions this follows-up on

Revision | Commit summary | Author | Date
r59178 | (bug 21551) Make Squid response limit configurable. Patch by Roi Aminoam | catrope | 19:04, 17 November 2009

Comments

#Comment by Platonides (talk | contribs)   12:43, 5 February 2010

Shouldn't newFromTitles() also use getSquidURLs(), as in http://bug-attachment.wikimedia.org/attachment.cgi?id=6722 (the DPL part isn't applicable)? It seems that the variants won't be purged. Or is that intentional?

#Comment by Tim Starling (talk | contribs)   03:51, 19 February 2010

That has nothing to do with this revision.

#Comment by Platonides (talk | contribs)   21:25, 2 March 2010

It's related to squid code refactoring.

Treat it as a ping if you prefer.

Status & tagging log