]>
Commit | Line | Data |
---|---|---|
894d11c1 ER |
1 | --- memcached-1.4.4/Makefile.am Fri Oct 30 04:24:52 2009 |
2 | +++ repcached-2.2-1.4.4/Makefile.am Tue Feb 9 23:02:45 2010 | |
3 | @@ -31,6 +31,10 @@ | |
4 | memcached_SOURCES += sasl_defs.c | |
5 | endif | |
6 | ||
7 | +if ENABLE_REPLICATION | |
8 | +memcached_SOURCES += replication.h replication.c | |
9 | +endif | |
10 | + | |
11 | memcached_debug_SOURCES = $(memcached_SOURCES) | |
12 | memcached_CPPFLAGS = -DNDEBUG | |
13 | memcached_debug_LDADD = @PROFILER_LDFLAGS@ | |
894d11c1 ER |
14 | --- memcached-1.4.4/assoc.c Sat Oct 24 00:38:01 2009 |
15 | +++ repcached-2.2-1.4.4/assoc.c Tue Feb 9 23:02:45 2010 | |
16 | @@ -258,3 +258,51 @@ | |
17 | } | |
18 | ||
19 | ||
20 | +#ifdef USE_REPLICATION | |
21 | +char *assoc_key_snap(int *n) | |
22 | +{ | |
23 | + char *p = NULL; | |
24 | + char *b = NULL; | |
25 | + item *i = NULL; | |
26 | + int co = 0; | |
27 | + int sz = 1; | |
28 | + int hs = 0; | |
29 | + int hm = hashsize(hashpower); | |
30 | + | |
31 | + hs = hm; | |
32 | + while(hs--){ | |
33 | + if(expanding && hs < hashsize(hashpower - 1) && hs >= expand_bucket){ | |
34 | + i = old_hashtable[hs]; | |
35 | + }else{ | |
36 | + i = primary_hashtable[hs]; | |
37 | + } | |
38 | + while(i){ | |
39 | + sz += i->nkey + 1; | |
40 | + co++; | |
41 | + i = i->h_next; | |
42 | + } | |
43 | + } | |
44 | + | |
45 | + if(co){ | |
46 | + if((p = b = malloc(sz))){ | |
47 | + hs = hm; | |
48 | + while(hs--){ | |
49 | + if(expanding && hs < hashsize(hashpower - 1) && hs >= expand_bucket){ | |
50 | + i = old_hashtable[hs]; | |
51 | + }else{ | |
52 | + i = primary_hashtable[hs]; | |
53 | + } | |
54 | + while(i){ | |
55 | + memcpy(p, ITEM_key(i), i->nkey); | |
56 | + p += i->nkey; | |
57 | + *(p++) = 0; | |
58 | + i = i->h_next; | |
59 | + } | |
60 | + } | |
61 | + *(p++) = 0; | |
62 | + } | |
63 | + } | |
64 | + if(n) *n = co; | |
65 | + return(b); | |
66 | +} | |
67 | +#endif /* USE_REPLICATION */ | |
894d11c1 ER |
68 | --- memcached-1.4.4/assoc.h Sun Aug 30 03:00:58 2009 |
69 | +++ repcached-2.2-1.4.4/assoc.h Tue Feb 9 23:02:45 2010 | |
70 | @@ -7,3 +7,6 @@ | |
71 | int start_assoc_maintenance_thread(void); | |
72 | void stop_assoc_maintenance_thread(void); | |
73 | ||
74 | +#ifdef USE_REPLICATION | |
75 | +char *assoc_key_snap(int *n); | |
76 | +#endif /*USE_REPLICATION*/ | |
894d11c1 ER |
77 | --- memcached-1.4.4/config.h.in Fri Nov 27 09:34:56 2009 |
78 | +++ repcached-2.2-1.4.4/config.h.in Wed Feb 10 19:12:46 2010 | |
79 | @@ -99,6 +99,9 @@ | |
80 | /* Define to 1 if you have the ANSI C header files. */ | |
81 | #undef STDC_HEADERS | |
82 | ||
83 | +/* Define this if you want to use replication */ | |
84 | +#undef USE_REPLICATION | |
85 | + | |
86 | /* Version number of package */ | |
87 | #undef VERSION | |
88 | ||
894d11c1 ER |
89 | --- memcached-1.4.4/configure.ac Wed Nov 25 03:40:29 2009 |
90 | +++ repcached-2.2-1.4.4/configure.ac Tue Feb 9 23:02:45 2010 | |
91 | @@ -382,6 +382,18 @@ | |
92 | AC_MSG_ERROR([Can't enable threads without the POSIX thread library.]) | |
93 | fi | |
94 | ||
95 | +dnl Check whether the user wants replication or not | |
96 | +AC_ARG_ENABLE(replication, | |
97 | + [AS_HELP_STRING([--enable-replication],[support replication])], | |
98 | + [if test "x$enable_threads" = "xyes"; then | |
99 | + AC_MSG_ERROR([Can't enable threads and replication together.]) | |
100 | + else | |
101 | + AC_DEFINE([USE_REPLICATION],,[Define this if you want to use replication]) | |
102 | + fi | |
103 | + ]) | |
104 | + | |
105 | +AM_CONDITIONAL(ENABLE_REPLICATION, test "x$enable_replication" = "xyes") | |
106 | + | |
107 | AC_CHECK_FUNCS(mlockall) | |
108 | AC_CHECK_FUNCS(getpagesizes) | |
109 | AC_CHECK_FUNCS(memcntl) | |
894d11c1 ER |
110 | --- memcached-1.4.4/items.c Sat Oct 24 00:38:01 2009 |
111 | +++ repcached-2.2-1.4.4/items.c Tue Feb 9 23:02:45 2010 | |
112 | @@ -155,6 +155,9 @@ | |
113 | STATS_LOCK(); | |
114 | stats.evictions++; | |
115 | STATS_UNLOCK(); | |
116 | +#ifdef USE_REPLICATION | |
117 | + replication_call_del(ITEM_key(search), search->nkey); | |
118 | +#endif /* USE_REPLICATION */ | |
119 | } | |
120 | do_item_unlink(search); | |
121 | break; | |
122 | @@ -288,8 +291,14 @@ | |
123 | stats.total_items += 1; | |
124 | STATS_UNLOCK(); | |
125 | ||
126 | +#ifdef USE_REPLICATION | |
127 | + /* Allocate a new CAS ID on link. */ | |
128 | + if(!(it->it_flags & ITEM_REPDATA)) | |
129 | + ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0); | |
130 | +#else | |
131 | /* Allocate a new CAS ID on link. */ | |
132 | ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0); | |
133 | +#endif /* USE_REPLICATION */ | |
134 | ||
135 | item_link_q(it); | |
136 | ||
894d11c1 ER |
137 | --- memcached-1.4.4/memcached.c Fri Nov 27 08:45:13 2009 |
138 | +++ repcached-2.2-1.4.4/memcached.c Wed Feb 10 16:08:37 2010 | |
139 | @@ -102,6 +102,30 @@ | |
140 | ||
141 | static void conn_free(conn *c); | |
142 | ||
143 | +#ifdef USE_REPLICATION | |
144 | +static int rep_exit = 0; | |
145 | +static conn *rep_recv = NULL; | |
146 | +static conn *rep_send = NULL; | |
147 | +static conn *rep_conn = NULL; | |
148 | +static conn *rep_serv = NULL; | |
149 | +static int server_socket_replication(const int); | |
150 | +static void server_close_replication(void); | |
151 | +static int replication_init(void); | |
152 | +static int replication_server_init(void); | |
153 | +static int replication_client_init(void); | |
154 | +static int replication_start(void); | |
155 | +static int replication_connect(void); | |
156 | +static int replication_close(void); | |
157 | +static void replication_dispatch_close(void); | |
158 | +static int replication_marugoto(int); | |
159 | +static int replication_send(conn *); | |
160 | +static int replication_pop(void); | |
161 | +static int replication_push(void); | |
162 | +static int replication_exit(void); | |
163 | +static int replication_item(Q_ITEM *); | |
164 | +static pthread_mutex_t replication_pipe_lock = PTHREAD_MUTEX_INITIALIZER; | |
165 | +#endif /* USE_REPLICATION */ | |
166 | + | |
167 | /** exported globals **/ | |
168 | struct stats stats; | |
169 | struct settings settings; | |
170 | @@ -194,6 +218,11 @@ | |
171 | settings.backlog = 1024; | |
172 | settings.binding_protocol = negotiating_prot; | |
173 | settings.item_size_max = 1024 * 1024; /* The famous 1MB upper limit. */ | |
174 | +#ifdef USE_REPLICATION | |
175 | + settings.rep_addr.s_addr = htonl(INADDR_ANY); | |
176 | + settings.rep_port = 11212; | |
177 | + settings.rep_qmax = 8192; | |
178 | +#endif /* USE_REPLICATION */ | |
179 | } | |
180 | ||
181 | /* | |
182 | @@ -382,6 +411,10 @@ | |
183 | prot_text(c->protocol)); | |
184 | } else if (IS_UDP(transport)) { | |
185 | fprintf(stderr, "<%d server listening (udp)\n", sfd); | |
186 | +#ifdef USE_REPLICATION | |
187 | + } else if (init_state == conn_rep_listen) { | |
188 | + fprintf(stderr, "<%d server listening (replication)\n", sfd); | |
189 | +#endif /* USE_REPLICATION */ | |
190 | } else if (c->protocol == negotiating_prot) { | |
191 | fprintf(stderr, "<%d new auto-negotiating client connection\n", | |
192 | sfd); | |
193 | @@ -593,7 +626,11 @@ | |
194 | "conn_nread", | |
195 | "conn_swallow", | |
196 | "conn_closing", | |
197 | - "conn_mwrite" }; | |
198 | + "conn_mwrite", | |
199 | + "conn_repconnect", | |
200 | + "conn_rep_listen", | |
201 | + "conn_pipe_recv", | |
202 | + "conn_pipe_send" }; | |
203 | return statenames[state]; | |
204 | } | |
205 | ||
206 | @@ -752,6 +789,14 @@ | |
207 | ||
208 | assert(c != NULL); | |
209 | ||
210 | +#ifdef USE_REPLICATION | |
211 | + if (c == rep_conn){ | |
212 | + if (settings.verbose > 1) | |
213 | + fprintf(stderr, "REP>%d %s\n", c->sfd, str); | |
214 | + conn_set_state(c, conn_new_cmd); | |
215 | + return; | |
216 | + } | |
217 | +#endif /* USE_REPLICATION */ | |
218 | if (c->noreply) { | |
219 | if (settings.verbose > 1) | |
220 | fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str); | |
221 | @@ -791,9 +836,11 @@ | |
222 | int comm = c->cmd; | |
223 | enum store_item_type ret; | |
224 | ||
225 | - pthread_mutex_lock(&c->thread->stats.mutex); | |
226 | - c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++; | |
227 | - pthread_mutex_unlock(&c->thread->stats.mutex); | |
228 | + if (c->thread) { | |
229 | + pthread_mutex_lock(&c->thread->stats.mutex); | |
230 | + c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++; | |
231 | + pthread_mutex_unlock(&c->thread->stats.mutex); | |
232 | + } | |
233 | ||
234 | if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) { | |
235 | out_string(c, "CLIENT_ERROR bad data chunk"); | |
236 | @@ -832,6 +879,11 @@ | |
237 | ||
238 | switch (ret) { | |
239 | case STORED: | |
240 | +#ifdef USE_REPLICATION | |
241 | + if( c != rep_conn ){ | |
242 | + replication_call_rep(ITEM_key(it), it->nkey); | |
243 | + } | |
244 | +#endif /* USE_REPLICATION */ | |
245 | out_string(c, "STORED"); | |
246 | break; | |
247 | case EXISTS: | |
248 | @@ -2410,6 +2462,11 @@ | |
249 | APPEND_STAT("listen_disabled_num", "%llu", (unsigned long long)stats.listen_disabled_num); | |
250 | APPEND_STAT("threads", "%d", settings.num_threads); | |
251 | APPEND_STAT("conn_yields", "%llu", (unsigned long long)thread_stats.conn_yields); | |
252 | +#ifdef USE_REPLICATION | |
253 | + APPEND_STAT("replication", "MASTER", 0); | |
254 | + APPEND_STAT("repcached_version", "%s", REPCACHED_VERSION); | |
255 | + APPEND_STAT("repcached_qi_free", "%u", settings.rep_qmax - get_qi_count()); | |
256 | +#endif /*USE_REPLICATION*/ | |
257 | STATS_UNLOCK(); | |
258 | } | |
259 | ||
260 | @@ -2797,6 +2854,11 @@ | |
261 | switch(add_delta(c, it, incr, delta, temp)) { | |
262 | case OK: | |
263 | out_string(c, temp); | |
264 | +#ifdef USE_REPLICATION | |
265 | + if( c != rep_conn){ | |
266 | + replication_call_rep(ITEM_key(it), it->nkey); | |
267 | + } | |
268 | +#endif /* USE_REPLICATION */ | |
269 | break; | |
270 | case NON_NUMERIC: | |
271 | out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value"); | |
272 | @@ -2911,17 +2973,25 @@ | |
273 | if (it) { | |
274 | MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey); | |
275 | ||
276 | - pthread_mutex_lock(&c->thread->stats.mutex); | |
277 | - c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++; | |
278 | - pthread_mutex_unlock(&c->thread->stats.mutex); | |
279 | + if (c->thread) { | |
280 | + pthread_mutex_lock(&c->thread->stats.mutex); | |
281 | + c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++; | |
282 | + pthread_mutex_unlock(&c->thread->stats.mutex); | |
283 | + } | |
284 | ||
285 | item_unlink(it); | |
286 | item_remove(it); /* release our reference */ | |
287 | +#ifdef USE_REPLICATION | |
288 | + if( c != rep_conn ) | |
289 | + replication_call_del(key, nkey); | |
290 | +#endif /* USE_REPLICATION */ | |
291 | out_string(c, "DELETED"); | |
292 | } else { | |
293 | - pthread_mutex_lock(&c->thread->stats.mutex); | |
294 | - c->thread->stats.delete_misses++; | |
295 | - pthread_mutex_unlock(&c->thread->stats.mutex); | |
296 | + if (c->thread) { | |
297 | + pthread_mutex_lock(&c->thread->stats.mutex); | |
298 | + c->thread->stats.delete_misses++; | |
299 | + pthread_mutex_unlock(&c->thread->stats.mutex); | |
300 | + } | |
301 | ||
302 | out_string(c, "NOT_FOUND"); | |
303 | } | |
304 | @@ -2986,6 +3056,22 @@ | |
305 | ||
306 | process_update_command(c, tokens, ntokens, comm, true); | |
307 | ||
308 | +#ifdef USE_REPLICATION | |
309 | + } else if ((ntokens == 7) && (strcmp(tokens[COMMAND_TOKEN].value, "rep") == 0 && (comm = NREAD_SET)) && (c == rep_conn)) { | |
310 | + | |
311 | + process_update_command(c, tokens, ntokens, comm, true); | |
312 | + if(c->item) | |
313 | + ((item *)(c->item))->it_flags |= ITEM_REPDATA; | |
314 | + | |
315 | + } else if ((ntokens == 2) && (strcmp(tokens[COMMAND_TOKEN].value, "marugoto_end") == 0) && (c == rep_conn)) { | |
316 | + if(replication_start() == -1) | |
317 | + exit(EXIT_FAILURE); | |
318 | + if (settings.verbose > 0) | |
319 | + fprintf(stderr,"replication: start\n"); | |
320 | + out_string(c, "OK"); | |
321 | + return; | |
322 | + | |
323 | +#endif /* USE_REPLICATION */ | |
324 | } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) { | |
325 | ||
326 | process_arithmetic_command(c, tokens, ntokens, 1); | |
327 | @@ -3012,11 +3098,17 @@ | |
328 | ||
329 | set_noreply_maybe(c, tokens, ntokens); | |
330 | ||
331 | - pthread_mutex_lock(&c->thread->stats.mutex); | |
332 | - c->thread->stats.flush_cmds++; | |
333 | - pthread_mutex_unlock(&c->thread->stats.mutex); | |
334 | + if (c->thread) { | |
335 | + pthread_mutex_lock(&c->thread->stats.mutex); | |
336 | + c->thread->stats.flush_cmds++; | |
337 | + pthread_mutex_unlock(&c->thread->stats.mutex); | |
338 | + } | |
339 | ||
340 | if(ntokens == (c->noreply ? 3 : 2)) { | |
341 | +#ifdef USE_REPLICATION | |
342 | + if( c != rep_conn ) | |
343 | + replication_call_flush_all(); | |
344 | +#endif | |
345 | settings.oldest_live = current_time - 1; | |
346 | item_flush_expired(); | |
347 | out_string(c, "OK"); | |
348 | @@ -3029,6 +3121,11 @@ | |
349 | return; | |
350 | } | |
351 | ||
352 | +#ifdef USE_REPLICATION | |
353 | + if( c != rep_conn ) | |
354 | + replication_call_defer_flush_all(realtime(exptime) + process_started); | |
355 | +#endif | |
356 | + settings.oldest_live = realtime(exptime) - 1; | |
357 | /* | |
358 | If exptime is zero realtime() would return zero too, and | |
359 | realtime(exptime) - 1 would overflow to the max unsigned | |
360 | @@ -3275,9 +3372,11 @@ | |
361 | int avail = c->rsize - c->rbytes; | |
362 | res = read(c->sfd, c->rbuf + c->rbytes, avail); | |
363 | if (res > 0) { | |
364 | - pthread_mutex_lock(&c->thread->stats.mutex); | |
365 | - c->thread->stats.bytes_read += res; | |
366 | - pthread_mutex_unlock(&c->thread->stats.mutex); | |
367 | + if (c->thread) { | |
368 | + pthread_mutex_lock(&c->thread->stats.mutex); | |
369 | + c->thread->stats.bytes_read += res; | |
370 | + pthread_mutex_unlock(&c->thread->stats.mutex); | |
371 | + } | |
372 | gotdata = READ_DATA_RECEIVED; | |
373 | c->rbytes += res; | |
374 | if (res == avail) { | |
375 | @@ -3423,6 +3522,12 @@ | |
376 | ||
377 | assert(c != NULL); | |
378 | ||
379 | +#ifdef USE_REPLICATION | |
380 | + if(rep_exit && (c->state != conn_pipe_recv)){ | |
381 | + return; | |
382 | + } | |
383 | +#endif /* USE_REPLICATION */ | |
384 | + | |
385 | while (!stop) { | |
386 | ||
387 | switch(c->state) { | |
388 | @@ -3502,9 +3607,11 @@ | |
389 | if (nreqs >= 0) { | |
390 | reset_cmd_handler(c); | |
391 | } else { | |
392 | - pthread_mutex_lock(&c->thread->stats.mutex); | |
393 | - c->thread->stats.conn_yields++; | |
394 | - pthread_mutex_unlock(&c->thread->stats.mutex); | |
395 | + if (c->thread) { | |
396 | + pthread_mutex_lock(&c->thread->stats.mutex); | |
397 | + c->thread->stats.conn_yields++; | |
398 | + pthread_mutex_unlock(&c->thread->stats.mutex); | |
399 | + } | |
400 | if (c->rbytes > 0) { | |
401 | /* We have already read in data into the input buffer, | |
402 | so libevent will most likely not signal read events | |
403 | @@ -3545,9 +3652,11 @@ | |
404 | /* now try reading from the socket */ | |
405 | res = read(c->sfd, c->ritem, c->rlbytes); | |
406 | if (res > 0) { | |
407 | - pthread_mutex_lock(&c->thread->stats.mutex); | |
408 | - c->thread->stats.bytes_read += res; | |
409 | - pthread_mutex_unlock(&c->thread->stats.mutex); | |
410 | + if (c->thread) { | |
411 | + pthread_mutex_lock(&c->thread->stats.mutex); | |
412 | + c->thread->stats.bytes_read += res; | |
413 | + pthread_mutex_unlock(&c->thread->stats.mutex); | |
414 | + } | |
415 | if (c->rcurr == c->ritem) { | |
416 | c->rcurr += res; | |
417 | } | |
418 | @@ -3600,9 +3709,11 @@ | |
419 | /* now try reading from the socket */ | |
420 | res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize); | |
421 | if (res > 0) { | |
422 | - pthread_mutex_lock(&c->thread->stats.mutex); | |
423 | - c->thread->stats.bytes_read += res; | |
424 | - pthread_mutex_unlock(&c->thread->stats.mutex); | |
425 | + if (c->thread) { | |
426 | + pthread_mutex_lock(&c->thread->stats.mutex); | |
427 | + c->thread->stats.bytes_read += res; | |
428 | + pthread_mutex_unlock(&c->thread->stats.mutex); | |
429 | + } | |
430 | c->sbytes -= res; | |
431 | break; | |
432 | } | |
433 | @@ -3698,6 +3809,10 @@ | |
434 | case conn_closing: | |
435 | if (IS_UDP(c->transport)) | |
436 | conn_cleanup(c); | |
437 | +#ifdef USE_REPLICATION | |
438 | + else if(c == rep_conn) | |
439 | + replication_close(); | |
440 | +#endif /*USE_REPLICATION*/ | |
441 | else | |
442 | conn_close(c); | |
443 | stop = true; | |
444 | @@ -3706,6 +3821,70 @@ | |
445 | case conn_max_state: | |
446 | assert(false); | |
447 | break; | |
448 | + | |
449 | +#ifdef USE_REPLICATION | |
450 | + case conn_pipe_recv: | |
451 | + if(replication_pop()){ | |
452 | + replication_close(); | |
453 | + }else{ | |
454 | + replication_send(rep_conn); | |
455 | + } | |
456 | + stop = true; | |
457 | + break; | |
458 | + | |
459 | + case conn_rep_listen: | |
460 | + if (settings.verbose > 0) | |
461 | + fprintf(stderr,"replication: accept\n"); | |
462 | + addrlen = sizeof(addr); | |
463 | + res = accept(c->sfd, (struct sockaddr *)&addr, &addrlen); | |
464 | + if(res == -1){ | |
465 | + if(errno == EAGAIN || errno == EWOULDBLOCK) { | |
466 | + } else if (errno == EMFILE) { | |
467 | + fprintf(stderr, "replication: Too many opened connections\n"); | |
468 | + } else { | |
469 | + fprintf(stderr, "replication: accept error\n"); | |
470 | + } | |
471 | + }else{ | |
472 | + if(rep_conn){ | |
473 | + close(res); | |
474 | + fprintf(stderr,"replication: already connected\n"); | |
475 | + }else{ | |
476 | + if((flags = fcntl(res, F_GETFL, 0)) < 0 || fcntl(res, F_SETFL, flags | O_NONBLOCK) < 0){ | |
477 | + close(res); | |
478 | + fprintf(stderr, "replication: Can't Setting O_NONBLOCK\n"); | |
479 | + }else{ | |
480 | + server_close_replication(); | |
481 | + rep_conn = conn_new(res, conn_read, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base); | |
482 | + rep_conn->item = NULL; | |
483 | + rep_conn->rbytes = 0; | |
484 | + rep_conn->rcurr = rep_conn->rbuf; | |
485 | + replication_connect(); | |
486 | + replication_marugoto(1); | |
487 | + replication_marugoto(0); | |
488 | + } | |
489 | + } | |
490 | + } | |
491 | + stop = true; | |
492 | + break; | |
493 | + | |
494 | + case conn_repconnect: | |
495 | + rep_conn = c; | |
496 | + replication_connect(); | |
497 | + conn_set_state(c, conn_read); | |
498 | + if (settings.verbose > 0) | |
499 | + fprintf(stderr,"replication: marugoto copying\n"); | |
500 | + if(!update_event(c, EV_READ | EV_PERSIST)){ | |
501 | + fprintf(stderr, "replication: Couldn't update event\n"); | |
502 | + conn_set_state(c, conn_closing); | |
503 | + } | |
504 | + stop = true; | |
505 | + break; | |
506 | + | |
507 | + case conn_pipe_send: | |
508 | + /* should not happen */ | |
509 | + fprintf(stderr, "replication: unexpected conn_pipe_send state\n"); | |
510 | + break; | |
511 | +#endif /* USE_REPLICATION */ | |
512 | } | |
513 | } | |
514 | ||
515 | @@ -4002,6 +4181,89 @@ | |
516 | return 0; | |
517 | } | |
518 | ||
519 | +#ifdef USE_REPLICATION | |
520 | +static int server_socket_replication(const int port) { | |
521 | + int sfd; | |
522 | + struct linger ling = {0, 0}; | |
523 | + struct addrinfo *ai; | |
524 | + struct addrinfo *next; | |
525 | + struct addrinfo hints; | |
526 | + char port_buf[NI_MAXSERV]; | |
527 | + int error; | |
528 | + int success = 0; | |
529 | + | |
530 | + int flags =1; | |
531 | + | |
532 | + memset(&hints, 0, sizeof (hints)); | |
533 | + hints.ai_flags = AI_PASSIVE|AI_ADDRCONFIG; | |
534 | + hints.ai_family = AF_UNSPEC; | |
535 | + hints.ai_protocol = IPPROTO_TCP; | |
536 | + hints.ai_socktype = SOCK_STREAM; | |
537 | + snprintf(port_buf, NI_MAXSERV, "%d", port); | |
538 | + error= getaddrinfo(settings.inter, port_buf, &hints, &ai); | |
539 | + if (error != 0) { | |
540 | + if (error != EAI_SYSTEM) | |
541 | + fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error)); | |
542 | + else | |
543 | + perror("getaddrinfo()"); | |
544 | + | |
545 | + return 1; | |
546 | + } | |
547 | + | |
548 | + for (next= ai; next; next= next->ai_next) { | |
549 | + conn *rep_serv_add; | |
550 | + if ((sfd = new_socket(next)) == -1) { | |
551 | + freeaddrinfo(ai); | |
552 | + return 1; | |
553 | + } | |
554 | + setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); | |
555 | + setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); | |
556 | + setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)); | |
557 | + setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)); | |
558 | + | |
559 | + if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) { | |
560 | + if (errno != EADDRINUSE) { | |
561 | + perror("bind()"); | |
562 | + close(sfd); | |
563 | + freeaddrinfo(ai); | |
564 | + return 1; | |
565 | + } | |
566 | + close(sfd); | |
567 | + continue; | |
568 | + } else { | |
569 | + success++; | |
570 | + if (listen(sfd, 1024) == -1) { | |
571 | + perror("listen()"); | |
572 | + close(sfd); | |
573 | + freeaddrinfo(ai); | |
574 | + return 1; | |
575 | + } | |
576 | + } | |
577 | + | |
578 | + if (!(rep_serv_add = conn_new(sfd, conn_rep_listen, | |
579 | + EV_READ | EV_PERSIST, 1, tcp_transport, main_base))) { | |
580 | + fprintf(stderr, "failed to create replication server connection\n"); | |
581 | + exit(EXIT_FAILURE); | |
582 | + } | |
583 | + | |
584 | + rep_serv_add->next = rep_serv; | |
585 | + rep_serv = rep_serv_add; | |
586 | + } | |
587 | + | |
588 | + freeaddrinfo(ai); | |
589 | + | |
590 | + /* Return zero iff we detected no errors in starting up connections */ | |
591 | + return success == 0; | |
592 | +} | |
593 | + | |
594 | +static void server_close_replication(void) { | |
595 | + while(rep_serv){ | |
596 | + conn_close(rep_serv); | |
597 | + rep_serv = rep_serv->next; | |
598 | + } | |
599 | +} | |
600 | +#endif /* USE_REPLICATION */ | |
601 | + | |
602 | /* | |
603 | * We keep the current time of day in a global variable that's updated by a | |
604 | * timer event. This saves us a bunch of time() system calls (we really only | |
605 | @@ -4041,6 +4303,9 @@ | |
606 | ||
607 | static void usage(void) { | |
608 | printf(PACKAGE " " VERSION "\n"); | |
609 | +#ifdef USE_REPLICATION | |
610 | + printf("repcached %s\n",REPCACHED_VERSION); | |
611 | +#endif /* USE_REPLICATION */ | |
612 | printf("-p <num> TCP port number to listen on (default: 11211)\n" | |
613 | "-U <num> UDP port number to listen on (default: 11211, 0 is off)\n" | |
614 | "-s <file> UNIX socket path to listen on (disables network support)\n" | |
615 | @@ -4088,6 +4353,10 @@ | |
616 | #ifdef ENABLE_SASL | |
617 | printf("-S Turn on Sasl authentication\n"); | |
618 | #endif | |
619 | +#ifdef USE_REPLICATION | |
620 | + printf("-x <ip_addr> hostname or IP address of peer repcached\n"); | |
621 | + printf("-X <num> TCP port number for replication (default: 11212)\n"); | |
622 | +#endif /* USE_REPLICATION */ | |
623 | return; | |
624 | } | |
625 | ||
626 | @@ -4194,6 +4463,26 @@ | |
627 | exit(EXIT_SUCCESS); | |
628 | } | |
629 | ||
630 | +#ifdef USE_REPLICATION | |
631 | +static void sig_handler_cb(int fd, short event, void *arg) | |
632 | +{ | |
633 | + struct event *signal = arg; | |
634 | + | |
635 | + if (settings.verbose) | |
636 | + fprintf(stderr, "got signal %d\n", EVENT_SIGNAL(signal)); | |
637 | + | |
638 | + if (replication_exit()) { | |
639 | + exit(EXIT_FAILURE); | |
640 | + } | |
641 | + | |
642 | + pthread_mutex_lock(&replication_pipe_lock); | |
643 | + if (!rep_send) { | |
644 | + exit(EXIT_SUCCESS); | |
645 | + } | |
646 | + pthread_mutex_unlock(&replication_pipe_lock); | |
647 | +} | |
648 | +#endif /* USE_REPLICATION */ | |
649 | + | |
650 | #ifndef HAVE_SIGIGNORE | |
651 | static int sigignore(int sig) { | |
652 | struct sigaction sa = { .sa_handler = SIG_IGN, .sa_flags = 0 }; | |
653 | @@ -4249,6 +4538,57 @@ | |
654 | #endif | |
655 | } | |
656 | ||
657 | +static void create_listening_sockets(void) | |
658 | +{ | |
659 | + /* create unix mode sockets after dropping privileges */ | |
660 | + if (settings.socketpath != NULL) { | |
661 | + errno = 0; | |
662 | + if (server_socket_unix(settings.socketpath,settings.access)) { | |
663 | + vperror("failed to listen on UNIX socket: %s", settings.socketpath); | |
664 | + exit(EX_OSERR); | |
665 | + } | |
666 | + } | |
667 | + | |
668 | + /* create the listening socket, bind it, and init */ | |
669 | + if (settings.socketpath == NULL) { | |
670 | + const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME"); | |
671 | + char temp_portnumber_filename[PATH_MAX]; | |
672 | + FILE *portnumber_file = NULL; | |
673 | + | |
674 | + if (portnumber_filename != NULL) { | |
675 | + snprintf(temp_portnumber_filename, | |
676 | + sizeof(temp_portnumber_filename), | |
677 | + "%s.lck", portnumber_filename); | |
678 | + | |
679 | + portnumber_file = fopen(temp_portnumber_filename, "a"); | |
680 | + if (portnumber_file == NULL) { | |
681 | + fprintf(stderr, "Failed to open \"%s\": %s\n", | |
682 | + temp_portnumber_filename, strerror(errno)); | |
683 | + } | |
684 | + } | |
685 | + | |
686 | + errno = 0; | |
687 | + if (settings.port && server_socket(settings.port, tcp_transport, | |
688 | + portnumber_file)) { | |
689 | + vperror("failed to listen on TCP port %d", settings.port); | |
690 | + exit(EX_OSERR); | |
691 | + } | |
692 | + | |
693 | + /* create the UDP listening socket and bind it */ | |
694 | + errno = 0; | |
695 | + if (settings.udpport && server_socket(settings.udpport, udp_transport, | |
696 | + portnumber_file)) { | |
697 | + vperror("failed to listen on UDP port %d", settings.udpport); | |
698 | + exit(EX_OSERR); | |
699 | + } | |
700 | + | |
701 | + if (portnumber_file) { | |
702 | + fclose(portnumber_file); | |
703 | + rename(temp_portnumber_filename, portnumber_filename); | |
704 | + } | |
705 | + } | |
706 | +} | |
707 | + | |
708 | int main (int argc, char **argv) { | |
709 | int c; | |
710 | bool lock_memory = false; | |
711 | @@ -4261,6 +4601,11 @@ | |
712 | struct rlimit rlim; | |
713 | char unit = '\0'; | |
714 | int size_max = 0; | |
715 | +#ifdef USE_REPLICATION | |
716 | + struct in_addr addr; | |
717 | + struct addrinfo master_hint; | |
718 | + struct addrinfo *master_addr; | |
719 | +#endif /* USE_REPLICATION */ | |
720 | /* listening sockets */ | |
721 | static int *l_socket = NULL; | |
722 | ||
723 | @@ -4307,6 +4652,11 @@ | |
724 | "B:" /* Binding protocol */ | |
725 | "I:" /* Max item size */ | |
726 | "S" /* Sasl ON */ | |
727 | +#ifdef USE_REPLICATION | |
728 | + "X:" /* replication port */ | |
729 | + "x:" /* replication master */ | |
730 | + "q:" /* replication queue length */ | |
731 | +#endif /* USE_REPLICATION */ | |
732 | ))) { | |
733 | switch (c) { | |
734 | case 'a': | |
735 | @@ -4462,6 +4812,31 @@ | |
736 | ); | |
737 | } | |
738 | break; | |
739 | +#ifdef USE_REPLICATION | |
740 | + case 'x': | |
741 | + if (inet_pton(AF_INET, optarg, &addr) <= 0) { | |
742 | + memset(&master_hint, 0, sizeof(master_hint)); | |
743 | + master_hint.ai_flags = 0; | |
744 | + master_hint.ai_socktype = 0; | |
745 | + master_hint.ai_protocol = 0; | |
746 | + if(!getaddrinfo(optarg, NULL, &master_hint, &master_addr)){ | |
747 | + settings.rep_addr = ((struct sockaddr_in *)(master_addr->ai_addr)) -> sin_addr; | |
748 | + freeaddrinfo(master_addr); | |
749 | + }else{ | |
750 | + fprintf(stderr, "Illegal address: %s\n", optarg); | |
751 | + return 1; | |
752 | + } | |
753 | + } else { | |
754 | + settings.rep_addr = addr; | |
755 | + } | |
756 | + break; | |
757 | + case 'X': | |
758 | + settings.rep_port = atoi(optarg); | |
759 | + break; | |
760 | + case 'q': | |
761 | + settings.rep_qmax = atoi(optarg); | |
762 | + break; | |
763 | +#endif /* USE_REPLICATION */ | |
764 | case 'S': /* set Sasl authentication to true. Default is false */ | |
765 | #ifndef ENABLE_SASL | |
766 | fprintf(stderr, "This server is not built with SASL support.\n"); | |
767 | @@ -4587,6 +4962,17 @@ | |
768 | /* initialize main thread libevent instance */ | |
769 | main_base = event_init(); | |
770 | ||
771 | +#ifdef USE_REPLICATION | |
772 | + /* register events for SIGINT and SIGTERM to handle them in main thread */ | |
773 | + struct event signal_int, signal_term; | |
774 | + event_set(&signal_int, SIGINT, EV_SIGNAL|EV_PERSIST, sig_handler_cb, | |
775 | + &signal_int); | |
776 | + event_add(&signal_int, NULL); | |
777 | + event_set(&signal_term, SIGTERM, EV_SIGNAL|EV_PERSIST, sig_handler_cb, | |
778 | + &signal_term); | |
779 | + event_add(&signal_term, NULL); | |
780 | +#endif | |
781 | + | |
782 | /* initialize other stuff */ | |
783 | stats_init(); | |
784 | assoc_init(); | |
785 | @@ -4615,63 +5001,21 @@ | |
786 | /* initialise clock event */ | |
787 | clock_handler(0, 0, 0); | |
788 | ||
789 | - /* create unix mode sockets after dropping privileges */ | |
790 | - if (settings.socketpath != NULL) { | |
791 | - errno = 0; | |
792 | - if (server_socket_unix(settings.socketpath,settings.access)) { | |
793 | - vperror("failed to listen on UNIX socket: %s", settings.socketpath); | |
794 | - exit(EX_OSERR); | |
795 | - } | |
796 | - } | |
797 | - | |
798 | - /* create the listening socket, bind it, and init */ | |
799 | - if (settings.socketpath == NULL) { | |
800 | - int udp_port; | |
801 | - | |
802 | - const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME"); | |
803 | - char temp_portnumber_filename[PATH_MAX]; | |
804 | - FILE *portnumber_file = NULL; | |
805 | - | |
806 | - if (portnumber_filename != NULL) { | |
807 | - snprintf(temp_portnumber_filename, | |
808 | - sizeof(temp_portnumber_filename), | |
809 | - "%s.lck", portnumber_filename); | |
810 | - | |
811 | - portnumber_file = fopen(temp_portnumber_filename, "a"); | |
812 | - if (portnumber_file == NULL) { | |
813 | - fprintf(stderr, "Failed to open \"%s\": %s\n", | |
814 | - temp_portnumber_filename, strerror(errno)); | |
815 | - } | |
816 | - } | |
817 | - | |
818 | - errno = 0; | |
819 | - if (settings.port && server_socket(settings.port, tcp_transport, | |
820 | - portnumber_file)) { | |
821 | - vperror("failed to listen on TCP port %d", settings.port); | |
822 | - exit(EX_OSERR); | |
823 | - } | |
824 | - | |
825 | - /* | |
826 | - * initialization order: first create the listening sockets | |
827 | - * (may need root on low ports), then drop root if needed, | |
828 | - * then daemonise if needed, then init libevent (in some cases | |
829 | - * descriptors created by libevent wouldn't survive forking). | |
830 | - */ | |
831 | - udp_port = settings.udpport ? settings.udpport : settings.port; | |
832 | - | |
833 | - /* create the UDP listening socket and bind it */ | |
834 | - errno = 0; | |
835 | - if (settings.udpport && server_socket(settings.udpport, udp_transport, | |
836 | - portnumber_file)) { | |
837 | - vperror("failed to listen on UDP port %d", settings.udpport); | |
838 | - exit(EX_OSERR); | |
839 | - } | |
840 | + /* | |
841 | + * initialization order: first create the listening sockets | |
842 | + * (may need root on low ports), then drop root if needed, | |
843 | + * then daemonise if needed, then init libevent (in some cases | |
844 | + * descriptors created by libevent wouldn't survive forking). | |
845 | + */ | |
846 | ||
847 | - if (portnumber_file) { | |
848 | - fclose(portnumber_file); | |
849 | - rename(temp_portnumber_filename, portnumber_filename); | |
850 | - } | |
851 | +#ifdef USE_REPLICATION | |
852 | + if(replication_init() == -1){ | |
853 | + fprintf(stderr, "faild to replication init\n"); | |
854 | + exit(EXIT_FAILURE); | |
855 | } | |
856 | +#else | |
857 | + create_listening_sockets(); | |
858 | +#endif | |
859 | ||
860 | /* Drop privileges no longer needed */ | |
861 | drop_privileges(); | |
862 | @@ -4694,3 +5038,401 @@ | |
863 | ||
864 | return EXIT_SUCCESS; | |
865 | } | |
866 | + | |
867 | +#ifdef USE_REPLICATION | |
868 | +static int replication_start(void) | |
869 | +{ | |
870 | + static int start = 0; | |
871 | + if(start) | |
872 | + return(0); | |
873 | + | |
874 | + create_listening_sockets(); | |
875 | + | |
876 | + start = 1; | |
877 | + return(0); | |
878 | +} | |
879 | + | |
880 | +static int replication_server_init(void) | |
881 | +{ | |
882 | + rep_recv = NULL; | |
883 | + rep_send = NULL; | |
884 | + rep_conn = NULL; | |
885 | + if(server_socket_replication(settings.rep_port)){ | |
886 | + fprintf(stderr, "replication: failed to initialize replication server socket\n"); | |
887 | + return(-1); | |
888 | + } | |
889 | + if (settings.verbose > 0) | |
890 | + fprintf(stderr, "replication: listen\n"); | |
891 | + return(replication_start()); | |
892 | +} | |
893 | + | |
894 | +static int replication_client_init(void) | |
895 | +{ | |
896 | + int s; | |
897 | + conn *c; | |
898 | + struct addrinfo ai; | |
899 | + struct sockaddr_in server; | |
900 | + | |
901 | + rep_recv = NULL; | |
902 | + rep_send = NULL; | |
903 | + rep_conn = NULL; | |
904 | + | |
905 | + memset(&ai,0,sizeof(ai)); | |
906 | + ai.ai_family = AF_INET; | |
907 | + ai.ai_socktype = SOCK_STREAM; | |
908 | + s = new_socket(&ai); | |
909 | + | |
910 | + if(s == -1) { | |
911 | + fprintf(stderr, "replication: failed to replication client socket\n"); | |
912 | + return(-1); | |
913 | + }else{ | |
914 | + /* connect */ | |
915 | + memset((char *)&server, 0, sizeof(server)); | |
916 | + server.sin_family = AF_INET; | |
917 | + server.sin_addr = settings.rep_addr; | |
918 | + server.sin_port = htons(settings.rep_port); | |
919 | + if (settings.verbose > 0) | |
920 | + fprintf(stderr,"replication: connect (peer=%s:%d)\n", inet_ntoa(settings.rep_addr), settings.rep_port); | |
921 | + if(connect(s,(struct sockaddr *)&server, sizeof(server)) == 0){ | |
922 | + c = conn_new(s, conn_repconnect, EV_WRITE | EV_PERSIST, DATA_BUFFER_SIZE, false, main_base); | |
923 | + if(c == NULL){ | |
924 | + fprintf(stderr, "replication: failed to create client conn"); | |
925 | + close(s); | |
926 | + return(-1); | |
927 | + } | |
928 | + drive_machine(c); | |
929 | + }else{ | |
930 | + if(errno == EINPROGRESS){ | |
931 | + c = conn_new(s, conn_repconnect, EV_WRITE | EV_PERSIST, DATA_BUFFER_SIZE, false, main_base); | |
932 | + if(c == NULL){ | |
933 | + fprintf(stderr, "replication: failed to create client conn"); | |
934 | + close(s); | |
935 | + return(-1); | |
936 | + } | |
937 | + }else{ | |
938 | + fprintf(stdout,"replication: can't connect %s:%d\n", inet_ntoa(server.sin_addr), ntohs(server.sin_port)); | |
939 | + close(s); | |
940 | + return(-1); | |
941 | + } | |
942 | + } | |
943 | + } | |
944 | + return(0); | |
945 | +} | |
946 | + | |
947 | +static int replication_init(void) | |
948 | +{ | |
949 | + if(settings.rep_addr.s_addr != htonl(INADDR_ANY)){ | |
950 | + if(replication_client_init() != -1){ | |
951 | + return(0); | |
952 | + } | |
953 | + } | |
954 | + return(replication_server_init()); | |
955 | +} | |
956 | + | |
957 | +static int replication_connect(void) | |
958 | +{ | |
959 | + int f; | |
960 | + int p[2]; | |
961 | + | |
962 | + if(pipe(p) == -1){ | |
963 | + fprintf(stderr, "replication: can't create pipe\n"); | |
964 | + return(-1); | |
965 | + }else{ | |
966 | + if((f = fcntl(p[0], F_GETFL, 0)) < 0 || fcntl(p[0], F_SETFL, f | O_NONBLOCK) < 0) { | |
967 | + fprintf(stderr, "replication: can't setting O_NONBLOCK pipe[0]\n"); | |
968 | + return(-1); | |
969 | + } | |
970 | + if((f = fcntl(p[1], F_GETFL, 0)) < 0 || fcntl(p[1], F_SETFL, f | O_NONBLOCK) < 0) { | |
971 | + fprintf(stderr, "replication: can't setting O_NONBLOCK pipe[0]\n"); | |
972 | + return(-1); | |
973 | + } | |
974 | + pthread_mutex_lock(&replication_pipe_lock); | |
975 | + rep_recv = conn_new(p[0], conn_pipe_recv, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base); | |
976 | + rep_send = conn_new(p[1], conn_pipe_send, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base); | |
977 | + event_del(&rep_send->event); | |
978 | + pthread_mutex_unlock(&replication_pipe_lock); | |
979 | + } | |
980 | + return(0); | |
981 | +} | |
982 | + | |
983 | +static int replication_close(void) | |
984 | +{ | |
985 | + int c; | |
986 | + int r; | |
987 | + Q_ITEM *q; | |
988 | + | |
989 | + if(settings.verbose > 0) | |
990 | + fprintf(stderr,"replication: close\n"); | |
991 | + if(rep_recv){ | |
992 | + rep_recv->rbytes = sizeof(q); | |
993 | + rep_recv->rcurr = rep_recv->rbuf; | |
994 | + c = 0; | |
995 | + do{ | |
996 | + r = read(rep_recv->sfd, rep_recv->rcurr, rep_recv->rbytes); | |
997 | + if(r == -1){ | |
998 | + break; | |
999 | + } | |
1000 | + rep_recv->rbytes -= r; | |
1001 | + rep_recv->rcurr += r; | |
1002 | + if(!rep_recv->rbytes){ | |
1003 | + memcpy(&q, rep_recv->rbuf, sizeof(q)); | |
1004 | + rep_recv->rbytes = sizeof(q); | |
1005 | + rep_recv->rcurr = rep_recv->rbuf; | |
1006 | + qi_free(q); | |
1007 | + c++; | |
1008 | + } | |
1009 | + }while(r); | |
1010 | + conn_close(rep_recv); | |
1011 | + rep_recv = NULL; | |
1012 | + if (settings.verbose > 1) { | |
1013 | + fprintf(stderr, "replication: qitem free %d items\n", qi_free_list()); | |
1014 | + fprintf(stderr, "replication: close recv %d items\n", c); | |
1015 | + } | |
1016 | + } | |
1017 | + pthread_mutex_lock(&replication_pipe_lock); | |
1018 | + if(rep_send){ | |
1019 | + conn_close(rep_send); | |
1020 | + rep_send = NULL; | |
1021 | + if (settings.verbose > 1) | |
1022 | + fprintf(stderr,"replication: close send\n"); | |
1023 | + } | |
1024 | + pthread_mutex_unlock(&replication_pipe_lock); | |
1025 | + if(rep_conn){ | |
1026 | + conn_close(rep_conn); | |
1027 | + rep_conn = NULL; | |
1028 | + if (settings.verbose > 1) | |
1029 | + fprintf(stderr,"replication: close conn\n"); | |
1030 | + } | |
1031 | + if(!rep_exit) | |
1032 | + replication_server_init(); | |
1033 | + return(0); | |
1034 | +} | |
1035 | + | |
1036 | +static void replication_dispatch_close(void) | |
1037 | +{ | |
1038 | + if (settings.verbose > 1) | |
1039 | + fprintf(stderr, "replication: dispatch close\n"); | |
1040 | + pthread_mutex_lock(&replication_pipe_lock); | |
1041 | + if (rep_send) { | |
1042 | + conn_close(rep_send); | |
1043 | + rep_send = NULL; | |
1044 | + } | |
1045 | + pthread_mutex_unlock(&replication_pipe_lock); | |
1046 | +} | |
1047 | + | |
1048 | +static int replication_marugoto(int f) | |
1049 | +{ | |
1050 | + static int keysend = 0; | |
1051 | + static int keycount = 0; | |
1052 | + static char *keylist = NULL; | |
1053 | + static char *keyptr = NULL; | |
1054 | + | |
1055 | + if(f){ | |
1056 | + if(keylist){ | |
1057 | + free(keylist); | |
1058 | + keylist = NULL; | |
1059 | + keyptr = NULL; | |
1060 | + keycount = 0; | |
1061 | + keysend = 0; | |
1062 | + } | |
1063 | + pthread_mutex_lock(&cache_lock); | |
1064 | + keylist = (char *)assoc_key_snap((int *)&keycount); | |
1065 | + pthread_mutex_unlock(&cache_lock); | |
1066 | + keyptr = keylist; | |
1067 | + if (!keyptr){ | |
1068 | + replication_call_marugoto_end(); | |
1069 | + }else{ | |
1070 | + if (settings.verbose > 0) | |
1071 | + fprintf(stderr,"replication: marugoto start\n"); | |
1072 | + } | |
1073 | + }else{ | |
1074 | + if(keyptr){ | |
1075 | + while(*keyptr){ | |
1076 | + item *it = item_get(keyptr, strlen(keyptr)); | |
1077 | + if(it){ | |
1078 | + item_remove(it); | |
1079 | + if(replication_call_rep(keyptr, strlen(keyptr)) == -1){ | |
1080 | + return(-1); | |
1081 | + }else{ | |
1082 | + keysend++; | |
1083 | + keyptr += strlen(keyptr) + 1; | |
1084 | + return(0); | |
1085 | + } | |
1086 | + } | |
1087 | + keyptr += strlen(keyptr) + 1; | |
1088 | + } | |
1089 | + if(settings.verbose > 0) | |
1090 | + fprintf(stderr,"replication: marugoto %d\n", keysend); | |
1091 | + replication_call_marugoto_end(); | |
1092 | + if(settings.verbose > 0) | |
1093 | + fprintf(stderr,"replication: marugoto owari\n"); | |
1094 | + free(keylist); | |
1095 | + keylist = NULL; | |
1096 | + keyptr = NULL; | |
1097 | + keycount = 0; | |
1098 | + keysend = 0; | |
1099 | + } | |
1100 | + } | |
1101 | + return(0); | |
1102 | +} | |
1103 | + | |
1104 | +static int replication_send(conn *c) | |
1105 | +{ | |
1106 | + while(c->wbytes){ | |
1107 | + int w = write(c->sfd, c->wcurr, c->wbytes); | |
1108 | + if(w == -1){ | |
1109 | + if(errno == EAGAIN || errno == EINTR){ | |
1110 | + }else{ | |
1111 | + fprintf(stderr,"replication: send error\n"); | |
1112 | + replication_close(); | |
1113 | + break; | |
1114 | + } | |
1115 | + }else{ | |
1116 | + c->wbytes -= w; | |
1117 | + c->wcurr += w; | |
1118 | + } | |
1119 | + } | |
1120 | + return(c->wbytes); | |
1121 | +} | |
1122 | + | |
1123 | +static int replication_pop(void) | |
1124 | +{ | |
1125 | + int r; | |
1126 | + int c; | |
1127 | + int m; | |
1128 | + Q_ITEM **q; | |
1129 | + | |
1130 | + if(settings.verbose > 1) | |
1131 | + fprintf(stderr, "replication: pop\n"); | |
1132 | + | |
1133 | + if(!rep_recv) | |
1134 | + return(0); | |
1135 | + | |
1136 | + r = read(rep_recv->sfd, rep_recv->rbuf, rep_recv->rsize); | |
1137 | + if(r == -1){ | |
1138 | + if(errno == EAGAIN || errno == EINTR){ | |
1139 | + }else{ | |
1140 | + fprintf(stderr,"replication: pop error %d\n", errno); | |
1141 | + return(-1); | |
1142 | + } | |
1143 | + }if(r == 0){ | |
1144 | + /* other end closed, trigger replication_close() */ | |
1145 | + return(-1); | |
1146 | + }else{ | |
1147 | + c = r / sizeof(Q_ITEM *); | |
1148 | + m = r % sizeof(Q_ITEM *); | |
1149 | + q = (Q_ITEM **)(rep_recv->rbuf); | |
1150 | + while(c--){ | |
1151 | + if(q[c]){ | |
1152 | + if(rep_conn && replication_cmd(rep_conn, q[c])){ | |
1153 | + replication_item(q[c]); /* error retry */ | |
1154 | + }else{ | |
1155 | + qi_free(q[c]); | |
1156 | + } | |
1157 | + }else{ | |
1158 | + if(!rep_exit){ | |
1159 | + if (settings.verbose) | |
1160 | + fprintf(stderr,"replication: cleanup start\n"); | |
1161 | + rep_exit = 1; | |
1162 | + } | |
1163 | + } | |
1164 | + } | |
1165 | + } | |
1166 | + if(rep_exit){ | |
1167 | + if(rep_conn->wbytes){ | |
1168 | + /* retry */ | |
1169 | + if(replication_exit()){ | |
1170 | + replication_close(); | |
1171 | + fprintf(stderr,"replication: cleanup error\n"); | |
1172 | + exit(EXIT_FAILURE); | |
1173 | + } | |
1174 | + }else{ | |
1175 | + /* finish */ | |
1176 | + replication_close(); | |
1177 | + if (settings.verbose) | |
1178 | + fprintf(stderr,"replication: cleanup complete\n"); | |
1179 | + exit(EXIT_SUCCESS); | |
1180 | + } | |
1181 | + } | |
1182 | + replication_marugoto(0); | |
1183 | + return(0); | |
1184 | +} | |
1185 | + | |
1186 | +static int replication_push(void) | |
1187 | +{ | |
1188 | + int w; | |
1189 | + | |
1190 | + while(rep_send->wbytes){ | |
1191 | + w = write(rep_send->sfd, rep_send->wcurr, rep_send->wbytes); | |
1192 | + if(w == -1){ | |
1193 | + if(errno == EAGAIN || errno == EINTR){ | |
1194 | + fprintf(stderr,"replication: push EAGAIN or EINTR\n"); | |
1195 | + }else{ | |
1196 | + return(-1); | |
1197 | + } | |
1198 | + }else{ | |
1199 | + rep_send->wbytes -= w; | |
1200 | + rep_send->wcurr += w; | |
1201 | + } | |
1202 | + } | |
1203 | + rep_send->wcurr = rep_send->wbuf; | |
1204 | + return(0); | |
1205 | +} | |
1206 | + | |
1207 | +static int replication_exit(void) | |
1208 | +{ | |
1209 | + return(replication_item(NULL)); | |
1210 | +} | |
1211 | + | |
1212 | +static int replication_item(Q_ITEM *q) | |
1213 | +{ | |
1214 | + pthread_mutex_lock(&replication_pipe_lock); | |
1215 | + if (!rep_send) { | |
1216 | + pthread_mutex_unlock(&replication_pipe_lock); | |
1217 | + return 0; | |
1218 | + } | |
1219 | + if(rep_send->wcurr + rep_send->wbytes + sizeof(q) > rep_send->wbuf + rep_send->wsize){ | |
1220 | + fprintf(stderr,"replication: buffer over fllow\n"); | |
1221 | + if(q){ | |
1222 | + qi_free(q); | |
1223 | + } | |
1224 | + pthread_mutex_unlock(&replication_pipe_lock); | |
1225 | + replication_dispatch_close(); | |
1226 | + return(-1); | |
1227 | + } | |
1228 | + memcpy(rep_send->wcurr + rep_send->wbytes, &q, sizeof(q)); | |
1229 | + rep_send->wbytes += sizeof(q); | |
1230 | + if(replication_push()){ | |
1231 | + fprintf(stderr, "replication: push error\n"); | |
1232 | + if(q){ | |
1233 | + qi_free(q); | |
1234 | + } | |
1235 | + pthread_mutex_unlock(&replication_pipe_lock); | |
1236 | + replication_dispatch_close(); | |
1237 | + return(-1); | |
1238 | + } | |
1239 | + pthread_mutex_unlock(&replication_pipe_lock); | |
1240 | + return(0); | |
1241 | +} | |
1242 | + | |
1243 | +int replication(enum CMD_TYPE type, R_CMD *cmd) | |
1244 | +{ | |
1245 | + Q_ITEM *q; | |
1246 | + | |
1247 | + pthread_mutex_lock(&replication_pipe_lock); | |
1248 | + if (!rep_send) { | |
1249 | + pthread_mutex_unlock(&replication_pipe_lock); | |
1250 | + return 0; | |
1251 | + } | |
1252 | + pthread_mutex_unlock(&replication_pipe_lock); | |
1253 | + | |
1254 | + if((q = qi_new(type, cmd, false))) { | |
1255 | + replication_item(q); | |
1256 | + }else{ | |
1257 | + fprintf(stderr,"replication: can't create Q_ITEM\n"); | |
1258 | + replication_dispatch_close(); | |
1259 | + return(-1); | |
1260 | + } | |
1261 | + return(0); | |
1262 | +} | |
1263 | +#endif /* USE_REPLICATION */ | |
c4fcb0e4 ER |
1264 | --- memcached-1.4.5/memcached.h~ 2010-05-06 14:09:51.000000000 +0300 |
1265 | +++ memcached-1.4.5/memcached.h 2010-05-06 14:10:13.518051741 +0300 | |
894d11c1 ER |
1266 | @@ -144,7 +144,13 @@ |
1267 | conn_swallow, /**< swallowing unnecessary bytes w/o storing */ | |
1268 | conn_closing, /**< closing this connection */ | |
1269 | conn_mwrite, /**< writing out many items sequentially */ | |
1270 | - conn_max_state /**< Max state value (used for assertion) */ | |
1271 | +#ifdef USE_REPLICATION | |
1272 | + conn_repconnect, /**< replication connecting to master */ | |
1273 | + conn_rep_listen, /**< replication listening socket */ | |
1274 | + conn_pipe_recv, /**< replication command pipe recv */ | |
1275 | + conn_pipe_send, /**< replication command pipe send */ | |
1276 | +#endif /* USE_REPLICATION */ | |
1277 | + conn_max_state, /**< Max state value (used for assertion) */ | |
1278 | }; | |
1279 | ||
1280 | enum bin_substates { | |
c4fcb0e4 | 1281 | @@ -247,7 +247,9 @@ |
894d11c1 ER |
1282 | uint64_t get_misses; |
1283 | uint64_t evictions; | |
c4fcb0e4 | 1284 | uint64_t reclaimed; |
894d11c1 ER |
1285 | +#if 0 |
1286 | time_t started; /* when the process was started */ | |
1287 | +#endif | |
1288 | bool accepting_conns; /* whether we are currently accepting */ | |
1289 | uint64_t listen_disabled_num; | |
1290 | }; | |
1291 | @@ -274,6 +282,11 @@ | |
1292 | int backlog; | |
1293 | int item_size_max; /* Maximum item size, and upper end for slabs */ | |
1294 | bool sasl; /* SASL on/off */ | |
1295 | +#ifdef USE_REPLICATION | |
1296 | + struct in_addr rep_addr; /* replication addr */ | |
1297 | + int rep_port; /* replication port */ | |
1298 | + int rep_qmax; /* replication QITEM max */ | |
1299 | +#endif /*USE_REPLICATION*/ | |
1300 | }; | |
1301 | ||
1302 | extern struct stats stats; | |
1303 | @@ -286,6 +299,10 @@ | |
1304 | /* temp */ | |
1305 | #define ITEM_SLABBED 4 | |
1306 | ||
1307 | +#ifdef USE_REPLICATION | |
1308 | +#define ITEM_REPDATA 128 | |
1309 | +#endif /*USE_REPLICATION*/ | |
1310 | + | |
1311 | /** | |
1312 | * Structure for storing items within memcached. | |
1313 | */ | |
1314 | @@ -438,6 +455,10 @@ | |
1315 | #include "trace.h" | |
1316 | #include "hash.h" | |
1317 | #include "util.h" | |
1318 | + | |
1319 | +#ifdef USE_REPLICATION | |
1320 | +#include "replication.h" | |
1321 | +#endif /* USE_REPLICATION */ | |
1322 | ||
1323 | /* | |
1324 | * Functions such as the libevent-related calls that need to do cross-thread | |
894d11c1 ER |
1325 | --- memcached-1.4.4/replication.c Thu Jan 1 03:00:00 1970 |
1326 | +++ repcached-2.2-1.4.4/replication.c Wed Feb 10 18:40:48 2010 | |
1327 | @@ -0,0 +1,355 @@ | |
1328 | +/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ | |
1329 | +/* | |
1330 | + * | |
1331 | + */ | |
1332 | +#include "memcached.h" | |
1333 | +#include "replication.h" | |
1334 | +#include <stdlib.h> | |
1335 | +#include <stdio.h> | |
1336 | +#include <unistd.h> | |
1337 | +#include <string.h> | |
1338 | +#include <errno.h> | |
1339 | + | |
1340 | +static Q_ITEM *q_freelist = NULL; | |
1341 | +static int q_itemcount = 0; | |
1342 | +static pthread_mutex_t replication_queue_lock = PTHREAD_MUTEX_INITIALIZER; | |
1343 | + | |
1344 | +int get_qi_count(void) | |
1345 | +{ | |
1346 | + int c; | |
1347 | + pthread_mutex_lock(&replication_queue_lock); | |
1348 | + c = q_itemcount; | |
1349 | + pthread_mutex_unlock(&replication_queue_lock); | |
1350 | + return(c); | |
1351 | +} | |
1352 | + | |
1353 | +Q_ITEM *qi_new(enum CMD_TYPE type, R_CMD *cmd, bool reuse) | |
1354 | +{ | |
1355 | + Q_ITEM *q = NULL; | |
1356 | + char *key = NULL; | |
1357 | + uint32_t keylen = 0; | |
1358 | + rel_time_t time = 0; | |
1359 | + | |
1360 | + pthread_mutex_lock(&replication_queue_lock); | |
1361 | + if(q_freelist){ | |
1362 | + q = q_freelist; | |
1363 | + q_freelist = q->next; | |
1364 | + } | |
1365 | + | |
1366 | + if(NULL == q){ | |
1367 | + if(reuse) { | |
1368 | + pthread_mutex_unlock(&replication_queue_lock); | |
1369 | + return(NULL); | |
1370 | + } | |
1371 | + if(q_itemcount >= settings.rep_qmax) { | |
1372 | + pthread_mutex_unlock(&replication_queue_lock); | |
1373 | + return(NULL); | |
1374 | + } | |
1375 | + q = malloc(sizeof(Q_ITEM)); | |
1376 | + if (NULL == q){ | |
1377 | + fprintf(stderr,"replication: qi_new out of memory\n"); | |
1378 | + pthread_mutex_unlock(&replication_queue_lock); | |
1379 | + return(NULL); | |
1380 | + } | |
1381 | + q_itemcount++; | |
1382 | + if (settings.verbose > 2) | |
1383 | + fprintf(stderr,"replication: alloc c=%d\n", q_itemcount); | |
1384 | + } | |
1385 | + | |
1386 | + pthread_mutex_unlock(&replication_queue_lock); | |
1387 | + | |
1388 | + switch (type) { | |
1389 | + case REPLICATION_REP: | |
1390 | + case REPLICATION_DEL: | |
1391 | + key = cmd->key; | |
1392 | + keylen = cmd->keylen; | |
1393 | + break; | |
1394 | + case REPLICATION_FLUSH_ALL: | |
1395 | + break; | |
1396 | + case REPLICATION_DEFER_FLUSH_ALL: | |
1397 | + time = cmd->time; | |
1398 | + break; | |
1399 | + case REPLICATION_MARUGOTO_END: | |
1400 | + break; | |
1401 | + default: | |
1402 | + fprintf(stderr,"replication: got unknown command: %d\n", type); | |
1403 | + return(NULL); | |
1404 | + } | |
1405 | + | |
1406 | + q->key = NULL; | |
1407 | + q->type = type; | |
1408 | + q->time = time; | |
1409 | + q->next = NULL; | |
1410 | + if (keylen) { | |
1411 | + q->key = malloc(keylen + 1); | |
1412 | + if(NULL == q->key){ | |
1413 | + qi_free(q); | |
1414 | + q = NULL; | |
1415 | + }else{ | |
1416 | + memcpy(q->key, key, keylen); | |
1417 | + *(q->key + keylen) = 0; | |
1418 | + } | |
1419 | + } | |
1420 | + | |
1421 | + return(q); | |
1422 | +} | |
1423 | + | |
1424 | +void qi_free(Q_ITEM *q) | |
1425 | +{ | |
1426 | + if(q){ | |
1427 | + if(q->key){ | |
1428 | + free(q->key); | |
1429 | + q->key = NULL; | |
1430 | + } | |
1431 | + pthread_mutex_lock(&replication_queue_lock); | |
1432 | + q->next = q_freelist; | |
1433 | + q_freelist = q; | |
1434 | + pthread_mutex_unlock(&replication_queue_lock); | |
1435 | + } | |
1436 | +} | |
1437 | + | |
1438 | +int qi_free_list() | |
1439 | +{ | |
1440 | + int c = 0; | |
1441 | + Q_ITEM *q = NULL; | |
1442 | + | |
1443 | + pthread_mutex_lock(&replication_queue_lock); | |
1444 | + while((q = q_freelist)){ | |
1445 | + q_itemcount--; | |
1446 | + c++; | |
1447 | + q_freelist = q->next; | |
1448 | + free(q); | |
1449 | + } | |
1450 | + pthread_mutex_unlock(&replication_queue_lock); | |
1451 | + return(c); | |
1452 | +} | |
1453 | + | |
1454 | +static int replication_get_num(char *p, int n) | |
1455 | +{ | |
1456 | + int l; | |
1457 | + char b[64]; | |
1458 | + if(p) | |
1459 | + l = sprintf(p, "%u", n); | |
1460 | + else | |
1461 | + l = sprintf(b, "%u", n); | |
1462 | + return(l); | |
1463 | +} | |
1464 | + | |
1465 | +int replication_call_rep(char *key, size_t keylen) | |
1466 | +{ | |
1467 | + R_CMD r; | |
1468 | + r.key = key; | |
1469 | + r.keylen = keylen; | |
1470 | + return(replication(REPLICATION_REP, &r)); | |
1471 | +} | |
1472 | + | |
1473 | +int replication_call_del(char *key, size_t keylen) | |
1474 | +{ | |
1475 | + R_CMD r; | |
1476 | + r.key = key; | |
1477 | + r.keylen = keylen; | |
1478 | + return(replication(REPLICATION_DEL, &r)); | |
1479 | +} | |
1480 | + | |
1481 | +int replication_call_flush_all() | |
1482 | +{ | |
1483 | + R_CMD r; | |
1484 | + r.key = NULL; | |
1485 | + return(replication(REPLICATION_FLUSH_ALL, &r)); | |
1486 | +} | |
1487 | + | |
1488 | +int replication_call_defer_flush_all(const rel_time_t time) | |
1489 | +{ | |
1490 | + R_CMD r; | |
1491 | + r.key = NULL; | |
1492 | + r.time = time; | |
1493 | + return(replication(REPLICATION_DEFER_FLUSH_ALL, &r)); | |
1494 | +} | |
1495 | + | |
1496 | +int replication_call_marugoto_end() | |
1497 | +{ | |
1498 | + R_CMD r; | |
1499 | + r.key = NULL; | |
1500 | + return(replication(REPLICATION_MARUGOTO_END, &r)); | |
1501 | +} | |
1502 | + | |
1503 | +static int replication_alloc(conn *c, int s) | |
1504 | +{ | |
1505 | + char *p; | |
1506 | + s += c->wbytes; | |
1507 | + if(c->wsize < s){ | |
1508 | + while(c->wsize < s) | |
1509 | + c->wsize += 4096; | |
1510 | + if((p = malloc(c->wsize))){ | |
1511 | + memcpy(p, c->wbuf, c->wbytes); | |
1512 | + free(c->wbuf); | |
1513 | + c->wbuf = p; | |
1514 | + }else{ | |
1515 | + return(-1); | |
1516 | + } | |
1517 | + } | |
1518 | + return(0); | |
1519 | +} | |
1520 | + | |
1521 | +static int replication_del(conn *c, char *k) | |
1522 | +{ | |
1523 | + int l = 0; | |
1524 | + char *s = "delete "; | |
1525 | + char *n = "\r\n"; | |
1526 | + char *p = NULL; | |
1527 | + | |
1528 | + l += strlen(s); | |
1529 | + l += strlen(k); | |
1530 | + l += strlen(n); | |
1531 | + if(replication_alloc(c,l) == -1){ | |
1532 | + fprintf(stderr, "replication: del malloc error\n"); | |
1533 | + return(-1); | |
1534 | + } | |
1535 | + p = c->wbuf + c->wbytes; | |
1536 | + memcpy(p, s, strlen(s)); | |
1537 | + p += strlen(s); | |
1538 | + memcpy(p, k, strlen(k)); | |
1539 | + p += strlen(k); | |
1540 | + memcpy(p, n, strlen(n)); | |
1541 | + p += strlen(n); | |
1542 | + c->wbytes = p - c->wbuf; | |
1543 | + c->wcurr = c->wbuf; | |
1544 | + return(0); | |
1545 | +} | |
1546 | + | |
1547 | +static int replication_rep(conn *c, item *it) | |
1548 | +{ | |
1549 | + int exp = 0; | |
1550 | + int len = 0; | |
1551 | + char *s = "rep "; | |
1552 | + char *n = "\r\n"; | |
1553 | + char *p = NULL; | |
1554 | + char flag[40]; | |
1555 | + | |
1556 | + if(it->exptime) | |
1557 | + exp = it->exptime + process_started; | |
1558 | + flag[0]=0; | |
1559 | + if((p=ITEM_suffix(it))){ | |
1560 | + int i; | |
1561 | + memcpy(flag, p, it->nsuffix - 2); | |
1562 | + flag[it->nsuffix - 2] = 0; | |
1563 | + for(i=0;i<strlen(flag);i++){ | |
1564 | + if(flag[i] > ' ') | |
1565 | + break; | |
1566 | + } | |
1567 | + memmove(flag,&flag[i],strlen(flag)-i); | |
1568 | + for(p=flag;*p>' ';p++); | |
1569 | + *p=0; | |
1570 | + } | |
1571 | + len += strlen(s); | |
1572 | + len += it->nkey; | |
1573 | + len += 1; | |
1574 | + len += strlen(flag); | |
1575 | + len += 1; | |
1576 | + len += replication_get_num(NULL, exp); | |
1577 | + len += 1; | |
1578 | + len += replication_get_num(NULL, it->nbytes - 2); | |
1579 | + len += 1; | |
1580 | + len += replication_get_num(NULL, ITEM_get_cas(it)); | |
1581 | + len += strlen(n); | |
1582 | + len += it->nbytes; | |
1583 | + len += strlen(n); | |
1584 | + if(replication_alloc(c,len) == -1){ | |
1585 | + fprintf(stderr, "replication: rep malloc error\n"); | |
1586 | + return(-1); | |
1587 | + } | |
1588 | + p = c->wbuf + c->wbytes; | |
1589 | + memcpy(p, s, strlen(s)); | |
1590 | + p += strlen(s); | |
1591 | + memcpy(p, ITEM_key(it), it->nkey); | |
1592 | + p += it->nkey; | |
1593 | + *(p++) = ' '; | |
1594 | + memcpy(p, flag, strlen(flag)); | |
1595 | + p += strlen(flag); | |
1596 | + *(p++) = ' '; | |
1597 | + p += replication_get_num(p, exp); | |
1598 | + *(p++) = ' '; | |
1599 | + p += replication_get_num(p, it->nbytes - 2); | |
1600 | + *(p++) = ' '; | |
1601 | + p += replication_get_num(p, ITEM_get_cas(it)); | |
1602 | + memcpy(p, n, strlen(n)); | |
1603 | + p += strlen(n); | |
1604 | + memcpy(p, ITEM_data(it), it->nbytes); | |
1605 | + p += it->nbytes; | |
1606 | + c->wbytes = p - c->wbuf; | |
1607 | + c->wcurr = c->wbuf; | |
1608 | + return(0); | |
1609 | +} | |
1610 | + | |
1611 | +static int replication_flush_all(conn *c, rel_time_t exp) | |
1612 | +{ | |
1613 | + char *s = "flush_all "; | |
1614 | + char *n = "\r\n"; | |
1615 | + char *p = NULL; | |
1616 | + | |
1617 | + int l = strlen(s) + strlen(n); | |
1618 | + if (exp > 0) | |
1619 | + l += replication_get_num(NULL, exp); | |
1620 | + if(replication_alloc(c,l) == -1){ | |
1621 | + fprintf(stderr, "replication: flush_all malloc error\n"); | |
1622 | + return(-1); | |
1623 | + } | |
1624 | + p = c->wbuf + c->wbytes; | |
1625 | + memcpy(p, s, strlen(s)); | |
1626 | + p += strlen(s); | |
1627 | + if (exp > 0) | |
1628 | + p += replication_get_num(p, exp); | |
1629 | + memcpy(p, n, strlen(n)); | |
1630 | + p += strlen(n); | |
1631 | + c->wbytes = p - c->wbuf; | |
1632 | + c->wcurr = c->wbuf; | |
1633 | + return(0); | |
1634 | +} | |
1635 | + | |
1636 | +static int replication_marugoto_end(conn *c) | |
1637 | +{ | |
1638 | + char *s = "marugoto_end"; | |
1639 | + char *n = "\r\n"; | |
1640 | + char *p = NULL; | |
1641 | + | |
1642 | + int l = strlen(s) + strlen(n); | |
1643 | + if(replication_alloc(c,l) == -1){ | |
1644 | + fprintf(stderr, "replication: marugoto_end malloc error\n"); | |
1645 | + return(-1); | |
1646 | + } | |
1647 | + p = c->wbuf + c->wbytes; | |
1648 | + memcpy(p, s, strlen(s)); | |
1649 | + p += strlen(s); | |
1650 | + memcpy(p, n, strlen(n)); | |
1651 | + p += strlen(n); | |
1652 | + c->wbytes = p - c->wbuf; | |
1653 | + c->wcurr = c->wbuf; | |
1654 | + return(0); | |
1655 | +} | |
1656 | + | |
1657 | +int replication_cmd(conn *c, Q_ITEM *q) | |
1658 | +{ | |
1659 | + item *it; | |
1660 | + int r; | |
1661 | + | |
1662 | + switch (q->type) { | |
1663 | + case REPLICATION_REP: | |
1664 | + it = item_get(q->key, strlen(q->key)); | |
1665 | + if (!it) | |
1666 | + return(replication_del(c, q->key)); | |
1667 | + r = replication_rep(c, it); | |
1668 | + item_remove(it); | |
1669 | + return r; | |
1670 | + case REPLICATION_DEL: | |
1671 | + return(replication_del(c, q->key)); | |
1672 | + case REPLICATION_FLUSH_ALL: | |
1673 | + return(replication_flush_all(c, 0)); | |
1674 | + case REPLICATION_DEFER_FLUSH_ALL: | |
1675 | + return(replication_flush_all(c, q->time)); | |
1676 | + case REPLICATION_MARUGOTO_END: | |
1677 | + return(replication_marugoto_end(c)); | |
1678 | + default: | |
1679 | + fprintf(stderr,"replication: got unknown command:%d\n", q->type); | |
1680 | + return(0); | |
1681 | + } | |
1682 | +} | |
894d11c1 ER |
1683 | --- memcached-1.4.4/replication.h Thu Jan 1 03:00:00 1970 |
1684 | +++ repcached-2.2-1.4.4/replication.h Wed Feb 10 18:40:31 2010 | |
1685 | @@ -0,0 +1,42 @@ | |
1686 | +#ifndef MEMCACHED_REPLICATION_H | |
1687 | +#define MEMCACHED_REPLICATION_H | |
1688 | +#define REPCACHED_VERSION "2.2" | |
1689 | +#include <netdb.h> | |
1690 | + | |
1691 | +enum CMD_TYPE { | |
1692 | + REPLICATION_REP, | |
1693 | + REPLICATION_DEL, | |
1694 | + REPLICATION_FLUSH_ALL, | |
1695 | + REPLICATION_DEFER_FLUSH_ALL, | |
1696 | + REPLICATION_MARUGOTO_END, | |
1697 | +}; | |
1698 | + | |
1699 | +typedef struct queue_item_t Q_ITEM; | |
1700 | +struct queue_item_t { | |
1701 | + enum CMD_TYPE type; | |
1702 | + char *key; | |
1703 | + rel_time_t time; | |
1704 | + Q_ITEM *next; | |
1705 | +}; | |
1706 | + | |
1707 | +typedef struct replication_cmd_t R_CMD; | |
1708 | +struct replication_cmd_t { | |
1709 | + char *key; | |
1710 | + int keylen; | |
1711 | + rel_time_t time; | |
1712 | +}; | |
1713 | + | |
1714 | +Q_ITEM *qi_new(enum CMD_TYPE type, R_CMD *cmd, bool); | |
1715 | +void qi_free(Q_ITEM *); | |
1716 | +int qi_free_list(void); | |
1717 | +int replication_cmd(conn *, Q_ITEM *); | |
1718 | +int get_qi_count(void); | |
1719 | + | |
1720 | +int replication_call_rep(char *key, size_t keylen); | |
1721 | +int replication_call_del(char *key, size_t keylen); | |
1722 | +int replication_call_flush_all(void); | |
1723 | +int replication_call_defer_flush_all(const rel_time_t time); | |
1724 | +int replication_call_marugoto_end(void); | |
1725 | +int replication(enum CMD_TYPE type, R_CMD *cmd); | |
1726 | + | |
1727 | +#endif | |
c4fcb0e4 ER |
1728 | --- memcached-1.4.5/t/binary.t~ 2010-04-03 10:07:16.000000000 +0300 |
1729 | +++ memcached-1.4.5/t/binary.t 2010-05-06 14:13:25.718440750 +0300 | |
1730 | @@ -2,11 +2,13 @@ | |
894d11c1 ER |
1731 | |
1732 | use strict; | |
1733 | use warnings; | |
c4fcb0e4 | 1734 | -use Test::More tests => 3361; |
894d11c1 ER |
1735 | +use Test::More; |
1736 | use FindBin qw($Bin); | |
1737 | use lib "$Bin/lib"; | |
1738 | use MemcachedTest; | |
894d11c1 | 1739 | |
c4fcb0e4 ER |
1740 | +Test::More::plan(tests => 3361 + (support_replication() ? 36 : 0)); |
1741 | + | |
894d11c1 ER |
1742 | my $server = new_memcached(); |
1743 | ok($server, "started the server"); | |
c4fcb0e4 | 1744 | |
894d11c1 ER |
1745 | --- memcached-1.4.4/t/issue_67.t Sun Nov 1 01:44:09 2009 |
1746 | +++ repcached-2.2-1.4.4/t/issue_67.t Wed Feb 10 17:50:12 2010 | |
1747 | @@ -41,6 +41,10 @@ | |
1748 | my $exe = "$builddir/memcached-debug"; | |
1749 | croak("memcached binary doesn't exist. Haven't run 'make' ?\n") unless -e $exe; | |
1750 | ||
1751 | + if (support_replication()) { | |
1752 | + $args .= ' -X 0'; | |
1753 | + } | |
1754 | + | |
1755 | my $childpid = fork(); | |
1756 | ||
1757 | my $cmd = "$builddir/timedrun 10 $exe $args"; | |
894d11c1 ER |
1758 | --- memcached-1.4.4/t/lib/MemcachedTest.pm Fri Oct 30 04:24:52 2009 |
1759 | +++ repcached-2.2-1.4.4/t/lib/MemcachedTest.pm Wed Feb 10 17:53:34 2010 | |
1760 | @@ -13,7 +13,8 @@ | |
1761 | ||
1762 | ||
1763 | @EXPORT = qw(new_memcached sleep mem_get_is mem_gets mem_gets_is mem_stats | |
1764 | - supports_sasl free_port); | |
1765 | + supports_sasl free_port support_replication memcached_version | |
1766 | + version2num); | |
1767 | ||
1768 | sub sleep { | |
1769 | my $n = shift; | |
1770 | @@ -148,6 +149,23 @@ | |
1771 | return 0; | |
1772 | } | |
1773 | ||
1774 | +sub support_replication { | |
1775 | + my $output = `$builddir/memcached-debug -h`; | |
1776 | + return 1 if $output =~ /^-x <ip_addr>/m; | |
1777 | + return 0; | |
1778 | +} | |
1779 | + | |
1780 | +sub memcached_version { | |
1781 | + my $output = `$builddir/memcached-debug -h`; | |
1782 | + return $1 if $output =~ /^memcached (\d[\d\.]+)/; | |
1783 | + return 0; | |
1784 | +} | |
1785 | + | |
1786 | +sub version2num { | |
1787 | + my($major,$minor,$pl) = ($_[0] =~ /^(\d+)\.(\d+)\.(\d+)$/); | |
1788 | + return $major*100**2 + $minor*100 + $pl | |
1789 | +} | |
1790 | + | |
1791 | sub new_memcached { | |
1792 | my ($args, $passed_port) = @_; | |
1793 | my $port = $passed_port || free_port(); | |
1794 | @@ -171,6 +189,9 @@ | |
1795 | } | |
1796 | if ($< == 0) { | |
1797 | $args .= " -u root"; | |
1798 | + } | |
1799 | + if (support_replication() && $args !~ m/-X/) { | |
1800 | + $args .= ' -X 0'; | |
1801 | } | |
1802 | ||
1803 | my $childpid = fork(); | |
c4fcb0e4 ER |
1804 | --- memcached-1.4.5/t/stats.t~ 2010-04-03 10:07:16.000000000 +0300 |
1805 | +++ memcached-1.4.5/t/stats.t 2010-05-06 14:15:28.521352735 +0300 | |
1806 | @@ -57,7 +57,8 @@ | |
894d11c1 ER |
1807 | my $stats = mem_stats($sock); |
1808 | ||
1809 | # Test number of keys | |
c4fcb0e4 ER |
1810 | -is(scalar(keys(%$stats)), 38, "38 stats values"); |
1811 | +my $keys = 38 + (support_replication() ? 3 : 0); | |
1812 | +is(scalar(keys(%$stats)), $keys, "$keys stats values"); | |
894d11c1 ER |
1813 | |
1814 | # Test initial state | |
1815 | foreach my $key (qw(curr_items total_items bytes cmd_get cmd_set get_hits evictions get_misses | |
894d11c1 ER |
1816 | --- memcached-1.4.4/testapp.c Wed Nov 25 03:40:29 2009 |
1817 | +++ repcached-2.2-1.4.4/testapp.c Wed Feb 10 17:52:05 2010 | |
1818 | @@ -300,6 +300,10 @@ | |
1819 | argv[arg++] = "-1"; | |
1820 | argv[arg++] = "-U"; | |
1821 | argv[arg++] = "0"; | |
1822 | +#ifdef USE_REPLICATION | |
1823 | + argv[arg++] = "-X"; | |
1824 | + argv[arg++] = "0"; | |
1825 | +#endif | |
1826 | /* Handle rpmbuild and the like doing this as root */ | |
1827 | if (getuid() == 0) { | |
1828 | argv[arg++] = "-u"; |