1 diff -urN --exclude *.m4 memcached-1.4.4/Makefile.am repcached-2.2-1.4.4/Makefile.am
2 --- memcached-1.4.4/Makefile.am Fri Oct 30 04:24:52 2009
3 +++ repcached-2.2-1.4.4/Makefile.am Tue Feb 9 23:02:45 2010
5 memcached_SOURCES += sasl_defs.c
9 +memcached_SOURCES += replication.h replication.c
12 memcached_debug_SOURCES = $(memcached_SOURCES)
13 memcached_CPPFLAGS = -DNDEBUG
14 memcached_debug_LDADD = @PROFILER_LDFLAGS@
15 diff -urN --exclude *.m4 memcached-1.4.4/Makefile.in repcached-2.2-1.4.4/Makefile.in
16 diff -urN --exclude *.m4 memcached-1.4.4/assoc.c repcached-2.2-1.4.4/assoc.c
17 --- memcached-1.4.4/assoc.c Sat Oct 24 00:38:01 2009
18 +++ repcached-2.2-1.4.4/assoc.c Tue Feb 9 23:02:45 2010
23 +#ifdef USE_REPLICATION
24 +char *assoc_key_snap(int *n)
32 + int hm = hashsize(hashpower);
36 + if(expanding && hs < hashsize(hashpower - 1) && hs >= expand_bucket){
37 + i = old_hashtable[hs];
39 + i = primary_hashtable[hs];
49 + if((p = b = malloc(sz))){
52 + if(expanding && hs < hashsize(hashpower - 1) && hs >= expand_bucket){
53 + i = old_hashtable[hs];
55 + i = primary_hashtable[hs];
58 + memcpy(p, ITEM_key(i), i->nkey);
70 +#endif /* USE_REPLICATION */
71 diff -urN --exclude *.m4 memcached-1.4.4/assoc.h repcached-2.2-1.4.4/assoc.h
72 --- memcached-1.4.4/assoc.h Sun Aug 30 03:00:58 2009
73 +++ repcached-2.2-1.4.4/assoc.h Tue Feb 9 23:02:45 2010
75 int start_assoc_maintenance_thread(void);
76 void stop_assoc_maintenance_thread(void);
78 +#ifdef USE_REPLICATION
79 +char *assoc_key_snap(int *n);
80 +#endif /*USE_REPLICATION*/
81 diff -urN --exclude *.m4 memcached-1.4.4/config.guess repcached-2.2-1.4.4/config.guess
82 diff -urN --exclude *.m4 memcached-1.4.4/config.h.in repcached-2.2-1.4.4/config.h.in
83 --- memcached-1.4.4/config.h.in Fri Nov 27 09:34:56 2009
84 +++ repcached-2.2-1.4.4/config.h.in Wed Feb 10 19:12:46 2010
86 /* Define to 1 if you have the ANSI C header files. */
89 +/* Define this if you want to use replication */
90 +#undef USE_REPLICATION
92 /* Version number of package */
95 diff -urN --exclude *.m4 memcached-1.4.4/config.sub repcached-2.2-1.4.4/config.sub
96 diff -urN --exclude *.m4 memcached-1.4.4/configure repcached-2.2-1.4.4/configure
97 diff -urN --exclude *.m4 memcached-1.4.4/configure.ac repcached-2.2-1.4.4/configure.ac
98 --- memcached-1.4.4/configure.ac Wed Nov 25 03:40:29 2009
99 +++ repcached-2.2-1.4.4/configure.ac Tue Feb 9 23:02:45 2010
101 AC_MSG_ERROR([Can't enable threads without the POSIX thread library.])
104 +dnl Check whether the user wants replication or not
105 +AC_ARG_ENABLE(replication,
106 + [AS_HELP_STRING([--enable-replication],[support replication])],
107 + [if test "x$enable_threads" = "xyes"; then
108 + AC_MSG_ERROR([Can't enable threads and replication together.])
110 + AC_DEFINE([USE_REPLICATION],,[Define this if you want to use replication])
114 +AM_CONDITIONAL(ENABLE_REPLICATION, test "x$enable_replication" = "xyes")
116 AC_CHECK_FUNCS(mlockall)
117 AC_CHECK_FUNCS(getpagesizes)
118 AC_CHECK_FUNCS(memcntl)
119 diff -urN --exclude *.m4 memcached-1.4.4/doc/Makefile repcached-2.2-1.4.4/doc/Makefile
120 diff -urN --exclude *.m4 memcached-1.4.4/items.c repcached-2.2-1.4.4/items.c
121 --- memcached-1.4.4/items.c Sat Oct 24 00:38:01 2009
122 +++ repcached-2.2-1.4.4/items.c Tue Feb 9 23:02:45 2010
127 +#ifdef USE_REPLICATION
128 + replication_call_del(ITEM_key(search), search->nkey);
129 +#endif /* USE_REPLICATION */
131 do_item_unlink(search);
134 stats.total_items += 1;
137 +#ifdef USE_REPLICATION
138 + /* Allocate a new CAS ID on link. */
139 + if(!(it->it_flags & ITEM_REPDATA))
140 + ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
142 /* Allocate a new CAS ID on link. */
143 ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
144 +#endif /* USE_REPLICATION */
148 diff -urN --exclude *.m4 memcached-1.4.4/memcached.c repcached-2.2-1.4.4/memcached.c
149 --- memcached-1.4.4/memcached.c Fri Nov 27 08:45:13 2009
150 +++ repcached-2.2-1.4.4/memcached.c Wed Feb 10 16:08:37 2010
153 static void conn_free(conn *c);
155 +#ifdef USE_REPLICATION
156 +static int rep_exit = 0; /* checked together with c->state != conn_pipe_recv; presumably a shutdown-in-progress flag -- TODO confirm, its assignment is not visible in this excerpt */
157 +static conn *rep_recv = NULL; /* conn wrapping pipe end p[0], created in state conn_pipe_recv */
158 +static conn *rep_send = NULL; /* conn wrapping pipe end p[1], created in state conn_pipe_send */
159 +static conn *rep_conn = NULL; /* conn for the replication peer socket; updates arriving on it are not replicated again */
160 +static conn *rep_serv = NULL; /* head of the ->next linked list of replication listening conns */
161 +static int server_socket_replication(const int);
162 +static void server_close_replication(void);
163 +static int replication_init(void);
164 +static int replication_server_init(void);
165 +static int replication_client_init(void);
166 +static int replication_start(void);
167 +static int replication_connect(void);
168 +static int replication_close(void);
169 +static void replication_dispatch_close(void);
170 +static int replication_marugoto(int);
171 +static int replication_send(conn *);
172 +static int replication_pop(void);
173 +static int replication_push(void);
174 +static int replication_exit(void);
175 +static int replication_item(Q_ITEM *);
176 +static pthread_mutex_t replication_pipe_lock = PTHREAD_MUTEX_INITIALIZER; /* taken around creation, use and closing of rep_send */
177 +#endif /* USE_REPLICATION */
179 /** exported globals **/
181 struct settings settings;
183 settings.backlog = 1024;
184 settings.binding_protocol = negotiating_prot;
185 settings.item_size_max = 1024 * 1024; /* The famous 1MB upper limit. */
186 +#ifdef USE_REPLICATION
187 + settings.rep_addr.s_addr = htonl(INADDR_ANY);
188 + settings.rep_port = 11212;
189 + settings.rep_qmax = 8192;
190 +#endif /* USE_REPLICATION */
195 prot_text(c->protocol));
196 } else if (IS_UDP(transport)) {
197 fprintf(stderr, "<%d server listening (udp)\n", sfd);
198 +#ifdef USE_REPLICATION
199 + } else if (init_state == conn_rep_listen) {
200 + fprintf(stderr, "<%d server listening (replication)\n", sfd);
201 +#endif /* USE_REPLICATION */
202 } else if (c->protocol == negotiating_prot) {
203 fprintf(stderr, "<%d new auto-negotiating client connection\n",
214 + "conn_pipe_send" };
215 return statenames[state];
222 +#ifdef USE_REPLICATION
223 + if (c == rep_conn){
224 + if (settings.verbose > 1)
225 + fprintf(stderr, "REP>%d %s\n", c->sfd, str);
226 + conn_set_state(c, conn_new_cmd);
229 +#endif /* USE_REPLICATION */
231 if (settings.verbose > 1)
232 fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
235 enum store_item_type ret;
237 - pthread_mutex_lock(&c->thread->stats.mutex);
238 - c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
239 - pthread_mutex_unlock(&c->thread->stats.mutex);
241 + pthread_mutex_lock(&c->thread->stats.mutex);
242 + c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
243 + pthread_mutex_unlock(&c->thread->stats.mutex);
246 if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
247 out_string(c, "CLIENT_ERROR bad data chunk");
252 +#ifdef USE_REPLICATION
253 + if( c != rep_conn ){
254 + replication_call_rep(ITEM_key(it), it->nkey);
256 +#endif /* USE_REPLICATION */
257 out_string(c, "STORED");
260 @@ -2410,6 +2462,11 @@
261 APPEND_STAT("listen_disabled_num", "%llu", (unsigned long long)stats.listen_disabled_num);
262 APPEND_STAT("threads", "%d", settings.num_threads);
263 APPEND_STAT("conn_yields", "%llu", (unsigned long long)thread_stats.conn_yields);
264 +#ifdef USE_REPLICATION
265 + APPEND_STAT("replication", "MASTER", 0);
266 + APPEND_STAT("repcached_version", "%s", REPCACHED_VERSION);
267 + APPEND_STAT("repcached_qi_free", "%u", settings.rep_qmax - get_qi_count());
268 +#endif /*USE_REPLICATION*/
272 @@ -2797,6 +2854,11 @@
273 switch(add_delta(c, it, incr, delta, temp)) {
276 +#ifdef USE_REPLICATION
277 + if( c != rep_conn){
278 + replication_call_rep(ITEM_key(it), it->nkey);
280 +#endif /* USE_REPLICATION */
283 out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value");
284 @@ -2911,17 +2973,25 @@
286 MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);
288 - pthread_mutex_lock(&c->thread->stats.mutex);
289 - c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++;
290 - pthread_mutex_unlock(&c->thread->stats.mutex);
292 + pthread_mutex_lock(&c->thread->stats.mutex);
293 + c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++;
294 + pthread_mutex_unlock(&c->thread->stats.mutex);
298 item_remove(it); /* release our reference */
299 +#ifdef USE_REPLICATION
300 + if( c != rep_conn )
301 + replication_call_del(key, nkey);
302 +#endif /* USE_REPLICATION */
303 out_string(c, "DELETED");
305 - pthread_mutex_lock(&c->thread->stats.mutex);
306 - c->thread->stats.delete_misses++;
307 - pthread_mutex_unlock(&c->thread->stats.mutex);
309 + pthread_mutex_lock(&c->thread->stats.mutex);
310 + c->thread->stats.delete_misses++;
311 + pthread_mutex_unlock(&c->thread->stats.mutex);
314 out_string(c, "NOT_FOUND");
316 @@ -2986,6 +3056,22 @@
318 process_update_command(c, tokens, ntokens, comm, true);
320 +#ifdef USE_REPLICATION
321 + } else if ((ntokens == 7) && (strcmp(tokens[COMMAND_TOKEN].value, "rep") == 0 && (comm = NREAD_SET)) && (c == rep_conn)) {
323 + process_update_command(c, tokens, ntokens, comm, true);
325 + ((item *)(c->item))->it_flags |= ITEM_REPDATA;
327 + } else if ((ntokens == 2) && (strcmp(tokens[COMMAND_TOKEN].value, "marugoto_end") == 0) && (c == rep_conn)) {
328 + if(replication_start() == -1)
329 + exit(EXIT_FAILURE);
330 + if (settings.verbose > 0)
331 + fprintf(stderr,"replication: start\n");
332 + out_string(c, "OK");
335 +#endif /* USE_REPLICATION */
336 } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
338 process_arithmetic_command(c, tokens, ntokens, 1);
339 @@ -3012,11 +3098,17 @@
341 set_noreply_maybe(c, tokens, ntokens);
343 - pthread_mutex_lock(&c->thread->stats.mutex);
344 - c->thread->stats.flush_cmds++;
345 - pthread_mutex_unlock(&c->thread->stats.mutex);
347 + pthread_mutex_lock(&c->thread->stats.mutex);
348 + c->thread->stats.flush_cmds++;
349 + pthread_mutex_unlock(&c->thread->stats.mutex);
352 if(ntokens == (c->noreply ? 3 : 2)) {
353 +#ifdef USE_REPLICATION
354 + if( c != rep_conn )
355 + replication_call_flush_all();
357 settings.oldest_live = current_time - 1;
358 item_flush_expired();
360 @@ -3029,6 +3121,11 @@
364 +#ifdef USE_REPLICATION
365 + if( c != rep_conn )
366 + replication_call_defer_flush_all(realtime(exptime) + process_started);
368 + settings.oldest_live = realtime(exptime) - 1;
370 If exptime is zero realtime() would return zero too, and
371 realtime(exptime) - 1 would overflow to the max unsigned
372 @@ -3275,9 +3372,11 @@
373 int avail = c->rsize - c->rbytes;
374 res = read(c->sfd, c->rbuf + c->rbytes, avail);
376 - pthread_mutex_lock(&c->thread->stats.mutex);
377 - c->thread->stats.bytes_read += res;
378 - pthread_mutex_unlock(&c->thread->stats.mutex);
380 + pthread_mutex_lock(&c->thread->stats.mutex);
381 + c->thread->stats.bytes_read += res;
382 + pthread_mutex_unlock(&c->thread->stats.mutex);
384 gotdata = READ_DATA_RECEIVED;
387 @@ -3423,6 +3522,12 @@
391 +#ifdef USE_REPLICATION
392 + if(rep_exit && (c->state != conn_pipe_recv)){
395 +#endif /* USE_REPLICATION */
400 @@ -3502,9 +3607,11 @@
402 reset_cmd_handler(c);
404 - pthread_mutex_lock(&c->thread->stats.mutex);
405 - c->thread->stats.conn_yields++;
406 - pthread_mutex_unlock(&c->thread->stats.mutex);
408 + pthread_mutex_lock(&c->thread->stats.mutex);
409 + c->thread->stats.conn_yields++;
410 + pthread_mutex_unlock(&c->thread->stats.mutex);
413 /* We have already read in data into the input buffer,
414 so libevent will most likely not signal read events
415 @@ -3545,9 +3652,11 @@
416 /* now try reading from the socket */
417 res = read(c->sfd, c->ritem, c->rlbytes);
419 - pthread_mutex_lock(&c->thread->stats.mutex);
420 - c->thread->stats.bytes_read += res;
421 - pthread_mutex_unlock(&c->thread->stats.mutex);
423 + pthread_mutex_lock(&c->thread->stats.mutex);
424 + c->thread->stats.bytes_read += res;
425 + pthread_mutex_unlock(&c->thread->stats.mutex);
427 if (c->rcurr == c->ritem) {
430 @@ -3600,9 +3709,11 @@
431 /* now try reading from the socket */
432 res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
434 - pthread_mutex_lock(&c->thread->stats.mutex);
435 - c->thread->stats.bytes_read += res;
436 - pthread_mutex_unlock(&c->thread->stats.mutex);
438 + pthread_mutex_lock(&c->thread->stats.mutex);
439 + c->thread->stats.bytes_read += res;
440 + pthread_mutex_unlock(&c->thread->stats.mutex);
445 @@ -3698,6 +3809,10 @@
447 if (IS_UDP(c->transport))
449 +#ifdef USE_REPLICATION
450 + else if(c == rep_conn)
451 + replication_close();
452 +#endif /*USE_REPLICATION*/
456 @@ -3706,6 +3821,70 @@
461 +#ifdef USE_REPLICATION
462 + case conn_pipe_recv:
463 + if(replication_pop()){
464 + replication_close();
466 + replication_send(rep_conn);
471 + case conn_rep_listen:
472 + if (settings.verbose > 0)
473 + fprintf(stderr,"replication: accept\n");
474 + addrlen = sizeof(addr);
475 + res = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
477 + if(errno == EAGAIN || errno == EWOULDBLOCK) {
478 + } else if (errno == EMFILE) {
479 + fprintf(stderr, "replication: Too many opened connections\n");
481 + fprintf(stderr, "replication: accept error\n");
486 + fprintf(stderr,"replication: already connected\n");
488 + if((flags = fcntl(res, F_GETFL, 0)) < 0 || fcntl(res, F_SETFL, flags | O_NONBLOCK) < 0){
490 + fprintf(stderr, "replication: Can't Setting O_NONBLOCK\n");
492 + server_close_replication();
493 + rep_conn = conn_new(res, conn_read, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base);
494 + rep_conn->item = NULL;
495 + rep_conn->rbytes = 0;
496 + rep_conn->rcurr = rep_conn->rbuf;
497 + replication_connect();
498 + replication_marugoto(1);
499 + replication_marugoto(0);
506 + case conn_repconnect:
508 + replication_connect();
509 + conn_set_state(c, conn_read);
510 + if (settings.verbose > 0)
511 + fprintf(stderr,"replication: marugoto copying\n");
512 + if(!update_event(c, EV_READ | EV_PERSIST)){
513 + fprintf(stderr, "replication: Couldn't update event\n");
514 + conn_set_state(c, conn_closing);
519 + case conn_pipe_send:
520 + /* should not happen */
521 + fprintf(stderr, "replication: unexpected conn_pipe_send state\n");
523 +#endif /* USE_REPLICATION */
527 @@ -4002,6 +4181,89 @@
531 +#ifdef USE_REPLICATION
532 +static int server_socket_replication(const int port) {
534 + struct linger ling = {0, 0};
535 + struct addrinfo *ai;
536 + struct addrinfo *next;
537 + struct addrinfo hints;
538 + char port_buf[NI_MAXSERV];
544 + memset(&hints, 0, sizeof (hints));
545 + hints.ai_flags = AI_PASSIVE|AI_ADDRCONFIG;
546 + hints.ai_family = AF_UNSPEC;
547 + hints.ai_protocol = IPPROTO_TCP;
548 + hints.ai_socktype = SOCK_STREAM;
549 + snprintf(port_buf, NI_MAXSERV, "%d", port);
550 + error= getaddrinfo(settings.inter, port_buf, &hints, &ai);
552 + if (error != EAI_SYSTEM)
553 + fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
555 + perror("getaddrinfo()");
560 + for (next= ai; next; next= next->ai_next) {
561 + conn *rep_serv_add;
562 + if ((sfd = new_socket(next)) == -1) {
566 + setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
567 + setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
568 + setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
569 + setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
571 + if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
572 + if (errno != EADDRINUSE) {
582 + if (listen(sfd, 1024) == -1) {
583 + perror("listen()");
590 + if (!(rep_serv_add = conn_new(sfd, conn_rep_listen,
591 + EV_READ | EV_PERSIST, 1, tcp_transport, main_base))) {
592 + fprintf(stderr, "failed to create replication server connection\n");
593 + exit(EXIT_FAILURE);
596 + rep_serv_add->next = rep_serv;
597 + rep_serv = rep_serv_add;
602 + /* Return zero iff we detected no errors in starting up connections */
603 + return success == 0;
606 +static void server_close_replication(void) {
608 + conn *closing = rep_serv; /* conn_close() may recycle or free the conn, so step to ->next before closing it */
609 + rep_serv = closing->next; conn_close(closing);
612 +#endif /* USE_REPLICATION */
615 * We keep the current time of day in a global variable that's updated by a
616 * timer event. This saves us a bunch of time() system calls (we really only
617 @@ -4041,6 +4303,9 @@
619 static void usage(void) {
620 printf(PACKAGE " " VERSION "\n");
621 +#ifdef USE_REPLICATION
622 + printf("repcached %s\n",REPCACHED_VERSION);
623 +#endif /* USE_REPLICATION */
624 printf("-p <num> TCP port number to listen on (default: 11211)\n"
625 "-U <num> UDP port number to listen on (default: 11211, 0 is off)\n"
626 "-s <file> UNIX socket path to listen on (disables network support)\n"
627 @@ -4088,6 +4353,10 @@
629 printf("-S Turn on Sasl authentication\n");
631 +#ifdef USE_REPLICATION
632 + printf("-x <ip_addr> hostname or IP address of peer repcached\n");
633 + printf("-X <num> TCP port number for replication (default: 11212)\n");
634 +#endif /* USE_REPLICATION */
638 @@ -4194,6 +4463,26 @@
642 +#ifdef USE_REPLICATION
643 +static void sig_handler_cb(int fd, short event, void *arg)
645 + struct event *signal = arg;
647 + if (settings.verbose)
648 + fprintf(stderr, "got signal %d\n", EVENT_SIGNAL(signal));
650 + if (replication_exit()) {
651 + exit(EXIT_FAILURE);
654 + pthread_mutex_lock(&replication_pipe_lock);
656 + exit(EXIT_SUCCESS);
658 + pthread_mutex_unlock(&replication_pipe_lock);
660 +#endif /* USE_REPLICATION */
662 #ifndef HAVE_SIGIGNORE
663 static int sigignore(int sig) {
664 struct sigaction sa = { .sa_handler = SIG_IGN, .sa_flags = 0 };
665 @@ -4249,6 +4538,57 @@
669 +static void create_listening_sockets(void)
671 + /* create unix mode sockets after dropping privileges */
672 + if (settings.socketpath != NULL) {
674 + if (server_socket_unix(settings.socketpath,settings.access)) {
675 + vperror("failed to listen on UNIX socket: %s", settings.socketpath);
680 + /* create the listening socket, bind it, and init */
681 + if (settings.socketpath == NULL) {
682 + const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
683 + char temp_portnumber_filename[PATH_MAX];
684 + FILE *portnumber_file = NULL;
686 + if (portnumber_filename != NULL) {
687 + snprintf(temp_portnumber_filename,
688 + sizeof(temp_portnumber_filename),
689 + "%s.lck", portnumber_filename);
691 + portnumber_file = fopen(temp_portnumber_filename, "a");
692 + if (portnumber_file == NULL) {
693 + fprintf(stderr, "Failed to open \"%s\": %s\n",
694 + temp_portnumber_filename, strerror(errno));
699 + if (settings.port && server_socket(settings.port, tcp_transport,
700 + portnumber_file)) {
701 + vperror("failed to listen on TCP port %d", settings.port);
705 + /* create the UDP listening socket and bind it */
707 + if (settings.udpport && server_socket(settings.udpport, udp_transport,
708 + portnumber_file)) {
709 + vperror("failed to listen on UDP port %d", settings.udpport);
713 + if (portnumber_file) {
714 + fclose(portnumber_file);
715 + rename(temp_portnumber_filename, portnumber_filename);
720 int main (int argc, char **argv) {
722 bool lock_memory = false;
723 @@ -4261,6 +4601,11 @@
727 +#ifdef USE_REPLICATION
728 + struct in_addr addr;
729 + struct addrinfo master_hint;
730 + struct addrinfo *master_addr;
731 +#endif /* USE_REPLICATION */
732 /* listening sockets */
733 static int *l_socket = NULL;
735 @@ -4307,6 +4652,11 @@
736 "B:" /* Binding protocol */
737 "I:" /* Max item size */
739 +#ifdef USE_REPLICATION
740 + "X:" /* replication port */
741 + "x:" /* replication master */
742 + "q:" /* replication queue length */
743 +#endif /* USE_REPLICATION */
747 @@ -4462,6 +4812,31 @@
751 +#ifdef USE_REPLICATION
753 + if (inet_pton(AF_INET, optarg, &addr) <= 0) {
754 + memset(&master_hint, 0, sizeof(master_hint)); /* zeroes every hint field, ai_flags included */
755 + master_hint.ai_family = AF_INET; /* the result is cast to struct sockaddr_in and stored in a struct in_addr, so only IPv4 answers are usable */
756 + master_hint.ai_socktype = 0;
757 + master_hint.ai_protocol = 0;
758 + if(!getaddrinfo(optarg, NULL, &master_hint, &master_addr)){
759 + settings.rep_addr = ((struct sockaddr_in *)(master_addr->ai_addr)) -> sin_addr;
760 + freeaddrinfo(master_addr);
762 + fprintf(stderr, "Illegal address: %s\n", optarg);
766 + settings.rep_addr = addr;
770 + settings.rep_port = atoi(optarg);
773 + settings.rep_qmax = atoi(optarg);
775 +#endif /* USE_REPLICATION */
776 case 'S': /* set Sasl authentication to true. Default is false */
778 fprintf(stderr, "This server is not built with SASL support.\n");
779 @@ -4587,6 +4962,17 @@
780 /* initialize main thread libevent instance */
781 main_base = event_init();
783 +#ifdef USE_REPLICATION
784 + /* register events for SIGINT and SIGTERM to handle them in main thread */
785 + struct event signal_int, signal_term;
786 + event_set(&signal_int, SIGINT, EV_SIGNAL|EV_PERSIST, sig_handler_cb,
788 + event_add(&signal_int, NULL);
789 + event_set(&signal_term, SIGTERM, EV_SIGNAL|EV_PERSIST, sig_handler_cb,
791 + event_add(&signal_term, NULL);
794 /* initialize other stuff */
797 @@ -4615,63 +5001,21 @@
798 /* initialise clock event */
799 clock_handler(0, 0, 0);
801 - /* create unix mode sockets after dropping privileges */
802 - if (settings.socketpath != NULL) {
804 - if (server_socket_unix(settings.socketpath,settings.access)) {
805 - vperror("failed to listen on UNIX socket: %s", settings.socketpath);
810 - /* create the listening socket, bind it, and init */
811 - if (settings.socketpath == NULL) {
814 - const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
815 - char temp_portnumber_filename[PATH_MAX];
816 - FILE *portnumber_file = NULL;
818 - if (portnumber_filename != NULL) {
819 - snprintf(temp_portnumber_filename,
820 - sizeof(temp_portnumber_filename),
821 - "%s.lck", portnumber_filename);
823 - portnumber_file = fopen(temp_portnumber_filename, "a");
824 - if (portnumber_file == NULL) {
825 - fprintf(stderr, "Failed to open \"%s\": %s\n",
826 - temp_portnumber_filename, strerror(errno));
831 - if (settings.port && server_socket(settings.port, tcp_transport,
832 - portnumber_file)) {
833 - vperror("failed to listen on TCP port %d", settings.port);
838 - * initialization order: first create the listening sockets
839 - * (may need root on low ports), then drop root if needed,
840 - * then daemonise if needed, then init libevent (in some cases
841 - * descriptors created by libevent wouldn't survive forking).
843 - udp_port = settings.udpport ? settings.udpport : settings.port;
845 - /* create the UDP listening socket and bind it */
847 - if (settings.udpport && server_socket(settings.udpport, udp_transport,
848 - portnumber_file)) {
849 - vperror("failed to listen on UDP port %d", settings.udpport);
853 + * initialization order: first create the listening sockets
854 + * (may need root on low ports), then drop root if needed,
855 + * then daemonise if needed, then init libevent (in some cases
856 + * descriptors created by libevent wouldn't survive forking).
859 - if (portnumber_file) {
860 - fclose(portnumber_file);
861 - rename(temp_portnumber_filename, portnumber_filename);
863 +#ifdef USE_REPLICATION
864 + if(replication_init() == -1){
865 + fprintf(stderr, "faild to replication init\n");
866 + exit(EXIT_FAILURE);
869 + create_listening_sockets();
872 /* Drop privileges no longer needed */
874 @@ -4694,3 +5038,401 @@
879 +#ifdef USE_REPLICATION
880 +static int replication_start(void)
882 + static int start = 0;
886 + create_listening_sockets();
892 +static int replication_server_init(void)
897 + if(server_socket_replication(settings.rep_port)){
898 + fprintf(stderr, "replication: failed to initialize replication server socket\n");
901 + if (settings.verbose > 0)
902 + fprintf(stderr, "replication: listen\n");
903 + return(replication_start());
906 +static int replication_client_init(void)
910 + struct addrinfo ai;
911 + struct sockaddr_in server;
917 + memset(&ai,0,sizeof(ai));
918 + ai.ai_family = AF_INET;
919 + ai.ai_socktype = SOCK_STREAM;
920 + s = new_socket(&ai);
923 + fprintf(stderr, "replication: failed to replication client socket\n");
927 + memset((char *)&server, 0, sizeof(server));
928 + server.sin_family = AF_INET;
929 + server.sin_addr = settings.rep_addr;
930 + server.sin_port = htons(settings.rep_port);
931 + if (settings.verbose > 0)
932 + fprintf(stderr,"replication: connect (peer=%s:%d)\n", inet_ntoa(settings.rep_addr), settings.rep_port);
933 + if(connect(s,(struct sockaddr *)&server, sizeof(server)) == 0){
934 + c = conn_new(s, conn_repconnect, EV_WRITE | EV_PERSIST, DATA_BUFFER_SIZE, false, main_base);
936 + fprintf(stderr, "replication: failed to create client conn\n");
942 + if(errno == EINPROGRESS){
943 + c = conn_new(s, conn_repconnect, EV_WRITE | EV_PERSIST, DATA_BUFFER_SIZE, false, main_base);
945 + fprintf(stderr, "replication: failed to create client conn\n");
950 + fprintf(stderr,"replication: can't connect %s:%d\n", inet_ntoa(server.sin_addr), ntohs(server.sin_port)); /* diagnostics go to stderr like the rest of the replication messages */
959 +static int replication_init(void)
961 + if(settings.rep_addr.s_addr != htonl(INADDR_ANY)){
962 + if(replication_client_init() != -1){
966 + return(replication_server_init());
969 +static int replication_connect(void)
975 + fprintf(stderr, "replication: can't create pipe\n");
978 + if((f = fcntl(p[0], F_GETFL, 0)) < 0 || fcntl(p[0], F_SETFL, f | O_NONBLOCK) < 0) {
979 + fprintf(stderr, "replication: can't setting O_NONBLOCK pipe[0]\n");
982 + if((f = fcntl(p[1], F_GETFL, 0)) < 0 || fcntl(p[1], F_SETFL, f | O_NONBLOCK) < 0) { /* p[1] is the end later wrapped by rep_send */
983 + fprintf(stderr, "replication: can't setting O_NONBLOCK pipe[1]\n");
986 + pthread_mutex_lock(&replication_pipe_lock);
987 + rep_recv = conn_new(p[0], conn_pipe_recv, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base);
988 + rep_send = conn_new(p[1], conn_pipe_send, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base);
989 + event_del(&rep_send->event);
990 + pthread_mutex_unlock(&replication_pipe_lock);
995 +static int replication_close(void)
1001 + if(settings.verbose > 0)
1002 + fprintf(stderr,"replication: close\n");
1004 + rep_recv->rbytes = sizeof(q);
1005 + rep_recv->rcurr = rep_recv->rbuf;
1008 + r = read(rep_recv->sfd, rep_recv->rcurr, rep_recv->rbytes);
1012 + rep_recv->rbytes -= r;
1013 + rep_recv->rcurr += r;
1014 + if(!rep_recv->rbytes){
1015 + memcpy(&q, rep_recv->rbuf, sizeof(q));
1016 + rep_recv->rbytes = sizeof(q);
1017 + rep_recv->rcurr = rep_recv->rbuf;
1022 + conn_close(rep_recv);
1024 + if (settings.verbose > 1) {
1025 + fprintf(stderr, "replication: qitem free %d items\n", qi_free_list());
1026 + fprintf(stderr, "replication: close recv %d items\n", c);
1029 + pthread_mutex_lock(&replication_pipe_lock);
1031 + conn_close(rep_send);
1033 + if (settings.verbose > 1)
1034 + fprintf(stderr,"replication: close send\n");
1036 + pthread_mutex_unlock(&replication_pipe_lock);
1038 + conn_close(rep_conn);
1040 + if (settings.verbose > 1)
1041 + fprintf(stderr,"replication: close conn\n");
1044 + replication_server_init();
1048 +static void replication_dispatch_close(void)
1050 + if (settings.verbose > 1)
1051 + fprintf(stderr, "replication: dispatch close\n");
1052 + pthread_mutex_lock(&replication_pipe_lock);
1054 + conn_close(rep_send);
1057 + pthread_mutex_unlock(&replication_pipe_lock);
1060 +static int replication_marugoto(int f)
1062 + static int keysend = 0;
1063 + static int keycount = 0;
1064 + static char *keylist = NULL;
1065 + static char *keyptr = NULL;
1075 + pthread_mutex_lock(&cache_lock);
1076 + keylist = (char *)assoc_key_snap((int *)&keycount);
1077 + pthread_mutex_unlock(&cache_lock);
1080 + replication_call_marugoto_end();
1082 + if (settings.verbose > 0)
1083 + fprintf(stderr,"replication: marugoto start\n");
1088 + item *it = item_get(keyptr, strlen(keyptr));
1091 + if(replication_call_rep(keyptr, strlen(keyptr)) == -1){
1095 + keyptr += strlen(keyptr) + 1;
1099 + keyptr += strlen(keyptr) + 1;
1101 + if(settings.verbose > 0)
1102 + fprintf(stderr,"replication: marugoto %d\n", keysend);
1103 + replication_call_marugoto_end();
1104 + if(settings.verbose > 0)
1105 + fprintf(stderr,"replication: marugoto owari\n");
1116 +static int replication_send(conn *c)
1119 + int w = write(c->sfd, c->wcurr, c->wbytes);
1121 + if(errno == EAGAIN || errno == EINTR){
1123 + fprintf(stderr,"replication: send error\n");
1124 + replication_close();
1132 + return(c->wbytes);
1135 +static int replication_pop(void)
1142 + if(settings.verbose > 1)
1143 + fprintf(stderr, "replication: pop\n");
1148 + r = read(rep_recv->sfd, rep_recv->rbuf, rep_recv->rsize);
1150 + if(errno == EAGAIN || errno == EINTR){
1152 + fprintf(stderr,"replication: pop error %d\n", errno);
1156 + /* other end closed, trigger replication_close() */
1159 + c = r / sizeof(Q_ITEM *);
1160 + m = r % sizeof(Q_ITEM *);
1161 + q = (Q_ITEM **)(rep_recv->rbuf);
1164 + if(rep_conn && replication_cmd(rep_conn, q[c])){
1165 + replication_item(q[c]); /* error retry */
1171 + if (settings.verbose)
1172 + fprintf(stderr,"replication: cleanup start\n");
1179 + if(rep_conn->wbytes){
1181 + if(replication_exit()){
1182 + replication_close();
1183 + fprintf(stderr,"replication: cleanup error\n");
1184 + exit(EXIT_FAILURE);
1188 + replication_close();
1189 + if (settings.verbose)
1190 + fprintf(stderr,"replication: cleanup complete\n");
1191 + exit(EXIT_SUCCESS);
1194 + replication_marugoto(0);
1198 +static int replication_push(void)
1202 + while(rep_send->wbytes){
1203 + w = write(rep_send->sfd, rep_send->wcurr, rep_send->wbytes);
1205 + if(errno == EAGAIN || errno == EINTR){
1206 + fprintf(stderr,"replication: push EAGAIN or EINTR\n");
1211 + rep_send->wbytes -= w;
1212 + rep_send->wcurr += w;
1215 + rep_send->wcurr = rep_send->wbuf;
1219 +static int replication_exit(void)
1221 + return(replication_item(NULL));
1224 +static int replication_item(Q_ITEM *q)
1226 + pthread_mutex_lock(&replication_pipe_lock);
1228 + pthread_mutex_unlock(&replication_pipe_lock);
1231 + if(rep_send->wcurr + rep_send->wbytes + sizeof(q) > rep_send->wbuf + rep_send->wsize){
1232 + fprintf(stderr,"replication: buffer over fllow\n");
1236 + pthread_mutex_unlock(&replication_pipe_lock);
1237 + replication_dispatch_close();
1240 + memcpy(rep_send->wcurr + rep_send->wbytes, &q, sizeof(q));
1241 + rep_send->wbytes += sizeof(q);
1242 + if(replication_push()){
1243 + fprintf(stderr, "replication: push error\n");
1247 + pthread_mutex_unlock(&replication_pipe_lock);
1248 + replication_dispatch_close();
1251 + pthread_mutex_unlock(&replication_pipe_lock);
1255 +int replication(enum CMD_TYPE type, R_CMD *cmd)
1259 + pthread_mutex_lock(&replication_pipe_lock);
1261 + pthread_mutex_unlock(&replication_pipe_lock);
1264 + pthread_mutex_unlock(&replication_pipe_lock);
1266 + if((q = qi_new(type, cmd, false))) {
1267 + replication_item(q);
1269 + fprintf(stderr,"replication: can't create Q_ITEM\n");
1270 + replication_dispatch_close();
1275 +#endif /* USE_REPLICATION */
1276 diff -urN --exclude *.m4 memcached-1.4.4/memcached.h repcached-2.2-1.4.4/memcached.h
1277 --- memcached-1.4.4/memcached.h Thu Nov 26 03:37:49 2009
1278 +++ repcached-2.2-1.4.4/memcached.h Tue Feb 9 23:02:45 2010
1279 @@ -144,7 +144,13 @@
1280 conn_swallow, /**< swallowing unnecessary bytes w/o storing */
1281 conn_closing, /**< closing this connection */
1282 conn_mwrite, /**< writing out many items sequentially */
1283 - conn_max_state /**< Max state value (used for assertion) */
1284 +#ifdef USE_REPLICATION
1285 + conn_repconnect, /**< replication connecting to master */
1286 + conn_rep_listen, /**< replication listening socket */
1287 + conn_pipe_recv, /**< replication command pipe recv */
1288 + conn_pipe_send, /**< replication command pipe send */
1289 +#endif /* USE_REPLICATION */
1290 + conn_max_state, /**< Max state value (used for assertion) */
1293 enum bin_substates {
1296 uint64_t get_misses;
1299 time_t started; /* when the process was started */
1301 bool accepting_conns; /* whether we are currently accepting */
1302 uint64_t listen_disabled_num;
1304 @@ -274,6 +282,11 @@
1306 int item_size_max; /* Maximum item size, and upper end for slabs */
1307 bool sasl; /* SASL on/off */
1308 +#ifdef USE_REPLICATION
1309 + struct in_addr rep_addr; /* replication addr */
1310 + int rep_port; /* replication port */
1311 + int rep_qmax; /* replication QITEM max */
1312 +#endif /*USE_REPLICATION*/
1315 extern struct stats stats;
1316 @@ -286,6 +299,10 @@
1318 #define ITEM_SLABBED 4
1320 +#ifdef USE_REPLICATION
1321 +#define ITEM_REPDATA 128
1322 +#endif /*USE_REPLICATION*/
1325 * Structure for storing items within memcached.
1327 @@ -438,6 +455,10 @@
1332 +#ifdef USE_REPLICATION
1333 +#include "replication.h"
1334 +#endif /* USE_REPLICATION */
1337 * Functions such as the libevent-related calls that need to do cross-thread
1338 diff -urN --exclude *.m4 memcached-1.4.4/memcached.spec repcached-2.2-1.4.4/memcached.spec
1339 diff -urN --exclude *.m4 memcached-1.4.4/replication.c repcached-2.2-1.4.4/replication.c
1340 --- memcached-1.4.4/replication.c Thu Jan 1 03:00:00 1970
1341 +++ repcached-2.2-1.4.4/replication.c Wed Feb 10 18:40:48 2010
1343 +/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
1347 +#include "memcached.h"
1348 +#include "replication.h"
1349 +#include <stdlib.h>
1351 +#include <unistd.h>
1352 +#include <string.h>
1355 +static Q_ITEM *q_freelist = NULL;
1356 +static int q_itemcount = 0;
1357 +static pthread_mutex_t replication_queue_lock = PTHREAD_MUTEX_INITIALIZER;
1359 +int get_qi_count(void)
1362 + pthread_mutex_lock(&replication_queue_lock);
1364 + pthread_mutex_unlock(&replication_queue_lock);
1368 +Q_ITEM *qi_new(enum CMD_TYPE type, R_CMD *cmd, bool reuse)
1372 + uint32_t keylen = 0;
1373 + rel_time_t time = 0;
1375 + pthread_mutex_lock(&replication_queue_lock);
1378 + q_freelist = q->next;
1383 + pthread_mutex_unlock(&replication_queue_lock);
1386 + if(q_itemcount >= settings.rep_qmax) {
1387 + pthread_mutex_unlock(&replication_queue_lock);
1390 + q = malloc(sizeof(Q_ITEM));
1392 + fprintf(stderr,"replication: qi_new out of memory\n");
1393 + pthread_mutex_unlock(&replication_queue_lock);
1397 + if (settings.verbose > 2)
1398 + fprintf(stderr,"replication: alloc c=%d\n", q_itemcount);
1401 + pthread_mutex_unlock(&replication_queue_lock);
1404 + case REPLICATION_REP:
1405 + case REPLICATION_DEL:
1407 + keylen = cmd->keylen;
1409 + case REPLICATION_FLUSH_ALL:
1411 + case REPLICATION_DEFER_FLUSH_ALL:
1414 + case REPLICATION_MARUGOTO_END:
1417 + fprintf(stderr,"replication: got unknown command: %d\n", type);
1426 + q->key = malloc(keylen + 1);
1427 + if(NULL == q->key){
1431 + memcpy(q->key, key, keylen);
1432 + *(q->key + keylen) = 0;
1439 +void qi_free(Q_ITEM *q)
1446 + pthread_mutex_lock(&replication_queue_lock);
1447 + q->next = q_freelist;
1449 + pthread_mutex_unlock(&replication_queue_lock);
1458 + pthread_mutex_lock(&replication_queue_lock);
1459 + while((q = q_freelist)){
1462 + q_freelist = q->next;
1465 + pthread_mutex_unlock(&replication_queue_lock);
1469 +static int replication_get_num(char *p, int n)
1474 + l = sprintf(p, "%u", n);
1476 + l = sprintf(b, "%u", n);
1480 +int replication_call_rep(char *key, size_t keylen)
1484 + r.keylen = keylen;
1485 + return(replication(REPLICATION_REP, &r));
1488 +int replication_call_del(char *key, size_t keylen)
1492 + r.keylen = keylen;
1493 + return(replication(REPLICATION_DEL, &r));
1496 +int replication_call_flush_all()
1500 + return(replication(REPLICATION_FLUSH_ALL, &r));
1503 +int replication_call_defer_flush_all(const rel_time_t time)
1508 + return(replication(REPLICATION_DEFER_FLUSH_ALL, &r));
1511 +int replication_call_marugoto_end()
1515 + return(replication(REPLICATION_MARUGOTO_END, &r));
1518 +static int replication_alloc(conn *c, int s)
1523 + while(c->wsize < s)
1525 + if((p = malloc(c->wsize))){
1526 + memcpy(p, c->wbuf, c->wbytes);
1536 +static int replication_del(conn *c, char *k)
1539 + char *s = "delete ";
1546 + if(replication_alloc(c,l) == -1){
1547 + fprintf(stderr, "replication: del malloc error\n");
1550 + p = c->wbuf + c->wbytes;
1551 + memcpy(p, s, strlen(s));
1553 + memcpy(p, k, strlen(k));
1555 + memcpy(p, n, strlen(n));
1557 + c->wbytes = p - c->wbuf;
1558 + c->wcurr = c->wbuf;
1562 +static int replication_rep(conn *c, item *it)
1572 + exp = it->exptime + process_started;
1574 + if((p=ITEM_suffix(it))){
1576 + memcpy(flag, p, it->nsuffix - 2);
1577 + flag[it->nsuffix - 2] = 0;
1578 + for(i=0;i<strlen(flag);i++){
1582 + memmove(flag,&flag[i],strlen(flag)-i);
1583 + for(p=flag;*p>' ';p++);
1589 + len += strlen(flag);
1591 + len += replication_get_num(NULL, exp);
1593 + len += replication_get_num(NULL, it->nbytes - 2);
1595 + len += replication_get_num(NULL, ITEM_get_cas(it));
1597 + len += it->nbytes;
1599 + if(replication_alloc(c,len) == -1){
1600 + fprintf(stderr, "replication: rep malloc error\n");
1603 + p = c->wbuf + c->wbytes;
1604 + memcpy(p, s, strlen(s));
1606 + memcpy(p, ITEM_key(it), it->nkey);
1609 + memcpy(p, flag, strlen(flag));
1610 + p += strlen(flag);
1612 + p += replication_get_num(p, exp);
1614 + p += replication_get_num(p, it->nbytes - 2);
1616 + p += replication_get_num(p, ITEM_get_cas(it));
1617 + memcpy(p, n, strlen(n));
1619 + memcpy(p, ITEM_data(it), it->nbytes);
1621 + c->wbytes = p - c->wbuf;
1622 + c->wcurr = c->wbuf;
1626 +static int replication_flush_all(conn *c, rel_time_t exp)
1628 + char *s = "flush_all ";
1632 + int l = strlen(s) + strlen(n);
1634 + l += replication_get_num(NULL, exp);
1635 + if(replication_alloc(c,l) == -1){
1636 + fprintf(stderr, "replication: flush_all malloc error\n");
1639 + p = c->wbuf + c->wbytes;
1640 + memcpy(p, s, strlen(s));
1643 + p += replication_get_num(p, exp);
1644 + memcpy(p, n, strlen(n));
1646 + c->wbytes = p - c->wbuf;
1647 + c->wcurr = c->wbuf;
1651 +static int replication_marugoto_end(conn *c)
1653 + char *s = "marugoto_end";
1657 + int l = strlen(s) + strlen(n);
1658 + if(replication_alloc(c,l) == -1){
1659 + fprintf(stderr, "replication: marugoto_end malloc error\n");
1662 + p = c->wbuf + c->wbytes;
1663 + memcpy(p, s, strlen(s));
1665 + memcpy(p, n, strlen(n));
1667 + c->wbytes = p - c->wbuf;
1668 + c->wcurr = c->wbuf;
1672 +int replication_cmd(conn *c, Q_ITEM *q)
1677 + switch (q->type) {
1678 + case REPLICATION_REP:
1679 + it = item_get(q->key, strlen(q->key));
1681 + return(replication_del(c, q->key));
1682 + r = replication_rep(c, it);
1685 + case REPLICATION_DEL:
1686 + return(replication_del(c, q->key));
1687 + case REPLICATION_FLUSH_ALL:
1688 + return(replication_flush_all(c, 0));
1689 + case REPLICATION_DEFER_FLUSH_ALL:
1690 + return(replication_flush_all(c, q->time));
1691 + case REPLICATION_MARUGOTO_END:
1692 + return(replication_marugoto_end(c));
1694 + fprintf(stderr,"replication: got unknown command:%d\n", q->type);
1698 diff -urN --exclude *.m4 memcached-1.4.4/replication.h repcached-2.2-1.4.4/replication.h
1699 --- memcached-1.4.4/replication.h Thu Jan 1 03:00:00 1970
1700 +++ repcached-2.2-1.4.4/replication.h Wed Feb 10 18:40:31 2010
1702 +#ifndef MEMCACHED_REPLICATION_H
1703 +#define MEMCACHED_REPLICATION_H
1704 +#define REPCACHED_VERSION "2.2"
1710 + REPLICATION_FLUSH_ALL,
1711 + REPLICATION_DEFER_FLUSH_ALL,
1712 + REPLICATION_MARUGOTO_END,
1715 +typedef struct queue_item_t Q_ITEM;
1716 +struct queue_item_t {
1717 + enum CMD_TYPE type;
1723 +typedef struct replication_cmd_t R_CMD;
1724 +struct replication_cmd_t {
1730 +Q_ITEM *qi_new(enum CMD_TYPE type, R_CMD *cmd, bool);
1731 +void qi_free(Q_ITEM *);
1732 +int qi_free_list(void);
1733 +int replication_cmd(conn *, Q_ITEM *);
1734 +int get_qi_count(void);
1736 +int replication_call_rep(char *key, size_t keylen);
1737 +int replication_call_del(char *key, size_t keylen);
1738 +int replication_call_flush_all(void);
1739 +int replication_call_defer_flush_all(const rel_time_t time);
1740 +int replication_call_marugoto_end(void);
1741 +int replication(enum CMD_TYPE type, R_CMD *cmd);
1744 diff -urN --exclude *.m4 memcached-1.4.4/t/binary.t repcached-2.2-1.4.4/t/binary.t
1745 --- memcached-1.4.4/t/binary.t Fri Nov 27 08:05:16 2009
1746 +++ repcached-2.2-1.4.4/t/binary.t Wed Feb 10 17:04:01 2010
1751 -use Test::More tests => 3349;
1753 use FindBin qw($Bin);
1757 +Test::More::plan(tests => support_replication() ? 3385 : 3349);
1759 my $server = new_memcached();
1760 ok($server, "started the server");
1761 diff -urN --exclude *.m4 memcached-1.4.4/t/issue_67.t repcached-2.2-1.4.4/t/issue_67.t
1762 --- memcached-1.4.4/t/issue_67.t Sun Nov 1 01:44:09 2009
1763 +++ repcached-2.2-1.4.4/t/issue_67.t Wed Feb 10 17:50:12 2010
1765 my $exe = "$builddir/memcached-debug";
1766 croak("memcached binary doesn't exist. Haven't run 'make' ?\n") unless -e $exe;
1768 + if (support_replication()) {
1772 my $childpid = fork();
1774 my $cmd = "$builddir/timedrun 10 $exe $args";
1775 diff -urN --exclude *.m4 memcached-1.4.4/t/lib/MemcachedTest.pm repcached-2.2-1.4.4/t/lib/MemcachedTest.pm
1776 --- memcached-1.4.4/t/lib/MemcachedTest.pm Fri Oct 30 04:24:52 2009
1777 +++ repcached-2.2-1.4.4/t/lib/MemcachedTest.pm Wed Feb 10 17:53:34 2010
1781 @EXPORT = qw(new_memcached sleep mem_get_is mem_gets mem_gets_is mem_stats
1782 - supports_sasl free_port);
1783 + supports_sasl free_port support_replication memcached_version
1788 @@ -148,6 +149,23 @@
1792 +sub support_replication {
1793 + my $output = `$builddir/memcached-debug -h`;
1794 + return 1 if $output =~ /^-x <ip_addr>/m;
1798 +sub memcached_version {
1799 + my $output = `$builddir/memcached-debug -h`;
1800 + return $1 if $output =~ /^memcached (\d[\d\.]+)/;
1805 + my($major,$minor,$pl) = ($_[0] =~ /^(\d+)\.(\d+)\.(\d+)$/);
1806 + return $major*100**2 + $minor*100 + $pl
1810 my ($args, $passed_port) = @_;
1811 my $port = $passed_port || free_port();
1815 $args .= " -u root";
1817 + if (support_replication() && $args !~ m/-X/) {
1821 my $childpid = fork();
1822 diff -urN --exclude *.m4 memcached-1.4.4/t/stats.t repcached-2.2-1.4.4/t/stats.t
1823 --- memcached-1.4.4/t/stats.t Thu Nov 26 03:37:49 2009
1824 +++ repcached-2.2-1.4.4/t/stats.t Tue Feb 9 23:13:26 2010
1826 my $stats = mem_stats($sock);
1828 # Test number of keys
1829 -is(scalar(keys(%$stats)), 37, "37 stats values");
1830 +if (! support_replication()) {
1831 + is(scalar(keys(%$stats)), 37, "37 stats values");
1833 + is(scalar(keys(%$stats)), 40, "40 stats values");
1836 # Test initial state
1837 foreach my $key (qw(curr_items total_items bytes cmd_get cmd_set get_hits evictions get_misses
1838 diff -urN --exclude *.m4 memcached-1.4.4/testapp.c repcached-2.2-1.4.4/testapp.c
1839 --- memcached-1.4.4/testapp.c Wed Nov 25 03:40:29 2009
1840 +++ repcached-2.2-1.4.4/testapp.c Wed Feb 10 17:52:05 2010
1841 @@ -300,6 +300,10 @@
1845 +#ifdef USE_REPLICATION
1846 + argv[arg++] = "-X";
1847 + argv[arg++] = "0";
1849 /* Handle rpmbuild and the like doing this as root */
1850 if (getuid() == 0) {