r24088 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r24087‎ | r24088 | r24089 >
Date:15:36, 14 July 2007
Author:river
Status:old
Tags:
Comment:
need a way to checkpoint writer threads when rotating binlogs with multiple
writers
Modified paths:
  • /trunk/tools/trainwreck/trainwreck.c (modified) (history)

Diff [purge]

Index: trunk/tools/trainwreck/trainwreck.c
@@ -99,6 +99,7 @@
100100
101101 static int execute_query(MYSQL *, char const *);
102102
 103+#define ET_SYNC -2
103104 #define ET_QUERY 2
104105 #define ET_INTVAR 5
105106 #define ET_ROTATE 4
@@ -170,6 +171,10 @@
171172
172173 static writer_t the_writer;
173174 static void executed_up_to(writer_t *, char const *, logpos_t);
 /*
  * State used to checkpoint writers around a binlog rotation.
  *
  * nsyncs is set to the number of writers before an ET_SYNC entry is queued
  * to each of them; sync_ack() decrements it while holding sync_mtx.  The
  * rotation code waits on sync_cond, with sync_mtx held, until nsyncs
  * reaches zero.
  */
 175+static void sync_ack(writer_t *);
 176+static int nsyncs;
 177+static pthread_mutex_t sync_mtx = PTHREAD_MUTEX_INITIALIZER;
 178+static pthread_cond_t sync_cond = PTHREAD_COND_INITIALIZER;
174179
175180 static int ctl_port;
176181
@@ -565,6 +570,36 @@
566571 strlcpy(ent->le_database, lastdb, sizeof(ent->le_database));
567572
568573 if (ent->le_type == ET_ROTATE && ent->le_time != 0) {
 574+ int i;
 575+ /*
 576+ * Insert an ET_SYNC event into every writer, and wait
 577+ * for all writers to sync. This is needed to simplify
 578+ * binlog management.
 579+ */
 580+ nsyncs = nwriters;
 581+ for (i = 0; i < nwriters; ++i) {
 582+ logentry_t *ent;
 583+ ent = calloc(1, sizeof(*ent));
 584+ ent->le_type = ET_SYNC;
 585+ lq_put(&writers[i].wr_log_queue, ent);
 586+ }
 587+
 588+ pthread_mutex_lock(&sync_mtx);
 589+ while (nsyncs)
 590+ pthread_cond_wait(&sync_cond, &sync_mtx);
 591+ pthread_mutex_unlock(&sync_mtx);
 592+
 593+ /*
 594+ * Set the saved position for all writers to the new
 595+ * log position.
 596+ */
 597+ for (i = 0; i < nwriters; ++i)
 598+ executed_up_to(&writers[i], ent->le_info, 4);
 599+
 600+ /*
 601+ * Now do the actual rotation.
 602+ */
 603+
569604 strdup_free(&curfile, ent->le_info);
570605 curpos = 4;
571606 logmsg("rotating to %s,4", curfile);
@@ -775,7 +810,8 @@
776811 lq_entry_t *entry;
777812 pthread_mutex_lock(&q->lq_mtx);
778813 while (q->lq_entries >= max_buffer) {
779 - logmsg("queue is full, sleeping...");
 814+ if (debug)
 815+ logmsg("queue is full, sleeping...");
780816 pthread_mutex_unlock(&q->lq_mtx);
781817 sleep(5);
782818 pthread_mutex_lock(&q->lq_mtx);
@@ -908,7 +944,12 @@
909945 exit(1);
910946 }
911947
912 - if (e->le_type == ET_INTVAR) {
 948+ switch (e->le_type) {
 949+ case ET_SYNC:
 950+ sync_ack(self);
 951+ break;
 952+
 953+ case ET_INTVAR: {
913954 char query[128];
914955 snprintf(query, sizeof(query), "SET INSERT_ID=%llu",
915956 (unsigned long long) e->le_insert_id);
@@ -920,7 +961,11 @@
921962 query);
922963 exit(1);
923964 }
924 - } else if (e->le_type == ET_QUERY) {
 965+
 966+ break;
 967+ }
 968+
 969+ case ET_QUERY: {
925970 char *query;
926971 query = e->le_info;
927972 if (execute_query(self->wr_conn, query) != 0) {
@@ -932,13 +977,25 @@
933978 exit(1);
934979 }
935980 executed_up_to(self, e->le_file, e->le_pos);
 981+
 982+ break;
936983 }
 984+ }
937985 free_log_entry(e);
938986 self->wr_status = ST_WAIT_FOR_ENTRY;
939987 }
940988 return NULL;
941989 }
942990
 991+static void
 992+sync_ack(wr)
 993+ writer_t *wr;
 994+{
 995+ pthread_mutex_lock(&sync_mtx);
 996+ --nsyncs;
 997+ pthread_mutex_unlock(&sync_mtx);
 998+}
 999+
9431000 static int
9441001 execute_query(conn, q)
9451002 MYSQL *conn;

Status & tagging log