1 *** dbinc/repmgr.h 2007-10-31 10:23:52.000000000 -0700
2 --- dbinc/repmgr.h 2007-10-31 10:23:53.000000000 -0700
9 + * The (arbitrary) maximum number of outgoing messages we're willing to hold, on
10 + * a queue per connection, waiting for TCP buffer space to become available in
11 + * the kernel. Rather than exceeding this limit, we simply discard additional
12 + * messages (since this is always allowed by the replication protocol).
13 + * As a special dispensation, if a message is destined for a specific remote
14 + * site (i.e., it's not a broadcast), then we first try blocking the sending
15 + * thread, waiting for space to become available (though we only wait a limited
16 + * time). This is so that we can handle the immediate flood of (a
17 + * potentially large number of) outgoing messages that replication generates, in
18 + * a tight loop, when handling PAGE_REQ, LOG_REQ and ALL_REQ requests.
20 + #define OUT_QUEUE_LIMIT 10
23 * The system value is available from sysconf(_SC_HOST_NAME_MAX).
24 * Historically, the maximum host name was 256.
29 #define MAX_SITE_LOC_STRING (MAXHOSTNAMELEN+20)
30 typedef char SITE_STRING_BUFFER[MAX_SITE_LOC_STRING+1];
32 + /* Default timeout values, in seconds. */
33 + #define DB_REPMGR_DEFAULT_ACK_TIMEOUT (1 * US_PER_SEC)
34 + #define DB_REPMGR_DEFAULT_CONNECTION_RETRY (30 * US_PER_SEC)
35 + #define DB_REPMGR_DEFAULT_ELECTION_RETRY (10 * US_PER_SEC)
37 struct __repmgr_connection;
38 typedef struct __repmgr_connection REPMGR_CONNECTION;
39 struct __repmgr_queue; typedef struct __repmgr_queue REPMGR_QUEUE;
43 WSAEVENT event_object;
45 ! #define CONN_CONNECTING 0x01 /* nonblocking connect in progress */
46 ! #define CONN_DEFUNCT 0x02 /* socket close pending */
52 WSAEVENT event_object;
54 ! #define CONN_CONGESTED 0x01 /* msg thread wait has exceeded timeout */
55 ! #define CONN_CONNECTING 0x02 /* nonblocking connect in progress */
56 ! #define CONN_DEFUNCT 0x04 /* socket close pending */
62 * send() function's thread. But if TCP doesn't have enough network
63 * buffer space for us when we first try it, we instead allocate some
64 * memory, and copy the message, and then send it as space becomes
65 ! * available in our main select() thread.
67 OUT_Q_HEADER outbound_queue;
71 * Input: while we're reading a message, we keep track of what phase
73 * send() function's thread. But if TCP doesn't have enough network
74 * buffer space for us when we first try it, we instead allocate some
75 * memory, and copy the message, and then send it as space becomes
76 ! * available in our main select() thread. In some cases, if the queue
77 ! * gets too long we wait until it's drained, and then append to it.
78 ! * This condition variable's associated mutex is the normal per-repmgr
79 ! * db_rep->mutex, because that mutex is always held anyway whenever the
80 ! * output queue is consulted.
82 OUT_Q_HEADER outbound_queue;
85 + int blockers; /* ref count of msg threads waiting on us */
88 * Input: while we're reading a message, we keep track of what phase
89 *** dbinc_auto/int_def.in 2007-10-31 10:23:52.000000000 -0700
90 --- dbinc_auto/int_def.in 2007-10-31 10:23:52.000000000 -0700
94 #define __repmgr_wake_waiting_senders __repmgr_wake_waiting_senders@DB_VERSION_UNIQUE_NAME@
95 #define __repmgr_await_ack __repmgr_await_ack@DB_VERSION_UNIQUE_NAME@
96 #define __repmgr_compute_wait_deadline __repmgr_compute_wait_deadline@DB_VERSION_UNIQUE_NAME@
97 + #define __repmgr_await_drain __repmgr_await_drain@DB_VERSION_UNIQUE_NAME@
98 + #define __repmgr_alloc_cond __repmgr_alloc_cond@DB_VERSION_UNIQUE_NAME@
99 + #define __repmgr_free_cond __repmgr_free_cond@DB_VERSION_UNIQUE_NAME@
100 #define __repmgr_init_sync __repmgr_init_sync@DB_VERSION_UNIQUE_NAME@
101 #define __repmgr_close_sync __repmgr_close_sync@DB_VERSION_UNIQUE_NAME@
102 #define __repmgr_net_init __repmgr_net_init@DB_VERSION_UNIQUE_NAME@
103 *** dbinc_auto/repmgr_ext.h 2007-10-31 10:23:52.000000000 -0700
104 --- dbinc_auto/repmgr_ext.h 2007-10-31 10:23:52.000000000 -0700
107 int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *));
108 void __repmgr_stash_generation __P((DB_ENV *));
109 int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t));
110 ! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *));
111 int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *));
112 ! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *, int));
113 ! void __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *));
114 int __repmgr_find_site __P((DB_ENV *, const char *, u_int));
115 int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *));
116 int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **));
118 int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *));
119 void __repmgr_stash_generation __P((DB_ENV *));
120 int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t));
121 ! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *, int));
122 int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *));
123 ! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *));
124 ! int __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *));
125 int __repmgr_find_site __P((DB_ENV *, const char *, u_int));
126 int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *));
127 int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **));
131 int __repmgr_wake_waiting_senders __P((DB_ENV *));
132 int __repmgr_await_ack __P((DB_ENV *, const DB_LSN *));
133 void __repmgr_compute_wait_deadline __P((DB_ENV*, struct timespec *, db_timeout_t));
134 + int __repmgr_await_drain __P((DB_ENV *, REPMGR_CONNECTION *, db_timeout_t));
135 + int __repmgr_alloc_cond __P((cond_var_t *));
136 + int __repmgr_free_cond __P((cond_var_t *));
137 int __repmgr_init_sync __P((DB_ENV *, DB_REP *));
138 int __repmgr_close_sync __P((DB_ENV *));
139 int __repmgr_net_init __P((DB_ENV *, DB_REP *));
140 *** repmgr/repmgr_method.c 2007-10-31 10:23:52.000000000 -0700
141 --- repmgr/repmgr_method.c 2007-10-31 10:23:53.000000000 -0700
146 /* Set some default values. */
147 ! db_rep->ack_timeout = 1 * US_PER_SEC; /* 1 second */
148 ! db_rep->connection_retry_wait = 30 * US_PER_SEC; /* 30 seconds */
149 ! db_rep->election_retry_wait = 10 * US_PER_SEC; /* 10 seconds */
150 db_rep->config_nsites = 0;
151 db_rep->peer = DB_EID_INVALID;
152 db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
156 /* Set some default values. */
157 ! db_rep->ack_timeout = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
158 ! db_rep->connection_retry_wait = DB_REPMGR_DEFAULT_CONNECTION_RETRY;
159 ! db_rep->election_retry_wait = DB_REPMGR_DEFAULT_ELECTION_RETRY;
160 db_rep->config_nsites = 0;
161 db_rep->peer = DB_EID_INVALID;
162 db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
169 + REPMGR_CONNECTION *conn;
172 db_rep = dbenv->rep_handle;
177 if ((ret = __repmgr_signal(&db_rep->queue_nonempty)) != 0)
180 + TAILQ_FOREACH(conn, &db_rep->connections, entries) {
181 + if (conn->blockers > 0 &&
182 + ((ret = __repmgr_signal(&conn->drained)) != 0))
185 UNLOCK_MUTEX(db_rep->mutex);
187 return (__repmgr_wake_main_thread(dbenv));
188 *** repmgr/repmgr_msg.c 2007-10-31 10:23:52.000000000 -0700
189 --- repmgr/repmgr_msg.c 2007-10-31 10:23:53.000000000 -0700
194 * Acknowledges a message.
197 - * Note that this cannot be called from the select() thread, in case we call
198 - * __repmgr_bust_connection(..., FALSE).
201 ack_message(dbenv, generation, lsn)
207 conn = site->ref.conn;
208 if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK,
209 ! &control2, &rec2)) == DB_REP_UNAVAIL)
210 ! ret = __repmgr_bust_connection(dbenv, conn, FALSE);
213 UNLOCK_MUTEX(db_rep->mutex);
217 conn = site->ref.conn;
219 + * It's hard to imagine anyone would care about a lost ack if
220 + * the path to the master is so congested as to need blocking;
221 + * so pass "blockable" argument as FALSE.
223 if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK,
224 ! &control2, &rec2, FALSE)) == DB_REP_UNAVAIL)
225 ! ret = __repmgr_bust_connection(dbenv, conn);
228 UNLOCK_MUTEX(db_rep->mutex);
229 *** repmgr/repmgr_net.c 2007-10-31 10:23:52.000000000 -0700
230 --- repmgr/repmgr_net.c 2007-10-31 10:23:53.000000000 -0700
233 static void setup_sending_msg
234 __P((struct sending_msg *, u_int, const DBT *, const DBT *));
235 static int __repmgr_send_internal
236 ! __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *));
237 static int enqueue_msg
238 __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
239 static int flatten __P((DB_ENV *, struct sending_msg *));
241 static void setup_sending_msg
242 __P((struct sending_msg *, u_int, const DBT *, const DBT *));
243 static int __repmgr_send_internal
244 ! __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, int));
245 static int enqueue_msg
246 __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
247 static int flatten __P((DB_ENV *, struct sending_msg *));
251 * The send function for DB_ENV->rep_set_transport.
254 - * This is only ever called as the replication transport call-back, which means
255 - * it's either on one of our message processing threads or an application
256 - * thread. It mustn't be called from the select() thread, because we might call
257 - * __repmgr_bust_connection(..., FALSE) here, and that's not allowed in the
260 * PUBLIC: int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *,
261 * PUBLIC: const DB_LSN *, int, u_int32_t));
268 conn = site->ref.conn;
269 if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE,
270 ! control, rec)) == DB_REP_UNAVAIL &&
271 ! (t_ret = __repmgr_bust_connection(dbenv, conn, FALSE)) != 0)
278 conn = site->ref.conn;
279 + /* Pass the "blockable" argument as TRUE. */
280 if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE,
281 ! control, rec, TRUE)) == DB_REP_UNAVAIL &&
282 ! (t_ret = __repmgr_bust_connection(dbenv, conn)) != 0)
288 if (site->state != SITE_CONNECTED)
291 ! if (F_ISSET(site->ref.conn, CONN_CONNECTING))
296 if (site->state != SITE_CONNECTED)
299 ! if (F_ISSET(site->ref.conn, CONN_CONNECTING|CONN_DEFUNCT))
307 * Caller must hold dbenv->mutex.
310 - * Note that this cannot be called from the select() thread, in case we call
311 - * __repmgr_bust_connection(..., FALSE).
314 __repmgr_send_broadcast(dbenv, control, rec, nsitesp, npeersp)
318 !IS_VALID_EID(conn->eid))
321 ! if ((ret = __repmgr_send_internal(dbenv, conn, &msg)) == 0) {
322 site = SITE_FROM_EID(conn->eid);
324 if (site->priority > 0)
326 } else if (ret == DB_REP_UNAVAIL) {
327 ! if ((ret = __repmgr_bust_connection(
328 ! dbenv, conn, FALSE)) != 0)
333 !IS_VALID_EID(conn->eid))
337 ! * Broadcast messages are either application threads committing
338 ! * transactions, or replication status messages that we can
339 ! * afford to lose. So don't allow blocking for them (pass
340 ! * "blockable" argument as FALSE).
342 ! if ((ret = __repmgr_send_internal(dbenv,
343 ! conn, &msg, FALSE)) == 0) {
344 site = SITE_FROM_EID(conn->eid);
346 if (site->priority > 0)
348 } else if (ret == DB_REP_UNAVAIL) {
349 ! if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
355 * intersperse writes that are part of two single messages.
357 * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *,
358 ! * PUBLIC: u_int, const DBT *, const DBT *));
361 ! __repmgr_send_one(dbenv, conn, msg_type, control, rec)
363 REPMGR_CONNECTION *conn;
365 const DBT *control, *rec;
367 struct sending_msg msg;
369 setup_sending_msg(&msg, msg_type, control, rec);
370 ! return (__repmgr_send_internal(dbenv, conn, &msg));
374 * Attempts a "best effort" to send a message on the given site. If there is an
375 ! * excessive backlog of message already queued on the connection, we simply drop
376 ! * this message, and still return 0 even in this case.
379 ! __repmgr_send_internal(dbenv, conn, msg)
381 REPMGR_CONNECTION *conn;
382 struct sending_msg *msg;
384 ! #define OUT_QUEUE_LIMIT 10 /* arbitrary, for now */
385 REPMGR_IOVECS iovecs;
386 SITE_STRING_BUFFER buffer;
389 size_t total_written;
391 DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING));
392 if (!STAILQ_EMPTY(&conn->outbound_queue)) {
395 * intersperse writes that are part of two single messages.
397 * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *,
398 ! * PUBLIC: u_int, const DBT *, const DBT *, int));
401 ! __repmgr_send_one(dbenv, conn, msg_type, control, rec, blockable)
403 REPMGR_CONNECTION *conn;
405 const DBT *control, *rec;
408 struct sending_msg msg;
410 setup_sending_msg(&msg, msg_type, control, rec);
411 ! return (__repmgr_send_internal(dbenv, conn, &msg, blockable));
415 * Attempts a "best effort" to send a message on the given site. If there is an
416 ! * excessive backlog of messages already queued on the connection, what shall we
417 ! * do? If the caller doesn't mind blocking, we'll wait (a limited amount of
418 ! * time) for the queue to drain. Otherwise we'll simply drop the message. This
419 ! * is always allowed by the replication protocol. But in the case of a
420 ! * multi-message response to a request like PAGE_REQ, LOG_REQ or ALL_REQ we
421 ! * almost always get a flood of messages that instantly fills our queue, so
422 ! * blocking improves performance (by avoiding the need for the client to
425 ! * How long shall we wait? We could of course create a new timeout
426 ! * configuration type, so that the application could set it directly. But that
427 ! * would start to overwhelm the user with too many choices to think about. We
428 ! * already have an ACK timeout, which is the user's estimate of how long it
429 ! * should take to send a message to the client, have it be processed, and return
430 ! * a message back to us. We multiply that by the queue size, because that's how
431 ! * many messages have to be swallowed up by the client before we're able to
432 ! * start sending again (at least to a rough approximation).
435 ! __repmgr_send_internal(dbenv, conn, msg, blockable)
437 REPMGR_CONNECTION *conn;
438 struct sending_msg *msg;
442 REPMGR_IOVECS iovecs;
443 SITE_STRING_BUFFER buffer;
444 + db_timeout_t drain_to;
447 size_t total_written;
449 + db_rep = dbenv->rep_handle;
451 DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING));
452 if (!STAILQ_EMPTY(&conn->outbound_queue)) {
456 RPRINT(dbenv, (dbenv, "msg to %s to be queued",
457 __repmgr_format_eid_loc(dbenv->rep_handle,
458 conn->eid, buffer)));
459 if (conn->out_queue_length < OUT_QUEUE_LIMIT)
460 return (enqueue_msg(dbenv, conn, msg, 0));
462 RPRINT(dbenv, (dbenv, "queue limit exceeded"));
463 STAT(dbenv->rep_handle->
464 region->mstat.st_msgs_dropped++);
470 * Send as much data to the site as we can, without blocking. Keep
472 RPRINT(dbenv, (dbenv, "msg to %s to be queued",
473 __repmgr_format_eid_loc(dbenv->rep_handle,
474 conn->eid, buffer)));
475 + if (conn->out_queue_length >= OUT_QUEUE_LIMIT &&
476 + blockable && !F_ISSET(conn, CONN_CONGESTED)) {
477 + RPRINT(dbenv, (dbenv,
478 + "block msg thread, await queue space"));
480 + if ((drain_to = db_rep->ack_timeout) == 0)
481 + drain_to = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
483 + ret = __repmgr_await_drain(dbenv,
484 + conn, drain_to * OUT_QUEUE_LIMIT);
486 + if (db_rep->finished)
487 + return (DB_TIMEOUT);
490 + if (STAILQ_EMPTY(&conn->outbound_queue))
493 if (conn->out_queue_length < OUT_QUEUE_LIMIT)
494 return (enqueue_msg(dbenv, conn, msg, 0));
496 RPRINT(dbenv, (dbenv, "queue limit exceeded"));
497 STAT(dbenv->rep_handle->
498 region->mstat.st_msgs_dropped++);
499 ! return (blockable ? DB_TIMEOUT : 0);
505 * Send as much data to the site as we can, without blocking. Keep
510 * Abandons a connection, to recover from an error. Upon entry the conn struct
511 ! * must be on the connections list.
513 ! * If the 'do_close' flag is true, we do the whole job; the clean-up includes
514 ! * removing the struct from the list and freeing all its memory, so upon return
515 ! * the caller must not refer to it any further. Otherwise, we merely mark the
516 ! * connection for clean-up later by the main thread.
518 * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *,
519 ! * PUBLIC: REPMGR_CONNECTION *, int));
522 * Caller holds mutex.
525 ! __repmgr_bust_connection(dbenv, conn, do_close)
527 REPMGR_CONNECTION *conn;
531 int connecting, ret, eid;
535 * Abandons a connection, to recover from an error. Upon entry the conn struct
536 ! * must be on the connections list. For now, just mark it as unusable; it will
537 ! * be fully cleaned up in the top-level select thread, as soon as possible.
539 * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *,
540 ! * PUBLIC: REPMGR_CONNECTION *));
543 * Caller holds mutex.
545 + * Must be idempotent.
548 ! __repmgr_bust_connection(dbenv, conn)
550 REPMGR_CONNECTION *conn;
553 int connecting, ret, eid;
556 DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
558 connecting = F_ISSET(conn, CONN_CONNECTING);
560 ! __repmgr_cleanup_connection(dbenv, conn);
562 ! F_SET(conn, CONN_DEFUNCT);
567 * When we first accepted the incoming connection, we set conn->eid to
569 DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
571 connecting = F_ISSET(conn, CONN_CONNECTING);
573 ! F_SET(conn, CONN_DEFUNCT);
577 * When we first accepted the incoming connection, we set conn->eid to
580 dbenv, ELECT_FAILURE_ELECTION)) != 0)
583 ! } else if (!do_close) {
585 * One way or another, make sure the main thread is poked, so
586 * that we do the deferred clean-up.
588 dbenv, ELECT_FAILURE_ELECTION)) != 0)
593 * One way or another, make sure the main thread is poked, so
594 * that we do the deferred clean-up.
600 ! * PUBLIC: void __repmgr_cleanup_connection
601 * PUBLIC: __P((DB_ENV *, REPMGR_CONNECTION *));
604 __repmgr_cleanup_connection(dbenv, conn)
606 REPMGR_CONNECTION *conn;
611 ! * PUBLIC: int __repmgr_cleanup_connection
612 * PUBLIC: __P((DB_ENV *, REPMGR_CONNECTION *));
615 + * Idempotent. This can be called repeatedly as blocking message threads (of
616 + * which there may be several) wake up in case of error on the connection.
619 __repmgr_cleanup_connection(dbenv, conn)
621 REPMGR_CONNECTION *conn;
628 db_rep = dbenv->rep_handle;
630 ! TAILQ_REMOVE(&db_rep->connections, conn, entries);
631 if (conn->fd != INVALID_SOCKET) {
632 ! (void)closesocket(conn->fd);
634 ! (void)WSACloseEvent(conn->event_object);
639 * Deallocate any input and output buffers we may have.
647 db_rep = dbenv->rep_handle;
649 ! DB_ASSERT(dbenv, F_ISSET(conn, CONN_DEFUNCT) || db_rep->finished);
651 if (conn->fd != INVALID_SOCKET) {
652 ! ret = closesocket(conn->fd);
653 ! conn->fd = INVALID_SOCKET;
654 ! if (ret == SOCKET_ERROR) {
656 ! __db_err(dbenv, ret, "closing socket");
659 ! if (!WSACloseEvent(conn->event_object) && ret != 0)
666 + if (conn->blockers > 0)
667 + return (__repmgr_signal(&conn->drained));
669 + TAILQ_REMOVE(&db_rep->connections, conn, entries);
671 * Deallocate any input and output buffers we may have.
676 __os_free(dbenv, out);
679 + ret = __repmgr_free_cond(&conn->drained);
680 __os_free(dbenv, conn);
688 while (!TAILQ_EMPTY(&db_rep->connections)) {
689 conn = TAILQ_FIRST(&db_rep->connections);
690 ! __repmgr_cleanup_connection(dbenv, conn);
693 for (i = 0; i < db_rep->site_cnt; i++) {
696 while (!TAILQ_EMPTY(&db_rep->connections)) {
697 conn = TAILQ_FIRST(&db_rep->connections);
698 ! (void)__repmgr_cleanup_connection(dbenv, conn);
701 for (i = 0; i < db_rep->site_cnt; i++) {
702 *** repmgr/repmgr_posix.c 2007-10-31 10:23:52.000000000 -0700
703 --- repmgr/repmgr_posix.c 2007-10-31 10:23:53.000000000 -0700
707 size_t __repmgr_guesstimated_max = (128 * 1024);
710 + static int __repmgr_conn_work __P((DB_ENV *,
711 + REPMGR_CONNECTION *, fd_set *, fd_set *, int));
712 static int finish_connecting __P((DB_ENV *, REPMGR_CONNECTION *));
721 + * PUBLIC: int __repmgr_await_drain __P((DB_ENV *,
722 + * PUBLIC: REPMGR_CONNECTION *, db_timeout_t));
724 + * Waits for space to become available on the connection's output queue.
725 + * Various ways we can exit:
727 + * 1. queue becomes non-full
728 + * 2. exceed time limit
729 + * 3. connection becomes defunct (due to error in another thread)
730 + * 4. repmgr is shutting down
731 + * 5. any unexpected system resource failure
733 + * In cases #3 and #5 we return an error code. Caller is responsible for
734 + * distinguishing the remaining cases if desired.
737 + * Caller must hold repmgr->mutex.
740 + __repmgr_await_drain(dbenv, conn, timeout)
742 + REPMGR_CONNECTION *conn;
743 + db_timeout_t timeout;
746 + struct timespec deadline;
749 + db_rep = dbenv->rep_handle;
751 + __repmgr_compute_wait_deadline(dbenv, &deadline, timeout);
754 + while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
755 + ret = pthread_cond_timedwait(&conn->drained,
756 + &db_rep->mutex, &deadline);
759 + if (db_rep->finished)
760 + goto out; /* #4. */
762 + * Another thread could have stumbled into an error on
763 + * the socket while we were waiting.
765 + if (F_ISSET(conn, CONN_DEFUNCT)) {
766 + ret = DB_REP_UNAVAIL; /* #3. */
771 + F_SET(conn, CONN_CONGESTED);
773 + goto out; /* #2. */
775 + goto out; /* #5. */
785 + * PUBLIC: int __repmgr_alloc_cond __P((cond_var_t *));
787 + * Initialize a condition variable (in allocated space).
790 + __repmgr_alloc_cond(c)
793 + return (pthread_cond_init(c, NULL));
797 + * PUBLIC: int __repmgr_free_cond __P((cond_var_t *));
799 + * Clean up a previously initialized condition variable.
802 + __repmgr_free_cond(c)
805 + return (pthread_cond_destroy(c));
809 * PUBLIC: int __repmgr_init_sync __P((DB_ENV *, DB_REP *));
811 * Allocate/initialize all data necessary for thread synchronization. This
816 fd_set reads, writes;
817 ! int ret, flow_control, maxfd, nready;
818 u_int8_t buf[10]; /* arbitrary size */
820 flow_control = FALSE;
824 fd_set reads, writes;
825 ! int ret, flow_control, maxfd;
826 u_int8_t buf[10]; /* arbitrary size */
828 flow_control = FALSE;
834 TAILQ_FOREACH(conn, &db_rep->connections, entries) {
835 + if (F_ISSET(conn, CONN_DEFUNCT))
838 if (F_ISSET(conn, CONN_CONNECTING)) {
839 FD_SET((u_int)conn->fd, &reads);
840 FD_SET((u_int)conn->fd, &writes);
848 LOCK_MUTEX(db_rep->mutex);
851 - * The first priority thing we must do is to clean up any
852 - * pending defunct connections. Otherwise, if they have any
853 - * lingering pending input, we get very confused if we try to
856 - * The TAILQ_FOREACH macro would be suitable here, except that
857 - * it doesn't allow unlinking the current element, which is
858 - * needed for cleanup_connection.
860 - for (conn = TAILQ_FIRST(&db_rep->connections);
863 - next = TAILQ_NEXT(conn, entries);
864 - if (F_ISSET(conn, CONN_DEFUNCT))
865 - __repmgr_cleanup_connection(dbenv, conn);
868 if ((ret = __repmgr_retry_connections(dbenv)) != 0)
874 ! * Traverse the linked list. (Again, like TAILQ_FOREACH, except
875 ! * that we need the ability to unlink an element along the way.)
877 for (conn = TAILQ_FIRST(&db_rep->connections);
880 next = TAILQ_NEXT(conn, entries);
881 ! if (F_ISSET(conn, CONN_CONNECTING)) {
882 ! if (FD_ISSET((u_int)conn->fd, &reads) ||
883 ! FD_ISSET((u_int)conn->fd, &writes)) {
884 ! if ((ret = finish_connecting(dbenv,
885 ! conn)) == DB_REP_UNAVAIL) {
887 ! __repmgr_bust_connection(
888 ! dbenv, conn, TRUE)) != 0)
890 ! } else if (ret != 0)
897 ! * Here, the site is connected, and the FD_SET's are
900 ! if (FD_ISSET((u_int)conn->fd, &writes)) {
901 ! if ((ret = __repmgr_write_some(
902 ! dbenv, conn)) == DB_REP_UNAVAIL) {
904 ! __repmgr_bust_connection(dbenv,
908 ! } else if (ret != 0)
912 ! if (!flow_control &&
913 ! FD_ISSET((u_int)conn->fd, &reads)) {
914 ! if ((ret = __repmgr_read_from_site(dbenv, conn))
915 ! == DB_REP_UNAVAIL) {
917 ! __repmgr_bust_connection(dbenv,
921 ! } else if (ret != 0)
931 LOCK_MUTEX(db_rep->mutex);
933 if ((ret = __repmgr_retry_connections(dbenv)) != 0)
937 ! * Examine each connection, to see what work needs to be done.
939 ! * The TAILQ_FOREACH macro would be suitable here, except that
940 ! * it doesn't allow unlinking the current element, which is
941 ! * needed for cleanup_connection.
943 for (conn = TAILQ_FIRST(&db_rep->connections);
946 next = TAILQ_NEXT(conn, entries);
947 ! if ((ret = __repmgr_conn_work(dbenv,
948 ! conn, &reads, &writes, flow_control)) != 0)
959 + __repmgr_conn_work(dbenv, conn, reads, writes, flow_control)
961 + REPMGR_CONNECTION *conn;
962 + fd_set *reads, *writes;
968 + if (F_ISSET(conn, CONN_DEFUNCT)) {
970 + * Deferred clean-up, from an error that happened in another
971 + * thread, while we were sleeping in select().
973 + return (__repmgr_cleanup_connection(dbenv, conn));
977 + fd = (u_int)conn->fd;
979 + if (F_ISSET(conn, CONN_CONNECTING)) {
980 + if (FD_ISSET(fd, reads) || FD_ISSET(fd, writes))
981 + ret = finish_connecting(dbenv, conn);
984 + * Here, the site is connected, and the FD_SET's are valid.
986 + if (FD_ISSET(fd, writes))
987 + ret = __repmgr_write_some(dbenv, conn);
989 + if (ret == 0 && !flow_control && FD_ISSET(fd, reads))
990 + ret = __repmgr_read_from_site(dbenv, conn);
993 + if (ret == DB_REP_UNAVAIL) {
994 + if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
996 + ret = __repmgr_cleanup_connection(dbenv, conn);
1002 finish_connecting(dbenv, conn)
1004 REPMGR_CONNECTION *conn;
1011 + DB_ASSERT(dbenv, F_ISSET(conn, CONN_CONNECTING));
1012 F_CLR(conn, CONN_CONNECTING);
1013 return (__repmgr_send_handshake(dbenv, conn));
1017 "connecting to %s", __repmgr_format_site_loc(site, buffer));
1019 /* If we've exhausted the list of possible addresses, give up. */
1020 ! if (ADDR_LIST_NEXT(&site->net_addr) == NULL)
1021 return (DB_REP_UNAVAIL);
1024 * This is just like a little mini-"bust_connection", except that we
1025 * don't reschedule for later, 'cuz we're just about to try again right
1029 * Which means this must only be called on the select() thread, since
1030 * only there are we allowed to actually close a connection.
1032 DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
1033 ! __repmgr_cleanup_connection(dbenv, conn);
1034 ret = __repmgr_connect_site(dbenv, eid);
1035 DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
1038 "connecting to %s", __repmgr_format_site_loc(site, buffer));
1040 /* If we've exhausted the list of possible addresses, give up. */
1041 ! if (ADDR_LIST_NEXT(&site->net_addr) == NULL) {
1042 ! STAT(db_rep->region->mstat.st_connect_fail++);
1043 return (DB_REP_UNAVAIL);
1047 * This is just like a little mini-"bust_connection", except that we
1048 * don't reschedule for later, 'cuz we're just about to try again right
1049 ! * now. (Note that we don't have to worry about message threads
1050 ! * blocking on a full output queue: that can't happen when we're only
1051 ! * just connecting.)
1054 * Which means this must only be called on the select() thread, since
1055 * only there are we allowed to actually close a connection.
1057 DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
1058 ! if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0)
1060 ret = __repmgr_connect_site(dbenv, eid);
1061 DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
1063 *** repmgr/repmgr_sel.c 2007-10-31 10:23:52.000000000 -0700
1064 --- repmgr/repmgr_sel.c 2007-10-31 10:23:53.000000000 -0700
1069 * PUBLIC: int __repmgr_accept __P((DB_ENV *));
1072 - * Only ever called in the select() thread, since we may call
1073 - * __repmgr_bust_connection(..., TRUE).
1076 __repmgr_accept(dbenv)
1082 case DB_REP_UNAVAIL:
1083 ! return (__repmgr_bust_connection(dbenv, conn, TRUE));
1090 case DB_REP_UNAVAIL:
1091 ! return (__repmgr_bust_connection(dbenv, conn));
1097 * starting with the "current" element of its address list and trying as many
1098 * addresses as necessary until the list is exhausted.
1101 - * Only ever called in the select() thread, since we may call
1102 - * __repmgr_bust_connection(..., TRUE).
1104 * PUBLIC: int __repmgr_connect_site __P((DB_ENV *, u_int eid));
1112 case DB_REP_UNAVAIL:
1113 ! return (__repmgr_bust_connection(dbenv, con, TRUE));
1120 case DB_REP_UNAVAIL:
1121 ! return (__repmgr_bust_connection(dbenv, con));
1128 DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1);
1130 ! return (__repmgr_send_one(dbenv, conn, REPMGR_HANDSHAKE, &cntrl, &rec));
1136 DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1);
1139 ! * It would of course be disastrous to block the select() thread, so
1140 ! * pass the "blockable" argument as FALSE. Fortunately blocking should
1141 ! * never be necessary here, because the hand-shake is always the first
1142 ! * thing we send. Which is a good thing, because it would be almost as
1143 ! * disastrous if we allowed ourselves to drop a handshake.
1145 ! return (__repmgr_send_one(dbenv,
1146 ! conn, REPMGR_HANDSHAKE, &cntrl, &rec, FALSE));
1153 conn->out_queue_length--;
1154 if (--msg->ref_count <= 0)
1155 __os_free(dbenv, msg);
1158 + * We've achieved enough movement to free up at least
1159 + * one space in the outgoing queue. Wake any message
1160 + * threads that may be waiting for space. Clear the
1161 + * CONGESTED status so that when the queue reaches the
1162 + * high-water mark again, the filling thread will be
1163 + * allowed to try waiting again.
1165 + F_CLR(conn, CONN_CONGESTED);
1166 + if (conn->blockers > 0 &&
1167 + (ret = __repmgr_signal(&conn->drained)) != 0)
1172 *** repmgr/repmgr_util.c 2007-10-31 10:23:52.000000000 -0700
1173 --- repmgr/repmgr_util.c 2007-10-31 10:23:53.000000000 -0700
1177 db_rep = dbenv->rep_handle;
1178 if ((ret = __os_malloc(dbenv, sizeof(REPMGR_CONNECTION), &c)) != 0)
1180 + if ((ret = __repmgr_alloc_cond(&c->drained)) != 0) {
1181 + __os_free(dbenv, c);
1188 *** repmgr/repmgr_windows.c 2007-10-31 10:23:52.000000000 -0700
1189 --- repmgr/repmgr_windows.c 2007-10-31 10:23:53.000000000 -0700
1193 #define __INCLUDE_NETWORKING 1
1196 + /* Convert time-out from microseconds to milliseconds, rounding up. */
1197 + #define DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t) (((t) + (US_PER_MS - 1)) / US_PER_MS)
1199 typedef struct __ack_waiter {
1210 db_rep = dbenv->rep_handle;
1212 if ((ret = allocate_wait_slot(dbenv, &me)) != 0)
1215 - /* convert time-out from microseconds to milliseconds, rounding up */
1216 timeout = db_rep->ack_timeout > 0 ?
1217 ! ((db_rep->ack_timeout + (US_PER_MS - 1)) / US_PER_MS) : INFINITE;
1219 if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout,
1220 FALSE)) == WAIT_FAILED) {
1225 ! DWORD ret, timeout;
1227 db_rep = dbenv->rep_handle;
1229 if ((ret = allocate_wait_slot(dbenv, &me)) != 0)
1232 timeout = db_rep->ack_timeout > 0 ?
1233 ! DB_TIMEOUT_TO_WINDOWS_TIMEOUT(db_rep->ack_timeout) : INFINITE;
1235 if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout,
1236 FALSE)) == WAIT_FAILED) {
1240 db_rep->waiters->first_free = slot;
1243 + /* (See requirements described in repmgr_posix.c.) */
1245 + __repmgr_await_drain(dbenv, conn, timeout)
1247 + REPMGR_CONNECTION *conn;
1248 + db_timeout_t timeout;
1251 + db_timespec deadline, delta, now;
1253 + DWORD duration, ret;
1256 + db_rep = dbenv->rep_handle;
1258 + __os_gettime(dbenv, &deadline);
1259 + DB_TIMEOUT_TO_TIMESPEC(timeout, &delta);
1260 + timespecadd(&deadline, &delta);
1262 + while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
1263 + if (!ResetEvent(conn->drained))
1264 + return (GetLastError());
1266 + /* How long until the deadline? */
1267 + __os_gettime(dbenv, &now);
1268 + if (timespeccmp(&now, &deadline, >=)) {
1269 + F_SET(conn, CONN_CONGESTED);
1273 + timespecsub(&delta, &now);
1275 + DB_TIMESPEC_TO_TIMEOUT(t, &delta, round_up);
1276 + duration = DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t);
1278 + ret = SignalObjectAndWait(db_rep->mutex,
1279 + conn->drained, duration, FALSE);
1280 + LOCK_MUTEX(db_rep->mutex);
1281 + if (ret == WAIT_FAILED)
1282 + return (GetLastError());
1283 + else if (ret == WAIT_TIMEOUT) {
1284 + F_SET(conn, CONN_CONGESTED);
1287 + DB_ASSERT(dbenv, ret == WAIT_OBJECT_0);
1289 + if (db_rep->finished)
1291 + if (F_ISSET(conn, CONN_DEFUNCT))
1292 + return (DB_REP_UNAVAIL);
1298 + * Creates a manual reset event, which is usually our best choice when we may
1299 + * have multiple threads waiting on a single event.
1302 + __repmgr_alloc_cond(c)
1307 + if ((event = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL)
1308 + return (GetLastError());
1314 + __repmgr_free_cond(c)
1317 + if (CloseHandle(*c))
1319 + return (GetLastError());
1323 * Make resource allocation an all-or-nothing affair, outside of this and the
1324 * close_sync function. db_rep->waiters should be non-NULL iff all of these
1328 * don't hurt anything flow-control-wise.
1330 TAILQ_FOREACH(conn, &db_rep->connections, entries) {
1331 + if (F_ISSET(conn, CONN_DEFUNCT))
1334 if (F_ISSET(conn, CONN_CONNECTING) ||
1335 !STAILQ_EMPTY(&conn->outbound_queue) ||
1336 (!flow_control || !IS_VALID_EID(conn->eid))) {
1341 next = TAILQ_NEXT(conn, entries);
1342 ! if (F_ISSET(conn, CONN_DEFUNCT))
1343 ! __repmgr_cleanup_connection(dbenv, conn);
1350 next = TAILQ_NEXT(conn, entries);
1351 ! if (F_ISSET(conn, CONN_DEFUNCT) &&
1352 ! (ret = __repmgr_cleanup_connection(dbenv,
1365 - * Only ever called on the select() thread, since we may call
1366 - * __repmgr_bust_connection(..., TRUE).
1369 handle_completion(dbenv, conn)
1379 ! err: if (ret == DB_REP_UNAVAIL)
1380 ! return (__repmgr_bust_connection(dbenv, conn, TRUE));
1389 ! if (ret == DB_REP_UNAVAIL) {
1390 ! if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
1392 ! ret = __repmgr_cleanup_connection(dbenv, conn);
1401 DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
1402 ! __repmgr_cleanup_connection(dbenv, conn);
1403 ret = __repmgr_connect_site(dbenv, eid);
1404 DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
1409 DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
1410 ! if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0)
1412 ret = __repmgr_connect_site(dbenv, eid);
1413 DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);