From d66850d484cb233398440aee4cac22ad6bfb1495 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Arkadiusz=20Mi=C5=9Bkiewicz?= Date: Sun, 5 Jul 2009 17:16:36 +0000 Subject: [PATCH] - drop obsolete files Changed files: patch.4.6.21.2 -> 1.2 patch.4.6.21.3 -> 1.2 patch.4.6.21.4 -> 1.2 --- patch.4.6.21.2 | 27 - patch.4.6.21.3 | 74 --- patch.4.6.21.4 | 1414 ------------------------------------------------ 3 files changed, 1515 deletions(-) delete mode 100644 patch.4.6.21.2 delete mode 100644 patch.4.6.21.3 delete mode 100644 patch.4.6.21.4 diff --git a/patch.4.6.21.2 b/patch.4.6.21.2 deleted file mode 100644 index f529971..0000000 --- a/patch.4.6.21.2 +++ /dev/null @@ -1,27 +0,0 @@ -*** mp/mp_region.c 2007-05-18 03:18:01.000000000 +1000 ---- mp/mp_region.c 2008-06-24 13:15:56.000000000 +1000 -*************** -*** 249,256 **** - mtx_base = htab[0].mtx_hash; - } - - if (mtx_base != MUTEX_INVALID) -! mtx_base += reginfo_off * htab_buckets; - - /* Allocate hash table space and initialize it. */ - if ((ret = __env_alloc(infop, ---- 249,262 ---- - mtx_base = htab[0].mtx_hash; - } - -+ /* -+ * We preallocated all of the mutexes in a block, so for regions after -+ * the first, we skip mutexes in use in earlier regions. Each region -+ * has the same number of buckets and there are two mutexes per hash -+ * bucket (the bucket mutex and the I/O mutex). -+ */ - if (mtx_base != MUTEX_INVALID) -! mtx_base += reginfo_off * htab_buckets * 2; - - /* Allocate hash table space and initialize it. */ - if ((ret = __env_alloc(infop, diff --git a/patch.4.6.21.3 b/patch.4.6.21.3 deleted file mode 100644 index 9d316ef..0000000 --- a/patch.4.6.21.3 +++ /dev/null @@ -1,74 +0,0 @@ -*** sequence/sequence.c ---- sequence/sequence.c -*************** -*** 196,202 **** - if ((ret = __db_get_flags(dbp, &tflags)) != 0) - goto err; - -! if (DB_IS_READONLY(dbp)) { - ret = __db_rdonly(dbp->dbenv, "DB_SEQUENCE->open"); - goto err; - } ---- 196,206 ---- - if ((ret = __db_get_flags(dbp, &tflags)) != 0) - goto err; - -! /* -! * We can let replication clients open sequences, but must -! * check later that they do not update them. -! */ -! if (F_ISSET(dbp, DB_AM_RDONLY)) { - ret = __db_rdonly(dbp->dbenv, "DB_SEQUENCE->open"); - goto err; - } -*************** -*** 252,257 **** ---- 256,266 ---- - if ((ret != DB_NOTFOUND && ret != DB_KEYEMPTY) || - !LF_ISSET(DB_CREATE)) - goto err; -+ if (IS_REP_CLIENT(dbenv) && -+ !F_ISSET(dbp, DB_AM_NOT_DURABLE)) { -+ ret = __db_rdonly(dbenv, "DB_SEQUENCE->open"); -+ goto err; -+ } - ret = 0; - - rp = &seq->seq_record; -*************** -*** 304,310 **** - */ - rp = seq->seq_data.data; - if (rp->seq_version == DB_SEQUENCE_OLDVER) { -! oldver: rp->seq_version = DB_SEQUENCE_VERSION; - if (__db_isbigendian()) { - if (IS_DB_AUTO_COMMIT(dbp, txn)) { - if ((ret = ---- 313,324 ---- - */ - rp = seq->seq_data.data; - if (rp->seq_version == DB_SEQUENCE_OLDVER) { -! oldver: if (IS_REP_CLIENT(dbenv) && -! !F_ISSET(dbp, DB_AM_NOT_DURABLE)) { -! ret = __db_rdonly(dbenv, "DB_SEQUENCE->open"); -! goto err; -! } -! rp->seq_version = DB_SEQUENCE_VERSION; - if (__db_isbigendian()) { - if (IS_DB_AUTO_COMMIT(dbp, txn)) { - if ((ret = -*************** -*** 713,718 **** ---- 727,738 ---- - - MUTEX_LOCK(dbenv, seq->mtx_seq); - -+ if (handle_check && IS_REP_CLIENT(dbenv) && -+ !F_ISSET(dbp, DB_AM_NOT_DURABLE)) { -+ ret = __db_rdonly(dbenv, "DB_SEQUENCE->get"); -+ goto err; -+ } -+ - if (rp->seq_min + delta > rp->seq_max) { - __db_errx(dbenv, "Sequence overflow"); - ret = EINVAL; diff --git a/patch.4.6.21.4 b/patch.4.6.21.4 deleted file mode 100644 index 7c1f7e2..0000000 --- a/patch.4.6.21.4 +++ /dev/null @@ -1,1414 +0,0 @@ -*** dbinc/repmgr.h 2007-10-31 10:23:52.000000000 -0700 ---- dbinc/repmgr.h 2007-10-31 10:23:53.000000000 -0700 -*************** -*** 36,41 **** ---- 36,55 ---- - #endif - - /* -+ * The (arbitrary) maximum number of outgoing messages we're willing to hold, on -+ * a queue per connection, waiting for TCP buffer space to become available in -+ * the kernel. Rather than exceeding this limit, we simply discard additional -+ * messages (since this is always allowed by the replication protocol). -+ * As a special dispensation, if a message is destined for a specific remote -+ * site (i.e., it's not a broadcast), then we first try blocking the sending -+ * thread, waiting for space to become available (though we only wait a limited -+ * time). This is so as to be able to handle the immediate flood of (a -+ * potentially large number of) outgoing messages that replication generates, in -+ * a tight loop, when handling PAGE_REQ, LOG_REQ and ALL_REQ requests. -+ */ -+ #define OUT_QUEUE_LIMIT 10 -+ -+ /* - * The system value is available from sysconf(_SC_HOST_NAME_MAX). - * Historically, the maximum host name was 256. - */ -*************** -*** 47,52 **** ---- 61,71 ---- - #define MAX_SITE_LOC_STRING (MAXHOSTNAMELEN+20) - typedef char SITE_STRING_BUFFER[MAX_SITE_LOC_STRING+1]; - -+ /* Default timeout values, in seconds. */ -+ #define DB_REPMGR_DEFAULT_ACK_TIMEOUT (1 * US_PER_SEC) -+ #define DB_REPMGR_DEFAULT_CONNECTION_RETRY (30 * US_PER_SEC) -+ #define DB_REPMGR_DEFAULT_ELECTION_RETRY (10 * US_PER_SEC) -+ - struct __repmgr_connection; - typedef struct __repmgr_connection REPMGR_CONNECTION; - struct __repmgr_queue; typedef struct __repmgr_queue REPMGR_QUEUE; -*************** -*** 171,178 **** - #ifdef DB_WIN32 - WSAEVENT event_object; - #endif -! #define CONN_CONNECTING 0x01 /* nonblocking connect in progress */ -! #define CONN_DEFUNCT 0x02 /* socket close pending */ - u_int32_t flags; - - /* ---- 190,198 ---- - #ifdef DB_WIN32 - WSAEVENT event_object; - #endif -! #define CONN_CONGESTED 0x01 /* msg thread wait has exceeded timeout */ -! #define CONN_CONNECTING 0x02 /* nonblocking connect in progress */ -! #define CONN_DEFUNCT 0x04 /* socket close pending */ - u_int32_t flags; - - /* -*************** -*** 180,189 **** - * send() function's thread. But if TCP doesn't have enough network - * buffer space for us when we first try it, we instead allocate some - * memory, and copy the message, and then send it as space becomes -! * available in our main select() thread. - */ - OUT_Q_HEADER outbound_queue; - int out_queue_length; - - /* - * Input: while we're reading a message, we keep track of what phase ---- 200,215 ---- - * send() function's thread. But if TCP doesn't have enough network - * buffer space for us when we first try it, we instead allocate some - * memory, and copy the message, and then send it as space becomes -! * available in our main select() thread. In some cases, if the queue -! * gets too long we wait until it's drained, and then append to it. -! * This condition variable's associated mutex is the normal per-repmgr -! * db_rep->mutex, because that mutex is always held anyway whenever the -! * output queue is consulted. - */ - OUT_Q_HEADER outbound_queue; - int out_queue_length; -+ cond_var_t drained; -+ int blockers; /* ref count of msg threads waiting on us */ - - /* - * Input: while we're reading a message, we keep track of what phase -*** dbinc_auto/int_def.in 2007-10-31 10:23:52.000000000 -0700 ---- dbinc_auto/int_def.in 2007-10-31 10:23:52.000000000 -0700 -*************** -*** 1420,1425 **** ---- 1420,1428 ---- - #define __repmgr_wake_waiting_senders __repmgr_wake_waiting_senders@DB_VERSION_UNIQUE_NAME@ - #define __repmgr_await_ack __repmgr_await_ack@DB_VERSION_UNIQUE_NAME@ - #define __repmgr_compute_wait_deadline __repmgr_compute_wait_deadline@DB_VERSION_UNIQUE_NAME@ -+ #define __repmgr_await_drain __repmgr_await_drain@DB_VERSION_UNIQUE_NAME@ -+ #define __repmgr_alloc_cond __repmgr_alloc_cond@DB_VERSION_UNIQUE_NAME@ -+ #define __repmgr_free_cond __repmgr_free_cond@DB_VERSION_UNIQUE_NAME@ - #define __repmgr_init_sync __repmgr_init_sync@DB_VERSION_UNIQUE_NAME@ - #define __repmgr_close_sync __repmgr_close_sync@DB_VERSION_UNIQUE_NAME@ - #define __repmgr_net_init __repmgr_net_init@DB_VERSION_UNIQUE_NAME@ -*** dbinc_auto/repmgr_ext.h 2007-10-31 10:23:52.000000000 -0700 ---- dbinc_auto/repmgr_ext.h 2007-10-31 10:23:52.000000000 -0700 -*************** -*** 21,30 **** - int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *)); - void __repmgr_stash_generation __P((DB_ENV *)); - int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t)); -! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *)); - int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *)); -! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *, int)); -! void __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *)); - int __repmgr_find_site __P((DB_ENV *, const char *, u_int)); - int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *)); - int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **)); ---- 21,30 ---- - int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *)); - void __repmgr_stash_generation __P((DB_ENV *)); - int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t)); -! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *, int)); - int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *)); -! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *)); -! int __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *)); - int __repmgr_find_site __P((DB_ENV *, const char *, u_int)); - int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *)); - int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **)); -*************** -*** 39,44 **** ---- 39,47 ---- - int __repmgr_wake_waiting_senders __P((DB_ENV *)); - int __repmgr_await_ack __P((DB_ENV *, const DB_LSN *)); - void __repmgr_compute_wait_deadline __P((DB_ENV*, struct timespec *, db_timeout_t)); -+ int __repmgr_await_drain __P((DB_ENV *, REPMGR_CONNECTION *, db_timeout_t)); -+ int __repmgr_alloc_cond __P((cond_var_t *)); -+ int __repmgr_free_cond __P((cond_var_t *)); - int __repmgr_init_sync __P((DB_ENV *, DB_REP *)); - int __repmgr_close_sync __P((DB_ENV *)); - int __repmgr_net_init __P((DB_ENV *, DB_REP *)); -*** repmgr/repmgr_method.c 2007-10-31 10:23:52.000000000 -0700 ---- repmgr/repmgr_method.c 2007-10-31 10:23:53.000000000 -0700 -*************** -*** 196,204 **** - int ret; - - /* Set some default values. */ -! db_rep->ack_timeout = 1 * US_PER_SEC; /* 1 second */ -! db_rep->connection_retry_wait = 30 * US_PER_SEC; /* 30 seconds */ -! db_rep->election_retry_wait = 10 * US_PER_SEC; /* 10 seconds */ - db_rep->config_nsites = 0; - db_rep->peer = DB_EID_INVALID; - db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM; ---- 196,204 ---- - int ret; - - /* Set some default values. */ -! db_rep->ack_timeout = DB_REPMGR_DEFAULT_ACK_TIMEOUT; -! db_rep->connection_retry_wait = DB_REPMGR_DEFAULT_CONNECTION_RETRY; -! db_rep->election_retry_wait = DB_REPMGR_DEFAULT_ELECTION_RETRY; - db_rep->config_nsites = 0; - db_rep->peer = DB_EID_INVALID; - db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM; -*************** -*** 238,243 **** ---- 238,244 ---- - DB_ENV *dbenv; - { - DB_REP *db_rep; -+ REPMGR_CONNECTION *conn; - int ret; - - db_rep = dbenv->rep_handle; -*************** -*** 254,259 **** ---- 255,266 ---- - - if ((ret = __repmgr_signal(&db_rep->queue_nonempty)) != 0) - goto unlock; -+ -+ TAILQ_FOREACH(conn, &db_rep->connections, entries) { -+ if (conn->blockers > 0 && -+ ((ret = __repmgr_signal(&conn->drained)) != 0)) -+ goto unlock; -+ } - UNLOCK_MUTEX(db_rep->mutex); - - return (__repmgr_wake_main_thread(dbenv)); -*** repmgr/repmgr_msg.c 2007-10-31 10:23:52.000000000 -0700 ---- repmgr/repmgr_msg.c 2007-10-31 10:23:53.000000000 -0700 -*************** -*** 183,192 **** - - /* - * Acknowledges a message. -- * -- * !!! -- * Note that this cannot be called from the select() thread, in case we call -- * __repmgr_bust_connection(..., FALSE). - */ - static int - ack_message(dbenv, generation, lsn) ---- 183,188 ---- -*************** -*** 227,235 **** - rec2.size = 0; - - conn = site->ref.conn; - if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK, -! &control2, &rec2)) == DB_REP_UNAVAIL) -! ret = __repmgr_bust_connection(dbenv, conn, FALSE); - } - - UNLOCK_MUTEX(db_rep->mutex); ---- 223,236 ---- - rec2.size = 0; - - conn = site->ref.conn; -+ /* -+ * It's hard to imagine anyone would care about a lost ack if -+ * the path to the master is so congested as to need blocking; -+ * so pass "blockable" argument as FALSE. -+ */ - if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK, -! &control2, &rec2, FALSE)) == DB_REP_UNAVAIL) -! ret = __repmgr_bust_connection(dbenv, conn); - } - - UNLOCK_MUTEX(db_rep->mutex); -*** repmgr/repmgr_net.c 2007-10-31 10:23:52.000000000 -0700 ---- repmgr/repmgr_net.c 2007-10-31 10:23:53.000000000 -0700 -*************** -*** 63,69 **** - static void setup_sending_msg - __P((struct sending_msg *, u_int, const DBT *, const DBT *)); - static int __repmgr_send_internal -! __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *)); - static int enqueue_msg - __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t)); - static int flatten __P((DB_ENV *, struct sending_msg *)); ---- 63,69 ---- - static void setup_sending_msg - __P((struct sending_msg *, u_int, const DBT *, const DBT *)); - static int __repmgr_send_internal -! __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, int)); - static int enqueue_msg - __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t)); - static int flatten __P((DB_ENV *, struct sending_msg *)); -*************** -*** 73,85 **** - * __repmgr_send -- - * The send function for DB_ENV->rep_set_transport. - * -- * !!! -- * This is only ever called as the replication transport call-back, which means -- * it's either on one of our message processing threads or an application -- * thread. It mustn't be called from the select() thread, because we might call -- * __repmgr_bust_connection(..., FALSE) here, and that's not allowed in the -- * select() thread. -- * - * PUBLIC: int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, - * PUBLIC: const DB_LSN *, int, u_int32_t)); - */ ---- 73,78 ---- -*************** -*** 126,134 **** - } - - conn = site->ref.conn; - if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE, -! control, rec)) == DB_REP_UNAVAIL && -! (t_ret = __repmgr_bust_connection(dbenv, conn, FALSE)) != 0) - ret = t_ret; - if (ret != 0) - goto out; ---- 119,128 ---- - } - - conn = site->ref.conn; -+ /* Pass the "blockable" argument as TRUE. */ - if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE, -! control, rec, TRUE)) == DB_REP_UNAVAIL && -! (t_ret = __repmgr_bust_connection(dbenv, conn)) != 0) - ret = t_ret; - if (ret != 0) - goto out; -*************** -*** 222,228 **** - if (site->state != SITE_CONNECTED) - return (NULL); - -! if (F_ISSET(site->ref.conn, CONN_CONNECTING)) - return (NULL); - return (site); - } ---- 216,222 ---- - if (site->state != SITE_CONNECTED) - return (NULL); - -! if (F_ISSET(site->ref.conn, CONN_CONNECTING|CONN_DEFUNCT)) - return (NULL); - return (site); - } -*************** -*** 235,244 **** - * - * !!! - * Caller must hold dbenv->mutex. -- * -- * !!! -- * Note that this cannot be called from the select() thread, in case we call -- * __repmgr_bust_connection(..., FALSE). - */ - static int - __repmgr_send_broadcast(dbenv, control, rec, nsitesp, npeersp) ---- 229,234 ---- -*************** -*** 268,281 **** - !IS_VALID_EID(conn->eid)) - continue; - -! if ((ret = __repmgr_send_internal(dbenv, conn, &msg)) == 0) { - site = SITE_FROM_EID(conn->eid); - nsites++; - if (site->priority > 0) - npeers++; - } else if (ret == DB_REP_UNAVAIL) { -! if ((ret = __repmgr_bust_connection( -! dbenv, conn, FALSE)) != 0) - return (ret); - } else - return (ret); ---- 258,277 ---- - !IS_VALID_EID(conn->eid)) - continue; - -! /* -! * Broadcast messages are either application threads committing -! * transactions, or replication status message that we can -! * afford to lose. So don't allow blocking for them (pass -! * "blockable" argument as FALSE). -! */ -! if ((ret = __repmgr_send_internal(dbenv, -! conn, &msg, FALSE)) == 0) { - site = SITE_FROM_EID(conn->eid); - nsites++; - if (site->priority > 0) - npeers++; - } else if (ret == DB_REP_UNAVAIL) { -! if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0) - return (ret); - } else - return (ret); -*************** -*** 301,339 **** - * intersperse writes that are part of two single messages. - * - * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, -! * PUBLIC: u_int, const DBT *, const DBT *)); - */ - int -! __repmgr_send_one(dbenv, conn, msg_type, control, rec) - DB_ENV *dbenv; - REPMGR_CONNECTION *conn; - u_int msg_type; - const DBT *control, *rec; - { - struct sending_msg msg; - - setup_sending_msg(&msg, msg_type, control, rec); -! return (__repmgr_send_internal(dbenv, conn, &msg)); - } - - /* - * Attempts a "best effort" to send a message on the given site. If there is an -! * excessive backlog of message already queued on the connection, we simply drop -! * this message, and still return 0 even in this case. - */ - static int -! __repmgr_send_internal(dbenv, conn, msg) - DB_ENV *dbenv; - REPMGR_CONNECTION *conn; - struct sending_msg *msg; - { -! #define OUT_QUEUE_LIMIT 10 /* arbitrary, for now */ - REPMGR_IOVECS iovecs; - SITE_STRING_BUFFER buffer; - int ret; - size_t nw; - size_t total_written; - - DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING)); - if (!STAILQ_EMPTY(&conn->outbound_queue)) { - /* ---- 297,355 ---- - * intersperse writes that are part of two single messages. - * - * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, -! * PUBLIC: u_int, const DBT *, const DBT *, int)); - */ - int -! __repmgr_send_one(dbenv, conn, msg_type, control, rec, blockable) - DB_ENV *dbenv; - REPMGR_CONNECTION *conn; - u_int msg_type; - const DBT *control, *rec; -+ int blockable; - { - struct sending_msg msg; - - setup_sending_msg(&msg, msg_type, control, rec); -! return (__repmgr_send_internal(dbenv, conn, &msg, blockable)); - } - - /* - * Attempts a "best effort" to send a message on the given site. If there is an -! * excessive backlog of message already queued on the connection, what shall we -! * do? If the caller doesn't mind blocking, we'll wait (a limited amount of -! * time) for the queue to drain. Otherwise we'll simply drop the message. This -! * is always allowed by the replication protocol. But in the case of a -! * multi-message response to a request like PAGE_REQ, LOG_REQ or ALL_REQ we -! * almost always get a flood of messages that instantly fills our queue, so -! * blocking improves performance (by avoiding the need for the client to -! * re-request). -! * -! * How long shall we wait? We could of course create a new timeout -! * configuration type, so that the application could set it directly. But that -! * would start to overwhelm the user with too many choices to think about. We -! * already have an ACK timeout, which is the user's estimate of how long it -! * should take to send a message to the client, have it be processed, and return -! * a message back to us. We multiply that by the queue size, because that's how -! * many messages have to be swallowed up by the client before we're able to -! * start sending again (at least to a rough approximation). - */ - static int -! __repmgr_send_internal(dbenv, conn, msg, blockable) - DB_ENV *dbenv; - REPMGR_CONNECTION *conn; - struct sending_msg *msg; -+ int blockable; - { -! DB_REP *db_rep; - REPMGR_IOVECS iovecs; - SITE_STRING_BUFFER buffer; -+ db_timeout_t drain_to; - int ret; - size_t nw; - size_t total_written; - -+ db_rep = dbenv->rep_handle; -+ - DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING)); - if (!STAILQ_EMPTY(&conn->outbound_queue)) { - /* -*************** -*** 344,358 **** - RPRINT(dbenv, (dbenv, "msg to %s to be queued", - __repmgr_format_eid_loc(dbenv->rep_handle, - conn->eid, buffer))); - if (conn->out_queue_length < OUT_QUEUE_LIMIT) - return (enqueue_msg(dbenv, conn, msg, 0)); - else { - RPRINT(dbenv, (dbenv, "queue limit exceeded")); - STAT(dbenv->rep_handle-> - region->mstat.st_msgs_dropped++); -! return (0); - } - } - - /* - * Send as much data to the site as we can, without blocking. Keep ---- 360,393 ---- - RPRINT(dbenv, (dbenv, "msg to %s to be queued", - __repmgr_format_eid_loc(dbenv->rep_handle, - conn->eid, buffer))); -+ if (conn->out_queue_length >= OUT_QUEUE_LIMIT && -+ blockable && !F_ISSET(conn, CONN_CONGESTED)) { -+ RPRINT(dbenv, (dbenv, -+ "block msg thread, await queue space")); -+ -+ if ((drain_to = db_rep->ack_timeout) == 0) -+ drain_to = DB_REPMGR_DEFAULT_ACK_TIMEOUT; -+ conn->blockers++; -+ ret = __repmgr_await_drain(dbenv, -+ conn, drain_to * OUT_QUEUE_LIMIT); -+ conn->blockers--; -+ if (db_rep->finished) -+ return (DB_TIMEOUT); -+ if (ret != 0) -+ return (ret); -+ if (STAILQ_EMPTY(&conn->outbound_queue)) -+ goto empty; -+ } - if (conn->out_queue_length < OUT_QUEUE_LIMIT) - return (enqueue_msg(dbenv, conn, msg, 0)); - else { - RPRINT(dbenv, (dbenv, "queue limit exceeded")); - STAT(dbenv->rep_handle-> - region->mstat.st_msgs_dropped++); -! return (blockable ? DB_TIMEOUT : 0); - } - } -+ empty: - - /* - * Send as much data to the site as we can, without blocking. Keep -*************** -*** 498,521 **** - - /* - * Abandons a connection, to recover from an error. Upon entry the conn struct -! * must be on the connections list. -! * -! * If the 'do_close' flag is true, we do the whole job; the clean-up includes -! * removing the struct from the list and freeing all its memory, so upon return -! * the caller must not refer to it any further. Otherwise, we merely mark the -! * connection for clean-up later by the main thread. - * - * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *, -! * PUBLIC: REPMGR_CONNECTION *, int)); - * - * !!! - * Caller holds mutex. - */ - int -! __repmgr_bust_connection(dbenv, conn, do_close) - DB_ENV *dbenv; - REPMGR_CONNECTION *conn; -- int do_close; - { - DB_REP *db_rep; - int connecting, ret, eid; ---- 533,553 ---- - - /* - * Abandons a connection, to recover from an error. Upon entry the conn struct -! * must be on the connections list. For now, just mark it as unusable; it will -! * be fully cleaned up in the top-level select thread, as soon as possible. - * - * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *, -! * PUBLIC: REPMGR_CONNECTION *)); - * - * !!! - * Caller holds mutex. -+ * -+ * Must be idempotent - */ - int -! __repmgr_bust_connection(dbenv, conn) - DB_ENV *dbenv; - REPMGR_CONNECTION *conn; - { - DB_REP *db_rep; - int connecting, ret, eid; -*************** -*** 526,537 **** - DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); - eid = conn->eid; - connecting = F_ISSET(conn, CONN_CONNECTING); -! if (do_close) -! __repmgr_cleanup_connection(dbenv, conn); -! else { -! F_SET(conn, CONN_DEFUNCT); -! conn->eid = -1; -! } - - /* - * When we first accepted the incoming connection, we set conn->eid to ---- 558,566 ---- - DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); - eid = conn->eid; - connecting = F_ISSET(conn, CONN_CONNECTING); -! -! F_SET(conn, CONN_DEFUNCT); -! conn->eid = -1; - - /* - * When we first accepted the incoming connection, we set conn->eid to -*************** -*** 557,563 **** - dbenv, ELECT_FAILURE_ELECTION)) != 0) - return (ret); - } -! } else if (!do_close) { - /* - * One way or another, make sure the main thread is poked, so - * that we do the deferred clean-up. ---- 586,592 ---- - dbenv, ELECT_FAILURE_ELECTION)) != 0) - return (ret); - } -! } else { - /* - * One way or another, make sure the main thread is poked, so - * that we do the deferred clean-up. -*************** -*** 568,577 **** - } - - /* -! * PUBLIC: void __repmgr_cleanup_connection - * PUBLIC: __P((DB_ENV *, REPMGR_CONNECTION *)); - */ -! void - __repmgr_cleanup_connection(dbenv, conn) - DB_ENV *dbenv; - REPMGR_CONNECTION *conn; ---- 597,610 ---- - } - - /* -! * PUBLIC: int __repmgr_cleanup_connection - * PUBLIC: __P((DB_ENV *, REPMGR_CONNECTION *)); -+ * -+ * !!! -+ * Idempotent. This can be called repeatedly as blocking message threads (of -+ * which there could be multiples) wake up in case of error on the connection. - */ -! int - __repmgr_cleanup_connection(dbenv, conn) - DB_ENV *dbenv; - REPMGR_CONNECTION *conn; -*************** -*** 580,596 **** - QUEUED_OUTPUT *out; - REPMGR_FLAT *msg; - DBT *dbt; - - db_rep = dbenv->rep_handle; - -! TAILQ_REMOVE(&db_rep->connections, conn, entries); - if (conn->fd != INVALID_SOCKET) { -! (void)closesocket(conn->fd); - #ifdef DB_WIN32 -! (void)WSACloseEvent(conn->event_object); - #endif - } - - /* - * Deallocate any input and output buffers we may have. - */ ---- 613,643 ---- - QUEUED_OUTPUT *out; - REPMGR_FLAT *msg; - DBT *dbt; -+ int ret; - - db_rep = dbenv->rep_handle; - -! DB_ASSERT(dbenv, F_ISSET(conn, CONN_DEFUNCT) || db_rep->finished); -! - if (conn->fd != INVALID_SOCKET) { -! ret = closesocket(conn->fd); -! conn->fd = INVALID_SOCKET; -! if (ret == SOCKET_ERROR) { -! ret = net_errno; -! __db_err(dbenv, ret, "closing socket"); -! } - #ifdef DB_WIN32 -! if (!WSACloseEvent(conn->event_object) && ret != 0) -! ret = net_errno; - #endif -+ if (ret != 0) -+ return (ret); - } - -+ if (conn->blockers > 0) -+ return (__repmgr_signal(&conn->drained)); -+ -+ TAILQ_REMOVE(&db_rep->connections, conn, entries); - /* - * Deallocate any input and output buffers we may have. - */ -*************** -*** 614,620 **** ---- 661,669 ---- - __os_free(dbenv, out); - } - -+ ret = __repmgr_free_cond(&conn->drained); - __os_free(dbenv, conn); -+ return (ret); - } - - static int -*************** -*** 1063,1069 **** - - while (!TAILQ_EMPTY(&db_rep->connections)) { - conn = TAILQ_FIRST(&db_rep->connections); -! __repmgr_cleanup_connection(dbenv, conn); - } - - for (i = 0; i < db_rep->site_cnt; i++) { ---- 1112,1118 ---- - - while (!TAILQ_EMPTY(&db_rep->connections)) { - conn = TAILQ_FIRST(&db_rep->connections); -! (void)__repmgr_cleanup_connection(dbenv, conn); - } - - for (i = 0; i < db_rep->site_cnt; i++) { -*** repmgr/repmgr_posix.c 2007-10-31 10:23:52.000000000 -0700 ---- repmgr/repmgr_posix.c 2007-10-31 10:23:53.000000000 -0700 -*************** -*** 21,26 **** ---- 21,28 ---- - size_t __repmgr_guesstimated_max = (128 * 1024); - #endif - -+ static int __repmgr_conn_work __P((DB_ENV *, -+ REPMGR_CONNECTION *, fd_set *, fd_set *, int)); - static int finish_connecting __P((DB_ENV *, REPMGR_CONNECTION *)); - - /* -*************** -*** 189,194 **** ---- 191,284 ---- - } - - /* -+ * PUBLIC: int __repmgr_await_drain __P((DB_ENV *, -+ * PUBLIC: REPMGR_CONNECTION *, db_timeout_t)); -+ * -+ * Waits for space to become available on the connection's output queue. -+ * Various ways we can exit: -+ * -+ * 1. queue becomes non-full -+ * 2. exceed time limit -+ * 3. connection becomes defunct (due to error in another thread) -+ * 4. repmgr is shutting down -+ * 5. any unexpected system resource failure -+ * -+ * In cases #3 and #5 we return an error code. Caller is responsible for -+ * distinguishing the remaining cases if desired. -+ * -+ * !!! -+ * Caller must hold repmgr->mutex. -+ */ -+ int -+ __repmgr_await_drain(dbenv, conn, timeout) -+ DB_ENV *dbenv; -+ REPMGR_CONNECTION *conn; -+ db_timeout_t timeout; -+ { -+ DB_REP *db_rep; -+ struct timespec deadline; -+ int ret; -+ -+ db_rep = dbenv->rep_handle; -+ -+ __repmgr_compute_wait_deadline(dbenv, &deadline, timeout); -+ -+ ret = 0; -+ while (conn->out_queue_length >= OUT_QUEUE_LIMIT) { -+ ret = pthread_cond_timedwait(&conn->drained, -+ &db_rep->mutex, &deadline); -+ switch (ret) { -+ case 0: -+ if (db_rep->finished) -+ goto out; /* #4. */ -+ /* -+ * Another thread could have stumbled into an error on -+ * the socket while we were waiting. -+ */ -+ if (F_ISSET(conn, CONN_DEFUNCT)) { -+ ret = DB_REP_UNAVAIL; /* #3. */ -+ goto out; -+ } -+ break; -+ case ETIMEDOUT: -+ F_SET(conn, CONN_CONGESTED); -+ ret = 0; -+ goto out; /* #2. */ -+ default: -+ goto out; /* #5. */ -+ } -+ } -+ /* #1. */ -+ -+ out: -+ return (ret); -+ } -+ -+ /* -+ * PUBLIC: int __repmgr_alloc_cond __P((cond_var_t *)); -+ * -+ * Initialize a condition variable (in allocated space). -+ */ -+ int -+ __repmgr_alloc_cond(c) -+ cond_var_t *c; -+ { -+ return (pthread_cond_init(c, NULL)); -+ } -+ -+ /* -+ * PUBLIC: int __repmgr_free_cond __P((cond_var_t *)); -+ * -+ * Clean up a previously initialized condition variable. -+ */ -+ int -+ __repmgr_free_cond(c) -+ cond_var_t *c; -+ { -+ return (pthread_cond_destroy(c)); -+ } -+ -+ /* - * PUBLIC: int __repmgr_init_sync __P((DB_ENV *, DB_REP *)); - * - * Allocate/initialize all data necessary for thread synchronization. This -*************** -*** 443,449 **** - REPMGR_RETRY *retry; - db_timespec timeout; - fd_set reads, writes; -! int ret, flow_control, maxfd, nready; - u_int8_t buf[10]; /* arbitrary size */ - - flow_control = FALSE; ---- 533,539 ---- - REPMGR_RETRY *retry; - db_timespec timeout; - fd_set reads, writes; -! int ret, flow_control, maxfd; - u_int8_t buf[10]; /* arbitrary size */ - - flow_control = FALSE; -*************** -*** 477,482 **** ---- 567,575 ---- - * each one. - */ - TAILQ_FOREACH(conn, &db_rep->connections, entries) { -+ if (F_ISSET(conn, CONN_DEFUNCT)) -+ continue; -+ - if (F_ISSET(conn, CONN_CONNECTING)) { - FD_SET((u_int)conn->fd, &reads); - FD_SET((u_int)conn->fd, &writes); -*************** -*** 533,616 **** - return (ret); - } - } -- nready = ret; -- - LOCK_MUTEX(db_rep->mutex); - -- /* -- * The first priority thing we must do is to clean up any -- * pending defunct connections. Otherwise, if they have any -- * lingering pending input, we get very confused if we try to -- * process it. -- * -- * The TAILQ_FOREACH macro would be suitable here, except that -- * it doesn't allow unlinking the current element, which is -- * needed for cleanup_connection. -- */ -- for (conn = TAILQ_FIRST(&db_rep->connections); -- conn != NULL; -- conn = next) { -- next = TAILQ_NEXT(conn, entries); -- if (F_ISSET(conn, CONN_DEFUNCT)) -- __repmgr_cleanup_connection(dbenv, conn); -- } -- - if ((ret = __repmgr_retry_connections(dbenv)) != 0) - goto out; -- if (nready == 0) -- continue; - - /* -! * Traverse the linked list. (Again, like TAILQ_FOREACH, except -! * that we need the ability to unlink an element along the way.) - */ - for (conn = TAILQ_FIRST(&db_rep->connections); - conn != NULL; - conn = next) { - next = TAILQ_NEXT(conn, entries); -! if (F_ISSET(conn, CONN_CONNECTING)) { -! if (FD_ISSET((u_int)conn->fd, &reads) || -! FD_ISSET((u_int)conn->fd, &writes)) { -! if ((ret = finish_connecting(dbenv, -! conn)) == DB_REP_UNAVAIL) { -! if ((ret = -! __repmgr_bust_connection( -! dbenv, conn, TRUE)) != 0) -! goto out; -! } else if (ret != 0) -! goto out; -! } -! continue; -! } -! -! /* -! * Here, the site is connected, and the FD_SET's are -! * valid. -! */ -! if (FD_ISSET((u_int)conn->fd, &writes)) { -! if ((ret = __repmgr_write_some( -! dbenv, conn)) == DB_REP_UNAVAIL) { -! if ((ret = -! __repmgr_bust_connection(dbenv, -! conn, TRUE)) != 0) -! goto out; -! continue; -! } else if (ret != 0) -! goto out; -! } -! -! if (!flow_control && -! FD_ISSET((u_int)conn->fd, &reads)) { -! if ((ret = __repmgr_read_from_site(dbenv, conn)) -! == DB_REP_UNAVAIL) { -! if ((ret = -! __repmgr_bust_connection(dbenv, -! conn, TRUE)) != 0) -! goto out; -! continue; -! } else if (ret != 0) -! goto out; -! } - } - - /* ---- 626,650 ---- - return (ret); - } - } - LOCK_MUTEX(db_rep->mutex); - - if ((ret = __repmgr_retry_connections(dbenv)) != 0) - goto out; - - /* -! * Examine each connection, to see what work needs to be done. -! * -! * The TAILQ_FOREACH macro would be suitable here, except that -! * it doesn't allow unlinking the current element, which is -! * needed for cleanup_connection. - */ - for (conn = TAILQ_FIRST(&db_rep->connections); - conn != NULL; - conn = next) { - next = TAILQ_NEXT(conn, entries); -! if ((ret = __repmgr_conn_work(dbenv, -! conn, &reads, &writes, flow_control)) != 0) -! goto out; - } - - /* -*************** -*** 637,642 **** ---- 671,719 ---- - } - - static int -+ __repmgr_conn_work(dbenv, conn, reads, writes, flow_control) -+ DB_ENV *dbenv; -+ REPMGR_CONNECTION *conn; -+ fd_set *reads, *writes; -+ int flow_control; -+ { -+ int ret; -+ u_int fd; -+ -+ if (F_ISSET(conn, CONN_DEFUNCT)) { -+ /* -+ * Deferred clean-up, from an error that happened in another -+ * thread, while we were sleeping in select(). -+ */ -+ return (__repmgr_cleanup_connection(dbenv, conn)); -+ } -+ -+ ret = 0; -+ fd = (u_int)conn->fd; -+ -+ if (F_ISSET(conn, CONN_CONNECTING)) { -+ if (FD_ISSET(fd, reads) || FD_ISSET(fd, writes)) -+ ret = finish_connecting(dbenv, conn); -+ } else { -+ /* -+ * Here, the site is connected, and the FD_SET's are valid. -+ */ -+ if (FD_ISSET(fd, writes)) -+ ret = __repmgr_write_some(dbenv, conn); -+ -+ if (ret == 0 && !flow_control && FD_ISSET(fd, reads)) -+ ret = __repmgr_read_from_site(dbenv, conn); -+ } -+ -+ if (ret == DB_REP_UNAVAIL) { -+ if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0) -+ return (ret); -+ ret = __repmgr_cleanup_connection(dbenv, conn); -+ } -+ return (ret); -+ } -+ -+ static int - finish_connecting(dbenv, conn) - DB_ENV *dbenv; - REPMGR_CONNECTION *conn; -*************** -*** 657,662 **** ---- 734,740 ---- - goto err_rpt; - } - -+ DB_ASSERT(dbenv, F_ISSET(conn, CONN_CONNECTING)); - F_CLR(conn, CONN_CONNECTING); - return (__repmgr_send_handshake(dbenv, conn)); - -*************** -*** 671,690 **** - "connecting to %s", __repmgr_format_site_loc(site, buffer)); - - /* If we've exhausted the list of possible addresses, give up. */ -! if (ADDR_LIST_NEXT(&site->net_addr) == NULL) - return (DB_REP_UNAVAIL); - - /* - * This is just like a little mini-"bust_connection", except that we - * don't reschedule for later, 'cuz we're just about to try again right -! * now. - * - * !!! - * Which means this must only be called on the select() thread, since - * only there are we allowed to actually close a connection. - */ - DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); -! __repmgr_cleanup_connection(dbenv, conn); - ret = __repmgr_connect_site(dbenv, eid); - DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL); - return (ret); ---- 749,773 ---- - "connecting to %s", __repmgr_format_site_loc(site, buffer)); - - /* If we've exhausted the list of possible addresses, give up. */ -! if (ADDR_LIST_NEXT(&site->net_addr) == NULL) { -! STAT(db_rep->region->mstat.st_connect_fail++); - return (DB_REP_UNAVAIL); -+ } - - /* - * This is just like a little mini-"bust_connection", except that we - * don't reschedule for later, 'cuz we're just about to try again right -! * now. (Note that we don't have to worry about message threads -! * blocking on a full output queue: that can't happen when we're only -! * just connecting.) - * - * !!! - * Which means this must only be called on the select() thread, since - * only there are we allowed to actually close a connection. - */ - DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); -! if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0) -! return (ret); - ret = __repmgr_connect_site(dbenv, eid); - DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL); - return (ret); -*** repmgr/repmgr_sel.c 2007-10-31 10:23:52.000000000 -0700 ---- repmgr/repmgr_sel.c 2007-10-31 10:23:53.000000000 -0700 -*************** -*** 36,45 **** - - /* - * PUBLIC: int __repmgr_accept __P((DB_ENV *)); -- * -- * !!! -- * Only ever called in the select() thread, since we may call -- * __repmgr_bust_connection(..., TRUE). - */ - int - __repmgr_accept(dbenv) ---- 36,41 ---- -*************** -*** 133,139 **** - case 0: - return (0); - case DB_REP_UNAVAIL: -! return (__repmgr_bust_connection(dbenv, conn, TRUE)); - default: - return (ret); - } ---- 129,135 ---- - case 0: - return (0); - case DB_REP_UNAVAIL: -! return (__repmgr_bust_connection(dbenv, conn)); - default: - return (ret); - } -*************** -*** 254,263 **** - * starting with the "current" element of its address list and trying as many - * addresses as necessary until the list is exhausted. - * -- * !!! -- * Only ever called in the select() thread, since we may call -- * __repmgr_bust_connection(..., TRUE). -- * - * PUBLIC: int __repmgr_connect_site __P((DB_ENV *, u_int eid)); - */ - int ---- 250,255 ---- -*************** -*** 332,338 **** - case 0: - break; - case DB_REP_UNAVAIL: -! return (__repmgr_bust_connection(dbenv, con, TRUE)); - default: - return (ret); - } ---- 324,330 ---- - case 0: - break; - case DB_REP_UNAVAIL: -! return (__repmgr_bust_connection(dbenv, con)); - default: - return (ret); - } -*************** -*** 437,443 **** - - DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1); - -! return (__repmgr_send_one(dbenv, conn, REPMGR_HANDSHAKE, &cntrl, &rec)); - } - - /* ---- 429,443 ---- - - DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1); - -! /* -! * It would of course be disastrous to block the select() thread, so -! * pass the "blockable" argument as FALSE. Fortunately blocking should -! * never be necessary here, because the hand-shake is always the first -! * thing we send. Which is a good thing, because it would be almost as -! * disastrous if we allowed ourselves to drop a handshake. -! */ -! return (__repmgr_send_one(dbenv, -! conn, REPMGR_HANDSHAKE, &cntrl, &rec, FALSE)); - } - - /* -*************** -*** 854,859 **** ---- 854,872 ---- - conn->out_queue_length--; - if (--msg->ref_count <= 0) - __os_free(dbenv, msg); -+ -+ /* -+ * We've achieved enough movement to free up at least -+ * one space in the outgoing queue. Wake any message -+ * threads that may be waiting for space. Clear the -+ * CONGESTED status so that when the queue reaches the -+ * high-water mark again, the filling thread will be -+ * allowed to try waiting again. -+ */ -+ F_CLR(conn, CONN_CONGESTED); -+ if (conn->blockers > 0 && -+ (ret = __repmgr_signal(&conn->drained)) != 0) -+ return (ret); - } - } - -*** repmgr/repmgr_util.c 2007-10-31 10:23:52.000000000 -0700 ---- repmgr/repmgr_util.c 2007-10-31 10:23:53.000000000 -0700 -*************** -*** 103,108 **** ---- 103,113 ---- - db_rep = dbenv->rep_handle; - if ((ret = __os_malloc(dbenv, sizeof(REPMGR_CONNECTION), &c)) != 0) - return (ret); -+ if ((ret = __repmgr_alloc_cond(&c->drained)) != 0) { -+ __os_free(dbenv, c); -+ return (ret); -+ } -+ c->blockers = 0; - - c->fd = s; - c->flags = flags; -*** repmgr/repmgr_windows.c 2007-10-31 10:23:52.000000000 -0700 ---- repmgr/repmgr_windows.c 2007-10-31 10:23:53.000000000 -0700 -*************** -*** 11,16 **** ---- 11,19 ---- - #define __INCLUDE_NETWORKING 1 - #include "db_int.h" - -+ /* Convert time-out from microseconds to milliseconds, rounding up. */ -+ #define DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t) (((t) + (US_PER_MS - 1)) / US_PER_MS) -+ - typedef struct __ack_waiter { - HANDLE event; - const DB_LSN *lsnp; -*************** -*** 120,136 **** - { - DB_REP *db_rep; - ACK_WAITER *me; -! DWORD ret; -! DWORD timeout; - - db_rep = dbenv->rep_handle; - - if ((ret = allocate_wait_slot(dbenv, &me)) != 0) - goto err; - -- /* convert time-out from microseconds to milliseconds, rounding up */ - timeout = db_rep->ack_timeout > 0 ? -! ((db_rep->ack_timeout + (US_PER_MS - 1)) / US_PER_MS) : INFINITE; - me->lsnp = lsnp; - if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout, - FALSE)) == WAIT_FAILED) { ---- 123,137 ---- - { - DB_REP *db_rep; - ACK_WAITER *me; -! DWORD ret, timeout; - - db_rep = dbenv->rep_handle; - - if ((ret = allocate_wait_slot(dbenv, &me)) != 0) - goto err; - - timeout = db_rep->ack_timeout > 0 ? -! DB_TIMEOUT_TO_WINDOWS_TIMEOUT(db_rep->ack_timeout) : INFINITE; - me->lsnp = lsnp; - if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout, - FALSE)) == WAIT_FAILED) { -*************** -*** 211,216 **** ---- 212,296 ---- - db_rep->waiters->first_free = slot; - } - -+ /* (See requirements described in repmgr_posix.c.) */ -+ int -+ __repmgr_await_drain(dbenv, conn, timeout) -+ DB_ENV *dbenv; -+ REPMGR_CONNECTION *conn; -+ db_timeout_t timeout; -+ { -+ DB_REP *db_rep; -+ db_timespec deadline, delta, now; -+ db_timeout_t t; -+ DWORD duration, ret; -+ int round_up; -+ -+ db_rep = dbenv->rep_handle; -+ -+ __os_gettime(dbenv, &deadline); -+ DB_TIMEOUT_TO_TIMESPEC(timeout, &delta); -+ timespecadd(&deadline, &delta); -+ -+ while (conn->out_queue_length >= OUT_QUEUE_LIMIT) { -+ if (!ResetEvent(conn->drained)) -+ return (GetLastError()); -+ -+ /* How long until the deadline? */ -+ __os_gettime(dbenv, &now); -+ if (timespeccmp(&now, &deadline, >=)) { -+ F_SET(conn, CONN_CONGESTED); -+ return (0); -+ } -+ delta = deadline; -+ timespecsub(&delta, &now); -+ round_up = TRUE; -+ DB_TIMESPEC_TO_TIMEOUT(t, &delta, round_up); -+ duration = DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t); -+ -+ ret = SignalObjectAndWait(db_rep->mutex, -+ conn->drained, duration, FALSE); -+ LOCK_MUTEX(db_rep->mutex); -+ if (ret == WAIT_FAILED) -+ return (GetLastError()); -+ else if (ret == WAIT_TIMEOUT) { -+ F_SET(conn, CONN_CONGESTED); -+ return (0); -+ } else -+ DB_ASSERT(dbenv, ret == WAIT_OBJECT_0); -+ -+ if (db_rep->finished) -+ return (0); -+ if (F_ISSET(conn, CONN_DEFUNCT)) -+ return (DB_REP_UNAVAIL); -+ } -+ return (0); -+ } -+ -+ /* -+ * Creates a manual reset event, which is usually our best choice when we may -+ * have multiple threads waiting on a single event. -+ */ -+ int -+ __repmgr_alloc_cond(c) -+ cond_var_t *c; -+ { -+ HANDLE event; -+ -+ if ((event = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL) -+ return (GetLastError()); -+ *c = event; -+ return (0); -+ } -+ -+ int -+ __repmgr_free_cond(c) -+ cond_var_t *c; -+ { -+ if (CloseHandle(*c)) -+ return (0); -+ return (GetLastError()); -+ } -+ - /* - * Make resource allocation an all-or-nothing affair, outside of this and the - * close_sync function. db_rep->waiters should be non-NULL iff all of these -*************** -*** 488,493 **** ---- 568,576 ---- - * don't hurt anything flow-control-wise. - */ - TAILQ_FOREACH(conn, &db_rep->connections, entries) { -+ if (F_ISSET(conn, CONN_DEFUNCT)) -+ continue; -+ - if (F_ISSET(conn, CONN_CONNECTING) || - !STAILQ_EMPTY(&conn->outbound_queue) || - (!flow_control || !IS_VALID_EID(conn->eid))) { -*************** -*** 534,541 **** - conn != NULL; - conn = next) { - next = TAILQ_NEXT(conn, entries); -! if (F_ISSET(conn, CONN_DEFUNCT)) -! __repmgr_cleanup_connection(dbenv, conn); - } - - /* ---- 617,626 ---- - conn != NULL; - conn = next) { - next = TAILQ_NEXT(conn, entries); -! if (F_ISSET(conn, CONN_DEFUNCT) && -! (ret = __repmgr_cleanup_connection(dbenv, -! conn)) != 0) -! goto unlock; - } - - /* -*************** -*** 587,597 **** - return (ret); - } - -- /* -- * !!! -- * Only ever called on the select() thread, since we may call -- * __repmgr_bust_connection(..., TRUE). -- */ - static int - handle_completion(dbenv, conn) - DB_ENV *dbenv; ---- 672,677 ---- -*************** -*** 651,660 **** - } - } - -! return (0); -! -! err: if (ret == DB_REP_UNAVAIL) -! return (__repmgr_bust_connection(dbenv, conn, TRUE)); - return (ret); - } - ---- 731,742 ---- - } - } - -! err: -! if (ret == DB_REP_UNAVAIL) { -! if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0) -! return (ret); -! ret = __repmgr_cleanup_connection(dbenv, conn); -! } - return (ret); - } - -*************** -*** 708,714 **** - } - - DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); -! __repmgr_cleanup_connection(dbenv, conn); - ret = __repmgr_connect_site(dbenv, eid); - DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL); - return (ret); ---- 790,797 ---- - } - - DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); -! if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0) -! return (ret); - ret = __repmgr_connect_site(dbenv, eid); - DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL); - return (ret); -- 2.44.0