Index: trunk/tools/trainwreck/trainwreck.c |
— | — | @@ -34,6 +34,8 @@ |
35 | 35 | |
36 | 36 | #define BINLOG_NAMELEN 512 |
37 | 37 | |
| 38 | +#define MASTER_RETRY 30 |
| 39 | + |
38 | 40 | static void strdup_free(char **, char const *); |
39 | 41 | static void usage(void); |
40 | 42 | |
— | — | @@ -118,6 +120,8 @@ |
119 | 121 | static status_t reader_st = ST_STOPPED; |
120 | 122 | static int master_thread_stop; |
121 | 123 | |
| 124 | +static void slave_process_logs(writer_t *); |
| 125 | +static int slave_connect(writer_t *); |
122 | 126 | static void executed_up_to(writer_t *, char const *, logpos_t); |
123 | 127 | static void sync_ack(writer_t *); |
124 | 128 | static int nsyncs; |
— | — | @@ -318,9 +322,14 @@ |
319 | 323 | reader_st = ST_INITIALISING; |
320 | 324 | (void) pthread_mutex_unlock(&rst_mtx); |
321 | 325 | |
322 | | - if (master_connect() == -1) |
323 | | - return -1; |
| 326 | + for (;;) { |
| 327 | + if (master_connect() == 0) |
| 328 | + break; |
324 | 329 | |
| 330 | + logmsg("master connect failed; sleeping for retry"); |
| 331 | + (void) sleep(MASTER_RETRY); |
| 332 | + } |
| 333 | + |
325 | 334 | (void) pthread_create(&master_thread, NULL, read_master_logs, NULL); |
326 | 335 | |
327 | 336 | return 0; |
— | — | @@ -352,7 +361,7 @@ |
353 | 362 | break; |
354 | 363 | |
355 | 364 | reconnect: |
356 | | - (void) sleep(30); |
| 365 | + (void) sleep(MASTER_RETRY); |
357 | 366 | logmsg("reconnecting to master..."); |
358 | 367 | mysql_close(master_conn); |
359 | 368 | if (master_connect() == -1) |
— | — | @@ -422,11 +431,11 @@ |
423 | 432 | (void) pthread_mutex_unlock(&rst_mtx); |
424 | 433 | |
425 | 434 | if (master_thread_stop) { |
426 | | - logmsg("shutting down"); |
427 | | - (void) pthread_mutex_unlock(&rst_mtx); |
428 | 435 | mysql_close(master_conn); |
429 | 436 | |
430 | 437 | (void) pthread_mutex_lock(&rst_mtx); |
| 438 | + logmsg("shutting down; read up to %s,%lu", |
| 439 | + curfile, (unsigned long) curpos); |
431 | 440 | reader_st = ST_STOPPED; |
432 | 441 | master_thread_stop = 0; |
433 | 442 | free(curfile); |
— | — | @@ -437,7 +446,6 @@ |
438 | 447 | return 0; |
439 | 448 | } |
440 | 449 | |
441 | | - |
442 | 450 | if (len == packet_error) { |
443 | 451 | logmsg("%s,%lu: error retrieving binlogs from server: (%d) %s", |
444 | 452 | curfile, (unsigned long) curpos, |
— | — | @@ -445,8 +453,6 @@ |
446 | 454 | return -1; |
447 | 455 | } |
448 | 456 | |
449 | | - (void) pthread_mutex_unlock(&rst_mtx); |
450 | | - |
451 | 457 | if ((ent = parse_binlog(master_conn->net.read_pos + 1, len - 1)) == NULL) { |
452 | 458 | logmsg("failed parsing binlog"); |
453 | 459 | return -1; |
— | — | @@ -484,8 +490,7 @@ |
485 | 491 | if ((db_regex == NULL || regexec(db_regex, ent->le_database, 0, NULL, 0) == 0) && |
486 | 492 | (ignore_regex == NULL || regexec(ignore_regex, ent->le_database, 0, NULL, 0) != 0) && |
487 | 493 | (ent->le_type == ET_INTVAR || ent->le_type == ET_QUERY)) { |
488 | | - writer_t *writer; |
489 | | - lq_put(&writer->wr_log_queue, ent); |
| 494 | + lq_put(&writer.wr_log_queue, ent); |
490 | 495 | } else { |
491 | 496 | free_log_entry(ent); |
492 | 497 | } |
— | — | @@ -664,7 +669,8 @@ |
665 | 670 | if (debug) |
666 | 671 | logmsg("queue is full, sleeping..."); |
667 | 672 | (void) pthread_mutex_unlock(&q->lq_mtx); |
668 | | - (void) sleep(5); |
| 673 | + /* (void) sleep(5);*/ |
| 674 | + (void) usleep(5000); |
669 | 675 | (void) pthread_mutex_lock(&q->lq_mtx); |
670 | 676 | } |
671 | 677 | |
— | — | @@ -731,11 +737,32 @@ |
732 | 738 | return 0; |
733 | 739 | } |
734 | 740 | |
| 741 | +int |
| 742 | +slave_connect(self) |
| 743 | + writer_t *self; |
| 744 | +{ |
| 745 | + if ((self->wr_conn = mysql_init(NULL)) == NULL) { |
| 746 | + logmsg("out of memory in mysql_init"); |
| 747 | + return -1; |
| 748 | + } |
| 749 | + |
| 750 | + mysql_options(self->wr_conn, MYSQL_READ_DEFAULT_GROUP, "trainwreck-slave"); |
| 751 | + |
| 752 | + if (mysql_real_connect(self->wr_conn, slave_host, slave_user, slave_pass, NULL, |
| 753 | + slave_port, NULL, 0) == NULL) { |
| 754 | + logmsg("cannot connect to slave %s:%d: %s", |
| 755 | + slave_host, slave_port, mysql_error(self->wr_conn)); |
| 756 | + mysql_close(self->wr_conn); |
| 757 | + return -1; |
| 758 | + } |
| 759 | + |
| 760 | + return 0; |
| 761 | +} |
| 762 | + |
735 | 763 | static void * |
736 | 764 | slave_write_thread(p) |
737 | 765 | void *p; |
738 | 766 | { |
739 | | -logentry_t *e; |
740 | 767 | writer_t *self = p; |
741 | 768 | char namebuf[16]; |
742 | 769 | |
— | — | @@ -753,30 +780,41 @@ |
754 | 781 | |
755 | 782 | self->wr_status = ST_INITIALISING; |
756 | 783 | |
757 | | - if ((self->wr_conn = mysql_init(NULL)) == NULL) { |
758 | | - logmsg("out of memory in mysql_init"); |
759 | | - return 0; |
| 784 | + for (;;) { |
| 785 | + if (slave_connect(self) == 0) |
| 786 | + break; |
| 787 | + sleep(30); |
760 | 788 | } |
761 | 789 | |
762 | | - mysql_options(self->wr_conn, MYSQL_READ_DEFAULT_GROUP, "trainwreck-slave"); |
763 | | - |
764 | | - if (mysql_real_connect(self->wr_conn, slave_host, slave_user, slave_pass, NULL, |
765 | | - slave_port, NULL, 0) == NULL) { |
766 | | - logmsg("cannot connect to slave %s:%d: %s", |
767 | | - slave_host, slave_port, mysql_error(self->wr_conn)); |
768 | | - return 0; |
769 | | - } |
770 | | - |
771 | 790 | if (retrieve_binlog_position(self) != 0) { |
772 | 791 | logmsg("could not retrieve binlog position"); |
773 | | - exit(1); |
| 792 | + mysql_close(self->wr_conn); |
| 793 | + return NULL; |
774 | 794 | } |
775 | 795 | |
776 | 796 | (void) pthread_mutex_lock(&wi_mtx); |
777 | 797 | writers_initialising--; |
778 | 798 | (void) pthread_cond_signal(&wi_cond); |
779 | 799 | (void) pthread_mutex_unlock(&wi_mtx); |
780 | | - |
| 800 | + |
| 801 | + for (;;) { |
| 802 | + slave_process_logs(self); |
| 803 | + mysql_close(self->wr_conn); |
| 804 | + sleep(30); |
| 805 | + |
| 806 | + for (;;) { |
| 807 | + if (slave_connect(self) == 0) |
| 808 | + break; |
| 809 | + sleep(30); |
| 810 | + } |
| 811 | + } |
| 812 | +} |
| 813 | + |
| 814 | +void |
| 815 | +slave_process_logs(self) |
| 816 | + writer_t *self; |
| 817 | +{ |
| 818 | +logentry_t *e; |
781 | 819 | self->wr_status = ST_WAIT_FOR_ENTRY; |
782 | 820 | while ((e = lq_get(&self->wr_log_queue)) != NULL) { |
783 | 821 | if (debug) |
— | — | @@ -791,7 +829,8 @@ |
792 | 830 | logmsg("%s,%lu: cannot select \"%s\": %s", |
793 | 831 | e->le_file, (unsigned long) e->le_pos, |
794 | 832 | e->le_database, mysql_error(self->wr_conn)); |
795 | | - exit(1); |
| 833 | + mysql_close(self->wr_conn); |
| 834 | + return; |
796 | 835 | } |
797 | 836 | |
798 | 837 | switch (e->le_type) { |
— | — | @@ -805,7 +844,7 @@ |
806 | 845 | e->le_database, |
807 | 846 | mysql_errno(self->wr_conn), mysql_error(self->wr_conn), |
808 | 847 | query); |
809 | | - exit(1); |
| 848 | + return; |
810 | 849 | } |
811 | 850 | |
812 | 851 | break; |
— | — | @@ -820,7 +859,7 @@ |
821 | 860 | e->le_database, |
822 | 861 | mysql_errno(self->wr_conn), mysql_error(self->wr_conn), |
823 | 862 | query); |
824 | | - exit(1); |
| 863 | + return; |
825 | 864 | } |
826 | 865 | executed_up_to(self, e->le_file, e->le_pos); |
827 | 866 | |
— | — | @@ -830,7 +869,7 @@ |
831 | 870 | free_log_entry(e); |
832 | 871 | self->wr_status = ST_WAIT_FOR_ENTRY; |
833 | 872 | } |
834 | | - return NULL; |
| 873 | + return; |
835 | 874 | } |
836 | 875 | |
837 | 876 | /*ARGSUSED*/ |
Index: trunk/tools/trainwreck/Makefile |
— | — | @@ -1,13 +1,8 @@ |
2 | | -#CC = gcc |
3 | 2 | CC = cc |
4 | 3 | CFLAGS = -g -mt -xc99=none -D_FILE_OFFSET_BITS=64 |
5 | | -#CFLAGS = -g -D_REENTRANT -m64 -DBIG_TABLES -DHAVE_RWLOCK_T |
6 | | -#CFLAGS = -g -D_REENTRANT -D_FILE_OFFSET_BITS=64 |
7 | 4 | LDFLAGS = |
8 | 5 | MYSQL_CFLAGS:sh = mysql_config --include |
9 | | -#MYSQL_LIBS:sh = mysql_config --libs_r |
10 | | -#MYSQL_LIBS = -L/usr/local/mysql/lib/mysql -R/usr/local/mysql/lib/mysql -lmysqlclient |
11 | | -MYSQL_LIBS = -L/opt/mysql/lib/mysql -R/opt/mysql/lib/mysql -lmysqlclient |
| 6 | +MYSQL_LIBS:sh = mysql_config --libs_r |
12 | 7 | INCLUDES = |
13 | 8 | LIBS = -lrt -ldoor |
14 | 9 | PROG = trainwreck |