diff options
author | Elan Ruusamäe | 2017-04-24 23:09:23 (GMT) |
---|---|---|
committer | Elan Ruusamäe | 2017-04-24 23:11:14 (GMT) |
commit | dacfa27336a56b0388e2779d16efed2b4ef3db22 (patch) | |
tree | dd4209b777c2a6fd4137c23dc5317da221cf9102 | |
parent | 040242e915527d1b43f0653f2a177cfbf47e6fd1 (diff) | |
download | memcached-dacfa27336a56b0388e2779d16efed2b4ef3db22.zip memcached-dacfa27336a56b0388e2779d16efed2b4ef3db22.tar.gz |
up to 1.4.36; drop repcached patch (unmaintained)auto/th/memcached-1.4.36-1
-rw-r--r-- | memcached.spec | 13 | ||||
-rw-r--r-- | repcached.patch | 1828 |
2 files changed, 4 insertions, 1837 deletions
diff --git a/memcached.spec b/memcached.spec index b3b6272..1fe5c48 100644 --- a/memcached.spec +++ b/memcached.spec @@ -8,23 +8,19 @@ # ^ #crawler.c:229:13: warning: format '%ld' expects argument of type 'long int', but argument 5 has type 'time_t {aka long long int}' [-Wformat=] -# Conditional build: -%bcond_with repcached # repcached support, http://repcached.lab.klab.org/ - Summary: A high-performance, distributed memory object caching system Summary(pl.UTF-8): Rozproszony, wysokiej wydajności system cache'owania obiektów Name: memcached -Version: 1.4.33 -Release: 2 +Version: 1.4.36 +Release: 1 License: BSD Group: Networking/Daemons Source0: http://www.memcached.org/files/%{name}-%{version}.tar.gz -# Source0-md5: 2d7f6476283cd36e21e521d901d37a8f +# Source0-md5: 1e028fbab7288911fcaa5ed2a21817fe Source1: %{name}.init Source2: %{name}.sysconfig Source3: %{name}.tmpfiles -URL: http://memcached.org/ -Patch0: repcached.patch +URL: https://memcached.org/about BuildRequires: autoconf BuildRequires: automake BuildRequires: libevent-devel >= 1.1 @@ -61,7 +57,6 @@ require access to the memcached binary include files. %prep %setup -q -%{?with_repcached:%patch0 -p1} sed -nie '1,/^$/p' ChangeLog diff --git a/repcached.patch b/repcached.patch deleted file mode 100644 index 4193fae..0000000 --- a/repcached.patch +++ /dev/null @@ -1,1828 +0,0 @@ ---- memcached-1.4.4/Makefile.am Fri Oct 30 04:24:52 2009 -+++ repcached-2.2-1.4.4/Makefile.am Tue Feb 9 23:02:45 2010 -@@ -31,6 +31,10 @@ - memcached_SOURCES += sasl_defs.c - endif - -+if ENABLE_REPLICATION -+memcached_SOURCES += replication.h replication.c -+endif -+ - memcached_debug_SOURCES = $(memcached_SOURCES) - memcached_CPPFLAGS = -DNDEBUG - memcached_debug_LDADD = @PROFILER_LDFLAGS@ ---- memcached-1.4.4/assoc.c Sat Oct 24 00:38:01 2009 -+++ repcached-2.2-1.4.4/assoc.c Tue Feb 9 23:02:45 2010 -@@ -258,3 +258,51 @@ - } - - -+#ifdef USE_REPLICATION -+char *assoc_key_snap(int *n) -+{ -+ char *p = NULL; -+ char *b = NULL; -+ item *i = NULL; -+ int co = 0; -+ int sz = 1; -+ int hs = 0; -+ int hm = hashsize(hashpower); -+ -+ hs = hm; -+ while(hs--){ -+ if(expanding && hs < hashsize(hashpower - 1) && hs >= expand_bucket){ -+ i = old_hashtable[hs]; -+ }else{ -+ i = primary_hashtable[hs]; -+ } -+ while(i){ -+ sz += i->nkey + 1; -+ co++; -+ i = i->h_next; -+ } -+ } -+ -+ if(co){ -+ if((p = b = malloc(sz))){ -+ hs = hm; -+ while(hs--){ -+ if(expanding && hs < hashsize(hashpower - 1) && hs >= expand_bucket){ -+ i = old_hashtable[hs]; -+ }else{ -+ i = primary_hashtable[hs]; -+ } -+ while(i){ -+ memcpy(p, ITEM_key(i), i->nkey); -+ p += i->nkey; -+ *(p++) = 0; -+ i = i->h_next; -+ } -+ } -+ *(p++) = 0; -+ } -+ } -+ if(n) *n = co; -+ return(b); -+} -+#endif /* USE_REPLICATION */ ---- memcached-1.4.4/assoc.h Sun Aug 30 03:00:58 2009 -+++ repcached-2.2-1.4.4/assoc.h Tue Feb 9 23:02:45 2010 -@@ -7,3 +7,6 @@ - int start_assoc_maintenance_thread(void); - void stop_assoc_maintenance_thread(void); - -+#ifdef USE_REPLICATION -+char *assoc_key_snap(int *n); -+#endif /*USE_REPLICATION*/ ---- memcached-1.4.4/config.h.in Fri Nov 27 09:34:56 2009 -+++ repcached-2.2-1.4.4/config.h.in Wed Feb 10 19:12:46 2010 -@@ -99,6 +99,9 @@ - /* Define to 1 if you have the ANSI C header files. */ - #undef STDC_HEADERS - -+/* Define this if you want to use replication */ -+#undef USE_REPLICATION -+ - /* Version number of package */ - #undef VERSION - ---- memcached-1.4.4/configure.ac Wed Nov 25 03:40:29 2009 -+++ repcached-2.2-1.4.4/configure.ac Tue Feb 9 23:02:45 2010 -@@ -382,6 +382,18 @@ - AC_MSG_ERROR([Can't enable threads without the POSIX thread library.]) - fi - -+dnl Check whether the user wants replication or not -+AC_ARG_ENABLE(replication, -+ [AS_HELP_STRING([--enable-replication],[support replication])], -+ [if test "x$enable_threads" = "xyes"; then -+ AC_MSG_ERROR([Can't enable threads and replication together.]) -+ else -+ AC_DEFINE([USE_REPLICATION],,[Define this if you want to use replication]) -+ fi -+ ]) -+ -+AM_CONDITIONAL(ENABLE_REPLICATION, test "x$enable_replication" = "xyes") -+ - AC_CHECK_FUNCS(mlockall) - AC_CHECK_FUNCS(getpagesizes) - AC_CHECK_FUNCS(memcntl) ---- memcached-1.4.4/items.c Sat Oct 24 00:38:01 2009 -+++ repcached-2.2-1.4.4/items.c Tue Feb 9 23:02:45 2010 -@@ -155,6 +155,9 @@ - STATS_LOCK(); - stats.evictions++; - STATS_UNLOCK(); -+#ifdef USE_REPLICATION -+ replication_call_del(ITEM_key(search), search->nkey); -+#endif /* USE_REPLICATION */ - } - do_item_unlink(search); - break; -@@ -288,8 +291,14 @@ - stats.total_items += 1; - STATS_UNLOCK(); - -+#ifdef USE_REPLICATION -+ /* Allocate a new CAS ID on link. */ -+ if(!(it->it_flags & ITEM_REPDATA)) -+ ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0); -+#else - /* Allocate a new CAS ID on link. */ - ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0); -+#endif /* USE_REPLICATION */ - - item_link_q(it); - ---- memcached-1.4.4/memcached.c Fri Nov 27 08:45:13 2009 -+++ repcached-2.2-1.4.4/memcached.c Wed Feb 10 16:08:37 2010 -@@ -102,6 +102,30 @@ - - static void conn_free(conn *c); - -+#ifdef USE_REPLICATION -+static int rep_exit = 0; -+static conn *rep_recv = NULL; -+static conn *rep_send = NULL; -+static conn *rep_conn = NULL; -+static conn *rep_serv = NULL; -+static int server_socket_replication(const int); -+static void server_close_replication(void); -+static int replication_init(void); -+static int replication_server_init(void); -+static int replication_client_init(void); -+static int replication_start(void); -+static int replication_connect(void); -+static int replication_close(void); -+static void replication_dispatch_close(void); -+static int replication_marugoto(int); -+static int replication_send(conn *); -+static int replication_pop(void); -+static int replication_push(void); -+static int replication_exit(void); -+static int replication_item(Q_ITEM *); -+static pthread_mutex_t replication_pipe_lock = PTHREAD_MUTEX_INITIALIZER; -+#endif /* USE_REPLICATION */ -+ - /** exported globals **/ - struct stats stats; - struct settings settings; -@@ -194,6 +218,11 @@ - settings.backlog = 1024; - settings.binding_protocol = negotiating_prot; - settings.item_size_max = 1024 * 1024; /* The famous 1MB upper limit. */ -+#ifdef USE_REPLICATION -+ settings.rep_addr.s_addr = htonl(INADDR_ANY); -+ settings.rep_port = 11212; -+ settings.rep_qmax = 8192; -+#endif /* USE_REPLICATION */ - } - - /* -@@ -382,6 +411,10 @@ - prot_text(c->protocol)); - } else if (IS_UDP(transport)) { - fprintf(stderr, "<%d server listening (udp)\n", sfd); -+#ifdef USE_REPLICATION -+ } else if (init_state == conn_rep_listen) { -+ fprintf(stderr, "<%d server listening (replication)\n", sfd); -+#endif /* USE_REPLICATION */ - } else if (c->protocol == negotiating_prot) { - fprintf(stderr, "<%d new auto-negotiating client connection\n", - sfd); -@@ -593,7 +626,11 @@ - "conn_nread", - "conn_swallow", - "conn_closing", -- "conn_mwrite" }; -+ "conn_mwrite", -+ "conn_repconnect", -+ "conn_rep_listen", -+ "conn_pipe_recv", -+ "conn_pipe_send" }; - return statenames[state]; - } - -@@ -752,6 +789,14 @@ - - assert(c != NULL); - -+#ifdef USE_REPLICATION -+ if (c == rep_conn){ -+ if (settings.verbose > 1) -+ fprintf(stderr, "REP>%d %s\n", c->sfd, str); -+ conn_set_state(c, conn_new_cmd); -+ return; -+ } -+#endif /* USE_REPLICATION */ - if (c->noreply) { - if (settings.verbose > 1) - fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str); -@@ -791,9 +836,11 @@ - int comm = c->cmd; - enum store_item_type ret; - -- pthread_mutex_lock(&c->thread->stats.mutex); -- c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++; -- pthread_mutex_unlock(&c->thread->stats.mutex); -+ if (c->thread) { -+ pthread_mutex_lock(&c->thread->stats.mutex); -+ c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++; -+ pthread_mutex_unlock(&c->thread->stats.mutex); -+ } - - if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) { - out_string(c, "CLIENT_ERROR bad data chunk"); -@@ -832,6 +879,11 @@ - - switch (ret) { - case STORED: -+#ifdef USE_REPLICATION -+ if( c != rep_conn ){ -+ replication_call_rep(ITEM_key(it), it->nkey); -+ } -+#endif /* USE_REPLICATION */ - out_string(c, "STORED"); - break; - case EXISTS: -@@ -2410,6 +2462,11 @@ - APPEND_STAT("listen_disabled_num", "%llu", (unsigned long long)stats.listen_disabled_num); - APPEND_STAT("threads", "%d", settings.num_threads); - APPEND_STAT("conn_yields", "%llu", (unsigned long long)thread_stats.conn_yields); -+#ifdef USE_REPLICATION -+ APPEND_STAT("replication", "MASTER", 0); -+ APPEND_STAT("repcached_version", "%s", REPCACHED_VERSION); -+ APPEND_STAT("repcached_qi_free", "%u", settings.rep_qmax - get_qi_count()); -+#endif /*USE_REPLICATION*/ - STATS_UNLOCK(); - } - -@@ -2797,6 +2854,11 @@ - switch(add_delta(c, it, incr, delta, temp)) { - case OK: - out_string(c, temp); -+#ifdef USE_REPLICATION -+ if( c != rep_conn){ -+ replication_call_rep(ITEM_key(it), it->nkey); -+ } -+#endif /* USE_REPLICATION */ - break; - case NON_NUMERIC: - out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value"); -@@ -2911,17 +2973,25 @@ - if (it) { - MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey); - -- pthread_mutex_lock(&c->thread->stats.mutex); -- c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++; -- pthread_mutex_unlock(&c->thread->stats.mutex); -+ if (c->thread) { -+ pthread_mutex_lock(&c->thread->stats.mutex); -+ c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++; -+ pthread_mutex_unlock(&c->thread->stats.mutex); -+ } - - item_unlink(it); - item_remove(it); /* release our reference */ -+#ifdef USE_REPLICATION -+ if( c != rep_conn ) -+ replication_call_del(key, nkey); -+#endif /* USE_REPLICATION */ - out_string(c, "DELETED"); - } else { -- pthread_mutex_lock(&c->thread->stats.mutex); -- c->thread->stats.delete_misses++; -- pthread_mutex_unlock(&c->thread->stats.mutex); -+ if (c->thread) { -+ pthread_mutex_lock(&c->thread->stats.mutex); -+ c->thread->stats.delete_misses++; -+ pthread_mutex_unlock(&c->thread->stats.mutex); -+ } - - out_string(c, "NOT_FOUND"); - } -@@ -2986,6 +3056,22 @@ - - process_update_command(c, tokens, ntokens, comm, true); - -+#ifdef USE_REPLICATION -+ } else if ((ntokens == 7) && (strcmp(tokens[COMMAND_TOKEN].value, "rep") == 0 && (comm = NREAD_SET)) && (c == rep_conn)) { -+ -+ process_update_command(c, tokens, ntokens, comm, true); -+ if(c->item) -+ ((item *)(c->item))->it_flags |= ITEM_REPDATA; -+ -+ } else if ((ntokens == 2) && (strcmp(tokens[COMMAND_TOKEN].value, "marugoto_end") == 0) && (c == rep_conn)) { -+ if(replication_start() == -1) -+ exit(EXIT_FAILURE); -+ if (settings.verbose > 0) -+ fprintf(stderr,"replication: start\n"); -+ out_string(c, "OK"); -+ return; -+ -+#endif /* USE_REPLICATION */ - } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) { - - process_arithmetic_command(c, tokens, ntokens, 1); -@@ -3012,11 +3098,17 @@ - - set_noreply_maybe(c, tokens, ntokens); - -- pthread_mutex_lock(&c->thread->stats.mutex); -- c->thread->stats.flush_cmds++; -- pthread_mutex_unlock(&c->thread->stats.mutex); -+ if (c->thread) { -+ pthread_mutex_lock(&c->thread->stats.mutex); -+ c->thread->stats.flush_cmds++; -+ pthread_mutex_unlock(&c->thread->stats.mutex); -+ } - - if(ntokens == (c->noreply ? 3 : 2)) { -+#ifdef USE_REPLICATION -+ if( c != rep_conn ) -+ replication_call_flush_all(); -+#endif - settings.oldest_live = current_time - 1; - item_flush_expired(); - out_string(c, "OK"); -@@ -3029,6 +3121,11 @@ - return; - } - -+#ifdef USE_REPLICATION -+ if( c != rep_conn ) -+ replication_call_defer_flush_all(realtime(exptime) + process_started); -+#endif -+ settings.oldest_live = realtime(exptime) - 1; - /* - If exptime is zero realtime() would return zero too, and - realtime(exptime) - 1 would overflow to the max unsigned -@@ -3275,9 +3372,11 @@ - int avail = c->rsize - c->rbytes; - res = read(c->sfd, c->rbuf + c->rbytes, avail); - if (res > 0) { -- pthread_mutex_lock(&c->thread->stats.mutex); -- c->thread->stats.bytes_read += res; -- pthread_mutex_unlock(&c->thread->stats.mutex); -+ if (c->thread) { -+ pthread_mutex_lock(&c->thread->stats.mutex); -+ c->thread->stats.bytes_read += res; -+ pthread_mutex_unlock(&c->thread->stats.mutex); -+ } - gotdata = READ_DATA_RECEIVED; - c->rbytes += res; - if (res == avail) { -@@ -3423,6 +3522,12 @@ - - assert(c != NULL); - -+#ifdef USE_REPLICATION -+ if(rep_exit && (c->state != conn_pipe_recv)){ -+ return; -+ } -+#endif /* USE_REPLICATION */ -+ - while (!stop) { - - switch(c->state) { -@@ -3502,9 +3607,11 @@ - if (nreqs >= 0) { - reset_cmd_handler(c); - } else { -- pthread_mutex_lock(&c->thread->stats.mutex); -- c->thread->stats.conn_yields++; -- pthread_mutex_unlock(&c->thread->stats.mutex); -+ if (c->thread) { -+ pthread_mutex_lock(&c->thread->stats.mutex); -+ c->thread->stats.conn_yields++; -+ pthread_mutex_unlock(&c->thread->stats.mutex); -+ } - if (c->rbytes > 0) { - /* We have already read in data into the input buffer, - so libevent will most likely not signal read events -@@ -3545,9 +3652,11 @@ - /* now try reading from the socket */ - res = read(c->sfd, c->ritem, c->rlbytes); - if (res > 0) { -- pthread_mutex_lock(&c->thread->stats.mutex); -- c->thread->stats.bytes_read += res; -- pthread_mutex_unlock(&c->thread->stats.mutex); -+ if (c->thread) { -+ pthread_mutex_lock(&c->thread->stats.mutex); -+ c->thread->stats.bytes_read += res; -+ pthread_mutex_unlock(&c->thread->stats.mutex); -+ } - if (c->rcurr == c->ritem) { - c->rcurr += res; - } -@@ -3600,9 +3709,11 @@ - /* now try reading from the socket */ - res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize); - if (res > 0) { -- pthread_mutex_lock(&c->thread->stats.mutex); -- c->thread->stats.bytes_read += res; -- pthread_mutex_unlock(&c->thread->stats.mutex); -+ if (c->thread) { -+ pthread_mutex_lock(&c->thread->stats.mutex); -+ c->thread->stats.bytes_read += res; -+ pthread_mutex_unlock(&c->thread->stats.mutex); -+ } - c->sbytes -= res; - break; - } -@@ -3698,6 +3809,10 @@ - case conn_closing: - if (IS_UDP(c->transport)) - conn_cleanup(c); -+#ifdef USE_REPLICATION -+ else if(c == rep_conn) -+ replication_close(); -+#endif /*USE_REPLICATION*/ - else - conn_close(c); - stop = true; -@@ -3706,6 +3821,70 @@ - case conn_max_state: - assert(false); - break; -+ -+#ifdef USE_REPLICATION -+ case conn_pipe_recv: -+ if(replication_pop()){ -+ replication_close(); -+ }else{ -+ replication_send(rep_conn); -+ } -+ stop = true; -+ break; -+ -+ case conn_rep_listen: -+ if (settings.verbose > 0) -+ fprintf(stderr,"replication: accept\n"); -+ addrlen = sizeof(addr); -+ res = accept(c->sfd, (struct sockaddr *)&addr, &addrlen); -+ if(res == -1){ -+ if(errno == EAGAIN || errno == EWOULDBLOCK) { -+ } else if (errno == EMFILE) { -+ fprintf(stderr, "replication: Too many opened connections\n"); -+ } else { -+ fprintf(stderr, "replication: accept error\n"); -+ } -+ }else{ -+ if(rep_conn){ -+ close(res); -+ fprintf(stderr,"replication: already connected\n"); -+ }else{ -+ if((flags = fcntl(res, F_GETFL, 0)) < 0 || fcntl(res, F_SETFL, flags | O_NONBLOCK) < 0){ -+ close(res); -+ fprintf(stderr, "replication: Can't Setting O_NONBLOCK\n"); -+ }else{ -+ server_close_replication(); -+ rep_conn = conn_new(res, conn_read, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base); -+ rep_conn->item = NULL; -+ rep_conn->rbytes = 0; -+ rep_conn->rcurr = rep_conn->rbuf; -+ replication_connect(); -+ replication_marugoto(1); -+ replication_marugoto(0); -+ } -+ } -+ } -+ stop = true; -+ break; -+ -+ case conn_repconnect: -+ rep_conn = c; -+ replication_connect(); -+ conn_set_state(c, conn_read); -+ if (settings.verbose > 0) -+ fprintf(stderr,"replication: marugoto copying\n"); -+ if(!update_event(c, EV_READ | EV_PERSIST)){ -+ fprintf(stderr, "replication: Couldn't update event\n"); -+ conn_set_state(c, conn_closing); -+ } -+ stop = true; -+ break; -+ -+ case conn_pipe_send: -+ /* should not happen */ -+ fprintf(stderr, "replication: unexpected conn_pipe_send state\n"); -+ break; -+#endif /* USE_REPLICATION */ - } - } - -@@ -4002,6 +4181,89 @@ - return 0; - } - -+#ifdef USE_REPLICATION -+static int server_socket_replication(const int port) { -+ int sfd; -+ struct linger ling = {0, 0}; -+ struct addrinfo *ai; -+ struct addrinfo *next; -+ struct addrinfo hints; -+ char port_buf[NI_MAXSERV]; -+ int error; -+ int success = 0; -+ -+ int flags =1; -+ -+ memset(&hints, 0, sizeof (hints)); -+ hints.ai_flags = AI_PASSIVE|AI_ADDRCONFIG; -+ hints.ai_family = AF_UNSPEC; -+ hints.ai_protocol = IPPROTO_TCP; -+ hints.ai_socktype = SOCK_STREAM; -+ snprintf(port_buf, NI_MAXSERV, "%d", port); -+ error= getaddrinfo(settings.inter, port_buf, &hints, &ai); -+ if (error != 0) { -+ if (error != EAI_SYSTEM) -+ fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error)); -+ else -+ perror("getaddrinfo()"); -+ -+ return 1; -+ } -+ -+ for (next= ai; next; next= next->ai_next) { -+ conn *rep_serv_add; -+ if ((sfd = new_socket(next)) == -1) { -+ freeaddrinfo(ai); -+ return 1; -+ } -+ setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); -+ setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); -+ setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)); -+ setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)); -+ -+ if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) { -+ if (errno != EADDRINUSE) { -+ perror("bind()"); -+ close(sfd); -+ freeaddrinfo(ai); -+ return 1; -+ } -+ close(sfd); -+ continue; -+ } else { -+ success++; -+ if (listen(sfd, 1024) == -1) { -+ perror("listen()"); -+ close(sfd); -+ freeaddrinfo(ai); -+ return 1; -+ } -+ } -+ -+ if (!(rep_serv_add = conn_new(sfd, conn_rep_listen, -+ EV_READ | EV_PERSIST, 1, tcp_transport, main_base))) { -+ fprintf(stderr, "failed to create replication server connection\n"); -+ exit(EXIT_FAILURE); -+ } -+ -+ rep_serv_add->next = rep_serv; -+ rep_serv = rep_serv_add; -+ } -+ -+ freeaddrinfo(ai); -+ -+ /* Return zero iff we detected no errors in starting up connections */ -+ return success == 0; -+} -+ -+static void server_close_replication(void) { -+ while(rep_serv){ -+ conn_close(rep_serv); -+ rep_serv = rep_serv->next; -+ } -+} -+#endif /* USE_REPLICATION */ -+ - /* - * We keep the current time of day in a global variable that's updated by a - * timer event. This saves us a bunch of time() system calls (we really only -@@ -4041,6 +4303,9 @@ - - static void usage(void) { - printf(PACKAGE " " VERSION "\n"); -+#ifdef USE_REPLICATION -+ printf("repcached %s\n",REPCACHED_VERSION); -+#endif /* USE_REPLICATION */ - printf("-p <num> TCP port number to listen on (default: 11211)\n" - "-U <num> UDP port number to listen on (default: 11211, 0 is off)\n" - "-s <file> UNIX socket path to listen on (disables network support)\n" -@@ -4088,6 +4353,10 @@ - #ifdef ENABLE_SASL - printf("-S Turn on Sasl authentication\n"); - #endif -+#ifdef USE_REPLICATION -+ printf("-x <ip_addr> hostname or IP address of peer repcached\n"); -+ printf("-X <num> TCP port number for replication (default: 11212)\n"); -+#endif /* USE_REPLICATION */ - return; - } - -@@ -4194,6 +4463,26 @@ - exit(EXIT_SUCCESS); - } - -+#ifdef USE_REPLICATION -+static void sig_handler_cb(int fd, short event, void *arg) -+{ -+ struct event *signal = arg; -+ -+ if (settings.verbose) -+ fprintf(stderr, "got signal %d\n", EVENT_SIGNAL(signal)); -+ -+ if (replication_exit()) { -+ exit(EXIT_FAILURE); -+ } -+ -+ pthread_mutex_lock(&replication_pipe_lock); -+ if (!rep_send) { -+ exit(EXIT_SUCCESS); -+ } -+ pthread_mutex_unlock(&replication_pipe_lock); -+} -+#endif /* USE_REPLICATION */ -+ - #ifndef HAVE_SIGIGNORE - static int sigignore(int sig) { - struct sigaction sa = { .sa_handler = SIG_IGN, .sa_flags = 0 }; -@@ -4249,6 +4538,57 @@ - #endif - } - -+static void create_listening_sockets(void) -+{ -+ /* create unix mode sockets after dropping privileges */ -+ if (settings.socketpath != NULL) { -+ errno = 0; -+ if (server_socket_unix(settings.socketpath,settings.access)) { -+ vperror("failed to listen on UNIX socket: %s", settings.socketpath); -+ exit(EX_OSERR); -+ } -+ } -+ -+ /* create the listening socket, bind it, and init */ -+ if (settings.socketpath == NULL) { -+ const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME"); -+ char temp_portnumber_filename[PATH_MAX]; -+ FILE *portnumber_file = NULL; -+ -+ if (portnumber_filename != NULL) { -+ snprintf(temp_portnumber_filename, -+ sizeof(temp_portnumber_filename), -+ "%s.lck", portnumber_filename); -+ -+ portnumber_file = fopen(temp_portnumber_filename, "a"); -+ if (portnumber_file == NULL) { -+ fprintf(stderr, "Failed to open \"%s\": %s\n", -+ temp_portnumber_filename, strerror(errno)); -+ } -+ } -+ -+ errno = 0; -+ if (settings.port && server_socket(settings.port, tcp_transport, -+ portnumber_file)) { -+ vperror("failed to listen on TCP port %d", settings.port); -+ exit(EX_OSERR); -+ } -+ -+ /* create the UDP listening socket and bind it */ -+ errno = 0; -+ if (settings.udpport && server_socket(settings.udpport, udp_transport, -+ portnumber_file)) { -+ vperror("failed to listen on UDP port %d", settings.udpport); -+ exit(EX_OSERR); -+ } -+ -+ if (portnumber_file) { -+ fclose(portnumber_file); -+ rename(temp_portnumber_filename, portnumber_filename); -+ } -+ } -+} -+ - int main (int argc, char **argv) { - int c; - bool lock_memory = false; -@@ -4261,6 +4601,11 @@ - struct rlimit rlim; - char unit = '\0'; - int size_max = 0; -+#ifdef USE_REPLICATION -+ struct in_addr addr; -+ struct addrinfo master_hint; -+ struct addrinfo *master_addr; -+#endif /* USE_REPLICATION */ - /* listening sockets */ - static int *l_socket = NULL; - -@@ -4307,6 +4652,11 @@ - "B:" /* Binding protocol */ - "I:" /* Max item size */ - "S" /* Sasl ON */ -+#ifdef USE_REPLICATION -+ "X:" /* replication port */ -+ "x:" /* replication master */ -+ "q:" /* replication queue length */ -+#endif /* USE_REPLICATION */ - ))) { - switch (c) { - case 'a': -@@ -4462,6 +4812,31 @@ - ); - } - break; -+#ifdef USE_REPLICATION -+ case 'x': -+ if (inet_pton(AF_INET, optarg, &addr) <= 0) { -+ memset(&master_hint, 0, sizeof(master_hint)); -+ master_hint.ai_flags = 0; -+ master_hint.ai_socktype = 0; -+ master_hint.ai_protocol = 0; -+ if(!getaddrinfo(optarg, NULL, &master_hint, &master_addr)){ -+ settings.rep_addr = ((struct sockaddr_in *)(master_addr->ai_addr)) -> sin_addr; -+ freeaddrinfo(master_addr); -+ }else{ -+ fprintf(stderr, "Illegal address: %s\n", optarg); -+ return 1; -+ } -+ } else { -+ settings.rep_addr = addr; -+ } -+ break; -+ case 'X': -+ settings.rep_port = atoi(optarg); -+ break; -+ case 'q': -+ settings.rep_qmax = atoi(optarg); -+ break; -+#endif /* USE_REPLICATION */ - case 'S': /* set Sasl authentication to true. Default is false */ - #ifndef ENABLE_SASL - fprintf(stderr, "This server is not built with SASL support.\n"); -@@ -4587,6 +4962,17 @@ - /* initialize main thread libevent instance */ - main_base = event_init(); - -+#ifdef USE_REPLICATION -+ /* register events for SIGINT and SIGTERM to handle them in main thread */ -+ struct event signal_int, signal_term; -+ event_set(&signal_int, SIGINT, EV_SIGNAL|EV_PERSIST, sig_handler_cb, -+ &signal_int); -+ event_add(&signal_int, NULL); -+ event_set(&signal_term, SIGTERM, EV_SIGNAL|EV_PERSIST, sig_handler_cb, -+ &signal_term); -+ event_add(&signal_term, NULL); -+#endif -+ - /* initialize other stuff */ - stats_init(); - assoc_init(); -@@ -4615,63 +5001,21 @@ - /* initialise clock event */ - clock_handler(0, 0, 0); - -- /* create unix mode sockets after dropping privileges */ -- if (settings.socketpath != NULL) { -- errno = 0; -- if (server_socket_unix(settings.socketpath,settings.access)) { -- vperror("failed to listen on UNIX socket: %s", settings.socketpath); -- exit(EX_OSERR); -- } -- } -- -- /* create the listening socket, bind it, and init */ -- if (settings.socketpath == NULL) { -- int udp_port; -- -- const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME"); -- char temp_portnumber_filename[PATH_MAX]; -- FILE *portnumber_file = NULL; -- -- if (portnumber_filename != NULL) { -- snprintf(temp_portnumber_filename, -- sizeof(temp_portnumber_filename), -- "%s.lck", portnumber_filename); -- -- portnumber_file = fopen(temp_portnumber_filename, "a"); -- if (portnumber_file == NULL) { -- fprintf(stderr, "Failed to open \"%s\": %s\n", -- temp_portnumber_filename, strerror(errno)); -- } -- } -- -- errno = 0; -- if (settings.port && server_socket(settings.port, tcp_transport, -- portnumber_file)) { -- vperror("failed to listen on TCP port %d", settings.port); -- exit(EX_OSERR); -- } -- -- /* -- * initialization order: first create the listening sockets -- * (may need root on low ports), then drop root if needed, -- * then daemonise if needed, then init libevent (in some cases -- * descriptors created by libevent wouldn't survive forking). -- */ -- udp_port = settings.udpport ? settings.udpport : settings.port; -- -- /* create the UDP listening socket and bind it */ -- errno = 0; -- if (settings.udpport && server_socket(settings.udpport, udp_transport, -- portnumber_file)) { -- vperror("failed to listen on UDP port %d", settings.udpport); -- exit(EX_OSERR); -- } -+ /* -+ * initialization order: first create the listening sockets -+ * (may need root on low ports), then drop root if needed, -+ * then daemonise if needed, then init libevent (in some cases -+ * descriptors created by libevent wouldn't survive forking). -+ */ - -- if (portnumber_file) { -- fclose(portnumber_file); -- rename(temp_portnumber_filename, portnumber_filename); -- } -+#ifdef USE_REPLICATION -+ if(replication_init() == -1){ -+ fprintf(stderr, "faild to replication init\n"); -+ exit(EXIT_FAILURE); - } -+#else -+ create_listening_sockets(); -+#endif - - /* Drop privileges no longer needed */ - drop_privileges(); -@@ -4694,3 +5038,401 @@ - - return EXIT_SUCCESS; - } -+ -+#ifdef USE_REPLICATION -+static int replication_start(void) -+{ -+ static int start = 0; -+ if(start) -+ return(0); -+ -+ create_listening_sockets(); -+ -+ start = 1; -+ return(0); -+} -+ -+static int replication_server_init(void) -+{ -+ rep_recv = NULL; -+ rep_send = NULL; -+ rep_conn = NULL; -+ if(server_socket_replication(settings.rep_port)){ -+ fprintf(stderr, "replication: failed to initialize replication server socket\n"); -+ return(-1); -+ } -+ if (settings.verbose > 0) -+ fprintf(stderr, "replication: listen\n"); -+ return(replication_start()); -+} -+ -+static int replication_client_init(void) -+{ -+ int s; -+ conn *c; -+ struct addrinfo ai; -+ struct sockaddr_in server; -+ -+ rep_recv = NULL; -+ rep_send = NULL; -+ rep_conn = NULL; -+ -+ memset(&ai,0,sizeof(ai)); -+ ai.ai_family = AF_INET; -+ ai.ai_socktype = SOCK_STREAM; -+ s = new_socket(&ai); -+ -+ if(s == -1) { -+ fprintf(stderr, "replication: failed to replication client socket\n"); -+ return(-1); -+ }else{ -+ /* connect */ -+ memset((char *)&server, 0, sizeof(server)); -+ server.sin_family = AF_INET; -+ server.sin_addr = settings.rep_addr; -+ server.sin_port = htons(settings.rep_port); -+ if (settings.verbose > 0) -+ fprintf(stderr,"replication: connect (peer=%s:%d)\n", inet_ntoa(settings.rep_addr), settings.rep_port); -+ if(connect(s,(struct sockaddr *)&server, sizeof(server)) == 0){ -+ c = conn_new(s, conn_repconnect, EV_WRITE | EV_PERSIST, DATA_BUFFER_SIZE, false, main_base); -+ if(c == NULL){ -+ fprintf(stderr, "replication: failed to create client conn"); -+ close(s); -+ return(-1); -+ } -+ drive_machine(c); -+ }else{ -+ if(errno == EINPROGRESS){ -+ c = conn_new(s, conn_repconnect, EV_WRITE | EV_PERSIST, DATA_BUFFER_SIZE, false, main_base); -+ if(c == NULL){ -+ fprintf(stderr, "replication: failed to create client conn"); -+ close(s); -+ return(-1); -+ } -+ }else{ -+ fprintf(stdout,"replication: can't connect %s:%d\n", inet_ntoa(server.sin_addr), ntohs(server.sin_port)); -+ close(s); -+ return(-1); -+ } -+ } -+ } -+ return(0); -+} -+ -+static int replication_init(void) -+{ -+ if(settings.rep_addr.s_addr != htonl(INADDR_ANY)){ -+ if(replication_client_init() != -1){ -+ return(0); -+ } -+ } -+ return(replication_server_init()); -+} -+ -+static int replication_connect(void) -+{ -+ int f; -+ int p[2]; -+ -+ if(pipe(p) == -1){ -+ fprintf(stderr, "replication: can't create pipe\n"); -+ return(-1); -+ }else{ -+ if((f = fcntl(p[0], F_GETFL, 0)) < 0 || fcntl(p[0], F_SETFL, f | O_NONBLOCK) < 0) { -+ fprintf(stderr, "replication: can't setting O_NONBLOCK pipe[0]\n"); -+ return(-1); -+ } -+ if((f = fcntl(p[1], F_GETFL, 0)) < 0 || fcntl(p[1], F_SETFL, f | O_NONBLOCK) < 0) { -+ fprintf(stderr, "replication: can't setting O_NONBLOCK pipe[0]\n"); -+ return(-1); -+ } -+ pthread_mutex_lock(&replication_pipe_lock); -+ rep_recv = conn_new(p[0], conn_pipe_recv, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base); -+ rep_send = conn_new(p[1], conn_pipe_send, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base); -+ event_del(&rep_send->event); -+ pthread_mutex_unlock(&replication_pipe_lock); -+ } -+ return(0); -+} -+ -+static int replication_close(void) -+{ -+ int c; -+ int r; -+ Q_ITEM *q; -+ -+ if(settings.verbose > 0) -+ fprintf(stderr,"replication: close\n"); -+ if(rep_recv){ -+ rep_recv->rbytes = sizeof(q); -+ rep_recv->rcurr = rep_recv->rbuf; -+ c = 0; -+ do{ -+ r = read(rep_recv->sfd, rep_recv->rcurr, rep_recv->rbytes); -+ if(r == -1){ -+ break; -+ } -+ rep_recv->rbytes -= r; -+ rep_recv->rcurr += r; -+ if(!rep_recv->rbytes){ -+ memcpy(&q, rep_recv->rbuf, sizeof(q)); -+ rep_recv->rbytes = sizeof(q); -+ rep_recv->rcurr = rep_recv->rbuf; -+ qi_free(q); -+ c++; -+ } -+ }while(r); -+ conn_close(rep_recv); -+ rep_recv = NULL; -+ if (settings.verbose > 1) { -+ fprintf(stderr, "replication: qitem free %d items\n", qi_free_list()); -+ fprintf(stderr, "replication: close recv %d items\n", c); -+ } -+ } -+ pthread_mutex_lock(&replication_pipe_lock); -+ if(rep_send){ -+ conn_close(rep_send); -+ rep_send = NULL; -+ if (settings.verbose > 1) -+ fprintf(stderr,"replication: close send\n"); -+ } -+ pthread_mutex_unlock(&replication_pipe_lock); -+ if(rep_conn){ -+ conn_close(rep_conn); -+ rep_conn = NULL; -+ if (settings.verbose > 1) -+ fprintf(stderr,"replication: close conn\n"); -+ } -+ if(!rep_exit) -+ replication_server_init(); -+ return(0); -+} -+ -+static void replication_dispatch_close(void) -+{ -+ if (settings.verbose > 1) -+ fprintf(stderr, "replication: dispatch close\n"); -+ pthread_mutex_lock(&replication_pipe_lock); -+ if (rep_send) { -+ conn_close(rep_send); -+ rep_send = NULL; -+ } -+ pthread_mutex_unlock(&replication_pipe_lock); -+} -+ -+static int replication_marugoto(int f) -+{ -+ static int keysend = 0; -+ static int keycount = 0; -+ static char *keylist = NULL; -+ static char *keyptr = NULL; -+ -+ if(f){ -+ if(keylist){ -+ free(keylist); -+ keylist = NULL; -+ keyptr = NULL; -+ keycount = 0; -+ keysend = 0; -+ } -+ pthread_mutex_lock(&cache_lock); -+ keylist = (char *)assoc_key_snap((int *)&keycount); -+ pthread_mutex_unlock(&cache_lock); -+ keyptr = keylist; -+ if (!keyptr){ -+ replication_call_marugoto_end(); -+ }else{ -+ if (settings.verbose > 0) -+ fprintf(stderr,"replication: marugoto start\n"); -+ } -+ }else{ -+ if(keyptr){ -+ while(*keyptr){ -+ item *it = item_get(keyptr, strlen(keyptr)); -+ if(it){ -+ item_remove(it); -+ if(replication_call_rep(keyptr, strlen(keyptr)) == -1){ -+ return(-1); -+ }else{ -+ keysend++; -+ keyptr += strlen(keyptr) + 1; -+ return(0); -+ } -+ } -+ keyptr += strlen(keyptr) + 1; -+ } -+ if(settings.verbose > 0) -+ fprintf(stderr,"replication: marugoto %d\n", keysend); -+ replication_call_marugoto_end(); -+ if(settings.verbose > 0) -+ fprintf(stderr,"replication: marugoto owari\n"); -+ free(keylist); -+ keylist = NULL; -+ keyptr = NULL; -+ keycount = 0; -+ keysend = 0; -+ } -+ } -+ return(0); -+} -+ -+static int replication_send(conn *c) -+{ -+ while(c->wbytes){ -+ int w = write(c->sfd, c->wcurr, c->wbytes); -+ if(w == -1){ -+ if(errno == EAGAIN || errno == EINTR){ -+ }else{ -+ fprintf(stderr,"replication: send error\n"); -+ replication_close(); -+ break; -+ } -+ }else{ -+ c->wbytes -= w; -+ c->wcurr += w; -+ } -+ } -+ return(c->wbytes); -+} -+ -+static int replication_pop(void) -+{ -+ int r; -+ int c; -+ int m; -+ Q_ITEM **q; -+ -+ if(settings.verbose > 1) -+ fprintf(stderr, "replication: pop\n"); -+ -+ if(!rep_recv) -+ return(0); -+ -+ r = read(rep_recv->sfd, rep_recv->rbuf, rep_recv->rsize); -+ if(r == -1){ -+ if(errno == EAGAIN || errno == EINTR){ -+ }else{ -+ fprintf(stderr,"replication: pop error %d\n", errno); -+ return(-1); -+ } -+ }if(r == 0){ -+ /* other end closed, trigger replication_close() */ -+ return(-1); -+ }else{ -+ c = r / sizeof(Q_ITEM *); -+ m = r % sizeof(Q_ITEM *); -+ q = (Q_ITEM **)(rep_recv->rbuf); -+ while(c--){ -+ if(q[c]){ -+ if(rep_conn && replication_cmd(rep_conn, q[c])){ -+ replication_item(q[c]); /* error retry */ -+ }else{ -+ qi_free(q[c]); -+ } -+ }else{ -+ if(!rep_exit){ -+ if (settings.verbose) -+ fprintf(stderr,"replication: cleanup start\n"); -+ rep_exit = 1; -+ } -+ } -+ } -+ } -+ if(rep_exit){ -+ if(rep_conn->wbytes){ -+ /* retry */ -+ if(replication_exit()){ -+ replication_close(); -+ fprintf(stderr,"replication: cleanup error\n"); -+ exit(EXIT_FAILURE); -+ } -+ }else{ -+ /* finish */ -+ replication_close(); -+ if (settings.verbose) -+ fprintf(stderr,"replication: cleanup complete\n"); -+ exit(EXIT_SUCCESS); -+ } -+ } -+ replication_marugoto(0); -+ return(0); -+} -+ -+static int replication_push(void) -+{ -+ int w; -+ -+ while(rep_send->wbytes){ -+ w = write(rep_send->sfd, rep_send->wcurr, rep_send->wbytes); -+ if(w == -1){ -+ if(errno == EAGAIN || errno == EINTR){ -+ fprintf(stderr,"replication: push EAGAIN or EINTR\n"); -+ }else{ -+ return(-1); -+ } -+ }else{ -+ rep_send->wbytes -= w; -+ rep_send->wcurr += w; -+ } -+ } -+ rep_send->wcurr = rep_send->wbuf; -+ return(0); -+} -+ -+static int replication_exit(void) -+{ -+ return(replication_item(NULL)); -+} -+ -+static int replication_item(Q_ITEM *q) -+{ -+ pthread_mutex_lock(&replication_pipe_lock); -+ if (!rep_send) { -+ pthread_mutex_unlock(&replication_pipe_lock); -+ return 0; -+ } -+ if(rep_send->wcurr + rep_send->wbytes + sizeof(q) > rep_send->wbuf + rep_send->wsize){ -+ fprintf(stderr,"replication: buffer over fllow\n"); -+ if(q){ -+ qi_free(q); -+ } -+ pthread_mutex_unlock(&replication_pipe_lock); -+ replication_dispatch_close(); -+ return(-1); -+ } -+ memcpy(rep_send->wcurr + rep_send->wbytes, &q, sizeof(q)); -+ rep_send->wbytes += sizeof(q); -+ if(replication_push()){ -+ fprintf(stderr, "replication: push error\n"); -+ if(q){ -+ qi_free(q); -+ } -+ pthread_mutex_unlock(&replication_pipe_lock); -+ replication_dispatch_close(); -+ return(-1); -+ } -+ pthread_mutex_unlock(&replication_pipe_lock); -+ return(0); -+} -+ -+int replication(enum CMD_TYPE type, R_CMD *cmd) -+{ -+ Q_ITEM *q; -+ -+ pthread_mutex_lock(&replication_pipe_lock); -+ if (!rep_send) { -+ pthread_mutex_unlock(&replication_pipe_lock); -+ return 0; -+ } -+ pthread_mutex_unlock(&replication_pipe_lock); -+ -+ if((q = qi_new(type, cmd, false))) { -+ replication_item(q); -+ }else{ -+ fprintf(stderr,"replication: can't create Q_ITEM\n"); -+ replication_dispatch_close(); -+ return(-1); -+ } -+ return(0); -+} -+#endif /* USE_REPLICATION */ ---- memcached-1.4.5/memcached.h~ 2010-05-06 14:09:51.000000000 +0300 -+++ memcached-1.4.5/memcached.h 2010-05-06 14:10:13.518051741 +0300 -@@ -144,7 +144,13 @@ - conn_swallow, /**< swallowing unnecessary bytes w/o storing */ - conn_closing, /**< closing this connection */ - conn_mwrite, /**< writing out many items sequentially */ -- conn_max_state /**< Max state value (used for assertion) */ -+#ifdef USE_REPLICATION -+ conn_repconnect, /**< replication connecting to master */ -+ conn_rep_listen, /**< replication listening socket */ -+ conn_pipe_recv, /**< replication command pipe recv */ -+ conn_pipe_send, /**< replication command pipe send */ -+#endif /* USE_REPLICATION */ -+ conn_max_state, /**< Max state value (used for assertion) */ - }; - - enum bin_substates { -@@ -247,7 +247,9 @@ - uint64_t get_misses; - uint64_t evictions; - uint64_t reclaimed; -+#if 0 - time_t started; /* when the process was started */ -+#endif - bool accepting_conns; /* whether we are currently accepting */ - uint64_t listen_disabled_num; - }; -@@ -274,6 +282,11 @@ - int backlog; - int item_size_max; /* Maximum item size, and upper end for slabs */ - bool sasl; /* SASL on/off */ -+#ifdef USE_REPLICATION -+ struct in_addr rep_addr; /* replication addr */ -+ int rep_port; /* replication port */ -+ int rep_qmax; /* replication QITEM max */ -+#endif /*USE_REPLICATION*/ - }; - - extern struct stats stats; -@@ -286,6 +299,10 @@ - /* temp */ - #define ITEM_SLABBED 4 - -+#ifdef USE_REPLICATION -+#define ITEM_REPDATA 128 -+#endif /*USE_REPLICATION*/ -+ - /** - * Structure for storing items within memcached. - */ -@@ -438,6 +455,10 @@ - #include "trace.h" - #include "hash.h" - #include "util.h" -+ -+#ifdef USE_REPLICATION -+#include "replication.h" -+#endif /* USE_REPLICATION */ - - /* - * Functions such as the libevent-related calls that need to do cross-thread ---- memcached-1.4.4/replication.c Thu Jan 1 03:00:00 1970 -+++ repcached-2.2-1.4.4/replication.c Wed Feb 10 18:40:48 2010 -@@ -0,0 +1,355 @@ -+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ -+/* -+ * -+ */ -+#include "memcached.h" -+#include "replication.h" -+#include <stdlib.h> -+#include <stdio.h> -+#include <unistd.h> -+#include <string.h> -+#include <errno.h> -+ -+static Q_ITEM *q_freelist = NULL; -+static int q_itemcount = 0; -+static pthread_mutex_t replication_queue_lock = PTHREAD_MUTEX_INITIALIZER; -+ -+int get_qi_count(void) -+{ -+ int c; -+ pthread_mutex_lock(&replication_queue_lock); -+ c = q_itemcount; -+ pthread_mutex_unlock(&replication_queue_lock); -+ return(c); -+} -+ -+Q_ITEM *qi_new(enum CMD_TYPE type, R_CMD *cmd, bool reuse) -+{ -+ Q_ITEM *q = NULL; -+ char *key = NULL; -+ uint32_t keylen = 0; -+ rel_time_t time = 0; -+ -+ pthread_mutex_lock(&replication_queue_lock); -+ if(q_freelist){ -+ q = q_freelist; -+ q_freelist = q->next; -+ } -+ -+ if(NULL == q){ -+ if(reuse) { -+ pthread_mutex_unlock(&replication_queue_lock); -+ return(NULL); -+ } -+ if(q_itemcount >= settings.rep_qmax) { -+ pthread_mutex_unlock(&replication_queue_lock); -+ return(NULL); -+ } -+ q = malloc(sizeof(Q_ITEM)); -+ if (NULL == q){ -+ fprintf(stderr,"replication: qi_new out of memory\n"); -+ pthread_mutex_unlock(&replication_queue_lock); -+ return(NULL); -+ } -+ q_itemcount++; -+ if (settings.verbose > 2) -+ fprintf(stderr,"replication: alloc c=%d\n", q_itemcount); -+ } -+ -+ pthread_mutex_unlock(&replication_queue_lock); -+ -+ switch (type) { -+ case REPLICATION_REP: -+ case REPLICATION_DEL: -+ key = cmd->key; -+ keylen = cmd->keylen; -+ break; -+ case REPLICATION_FLUSH_ALL: -+ break; -+ case REPLICATION_DEFER_FLUSH_ALL: -+ time = cmd->time; -+ break; -+ case REPLICATION_MARUGOTO_END: -+ break; -+ default: -+ fprintf(stderr,"replication: got unknown command: %d\n", type); -+ return(NULL); -+ } -+ -+ q->key = NULL; -+ q->type = type; -+ q->time = time; -+ q->next = NULL; -+ if (keylen) { -+ q->key = malloc(keylen + 1); -+ if(NULL == q->key){ -+ qi_free(q); -+ q = NULL; -+ }else{ -+ memcpy(q->key, key, keylen); -+ *(q->key + keylen) = 0; -+ } -+ } -+ -+ return(q); -+} -+ -+void qi_free(Q_ITEM *q) -+{ -+ if(q){ -+ if(q->key){ -+ free(q->key); -+ q->key = NULL; -+ } -+ pthread_mutex_lock(&replication_queue_lock); -+ q->next = q_freelist; -+ q_freelist = q; -+ pthread_mutex_unlock(&replication_queue_lock); -+ } -+} -+ -+int qi_free_list() -+{ -+ int c = 0; -+ Q_ITEM *q = NULL; -+ -+ pthread_mutex_lock(&replication_queue_lock); -+ while((q = q_freelist)){ -+ q_itemcount--; -+ c++; -+ q_freelist = q->next; -+ free(q); -+ } -+ pthread_mutex_unlock(&replication_queue_lock); -+ return(c); -+} -+ -+static int replication_get_num(char *p, int n) -+{ -+ int l; -+ char b[64]; -+ if(p) -+ l = sprintf(p, "%u", n); -+ else -+ l = sprintf(b, "%u", n); -+ return(l); -+} -+ -+int replication_call_rep(char *key, size_t keylen) -+{ -+ R_CMD r; -+ r.key = key; -+ r.keylen = keylen; -+ return(replication(REPLICATION_REP, &r)); -+} -+ -+int replication_call_del(char *key, size_t keylen) -+{ -+ R_CMD r; -+ r.key = key; -+ r.keylen = keylen; -+ return(replication(REPLICATION_DEL, &r)); -+} -+ -+int replication_call_flush_all() -+{ -+ R_CMD r; -+ r.key = NULL; -+ return(replication(REPLICATION_FLUSH_ALL, &r)); -+} -+ -+int replication_call_defer_flush_all(const rel_time_t time) -+{ -+ R_CMD r; -+ r.key = NULL; -+ r.time = time; -+ return(replication(REPLICATION_DEFER_FLUSH_ALL, &r)); -+} -+ -+int replication_call_marugoto_end() -+{ -+ R_CMD r; -+ r.key = NULL; -+ return(replication(REPLICATION_MARUGOTO_END, &r)); -+} -+ -+static int replication_alloc(conn *c, int s) -+{ -+ char *p; -+ s += c->wbytes; -+ if(c->wsize < s){ -+ while(c->wsize < s) -+ c->wsize += 4096; -+ if((p = malloc(c->wsize))){ -+ memcpy(p, c->wbuf, c->wbytes); -+ free(c->wbuf); -+ c->wbuf = p; -+ }else{ -+ return(-1); -+ } -+ } -+ return(0); -+} -+ -+static int replication_del(conn *c, char *k) -+{ -+ int l = 0; -+ char *s = "delete "; -+ char *n = "\r\n"; -+ char *p = NULL; -+ -+ l += strlen(s); -+ l += strlen(k); -+ l += strlen(n); -+ if(replication_alloc(c,l) == -1){ -+ fprintf(stderr, "replication: del malloc error\n"); -+ return(-1); -+ } -+ p = c->wbuf + c->wbytes; -+ memcpy(p, s, strlen(s)); -+ p += strlen(s); -+ memcpy(p, k, strlen(k)); -+ p += strlen(k); -+ memcpy(p, n, strlen(n)); -+ p += strlen(n); -+ c->wbytes = p - c->wbuf; -+ c->wcurr = c->wbuf; -+ return(0); -+} -+ -+static int replication_rep(conn *c, item *it) -+{ -+ int exp = 0; -+ int len = 0; -+ char *s = "rep "; -+ char *n = "\r\n"; -+ char *p = NULL; -+ char flag[40]; -+ -+ if(it->exptime) -+ exp = it->exptime + process_started; -+ flag[0]=0; -+ if((p=ITEM_suffix(it))){ -+ int i; -+ memcpy(flag, p, it->nsuffix - 2); -+ flag[it->nsuffix - 2] = 0; -+ for(i=0;i<strlen(flag);i++){ -+ if(flag[i] > ' ') -+ break; -+ } -+ memmove(flag,&flag[i],strlen(flag)-i); -+ for(p=flag;*p>' ';p++); -+ *p=0; -+ } -+ len += strlen(s); -+ len += it->nkey; -+ len += 1; -+ len += strlen(flag); -+ len += 1; -+ len += replication_get_num(NULL, exp); -+ len += 1; -+ len += replication_get_num(NULL, it->nbytes - 2); -+ len += 1; -+ len += replication_get_num(NULL, ITEM_get_cas(it)); -+ len += strlen(n); -+ len += it->nbytes; -+ len += strlen(n); -+ if(replication_alloc(c,len) == -1){ -+ fprintf(stderr, "replication: rep malloc error\n"); -+ return(-1); -+ } -+ p = c->wbuf + c->wbytes; -+ memcpy(p, s, strlen(s)); -+ p += strlen(s); -+ memcpy(p, ITEM_key(it), it->nkey); -+ p += it->nkey; -+ *(p++) = ' '; -+ memcpy(p, flag, strlen(flag)); -+ p += strlen(flag); -+ *(p++) = ' '; -+ p += replication_get_num(p, exp); -+ *(p++) = ' '; -+ p += replication_get_num(p, it->nbytes - 2); -+ *(p++) = ' '; -+ p += replication_get_num(p, ITEM_get_cas(it)); -+ memcpy(p, n, strlen(n)); -+ p += strlen(n); -+ memcpy(p, ITEM_data(it), it->nbytes); -+ p += it->nbytes; -+ c->wbytes = p - c->wbuf; -+ c->wcurr = c->wbuf; -+ return(0); -+} -+ -+static int replication_flush_all(conn *c, rel_time_t exp) -+{ -+ char *s = "flush_all "; -+ char *n = "\r\n"; -+ char *p = NULL; -+ -+ int l = strlen(s) + strlen(n); -+ if (exp > 0) -+ l += replication_get_num(NULL, exp); -+ if(replication_alloc(c,l) == -1){ -+ fprintf(stderr, "replication: flush_all malloc error\n"); -+ return(-1); -+ } -+ p = c->wbuf + c->wbytes; -+ memcpy(p, s, strlen(s)); -+ p += strlen(s); -+ if (exp > 0) -+ p += replication_get_num(p, exp); -+ memcpy(p, n, strlen(n)); -+ p += strlen(n); -+ c->wbytes = p - c->wbuf; -+ c->wcurr = c->wbuf; -+ return(0); -+} -+ -+static int replication_marugoto_end(conn *c) -+{ -+ char *s = "marugoto_end"; -+ char *n = "\r\n"; -+ char *p = NULL; -+ -+ int l = strlen(s) + strlen(n); -+ if(replication_alloc(c,l) == -1){ -+ fprintf(stderr, "replication: marugoto_end malloc error\n"); -+ return(-1); -+ } -+ p = c->wbuf + c->wbytes; -+ memcpy(p, s, strlen(s)); -+ p += strlen(s); -+ memcpy(p, n, strlen(n)); -+ p += strlen(n); -+ c->wbytes = p - c->wbuf; -+ c->wcurr = c->wbuf; -+ return(0); -+} -+ -+int replication_cmd(conn *c, Q_ITEM *q) -+{ -+ item *it; -+ int r; -+ -+ switch (q->type) { -+ case REPLICATION_REP: -+ it = item_get(q->key, strlen(q->key)); -+ if (!it) -+ return(replication_del(c, q->key)); -+ r = replication_rep(c, it); -+ item_remove(it); -+ return r; -+ case REPLICATION_DEL: -+ return(replication_del(c, q->key)); -+ case REPLICATION_FLUSH_ALL: -+ return(replication_flush_all(c, 0)); -+ case REPLICATION_DEFER_FLUSH_ALL: -+ return(replication_flush_all(c, q->time)); -+ case REPLICATION_MARUGOTO_END: -+ return(replication_marugoto_end(c)); -+ default: -+ fprintf(stderr,"replication: got unknown command:%d\n", q->type); -+ return(0); -+ } -+} ---- memcached-1.4.4/replication.h Thu Jan 1 03:00:00 1970 -+++ repcached-2.2-1.4.4/replication.h Wed Feb 10 18:40:31 2010 -@@ -0,0 +1,42 @@ -+#ifndef MEMCACHED_REPLICATION_H -+#define MEMCACHED_REPLICATION_H -+#define REPCACHED_VERSION "2.2" -+#include <netdb.h> -+ -+enum CMD_TYPE { -+ REPLICATION_REP, -+ REPLICATION_DEL, -+ REPLICATION_FLUSH_ALL, -+ REPLICATION_DEFER_FLUSH_ALL, -+ REPLICATION_MARUGOTO_END, -+}; -+ -+typedef struct queue_item_t Q_ITEM; -+struct queue_item_t { -+ enum CMD_TYPE type; -+ char *key; -+ rel_time_t time; -+ Q_ITEM *next; -+}; -+ -+typedef struct replication_cmd_t R_CMD; -+struct replication_cmd_t { -+ char *key; -+ int keylen; -+ rel_time_t time; -+}; -+ -+Q_ITEM *qi_new(enum CMD_TYPE type, R_CMD *cmd, bool); -+void qi_free(Q_ITEM *); -+int qi_free_list(void); -+int replication_cmd(conn *, Q_ITEM *); -+int get_qi_count(void); -+ -+int replication_call_rep(char *key, size_t keylen); -+int replication_call_del(char *key, size_t keylen); -+int replication_call_flush_all(void); -+int replication_call_defer_flush_all(const rel_time_t time); -+int replication_call_marugoto_end(void); -+int replication(enum CMD_TYPE type, R_CMD *cmd); -+ -+#endif ---- memcached-1.4.5/t/binary.t~ 2010-04-03 10:07:16.000000000 +0300 -+++ memcached-1.4.5/t/binary.t 2010-05-06 14:13:25.718440750 +0300 -@@ -2,11 +2,13 @@ - - use strict; - use warnings; --use Test::More tests => 3361; -+use Test::More; - use FindBin qw($Bin); - use lib "$Bin/lib"; - use MemcachedTest; - -+Test::More::plan(tests => 3361 + (support_replication() ? 36 : 0)); -+ - my $server = new_memcached(); - ok($server, "started the server"); - ---- memcached-1.4.4/t/issue_67.t Sun Nov 1 01:44:09 2009 -+++ repcached-2.2-1.4.4/t/issue_67.t Wed Feb 10 17:50:12 2010 -@@ -41,6 +41,10 @@ - my $exe = "$builddir/memcached-debug"; - croak("memcached binary doesn't exist. Haven't run 'make' ?\n") unless -e $exe; - -+ if (support_replication()) { -+ $args .= ' -X 0'; -+ } -+ - my $childpid = fork(); - - my $cmd = "$builddir/timedrun 10 $exe $args"; ---- memcached-1.4.4/t/lib/MemcachedTest.pm Fri Oct 30 04:24:52 2009 -+++ repcached-2.2-1.4.4/t/lib/MemcachedTest.pm Wed Feb 10 17:53:34 2010 -@@ -13,7 +13,8 @@ - - - @EXPORT = qw(new_memcached sleep mem_get_is mem_gets mem_gets_is mem_stats -- supports_sasl free_port); -+ supports_sasl free_port support_replication memcached_version -+ version2num); - - sub sleep { - my $n = shift; -@@ -148,6 +149,23 @@ - return 0; - } - -+sub support_replication { -+ my $output = `$builddir/memcached-debug -h`; -+ return 1 if $output =~ /^-x <ip_addr>/m; -+ return 0; -+} -+ -+sub memcached_version { -+ my $output = `$builddir/memcached-debug -h`; -+ return $1 if $output =~ /^memcached (\d[\d\.]+)/; -+ return 0; -+} -+ -+sub version2num { -+ my($major,$minor,$pl) = ($_[0] =~ /^(\d+)\.(\d+)\.(\d+)$/); -+ return $major*100**2 + $minor*100 + $pl -+} -+ - sub new_memcached { - my ($args, $passed_port) = @_; - my $port = $passed_port || free_port(); -@@ -171,6 +189,9 @@ - } - if ($< == 0) { - $args .= " -u root"; -+ } -+ if (support_replication() && $args !~ m/-X/) { -+ $args .= ' -X 0'; - } - - my $childpid = fork(); ---- memcached-1.4.5/t/stats.t~ 2010-04-03 10:07:16.000000000 +0300 -+++ memcached-1.4.5/t/stats.t 2010-05-06 14:15:28.521352735 +0300 -@@ -57,7 +57,8 @@ - my $stats = mem_stats($sock); - - # Test number of keys --is(scalar(keys(%$stats)), 38, "38 stats values"); -+my $keys = 38 + (support_replication() ? 3 : 0); -+is(scalar(keys(%$stats)), $keys, "$keys stats values"); - - # Test initial state - foreach my $key (qw(curr_items total_items bytes cmd_get cmd_set get_hits evictions get_misses ---- memcached-1.4.4/testapp.c Wed Nov 25 03:40:29 2009 -+++ repcached-2.2-1.4.4/testapp.c Wed Feb 10 17:52:05 2010 -@@ -300,6 +300,10 @@ - argv[arg++] = "-1"; - argv[arg++] = "-U"; - argv[arg++] = "0"; -+#ifdef USE_REPLICATION -+ argv[arg++] = "-X"; -+ argv[arg++] = "0"; -+#endif - /* Handle rpmbuild and the like doing this as root */ - if (getuid() == 0) { - argv[arg++] = "-u"; |