Index: trunk/tools/trainwreck/trainwreck.c |
— | — | @@ -92,6 +92,7 @@ |
93 | 93 | |
94 | 94 | static char *binlog_file; |
95 | 95 | static int64_t binlog_pos = 4; |
| 96 | +static int max_buffer = 0; |
96 | 97 | |
97 | 98 | regex_t *db_regex; |
98 | 99 | |
— | — | @@ -128,6 +129,7 @@ |
129 | 130 | pthread_mutex_t lq_mtx; |
130 | 131 | pthread_cond_t lq_cond; |
131 | 132 | struct lqhead lq_head; |
| 133 | + int lq_entries; |
132 | 134 | } le_queue_t; |
133 | 135 | |
134 | 136 | static void lq_init(le_queue_t *); |
— | — | @@ -329,6 +331,8 @@ |
330 | 332 | do_ignore_errno(atoi(value)); |
331 | 333 | } else if (!strcmp(opt, "nwriters")) { |
332 | 334 | nwriters = atoi(value); |
| 335 | + } else if (!strcmp(opt, "max-buffer")) { |
| 336 | + max_buffer = atoi(value); |
333 | 337 | } else if (!strcmp(opt, "only-replicate")) { |
334 | 338 | int err; |
335 | 339 | db_regex = calloc(1, sizeof(*db_regex)); |
— | — | @@ -736,6 +740,7 @@ |
737 | 741 | le_queue_t *q; |
738 | 742 | { |
739 | 743 | TAILQ_INIT(&q->lq_head); |
| 744 | + q->lq_entries = 0; |
740 | 745 | } |
741 | 746 | |
742 | 747 | static void |
— | — | @@ -745,9 +750,17 @@ |
746 | 751 | { |
747 | 752 | lq_entry_t *entry; |
748 | 753 | pthread_mutex_lock(&q->lq_mtx); |
| 754 | + while (q->lq_entries >= max_buffer) { |
| 755 | + logmsg("queue is full, sleeping..."); |
| 756 | + pthread_mutex_unlock(&q->lq_mtx); |
| 757 | + sleep(5); |
| 758 | + pthread_mutex_lock(&q->lq_mtx); |
| 759 | + } |
| 760 | + |
749 | 761 | entry = calloc(1, sizeof(lq_entry_t)); |
750 | 762 | entry->lqe_item = e; |
751 | 763 | TAILQ_INSERT_TAIL(&q->lq_head, entry, lqe_q); |
| 764 | + q->lq_entries++; |
752 | 765 | pthread_cond_signal(&q->lq_cond); |
753 | 766 | pthread_mutex_unlock(&q->lq_mtx); |
754 | 767 | } |
— | — | @@ -785,6 +798,7 @@ |
786 | 799 | |
787 | 800 | qe = TAILQ_FIRST(&q->lq_head); |
788 | 801 | TAILQ_REMOVE(&q->lq_head, qe, lqe_q); |
| 802 | + q->lq_entries--; |
789 | 803 | |
790 | 804 | pthread_mutex_unlock(&q->lq_mtx); |
791 | 805 | |
— | — | @@ -891,6 +905,7 @@ |
892 | 906 | } |
893 | 907 | executed_up_to(self, e->le_file, e->le_pos); |
894 | 908 | } |
| 909 | + free_log_entry(e); |
895 | 910 | self->wr_status = ST_WAIT_FOR_ENTRY; |
896 | 911 | } |
897 | 912 | return NULL; |