1 --- memcached-1.4.4/Makefile.am Fri Oct 30 04:24:52 2009
2 +++ repcached-2.2-1.4.4/Makefile.am Tue Feb 9 23:02:45 2010
4 memcached_SOURCES += sasl_defs.c
8 +memcached_SOURCES += replication.h replication.c
11 memcached_debug_SOURCES = $(memcached_SOURCES)
12 memcached_CPPFLAGS = -DNDEBUG
13 memcached_debug_LDADD = @PROFILER_LDFLAGS@
14 --- memcached-1.4.4/assoc.c Sat Oct 24 00:38:01 2009
15 +++ repcached-2.2-1.4.4/assoc.c Tue Feb 9 23:02:45 2010
20 +#ifdef USE_REPLICATION
21 +char *assoc_key_snap(int *n) /* copies item keys out of the hash table into one malloc'd buffer */
29 + int hm = hashsize(hashpower); /* hashsize() for the current hashpower */
33 + if(expanding && hs < hashsize(hashpower - 1) && hs >= expand_bucket){ /* while expanding, buckets below the previous size and at or past expand_bucket are read from the old table */
34 + i = old_hashtable[hs];
36 + i = primary_hashtable[hs];
46 + if((p = b = malloc(sz))){ /* b keeps the buffer start, p is the write cursor; nothing is copied when malloc fails */
49 + if(expanding && hs < hashsize(hashpower - 1) && hs >= expand_bucket){ /* same old/primary table selection as above */
50 + i = old_hashtable[hs];
52 + i = primary_hashtable[hs];
55 + memcpy(p, ITEM_key(i), i->nkey); /* copies exactly nkey key bytes */
67 +#endif /* USE_REPLICATION */
68 --- memcached-1.4.4/assoc.h Sun Aug 30 03:00:58 2009
69 +++ repcached-2.2-1.4.4/assoc.h Tue Feb 9 23:02:45 2010
71 int start_assoc_maintenance_thread(void);
72 void stop_assoc_maintenance_thread(void);
74 +#ifdef USE_REPLICATION
75 +char *assoc_key_snap(int *n);
76 +#endif /*USE_REPLICATION*/
77 --- memcached-1.4.4/config.h.in Fri Nov 27 09:34:56 2009
78 +++ repcached-2.2-1.4.4/config.h.in Wed Feb 10 19:12:46 2010
80 /* Define to 1 if you have the ANSI C header files. */
83 +/* Define this if you want to use replication */
84 +#undef USE_REPLICATION
86 /* Version number of package */
89 --- memcached-1.4.4/configure.ac Wed Nov 25 03:40:29 2009
90 +++ repcached-2.2-1.4.4/configure.ac Tue Feb 9 23:02:45 2010
92 AC_MSG_ERROR([Can't enable threads without the POSIX thread library.])
95 +dnl Check whether the user wants replication or not
96 +AC_ARG_ENABLE(replication,
97 + [AS_HELP_STRING([--enable-replication],[support replication])],
98 + [if test "x$enable_threads" = "xyes"; then
99 + AC_MSG_ERROR([Can't enable threads and replication together.])
101 + AC_DEFINE([USE_REPLICATION],,[Define this if you want to use replication])
105 +AM_CONDITIONAL(ENABLE_REPLICATION, test "x$enable_replication" = "xyes")
107 AC_CHECK_FUNCS(mlockall)
108 AC_CHECK_FUNCS(getpagesizes)
109 AC_CHECK_FUNCS(memcntl)
110 --- memcached-1.4.4/items.c Sat Oct 24 00:38:01 2009
111 +++ repcached-2.2-1.4.4/items.c Tue Feb 9 23:02:45 2010
116 +#ifdef USE_REPLICATION
117 + replication_call_del(ITEM_key(search), search->nkey); /* queue a delete of this key for the peer; the local unlink follows */
118 +#endif /* USE_REPLICATION */
120 do_item_unlink(search);
123 stats.total_items += 1;
126 +#ifdef USE_REPLICATION
127 + /* Allocate a new CAS ID on link. */
128 + if(!(it->it_flags & ITEM_REPDATA)) /* items flagged ITEM_REPDATA are not given a fresh CAS here */
129 + ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
131 /* Allocate a new CAS ID on link. */
132 ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
133 +#endif /* USE_REPLICATION */
137 --- memcached-1.4.4/memcached.c Fri Nov 27 08:45:13 2009
138 +++ repcached-2.2-1.4.4/memcached.c Wed Feb 10 16:08:37 2010
141 static void conn_free(conn *c);
143 +#ifdef USE_REPLICATION
144 +static int rep_exit = 0; /* exit-in-progress flag, tested together with conn_pipe_recv — presumably set during shutdown; TODO confirm where */
145 +static conn *rep_recv = NULL; /* conn wrapping the read end (p[0]) of the internal pipe */
146 +static conn *rep_send = NULL; /* conn wrapping the write end (p[1]) of the internal pipe */
147 +static conn *rep_conn = NULL; /* connection to the replication peer */
148 +static conn *rep_serv = NULL; /* head of the list of replication listening conns */
149 +static int server_socket_replication(const int);
150 +static void server_close_replication(void);
151 +static int replication_init(void);
152 +static int replication_server_init(void);
153 +static int replication_client_init(void);
154 +static int replication_start(void);
155 +static int replication_connect(void);
156 +static int replication_close(void);
157 +static void replication_dispatch_close(void);
158 +static int replication_marugoto(int); /* walks the key snapshot from assoc_key_snap() and replicates each key */
159 +static int replication_send(conn *);
160 +static int replication_pop(void); /* reads queued Q_ITEM pointers from the pipe */
161 +static int replication_push(void); /* writes queued Q_ITEM pointers into the pipe */
162 +static int replication_exit(void); /* queues a NULL item */
163 +static int replication_item(Q_ITEM *);
164 +static pthread_mutex_t replication_pipe_lock = PTHREAD_MUTEX_INITIALIZER; /* held while rep_recv/rep_send are created and while rep_send is closed */
165 +#endif /* USE_REPLICATION */
167 /** exported globals **/
169 struct settings settings;
171 settings.backlog = 1024;
172 settings.binding_protocol = negotiating_prot;
173 settings.item_size_max = 1024 * 1024; /* The famous 1MB upper limit. */
174 +#ifdef USE_REPLICATION
175 + settings.rep_addr.s_addr = htonl(INADDR_ANY); /* INADDR_ANY means no peer address was given; replication_init() then skips the client side */
176 + settings.rep_port = 11212; /* default replication port, as printed by usage() */
177 + settings.rep_qmax = 8192; /* upper bound on allocated Q_ITEMs checked in qi_new() */
178 +#endif /* USE_REPLICATION */
183 prot_text(c->protocol));
184 } else if (IS_UDP(transport)) {
185 fprintf(stderr, "<%d server listening (udp)\n", sfd);
186 +#ifdef USE_REPLICATION
187 + } else if (init_state == conn_rep_listen) {
188 + fprintf(stderr, "<%d server listening (replication)\n", sfd);
189 +#endif /* USE_REPLICATION */
190 } else if (c->protocol == negotiating_prot) {
191 fprintf(stderr, "<%d new auto-negotiating client connection\n",
202 + "conn_pipe_send" };
203 return statenames[state];
210 +#ifdef USE_REPLICATION
211 + if (c == rep_conn){ /* reply addressed to the replication peer */
212 + if (settings.verbose > 1)
213 + fprintf(stderr, "REP>%d %s\n", c->sfd, str); /* logged only at verbose > 1 */
214 + conn_set_state(c, conn_new_cmd); /* back to waiting for the next command */
217 +#endif /* USE_REPLICATION */
219 if (settings.verbose > 1)
220 fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
223 enum store_item_type ret;
225 - pthread_mutex_lock(&c->thread->stats.mutex);
226 - c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
227 - pthread_mutex_unlock(&c->thread->stats.mutex);
229 + pthread_mutex_lock(&c->thread->stats.mutex);
230 + c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
231 + pthread_mutex_unlock(&c->thread->stats.mutex);
234 if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
235 out_string(c, "CLIENT_ERROR bad data chunk");
240 +#ifdef USE_REPLICATION
241 + if( c != rep_conn ){ /* data that came in from the peer is not replicated again */
242 + replication_call_rep(ITEM_key(it), it->nkey);
244 +#endif /* USE_REPLICATION */
245 out_string(c, "STORED");
248 @@ -2410,6 +2462,11 @@
249 APPEND_STAT("listen_disabled_num", "%llu", (unsigned long long)stats.listen_disabled_num);
250 APPEND_STAT("threads", "%d", settings.num_threads);
251 APPEND_STAT("conn_yields", "%llu", (unsigned long long)thread_stats.conn_yields);
252 +#ifdef USE_REPLICATION
253 + APPEND_STAT("replication", "MASTER", 0); /* constant value; the format has no conversion for the 0 argument */
254 + APPEND_STAT("repcached_version", "%s", REPCACHED_VERSION);
255 + APPEND_STAT("repcached_qi_free", "%u", settings.rep_qmax - get_qi_count()); /* rep_qmax minus the current Q_ITEM count; NOTE(review): int expression printed with %u */
256 +#endif /*USE_REPLICATION*/
260 @@ -2797,6 +2854,11 @@
261 switch(add_delta(c, it, incr, delta, temp)) {
264 +#ifdef USE_REPLICATION
265 + if( c != rep_conn){
266 + replication_call_rep(ITEM_key(it), it->nkey);
268 +#endif /* USE_REPLICATION */
271 out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value");
272 @@ -2911,17 +2973,25 @@
274 MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);
276 - pthread_mutex_lock(&c->thread->stats.mutex);
277 - c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++;
278 - pthread_mutex_unlock(&c->thread->stats.mutex);
280 + pthread_mutex_lock(&c->thread->stats.mutex);
281 + c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++;
282 + pthread_mutex_unlock(&c->thread->stats.mutex);
286 item_remove(it); /* release our reference */
287 +#ifdef USE_REPLICATION
288 + if( c != rep_conn )
289 + replication_call_del(key, nkey);
290 +#endif /* USE_REPLICATION */
291 out_string(c, "DELETED");
293 - pthread_mutex_lock(&c->thread->stats.mutex);
294 - c->thread->stats.delete_misses++;
295 - pthread_mutex_unlock(&c->thread->stats.mutex);
297 + pthread_mutex_lock(&c->thread->stats.mutex);
298 + c->thread->stats.delete_misses++;
299 + pthread_mutex_unlock(&c->thread->stats.mutex);
302 out_string(c, "NOT_FOUND");
304 @@ -2986,6 +3056,22 @@
306 process_update_command(c, tokens, ntokens, comm, true);
308 +#ifdef USE_REPLICATION
309 + } else if ((ntokens == 7) && (strcmp(tokens[COMMAND_TOKEN].value, "rep") == 0 && (comm = NREAD_SET)) && (c == rep_conn)) { /* "rep" is accepted only on the replication connection; comm is assigned NREAD_SET inside the condition */
311 + process_update_command(c, tokens, ntokens, comm, true);
313 + ((item *)(c->item))->it_flags |= ITEM_REPDATA; /* mark the item being received as replicated data */
315 + } else if ((ntokens == 2) && (strcmp(tokens[COMMAND_TOKEN].value, "marugoto_end") == 0) && (c == rep_conn)) { /* also accepted only from the peer; presumably marks the end of the peer's bulk copy — TODO confirm */
316 + if(replication_start() == -1) /* failure here terminates the process */
317 + exit(EXIT_FAILURE);
318 + if (settings.verbose > 0)
319 + fprintf(stderr,"replication: start\n");
320 + out_string(c, "OK");
323 +#endif /* USE_REPLICATION */
324 } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
326 process_arithmetic_command(c, tokens, ntokens, 1);
327 @@ -3012,11 +3098,17 @@
329 set_noreply_maybe(c, tokens, ntokens);
331 - pthread_mutex_lock(&c->thread->stats.mutex);
332 - c->thread->stats.flush_cmds++;
333 - pthread_mutex_unlock(&c->thread->stats.mutex);
335 + pthread_mutex_lock(&c->thread->stats.mutex);
336 + c->thread->stats.flush_cmds++;
337 + pthread_mutex_unlock(&c->thread->stats.mutex);
340 if(ntokens == (c->noreply ? 3 : 2)) {
341 +#ifdef USE_REPLICATION
342 + if( c != rep_conn ) /* forward flush_all to the peer unless the peer sent it */
343 + replication_call_flush_all();
345 settings.oldest_live = current_time - 1;
346 item_flush_expired();
348 @@ -3029,6 +3121,11 @@
352 +#ifdef USE_REPLICATION
353 + if( c != rep_conn ) /* a delayed flush received from the peer is not forwarded back */
354 + replication_call_defer_flush_all(realtime(exptime) + process_started); /* forwarded value is realtime(exptime) with process_started added */
356 + settings.oldest_live = realtime(exptime) - 1;
358 If exptime is zero realtime() would return zero too, and
359 realtime(exptime) - 1 would overflow to the max unsigned
360 @@ -3275,9 +3372,11 @@
361 int avail = c->rsize - c->rbytes;
362 res = read(c->sfd, c->rbuf + c->rbytes, avail);
364 - pthread_mutex_lock(&c->thread->stats.mutex);
365 - c->thread->stats.bytes_read += res;
366 - pthread_mutex_unlock(&c->thread->stats.mutex);
368 + pthread_mutex_lock(&c->thread->stats.mutex);
369 + c->thread->stats.bytes_read += res;
370 + pthread_mutex_unlock(&c->thread->stats.mutex);
372 gotdata = READ_DATA_RECEIVED;
375 @@ -3423,6 +3522,12 @@
379 +#ifdef USE_REPLICATION
380 + if(rep_exit && (c->state != conn_pipe_recv)){
383 +#endif /* USE_REPLICATION */
388 @@ -3502,9 +3607,11 @@
390 reset_cmd_handler(c);
392 - pthread_mutex_lock(&c->thread->stats.mutex);
393 - c->thread->stats.conn_yields++;
394 - pthread_mutex_unlock(&c->thread->stats.mutex);
396 + pthread_mutex_lock(&c->thread->stats.mutex);
397 + c->thread->stats.conn_yields++;
398 + pthread_mutex_unlock(&c->thread->stats.mutex);
401 /* We have already read in data into the input buffer,
402 so libevent will most likely not signal read events
403 @@ -3545,9 +3652,11 @@
404 /* now try reading from the socket */
405 res = read(c->sfd, c->ritem, c->rlbytes);
407 - pthread_mutex_lock(&c->thread->stats.mutex);
408 - c->thread->stats.bytes_read += res;
409 - pthread_mutex_unlock(&c->thread->stats.mutex);
411 + pthread_mutex_lock(&c->thread->stats.mutex);
412 + c->thread->stats.bytes_read += res;
413 + pthread_mutex_unlock(&c->thread->stats.mutex);
415 if (c->rcurr == c->ritem) {
418 @@ -3600,9 +3709,11 @@
419 /* now try reading from the socket */
420 res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
422 - pthread_mutex_lock(&c->thread->stats.mutex);
423 - c->thread->stats.bytes_read += res;
424 - pthread_mutex_unlock(&c->thread->stats.mutex);
426 + pthread_mutex_lock(&c->thread->stats.mutex);
427 + c->thread->stats.bytes_read += res;
428 + pthread_mutex_unlock(&c->thread->stats.mutex);
433 @@ -3698,6 +3809,10 @@
435 if (IS_UDP(c->transport))
437 +#ifdef USE_REPLICATION
438 + else if(c == rep_conn) /* closing the peer connection goes through replication_close(), which also closes both pipe conns */
439 + replication_close();
440 +#endif /*USE_REPLICATION*/
444 @@ -3706,6 +3821,70 @@
449 +#ifdef USE_REPLICATION
450 + case conn_pipe_recv: /* the read end of the internal pipe became readable */
451 + if(replication_pop()){ /* non-zero from replication_pop() shuts the replication link down */
452 + replication_close();
454 + replication_send(rep_conn); /* write out whatever is pending for the peer */
459 + case conn_rep_listen: /* a peer is connecting to the replication port */
460 + if (settings.verbose > 0)
461 + fprintf(stderr,"replication: accept\n");
462 + addrlen = sizeof(addr);
463 + res = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
465 + if(errno == EAGAIN || errno == EWOULDBLOCK) { /* nothing to accept right now; no message is printed */
466 + } else if (errno == EMFILE) {
467 + fprintf(stderr, "replication: Too many opened connections\n");
469 + fprintf(stderr, "replication: accept error\n");
474 + fprintf(stderr,"replication: already connected\n");
476 + if((flags = fcntl(res, F_GETFL, 0)) < 0 || fcntl(res, F_SETFL, flags | O_NONBLOCK) < 0){ /* the accepted socket must be switched to non-blocking mode */
478 + fprintf(stderr, "replication: Can't Setting O_NONBLOCK\n");
480 + server_close_replication(); /* stop listening once a peer has been accepted */
481 + rep_conn = conn_new(res, conn_read, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base); /* NOTE(review): the result is dereferenced on the next line without a NULL check — confirm conn_new cannot fail here */
482 + rep_conn->item = NULL;
483 + rep_conn->rbytes = 0;
484 + rep_conn->rcurr = rep_conn->rbuf;
485 + replication_connect(); /* creates the internal pipe conns; return value is not checked here */
486 + replication_marugoto(1); /* called with 1 and then 0; presumably 1 takes the key snapshot and 0 starts sending — TODO confirm */
487 + replication_marugoto(0);
494 + case conn_repconnect: /* outgoing connection started by replication_client_init() */
496 + replication_connect();
497 + conn_set_state(c, conn_read); /* from here the peer connection is read like a normal client */
498 + if (settings.verbose > 0)
499 + fprintf(stderr,"replication: marugoto copying\n");
500 + if(!update_event(c, EV_READ | EV_PERSIST)){ /* the conn was registered for EV_WRITE; switch it to read events */
501 + fprintf(stderr, "replication: Couldn't update event\n");
502 + conn_set_state(c, conn_closing);
507 + case conn_pipe_send:
508 + /* should not happen */
509 + fprintf(stderr, "replication: unexpected conn_pipe_send state\n");
511 +#endif /* USE_REPLICATION */
515 @@ -4002,6 +4181,89 @@
519 +#ifdef USE_REPLICATION
520 +static int server_socket_replication(const int port) { /* binds and listens on the replication port for every address returned by getaddrinfo() */
522 + struct linger ling = {0, 0}; /* both fields zero: passed to SO_LINGER below */
523 + struct addrinfo *ai;
524 + struct addrinfo *next;
525 + struct addrinfo hints;
526 + char port_buf[NI_MAXSERV];
532 + memset(&hints, 0, sizeof (hints));
533 + hints.ai_flags = AI_PASSIVE|AI_ADDRCONFIG;
534 + hints.ai_family = AF_UNSPEC;
535 + hints.ai_protocol = IPPROTO_TCP;
536 + hints.ai_socktype = SOCK_STREAM;
537 + snprintf(port_buf, NI_MAXSERV, "%d", port);
538 + error= getaddrinfo(settings.inter, port_buf, &hints, &ai); /* resolves the same interface setting (settings.inter) with the replication port */
540 + if (error != EAI_SYSTEM) /* EAI_SYSTEM errors are reported through perror() instead */
541 + fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
543 + perror("getaddrinfo()");
548 + for (next= ai; next; next= next->ai_next) {
549 + conn *rep_serv_add;
550 + if ((sfd = new_socket(next)) == -1) {
554 + setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); /* setsockopt() results are not checked */
555 + setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
556 + setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
557 + setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
559 + if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
560 + if (errno != EADDRINUSE) { /* EADDRINUSE is handled differently from other bind errors */
570 + if (listen(sfd, 1024) == -1) { /* NOTE(review): backlog is the literal 1024, not settings.backlog */
571 + perror("listen()");
578 + if (!(rep_serv_add = conn_new(sfd, conn_rep_listen,
579 + EV_READ | EV_PERSIST, 1, tcp_transport, main_base))) {
580 + fprintf(stderr, "failed to create replication server connection\n");
581 + exit(EXIT_FAILURE);
584 + rep_serv_add->next = rep_serv; /* push the new listener onto the rep_serv list */
585 + rep_serv = rep_serv_add;
590 + /* Return zero iff we detected no errors in starting up connections */
591 + return success == 0;
594 +static void server_close_replication(void) { /* closes the replication listening conns held in the rep_serv list */
596 + conn_close(rep_serv);
597 + rep_serv = rep_serv->next; /* NOTE(review): reads rep_serv->next after conn_close(rep_serv) — confirm the conn is still valid to read at this point */
600 +#endif /* USE_REPLICATION */
603 * We keep the current time of day in a global variable that's updated by a
604 * timer event. This saves us a bunch of time() system calls (we really only
605 @@ -4041,6 +4303,9 @@
607 static void usage(void) {
608 printf(PACKAGE " " VERSION "\n");
609 +#ifdef USE_REPLICATION
610 + printf("repcached %s\n",REPCACHED_VERSION);
611 +#endif /* USE_REPLICATION */
612 printf("-p <num> TCP port number to listen on (default: 11211)\n"
613 "-U <num> UDP port number to listen on (default: 11211, 0 is off)\n"
614 "-s <file> UNIX socket path to listen on (disables network support)\n"
615 @@ -4088,6 +4353,10 @@
617 printf("-S Turn on Sasl authentication\n");
619 +#ifdef USE_REPLICATION
620 + printf("-x <ip_addr> hostname or IP address of peer repcached\n");
621 + printf("-X <num> TCP port number for replication (default: 11212)\n"); /* NOTE(review): the "q:" option accepted by getopt has no help line in this block */
622 +#endif /* USE_REPLICATION */
626 @@ -4194,6 +4463,26 @@
630 +#ifdef USE_REPLICATION
631 +static void sig_handler_cb(int fd, short event, void *arg) /* libevent callback registered for the SIGINT and SIGTERM events */
633 + struct event *signal = arg; /* arg is used as the struct event that fired */
635 + if (settings.verbose)
636 + fprintf(stderr, "got signal %d\n", EVENT_SIGNAL(signal));
638 + if (replication_exit()) { /* replication_exit() queues a NULL item; a non-zero result is treated as fatal */
639 + exit(EXIT_FAILURE);
642 + pthread_mutex_lock(&replication_pipe_lock);
644 + exit(EXIT_SUCCESS);
646 + pthread_mutex_unlock(&replication_pipe_lock);
648 +#endif /* USE_REPLICATION */
650 #ifndef HAVE_SIGIGNORE
651 static int sigignore(int sig) {
652 struct sigaction sa = { .sa_handler = SIG_IGN, .sa_flags = 0 };
653 @@ -4249,6 +4538,57 @@
657 +static void create_listening_sockets(void) /* opens the UNIX socket when settings.socketpath is set, otherwise the TCP and UDP listeners */
659 + /* create unix mode sockets after dropping privileges */
660 + if (settings.socketpath != NULL) {
662 + if (server_socket_unix(settings.socketpath,settings.access)) { /* non-zero means the UNIX socket could not be set up */
663 + vperror("failed to listen on UNIX socket: %s", settings.socketpath);
668 + /* create the listening socket, bind it, and init */
669 + if (settings.socketpath == NULL) {
670 + const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME"); /* optional; when set, a file is opened and handed to server_socket() */
671 + char temp_portnumber_filename[PATH_MAX];
672 + FILE *portnumber_file = NULL;
674 + if (portnumber_filename != NULL) {
675 + snprintf(temp_portnumber_filename,
676 + sizeof(temp_portnumber_filename),
677 + "%s.lck", portnumber_filename); /* the file is written under a temporary .lck name first */
679 + portnumber_file = fopen(temp_portnumber_filename, "a");
680 + if (portnumber_file == NULL) {
681 + fprintf(stderr, "Failed to open \"%s\": %s\n",
682 + temp_portnumber_filename, strerror(errno));
687 + if (settings.port && server_socket(settings.port, tcp_transport,
688 + portnumber_file)) { /* a zero settings.port skips the TCP listener */
689 + vperror("failed to listen on TCP port %d", settings.port);
693 + /* create the UDP listening socket and bind it */
695 + if (settings.udpport && server_socket(settings.udpport, udp_transport,
696 + portnumber_file)) { /* a zero settings.udpport skips the UDP listener */
697 + vperror("failed to listen on UDP port %d", settings.udpport);
701 + if (portnumber_file) {
702 + fclose(portnumber_file);
703 + rename(temp_portnumber_filename, portnumber_filename); /* move the .lck file to its final name; the result of rename() is not checked */
708 int main (int argc, char **argv) {
710 bool lock_memory = false;
711 @@ -4261,6 +4601,11 @@
715 +#ifdef USE_REPLICATION
716 + struct in_addr addr;
717 + struct addrinfo master_hint;
718 + struct addrinfo *master_addr;
719 +#endif /* USE_REPLICATION */
720 /* listening sockets */
721 static int *l_socket = NULL;
723 @@ -4307,6 +4652,11 @@
724 "B:" /* Binding protocol */
725 "I:" /* Max item size */
727 +#ifdef USE_REPLICATION
728 + "X:" /* replication port */
729 + "x:" /* replication master */
730 + "q:" /* replication queue length */
731 +#endif /* USE_REPLICATION */
735 @@ -4462,6 +4812,31 @@
739 +#ifdef USE_REPLICATION
741 + if (inet_pton(AF_INET, optarg, &addr) <= 0) { /* not a dotted-quad IPv4 literal: fall back to a name lookup */
742 + memset(&master_hint, 0, sizeof(master_hint)); /* zeroes every hint field, including ai_flags */
743 + master_hint.ai_family = AF_INET; /* the result is cast to sockaddr_in below, so only IPv4 results may be returned */
744 + master_hint.ai_socktype = 0;
745 + master_hint.ai_protocol = 0;
746 + if(!getaddrinfo(optarg, NULL, &master_hint, &master_addr)){ /* getaddrinfo() returns 0 on success */
747 + settings.rep_addr = ((struct sockaddr_in *)(master_addr->ai_addr)) -> sin_addr; /* first result only */
748 + freeaddrinfo(master_addr);
750 + fprintf(stderr, "Illegal address: %s\n", optarg);
754 + settings.rep_addr = addr; /* optarg parsed directly as an IPv4 address */
758 + settings.rep_port = atoi(optarg); /* NOTE(review): atoi() gives no error indication for non-numeric input */
761 + settings.rep_qmax = atoi(optarg);
763 +#endif /* USE_REPLICATION */
764 case 'S': /* set Sasl authentication to true. Default is false */
766 fprintf(stderr, "This server is not built with SASL support.\n");
767 @@ -4587,6 +4962,17 @@
768 /* initialize main thread libevent instance */
769 main_base = event_init();
771 +#ifdef USE_REPLICATION
772 + /* register events for SIGINT and SIGTERM to handle them in main thread */
773 + struct event signal_int, signal_term; /* one libevent signal event per handled signal */
774 + event_set(&signal_int, SIGINT, EV_SIGNAL|EV_PERSIST, sig_handler_cb,
776 + event_add(&signal_int, NULL); /* no timeout; the return value of event_add() is not checked */
777 + event_set(&signal_term, SIGTERM, EV_SIGNAL|EV_PERSIST, sig_handler_cb,
779 + event_add(&signal_term, NULL);
782 /* initialize other stuff */
785 @@ -4615,63 +5001,21 @@
786 /* initialise clock event */
787 clock_handler(0, 0, 0);
789 - /* create unix mode sockets after dropping privileges */
790 - if (settings.socketpath != NULL) {
792 - if (server_socket_unix(settings.socketpath,settings.access)) {
793 - vperror("failed to listen on UNIX socket: %s", settings.socketpath);
798 - /* create the listening socket, bind it, and init */
799 - if (settings.socketpath == NULL) {
802 - const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
803 - char temp_portnumber_filename[PATH_MAX];
804 - FILE *portnumber_file = NULL;
806 - if (portnumber_filename != NULL) {
807 - snprintf(temp_portnumber_filename,
808 - sizeof(temp_portnumber_filename),
809 - "%s.lck", portnumber_filename);
811 - portnumber_file = fopen(temp_portnumber_filename, "a");
812 - if (portnumber_file == NULL) {
813 - fprintf(stderr, "Failed to open \"%s\": %s\n",
814 - temp_portnumber_filename, strerror(errno));
819 - if (settings.port && server_socket(settings.port, tcp_transport,
820 - portnumber_file)) {
821 - vperror("failed to listen on TCP port %d", settings.port);
826 - * initialization order: first create the listening sockets
827 - * (may need root on low ports), then drop root if needed,
828 - * then daemonise if needed, then init libevent (in some cases
829 - * descriptors created by libevent wouldn't survive forking).
831 - udp_port = settings.udpport ? settings.udpport : settings.port;
833 - /* create the UDP listening socket and bind it */
835 - if (settings.udpport && server_socket(settings.udpport, udp_transport,
836 - portnumber_file)) {
837 - vperror("failed to listen on UDP port %d", settings.udpport);
841 + * initialization order: first create the listening sockets
842 + * (may need root on low ports), then drop root if needed,
843 + * then daemonise if needed, then init libevent (in some cases
844 + * descriptors created by libevent wouldn't survive forking).
847 - if (portnumber_file) {
848 - fclose(portnumber_file);
849 - rename(temp_portnumber_filename, portnumber_filename);
851 +#ifdef USE_REPLICATION
852 + if(replication_init() == -1){ /* sets up the replication client or server side; -1 aborts startup */
853 + fprintf(stderr, "faild to replication init\n");
854 + exit(EXIT_FAILURE);
857 + create_listening_sockets();
860 /* Drop privileges no longer needed */
862 @@ -4694,3 +5038,401 @@
867 +#ifdef USE_REPLICATION
868 +static int replication_start(void) /* calls create_listening_sockets(); the static flag presumably limits that to the first call — TODO confirm */
870 + static int start = 0;
874 + create_listening_sockets();
880 +static int replication_server_init(void) /* opens the replication listening socket, then starts normal service via replication_start() */
885 + if(server_socket_replication(settings.rep_port)){ /* non-zero means the listening socket could not be set up */
886 + fprintf(stderr, "replication: failed to initialize replication server socket\n");
889 + if (settings.verbose > 0)
890 + fprintf(stderr, "replication: listen\n");
891 + return(replication_start());
894 +static int replication_client_init(void) /* creates an IPv4 TCP socket and connect()s to settings.rep_addr:settings.rep_port; the new conn starts in conn_repconnect */
898 + struct addrinfo ai;
899 + struct sockaddr_in server;
905 + memset(&ai,0,sizeof(ai));
906 + ai.ai_family = AF_INET;
907 + ai.ai_socktype = SOCK_STREAM;
908 + s = new_socket(&ai); /* only family and socktype are filled in for new_socket() */
911 + fprintf(stderr, "replication: failed to replication client socket\n");
915 + memset((char *)&server, 0, sizeof(server));
916 + server.sin_family = AF_INET;
917 + server.sin_addr = settings.rep_addr;
918 + server.sin_port = htons(settings.rep_port);
919 + if (settings.verbose > 0)
920 + fprintf(stderr,"replication: connect (peer=%s:%d)\n", inet_ntoa(settings.rep_addr), settings.rep_port);
921 + if(connect(s,(struct sockaddr *)&server, sizeof(server)) == 0){ /* connected immediately */
922 + c = conn_new(s, conn_repconnect, EV_WRITE | EV_PERSIST, DATA_BUFFER_SIZE, false, main_base); /* NOTE(review): false is passed where other call sites pass tcp_transport */
924 + fprintf(stderr, "replication: failed to create client conn\n"); /* newline added so the message ends its own log line */
930 + if(errno == EINPROGRESS){ /* connect still in progress: wait for the socket to become writable */
931 + c = conn_new(s, conn_repconnect, EV_WRITE | EV_PERSIST, DATA_BUFFER_SIZE, false, main_base);
933 + fprintf(stderr, "replication: failed to create client conn\n"); /* newline added so the message ends its own log line */
938 + fprintf(stdout,"replication: can't connect %s:%d\n", inet_ntoa(server.sin_addr), ntohs(server.sin_port)); /* NOTE(review): this one message goes to stdout, the others to stderr */
947 +static int replication_init(void) /* tries the client side when a peer address was configured, otherwise (or as a fallback) the server side */
949 + if(settings.rep_addr.s_addr != htonl(INADDR_ANY)){ /* INADDR_ANY is the default set in settings_init, i.e. no peer given */
950 + if(replication_client_init() != -1){
954 + return(replication_server_init());
957 +static int replication_connect(void) /* creates the internal pipe and wraps both ends in conn objects (rep_recv, rep_send) */
963 + fprintf(stderr, "replication: can't create pipe\n");
966 + if((f = fcntl(p[0], F_GETFL, 0)) < 0 || fcntl(p[0], F_SETFL, f | O_NONBLOCK) < 0) { /* read end must be non-blocking */
967 + fprintf(stderr, "replication: can't setting O_NONBLOCK pipe[0]\n");
970 + if((f = fcntl(p[1], F_GETFL, 0)) < 0 || fcntl(p[1], F_SETFL, f | O_NONBLOCK) < 0) { /* write end must be non-blocking too */
971 + fprintf(stderr, "replication: can't setting O_NONBLOCK pipe[1]\n"); /* message now names the descriptor that actually failed */
974 + pthread_mutex_lock(&replication_pipe_lock);
975 + rep_recv = conn_new(p[0], conn_pipe_recv, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base); /* NOTE(review): neither conn_new() result is checked for NULL */
976 + rep_send = conn_new(p[1], conn_pipe_send, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base);
977 + event_del(&rep_send->event); /* the write end is removed from the event loop right after creation */
978 + pthread_mutex_unlock(&replication_pipe_lock);
983 +static int replication_close(void) /* reads what is left in the pipe, closes rep_recv, rep_send and rep_conn, then goes back to listening */
989 + if(settings.verbose > 0)
990 + fprintf(stderr,"replication: close\n");
992 + rep_recv->rbytes = sizeof(q); /* rbytes counts the bytes still missing for one q-sized record */
993 + rep_recv->rcurr = rep_recv->rbuf;
996 + r = read(rep_recv->sfd, rep_recv->rcurr, rep_recv->rbytes);
1000 + rep_recv->rbytes -= r;
1001 + rep_recv->rcurr += r;
1002 + if(!rep_recv->rbytes){ /* a full record has arrived */
1003 + memcpy(&q, rep_recv->rbuf, sizeof(q));
1004 + rep_recv->rbytes = sizeof(q); /* reset for the next record */
1005 + rep_recv->rcurr = rep_recv->rbuf;
1010 + conn_close(rep_recv);
1012 + if (settings.verbose > 1) {
1013 + fprintf(stderr, "replication: qitem free %d items\n", qi_free_list());
1014 + fprintf(stderr, "replication: close recv %d items\n", c);
1017 + pthread_mutex_lock(&replication_pipe_lock); /* rep_send is closed under the pipe lock */
1019 + conn_close(rep_send);
1021 + if (settings.verbose > 1)
1022 + fprintf(stderr,"replication: close send\n");
1024 + pthread_mutex_unlock(&replication_pipe_lock);
1026 + conn_close(rep_conn);
1028 + if (settings.verbose > 1)
1029 + fprintf(stderr,"replication: close conn\n");
1032 + replication_server_init(); /* return value is not checked */
1036 +static void replication_dispatch_close(void) /* closes only the write end of the pipe (rep_send), under the pipe lock */
1038 + if (settings.verbose > 1)
1039 + fprintf(stderr, "replication: dispatch close\n");
1040 + pthread_mutex_lock(&replication_pipe_lock);
1042 + conn_close(rep_send);
1045 + pthread_mutex_unlock(&replication_pipe_lock);
1048 +static int replication_marugoto(int f) /* replicates the keys of a snapshot taken with assoc_key_snap(); state is kept in statics between calls */
1050 + static int keysend = 0; /* reported in the verbose "marugoto %d" message */
1051 + static int keycount = 0; /* filled in by assoc_key_snap() */
1052 + static char *keylist = NULL; /* buffer returned by assoc_key_snap() */
1053 + static char *keyptr = NULL; /* current position inside keylist */
1063 + pthread_mutex_lock(&cache_lock); /* the snapshot is taken under cache_lock */
1064 + keylist = (char *)assoc_key_snap((int *)&keycount);
1065 + pthread_mutex_unlock(&cache_lock);
1068 + replication_call_marugoto_end();
1070 + if (settings.verbose > 0)
1071 + fprintf(stderr,"replication: marugoto start\n");
1076 + item *it = item_get(keyptr, strlen(keyptr)); /* keys in the list are read as NUL-terminated strings */
1079 + if(replication_call_rep(keyptr, strlen(keyptr)) == -1){
1083 + keyptr += strlen(keyptr) + 1; /* step over the key and its terminator */
1087 + keyptr += strlen(keyptr) + 1;
1089 + if(settings.verbose > 0)
1090 + fprintf(stderr,"replication: marugoto %d\n", keysend);
1091 + replication_call_marugoto_end(); /* queues the marugoto_end command for the peer */
1092 + if(settings.verbose > 0)
1093 + fprintf(stderr,"replication: marugoto owari\n");
1104 +static int replication_send(conn *c) /* writes c's pending output (wcurr/wbytes) to its socket and returns the bytes still pending */
1107 + int w = write(c->sfd, c->wcurr, c->wbytes);
1109 + if(errno == EAGAIN || errno == EINTR){ /* transient conditions are not treated as a send error */
1111 + fprintf(stderr,"replication: send error\n");
1112 + replication_close();
1120 + return(c->wbytes);
1123 +static int replication_pop(void) /* reads Q_ITEM pointers from the pipe and turns them into commands for the peer */
1130 + if(settings.verbose > 1)
1131 + fprintf(stderr, "replication: pop\n");
1136 + r = read(rep_recv->sfd, rep_recv->rbuf, rep_recv->rsize);
1138 + if(errno == EAGAIN || errno == EINTR){ /* transient conditions are not reported as a pop error */
1140 + fprintf(stderr,"replication: pop error %d\n", errno);
1144 + /* other end closed, trigger replication_close() */
1147 + c = r / sizeof(Q_ITEM *); /* number of whole pointers read */
1148 + m = r % sizeof(Q_ITEM *); /* leftover bytes of a partially read pointer */
1149 + q = (Q_ITEM **)(rep_recv->rbuf); /* the buffer is viewed as an array of Q_ITEM pointers */
1152 + if(rep_conn && replication_cmd(rep_conn, q[c])){ /* non-zero from replication_cmd() is handled as a retry */
1153 + replication_item(q[c]); /* error retry */
1159 + if (settings.verbose)
1160 + fprintf(stderr,"replication: cleanup start\n");
1167 + if(rep_conn->wbytes){ /* output for the peer is still pending */
1169 + if(replication_exit()){
1170 + replication_close();
1171 + fprintf(stderr,"replication: cleanup error\n");
1172 + exit(EXIT_FAILURE);
1176 + replication_close();
1177 + if (settings.verbose)
1178 + fprintf(stderr,"replication: cleanup complete\n");
1179 + exit(EXIT_SUCCESS); /* the process terminates from here once cleanup is complete */
1182 + replication_marugoto(0);
1186 +static int replication_push(void) /* writes the pending bytes of rep_send's buffer into the pipe */
1190 + while(rep_send->wbytes){
1191 + w = write(rep_send->sfd, rep_send->wcurr, rep_send->wbytes);
1193 + if(errno == EAGAIN || errno == EINTR){
1194 + fprintf(stderr,"replication: push EAGAIN or EINTR\n");
1199 + rep_send->wbytes -= w; /* account for a (possibly partial) write and loop again */
1200 + rep_send->wcurr += w;
1203 + rep_send->wcurr = rep_send->wbuf; /* cursor is reset to the start of the buffer */
1207 +static int replication_exit(void) /* queues a NULL item through replication_item() and returns its result */
1209 + return(replication_item(NULL));
1212 +static int replication_item(Q_ITEM *q) /* appends the pointer value q to rep_send's buffer and pushes it into the pipe */
1214 + pthread_mutex_lock(&replication_pipe_lock);
1216 + pthread_mutex_unlock(&replication_pipe_lock);
1219 + if(rep_send->wcurr + rep_send->wbytes + sizeof(q) > rep_send->wbuf + rep_send->wsize){ /* no room left for one more pointer */
1220 + fprintf(stderr,"replication: buffer over fllow\n");
1224 + pthread_mutex_unlock(&replication_pipe_lock); /* released first: replication_dispatch_close() takes the same lock */
1225 + replication_dispatch_close();
1228 + memcpy(rep_send->wcurr + rep_send->wbytes, &q, sizeof(q)); /* the pointer itself is queued, not the Q_ITEM contents */
1229 + rep_send->wbytes += sizeof(q);
1230 + if(replication_push()){
1231 + fprintf(stderr, "replication: push error\n");
1235 + pthread_mutex_unlock(&replication_pipe_lock);
1236 + replication_dispatch_close();
1239 + pthread_mutex_unlock(&replication_pipe_lock);
1243 +int replication(enum CMD_TYPE type, R_CMD *cmd) /* builds a Q_ITEM for the command and hands it to replication_item() */
1247 + pthread_mutex_lock(&replication_pipe_lock);
1249 + pthread_mutex_unlock(&replication_pipe_lock);
1252 + pthread_mutex_unlock(&replication_pipe_lock);
1254 + if((q = qi_new(type, cmd, false))) { /* qi_new() is called with reuse = false */
1255 + replication_item(q); /* the result of replication_item() is not used in this statement */
1257 + fprintf(stderr,"replication: can't create Q_ITEM\n");
1258 + replication_dispatch_close(); /* failing to get a Q_ITEM closes the write end of the pipe */
1263 +#endif /* USE_REPLICATION */
1264 --- memcached-1.4.4/memcached.h Thu Nov 26 03:37:49 2009
1265 +++ repcached-2.2-1.4.4/memcached.h Tue Feb 9 23:02:45 2010
1266 @@ -144,7 +144,13 @@
1267 conn_swallow, /**< swallowing unnecessary bytes w/o storing */
1268 conn_closing, /**< closing this connection */
1269 conn_mwrite, /**< writing out many items sequentially */
1270 - conn_max_state /**< Max state value (used for assertion) */
1271 +#ifdef USE_REPLICATION
1272 + conn_repconnect, /**< replication connecting to master */
1273 + conn_rep_listen, /**< replication listening socket */
1274 + conn_pipe_recv, /**< replication command pipe recv */
1275 + conn_pipe_send, /**< replication command pipe send */
1276 +#endif /* USE_REPLICATION */
1277 + conn_max_state, /**< Max state value (used for assertion) */
1280 enum bin_substates {
1283 uint64_t get_misses;
1286 time_t started; /* when the process was started */
1288 bool accepting_conns; /* whether we are currently accepting */
1289 uint64_t listen_disabled_num;
1291 @@ -274,6 +282,11 @@
1293 int item_size_max; /* Maximum item size, and upper end for slabs */
1294 bool sasl; /* SASL on/off */
1295 +#ifdef USE_REPLICATION
1296 + struct in_addr rep_addr; /* replication addr */
1297 + int rep_port; /* replication port */
1298 + int rep_qmax; /* replication QITEM max */
1299 +#endif /*USE_REPLICATION*/
1302 extern struct stats stats;
1303 @@ -286,6 +299,10 @@
1305 #define ITEM_SLABBED 4
1307 +#ifdef USE_REPLICATION
1308 +#define ITEM_REPDATA 128 /* it_flags bit set on items received through the "rep" command */
1309 +#endif /*USE_REPLICATION*/
1312 * Structure for storing items within memcached.
1314 @@ -438,6 +455,10 @@
1319 +#ifdef USE_REPLICATION
1320 +#include "replication.h"
1321 +#endif /* USE_REPLICATION */
1324 * Functions such as the libevent-related calls that need to do cross-thread
1325 --- memcached-1.4.4/replication.c Thu Jan 1 03:00:00 1970
1326 +++ repcached-2.2-1.4.4/replication.c Wed Feb 10 18:40:48 2010
1328 +/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
1332 +#include "memcached.h"
1333 +#include "replication.h"
1334 +#include <stdlib.h>
1336 +#include <unistd.h>
1337 +#include <string.h>
1340 +static Q_ITEM *q_freelist = NULL;
1341 +static int q_itemcount = 0;
1342 +static pthread_mutex_t replication_queue_lock = PTHREAD_MUTEX_INITIALIZER;
1344 +int get_qi_count(void) /* accessor that takes replication_queue_lock; used by the repcached_qi_free stat */
1347 + pthread_mutex_lock(&replication_queue_lock);
1349 + pthread_mutex_unlock(&replication_queue_lock);
1353 +Q_ITEM *qi_new(enum CMD_TYPE type, R_CMD *cmd, bool reuse) /* obtains a Q_ITEM (free list or malloc) and fills it in according to the command type */
1357 + uint32_t keylen = 0;
1358 + rel_time_t time = 0;
1360 + pthread_mutex_lock(&replication_queue_lock);
1363 + q_freelist = q->next; /* take the head entry off the free list */
1368 + pthread_mutex_unlock(&replication_queue_lock);
1371 + if(q_itemcount >= settings.rep_qmax) { /* allocation is capped at settings.rep_qmax items */
1372 + pthread_mutex_unlock(&replication_queue_lock);
1375 + q = malloc(sizeof(Q_ITEM));
1377 + fprintf(stderr,"replication: qi_new out of memory\n");
1378 + pthread_mutex_unlock(&replication_queue_lock);
1382 + if (settings.verbose > 2)
1383 + fprintf(stderr,"replication: alloc c=%d\n", q_itemcount);
1386 + pthread_mutex_unlock(&replication_queue_lock);
1389 + case REPLICATION_REP: /* REP and DEL share the key-carrying path */
1390 + case REPLICATION_DEL:
1392 + keylen = cmd->keylen;
1394 + case REPLICATION_FLUSH_ALL:
1396 + case REPLICATION_DEFER_FLUSH_ALL:
1399 + case REPLICATION_MARUGOTO_END:
1402 + fprintf(stderr,"replication: got unknown command: %d\n", type);
1411 + q->key = malloc(keylen + 1); /* one extra byte for the terminator written below */
1412 + if(NULL == q->key){
1416 + memcpy(q->key, key, keylen);
1417 + *(q->key + keylen) = 0; /* NUL-terminate the private copy of the key */
1424 +void qi_free(Q_ITEM *q)
1431 + pthread_mutex_lock(&replication_queue_lock);
1432 + q->next = q_freelist;
1434 + pthread_mutex_unlock(&replication_queue_lock);
1443 + pthread_mutex_lock(&replication_queue_lock);
1444 + while((q = q_freelist)){
1447 + q_freelist = q->next;
1450 + pthread_mutex_unlock(&replication_queue_lock);
1454 +static int replication_get_num(char *p, int n)
1459 + l = sprintf(p, "%u", n);
1461 + l = sprintf(b, "%u", n);
1465 +int replication_call_rep(char *key, size_t keylen)
1469 + r.keylen = keylen;
1470 + return(replication(REPLICATION_REP, &r));
1473 +int replication_call_del(char *key, size_t keylen)
1477 + r.keylen = keylen;
1478 + return(replication(REPLICATION_DEL, &r));
1481 +int replication_call_flush_all()
1485 + return(replication(REPLICATION_FLUSH_ALL, &r));
1488 +int replication_call_defer_flush_all(const rel_time_t time)
1493 + return(replication(REPLICATION_DEFER_FLUSH_ALL, &r));
1496 +int replication_call_marugoto_end()
1500 + return(replication(REPLICATION_MARUGOTO_END, &r));
1503 +static int replication_alloc(conn *c, int s)
1508 + while(c->wsize < s)
1510 + if((p = malloc(c->wsize))){
1511 + memcpy(p, c->wbuf, c->wbytes);
1521 +static int replication_del(conn *c, char *k)
1524 + char *s = "delete ";
1531 + if(replication_alloc(c,l) == -1){
1532 + fprintf(stderr, "replication: del malloc error\n");
1535 + p = c->wbuf + c->wbytes;
1536 + memcpy(p, s, strlen(s));
1538 + memcpy(p, k, strlen(k));
1540 + memcpy(p, n, strlen(n));
1542 + c->wbytes = p - c->wbuf;
1543 + c->wcurr = c->wbuf;
1547 +static int replication_rep(conn *c, item *it)
1557 + exp = it->exptime + process_started;
1559 + if((p=ITEM_suffix(it))){
1561 + memcpy(flag, p, it->nsuffix - 2);
1562 + flag[it->nsuffix - 2] = 0;
1563 + for(i=0;i<strlen(flag);i++){
1567 + memmove(flag,&flag[i],strlen(flag)-i);
1568 + for(p=flag;*p>' ';p++);
1574 + len += strlen(flag);
1576 + len += replication_get_num(NULL, exp);
1578 + len += replication_get_num(NULL, it->nbytes - 2);
1580 + len += replication_get_num(NULL, ITEM_get_cas(it));
1582 + len += it->nbytes;
1584 + if(replication_alloc(c,len) == -1){
1585 + fprintf(stderr, "replication: rep malloc error\n");
1588 + p = c->wbuf + c->wbytes;
1589 + memcpy(p, s, strlen(s));
1591 + memcpy(p, ITEM_key(it), it->nkey);
1594 + memcpy(p, flag, strlen(flag));
1595 + p += strlen(flag);
1597 + p += replication_get_num(p, exp);
1599 + p += replication_get_num(p, it->nbytes - 2);
1601 + p += replication_get_num(p, ITEM_get_cas(it));
1602 + memcpy(p, n, strlen(n));
1604 + memcpy(p, ITEM_data(it), it->nbytes);
1606 + c->wbytes = p - c->wbuf;
1607 + c->wcurr = c->wbuf;
1611 +static int replication_flush_all(conn *c, rel_time_t exp)
1613 + char *s = "flush_all ";
1617 + int l = strlen(s) + strlen(n);
1619 + l += replication_get_num(NULL, exp);
1620 + if(replication_alloc(c,l) == -1){
1621 + fprintf(stderr, "replication: flush_all malloc error\n");
1624 + p = c->wbuf + c->wbytes;
1625 + memcpy(p, s, strlen(s));
1628 + p += replication_get_num(p, exp);
1629 + memcpy(p, n, strlen(n));
1631 + c->wbytes = p - c->wbuf;
1632 + c->wcurr = c->wbuf;
1636 +static int replication_marugoto_end(conn *c)
1638 + char *s = "marugoto_end";
1642 + int l = strlen(s) + strlen(n);
1643 + if(replication_alloc(c,l) == -1){
1644 + fprintf(stderr, "replication: marugoto_end malloc error\n");
1647 + p = c->wbuf + c->wbytes;
1648 + memcpy(p, s, strlen(s));
1650 + memcpy(p, n, strlen(n));
1652 + c->wbytes = p - c->wbuf;
1653 + c->wcurr = c->wbuf;
1657 +int replication_cmd(conn *c, Q_ITEM *q)
1662 + switch (q->type) {
1663 + case REPLICATION_REP:
1664 + it = item_get(q->key, strlen(q->key));
1666 + return(replication_del(c, q->key));
1667 + r = replication_rep(c, it);
1670 + case REPLICATION_DEL:
1671 + return(replication_del(c, q->key));
1672 + case REPLICATION_FLUSH_ALL:
1673 + return(replication_flush_all(c, 0));
1674 + case REPLICATION_DEFER_FLUSH_ALL:
1675 + return(replication_flush_all(c, q->time));
1676 + case REPLICATION_MARUGOTO_END:
1677 + return(replication_marugoto_end(c));
1679 + fprintf(stderr,"replication: got unknown command:%d\n", q->type);
1683 --- memcached-1.4.4/replication.h Thu Jan 1 03:00:00 1970
1684 +++ repcached-2.2-1.4.4/replication.h Wed Feb 10 18:40:31 2010
1686 +#ifndef MEMCACHED_REPLICATION_H
1687 +#define MEMCACHED_REPLICATION_H
1688 +#define REPCACHED_VERSION "2.2"
1694 + REPLICATION_FLUSH_ALL,
1695 + REPLICATION_DEFER_FLUSH_ALL,
1696 + REPLICATION_MARUGOTO_END,
1699 +typedef struct queue_item_t Q_ITEM;
1700 +struct queue_item_t {
1701 + enum CMD_TYPE type;
1707 +typedef struct replication_cmd_t R_CMD;
1708 +struct replication_cmd_t {
1714 +Q_ITEM *qi_new(enum CMD_TYPE type, R_CMD *cmd, bool);
1715 +void qi_free(Q_ITEM *);
1716 +int qi_free_list(void);
1717 +int replication_cmd(conn *, Q_ITEM *);
1718 +int get_qi_count(void);
1720 +int replication_call_rep(char *key, size_t keylen);
1721 +int replication_call_del(char *key, size_t keylen);
1722 +int replication_call_flush_all(void);
1723 +int replication_call_defer_flush_all(const rel_time_t time);
1724 +int replication_call_marugoto_end(void);
1725 +int replication(enum CMD_TYPE type, R_CMD *cmd);
1728 --- memcached-1.4.4/t/binary.t Fri Nov 27 08:05:16 2009
1729 +++ repcached-2.2-1.4.4/t/binary.t Wed Feb 10 17:04:01 2010
1734 -use Test::More tests => 3349;
1736 use FindBin qw($Bin);
1740 +Test::More::plan(tests => support_replication() ? 3385 : 3349);
1742 my $server = new_memcached();
1743 ok($server, "started the server");
1744 --- memcached-1.4.4/t/issue_67.t Sun Nov 1 01:44:09 2009
1745 +++ repcached-2.2-1.4.4/t/issue_67.t Wed Feb 10 17:50:12 2010
1747 my $exe = "$builddir/memcached-debug";
1748 croak("memcached binary doesn't exist. Haven't run 'make' ?\n") unless -e $exe;
1750 + if (support_replication()) {
1754 my $childpid = fork();
1756 my $cmd = "$builddir/timedrun 10 $exe $args";
1757 --- memcached-1.4.4/t/lib/MemcachedTest.pm Fri Oct 30 04:24:52 2009
1758 +++ repcached-2.2-1.4.4/t/lib/MemcachedTest.pm Wed Feb 10 17:53:34 2010
1762 @EXPORT = qw(new_memcached sleep mem_get_is mem_gets mem_gets_is mem_stats
1763 - supports_sasl free_port);
1764 + supports_sasl free_port support_replication memcached_version
1769 @@ -148,6 +149,23 @@
1773 +sub support_replication {
1774 + my $output = `$builddir/memcached-debug -h`;
1775 + return 1 if $output =~ /^-x <ip_addr>/m;
1779 +sub memcached_version {
1780 + my $output = `$builddir/memcached-debug -h`;
1781 + return $1 if $output =~ /^memcached (\d[\d\.]+)/;
1786 + my($major,$minor,$pl) = ($_[0] =~ /^(\d+)\.(\d+)\.(\d+)$/);
1787 + return $major*100**2 + $minor*100 + $pl
1791 my ($args, $passed_port) = @_;
1792 my $port = $passed_port || free_port();
1796 $args .= " -u root";
1798 + if (support_replication() && $args !~ m/-X/) {
1802 my $childpid = fork();
1803 --- memcached-1.4.4/t/stats.t Thu Nov 26 03:37:49 2009
1804 +++ repcached-2.2-1.4.4/t/stats.t Tue Feb 9 23:13:26 2010
1806 my $stats = mem_stats($sock);
1808 # Test number of keys
1809 -is(scalar(keys(%$stats)), 37, "37 stats values");
1810 +if (! support_replication()) {
1811 + is(scalar(keys(%$stats)), 37, "37 stats values");
1813 + is(scalar(keys(%$stats)), 40, "40 stats values");
1816 # Test initial state
1817 foreach my $key (qw(curr_items total_items bytes cmd_get cmd_set get_hits evictions get_misses
1818 --- memcached-1.4.4/testapp.c Wed Nov 25 03:40:29 2009
1819 +++ repcached-2.2-1.4.4/testapp.c Wed Feb 10 17:52:05 2010
1820 @@ -300,6 +300,10 @@
1824 +#ifdef USE_REPLICATION
1825 + argv[arg++] = "-X";
1826 + argv[arg++] = "0";
1828 /* Handle rpmbuild and the like doing this as root */
1829 if (getuid() == 0) {