]> git.pld-linux.org Git - packages/db4.6.git/blame - patch.4.6.21.4
- do not define epoch 0
[packages/db4.6.git] / patch.4.6.21.4
CommitLineData
731bfa0a
ER
1*** dbinc/repmgr.h 2007-10-31 10:23:52.000000000 -0700
2--- dbinc/repmgr.h 2007-10-31 10:23:53.000000000 -0700
3***************
4*** 36,41 ****
5--- 36,55 ----
6 #endif
7
8 /*
9+ * The (arbitrary) maximum number of outgoing messages we're willing to hold, on
10+ * a queue per connection, waiting for TCP buffer space to become available in
11+ * the kernel. Rather than exceeding this limit, we simply discard additional
12+ * messages (since this is always allowed by the replication protocol).
13+ * As a special dispensation, if a message is destined for a specific remote
14+ * site (i.e., it's not a broadcast), then we first try blocking the sending
15+ * thread, waiting for space to become available (though we only wait a limited
16+ * time). This is so as to be able to handle the immediate flood of (a
17+ * potentially large number of) outgoing messages that replication generates, in
18+ * a tight loop, when handling PAGE_REQ, LOG_REQ and ALL_REQ requests.
19+ */
20+ #define OUT_QUEUE_LIMIT 10
21+
22+ /*
23 * The system value is available from sysconf(_SC_HOST_NAME_MAX).
24 * Historically, the maximum host name was 256.
25 */
26***************
27*** 47,52 ****
28--- 61,71 ----
29 #define MAX_SITE_LOC_STRING (MAXHOSTNAMELEN+20)
30 typedef char SITE_STRING_BUFFER[MAX_SITE_LOC_STRING+1];
31
32+ /* Default timeouts: 1, 30 and 10 seconds, expressed in microseconds. */
33+ #define DB_REPMGR_DEFAULT_ACK_TIMEOUT (1 * US_PER_SEC)
34+ #define DB_REPMGR_DEFAULT_CONNECTION_RETRY (30 * US_PER_SEC)
35+ #define DB_REPMGR_DEFAULT_ELECTION_RETRY (10 * US_PER_SEC)
36+
37 struct __repmgr_connection;
38 typedef struct __repmgr_connection REPMGR_CONNECTION;
39 struct __repmgr_queue; typedef struct __repmgr_queue REPMGR_QUEUE;
40***************
41*** 171,178 ****
42 #ifdef DB_WIN32
43 WSAEVENT event_object;
44 #endif
45! #define CONN_CONNECTING 0x01 /* nonblocking connect in progress */
46! #define CONN_DEFUNCT 0x02 /* socket close pending */
47 u_int32_t flags;
48
49 /*
50--- 190,198 ----
51 #ifdef DB_WIN32
52 WSAEVENT event_object;
53 #endif
54! #define CONN_CONGESTED 0x01 /* msg thread wait has exceeded timeout */
55! #define CONN_CONNECTING 0x02 /* nonblocking connect in progress */
56! #define CONN_DEFUNCT 0x04 /* socket close pending */
57 u_int32_t flags;
58
59 /*
60***************
61*** 180,189 ****
62 * send() function's thread. But if TCP doesn't have enough network
63 * buffer space for us when we first try it, we instead allocate some
64 * memory, and copy the message, and then send it as space becomes
65! * available in our main select() thread.
66 */
67 OUT_Q_HEADER outbound_queue;
68 int out_queue_length;
69
70 /*
71 * Input: while we're reading a message, we keep track of what phase
72--- 200,215 ----
73 * send() function's thread. But if TCP doesn't have enough network
74 * buffer space for us when we first try it, we instead allocate some
75 * memory, and copy the message, and then send it as space becomes
76! * available in our main select() thread. In some cases, if the queue
77! * gets too long we wait until it's drained, and then append to it.
78! * This condition variable's associated mutex is the normal per-repmgr
79! * db_rep->mutex, because that mutex is always held anyway whenever the
80! * output queue is consulted.
81 */
82 OUT_Q_HEADER outbound_queue;
83 int out_queue_length;
84+ cond_var_t drained;
85+ int blockers; /* ref count of msg threads waiting on us */
86
87 /*
88 * Input: while we're reading a message, we keep track of what phase
89*** dbinc_auto/int_def.in 2007-10-31 10:23:52.000000000 -0700
90--- dbinc_auto/int_def.in 2007-10-31 10:23:52.000000000 -0700
91***************
92*** 1420,1425 ****
93--- 1420,1428 ----
94 #define __repmgr_wake_waiting_senders __repmgr_wake_waiting_senders@DB_VERSION_UNIQUE_NAME@
95 #define __repmgr_await_ack __repmgr_await_ack@DB_VERSION_UNIQUE_NAME@
96 #define __repmgr_compute_wait_deadline __repmgr_compute_wait_deadline@DB_VERSION_UNIQUE_NAME@
97+ #define __repmgr_await_drain __repmgr_await_drain@DB_VERSION_UNIQUE_NAME@
98+ #define __repmgr_alloc_cond __repmgr_alloc_cond@DB_VERSION_UNIQUE_NAME@
99+ #define __repmgr_free_cond __repmgr_free_cond@DB_VERSION_UNIQUE_NAME@
100 #define __repmgr_init_sync __repmgr_init_sync@DB_VERSION_UNIQUE_NAME@
101 #define __repmgr_close_sync __repmgr_close_sync@DB_VERSION_UNIQUE_NAME@
102 #define __repmgr_net_init __repmgr_net_init@DB_VERSION_UNIQUE_NAME@
103*** dbinc_auto/repmgr_ext.h 2007-10-31 10:23:52.000000000 -0700
104--- dbinc_auto/repmgr_ext.h 2007-10-31 10:23:52.000000000 -0700
105***************
106*** 21,30 ****
107 int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *));
108 void __repmgr_stash_generation __P((DB_ENV *));
109 int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t));
110! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *));
111 int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *));
112! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *, int));
113! void __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *));
114 int __repmgr_find_site __P((DB_ENV *, const char *, u_int));
115 int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *));
116 int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **));
117--- 21,30 ----
118 int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *));
119 void __repmgr_stash_generation __P((DB_ENV *));
120 int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t));
121! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *, int));
122 int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *));
123! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *));
124! int __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *));
125 int __repmgr_find_site __P((DB_ENV *, const char *, u_int));
126 int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *));
127 int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **));
128***************
129*** 39,44 ****
130--- 39,47 ----
131 int __repmgr_wake_waiting_senders __P((DB_ENV *));
132 int __repmgr_await_ack __P((DB_ENV *, const DB_LSN *));
133 void __repmgr_compute_wait_deadline __P((DB_ENV*, struct timespec *, db_timeout_t));
134+ int __repmgr_await_drain __P((DB_ENV *, REPMGR_CONNECTION *, db_timeout_t));
135+ int __repmgr_alloc_cond __P((cond_var_t *));
136+ int __repmgr_free_cond __P((cond_var_t *));
137 int __repmgr_init_sync __P((DB_ENV *, DB_REP *));
138 int __repmgr_close_sync __P((DB_ENV *));
139 int __repmgr_net_init __P((DB_ENV *, DB_REP *));
140*** repmgr/repmgr_method.c 2007-10-31 10:23:52.000000000 -0700
141--- repmgr/repmgr_method.c 2007-10-31 10:23:53.000000000 -0700
142***************
143*** 196,204 ****
144 int ret;
145
146 /* Set some default values. */
147! db_rep->ack_timeout = 1 * US_PER_SEC; /* 1 second */
148! db_rep->connection_retry_wait = 30 * US_PER_SEC; /* 30 seconds */
149! db_rep->election_retry_wait = 10 * US_PER_SEC; /* 10 seconds */
150 db_rep->config_nsites = 0;
151 db_rep->peer = DB_EID_INVALID;
152 db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
153--- 196,204 ----
154 int ret;
155
156 /* Set some default values. */
157! db_rep->ack_timeout = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
158! db_rep->connection_retry_wait = DB_REPMGR_DEFAULT_CONNECTION_RETRY;
159! db_rep->election_retry_wait = DB_REPMGR_DEFAULT_ELECTION_RETRY;
160 db_rep->config_nsites = 0;
161 db_rep->peer = DB_EID_INVALID;
162 db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
163***************
164*** 238,243 ****
165--- 238,244 ----
166 DB_ENV *dbenv;
167 {
168 DB_REP *db_rep;
169+ REPMGR_CONNECTION *conn;
170 int ret;
171
172 db_rep = dbenv->rep_handle;
173***************
174*** 254,259 ****
175--- 255,266 ----
176
177 if ((ret = __repmgr_signal(&db_rep->queue_nonempty)) != 0)
178 goto unlock;
179+
180+ TAILQ_FOREACH(conn, &db_rep->connections, entries) {
181+ if (conn->blockers > 0 &&
182+ ((ret = __repmgr_signal(&conn->drained)) != 0))
183+ goto unlock;
184+ }
185 UNLOCK_MUTEX(db_rep->mutex);
186
187 return (__repmgr_wake_main_thread(dbenv));
188*** repmgr/repmgr_msg.c 2007-10-31 10:23:52.000000000 -0700
189--- repmgr/repmgr_msg.c 2007-10-31 10:23:53.000000000 -0700
190***************
191*** 183,192 ****
192
193 /*
194 * Acknowledges a message.
195- *
196- * !!!
197- * Note that this cannot be called from the select() thread, in case we call
198- * __repmgr_bust_connection(..., FALSE).
199 */
200 static int
201 ack_message(dbenv, generation, lsn)
202--- 183,188 ----
203***************
204*** 227,235 ****
205 rec2.size = 0;
206
207 conn = site->ref.conn;
208 if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK,
209! &control2, &rec2)) == DB_REP_UNAVAIL)
210! ret = __repmgr_bust_connection(dbenv, conn, FALSE);
211 }
212
213 UNLOCK_MUTEX(db_rep->mutex);
214--- 223,236 ----
215 rec2.size = 0;
216
217 conn = site->ref.conn;
218+ /*
219+ * It's hard to imagine anyone would care about a lost ack if
220+ * the path to the master is so congested as to need blocking;
221+ * so pass "blockable" argument as FALSE.
222+ */
223 if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK,
224! &control2, &rec2, FALSE)) == DB_REP_UNAVAIL)
225! ret = __repmgr_bust_connection(dbenv, conn);
226 }
227
228 UNLOCK_MUTEX(db_rep->mutex);
229*** repmgr/repmgr_net.c 2007-10-31 10:23:52.000000000 -0700
230--- repmgr/repmgr_net.c 2007-10-31 10:23:53.000000000 -0700
231***************
232*** 63,69 ****
233 static void setup_sending_msg
234 __P((struct sending_msg *, u_int, const DBT *, const DBT *));
235 static int __repmgr_send_internal
236! __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *));
237 static int enqueue_msg
238 __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
239 static int flatten __P((DB_ENV *, struct sending_msg *));
240--- 63,69 ----
241 static void setup_sending_msg
242 __P((struct sending_msg *, u_int, const DBT *, const DBT *));
243 static int __repmgr_send_internal
244! __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, int));
245 static int enqueue_msg
246 __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
247 static int flatten __P((DB_ENV *, struct sending_msg *));
248***************
249*** 73,85 ****
250 * __repmgr_send --
251 * The send function for DB_ENV->rep_set_transport.
252 *
253- * !!!
254- * This is only ever called as the replication transport call-back, which means
255- * it's either on one of our message processing threads or an application
256- * thread. It mustn't be called from the select() thread, because we might call
257- * __repmgr_bust_connection(..., FALSE) here, and that's not allowed in the
258- * select() thread.
259- *
260 * PUBLIC: int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *,
261 * PUBLIC: const DB_LSN *, int, u_int32_t));
262 */
263--- 73,78 ----
264***************
265*** 126,134 ****
266 }
267
268 conn = site->ref.conn;
269 if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE,
270! control, rec)) == DB_REP_UNAVAIL &&
271! (t_ret = __repmgr_bust_connection(dbenv, conn, FALSE)) != 0)
272 ret = t_ret;
273 if (ret != 0)
274 goto out;
275--- 119,128 ----
276 }
277
278 conn = site->ref.conn;
279+ /* Pass the "blockable" argument as TRUE. */
280 if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE,
281! control, rec, TRUE)) == DB_REP_UNAVAIL &&
282! (t_ret = __repmgr_bust_connection(dbenv, conn)) != 0)
283 ret = t_ret;
284 if (ret != 0)
285 goto out;
286***************
287*** 222,228 ****
288 if (site->state != SITE_CONNECTED)
289 return (NULL);
290
291! if (F_ISSET(site->ref.conn, CONN_CONNECTING))
292 return (NULL);
293 return (site);
294 }
295--- 216,222 ----
296 if (site->state != SITE_CONNECTED)
297 return (NULL);
298
299! if (F_ISSET(site->ref.conn, CONN_CONNECTING|CONN_DEFUNCT))
300 return (NULL);
301 return (site);
302 }
303***************
304*** 235,244 ****
305 *
306 * !!!
307 * Caller must hold dbenv->mutex.
308- *
309- * !!!
310- * Note that this cannot be called from the select() thread, in case we call
311- * __repmgr_bust_connection(..., FALSE).
312 */
313 static int
314 __repmgr_send_broadcast(dbenv, control, rec, nsitesp, npeersp)
315--- 229,234 ----
316***************
317*** 268,281 ****
318 !IS_VALID_EID(conn->eid))
319 continue;
320
321! if ((ret = __repmgr_send_internal(dbenv, conn, &msg)) == 0) {
322 site = SITE_FROM_EID(conn->eid);
323 nsites++;
324 if (site->priority > 0)
325 npeers++;
326 } else if (ret == DB_REP_UNAVAIL) {
327! if ((ret = __repmgr_bust_connection(
328! dbenv, conn, FALSE)) != 0)
329 return (ret);
330 } else
331 return (ret);
332--- 258,277 ----
333 !IS_VALID_EID(conn->eid))
334 continue;
335
336! /*
337! * Broadcast messages are either application threads committing
338! * transactions, or replication status messages that we can
339! * afford to lose. So don't allow blocking for them (pass
340! * "blockable" argument as FALSE).
341! */
342! if ((ret = __repmgr_send_internal(dbenv,
343! conn, &msg, FALSE)) == 0) {
344 site = SITE_FROM_EID(conn->eid);
345 nsites++;
346 if (site->priority > 0)
347 npeers++;
348 } else if (ret == DB_REP_UNAVAIL) {
349! if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
350 return (ret);
351 } else
352 return (ret);
353***************
354*** 301,339 ****
355 * intersperse writes that are part of two single messages.
356 *
357 * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *,
358! * PUBLIC: u_int, const DBT *, const DBT *));
359 */
360 int
361! __repmgr_send_one(dbenv, conn, msg_type, control, rec)
362 DB_ENV *dbenv;
363 REPMGR_CONNECTION *conn;
364 u_int msg_type;
365 const DBT *control, *rec;
366 {
367 struct sending_msg msg;
368
369 setup_sending_msg(&msg, msg_type, control, rec);
370! return (__repmgr_send_internal(dbenv, conn, &msg));
371 }
372
373 /*
374 * Attempts a "best effort" to send a message on the given site. If there is an
375! * excessive backlog of message already queued on the connection, we simply drop
376! * this message, and still return 0 even in this case.
377 */
378 static int
379! __repmgr_send_internal(dbenv, conn, msg)
380 DB_ENV *dbenv;
381 REPMGR_CONNECTION *conn;
382 struct sending_msg *msg;
383 {
384! #define OUT_QUEUE_LIMIT 10 /* arbitrary, for now */
385 REPMGR_IOVECS iovecs;
386 SITE_STRING_BUFFER buffer;
387 int ret;
388 size_t nw;
389 size_t total_written;
390
391 DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING));
392 if (!STAILQ_EMPTY(&conn->outbound_queue)) {
393 /*
394--- 297,355 ----
395 * intersperse writes that are part of two single messages.
396 *
397 * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *,
398! * PUBLIC: u_int, const DBT *, const DBT *, int));
399 */
400 int
401! __repmgr_send_one(dbenv, conn, msg_type, control, rec, blockable)
402 DB_ENV *dbenv;
403 REPMGR_CONNECTION *conn;
404 u_int msg_type;
405 const DBT *control, *rec;
406+ int blockable;
407 {
408 struct sending_msg msg;
409
410 setup_sending_msg(&msg, msg_type, control, rec);
411! return (__repmgr_send_internal(dbenv, conn, &msg, blockable));
412 }
413
414 /*
415 * Attempts a "best effort" to send a message on the given site. If there is an
416! * excessive backlog of messages already queued on the connection, what shall we
417! * do? If the caller doesn't mind blocking, we'll wait (a limited amount of
418! * time) for the queue to drain. Otherwise we'll simply drop the message. This
419! * is always allowed by the replication protocol. But in the case of a
420! * multi-message response to a request like PAGE_REQ, LOG_REQ or ALL_REQ we
421! * almost always get a flood of messages that instantly fills our queue, so
422! * blocking improves performance (by avoiding the need for the client to
423! * re-request).
424! *
425! * How long shall we wait? We could of course create a new timeout
426! * configuration type, so that the application could set it directly. But that
427! * would start to overwhelm the user with too many choices to think about. We
428! * already have an ACK timeout, which is the user's estimate of how long it
429! * should take to send a message to the client, have it be processed, and return
430! * a message back to us. We multiply that by the queue size, because that's how
431! * many messages have to be swallowed up by the client before we're able to
432! * start sending again (at least to a rough approximation).
433 */
434 static int
435! __repmgr_send_internal(dbenv, conn, msg, blockable)
436 DB_ENV *dbenv;
437 REPMGR_CONNECTION *conn;
438 struct sending_msg *msg;
439+ int blockable;
440 {
441! DB_REP *db_rep;
442 REPMGR_IOVECS iovecs;
443 SITE_STRING_BUFFER buffer;
444+ db_timeout_t drain_to;
445 int ret;
446 size_t nw;
447 size_t total_written;
448
449+ db_rep = dbenv->rep_handle;
450+
451 DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING));
452 if (!STAILQ_EMPTY(&conn->outbound_queue)) {
453 /*
454***************
455*** 344,358 ****
456 RPRINT(dbenv, (dbenv, "msg to %s to be queued",
457 __repmgr_format_eid_loc(dbenv->rep_handle,
458 conn->eid, buffer)));
459 if (conn->out_queue_length < OUT_QUEUE_LIMIT)
460 return (enqueue_msg(dbenv, conn, msg, 0));
461 else {
462 RPRINT(dbenv, (dbenv, "queue limit exceeded"));
463 STAT(dbenv->rep_handle->
464 region->mstat.st_msgs_dropped++);
465! return (0);
466 }
467 }
468
469 /*
470 * Send as much data to the site as we can, without blocking. Keep
471--- 360,393 ----
472 RPRINT(dbenv, (dbenv, "msg to %s to be queued",
473 __repmgr_format_eid_loc(dbenv->rep_handle,
474 conn->eid, buffer)));
475+ if (conn->out_queue_length >= OUT_QUEUE_LIMIT &&
476+ blockable && !F_ISSET(conn, CONN_CONGESTED)) {
477+ RPRINT(dbenv, (dbenv,
478+ "block msg thread, await queue space"));
479+
480+ if ((drain_to = db_rep->ack_timeout) == 0)
481+ drain_to = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
482+ conn->blockers++;
483+ ret = __repmgr_await_drain(dbenv,
484+ conn, drain_to * OUT_QUEUE_LIMIT);
485+ conn->blockers--;
486+ if (db_rep->finished)
487+ return (DB_TIMEOUT);
488+ if (ret != 0)
489+ return (ret);
490+ if (STAILQ_EMPTY(&conn->outbound_queue))
491+ goto empty;
492+ }
493 if (conn->out_queue_length < OUT_QUEUE_LIMIT)
494 return (enqueue_msg(dbenv, conn, msg, 0));
495 else {
496 RPRINT(dbenv, (dbenv, "queue limit exceeded"));
497 STAT(dbenv->rep_handle->
498 region->mstat.st_msgs_dropped++);
499! return (blockable ? DB_TIMEOUT : 0);
500 }
501 }
502+ empty:
503
504 /*
505 * Send as much data to the site as we can, without blocking. Keep
506***************
507*** 498,521 ****
508
509 /*
510 * Abandons a connection, to recover from an error. Upon entry the conn struct
511! * must be on the connections list.
512! *
513! * If the 'do_close' flag is true, we do the whole job; the clean-up includes
514! * removing the struct from the list and freeing all its memory, so upon return
515! * the caller must not refer to it any further. Otherwise, we merely mark the
516! * connection for clean-up later by the main thread.
517 *
518 * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *,
519! * PUBLIC: REPMGR_CONNECTION *, int));
520 *
521 * !!!
522 * Caller holds mutex.
523 */
524 int
525! __repmgr_bust_connection(dbenv, conn, do_close)
526 DB_ENV *dbenv;
527 REPMGR_CONNECTION *conn;
528- int do_close;
529 {
530 DB_REP *db_rep;
531 int connecting, ret, eid;
532--- 533,553 ----
533
534 /*
535 * Abandons a connection, to recover from an error. Upon entry the conn struct
536! * must be on the connections list. For now, just mark it as unusable; it will
537! * be fully cleaned up in the top-level select thread, as soon as possible.
538 *
539 * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *,
540! * PUBLIC: REPMGR_CONNECTION *));
541 *
542 * !!!
543 * Caller holds mutex.
544+ *
545+ * Must be idempotent
546 */
547 int
548! __repmgr_bust_connection(dbenv, conn)
549 DB_ENV *dbenv;
550 REPMGR_CONNECTION *conn;
551 {
552 DB_REP *db_rep;
553 int connecting, ret, eid;
554***************
555*** 526,537 ****
556 DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
557 eid = conn->eid;
558 connecting = F_ISSET(conn, CONN_CONNECTING);
559! if (do_close)
560! __repmgr_cleanup_connection(dbenv, conn);
561! else {
562! F_SET(conn, CONN_DEFUNCT);
563! conn->eid = -1;
564! }
565
566 /*
567 * When we first accepted the incoming connection, we set conn->eid to
568--- 558,566 ----
569 DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
570 eid = conn->eid;
571 connecting = F_ISSET(conn, CONN_CONNECTING);
572!
573! F_SET(conn, CONN_DEFUNCT);
574! conn->eid = -1;
575
576 /*
577 * When we first accepted the incoming connection, we set conn->eid to
578***************
579*** 557,563 ****
580 dbenv, ELECT_FAILURE_ELECTION)) != 0)
581 return (ret);
582 }
583! } else if (!do_close) {
584 /*
585 * One way or another, make sure the main thread is poked, so
586 * that we do the deferred clean-up.
587--- 586,592 ----
588 dbenv, ELECT_FAILURE_ELECTION)) != 0)
589 return (ret);
590 }
591! } else {
592 /*
593 * One way or another, make sure the main thread is poked, so
594 * that we do the deferred clean-up.
595***************
596*** 568,577 ****
597 }
598
599 /*
600! * PUBLIC: void __repmgr_cleanup_connection
601 * PUBLIC: __P((DB_ENV *, REPMGR_CONNECTION *));
602 */
603! void
604 __repmgr_cleanup_connection(dbenv, conn)
605 DB_ENV *dbenv;
606 REPMGR_CONNECTION *conn;
607--- 597,610 ----
608 }
609
610 /*
611! * PUBLIC: int __repmgr_cleanup_connection
612 * PUBLIC: __P((DB_ENV *, REPMGR_CONNECTION *));
613+ *
614+ * !!!
615+ * Idempotent. This can be called repeatedly as blocking message threads (of
616+ * which there could be multiples) wake up in case of error on the connection.
617 */
618! int
619 __repmgr_cleanup_connection(dbenv, conn)
620 DB_ENV *dbenv;
621 REPMGR_CONNECTION *conn;
622***************
623*** 580,596 ****
624 QUEUED_OUTPUT *out;
625 REPMGR_FLAT *msg;
626 DBT *dbt;
627
628 db_rep = dbenv->rep_handle;
629
630! TAILQ_REMOVE(&db_rep->connections, conn, entries);
631 if (conn->fd != INVALID_SOCKET) {
632! (void)closesocket(conn->fd);
633 #ifdef DB_WIN32
634! (void)WSACloseEvent(conn->event_object);
635 #endif
636 }
637
638 /*
639 * Deallocate any input and output buffers we may have.
640 */
641--- 613,643 ----
642 QUEUED_OUTPUT *out;
643 REPMGR_FLAT *msg;
644 DBT *dbt;
645+ int ret;
646
647 db_rep = dbenv->rep_handle;
648
649! DB_ASSERT(dbenv, F_ISSET(conn, CONN_DEFUNCT) || db_rep->finished);
650!
651 if (conn->fd != INVALID_SOCKET) {
652! ret = closesocket(conn->fd);
653! conn->fd = INVALID_SOCKET;
654! if (ret == SOCKET_ERROR) {
655! ret = net_errno;
656! __db_err(dbenv, ret, "closing socket");
657! }
658 #ifdef DB_WIN32
659! if (!WSACloseEvent(conn->event_object) && ret != 0)
660! ret = net_errno;
661 #endif
662+ if (ret != 0)
663+ return (ret);
664 }
665
666+ if (conn->blockers > 0)
667+ return (__repmgr_signal(&conn->drained));
668+
669+ TAILQ_REMOVE(&db_rep->connections, conn, entries);
670 /*
671 * Deallocate any input and output buffers we may have.
672 */
673***************
674*** 614,620 ****
675--- 661,669 ----
676 __os_free(dbenv, out);
677 }
678
679+ ret = __repmgr_free_cond(&conn->drained);
680 __os_free(dbenv, conn);
681+ return (ret);
682 }
683
684 static int
685***************
686*** 1063,1069 ****
687
688 while (!TAILQ_EMPTY(&db_rep->connections)) {
689 conn = TAILQ_FIRST(&db_rep->connections);
690! __repmgr_cleanup_connection(dbenv, conn);
691 }
692
693 for (i = 0; i < db_rep->site_cnt; i++) {
694--- 1112,1118 ----
695
696 while (!TAILQ_EMPTY(&db_rep->connections)) {
697 conn = TAILQ_FIRST(&db_rep->connections);
698! (void)__repmgr_cleanup_connection(dbenv, conn);
699 }
700
701 for (i = 0; i < db_rep->site_cnt; i++) {
702*** repmgr/repmgr_posix.c 2007-10-31 10:23:52.000000000 -0700
703--- repmgr/repmgr_posix.c 2007-10-31 10:23:53.000000000 -0700
704***************
705*** 21,26 ****
706--- 21,28 ----
707 size_t __repmgr_guesstimated_max = (128 * 1024);
708 #endif
709
710+ static int __repmgr_conn_work __P((DB_ENV *,
711+ REPMGR_CONNECTION *, fd_set *, fd_set *, int));
712 static int finish_connecting __P((DB_ENV *, REPMGR_CONNECTION *));
713
714 /*
715***************
716*** 189,194 ****
717--- 191,284 ----
718 }
719
720 /*
721+ * PUBLIC: int __repmgr_await_drain __P((DB_ENV *,
722+ * PUBLIC: REPMGR_CONNECTION *, db_timeout_t));
723+ *
724+ * Waits for space to become available on the connection's output queue.
725+ * Various ways we can exit:
726+ *
727+ * 1. queue becomes non-full
728+ * 2. exceed time limit
729+ * 3. connection becomes defunct (due to error in another thread)
730+ * 4. repmgr is shutting down
731+ * 5. any unexpected system resource failure
732+ *
733+ * In cases #3 and #5 we return an error code. Caller is responsible for
734+ * distinguishing the remaining cases if desired.
735+ *
736+ * !!!
737+ * Caller must hold db_rep->mutex.
738+ */
739+ int
740+ __repmgr_await_drain(dbenv, conn, timeout)
741+ DB_ENV *dbenv;
742+ REPMGR_CONNECTION *conn;
743+ db_timeout_t timeout;
744+ {
745+ DB_REP *db_rep;
746+ struct timespec deadline;
747+ int ret;
748+
749+ db_rep = dbenv->rep_handle;
750+
751+ __repmgr_compute_wait_deadline(dbenv, &deadline, timeout);
752+
753+ ret = 0;
754+ while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
755+ ret = pthread_cond_timedwait(&conn->drained,
756+ &db_rep->mutex, &deadline);
757+ switch (ret) {
758+ case 0:
759+ if (db_rep->finished)
760+ goto out; /* #4. */
761+ /*
762+ * Another thread could have stumbled into an error on
763+ * the socket while we were waiting.
764+ */
765+ if (F_ISSET(conn, CONN_DEFUNCT)) {
766+ ret = DB_REP_UNAVAIL; /* #3. */
767+ goto out;
768+ }
769+ break;
770+ case ETIMEDOUT:
771+ F_SET(conn, CONN_CONGESTED);
772+ ret = 0;
773+ goto out; /* #2. */
774+ default:
775+ goto out; /* #5. */
776+ }
777+ }
778+ /* #1. */
779+
780+ out:
781+ return (ret);
782+ }
783+
784+ /*
785+ * PUBLIC: int __repmgr_alloc_cond __P((cond_var_t *));
786+ *
787+ * Initialize a condition variable (in allocated space).
788+ */
789+ int
790+ __repmgr_alloc_cond(c)
791+ cond_var_t *c;
792+ {
793+ return (pthread_cond_init(c, NULL));
794+ }
795+
796+ /*
797+ * PUBLIC: int __repmgr_free_cond __P((cond_var_t *));
798+ *
799+ * Clean up a previously initialized condition variable.
800+ */
801+ int
802+ __repmgr_free_cond(c)
803+ cond_var_t *c;
804+ {
805+ return (pthread_cond_destroy(c));
806+ }
807+
808+ /*
809 * PUBLIC: int __repmgr_init_sync __P((DB_ENV *, DB_REP *));
810 *
811 * Allocate/initialize all data necessary for thread synchronization. This
812***************
813*** 443,449 ****
814 REPMGR_RETRY *retry;
815 db_timespec timeout;
816 fd_set reads, writes;
817! int ret, flow_control, maxfd, nready;
818 u_int8_t buf[10]; /* arbitrary size */
819
820 flow_control = FALSE;
821--- 533,539 ----
822 REPMGR_RETRY *retry;
823 db_timespec timeout;
824 fd_set reads, writes;
825! int ret, flow_control, maxfd;
826 u_int8_t buf[10]; /* arbitrary size */
827
828 flow_control = FALSE;
829***************
830*** 477,482 ****
831--- 567,575 ----
832 * each one.
833 */
834 TAILQ_FOREACH(conn, &db_rep->connections, entries) {
835+ if (F_ISSET(conn, CONN_DEFUNCT))
836+ continue;
837+
838 if (F_ISSET(conn, CONN_CONNECTING)) {
839 FD_SET((u_int)conn->fd, &reads);
840 FD_SET((u_int)conn->fd, &writes);
841***************
842*** 533,616 ****
843 return (ret);
844 }
845 }
846- nready = ret;
847-
848 LOCK_MUTEX(db_rep->mutex);
849
850- /*
851- * The first priority thing we must do is to clean up any
852- * pending defunct connections. Otherwise, if they have any
853- * lingering pending input, we get very confused if we try to
854- * process it.
855- *
856- * The TAILQ_FOREACH macro would be suitable here, except that
857- * it doesn't allow unlinking the current element, which is
858- * needed for cleanup_connection.
859- */
860- for (conn = TAILQ_FIRST(&db_rep->connections);
861- conn != NULL;
862- conn = next) {
863- next = TAILQ_NEXT(conn, entries);
864- if (F_ISSET(conn, CONN_DEFUNCT))
865- __repmgr_cleanup_connection(dbenv, conn);
866- }
867-
868 if ((ret = __repmgr_retry_connections(dbenv)) != 0)
869 goto out;
870- if (nready == 0)
871- continue;
872
873 /*
874! * Traverse the linked list. (Again, like TAILQ_FOREACH, except
875! * that we need the ability to unlink an element along the way.)
876 */
877 for (conn = TAILQ_FIRST(&db_rep->connections);
878 conn != NULL;
879 conn = next) {
880 next = TAILQ_NEXT(conn, entries);
881! if (F_ISSET(conn, CONN_CONNECTING)) {
882! if (FD_ISSET((u_int)conn->fd, &reads) ||
883! FD_ISSET((u_int)conn->fd, &writes)) {
884! if ((ret = finish_connecting(dbenv,
885! conn)) == DB_REP_UNAVAIL) {
886! if ((ret =
887! __repmgr_bust_connection(
888! dbenv, conn, TRUE)) != 0)
889! goto out;
890! } else if (ret != 0)
891! goto out;
892! }
893! continue;
894! }
895!
896! /*
897! * Here, the site is connected, and the FD_SET's are
898! * valid.
899! */
900! if (FD_ISSET((u_int)conn->fd, &writes)) {
901! if ((ret = __repmgr_write_some(
902! dbenv, conn)) == DB_REP_UNAVAIL) {
903! if ((ret =
904! __repmgr_bust_connection(dbenv,
905! conn, TRUE)) != 0)
906! goto out;
907! continue;
908! } else if (ret != 0)
909! goto out;
910! }
911!
912! if (!flow_control &&
913! FD_ISSET((u_int)conn->fd, &reads)) {
914! if ((ret = __repmgr_read_from_site(dbenv, conn))
915! == DB_REP_UNAVAIL) {
916! if ((ret =
917! __repmgr_bust_connection(dbenv,
918! conn, TRUE)) != 0)
919! goto out;
920! continue;
921! } else if (ret != 0)
922! goto out;
923! }
924 }
925
926 /*
927--- 626,650 ----
928 return (ret);
929 }
930 }
931 LOCK_MUTEX(db_rep->mutex);
932
933 if ((ret = __repmgr_retry_connections(dbenv)) != 0)
934 goto out;
935
936 /*
937! * Examine each connection, to see what work needs to be done.
938! *
939! * The TAILQ_FOREACH macro would be suitable here, except that
940! * it doesn't allow unlinking the current element, which is
941! * needed for cleanup_connection.
942 */
943 for (conn = TAILQ_FIRST(&db_rep->connections);
944 conn != NULL;
945 conn = next) {
946 next = TAILQ_NEXT(conn, entries);
947! if ((ret = __repmgr_conn_work(dbenv,
948! conn, &reads, &writes, flow_control)) != 0)
949! goto out;
950 }
951
952 /*
953***************
954*** 637,642 ****
955--- 671,719 ----
956 }
957
958 static int
959+ __repmgr_conn_work(dbenv, conn, reads, writes, flow_control)
960+ DB_ENV *dbenv;
961+ REPMGR_CONNECTION *conn;
962+ fd_set *reads, *writes;
963+ int flow_control;
964+ {
965+ int ret;
966+ u_int fd;
967+
968+ if (F_ISSET(conn, CONN_DEFUNCT)) {
969+ /*
970+ * Deferred clean-up, from an error that happened in another
971+ * thread, while we were sleeping in select().
972+ */
973+ return (__repmgr_cleanup_connection(dbenv, conn));
974+ }
975+
976+ ret = 0;
977+ fd = (u_int)conn->fd;
978+
979+ if (F_ISSET(conn, CONN_CONNECTING)) {
980+ if (FD_ISSET(fd, reads) || FD_ISSET(fd, writes))
981+ ret = finish_connecting(dbenv, conn);
982+ } else {
983+ /*
984+ * Here, the site is connected, and the FD_SET's are valid.
985+ */
986+ if (FD_ISSET(fd, writes))
987+ ret = __repmgr_write_some(dbenv, conn);
988+
989+ if (ret == 0 && !flow_control && FD_ISSET(fd, reads))
990+ ret = __repmgr_read_from_site(dbenv, conn);
991+ }
992+
993+ if (ret == DB_REP_UNAVAIL) {
994+ if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
995+ return (ret);
996+ ret = __repmgr_cleanup_connection(dbenv, conn);
997+ }
998+ return (ret);
999+ }
1000+
1001+ static int
1002 finish_connecting(dbenv, conn)
1003 DB_ENV *dbenv;
1004 REPMGR_CONNECTION *conn;
1005***************
1006*** 657,662 ****
1007--- 734,740 ----
1008 goto err_rpt;
1009 }
1010
1011+ DB_ASSERT(dbenv, F_ISSET(conn, CONN_CONNECTING));
1012 F_CLR(conn, CONN_CONNECTING);
1013 return (__repmgr_send_handshake(dbenv, conn));
1014
1015***************
1016*** 671,690 ****
1017 "connecting to %s", __repmgr_format_site_loc(site, buffer));
1018
1019 /* If we've exhausted the list of possible addresses, give up. */
1020! if (ADDR_LIST_NEXT(&site->net_addr) == NULL)
1021 return (DB_REP_UNAVAIL);
1022
1023 /*
1024 * This is just like a little mini-"bust_connection", except that we
1025 * don't reschedule for later, 'cuz we're just about to try again right
1026! * now.
1027 *
1028 * !!!
1029 * Which means this must only be called on the select() thread, since
1030 * only there are we allowed to actually close a connection.
1031 */
1032 DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
1033! __repmgr_cleanup_connection(dbenv, conn);
1034 ret = __repmgr_connect_site(dbenv, eid);
1035 DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
1036 return (ret);
1037--- 749,773 ----
1038 "connecting to %s", __repmgr_format_site_loc(site, buffer));
1039
1040 /* If we've exhausted the list of possible addresses, give up. */
1041! if (ADDR_LIST_NEXT(&site->net_addr) == NULL) {
1042! STAT(db_rep->region->mstat.st_connect_fail++);
1043 return (DB_REP_UNAVAIL);
1044+ }
1045
1046 /*
1047 * This is just like a little mini-"bust_connection", except that we
1048 * don't reschedule for later, 'cuz we're just about to try again right
1049! * now. (Note that we don't have to worry about message threads
1050! * blocking on a full output queue: that can't happen when we're only
1051! * just connecting.)
1052 *
1053 * !!!
1054 * Which means this must only be called on the select() thread, since
1055 * only there are we allowed to actually close a connection.
1056 */
1057 DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
1058! if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0)
1059! return (ret);
1060 ret = __repmgr_connect_site(dbenv, eid);
1061 DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
1062 return (ret);
1063*** repmgr/repmgr_sel.c 2007-10-31 10:23:52.000000000 -0700
1064--- repmgr/repmgr_sel.c 2007-10-31 10:23:53.000000000 -0700
1065***************
1066*** 36,45 ****
1067
1068 /*
1069 * PUBLIC: int __repmgr_accept __P((DB_ENV *));
1070- *
1071- * !!!
1072- * Only ever called in the select() thread, since we may call
1073- * __repmgr_bust_connection(..., TRUE).
1074 */
1075 int
1076 __repmgr_accept(dbenv)
1077--- 36,41 ----
1078***************
1079*** 133,139 ****
1080 case 0:
1081 return (0);
1082 case DB_REP_UNAVAIL:
1083! return (__repmgr_bust_connection(dbenv, conn, TRUE));
1084 default:
1085 return (ret);
1086 }
1087--- 129,135 ----
1088 case 0:
1089 return (0);
1090 case DB_REP_UNAVAIL:
1091! return (__repmgr_bust_connection(dbenv, conn));
1092 default:
1093 return (ret);
1094 }
1095***************
1096*** 254,263 ****
1097 * starting with the "current" element of its address list and trying as many
1098 * addresses as necessary until the list is exhausted.
1099 *
1100- * !!!
1101- * Only ever called in the select() thread, since we may call
1102- * __repmgr_bust_connection(..., TRUE).
1103- *
1104 * PUBLIC: int __repmgr_connect_site __P((DB_ENV *, u_int eid));
1105 */
1106 int
1107--- 250,255 ----
1108***************
1109*** 332,338 ****
1110 case 0:
1111 break;
1112 case DB_REP_UNAVAIL:
1113! return (__repmgr_bust_connection(dbenv, con, TRUE));
1114 default:
1115 return (ret);
1116 }
1117--- 324,330 ----
1118 case 0:
1119 break;
1120 case DB_REP_UNAVAIL:
1121! return (__repmgr_bust_connection(dbenv, con));
1122 default:
1123 return (ret);
1124 }
1125***************
1126*** 437,443 ****
1127
1128 DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1);
1129
1130! return (__repmgr_send_one(dbenv, conn, REPMGR_HANDSHAKE, &cntrl, &rec));
1131 }
1132
1133 /*
1134--- 429,443 ----
1135
1136 DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1);
1137
1138! /*
1139! * It would of course be disastrous to block the select() thread, so
1140! * pass the "blockable" argument as FALSE. Fortunately blocking should
1141! * never be necessary here, because the hand-shake is always the first
1142! * thing we send. Which is a good thing, because it would be almost as
1143! * disastrous if we allowed ourselves to drop a handshake.
1144! */
1145! return (__repmgr_send_one(dbenv,
1146! conn, REPMGR_HANDSHAKE, &cntrl, &rec, FALSE));
1147 }
1148
1149 /*
1150***************
1151*** 854,859 ****
1152--- 854,872 ----
1153 conn->out_queue_length--;
1154 if (--msg->ref_count <= 0)
1155 __os_free(dbenv, msg);
1156+
1157+ /*
1158+ * We've achieved enough movement to free up at least
1159+ * one space in the outgoing queue. Wake any message
1160+ * threads that may be waiting for space. Clear the
1161+ * CONGESTED status so that when the queue reaches the
1162+ * high-water mark again, the filling thread will be
1163+ * allowed to try waiting again.
1164+ */
1165+ F_CLR(conn, CONN_CONGESTED);
1166+ if (conn->blockers > 0 &&
1167+ (ret = __repmgr_signal(&conn->drained)) != 0)
1168+ return (ret);
1169 }
1170 }
1171
1172*** repmgr/repmgr_util.c 2007-10-31 10:23:52.000000000 -0700
1173--- repmgr/repmgr_util.c 2007-10-31 10:23:53.000000000 -0700
1174***************
1175*** 103,108 ****
1176--- 103,113 ----
1177 db_rep = dbenv->rep_handle;
1178 if ((ret = __os_malloc(dbenv, sizeof(REPMGR_CONNECTION), &c)) != 0)
1179 return (ret);
1180+ if ((ret = __repmgr_alloc_cond(&c->drained)) != 0) {
1181+ __os_free(dbenv, c);
1182+ return (ret);
1183+ }
1184+ c->blockers = 0;
1185
1186 c->fd = s;
1187 c->flags = flags;
1188*** repmgr/repmgr_windows.c 2007-10-31 10:23:52.000000000 -0700
1189--- repmgr/repmgr_windows.c 2007-10-31 10:23:53.000000000 -0700
1190***************
1191*** 11,16 ****
1192--- 11,19 ----
1193 #define __INCLUDE_NETWORKING 1
1194 #include "db_int.h"
1195
1196+ /* Convert time-out from microseconds to milliseconds, rounding up. */
1197+ #define DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t) (((t) + (US_PER_MS - 1)) / US_PER_MS)
1198+
1199 typedef struct __ack_waiter {
1200 HANDLE event;
1201 const DB_LSN *lsnp;
1202***************
1203*** 120,136 ****
1204 {
1205 DB_REP *db_rep;
1206 ACK_WAITER *me;
1207! DWORD ret;
1208! DWORD timeout;
1209
1210 db_rep = dbenv->rep_handle;
1211
1212 if ((ret = allocate_wait_slot(dbenv, &me)) != 0)
1213 goto err;
1214
1215- /* convert time-out from microseconds to milliseconds, rounding up */
1216 timeout = db_rep->ack_timeout > 0 ?
1217! ((db_rep->ack_timeout + (US_PER_MS - 1)) / US_PER_MS) : INFINITE;
1218 me->lsnp = lsnp;
1219 if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout,
1220 FALSE)) == WAIT_FAILED) {
1221--- 123,137 ----
1222 {
1223 DB_REP *db_rep;
1224 ACK_WAITER *me;
1225! DWORD ret, timeout;
1226
1227 db_rep = dbenv->rep_handle;
1228
1229 if ((ret = allocate_wait_slot(dbenv, &me)) != 0)
1230 goto err;
1231
1232 timeout = db_rep->ack_timeout > 0 ?
1233! DB_TIMEOUT_TO_WINDOWS_TIMEOUT(db_rep->ack_timeout) : INFINITE;
1234 me->lsnp = lsnp;
1235 if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout,
1236 FALSE)) == WAIT_FAILED) {
1237***************
1238*** 211,216 ****
1239--- 212,296 ----
1240 db_rep->waiters->first_free = slot;
1241 }
1242
1243+ /* (See requirements described in repmgr_posix.c.) */
1244+ int
1245+ __repmgr_await_drain(dbenv, conn, timeout)
1246+ DB_ENV *dbenv;
1247+ REPMGR_CONNECTION *conn;
1248+ db_timeout_t timeout;
1249+ {
1250+ DB_REP *db_rep;
1251+ db_timespec deadline, delta, now;
1252+ db_timeout_t t;
1253+ DWORD duration, ret;
1254+ int round_up;
1255+
1256+ db_rep = dbenv->rep_handle;
1257+
1258+ __os_gettime(dbenv, &deadline);
1259+ DB_TIMEOUT_TO_TIMESPEC(timeout, &delta);
1260+ timespecadd(&deadline, &delta);
1261+
1262+ while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
1263+ if (!ResetEvent(conn->drained))
1264+ return (GetLastError());
1265+
1266+ /* How long until the deadline? */
1267+ __os_gettime(dbenv, &now);
1268+ if (timespeccmp(&now, &deadline, >=)) {
1269+ F_SET(conn, CONN_CONGESTED);
1270+ return (0);
1271+ }
1272+ delta = deadline;
1273+ timespecsub(&delta, &now);
1274+ round_up = TRUE;
1275+ DB_TIMESPEC_TO_TIMEOUT(t, &delta, round_up);
1276+ duration = DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t);
1277+
1278+ ret = SignalObjectAndWait(db_rep->mutex,
1279+ conn->drained, duration, FALSE);
1280+ LOCK_MUTEX(db_rep->mutex);
1281+ if (ret == WAIT_FAILED)
1282+ return (GetLastError());
1283+ else if (ret == WAIT_TIMEOUT) {
1284+ F_SET(conn, CONN_CONGESTED);
1285+ return (0);
1286+ } else
1287+ DB_ASSERT(dbenv, ret == WAIT_OBJECT_0);
1288+
1289+ if (db_rep->finished)
1290+ return (0);
1291+ if (F_ISSET(conn, CONN_DEFUNCT))
1292+ return (DB_REP_UNAVAIL);
1293+ }
1294+ return (0);
1295+ }
1296+
1297+ /*
1298+ * Creates a manual reset event, which is usually our best choice when we may
1299+ * have multiple threads waiting on a single event.
1300+ */
1301+ int
1302+ __repmgr_alloc_cond(c)
1303+ cond_var_t *c;
1304+ {
1305+ HANDLE event;
1306+
1307+ if ((event = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL)
1308+ return (GetLastError());
1309+ *c = event;
1310+ return (0);
1311+ }
1312+
1313+ int
1314+ __repmgr_free_cond(c)
1315+ cond_var_t *c;
1316+ {
1317+ if (CloseHandle(*c))
1318+ return (0);
1319+ return (GetLastError());
1320+ }
1321+
1322 /*
1323 * Make resource allocation an all-or-nothing affair, outside of this and the
1324 * close_sync function. db_rep->waiters should be non-NULL iff all of these
1325***************
1326*** 488,493 ****
1327--- 568,576 ----
1328 * don't hurt anything flow-control-wise.
1329 */
1330 TAILQ_FOREACH(conn, &db_rep->connections, entries) {
1331+ if (F_ISSET(conn, CONN_DEFUNCT))
1332+ continue;
1333+
1334 if (F_ISSET(conn, CONN_CONNECTING) ||
1335 !STAILQ_EMPTY(&conn->outbound_queue) ||
1336 (!flow_control || !IS_VALID_EID(conn->eid))) {
1337***************
1338*** 534,541 ****
1339 conn != NULL;
1340 conn = next) {
1341 next = TAILQ_NEXT(conn, entries);
1342! if (F_ISSET(conn, CONN_DEFUNCT))
1343! __repmgr_cleanup_connection(dbenv, conn);
1344 }
1345
1346 /*
1347--- 617,626 ----
1348 conn != NULL;
1349 conn = next) {
1350 next = TAILQ_NEXT(conn, entries);
1351! if (F_ISSET(conn, CONN_DEFUNCT) &&
1352! (ret = __repmgr_cleanup_connection(dbenv,
1353! conn)) != 0)
1354! goto unlock;
1355 }
1356
1357 /*
1358***************
1359*** 587,597 ****
1360 return (ret);
1361 }
1362
1363- /*
1364- * !!!
1365- * Only ever called on the select() thread, since we may call
1366- * __repmgr_bust_connection(..., TRUE).
1367- */
1368 static int
1369 handle_completion(dbenv, conn)
1370 DB_ENV *dbenv;
1371--- 672,677 ----
1372***************
1373*** 651,660 ****
1374 }
1375 }
1376
1377! return (0);
1378!
1379! err: if (ret == DB_REP_UNAVAIL)
1380! return (__repmgr_bust_connection(dbenv, conn, TRUE));
1381 return (ret);
1382 }
1383
1384--- 731,742 ----
1385 }
1386 }
1387
1388! err:
1389! if (ret == DB_REP_UNAVAIL) {
1390! if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
1391! return (ret);
1392! ret = __repmgr_cleanup_connection(dbenv, conn);
1393! }
1394 return (ret);
1395 }
1396
1397***************
1398*** 708,714 ****
1399 }
1400
1401 DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
1402! __repmgr_cleanup_connection(dbenv, conn);
1403 ret = __repmgr_connect_site(dbenv, eid);
1404 DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
1405 return (ret);
1406--- 790,797 ----
1407 }
1408
1409 DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
1410! if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0)
1411! return (ret);
1412 ret = __repmgr_connect_site(dbenv, eid);
1413 DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
1414 return (ret);
This page took 0.32415 seconds and 4 git commands to generate.