r104648 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r104647‎ | r104648 | r104649 >
Date:02:54, 30 November 2011
Author:khorn
Status:ok (Comments)
Tags:
Comment:
followup r104503, r104539, r104588
Limbo queue consumer IN the orphan rectifier: Initial commit for all the command-line stuff. This has yet to be thoroughly tested and as such, should not be used by anyone.
Modified paths:
  • /trunk/extensions/DonationInterface/activemq_stomp/activemq_stomp.php (modified) (history)
  • /trunk/extensions/DonationInterface/globalcollect_gateway/globalcollect.adapter.php (modified) (history)
  • /trunk/extensions/DonationInterface/globalcollect_gateway/scripts/orphan_adapter.php (modified) (history)
  • /trunk/extensions/DonationInterface/globalcollect_gateway/scripts/orphans.php (modified) (history)

Diff [purge]

Index: trunk/extensions/DonationInterface/globalcollect_gateway/scripts/orphans.php
@@ -13,42 +13,193 @@
1414
1515 protected $killfiles = array();
1616 protected $order_ids = array();
17 - protected $max_per_execute = 3;
 17+ protected $max_per_execute = 500; //only really used if you're going by-file.
 18+ protected $target_execute_time = 30; //(seconds) - only used by the stomp option.
 19+ protected $adapter;
