r79965 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r79964‎ | r79965 | r79966 >
Date:22:21, 10 January 2011
Author:awjrichards
Status:deferred (Comments)
Tags:
Comment:
Adding new submodule to queue2civicrm to handle processing of 'subscription' payments from PayPal (which are made up of 6 different message subtypes). Much like queue2civicrm, it allows you to configure a particular queue to consume messages from. Unlike queue2civicrm, it can process raw paypal messages (mostly because at the moment, we've been accepting subscription payments with no way to process them, so we have a huge backlog of raw subscription payment messages in the queue). This is currently untested, but should at least be syntax-error free. This also comes with some refactoring of queue2civicrm to abstract out some commonly used functionality, and breaking functions down into more discrete chunks of functionality - all of which has been moved to queue2civirm_common.inc.
Modified paths:
  • /civicrm/trunk/sites/all/modules/queue2civicrm/queue2civicrm.module (modified) (history)
  • /civicrm/trunk/sites/all/modules/queue2civicrm/queue2civicrm_common.inc (added) (history)
  • /civicrm/trunk/sites/all/modules/queue2civicrm/recurring (added) (history)
  • /civicrm/trunk/sites/all/modules/queue2civicrm/recurring/recurring.info (added) (history)
  • /civicrm/trunk/sites/all/modules/queue2civicrm/recurring/recurring.install (added) (history)
  • /civicrm/trunk/sites/all/modules/queue2civicrm/recurring/recurring.module (added) (history)

Diff [purge]

Index: civicrm/trunk/sites/all/modules/queue2civicrm/recurring/recurring.module
@@ -0,0 +1,636 @@
 2+<?php
 3+/**
 4+ * An extension of the queue2civicrm module to handle processing recurring payment
 5+ * related messages from a queue.
 6+ *
 7+ * You may notice the use of the words 'recurring' and 'subscription' - they are
 8+ * essentially interchangable. They have slightly different meanings in the
 9+ * PayPal land of subscription/recurring payments, but essentially they function
 10+ * the same. 'Recurring' more clearly captures the idea behind how the WMF uses
 11+ * PayPal's subscription payments and is how the fundraising team colloquially refers
 12+ * to the 'subscription' service, so 'recurring' is used here in attempt to promote
 13+ * greater clarity. 'Subscription' or its derivations are used in places where PayPal
 14+ * uses the word in its field/variable names (eg subscr_payment).
 15+ */
 16+
 17+// include common functions
 18+require_once( drupal_get_path( 'module', 'queue2civicrm' ) . '/queue2civicrm_common.inc' );
 19+
 20+/**
 21+ * Implementation of hook_form_alter
 22+ *
 23+ * To expand the configuration form for queue2civicrm_settings to include
 24+ * settings for recurring donations.
 25+ */
 26+function recurring_form_alter( &$form, &$form_state, $form_id ) {
 27+
 28+ // make sure we're altering the correct form
 29+ if ( $form_id != 'queue2civicrm_settings' ) {
 30+ return;
 31+ }
 32+
 33+ $form[ 'recurring' ] = array(
 34+ '#type' => 'fieldset',
 35+ '#title' => t( 'Recurring payments' ),
 36+ '#collapsible' => TRUE,
 37+ '#collapsed' => FALSE,
 38+ );
 39+
 40+ $form[ 'recurring' ][ 'subscription' ] = array(
 41+ '#type' => 'textfield',
 42+ '#title' => t('Subscription path'),
 43+ '#required' => TRUE,
 44+ '#default_value' => variable_get('recurring_subscription', '/queue/recurring_test'),
 45+ '#description' => t( 'The path to the queue for recurring payments.' ),
 46+ );
 47+
 48+ $form[ 'recurring' ][ 'batch' ] = array(
 49+ '#type' => 'select',
 50+ '#title' => t('Cron batch size'),
 51+ '#required' => TRUE,
 52+ '#default_value' => variable_get('recurring_batch', 0),
 53+ '#options' => array(
 54+ 0 => '0 (Disable)',
 55+ 1 => 1,
 56+ 5 => 5,
 57+ 10 => 10,
 58+ 20 => 20,
 59+ 30 => 30,
 60+ 40 => 40,
 61+ 50 => 50,
 62+ 75 => 75,
 63+ 100 => 100,
 64+ 120 => 120,
 65+ 150 => 150,
 66+ ),
 67+ );
 68+
 69+ return $form;
 70+}
 71+
 72+/**
 73+ * Implementation of hook queue2civicrm_batch_process
 74+ *
 75+ * @param $processed
 76+ */
 77+function recurring_queue2civicrm_batch_process( &$processed=0 ) {
 78+ $recurring_processed = 0;
 79+ watchdog('recurring', 'Attempting to process up to ' . variable_get('recurring_batch', 0) . ' contribution(s).');
 80+
 81+ // Attempt to dequeue items for the batch
 82+ for ($i = 0; $i < variable_get('recurring_batch', 0); ++$i) {
 83+ $success = recurring_process_msg();
 84+ if ($success) {
 85+ ++$recurring_processed;
 86+ } else {
 87+ break;
 88+ }
 89+ }
 90+
 91+ if ($recurring_processed > 0) {
 92+ watchdog('recurring', 'Successfully processed ' . $recurring_processed . ' contribution(s).');
 93+ }
 94+ else {
 95+ watchdog('recurring', 'No contributions processed.');
 96+ }
 97+
 98+ $processed += $recurring_processed;
 99+}
 100+
 101+/**
 102+ * Process one item from the queue.
 103+ *
 104+ * This is verry similar to queue2civicrm_dequeue, but it is more clearly named ;)
 105+ * and supports option dequeueing depending on return status of import function.
 106+ *
 107+ * @see recurring_import for status code information
 108+ */
 109+function recurring_process_msg() {
 110+ $con = _queue2civicrm_stomp_connection();
 111+
 112+ if ( $con ) {
 113+ $con->subscribe( variable_get( 'recurring_subscription', '/queue/test' ) );
 114+
 115+ $msg = $con->readFrame();
 116+
 117+ // Skip processing if no message to process.
 118+ if ( $msg !== FALSE ) {
 119+ watchdog( 'recurring', 'Read frame:<pre>' . check_plain(print_r( $msg, TRUE ) ) . '</pre>' );
 120+ set_time_limit( 60 );
 121+ try {
 122+ $msg_status = recurring_import( $msg );
 123+
 124+ if ( $msg_status == 1 ) {
 125+ $con->ack($msg);
 126+ watchdog( 'recurring', 'Frame from queue succesfully processed.', WATCHDOG_NOTICE );
 127+ return TRUE;
 128+ } elseif ( $msg_status == 2 ) {
 129+ watchdog( 'recurring', 'Frame from queue currently not processable, leaving in queue.', WATCHDOG_NOTICE );
 130+ return FALSE;
 131+ } else {
 132+ watchdog( 'recurring', 'Could not process frame from queue.', array(), WATCHDOG_ERROR );
 133+ return FALSE;
 134+ }
 135+ }
 136+ catch (Exception $e) {
 137+ watchdog('recurring', 'Could not process frame from queue.', array(), WATCHDOG_ERROR );
 138+ }
 139+ }
 140+ else {
 141+ watchdog('recurring', 'Nothing to process.');
 142+ }
 143+ }
 144+
 145+ return FALSE;
 146+}
 147+
 148+/**
 149+ * Import queued message contents to CiviCRM
 150+ *
 151+ * Return status codes:
 152+ * 0 = processing error
 153+ * 1 = fully processed, ready for removal
 154+ * 2 = not currently processable, return to queue
 155+ *
 156+ * @param $msg
 157+ * @return unknown_type
 158+ */
 159+function recurring_import( $msg ) {
 160+ global $txn_subscr_payment, $txn_subscr_acct;
 161+ civicrm_initialize(true);
 162+ $msg = recurring_normalize_msg( $msg );
 163+
 164+ // log the message
 165+ watchdog('recurring', 'Recurring msg:<pre>' . check_plain(print_r($msg, TRUE)) . '</pre>');
 166+
 167+ // define the subscription txn type for an actual 'payment'
 168+ $txn_subscr_payment = array( 'subscr_payment' );
 169+
 170+ // define the subscription txn types that affect the subscription account
 171+ $txn_subscr_acct = array(
 172+ 'subscr_cancel', // subscription canceled
 173+ 'sbuscr_eot', // subscription expired
 174+ 'subscr_failed', // failed signup
 175+ 'subscr_modify', // subscription modification
 176+ 'subscr_signup' // subscription account creation
 177+ );
 178+
 179+ // route the message to the appropriate handler depending on transaction type
 180+ if ( isset( $msg[ 'txn_type' ] ) && in_array( $msg[ 'txn_type' ], $txn_subscr_payment ) ) {
 181+ $ret_val = recurring_import_subscr_payment( $msg );
 182+ } elseif ( isset( $msg[ 'txn_type' ] ) && in_array( $msg[ 'txn_type' ], $txn_subscr_acct ) ) {
 183+ $ret_val = recurring_import_subscr_acct( $msg );
 184+ } else {
 185+ watchdog( 'recurring', 'Msg not recognized as a recurring payment related message.', WATCHDOG_NOTICE );
 186+ $ret_val = 0;
 187+ }
 188+
 189+ return $ret_val;
 190+}
 191+
 192+/**
 193+ * Import a recurring payment
 194+ *
 195+ * @param array $msg
 196+ * @return int
 197+ */
 198+function recurring_import_subscr_payment( $msg ) {
 199+ /**
 200+ * if the subscr_id is not set, we can't process it due to an error in the message.
 201+ *
 202+ * otherwise, check for the parent record in civicrm_contribution_recur.
 203+ * if one does not exist, the message is not ready for reprocessing, so requeue it.
 204+ *
 205+ * otherwise, process the payment.
 206+ */
 207+ if ( !isset( $msg[ 'subscr_id' ] ) ) {
 208+ watchdog( 'recurring', 'Msg missing the subscr_id; cannot process.', WATCHDOG_NOTICE );
 209+ return 0;
 210+ } elseif ( !$recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] ) ) { // check for parent record in civicrm_contribution_recur and fetch its id
 211+ watchdog( 'recurring', 'Msg does not have a matching recurring record in civicrm_contribution_recur; requeueing for future processing.', WATCHDOG_NOTICE );
 212+ return 2;
 213+ }
 214+
 215+ civicrm_initialize(true);
 216+
 217+ $msg = recurring_normalize_msg( $msg );
 218+
 219+ // set the correct amount fields/data and do exchange rate conversions.
 220+ $msg = _queue2civicrm_normalize_contrib_amnts( $msg );
 221+
 222+ //dsm($contribution);
 223+ watchdog('queue2civicrm', 'Contribution:<pre>' . check_plain(print_r($msg, TRUE)) . '</pre>');
 224+
 225+ $contact[ 'id' ] = $recur_record->contact_id;
 226+
 227+ //insert the contribution
 228+ $contribution = _queue2civicrm_contribution_insert( $msg, $contact, $recur_record->id );
 229+
 230+ // Insert the contribution record.
 231+ if ( $msg_normalized[ 'contribution_tracking_update' ] ) {
 232+ // Map the tracking record to the CiviCRM contribution
 233+ _queue2civicrm_update_contribution_tracking( $msg, $contribution );
 234+ }
 235+
 236+ // update subscription record with next payment date
 237+ $query = "UPDATE {civicrm.civicrm_contribution_recur} SET next_sched_contribution='%s'";
 238+ db_query( $query, strtotime( "+" . $msg[ 'frequency_interval' ] . " " . $msg[ 'frequency_unit' ], $msg[ 'payment_date' ] ));
 239+
 240+ // construct an array of useful info to invocations of queue2civicrm_import
 241+ $contribution_info = array(
 242+ 'contribution_id' => $contribution['id'],
 243+ 'contact_id' => $contact['id'],
 244+ 'msg' => $msg,
 245+ );
 246+
 247+ // Send thank you email, other post-import things
 248+ module_invoke_all( 'queue2civicrm_import', $contribution_info );
 249+
 250+ return 1;
 251+}
 252+
 253+/**
 254+ * Import subscription account
 255+ *
 256+ * Routes different subscription message types to an appropriate handling
 257+ * function.
 258+ *
 259+ * @param $msg
 260+ * @return int
 261+ */
 262+function recurring_import_subscr_acct( $msg ) {
 263+ switch ( $msg[ 'txn_type' ] ) {
 264+ case 'subscr_signup':
 265+ $ret_val = recurring_import_subscr_signup( $msg );
 266+ break;
 267+
 268+ case 'subsr_cancel':
 269+ $ret_val = recurring_import_subscr_cancel( $msg );
 270+ break;
 271+
 272+ case 'subscr_eot':
 273+ $ret_val = recurring_import_subscr_eot( $msg );
 274+ break;
 275+
 276+ case 'subscr_modify':
 277+ $ret_val = recurring_import_subscr_modify( $msg );
 278+ break;
 279+
 280+ case 'subscr_failed':
 281+ $ret_val = recurring_import_subscr_failed( $msg );
 282+ break;
 283+
 284+ default:
 285+ watchdog( 'recurring', 'Invalid subscription message type.', print_r( $msg, true ), WATCHDOG_NOTICE );
 286+ $ret_val = 0;
 287+ break;
 288+ }
 289+
 290+ return $ret_val;
 291+}
 292+
 293+/**
 294+ * Import a subscription signup message
 295+ *
 296+ * @param $msg
 297+ * @return int
 298+ */
 299+function recurring_import_subscr_signup( $msg ) {
 300+ // ensure there is not already a record of this account - if so, mark the message as succesfuly processed
 301+ if ( $recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] ) ) {
 302+ watchdog( 'recurring', 'Subscription account already exists', print_r( $msg, true ), WATCHDOG_NOTICE );
 303+ return 1;
 304+ }
 305+
 306+ // create contact record
 307+ $contact = _queue2civicrm_contact_insert( $msg );
 308+
 309+ // Insert the location record
 310+ $location = _queue2civicrm_location_insert( $msg, $contact );
 311+
 312+ // Tag contact for review
 313+ $tag = _queu2civicrm_tag_insert( $contact );
 314+
 315+ // prepare query for inserting data to civicrm_contribution_recur
 316+ $query = "INSERT INTO {civicrm.civicrm_contribution_recur} (
 317+ contact_id,
 318+ amount,
 319+ frequency_unit,
 320+ frequency_interval,
 321+ installments,
 322+ start_date,
 323+ create_date,
 324+ trxn_id,
 325+ next_sched_contribution )
 326+ VALUES ( %d, %d, '%s', %d, '%s', '%s', '%s', '%s' )";
 327+
 328+ $result = db_query( $query,
 329+ $contact->id,
 330+ $msg[ 'amount' ],
 331+ $msg[ 'frequency_unit' ],
 332+ $msg[ 'frequency_interval' ],
 333+ $msg[ 'installments' ],
 334+ $msg[ 'start_date' ],
 335+ $msg[ 'create_date' ],
 336+ $msg[ 'subscr_id' ],
 337+ strtotime( "+" . $msg[ 'frequency_interval' ] . " " . $msg[ 'frequency_unit' ], $msg[ 'start_date' ] )
 338+ );
 339+
 340+ if ( !$result ) {
 341+ watchdog( 'recurring', 'Failed inserting subscriber signup for subscriber id ', print_r( $msg['subscr_id'], true), WATCHDOG_NOTICE );
 342+ return 0;
 343+ } else {
 344+ watchdog( 'recurring', 'Succesfully inserted subscription signup for subscriber id ', print_r( $msg[ 'subscr_id' ], true), WATCHDOG_NOTICE );
 345+ return 1;
 346+ }
 347+}
 348+
 349+/**
 350+ * Process a subscriber cancelation
 351+ *
 352+ * @param array $msg
 353+ * @return int
 354+ */
 355+function recurring_import_subscr_cancel( $msg ) {
 356+ // ensure we have a record of the subscription
 357+ if ( !$recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] ) ) {
 358+ watchdog( 'recurring', 'Subscription account does not exist', print_r( $msg, true ), WATCHDOG_NOTICE );
 359+ return 2;
 360+ }
 361+
 362+ $query = "UPDATE {civicrm.civicrm_contribution_recur} SET cancel_date='%s', end_date='%s' WHERE trxn_id=%d";
 363+ $result = db_query( $query, $msg[ 'cancel_date' ], $msg[ 'end_date' ], $msg[ 'subscr_id' ] );
 364+ if ( !$result ) {
 365+ watchdog( 'recurring', 'There was a problem updating the subscription for cancelation for subscriber id ', print_r( $msg[ 'subscr_id' ], true ), WATCHDOG_NOTICE );
 366+ return 0;
 367+ } else {
 368+ watchdog( 'recurring', 'Succesfuly cacneled subscription for subscriber id ', print_r( $msg[ 'subsr_id' ], true ), WATCHDOG_NOTICE );
 369+ return 1;
 370+ }
 371+}
 372+
 373+/**
 374+ * Process an expired subscription
 375+ *
 376+ * @param array $msg
 377+ * @return int
 378+ */
 379+function recurring_import_subscr_eot( $msg ) {
 380+ // ensure we have a record of the subscription
 381+ if ( !$recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] ) ) {
 382+ watchdog( 'recurring', 'Subscription account does not exist', print_r( $msg, true ), WATCHDOG_NOTICE );
 383+ return 2;
 384+ }
 385+
 386+ $query = "UPDATE {civicrm.civicrm_contribution_recur} SET end_date='%s' WHERE trxn_id=%d";
 387+ $result = db_query( $query, $msg[ 'end_date' ], $msg[ 'subscr_id' ] );
 388+ if ( !$result ) {
 389+ watchdog( 'recurring', 'There was a problem updating the subscription for EOT ', print_r( $msg[ 'subscr_id' ], true ), WATCHDOG_NOTICE );
 390+ return 0;
 391+ } else {
 392+ watchdog( 'recurring', 'Succesfuly ended subscription for subscriber id ', print_r( $msg[ 'subsr_id' ], true ), WATCHDOG_NOTICE );
 393+ return 1;
 394+ }
 395+}
 396+
 397+/**
 398+ * Process a subscription modification
 399+ *
 400+ * @param array $msg
 401+ * @return int
 402+ */
 403+function recurring_import_subscr_modify( $msg ) {
 404+ // ensure we have a record of the subscription
 405+ if ( !$recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] ) ) {
 406+ watchdog( 'recurring', 'Subscription account does not exist', print_r( $msg, true ), WATCHDOG_NOTICE );
 407+ return 2;
 408+ }
 409+
 410+ // prepare query for updating data to civicrm_contribution_recur
 411+ $query = "UPDATE {civicrm.civicrm_contribution_recur} SET
 412+ SET
 413+ amount=%d,
 414+ frequency_unit='%s',
 415+ frequency_interval=%d,
 416+ modified_date='%s',
 417+ next_sched_contribution='%s'
 418+ WHERE
 419+ trxn_id=%d";
 420+
 421+ $result = db_query( $query,
 422+ $msg[ 'amount' ],
 423+ $msg[ 'frequency_unit' ],
 424+ $msg[ 'frequency_interval' ],
 425+ $msg[ 'modified_date' ],
 426+ strtotime( "+" . $msg[ 'frequency_interval' ] . " " . $msg[ 'frequency_unit' ], $msg[ 'start_date' ] ),
 427+ $msg[ 'subscr_id' ]
 428+ );
 429+
 430+ if ( !$result ) {
 431+ watchdog( 'recurring', 'There was a problem updating the subscription record for subscription id ', print_r( $msg['subscr_id'], true), WATCHDOG_NOTICE );
 432+ return 0;
 433+ }
 434+
 435+ // update the contact
 436+ $contact = _queue2civicrm_contact_update( $msg, $recur_record->contact_id );
 437+
 438+ // Insert the location record
 439+ $location = _queue2civicrm_location_update( $msg, $contact );
 440+
 441+ // Tag contact for review
 442+ $tag = _queu2civicrm_tag_insert( $contact );
 443+
 444+ watchdog( 'recurring', 'Subscription succesfully modified for subscription id ', print_r( $msg['subscr_id'], true ), WATCHDOG_NOTICE );
 445+ return 1;
 446+}
 447+
 448+/**
 449+ * Process failed subscription payment
 450+ * @param $msg
 451+ * @return unknown_type
 452+ */
 453+function recurring_import_subscr_failed( $msg ) {
 454+ // ensure we have a record of the subscription
 455+ if ( !$recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] ) ) {
 456+ watchdog( 'recurring', 'Subscription account does not exist', print_r( $msg, true ), WATCHDOG_NOTICE );
 457+ return 2;
 458+ }
 459+
 460+ $query = "UPDATE {civicrm.civicrm_contribution_recur} SET failure_count=%d, failure_retry_date='%s' WHERE trxn_id=%d";
 461+ $result = db_query( $query, $msg[ 'failure_count' ], $msg[ 'failure_retery_date' ], $msg[ 'subscr_id' ] );
 462+ if ( !$result ) {
 463+ watchdog( 'recurring', 'There was a problem updating the subscription for failed payment for subscriber id ', print_r( $msg[ 'subscr_id' ], true ), WATCHDOG_NOTICE );
 464+ return 0;
 465+ } else {
 466+ watchdog( 'recurring', 'Succesfuly cacneled subscription for failed payment for subscriber id ', print_r( $msg[ 'subsr_id' ], true ), WATCHDOG_NOTICE );
 467+ return 1;
 468+ }
 469+}
 470+
 471+/**
 472+ * Get recurring record in CiviCRM for the given subscriber id
 473+ * @param int $subscr_id
 474+ * @return object
 475+ */
 476+function recurring_get_recur_record( $subscr_id ) {
 477+ $query = "SELECT * FROM civicrm.civicrm_contribution_recur WHERE trxn_id = %d";
 478+ $recur_record = db_fetch_object( db_query( $query, $subscr_id ) );
 479+ watchdog( 'recurring', 'Recurring record: ', print_r( $recur_record, true ), WATCHDOG_DEBUG );
 480+ return $recur_record;
 481+}
 482+
 483+/**
 484+ * Convert queued message to a standardized format
 485+ *
 486+ * This is mostly wrapper to ensure that all necessary normalization occurs
 487+ * on the message.
 488+ *
 489+ * @param array $msgnginx syslog
 490+ * @return array
 491+ */
 492+function recurring_normalize_msg( $msg ) {
 493+ // Decode the message body.
 494+ if ( !is_array( $msg ) ) {
 495+ $msg = json_decode($msg->body, true);
 496+ }
 497+
 498+ // we can safely assume we have a raw msg from paypal if contribution_tracking_id isn't set
 499+ if ( !isset( $msg[ 'contribution_tracking_id' ]) ) {
 500+ $msg = recurring_normalize_paypal_msg( $msg );
 501+ }
 502+
 503+ $msg = _queue2civicrm_normalize_msg( $msg );
 504+ return $msg;
 505+}
 506+
 507+/**
 508+ * Normalize raw PayPal message
 509+ *
 510+ * It is possible that we'll get a raw message from PayPal. If that is the
 511+ * case, this will convert the raw PayPal message to our normalized format.
 512+ *
 513+ * This is large and unwieldly.
 514+ *
 515+ * @param $msg
 516+ * @return array
 517+ */
 518+function recurring_normalize_paypal_msg( $msg ) {
 519+ $msg_normalized = array();
 520+
 521+ $timestamp = ( !isset( $msg[ 'payment_date' ] ) || !strlen( $msg[ 'payment_date' ] )) ? strtotime( "now" ) : strtotime( $msg[ 'payment_date' ] );
 522+
 523+
 524+ // the subscription id
 525+ $msg_normalized[ 'subscr_id' ] = $msg[ 'subscr_id' ];
 526+ $msg_normalized[ 'txn_type' ] = $msg[ 'txn_type' ];
 527+ $msg_normalized[ 'contribution_tracking_id' ] = $msg[ 'custom' ];
 528+ $msg_normalized[ 'email' ] = $msg[ 'payer_email' ];
 529+
 530+ // Premium info
 531+ if ( isset( $msg[ 'option_selection1' ] ) && !is_numeric( $msg[ 'option_selection1' ] ) ) {
 532+ $msg_normalized[ 'size' ] = $msg[ 'option_selection1' ];
 533+ $msg_normalized[ 'premium_language' ] = $msg[ 'option_selection2' ];
 534+ }
 535+
 536+ // Contact info
 537+ if ( $msg[ 'txn_type' ] == 'subscr_signup' || $msg[ 'txn_type' ] == 'subscr_modify' ) {
 538+ $msg_normalized[ 'first_name' ] = $msg[ 'first_name' ];
 539+ $msg_normalized[ 'last_name' ] = $msg[ 'last_name' ];
 540+ $split = split("\n", str_replace("\r", '', $msg[ 'address_street' ]));
 541+ $msg_normalized[ 'street_address' ] = $split[0];
 542+ $msg_normalized[ 'supplemental_address_1' ] = $split[1];
 543+ $msg_normalized[ 'city' ] = $msg[ 'address_city' ];
 544+ $msg_normalized[ 'state_province' ] = $msg[ 'address_state' ];
 545+ $msg_normalized[ 'country' ] = $msg[ 'address_country_code' ];
 546+ $msg_normalized[ 'postal_code' ] = $msg[ 'address_zip' ];
 547+
 548+ // Shipping info (address same as above since PayPal only passes 1 address)
 549+ $split = split(" ", $msg[ 'address_name' ]);
 550+ $msg_normalized[ 'last_name_2' ] = array_pop($split);
 551+ $msg_normalized[ 'first_name_2' ] = implode(" ", $split);
 552+ $split = split("\n", str_replace("\r", '', $msg[ 'address_street' ]));
 553+ $msg_normalized[ 'street_address_2' ] = $split[0];
 554+ $msg_normalized[ 'supplemental_address_2' ] = $split[1];
 555+ $msg_normalized[ 'city_2' ] = $msg[ 'address_city' ];
 556+ $msg_normalized[ 'state_province_2' ] = $msg[ 'address_state' ];
 557+ $msg_normalized[ 'country_2' ] = $msg[ 'address_country_code' ];
 558+ $msg_normalized[ 'postal_code_2' ] = $msg[ 'address_zip' ];
 559+ }
 560+
 561+ // payment-specific message handling
 562+ if ( $msg[ 'txn_type' ] == 'subscr_payment' ) {
 563+ // default to not update contribution tracking data
 564+ $msg_normalized[ 'contribution_tracking_update' ] = false;
 565+
 566+ // get the database connection to the tracking table
 567+ $query = "SELECT * FROM {contribution_tracking} WHERE id=%d";
 568+ $result = db_query( $query, $msg[ 'custom' ] );
 569+ if ( $tracking_data = db_fetch_object( $result ) ) {
 570+ // if we don't have a contribution id in the tracking data, we need to update
 571+ if ( !$tracking_data[ 'contribution_id' ] || !is_numeric( $tracking_data[ 'contribution_id' ] ) ) {
 572+ $msg_normalized[ 'contribution_tracking_update' ] = true;
 573+ $msg_normalized[ 'optout' ] = $tracking_data[ 'optout' ];
 574+ $msg_normalized[ 'anonymous' ] = $tracking_data[ 'anonymous' ];
 575+ $msg_normalized[ 'comment' ] = $tracking_data[ 'note' ];
 576+ }
 577+ } else {
 578+ watchdog( 'recurring', "There is no contribution tracking id associated with this message.", array(), WATCHDOG_NOTICE );
 579+ }
 580+
 581+ $msg_normalized[ 'gateway' ] = ( strlen( $msg[ 'gateway' ] )) ? $msg[ 'gateway' ] : 'paypal';
 582+ $msg_normalized[ 'gateway_txn_id' ] = $msg[ 'txn_id' ];
 583+ $msg_normalized[ 'original_currency' ] = $msg[ 'mc_currency' ];
 584+ $msg_normalized[ 'original_gross' ] = $msg[ 'mc_gross' ];
 585+ $msg_normalized[ 'fee' ] = $msg[ 'mc_fee' ];
 586+ $msg_normalized[ 'gross' ] = $msg[ 'mc_gross' ];
 587+ $msg_normalized[ 'net' ] = $msg_normalized[ 'gross' ] - $msg_normalized[ 'fee' ];
 588+ $msg_normalized[ 'payment_date' ] = strtotime( $msg[ 'payment_date' ] );
 589+ } else {
 590+
 591+ // break the period out for civicrm
 592+ if( isset( $msg[ 'period3' ] ) ) {
 593+ // map paypal period unit to civicrm period units
 594+ $period_map = array(
 595+ 'm' => 'month',
 596+ 'd' => 'day',
 597+ 'w' => 'week',
 598+ 'y' => 'year',
 599+ );
 600+
 601+ $period = explode( " ", $msg[ 'period3' ] );
 602+ $msg_normalized[ 'frequency_interval' ] = $period[0];
 603+ $msg_normalized[ 'frequency_unit' ] = $period_map[ strtolower( $period[1] ) ];
 604+ }
 605+
 606+ if ( isset( $msg[ 'recur_times' ] ) ) {
 607+ $msg_normalized[ 'installments' ] = $mst[ 'recur_times' ];
 608+ }
 609+
 610+ if ( isset( $msg[ 'amount3' ] ) ) {
 611+ $msg_normalized[ 'amount' ] = $msg[ 'amount3' ];
 612+ }
 613+
 614+ if ( isset( $msg[ 'subscr_date' ] ) ) {
 615+ if ( $msg[ 'txn_type' ] == 'subscr_signup' ) {
 616+ $msg[ 'create_date' ] = strtotime( $msg[ 'subscr_date' ] );
 617+ $msg[ 'start_date' ] = strtotime( $msg[ 'subscr_date' ] );
 618+ } elseif( $msg[ 'txn_type' ] == 'subscr_cancel' ) {
 619+ $msg[ 'cancel_date' ] = strtotime( $msg[ 'subscr_date' ] );
 620+ }
 621+ }
 622+
 623+ if ( $msg[ 'txn_type' ] == 'subscr_modify' ) {
 624+ $msg[ 'modified_date' ] = $msg[ 'subscr_effective' ];
 625+ }
 626+
 627+ if ( $msg[ 'txn_type' ] == 'subscr_failed' ) {
 628+ $recur_record = recurring_get_recur_record( $msg[ 'subscr_id' ] );
 629+ $msg_normalized[ 'failure_count' ] = $recur_record->failure_count + 1;
 630+ $msg_normalized[ 'failure_retry_date' ] = strtotime( $msg[ 'retry_at' ] );
 631+ }
 632+ }
 633+
 634+ $msg_normalized[ 'date' ] = $timestamp;
 635+
 636+ return $msg_normalized;
 637+}
