r71805 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r71805
Date:20:57, 27 August 2010
Author:platonides
Status:resolved (Comments)
Tags:
Comment:
Make a bunch of incompatible changes to the PoolCounter.
It wasn't finished, so it's not a big deal.

* Use the term "workers" instead of "threads", which fits a multi-server setup better.
* The API is now more accurate for our goals (I hope).
* Add support for reusing the parse produced by another worker.
* Use child classes instead of array callbacks.
* The daemon is now written in C using libevent, instead of Python using twistd.
* The hash function used is Bob Jenkins', with the files hash.c and hash.h copied directly from memcached 1.4.5.
* Although similar in a few respects to memcached's assoc.c hash table, this is a different hash table implementation. Most importantly:
** It uses a doubly linked list in the hash table.
** Growing is not performed by a maintenance thread. Since the entries are short-lived, it just waits for the old hash table to disappear.
* Note: valgrind 3.5.0 (2009-08-19) does not support accept4 (added in r10955, 2009-11-25). In the meantime you need to use HAVE_ACCEPT4=0 to run under valgrind (as you would on a non-Linux system).
* Sending SIGUSR1 to the daemon gracefully restarts it. The maximum limits will be doubled until the old instance finishes (i.e. all its client connections expire).
* Do not try to test it with instances requesting ?action=purge. They will serialize on the "UPDATE `page` SET page_touched" query instead of being serialized by the PoolCounter.
* The workers parameter is not stored by the poolcounter. It is expected that all requests with the same key will also have the same value. A reduced value sent by new entries will not take effect while that many workers are already working (not even when they finish, if there are waiting entries). But an increased value will raise throughput even for requests that are already queued.
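
The growth strategy from the hash table bullet above can be pictured with a small sketch. This is only one reading of "waits for the old hash table to disappear", not the daemon's code: the names `table`, `table_set`, `start_grow` and `entry_removed` are hypothetical, and the tables here track nothing but an item count.

```c
#include <stdlib.h>

/* Hypothetical minimal table: only the item count matters for this sketch. */
struct table {
    unsigned hashpower; /* the table has 1 << hashpower buckets */
    size_t items;       /* live entries still stored in this table */
};

struct table_set {
    struct table *primary; /* receives all new entries */
    struct table *old;     /* previous, smaller table; NULL when there is none */
};

/* Start growing: new entries go to a table twice as large. Returns 0 on success. */
int start_grow(struct table_set *ts)
{
    struct table *bigger;

    if (ts->old != NULL)
        return -1; /* the previous old table has not drained yet */

    bigger = calloc(1, sizeof(*bigger));
    if (bigger == NULL)
        return -1;
    bigger->hashpower = ts->primary->hashpower + 1;

    if (ts->primary->items == 0) {
        free(ts->primary); /* nothing to wait for */
    } else {
        ts->old = ts->primary; /* keep it until its entries expire */
    }
    ts->primary = bigger;
    return 0;
}

/* Called whenever an entry is removed from the table that owns it. */
void entry_removed(struct table_set *ts, struct table *owner)
{
    owner->items--;
    if (owner == ts->old && owner->items == 0) {
        free(owner); /* old table drained: drop it, no rehashing needed */
        ts->old = NULL;
    }
}
```

Because entries are short-lived, no entry is ever moved between tables in this sketch; lookups would have to consult both tables while `old` is non-NULL.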
Modified paths:
  • /trunk/extensions/PoolCounter/PoolCounterClient.i18n.php (modified)
  • /trunk/extensions/PoolCounter/PoolCounterClient.php (modified)
  • /trunk/extensions/PoolCounter/PoolCounterClient_body.php (modified)
  • /trunk/extensions/PoolCounter/daemon (added)
  • /trunk/extensions/PoolCounter/daemon/Makefile (added)
  • /trunk/extensions/PoolCounter/daemon/client_data.c (added)
  • /trunk/extensions/PoolCounter/daemon/client_data.h (added)
  • /trunk/extensions/PoolCounter/daemon/hash.c (added)
  • /trunk/extensions/PoolCounter/daemon/hash.h (added)
  • /trunk/extensions/PoolCounter/daemon/locks.c (added)
  • /trunk/extensions/PoolCounter/daemon/locks.h (added)
  • /trunk/extensions/PoolCounter/daemon/main.c (added)
  • /trunk/extensions/PoolCounter/daemon/prototypes.h (added)
  • /trunk/phase3/includes/Article.php (modified)
  • /trunk/phase3/includes/AutoLoader.php (modified)
  • /trunk/phase3/includes/PoolCounter.php (modified)

Diff

Index: trunk/phase3/includes/Article.php
@@ -997,13 +997,10 @@
 
         $this->checkTouched();
         $key = $parserCache->getKey( $this, $parserOptions );
-        $poolCounter = PoolCounter::factory( 'Article::view', $key );
-        $dirtyCallback = $useParserCache ? array( $this, 'tryDirtyCache' ) : false;
-        $status = $poolCounter->executeProtected( array( $this, 'doViewParse' ), $dirtyCallback );
-
-        if ( !$status->isOK() ) {
+        $poolArticleView = new PoolWorkArticleView( $this, $key, $useParserCache, $parserOptions );
+
+        if ( !$poolArticleView->execute() ) {
             # Connection or timeout error
-            $this->showPoolError( $status );
             wfProfileOut( __METHOD__ );
             return;
         } else {
@@ -1482,6 +1479,8 @@
 
         $useParserCache = $this->useParserCache( $oldid );
         $this->outputWikiText( $this->getContent(), $useParserCache, $parserOptions );
+
+        return true;
     }
 
     /**
@@ -1522,23 +1521,6 @@
     }
 
     /**
-     * Show an error page for an error from the pool counter.
-     * @param $status Status
-     */
-    public function showPoolError( $status ) {
-        global $wgOut;
-
-        $wgOut->clearHTML(); // for release() errors
-        $wgOut->enableClientCache( false );
-        $wgOut->setRobotPolicy( 'noindex,nofollow' );
-        $wgOut->addWikiText(
-            '<div class="errorbox">' .
-            $status->getWikiText( false, 'view-pool-error' ) .
-            '</div>'
-        );
-    }
-
-    /**
      * View redirect
      *
      * @param $target Title object or Array of destination(s) to redirect
@@ -4632,3 +4614,56 @@
     }
 
 }
+
+class PoolWorkArticleView extends PoolCounterWork {
+    private $mArticle;
+
+    function __construct( $article, $key, $useParserCache, $parserOptions ) {
+        parent::__construct( __CLASS__, $key );
+        $this->mArticle = $article;
+        $this->cacheable = $useParserCache;
+        $this->parserOptions = $parserOptions;
+    }
+
+    function doWork() {
+        return $this->mArticle->doViewParse();
+    }
+
+    function getCachedWork() {
+        global $wgOut;
+
+        $parserCache = ParserCache::singleton();
+        $this->mArticle->mParserOutput = $parserCache->get( $this->mArticle, $this->parserOptions );
+
+        if ( $this->mArticle->mParserOutput !== false ) {
+            wfDebug( __METHOD__ . ": showing contents parsed by someone else\n" );
+            $wgOut->addParserOutput( $this->mArticle->mParserOutput );
+            # Ensure that UI elements requiring revision ID have
+            # the correct version information.
+            $wgOut->setRevisionId( $this->mArticle->getLatest() );
+            return true;
+        }
+        return false;
+    }
+
+    function fallback() {
+        return $this->mArticle->tryDirtyCache();
+    }
+
+    function error( $status ) {
+        global $wgOut;
+
+        $wgOut->clearHTML(); // for release() errors
+        $wgOut->enableClientCache( false );
+        $wgOut->setRobotPolicy( 'noindex,nofollow' );
+
+        if ( $status instanceof Status ) {
+            $errortext = $status->getWikiText( false, 'view-pool-error' );
+        } else {
+            $errortext = wfMsgNoTrans( 'view-pool-error', '' );
+        }
+        $wgOut->addWikiText( '<div class="errorbox">' . $errortext . '</div>' );
+
+        return false;
+    }
+}
Index: trunk/phase3/includes/AutoLoader.php
@@ -177,6 +177,7 @@
     'PatrolLog' => 'includes/PatrolLog.php',
     'PoolCounter' => 'includes/PoolCounter.php',
     'PoolCounter_Stub' => 'includes/PoolCounter.php',
+    'PoolCounterWork' => 'includes/PoolCounter.php',
     'Preferences' => 'includes/Preferences.php',
     'PrefixSearch' => 'includes/PrefixSearch.php',
     'Profiler' => 'includes/Profiler.php',
Index: trunk/phase3/includes/PoolCounter.php
@@ -1,6 +1,66 @@
 <?php
 
+/**
+ * When you have many workers (threads/servers) giving service, and a
+ * cached item expensive to produce expires, you may get several workers
+ * doing the job at the same time.
+ * Given enough requests and the item expiring fast (non-cacheable,
+ * lots of edits...) that single work can end up unfairly using most (all)
+ * of the cpu of the pool. This is also known as 'Michael Jackson effect'.
+ * The PoolCounter provides semaphore semantics for restricting the number
+ * of workers that may be concurrently performing such single task.
+ */
+
 abstract class PoolCounter {
+
+    /* Return codes */
+    const LOCKED = 1; /* Lock acquired */
+    const RELEASED = 2; /* Lock released */
+    const DONE = 3; /* Another one did the work for you */
+
+    const ERROR = -1; /* Indeterminate error */
+    const NOT_LOCKED = -2; /* Called release() with no lock held */
+    const QUEUE_FULL = -3; /* There are already maxqueue workers on this lock */
+    const TIMEOUT = -4; /* Timeout exceeded */
+    const LOCK_HELD = -5; /* Cannot acquire another lock while you have one lock held */
+
+    /**
+     * I want to do this task and I need to do it myself.
+     *
+     * @return Locked/Error
+     */
+    abstract function acquireForMe();
+
+    /**
+     * I want to do this task, but if anyone else does it
+     * instead, it's also fine for me. I will read its cached data.
+     *
+     * @return Locked/Done/Error
+     */
+    abstract function acquireForAnyone();
+
+    /**
+     * I have successfully finished my task.
+     * Lets another one grab the lock, and returns the workers
+     * waiting on acquireForAnyone()
+     *
+     * @return Released/NotLocked/Error
+     */
+    abstract function release();
+
+    /**
+     * $key: All workers with the same key share the lock.
+     * $workers: It wouldn't be a good idea to have more than this number of
+     * workers doing the task simultaneously.
+     * $maxqueue: If this number of workers are already working/waiting,
+     * fail instead of wait.
+     * $timeout: Maximum time in seconds to wait for the lock.
+     */
+    protected $key, $workers, $maxqueue, $timeout;
+
+    /**
+     * Create a Pool counter. This should only be called from the PoolWorks.
+     */
     public static function factory( $type, $key ) {
         global $wgPoolCounterConf;
         if ( !isset( $wgPoolCounterConf[$type] ) ) {
@@ -8,57 +68,114 @@
         }
         $conf = $wgPoolCounterConf[$type];
         $class = $conf['class'];
+
         return new $class( $conf, $type, $key );
     }
-
-    abstract public function acquire();
-    abstract public function release();
-    abstract public function wait();
-
-    public function executeProtected( $mainCallback, $dirtyCallback = false ) {
-        $status = $this->acquire();
-        if ( !$status->isOK() ) {
-            return $status;
-        }
-        if ( !empty( $status->value['overload'] ) ) {
-            # Overloaded. Try a dirty cache entry.
-            if ( $dirtyCallback ) {
-                if ( call_user_func( $dirtyCallback ) ) {
-                    $this->release();
-                    return Status::newGood();
-                }
-            }
-
-            # Wait for a thread
-            $status = $this->wait();
-            if ( !$status->isOK() ) {
-                $this->release();
-                return $status;
-            }
-        }
-        # Call the main callback
-        call_user_func( $mainCallback );
-        return $this->release();
+
+    protected function __construct( $conf, $type, $key ) {
+        $this->key = $key;
+        $this->workers = $conf['workers'];
+        $this->maxqueue = $conf['maxqueue'];
+        $this->timeout = $conf['timeout'];
     }
 }
 
 class PoolCounter_Stub extends PoolCounter {
-    public function acquire() {
-        return Status::newGood();
+    function acquireForMe() {
+        return PoolCounter::LOCKED;
     }
 
-    public function release() {
-        return Status::newGood();
+    function acquireForAnyone() {
+        return PoolCounter::LOCKED;
     }
 
-    public function wait() {
-        return Status::newGood();
+    function release() {
+        return PoolCounter::RELEASED;
     }
-
-    public function executeProtected( $mainCallback, $dirtyCallback = false ) {
-        call_user_func( $mainCallback );
-        return Status::newGood();
+
+    public function __construct() {
+        /* No parameters needed */
     }
 }
 
+/**
+ * Handy class for dealing with PoolCounters using class members instead of callbacks.
+ */
+abstract class PoolCounterWork {
+    protected $cacheable = false; //Does this override getCachedWork() ?
+
+    /**
+     * Actually perform the work, caching it if needed.
+     */
+    abstract function doWork();
 
+    /**
+     * Retrieve the work from cache
+     * @return mixed work result or false
+     */
+    function getCachedWork() {
+        return false;
+    }
+
+    /**
+     * A work not so good (eg. expired one) but better than an error
+     * message.
+     * @return mixed work result or false
+     */
+    function fallback() {
+        return false;
+    }
+
+    /**
+     * Do something with the error, like showing it to the user.
+     */
+    function error( $status ) {
+        return false;
+    }
+
+    /**
+     * Get the result of the work (whatever it is), or false.
+     */
+    function execute( $skipcache = false ) {
+        if ( $this->cacheable && !$skipcache ) {
+            $status = $this->poolCounter->acquireForAnyone();
+        } else {
+            $status = $this->poolCounter->acquireForMe();
+        }
+
+        $result = false;
+        switch ( is_int( $status ) ? $status : PoolCounter::ERROR ) {
+            case PoolCounter::LOCKED:
+                $result = $this->doWork();
+                $this->poolCounter->release();
+                return $result;
+
+            case PoolCounter::DONE:
+                $result = $this->getCachedWork();
+                if ( $result === false ) {
+                    /* That someone else work didn't serve us.
+                     * Acquire the lock for me
+                     */
+                    return $this->execute( true );
+                }
+                return $result;
+
+            case PoolCounter::QUEUE_FULL:
+            case PoolCounter::TIMEOUT:
+                $result = $this->fallback();
+
+                if ( $result !== false ) {
+                    return $result;
+                }
+                /* no break */
+
+            case PoolCounter::ERROR:
+            default:
+                return $this->error( $status );
+        }
+    }
+
+    function __construct( $type, $key ) {
+        $this->poolCounter = PoolCounter::factory( $type, $key );
+    }
+}
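
The `$workers` and `$maxqueue` limits documented in PoolCounter.php suggest an admission rule along the following lines. This is a hedged sketch in C, not the daemon's implementation: `pool_state`, `admit`, `finish` and the `ADMIT_*` codes are hypothetical names, the `count`/`processing` fields are only modeled on `struct PoolCounter` in locks.h, and the one-for-one hand-over in `finish` is an assumption drawn from the commit message.

```c
#include <stdint.h>

/* Hypothetical outcome codes for this sketch (not the daemon's wire protocol). */
enum admit_result { ADMIT_LOCKED, ADMIT_QUEUED, ADMIT_QUEUE_FULL };

/* Per-key state, loosely modeled on struct PoolCounter in locks.h. */
struct pool_state {
    uint32_t count;      /* workers currently working or waiting */
    uint32_t processing; /* workers currently holding the lock */
};

/* The limits arrive with each request; nothing is remembered per key. */
enum admit_result admit(struct pool_state *p, uint32_t workers, uint32_t maxqueue)
{
    if (p->count >= maxqueue)
        return ADMIT_QUEUE_FULL; /* fail instead of waiting */
    p->count++;
    if (p->processing < workers) {
        p->processing++;
        return ADMIT_LOCKED;     /* the caller may do the work now */
    }
    return ADMIT_QUEUED;         /* the caller waits for a release */
}

/* A worker finishes. Assumption: a waiting worker, if any, takes its place.
 * Returns 1 if a waiter was promoted, 0 otherwise. */
int finish(struct pool_state *p)
{
    p->count--;
    p->processing--;
    if (p->count > p->processing) {
        p->processing++;
        return 1;
    }
    return 0;
}
```

Since the limits are passed on every call, a smaller `workers` value only affects requests admitted after the running ones have drained, which matches the behaviour described in the commit message.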
Index: trunk/extensions/PoolCounter/daemon/locks.h
@@ -0,0 +1,53 @@
+#ifndef LOCKS_H
+#define LOCKS_H
+
+#include <stdint.h>
+
+/* This application uses several double linked lists.
+ * They are circular lists, new items are added on the end (ie. on prev)
+ * and popped from next.
+ */
+struct double_linked_list {
+    struct double_linked_list* prev;
+    struct double_linked_list* next;
+};
+
+struct hashtable_entry {
+    struct double_linked_list hashtable_siblings;
+    struct hashtable* parent_hashtable;
+    uint32_t key_hash;
+    char* key;
+};
+
+struct PoolCounter {
+    struct hashtable_entry htentry;
+
+    uint32_t count;
+    int processing;
+
+    struct double_linked_list working;
+    struct double_linked_list for_them;
+    struct double_linked_list for_anyone;
+};
+
+struct locks {
+    struct double_linked_list siblings;
+    struct PoolCounter* parent;
+    enum lock_state { UNLOCKED, WAITING, WAIT_ANY, PROCESSING } state;
+};
+
+struct client_data;
+void init_lock(struct locks* l);
+void finish_lock(struct locks* l);
+char* process_line(struct client_data* cli_data, char* line, int line_len);
+void process_timeout(struct locks* l);
+void remove_client_lock(struct locks* l, int wakeup_anyones);
+void send_client(struct locks* l, const char* msg);
+
+
+void hashtable_init();
+struct hashtable* hashtable_create(int hashpower);
+void* hashtable_find(struct hashtable* ht, uint32_t hash_value, const char* key);
+void hashtable_insert(struct hashtable* ht, struct hashtable_entry* htentry);
+void hashtable_remove(struct hashtable* ht, struct hashtable_entry* htentry);
+#endif
Property changes on: trunk/extensions/PoolCounter/daemon/locks.h
___________________________________________________________________
Added: svn:eol-style
   + native
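
The list discipline described in the locks.h comment (circular, append on `prev`, pop from `next`) can be sketched as follows. The struct is the one from the header; the helper names `dll_init`, `dll_append` and `dll_pop` are hypothetical and are not taken from the daemon's sources.

```c
#include <stddef.h>

struct double_linked_list {
    struct double_linked_list *prev;
    struct double_linked_list *next;
};

/* An empty circular list is a head node that points to itself. */
void dll_init(struct double_linked_list *head)
{
    head->prev = head;
    head->next = head;
}

/* Append at the end of the list, i.e. just before the head (on prev). */
void dll_append(struct double_linked_list *head, struct double_linked_list *node)
{
    node->prev = head->prev;
    node->next = head;
    head->prev->next = node;
    head->prev = node;
}

/* Pop the oldest node (head->next); returns NULL if the list is empty. */
struct double_linked_list *dll_pop(struct double_linked_list *head)
{
    struct double_linked_list *node = head->next;

    if (node == head)
        return NULL;
    head->next = node->next;
    node->next->prev = head;
    node->prev = node->next = node; /* leave the node self-linked */
    return node;
}
```

Appending on `prev` and popping from `next` gives first-in, first-out order, and because every node carries both links, a node in the middle of the list can also be unlinked in constant time.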
Index: trunk/extensions/PoolCounter/daemon/hash.c
@@ -0,0 +1,432 @@
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/*
+ * Hash table
+ *
+ * The hash function used here is by Bob Jenkins, 1996:
+ *    <http://burtleburtle.net/bob/hash/doobs.html>
+ *       "By Bob Jenkins, 1996. bob_jenkins@burtleburtle.net.
+ *       You may use this code any way you wish, private, educational,
+ *       or commercial. It's free."
+ *
+ */
+#include <stddef.h>
+#include <inttypes.h>
+
+/*
+ * Since the hash function does bit manipulation, it needs to know
+ * whether it's big or little-endian. ENDIAN_LITTLE and ENDIAN_BIG
+ * are set in the configure script.
+ */
+#if ENDIAN_BIG == 1
+# define HASH_LITTLE_ENDIAN 0
+# define HASH_BIG_ENDIAN 1
+#else
+# if ENDIAN_LITTLE == 1
+#  define HASH_LITTLE_ENDIAN 1
+#  define HASH_BIG_ENDIAN 0
+# else
+#  define HASH_LITTLE_ENDIAN 0
+#  define HASH_BIG_ENDIAN 0
+# endif
+#endif
+
+#define rot(x,k) (((x)<<(k)) ^ ((x)>>(32-(k))))
+
+/*
+-------------------------------------------------------------------------------
+mix -- mix 3 32-bit values reversibly.
+
+This is reversible, so any information in (a,b,c) before mix() is
+still in (a,b,c) after mix().
+
+If four pairs of (a,b,c) inputs are run through mix(), or through
+mix() in reverse, there are at least 32 bits of the output that
+are sometimes the same for one pair and different for another pair.
+This was tested for:
+* pairs that differed by one bit, by two bits, in any combination
+  of top bits of (a,b,c), or in any combination of bottom bits of
+  (a,b,c).
+* "differ" is defined as +, -, ^, or ~^. For + and -, I transformed
+  the output delta to a Gray code (a^(a>>1)) so a string of 1's (as
+  is commonly produced by subtraction) look like a single 1-bit
+  difference.
+* the base values were pseudorandom, all zero but one bit set, or
+  all zero plus a counter that starts at zero.
+
+Some k values for my "a-=c; a^=rot(c,k); c+=b;" arrangement that
+satisfy this are
+    4  6  8 16 19  4
+    9 15  3 18 27 15
+   14  9  3  7 17  3
+Well, "9 15 3 18 27 15" didn't quite get 32 bits diffing
+for "differ" defined as + with a one-bit base and a two-bit delta. I
+used http://burtleburtle.net/bob/hash/avalanche.html to choose
+the operations, constants, and arrangements of the variables.
+
+This does not achieve avalanche. There are input bits of (a,b,c)
+that fail to affect some output bits of (a,b,c), especially of a. The
+most thoroughly mixed value is c, but it doesn't really even achieve
+avalanche in c.
+
+This allows some parallelism. Read-after-writes are good at doubling
+the number of bits affected, so the goal of mixing pulls in the opposite
+direction as the goal of parallelism. I did what I could. Rotates
+seem to cost as much as shifts on every machine I could lay my hands
+on, and rotates are much kinder to the top and bottom bits, so I used
+rotates.
+-------------------------------------------------------------------------------
+*/
+#define mix(a,b,c) \
+{ \
+  a -= c; a ^= rot(c, 4); c += b; \
+  b -= a; b ^= rot(a, 6); a += c; \
+  c -= b; c ^= rot(b, 8); b += a; \
+  a -= c; a ^= rot(c,16); c += b; \
+  b -= a; b ^= rot(a,19); a += c; \
+  c -= b; c ^= rot(b, 4); b += a; \
+}
+
+/*
+-------------------------------------------------------------------------------
+final -- final mixing of 3 32-bit values (a,b,c) into c
+
+Pairs of (a,b,c) values differing in only a few bits will usually
+produce values of c that look totally different. This was tested for
+* pairs that differed by one bit, by two bits, in any combination
+  of top bits of (a,b,c), or in any combination of bottom bits of
+  (a,b,c).
+* "differ" is defined as +, -, ^, or ~^. For + and -, I transformed
+  the output delta to a Gray code (a^(a>>1)) so a string of 1's (as
+  is commonly produced by subtraction) look like a single 1-bit
+  difference.
+* the base values were pseudorandom, all zero but one bit set, or
+  all zero plus a counter that starts at zero.
+
+These constants passed:
+ 14 11 25 16 4 14 24
+ 12 14 25 16 4 14 24
+and these came close:
+  4  8 15 26 3 22 24
+ 10  8 15 26 3 22 24
+ 11  8 15 26 3 22 24
+-------------------------------------------------------------------------------
+*/
+#define final(a,b,c) \
+{ \
+  c ^= b; c -= rot(b,14); \
+  a ^= c; a -= rot(c,11); \
+  b ^= a; b -= rot(a,25); \
+  c ^= b; c -= rot(b,16); \
+  a ^= c; a -= rot(c,4); \
+  b ^= a; b -= rot(a,14); \
+  c ^= b; c -= rot(b,24); \
+}
+
+#if HASH_LITTLE_ENDIAN == 1
+uint32_t hash(
+  const void *key,        /* the key to hash */
+  size_t length,          /* length of the key */
+  const uint32_t initval) /* initval */
+{
+  uint32_t a,b,c; /* internal state */
+  union { const void *ptr; size_t i; } u; /* needed for Mac Powerbook G4 */
+
+  /* Set up the internal state */
+  a = b = c = 0xdeadbeef + ((uint32_t)length) + initval;
+
+  u.ptr = key;
+  if (HASH_LITTLE_ENDIAN && ((u.i & 0x3) == 0)) {
+    const uint32_t *k = key; /* read 32-bit chunks */
+#ifdef VALGRIND
+    const uint8_t *k8;
+#endif /* ifdef VALGRIND */
+
+    /*------ all but last block: aligned reads and affect 32 bits of (a,b,c) */
+    while (length > 12)
+    {
+      a += k[0];
+      b += k[1];
+      c += k[2];
+      mix(a,b,c);
+      length -= 12;
+      k += 3;
+    }
+
+    /*----------------------------- handle the last (probably partial) block */
+    /*
+     * "k[2]&0xffffff" actually reads beyond the end of the string, but
+     * then masks off the part it's not allowed to read. Because the
+     * string is aligned, the masked-off tail is in the same word as the
+     * rest of the string. Every machine with memory protection I've seen
+     * does it on word boundaries, so is OK with this. But VALGRIND will
+     * still catch it and complain. The masking trick does make the hash
+     * noticeably faster for short strings (like English words).
+     */
+#ifndef VALGRIND
+
+    switch(length)
+    {
+    case 12: c+=k[2]; b+=k[1]; a+=k[0]; break;
+    case 11: c+=k[2]&0xffffff; b+=k[1]; a+=k[0]; break;
+    case 10: c+=k[2]&0xffff; b+=k[1]; a+=k[0]; break;
+    case 9 : c+=k[2]&0xff; b+=k[1]; a+=k[0]; break;
+    case 8 : b+=k[1]; a+=k[0]; break;
+    case 7 : b+=k[1]&0xffffff; a+=k[0]; break;
+    case 6 : b+=k[1]&0xffff; a+=k[0]; break;
+    case 5 : b+=k[1]&0xff; a+=k[0]; break;
+    case 4 : a+=k[0]; break;
+    case 3 : a+=k[0]&0xffffff; break;
+    case 2 : a+=k[0]&0xffff; break;
+    case 1 : a+=k[0]&0xff; break;
+    case 0 : return c; /* zero length strings require no mixing */
+    }
+
+#else /* make valgrind happy */
+
+    k8 = (const uint8_t *)k;
+    switch(length)
+    {
+    case 12: c+=k[2]; b+=k[1]; a+=k[0]; break;
+    case 11: c+=((uint32_t)k8[10])<<16; /* fall through */
+    case 10: c+=((uint32_t)k8[9])<<8; /* fall through */
+    case 9 : c+=k8[8]; /* fall through */
+    case 8 : b+=k[1]; a+=k[0]; break;
+    case 7 : b+=((uint32_t)k8[6])<<16; /* fall through */
+    case 6 : b+=((uint32_t)k8[5])<<8; /* fall through */
+    case 5 : b+=k8[4]; /* fall through */
+    case 4 : a+=k[0]; break;
+    case 3 : a+=((uint32_t)k8[2])<<16; /* fall through */
+    case 2 : a+=((uint32_t)k8[1])<<8; /* fall through */
+    case 1 : a+=k8[0]; break;
+    case 0 : return c; /* zero length strings require no mixing */
+    }
+
+#endif /* !valgrind */
+
+  } else if (HASH_LITTLE_ENDIAN && ((u.i & 0x1) == 0)) {
+    const uint16_t *k = key; /* read 16-bit chunks */
+    const uint8_t *k8;
+
+    /*--------------- all but last block: aligned reads and different mixing */
+    while (length > 12)
+    {
+      a += k[0] + (((uint32_t)k[1])<<16);
+      b += k[2] + (((uint32_t)k[3])<<16);
+      c += k[4] + (((uint32_t)k[5])<<16);
+      mix(a,b,c);
+      length -= 12;
+      k += 6;
+    }
+
+    /*----------------------------- handle the last (probably partial) block */
+    k8 = (const uint8_t *)k;
+    switch(length)
+    {
+    case 12: c+=k[4]+(((uint32_t)k[5])<<16);
+             b+=k[2]+(((uint32_t)k[3])<<16);
+             a+=k[0]+(((uint32_t)k[1])<<16);
+             break;
+    case 11: c+=((uint32_t)k8[10])<<16; /* @fallthrough */
+    case 10: c+=k[4]; /* @fallthrough@ */
+             b+=k[2]+(((uint32_t)k[3])<<16);
+             a+=k[0]+(((uint32_t)k[1])<<16);
+             break;
+    case 9 : c+=k8[8]; /* @fallthrough */
+    case 8 : b+=k[2]+(((uint32_t)k[3])<<16);
+             a+=k[0]+(((uint32_t)k[1])<<16);
+             break;
+    case 7 : b+=((uint32_t)k8[6])<<16; /* @fallthrough */
+    case 6 : b+=k[2];
+             a+=k[0]+(((uint32_t)k[1])<<16);
+             break;
+    case 5 : b+=k8[4]; /* @fallthrough */
+    case 4 : a+=k[0]+(((uint32_t)k[1])<<16);
+             break;
+    case 3 : a+=((uint32_t)k8[2])<<16; /* @fallthrough */
+    case 2 : a+=k[0];
+             break;
+    case 1 : a+=k8[0];
+             break;
+    case 0 : return c; /* zero length strings require no mixing */
+    }
+
+  } else { /* need to read the key one byte at a time */
+    const uint8_t *k = key;
+
+    /*--------------- all but the last block: affect some 32 bits of (a,b,c) */
+    while (length > 12)
+    {
+      a += k[0];
+      a += ((uint32_t)k[1])<<8;
+      a += ((uint32_t)k[2])<<16;
+      a += ((uint32_t)k[3])<<24;
+      b += k[4];
+      b += ((uint32_t)k[5])<<8;
+      b += ((uint32_t)k[6])<<16;
+      b += ((uint32_t)k[7])<<24;
+      c += k[8];
+      c += ((uint32_t)k[9])<<8;
+      c += ((uint32_t)k[10])<<16;
+      c += ((uint32_t)k[11])<<24;
+      mix(a,b,c);
+      length -= 12;
+      k += 12;
+    }
+
+    /*-------------------------------- last block: affect all 32 bits of (c) */
+    switch(length) /* all the case statements fall through */
+    {
+    case 12: c+=((uint32_t)k[11])<<24;
+    case 11: c+=((uint32_t)k[10])<<16;
+    case 10: c+=((uint32_t)k[9])<<8;
+    case 9 : c+=k[8];
+    case 8 : b+=((uint32_t)k[7])<<24;
+    case 7 : b+=((uint32_t)k[6])<<16;
+    case 6 : b+=((uint32_t)k[5])<<8;
+    case 5 : b+=k[4];
+    case 4 : a+=((uint32_t)k[3])<<24;
+    case 3 : a+=((uint32_t)k[2])<<16;
+    case 2 : a+=((uint32_t)k[1])<<8;
+    case 1 : a+=k[0];
+             break;
+    case 0 : return c; /* zero length strings require no mixing */
+    }
+  }
+
+  final(a,b,c);
+  return c; /* zero length strings require no mixing */
+}
+
+#elif HASH_BIG_ENDIAN == 1
+/*
+ * hashbig():
+ * This is the same as hashword() on big-endian machines. It is different
+ * from hashlittle() on all machines. hashbig() takes advantage of
+ * big-endian byte ordering.
+ */
+uint32_t hash( const void *key, size_t length, const uint32_t initval)
+{
+  uint32_t a,b,c;
+  union { const void *ptr; size_t i; } u; /* to cast key to (size_t) happily */
+
+  /* Set up the internal state */
+  a = b = c = 0xdeadbeef + ((uint32_t)length) + initval;
+
+  u.ptr = key;
+  if (HASH_BIG_ENDIAN && ((u.i & 0x3) == 0)) {
+    const uint32_t *k = key; /* read 32-bit chunks */
+#ifdef VALGRIND
+    const uint8_t *k8;
+#endif /* ifdef VALGRIND */
+
+    /*------ all but last block: aligned reads and affect 32 bits of (a,b,c) */
+    while (length > 12)
+    {
+      a += k[0];
+      b += k[1];
+      c += k[2];
+      mix(a,b,c);
+      length -= 12;
+      k += 3;
+    }
+
+    /*----------------------------- handle the last (probably partial) block */
+    /*
+     * "k[2]<<8" actually reads beyond the end of the string, but
+     * then shifts out the part it's not allowed to read. Because the
+     * string is aligned, the illegal read is in the same word as the
+     * rest of the string. Every machine with memory protection I've seen
+     * does it on word boundaries, so is OK with this. But VALGRIND will
+     * still catch it and complain. The masking trick does make the hash
+     * noticeably faster for short strings (like English words).
+     */
+#ifndef VALGRIND
+
+    switch(length)
+    {
+    case 12: c+=k[2]; b+=k[1]; a+=k[0]; break;
+    case 11: c+=k[2]&0xffffff00; b+=k[1]; a+=k[0]; break;
+    case 10: c+=k[2]&0xffff0000; b+=k[1]; a+=k[0]; break;
+    case 9 : c+=k[2]&0xff000000; b+=k[1]; a+=k[0]; break;
+    case 8 : b+=k[1]; a+=k[0]; break;
+    case 7 : b+=k[1]&0xffffff00; a+=k[0]; break;
+    case 6 : b+=k[1]&0xffff0000; a+=k[0]; break;
+    case 5 : b+=k[1]&0xff000000; a+=k[0]; break;
+    case 4 : a+=k[0]; break;
+    case 3 : a+=k[0]&0xffffff00; break;
+    case 2 : a+=k[0]&0xffff0000; break;
+    case 1 : a+=k[0]&0xff000000; break;
+    case 0 : return c; /* zero length strings require no mixing */
+    }
+
+#else /* make valgrind happy */
+
+    k8 = (const uint8_t *)k;
+    switch(length) /* all the case statements fall through */
+    {
+    case 12: c+=k[2]; b+=k[1]; a+=k[0]; break;
+    case 11: c+=((uint32_t)k8[10])<<8; /* fall through */
+    case 10: c+=((uint32_t)k8[9])<<16; /* fall through */
+    case 9 : c+=((uint32_t)k8[8])<<24; /* fall through */
+    case 8 : b+=k[1]; a+=k[0]; break;
+    case 7 : b+=((uint32_t)k8[6])<<8; /* fall through */
+    case 6 : b+=((uint32_t)k8[5])<<16; /* fall through */
+    case 5 : b+=((uint32_t)k8[4])<<24; /* fall through */
+    case 4 : a+=k[0]; break;
+    case 3 : a+=((uint32_t)k8[2])<<8; /* fall through */
+    case 2 : a+=((uint32_t)k8[1])<<16; /* fall through */
+    case 1 : a+=((uint32_t)k8[0])<<24; break;
+    case 0 : return c;
+    }
+
+#endif /* !VALGRIND */
+
+  } else { /* need to read the key one byte at a time */
+    const uint8_t *k = key;
+
+    /*--------------- all but the last block: affect some 32 bits of (a,b,c) */
+    while (length > 12)
+    {
+      a += ((uint32_t)k[0])<<24;
+      a += ((uint32_t)k[1])<<16;
+      a += ((uint32_t)k[2])<<8;
+      a += ((uint32_t)k[3]);
+      b += ((uint32_t)k[4])<<24;
+      b += ((uint32_t)k[5])<<16;
+      b += ((uint32_t)k[6])<<8;
+      b += ((uint32_t)k[7]);
+      c += ((uint32_t)k[8])<<24;
+      c += ((uint32_t)k[9])<<16;
+      c += ((uint32_t)k[10])<<8;
+      c += ((uint32_t)k[11]);
+      mix(a,b,c);
+      length -= 12;
+      k += 12;
+    }
+
+    /*-------------------------------- last block: affect all 32 bits of (c) */
+    switch(length) /* all the case statements fall through */
+    {
+    case 12: c+=k[11];
+    case 11: c+=((uint32_t)k[10])<<8;
+    case 10: c+=((uint32_t)k[9])<<16;
+    case 9 : c+=((uint32_t)k[8])<<24;
+    case 8 : b+=k[7];
+    case 7 : b+=((uint32_t)k[6])<<8;
+    case 6 : b+=((uint32_t)k[5])<<16;
+    case 5 : b+=((uint32_t)k[4])<<24;
+    case 4 : a+=k[3];
+    case 3 : a+=((uint32_t)k[2])<<8;
+    case 2 : a+=((uint32_t)k[1])<<16;
+    case 1 : a+=((uint32_t)k[0])<<24;
+             break;
+    case 0 : return c;
+    }
+  }
+
+  final(a,b,c);
+  return c;
+}
+#else /* HASH_XXX_ENDIAN == 1 */
+#error Must define HASH_BIG_ENDIAN or HASH_LITTLE_ENDIAN
+#endif /* HASH_XXX_ENDIAN == 1 */
Property changes on: trunk/extensions/PoolCounter/daemon/hash.c
___________________________________________________________________
Added: svn:eol-style
   + native
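
As a small illustration of two pieces the hash code relies on: the rotation used by `mix()` and `final()`, and the way a 32-bit hash value is typically reduced to a bucket when a table is sized by a power of two (as the `hashpower` argument of `hashtable_create()` suggests). The function names `rotl32` and `bucket_index` are hypothetical, and the masking is an assumption about how the table indexes its buckets, not a quote from the daemon.

```c
#include <stdint.h>

/* Same expression as the rot() macro in hash.c, written as a function.
 * Only valid for 0 < k < 32, which holds for every constant used there. */
uint32_t rotl32(uint32_t x, unsigned k)
{
    return (x << k) ^ (x >> (32 - k));
}

/* Hypothetical helper: map a hash value to a bucket in a table of
 * 1 << hashpower buckets (hashpower < 32) by keeping the low bits. */
uint32_t bucket_index(uint32_t hash_value, unsigned hashpower)
{
    return hash_value & (((uint32_t)1 << hashpower) - 1);
}
```

With a power-of-two table size, growing the table by one `hashpower` step simply exposes one more bit of the same hash value, so stored `key_hash` values stay usable after a resize.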
Index: trunk/extensions/PoolCounter/daemon/main.c
@@ -0,0 +1,199 @@
+#define _GNU_SOURCE
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <sys/stat.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <signal.h>
+#include <stdio.h>
+#include <event.h>
+#include <fcntl.h>
+
+#include "client_data.h"
+#include "prototypes.h"
+#include "locks.h"
+
+static int open_sockets = 1; /* Program will automatically close when this reaches 0 */
+
+static struct event listener_ev;
+int main(int argc, char** argv) {
+    struct event_base *base;
+    struct stat st;
+    int listener;
+
+    if ( fstat( 0, &st ) || ! S_ISSOCK( st.st_mode ) ) {
+        close( 0 ); /* Place the listener socket in fd 0 */
+        listener = listensocket( PORT );
+    } else {
+        /* We have been given the listening socket in stdin */
+        listener = 0;
+    }
+
+    setup_signals();
+
+    hashtable_init();
+    base = event_init();
+    if (!base) {
+        fprintf( stderr, "Error in libevent initialization\n" );
+        return 1;
+    }
+
+    event_set( &listener_ev, listener, EV_READ | EV_PERSIST, on_connect, NULL );
+
+    event_add( &listener_ev, NULL );
+
+    event_dispatch();
+
+    event_del( &listener_ev );
+
+    event_base_free( base );
+
+    return 0;
+}
+
+int listensocket(short int port) /* prototype */
+{
+    int s;
+    struct sockaddr_in addr;
+
+    if ( ( s = socket( AF_INET, SOCK_STREAM, 0 ) ) == -1 ) {
+        perror("Couldn't create TCP socket");
+        exit(1);
+    }
+
+    addr.sin_family = AF_INET;
+    addr.sin_port = htons(port);
+    addr.sin_addr.s_addr = INADDR_ANY;
+
+    if ( bind( s, (struct sockaddr *)&addr, sizeof( struct sockaddr ) ) == -1 ) {
+        perror("Couldn't bind");
+        close( s );
+        exit( 1 );
+    }
+
+    if (listen( s, BACKLOG ) == -1) {
+        perror("Couldn't listen");
+        close( s );
+        exit( 1 );
+    }
+
+    return s;
+}
+
+void on_connect(int listener, short type, void* arg) /* prototype */
+{
+    struct client_data* cli;
+    int fd;
+#if HAVE_ACCEPT4
+    fd = accept4( listener, NULL, NULL, SOCK_CLOEXEC | SOCK_NONBLOCK );
+#else
+    fd = accept( listener, NULL, NULL );
+#endif
+
+    if ( fd == -1 ) {
+        perror( "Error accepting" );
+        return;
+    }
+
+    if ( !HAVE_ACCEPT4 ) {
+        /* O_NONBLOCK is a file status flag (F_SETFL); FD_CLOEXEC is a descriptor flag (F_SETFD) */
+        int flags = fcntl( fd, F_GETFL );
+        if ( flags != -1 ) {
+            fcntl( fd, F_SETFL, flags | O_NONBLOCK );
+        }
+        fcntl( fd, F_SETFD, FD_CLOEXEC );
+    }
+
+    cli = new_client_data( fd );
+    if ( !cli ) {
+        perror( "Couldn't allocate the client data! :(" );
+        close( fd );
+        return;
+    }
+    open_sockets++;
+
+    event_set( &cli->ev, fd, EV_READ, on_client, cli );
+    event_add( &cli->ev, NULL ); /* First query comes from client */
+}
+
+void on_client(int fd, short type, void* arg) /* prototype */
+{
+    int n;
+    char *line;
+    struct client_data* cli_data = arg;
+
+    if ( type == EV_TIMEOUT ) {
+        process_timeout( &cli_data->client_locks );
 126+ return;
 127+ }
 128+
 129+ n = read_client_line( fd, cli_data, &line );
 130+
 131+ if ( n < 0 ) {
 132+ /* Client disconnected */
 133+ event_del( &cli_data->ev);
 134+ free_client_data( cli_data );
 135+ close( fd );
 136+ open_sockets--;
 137+ if ( !open_sockets ) {
 138+ end( 0 );
 139+ }
 140+ } else {
 141+ while ( line ) {
 142+ send_client( &cli_data->client_locks, process_line(cli_data, line, n) );
 143+ n = recover_client_buffer(cli_data, n, &line);
 144+ }
 145+ }
 146+}
 147+
 148+static void end(int signal) /* prototype */
 149+{
 150+ close( 0 );
 151+ /* TODO: Close gracefully all connections */
 152+ event_loopbreak();
 153+}
 154+
 155+static void graceful(int signal)
 156+{
 157+ pid_t p = fork();
 158+ if ( p == -1 ) {
 159+ perror( "Can't fork" );
 160+ return;
 161+ }
 162+
 163+ if ( p ) {
 164+ /* Stop listening connections */
 165+ close( 0 );
 166+ event_del( &listener_ev );
 167+ open_sockets--;
 168+ if ( !open_sockets ) {
 169+ /* We have no clients attached. Exit here.
 170+ * Note: There is a race condition here.
 171+ */
 172+ end( 0 );
 173+ }
 174+ } else {
 175+ execl( "/proc/self/exe", "poolcounterd", NULL );
 176+ exit( 1 );
 177+ }
 178+}
 179+
 180+void setup_signals() /* prototype */
 181+{
 182+ struct sigaction sa;
 183+ sa.sa_flags = SA_RESTART;
 184+ sigfillset( &sa.sa_mask );
 185+
 186+ sa.sa_handler = SIG_IGN;
 187+ sigaction( SIGPIPE, &sa, NULL );
 188+ sigaction( SIGCHLD, &sa, NULL );
 189+
 190+ sa.sa_handler = end;
 191+ sigaction( SIGINT, &sa, NULL );
 192+ sigaction( SIGTERM, &sa, NULL );
 193+
 194+ sa.sa_handler = graceful;
 195+ sigaction( SIGUSR1, &sa, NULL );
 196+
 197+ /* Reset the process mask. It seems affected after one fork + execv in the signal handler */
 198+ sigemptyset( &sa.sa_mask );
 199+ sigprocmask( SIG_SETMASK, &sa.sa_mask, NULL );
 200+}
Property changes on: trunk/extensions/PoolCounter/daemon/main.c
___________________________________________________________________
Added: svn:eol-style
1201 + native
Index: trunk/extensions/PoolCounter/daemon/client_data.c
@@ -0,0 +1,104 @@
 2+#include <stddef.h>
 3+#include <errno.h>
 4+#include <string.h>
 5+#include <sys/types.h>
 6+#include <sys/socket.h>
 7+#include <malloc.h>
 8+#include "client_data.h"
 9+#include "locks.h"
 10+
 11+struct client_data* new_client_data(int fd) {
 12+ struct client_data* cd;
 13+ cd = malloc( sizeof( *cd ) );
 14+ cd->used_buffer = 0;
 15+ init_lock( &cd->client_locks );
 16+ cd->fd = fd;
 17+ return cd;
 18+}
 19+
 20+void free_client_data(struct client_data* cli_data) {
 21+ finish_lock( &cli_data->client_locks );
 22+ free( cli_data );
 23+}
 24+
 25+/**
 26+ * Read data from the client
 27+ * If we filled a line, return the line length, and point to it in *line.
 28+ * If a line is not available, *line will point to NULL.
 29+ * Return -1 or -2 if the socket was closed (gracefully / erroneously)
 30+ * Line separator is \n.
 31+ * Returned lines end in \0 with \n stripped.
 32+ * Incomplete lines are not returned on close.
 33+ */
 34+int read_client_line(int fd, struct client_data* cli_data, char** line) {
 35+ int n, i;
 36+
 37+ *line = NULL;
 38+ n = recv( fd, cli_data->buffer + cli_data->used_buffer, sizeof( cli_data->buffer ) - cli_data->used_buffer, 0 );
 39+ if ( n == 0 ) {
 40+ return -1;
 41+ }
 42+ if ( n == -1 ) {
 43+ if (errno == EAGAIN) {
 44+ /* This shouldn't happen... */
 45+ return 0;
 46+ } else {
 47+ return -2;
 48+ }
 49+ }
 50+
 51+ for ( i=cli_data->used_buffer; i < cli_data->used_buffer+n; i++ ) {
 52+ if ( cli_data->buffer[i] == '\n' ) {
 53+ cli_data->buffer[i] = '\0';
 54+ *line = cli_data->buffer;
 55+ return i;
 56+ }
 57+ }
 58+
 59+ /* Wait for the rest of the line */
 60+ event_add( &cli_data->ev, NULL );
 61+ return 0;
 62+}
 63+
 64+/* Recover the space from the buffer which has been read, return another line if available */
 65+int recover_client_buffer(struct client_data* cli_data, int len, char** line) {
 66+ int i;
 67+ *line = 0;
 68+ if ( len >= cli_data->used_buffer ) {
 69+ /* This is a query-response protocol. This should be *always* the case */
 70+ cli_data->used_buffer = 0;
 71+ return 0;
 72+ }
 73+
 74+ /* Nonetheless handle the other case */
 75+ memmove(cli_data->buffer, cli_data->buffer + len, cli_data->used_buffer - len);
 76+ cli_data->used_buffer -= len;
 77+
 78+ for ( i=0; i < cli_data->used_buffer; i++ ) {
 79+ if ( cli_data->buffer[i] == '\n' ) {
 80+ cli_data->buffer[i] = '\0';
 81+ *line = cli_data->buffer;
 82+ return i;
 83+ }
 84+ }
 85+
 86+ return 0;
 87+}
 88+
 89+/* Sends the message msg to the other side, or nothing if msg is NULL
 90+ * Since the messages are short, we optimistically consider that they
 91+ * will always fit and never block (note O_NONBLOCK is set).
 92+ */
 93+void send_client(struct locks* l, const char* msg) {
 94+ struct client_data* cli_data;
 95+ if ( !msg ) return;
 96+
 97+ cli_data = CLIENT_DATA_FROM_LOCKS(l);
 98+ size_t len = strlen(msg);
 99+
 100+ if ( send( cli_data->fd, msg, len, 0) != len ) {
 101+ perror( "Something failed sending message" );
 102+ }
 103+ /* Wait for answer */
 104+ event_add( &cli_data->ev, NULL );
 105+}
Property changes on: trunk/extensions/PoolCounter/daemon/client_data.c
___________________________________________________________________
Added: svn:eol-style
1106 + native
Index: trunk/extensions/PoolCounter/daemon/hash.h
@@ -0,0 +1,15 @@
 2+#ifndef HASH_H
 3+#define HASH_H
 4+
 5+#ifdef __cplusplus
 6+extern "C" {
 7+#endif
 8+
 9+uint32_t hash(const void *key, size_t length, const uint32_t initval);
 10+
 11+#ifdef __cplusplus
 12+}
 13+#endif
 14+
 15+#endif /* HASH_H */
 16+
Property changes on: trunk/extensions/PoolCounter/daemon/hash.h
___________________________________________________________________
Added: svn:eol-style
117 + native
Index: trunk/extensions/PoolCounter/daemon/locks.c
@@ -0,0 +1,250 @@
 2+#include <string.h>
 3+#include <stdlib.h>
 4+#include <stdio.h>
 5+#include "locks.h"
 6+#include "hash.h"
 7+#include "client_data.h"
 8+
 9+void init_lock(struct locks* l) {
 10+ l->state = UNLOCKED;
 11+}
 12+
 13+void finish_lock(struct locks* l) {
 14+ if (l->state != UNLOCKED) {
 15+ remove_client_lock(l, 0);
 16+ }
 17+}
 18+
 19+static struct hashtable* primary_hashtable;
 20+
 21+/* These defines are the same in memcached */
 22+#define hashsize(n) ((uint32_t)1<<(n))
 23+#define hashmask(n) (hashsize(n)-1)
 24+
 25+#define DOUBLE_LLIST_INIT(this) do { (this).prev = (this).next = &(this); } while (0)
 26+#define DOUBLE_LLIST_DEL(this) do { (this)->prev->next = (this)->next; (this)->next->prev = (this)->prev; } while (0)
 27+#define DOUBLE_LLIST_ADD(parent,child) do { (child)->prev = (parent)->prev; (child)->next = (child)->prev->next /* parent */; (parent)->prev->next = (child); (parent)->prev = (child); } while(0);
 28+
 29+char* process_line(struct client_data* cli_data, char* line, int line_len) {
 30+ struct locks* l = &cli_data->client_locks;
 31+
 32+ if ( !strncmp( line, "ACQ4ME ", 7 ) || !strncmp( line, "ACQ4ANY ", 8 ) ) {
 33+ if ( l->state != UNLOCKED ) {
 34+ return "LOCK_HELD\n";
 35+ }
 36+
 37+ int for_anyone = line[6] != ' ';
 38+
 39+ char* key = strtok( line + 7 + for_anyone, " " );
 40+ int workers = atoi( strtok(NULL, " ") );
 41+ int maxqueue = atoi( strtok(NULL, " ") );
 42+ int timeout = atoi( strtok(NULL, " ") );
 43+
 44+ uint32_t hash_value = hash( key, strlen( key ), 0 );
 45+ struct PoolCounter* pCounter;
 46+ pCounter = hashtable_find( primary_hashtable, hash_value, key );
 47+ if ( !pCounter ) {
 48+ pCounter = malloc( sizeof( *pCounter ) );
 49+ if ( !pCounter ) {
 50+ fprintf(stderr, "Out of memory\n");
 51+ return "ERROR OUT_OF_MEMORY\n";
 52+ }
 53+ pCounter->htentry.key = key;
 54+ pCounter->htentry.key_hash = hash_value;
 55+ pCounter->count = 0;
 56+ pCounter->processing = 0;
 57+
 58+ DOUBLE_LLIST_INIT( pCounter->working );
 59+ DOUBLE_LLIST_INIT( pCounter->for_them );
 60+ DOUBLE_LLIST_INIT( pCounter->for_anyone );
 61+
 62+ hashtable_insert( primary_hashtable, (struct hashtable_entry *) pCounter );
 63+ }
 64+
 65+ if ( pCounter->count >= maxqueue )
 66+ return "QUEUE_FULL\n";
 67+
 68+ l->parent = pCounter;
 69+ pCounter->count++;
 70+
 71+ if ( pCounter->processing < workers ) {
 72+ l->state = PROCESSING;
 73+ pCounter->processing++;
 74+ DOUBLE_LLIST_ADD( &pCounter->working, &l->siblings );
 75+ return "LOCKED\n";
 76+ } else {
 77+ struct timeval wait_time;
 78+ if ( for_anyone ) {
 79+ l->state = WAIT_ANY;
 80+ DOUBLE_LLIST_ADD( &pCounter->for_anyone, &l->siblings );
 81+ } else {
 82+ l->state = WAITING;
 83+ DOUBLE_LLIST_ADD( &pCounter->for_them, &l->siblings );
 84+ }
 85+
 86+ wait_time.tv_sec = timeout;
 87+ wait_time.tv_usec = 0;
 88+
 89+ event_add( &cli_data->ev, &wait_time );
 90+ return NULL;
 91+ }
 92+ } else if ( !strncmp(line, "RELEASE", 7) ) {
 93+ if ( l->state == UNLOCKED ) {
 94+ return "NOT_LOCKED\n";
 95+ } else {
 96+ remove_client_lock( l, 1 );
 97+ return "RELEASED\n";
 98+ }
 99+ } else {
 100+ return "ERROR BAD_COMMAND\n";
 101+ }
 102+}
 103+
 104+void process_timeout(struct locks* l) {
 105+ if ( ( l->state == WAIT_ANY ) || ( l->state == WAITING ) ) {
 106+ send_client( l, "TIMEOUT\n" );
 107+ remove_client_lock( l, 0 );
 108+ }
 109+}
 110+
 111+void remove_client_lock(struct locks* l, int wakeup_anyones) {
 112+ DOUBLE_LLIST_DEL(&l->siblings);
 113+
 114+ if ( wakeup_anyones ) {
 115+ while ( l->parent->for_anyone.next != &l->parent->for_anyone ) {
 116+ send_client( (void*)l->parent->for_anyone.next, "DONE\n" );
 117+ remove_client_lock( (void*)l->parent->for_anyone.next, 0 );
 118+ }
 119+ }
 120+
 121+ if ( l->state == PROCESSING ) {
 122+ /* One slot freed, wake up another worker */
 123+
 124+ /* Give priority to those which need to do it themselves, since
 125+ * the anyones will benefit from it, too.
 126+ * TODO: Prefer the first anyone if it's much older.
 127+ */
 128+ struct locks* new_owner = NULL;
 129+ if ( l->parent->for_them.next != &l->parent->for_them ) {
 130+ /* The oldest waiting worker will be on next */
 131+ new_owner = (struct locks*) l->parent->for_them.next;
 132+ } else if ( l->parent->for_anyone.next != &l->parent->for_anyone ) {
 133+ new_owner = (struct locks*) l->parent->for_anyone.next;
 134+ }
 135+
 136+ if ( new_owner ) {
 137+ DOUBLE_LLIST_DEL( &new_owner->siblings );
 138+ DOUBLE_LLIST_ADD( &l->parent->working, &new_owner->siblings );
 139+ send_client( new_owner, "LOCKED\n" );
 140+ new_owner->state = PROCESSING;
 141+ } else {
 142+ l->parent->processing--;
 143+ }
 144+ }
 145+
 146+ l->state = UNLOCKED;
 147+ l->parent->count--;
 148+ if ( !l->parent->count ) {
 149+ hashtable_remove( l->parent->htentry.parent_hashtable, &l->parent->htentry );
 150+ free( l->parent );
 151+ }
 152+}
 153+
 154+/* The code below is loosely based on that of memcached's assoc.c */
 155+struct hashtable {
 156+ unsigned int hashpower;
 157+ uint32_t items;
 158+ struct hashtable* old_hashtable;
 159+ struct double_linked_list hashentries[1];
 160+};
 161+
 162+void hashtable_init() {
 163+ primary_hashtable = hashtable_create(16);
 164+ if (! primary_hashtable) {
 165+ fprintf( stderr, "Failed to init hashtable.\n" );
 166+ exit( EXIT_FAILURE );
 167+ }
 168+}
 169+
 170+struct hashtable* hashtable_create(int hashpower) {
 171+ struct hashtable* new_hashtable;
 172+ new_hashtable = calloc( hashsize( hashpower ) + ( sizeof( struct hashtable ) - 1 ) /
 173+ sizeof( new_hashtable->hashentries[0] ), sizeof( new_hashtable->hashentries[0] ) );
 174+
 175+ if ( !new_hashtable )
 176+ return NULL;
 177+
 178+ new_hashtable->hashpower = hashpower;
 179+ if ( new_hashtable->old_hashtable != NULL ) {
 180+ int i; /* Zeroes are not NULL here... */
 181+ new_hashtable->old_hashtable = NULL;
 182+ for ( i=0; i < hashsize( hashpower ); i++ ) {
 183+ new_hashtable->hashentries[i].prev = new_hashtable->hashentries[i].next = NULL;
 184+ }
 185+ }
 186+ return new_hashtable;
 187+}
 188+
 189+/**
 190+ * Find an entry with the given key in the hash table.
 191+ * NULL if not found.
 192+ */
 193+void* hashtable_find(struct hashtable* ht, uint32_t hash_value, const char* key) {
 194+ struct hashtable_entry *begin, *cur;
 195+
 196+ begin = (struct hashtable_entry*) &ht->hashentries[hash_value & hashmask(ht->hashpower)];
 197+ if (!begin->hashtable_siblings.next) return NULL; /* Empty bucket */
 198+
 199+ for (cur = (struct hashtable_entry*) begin->hashtable_siblings.next; cur != begin;
 200+ cur = (struct hashtable_entry*)cur->hashtable_siblings.next) {
 201+
 202+ if ( ( cur->key_hash == hash_value ) && ( !strcmp( key, cur->key ) ) ) {
 203+ return cur;
 204+ }
 205+ }
 206+
 207+ if ( ht->old_hashtable ) {
 208+ if ( !ht->old_hashtable->items ) {
 209+ /* Empty hash table */
 210+ free(ht->old_hashtable);
 211+ ht->old_hashtable = NULL;
 212+ return NULL;
 213+ }
 214+
 215+ return hashtable_find( ht->old_hashtable, hash_value, key );
 216+ }
 217+ return NULL;
 218+}
 219+
 220+/**
 221+ * Insert into the hash table an item known not to exist there.
 222+ */
 223+void hashtable_insert(struct hashtable* ht, struct hashtable_entry* htentry) {
 224+ struct double_linked_list* begin;
 225+
 226+ if (! ht->old_hashtable && ht->items >= (hashsize( ht->hashpower ) * 3) / 2) {
 227+ /* Same growing condition as in memcached */
 228+ struct hashtable* new_ht;
 229+ new_ht = hashtable_create( ht->hashpower + 1 );
 230+ if ( new_ht ) {
 231+ new_ht->old_hashtable = ht;
 232+ primary_hashtable = new_ht;
 233+ ht = new_ht;
 234+ }
 235+ }
 236+
 237+ begin = &ht->hashentries[ htentry->key_hash & hashmask( ht->hashpower ) ];
 238+ if ( !begin->next ) { DOUBLE_LLIST_INIT( *begin ); }
 239+ DOUBLE_LLIST_ADD( begin, &htentry->hashtable_siblings );
 240+ htentry->parent_hashtable = ht;
 241+ ht->items++;
 242+}
 243+
 244+/**
 245+ * Remove this entry from this hash table.
 246+ * Freeing the entry is the caller's responsability.
 247+ */
 248+void hashtable_remove(struct hashtable* ht, struct hashtable_entry* htentry) {
 249+ DOUBLE_LLIST_DEL( &htentry->hashtable_siblings );
 250+ ht->items--;
 251+}
Property changes on: trunk/extensions/PoolCounter/daemon/locks.c
___________________________________________________________________
Added: svn:eol-style
1252 + native
Index: trunk/extensions/PoolCounter/daemon/Makefile
@@ -0,0 +1,18 @@
 2+CC=gcc
 3+DEFINES=-DENDIAN_BIG=0 -DENDIAN_LITTLE=1 -DHAVE_ACCEPT4=1
 4+CFLAGS=-std=c90 -Wall $(DEFINES)
 5+OBJS=main.o client_data.o locks.o hash.o
 6+LINK=-levent
 7+HEADERS=prototypes.h client_data.h
 8+
 9+poolcounterd: $(OBJS)
 10+ $(CC) $(LINK) $^ -o $@
 11+
 12+%.o: %.c $(HEADERS)
 13+ $(CC) -c $(CFLAGS) $< -o $@
 14+
 15+prototypes.h: main.c
 16+ sed -n 's/\/\* prototype \*\//;/p' $^ > $@
 17+
 18+clean:
 19+ rm -f *.o prototypes.h
Property changes on: trunk/extensions/PoolCounter/daemon/Makefile
___________________________________________________________________
Added: svn:eol-style
120 + native
Index: trunk/extensions/PoolCounter/daemon/client_data.h
@@ -0,0 +1,23 @@
 2+typedef unsigned char u_char; /* needed by event.h */
 3+#include <stddef.h>
 4+#include <event.h>
 5+#include "locks.h"
 6+
 7+struct client_data {
 8+ struct event ev;
 9+ int fd;
 10+ size_t used_buffer;
 11+ char buffer[1024];
 12+
 13+ struct locks client_locks;
 14+};
 15+
 16+#define CLIENT_DATA_FROM_LOCKS(cli_lock_pointer) ((struct client_data*)(((char*)(cli_lock_pointer)) - offsetof(struct client_data,client_locks)))
 17+
 18+struct client_data* new_client_data();
 19+void free_client_data(struct client_data* cli_data);
 20+int read_client_line(int fd, struct client_data* cli_data, char** line);
 21+int recover_client_buffer(struct client_data* cli_data, int len, char** line);
 22+
 23+#define PORT 7531
 24+#define BACKLOG 20
Property changes on: trunk/extensions/PoolCounter/daemon/client_data.h
___________________________________________________________________
Added: svn:eol-style
125 + native
Index: trunk/extensions/PoolCounter/daemon/prototypes.h
@@ -0,0 +1,5 @@
 2+int listensocket(short int port) ;
 3+void on_connect(int listener, short type, void* arg) ;
 4+void on_client(int fd, short type, void* arg) ;
 5+static void end(int signal) ;
 6+void setup_signals() ;
Property changes on: trunk/extensions/PoolCounter/daemon/prototypes.h
___________________________________________________________________
Added: svn:eol-style
17 + native
Property changes on: trunk/extensions/PoolCounter/daemon
___________________________________________________________________
Added: svn:ignore
28 + poolcounterd
Index: trunk/extensions/PoolCounter/PoolCounterClient_body.php
@@ -59,16 +59,12 @@
6060 }
6161
6262 class PoolCounter_Client extends PoolCounter {
63 - var $maxThreads, $waitTimeout, $type, $key, $conn;
64 - var $isAcquired = false;
 63+ private $conn;
6564
66 - static $manager;
 65+ static private $manager;
6766
6867 function __construct( $conf, $type, $key ) {
69 - $this->waitTimeout = isset( $conf['waitTimeout'] ) ? $conf['waitTimeout'] : 15;
70 - $this->maxThreads = isset( $conf['maxThreads'] ) ? $conf['maxThreads'] : 5;
71 - $this->type = $type;
72 - $this->key = $key;
 68+ parent::__construct( $conf, $type, $key );
7369 if ( !self::$manager ) {
7470 global $wgPoolCountClientConf;
7571 self::$manager = new PoolCounter_ConnectionManager( $wgPoolCountClientConf );
@@ -96,7 +92,7 @@
9793 }
9894 $conn = $status->value;
9995 wfDebug( "Sending pool counter command: $cmd\n" );
100 - if ( fwrite( $conn, "$cmd\r\n" ) === false ) {
 96+ if ( fwrite( $conn, "$cmd\n" ) === false ) {
10197 return Status::newFatal( 'poolcounter-write-error' );
10298 }
10399 $response = fgets( $conn );
@@ -112,14 +108,14 @@
113109 $parts = explode( ' ', $parts[1], 2 );
114110 $errorMsg = isset( $parts[1] ) ? $parts[1] : '(no message given)';
115111 return Status::newFatal( 'poolcounter-remote-error', $errorMsg );
116 - case 'ACK':
 112+ case 'LOCKED':
117113 case 'RELEASED':
118 - case 'COUNT':
119 - $parts = explode( ' ', $parts[1] );
120 - $key = array_shift( $parts );
121 - $attribs = $this->colonsToAssoc( $parts );
122 - $attribs['responseType'] = $responseType;
123 - return Status::newGood( $attribs );
 114+ case 'DONE':
 115+ case 'NOT_LOCKED':
 116+ case 'QUEUE_FULL':
 117+ case 'TIMEOUT':
 118+ case 'LOCK_HELD':
 119+ return constant( "PoolCounter::$responseType" );
124120 }
125121 }
126122
@@ -135,41 +131,21 @@
136132 return $assoc;
137133 }
138134
139 - function acquire() {
140 - $status = $this->sendCommand( 'acquire', $this->key, "max:{$this->maxThreads}" );
141 - if ( !$status->isOK() ) {
142 - return $status;
143 - }
144 - $response = $status->value;
145 - $count = isset( $response['count'] ) ? $response['count'] : 0;
146 - $this->isAcquired = true;
147 - if ( $count > $this->maxThreads ) {
148 - $response['overload'] = true;
149 - $this->release();
150 - }
151 - return Status::newGood( $response );
 135+ function acquireForMe() {
 136+ return $this->sendCommand( 'ACQ4ME', $this->key, $this->workers, $this->maxqueue, $this->timeout );
152137 }
153138
 139+ function acquireForAnyone() {
 140+ return $this->sendCommand( 'ACQ4ANY', $this->key, $this->workers, $this->maxqueue, $this->timeout );
 141+ }
 142+
154143 function release() {
155 - if ( $this->isAcquired ) {
156 - $status = $this->sendCommand( 'release', $this->key );
157 - $this->isAcquired = false;
158 - } else {
159 - $status = Status::newGood();
160 - }
 144+ $status = $this->sendCommand( 'RELEASE', $this->key );
 145+
161146 if ( $this->conn ) {
162147 self::$manager->close( $this->conn );
163148 $this->conn = null;
164149 }
165150 return $status;
166151 }
167 -
168 - function wait() {
169 - $status = $this->sendCommand( 'wait', $this->key, "timeout:{$this->waitTimeout}" );
170 - if ( !$status->isOK() ) {
171 - return $status;
172 - }
173 - $this->isAcquired = true;
174 - return $status;
175 - }
176152 }
Index: trunk/extensions/PoolCounter/PoolCounterClient.i18n.php
@@ -12,7 +12,7 @@
1313 * @author Tim Starling
1414 */
1515 $messages['en'] = array(
16 - 'poolcounter-desc' => 'MediaWiki client for the pool counter daemon poolcounter.py',
 16+ 'poolcounter-desc' => 'MediaWiki client for the pool counter daemon',
1717 'poolcounter-connection-error' => 'Error connecting to pool counter server: $1',
1818 'poolcounter-read-error' => 'Error reading from pool counter server',
1919 'poolcounter-write-error' => 'Error writing to pool counter server',
Index: trunk/extensions/PoolCounter/PoolCounterClient.php
@@ -31,10 +31,11 @@
3232
3333 /**
3434 * Sample pool configuration:
35 - * $wgPoolCounterConf = array( 'Article::view' => array(
 35+ * $wgPoolCounterConf = array( 'PoolWorkArticleView' => array(
3636 * 'class' => 'PoolCounter_Client',
37 - * 'waitTimeout' => 15, // wait timeout in seconds
38 - * 'maxThreads' => 5, // maximum number of threads in each pool
 37+ * 'timeout' => 15, // wait timeout in seconds
 38+ * 'workers' => 5, // maximum number of active threads in each pool
 39+ * 'maxqueue' => 50, // maximum number of total threads in each pool
3940 * ) );
4041 */
4142

Follow-up revisions

Revision | Commit summary | Author | Date
r71847 | Follow up r71805 | platonides | 08:13, 28 August 2010
r73082 | Follow up r71805: Drop PoolWork from the configuration key, and update Defaul... | platonides | 19:49, 15 September 2010
r73084 | Removed -std since it seems not all gcc support it. See r71805 CR.... | platonides | 20:01, 15 September 2010
r73089 | Go back to Status objects after r71805. We return an Ok result if it can give... | platonides | 21:03, 15 September 2010

Past revisions this follows-up on

Revision | Commit summary | Author | Date
r10955 | Some tweaking | nikerabbit | 17:46, 9 September 2005

Comments

#Comment by Tim Starling (talk | contribs)   05:15, 15 September 2010

Looks good. You should do this sort of thing more often.

+CFLAGS=-std=c90 -Wall $(DEFINES)

That gives me:

cc1: error: unrecognized command line option "-std=c90"

The manual says it's meant to be c99 or c89, I'm not sure which one you meant. It compiles either way.

Why do the PoolCounter abstract methods now return either an integer constant or a Status object, instead of just a status object as before? This inevitably leads to ugly special cases:

		switch ( is_int( $status ) ? $status : PoolCounter::ERROR ) {
		if ( $status instanceof Status ) {
			$errortext = $status->getWikiText( false, 'view-pool-error' );
		} else {
			$errortext = wfMsgNoTrans( 'view-pool-error', '' );
		}

You could have just put the integer status into the $status->value member, i.e.

return Status::newGood(constant( "PoolCounter::$responseType" ));

Or indeed, for even simpler and faster code, you could have just skipped the constants altogether and used string literals:

return Status::newGood( $responseType );

Then in the case where fallback() returns false, you could have constructed a new failing Status object to pass through to PCW::error(), allowing that function to be more simply implemented in the subclasses.

I'm not sure why you moved the initialisation of the workers, maxqueue and timeout members from PoolCounter_Client to PoolCounter. These members are not used by PoolCounter, they seem to be configuring the extension, not the core component. If there is some reason to leave them there, they need to be documented in DefaultSettings.php instead of PoolCounter.php.

Speaking of documentation, you didn't update DefaultSettings.php to account for the change of configuration key from "Article::view" to "PoolWorkArticleView". I'm not too keen on that change anyway, PoolWorkArticleView is not exactly a memorable string to be using in a configuration file to identify the operation type. Maybe something like 'pageview' would have been better.

It would be handy for debugging purposes if, like the python daemon, process_line() was case-insensitive and accepted commands from telnet, which uses \r\n line endings:

$ echo -ne 'acq4me test 1 50 15\n' | nc -q1 localhost 7531
ERROR BAD_COMMAND
$ echo -ne 'ACQ4ME test 1 50 15\r\n' | nc -q1 localhost 7531
ERROR BAD_SYNTAX
$ echo -ne 'ACQ4ME test 1 50 15\n' | nc -q1 localhost 7531
LOCKED
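
The behaviour requested above (accepting telnet-style \r\n line endings and matching command names regardless of case) could be sketched in C roughly as follows. This is only an illustration under stated assumptions: strip_cr() and is_command() are hypothetical helper names that do not appear in the daemon, and this is not the change that was later committed.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include <strings.h> /* strncasecmp() is POSIX, not ISO C */

/* Hypothetical helper: drop a single trailing '\r' so that a line sent
 * with CRLF endings parses like one sent with a bare LF.
 * Returns the new length of the line. */
size_t strip_cr(char *line, size_t len)
{
	assert(line != NULL);
	if (len > 0 && line[len - 1] == '\r') {
		line[--len] = '\0';
	}
	return len;
}

/* Hypothetical helper: case-insensitive test that `line` starts with the
 * command `cmd`, followed by either a space or the end of the string. */
int is_command(const char *line, const char *cmd)
{
	size_t n;

	assert(line != NULL && cmd != NULL);
	n = strlen(cmd);
	if (strncasecmp(line, cmd, n) != 0) {
		return 0;
	}
	return line[n] == ' ' || line[n] == '\0';
}
```

With helpers like these, process_line() would call strip_cr() once on the received line and then use is_command() in place of the strncmp() tests, assuming the rest of the argument parsing stays unchanged.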

You broke ParserCache::getDirty() in r70783, it tries to use the Article object as a memcached key. So I assume you haven't tested timeouts or fallbacks.

#Comment by Platonides (talk | contribs)   21:04, 15 September 2010

Thanks. I may, if there's such a need :)

My gcc manual explicitly mentions -std=c90 (and it works), but since it seems to cause more problems than benefits, I removed it in r73084.

I wanted to use integers everywhere, then found that PoolCounter_ConnectionManager was indeed taking advantage of Status for the errors, so I re-added support for them, leading to that admittedly ugly code. Moved back to Status objects in r73089.

It initialises them because $workers, $maxqueue and $timeout are now defined in the parent. I think that fits with the general purpose of the PoolCounter and makes the whole thing clearer. Do you think there could be a PoolCounter (other than the do-nothing Stub) for which those parameters would not make sense?

DefaultSettings.php and configuration name addressed in r73082.

I didn't add case insensitive commands since the daemons should be using The Right Names™. Added support for CRLF lines in r73084.

I tested the daemon directly from nc. Then, with the wiki, I timed several wget instances opened from the terminal (I added a sleep PF to make the parse noticeable) to check that it didn't parse when it shouldn't, that it was timing out correctly, and so on, but I didn't look at the actual output.
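
For this kind of manual testing, the acquire commands the daemon parses have the shape "ACQ4ME key workers maxqueue timeout" or "ACQ4ANY key workers maxqueue timeout", terminated by \n, as seen in process_line() and in the nc examples above. A small C sketch that builds such a line is given here; format_acquire() is a hypothetical name used only for illustration and is not part of the daemon or the PHP client.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: write an acquire command into `out`.
 * Returns the number of characters written (excluding the final '\0'),
 * or -1 if the key is unusable or the buffer is too small.
 * Keys containing a space are rejected because the daemon splits the
 * line on spaces with strtok(). */
int format_acquire(char *out, size_t size, int for_anyone,
                   const char *key, int workers, int maxqueue, int timeout)
{
	int n;

	assert(out != NULL && key != NULL);
	if (key[0] == '\0' || strchr(key, ' ') != NULL) {
		return -1;
	}

	n = snprintf(out, size, "%s %s %d %d %d\n",
	             for_anyone ? "ACQ4ANY" : "ACQ4ME",
	             key, workers, maxqueue, timeout);
	if (n < 0 || (size_t)n >= size) {
		return -1; /* output error or truncated */
	}
	return n;
}
```

Calling format_acquire( buf, sizeof( buf ), 0, "test", 1, 50, 15 ) produces the same line as the last nc example above; the result can then be written to a socket connected to the daemon's port.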

Status & tagging log