Index: trunk/extensions/DonationInterface/globalcollect_gateway/scripts/orphans.php |
— | — | @@ -73,9 +73,11 @@ |
74 | 74 | $this->handled_ids[$correlation_id] = 'error'; |
75 | 75 | } |
76 | 76 | } |
| 77 | + $this->addStompCorrelationIDToAckBucket( false, true ); //ack all outstanding. |
| 78 | + $orphans = $this->getStompOrphans(); |
77 | 79 | } |
78 | 80 | |
79 | | - $this->addStompCorrelationIDToAckBucket( false, true ); //this just acks everything that's waiting for it. |
| 81 | + $this->addStompCorrelationIDToAckBucket( false, true ); //ack all outstanding. |
80 | 82 | |
81 | 83 | //TODO: Make stats squirt out all over the place. |
82 | 84 | $am = 0; |
— | — | @@ -113,15 +115,20 @@ |
114 | 116 | if ( $correlation_id ) { |
115 | 117 | $bucket[$correlation_id] = "'$correlation_id'"; //avoiding duplicates. |
116 | 118 | $this->handled_ids[$correlation_id] = 'antimessage'; |
| 119 | + echo "Added $correlation_id to the ack bucket : Total bucket count = " . count( $bucket ); |
117 | 120 | } |
118 | 121 | if ( count( $bucket ) && ( count( $bucket ) >= $count || $ackNow ) ){ |
119 | 122 | //ack now. |
| 123 | + echo 'Acking ' . count( $bucket ) . ' bucket messages.'; |
120 | 124 | $selector = 'JMSCorrelationID IN (' . implode( ", ", $bucket ) . ')'; |
121 | 125 | $ackMe = stompFetchMessages( 'cc-limbo', $selector, $count * 100 ); //This is outrageously high, but I just want to be reasonably sure we get all the matches. |
122 | 126 | $retrieved_count = count( $ackMe ); |
123 | 127 | if ( $retrieved_count ){ |
124 | 128 | stompAckMessages( $ackMe ); |
125 | 129 | $this->removed_message_count += $retrieved_count; |
| 130 | + echo "Done acking $retrieved_count messages. "; |
| 131 | + } else { |
| 132 | + echo "Oh noes! No messages to ack for some reason..."; |
126 | 133 | } |
127 | 134 | $bucket = array(); |
128 | 135 | } |
— | — | @@ -132,7 +139,7 @@ |
133 | 140 | $selector = "antimessage = 'true'"; |
134 | 141 | $antimessages = stompFetchMessages( 'cc-limbo', $selector, 1000 ); |
135 | 142 | $count = 0; |
136 | | - while ( count( $antimessages ) ){ //if there's an antimessage, we can ack 'em all right now. |
| 143 | + while ( count( $antimessages ) && $this->keepGoing() ){ //if there's an antimessage, we can ack 'em all right now. |
137 | 144 | $count += count( $antimessages ); |
138 | 145 | foreach ( $antimessages as $message ){ |
139 | 146 | //add the correlation ID to the ack bucket. |
— | — | @@ -166,7 +173,13 @@ |
167 | 174 | $elapsed = $this->now - $decoded['date']; |
168 | 175 | if ( $elapsed > $time_buffer ){ |
169 | 176 | //we got ourselves an orphan! |
170 | | - $orphans[$message->headers['antimessage']] = $decoded; |
| 177 | + $correlation_id = $message->headers['correlation-id']; |
| 178 | + $order_id = explode('-', $correlation_id); |
| 179 | + $order_id = $order_id[1]; |
| 180 | + $decoded['order_id'] = $order_id; |
| 181 | + $decoded['i_order_id'] = $order_id; |
| 182 | + $orphans[$correlation_id] = $decoded; |
| 183 | + echo "\nFound an orphan! $correlation_id"; |
171 | 184 | } |
172 | 185 | } |
173 | 186 | } |
— | — | @@ -261,6 +274,7 @@ |
262 | 275 | * @return boolean True if the orphan has been rectified, false if not. |
263 | 276 | */ |
264 | 277 | function rectifyOrphan( $data, $query_contribution_tracking = true ){ |
| 278 | + echo "\nRectifying Orphan " . $data['order_id']; |
265 | 279 | $rectified = false; |
266 | 280 | |
267 | 281 | $this->adapter->loadDataAndReInit( $data, $query_contribution_tracking ); |
Index: trunk/extensions/DonationInterface/activemq_stomp/activemq_stomp.php |
— | — | @@ -250,6 +250,14 @@ |
251 | 251 | function stompFetchMessages( $queue, $selector = null, $limit = 50 ){ |
252 | 252 | global $wgStompQueueName, $wgPendingStompQueueName, $wgLimboStompQueueName, $wgCCLimboStompQueueName; |
253 | 253 | |
| 254 | + static $selector_last = null; |
| 255 | + if ( !is_null( $selector_last ) && $selector_last != $selector ){ |
| 256 | + $renew = true; |
| 257 | + } else { |
| 258 | + $renew = false; |
| 259 | + } |
| 260 | + $selector_last = $selector; |
| 261 | + |
254 | 262 | switch($queue){ |
255 | 263 | case 'pending': |
256 | 264 | $queue = $wgPendingStompQueueName; |
— | — | @@ -266,22 +274,22 @@ |
267 | 275 | break; |
268 | 276 | } |
269 | 277 | |
270 | | - //This needs to be renewed every time, or the selectors won't work. |
271 | | - //So says the internets, at least. |
272 | | - $stomp = getDIStompConnection( true ); |
| 278 | + //This needs to be renewed every time we change the selectors. |
| 279 | + $stomp = getDIStompConnection( $renew ); |
273 | 280 | |
274 | 281 | $properties = array( 'ack' => 'client' ); |
275 | 282 | if ( !is_null( $selector ) ){ |
276 | 283 | $properties['selector'] = $selector; |
277 | 284 | } |
278 | 285 | |
279 | | - $returned = $stomp->subscribe( '/queue/' . $queue, $properties ); |
| 286 | + $stomp->subscribe( '/queue/' . $queue, $properties ); |
280 | 287 | $message = $stomp->readFrame(); |
281 | 288 | |
282 | 289 | $return = array(); |
283 | 290 | |
284 | 291 | while ( !empty( $message ) && count( $return ) < $limit ) { |
285 | 292 | $return[] = $message; |
| 293 | + $stomp->subscribe( '/queue/' . $queue, $properties ); |
286 | 294 | $message = $stomp->readFrame(); |
287 | 295 | } |
288 | 296 | |