Index: trunk/tools/trainwreck/trainwreck.c |
— | — | @@ -99,6 +99,7 @@ |
100 | 100 | |
101 | 101 | static int execute_query(MYSQL *, char const *); |
102 | 102 | |
| 103 | +#define ET_SYNC -2 /* queued to every writer when a rotate is processed; a writer answers it by calling sync_ack() */ |
103 | 104 | #define ET_QUERY 2 |
104 | 105 | #define ET_INTVAR 5 |
105 | 106 | #define ET_ROTATE 4 |
— | — | @@ -170,6 +171,10 @@ |
171 | 172 | |
172 | 173 | static writer_t the_writer; |
173 | 174 | static void executed_up_to(writer_t *, char const *, logpos_t); |
| 175 | +static void sync_ack(writer_t *); /* called by a writer thread when it dequeues an ET_SYNC entry */ |
| 176 | +static int nsyncs; /* set to nwriters when a rotate starts; decremented once per sync_ack() call */ |
| 177 | +static pthread_mutex_t sync_mtx = PTHREAD_MUTEX_INITIALIZER; /* held while decrementing nsyncs and while waiting on sync_cond; NOTE(review): the initial nsyncs = nwriters store is made without it */ |
| 178 | +static pthread_cond_t sync_cond = PTHREAD_COND_INITIALIZER; /* the rotate path waits on this while nsyncs is non-zero */ |
174 | 179 | |
175 | 180 | static int ctl_port; |
176 | 181 | |
— | — | @@ -565,6 +570,36 @@ |
566 | 571 | strlcpy(ent->le_database, lastdb, sizeof(ent->le_database)); |
567 | 572 | |
568 | 573 | if (ent->le_type == ET_ROTATE && ent->le_time != 0) { |
| 574 | + int i; |
| 575 | + /* |
| 576 | + * Insert an ET_SYNC event into every writer, and wait |
| 577 | + * for all writers to sync. This is needed to simplify |
| 578 | + * binlog management. |
| 579 | + */ |
| 580 | + nsyncs = nwriters; |
| 581 | + for (i = 0; i < nwriters; ++i) { |
| 582 | + logentry_t *ent; |
| 583 | + ent = calloc(1, sizeof(*ent)); |
| 584 | + ent->le_type = ET_SYNC; |
| 585 | + lq_put(&writers[i].wr_log_queue, ent); |
| 586 | + } |
| 587 | + |
| 588 | + pthread_mutex_lock(&sync_mtx); |
| 589 | + while (nsyncs) |
| 590 | + pthread_cond_wait(&sync_cond, &sync_mtx); |
| 591 | + pthread_mutex_unlock(&sync_mtx); |
| 592 | + |
| 593 | + /* |
| 594 | + * Set the saved position for all writers to the new |
| 595 | + * log position. |
| 596 | + */ |
| 597 | + for (i = 0; i < nwriters; ++i) |
| 598 | + executed_up_to(&writers[i], ent->le_info, 4); |
| 599 | + |
| 600 | + /* |
| 601 | + * Now do the actual rotation. |
| 602 | + */ |
| 603 | + |
569 | 604 | strdup_free(&curfile, ent->le_info); |
570 | 605 | curpos = 4; |
571 | 606 | logmsg("rotating to %s,4", curfile); |
— | — | @@ -775,7 +810,8 @@ |
776 | 811 | lq_entry_t *entry; |
777 | 812 | pthread_mutex_lock(&q->lq_mtx); |
778 | 813 | while (q->lq_entries >= max_buffer) { |
779 | | - logmsg("queue is full, sleeping..."); |
| 814 | + if (debug) |
| 815 | + logmsg("queue is full, sleeping..."); |
780 | 816 | pthread_mutex_unlock(&q->lq_mtx); |
781 | 817 | sleep(5); |
782 | 818 | pthread_mutex_lock(&q->lq_mtx); |
— | — | @@ -908,7 +944,12 @@ |
909 | 945 | exit(1); |
910 | 946 | } |
911 | 947 | |
912 | | - if (e->le_type == ET_INTVAR) { |
| 948 | + switch (e->le_type) { |
| 949 | + case ET_SYNC: |
| 950 | + sync_ack(self); |
| 951 | + break; |
| 952 | + |
| 953 | + case ET_INTVAR: { |
913 | 954 | char query[128]; |
914 | 955 | snprintf(query, sizeof(query), "SET INSERT_ID=%llu", |
915 | 956 | (unsigned long long) e->le_insert_id); |
— | — | @@ -920,7 +961,11 @@ |
921 | 962 | query); |
922 | 963 | exit(1); |
923 | 964 | } |
924 | | - } else if (e->le_type == ET_QUERY) { |
| 965 | + |
| 966 | + break; |
| 967 | + } |
| 968 | + |
| 969 | + case ET_QUERY: { |
925 | 970 | char *query; |
926 | 971 | query = e->le_info; |
927 | 972 | if (execute_query(self->wr_conn, query) != 0) { |
— | — | @@ -932,13 +977,25 @@ |
933 | 978 | exit(1); |
934 | 979 | } |
935 | 980 | executed_up_to(self, e->le_file, e->le_pos); |
| 981 | + |
| 982 | + break; |
936 | 983 | } |
| 984 | + } |
937 | 985 | free_log_entry(e); |
938 | 986 | self->wr_status = ST_WAIT_FOR_ENTRY; |
939 | 987 | } |
940 | 988 | return NULL; |
941 | 989 | } |
942 | 990 | |
| 991 | +static void /* Acknowledge an ET_SYNC entry: count this writer as synced and wake the waiting thread once all have acked. */ |
| 992 | +sync_ack(wr) |
| 993 | + writer_t *wr; /* the acknowledging writer; not otherwise used here */ |
| 994 | +{ |
| 995 | + pthread_mutex_lock(&sync_mtx); |
| 996 | + if (--nsyncs == 0) pthread_cond_signal(&sync_cond); /* last ack: the rotate path blocks in pthread_cond_wait() until nsyncs is zero, and without this signal it would never be woken */ |
| 997 | + pthread_mutex_unlock(&sync_mtx); |
| 998 | +} |
| 999 | + |
943 | 1000 | static int |
944 | 1001 | execute_query(conn, q) |
945 | 1002 | MYSQL *conn; |