--- memcached-1.4.4/Makefile.am Fri Oct 30 04:24:52 2009 +++ repcached-2.2-1.4.4/Makefile.am Tue Feb 9 23:02:45 2010 @@ -31,6 +31,10 @@ memcached_SOURCES += sasl_defs.c endif +if ENABLE_REPLICATION +memcached_SOURCES += replication.h replication.c +endif + memcached_debug_SOURCES = $(memcached_SOURCES) memcached_CPPFLAGS = -DNDEBUG memcached_debug_LDADD = @PROFILER_LDFLAGS@ --- memcached-1.4.4/assoc.c Sat Oct 24 00:38:01 2009 +++ repcached-2.2-1.4.4/assoc.c Tue Feb 9 23:02:45 2010 @@ -258,3 +258,51 @@ } +#ifdef USE_REPLICATION +char *assoc_key_snap(int *n) /* Snapshot every key in the hash table into one malloc'ed buffer of NUL-terminated strings ended by an empty string. Stores the key count in *n when n is non-NULL; returns NULL when there are no keys or malloc fails. */ +{ + char *p = NULL; + char *b = NULL; + item *i = NULL; + int co = 0; + int sz = 1; /* starts at 1 to leave room for the final list terminator */ + int hs = 0; + int hm = hashsize(hashpower); + + hs = hm; /* pass 1: count the keys and the bytes they need */ + while(hs--){ + if(expanding && hs < hashsize(hashpower - 1) && hs >= expand_bucket){ + i = old_hashtable[hs]; + }else{ + i = primary_hashtable[hs]; + } + while(i){ + sz += i->nkey + 1; + co++; + i = i->h_next; + } + } + + if(co){ + if((p = b = malloc(sz))){ + hs = hm; /* pass 2: copy each key followed by a NUL */ + while(hs--){ + if(expanding && hs < hashsize(hashpower - 1) && hs >= expand_bucket){ + i = old_hashtable[hs]; + }else{ + i = primary_hashtable[hs]; + } + while(i){ + memcpy(p, ITEM_key(i), i->nkey); + p += i->nkey; + *(p++) = 0; + i = i->h_next; + } + } + *(p++) = 0; /* empty string marks the end of the list */ + } + } + if(n) *n = co; + return(b); +} +#endif /* USE_REPLICATION */ --- memcached-1.4.4/assoc.h Sun Aug 30 03:00:58 2009 +++ repcached-2.2-1.4.4/assoc.h Tue Feb 9 23:02:45 2010 @@ -7,3 +7,6 @@ int start_assoc_maintenance_thread(void); void stop_assoc_maintenance_thread(void); +#ifdef USE_REPLICATION +char *assoc_key_snap(int *n); +#endif /*USE_REPLICATION*/ --- memcached-1.4.4/config.h.in Fri Nov 27 09:34:56 2009 +++ repcached-2.2-1.4.4/config.h.in Wed Feb 10 19:12:46 2010 @@ -99,6 +99,9 @@ /* Define to 1 if you have the ANSI C header files.
*/ #undef STDC_HEADERS +/* Define this if you want to use replication */ +#undef USE_REPLICATION + /* Version number of package */ #undef VERSION --- memcached-1.4.4/configure.ac Wed Nov 25 03:40:29 2009 +++ repcached-2.2-1.4.4/configure.ac Tue Feb 9 23:02:45 2010 @@ -382,6 +382,18 @@ AC_MSG_ERROR([Can't enable threads without the POSIX thread library.]) fi +dnl Check whether the user wants replication or not. The action also runs for --disable-replication, so it must test enableval. +AC_ARG_ENABLE(replication, + [AS_HELP_STRING([--enable-replication],[support replication])], + [if test "x$enableval" = "xyes" && test "x$enable_threads" = "xyes"; then + AC_MSG_ERROR([Can't enable threads and replication together.]) + elif test "x$enableval" = "xyes"; then + AC_DEFINE([USE_REPLICATION],,[Define this if you want to use replication]) + fi + ]) + +AM_CONDITIONAL(ENABLE_REPLICATION, test "x$enable_replication" = "xyes") + AC_CHECK_FUNCS(mlockall) AC_CHECK_FUNCS(getpagesizes) AC_CHECK_FUNCS(memcntl) --- memcached-1.4.4/items.c Sat Oct 24 00:38:01 2009 +++ repcached-2.2-1.4.4/items.c Tue Feb 9 23:02:45 2010 @@ -155,6 +155,9 @@ STATS_LOCK(); stats.evictions++; STATS_UNLOCK(); +#ifdef USE_REPLICATION + replication_call_del(ITEM_key(search), search->nkey); +#endif /* USE_REPLICATION */ } do_item_unlink(search); break; @@ -288,8 +291,14 @@ stats.total_items += 1; STATS_UNLOCK(); +#ifdef USE_REPLICATION + /* Allocate a new CAS ID on link. */ + if(!(it->it_flags & ITEM_REPDATA)) + ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0); +#else /* Allocate a new CAS ID on link. */ ITEM_set_cas(it, (settings.use_cas) ?
get_cas_id() : 0); +#endif /* USE_REPLICATION */ item_link_q(it); --- memcached-1.4.4/memcached.c Fri Nov 27 08:45:13 2009 +++ repcached-2.2-1.4.4/memcached.c Wed Feb 10 16:08:37 2010 @@ -102,6 +102,30 @@ static void conn_free(conn *c); +#ifdef USE_REPLICATION +static int rep_exit = 0; +static conn *rep_recv = NULL; +static conn *rep_send = NULL; +static conn *rep_conn = NULL; +static conn *rep_serv = NULL; +static int server_socket_replication(const int); +static void server_close_replication(void); +static int replication_init(void); +static int replication_server_init(void); +static int replication_client_init(void); +static int replication_start(void); +static int replication_connect(void); +static int replication_close(void); +static void replication_dispatch_close(void); +static int replication_marugoto(int); +static int replication_send(conn *); +static int replication_pop(void); +static int replication_push(void); +static int replication_exit(void); +static int replication_item(Q_ITEM *); +static pthread_mutex_t replication_pipe_lock = PTHREAD_MUTEX_INITIALIZER; +#endif /* USE_REPLICATION */ + /** exported globals **/ struct stats stats; struct settings settings; @@ -194,6 +218,11 @@ settings.backlog = 1024; settings.binding_protocol = negotiating_prot; settings.item_size_max = 1024 * 1024; /* The famous 1MB upper limit. 
*/ +#ifdef USE_REPLICATION + settings.rep_addr.s_addr = htonl(INADDR_ANY); + settings.rep_port = 11212; + settings.rep_qmax = 8192; +#endif /* USE_REPLICATION */ } /* @@ -382,6 +411,10 @@ prot_text(c->protocol)); } else if (IS_UDP(transport)) { fprintf(stderr, "<%d server listening (udp)\n", sfd); +#ifdef USE_REPLICATION + } else if (init_state == conn_rep_listen) { + fprintf(stderr, "<%d server listening (replication)\n", sfd); +#endif /* USE_REPLICATION */ } else if (c->protocol == negotiating_prot) { fprintf(stderr, "<%d new auto-negotiating client connection\n", sfd); @@ -593,7 +626,11 @@ "conn_nread", "conn_swallow", "conn_closing", - "conn_mwrite" }; + "conn_mwrite", + "conn_repconnect", + "conn_rep_listen", + "conn_pipe_recv", + "conn_pipe_send" }; return statenames[state]; } @@ -752,6 +789,14 @@ assert(c != NULL); +#ifdef USE_REPLICATION + if (c == rep_conn){ + if (settings.verbose > 1) + fprintf(stderr, "REP>%d %s\n", c->sfd, str); + conn_set_state(c, conn_new_cmd); + return; + } +#endif /* USE_REPLICATION */ if (c->noreply) { if (settings.verbose > 1) fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str); @@ -791,9 +836,11 @@ int comm = c->cmd; enum store_item_type ret; - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++; - pthread_mutex_unlock(&c->thread->stats.mutex); + if (c->thread) { + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++; + pthread_mutex_unlock(&c->thread->stats.mutex); + } if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) { out_string(c, "CLIENT_ERROR bad data chunk"); @@ -832,6 +879,11 @@ switch (ret) { case STORED: +#ifdef USE_REPLICATION + if( c != rep_conn ){ + replication_call_rep(ITEM_key(it), it->nkey); + } +#endif /* USE_REPLICATION */ out_string(c, "STORED"); break; case EXISTS: @@ -2410,6 +2462,11 @@ APPEND_STAT("listen_disabled_num", "%llu", (unsigned long long)stats.listen_disabled_num); APPEND_STAT("threads", 
"%d", settings.num_threads); APPEND_STAT("conn_yields", "%llu", (unsigned long long)thread_stats.conn_yields); +#ifdef USE_REPLICATION + APPEND_STAT("replication", "MASTER", 0); + APPEND_STAT("repcached_version", "%s", REPCACHED_VERSION); + APPEND_STAT("repcached_qi_free", "%u", settings.rep_qmax - get_qi_count()); +#endif /*USE_REPLICATION*/ STATS_UNLOCK(); } @@ -2797,6 +2854,11 @@ switch(add_delta(c, it, incr, delta, temp)) { case OK: out_string(c, temp); +#ifdef USE_REPLICATION + if( c != rep_conn){ + replication_call_rep(ITEM_key(it), it->nkey); + } +#endif /* USE_REPLICATION */ break; case NON_NUMERIC: out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value"); @@ -2911,17 +2973,25 @@ if (it) { MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey); - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++; - pthread_mutex_unlock(&c->thread->stats.mutex); + if (c->thread) { + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++; + pthread_mutex_unlock(&c->thread->stats.mutex); + } item_unlink(it); item_remove(it); /* release our reference */ +#ifdef USE_REPLICATION + if( c != rep_conn ) + replication_call_del(key, nkey); +#endif /* USE_REPLICATION */ out_string(c, "DELETED"); } else { - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.delete_misses++; - pthread_mutex_unlock(&c->thread->stats.mutex); + if (c->thread) { + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.delete_misses++; + pthread_mutex_unlock(&c->thread->stats.mutex); + } out_string(c, "NOT_FOUND"); } @@ -2986,6 +3056,22 @@ process_update_command(c, tokens, ntokens, comm, true); +#ifdef USE_REPLICATION + } else if ((ntokens == 7) && (strcmp(tokens[COMMAND_TOKEN].value, "rep") == 0 && (comm = NREAD_SET)) && (c == rep_conn)) { + + process_update_command(c, tokens, ntokens, comm, true); + if(c->item) + ((item *)(c->item))->it_flags |= 
ITEM_REPDATA; + + } else if ((ntokens == 2) && (strcmp(tokens[COMMAND_TOKEN].value, "marugoto_end") == 0) && (c == rep_conn)) { + if(replication_start() == -1) + exit(EXIT_FAILURE); + if (settings.verbose > 0) + fprintf(stderr,"replication: start\n"); + out_string(c, "OK"); + return; + +#endif /* USE_REPLICATION */ } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) { process_arithmetic_command(c, tokens, ntokens, 1); @@ -3012,11 +3098,17 @@ set_noreply_maybe(c, tokens, ntokens); - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.flush_cmds++; - pthread_mutex_unlock(&c->thread->stats.mutex); + if (c->thread) { + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.flush_cmds++; + pthread_mutex_unlock(&c->thread->stats.mutex); + } if(ntokens == (c->noreply ? 3 : 2)) { +#ifdef USE_REPLICATION + if( c != rep_conn ) + replication_call_flush_all(); +#endif settings.oldest_live = current_time - 1; item_flush_expired(); out_string(c, "OK"); @@ -3029,6 +3121,11 @@ return; } +#ifdef USE_REPLICATION + if( c != rep_conn ) + replication_call_defer_flush_all(realtime(exptime) + process_started); +#endif + settings.oldest_live = realtime(exptime) - 1; /* If exptime is zero realtime() would return zero too, and realtime(exptime) - 1 would overflow to the max unsigned @@ -3275,9 +3372,11 @@ int avail = c->rsize - c->rbytes; res = read(c->sfd, c->rbuf + c->rbytes, avail); if (res > 0) { - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.bytes_read += res; - pthread_mutex_unlock(&c->thread->stats.mutex); + if (c->thread) { + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.bytes_read += res; + pthread_mutex_unlock(&c->thread->stats.mutex); + } gotdata = READ_DATA_RECEIVED; c->rbytes += res; if (res == avail) { @@ -3423,6 +3522,12 @@ assert(c != NULL); +#ifdef USE_REPLICATION + if(rep_exit && (c->state != conn_pipe_recv)){ + return; + } +#endif /* USE_REPLICATION */ + while (!stop) { 
switch(c->state) { @@ -3502,9 +3607,11 @@ if (nreqs >= 0) { reset_cmd_handler(c); } else { - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.conn_yields++; - pthread_mutex_unlock(&c->thread->stats.mutex); + if (c->thread) { + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.conn_yields++; + pthread_mutex_unlock(&c->thread->stats.mutex); + } if (c->rbytes > 0) { /* We have already read in data into the input buffer, so libevent will most likely not signal read events @@ -3545,9 +3652,11 @@ /* now try reading from the socket */ res = read(c->sfd, c->ritem, c->rlbytes); if (res > 0) { - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.bytes_read += res; - pthread_mutex_unlock(&c->thread->stats.mutex); + if (c->thread) { + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.bytes_read += res; + pthread_mutex_unlock(&c->thread->stats.mutex); + } if (c->rcurr == c->ritem) { c->rcurr += res; } @@ -3600,9 +3709,11 @@ /* now try reading from the socket */ res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? 
c->sbytes : c->rsize); if (res > 0) { - pthread_mutex_lock(&c->thread->stats.mutex); - c->thread->stats.bytes_read += res; - pthread_mutex_unlock(&c->thread->stats.mutex); + if (c->thread) { + pthread_mutex_lock(&c->thread->stats.mutex); + c->thread->stats.bytes_read += res; + pthread_mutex_unlock(&c->thread->stats.mutex); + } c->sbytes -= res; break; } @@ -3698,6 +3809,10 @@ case conn_closing: if (IS_UDP(c->transport)) conn_cleanup(c); +#ifdef USE_REPLICATION + else if(c == rep_conn) + replication_close(); +#endif /*USE_REPLICATION*/ else conn_close(c); stop = true; @@ -3706,6 +3821,70 @@ case conn_max_state: assert(false); break; + +#ifdef USE_REPLICATION + case conn_pipe_recv: + if(replication_pop()){ + replication_close(); + }else{ + replication_send(rep_conn); + } + stop = true; + break; + + case conn_rep_listen: + if (settings.verbose > 0) + fprintf(stderr,"replication: accept\n"); + addrlen = sizeof(addr); + res = accept(c->sfd, (struct sockaddr *)&addr, &addrlen); + if(res == -1){ + if(errno == EAGAIN || errno == EWOULDBLOCK) { + } else if (errno == EMFILE) { + fprintf(stderr, "replication: Too many opened connections\n"); + } else { + fprintf(stderr, "replication: accept error\n"); + } + }else{ + if(rep_conn){ + close(res); + fprintf(stderr,"replication: already connected\n"); + }else{ + if((flags = fcntl(res, F_GETFL, 0)) < 0 || fcntl(res, F_SETFL, flags | O_NONBLOCK) < 0){ + close(res); + fprintf(stderr, "replication: Can't Setting O_NONBLOCK\n"); + }else{ + server_close_replication(); + rep_conn = conn_new(res, conn_read, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base); + rep_conn->item = NULL; + rep_conn->rbytes = 0; + rep_conn->rcurr = rep_conn->rbuf; + replication_connect(); + replication_marugoto(1); + replication_marugoto(0); + } + } + } + stop = true; + break; + + case conn_repconnect: + rep_conn = c; + replication_connect(); + conn_set_state(c, conn_read); + if (settings.verbose > 0) + fprintf(stderr,"replication: marugoto 
copying\n"); + if(!update_event(c, EV_READ | EV_PERSIST)){ + fprintf(stderr, "replication: Couldn't update event\n"); + conn_set_state(c, conn_closing); + } + stop = true; + break; + + case conn_pipe_send: + /* should not happen */ + fprintf(stderr, "replication: unexpected conn_pipe_send state\n"); + break; +#endif /* USE_REPLICATION */ } } @@ -4002,6 +4181,89 @@ return 0; } +#ifdef USE_REPLICATION +static int server_socket_replication(const int port) { + int sfd; + struct linger ling = {0, 0}; + struct addrinfo *ai; + struct addrinfo *next; + struct addrinfo hints; + char port_buf[NI_MAXSERV]; + int error; + int success = 0; + + int flags =1; + + memset(&hints, 0, sizeof (hints)); + hints.ai_flags = AI_PASSIVE|AI_ADDRCONFIG; + hints.ai_family = AF_UNSPEC; + hints.ai_protocol = IPPROTO_TCP; + hints.ai_socktype = SOCK_STREAM; + snprintf(port_buf, NI_MAXSERV, "%d", port); + error= getaddrinfo(settings.inter, port_buf, &hints, &ai); + if (error != 0) { + if (error != EAI_SYSTEM) + fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error)); + else + perror("getaddrinfo()"); + + return 1; + } + + for (next= ai; next; next= next->ai_next) { + conn *rep_serv_add; + if ((sfd = new_socket(next)) == -1) { + freeaddrinfo(ai); + return 1; + } + setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); + setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); + setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)); + setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)); + + if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) { + if (errno != EADDRINUSE) { + perror("bind()"); + close(sfd); + freeaddrinfo(ai); + return 1; + } + close(sfd); + continue; + } else { + success++; + if (listen(sfd, 1024) == -1) { + perror("listen()"); + close(sfd); + freeaddrinfo(ai); + return 1; + } + } + + if (!(rep_serv_add = conn_new(sfd, conn_rep_listen, + EV_READ | EV_PERSIST, 1, tcp_transport, main_base))) { + 
fprintf(stderr, "failed to create replication server connection\n"); + exit(EXIT_FAILURE); + } + + rep_serv_add->next = rep_serv; + rep_serv = rep_serv_add; + } + + freeaddrinfo(ai); + + /* Return zero iff we detected no errors in starting up connections */ + return success == 0; +} + +static void server_close_replication(void) { /* Close every replication listening conn on the rep_serv list, leaving rep_serv NULL. */ + for(conn *c = rep_serv; c; c = rep_serv){ + rep_serv = c->next; /* unlink first: conn_close() may release c, so c->next must not be read afterwards */ + conn_close(c); + } +} +#endif /* USE_REPLICATION */ + /* * We keep the current time of day in a global variable that's updated by a * timer event. This saves us a bunch of time() system calls (we really only @@ -4041,6 +4303,9 @@ static void usage(void) { printf(PACKAGE " " VERSION "\n"); +#ifdef USE_REPLICATION + printf("repcached %s\n",REPCACHED_VERSION); +#endif /* USE_REPLICATION */ printf("-p TCP port number to listen on (default: 11211)\n" "-U UDP port number to listen on (default: 11211, 0 is off)\n" "-s UNIX socket path to listen on (disables network support)\n" @@ -4088,6 +4353,10 @@ #ifdef ENABLE_SASL printf("-S Turn on Sasl authentication\n"); #endif +#ifdef USE_REPLICATION + printf("-x hostname or IP address of peer repcached\n"); + printf("-X TCP port number for replication (default: 11212)\n"); +#endif /* USE_REPLICATION */ return; } @@ -4194,6 +4463,26 @@ exit(EXIT_SUCCESS); } +#ifdef USE_REPLICATION +static void sig_handler_cb(int fd, short event, void *arg) +{ + struct event *signal = arg; + + if (settings.verbose) + fprintf(stderr, "got signal %d\n", EVENT_SIGNAL(signal)); + + if (replication_exit()) { + exit(EXIT_FAILURE); + } + + pthread_mutex_lock(&replication_pipe_lock); + if (!rep_send) { + exit(EXIT_SUCCESS); + } + pthread_mutex_unlock(&replication_pipe_lock); +} +#endif /* USE_REPLICATION */ + #ifndef HAVE_SIGIGNORE static int sigignore(int sig) { struct sigaction sa = { .sa_handler = SIG_IGN, .sa_flags = 0 }; @@ -4249,6 +4538,57 @@ #endif } +static void create_listening_sockets(void) +{ + /* create unix mode sockets after dropping privileges */ + if
(settings.socketpath != NULL) { + errno = 0; + if (server_socket_unix(settings.socketpath,settings.access)) { + vperror("failed to listen on UNIX socket: %s", settings.socketpath); + exit(EX_OSERR); + } + } + + /* create the listening socket, bind it, and init */ + if (settings.socketpath == NULL) { + const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME"); + char temp_portnumber_filename[PATH_MAX]; + FILE *portnumber_file = NULL; + + if (portnumber_filename != NULL) { + snprintf(temp_portnumber_filename, + sizeof(temp_portnumber_filename), + "%s.lck", portnumber_filename); + + portnumber_file = fopen(temp_portnumber_filename, "a"); + if (portnumber_file == NULL) { + fprintf(stderr, "Failed to open \"%s\": %s\n", + temp_portnumber_filename, strerror(errno)); + } + } + + errno = 0; + if (settings.port && server_socket(settings.port, tcp_transport, + portnumber_file)) { + vperror("failed to listen on TCP port %d", settings.port); + exit(EX_OSERR); + } + + /* create the UDP listening socket and bind it */ + errno = 0; + if (settings.udpport && server_socket(settings.udpport, udp_transport, + portnumber_file)) { + vperror("failed to listen on UDP port %d", settings.udpport); + exit(EX_OSERR); + } + + if (portnumber_file) { + fclose(portnumber_file); + rename(temp_portnumber_filename, portnumber_filename); + } + } +} + int main (int argc, char **argv) { int c; bool lock_memory = false; @@ -4261,6 +4601,11 @@ struct rlimit rlim; char unit = '\0'; int size_max = 0; +#ifdef USE_REPLICATION + struct in_addr addr; + struct addrinfo master_hint; + struct addrinfo *master_addr; +#endif /* USE_REPLICATION */ /* listening sockets */ static int *l_socket = NULL; @@ -4307,6 +4652,11 @@ "B:" /* Binding protocol */ "I:" /* Max item size */ "S" /* Sasl ON */ +#ifdef USE_REPLICATION + "X:" /* replication port */ + "x:" /* replication master */ + "q:" /* replication queue length */ +#endif /* USE_REPLICATION */ ))) { switch (c) { case 'a': @@ -4462,6 +4812,31 @@ ); } break; 
+#ifdef USE_REPLICATION + case 'x': /* peer address: try a literal IPv4 address first, otherwise resolve the host name */ + if (inet_pton(AF_INET, optarg, &addr) <= 0) { + memset(&master_hint, 0, sizeof(master_hint)); + master_hint.ai_family = AF_INET; /* the result is read as sockaddr_in into a struct in_addr, so only IPv4 answers are usable */ + master_hint.ai_socktype = 0; + master_hint.ai_protocol = 0; + if(!getaddrinfo(optarg, NULL, &master_hint, &master_addr)){ + settings.rep_addr = ((struct sockaddr_in *)(master_addr->ai_addr)) -> sin_addr; + freeaddrinfo(master_addr); + }else{ + fprintf(stderr, "Illegal address: %s\n", optarg); + return 1; + } + } else { + settings.rep_addr = addr; + } + break; + case 'X': + settings.rep_port = atoi(optarg); + break; + case 'q': + settings.rep_qmax = atoi(optarg); + break; +#endif /* USE_REPLICATION */ case 'S': /* set Sasl authentication to true. Default is false */ #ifndef ENABLE_SASL fprintf(stderr, "This server is not built with SASL support.\n"); @@ -4587,6 +4962,17 @@ /* initialize main thread libevent instance */ main_base = event_init(); +#ifdef USE_REPLICATION + /* register events for SIGINT and SIGTERM to handle them in main thread */ + struct event signal_int, signal_term; + event_set(&signal_int, SIGINT, EV_SIGNAL|EV_PERSIST, sig_handler_cb, + &signal_int); + event_add(&signal_int, NULL); + event_set(&signal_term, SIGTERM, EV_SIGNAL|EV_PERSIST, sig_handler_cb, + &signal_term); + event_add(&signal_term, NULL); +#endif + /* initialize other stuff */ stats_init(); assoc_init(); @@ -4615,63 +5001,21 @@ /* initialise clock event */ clock_handler(0, 0, 0); - /* create unix mode sockets after dropping privileges */ - if (settings.socketpath != NULL) { - errno = 0; - if (server_socket_unix(settings.socketpath,settings.access)) { - vperror("failed to listen on UNIX socket: %s", settings.socketpath); - exit(EX_OSERR); - } - } - - /* create the listening socket, bind it, and init */ - if (settings.socketpath == NULL) { - int udp_port; - - const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME"); - char temp_portnumber_filename[PATH_MAX]; - FILE *portnumber_file = NULL; - - if (portnumber_filename
!= NULL) { - snprintf(temp_portnumber_filename, - sizeof(temp_portnumber_filename), - "%s.lck", portnumber_filename); - - portnumber_file = fopen(temp_portnumber_filename, "a"); - if (portnumber_file == NULL) { - fprintf(stderr, "Failed to open \"%s\": %s\n", - temp_portnumber_filename, strerror(errno)); - } - } - - errno = 0; - if (settings.port && server_socket(settings.port, tcp_transport, - portnumber_file)) { - vperror("failed to listen on TCP port %d", settings.port); - exit(EX_OSERR); - } - - /* - * initialization order: first create the listening sockets - * (may need root on low ports), then drop root if needed, - * then daemonise if needed, then init libevent (in some cases - * descriptors created by libevent wouldn't survive forking). - */ - udp_port = settings.udpport ? settings.udpport : settings.port; - - /* create the UDP listening socket and bind it */ - errno = 0; - if (settings.udpport && server_socket(settings.udpport, udp_transport, - portnumber_file)) { - vperror("failed to listen on UDP port %d", settings.udpport); - exit(EX_OSERR); - } + /* + * initialization order: first create the listening sockets + * (may need root on low ports), then drop root if needed, + * then daemonise if needed, then init libevent (in some cases + * descriptors created by libevent wouldn't survive forking). 
+ */ - if (portnumber_file) { - fclose(portnumber_file); - rename(temp_portnumber_filename, portnumber_filename); - } +#ifdef USE_REPLICATION + if(replication_init() == -1){ + fprintf(stderr, "faild to replication init\n"); + exit(EXIT_FAILURE); } +#else + create_listening_sockets(); +#endif /* Drop privileges no longer needed */ drop_privileges(); @@ -4694,3 +5038,401 @@ return EXIT_SUCCESS; } + +#ifdef USE_REPLICATION +static int replication_start(void) +{ + static int start = 0; + if(start) + return(0); + + create_listening_sockets(); + + start = 1; + return(0); +} + +static int replication_server_init(void) +{ + rep_recv = NULL; + rep_send = NULL; + rep_conn = NULL; + if(server_socket_replication(settings.rep_port)){ + fprintf(stderr, "replication: failed to initialize replication server socket\n"); + return(-1); + } + if (settings.verbose > 0) + fprintf(stderr, "replication: listen\n"); + return(replication_start()); +} + +static int replication_client_init(void) +{ + int s; + conn *c; + struct addrinfo ai; + struct sockaddr_in server; + + rep_recv = NULL; + rep_send = NULL; + rep_conn = NULL; + + memset(&ai,0,sizeof(ai)); + ai.ai_family = AF_INET; + ai.ai_socktype = SOCK_STREAM; + s = new_socket(&ai); + + if(s == -1) { + fprintf(stderr, "replication: failed to replication client socket\n"); + return(-1); + }else{ + /* connect */ + memset((char *)&server, 0, sizeof(server)); + server.sin_family = AF_INET; + server.sin_addr = settings.rep_addr; + server.sin_port = htons(settings.rep_port); + if (settings.verbose > 0) + fprintf(stderr,"replication: connect (peer=%s:%d)\n", inet_ntoa(settings.rep_addr), settings.rep_port); + if(connect(s,(struct sockaddr *)&server, sizeof(server)) == 0){ + c = conn_new(s, conn_repconnect, EV_WRITE | EV_PERSIST, DATA_BUFFER_SIZE, false, main_base); + if(c == NULL){ + fprintf(stderr, "replication: failed to create client conn"); + close(s); + return(-1); + } + drive_machine(c); + }else{ + if(errno == EINPROGRESS){ + c = conn_new(s, 
conn_repconnect, EV_WRITE | EV_PERSIST, DATA_BUFFER_SIZE, false, main_base); + if(c == NULL){ + fprintf(stderr, "replication: failed to create client conn"); + close(s); + return(-1); + } + }else{ + fprintf(stdout,"replication: can't connect %s:%d\n", inet_ntoa(server.sin_addr), ntohs(server.sin_port)); + close(s); + return(-1); + } + } + } + return(0); +} + +static int replication_init(void) /* Act as a client when a peer address was configured and the connect attempt does not fail; otherwise start as the listening side. */ +{ + if(settings.rep_addr.s_addr != htonl(INADDR_ANY)){ + if(replication_client_init() != -1){ + return(0); + } + } + return(replication_server_init()); +} + +static int replication_connect(void) /* Create the non-blocking command pipe and wrap both ends in conns: rep_recv for the read end and rep_send for the write end, whose event is removed. Returns 0, or -1 when the pipe cannot be set up. */ +{ + int f; + int p[2]; + + if(pipe(p) == -1){ + fprintf(stderr, "replication: can't create pipe\n"); + return(-1); + }else{ + if((f = fcntl(p[0], F_GETFL, 0)) < 0 || fcntl(p[0], F_SETFL, f | O_NONBLOCK) < 0) { + fprintf(stderr, "replication: can't setting O_NONBLOCK pipe[0]\n"); + return(-1); /* NOTE(review): both pipe descriptors stay open on this path */ + } + if((f = fcntl(p[1], F_GETFL, 0)) < 0 || fcntl(p[1], F_SETFL, f | O_NONBLOCK) < 0) { + fprintf(stderr, "replication: can't setting O_NONBLOCK pipe[1]\n"); + return(-1); /* NOTE(review): both pipe descriptors stay open on this path */ + } + pthread_mutex_lock(&replication_pipe_lock); + rep_recv = conn_new(p[0], conn_pipe_recv, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base); + rep_send = conn_new(p[1], conn_pipe_send, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base); + if(rep_send) event_del(&rep_send->event); /* conn_new() results are checked against NULL elsewhere in this file, so guard the dereference here too */ + pthread_mutex_unlock(&replication_pipe_lock); + } + return(0); +} + +static int replication_close(void) +{ + int c; + int r; + Q_ITEM *q; + + if(settings.verbose > 0) + fprintf(stderr,"replication: close\n"); + if(rep_recv){ + rep_recv->rbytes = sizeof(q); + rep_recv->rcurr = rep_recv->rbuf; + c = 0; + do{ + r = read(rep_recv->sfd, rep_recv->rcurr, rep_recv->rbytes); + if(r == -1){ + break; + } + rep_recv->rbytes -= r; + rep_recv->rcurr += r; + if(!rep_recv->rbytes){ + memcpy(&q, rep_recv->rbuf, sizeof(q)); + rep_recv->rbytes = sizeof(q); + rep_recv->rcurr = rep_recv->rbuf; + qi_free(q); + c++; + } + }while(r); +
conn_close(rep_recv); + rep_recv = NULL; + if (settings.verbose > 1) { + fprintf(stderr, "replication: qitem free %d items\n", qi_free_list()); + fprintf(stderr, "replication: close recv %d items\n", c); + } + } + pthread_mutex_lock(&replication_pipe_lock); + if(rep_send){ + conn_close(rep_send); + rep_send = NULL; + if (settings.verbose > 1) + fprintf(stderr,"replication: close send\n"); + } + pthread_mutex_unlock(&replication_pipe_lock); + if(rep_conn){ + conn_close(rep_conn); + rep_conn = NULL; + if (settings.verbose > 1) + fprintf(stderr,"replication: close conn\n"); + } + if(!rep_exit) /* unless shutting down, go back to listening for a peer */ + replication_server_init(); + return(0); +} + +static void replication_dispatch_close(void) /* Close only the write end of the command pipe (rep_send), under replication_pipe_lock. */ +{ + if (settings.verbose > 1) + fprintf(stderr, "replication: dispatch close\n"); + pthread_mutex_lock(&replication_pipe_lock); + if (rep_send) { + conn_close(rep_send); + rep_send = NULL; + } + pthread_mutex_unlock(&replication_pipe_lock); +} + +static int replication_marugoto(int f) /* Bulk copy of existing keys to the peer. A non-zero f takes a fresh key snapshot; f == 0 queues at most one still-present key per call and, once the list is exhausted, sends the end marker and frees the snapshot. Returns -1 when queuing a key fails, otherwise 0. */ +{ + static int keysend = 0; + static int keycount = 0; + static char *keylist = NULL; + static char *keyptr = NULL; + + if(f){ + if(keylist){ + free(keylist); + keylist = NULL; + keyptr = NULL; + keycount = 0; + keysend = 0; + } + pthread_mutex_lock(&cache_lock); + keylist = (char *)assoc_key_snap((int *)&keycount); + pthread_mutex_unlock(&cache_lock); + keyptr = keylist; + if (!keyptr){ /* nothing to copy: tell the peer the bulk copy is already finished */ + replication_call_marugoto_end(); + }else{ + if (settings.verbose > 0) + fprintf(stderr,"replication: marugoto start\n"); + } + }else{ + if(keyptr){ + while(*keyptr){ + item *it = item_get(keyptr, strlen(keyptr)); + if(it){ + item_remove(it); + if(replication_call_rep(keyptr, strlen(keyptr)) == -1){ + return(-1); + }else{ + keysend++; + keyptr += strlen(keyptr) + 1; + return(0); + } + } + keyptr += strlen(keyptr) + 1; /* key no longer present: skip to the next one */ + } + if(settings.verbose > 0) + fprintf(stderr,"replication: marugoto %d\n", keysend); + replication_call_marugoto_end(); + if(settings.verbose > 0) + fprintf(stderr,"replication: marugoto owari\n"); +
free(keylist); + keylist = NULL; + keyptr = NULL; + keycount = 0; + keysend = 0; + } + } + return(0); +} + +static int replication_send(conn *c) /* Flush c's pending write buffer to its socket. Returns the number of bytes still unsent (0 when c is NULL), or -1 after a write error, in which case replication has been closed. */ +{ + while(c && c->wbytes){ + int w = write(c->sfd, c->wcurr, c->wbytes); + if(w == -1){ + if(errno == EAGAIN || errno == EINTR){ /* NOTE(review): retries immediately, so this spins while the socket is not writable */ + }else{ + fprintf(stderr,"replication: send error\n"); + replication_close(); + return(-1); /* replication_close() closes the peer conn, so c must not be touched again */ + } + }else{ + c->wbytes -= w; + c->wcurr += w; + } + } + return(c ? c->wbytes : 0); +} + +static int replication_pop(void) /* Drain queued Q_ITEM pointers from the command pipe and hand each one to replication_cmd(). Returns -1 when the pipe failed or was closed by the other end, otherwise 0. */ +{ + int r; + int c; + int m; + Q_ITEM **q; + + if(settings.verbose > 1) + fprintf(stderr, "replication: pop\n"); + + if(!rep_recv) + return(0); + + r = read(rep_recv->sfd, rep_recv->rbuf, rep_recv->rsize); + if(r == -1){ + if(errno == EAGAIN || errno == EINTR){ /* nothing to read right now: skip the item loop below */ + }else{ + fprintf(stderr,"replication: pop error %d\n", errno); + return(-1); + } + }else if(r == 0){ + /* other end closed, trigger replication_close() */ + return(-1); + }else{ + c = r / sizeof(Q_ITEM *); + m = r % sizeof(Q_ITEM *); + q = (Q_ITEM **)(rep_recv->rbuf); + while(c--){ + if(q[c]){ + if(rep_conn && replication_cmd(rep_conn, q[c])){ + replication_item(q[c]); /* error retry */ + }else{ + qi_free(q[c]); + } + }else{ /* a NULL entry is the shutdown request queued by replication_exit() */ + if(!rep_exit){ + if (settings.verbose) + fprintf(stderr,"replication: cleanup start\n"); + rep_exit = 1; + } + } + } + } + if(rep_exit){ + if(rep_conn && rep_conn->wbytes){ /* with no peer conn there is nothing left to flush */ + /* retry */ + if(replication_exit()){ + replication_close(); + fprintf(stderr,"replication: cleanup error\n"); + exit(EXIT_FAILURE); + } + }else{ + /* finish */ + replication_close(); + if (settings.verbose) + fprintf(stderr,"replication: cleanup complete\n"); + exit(EXIT_SUCCESS); + } + } + replication_marugoto(0); + return(0); +} + +static int replication_push(void) +{ + int w; + + while(rep_send->wbytes){ + w = write(rep_send->sfd, rep_send->wcurr, rep_send->wbytes); + if(w == -1){ + if(errno == EAGAIN || errno == EINTR){ + fprintf(stderr,"replication: push EAGAIN or EINTR\n"); + }else{ + return(-1); + } + }else{ + rep_send->wbytes -= w; + rep_send->wcurr +=
w; + } + } + rep_send->wcurr = rep_send->wbuf; + return(0); +} + +static int replication_exit(void) /* Queue a NULL item; replication_pop() treats a NULL entry as the request to start shutdown cleanup. */ +{ + return(replication_item(NULL)); +} + +static int replication_item(Q_ITEM *q) /* Write the pointer value q into the command pipe through rep_send's write buffer. Returns 0 on success or when no pipe exists, and -1 after freeing q and closing the pipe on buffer overflow or push failure. */ +{ + pthread_mutex_lock(&replication_pipe_lock); + if (!rep_send) { + pthread_mutex_unlock(&replication_pipe_lock); + return 0; + } + if(rep_send->wcurr + rep_send->wbytes + sizeof(q) > rep_send->wbuf + rep_send->wsize){ + fprintf(stderr,"replication: buffer over fllow\n"); + if(q){ + qi_free(q); + } + pthread_mutex_unlock(&replication_pipe_lock); /* released first because replication_dispatch_close() takes the same lock */ + replication_dispatch_close(); + return(-1); + } + memcpy(rep_send->wcurr + rep_send->wbytes, &q, sizeof(q)); + rep_send->wbytes += sizeof(q); + if(replication_push()){ + fprintf(stderr, "replication: push error\n"); + if(q){ + qi_free(q); + } + pthread_mutex_unlock(&replication_pipe_lock); + replication_dispatch_close(); + return(-1); + } + pthread_mutex_unlock(&replication_pipe_lock); + return(0); +} + +int replication(enum CMD_TYPE type, R_CMD *cmd) /* Build a Q_ITEM for the command and queue it with replication_item(). Returns 0 when no pipe exists or the item was handed over, -1 when no Q_ITEM could be created. NOTE(review): the result of replication_item() is not propagated. */ +{ + Q_ITEM *q; + + pthread_mutex_lock(&replication_pipe_lock); + if (!rep_send) { + pthread_mutex_unlock(&replication_pipe_lock); + return 0; + } + pthread_mutex_unlock(&replication_pipe_lock); + + if((q = qi_new(type, cmd, false))) { + replication_item(q); + }else{ + fprintf(stderr,"replication: can't create Q_ITEM\n"); + replication_dispatch_close(); + return(-1); + } + return(0); +} +#endif /* USE_REPLICATION */ --- memcached-1.4.5/memcached.h~ 2010-05-06 14:09:51.000000000 +0300 +++ memcached-1.4.5/memcached.h 2010-05-06 14:10:13.518051741 +0300 @@ -144,7 +144,13 @@ conn_swallow, /**< swallowing unnecessary bytes w/o storing */ conn_closing, /**< closing this connection */ conn_mwrite, /**< writing out many items sequentially */ - conn_max_state /**< Max state value (used for assertion) */ +#ifdef USE_REPLICATION + conn_repconnect, /**< replication connecting to master */ + conn_rep_listen, /**< replication listening socket */ + conn_pipe_recv, /**< replication command pipe recv */ +
conn_pipe_send, /**< replication command pipe send */ +#endif /* USE_REPLICATION */ + conn_max_state, /**< Max state value (used for assertion) */ }; enum bin_substates { @@ -247,7 +247,9 @@ uint64_t get_misses; uint64_t evictions; uint64_t reclaimed; +#if 0 time_t started; /* when the process was started */ +#endif bool accepting_conns; /* whether we are currently accepting */ uint64_t listen_disabled_num; }; @@ -274,6 +282,11 @@ int backlog; int item_size_max; /* Maximum item size, and upper end for slabs */ bool sasl; /* SASL on/off */ +#ifdef USE_REPLICATION + struct in_addr rep_addr; /* replication addr */ + int rep_port; /* replication port */ + int rep_qmax; /* replication QITEM max */ +#endif /*USE_REPLICATION*/ }; extern struct stats stats; @@ -286,6 +299,10 @@ /* temp */ #define ITEM_SLABBED 4 +#ifdef USE_REPLICATION +#define ITEM_REPDATA 128 +#endif /*USE_REPLICATION*/ + /** * Structure for storing items within memcached. */ @@ -438,6 +455,10 @@ #include "trace.h" #include "hash.h" #include "util.h" + +#ifdef USE_REPLICATION +#include "replication.h" +#endif /* USE_REPLICATION */ /* * Functions such as the libevent-related calls that need to do cross-thread --- memcached-1.4.4/replication.c Thu Jan 1 03:00:00 1970 +++ repcached-2.2-1.4.4/replication.c Wed Feb 10 18:40:48 2010 @@ -0,0 +1,355 @@ +/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * + */ +#include "memcached.h" +#include "replication.h" +#include +#include +#include +#include +#include +/* Pool state: the free list of recycled Q_ITEMs, the number of Q_ITEMs currently malloc-ed, and the mutex guarding both. NOTE(review): the five bare #include lines above carry no header name in this copy of the patch; restore them from the original patch. */ +static Q_ITEM *q_freelist = NULL; +static int q_itemcount = 0; +static pthread_mutex_t replication_queue_lock = PTHREAD_MUTEX_INITIALIZER; +/* Return the current q_itemcount, read under replication_queue_lock. */ +int get_qi_count(void) +{ + int c; + pthread_mutex_lock(&replication_queue_lock); + c = q_itemcount; + pthread_mutex_unlock(&replication_queue_lock); + return(c); +} +/* Build a Q_ITEM for the given type. An item is taken from the free list when one is available; otherwise a new one is malloc-ed unless reuse is set or q_itemcount has reached settings.rep_qmax. For REPLICATION_REP and REPLICATION_DEL the key is copied from cmd and NUL-terminated; for REPLICATION_DEFER_FLUSH_ALL cmd->time is stored. Returns the item, or NULL on failure. NOTE(review): the unknown-type path returns NULL without putting the item back on the free list. */ +Q_ITEM *qi_new(enum CMD_TYPE type, R_CMD *cmd, bool reuse) +{ + Q_ITEM *q = NULL; + char *key = NULL; + uint32_t keylen = 0; + rel_time_t time = 0; + +
pthread_mutex_lock(&replication_queue_lock); + if(q_freelist){ + q = q_freelist; + q_freelist = q->next; + } +/* Free list was empty: allocate a new item, unless the caller only wants recycled items or the rep_qmax cap has been reached. */ + if(NULL == q){ + if(reuse) { + pthread_mutex_unlock(&replication_queue_lock); + return(NULL); + } + if(q_itemcount >= settings.rep_qmax) { + pthread_mutex_unlock(&replication_queue_lock); + return(NULL); + } + q = malloc(sizeof(Q_ITEM)); + if (NULL == q){ + fprintf(stderr,"replication: qi_new out of memory\n"); + pthread_mutex_unlock(&replication_queue_lock); + return(NULL); + } + q_itemcount++; + if (settings.verbose > 2) + fprintf(stderr,"replication: alloc c=%d\n", q_itemcount); + } + + pthread_mutex_unlock(&replication_queue_lock); +/* Pick out the fields of cmd that this command type uses. An unknown type hands q back to the free list (key cleared first, since a fresh malloc leaves it uninitialised) so that neither the item nor its rep_qmax slot is lost. */ + switch (type) { + case REPLICATION_REP: + case REPLICATION_DEL: + key = cmd->key; + keylen = cmd->keylen; + break; + case REPLICATION_FLUSH_ALL: + break; + case REPLICATION_DEFER_FLUSH_ALL: + time = cmd->time; + break; + case REPLICATION_MARUGOTO_END: + break; + default: + fprintf(stderr,"replication: got unknown command: %d\n", type); + q->key = NULL; qi_free(q); return(NULL); + } + + q->key = NULL; + q->type = type; + q->time = time; + q->next = NULL; + if (keylen) { + q->key = malloc(keylen + 1); + if(NULL == q->key){ + qi_free(q); + q = NULL; + }else{ + memcpy(q->key, key, keylen); + *(q->key + keylen) = 0; + } + } + + return(q); +} +/* Release the key copy held by q and push q onto the free list for reuse; the Q_ITEM itself is not freed here. A NULL q is ignored. */ +void qi_free(Q_ITEM *q) +{ + if(q){ + if(q->key){ + free(q->key); + q->key = NULL; + } + pthread_mutex_lock(&replication_queue_lock); + q->next = q_freelist; + q_freelist = q; + pthread_mutex_unlock(&replication_queue_lock); + } +} +/* free() every Q_ITEM on the free list, decrementing q_itemcount for each one. Returns how many items were freed. */ +int qi_free_list() +{ + int c = 0; + Q_ITEM *q = NULL; + + pthread_mutex_lock(&replication_queue_lock); + while((q = q_freelist)){ + q_itemcount--; + c++; + q_freelist = q->next; + free(q); + } + pthread_mutex_unlock(&replication_queue_lock); + return(c); +} +/* Format n as an unsigned decimal into p, or into a scratch buffer when p is NULL (used only to measure the length). Returns the number of characters produced. */ +static int replication_get_num(char *p, int n) +{ + int l; + char b[64]; + if(p) + l = sprintf(p, "%u", n); + else + l = sprintf(b, "%u", n); + return(l); +} +/* Queue a REPLICATION_REP command for the given key via replication(). */ +int replication_call_rep(char *key, size_t keylen) +{ + R_CMD
r; + r.key = key; + r.keylen = keylen; + return(replication(REPLICATION_REP, &r)); +} + +int replication_call_del(char *key, size_t keylen) +{ + R_CMD r; + r.key = key; + r.keylen = keylen; + return(replication(REPLICATION_DEL, &r)); +} + +int replication_call_flush_all() +{ + R_CMD r; + r.key = NULL; + return(replication(REPLICATION_FLUSH_ALL, &r)); +} + +int replication_call_defer_flush_all(const rel_time_t time) +{ + R_CMD r; + r.key = NULL; + r.time = time; + return(replication(REPLICATION_DEFER_FLUSH_ALL, &r)); +} + +int replication_call_marugoto_end() +{ + R_CMD r; + r.key = NULL; + return(replication(REPLICATION_MARUGOTO_END, &r)); +} + +static int replication_alloc(conn *c, int s) +{ + char *p; + s += c->wbytes; + if(c->wsize < s){ + while(c->wsize < s) + c->wsize += 4096; + if((p = malloc(c->wsize))){ + memcpy(p, c->wbuf, c->wbytes); + free(c->wbuf); + c->wbuf = p; + }else{ + return(-1); + } + } + return(0); +} + +static int replication_del(conn *c, char *k) +{ + int l = 0; + char *s = "delete "; + char *n = "\r\n"; + char *p = NULL; + + l += strlen(s); + l += strlen(k); + l += strlen(n); + if(replication_alloc(c,l) == -1){ + fprintf(stderr, "replication: del malloc error\n"); + return(-1); + } + p = c->wbuf + c->wbytes; + memcpy(p, s, strlen(s)); + p += strlen(s); + memcpy(p, k, strlen(k)); + p += strlen(k); + memcpy(p, n, strlen(n)); + p += strlen(n); + c->wbytes = p - c->wbuf; + c->wcurr = c->wbuf; + return(0); +} + +static int replication_rep(conn *c, item *it) +{ + int exp = 0; + int len = 0; + char *s = "rep "; + char *n = "\r\n"; + char *p = NULL; + char flag[40]; + + if(it->exptime) + exp = it->exptime + process_started; + flag[0]=0; + if((p=ITEM_suffix(it))){ + int i; + memcpy(flag, p, it->nsuffix - 2); + flag[it->nsuffix - 2] = 0; + for(i=0;i ' ') + break; + } + memmove(flag,&flag[i],strlen(flag)-i); + for(p=flag;*p>' ';p++); + *p=0; + } + len += strlen(s); + len += it->nkey; + len += 1; + len += strlen(flag); + len += 1; + len += 
replication_get_num(NULL, exp); + len += 1; + len += replication_get_num(NULL, it->nbytes - 2); + len += 1; + len += replication_get_num(NULL, ITEM_get_cas(it)); + len += strlen(n); + len += it->nbytes; + len += strlen(n); + if(replication_alloc(c,len) == -1){ + fprintf(stderr, "replication: rep malloc error\n"); + return(-1); + } + p = c->wbuf + c->wbytes; + memcpy(p, s, strlen(s)); + p += strlen(s); + memcpy(p, ITEM_key(it), it->nkey); + p += it->nkey; + *(p++) = ' '; + memcpy(p, flag, strlen(flag)); + p += strlen(flag); + *(p++) = ' '; + p += replication_get_num(p, exp); + *(p++) = ' '; + p += replication_get_num(p, it->nbytes - 2); + *(p++) = ' '; + p += replication_get_num(p, ITEM_get_cas(it)); + memcpy(p, n, strlen(n)); + p += strlen(n); + memcpy(p, ITEM_data(it), it->nbytes); + p += it->nbytes; + c->wbytes = p - c->wbuf; + c->wcurr = c->wbuf; + return(0); +} +/* Append a flush_all command to the write buffer of c: the word flush_all and a space, the decimal value of exp only when exp is greater than 0, then CRLF. Points wcurr at the start of wbuf. Returns 0, or -1 if the buffer could not be grown. */ +static int replication_flush_all(conn *c, rel_time_t exp) +{ + char *s = "flush_all "; + char *n = "\r\n"; + char *p = NULL; + + int l = strlen(s) + strlen(n); + if (exp > 0) + l += replication_get_num(NULL, exp); + if(replication_alloc(c,l) == -1){ + fprintf(stderr, "replication: flush_all malloc error\n"); + return(-1); + } + p = c->wbuf + c->wbytes; + memcpy(p, s, strlen(s)); + p += strlen(s); + if (exp > 0) + p += replication_get_num(p, exp); + memcpy(p, n, strlen(n)); + p += strlen(n); + c->wbytes = p - c->wbuf; + c->wcurr = c->wbuf; + return(0); +} +/* Append the marugoto_end command (the word followed by CRLF) to the write buffer of c and point wcurr at the start of wbuf. Returns 0, or -1 if the buffer could not be grown. */ +static int replication_marugoto_end(conn *c) +{ + char *s = "marugoto_end"; + char *n = "\r\n"; + char *p = NULL; + + int l = strlen(s) + strlen(n); + if(replication_alloc(c,l) == -1){ + fprintf(stderr, "replication: marugoto_end malloc error\n"); + return(-1); + } + p = c->wbuf + c->wbytes; + memcpy(p, s, strlen(s)); + p += strlen(s); + memcpy(p, n, strlen(n)); + p += strlen(n); + c->wbytes = p - c->wbuf; + c->wcurr = c->wbuf; + return(0); +} +/* Translate queue item q into its wire command and append it to the write buffer of c. A REPLICATION_REP whose item can no longer be fetched with item_get() is sent as a delete instead. Returns the result of the command builder that was called; an unknown type is logged and returns 0. */ +int replication_cmd(conn *c, Q_ITEM *q) +{ + item *it; + int r; + + switch (q->type) { + case
REPLICATION_REP: + it = item_get(q->key, strlen(q->key)); + if (!it) + return(replication_del(c, q->key)); + r = replication_rep(c, it); + item_remove(it); + return r; + case REPLICATION_DEL: + return(replication_del(c, q->key)); + case REPLICATION_FLUSH_ALL: + return(replication_flush_all(c, 0)); + case REPLICATION_DEFER_FLUSH_ALL: + return(replication_flush_all(c, q->time)); + case REPLICATION_MARUGOTO_END: + return(replication_marugoto_end(c)); + default: + fprintf(stderr,"replication: got unknown command:%d\n", q->type); + return(0); + } +} --- memcached-1.4.4/replication.h Thu Jan 1 03:00:00 1970 +++ repcached-2.2-1.4.4/replication.h Wed Feb 10 18:40:31 2010 @@ -0,0 +1,42 @@ +#ifndef MEMCACHED_REPLICATION_H +#define MEMCACHED_REPLICATION_H +#define REPCACHED_VERSION "2.2" +#include +/* Kinds of command a Q_ITEM can carry; replication_cmd() maps each one to a wire command. */ +enum CMD_TYPE { + REPLICATION_REP, + REPLICATION_DEL, + REPLICATION_FLUSH_ALL, + REPLICATION_DEFER_FLUSH_ALL, + REPLICATION_MARUGOTO_END, +}; +/* Queued replication command: its type, a malloc-ed NUL-terminated copy of the key (or NULL), the time used by REPLICATION_DEFER_FLUSH_ALL, and the free-list link. */ +typedef struct queue_item_t Q_ITEM; +struct queue_item_t { + enum CMD_TYPE type; + char *key; + rel_time_t time; + Q_ITEM *next; +}; +/* Arguments handed to replication() and qi_new(): key and keylen are read for REPLICATION_REP and REPLICATION_DEL, time for REPLICATION_DEFER_FLUSH_ALL. */ +typedef struct replication_cmd_t R_CMD; +struct replication_cmd_t { + char *key; + int keylen; + rel_time_t time; +}; +/* Q_ITEM pool management and conversion of a queued item into wire text. */ +Q_ITEM *qi_new(enum CMD_TYPE type, R_CMD *cmd, bool); +void qi_free(Q_ITEM *); +int qi_free_list(void); +int replication_cmd(conn *, Q_ITEM *); +int get_qi_count(void); +/* Entry points for queueing a replication command; each replication_call_* helper fills an R_CMD and calls replication(). */ +int replication_call_rep(char *key, size_t keylen); +int replication_call_del(char *key, size_t keylen); +int replication_call_flush_all(void); +int replication_call_defer_flush_all(const rel_time_t time); +int replication_call_marugoto_end(void); +int replication(enum CMD_TYPE type, R_CMD *cmd); + +#endif --- memcached-1.4.5/t/binary.t~ 2010-04-03 10:07:16.000000000 +0300 +++ memcached-1.4.5/t/binary.t 2010-05-06 14:13:25.718440750 +0300 @@ -2,11 +2,13 @@ use strict; use warnings; -use Test::More tests => 3361; +use Test::More; use FindBin qw($Bin); use lib "$Bin/lib"; use MemcachedTest; +Test::More::plan(tests =>
3361 + (support_replication() ? 36 : 0)); + my $server = new_memcached(); ok($server, "started the server"); --- memcached-1.4.4/t/issue_67.t Sun Nov 1 01:44:09 2009 +++ repcached-2.2-1.4.4/t/issue_67.t Wed Feb 10 17:50:12 2010 @@ -41,6 +41,10 @@ my $exe = "$builddir/memcached-debug"; croak("memcached binary doesn't exist. Haven't run 'make' ?\n") unless -e $exe; + if (support_replication()) { + $args .= ' -X 0'; + } + my $childpid = fork(); my $cmd = "$builddir/timedrun 10 $exe $args"; --- memcached-1.4.4/t/lib/MemcachedTest.pm Fri Oct 30 04:24:52 2009 +++ repcached-2.2-1.4.4/t/lib/MemcachedTest.pm Wed Feb 10 17:53:34 2010 @@ -13,7 +13,8 @@ @EXPORT = qw(new_memcached sleep mem_get_is mem_gets mem_gets_is mem_stats - supports_sasl free_port); + supports_sasl free_port support_replication memcached_version + version2num); sub sleep { my $n = shift; @@ -148,6 +149,23 @@ return 0; } +sub support_replication { + my $output = `$builddir/memcached-debug -h`; + return 1 if $output =~ /^-x /m; + return 0; +} + +sub memcached_version { + my $output = `$builddir/memcached-debug -h`; + return $1 if $output =~ /^memcached (\d[\d\.]+)/; + return 0; +} + +sub version2num { + my($major,$minor,$pl) = ($_[0] =~ /^(\d+)\.(\d+)\.(\d+)$/); + return $major*100**2 + $minor*100 + $pl +} + sub new_memcached { my ($args, $passed_port) = @_; my $port = $passed_port || free_port(); @@ -171,6 +189,9 @@ } if ($< == 0) { $args .= " -u root"; + } + if (support_replication() && $args !~ m/-X/) { + $args .= ' -X 0'; } my $childpid = fork(); --- memcached-1.4.5/t/stats.t~ 2010-04-03 10:07:16.000000000 +0300 +++ memcached-1.4.5/t/stats.t 2010-05-06 14:15:28.521352735 +0300 @@ -57,7 +57,8 @@ my $stats = mem_stats($sock); # Test number of keys -is(scalar(keys(%$stats)), 38, "38 stats values"); +my $keys = 38 + (support_replication() ? 
3 : 0); +is(scalar(keys(%$stats)), $keys, "$keys stats values"); # Test initial state foreach my $key (qw(curr_items total_items bytes cmd_get cmd_set get_hits evictions get_misses --- memcached-1.4.4/testapp.c Wed Nov 25 03:40:29 2009 +++ repcached-2.2-1.4.4/testapp.c Wed Feb 10 17:52:05 2010 @@ -300,6 +300,10 @@ argv[arg++] = "-1"; argv[arg++] = "-U"; argv[arg++] = "0"; +#ifdef USE_REPLICATION + argv[arg++] = "-X"; + argv[arg++] = "0"; +#endif /* Handle rpmbuild and the like doing this as root */ if (getuid() == 0) { argv[arg++] = "-u";