]> git.pld-linux.org Git - packages/memcached.git/blob - repcached.patch
up to 1.4.23
[packages/memcached.git] / repcached.patch
1 --- memcached-1.4.4/Makefile.am Fri Oct 30 04:24:52 2009
2 +++ repcached-2.2-1.4.4/Makefile.am     Tue Feb  9 23:02:45 2010
3 @@ -31,6 +31,10 @@
4  memcached_SOURCES += sasl_defs.c
5  endif
6  
7 +if ENABLE_REPLICATION
8 +memcached_SOURCES += replication.h replication.c
9 +endif
10 +
11  memcached_debug_SOURCES = $(memcached_SOURCES)
12  memcached_CPPFLAGS = -DNDEBUG
13  memcached_debug_LDADD = @PROFILER_LDFLAGS@
14 --- memcached-1.4.4/assoc.c     Sat Oct 24 00:38:01 2009
15 +++ repcached-2.2-1.4.4/assoc.c Tue Feb  9 23:02:45 2010
16 @@ -258,3 +258,51 @@
17  }
18  
19  
20 +#ifdef USE_REPLICATION
21 +char *assoc_key_snap(int *n) /* snapshot every key in the hash table into one malloc'd buffer; key count goes to *n */
22 +{
23 +    char *p = NULL; /* write cursor into the buffer */
24 +    char *b = NULL; /* start of the buffer; this is the return value */
25 +    item *i = NULL;
26 +    int  co = 0; /* number of keys found */
27 +    int  sz = 1; /* bytes needed; starts at 1 for the final terminating NUL */
28 +    int  hs = 0;
29 +    int  hm = hashsize(hashpower); /* bucket count of the primary table */
30 +
31 +    hs = hm;
32 +    while(hs--){ /* pass 1: count the keys and the bytes they need */
33 +        if(expanding && hs < hashsize(hashpower - 1) && hs >= expand_bucket){ /* presumably a bucket not yet migrated - TODO confirm */
34 +            i = old_hashtable[hs];
35 +        }else{
36 +            i = primary_hashtable[hs];
37 +        }
38 +        while(i){
39 +            sz += i->nkey + 1; /* key bytes plus one NUL separator */
40 +            co++;
41 +            i = i->h_next;
42 +        }
43 +    }
44 +
45 +    if(co){ /* an empty table leaves b == NULL */
46 +        if((p = b = malloc(sz))){ /* on malloc failure b stays NULL */
47 +            hs = hm;
48 +            while(hs--){ /* pass 2: same walk as pass 1, now copying the keys */
49 +                if(expanding && hs < hashsize(hashpower - 1) && hs >= expand_bucket){
50 +                    i = old_hashtable[hs];
51 +                }else{
52 +                    i = primary_hashtable[hs];
53 +                }
54 +                while(i){
55 +                    memcpy(p, ITEM_key(i), i->nkey);
56 +                    p += i->nkey;
57 +                    *(p++) = 0; /* NUL-terminate this key */
58 +                    i = i->h_next;
59 +                }
60 +            }
61 +            *(p++) = 0; /* extra NUL: an empty string marks the end of the list */
62 +        }
63 +    }
64 +    if(n) *n = co; /* the count is reported even when b is NULL */
65 +    return(b); /* NOTE(review): caller presumably free()s the buffer - confirm */
66 +}
67 +#endif /* USE_REPLICATION */
68 --- memcached-1.4.4/assoc.h     Sun Aug 30 03:00:58 2009
69 +++ repcached-2.2-1.4.4/assoc.h Tue Feb  9 23:02:45 2010
70 @@ -7,3 +7,6 @@
71  int start_assoc_maintenance_thread(void);
72  void stop_assoc_maintenance_thread(void);
73  
74 +#ifdef USE_REPLICATION
75 +char *assoc_key_snap(int *n);
76 +#endif /*USE_REPLICATION*/
77 --- memcached-1.4.4/config.h.in Fri Nov 27 09:34:56 2009
78 +++ repcached-2.2-1.4.4/config.h.in     Wed Feb 10 19:12:46 2010
79 @@ -99,6 +99,9 @@
80  /* Define to 1 if you have the ANSI C header files. */
81  #undef STDC_HEADERS
82  
83 +/* Define this if you want to use replication */
84 +#undef USE_REPLICATION
85 +
86  /* Version number of package */
87  #undef VERSION
88  
89 --- memcached-1.4.4/configure.ac        Wed Nov 25 03:40:29 2009
90 +++ repcached-2.2-1.4.4/configure.ac    Tue Feb  9 23:02:45 2010
91 @@ -382,6 +382,18 @@
92    AC_MSG_ERROR([Can't enable threads without the POSIX thread library.])
93  fi
94  
95 +dnl Replication is opt-in; this action also runs for --disable-replication (enableval=no), so test enableval.
96 +AC_ARG_ENABLE(replication,
97 +  [AS_HELP_STRING([--enable-replication],[support replication])],
98 +  [if test "x$enableval" = "xyes" && test "x$enable_threads" = "xyes"; then
99 +     AC_MSG_ERROR([Can't enable threads and replication together.])
100 +   elif test "x$enableval" = "xyes"; then
101 +     AC_DEFINE([USE_REPLICATION],,[Define this if you want to use replication])
102 +   fi
103 +  ])
104 +
105 +AM_CONDITIONAL(ENABLE_REPLICATION, test "x$enable_replication" = "xyes")
106 +
107  AC_CHECK_FUNCS(mlockall)
108  AC_CHECK_FUNCS(getpagesizes)
109  AC_CHECK_FUNCS(memcntl)
110 --- memcached-1.4.4/items.c     Sat Oct 24 00:38:01 2009
111 +++ repcached-2.2-1.4.4/items.c Tue Feb  9 23:02:45 2010
112 @@ -155,6 +155,9 @@
113                      STATS_LOCK();
114                      stats.evictions++;
115                      STATS_UNLOCK();
116 +#ifdef USE_REPLICATION
117 +                    replication_call_del(ITEM_key(search), search->nkey);
118 +#endif /* USE_REPLICATION */
119                  }
120                  do_item_unlink(search);
121                  break;
122 @@ -288,8 +291,14 @@
123      stats.total_items += 1;
124      STATS_UNLOCK();
125  
126 +#ifdef USE_REPLICATION
127 +    /* Allocate a new CAS ID on link. */
128 +    if(!(it->it_flags & ITEM_REPDATA))
129 +        ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
130 +#else
131      /* Allocate a new CAS ID on link. */
132      ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
133 +#endif /* USE_REPLICATION */
134  
135      item_link_q(it);
136  
137 --- memcached-1.4.4/memcached.c Fri Nov 27 08:45:13 2009
138 +++ repcached-2.2-1.4.4/memcached.c     Wed Feb 10 16:08:37 2010
139 @@ -102,6 +102,30 @@
140  
141  static void conn_free(conn *c);
142  
143 +#ifdef USE_REPLICATION
144 +static int   rep_exit = 0;
145 +static conn *rep_recv = NULL;
146 +static conn *rep_send = NULL;
147 +static conn *rep_conn = NULL;
148 +static conn *rep_serv = NULL;
149 +static int  server_socket_replication(const int);
150 +static void server_close_replication(void);
151 +static int  replication_init(void);
152 +static int  replication_server_init(void);
153 +static int  replication_client_init(void);
154 +static int  replication_start(void);
155 +static int  replication_connect(void);
156 +static int  replication_close(void);
157 +static void replication_dispatch_close(void);
158 +static int  replication_marugoto(int);
159 +static int  replication_send(conn *);
160 +static int  replication_pop(void);
161 +static int  replication_push(void);
162 +static int  replication_exit(void);
163 +static int  replication_item(Q_ITEM *);
164 +static pthread_mutex_t replication_pipe_lock = PTHREAD_MUTEX_INITIALIZER;
165 +#endif /* USE_REPLICATION */
166 +
167  /** exported globals **/
168  struct stats stats;
169  struct settings settings;
170 @@ -194,6 +218,11 @@
171      settings.backlog = 1024;
172      settings.binding_protocol = negotiating_prot;
173      settings.item_size_max = 1024 * 1024; /* The famous 1MB upper limit. */
174 +#ifdef USE_REPLICATION
175 +    settings.rep_addr.s_addr = htonl(INADDR_ANY);
176 +    settings.rep_port = 11212;
177 +    settings.rep_qmax = 8192;
178 +#endif /* USE_REPLICATION */
179  }
180  
181  /*
182 @@ -382,6 +411,10 @@
183                  prot_text(c->protocol));
184          } else if (IS_UDP(transport)) {
185              fprintf(stderr, "<%d server listening (udp)\n", sfd);
186 +#ifdef USE_REPLICATION
187 +        } else if (init_state == conn_rep_listen) {
188 +            fprintf(stderr, "<%d server listening (replication)\n", sfd);
189 +#endif /* USE_REPLICATION */
190          } else if (c->protocol == negotiating_prot) {
191              fprintf(stderr, "<%d new auto-negotiating client connection\n",
192                      sfd);
193 @@ -593,7 +626,11 @@
194                                         "conn_nread",
195                                         "conn_swallow",
196                                         "conn_closing",
197 -                                       "conn_mwrite" };
198 +                                       "conn_mwrite",
199 +                                       "conn_repconnect",
200 +                                       "conn_rep_listen",
201 +                                       "conn_pipe_recv",
202 +                                       "conn_pipe_send" };
203      return statenames[state];
204  }
205  
206 @@ -752,6 +789,14 @@
207  
208      assert(c != NULL);
209  
210 +#ifdef USE_REPLICATION
211 +    if (c == rep_conn){
212 +        if (settings.verbose > 1)
213 +            fprintf(stderr, "REP>%d %s\n", c->sfd, str);
214 +        conn_set_state(c, conn_new_cmd);
215 +        return;
216 +    }
217 +#endif /* USE_REPLICATION */
218      if (c->noreply) {
219          if (settings.verbose > 1)
220              fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
221 @@ -791,9 +836,11 @@
222      int comm = c->cmd;
223      enum store_item_type ret;
224  
225 -    pthread_mutex_lock(&c->thread->stats.mutex);
226 -    c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
227 -    pthread_mutex_unlock(&c->thread->stats.mutex);
228 +    if (c->thread) {
229 +        pthread_mutex_lock(&c->thread->stats.mutex);
230 +        c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
231 +        pthread_mutex_unlock(&c->thread->stats.mutex);
232 +    }
233  
234      if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
235          out_string(c, "CLIENT_ERROR bad data chunk");
236 @@ -832,6 +879,11 @@
237  
238        switch (ret) {
239        case STORED:
240 +#ifdef USE_REPLICATION
241 +          if( c != rep_conn ){
242 +            replication_call_rep(ITEM_key(it), it->nkey);
243 +          }
244 +#endif /* USE_REPLICATION */
245            out_string(c, "STORED");
246            break;
247        case EXISTS:
248 @@ -2410,6 +2462,11 @@
249      APPEND_STAT("listen_disabled_num", "%llu", (unsigned long long)stats.listen_disabled_num);
250      APPEND_STAT("threads", "%d", settings.num_threads);
251      APPEND_STAT("conn_yields", "%llu", (unsigned long long)thread_stats.conn_yields);
252 +#ifdef USE_REPLICATION
253 +    APPEND_STAT("replication", "MASTER", 0);
254 +    APPEND_STAT("repcached_version", "%s", REPCACHED_VERSION);
255 +    APPEND_STAT("repcached_qi_free", "%u", settings.rep_qmax - get_qi_count());
256 +#endif /*USE_REPLICATION*/
257      STATS_UNLOCK();
258  }
259  
260 @@ -2797,6 +2854,11 @@
261      switch(add_delta(c, it, incr, delta, temp)) {
262      case OK:
263          out_string(c, temp);
264 +#ifdef USE_REPLICATION
265 +        if( c != rep_conn){
266 +            replication_call_rep(ITEM_key(it), it->nkey);
267 +        }
268 +#endif /* USE_REPLICATION */
269          break;
270      case NON_NUMERIC:
271          out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value");
272 @@ -2911,17 +2973,25 @@
273      if (it) {
274          MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);
275  
276 -        pthread_mutex_lock(&c->thread->stats.mutex);
277 -        c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++;
278 -        pthread_mutex_unlock(&c->thread->stats.mutex);
279 +        if (c->thread) {
280 +            pthread_mutex_lock(&c->thread->stats.mutex);
281 +            c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++;
282 +            pthread_mutex_unlock(&c->thread->stats.mutex);
283 +        }
284  
285          item_unlink(it);
286          item_remove(it);      /* release our reference */
287 +#ifdef USE_REPLICATION
288 +        if( c != rep_conn )
289 +            replication_call_del(key, nkey);
290 +#endif /* USE_REPLICATION */
291          out_string(c, "DELETED");
292      } else {
293 -        pthread_mutex_lock(&c->thread->stats.mutex);
294 -        c->thread->stats.delete_misses++;
295 -        pthread_mutex_unlock(&c->thread->stats.mutex);
296 +        if (c->thread) {
297 +            pthread_mutex_lock(&c->thread->stats.mutex);
298 +            c->thread->stats.delete_misses++;
299 +            pthread_mutex_unlock(&c->thread->stats.mutex);
300 +        }
301  
302          out_string(c, "NOT_FOUND");
303      }
304 @@ -2986,6 +3056,22 @@
305  
306          process_update_command(c, tokens, ntokens, comm, true);
307  
308 +#ifdef USE_REPLICATION
309 +    } else if ((ntokens == 7) && (strcmp(tokens[COMMAND_TOKEN].value, "rep") == 0 && (comm = NREAD_SET)) && (c == rep_conn)) {
310 +
311 +        process_update_command(c, tokens, ntokens, comm, true);
312 +        if(c->item)
313 +            ((item *)(c->item))->it_flags |= ITEM_REPDATA;
314 +
315 +    } else if ((ntokens == 2) && (strcmp(tokens[COMMAND_TOKEN].value, "marugoto_end") == 0) && (c == rep_conn)) {
316 +        if(replication_start() == -1)
317 +            exit(EXIT_FAILURE);
318 +        if (settings.verbose > 0)
319 +            fprintf(stderr,"replication: start\n");
320 +        out_string(c, "OK");
321 +        return;
322 +
323 +#endif /* USE_REPLICATION */
324      } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
325  
326          process_arithmetic_command(c, tokens, ntokens, 1);
327 @@ -3012,11 +3098,17 @@
328  
329          set_noreply_maybe(c, tokens, ntokens);
330  
331 -        pthread_mutex_lock(&c->thread->stats.mutex);
332 -        c->thread->stats.flush_cmds++;
333 -        pthread_mutex_unlock(&c->thread->stats.mutex);
334 +        if (c->thread) {
335 +            pthread_mutex_lock(&c->thread->stats.mutex);
336 +            c->thread->stats.flush_cmds++;
337 +            pthread_mutex_unlock(&c->thread->stats.mutex);
338 +        }
339  
340          if(ntokens == (c->noreply ? 3 : 2)) {
341 +#ifdef USE_REPLICATION
342 +            if( c != rep_conn )
343 +                replication_call_flush_all();
344 +#endif
345              settings.oldest_live = current_time - 1;
346              item_flush_expired();
347              out_string(c, "OK");
348 @@ -3029,6 +3121,11 @@
349              return;
350          }
351  
352 +#ifdef USE_REPLICATION
353 +        if( c != rep_conn )
354 +            replication_call_defer_flush_all(realtime(exptime) + process_started);
355 +#endif
356 +        settings.oldest_live = realtime(exptime) - 1;
357          /*
358            If exptime is zero realtime() would return zero too, and
359            realtime(exptime) - 1 would overflow to the max unsigned
360 @@ -3275,9 +3372,11 @@
361          int avail = c->rsize - c->rbytes;
362          res = read(c->sfd, c->rbuf + c->rbytes, avail);
363          if (res > 0) {
364 -            pthread_mutex_lock(&c->thread->stats.mutex);
365 -            c->thread->stats.bytes_read += res;
366 -            pthread_mutex_unlock(&c->thread->stats.mutex);
367 +            if (c->thread) {
368 +                pthread_mutex_lock(&c->thread->stats.mutex);
369 +                c->thread->stats.bytes_read += res;
370 +                pthread_mutex_unlock(&c->thread->stats.mutex);
371 +            }
372              gotdata = READ_DATA_RECEIVED;
373              c->rbytes += res;
374              if (res == avail) {
375 @@ -3423,6 +3522,12 @@
376  
377      assert(c != NULL);
378  
379 +#ifdef USE_REPLICATION
380 +    if(rep_exit && (c->state != conn_pipe_recv)){
381 +        return;
382 +    }
383 +#endif /* USE_REPLICATION */
384 +
385      while (!stop) {
386  
387          switch(c->state) {
388 @@ -3502,9 +3607,11 @@
389              if (nreqs >= 0) {
390                  reset_cmd_handler(c);
391              } else {
392 -                pthread_mutex_lock(&c->thread->stats.mutex);
393 -                c->thread->stats.conn_yields++;
394 -                pthread_mutex_unlock(&c->thread->stats.mutex);
395 +                if (c->thread) {
396 +                    pthread_mutex_lock(&c->thread->stats.mutex);
397 +                    c->thread->stats.conn_yields++;
398 +                    pthread_mutex_unlock(&c->thread->stats.mutex);
399 +                }
400                  if (c->rbytes > 0) {
401                      /* We have already read in data into the input buffer,
402                         so libevent will most likely not signal read events
403 @@ -3545,9 +3652,11 @@
404              /*  now try reading from the socket */
405              res = read(c->sfd, c->ritem, c->rlbytes);
406              if (res > 0) {
407 -                pthread_mutex_lock(&c->thread->stats.mutex);
408 -                c->thread->stats.bytes_read += res;
409 -                pthread_mutex_unlock(&c->thread->stats.mutex);
410 +                if (c->thread) {
411 +                    pthread_mutex_lock(&c->thread->stats.mutex);
412 +                    c->thread->stats.bytes_read += res;
413 +                    pthread_mutex_unlock(&c->thread->stats.mutex);
414 +                }
415                  if (c->rcurr == c->ritem) {
416                      c->rcurr += res;
417                  }
418 @@ -3600,9 +3709,11 @@
419              /*  now try reading from the socket */
420              res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
421              if (res > 0) {
422 -                pthread_mutex_lock(&c->thread->stats.mutex);
423 -                c->thread->stats.bytes_read += res;
424 -                pthread_mutex_unlock(&c->thread->stats.mutex);
425 +                if (c->thread) {
426 +                    pthread_mutex_lock(&c->thread->stats.mutex);
427 +                    c->thread->stats.bytes_read += res;
428 +                    pthread_mutex_unlock(&c->thread->stats.mutex);
429 +                }
430                  c->sbytes -= res;
431                  break;
432              }
433 @@ -3698,6 +3809,10 @@
434          case conn_closing:
435              if (IS_UDP(c->transport))
436                  conn_cleanup(c);
437 +#ifdef USE_REPLICATION
438 +            else if(c == rep_conn)
439 +                replication_close();
440 +#endif /*USE_REPLICATION*/
441              else
442                  conn_close(c);
443              stop = true;
444 @@ -3706,6 +3821,70 @@
445          case conn_max_state:
446              assert(false);
447              break;
448 +
449 +#ifdef USE_REPLICATION
450 +        case conn_pipe_recv:
451 +            if(replication_pop()){
452 +                replication_close();
453 +            }else{
454 +                replication_send(rep_conn);
455 +            }
456 +            stop = true;
457 +            break;
458 +
459 +        case conn_rep_listen:
460 +            if (settings.verbose > 0)
461 +                fprintf(stderr,"replication: accept\n");
462 +            addrlen = sizeof(addr);
463 +            res = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
464 +            if(res == -1){
465 +                if(errno == EAGAIN || errno == EWOULDBLOCK) {
466 +                } else if (errno == EMFILE) {
467 +                    fprintf(stderr, "replication: Too many opened connections\n");
468 +                } else {
469 +                    fprintf(stderr, "replication: accept error\n");
470 +                }
471 +            }else{
472 +                if(rep_conn){
473 +                    close(res);
474 +                    fprintf(stderr,"replication: already connected\n");
475 +                }else{
476 +                    if((flags = fcntl(res, F_GETFL, 0)) < 0 || fcntl(res, F_SETFL, flags | O_NONBLOCK) < 0){
477 +                        close(res);
478 +                        fprintf(stderr, "replication: Can't Setting O_NONBLOCK\n");
479 +                    }else{
480 +                        server_close_replication();
481 +                        rep_conn = conn_new(res, conn_read, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base);
482 +                        rep_conn->item   = NULL;
483 +                        rep_conn->rbytes = 0;
484 +                        rep_conn->rcurr  = rep_conn->rbuf;
485 +                        replication_connect();
486 +                        replication_marugoto(1);
487 +                        replication_marugoto(0);
488 +                    }
489 +                }
490 +            }
491 +            stop = true;
492 +            break;
493 +
494 +        case conn_repconnect:
495 +            rep_conn = c;
496 +            replication_connect();
497 +            conn_set_state(c, conn_read);
498 +            if (settings.verbose > 0)
499 +                fprintf(stderr,"replication: marugoto copying\n");
500 +            if(!update_event(c, EV_READ | EV_PERSIST)){
501 +                fprintf(stderr, "replication: Couldn't update event\n");
502 +                conn_set_state(c, conn_closing);
503 +            }
504 +            stop = true;
505 +            break;
506 +
507 +        case conn_pipe_send:
508 +            /* should not happen */
509 +            fprintf(stderr, "replication: unexpected conn_pipe_send state\n");
510 +            break;
511 +#endif /* USE_REPLICATION */
512          }
513      }
514  
515 @@ -4002,6 +4181,89 @@
516      return 0;
517  }
518  
519 +#ifdef USE_REPLICATION
520 +static int server_socket_replication(const int port) { /* listen on `port` for each address of settings.inter; conns are chained onto rep_serv */
521 +    int sfd;
522 +    struct linger ling = {0, 0};
523 +    struct addrinfo *ai;
524 +    struct addrinfo *next;
525 +    struct addrinfo hints;
526 +    char port_buf[NI_MAXSERV];
527 +    int error;
528 +    int success = 0; /* number of addresses bound successfully */
529 +
530 +    int flags =1; /* "on" value shared by the boolean socket options below */
531 +
532 +    memset(&hints, 0, sizeof (hints));
533 +    hints.ai_flags = AI_PASSIVE|AI_ADDRCONFIG;
534 +    hints.ai_family = AF_UNSPEC;
535 +    hints.ai_protocol = IPPROTO_TCP;
536 +    hints.ai_socktype = SOCK_STREAM;
537 +    snprintf(port_buf, NI_MAXSERV, "%d", port); /* getaddrinfo() takes the service as a string */
538 +    error= getaddrinfo(settings.inter, port_buf, &hints, &ai);
539 +    if (error != 0) {
540 +      if (error != EAI_SYSTEM)
541 +        fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
542 +      else
543 +        perror("getaddrinfo()");
544 +
545 +      return 1;
546 +    }
547 +
548 +    for (next= ai; next; next= next->ai_next) { /* one listening socket per resolved address */
549 +        conn *rep_serv_add;
550 +        if ((sfd = new_socket(next)) == -1) {
551 +            freeaddrinfo(ai);
552 +            return 1;
553 +        }
554 +        setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); /* setsockopt() results are not checked */
555 +        setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
556 +        setsockopt(sfd, SOL_SOCKET, SO_LINGER,    (void *)&ling,  sizeof(ling));
557 +        setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
558 +
559 +        if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
560 +            if (errno != EADDRINUSE) { /* any bind error other than EADDRINUSE aborts the whole setup */
561 +                perror("bind()");
562 +                close(sfd);
563 +                freeaddrinfo(ai);
564 +                return 1;
565 +            }
566 +            close(sfd); /* EADDRINUSE: skip this address and try the next one */
567 +            continue;
568 +        } else {
569 +            success++;
570 +            if (listen(sfd, 1024) == -1) {
571 +                perror("listen()");
572 +                close(sfd);
573 +                freeaddrinfo(ai);
574 +                return 1;
575 +            }
576 +        }
577 +
578 +        if (!(rep_serv_add = conn_new(sfd, conn_rep_listen,
579 +                                       EV_READ | EV_PERSIST, 1, tcp_transport, main_base))) {
580 +            fprintf(stderr, "failed to create replication server connection\n");
581 +            exit(EXIT_FAILURE);
582 +        }
583 +
584 +        rep_serv_add->next = rep_serv; /* push the new listener onto the head of the rep_serv list */
585 +        rep_serv = rep_serv_add;
586 +    }
587 +
588 +    freeaddrinfo(ai);
589 +
590 +    /* Return zero iff at least one address was bound; earlier failures already returned 1 */
591 +    return success == 0;
592 +}
593 +
594 +static void server_close_replication(void) {
595 +  conn *c;
596 +  /* Unlink each listener before closing it: conn_close() may free or recycle
597 +   * the conn, so its ->next link must not be read afterwards. */
598 +  while((c = rep_serv) != NULL){ rep_serv = c->next; conn_close(c); }
599 +}
600 +#endif /* USE_REPLICATION */
601 +
602  /*
603   * We keep the current time of day in a global variable that's updated by a
604   * timer event. This saves us a bunch of time() system calls (we really only
605 @@ -4041,6 +4303,9 @@
606  
607  static void usage(void) {
608      printf(PACKAGE " " VERSION "\n");
609 +#ifdef USE_REPLICATION
610 +    printf("repcached %s\n",REPCACHED_VERSION);
611 +#endif /* USE_REPLICATION */
612      printf("-p <num>      TCP port number to listen on (default: 11211)\n"
613             "-U <num>      UDP port number to listen on (default: 11211, 0 is off)\n"
614             "-s <file>     UNIX socket path to listen on (disables network support)\n"
615 @@ -4088,6 +4353,10 @@
616  #ifdef ENABLE_SASL
617      printf("-S            Turn on Sasl authentication\n");
618  #endif
619 +#ifdef USE_REPLICATION
620 +    printf("-x <ip_addr>  hostname or IP address of peer repcached\n");
621 +    printf("-X <num>      TCP port number for replication (default: 11212)\n");
622 +#endif /* USE_REPLICATION */
623      return;
624  }
625  
626 @@ -4194,6 +4463,26 @@
627      exit(EXIT_SUCCESS);
628  }
629  
630 +#ifdef USE_REPLICATION
631 +static void sig_handler_cb(int fd, short event, void *arg)
632 +{
633 +    /* arg is the struct event this callback was registered with */
634 +    struct event *sig_ev = arg;
635 +
636 +    if (settings.verbose != 0) {
637 +        fprintf(stderr, "got signal %d\n", EVENT_SIGNAL(sig_ev));
638 +    }
639 +    if (replication_exit() != 0)
640 +        exit(EXIT_FAILURE);
641 +
642 +    /* leave at once when rep_send is unset; otherwise return to the caller */
643 +    pthread_mutex_lock(&replication_pipe_lock);
644 +    if (rep_send == NULL)
645 +        exit(EXIT_SUCCESS);
646 +    pthread_mutex_unlock(&replication_pipe_lock);
647 +}
648 +#endif /* USE_REPLICATION */
649 +
650  #ifndef HAVE_SIGIGNORE
651  static int sigignore(int sig) {
652      struct sigaction sa = { .sa_handler = SIG_IGN, .sa_flags = 0 };
653 @@ -4249,6 +4538,57 @@
654  #endif
655  }
656  
657 +static void create_listening_sockets(void) /* open the client-facing listeners; any failure to listen exits the process */
658 +{
659 +    /* UNIX-domain mode: when a socket path is configured, listen on it only */
660 +    if (settings.socketpath != NULL) {
661 +        errno = 0;
662 +        if (server_socket_unix(settings.socketpath,settings.access)) {
663 +            vperror("failed to listen on UNIX socket: %s", settings.socketpath);
664 +            exit(EX_OSERR);
665 +        }
666 +    }
667 +
668 +    /* network mode: create the TCP and UDP listening sockets */
669 +    if (settings.socketpath == NULL) {
670 +        const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME"); /* optional port file, named by the environment */
671 +        char temp_portnumber_filename[PATH_MAX];
672 +        FILE *portnumber_file = NULL;
673 +
674 +        if (portnumber_filename != NULL) {
675 +            snprintf(temp_portnumber_filename,
676 +                     sizeof(temp_portnumber_filename),
677 +                     "%s.lck", portnumber_filename); /* written under a temporary ".lck" name, renamed at the end */
678 +
679 +            portnumber_file = fopen(temp_portnumber_filename, "a");
680 +            if (portnumber_file == NULL) { /* not fatal: startup continues without a port file */
681 +                fprintf(stderr, "Failed to open \"%s\": %s\n",
682 +                        temp_portnumber_filename, strerror(errno));
683 +            }
684 +        }
685 +
686 +        errno = 0;
687 +        if (settings.port && server_socket(settings.port, tcp_transport,
688 +                                           portnumber_file)) { /* a port of 0 skips the TCP listener */
689 +            vperror("failed to listen on TCP port %d", settings.port);
690 +            exit(EX_OSERR);
691 +        }
692 +
693 +        /* create the UDP listening socket and bind it (skipped when udpport is 0) */
694 +        errno = 0;
695 +        if (settings.udpport && server_socket(settings.udpport, udp_transport,
696 +                                              portnumber_file)) {
697 +            vperror("failed to listen on UDP port %d", settings.udpport);
698 +            exit(EX_OSERR);
699 +        }
700 +
701 +        if (portnumber_file) {
702 +            fclose(portnumber_file);
703 +            rename(temp_portnumber_filename, portnumber_filename); /* move the finished file into place; result not checked */
704 +        }
705 +    }
706 +}
707 +
708  int main (int argc, char **argv) {
709      int c;
710      bool lock_memory = false;
711 @@ -4261,6 +4601,11 @@
712      struct rlimit rlim;
713      char unit = '\0';
714      int size_max = 0;
715 +#ifdef USE_REPLICATION
716 +    struct in_addr   addr;
717 +    struct addrinfo  master_hint;
718 +    struct addrinfo *master_addr;
719 +#endif /* USE_REPLICATION */
720      /* listening sockets */
721      static int *l_socket = NULL;
722  
723 @@ -4307,6 +4652,11 @@
724            "B:"  /* Binding protocol */
725            "I:"  /* Max item size */
726            "S"   /* Sasl ON */
727 +#ifdef USE_REPLICATION
728 +          "X:"  /* replication port */
729 +          "x:"  /* replication master */
730 +          "q:"  /* replication queue length */
731 +#endif /* USE_REPLICATION */
732          ))) {
733          switch (c) {
734          case 'a':
735 @@ -4462,6 +4812,31 @@
736                  );
737              }
738              break;
739 +#ifdef USE_REPLICATION
740 +        case 'x': /* peer address: dotted-quad IPv4 literal or a host name */
741 +            if (inet_pton(AF_INET, optarg, &addr) <= 0) {
742 +                /* Not a literal, so resolve it.  rep_addr is a struct in_addr: ask for
743 +                 * IPv4 only, else a sockaddr_in6 result is misread by the cast below. */
744 +                memset(&master_hint, 0, sizeof(master_hint));
745 +                master_hint.ai_family = AF_INET;
746 +                if(!getaddrinfo(optarg, NULL, &master_hint, &master_addr)){
747 +                    settings.rep_addr = ((struct sockaddr_in *)(master_addr->ai_addr)) -> sin_addr;
748 +                    freeaddrinfo(master_addr);
749 +                }else{
750 +                    fprintf(stderr, "Illegal address: %s\n", optarg);
751 +                    return 1;
752 +                }
753 +            } else {
754 +                settings.rep_addr = addr;
755 +            }
756 +            break;
757 +        case 'X':
758 +            settings.rep_port = atoi(optarg);
759 +            break;
760 +        case 'q':
761 +            settings.rep_qmax = atoi(optarg);
762 +            break;
763 +#endif /* USE_REPLICATION */
764          case 'S': /* set Sasl authentication to true. Default is false */
765  #ifndef ENABLE_SASL
766              fprintf(stderr, "This server is not built with SASL support.\n");
767 @@ -4587,6 +4962,17 @@
768      /* initialize main thread libevent instance */
769      main_base = event_init();
770  
771 +#ifdef USE_REPLICATION
772 +    /* register events for SIGINT and SIGTERM to handle them in main thread */
773 +    struct event signal_int, signal_term;
774 +    event_set(&signal_int, SIGINT, EV_SIGNAL|EV_PERSIST, sig_handler_cb,
775 +              &signal_int);
776 +    event_add(&signal_int, NULL);
777 +    event_set(&signal_term, SIGTERM, EV_SIGNAL|EV_PERSIST, sig_handler_cb,
778 +              &signal_term);
779 +    event_add(&signal_term, NULL);
780 +#endif
781 +
782      /* initialize other stuff */
783      stats_init();
784      assoc_init();
785 @@ -4615,63 +5001,21 @@
786      /* initialise clock event */
787      clock_handler(0, 0, 0);
788  
789 -    /* create unix mode sockets after dropping privileges */
790 -    if (settings.socketpath != NULL) {
791 -        errno = 0;
792 -        if (server_socket_unix(settings.socketpath,settings.access)) {
793 -            vperror("failed to listen on UNIX socket: %s", settings.socketpath);
794 -            exit(EX_OSERR);
795 -        }
796 -    }
797 -
798 -    /* create the listening socket, bind it, and init */
799 -    if (settings.socketpath == NULL) {
800 -        int udp_port;
801 -
802 -        const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
803 -        char temp_portnumber_filename[PATH_MAX];
804 -        FILE *portnumber_file = NULL;
805 -
806 -        if (portnumber_filename != NULL) {
807 -            snprintf(temp_portnumber_filename,
808 -                     sizeof(temp_portnumber_filename),
809 -                     "%s.lck", portnumber_filename);
810 -
811 -            portnumber_file = fopen(temp_portnumber_filename, "a");
812 -            if (portnumber_file == NULL) {
813 -                fprintf(stderr, "Failed to open \"%s\": %s\n",
814 -                        temp_portnumber_filename, strerror(errno));
815 -            }
816 -        }
817 -
818 -        errno = 0;
819 -        if (settings.port && server_socket(settings.port, tcp_transport,
820 -                                           portnumber_file)) {
821 -            vperror("failed to listen on TCP port %d", settings.port);
822 -            exit(EX_OSERR);
823 -        }
824 -
825 -        /*
826 -         * initialization order: first create the listening sockets
827 -         * (may need root on low ports), then drop root if needed,
828 -         * then daemonise if needed, then init libevent (in some cases
829 -         * descriptors created by libevent wouldn't survive forking).
830 -         */
831 -        udp_port = settings.udpport ? settings.udpport : settings.port;
832 -
833 -        /* create the UDP listening socket and bind it */
834 -        errno = 0;
835 -        if (settings.udpport && server_socket(settings.udpport, udp_transport,
836 -                                              portnumber_file)) {
837 -            vperror("failed to listen on UDP port %d", settings.udpport);
838 -            exit(EX_OSERR);
839 -        }
840 +    /*
841 +     * initialization order: first create the listening sockets
842 +     * (may need root on low ports), then drop root if needed,
843 +     * then daemonise if needed, then init libevent (in some cases
844 +     * descriptors created by libevent wouldn't survive forking).
845 +     */
846  
847 -        if (portnumber_file) {
848 -            fclose(portnumber_file);
849 -            rename(temp_portnumber_filename, portnumber_filename);
850 -        }
851 +#ifdef USE_REPLICATION
852 +    if(replication_init() == -1){
853 +        fprintf(stderr, "faild to replication init\n");
854 +        exit(EXIT_FAILURE);
855      }
856 +#else
857 +    create_listening_sockets();
858 +#endif
859  
860      /* Drop privileges no longer needed */
861      drop_privileges();
862 @@ -4694,3 +5038,401 @@
863  
864      return EXIT_SUCCESS;
865  }
866 +
867 +#ifdef USE_REPLICATION
868 +static int replication_start(void)  /* open the regular listening sockets once; always returns 0 */
869 +{
870 +    static int start = 0;  /* set to 1 after the first call so repeat calls do nothing */
871 +    if(start)
872 +        return(0);
873 +
874 +    create_listening_sockets();
875 +
876 +    start = 1;
877 +    return(0);
878 +}
879 +
880 +static int replication_server_init(void)  /* listen for a replication peer on settings.rep_port; -1 if that socket cannot be set up */
881 +{
882 +    rep_recv = NULL;  /* reset the three replication connection pointers */
883 +    rep_send = NULL;
884 +    rep_conn = NULL;
885 +    if(server_socket_replication(settings.rep_port)){  /* non-zero result is treated as failure */
886 +        fprintf(stderr, "replication: failed to initialize replication server socket\n");
887 +        return(-1);
888 +    }
889 +    if (settings.verbose > 0)
890 +        fprintf(stderr, "replication: listen\n");
891 +    return(replication_start());  /* regular listening sockets are opened only after the replication socket is up */
892 +}
893 +
894 +static int replication_client_init(void)  /* connect to the peer at settings.rep_addr:settings.rep_port; 0 once a conn_repconnect conn exists, else -1 */
895 +{
896 +    int s;
897 +    conn *c;
898 +    struct addrinfo    ai;
899 +    struct sockaddr_in server;
900 +
901 +    rep_recv  = NULL;  /* reset the three replication connection pointers */
902 +    rep_send  = NULL;
903 +    rep_conn  = NULL;
904 +
905 +    memset(&ai,0,sizeof(ai));
906 +    ai.ai_family   = AF_INET;  /* IPv4 TCP only */
907 +    ai.ai_socktype = SOCK_STREAM;
908 +    s = new_socket(&ai);  /* NOTE(review): the EINPROGRESS branch below suggests this socket is non-blocking - confirm in new_socket() */
909 +
910 +    if(s == -1) {
911 +        fprintf(stderr, "replication: failed to replication client socket\n");
912 +        return(-1);
913 +    }else{
914 +        /* connect */
915 +        memset((char *)&server, 0, sizeof(server));
916 +        server.sin_family = AF_INET;
917 +        server.sin_addr   = settings.rep_addr;
918 +        server.sin_port   = htons(settings.rep_port);
919 +        if (settings.verbose > 0)
920 +            fprintf(stderr,"replication: connect (peer=%s:%d)\n", inet_ntoa(settings.rep_addr), settings.rep_port);
921 +        if(connect(s,(struct sockaddr *)&server, sizeof(server)) == 0){
922 +            c = conn_new(s, conn_repconnect, EV_WRITE | EV_PERSIST, DATA_BUFFER_SIZE, false, main_base);
923 +            if(c == NULL){
924 +                fprintf(stderr, "replication: failed to create client conn");
925 +                close(s);
926 +                return(-1);
927 +            }
928 +            drive_machine(c);  /* connected immediately: run the state machine now instead of waiting for an event */
929 +        }else{
930 +            if(errno == EINPROGRESS){  /* connect still pending: the conn is registered for EV_WRITE */
931 +                c = conn_new(s, conn_repconnect, EV_WRITE | EV_PERSIST, DATA_BUFFER_SIZE, false, main_base);
932 +                if(c == NULL){
933 +                    fprintf(stderr, "replication: failed to create client conn");
934 +                    close(s);
935 +                    return(-1);
936 +                }
937 +            }else{
938 +                fprintf(stdout,"replication: can't connect %s:%d\n", inet_ntoa(server.sin_addr), ntohs(server.sin_port));  /* NOTE(review): the only message here sent to stdout */
939 +                close(s);
940 +                return(-1);
941 +            }
942 +        }
943 +    }
944 +    return(0);
945 +}
946 +
947 +static int replication_init(void)  /* try the client role first when a peer address is configured, else (or on failure) the server role */
948 +{
949 +    if(settings.rep_addr.s_addr != htonl(INADDR_ANY)){  /* INADDR_ANY means no peer address was configured */
950 +        if(replication_client_init() != -1){
951 +            return(0);
952 +        }
953 +    }
954 +    return(replication_server_init());  /* fall back to listening for the peer */
955 +}
956 +
957 +static int replication_connect(void)  /* create the non-blocking command pipe and its recv/send conns; 0 on success, -1 on failure */
958 +{
959 +    int f, i;
960 +    int p[2];
961 +
962 +    if(pipe(p) == -1){
963 +        fprintf(stderr, "replication: can't create pipe\n");
964 +        return(-1);
965 +    }
966 +    for(i = 0; i < 2; i++){  /* both pipe ends must be non-blocking */
967 +        if((f = fcntl(p[i], F_GETFL, 0)) < 0 || fcntl(p[i], F_SETFL, f | O_NONBLOCK) < 0) {
968 +            fprintf(stderr, "replication: can't setting O_NONBLOCK pipe[%d]\n", i);
969 +            close(p[0]);  /* do not leak the pipe descriptors on failure */
970 +            close(p[1]);
971 +            return(-1);
972 +        }
973 +    }
974 +    pthread_mutex_lock(&replication_pipe_lock);
975 +    rep_recv = conn_new(p[0], conn_pipe_recv, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base);
976 +    rep_send = conn_new(p[1], conn_pipe_send, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport, main_base);
977 +    if(rep_send)  /* conn_new() may return NULL; the send end is written directly, so its event is removed */
978 +        event_del(&rep_send->event);
979 +    pthread_mutex_unlock(&replication_pipe_lock);
980 +    return((rep_recv && rep_send) ? 0 : -1);  /* report a failed conn_new() instead of crashing on it */
981 +}
982 +
983 +static int replication_close(void)  /* drain and close the pipe conns and the peer conn, then listen again unless rep_exit is set; always returns 0 */
984 +{
985 +    int     c;
986 +    int     r;
987 +    Q_ITEM *q;
988 +
989 +    if(settings.verbose > 0)
990 +        fprintf(stderr,"replication: close\n");
991 +    if(rep_recv){
992 +        rep_recv->rbytes = sizeof(q);  /* drain the pipe one Q_ITEM pointer at a time */
993 +        rep_recv->rcurr  = rep_recv->rbuf;
994 +        c = 0;
995 +        do{
996 +            r = read(rep_recv->sfd, rep_recv->rcurr, rep_recv->rbytes);
997 +            if(r == -1){  /* any read error, including an empty pipe, stops the drain */
998 +                break;
999 +            }
1000 +            rep_recv->rbytes -= r;
1001 +            rep_recv->rcurr  += r;
1002 +            if(!rep_recv->rbytes){  /* a whole pointer has been read */
1003 +                memcpy(&q, rep_recv->rbuf, sizeof(q));
1004 +                rep_recv->rbytes = sizeof(q);
1005 +                rep_recv->rcurr  = rep_recv->rbuf;
1006 +                qi_free(q);  /* the queued command is discarded, not sent */
1007 +                c++;
1008 +            }
1009 +        }while(r);  /* read() returning 0 also ends the drain */
1010 +        conn_close(rep_recv);
1011 +        rep_recv = NULL;
1012 +        if (settings.verbose > 1) {
1013 +            fprintf(stderr, "replication: qitem free %d items\n", qi_free_list());  /* NOTE(review): qi_free_list() is only called at this verbosity */
1014 +            fprintf(stderr, "replication: close recv %d items\n", c);
1015 +        }
1016 +    }
1017 +    pthread_mutex_lock(&replication_pipe_lock);  /* rep_send is accessed under this lock elsewhere in the file */
1018 +    if(rep_send){
1019 +        conn_close(rep_send);
1020 +        rep_send = NULL;
1021 +        if (settings.verbose > 1)
1022 +            fprintf(stderr,"replication: close send\n");
1023 +    }
1024 +    pthread_mutex_unlock(&replication_pipe_lock);
1025 +    if(rep_conn){
1026 +        conn_close(rep_conn);
1027 +        rep_conn = NULL;
1028 +        if (settings.verbose > 1)
1029 +            fprintf(stderr,"replication: close conn\n");
1030 +    }
1031 +    if(!rep_exit)
1032 +        replication_server_init();  /* go back to waiting for a peer; its return value is ignored */
1033 +    return(0);
1034 +}
1035 +
1036 +static void replication_dispatch_close(void)  /* close only the sending end of the command pipe */
1037 +{
1038 +    if (settings.verbose > 1)
1039 +        fprintf(stderr, "replication: dispatch close\n");
1040 +    pthread_mutex_lock(&replication_pipe_lock);  /* callers must not already hold this lock */
1041 +    if (rep_send) {
1042 +        conn_close(rep_send);
1043 +        rep_send = NULL;  /* later replication()/replication_item() calls see NULL and return 0 */
1044 +    }
1045 +    pthread_mutex_unlock(&replication_pipe_lock);
1046 +}
1047 +
1048 +static int replication_marugoto(int f)  /* bulk copy of all keys: f != 0 takes a key snapshot, f == 0 queues the next live key; -1 if queueing fails */
1049 +{
1050 +    static int   keysend  = 0;     /* keys queued so far in this run */
1051 +    static int   keycount = 0;     /* key count reported by assoc_key_snap() */
1052 +    static char *keylist  = NULL;  /* snapshot buffer, owned here and released with free() */
1053 +    static char *keyptr   = NULL;  /* next key to examine inside keylist */
1054 +
1055 +    if(f){
1056 +        if(keylist){  /* discard an unfinished earlier snapshot */
1057 +            free(keylist);
1058 +            keylist  = NULL;
1059 +            keyptr   = NULL;
1060 +            keycount = 0;
1061 +            keysend  = 0;
1062 +        }
1063 +        pthread_mutex_lock(&cache_lock);  /* the snapshot is taken under cache_lock */
1064 +        keylist = (char *)assoc_key_snap((int *)&keycount);
1065 +        pthread_mutex_unlock(&cache_lock);
1066 +        keyptr  = keylist;
1067 +        if (!keyptr){  /* no snapshot was returned: report the end immediately */
1068 +            replication_call_marugoto_end();
1069 +        }else{
1070 +        if (settings.verbose > 0)
1071 +            fprintf(stderr,"replication: marugoto start\n");
1072 +        }
1073 +    }else{
1074 +        if(keyptr){
1075 +            while(*keyptr){  /* keys are NUL-terminated; an empty string ends the list */
1076 +                item *it = item_get(keyptr, strlen(keyptr));
1077 +                if(it){
1078 +                    item_remove(it);  /* presumably drops the reference taken by item_get() - confirm; only the key is queued */
1079 +                    if(replication_call_rep(keyptr, strlen(keyptr)) == -1){
1080 +                        return(-1);  /* keyptr is not advanced on failure */
1081 +                    }else{
1082 +                        keysend++;
1083 +                        keyptr += strlen(keyptr) + 1;
1084 +                        return(0);  /* at most one key is queued per call */
1085 +                    }
1086 +                }
1087 +                keyptr += strlen(keyptr) + 1;  /* key no longer in the cache: skip it */
1088 +            }
1089 +            if(settings.verbose > 0)
1090 +                fprintf(stderr,"replication: marugoto %d\n", keysend);
1091 +            replication_call_marugoto_end();  /* every key has been visited */
1092 +            if(settings.verbose > 0)
1093 +                fprintf(stderr,"replication: marugoto owari\n");
1094 +            free(keylist);
1095 +            keylist  = NULL;
1096 +            keyptr   = NULL;
1097 +            keycount = 0;
1098 +            keysend  = 0;
1099 +        }
1100 +    }
1101 +    return(0);
1102 +}
1103 +
1104 +static int replication_send(conn *c)  /* write c's pending wbytes to c->sfd; returns the bytes still unsent */
1105 +{
1106 +    while(c->wbytes){
1107 +        int w = write(c->sfd, c->wcurr, c->wbytes);
1108 +        if(w == -1){
1109 +            if(errno == EAGAIN || errno == EINTR){  /* retried at once by the enclosing loop, with no wait */
1110 +            }else{
1111 +                fprintf(stderr,"replication: send error\n");
1112 +                replication_close();
1113 +                break;
1114 +            }
1115 +        }else{
1116 +            c->wbytes -= w;
1117 +            c->wcurr  += w;
1118 +        }
1119 +    }
1120 +    return(c->wbytes);  /* NOTE(review): c is read after replication_close(); unsafe if c is rep_conn and conn_close() releases it - confirm */
1121 +}
1122 +
1123 +static int replication_pop(void)  /* read queued Q_ITEM pointers from the pipe and turn them into peer commands; -1 on pipe error or EOF */
1124 +{
1125 +    int      r;
1126 +    int      c;
1127 +    int      m;
1128 +    Q_ITEM **q;
1129 +
1130 +    if(settings.verbose > 1)
1131 +        fprintf(stderr, "replication: pop\n");
1132 +
1133 +    if(!rep_recv)
1134 +        return(0);
1135 +
1136 +    r = read(rep_recv->sfd, rep_recv->rbuf, rep_recv->rsize);
1137 +    if(r == -1){
1138 +        if(errno == EAGAIN || errno == EINTR){  /* nothing was read: skip item processing below */
1139 +        }else{
1140 +            fprintf(stderr,"replication: pop error %d\n", errno);
1141 +            return(-1);
1142 +        }
1143 +    }else if(r == 0){  /* must be "else if": r == -1 must never reach the item loop */
1144 +        /* other end closed, trigger replication_close() */
1145 +        return(-1);
1146 +    }else{
1147 +        c = r / sizeof(Q_ITEM *);  /* number of whole pointers read */
1148 +        m = r % sizeof(Q_ITEM *);  /* leftover bytes of a partial pointer; not processed */
1149 +        q = (Q_ITEM **)(rep_recv->rbuf);
1150 +        while(c--){  /* entries are handled from last to first */
1151 +            if(q[c]){
1152 +                if(rep_conn && replication_cmd(rep_conn, q[c])){
1153 +                    replication_item(q[c]); /* error retry */
1154 +                }else{
1155 +                    qi_free(q[c]);
1156 +                }
1157 +            }else{  /* a NULL entry is the exit request written by replication_exit() */
1158 +                if(!rep_exit){
1159 +                    if (settings.verbose)
1160 +                        fprintf(stderr,"replication: cleanup start\n");
1161 +                    rep_exit = 1;
1162 +                }
1163 +            }
1164 +        }
1165 +    }
1166 +    if(rep_exit){
1167 +        if(rep_conn && rep_conn->wbytes){  /* rep_conn may be NULL when no peer is connected */
1168 +            /* retry */
1169 +            if(replication_exit()){
1170 +                replication_close();
1171 +                fprintf(stderr,"replication: cleanup error\n");
1172 +                exit(EXIT_FAILURE);
1173 +            }
1174 +        }else{
1175 +            /* finish */
1176 +            replication_close();
1177 +            if (settings.verbose)
1178 +                fprintf(stderr,"replication: cleanup complete\n");
1179 +            exit(EXIT_SUCCESS);
1180 +        }
1181 +    }
1182 +    replication_marugoto(0);  /* advance a running bulk copy by one key */
1183 +    return(0);
1184 +}
1185 +
1186 +static int replication_push(void)  /* write everything pending in rep_send to the pipe; 0 on success, -1 on a write error */
1187 +{
1188 +    int w;
1189 +
1190 +    while(rep_send->wbytes){  /* rep_send is assumed non-NULL; replication_item() checks it before calling */
1191 +        w = write(rep_send->sfd, rep_send->wcurr, rep_send->wbytes);
1192 +        if(w == -1){
1193 +            if(errno == EAGAIN || errno == EINTR){  /* retried at once by the enclosing loop, with no wait */
1194 +                fprintf(stderr,"replication: push EAGAIN or EINTR\n");
1195 +            }else{
1196 +                return(-1);
1197 +            }
1198 +        }else{
1199 +            rep_send->wbytes -= w;
1200 +            rep_send->wcurr  += w;
1201 +        }
1202 +    }
1203 +    rep_send->wcurr = rep_send->wbuf;  /* buffer fully flushed: rewind to its start */
1204 +    return(0);
1205 +}
1206 +
1207 +static int replication_exit(void)  /* queue the exit marker */
1208 +{
1209 +    return(replication_item(NULL));  /* replication_pop() treats a NULL entry as the exit request */
1210 +}
1211 +
1212 +static int replication_item(Q_ITEM *q)  /* write the pointer q into the command pipe; 0 on success or when the pipe is closed, -1 on error */
1213 +{
1214 +    pthread_mutex_lock(&replication_pipe_lock);
1215 +    if (!rep_send) {  /* pipe not open: nothing is queued and q is not freed here */
1216 +        pthread_mutex_unlock(&replication_pipe_lock);
1217 +        return 0;
1218 +    }
1219 +    if(rep_send->wcurr + rep_send->wbytes + sizeof(q) > rep_send->wbuf + rep_send->wsize){  /* no room left for one more pointer */
1220 +        fprintf(stderr,"replication: buffer over fllow\n");
1221 +        if(q){
1222 +            qi_free(q);
1223 +        }
1224 +        pthread_mutex_unlock(&replication_pipe_lock);  /* unlock first: replication_dispatch_close() takes the same lock */
1225 +        replication_dispatch_close();
1226 +        return(-1);
1227 +    }
1228 +    memcpy(rep_send->wcurr + rep_send->wbytes, &q, sizeof(q));  /* the pointer value itself is sent, not the item contents */
1229 +    rep_send->wbytes += sizeof(q);
1230 +    if(replication_push()){
1231 +        fprintf(stderr, "replication: push error\n");
1232 +        if(q){
1233 +            qi_free(q);
1234 +        }
1235 +        pthread_mutex_unlock(&replication_pipe_lock);  /* unlock first: replication_dispatch_close() takes the same lock */
1236 +        replication_dispatch_close();
1237 +        return(-1);
1238 +    }
1239 +    pthread_mutex_unlock(&replication_pipe_lock);
1240 +    return(0);
1241 +}
1242 +
1243 +int replication(enum CMD_TYPE type, R_CMD *cmd)  /* queue one replication command; 0 when queued or when the pipe is closed, -1 if no Q_ITEM is available */
1244 +{
1245 +    Q_ITEM *q;
1246 +
1247 +    pthread_mutex_lock(&replication_pipe_lock);
1248 +    if (!rep_send) {  /* pipe not open: the command is dropped and 0 is returned */
1249 +        pthread_mutex_unlock(&replication_pipe_lock);
1250 +        return 0;
1251 +    }
1252 +    pthread_mutex_unlock(&replication_pipe_lock);  /* the lock is released here; replication_item() re-checks rep_send */
1253 +
1254 +    if((q = qi_new(type, cmd, false))) {
1255 +        replication_item(q);  /* NOTE(review): the return value is ignored, so a failed push still yields 0 */
1256 +    }else{
1257 +        fprintf(stderr,"replication: can't create Q_ITEM\n");
1258 +        replication_dispatch_close();
1259 +        return(-1);
1260 +    }
1261 +    return(0);
1262 +}
1263 +#endif /* USE_REPLICATION */
1264 --- memcached-1.4.5/memcached.h~        2010-05-06 14:09:51.000000000 +0300
1265 +++ memcached-1.4.5/memcached.h 2010-05-06 14:10:13.518051741 +0300
1266 @@ -144,7 +144,13 @@
1267      conn_swallow,    /**< swallowing unnecessary bytes w/o storing */
1268      conn_closing,    /**< closing this connection */
1269      conn_mwrite,     /**< writing out many items sequentially */
1270 -    conn_max_state   /**< Max state value (used for assertion) */
1271 +#ifdef USE_REPLICATION
1272 +    conn_repconnect, /**< replication connecting to master */
1273 +    conn_rep_listen, /**< replication listening socket */
1274 +    conn_pipe_recv,  /**< replication command pipe recv */
1275 +    conn_pipe_send,  /**< replication command pipe send */
1276 +#endif /* USE_REPLICATION */
1277 +    conn_max_state,  /**< Max state value (used for assertion) */
1278  };
1279  
1280  enum bin_substates {
1281 @@ -247,7 +247,9 @@
1282      uint64_t      get_misses;
1283      uint64_t      evictions;
1284      uint64_t      reclaimed;
1285 +#if 0
1286      time_t        started;          /* when the process was started */
1287 +#endif
1288      bool          accepting_conns;  /* whether we are currently accepting */
1289      uint64_t      listen_disabled_num;
1290  };
1291 @@ -274,6 +282,11 @@
1292      int backlog;
1293      int item_size_max;        /* Maximum item size, and upper end for slabs */
1294      bool sasl;              /* SASL on/off */
1295 +#ifdef USE_REPLICATION
1296 +    struct in_addr rep_addr;    /* replication addr */
1297 +    int rep_port;               /* replication port */
1298 +    int rep_qmax;               /* replication QITEM max */
1299 +#endif /*USE_REPLICATION*/
1300  };
1301  
1302  extern struct stats stats;
1303 @@ -286,6 +299,10 @@
1304  /* temp */
1305  #define ITEM_SLABBED 4
1306  
1307 +#ifdef USE_REPLICATION
1308 +#define ITEM_REPDATA 128
1309 +#endif /*USE_REPLICATION*/
1310 +
1311  /**
1312   * Structure for storing items within memcached.
1313   */
1314 @@ -438,6 +455,10 @@
1315  #include "trace.h"
1316  #include "hash.h"
1317  #include "util.h"
1318 +
1319 +#ifdef USE_REPLICATION
1320 +#include "replication.h"
1321 +#endif /* USE_REPLICATION */
1322  
1323  /*
1324   * Functions such as the libevent-related calls that need to do cross-thread
1325 --- memcached-1.4.4/replication.c       Thu Jan  1 03:00:00 1970
1326 +++ repcached-2.2-1.4.4/replication.c   Wed Feb 10 18:40:48 2010
1327 @@ -0,0 +1,355 @@
1328 +/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
1329 +/*
1330 + *
1331 + */
1332 +#include "memcached.h"
1333 +#include "replication.h"
1334 +#include <stdlib.h>
1335 +#include <stdio.h>
1336 +#include <unistd.h>
1337 +#include <string.h>
1338 +#include <errno.h>
1339 +
1340 +static Q_ITEM *q_freelist  = NULL;
1341 +static int     q_itemcount = 0;
1342 +static pthread_mutex_t replication_queue_lock = PTHREAD_MUTEX_INITIALIZER;
1343 +
1344 +int get_qi_count(void)  /* number of Q_ITEMs currently allocated (q_itemcount) */
1345 +{
1346 +    int c;
1347 +    pthread_mutex_lock(&replication_queue_lock);  /* q_itemcount is read under the queue lock */
1348 +    c = q_itemcount;
1349 +    pthread_mutex_unlock(&replication_queue_lock);
1350 +    return(c);
1351 +}
1352 +
1353 +Q_ITEM *qi_new(enum CMD_TYPE type, R_CMD *cmd, bool reuse)  /* build a queue item for cmd; NULL if none is available or type is unknown */
1354 +{
1355 +    Q_ITEM     *q      = NULL;
1356 +    char       *key    = NULL;
1357 +    uint32_t    keylen = 0;
1358 +    rel_time_t  time   = 0;
1359 +
1360 +    pthread_mutex_lock(&replication_queue_lock);
1361 +    if(q_freelist){  /* prefer a recycled item from the free list */
1362 +        q = q_freelist;
1363 +        q_freelist = q->next;
1364 +    }
1365 +
1366 +    if(NULL == q){
1367 +        if(reuse) {  /* reuse == true: never allocate, only recycle */
1368 +            pthread_mutex_unlock(&replication_queue_lock);
1369 +            return(NULL);
1370 +        }
1371 +        if(q_itemcount >= settings.rep_qmax) {  /* cap on the number of allocated items */
1372 +            pthread_mutex_unlock(&replication_queue_lock);
1373 +            return(NULL);
1374 +        }
1375 +        q = calloc(1, sizeof(Q_ITEM));  /* zeroed so q->key is NULL if qi_free() runs before the fields are set */
1376 +        if (NULL == q){
1377 +            fprintf(stderr,"replication: qi_new out of memory\n");
1378 +            pthread_mutex_unlock(&replication_queue_lock);
1379 +            return(NULL);
1380 +        }
1381 +        q_itemcount++;
1382 +        if (settings.verbose > 2)
1383 +            fprintf(stderr,"replication: alloc c=%d\n", q_itemcount);
1384 +    }
1385 +
1386 +    pthread_mutex_unlock(&replication_queue_lock);
1387 +
1388 +    switch (type) {
1389 +    case REPLICATION_REP:
1390 +    case REPLICATION_DEL:
1391 +        key    = cmd->key;
1392 +        keylen = cmd->keylen;
1393 +        break;
1394 +    case REPLICATION_FLUSH_ALL:     /* neither command carries a key or a time */
1395 +    case REPLICATION_MARUGOTO_END:
1396 +        break;
1397 +    case REPLICATION_DEFER_FLUSH_ALL:
1398 +        time   = cmd->time;
1399 +        break;
1400 +    default:
1401 +        fprintf(stderr,"replication: got unknown command: %d\n", type);
1402 +        qi_free(q);  /* return the item to the free list instead of leaking it */
1403 +        return(NULL);
1404 +    }
1405 +
1406 +    q->key  = NULL;
1407 +    q->type = type;
1408 +    q->time = time;
1409 +    q->next = NULL;
1410 +    if (keylen) {
1411 +        q->key = malloc(keylen + 1);  /* private NUL-terminated copy of the key */
1412 +        if(NULL == q->key){
1413 +            qi_free(q);
1414 +            q = NULL;
1415 +        }else{
1416 +            memcpy(q->key, key, keylen);
1417 +            *(q->key + keylen) = 0;
1418 +        }
1419 +    }
1420 +
1421 +    return(q);
1422 +}
1423 +
1424 +void qi_free(Q_ITEM *q)  /* free q's key copy and push q onto the free list for reuse; NULL is ignored */
1425 +{
1426 +    if(q){
1427 +        if(q->key){
1428 +            free(q->key);
1429 +            q->key = NULL;
1430 +        }
1431 +        pthread_mutex_lock(&replication_queue_lock);
1432 +        q->next = q_freelist;  /* the Q_ITEM itself is kept, so q_itemcount is unchanged */
1433 +        q_freelist = q;
1434 +        pthread_mutex_unlock(&replication_queue_lock);
1435 +    }
1436 +}
1437 +
1438 +int qi_free_list()  /* free() every Q_ITEM on the free list; returns how many were released */
1439 +{
1440 +    int     c = 0;
1441 +    Q_ITEM *q = NULL;
1442 +
1443 +    pthread_mutex_lock(&replication_queue_lock);
1444 +    while((q = q_freelist)){
1445 +        q_itemcount--;  /* these items no longer count against settings.rep_qmax */
1446 +        c++;
1447 +        q_freelist = q->next;
1448 +        free(q);
1449 +    }
1450 +    pthread_mutex_unlock(&replication_queue_lock);
1451 +    return(c);
1452 +}
1453 +
1454 +static int replication_get_num(char *p, int n)  /* format n as unsigned decimal into p; with p == NULL only the length is computed */
1455 +{
1456 +    int  l;
1457 +    char b[64];  /* scratch buffer used when only the length is wanted */
1458 +    if(p)
1459 +        l = sprintf(p, "%u", n);  /* the terminating NUL written here is not included in l */
1460 +    else
1461 +        l = sprintf(b, "%u", n);
1462 +    return(l);
1463 +}
1464 +
1465 +int replication_call_rep(char *key, size_t keylen)  /* queue a REPLICATION_REP command for key */
1466 +{
1467 +    R_CMD r;
1468 +    r.key    = key;
1469 +    r.keylen = keylen;  /* R_CMD.keylen is an int, so the size_t is narrowed here */
1470 +    return(replication(REPLICATION_REP, &r));
1471 +}
1472 +
1473 +int replication_call_del(char *key, size_t keylen)  /* queue a REPLICATION_DEL command for key */
1474 +{
1475 +    R_CMD r;
1476 +    r.key    = key;
1477 +    r.keylen = keylen;  /* R_CMD.keylen is an int, so the size_t is narrowed here */
1478 +    return(replication(REPLICATION_DEL, &r));
1479 +}
1480 +
1481 +int replication_call_flush_all()  /* queue a REPLICATION_FLUSH_ALL command */
1482 +{
1483 +    R_CMD r;
1484 +    r.key = NULL;  /* keylen and time stay unset; qi_new() does not read them for this type */
1485 +    return(replication(REPLICATION_FLUSH_ALL, &r));
1486 +}
1487 +
1488 +int replication_call_defer_flush_all(const rel_time_t time)  /* queue a REPLICATION_DEFER_FLUSH_ALL command carrying time */
1489 +{
1490 +    R_CMD r;
1491 +    r.key  = NULL;
1492 +    r.time = time;  /* copied into the Q_ITEM by qi_new() */
1493 +    return(replication(REPLICATION_DEFER_FLUSH_ALL, &r));
1494 +}
1495 +
1496 +int replication_call_marugoto_end()  /* queue a REPLICATION_MARUGOTO_END command */
1497 +{
1498 +    R_CMD r;
1499 +    r.key = NULL;  /* keylen and time stay unset; qi_new() does not read them for this type */
1500 +    return(replication(REPLICATION_MARUGOTO_END, &r));
1501 +}
1502 +
1503 +static int replication_alloc(conn *c, int s)  /* make c->wbuf large enough for wbytes + s bytes; 0 on success, -1 on out of memory */
1504 +{
1505 +    char *p;
1506 +    s += c->wbytes;  /* total bytes the buffer must be able to hold */
1507 +    if(c->wsize < s){
1508 +        int sz = c->wsize;  /* grow a local copy so c->wsize stays valid if the allocation fails */
1509 +        while(sz < s)
1510 +            sz += 4096;
1511 +        if((p = realloc(c->wbuf, sz))){  /* realloc keeps the bytes already in the buffer */
1512 +            c->wbuf  = p;  /* callers reset c->wcurr to c->wbuf after appending */
1513 +            c->wsize = sz;
1514 +        }else{
1515 +            return(-1);  /* the old buffer and size are left untouched */
1516 +        }
1517 +    }
1518 +    return(0);
1519 +}
1520 +
1521 +static int replication_del(conn *c, char *k)  /* append "delete <k>\r\n" to c's write buffer; -1 if the buffer cannot grow */
1522 +{
1523 +    int   l = 0;
1524 +    char *s = "delete ";
1525 +    char *n = "\r\n";
1526 +    char *p = NULL;
1527 +
1528 +    l += strlen(s);
1529 +    l += strlen(k);  /* k must be a NUL-terminated string */
1530 +    l += strlen(n);
1531 +    if(replication_alloc(c,l) == -1){
1532 +        fprintf(stderr, "replication: del malloc error\n");
1533 +        return(-1);
1534 +    }
1535 +    p = c->wbuf + c->wbytes;  /* append after the bytes already counted in wbytes */
1536 +    memcpy(p, s, strlen(s));
1537 +    p += strlen(s);
1538 +    memcpy(p, k, strlen(k));
1539 +    p += strlen(k);
1540 +    memcpy(p, n, strlen(n));
1541 +    p += strlen(n);
1542 +    c->wbytes = p - c->wbuf;
1543 +    c->wcurr  = c->wbuf;  /* NOTE(review): assumes pending data starts at wbuf - confirm callers never append after a partial send */
1544 +    return(0);
1545 +}
1546 +
1547 +static int replication_rep(conn *c, item *it)  /* append "rep <key> <flags> <exptime> <bytes> <cas>\r\n<data>" to c's write buffer; -1 if it cannot grow */
1548 +{
1549 +    int exp = 0;
1550 +    int len = 0;
1551 +    char *s = "rep ";
1552 +    char *n = "\r\n";
1553 +    char *p = NULL;
1554 +    char flag[40];
1555 +
1556 +    if(it->exptime)
1557 +        exp = it->exptime + process_started;  /* exptime 0 is sent as 0; otherwise process_started is added */
1558 +    flag[0]=0;
1559 +    if((p=ITEM_suffix(it))){
1560 +        int i;
1561 +        memcpy(flag, p, it->nsuffix - 2);  /* copy the suffix without its last two bytes */
1562 +        flag[it->nsuffix - 2] = 0;
1563 +        for(i=0;i<strlen(flag);i++){  /* skip leading blanks */
1564 +            if(flag[i] > ' ')
1565 +                break;
1566 +        }
1567 +        memmove(flag,&flag[i],strlen(flag)-i);
1568 +        for(p=flag;*p>' ';p++);  /* keep only the first token, which is the flags value */
1569 +        *p=0;
1570 +    }
1571 +    len += strlen(s);
1572 +    len += it->nkey;
1573 +    len += 1;
1574 +    len += strlen(flag);
1575 +    len += 1;
1576 +    len += replication_get_num(NULL, exp);
1577 +    len += 1;
1578 +    len += replication_get_num(NULL, it->nbytes - 2);
1579 +    len += 1;
1580 +    len += 20;  /* room for a full 64-bit cas value (at most 20 decimal digits) */
1581 +    len += strlen(n);
1582 +    len += it->nbytes;
1583 +    len += strlen(n);
1584 +    if(replication_alloc(c,len) == -1){
1585 +        fprintf(stderr, "replication: rep malloc error\n");
1586 +        return(-1);
1587 +    }
1588 +    p = c->wbuf + c->wbytes;  /* append after the bytes already counted in wbytes */
1589 +    memcpy(p, s, strlen(s));
1590 +    p += strlen(s);
1591 +    memcpy(p, ITEM_key(it), it->nkey);
1592 +    p += it->nkey;
1593 +    *(p++) = ' ';
1594 +    memcpy(p, flag, strlen(flag));
1595 +    p += strlen(flag);
1596 +    *(p++) = ' ';
1597 +    p += replication_get_num(p, exp);
1598 +    *(p++) = ' ';
1599 +    p += replication_get_num(p, it->nbytes - 2);  /* sent as nbytes - 2; the data copy below uses all nbytes */
1600 +    *(p++) = ' ';
1601 +    p += sprintf(p, "%llu", (unsigned long long)ITEM_get_cas(it));  /* full width: replication_get_num() takes an int and would truncate */
1602 +    memcpy(p, n, strlen(n));
1603 +    p += strlen(n);
1604 +    memcpy(p, ITEM_data(it), it->nbytes);  /* no extra "\r\n" is appended after the data */
1605 +    p += it->nbytes;
1606 +    c->wbytes = p - c->wbuf;
1607 +    c->wcurr  = c->wbuf;
1608 +    return(0);
1609 +}
1610 +
1611 +static int replication_flush_all(conn *c, rel_time_t exp)  /* append "flush_all [exp]\r\n" to c's write buffer; -1 if it cannot grow */
1612 +{
1613 +    char *s = "flush_all ";
1614 +    char *n = "\r\n";
1615 +    char *p = NULL;
1616 +
1617 +    int l = strlen(s) + strlen(n);
1618 +    if (exp > 0)  /* the delay is written only when it is non-zero */
1619 +        l += replication_get_num(NULL, exp);
1620 +    if(replication_alloc(c,l) == -1){
1621 +        fprintf(stderr, "replication: flush_all malloc error\n");
1622 +        return(-1);
1623 +    }
1624 +    p = c->wbuf + c->wbytes;  /* append after the bytes already counted in wbytes */
1625 +    memcpy(p, s, strlen(s));
1626 +    p += strlen(s);
1627 +    if (exp > 0)
1628 +        p += replication_get_num(p, exp);
1629 +    memcpy(p, n, strlen(n));
1630 +    p += strlen(n);
1631 +    c->wbytes = p - c->wbuf;
1632 +    c->wcurr  = c->wbuf;
1633 +    return(0);
1634 +}
1635 +
1636 +static int replication_marugoto_end(conn *c)  /* append "marugoto_end\r\n" to c's write buffer; -1 if it cannot grow */
1637 +{
1638 +    char *s = "marugoto_end";
1639 +    char *n = "\r\n";
1640 +    char *p = NULL;
1641 +
1642 +    int l = strlen(s) + strlen(n);
1643 +    if(replication_alloc(c,l) == -1){
1644 +        fprintf(stderr, "replication: marugoto_end malloc error\n");
1645 +        return(-1);
1646 +    }
1647 +    p = c->wbuf + c->wbytes;  /* append after the bytes already counted in wbytes */
1648 +    memcpy(p, s, strlen(s));
1649 +    p += strlen(s);
1650 +    memcpy(p, n, strlen(n));
1651 +    p += strlen(n);
1652 +    c->wbytes = p - c->wbuf;
1653 +    c->wcurr  = c->wbuf;
1654 +    return(0);
1655 +}
1656 +
1657 +int replication_cmd(conn *c, Q_ITEM *q)  /* serialise the queued command q into c's write buffer; returns the builder's result (-1 on failure) */
1658 +{
1659 +    item *it;
1660 +    int r;
1661 +
1662 +    switch (q->type) {
1663 +    case REPLICATION_REP:
1664 +        it = item_get(q->key, strlen(q->key));  /* the item is looked up now, not when it was queued */
1665 +        if (!it)
1666 +            return(replication_del(c, q->key));  /* key no longer present: send a delete instead */
1667 +        r = replication_rep(c, it);
1668 +        item_remove(it);  /* presumably drops the reference taken by item_get() - confirm */
1669 +        return r;
1670 +    case REPLICATION_DEL:
1671 +        return(replication_del(c, q->key));
1672 +    case REPLICATION_FLUSH_ALL:
1673 +        return(replication_flush_all(c, 0));
1674 +    case REPLICATION_DEFER_FLUSH_ALL:
1675 +        return(replication_flush_all(c, q->time));
1676 +    case REPLICATION_MARUGOTO_END:
1677 +        return(replication_marugoto_end(c));
1678 +    default:
1679 +        fprintf(stderr,"replication: got unknown command:%d\n", q->type);
1680 +        return(0);  /* an unknown type is logged and reported as success */
1681 +    }
1682 +}
1683 --- memcached-1.4.4/replication.h       Thu Jan  1 03:00:00 1970
1684 +++ repcached-2.2-1.4.4/replication.h   Wed Feb 10 18:40:31 2010
1685 @@ -0,0 +1,42 @@
1686 +#ifndef MEMCACHED_REPLICATION_H
1687 +#define MEMCACHED_REPLICATION_H
1688 +#define REPCACHED_VERSION "2.2"
1689 +#include <netdb.h>
1690 +
1691 +enum CMD_TYPE {  /* kinds of queued replication commands, dispatched in replication_cmd() */
1692 +  REPLICATION_REP,              /* send the item as "rep", or "delete" if the key is gone */
1693 +  REPLICATION_DEL,              /* send "delete <key>" */
1694 +  REPLICATION_FLUSH_ALL,        /* send "flush_all" with no delay */
1695 +  REPLICATION_DEFER_FLUSH_ALL,  /* send "flush_all <time>" */
1696 +  REPLICATION_MARUGOTO_END,     /* send "marugoto_end" */
1697 +};
1698 +
1699 +typedef struct queue_item_t Q_ITEM;  /* one queued replication command */
1700 +struct queue_item_t {
1701 +  enum CMD_TYPE  type;
1702 +  char          *key;   /* malloc'd NUL-terminated key copy, or NULL; released by qi_free() */
1703 +  rel_time_t     time;  /* copied from R_CMD.time for REPLICATION_DEFER_FLUSH_ALL, otherwise 0 */
1704 +  Q_ITEM        *next;  /* link used while the item sits on the free list */
1705 +};
1706 +
1707 +typedef struct replication_cmd_t R_CMD;  /* caller-side arguments handed to replication() and qi_new() */
1708 +struct replication_cmd_t {
1709 +  char       *key;     /* not owned: qi_new() makes its own copy */
1710 +  int         keylen;  /* read only for REPLICATION_REP and REPLICATION_DEL */
1711 +  rel_time_t  time;    /* read only for REPLICATION_DEFER_FLUSH_ALL */
1712 +};
1713 +
1714 +Q_ITEM *qi_new(enum CMD_TYPE type, R_CMD *cmd, bool);
1715 +void    qi_free(Q_ITEM *);
1716 +int     qi_free_list(void);
1717 +int     replication_cmd(conn *, Q_ITEM *);
1718 +int     get_qi_count(void);
1719 +
1720 +int replication_call_rep(char *key, size_t keylen);
1721 +int replication_call_del(char *key, size_t keylen);
1722 +int replication_call_flush_all(void);
1723 +int replication_call_defer_flush_all(const rel_time_t time);
1724 +int replication_call_marugoto_end(void);
1725 +int replication(enum CMD_TYPE type, R_CMD *cmd);
1726 +
1727 +#endif
1728 --- memcached-1.4.5/t/binary.t~ 2010-04-03 10:07:16.000000000 +0300
1729 +++ memcached-1.4.5/t/binary.t  2010-05-06 14:13:25.718440750 +0300
1730 @@ -2,11 +2,13 @@
1731  
1732  use strict;
1733  use warnings;
1734 -use Test::More tests => 3361;
1735 +use Test::More;
1736  use FindBin qw($Bin);
1737  use lib "$Bin/lib";
1738  use MemcachedTest;
1739  
1740 +Test::More::plan(tests => 3361 + (support_replication() ? 36 : 0));
1741 +
1742  my $server = new_memcached();
1743  ok($server, "started the server");
1744  
1745 --- memcached-1.4.4/t/issue_67.t        Sun Nov  1 01:44:09 2009
1746 +++ repcached-2.2-1.4.4/t/issue_67.t    Wed Feb 10 17:50:12 2010
1747 @@ -41,6 +41,10 @@
1748      my $exe = "$builddir/memcached-debug";
1749      croak("memcached binary doesn't exist.  Haven't run 'make' ?\n") unless -e $exe;
1750  
1751 +    if (support_replication()) {
1752 +        $args .= ' -X 0';
1753 +    }
1754 +
1755      my $childpid = fork();
1756  
1757      my $cmd = "$builddir/timedrun 10 $exe $args";
1758 --- memcached-1.4.4/t/lib/MemcachedTest.pm      Fri Oct 30 04:24:52 2009
1759 +++ repcached-2.2-1.4.4/t/lib/MemcachedTest.pm  Wed Feb 10 17:53:34 2010
1760 @@ -13,7 +13,8 @@
1761  
1762  
1763  @EXPORT = qw(new_memcached sleep mem_get_is mem_gets mem_gets_is mem_stats
1764 -             supports_sasl free_port);
1765 +             supports_sasl free_port support_replication memcached_version
1766 +             version2num);
1767  
1768  sub sleep {
1769      my $n = shift;
1770 @@ -148,6 +149,23 @@
1771      return 0;
1772  }
1773  
1774 +sub support_replication {  # returns 1 if the memcached-debug usage text has a line starting with "-x <ip_addr>", else 0
1775 +    my $output = `$builddir/memcached-debug -h`;  # runs the binary on every call
1776 +    return 1 if $output =~ /^-x <ip_addr>/m;  # /m lets ^ match at the start of any line
1777 +    return 0;
1778 +}
1779 +
1780 +sub memcached_version {  # returns the version after "memcached " in the usage text, or 0 if not found
1781 +    my $output = `$builddir/memcached-debug -h`;  # runs the binary on every call
1782 +    return $1 if $output =~ /^memcached (\d[\d\.]+)/;  # no /m: only the very start of the output is checked
1783 +    return 0;
1784 +}
1785 +
1786 +sub version2num {  # "major.minor[.patch]" -> major*10000 + minor*100 + patch; missing or unparsable parts count as 0
1787 +    my($major,$minor,$pl) = ((defined $_[0] ? $_[0] : '') =~ /^(\d+)\.(\d+)(?:\.(\d+))?/);  # patch level and any suffix are optional
1788 +    return 10000*($major || 0) + 100*($minor || 0) + ($pl || 0);  # || 0 avoids arithmetic on undef when nothing matched
1789 +}
1790 +
1791  sub new_memcached {
1792      my ($args, $passed_port) = @_;
1793      my $port = $passed_port || free_port();
1794 @@ -171,6 +189,9 @@
1795      }
1796      if ($< == 0) {
1797          $args .= " -u root";
1798 +    }
1799 +    if (support_replication() && $args !~ m/-X/) {
1800 +        $args .= ' -X 0';
1801      }
1802  
1803      my $childpid = fork();
1804 --- memcached-1.4.5/t/stats.t~  2010-04-03 10:07:16.000000000 +0300
1805 +++ memcached-1.4.5/t/stats.t   2010-05-06 14:15:28.521352735 +0300
1806 @@ -57,7 +57,8 @@
1807  my $stats = mem_stats($sock);
1808  
1809  # Test number of keys
1810 -is(scalar(keys(%$stats)), 38, "38 stats values");
1811 +my $keys = 38 + (support_replication() ? 3 : 0);
1812 +is(scalar(keys(%$stats)), $keys, "$keys stats values");
1813  
1814  # Test initial state
1815  foreach my $key (qw(curr_items total_items bytes cmd_get cmd_set get_hits evictions get_misses
1816 --- memcached-1.4.4/testapp.c   Wed Nov 25 03:40:29 2009
1817 +++ repcached-2.2-1.4.4/testapp.c       Wed Feb 10 17:52:05 2010
1818 @@ -300,6 +300,10 @@
1819          argv[arg++] = "-1";
1820          argv[arg++] = "-U";
1821          argv[arg++] = "0";
1822 +#ifdef USE_REPLICATION
1823 +        argv[arg++] = "-X";
1824 +        argv[arg++] = "0";
1825 +#endif
1826          /* Handle rpmbuild and the like doing this as root */
1827          if (getuid() == 0) {
1828              argv[arg++] = "-u";
This page took 0.149149 seconds and 3 git commands to generate.