\ No newline at end of file
Index: civicrm/trunk/sites/all/modules/queue2civicrm/recurring/recurring.install
@@ -0,0 +1,10 @@
 2+<?php
 3+
 4+function recurring_update_6100() {
 5+ $items = array();
 6+
 7+ // add index on trxn_id (which is mapped to subscr_id
 8+ $items[] = update_sql( "CREATE INDEX {civicrm.civicrm_contribution_recur}_trxn_id_idx ON {civicrm.civicrm_contribution_recur} (trxn_id)" );
 9+
 10+ return $items;
 11+}
\ No newline at end of file
Index: civicrm/trunk/sites/all/modules/queue2civicrm/recurring/recurring.info
@@ -0,0 +1,5 @@
 2+name = Recurring Payments
 3+description = Adds support for recurring payments to queue2civicrm
 4+dependencies[] = queue2civicrm
 5+package = queue2civicrm
 6+core = 6.x
\ No newline at end of file
Index: civicrm/trunk/sites/all/modules/queue2civicrm/queue2civicrm_common.inc
@@ -0,0 +1,325 @@
 2+<?php
 3+/**
 4+ * Common functions for queue2civicrm modules
 5+ */
 6+
 7+/**
 8+ * Connect using the Stomp library.
 9+ */
 10+function _queue2civicrm_stomp_connection() {
 11+ static $con = NULL;
 12+
 13+ if (!isset($con)) {
 14+ require_once drupal_get_path('module', 'queue2civicrm') . '/Stomp.php';
 15+ watchdog('queue2civicrm', 'Attempting connection to queue server: ' . variable_get('queue2civicrm_url', 'tcp://localhost:61613'));
 16+
 17+ $con = new Stomp(variable_get('queue2civicrm_url', 'tcp://localhost:61613'));
 18+
 19+ try {
 20+ $con->connect();
 21+ register_shutdown_function('_queue2civicrm_stomp_disconnect');
 22+ }
 23+ catch (Stomp_Exception $e) {
 24+ $con = FALSE;
 25+ watchdog('queue2civicrm', 'Queue connection failed: ' . $e->getMessage(), array(), WATCHDOG_ERROR);
 26+ }
 27+ }
 28+
 29+ return $con;
 30+}
 31+
 32+/**
 33+ * Disconnect. Only used as a shutdown function.
 34+ */
 35+function _queue2civicrm_stomp_disconnect() {
 36+ $con = _queue2civicrm_stomp_connection();
 37+ $con->disconnect();
 38+}
 39+
 40+/**
 41+ * Normalize the queued message
 42+ *
 43+ * Decodes the message and updates some of the message fields in ways
 44+ * that are consistent with how we need to insert data into Civi.
 45+ * This should be useful by other modules processing contribution messages
 46+ * out of the queue.
 47+ *
 48+ * @param mixed $msg
 49+ * @return array
 50+ */
 51+function _queue2civicrm_normalize_msg( $msg ) {
 52+ // Decode the message body.
 53+ if ( !is_array( $msg ) ) {
 54+ $msg = json_decode($msg->body, true);
 55+ }
 56+
 57+ // hack to ignore msgs in the queue w/o a contribution tracking id
 58+ if ( !isset( $msg[ 'contribution_tracking_id' ] )) {
 59+ watchdog( 'queue2civicrm', 'Contribution missing contribution_tracking_id' );
 60+ return FALSE;
 61+ }
 62+
 63+ // Convert times to Unix timestamps.
 64+ if (!is_integer($msg['date'])) {
 65+ $msg['date'] = strtotime($msg['date']);
 66+ }
 67+
 68+ watchdog('queue2civicrm', 'Contribution (pre-conversion):<pre>' . check_plain(print_r($msg, TRUE)) . '</pre>');
 69+
 70+ $msg['gateway_txn_id'] .= ' ' . time();
 71+
 72+ return $msg;
 73+}
 74+
 75+/**
 76+ * Normalize contribution amounts
 77+ *
 78+ * Do exchange rate conversions and set appropriate fields for CiviCRM
 79+ * based on information contained in the message.
 80+ *
 81+ * @param $msg
 82+ * @return array
 83+ */
 84+function _queue2civicrm_normalize_contrib_amnts( $msg ) {
 85+ // round the amounts and do exchange rate conversion
 86+ $msg['fee'] = round( exchange_rate_convert($msg['original_currency'], $msg['fee'], $msg['date']), 2 );
 87+ $msg['gross'] = round( exchange_rate_convert($msg['original_currency'], $msg['gross'], $msg['date']), 2 );
 88+ $msg['net'] = round( exchange_rate_convert($msg['original_currency'], $msg['net'], $msg['date']), 2 );
 89+
 90+ return $msg;
 91+}
 92+
 93+/**
 94+ * Insert the contact record
 95+ *
 96+ * Serves as a standard way for message porcessors to handle contact
 97+ * insertion.
 98+ *
 99+ * @param array $msg
 100+ * @return array
 101+ */
 102+function _queue2civicrm_contact_insert( $msg ) {
 103+ require_once 'api/v2/Contact.php';
 104+
 105+ // Insert the contact record
 106+ $contact = array(
 107+ 'id' => NULL,
 108+ 'contact_type' => 'Individual',
 109+ 'first_name' => $msg['first_name'],
 110+ 'middle_name' => $msg['middle_name'],
 111+ 'last_name' => $msg['last_name'],
 112+ 'do_not_trade' => ($msg['anonymous'] ? 1 : 0 ),
 113+ 'contact_source' => 'online donation'
 114+ );
 115+
 116+ // Honor the opt-out checkbox, if present
 117+ if (array_key_exists('optout', $msg)) {
 118+ $contact['is_opt_out'] = $msg['optout'];
 119+ }
 120+ $contact['sort_name'] = trim($contact['last_name'] . ', ' . $contact['first_name'], ' ,');
 121+ $contact['display_name'] = trim($contact['first_name'] . ' ' . $contact['last_name']);
 122+ $contact_result = &civicrm_contact_add( $contact );
 123+
 124+ return $contact_result;
 125+}
 126+
 127+/**
 128+ * Update the contact record
 129+ *
 130+ * Serves as a standard way for message porcessors to handle contact
 131+ * updates.
 132+ *
 133+ * @param array $msg
 134+ * @return array
 135+ */
 136+function _queue2civicrm_contact_update( $msg, $contact_id ) {
 137+ require_once 'api/v2/Contact.php';
 138+
 139+ // Insert the contact record
 140+ $contact = array(
 141+ 'id' => $contact_id,
 142+ 'first_name' => $msg['first_name'],
 143+ 'middle_name' => $msg['middle_name'],
 144+ 'last_name' => $msg['last_name'],
 145+ );
 146+
 147+ // Honor the opt-out checkbox, if present
 148+ if (array_key_exists('optout', $msg)) {
 149+ $contact['is_opt_out'] = $msg['optout'];
 150+ }
 151+ $contact['sort_name'] = trim($contact['last_name'] . ', ' . $contact['first_name'], ' ,');
 152+ $contact['display_name'] = trim($contact['first_name'] . ' ' . $contact['last_name']);
 153+ $contact_result = &civicrm_contact_add( $contact );
 154+
 155+ return $contact_result;
 156+}
 157+
 158+/**
 159+ * Insert the location record
 160+ *
 161+ * Serves as a standard way for message porcessors to handle location
 162+ * insertion.
 163+ *
 164+ * @param array $msg
 165+ * @param array $contact
 166+ * @return array
 167+ */
 168+function _queue2civicrm_location_insert( $msg, $contact ) {
 169+ require_once 'api/v2/Location.php';
 170+
 171+ $address = array(
 172+ 'contact_id' => $contact['id'],
 173+ 'location_type' => 'Home',
 174+ 'street_address' => $msg['street_address'],
 175+ 'supplemental_address_1' => $msg['supplemental_address_1'],
 176+ 'city' => $msg['city'],
 177+ 'state_province' => _queue2civicrm_get_state( $msg[ 'country' ], $msg['state_province'] ),
 178+ 'postal_code' => $msg['postal_code'],
 179+ 'country' => $msg['country'],
 180+ 'is_primary' => 1,
 181+ 'is_billing' => 1,
 182+ 'email' => $msg['email']
 183+ );
 184+ $location_result = &civicrm_location_add( $address );
 185+
 186+ return $location_result;
 187+}
 188+
 189+/**
 190+ * Update the location record
 191+ *
 192+ * Serves as a standard way for message porcessors to handle location
 193+ * updates.
 194+ *
 195+ * @param array $msg
 196+ * @param array $contact
 197+ * @return array
 198+ */
 199+function _queue2civicrm_location_update( $msg, $contact ) {
 200+ require_once 'api/v2/Location.php';
 201+
 202+ $address = array(
 203+ 'contact_id' => $contact['id'],
 204+ 'street_address' => $msg['street_address'],
 205+ 'supplemental_address_1' => $msg['supplemental_address_1'],
 206+ 'city' => $msg['city'],
 207+ 'state_province' => _queue2civicrm_get_state( $msg[ 'country' ], $msg['state_province'] ),
 208+ 'postal_code' => $msg['postal_code'],
 209+ 'country' => $msg['country'],
 210+ 'email' => $msg['email']
 211+ );
 212+ $location_result = &civicrm_location_update( $address );
 213+
 214+ return $location_result;
 215+}
 216+
 217+/**
 218+ * Insert the contribution record
 219+ *
 220+ * Serves as a standard way for message porcessors to handle contribution
 221+ * insertion.
 222+ *
 223+ * @param array $msg
 224+ * @param array $contact
 225+ * @return array
 226+ */
 227+function _queue2civicrm_contribution_insert( $msg, $contact, $recur_id=NULL ) {
 228+ require_once 'api/v2/Contribute.php';
 229+
 230+ // Insert the contribution record
 231+ $contribution = array(
 232+ 'contact_id' => $contact['id'],
 233+ 'total_amount' => $msg['gross'],
 234+ 'contribution_type_id' => 9, // cash donation @fixme this needs to be pulled from a variable that was available with fundcore, but no longer exists
 235+ 'payment_instrument_id' => 6, // contribution @fixme see above
 236+ 'fee_amount' => $msg['fee'],
 237+ 'net_amount' => $msg['net'],
 238+ 'trxn_id' => strtoupper($msg['gateway']) . ' ' . $msg['gateway_txn_id'],
 239+ 'receive_date' => date('Y-m-d H:i:s', $msg['date']),
 240+ 'currency' => 'USD',
 241+ 'source' => $msg['original_currency'] . ' ' . $msg['original_gross'],
 242+ 'contribution_recur_id' => $recur_id
 243+ );
 244+
 245+ /**
 246+ * Apply custom field defaults.
 247+ *
 248+ * Civicrm API v2 does NOT provide methods for custom groups/fields beyond creation and deleition,
 249+ * so we hack this cusotm. Hopefully this won't be forever...
 250+ *
 251+ * At least we can use the CiviCRM DAO stuff to stuff necessary custom data in to the contribution object.
 252+ *
 253+ * At the moment, all custom fields get default values for online contribution EXCEPT for 'Donor Comment'.
 254+ */
 255+ $query = "SELECT id, custom_group_id, label, default_value FROM civicrm_custom_field WHERE custom_group_id IN (SELECT id FROM civicrm_custom_group WHERE extends='Contribution' && is_active='1');";
 256+ $dao = CRM_Core_DAO::executeQuery( $query ); // Execute's query using CiviCRM data object stuff
 257+ while ( $dao->fetch() ) {
 258+ if ( $dao->label == 'Donor Comment' ) {
 259+ $comment = ( $msg[ 'comment' ] ) ? $msg[ 'comment'] : '';
 260+ $contribution[ 'custom_' . $dao->id ] = $comment;
 261+ } elseif ( $dao->default_value ) { // if we dont make sure $dao->default_value has some value, Civi breaks when we try to insert
 262+ $contribution[ 'custom_' . $dao->id ] = $dao->default_value;
 263+ }
 264+ }
 265+
 266+ watchdog( 'queue2civicrm', 'Contribution array for civicrm_contribution_add(): ' . print_r($contribution, TRUE));
 267+
 268+ $contribution_result = civicrm_contribution_add( $contribution );
 269+
 270+ watchdog( 'queue2civicrm', 'Contribution result from civicrm_contribution_add(): ' . print_r($contribution_result, TRUE));
 271+
 272+ return $contribution_result;
 273+}
 274+
 275+/**
 276+ * Map contribution to tracking record in contribution_tracking table
 277+ *
 278+ * @param array $msg
 279+ * @param array $contribution
 280+ */
 281+function _queue2civicrm_update_contribution_tracking( $msg, $contribution ) {
 282+ if (array_key_exists( 'contribution_tracking_id', $msg )) {
 283+ db_query('UPDATE {contribution_tracking} SET contribution_id = %d WHERE id = %d LIMIT 1', $contribution['id'], $msg['contribution_tracking_id']);
 284+ }
 285+}
 286+
 287+/**
 288+ * Insert the tag record
 289+ *
 290+ * Serves as a standard way for message porcessors to handle tag
 291+ * insertion.
 292+ *
 293+ * @param array $contact
 294+ * @return array
 295+ */
 296+function _queue2civicrm_tag_insert( $contact ) {
 297+ require_once 'api/v2/EntityTag.php';
 298+
 299+ $tag = array(
 300+ 'tag_id' => 7, // review tag @fixme should this also be variable?
 301+ 'contact_id' => $contact['id']
 302+ );
 303+ $tag_result = &civicrm_entity_tag_add( $tag );
 304+ return $tag_result;
 305+}
 306+
 307+/**
 308+ * Find correct state for insertion
 309+ *
 310+ * When passing CiviCRM a state abbreviation, odd things can happen - like getting the right abbreviation, but the wrong state
 311+ * So we'll pull back the correct state/province name based off of a user's country/state abbreviation
 312+ */
 313+function _queue2civicrm_get_state( $country, $state ) {
 314+
 315+ if ( strlen( $country ) == 2 ) {
 316+ $query = "SELECT s.name AS state_name FROM civicrm_country c, civicrm_state_province s WHERE s.country_id=c.id AND c.iso_code='" . addslashes( $country ) . "' AND s.abbreviation='" . addslashes( $state ) . "'";
 317+ } else {
 318+ $query = "SELECT s.name AS state_name FROM civicrm_country c, civicrm_state_province s WHERE s.country_id=c.id AND c.name='" . addslashes( $country ) . "' AND s.abbreviation='" . addslashes( $state ) . "'";
 319+ }
 320+ $dao = CRM_Core_DAO::executeQuery( $query );
 321+ while ( $dao->fetch() ) {
 322+ $state = ( !is_null( $dao->state_name ) ) ? $dao->state_name : $state;
 323+ }
 324+
 325+ return $state;
 326+}
