Index: trunk/extensions/DonationInterface/globalcollect_gateway/scripts/orphans.php |
— | — | @@ -13,42 +13,193 @@ |
14 | 14 | |
15 | 15 | protected $killfiles = array(); |
16 | 16 | protected $order_ids = array(); |
17 | | - protected $max_per_execute = 3; |
| 17 | + protected $max_per_execute = 500; //only really used if you're going by-file. |
| 18 | + protected $target_execute_time = 30; //(seconds) - only used by the stomp option. |
| 19 | + protected $adapter; |
18 | 20 | |
19 | 21 | |
20 | 22 | function execute(){ |
| 23 | + $func = 'parse_files'; |
| 24 | + if ( !empty( $_SERVER['argv'][1] ) ){ |
| 25 | + if ( $_SERVER['argv'][1] === 'stomp' ){ |
| 26 | + $func = 'orphan_stomp'; |
| 27 | + if ( !empty( $_SERVER['argv'][2] ) && is_numeric( $_SERVER['argv'][2] ) ){ |
| 28 | + $this->target_execute_time = $_SERVER['argv'][2]; |
| 29 | + } |
| 30 | + } elseif ( is_numeric( $_SERVER['argv'][1] ) ){ |
| 31 | + $this->max_per_execute = $_SERVER['argv'][1]; |
| 32 | + } |
| 33 | + } |
21 | 34 | |
22 | | - $order_ids = file('orphanlogs/order_ids.txt', FILE_SKIP_EMPTY_LINES); |
23 | | - foreach ($order_ids as $key=>$val){ |
24 | | - $order_ids[$key] = trim($val); |
| 35 | + $data = array( |
| 36 | + 'wheeee' => 'yes' |
| 37 | + ); |
| 38 | + $this->adapter = new GlobalCollectOrphanAdapter(array('external_data' => $data)); |
| 39 | + |
| 40 | + //Now, actually do the processing. |
| 41 | + if ( method_exists( $this, $func ) ) { |
| 42 | + $this->{$function_name}(); |
| 43 | + } else { |
| 44 | + echo "There's no $func in Orphan Rectifying!\n"; |
| 45 | + die(); |
25 | 46 | } |
26 | | - foreach ($order_ids as $id){ |
| 47 | + } |
| 48 | + |
| 49 | + function orphan_stomp(){ |
| 50 | + |
| 51 | + $this->removed_message_count = 0; |
| 52 | + $this->now = time(); //time at start, thanks very much. |
| 53 | + |
| 54 | + //I want to be clear on the problem I hope to prevent with this. |
| 55 | + //Say, for instance, we pull a legit orphan, and for whatever reason, can't completely rectify it. |
| 56 | + //Then, we go back and pull more... and that same one is in the list again. We should stop after one try per message per execute. |
| 57 | + //We should also be smart enough to not process things we believe we just deleted. |
| 58 | + $this->handled_ids = array(); |
| 59 | + |
| 60 | + //first, we need to... clean up the limbo queue. |
| 61 | + $this->handleStompAntiMessages(); |
| 62 | + $this->adapter->log( 'Removed ' . $this->removed_message_count . ' messages and antimessages.' ); |
| 63 | + |
| 64 | + //Pull a batch of CC orphans, keeping in mind that Things May Have Happened in the small slice of time since we handled the antimessages. |
| 65 | + $orphans = $this->getStompOrphans(); |
| 66 | + while ( count( $orphans ) && $this->keepGoing() ){ |
| 67 | + //..do stuff. |
| 68 | + foreach ( $orphans as $correlation_id => $orphan ) { |
| 69 | + //process |
| 70 | + if ( $this->rectifyOrphan( $orphan ) ){ |
| 71 | + $this->addStompCorrelationIDToAckBucket( $correlation_id ); |
| 72 | + $this->handled_ids[$correlation_id] = 'rectified'; |
| 73 | + } else { |
| 74 | + $this->handled_ids[$correlation_id] = 'error'; |
| 75 | + } |
| 76 | + } |
| 77 | + } |
| 78 | + |
| 79 | + $this->addStompCorrelationIDToAckBucket( false, true ); //this just acks everything that's waiting for it. |
| 80 | + |
| 81 | + //TODO: Make stats squirt out all over the place. |
| 82 | + $am = 0; |
| 83 | + $rec = 0; |
| 84 | + $err = 0; |
| 85 | + foreach( $this->handled_ids as $id=>$whathappened ){ |
| 86 | + switch ( $whathappened ){ |
| 87 | + case 'antimessage' : |
| 88 | + $am += 1; |
| 89 | + break; |
| 90 | + case 'rectified' : |
| 91 | + $rec += 1; |
| 92 | + break; |
| 93 | + case 'error' : |
| 94 | + $err += 1; |
| 95 | + break; |
| 96 | + } |
| 97 | + } |
| 98 | + echo "\nDone! Final results: \n $am destroyed via antimessage \n $rec rectified orphans \n $err errored out\n"; |
| 99 | + |
| 100 | + } |
| 101 | + |
| 102 | + function keepGoing(){ |
| 103 | + $elapsed = time() - $this->now; |
| 104 | + if ( $elapsed < $this->target_execute_time ){ |
| 105 | + return true; |
| 106 | + } else { |
| 107 | + return false; |
| 108 | + } |
| 109 | + } |
| 110 | + |
| 111 | + function addStompCorrelationIDToAckBucket( $correlation_id, $ackNow = false ){ |
| 112 | + static $bucket = array(); |
| 113 | + $count = 50; //sure. Why not? |
| 114 | + if ( $correlation_id ) { |
| 115 | + $bucket[$correlation_id] = "'$correlation_id'"; //avoiding duplicates. |
| 116 | + $this->handled_ids[$correlation_id] = 'antimessage'; |
| 117 | + } |
| 118 | + if ( count( $bucket ) && ( count( $bucket ) >= $count || $ackNow ) ){ |
| 119 | + //ack now. |
| 120 | + $selector = 'JMSCorrelationID IN (' . implode( ", ", $bucket ) . ')'; |
| 121 | + $ackMe = stompFetchMessages( 'limbo', $selector, $count * 100 ); //This is outrageously high, but I just want to be reasonably sure we get all the matches. |
| 122 | + $retrieved_count = count( $ackMe ); |
| 123 | + if ( $retrieved_count ){ |
| 124 | + stompAckMessages( $ackMe ); |
| 125 | + $this->removed_message_count += $retrieved_count; |
| 126 | + } |
| 127 | + $bucket = array(); |
| 128 | + } |
| 129 | + |
| 130 | + } |
| 131 | + |
| 132 | + function handleStompAntiMessages(){ |
| 133 | + $selector = "antimessage = 'true'"; |
| 134 | + $antimessages = stompFetchMessages( 'limbo', $selector, 1000 ); |
| 135 | + $count = 0; |
| 136 | + while ( count( $antimessages ) ){ //if there's an antimessage, we can ack 'em all right now. |
| 137 | + $count += count( $antimessages ); |
| 138 | + foreach ( $antimessages as $message ){ |
| 139 | + //add the correlation ID to the ack bucket. |
| 140 | + if (array_key_exists('correlation-id', $message->headers)) { |
| 141 | + $this->addStompCorrelationIDToAckBucket( $message->headers['correlation-id'] ); |
| 142 | + } else { |
| 143 | + echo 'The STOMP message ' . $message->headers['message-id'] . ' has no correlation ID!'; |
| 144 | + } |
| 145 | + } |
| 146 | + $antimessages = stompFetchMessages( 'limbo', $selector, 1000 ); |
| 147 | + } |
| 148 | + $this->addStompCorrelationIDToAckBucket( false, true ); //this just acks everything that's waiting for it. |
| 149 | + $this->adapter->log("Found $count antimessages."); |
| 150 | + } |
| 151 | + |
| 152 | + /** |
| 153 | + * Returns an array of **at most** 300 decoded orphans that we don't think we've rectified yet. |
| 154 | + * @return array keys are the correlation_id, and the values are the decoded stomp message body. |
| 155 | + */ |
| 156 | + function getStompOrphans(){ |
| 157 | + $time_buffer = 60*20; //20 minutes? Sure. Why not? |
| 158 | + $selector = "payment_method = 'cc'"; |
| 159 | + $messages = stompFetchMessages( 'limbo', $selector, 300 ); |
| 160 | + $orphans = array(); |
| 161 | + foreach ( $messages as $message ){ |
| 162 | + if ( !array_key_exists('antimessage', $message->headers ) |
| 163 | + && !array_key_exists( $message->headers['correlation-id'], $this->handled_ids ) ) { |
| 164 | + //check the timestamp to see if it's old enough. |
| 165 | + $decoded = json_decode($message->body, true); |
| 166 | + if ( array_key_exists( 'date', $decoded ) ){ |
| 167 | + $elapsed = $this->now - $decoded['date']; |
| 168 | + if ( $elapsed > $time_buffer ){ |
| 169 | + //we got ourselves an orphan! |
| 170 | + $orphans[$message->headers['antimessage']] = $decoded; |
| 171 | + } |
| 172 | + } |
| 173 | + } |
| 174 | + } |
| 175 | + return $orphans; |
| 176 | + } |
| 177 | + |
| 178 | + function parse_files(){ |
| 179 | + //all the old stuff goes here. |
| 180 | + $order_ids = file( 'orphanlogs/order_ids.txt', FILE_SKIP_EMPTY_LINES ); |
| 181 | + foreach ( $order_ids as $key=>$val ){ |
| 182 | + $order_ids[$key] = trim( $val ); |
| 183 | + } |
| 184 | + foreach ( $order_ids as $id ){ |
27 | 185 | $this->order_ids[$id] = $id; //easier to unset this way. |
28 | 186 | } |
29 | | - $outstanding_count = count($this->order_ids); |
| 187 | + $outstanding_count = count( $this->order_ids ); |
30 | 188 | echo "Order ID count: " . $outstanding_count . "\n"; |
31 | 189 | |
32 | 190 | $files = $this->getAllLogFileNames(); |
33 | 191 | $payments = array(); |
34 | | - foreach ($files as $file){ |
35 | | - if (count($payments) < $this->max_per_execute){ |
| 192 | + foreach ( $files as $file ){ |
| 193 | + if ( count( $payments ) < $this->max_per_execute ){ |
36 | 194 | $file_array = $this->getLogfileLines( $file ); |
37 | | - $payments = array_merge($this->findTransactionLines($file_array), $payments); |
38 | | - if (count($payments) === 0){ |
| 195 | + $payments = array_merge( $this->findTransactionLines( $file_array ), $payments ); |
| 196 | + if ( count( $payments ) === 0 ){ |
39 | 197 | $this->killfiles[] = $file; |
40 | | - echo print_r($this->killfiles, true); |
| 198 | + echo print_r( $this->killfiles, true ); |
41 | 199 | } |
42 | 200 | } |
43 | | - } |
| 201 | + } |
44 | 202 | |
45 | | - $data = array( |
46 | | - 'wheeee' => 'yes' |
47 | | - ); |
48 | | - |
49 | | - $adapter = new GlobalCollectOrphanAdapter(array('external_data' => $data)); |
50 | | - $adapter->setCurrentTransaction('INSERT_ORDERWITHPAYMENT'); |
51 | | - $var_map = $adapter->defineVarMap(); |
52 | | - |
| 203 | + $this->adapter->setCurrentTransaction('INSERT_ORDERWITHPAYMENT'); |
53 | 204 | $xml = new DomDocument; |
54 | 205 | |
55 | 206 | //fields that have generated notices if they're not there. |
— | — | @@ -69,12 +220,11 @@ |
70 | 221 | 'zip2', |
71 | 222 | ); |
72 | 223 | |
73 | | - |
74 | 224 | foreach ($payments as $key => $payment_data){ |
75 | 225 | $xml->loadXML($payment_data['xml']); |
76 | | - $parsed = $adapter->getResponseData($xml); |
| 226 | + $parsed = $this->adapter->getResponseData($xml); |
77 | 227 | $payments[$key]['parsed'] = $parsed; |
78 | | - $payments[$key]['unstaged'] = $adapter->unstage_data($parsed); |
| 228 | + $payments[$key]['unstaged'] = $this->adapter->unstage_data($parsed); |
79 | 229 | $payments[$key]['unstaged']['contribution_tracking_id'] = $payments[$key]['contribution_tracking_id']; |
80 | 230 | $payments[$key]['unstaged']['i_order_id'] = $payments[$key]['unstaged']['order_id']; |
81 | 231 | foreach ($additional_fields as $val){ |
— | — | @@ -92,18 +242,9 @@ |
93 | 243 | foreach($payments as $payment_data){ |
94 | 244 | if ($i < $this->max_per_execute){ |
95 | 245 | ++$i; |
96 | | - $adapter->loadDataAndReInit($payment_data['unstaged']); |
97 | | - $results = $adapter->do_transaction('Confirm_CreditCard'); |
98 | | - if ($results['status']){ |
99 | | - $adapter->log( $payment_data['unstaged']['contribution_tracking_id'] . ": FINAL: " . $results['action']); |
100 | | - unset($this->order_ids[$payment_data['unstaged']['order_id']]); |
101 | | - } else { |
102 | | - $adapter->log( $payment_data['unstaged']['contribution_tracking_id'] . ": ERROR: " . $results['message']); |
103 | | - if (strpos($results['message'], "GET_ORDERSTATUS reports that the payment is already complete.")){ |
104 | | - unset($this->order_ids[$payment_data['unstaged']['order_id']]); |
105 | | - } |
| 246 | + if ( $this->rectifyOrphan( $payment_data['unstaged'] ) ) { |
| 247 | + unset( $this->order_ids[$payment_data['unstaged']['order_id']] ); |
106 | 248 | } |
107 | | - echo $results['message'] . "\n"; |
108 | 249 | } |
109 | 250 | } |
110 | 251 | |
— | — | @@ -112,6 +253,32 @@ |
113 | 254 | } |
114 | 255 | } |
115 | 256 | |
| 257 | + /** |
| 258 | + * Uses the Orphan Adapter to rectify a single orphan. Returns a boolean letting the caller know if |
| 259 | + * the orphan has been fully rectified or not. |
| 260 | + * @param array $data Some set of orphan data. |
| 261 | + * @param boolean $query_contribution_tracking A flag specifying if we should query the contribution_tracking table or not. |
| 262 | + * @return boolean True if the orphan has been rectified, false if not. |
| 263 | + */ |
| 264 | + function rectifyOrphan( $data, $query_contribution_tracking = true ){ |
| 265 | + $rectified = false; |
| 266 | + |
| 267 | + $this->adapter->loadDataAndReInit( $data, $query_contribution_tracking ); |
| 268 | + $results = $this->adapter->do_transaction( 'Confirm_CreditCard' ); |
| 269 | + if ($results['status']){ |
| 270 | + $this->adapter->log( $data['contribution_tracking_id'] . ": FINAL: " . $results['action'] ); |
| 271 | + $rectified = true; |
| 272 | + } else { |
| 273 | + $this->adapter->log( $data['contribution_tracking_id'] . ": ERROR: " . $results['message'] ); |
| 274 | + if ( strpos( $results['message'], "GET_ORDERSTATUS reports that the payment is already complete." ) ){ |
| 275 | + $rectified = true; |
| 276 | + } |
| 277 | + } |
| 278 | + echo $results['message'] . "\n"; |
| 279 | + |
| 280 | + return $rectified; |
| 281 | + } |
| 282 | + |
116 | 283 | function getAllLogFileNames(){ |
117 | 284 | $files = array(); |
118 | 285 | if ($handle = opendir(dirname(__FILE__) . '/orphanlogs/')){ |
Index: trunk/extensions/DonationInterface/globalcollect_gateway/scripts/orphan_adapter.php |
— | — | @@ -31,7 +31,7 @@ |
32 | 32 | return $unstaged; |
33 | 33 | } |
34 | 34 | |
35 | | - public function loadDataAndReInit( $data ) { |
| 35 | + public function loadDataAndReInit( $data, $useDB = true ) { |
36 | 36 | $this->batch = true; //or the hooks will accumulate badness. |
37 | 37 | //re-init all these arrays, because this is a batch thing. |
38 | 38 | $this->hard_data = array( ); |
— | — | @@ -46,7 +46,19 @@ |
47 | 47 | |
48 | 48 | $this->raw_data = $this->dataObj->getData(); |
49 | 49 | |
50 | | - $this->hard_data = array_merge( $this->hard_data, $this->getUTMInfoFromDB() ); |
| 50 | + if ( $useDB ){ |
| 51 | + $this->hard_data = array_merge( $this->hard_data, $this->getUTMInfoFromDB() ); |
| 52 | + } else { |
| 53 | + $utm_keys = array( |
| 54 | + 'utm_source', |
| 55 | + 'utm_campaign', |
| 56 | + 'utm_medium', |
| 57 | + 'date' |
| 58 | + ); |
| 59 | + foreach($utm_keys as $key){ |
| 60 | + $this->hard_data[$key] = $data[$key]; |
| 61 | + } |
| 62 | + } |
51 | 63 | $this->reAddHardData(); |
52 | 64 | |
53 | 65 | $this->staged_data = $this->raw_data; |
— | — | @@ -178,11 +190,14 @@ |
179 | 191 | return; |
180 | 192 | } |
181 | 193 | |
182 | | - |
183 | | - if ( !is_null( $this->getData_Raw( 'ts' ) ) ) { |
184 | | - $timestamp = strtotime( $this->getData_Raw( 'ts' ) ); //I hate that this works. |
| 194 | + if ( !is_null( $this->getData_Raw( 'date' ) ) ) { |
| 195 | + $timestamp = $this->getData_Raw( 'date' ); |
185 | 196 | } else { |
186 | | - $timestamp = time(); |
| 197 | + if ( !is_null( $this->getData_Raw( 'ts' ) ) ) { |
| 198 | + $timestamp = strtotime( $this->getData_Raw( 'ts' ) ); //I hate that this works. |
| 199 | + } else { |
| 200 | + $timestamp = time(); |
| 201 | + } |
187 | 202 | } |
188 | 203 | |
189 | 204 | // send the thing. |
Index: trunk/extensions/DonationInterface/globalcollect_gateway/globalcollect.adapter.php |
— | — | @@ -1070,7 +1070,7 @@ |
1071 | 1071 | } |
1072 | 1072 | } |
1073 | 1073 | |
1074 | | - $post_status_check = false; |
| 1074 | + $is_orphan = false; |
1075 | 1075 | if ( count( $addme ) ){ //nothing unusual here. |
1076 | 1076 | $this->addData( $addme ); |
1077 | 1077 | $logmsg = $this->getData_Raw( 'contribution_tracking_id' ) . ': '; |
— | — | @@ -1079,18 +1079,24 @@ |
1080 | 1080 | self::log( $logmsg ); |
1081 | 1081 | } else { //this is an orphan transaction. |
1082 | 1082 | $this->staged_data['order_id'] = $this->staged_data['i_order_id']; |
1083 | | - $post_status_check = true; |
| 1083 | + $is_orphan = true; |
1084 | 1084 | } |
1085 | 1085 | |
| 1086 | + //have to change this code range: All these are usually "pending" and |
| 1087 | + //that would still be true... |
| 1088 | + //...aside from the fact that if the user has gotten this far, they left |
| 1089 | + //the part where they could add more data. |
| 1090 | + //By now, "incomplete" definitely means "failed" for 0-70. |
| 1091 | + $this->addCodeRange( 'GET_ORDERSTATUS', 'STATUSID', 'failed', 0, 70 ); |
1086 | 1092 | $status_result = $this->do_transaction( 'GET_ORDERSTATUS' ); |
1087 | 1093 | |
1088 | 1094 | $cancelflag = false; //this will denote the thing we're trying to do with the donation attempt |
1089 | 1095 | $problemflag = false; //this will get set to true, if we can't continue and need to give up and just log the hell out of it. |
1090 | 1096 | $problemmessage = ''; //to be used in conjunction with the flag. |
1091 | | - $deletelimbomessageflag = false; //this tells us if we should delete this transaction's limbo queue message or not. |
| 1097 | + $add_antimessage = false; //this tells us if we should add an antimessage when we are done or not. |
1092 | 1098 | |
1093 | 1099 | |
1094 | | - if ( $post_status_check ){ |
| 1100 | + if ( $is_orphan ){ |
1095 | 1101 | if ( array_key_exists('data', $status_result) ) { |
1096 | 1102 | foreach ( $pull_vars as $theirkey => $ourkey) { |
1097 | 1103 | if ( array_key_exists($theirkey, $status_result['data']) ) { |
— | — | @@ -1134,13 +1140,13 @@ |
1135 | 1141 | switch ( $order_status_results ){ |
1136 | 1142 | case 'failed' : |
1137 | 1143 | case 'revised' : |
1138 | | - $deletelimbomessageflag = true; |
| 1144 | + $add_antimessage = true; |
1139 | 1145 | $cancelflag = true; //makes sure we don't try to confirm. |
1140 | 1146 | break; |
1141 | 1147 | case 'complete' : |
1142 | 1148 | $problemflag = true; //nothing to be done. |
1143 | 1149 | $problemmessage = "GET_ORDERSTATUS reports that the payment is already complete."; |
1144 | | - $deletelimbomessageflag = true; |
| 1150 | + $add_antimessage = true; |
1145 | 1151 | break; |
1146 | 1152 | } |
1147 | 1153 | } |
— | — | @@ -1172,7 +1178,7 @@ |
1173 | 1179 | $this->setTransactionResult( "Original Response Status (pre-SET_PAYMENT): " . $original_status_code, 'txn_message' ); |
1174 | 1180 | $this->runPostProcessHooks(); //stomp is in here |
1175 | 1181 | $this->unsetAllSessionData(); |
1176 | | - $deletelimbomessageflag = true; |
| 1182 | + $add_antimessage = true; |
1177 | 1183 | } else { |
1178 | 1184 | $problemflag = true; |
1179 | 1185 | $problemmessage = "SET_PAYMENT couldn't communicate properly!"; |
— | — | @@ -1184,7 +1190,7 @@ |
1185 | 1191 | if ( isset( $final['status'] ) && $final['status'] === true ) { |
1186 | 1192 | $this->setTransactionWMFStatus( 'failed' ); |
1187 | 1193 | $this->unsetAllSessionData(); |
1188 | | - $deletelimbomessageflag = true; |
| 1194 | + $add_antimessage = true; |
1189 | 1195 | } else { |
1190 | 1196 | $problemflag = true; |
1191 | 1197 | $problemmessage = "CANCEL_PAYMENT couldn't communicate properly!"; |
— | — | @@ -1196,7 +1202,7 @@ |
1197 | 1203 | } |
1198 | 1204 | } |
1199 | 1205 | |
1200 | | - if ( $deletelimbomessageflag ) { |
| 1206 | + if ( $add_antimessage && !$is_orphan ) { |
1201 | 1207 | //As it happens, we can't remove things from the queue here: It |
1202 | 1208 | //takes way too dang long. (~5 seconds!) |
1203 | 1209 | //So, instead, I'll add an anti-message and deal with it later. (~.01 seconds) |
Index: trunk/extensions/DonationInterface/activemq_stomp/activemq_stomp.php |
— | — | @@ -259,14 +259,16 @@ |
260 | 260 | break; |
261 | 261 | } |
262 | 262 | |
263 | | - $stomp = getDIStompConnection(); |
| 263 | + //This needs to be renewed every time, or the selectors won't work. |
| 264 | + //So says the internets, at least. |
| 265 | + $stomp = getDIStompConnection( true ); |
264 | 266 | |
265 | 267 | $properties = array( 'ack' => 'client' ); |
266 | | - if (!is_null($selector)){ |
| 268 | + if ( !is_null( $selector ) ){ |
267 | 269 | $properties['selector'] = $selector; |
268 | 270 | } |
269 | 271 | |
270 | | - $returned = $stomp->subscribe('/queue/' . $queue, $properties); |
| 272 | + $returned = $stomp->subscribe( '/queue/' . $queue, $properties ); |
271 | 273 | $message = $stomp->readFrame(); |
272 | 274 | |
273 | 275 | $return = array(); |