1820
1921
2022 function execute(){
 23+ $func = 'parse_files';
 24+ if ( !empty( $_SERVER['argv'][1] ) ){
 25+ if ( $_SERVER['argv'][1] === 'stomp' ){
 26+ $func = 'orphan_stomp';
 27+ if ( !empty( $_SERVER['argv'][2] ) && is_numeric( $_SERVER['argv'][2] ) ){
 28+ $this->target_execute_time = $_SERVER['argv'][2];
 29+ }
 30+ } elseif ( is_numeric( $_SERVER['argv'][1] ) ){
 31+ $this->max_per_execute = $_SERVER['argv'][1];
 32+ }
 33+ }
2134
22 - $order_ids = file('orphanlogs/order_ids.txt', FILE_SKIP_EMPTY_LINES);
23 - foreach ($order_ids as $key=>$val){
24 - $order_ids[$key] = trim($val);
 35+ $data = array(
 36+ 'wheeee' => 'yes'
 37+ );
 38+ $this->adapter = new GlobalCollectOrphanAdapter(array('external_data' => $data));
 39+
 40+ //Now, actually do the processing.
 41+ if ( method_exists( $this, $func ) ) {
 42+ $this->{$function_name}();
 43+ } else {
 44+ echo "There's no $func in Orphan Rectifying!\n";
 45+ die();
2546 }
26 - foreach ($order_ids as $id){
 47+ }
 48+
 49+ function orphan_stomp(){
 50+
 51+ $this->removed_message_count = 0;
 52+ $this->now = time(); //time at start, thanks very much.
 53+
 54+ //I want to be clear on the problem I hope to prevent with this.
 55+ //Say, for instance, we pull a legit orphan, and for whatever reason, can't completely rectify it.
 56+ //Then, we go back and pull more... and that same one is in the list again. We should stop after one try per message per execute.
 57+ //We should also be smart enough to not process things we believe we just deleted.
 58+ $this->handled_ids = array();
 59+
 60+ //first, we need to... clean up the limbo queue.
 61+ $this->handleStompAntiMessages();
 62+ $this->adapter->log( 'Removed ' . $this->removed_message_count . ' messages and antimessages.' );
 63+
 64+ //Pull a batch of CC orphans, keeping in mind that Things May Have Happened in the small slice of time since we handled the antimessages.
 65+ $orphans = $this->getStompOrphans();
 66+ while ( count( $orphans ) && $this->keepGoing() ){
 67+ //..do stuff.
 68+ foreach ( $orphans as $correlation_id => $orphan ) {
 69+ //process
 70+ if ( $this->rectifyOrphan( $orphan ) ){
 71+ $this->addStompCorrelationIDToAckBucket( $correlation_id );
 72+ $this->handled_ids[$correlation_id] = 'rectified';
 73+ } else {
 74+ $this->handled_ids[$correlation_id] = 'error';
 75+ }
 76+ }
 77+ }
 78+
 79+ $this->addStompCorrelationIDToAckBucket( false, true ); //this just acks everything that's waiting for it.
 80+
 81+ //TODO: Make stats squirt out all over the place.
 82+ $am = 0;
 83+ $rec = 0;
 84+ $err = 0;
 85+ foreach( $this->handled_ids as $id=>$whathappened ){
 86+ switch ( $whathappened ){
 87+ case 'antimessage' :
 88+ $am += 1;
 89+ break;
 90+ case 'rectified' :
 91+ $rec += 1;
 92+ break;
 93+ case 'error' :
 94+ $err += 1;
 95+ break;
 96+ }
 97+ }
 98+ echo "\nDone! Final results: \n $am destroyed via antimessage \n $rec rectified orphans \n $err errored out\n";
 99+
 100+ }
 101+
 102+ function keepGoing(){
 103+ $elapsed = time() - $this->now;
 104+ if ( $elapsed < $this->target_execute_time ){
 105+ return true;
 106+ } else {
 107+ return false;
 108+ }
 109+ }
 110+
 111+ function addStompCorrelationIDToAckBucket( $correlation_id, $ackNow = false ){
 112+ static $bucket = array();
 113+ $count = 50; //sure. Why not?
 114+ if ( $correlation_id ) {
 115+ $bucket[$correlation_id] = "'$correlation_id'"; //avoiding duplicates.
 116+ $this->handled_ids[$correlation_id] = 'antimessage';
 117+ }
 118+ if ( count( $bucket ) && ( count( $bucket ) >= $count || $ackNow ) ){
 119+ //ack now.
 120+ $selector = 'JMSCorrelationID IN (' . implode( ", ", $bucket ) . ')';
 121+ $ackMe = stompFetchMessages( 'limbo', $selector, $count * 100 ); //This is outrageously high, but I just want to be reasonably sure we get all the matches.
 122+ $retrieved_count = count( $ackMe );
 123+ if ( $retrieved_count ){
 124+ stompAckMessages( $ackMe );
 125+ $this->removed_message_count += $retrieved_count;
 126+ }
 127+ $bucket = array();
 128+ }
 129+
 130+ }
 131+
 132+ function handleStompAntiMessages(){
 133+ $selector = "antimessage = 'true'";
 134+ $antimessages = stompFetchMessages( 'limbo', $selector, 1000 );
 135+ $count = 0;
 136+ while ( count( $antimessages ) ){ //if there's an antimessage, we can ack 'em all right now.
 137+ $count += count( $antimessages );
 138+ foreach ( $antimessages as $message ){
 139+ //add the correlation ID to the ack bucket.
 140+ if (array_key_exists('correlation-id', $message->headers)) {
 141+ $this->addStompCorrelationIDToAckBucket( $message->headers['correlation-id'] );
 142+ } else {
 143+ echo 'The STOMP message ' . $message->headers['message-id'] . ' has no correlation ID!';
 144+ }
 145+ }
 146+ $antimessages = stompFetchMessages( 'limbo', $selector, 1000 );
 147+ }
 148+ $this->addStompCorrelationIDToAckBucket( false, true ); //this just acks everything that's waiting for it.
 149+ $this->adapter->log("Found $count antimessages.");
 150+ }
 151+
 152+ /**
 153+ * Returns an array of **at most** 300 decoded orphans that we don't think we've rectified yet.
 154+ * @return array keys are the correlation_id, and the values are the decoded stomp message body.
 155+ */
 156+ function getStompOrphans(){
 157+ $time_buffer = 60*20; //20 minutes? Sure. Why not?
 158+ $selector = "payment_method = 'cc'";
 159+ $messages = stompFetchMessages( 'limbo', $selector, 300 );
 160+ $orphans = array();
 161+ foreach ( $messages as $message ){
 162+ if ( !array_key_exists('antimessage', $message->headers )
 163+ && !array_key_exists( $message->headers['correlation-id'], $this->handled_ids ) ) {
 164+ //check the timestamp to see if it's old enough.
 165+ $decoded = json_decode($message->body, true);
 166+ if ( array_key_exists( 'date', $decoded ) ){
 167+ $elapsed = $this->now - $decoded['date'];
 168+ if ( $elapsed > $time_buffer ){
 169+ //we got ourselves an orphan!
 170+ $orphans[$message->headers['antimessage']] = $decoded;
 171+ }
 172+ }
 173+ }
 174+ }
 175+ return $orphans;
 176+ }
 177+
 178+ function parse_files(){
 179+ //all the old stuff goes here.
 180+ $order_ids = file( 'orphanlogs/order_ids.txt', FILE_SKIP_EMPTY_LINES );
 181+ foreach ( $order_ids as $key=>$val ){
 182+ $order_ids[$key] = trim( $val );
 183+ }
 184+ foreach ( $order_ids as $id ){
27185 $this->order_ids[$id] = $id; //easier to unset this way.
28186 }
29 - $outstanding_count = count($this->order_ids);
 187+ $outstanding_count = count( $this->order_ids );
30188 echo "Order ID count: " . $outstanding_count . "\n";
31189
32190 $files = $this->getAllLogFileNames();
33191 $payments = array();
34 - foreach ($files as $file){
35 - if (count($payments) < $this->max_per_execute){
 192+ foreach ( $files as $file ){
 193+ if ( count( $payments ) < $this->max_per_execute ){
36194 $file_array = $this->getLogfileLines( $file );
37 - $payments = array_merge($this->findTransactionLines($file_array), $payments);
38 - if (count($payments) === 0){
 195+ $payments = array_merge( $this->findTransactionLines( $file_array ), $payments );
 196+ if ( count( $payments ) === 0 ){
39197 $this->killfiles[] = $file;
40 - echo print_r($this->killfiles, true);
 198+ echo print_r( $this->killfiles, true );
41199 }
42200 }
43 - }
 201+ }
44202
45 - $data = array(
46 - 'wheeee' => 'yes'
47 - );
48 -
49 - $adapter = new GlobalCollectOrphanAdapter(array('external_data' => $data));
50 - $adapter->setCurrentTransaction('INSERT_ORDERWITHPAYMENT');
51 - $var_map = $adapter->defineVarMap();
52 -
 203+ $this->adapter->setCurrentTransaction('INSERT_ORDERWITHPAYMENT');
53204 $xml = new DomDocument;
54205
55206 //fields that have generated notices if they're not there.
@@ -69,12 +220,11 @@
70221 'zip2',
71222 );
72223
73 -
74224 foreach ($payments as $key => $payment_data){
75225 $xml->loadXML($payment_data['xml']);
76 - $parsed = $adapter->getResponseData($xml);
 226+ $parsed = $this->adapter->getResponseData($xml);
77227 $payments[$key]['parsed'] = $parsed;
78 - $payments[$key]['unstaged'] = $adapter->unstage_data($parsed);
 228+ $payments[$key]['unstaged'] = $this->adapter->unstage_data($parsed);
79229 $payments[$key]['unstaged']['contribution_tracking_id'] = $payments[$key]['contribution_tracking_id'];
80230 $payments[$key]['unstaged']['i_order_id'] = $payments[$key]['unstaged']['order_id'];
81231 foreach ($additional_fields as $val){
@@ -92,18 +242,9 @@
93243 foreach($payments as $payment_data){
94244 if ($i < $this->max_per_execute){
95245 ++$i;
96 - $adapter->loadDataAndReInit($payment_data['unstaged']);
97 - $results = $adapter->do_transaction('Confirm_CreditCard');
98 - if ($results['status']){
99 - $adapter->log( $payment_data['unstaged']['contribution_tracking_id'] . ": FINAL: " . $results['action']);
100 - unset($this->order_ids[$payment_data['unstaged']['order_id']]);
101 - } else {
102 - $adapter->log( $payment_data['unstaged']['contribution_tracking_id'] . ": ERROR: " . $results['message']);
103 - if (strpos($results['message'], "GET_ORDERSTATUS reports that the payment is already complete.")){
104 - unset($this->order_ids[$payment_data['unstaged']['order_id']]);
105 - }
 246+ if ( $this->rectifyOrphan( $payment_data['unstaged'] ) ) {
 247+ unset( $this->order_ids[$payment_data['unstaged']['order_id']] );
106248 }
107 - echo $results['message'] . "\n";
108249 }
109250 }
110251
@@ -112,6 +253,32 @@
113254 }
114255 }
115256
 257+ /**
 258+ * Uses the Orphan Adapter to rectify a single orphan. Returns a boolean letting the caller know if
 259+ * the orphan has been fully rectified or not.
 260+ * @param array $data Some set of orphan data.
 261+ * @param boolean $query_contribution_tracking A flag specifying if we should query the contribution_tracking table or not.
 262+ * @return boolean True if the orphan has been rectified, false if not.
 263+ */
 264+ function rectifyOrphan( $data, $query_contribution_tracking = true ){
 265+ $rectified = false;
 266+
 267+ $this->adapter->loadDataAndReInit( $data, $query_contribution_tracking );
 268+ $results = $this->adapter->do_transaction( 'Confirm_CreditCard' );
 269+ if ($results['status']){
 270+ $this->adapter->log( $data['contribution_tracking_id'] . ": FINAL: " . $results['action'] );
 271+ $rectified = true;
 272+ } else {
 273+ $this->adapter->log( $data['contribution_tracking_id'] . ": ERROR: " . $results['message'] );
 274+ if ( strpos( $results['message'], "GET_ORDERSTATUS reports that the payment is already complete." ) ){
 275+ $rectified = true;
 276+ }
 277+ }
 278+ echo $results['message'] . "\n";
 279+
 280+ return $rectified;
 281+ }
 282+
116283 function getAllLogFileNames(){
117284 $files = array();
118285 if ($handle = opendir(dirname(__FILE__) . '/orphanlogs/')){
Index: trunk/extensions/DonationInterface/globalcollect_gateway/scripts/orphan_adapter.php
@@ -31,7 +31,7 @@
3232 return $unstaged;
3333 }
3434
35 - public function loadDataAndReInit( $data ) {
 35+ public function loadDataAndReInit( $data, $useDB = true ) {
3636 $this->batch = true; //or the hooks will accumulate badness.
3737 //re-init all these arrays, because this is a batch thing.
3838 $this->hard_data = array( );
@@ -46,7 +46,19 @@
4747
4848 $this->raw_data = $this->dataObj->getData();
4949
50 - $this->hard_data = array_merge( $this->hard_data, $this->getUTMInfoFromDB() );
 50+ if ( $useDB ){
 51+ $this->hard_data = array_merge( $this->hard_data, $this->getUTMInfoFromDB() );
 52+ } else {
 53+ $utm_keys = array(
 54+ 'utm_source',
 55+ 'utm_campaign',
 56+ 'utm_medium',
 57+ 'date'
 58+ );
 59+ foreach($utm_keys as $key){
 60+ $this->hard_data[$key] = $data[$key];
 61+ }
 62+ }
5163 $this->reAddHardData();
5264
5365 $this->staged_data = $this->raw_data;
@@ -178,11 +190,14 @@
179191 return;
180192 }
181193
182 -
183 - if ( !is_null( $this->getData_Raw( 'ts' ) ) ) {
184 - $timestamp = strtotime( $this->getData_Raw( 'ts' ) ); //I hate that this works.
 194+ if ( !is_null( $this->getData_Raw( 'date' ) ) ) {
 195+ $timestamp = $this->getData_Raw( 'date' );
185196 } else {
186 - $timestamp = time();
 197+ if ( !is_null( $this->getData_Raw( 'ts' ) ) ) {
 198+ $timestamp = strtotime( $this->getData_Raw( 'ts' ) ); //I hate that this works.
 199+ } else {
 200+ $timestamp = time();
 201+ }
187202 }
188203
189204 // send the thing.
Index: trunk/extensions/DonationInterface/globalcollect_gateway/globalcollect.adapter.php
@@ -1070,7 +1070,7 @@
10711071 }
10721072 }
10731073
1074 - $post_status_check = false;
 1074+ $is_orphan = false;
10751075 if ( count( $addme ) ){ //nothing unusual here.
10761076 $this->addData( $addme );
10771077 $logmsg = $this->getData_Raw( 'contribution_tracking_id' ) . ': ';
@@ -1079,18 +1079,24 @@
10801080 self::log( $logmsg );
10811081 } else { //this is an orphan transaction.
10821082 $this->staged_data['order_id'] = $this->staged_data['i_order_id'];
1083 - $post_status_check = true;
 1083+ $is_orphan = true;
10841084 }
10851085
 1086+ //have to change this code range: All these are usually "pending" and
 1087+ //that would still be true...
 1088+ //...aside from the fact that if the user has gotten this far, they left
 1089+ //the part where they could add more data.
 1090+ //By now, "incomplete" definitely means "failed" for 0-70.
 1091+ $this->addCodeRange( 'GET_ORDERSTATUS', 'STATUSID', 'failed', 0, 70 );
10861092 $status_result = $this->do_transaction( 'GET_ORDERSTATUS' );
10871093
10881094 $cancelflag = false; //this will denote the thing we're trying to do with the donation attempt
10891095 $problemflag = false; //this will get set to true, if we can't continue and need to give up and just log the hell out of it.
10901096 $problemmessage = ''; //to be used in conjunction with the flag.
1091 - $deletelimbomessageflag = false; //this tells us if we should delete this transaction's limbo queue message or not.
 1097+ $add_antimessage = false; //this tells us if we should add an antimessage when we are done or not.
10921098
10931099
1094 - if ( $post_status_check ){
 1100+ if ( $is_orphan ){
10951101 if ( array_key_exists('data', $status_result) ) {
10961102 foreach ( $pull_vars as $theirkey => $ourkey) {
10971103 if ( array_key_exists($theirkey, $status_result['data']) ) {
@@ -1134,13 +1140,13 @@
11351141 switch ( $order_status_results ){
11361142 case 'failed' :
11371143 case 'revised' :
1138 - $deletelimbomessageflag = true;
 1144+ $add_antimessage = true;
11391145 $cancelflag = true; //makes sure we don't try to confirm.
11401146 break;
11411147 case 'complete' :
11421148 $problemflag = true; //nothing to be done.
11431149 $problemmessage = "GET_ORDERSTATUS reports that the payment is already complete.";
1144 - $deletelimbomessageflag = true;
 1150+ $add_antimessage = true;
11451151 break;
11461152 }
11471153 }
@@ -1172,7 +1178,7 @@
11731179 $this->setTransactionResult( "Original Response Status (pre-SET_PAYMENT): " . $original_status_code, 'txn_message' );
11741180 $this->runPostProcessHooks(); //stomp is in here
11751181 $this->unsetAllSessionData();
1176 - $deletelimbomessageflag = true;
 1182+ $add_antimessage = true;
11771183 } else {
11781184 $problemflag = true;
11791185 $problemmessage = "SET_PAYMENT couldn't communicate properly!";
@@ -1184,7 +1190,7 @@
11851191 if ( isset( $final['status'] ) && $final['status'] === true ) {
11861192 $this->setTransactionWMFStatus( 'failed' );
11871193 $this->unsetAllSessionData();
1188 - $deletelimbomessageflag = true;
 1194+ $add_antimessage = true;
11891195 } else {
11901196 $problemflag = true;
11911197 $problemmessage = "CANCEL_PAYMENT couldn't communicate properly!";
@@ -1196,7 +1202,7 @@
11971203 }
11981204 }
11991205
1200 - if ( $deletelimbomessageflag ) {
 1206+ if ( $add_antimessage && !$is_orphan ) {
12011207 //As it happens, we can't remove things from the queue here: It
12021208 //takes way too dang long. (~5 seconds!)
12031209 //So, instead, I'll add an anti-message and deal with it later. (~.01 seconds)
Index: trunk/extensions/DonationInterface/activemq_stomp/activemq_stomp.php
@@ -259,14 +259,16 @@
260260 break;
261261 }
262262
263 - $stomp = getDIStompConnection();
 263+ //This needs to be renewed every time, or the selectors won't work.
 264+ //So says the internets, at least.
 265+ $stomp = getDIStompConnection( true );
264266
265267 $properties = array( 'ack' => 'client' );
266 - if (!is_null($selector)){
 268+ if ( !is_null( $selector ) ){
267269 $properties['selector'] = $selector;
268270 }
269271
270 - $returned = $stomp->subscribe('/queue/' . $queue, $properties);
 272+ $returned = $stomp->subscribe( '/queue/' . $queue, $properties );
271273 $message = $stomp->readFrame();
272274
273275 $return = array();

Follow-up revisions

RevisionCommit summaryAuthorDate
r104791Another round of preparatory limbo stomp changes....khorn23:45, 30 November 2011
r105350MFT r104225, r104471, r104503, r104539, r104588, r104600, r104607, r104648, ...khorn21:12, 6 December 2011
r105351MFT r104225, r104471, r104503, r104539, r104588, r104600, r104607, r104648, ...khorn21:12, 6 December 2011
r105502MFT r103501, r103503, r104648, r104791, r104933khorn00:45, 8 December 2011

Past revisions this follows-up on

RevisionCommit summaryAuthorDate
r104503Establishes a 'limbo' queue for data that GlobalCollect is either extremely l...khorn23:12, 28 November 2011
r104539More changes for the 'limbo' queue functionality....khorn03:08, 29 November 2011
r104588r104503, r104539...khorn20:38, 29 November 2011

Comments

#Comment by Awjrichards (talk | contribs)   02:19, 6 December 2011
+		$data = array(
+			'wheeee' => 'yes'			
+		);
+		$this->adapter = new GlobalCollectOrphanAdapter(array('external_data' => $data));

Please comment around this whatever is going on here :p

+	function getStompOrphans(){
+		$time_buffer = 60*20; //20 minutes? Sure. Why not? 
+		$selector = "payment_method = 'cc'";
+		$messages = stompFetchMessages( 'limbo', $selector, 300 );

It would be great if the batch size for stompFetchMessages was configurable, as well as the $time_buffer.

Status & tagging log