]> git.pld-linux.org Git - packages/memcached.git/blame - repcached.patch
up to 1.4.17
[packages/memcached.git] / repcached.patch
CommitLineData
894d11c1
ER
1--- memcached-1.4.4/Makefile.am Fri Oct 30 04:24:52 2009
2+++ repcached-2.2-1.4.4/Makefile.am Tue Feb 9 23:02:45 2010
3@@ -31,6 +31,10 @@
4 memcached_SOURCES += sasl_defs.c
5 endif
6
7+if ENABLE_REPLICATION
8+memcached_SOURCES += replication.h replication.c
9+endif
10+
11 memcached_debug_SOURCES = $(memcached_SOURCES)
12 memcached_CPPFLAGS = -DNDEBUG
13 memcached_debug_LDADD = @PROFILER_LDFLAGS@
894d11c1
ER
14--- memcached-1.4.4/assoc.c Sat Oct 24 00:38:01 2009
15+++ repcached-2.2-1.4.4/assoc.c Tue Feb 9 23:02:45 2010
16@@ -258,3 +258,51 @@
17 }
18
19
20+#ifdef USE_REPLICATION
21+char *assoc_key_snap(int *n)
22+{
23+ char *p = NULL;
24+ char *b = NULL;
25+ item *i = NULL;
26+ int co = 0;
27+ int sz = 1;
28+ int hs = 0;
29+ int hm = hashsize(hashpower);
30+
31+ hs = hm;
32+ while(hs--){
33+ if(expanding && hs < hashsize(hashpower - 1) && hs >= expand_bucket){
34+ i = old_hashtable[hs];
35+ }else{
36+ i = primary_hashtable[hs];
37+ }
38+ while(i){
39+ sz += i->nkey + 1;
40+ co++;
41+ i = i->h_next;
42+ }
43+ }
44+
45+ if(co){
46+ if((p = b = malloc(sz))){
47+ hs = hm;
48+ while(hs--){
49+ if(expanding && hs < hashsize(hashpower - 1) && hs >= expand_bucket){
50+ i = old_hashtable[hs];
51+ }else{
52+ i = primary_hashtable[hs];
53+ }
54+ while(i){
55+ memcpy(p, ITEM_key(i), i->nkey);
56+ p += i->nkey;
57+ *(p++) = 0;
58+ i = i->h_next;
59+ }
60+ }
61+ *(p++) = 0;
62+ }
63+ }
64+ if(n) *n = co;
65+ return(b);
66+}
67+#endif /* USE_REPLICATION */
894d11c1
ER
68--- memcached-1.4.4/assoc.h Sun Aug 30 03:00:58 2009
69+++ repcached-2.2-1.4.4/assoc.h Tue Feb 9 23:02:45 2010
70@@ -7,3 +7,6 @@
71 int start_assoc_maintenance_thread(void);
72 void stop_assoc_maintenance_thread(void);
73
74+#ifdef USE_REPLICATION
75+char *assoc_key_snap(int *n);
76+#endif /*USE_REPLICATION*/
894d11c1
ER
77--- memcached-1.4.4/config.h.in Fri Nov 27 09:34:56 2009
78+++ repcached-2.2-1.4.4/config.h.in Wed Feb 10 19:12:46 2010
79@@ -99,6 +99,9 @@
80 /* Define to 1 if you have the ANSI C header files. */
81 #undef STDC_HEADERS
82
83+/* Define this if you want to use replication */
84+#undef USE_REPLICATION
85+
86 /* Version number of package */
87 #undef VERSION
88
894d11c1
ER
89--- memcached-1.4.4/configure.ac Wed Nov 25 03:40:29 2009
90+++ repcached-2.2-1.4.4/configure.ac Tue Feb 9 23:02:45 2010
91@@ -382,6 +382,18 @@
92 AC_MSG_ERROR([Can't enable threads without the POSIX thread library.])
93 fi
94
95+dnl Check whether the user wants replication or not
96+AC_ARG_ENABLE(replication,
97+ [AS_HELP_STRING([--enable-replication],[support replication])],
98+ [if test "x$enable_threads" = "xyes"; then
99+ AC_MSG_ERROR([Can't enable threads and replication together.])
100+ else
101+ AC_DEFINE([USE_REPLICATION],,[Define this if you want to use replication])
102+ fi
103+ ])
104+
105+AM_CONDITIONAL(ENABLE_REPLICATION, test "x$enable_replication" = "xyes")
106+
107 AC_CHECK_FUNCS(mlockall)
108 AC_CHECK_FUNCS(getpagesizes)
109 AC_CHECK_FUNCS(memcntl)
894d11c1
ER
110--- memcached-1.4.4/items.c Sat Oct 24 00:38:01 2009
111+++ repcached-2.2-1.4.4/items.c Tue Feb 9 23:02:45 2010
112@@ -155,6 +155,9 @@
113 STATS_LOCK();
114 stats.evictions++;
115 STATS_UNLOCK();
116+#ifdef USE_REPLICATION
117+ replication_call_del(ITEM_key(search), search->nkey);
118+#endif /* USE_REPLICATION */
119 }
120 do_item_unlink(search);
121 break;
122@@ -288,8 +291,14 @@
123 stats.total_items += 1;
124 STATS_UNLOCK();
125
126+#ifdef USE_REPLICATION
127+ /* Allocate a new CAS ID on link. */
128+ if(!(it->it_flags & ITEM_REPDATA))
129+ ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
130+#else
131 /* Allocate a new CAS ID on link. */
132 ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
133+#endif /* USE_REPLICATION */
134
135 item_link_q(it);
136
894d11c1
ER
137--- memcached-1.4.4/memcached.c Fri Nov 27 08:45:13 2009
138+++ repcached-2.2-1.4.4/memcached.c Wed Feb 10 16:08:37 2010
139@@ -102,6 +102,30 @@
140
141 static void conn_free(conn *c);
142
143+#ifdef USE_REPLICATION
144+static int rep_exit = 0;
145+static conn *rep_recv = NULL;
146+static conn *rep_send = NULL;
147+static conn *rep_conn = NULL;
148+static conn *rep_serv = NULL;
149+static int server_socket_replication(const int);
150+static void server_close_replication(void);
151+static int replication_init(void);
152+static int replication_server_init(void);
153+static int replication_client_init(void);
154+static int replication_start(void);
155+static int replication_connect(void);
156+static int replication_close(void);
157+static void replication_dispatch_close(void);
158+static int replication_marugoto(int);
159+static int replication_send(conn *);
160+static int replication_pop(void);
161+static int replication_push(void);
162+static int replication_exit(void);
163+static int replication_item(Q_ITEM *);
164+static pthread_mutex_t replication_pipe_lock = PTHREAD_MUTEX_INITIALIZER;
165+#endif /* USE_REPLICATION */
166+
167 /** exported globals **/
168 struct stats stats;
169 struct settings settings;
170@@ -194,6 +218,11 @@
171 settings.backlog = 1024;
172 settings.binding_protocol = negotiating_prot;
173 settings.item_size_max = 1024 * 1024; /* The famous 1MB upper limit. */
174+#ifdef USE_REPLICATION
175+ settings.rep_addr.s_addr = htonl(INADDR_ANY);
176+ settings.rep_port = 11212;
177+ settings.rep_qmax = 8192;
178+#endif /* USE_REPLICATION */
179 }
180
181 /*
182@@ -382,6 +411,10 @@
183 prot_text(c->protocol));
184 } else if (IS_UDP(transport)) {
185 fprintf(stderr, "<%d server listening (udp)\n", sfd);
186+#ifdef USE_REPLICATION
187+ } else if (init_state == conn_rep_listen) {
188+ fprintf(stderr, "<%d server listening (replication)\n", sfd);
189+#endif /* USE_REPLICATION */
190 } else if (c->protocol == negotiating_prot) {
191 fprintf(stderr, "<%d new auto-negotiating client connection\n",
192 sfd);
193@@ -593,7 +626,11 @@
194 "conn_nread",
195 "conn_swallow",
196 "conn_closing",
197- "conn_mwrite" };
198+ "conn_mwrite",
199+ "conn_repconnect",
200+ "conn_rep_listen",
201+ "conn_pipe_recv",
202+ "conn_pipe_send" };
203 return statenames[state];
204 }
205
206@@ -752,6 +789,14 @@
207
208 assert(c != NULL);
209
210+#ifdef USE_REPLICATION
211+ if (c == rep_conn){
212+ if (settings.verbose > 1)
213+ fprintf(stderr, "REP>%d %s\n", c->sfd, str);
214+ conn_set_state(c, conn_new_cmd);
215+ return;
216+ }
217+#endif /* USE_REPLICATION */
218 if (c->noreply) {
219 if (settings.verbose > 1)
220 fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
221@@ -791,9 +836,11 @@
222 int comm = c->cmd;
223 enum store_item_type ret;
224
225- pthread_mutex_lock(&c->thread->stats.mutex);
226- c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
227- pthread_mutex_unlock(&c->thread->stats.mutex);
228+ if (c->thread) {
229+ pthread_mutex_lock(&c->thread->stats.mutex);
230+ c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
231+ pthread_mutex_unlock(&c->thread->stats.mutex);
232+ }
233
234 if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
235 out_string(c, "CLIENT_ERROR bad data chunk");
236@@ -832,6 +879,11 @@
237
238 switch (ret) {
239 case STORED:
240+#ifdef USE_REPLICATION
241+ if( c != rep_conn ){
242+ replication_call_rep(ITEM_key(it), it->nkey);
243+ }
244+#endif /* USE_REPLICATION */
245 out_string(c, "STORED");
246 break;
247 case EXISTS:
248@@ -2410,6 +2462,11 @@
249 APPEND_STAT("listen_disabled_num", "%llu", (unsigned long long)stats.listen_disabled_num);
250 APPEND_STAT("threads", "%d", settings.num_threads);
251 APPEND_STAT("conn_yields", "%llu", (unsigned long long)thread_stats.conn_yields);
252+#ifdef USE_REPLICATION
253+ APPEND_STAT("replication", "MASTER", 0);
254+ APPEND_STAT("repcached_version", "%s", REPCACHED_VERSION);
255+ APPEND_STAT("repcached_qi_free", "%u", settings.rep_qmax - get_qi_count());
256+#endif /*USE_REPLICATION*/
257 STATS_UNLOCK();
258 }
259
260@@ -2797,6 +2854,11 @@
261 switch(add_delta(c, it, incr, delta, temp)) {
262 case OK:
263 out_string(c, temp);
264+#ifdef USE_REPLICATION
265+ if( c != rep_conn){
266+ replication_call_rep(ITEM_key(it), it->nkey);
267+ }
268+#endif /* USE_REPLICATION */
269 break;
270 case NON_NUMERIC:
271 out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value");
272@@ -2911,17 +2973,25 @@
273 if (it) {
274 MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);
275
276- pthread_mutex_lock(&c->thread->stats.mutex);
277- c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++;
278- pthread_mutex_unlock(&c->thread->stats.mutex);
279+ if (c->thread) {
280+ pthread_mutex_lock(&c->thread->stats.mutex);
281+ c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++;
282+ pthread_mutex_unlock(&c->thread->stats.mutex);
283+ }
284
285 item_unlink(it);
286 item_remove(it); /* release our reference */
287+#ifdef USE_REPLICATION
288+ if( c != rep_conn )
289+ replication_call_del(key, nkey);
290+#endif /* USE_REPLICATION */
291 out_string(c, "DELETED");
292 } else {
293- pthread_mutex_lock(&c->thread->stats.mutex);
294- c->thread->stats.delete_misses++;
295- pthread_mutex_unlock(&c->thread->stats.mutex);
296+ if (c->thread) {
297+ pthread_mutex_lock(&c->thread->stats.mutex);
298+ c->thread->stats.delete_misses++;
299+ pthread_mutex_unlock(&c->thread->stats.mutex);
300+ }
301
302 out_string(c, "NOT_FOUND");
303 }
304@@ -2986,6 +3056,22 @@
305
306 process_update_command(c, tokens, ntokens, comm, true);
307
308+#ifdef USE_REPLICATION
309+ } else if ((ntokens == 7) && (strcmp(tokens[COMMAND_TOKEN].value, "rep") == 0 && (comm = NREAD_SET)) && (c == rep_conn)) {
310+
311+ process_update_command(c, tokens, ntokens, comm, true);
312+ if(c->item)
313+ ((item *)(c->item))->it_flags |= ITEM_REPDATA;
314+
315+ } else if ((ntokens == 2) && (strcmp(tokens[COMMAND_TOKEN].value, "marugoto_end") == 0) && (c == rep_conn)) {
316+ if(replication_start() == -1)
317+ exit(EXIT_FAILURE);
318+ if (settings.verbose > 0)
319+ fprintf(stderr,"replication: start\n");
320+ out_string(c, "OK");
321+ return;
322+
323+#endif /* USE_REPLICATION */
324 } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
325
326 process_arithmetic_command(c, tokens, ntokens, 1);
327@@ -3012,11 +3098,17 @@
328
329 set_noreply_maybe(c, tokens, ntokens);
330
331- pthread_mutex_lock(&c->thread->stats.mutex);
332- c->thread->stats.flush_cmds++;
333- pthread_mutex_unlock(&c->thread->stats.mutex);
334+ if (c->thread) {
335+ pthread_mutex_lock(&c->thread->stats.mutex);
336+ c->thread->stats.flush_cmds++;
337+ pthread_mutex_unlock(&c->thread->stats.mutex);
338+ }
339
340 if(ntokens == (c->noreply ? 3 : 2)) {
341+#ifdef USE_REPLICATION
342+ if( c != rep_conn )
343+ replication_call_flush_all();
344+#endif
345 settings.oldest_live = current_time - 1;
346 item_flush_expired();
347 out_string(c, "OK");
348@@ -3029,6 +3121,11 @@
349 return;
350 }
351
352+#ifdef USE_REPLICATION
353+ if( c != rep_conn )
354+ replication_call_defer_flush_all(realtime(exptime) + process_started);
355+#endif
356+ settings.oldest_live = realtime(exptime) - 1;
357 /*
358 If exptime is zero realtime() would return zero too, and
359 realtime(exptime) - 1 would overflow to the max unsigned
360@@ -3275,9 +3372,11 @@
361 int avail = c->rsize - c->rbytes;
362 res = read(c->sfd, c->rbuf + c->rbytes, avail);
363 if (res > 0) {
364- pthread_mutex_lock(&c->thread->stats.mutex);
365- c->thread->stats.bytes_read += res;
366- pthread_mutex_unlock(&c->thread->stats.mutex);
367+ if (c->thread) {
368+ pthread_mutex_lock(&c->thread->stats.mutex);
369+ c->thread->stats.bytes_read += res;
370+ pthread_mutex_unlock(&c->thread->stats.mutex);
371+ }
372 gotdata = READ_DATA_RECEIVED;
373 c->rbytes += res;
374 if (res == avail) {
375@@ -3423,6 +3522,12 @@
376
377 assert(c != NULL);
378
379+#ifdef USE_REPLICATION
380+ if(rep_exit && (c->state != conn_pipe_recv)){
381+ return;
382+ }
383+#endif /* USE_REPLICATION */
384+
385 while (!stop) {
386
387 switch(c->state) {
388@@ -3502,9 +3607,11 @@
389 if (nreqs >= 0) {
390 reset_cmd_handler(c);
391 } else {
392- pthread_mutex_lock(&c->thread->stats.mutex);
393- c->thread->stats.conn_yields++;
394- pthread_mutex_unlock(&c->thread->stats.mutex);
395+ if (c->thread) {
396+ pthread_mutex_lock(&c->thread->stats.mutex);
397+ c->thread->stats.conn_yields++;
398+ pthread_mutex_unlock(&c->thread->stats.mutex);
399+ }
400 if (c->rbytes > 0) {
401 /* We have already read in data into the input buffer,
402 so libevent will most likely not signal read events
403@@ -3545,9 +3652,11 @@
404 /* now try reading from the socket */
405 res = read(c->sfd, c->ritem, c->rlbytes);
406 if (res > 0) {
407- pthread_mutex_lock(&c->thread->stats.mutex);
408- c->thread->stats.bytes_read += res;
409- pthread_mutex_unlock(&c->thread->stats.mutex);
410+ if (c->thread) {
411+ pthread_mutex_lock(&c->thread->stats.mutex);
412+ c->thread->stats.bytes_read += res;
413+ pthread_mutex_unlock(&c->thread->stats.mutex);
414+ }
415 if (c->rcurr == c->ritem) {
416 c->rcurr += res;
417 }
418@@ -3600,9 +3709,11 @@
419 /* now try reading from the socket */
420 res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
421 if (res > 0) {
422- pthread_mutex_lock(&c->thread->stats.mutex);
423- c->thread->stats.bytes_read += res;
424- pthread_mutex_unlock(&c->thread->stats.mutex);
425+ if (c->thread) {
426+ pthread_mutex_lock(&c->thread->stats.mutex);
427+ c->thread->stats.bytes_read += res;
428+ pthread_mutex_unlock(&c->thread->stats.mutex);
429+ }
430 c->sbytes -= res;
431 break;
432 }
433@@ -3698,6 +3809,10 @@
434 case conn_closing:
435 if (IS_UDP(c->transport))
436 conn_cleanup(c);
437+#ifdef USE_REPLICATION
438+ else if(c == rep_conn)
439+ replication_close();
440+#endif /*USE_REPLICATION*/
441 else
442 conn_close(c);
443 stop = true;
444@@ -3706,6 +3821,70 @@
445 case conn_max_state:
446 assert(false);
447 break;
448+
449+#ifdef USE_REPLICATION
450+ case conn_pipe_recv:
451+ if(replication_pop()){
452+ replication_close();
453+ }else{
454+ replication_send(rep_conn);
455+ }
456+ stop = true;
457+ break;
458+
459+ case conn_rep_listen:
460+ if (settings.verbose > 0)
461+ fprintf(stderr,"replication: accept\n");
462+ addrlen = sizeof(addr);
463+ res = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
464+ if(res == -1){
465+ if(errno == EAGAIN || errno == EWOULDBLOCK) {
466+ } else if (errno == EMFILE) {
467+ fprintf(stderr, "replication: Too many opened connections\n");
468+ } else {
469+ fprintf(stderr, "replication: accept error\n");
470+ }
471+ }else{
472+ if(rep_conn){
473+ close(res);
474+ fprintf(stderr,"replication: already connected\n");
475+ }else{
476+ if((flags = fcntl(res, F_GETFL, 0)) < 0 || fcntl(res, F_SETFL, flags | O_NONBLOCK) < 0){
477+ close(res);
478+ fprintf(stderr, "replication: Can't Setting O_NONBLOCK\n");
479+ }else{
480+ server_close_replication();
481+ rep_conn = conn_new(res, conn_read, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base);
482+ rep_conn->item = NULL;
483+ rep_conn->rbytes = 0;
484+ rep_conn->rcurr = rep_conn->rbuf;
485+ replication_connect();
486+ replication_marugoto(1);
487+ replication_marugoto(0);
488+ }
489+ }
490+ }
491+ stop = true;
492+ break;
493+
494+ case conn_repconnect:
495+ rep_conn = c;
496+ replication_connect();
497+ conn_set_state(c, conn_read);
498+ if (settings.verbose > 0)
499+ fprintf(stderr,"replication: marugoto copying\n");
500+ if(!update_event(c, EV_READ | EV_PERSIST)){
501+ fprintf(stderr, "replication: Couldn't update event\n");
502+ conn_set_state(c, conn_closing);
503+ }
504+ stop = true;
505+ break;
506+
507+ case conn_pipe_send:
508+ /* should not happen */
509+ fprintf(stderr, "replication: unexpected conn_pipe_send state\n");
510+ break;
511+#endif /* USE_REPLICATION */
512 }
513 }
514
515@@ -4002,6 +4181,89 @@
516 return 0;
517 }
518
519+#ifdef USE_REPLICATION
520+static int server_socket_replication(const int port) {
521+ int sfd;
522+ struct linger ling = {0, 0};
523+ struct addrinfo *ai;
524+ struct addrinfo *next;
525+ struct addrinfo hints;
526+ char port_buf[NI_MAXSERV];
527+ int error;
528+ int success = 0;
529+
530+ int flags =1;
531+
532+ memset(&hints, 0, sizeof (hints));
533+ hints.ai_flags = AI_PASSIVE|AI_ADDRCONFIG;
534+ hints.ai_family = AF_UNSPEC;
535+ hints.ai_protocol = IPPROTO_TCP;
536+ hints.ai_socktype = SOCK_STREAM;
537+ snprintf(port_buf, NI_MAXSERV, "%d", port);
538+ error= getaddrinfo(settings.inter, port_buf, &hints, &ai);
539+ if (error != 0) {
540+ if (error != EAI_SYSTEM)
541+ fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
542+ else
543+ perror("getaddrinfo()");
544+
545+ return 1;
546+ }
547+
548+ for (next= ai; next; next= next->ai_next) {
549+ conn *rep_serv_add;
550+ if ((sfd = new_socket(next)) == -1) {
551+ freeaddrinfo(ai);
552+ return 1;
553+ }
554+ setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
555+ setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
556+ setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
557+ setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
558+
559+ if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
560+ if (errno != EADDRINUSE) {
561+ perror("bind()");
562+ close(sfd);
563+ freeaddrinfo(ai);
564+ return 1;
565+ }
566+ close(sfd);
567+ continue;
568+ } else {
569+ success++;
570+ if (listen(sfd, 1024) == -1) {
571+ perror("listen()");
572+ close(sfd);
573+ freeaddrinfo(ai);
574+ return 1;
575+ }
576+ }
577+
578+ if (!(rep_serv_add = conn_new(sfd, conn_rep_listen,
579+ EV_READ | EV_PERSIST, 1, tcp_transport, main_base))) {
580+ fprintf(stderr, "failed to create replication server connection\n");
581+ exit(EXIT_FAILURE);
582+ }
583+
584+ rep_serv_add->next = rep_serv;
585+ rep_serv = rep_serv_add;
586+ }
587+
588+ freeaddrinfo(ai);
589+
590+ /* Return zero iff we detected no errors in starting up connections */
591+ return success == 0;
592+}
593+
594+static void server_close_replication(void) {
595+ while(rep_serv){
596+ conn_close(rep_serv);
597+ rep_serv = rep_serv->next;
598+ }
599+}
600+#endif /* USE_REPLICATION */
601+
602 /*
603 * We keep the current time of day in a global variable that's updated by a
604 * timer event. This saves us a bunch of time() system calls (we really only
605@@ -4041,6 +4303,9 @@
606
607 static void usage(void) {
608 printf(PACKAGE " " VERSION "\n");
609+#ifdef USE_REPLICATION
610+ printf("repcached %s\n",REPCACHED_VERSION);
611+#endif /* USE_REPLICATION */
612 printf("-p <num> TCP port number to listen on (default: 11211)\n"
613 "-U <num> UDP port number to listen on (default: 11211, 0 is off)\n"
614 "-s <file> UNIX socket path to listen on (disables network support)\n"
615@@ -4088,6 +4353,10 @@
616 #ifdef ENABLE_SASL
617 printf("-S Turn on Sasl authentication\n");
618 #endif
619+#ifdef USE_REPLICATION
620+ printf("-x <ip_addr> hostname or IP address of peer repcached\n");
621+ printf("-X <num> TCP port number for replication (default: 11212)\n");
622+#endif /* USE_REPLICATION */
623 return;
624 }
625
626@@ -4194,6 +4463,26 @@
627 exit(EXIT_SUCCESS);
628 }
629
630+#ifdef USE_REPLICATION
631+static void sig_handler_cb(int fd, short event, void *arg)
632+{
633+ struct event *signal = arg;
634+
635+ if (settings.verbose)
636+ fprintf(stderr, "got signal %d\n", EVENT_SIGNAL(signal));
637+
638+ if (replication_exit()) {
639+ exit(EXIT_FAILURE);
640+ }
641+
642+ pthread_mutex_lock(&replication_pipe_lock);
643+ if (!rep_send) {
644+ exit(EXIT_SUCCESS);
645+ }
646+ pthread_mutex_unlock(&replication_pipe_lock);
647+}
648+#endif /* USE_REPLICATION */
649+
650 #ifndef HAVE_SIGIGNORE
651 static int sigignore(int sig) {
652 struct sigaction sa = { .sa_handler = SIG_IGN, .sa_flags = 0 };
653@@ -4249,6 +4538,57 @@
654 #endif
655 }
656
657+static void create_listening_sockets(void)
658+{
659+ /* create unix mode sockets after dropping privileges */
660+ if (settings.socketpath != NULL) {
661+ errno = 0;
662+ if (server_socket_unix(settings.socketpath,settings.access)) {
663+ vperror("failed to listen on UNIX socket: %s", settings.socketpath);
664+ exit(EX_OSERR);
665+ }
666+ }
667+
668+ /* create the listening socket, bind it, and init */
669+ if (settings.socketpath == NULL) {
670+ const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
671+ char temp_portnumber_filename[PATH_MAX];
672+ FILE *portnumber_file = NULL;
673+
674+ if (portnumber_filename != NULL) {
675+ snprintf(temp_portnumber_filename,
676+ sizeof(temp_portnumber_filename),
677+ "%s.lck", portnumber_filename);
678+
679+ portnumber_file = fopen(temp_portnumber_filename, "a");
680+ if (portnumber_file == NULL) {
681+ fprintf(stderr, "Failed to open \"%s\": %s\n",
682+ temp_portnumber_filename, strerror(errno));
683+ }
684+ }
685+
686+ errno = 0;
687+ if (settings.port && server_socket(settings.port, tcp_transport,
688+ portnumber_file)) {
689+ vperror("failed to listen on TCP port %d", settings.port);
690+ exit(EX_OSERR);
691+ }
692+
693+ /* create the UDP listening socket and bind it */
694+ errno = 0;
695+ if (settings.udpport && server_socket(settings.udpport, udp_transport,
696+ portnumber_file)) {
697+ vperror("failed to listen on UDP port %d", settings.udpport);
698+ exit(EX_OSERR);
699+ }
700+
701+ if (portnumber_file) {
702+ fclose(portnumber_file);
703+ rename(temp_portnumber_filename, portnumber_filename);
704+ }
705+ }
706+}
707+
708 int main (int argc, char **argv) {
709 int c;
710 bool lock_memory = false;
711@@ -4261,6 +4601,11 @@
712 struct rlimit rlim;
713 char unit = '\0';
714 int size_max = 0;
715+#ifdef USE_REPLICATION
716+ struct in_addr addr;
717+ struct addrinfo master_hint;
718+ struct addrinfo *master_addr;
719+#endif /* USE_REPLICATION */
720 /* listening sockets */
721 static int *l_socket = NULL;
722
723@@ -4307,6 +4652,11 @@
724 "B:" /* Binding protocol */
725 "I:" /* Max item size */
726 "S" /* Sasl ON */
727+#ifdef USE_REPLICATION
728+ "X:" /* replication port */
729+ "x:" /* replication master */
730+ "q:" /* replication queue length */
731+#endif /* USE_REPLICATION */
732 ))) {
733 switch (c) {
734 case 'a':
735@@ -4462,6 +4812,31 @@
736 );
737 }
738 break;
739+#ifdef USE_REPLICATION
740+ case 'x':
741+ if (inet_pton(AF_INET, optarg, &addr) <= 0) {
742+ memset(&master_hint, 0, sizeof(master_hint));
743+ master_hint.ai_flags = 0;
744+ master_hint.ai_socktype = 0;
745+ master_hint.ai_protocol = 0;
746+ if(!getaddrinfo(optarg, NULL, &master_hint, &master_addr)){
747+ settings.rep_addr = ((struct sockaddr_in *)(master_addr->ai_addr)) -> sin_addr;
748+ freeaddrinfo(master_addr);
749+ }else{
750+ fprintf(stderr, "Illegal address: %s\n", optarg);
751+ return 1;
752+ }
753+ } else {
754+ settings.rep_addr = addr;
755+ }
756+ break;
757+ case 'X':
758+ settings.rep_port = atoi(optarg);
759+ break;
760+ case 'q':
761+ settings.rep_qmax = atoi(optarg);
762+ break;
763+#endif /* USE_REPLICATION */
764 case 'S': /* set Sasl authentication to true. Default is false */
765 #ifndef ENABLE_SASL
766 fprintf(stderr, "This server is not built with SASL support.\n");
767@@ -4587,6 +4962,17 @@
768 /* initialize main thread libevent instance */
769 main_base = event_init();
770
771+#ifdef USE_REPLICATION
772+ /* register events for SIGINT and SIGTERM to handle them in main thread */
773+ struct event signal_int, signal_term;
774+ event_set(&signal_int, SIGINT, EV_SIGNAL|EV_PERSIST, sig_handler_cb,
775+ &signal_int);
776+ event_add(&signal_int, NULL);
777+ event_set(&signal_term, SIGTERM, EV_SIGNAL|EV_PERSIST, sig_handler_cb,
778+ &signal_term);
779+ event_add(&signal_term, NULL);
780+#endif
781+
782 /* initialize other stuff */
783 stats_init();
784 assoc_init();
785@@ -4615,63 +5001,21 @@
786 /* initialise clock event */
787 clock_handler(0, 0, 0);
788
789- /* create unix mode sockets after dropping privileges */
790- if (settings.socketpath != NULL) {
791- errno = 0;
792- if (server_socket_unix(settings.socketpath,settings.access)) {
793- vperror("failed to listen on UNIX socket: %s", settings.socketpath);
794- exit(EX_OSERR);
795- }
796- }
797-
798- /* create the listening socket, bind it, and init */
799- if (settings.socketpath == NULL) {
800- int udp_port;
801-
802- const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
803- char temp_portnumber_filename[PATH_MAX];
804- FILE *portnumber_file = NULL;
805-
806- if (portnumber_filename != NULL) {
807- snprintf(temp_portnumber_filename,
808- sizeof(temp_portnumber_filename),
809- "%s.lck", portnumber_filename);
810-
811- portnumber_file = fopen(temp_portnumber_filename, "a");
812- if (portnumber_file == NULL) {
813- fprintf(stderr, "Failed to open \"%s\": %s\n",
814- temp_portnumber_filename, strerror(errno));
815- }
816- }
817-
818- errno = 0;
819- if (settings.port && server_socket(settings.port, tcp_transport,
820- portnumber_file)) {
821- vperror("failed to listen on TCP port %d", settings.port);
822- exit(EX_OSERR);
823- }
824-
825- /*
826- * initialization order: first create the listening sockets
827- * (may need root on low ports), then drop root if needed,
828- * then daemonise if needed, then init libevent (in some cases
829- * descriptors created by libevent wouldn't survive forking).
830- */
831- udp_port = settings.udpport ? settings.udpport : settings.port;
832-
833- /* create the UDP listening socket and bind it */
834- errno = 0;
835- if (settings.udpport && server_socket(settings.udpport, udp_transport,
836- portnumber_file)) {
837- vperror("failed to listen on UDP port %d", settings.udpport);
838- exit(EX_OSERR);
839- }
840+ /*
841+ * initialization order: first create the listening sockets
842+ * (may need root on low ports), then drop root if needed,
843+ * then daemonise if needed, then init libevent (in some cases
844+ * descriptors created by libevent wouldn't survive forking).
845+ */
846
847- if (portnumber_file) {
848- fclose(portnumber_file);
849- rename(temp_portnumber_filename, portnumber_filename);
850- }
851+#ifdef USE_REPLICATION
852+ if(replication_init() == -1){
853+ fprintf(stderr, "faild to replication init\n");
854+ exit(EXIT_FAILURE);
855 }
856+#else
857+ create_listening_sockets();
858+#endif
859
860 /* Drop privileges no longer needed */
861 drop_privileges();
862@@ -4694,3 +5038,401 @@
863
864 return EXIT_SUCCESS;
865 }
866+
867+#ifdef USE_REPLICATION
868+static int replication_start(void)
869+{
870+ static int start = 0;
871+ if(start)
872+ return(0);
873+
874+ create_listening_sockets();
875+
876+ start = 1;
877+ return(0);
878+}
879+
880+static int replication_server_init(void)
881+{
882+ rep_recv = NULL;
883+ rep_send = NULL;
884+ rep_conn = NULL;
885+ if(server_socket_replication(settings.rep_port)){
886+ fprintf(stderr, "replication: failed to initialize replication server socket\n");
887+ return(-1);
888+ }
889+ if (settings.verbose > 0)
890+ fprintf(stderr, "replication: listen\n");
891+ return(replication_start());
892+}
893+
894+static int replication_client_init(void)
895+{
896+ int s;
897+ conn *c;
898+ struct addrinfo ai;
899+ struct sockaddr_in server;
900+
901+ rep_recv = NULL;
902+ rep_send = NULL;
903+ rep_conn = NULL;
904+
905+ memset(&ai,0,sizeof(ai));
906+ ai.ai_family = AF_INET;
907+ ai.ai_socktype = SOCK_STREAM;
908+ s = new_socket(&ai);
909+
910+ if(s == -1) {
911+ fprintf(stderr, "replication: failed to replication client socket\n");
912+ return(-1);
913+ }else{
914+ /* connect */
915+ memset((char *)&server, 0, sizeof(server));
916+ server.sin_family = AF_INET;
917+ server.sin_addr = settings.rep_addr;
918+ server.sin_port = htons(settings.rep_port);
919+ if (settings.verbose > 0)
920+ fprintf(stderr,"replication: connect (peer=%s:%d)\n", inet_ntoa(settings.rep_addr), settings.rep_port);
921+ if(connect(s,(struct sockaddr *)&server, sizeof(server)) == 0){
922+ c = conn_new(s, conn_repconnect, EV_WRITE | EV_PERSIST, DATA_BUFFER_SIZE, false, main_base);
923+ if(c == NULL){
924+ fprintf(stderr, "replication: failed to create client conn");
925+ close(s);
926+ return(-1);
927+ }
928+ drive_machine(c);
929+ }else{
930+ if(errno == EINPROGRESS){
931+ c = conn_new(s, conn_repconnect, EV_WRITE | EV_PERSIST, DATA_BUFFER_SIZE, false, main_base);
932+ if(c == NULL){
933+ fprintf(stderr, "replication: failed to create client conn");
934+ close(s);
935+ return(-1);
936+ }
937+ }else{
938+ fprintf(stdout,"replication: can't connect %s:%d\n", inet_ntoa(server.sin_addr), ntohs(server.sin_port));
939+ close(s);
940+ return(-1);
941+ }
942+ }
943+ }
944+ return(0);
945+}
946+
947+static int replication_init(void)
948+{
949+ if(settings.rep_addr.s_addr != htonl(INADDR_ANY)){
950+ if(replication_client_init() != -1){
951+ return(0);
952+ }
953+ }
954+ return(replication_server_init());
955+}
956+
957+static int replication_connect(void)
958+{
959+ int f;
960+ int p[2];
961+
962+ if(pipe(p) == -1){
963+ fprintf(stderr, "replication: can't create pipe\n");
964+ return(-1);
965+ }else{
966+ if((f = fcntl(p[0], F_GETFL, 0)) < 0 || fcntl(p[0], F_SETFL, f | O_NONBLOCK) < 0) {
967+ fprintf(stderr, "replication: can't setting O_NONBLOCK pipe[0]\n");
968+ return(-1);
969+ }
970+ if((f = fcntl(p[1], F_GETFL, 0)) < 0 || fcntl(p[1], F_SETFL, f | O_NONBLOCK) < 0) {
971+ fprintf(stderr, "replication: can't setting O_NONBLOCK pipe[0]\n");
972+ return(-1);
973+ }
974+ pthread_mutex_lock(&replication_pipe_lock);
975+ rep_recv = conn_new(p[0], conn_pipe_recv, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base);
976+ rep_send = conn_new(p[1], conn_pipe_send, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base);
977+ event_del(&rep_send->event);
978+ pthread_mutex_unlock(&replication_pipe_lock);
979+ }
980+ return(0);
981+}
982+
983+static int replication_close(void)
984+{
985+ int c;
986+ int r;
987+ Q_ITEM *q;
988+
989+ if(settings.verbose > 0)
990+ fprintf(stderr,"replication: close\n");
991+ if(rep_recv){
992+ rep_recv->rbytes = sizeof(q);
993+ rep_recv->rcurr = rep_recv->rbuf;
994+ c = 0;
995+ do{
996+ r = read(rep_recv->sfd, rep_recv->rcurr, rep_recv->rbytes);
997+ if(r == -1){
998+ break;
999+ }
1000+ rep_recv->rbytes -= r;
1001+ rep_recv->rcurr += r;
1002+ if(!rep_recv->rbytes){
1003+ memcpy(&q, rep_recv->rbuf, sizeof(q));
1004+ rep_recv->rbytes = sizeof(q);
1005+ rep_recv->rcurr = rep_recv->rbuf;
1006+ qi_free(q);
1007+ c++;
1008+ }
1009+ }while(r);
1010+ conn_close(rep_recv);
1011+ rep_recv = NULL;
1012+ if (settings.verbose > 1) {
1013+ fprintf(stderr, "replication: qitem free %d items\n", qi_free_list());
1014+ fprintf(stderr, "replication: close recv %d items\n", c);
1015+ }
1016+ }
1017+ pthread_mutex_lock(&replication_pipe_lock);
1018+ if(rep_send){
1019+ conn_close(rep_send);
1020+ rep_send = NULL;
1021+ if (settings.verbose > 1)
1022+ fprintf(stderr,"replication: close send\n");
1023+ }
1024+ pthread_mutex_unlock(&replication_pipe_lock);
1025+ if(rep_conn){
1026+ conn_close(rep_conn);
1027+ rep_conn = NULL;
1028+ if (settings.verbose > 1)
1029+ fprintf(stderr,"replication: close conn\n");
1030+ }
1031+ if(!rep_exit)
1032+ replication_server_init();
1033+ return(0);
1034+}
1035+
1036+static void replication_dispatch_close(void)
1037+{
1038+ if (settings.verbose > 1)
1039+ fprintf(stderr, "replication: dispatch close\n");
1040+ pthread_mutex_lock(&replication_pipe_lock);
1041+ if (rep_send) {
1042+ conn_close(rep_send);
1043+ rep_send = NULL;
1044+ }
1045+ pthread_mutex_unlock(&replication_pipe_lock);
1046+}
1047+
1048+static int replication_marugoto(int f)
1049+{
1050+ static int keysend = 0;
1051+ static int keycount = 0;
1052+ static char *keylist = NULL;
1053+ static char *keyptr = NULL;
1054+
1055+ if(f){
1056+ if(keylist){
1057+ free(keylist);
1058+ keylist = NULL;
1059+ keyptr = NULL;
1060+ keycount = 0;
1061+ keysend = 0;
1062+ }
1063+ pthread_mutex_lock(&cache_lock);
1064+ keylist = (char *)assoc_key_snap((int *)&keycount);
1065+ pthread_mutex_unlock(&cache_lock);
1066+ keyptr = keylist;
1067+ if (!keyptr){
1068+ replication_call_marugoto_end();
1069+ }else{
1070+ if (settings.verbose > 0)
1071+ fprintf(stderr,"replication: marugoto start\n");
1072+ }
1073+ }else{
1074+ if(keyptr){
1075+ while(*keyptr){
1076+ item *it = item_get(keyptr, strlen(keyptr));
1077+ if(it){
1078+ item_remove(it);
1079+ if(replication_call_rep(keyptr, strlen(keyptr)) == -1){
1080+ return(-1);
1081+ }else{
1082+ keysend++;
1083+ keyptr += strlen(keyptr) + 1;
1084+ return(0);
1085+ }
1086+ }
1087+ keyptr += strlen(keyptr) + 1;
1088+ }
1089+ if(settings.verbose > 0)
1090+ fprintf(stderr,"replication: marugoto %d\n", keysend);
1091+ replication_call_marugoto_end();
1092+ if(settings.verbose > 0)
1093+ fprintf(stderr,"replication: marugoto owari\n");
1094+ free(keylist);
1095+ keylist = NULL;
1096+ keyptr = NULL;
1097+ keycount = 0;
1098+ keysend = 0;
1099+ }
1100+ }
1101+ return(0);
1102+}
1103+
1104+static int replication_send(conn *c)
1105+{
1106+ while(c->wbytes){
1107+ int w = write(c->sfd, c->wcurr, c->wbytes);
1108+ if(w == -1){
1109+ if(errno == EAGAIN || errno == EINTR){
1110+ }else{
1111+ fprintf(stderr,"replication: send error\n");
1112+ replication_close();
1113+ break;
1114+ }
1115+ }else{
1116+ c->wbytes -= w;
1117+ c->wcurr += w;
1118+ }
1119+ }
1120+ return(c->wbytes);
1121+}
1122+
1123+static int replication_pop(void)
1124+{
1125+ int r;
1126+ int c;
1127+ int m;
1128+ Q_ITEM **q;
1129+
1130+ if(settings.verbose > 1)
1131+ fprintf(stderr, "replication: pop\n");
1132+
1133+ if(!rep_recv)
1134+ return(0);
1135+
1136+ r = read(rep_recv->sfd, rep_recv->rbuf, rep_recv->rsize);
1137+ if(r == -1){
1138+ if(errno == EAGAIN || errno == EINTR){
1139+ }else{
1140+ fprintf(stderr,"replication: pop error %d\n", errno);
1141+ return(-1);
1142+ }
1143+ }if(r == 0){
1144+ /* other end closed, trigger replication_close() */
1145+ return(-1);
1146+ }else{
1147+ c = r / sizeof(Q_ITEM *);
1148+ m = r % sizeof(Q_ITEM *);
1149+ q = (Q_ITEM **)(rep_recv->rbuf);
1150+ while(c--){
1151+ if(q[c]){
1152+ if(rep_conn && replication_cmd(rep_conn, q[c])){
1153+ replication_item(q[c]); /* error retry */
1154+ }else{
1155+ qi_free(q[c]);
1156+ }
1157+ }else{
1158+ if(!rep_exit){
1159+ if (settings.verbose)
1160+ fprintf(stderr,"replication: cleanup start\n");
1161+ rep_exit = 1;
1162+ }
1163+ }
1164+ }
1165+ }
1166+ if(rep_exit){
1167+ if(rep_conn->wbytes){
1168+ /* retry */
1169+ if(replication_exit()){
1170+ replication_close();
1171+ fprintf(stderr,"replication: cleanup error\n");
1172+ exit(EXIT_FAILURE);
1173+ }
1174+ }else{
1175+ /* finish */
1176+ replication_close();
1177+ if (settings.verbose)
1178+ fprintf(stderr,"replication: cleanup complete\n");
1179+ exit(EXIT_SUCCESS);
1180+ }
1181+ }
1182+ replication_marugoto(0);
1183+ return(0);
1184+}
1185+
1186+static int replication_push(void)
1187+{
1188+ int w;
1189+
1190+ while(rep_send->wbytes){
1191+ w = write(rep_send->sfd, rep_send->wcurr, rep_send->wbytes);
1192+ if(w == -1){
1193+ if(errno == EAGAIN || errno == EINTR){
1194+ fprintf(stderr,"replication: push EAGAIN or EINTR\n");
1195+ }else{
1196+ return(-1);
1197+ }
1198+ }else{
1199+ rep_send->wbytes -= w;
1200+ rep_send->wcurr += w;
1201+ }
1202+ }
1203+ rep_send->wcurr = rep_send->wbuf;
1204+ return(0);
1205+}
1206+
1207+static int replication_exit(void)
1208+{
1209+ return(replication_item(NULL));
1210+}
1211+
1212+static int replication_item(Q_ITEM *q)
1213+{
1214+ pthread_mutex_lock(&replication_pipe_lock);
1215+ if (!rep_send) {
1216+ pthread_mutex_unlock(&replication_pipe_lock);
1217+ return 0;
1218+ }
1219+ if(rep_send->wcurr + rep_send->wbytes + sizeof(q) > rep_send->wbuf + rep_send->wsize){
1220+ fprintf(stderr,"replication: buffer over fllow\n");
1221+ if(q){
1222+ qi_free(q);
1223+ }
1224+ pthread_mutex_unlock(&replication_pipe_lock);
1225+ replication_dispatch_close();
1226+ return(-1);
1227+ }
1228+ memcpy(rep_send->wcurr + rep_send->wbytes, &q, sizeof(q));
1229+ rep_send->wbytes += sizeof(q);
1230+ if(replication_push()){
1231+ fprintf(stderr, "replication: push error\n");
1232+ if(q){
1233+ qi_free(q);
1234+ }
1235+ pthread_mutex_unlock(&replication_pipe_lock);
1236+ replication_dispatch_close();
1237+ return(-1);
1238+ }
1239+ pthread_mutex_unlock(&replication_pipe_lock);
1240+ return(0);
1241+}
1242+
1243+int replication(enum CMD_TYPE type, R_CMD *cmd)
1244+{
1245+ Q_ITEM *q;
1246+
1247+ pthread_mutex_lock(&replication_pipe_lock);
1248+ if (!rep_send) {
1249+ pthread_mutex_unlock(&replication_pipe_lock);
1250+ return 0;
1251+ }
1252+ pthread_mutex_unlock(&replication_pipe_lock);
1253+
1254+ if((q = qi_new(type, cmd, false))) {
1255+ replication_item(q);
1256+ }else{
1257+ fprintf(stderr,"replication: can't create Q_ITEM\n");
1258+ replication_dispatch_close();
1259+ return(-1);
1260+ }
1261+ return(0);
1262+}
1263+#endif /* USE_REPLICATION */
c4fcb0e4
ER
1264--- memcached-1.4.5/memcached.h~ 2010-05-06 14:09:51.000000000 +0300
1265+++ memcached-1.4.5/memcached.h 2010-05-06 14:10:13.518051741 +0300
894d11c1
ER
1266@@ -144,7 +144,13 @@
1267 conn_swallow, /**< swallowing unnecessary bytes w/o storing */
1268 conn_closing, /**< closing this connection */
1269 conn_mwrite, /**< writing out many items sequentially */
1270- conn_max_state /**< Max state value (used for assertion) */
1271+#ifdef USE_REPLICATION
1272+ conn_repconnect, /**< replication connecting to master */
1273+ conn_rep_listen, /**< replication listening socket */
1274+ conn_pipe_recv, /**< replication command pipe recv */
1275+ conn_pipe_send, /**< replication command pipe send */
1276+#endif /* USE_REPLICATION */
1277+ conn_max_state, /**< Max state value (used for assertion) */
1278 };
1279
1280 enum bin_substates {
c4fcb0e4 1281@@ -247,7 +247,9 @@
894d11c1
ER
1282 uint64_t get_misses;
1283 uint64_t evictions;
c4fcb0e4 1284 uint64_t reclaimed;
894d11c1
ER
1285+#if 0
1286 time_t started; /* when the process was started */
1287+#endif
1288 bool accepting_conns; /* whether we are currently accepting */
1289 uint64_t listen_disabled_num;
1290 };
1291@@ -274,6 +282,11 @@
1292 int backlog;
1293 int item_size_max; /* Maximum item size, and upper end for slabs */
1294 bool sasl; /* SASL on/off */
1295+#ifdef USE_REPLICATION
1296+ struct in_addr rep_addr; /* replication addr */
1297+ int rep_port; /* replication port */
1298+ int rep_qmax; /* replication QITEM max */
1299+#endif /*USE_REPLICATION*/
1300 };
1301
1302 extern struct stats stats;
1303@@ -286,6 +299,10 @@
1304 /* temp */
1305 #define ITEM_SLABBED 4
1306
1307+#ifdef USE_REPLICATION
1308+#define ITEM_REPDATA 128
1309+#endif /*USE_REPLICATION*/
1310+
1311 /**
1312 * Structure for storing items within memcached.
1313 */
1314@@ -438,6 +455,10 @@
1315 #include "trace.h"
1316 #include "hash.h"
1317 #include "util.h"
1318+
1319+#ifdef USE_REPLICATION
1320+#include "replication.h"
1321+#endif /* USE_REPLICATION */
1322
1323 /*
1324 * Functions such as the libevent-related calls that need to do cross-thread
894d11c1
ER
1325--- memcached-1.4.4/replication.c Thu Jan 1 03:00:00 1970
1326+++ repcached-2.2-1.4.4/replication.c Wed Feb 10 18:40:48 2010
1327@@ -0,0 +1,355 @@
1328+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
1329+/*
1330+ *
1331+ */
1332+#include "memcached.h"
1333+#include "replication.h"
1334+#include <stdlib.h>
1335+#include <stdio.h>
1336+#include <unistd.h>
1337+#include <string.h>
1338+#include <errno.h>
1339+
1340+static Q_ITEM *q_freelist = NULL;
1341+static int q_itemcount = 0;
1342+static pthread_mutex_t replication_queue_lock = PTHREAD_MUTEX_INITIALIZER;
1343+
1344+int get_qi_count(void)
1345+{
1346+ int c;
1347+ pthread_mutex_lock(&replication_queue_lock);
1348+ c = q_itemcount;
1349+ pthread_mutex_unlock(&replication_queue_lock);
1350+ return(c);
1351+}
1352+
1353+Q_ITEM *qi_new(enum CMD_TYPE type, R_CMD *cmd, bool reuse)
1354+{
1355+ Q_ITEM *q = NULL;
1356+ char *key = NULL;
1357+ uint32_t keylen = 0;
1358+ rel_time_t time = 0;
1359+
1360+ pthread_mutex_lock(&replication_queue_lock);
1361+ if(q_freelist){
1362+ q = q_freelist;
1363+ q_freelist = q->next;
1364+ }
1365+
1366+ if(NULL == q){
1367+ if(reuse) {
1368+ pthread_mutex_unlock(&replication_queue_lock);
1369+ return(NULL);
1370+ }
1371+ if(q_itemcount >= settings.rep_qmax) {
1372+ pthread_mutex_unlock(&replication_queue_lock);
1373+ return(NULL);
1374+ }
1375+ q = malloc(sizeof(Q_ITEM));
1376+ if (NULL == q){
1377+ fprintf(stderr,"replication: qi_new out of memory\n");
1378+ pthread_mutex_unlock(&replication_queue_lock);
1379+ return(NULL);
1380+ }
1381+ q_itemcount++;
1382+ if (settings.verbose > 2)
1383+ fprintf(stderr,"replication: alloc c=%d\n", q_itemcount);
1384+ }
1385+
1386+ pthread_mutex_unlock(&replication_queue_lock);
1387+
1388+ switch (type) {
1389+ case REPLICATION_REP:
1390+ case REPLICATION_DEL:
1391+ key = cmd->key;
1392+ keylen = cmd->keylen;
1393+ break;
1394+ case REPLICATION_FLUSH_ALL:
1395+ break;
1396+ case REPLICATION_DEFER_FLUSH_ALL:
1397+ time = cmd->time;
1398+ break;
1399+ case REPLICATION_MARUGOTO_END:
1400+ break;
1401+ default:
1402+ fprintf(stderr,"replication: got unknown command: %d\n", type);
1403+ return(NULL);
1404+ }
1405+
1406+ q->key = NULL;
1407+ q->type = type;
1408+ q->time = time;
1409+ q->next = NULL;
1410+ if (keylen) {
1411+ q->key = malloc(keylen + 1);
1412+ if(NULL == q->key){
1413+ qi_free(q);
1414+ q = NULL;
1415+ }else{
1416+ memcpy(q->key, key, keylen);
1417+ *(q->key + keylen) = 0;
1418+ }
1419+ }
1420+
1421+ return(q);
1422+}
1423+
1424+void qi_free(Q_ITEM *q)
1425+{
1426+ if(q){
1427+ if(q->key){
1428+ free(q->key);
1429+ q->key = NULL;
1430+ }
1431+ pthread_mutex_lock(&replication_queue_lock);
1432+ q->next = q_freelist;
1433+ q_freelist = q;
1434+ pthread_mutex_unlock(&replication_queue_lock);
1435+ }
1436+}
1437+
1438+int qi_free_list()
1439+{
1440+ int c = 0;
1441+ Q_ITEM *q = NULL;
1442+
1443+ pthread_mutex_lock(&replication_queue_lock);
1444+ while((q = q_freelist)){
1445+ q_itemcount--;
1446+ c++;
1447+ q_freelist = q->next;
1448+ free(q);
1449+ }
1450+ pthread_mutex_unlock(&replication_queue_lock);
1451+ return(c);
1452+}
1453+
1454+static int replication_get_num(char *p, int n)
1455+{
1456+ int l;
1457+ char b[64];
1458+ if(p)
1459+ l = sprintf(p, "%u", n);
1460+ else
1461+ l = sprintf(b, "%u", n);
1462+ return(l);
1463+}
1464+
1465+int replication_call_rep(char *key, size_t keylen)
1466+{
1467+ R_CMD r;
1468+ r.key = key;
1469+ r.keylen = keylen;
1470+ return(replication(REPLICATION_REP, &r));
1471+}
1472+
1473+int replication_call_del(char *key, size_t keylen)
1474+{
1475+ R_CMD r;
1476+ r.key = key;
1477+ r.keylen = keylen;
1478+ return(replication(REPLICATION_DEL, &r));
1479+}
1480+
1481+int replication_call_flush_all()
1482+{
1483+ R_CMD r;
1484+ r.key = NULL;
1485+ return(replication(REPLICATION_FLUSH_ALL, &r));
1486+}
1487+
1488+int replication_call_defer_flush_all(const rel_time_t time)
1489+{
1490+ R_CMD r;
1491+ r.key = NULL;
1492+ r.time = time;
1493+ return(replication(REPLICATION_DEFER_FLUSH_ALL, &r));
1494+}
1495+
1496+int replication_call_marugoto_end()
1497+{
1498+ R_CMD r;
1499+ r.key = NULL;
1500+ return(replication(REPLICATION_MARUGOTO_END, &r));
1501+}
1502+
1503+static int replication_alloc(conn *c, int s)
1504+{
1505+ char *p;
1506+ s += c->wbytes;
1507+ if(c->wsize < s){
1508+ while(c->wsize < s)
1509+ c->wsize += 4096;
1510+ if((p = malloc(c->wsize))){
1511+ memcpy(p, c->wbuf, c->wbytes);
1512+ free(c->wbuf);
1513+ c->wbuf = p;
1514+ }else{
1515+ return(-1);
1516+ }
1517+ }
1518+ return(0);
1519+}
1520+
1521+static int replication_del(conn *c, char *k)
1522+{
1523+ int l = 0;
1524+ char *s = "delete ";
1525+ char *n = "\r\n";
1526+ char *p = NULL;
1527+
1528+ l += strlen(s);
1529+ l += strlen(k);
1530+ l += strlen(n);
1531+ if(replication_alloc(c,l) == -1){
1532+ fprintf(stderr, "replication: del malloc error\n");
1533+ return(-1);
1534+ }
1535+ p = c->wbuf + c->wbytes;
1536+ memcpy(p, s, strlen(s));
1537+ p += strlen(s);
1538+ memcpy(p, k, strlen(k));
1539+ p += strlen(k);
1540+ memcpy(p, n, strlen(n));
1541+ p += strlen(n);
1542+ c->wbytes = p - c->wbuf;
1543+ c->wcurr = c->wbuf;
1544+ return(0);
1545+}
1546+
1547+static int replication_rep(conn *c, item *it)
1548+{
1549+ int exp = 0;
1550+ int len = 0;
1551+ char *s = "rep ";
1552+ char *n = "\r\n";
1553+ char *p = NULL;
1554+ char flag[40];
1555+
1556+ if(it->exptime)
1557+ exp = it->exptime + process_started;
1558+ flag[0]=0;
1559+ if((p=ITEM_suffix(it))){
1560+ int i;
1561+ memcpy(flag, p, it->nsuffix - 2);
1562+ flag[it->nsuffix - 2] = 0;
1563+ for(i=0;i<strlen(flag);i++){
1564+ if(flag[i] > ' ')
1565+ break;
1566+ }
1567+ memmove(flag,&flag[i],strlen(flag)-i);
1568+ for(p=flag;*p>' ';p++);
1569+ *p=0;
1570+ }
1571+ len += strlen(s);
1572+ len += it->nkey;
1573+ len += 1;
1574+ len += strlen(flag);
1575+ len += 1;
1576+ len += replication_get_num(NULL, exp);
1577+ len += 1;
1578+ len += replication_get_num(NULL, it->nbytes - 2);
1579+ len += 1;
1580+ len += replication_get_num(NULL, ITEM_get_cas(it));
1581+ len += strlen(n);
1582+ len += it->nbytes;
1583+ len += strlen(n);
1584+ if(replication_alloc(c,len) == -1){
1585+ fprintf(stderr, "replication: rep malloc error\n");
1586+ return(-1);
1587+ }
1588+ p = c->wbuf + c->wbytes;
1589+ memcpy(p, s, strlen(s));
1590+ p += strlen(s);
1591+ memcpy(p, ITEM_key(it), it->nkey);
1592+ p += it->nkey;
1593+ *(p++) = ' ';
1594+ memcpy(p, flag, strlen(flag));
1595+ p += strlen(flag);
1596+ *(p++) = ' ';
1597+ p += replication_get_num(p, exp);
1598+ *(p++) = ' ';
1599+ p += replication_get_num(p, it->nbytes - 2);
1600+ *(p++) = ' ';
1601+ p += replication_get_num(p, ITEM_get_cas(it));
1602+ memcpy(p, n, strlen(n));
1603+ p += strlen(n);
1604+ memcpy(p, ITEM_data(it), it->nbytes);
1605+ p += it->nbytes;
1606+ c->wbytes = p - c->wbuf;
1607+ c->wcurr = c->wbuf;
1608+ return(0);
1609+}
1610+
1611+static int replication_flush_all(conn *c, rel_time_t exp)
1612+{
1613+ char *s = "flush_all ";
1614+ char *n = "\r\n";
1615+ char *p = NULL;
1616+
1617+ int l = strlen(s) + strlen(n);
1618+ if (exp > 0)
1619+ l += replication_get_num(NULL, exp);
1620+ if(replication_alloc(c,l) == -1){
1621+ fprintf(stderr, "replication: flush_all malloc error\n");
1622+ return(-1);
1623+ }
1624+ p = c->wbuf + c->wbytes;
1625+ memcpy(p, s, strlen(s));
1626+ p += strlen(s);
1627+ if (exp > 0)
1628+ p += replication_get_num(p, exp);
1629+ memcpy(p, n, strlen(n));
1630+ p += strlen(n);
1631+ c->wbytes = p - c->wbuf;
1632+ c->wcurr = c->wbuf;
1633+ return(0);
1634+}
1635+
1636+static int replication_marugoto_end(conn *c)
1637+{
1638+ char *s = "marugoto_end";
1639+ char *n = "\r\n";
1640+ char *p = NULL;
1641+
1642+ int l = strlen(s) + strlen(n);
1643+ if(replication_alloc(c,l) == -1){
1644+ fprintf(stderr, "replication: marugoto_end malloc error\n");
1645+ return(-1);
1646+ }
1647+ p = c->wbuf + c->wbytes;
1648+ memcpy(p, s, strlen(s));
1649+ p += strlen(s);
1650+ memcpy(p, n, strlen(n));
1651+ p += strlen(n);
1652+ c->wbytes = p - c->wbuf;
1653+ c->wcurr = c->wbuf;
1654+ return(0);
1655+}
1656+
1657+int replication_cmd(conn *c, Q_ITEM *q)
1658+{
1659+ item *it;
1660+ int r;
1661+
1662+ switch (q->type) {
1663+ case REPLICATION_REP:
1664+ it = item_get(q->key, strlen(q->key));
1665+ if (!it)
1666+ return(replication_del(c, q->key));
1667+ r = replication_rep(c, it);
1668+ item_remove(it);
1669+ return r;
1670+ case REPLICATION_DEL:
1671+ return(replication_del(c, q->key));
1672+ case REPLICATION_FLUSH_ALL:
1673+ return(replication_flush_all(c, 0));
1674+ case REPLICATION_DEFER_FLUSH_ALL:
1675+ return(replication_flush_all(c, q->time));
1676+ case REPLICATION_MARUGOTO_END:
1677+ return(replication_marugoto_end(c));
1678+ default:
1679+ fprintf(stderr,"replication: got unknown command:%d\n", q->type);
1680+ return(0);
1681+ }
1682+}
894d11c1
ER
1683--- memcached-1.4.4/replication.h Thu Jan 1 03:00:00 1970
1684+++ repcached-2.2-1.4.4/replication.h Wed Feb 10 18:40:31 2010
1685@@ -0,0 +1,42 @@
1686+#ifndef MEMCACHED_REPLICATION_H
1687+#define MEMCACHED_REPLICATION_H
1688+#define REPCACHED_VERSION "2.2"
1689+#include <netdb.h>
1690+
1691+enum CMD_TYPE {
1692+ REPLICATION_REP,
1693+ REPLICATION_DEL,
1694+ REPLICATION_FLUSH_ALL,
1695+ REPLICATION_DEFER_FLUSH_ALL,
1696+ REPLICATION_MARUGOTO_END,
1697+};
1698+
1699+typedef struct queue_item_t Q_ITEM;
1700+struct queue_item_t {
1701+ enum CMD_TYPE type;
1702+ char *key;
1703+ rel_time_t time;
1704+ Q_ITEM *next;
1705+};
1706+
1707+typedef struct replication_cmd_t R_CMD;
1708+struct replication_cmd_t {
1709+ char *key;
1710+ int keylen;
1711+ rel_time_t time;
1712+};
1713+
1714+Q_ITEM *qi_new(enum CMD_TYPE type, R_CMD *cmd, bool);
1715+void qi_free(Q_ITEM *);
1716+int qi_free_list(void);
1717+int replication_cmd(conn *, Q_ITEM *);
1718+int get_qi_count(void);
1719+
1720+int replication_call_rep(char *key, size_t keylen);
1721+int replication_call_del(char *key, size_t keylen);
1722+int replication_call_flush_all(void);
1723+int replication_call_defer_flush_all(const rel_time_t time);
1724+int replication_call_marugoto_end(void);
1725+int replication(enum CMD_TYPE type, R_CMD *cmd);
1726+
1727+#endif
c4fcb0e4
ER
1728--- memcached-1.4.5/t/binary.t~ 2010-04-03 10:07:16.000000000 +0300
1729+++ memcached-1.4.5/t/binary.t 2010-05-06 14:13:25.718440750 +0300
1730@@ -2,11 +2,13 @@
894d11c1
ER
1731
1732 use strict;
1733 use warnings;
c4fcb0e4 1734-use Test::More tests => 3361;
894d11c1
ER
1735+use Test::More;
1736 use FindBin qw($Bin);
1737 use lib "$Bin/lib";
1738 use MemcachedTest;
894d11c1 1739
c4fcb0e4
ER
1740+Test::More::plan(tests => 3361 + (support_replication() ? 36 : 0));
1741+
894d11c1
ER
1742 my $server = new_memcached();
1743 ok($server, "started the server");
c4fcb0e4 1744
894d11c1
ER
1745--- memcached-1.4.4/t/issue_67.t Sun Nov 1 01:44:09 2009
1746+++ repcached-2.2-1.4.4/t/issue_67.t Wed Feb 10 17:50:12 2010
1747@@ -41,6 +41,10 @@
1748 my $exe = "$builddir/memcached-debug";
1749 croak("memcached binary doesn't exist. Haven't run 'make' ?\n") unless -e $exe;
1750
1751+ if (support_replication()) {
1752+ $args .= ' -X 0';
1753+ }
1754+
1755 my $childpid = fork();
1756
1757 my $cmd = "$builddir/timedrun 10 $exe $args";
894d11c1
ER
1758--- memcached-1.4.4/t/lib/MemcachedTest.pm Fri Oct 30 04:24:52 2009
1759+++ repcached-2.2-1.4.4/t/lib/MemcachedTest.pm Wed Feb 10 17:53:34 2010
1760@@ -13,7 +13,8 @@
1761
1762
1763 @EXPORT = qw(new_memcached sleep mem_get_is mem_gets mem_gets_is mem_stats
1764- supports_sasl free_port);
1765+ supports_sasl free_port support_replication memcached_version
1766+ version2num);
1767
1768 sub sleep {
1769 my $n = shift;
1770@@ -148,6 +149,23 @@
1771 return 0;
1772 }
1773
1774+sub support_replication {
1775+ my $output = `$builddir/memcached-debug -h`;
1776+ return 1 if $output =~ /^-x <ip_addr>/m;
1777+ return 0;
1778+}
1779+
1780+sub memcached_version {
1781+ my $output = `$builddir/memcached-debug -h`;
1782+ return $1 if $output =~ /^memcached (\d[\d\.]+)/;
1783+ return 0;
1784+}
1785+
1786+sub version2num {
1787+ my($major,$minor,$pl) = ($_[0] =~ /^(\d+)\.(\d+)\.(\d+)$/);
1788+ return $major*100**2 + $minor*100 + $pl
1789+}
1790+
1791 sub new_memcached {
1792 my ($args, $passed_port) = @_;
1793 my $port = $passed_port || free_port();
1794@@ -171,6 +189,9 @@
1795 }
1796 if ($< == 0) {
1797 $args .= " -u root";
1798+ }
1799+ if (support_replication() && $args !~ m/-X/) {
1800+ $args .= ' -X 0';
1801 }
1802
1803 my $childpid = fork();
c4fcb0e4
ER
1804--- memcached-1.4.5/t/stats.t~ 2010-04-03 10:07:16.000000000 +0300
1805+++ memcached-1.4.5/t/stats.t 2010-05-06 14:15:28.521352735 +0300
1806@@ -57,7 +57,8 @@
894d11c1
ER
1807 my $stats = mem_stats($sock);
1808
1809 # Test number of keys
c4fcb0e4
ER
1810-is(scalar(keys(%$stats)), 38, "38 stats values");
1811+my $keys = 38 + (support_replication() ? 3 : 0);
1812+is(scalar(keys(%$stats)), $keys, "$keys stats values");
894d11c1
ER
1813
1814 # Test initial state
1815 foreach my $key (qw(curr_items total_items bytes cmd_get cmd_set get_hits evictions get_misses
894d11c1
ER
1816--- memcached-1.4.4/testapp.c Wed Nov 25 03:40:29 2009
1817+++ repcached-2.2-1.4.4/testapp.c Wed Feb 10 17:52:05 2010
1818@@ -300,6 +300,10 @@
1819 argv[arg++] = "-1";
1820 argv[arg++] = "-U";
1821 argv[arg++] = "0";
1822+#ifdef USE_REPLICATION
1823+ argv[arg++] = "-X";
1824+ argv[arg++] = "0";
1825+#endif
1826 /* Handle rpmbuild and the like doing this as root */
1827 if (getuid() == 0) {
1828 argv[arg++] = "-u";
This page took 0.30567 seconds and 4 git commands to generate.