r105513 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: < r105512 | r105513 | r105514 >
Date: 03:43, 8 December 2011
Author: khorn
Status: ok
Tags:
Comment:
Fixes for the Orphan Rectifier, particularly in the ActiveMQ/Stomp area.
Modified paths:
  • /trunk/extensions/DonationInterface/activemq_stomp/activemq_stomp.php (modified) (history)
  • /trunk/extensions/DonationInterface/globalcollect_gateway/scripts/orphans.php (modified) (history)

Diff [purge]

Index: trunk/extensions/DonationInterface/globalcollect_gateway/scripts/orphans.php
@@ -73,9 +73,11 @@
7474 $this->handled_ids[$correlation_id] = 'error';
7575 }
7676 }
 77+ $this->addStompCorrelationIDToAckBucket( false, true ); //ack all outstanding.
 78+ $orphans = $this->getStompOrphans();
7779 }
7880
79 - $this->addStompCorrelationIDToAckBucket( false, true ); //this just acks everything that's waiting for it.
 81+ $this->addStompCorrelationIDToAckBucket( false, true ); //ack all outstanding.
8082
8183 //TODO: Make stats squirt out all over the place.
8284 $am = 0;
@@ -113,15 +115,20 @@
114116 if ( $correlation_id ) {
115117 $bucket[$correlation_id] = "'$correlation_id'"; //avoiding duplicates.
116118 $this->handled_ids[$correlation_id] = 'antimessage';
 119+ echo "Added $correlation_id to the ack bucket : Total bucket count = " . count( $bucket );
117120 }
118121 if ( count( $bucket ) && ( count( $bucket ) >= $count || $ackNow ) ){
119122 //ack now.
 123+ echo 'Acking ' . count( $bucket ) . ' bucket messages.';
120124 $selector = 'JMSCorrelationID IN (' . implode( ", ", $bucket ) . ')';
121125 $ackMe = stompFetchMessages( 'cc-limbo', $selector, $count * 100 ); //This is outrageously high, but I just want to be reasonably sure we get all the matches.
122126 $retrieved_count = count( $ackMe );
123127 if ( $retrieved_count ){
124128 stompAckMessages( $ackMe );
125129 $this->removed_message_count += $retrieved_count;
 130+ echo "Done acking $retrieved_count messages. ";
 131+ } else {
 132+ echo "Oh noes! No messages to ack for some reason...";
126133 }
127134 $bucket = array();
128135 }
@@ -132,7 +139,7 @@
133140 $selector = "antimessage = 'true'";
134141 $antimessages = stompFetchMessages( 'cc-limbo', $selector, 1000 );
135142 $count = 0;
136 - while ( count( $antimessages ) ){ //if there's an antimessage, we can ack 'em all right now.
 143+ while ( count( $antimessages ) && $this->keepGoing() ){ //if there's an antimessage, we can ack 'em all right now.
137144 $count += count( $antimessages );
138145 foreach ( $antimessages as $message ){
139146 //add the correlation ID to the ack bucket.
@@ -166,7 +173,13 @@
167174 $elapsed = $this->now - $decoded['date'];
168175 if ( $elapsed > $time_buffer ){
169176 //we got ourselves an orphan!
170 - $orphans[$message->headers['antimessage']] = $decoded;
 177+ $correlation_id = $message->headers['correlation-id'];
 178+ $order_id = explode('-', $correlation_id);
 179+ $order_id = $order_id[1];
 180+ $decoded['order_id'] = $order_id;
 181+ $decoded['i_order_id'] = $order_id;
 182+ $orphans[$correlation_id] = $decoded;
 183+ echo "\nFound an orphan! $correlation_id";
171184 }
172185 }
173186 }
@@ -261,6 +274,7 @@
262275 * @return boolean True if the orphan has been rectified, false if not.
263276 */
264277 function rectifyOrphan( $data, $query_contribution_tracking = true ){
 278+ echo "\nRectifying Orphan " . $data['order_id'];
265279 $rectified = false;
266280
267281 $this->adapter->loadDataAndReInit( $data, $query_contribution_tracking );
Index: trunk/extensions/DonationInterface/activemq_stomp/activemq_stomp.php
@@ -250,6 +250,14 @@
251251 function stompFetchMessages( $queue, $selector = null, $limit = 50 ){
252252 global $wgStompQueueName, $wgPendingStompQueueName, $wgLimboStompQueueName, $wgCCLimboStompQueueName;
253253
 254+ static $selector_last = null;
 255+ if ( !is_null( $selector_last ) && $selector_last != $selector ){
 256+ $renew = true;
 257+ } else {
 258+ $renew = false;
 259+ }
 260+ $selector_last = $selector;
 261+
254262 switch($queue){
255263 case 'pending':
256264 $queue = $wgPendingStompQueueName;
@@ -266,22 +274,22 @@
267275 break;
268276 }
269277
270 - //This needs to be renewed every time, or the selectors won't work.
271 - //So says the internets, at least.
272 - $stomp = getDIStompConnection( true );
 278+ //This needs to be renewed every time we change the selectors.
 279+ $stomp = getDIStompConnection( $renew );
273280
274281 $properties = array( 'ack' => 'client' );
275282 if ( !is_null( $selector ) ){
276283 $properties['selector'] = $selector;
277284 }
278285
279 - $returned = $stomp->subscribe( '/queue/' . $queue, $properties );
 286+ $stomp->subscribe( '/queue/' . $queue, $properties );
280287 $message = $stomp->readFrame();
281288
282289 $return = array();
283290
284291 while ( !empty( $message ) && count( $return ) < $limit ) {
285292 $return[] = $message;
 293+ $stomp->subscribe( '/queue/' . $queue, $properties );
286294 $message = $stomp->readFrame();
287295 }
288296

Follow-up revisions

Revision | Commit summary | Author | Date
r105514 | MFT r105513 | khorn | 03:52, 8 December 2011

Status & tagging log