\ No newline at end of file
Index: civicrm/trunk/sites/all/modules/queue2civicrm/queue2civicrm.module
@@ -1,6 +1,7 @@
22 <?php
 3+// include common functions
 4+require_once( drupal_get_path( 'module', 'queue2civicrm' ) . '/queue2civicrm_common.inc' );
35
4 -
56 /**
67 * Implementation of hook_menu().
78 */
@@ -131,6 +132,9 @@
132133 }
133134 }
134135
 136+ // allow for other modules to handle their own related batch processing
 137+ module_invoke_all( 'queue2civicrm_batch_process', $processed );
 138+
135139 if ($processed > 0) {
136140 watchdog('queue2civicrm', 'Successfully processed ' . $processed . ' contribution(s).');
137141 }
@@ -140,39 +144,6 @@
141145 }
142146
143147 /**
144 - * Connect using the Stomp library.
145 - */
146 -function _queue2civicrm_stomp_connection() {
147 - static $con = NULL;
148 -
149 - if (!isset($con)) {
150 - require_once drupal_get_path('module', 'queue2civicrm') . '/Stomp.php';
151 - watchdog('queue2civicrm', 'Attempting connection to queue server: ' . variable_get('queue2civicrm_url', 'tcp://localhost:61613'));
152 -
153 - $con = new Stomp(variable_get('queue2civicrm_url', 'tcp://localhost:61613'));
154 -
155 - try {
156 - $con->connect();
157 - register_shutdown_function('_queue2civicrm_stomp_disconnect');
158 - }
159 - catch (Stomp_Exception $e) {
160 - $con = FALSE;
161 - watchdog('queue2civicrm', 'Queue connection failed: ' . $e->getMessage(), array(), WATCHDOG_ERROR);
162 - }
163 - }
164 -
165 - return $con;
166 -}
167 -
168 -/**
169 - * Disconnect. Only used as a shutdown function.
170 - */
171 -function _queue2civicrm_stomp_disconnect() {
172 - $con = _queue2civicrm_stomp_connection();
173 - $con->disconnect();
174 -}
175 -
176 -/**
177148 * Remove one item from the queue and process it.
178149 */
179150 function queue2civicrm_dequeue() {
@@ -209,132 +180,30 @@
210181 */
211182 function queue2civicrm_import( $msg ) {
212183 civicrm_initialize(true);
213 - require_once 'api/v2/Contact.php';
214 - require_once 'api/v2/Location.php';
215 - require_once 'api/v2/Contribute.php';
216 - require_once 'api/v2/EntityTag.php';
217184
218 - // Decode the message body.
219 - if ( !is_array( $msg ) ) {
220 - $msg = (array) json_decode($msg->body);
221 - }
222 -
223 - // hack to ignore msgs in the queue w/o a contribution tracking id
224 - if ( !isset( $msg[ 'contribution_tracking_id' ] )) {
225 - watchdog( 'queue2civicrm', 'Contribution missing contribution_tracking_id' );
226 - return FALSE;
227 - }
228 -
229 - // Convert times to Unix timestamps.
230 - if (!is_integer($msg['date'])) {
231 - $msg['date'] = strtotime($msg['date']);
232 - }
 185+ $msg = _queue2civicrm_normalize_msg( $msg );
233186
234 - watchdog('queue2civicrm', 'Contribution (pre-conversion):<pre>' . check_plain(print_r($msg, TRUE)) . '</pre>');
235 -
236 - // round the amounts and do exchange rate conversion
237 - $msg['fee'] = round( exchange_rate_convert($msg['original_currency'], $msg['fee'], $msg['date']), 2 );
238 - $msg['gross'] = round( exchange_rate_convert($msg['original_currency'], $msg['gross'], $msg['date']), 2 );
239 - $msg['net'] = round( exchange_rate_convert($msg['original_currency'], $msg['net'], $msg['date']), 2 );
240 -
241 - $msg['gateway_txn_id'] .= ' ' . time();
242 -
 187+ // set the correct amount fields/data and do exchange rate conversions.
 188+ $msg = _queue2civicrm_normalize_contrib_amnts( $msg );
 189+
243190 //dsm($contribution);
244191 watchdog('queue2civicrm', 'Contribution:<pre>' . check_plain(print_r($msg, TRUE)) . '</pre>');
245192
246 - // Insert the contact record
247 - $contact = array(
248 - 'id' => NULL,
249 - 'contact_type' => 'Individual',
250 - 'first_name' => $msg['first_name'],
251 - 'middle_name' => $msg['middle_name'],
252 - 'last_name' => $msg['last_name'],
253 - 'do_not_trade' => ($msg['anonymous'] ? 1 : 0 ),
254 - 'contact_source' => 'online donation'
255 - );
256 - // Honor the opt-out checkbox, if present
257 - if (array_key_exists('optout', $msg)) {
258 - $contact['is_opt_out'] = $msg['optout'];
259 - }
260 - $contact['sort_name'] = trim($contact['last_name'] . ', ' . $contact['first_name'], ' ,');
261 - $contact['display_name'] = trim($contact['first_name'] . ' ' . $contact['last_name']);
262 - $contact_result = &civicrm_contact_add( $contact );
263 - $contact['id'] = $contact_result['contact_id'];
 193+ // insert the contact information
 194+ $contact = _queue2civicrm_contact_insert( $msg );
264195
265 -
266196 // Insert the location record
267 - $address = array(
268 - 'contact_id' => $contact['id'],
269 - 'location_type' => 'Home',
270 - 'street_address' => $msg['street_address'],
271 - 'supplemental_address_1' => $msg['supplemental_address_1'],
272 - 'city' => $msg['city'],
273 - 'state_province' => queue2civicrm_get_state( $msg[ 'country' ], $msg['state_province'] ),
274 - 'postal_code' => $msg['postal_code'],
275 - 'country' => $msg['country'],
276 - 'is_primary' => 1,
277 - 'is_billing' => 1,
278 - 'email' => $msg['email']
279 - );
280 - $location_result = &civicrm_location_add( $address );
281 -
282 -
283 - // Insert the contribution record
284 - $contribution = array(
285 - 'contact_id' => $contact['id'],
286 - 'total_amount' => $msg['gross'],
287 - 'contribution_type_id' => 9, // cash donation @fixme this needs to be pulled from a variable that was available with fundcore, but no longer exists
288 - 'payment_instrument_id' => 6, // contribution @fixme see above
289 - 'fee_amount' => $msg['fee'],
290 - 'net_amount' => $msg['net'],
291 - 'trxn_id' => strtoupper($msg['gateway']) . ' ' . $msg['gateway_txn_id'],
292 - 'receive_date' => date('Y-m-d H:i:s', $msg['date']),
293 - 'currency' => 'USD',
294 - 'source' => $msg['original_currency'] . ' ' . $msg['original_gross'],
295 - );
296 -
297 - /**
298 - * Apply custom field defaults.
299 - *
300 - * Civicrm API v2 does NOT provide methods for custom groups/fields beyond creation and deleition,
301 - * so we hack this cusotm. Hopefully this won't be forever...
302 - *
303 - * At least we can use the CiviCRM DAO stuff to stuff necessary custom data in to the contribution object.
304 - *
305 - * At the moment, all custom fields get default values for online contribution EXCEPT for 'Donor Comment'.
306 - */
307 - $query = "SELECT id, custom_group_id, label, default_value FROM civicrm_custom_field WHERE custom_group_id IN (SELECT id FROM civicrm_custom_group WHERE extends='Contribution' && is_active='1');";
308 - $dao = CRM_Core_DAO::executeQuery( $query ); // Execute's query using CiviCRM data object stuff
309 - while ( $dao->fetch() ) {
310 - if ( $dao->label == 'Donor Comment' ) {
311 - $comment = ( $msg[ 'comment' ] ) ? $msg[ 'comment'] : '';
312 - $contribution[ 'custom_' . $dao->id ] = $comment;
313 - } elseif ( $dao->default_value ) { // if we dont make sure $dao->default_value has some value, Civi breaks when we try to insert
314 - $contribution[ 'custom_' . $dao->id ] = $dao->default_value;
315 - }
316 - }
317 -
318 - watchdog( 'queue2civicrm', 'Contribution array for civicrm_contribution_add(): ' . print_r($contribution, TRUE));
 197+ $location = _queue2civicrm_location_insert( $msg, $contact );
319198
320 - $contribution_result = civicrm_contribution_add( $contribution );
321 -
322 - watchdog( 'queue2civicrm', 'Contribution result from civicrm_contribution_add(): ' . print_r($contribution_result, TRUE));
323 -
324 - $contribution['id'] = $contribution_result['id'];
325 -
 199+ // Insert the contribution record.
 200+ $contribution = _queue2civicrm_contribution_insert( $msg, $contact );
326201
 202+ // Tag contact for review
 203+ $tag = _queu2civicrm_tag_insert( $contact );
 204+
327205 // Map the tracking record to the CiviCRM contribution
328 - if (array_key_exists('contribution_tracking_id', $msg)) {
329 - db_query('UPDATE {contribution_tracking} SET contribution_id = %d WHERE id = %d LIMIT 1', $contribution['id'], $msg['contribution_tracking_id']);
330 - }
 206+ _queue2civicrm_update_contribution_tracking( $msg, $contribution );
331207
332 - // Tag contact for review
333 - $tag = array(
334 - 'tag_id' => 7, // review tag @fixme should this also be variable?
335 - 'contact_id' => $contact['id']
336 - );
337 - $tag_result = &civicrm_entity_tag_add( $tag );
338 -
339208 // construct an array of useful info to invocations of queue2civicrm_import
340209 $contribution_info = array(
341210 'contribution_id' => $contribution['id'],
@@ -345,33 +214,10 @@
346215 // Send thank you email, other post-import things
347216 module_invoke_all( 'queue2civicrm_import', $contribution_info );
348217
349 -
350218 return TRUE;
351 -
352219 }
353220
354221 /**
355 - * Find correct state for insertion
356 - *
357 - * When passing CiviCRM a state abbreviation, odd things can happen - like getting the right abbreviation, but the wrong state
358 - * So we'll pull back the correct state/province name based off of a user's country/state abbreviation
359 - */
360 -function queue2civicrm_get_state( $country, $state ) {
361 -
362 - if ( strlen( $country ) == 2 ) {
363 - $query = "SELECT s.name AS state_name FROM civicrm_country c, civicrm_state_province s WHERE s.country_id=c.id AND c.iso_code='" . addslashes( $country ) . "' AND s.abbreviation='" . addslashes( $state ) . "'";
364 - } else {
365 - $query = "SELECT s.name AS state_name FROM civicrm_country c, civicrm_state_province s WHERE s.country_id=c.id AND c.name='" . addslashes( $country ) . "' AND s.abbreviation='" . addslashes( $state ) . "'";
366 - }
367 - $dao = CRM_Core_DAO::executeQuery( $query );
368 - while ( $dao->fetch() ) {
369 - $state = ( !is_null( $dao->state_name ) ) ? $dao->state_name : $state;
370 - }
371 -
372 - return $state;
373 -}
374 -
375 -/**
376222 * Make the form to insert a test message into the queue
377223 */
378224 function queue2civicrm_insertmq_form() {

Follow-up revisions

RevisionCommit summaryAuthorDate
r80589Followup r79965; Added class to facilitate db switching and dynamically conne...awjrichards21:09, 19 January 2011

Comments

#Comment by Awjrichards (talk | contribs)   02:03, 19 January 2011

This whole module needs to be updated to not use a statically defined db for CiviCRM. Take a look at fundcore module for a good example of how to do this.

Status & tagging log