]> git.pld-linux.org Git - packages/db4.6.git/blob - patch.4.6.21.4
- do not define epoch 0
[packages/db4.6.git] / patch.4.6.21.4
1 *** dbinc/repmgr.h      2007-10-31 10:23:52.000000000 -0700
2 --- dbinc/repmgr.h      2007-10-31 10:23:53.000000000 -0700
3 ***************
4 *** 36,41 ****
5 --- 36,55 ----
6   #endif
7   
8   /*
9 +  * The (arbitrary) maximum number of outgoing messages we're willing to hold, on
10 +  * a queue per connection, waiting for TCP buffer space to become available in
11 +  * the kernel.  Rather than exceeding this limit, we simply discard additional
12 +  * messages (since this is always allowed by the replication protocol).
13 +  *    As a special dispensation, if a message is destined for a specific remote
14 +  * site (i.e., it's not a broadcast), then we first try blocking the sending
15 +  * thread, waiting for space to become available (though we only wait a limited
16 +  * time).  This is so as to be able to handle the immediate flood of (a
17 +  * potentially large number of) outgoing messages that replication generates, in
18 +  * a tight loop, when handling PAGE_REQ, LOG_REQ and ALL_REQ requests.
19 +  */
20 + #define       OUT_QUEUE_LIMIT 10
21
22 + /*
23    * The system value is available from sysconf(_SC_HOST_NAME_MAX).
24    * Historically, the maximum host name was 256.
25    */
26 ***************
27 *** 47,52 ****
28 --- 61,71 ----
29   #define       MAX_SITE_LOC_STRING (MAXHOSTNAMELEN+20)
30   typedef char SITE_STRING_BUFFER[MAX_SITE_LOC_STRING+1];
31   
32 + /* Default timeout values, in seconds. */
33 + #define       DB_REPMGR_DEFAULT_ACK_TIMEOUT           (1 * US_PER_SEC)
34 + #define       DB_REPMGR_DEFAULT_CONNECTION_RETRY      (30 * US_PER_SEC)
35 + #define       DB_REPMGR_DEFAULT_ELECTION_RETRY        (10 * US_PER_SEC)
36
37   struct __repmgr_connection;
38       typedef struct __repmgr_connection REPMGR_CONNECTION;
39   struct __repmgr_queue; typedef struct __repmgr_queue REPMGR_QUEUE;
40 ***************
41 *** 171,178 ****
42   #ifdef DB_WIN32
43         WSAEVENT event_object;
44   #endif
45 ! #define       CONN_CONNECTING 0x01    /* nonblocking connect in progress */
46 ! #define       CONN_DEFUNCT    0x02    /* socket close pending */
47         u_int32_t flags;
48   
49         /*
50 --- 190,198 ----
51   #ifdef DB_WIN32
52         WSAEVENT event_object;
53   #endif
54 ! #define       CONN_CONGESTED  0x01    /* msg thread wait has exceeded timeout */
55 ! #define       CONN_CONNECTING 0x02    /* nonblocking connect in progress */
56 ! #define       CONN_DEFUNCT    0x04    /* socket close pending */
57         u_int32_t flags;
58   
59         /*
60 ***************
61 *** 180,189 ****
62          * send() function's thread.  But if TCP doesn't have enough network
63          * buffer space for us when we first try it, we instead allocate some
64          * memory, and copy the message, and then send it as space becomes
65 !        * available in our main select() thread.
66          */
67         OUT_Q_HEADER outbound_queue;
68         int out_queue_length;
69   
70         /*
71          * Input: while we're reading a message, we keep track of what phase
72 --- 200,215 ----
73          * send() function's thread.  But if TCP doesn't have enough network
74          * buffer space for us when we first try it, we instead allocate some
75          * memory, and copy the message, and then send it as space becomes
76 !        * available in our main select() thread.  In some cases, if the queue
77 !        * gets too long we wait until it's drained, and then append to it.
78 !        * This condition variable's associated mutex is the normal per-repmgr
79 !        * db_rep->mutex, because that mutex is always held anyway whenever the
80 !        * output queue is consulted.
81          */
82         OUT_Q_HEADER outbound_queue;
83         int out_queue_length;
84 +       cond_var_t drained;
85 +       int blockers;           /* ref count of msg threads waiting on us */
86   
87         /*
88          * Input: while we're reading a message, we keep track of what phase
89 *** dbinc_auto/int_def.in       2007-10-31 10:23:52.000000000 -0700
90 --- dbinc_auto/int_def.in       2007-10-31 10:23:52.000000000 -0700
91 ***************
92 *** 1420,1425 ****
93 --- 1420,1428 ----
94   #define       __repmgr_wake_waiting_senders __repmgr_wake_waiting_senders@DB_VERSION_UNIQUE_NAME@
95   #define       __repmgr_await_ack __repmgr_await_ack@DB_VERSION_UNIQUE_NAME@
96   #define       __repmgr_compute_wait_deadline __repmgr_compute_wait_deadline@DB_VERSION_UNIQUE_NAME@
97 + #define       __repmgr_await_drain __repmgr_await_drain@DB_VERSION_UNIQUE_NAME@
98 + #define       __repmgr_alloc_cond __repmgr_alloc_cond@DB_VERSION_UNIQUE_NAME@
99 + #define       __repmgr_free_cond __repmgr_free_cond@DB_VERSION_UNIQUE_NAME@
100   #define       __repmgr_init_sync __repmgr_init_sync@DB_VERSION_UNIQUE_NAME@
101   #define       __repmgr_close_sync __repmgr_close_sync@DB_VERSION_UNIQUE_NAME@
102   #define       __repmgr_net_init __repmgr_net_init@DB_VERSION_UNIQUE_NAME@
103 *** dbinc_auto/repmgr_ext.h     2007-10-31 10:23:52.000000000 -0700
104 --- dbinc_auto/repmgr_ext.h     2007-10-31 10:23:52.000000000 -0700
105 ***************
106 *** 21,30 ****
107   int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *));
108   void __repmgr_stash_generation __P((DB_ENV *));
109   int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t));
110 ! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *));
111   int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *));
112 ! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *, int));
113 ! void __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *));
114   int __repmgr_find_site __P((DB_ENV *, const char *, u_int));
115   int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *));
116   int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **));
117 --- 21,30 ----
118   int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *));
119   void __repmgr_stash_generation __P((DB_ENV *));
120   int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t));
121 ! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *, int));
122   int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *));
123 ! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *));
124 ! int __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *));
125   int __repmgr_find_site __P((DB_ENV *, const char *, u_int));
126   int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *));
127   int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **));
128 ***************
129 *** 39,44 ****
130 --- 39,47 ----
131   int __repmgr_wake_waiting_senders __P((DB_ENV *));
132   int __repmgr_await_ack __P((DB_ENV *, const DB_LSN *));
133   void __repmgr_compute_wait_deadline __P((DB_ENV*, struct timespec *, db_timeout_t));
134 + int __repmgr_await_drain __P((DB_ENV *, REPMGR_CONNECTION *, db_timeout_t));
135 + int __repmgr_alloc_cond __P((cond_var_t *));
136 + int __repmgr_free_cond __P((cond_var_t *));
137   int __repmgr_init_sync __P((DB_ENV *, DB_REP *));
138   int __repmgr_close_sync __P((DB_ENV *));
139   int __repmgr_net_init __P((DB_ENV *, DB_REP *));
140 *** repmgr/repmgr_method.c      2007-10-31 10:23:52.000000000 -0700
141 --- repmgr/repmgr_method.c      2007-10-31 10:23:53.000000000 -0700
142 ***************
143 *** 196,204 ****
144         int ret;
145   
146         /* Set some default values. */
147 !       db_rep->ack_timeout = 1 * US_PER_SEC;                   /*  1 second */
148 !       db_rep->connection_retry_wait = 30 * US_PER_SEC;        /* 30 seconds */
149 !       db_rep->election_retry_wait = 10 * US_PER_SEC;          /* 10 seconds */
150         db_rep->config_nsites = 0;
151         db_rep->peer = DB_EID_INVALID;
152         db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
153 --- 196,204 ----
154         int ret;
155   
156         /* Set some default values. */
157 !       db_rep->ack_timeout = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
158 !       db_rep->connection_retry_wait = DB_REPMGR_DEFAULT_CONNECTION_RETRY;
159 !       db_rep->election_retry_wait = DB_REPMGR_DEFAULT_ELECTION_RETRY;
160         db_rep->config_nsites = 0;
161         db_rep->peer = DB_EID_INVALID;
162         db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM;
163 ***************
164 *** 238,243 ****
165 --- 238,244 ----
166         DB_ENV *dbenv;
167   {
168         DB_REP *db_rep;
169 +       REPMGR_CONNECTION *conn;
170         int ret;
171   
172         db_rep = dbenv->rep_handle;
173 ***************
174 *** 254,259 ****
175 --- 255,266 ----
176   
177         if ((ret = __repmgr_signal(&db_rep->queue_nonempty)) != 0)
178                 goto unlock;
179
180 +       TAILQ_FOREACH(conn, &db_rep->connections, entries) {
181 +               if (conn->blockers > 0 &&
182 +                   ((ret = __repmgr_signal(&conn->drained)) != 0))
183 +                       goto unlock;
184 +       }
185         UNLOCK_MUTEX(db_rep->mutex);
186   
187         return (__repmgr_wake_main_thread(dbenv));
188 *** repmgr/repmgr_msg.c 2007-10-31 10:23:52.000000000 -0700
189 --- repmgr/repmgr_msg.c 2007-10-31 10:23:53.000000000 -0700
190 ***************
191 *** 183,192 ****
192   
193   /*
194    * Acknowledges a message.
195 -  *
196 -  * !!!
197 -  * Note that this cannot be called from the select() thread, in case we call
198 -  * __repmgr_bust_connection(..., FALSE).
199    */
200   static int
201   ack_message(dbenv, generation, lsn)
202 --- 183,188 ----
203 ***************
204 *** 227,235 ****
205                 rec2.size = 0;
206   
207                 conn = site->ref.conn;
208                 if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK,
209 !                   &control2, &rec2)) == DB_REP_UNAVAIL)
210 !                       ret = __repmgr_bust_connection(dbenv, conn, FALSE);
211         }
212   
213         UNLOCK_MUTEX(db_rep->mutex);
214 --- 223,236 ----
215                 rec2.size = 0;
216   
217                 conn = site->ref.conn;
218 +               /*
219 +                * It's hard to imagine anyone would care about a lost ack if
220 +                * the path to the master is so congested as to need blocking;
221 +                * so pass "blockable" argument as FALSE.
222 +                */
223                 if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK,
224 !                   &control2, &rec2, FALSE)) == DB_REP_UNAVAIL)
225 !                       ret = __repmgr_bust_connection(dbenv, conn);
226         }
227   
228         UNLOCK_MUTEX(db_rep->mutex);
229 *** repmgr/repmgr_net.c 2007-10-31 10:23:52.000000000 -0700
230 --- repmgr/repmgr_net.c 2007-10-31 10:23:53.000000000 -0700
231 ***************
232 *** 63,69 ****
233   static void setup_sending_msg
234       __P((struct sending_msg *, u_int, const DBT *, const DBT *));
235   static int __repmgr_send_internal
236 !     __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *));
237   static int enqueue_msg
238       __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
239   static int flatten __P((DB_ENV *, struct sending_msg *));
240 --- 63,69 ----
241   static void setup_sending_msg
242       __P((struct sending_msg *, u_int, const DBT *, const DBT *));
243   static int __repmgr_send_internal
244 !     __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, int));
245   static int enqueue_msg
246       __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t));
247   static int flatten __P((DB_ENV *, struct sending_msg *));
248 ***************
249 *** 73,85 ****
250    * __repmgr_send --
251    *    The send function for DB_ENV->rep_set_transport.
252    *
253 -  * !!!
254 -  * This is only ever called as the replication transport call-back, which means
255 -  * it's either on one of our message processing threads or an application
256 -  * thread.  It mustn't be called from the select() thread, because we might call
257 -  * __repmgr_bust_connection(..., FALSE) here, and that's not allowed in the
258 -  * select() thread.
259 -  *
260    * PUBLIC: int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *,
261    * PUBLIC:     const DB_LSN *, int, u_int32_t));
262    */
263 --- 73,78 ----
264 ***************
265 *** 126,134 ****
266                 }
267   
268                 conn = site->ref.conn;
269                 if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE,
270 !                   control, rec)) == DB_REP_UNAVAIL &&
271 !                   (t_ret = __repmgr_bust_connection(dbenv, conn, FALSE)) != 0)
272                         ret = t_ret;
273                 if (ret != 0)
274                         goto out;
275 --- 119,128 ----
276                 }
277   
278                 conn = site->ref.conn;
279 +               /* Pass the "blockable" argument as TRUE. */
280                 if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE,
281 !                   control, rec, TRUE)) == DB_REP_UNAVAIL &&
282 !                   (t_ret = __repmgr_bust_connection(dbenv, conn)) != 0)
283                         ret = t_ret;
284                 if (ret != 0)
285                         goto out;
286 ***************
287 *** 222,228 ****
288         if (site->state != SITE_CONNECTED)
289                 return (NULL);
290   
291 !       if (F_ISSET(site->ref.conn, CONN_CONNECTING))
292                 return (NULL);
293         return (site);
294   }
295 --- 216,222 ----
296         if (site->state != SITE_CONNECTED)
297                 return (NULL);
298   
299 !       if (F_ISSET(site->ref.conn, CONN_CONNECTING|CONN_DEFUNCT))
300                 return (NULL);
301         return (site);
302   }
303 ***************
304 *** 235,244 ****
305    *
306    * !!!
307    * Caller must hold dbenv->mutex.
308 -  *
309 -  * !!!
310 -  * Note that this cannot be called from the select() thread, in case we call
311 -  * __repmgr_bust_connection(..., FALSE).
312    */
313   static int
314   __repmgr_send_broadcast(dbenv, control, rec, nsitesp, npeersp)
315 --- 229,234 ----
316 ***************
317 *** 268,281 ****
318                     !IS_VALID_EID(conn->eid))
319                         continue;
320   
321 !               if ((ret = __repmgr_send_internal(dbenv, conn, &msg)) == 0) {
322                         site = SITE_FROM_EID(conn->eid);
323                         nsites++;
324                         if (site->priority > 0)
325                                 npeers++;
326                 } else if (ret == DB_REP_UNAVAIL) {
327 !                       if ((ret = __repmgr_bust_connection(
328 !                            dbenv, conn, FALSE)) != 0)
329                                 return (ret);
330                 } else
331                         return (ret);
332 --- 258,277 ----
333                     !IS_VALID_EID(conn->eid))
334                         continue;
335   
336 !               /*
337 !                * Broadcast messages are either application threads committing
338 !                * transactions, or replication status message that we can
339 !                * afford to lose.  So don't allow blocking for them (pass
340 !                * "blockable" argument as FALSE).
341 !                */
342 !               if ((ret = __repmgr_send_internal(dbenv,
343 !                   conn, &msg, FALSE)) == 0) {
344                         site = SITE_FROM_EID(conn->eid);
345                         nsites++;
346                         if (site->priority > 0)
347                                 npeers++;
348                 } else if (ret == DB_REP_UNAVAIL) {
349 !                       if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
350                                 return (ret);
351                 } else
352                         return (ret);
353 ***************
354 *** 301,339 ****
355    * intersperse writes that are part of two single messages.
356    *
357    * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *,
358 !  * PUBLIC:    u_int, const DBT *, const DBT *));
359    */
360   int
361 ! __repmgr_send_one(dbenv, conn, msg_type, control, rec)
362         DB_ENV *dbenv;
363         REPMGR_CONNECTION *conn;
364         u_int msg_type;
365         const DBT *control, *rec;
366   {
367         struct sending_msg msg;
368   
369         setup_sending_msg(&msg, msg_type, control, rec);
370 !       return (__repmgr_send_internal(dbenv, conn, &msg));
371   }
372   
373   /*
374    * Attempts a "best effort" to send a message on the given site.  If there is an
375 !  * excessive backlog of message already queued on the connection, we simply drop
376 !  * this message, and still return 0 even in this case.
377    */
378   static int
379 ! __repmgr_send_internal(dbenv, conn, msg)
380         DB_ENV *dbenv;
381         REPMGR_CONNECTION *conn;
382         struct sending_msg *msg;
383   {
384 ! #define       OUT_QUEUE_LIMIT 10      /* arbitrary, for now */
385         REPMGR_IOVECS iovecs;
386         SITE_STRING_BUFFER buffer;
387         int ret;
388         size_t nw;
389         size_t total_written;
390   
391         DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING));
392         if (!STAILQ_EMPTY(&conn->outbound_queue)) {
393                 /*
394 --- 297,355 ----
395    * intersperse writes that are part of two single messages.
396    *
397    * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *,
398 !  * PUBLIC:    u_int, const DBT *, const DBT *, int));
399    */
400   int
401 ! __repmgr_send_one(dbenv, conn, msg_type, control, rec, blockable)
402         DB_ENV *dbenv;
403         REPMGR_CONNECTION *conn;
404         u_int msg_type;
405         const DBT *control, *rec;
406 +       int blockable;
407   {
408         struct sending_msg msg;
409   
410         setup_sending_msg(&msg, msg_type, control, rec);
411 !       return (__repmgr_send_internal(dbenv, conn, &msg, blockable));
412   }
413   
414   /*
415    * Attempts a "best effort" to send a message on the given site.  If there is an
416 !  * excessive backlog of message already queued on the connection, what shall we
417 !  * do?  If the caller doesn't mind blocking, we'll wait (a limited amount of
418 !  * time) for the queue to drain.  Otherwise we'll simply drop the message.  This
419 !  * is always allowed by the replication protocol.  But in the case of a
420 !  * multi-message response to a request like PAGE_REQ, LOG_REQ or ALL_REQ we
421 !  * almost always get a flood of messages that instantly fills our queue, so
422 !  * blocking improves performance (by avoiding the need for the client to
423 !  * re-request).
424 !  *
425 !  * How long shall we wait?  We could of course create a new timeout
426 !  * configuration type, so that the application could set it directly.  But that
427 !  * would start to overwhelm the user with too many choices to think about.  We
428 !  * already have an ACK timeout, which is the user's estimate of how long it
429 !  * should take to send a message to the client, have it be processed, and return
430 !  * a message back to us.  We multiply that by the queue size, because that's how
431 !  * many messages have to be swallowed up by the client before we're able to
432 !  * start sending again (at least to a rough approximation).
433    */
434   static int
435 ! __repmgr_send_internal(dbenv, conn, msg, blockable)
436         DB_ENV *dbenv;
437         REPMGR_CONNECTION *conn;
438         struct sending_msg *msg;
439 +       int blockable;
440   {
441 !       DB_REP *db_rep;
442         REPMGR_IOVECS iovecs;
443         SITE_STRING_BUFFER buffer;
444 +       db_timeout_t drain_to;
445         int ret;
446         size_t nw;
447         size_t total_written;
448   
449 +       db_rep = dbenv->rep_handle;
450
451         DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING));
452         if (!STAILQ_EMPTY(&conn->outbound_queue)) {
453                 /*
454 ***************
455 *** 344,358 ****
456                 RPRINT(dbenv, (dbenv, "msg to %s to be queued",
457                     __repmgr_format_eid_loc(dbenv->rep_handle,
458                     conn->eid, buffer)));
459                 if (conn->out_queue_length < OUT_QUEUE_LIMIT)
460                         return (enqueue_msg(dbenv, conn, msg, 0));
461                 else {
462                         RPRINT(dbenv, (dbenv, "queue limit exceeded"));
463                         STAT(dbenv->rep_handle->
464                             region->mstat.st_msgs_dropped++);
465 !                       return (0);
466                 }
467         }
468   
469         /*
470          * Send as much data to the site as we can, without blocking.  Keep
471 --- 360,393 ----
472                 RPRINT(dbenv, (dbenv, "msg to %s to be queued",
473                     __repmgr_format_eid_loc(dbenv->rep_handle,
474                     conn->eid, buffer)));
475 +               if (conn->out_queue_length >= OUT_QUEUE_LIMIT &&
476 +                   blockable && !F_ISSET(conn, CONN_CONGESTED)) {
477 +                       RPRINT(dbenv, (dbenv,
478 +                           "block msg thread, await queue space"));
479
480 +                       if ((drain_to = db_rep->ack_timeout) == 0)
481 +                               drain_to = DB_REPMGR_DEFAULT_ACK_TIMEOUT;
482 +                       conn->blockers++;
483 +                       ret = __repmgr_await_drain(dbenv,
484 +                           conn, drain_to * OUT_QUEUE_LIMIT);
485 +                       conn->blockers--;
486 +                       if (db_rep->finished)
487 +                               return (DB_TIMEOUT);
488 +                       if (ret != 0)
489 +                               return (ret);
490 +                       if (STAILQ_EMPTY(&conn->outbound_queue))
491 +                               goto empty;
492 +               }
493                 if (conn->out_queue_length < OUT_QUEUE_LIMIT)
494                         return (enqueue_msg(dbenv, conn, msg, 0));
495                 else {
496                         RPRINT(dbenv, (dbenv, "queue limit exceeded"));
497                         STAT(dbenv->rep_handle->
498                             region->mstat.st_msgs_dropped++);
499 !                       return (blockable ? DB_TIMEOUT : 0);
500                 }
501         }
502 + empty:
503   
504         /*
505          * Send as much data to the site as we can, without blocking.  Keep
506 ***************
507 *** 498,521 ****
508   
509   /*
510    * Abandons a connection, to recover from an error.  Upon entry the conn struct
511 !  * must be on the connections list.
512 !  *
513 !  * If the 'do_close' flag is true, we do the whole job; the clean-up includes
514 !  * removing the struct from the list and freeing all its memory, so upon return
515 !  * the caller must not refer to it any further.  Otherwise, we merely mark the
516 !  * connection for clean-up later by the main thread.
517    *
518    * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *,
519 !  * PUBLIC:     REPMGR_CONNECTION *, int));
520    *
521    * !!!
522    * Caller holds mutex.
523    */
524   int
525 ! __repmgr_bust_connection(dbenv, conn, do_close)
526         DB_ENV *dbenv;
527         REPMGR_CONNECTION *conn;
528 -       int do_close;
529   {
530         DB_REP *db_rep;
531         int connecting, ret, eid;
532 --- 533,553 ----
533   
534   /*
535    * Abandons a connection, to recover from an error.  Upon entry the conn struct
536 !  * must be on the connections list.  For now, just mark it as unusable; it will
537 !  * be fully cleaned up in the top-level select thread, as soon as possible.
538    *
539    * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *,
540 !  * PUBLIC:     REPMGR_CONNECTION *));
541    *
542    * !!!
543    * Caller holds mutex.
544 +  *
545 +  * Must be idempotent
546    */
547   int
548 ! __repmgr_bust_connection(dbenv, conn)
549         DB_ENV *dbenv;
550         REPMGR_CONNECTION *conn;
551   {
552         DB_REP *db_rep;
553         int connecting, ret, eid;
554 ***************
555 *** 526,537 ****
556         DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
557         eid = conn->eid;
558         connecting = F_ISSET(conn, CONN_CONNECTING);
559 !       if (do_close)
560 !               __repmgr_cleanup_connection(dbenv, conn);
561 !       else {
562 !               F_SET(conn, CONN_DEFUNCT);
563 !               conn->eid = -1;
564 !       }
565   
566         /*
567          * When we first accepted the incoming connection, we set conn->eid to
568 --- 558,566 ----
569         DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
570         eid = conn->eid;
571         connecting = F_ISSET(conn, CONN_CONNECTING);
572
573 !       F_SET(conn, CONN_DEFUNCT);
574 !       conn->eid = -1;
575   
576         /*
577          * When we first accepted the incoming connection, we set conn->eid to
578 ***************
579 *** 557,563 ****
580                             dbenv, ELECT_FAILURE_ELECTION)) != 0)
581                                 return (ret);
582                 }
583 !       } else if (!do_close) {
584                 /*
585                  * One way or another, make sure the main thread is poked, so
586                  * that we do the deferred clean-up.
587 --- 586,592 ----
588                             dbenv, ELECT_FAILURE_ELECTION)) != 0)
589                                 return (ret);
590                 }
591 !       } else {
592                 /*
593                  * One way or another, make sure the main thread is poked, so
594                  * that we do the deferred clean-up.
595 ***************
596 *** 568,577 ****
597   }
598   
599   /*
600 !  * PUBLIC: void __repmgr_cleanup_connection
601    * PUBLIC:    __P((DB_ENV *, REPMGR_CONNECTION *));
602    */
603 ! void
604   __repmgr_cleanup_connection(dbenv, conn)
605         DB_ENV *dbenv;
606         REPMGR_CONNECTION *conn;
607 --- 597,610 ----
608   }
609   
610   /*
611 !  * PUBLIC: int __repmgr_cleanup_connection
612    * PUBLIC:    __P((DB_ENV *, REPMGR_CONNECTION *));
613 +  *
614 +  * !!!
615 +  * Idempotent.  This can be called repeatedly as blocking message threads (of
616 +  * which there could be multiples) wake up in case of error on the connection.
617    */
618 ! int
619   __repmgr_cleanup_connection(dbenv, conn)
620         DB_ENV *dbenv;
621         REPMGR_CONNECTION *conn;
622 ***************
623 *** 580,596 ****
624         QUEUED_OUTPUT *out;
625         REPMGR_FLAT *msg;
626         DBT *dbt;
627   
628         db_rep = dbenv->rep_handle;
629   
630 !       TAILQ_REMOVE(&db_rep->connections, conn, entries);
631         if (conn->fd != INVALID_SOCKET) {
632 !               (void)closesocket(conn->fd);
633   #ifdef DB_WIN32
634 !               (void)WSACloseEvent(conn->event_object);
635   #endif
636         }
637   
638         /*
639          * Deallocate any input and output buffers we may have.
640          */
641 --- 613,643 ----
642         QUEUED_OUTPUT *out;
643         REPMGR_FLAT *msg;
644         DBT *dbt;
645 +       int ret;
646   
647         db_rep = dbenv->rep_handle;
648   
649 !       DB_ASSERT(dbenv, F_ISSET(conn, CONN_DEFUNCT) || db_rep->finished);
650 !       
651         if (conn->fd != INVALID_SOCKET) {
652 !               ret = closesocket(conn->fd);
653 !               conn->fd = INVALID_SOCKET;
654 !               if (ret == SOCKET_ERROR) {
655 !                       ret = net_errno;
656 !                       __db_err(dbenv, ret, "closing socket");
657 !               }
658   #ifdef DB_WIN32
659 !               if (!WSACloseEvent(conn->event_object) && ret != 0)
660 !                       ret = net_errno;
661   #endif
662 +               if (ret != 0)
663 +                       return (ret);
664         }
665   
666 +       if (conn->blockers > 0)
667 +               return (__repmgr_signal(&conn->drained));
668
669 +       TAILQ_REMOVE(&db_rep->connections, conn, entries);
670         /*
671          * Deallocate any input and output buffers we may have.
672          */
673 ***************
674 *** 614,620 ****
675 --- 661,669 ----
676                 __os_free(dbenv, out);
677         }
678   
679 +       ret = __repmgr_free_cond(&conn->drained);
680         __os_free(dbenv, conn);
681 +       return (ret);
682   }
683   
684   static int
685 ***************
686 *** 1063,1069 ****
687   
688         while (!TAILQ_EMPTY(&db_rep->connections)) {
689                 conn = TAILQ_FIRST(&db_rep->connections);
690 !               __repmgr_cleanup_connection(dbenv, conn);
691         }
692   
693         for (i = 0; i < db_rep->site_cnt; i++) {
694 --- 1112,1118 ----
695   
696         while (!TAILQ_EMPTY(&db_rep->connections)) {
697                 conn = TAILQ_FIRST(&db_rep->connections);
698 !               (void)__repmgr_cleanup_connection(dbenv, conn);
699         }
700   
701         for (i = 0; i < db_rep->site_cnt; i++) {
702 *** repmgr/repmgr_posix.c       2007-10-31 10:23:52.000000000 -0700
703 --- repmgr/repmgr_posix.c       2007-10-31 10:23:53.000000000 -0700
704 ***************
705 *** 21,26 ****
706 --- 21,28 ----
707   size_t __repmgr_guesstimated_max = (128 * 1024);
708   #endif
709   
710 + static int __repmgr_conn_work __P((DB_ENV *,
711 +     REPMGR_CONNECTION *, fd_set *, fd_set *, int));
712   static int finish_connecting __P((DB_ENV *, REPMGR_CONNECTION *));
713   
714   /*
715 ***************
716 *** 189,194 ****
717 --- 191,284 ----
718   }
719   
720   /*
721 +  * PUBLIC: int __repmgr_await_drain __P((DB_ENV *,
722 +  * PUBLIC:    REPMGR_CONNECTION *, db_timeout_t));
723 +  *
724 +  * Waits for space to become available on the connection's output queue.
725 +  * Various ways we can exit:
726 +  *
727 +  * 1. queue becomes non-full
728 +  * 2. exceed time limit
729 +  * 3. connection becomes defunct (due to error in another thread)
730 +  * 4. repmgr is shutting down
731 +  * 5. any unexpected system resource failure
732 +  *
733 +  * In cases #3 and #5 we return an error code.  Caller is responsible for
734 +  * distinguishing the remaining cases if desired.
735 +  * 
736 +  * !!!
737 +  * Caller must hold repmgr->mutex.
738 +  */
739 + int
740 + __repmgr_await_drain(dbenv, conn, timeout)
741 +       DB_ENV *dbenv;
742 +       REPMGR_CONNECTION *conn;
743 +       db_timeout_t timeout;
744 + {
745 +       DB_REP *db_rep;
746 +       struct timespec deadline;
747 +       int ret;
748
749 +       db_rep = dbenv->rep_handle;
750
751 +       __repmgr_compute_wait_deadline(dbenv, &deadline, timeout);
752
753 +       ret = 0;
754 +       while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
755 +               ret = pthread_cond_timedwait(&conn->drained,
756 +                   &db_rep->mutex, &deadline);
757 +               switch (ret) {
758 +               case 0:
759 +                       if (db_rep->finished)
760 +                               goto out; /* #4. */
761 +                       /*
762 +                        * Another thread could have stumbled into an error on
763 +                        * the socket while we were waiting.
764 +                        */
765 +                       if (F_ISSET(conn, CONN_DEFUNCT)) {
766 +                               ret = DB_REP_UNAVAIL; /* #3. */
767 +                               goto out;
768 +                       }
769 +                       break;
770 +               case ETIMEDOUT:
771 +                       F_SET(conn, CONN_CONGESTED);
772 +                       ret = 0;
773 +                       goto out; /* #2. */
774 +               default:
775 +                       goto out; /* #5. */
776 +               }
777 +       }
778 +       /* #1. */
779
780 + out:
781 +       return (ret);
782 + }
783
784 + /*
785 +  * PUBLIC: int __repmgr_alloc_cond __P((cond_var_t *));
786 +  *
787 +  * Initialize a condition variable (in allocated space).
788 +  */
789 + int
790 + __repmgr_alloc_cond(c)
791 +       cond_var_t *c;
792 + {
793 +       return (pthread_cond_init(c, NULL));
794 + }
795
796 + /*
797 +  * PUBLIC: int __repmgr_free_cond __P((cond_var_t *));
798 +  *
799 +  * Clean up a previously initialized condition variable.
800 +  */
801 + int
802 + __repmgr_free_cond(c)
803 +       cond_var_t *c;
804 + {
805 +       return (pthread_cond_destroy(c));
806 + }
807
808 + /*
809    * PUBLIC: int __repmgr_init_sync __P((DB_ENV *, DB_REP *));
810    *
811    * Allocate/initialize all data necessary for thread synchronization.  This
812 ***************
813 *** 443,449 ****
814         REPMGR_RETRY *retry;
815         db_timespec timeout;
816         fd_set reads, writes;
817 !       int ret, flow_control, maxfd, nready;
818         u_int8_t buf[10];       /* arbitrary size */
819   
820         flow_control = FALSE;
821 --- 533,539 ----
822         REPMGR_RETRY *retry;
823         db_timespec timeout;
824         fd_set reads, writes;
825 !       int ret, flow_control, maxfd;
826         u_int8_t buf[10];       /* arbitrary size */
827   
828         flow_control = FALSE;
829 ***************
830 *** 477,482 ****
831 --- 567,575 ----
832                  * each one.
833                  */
834                 TAILQ_FOREACH(conn, &db_rep->connections, entries) {
835 +                       if (F_ISSET(conn, CONN_DEFUNCT))
836 +                               continue;
837 +                       
838                         if (F_ISSET(conn, CONN_CONNECTING)) {
839                                 FD_SET((u_int)conn->fd, &reads);
840                                 FD_SET((u_int)conn->fd, &writes);
841 ***************
842 *** 533,616 ****
843                                 return (ret);
844                         }
845                 }
846 -               nready = ret;
847
848                 LOCK_MUTEX(db_rep->mutex);
849   
850 -               /*
851 -                * The first priority thing we must do is to clean up any
852 -                * pending defunct connections.  Otherwise, if they have any
853 -                * lingering pending input, we get very confused if we try to
854 -                * process it.
855 -                *
856 -                * The TAILQ_FOREACH macro would be suitable here, except that
857 -                * it doesn't allow unlinking the current element, which is
858 -                * needed for cleanup_connection.
859 -                */
860 -               for (conn = TAILQ_FIRST(&db_rep->connections);
861 -                    conn != NULL;
862 -                    conn = next) {
863 -                       next = TAILQ_NEXT(conn, entries);
864 -                       if (F_ISSET(conn, CONN_DEFUNCT))
865 -                               __repmgr_cleanup_connection(dbenv, conn);
866 -               }
867
868                 if ((ret = __repmgr_retry_connections(dbenv)) != 0)
869                         goto out;
870 -               if (nready == 0)
871 -                       continue;
872   
873                 /*
874 !                * Traverse the linked list.  (Again, like TAILQ_FOREACH, except
875 !                * that we need the ability to unlink an element along the way.)
876                  */
877                 for (conn = TAILQ_FIRST(&db_rep->connections);
878                      conn != NULL;
879                      conn = next) {
880                         next = TAILQ_NEXT(conn, entries);
881 !                       if (F_ISSET(conn, CONN_CONNECTING)) {
882 !                               if (FD_ISSET((u_int)conn->fd, &reads) ||
883 !                                   FD_ISSET((u_int)conn->fd, &writes)) {
884 !                                       if ((ret = finish_connecting(dbenv,
885 !                                           conn)) == DB_REP_UNAVAIL) {
886 !                                               if ((ret =
887 !                                                   __repmgr_bust_connection(
888 !                                                   dbenv, conn, TRUE)) != 0)
889 !                                                       goto out;
890 !                                       } else if (ret != 0)
891 !                                               goto out;
892 !                               }
893 !                               continue;
894 !                       }
895
896 !                       /*
897 !                        * Here, the site is connected, and the FD_SET's are
898 !                        * valid.
899 !                        */
900 !                       if (FD_ISSET((u_int)conn->fd, &writes)) {
901 !                               if ((ret = __repmgr_write_some(
902 !                                   dbenv, conn)) == DB_REP_UNAVAIL) {
903 !                                       if ((ret =
904 !                                           __repmgr_bust_connection(dbenv,
905 !                                           conn, TRUE)) != 0)
906 !                                               goto out;
907 !                                       continue;
908 !                               } else if (ret != 0)
909 !                                       goto out;
910 !                       }
911
912 !                       if (!flow_control &&
913 !                           FD_ISSET((u_int)conn->fd, &reads)) {
914 !                               if ((ret = __repmgr_read_from_site(dbenv, conn))
915 !                                   == DB_REP_UNAVAIL) {
916 !                                       if ((ret =
917 !                                           __repmgr_bust_connection(dbenv,
918 !                                           conn, TRUE)) != 0)
919 !                                               goto out;
920 !                                       continue;
921 !                               } else if (ret != 0)
922 !                                       goto out;
923 !                       }
924                 }
925   
926                 /*
927 --- 626,650 ----
928                                 return (ret);
929                         }
930                 }
931                 LOCK_MUTEX(db_rep->mutex);
932   
933                 if ((ret = __repmgr_retry_connections(dbenv)) != 0)
934                         goto out;
935   
936                 /*
937 !                * Examine each connection, to see what work needs to be done.
938 !                * 
939 !                * The TAILQ_FOREACH macro would be suitable here, except that
940 !                * it doesn't allow unlinking the current element, which is
941 !                * needed for cleanup_connection.
942                  */
943                 for (conn = TAILQ_FIRST(&db_rep->connections);
944                      conn != NULL;
945                      conn = next) {
946                         next = TAILQ_NEXT(conn, entries);
947 !                       if ((ret = __repmgr_conn_work(dbenv,
948 !                           conn, &reads, &writes, flow_control)) != 0)
949 !                               goto out;
950                 }
951   
952                 /*
953 ***************
954 *** 637,642 ****
955 --- 671,719 ----
956   }
957   
958   static int
959 + __repmgr_conn_work(dbenv, conn, reads, writes, flow_control)
960 +       DB_ENV *dbenv;
961 +       REPMGR_CONNECTION *conn;
962 +       fd_set *reads, *writes;
963 +       int flow_control;
964 + {
965 +       int ret;
966 +       u_int fd;
967
968 +       if (F_ISSET(conn, CONN_DEFUNCT)) {
969 +               /*
970 +                * Deferred clean-up, from an error that happened in another
971 +                * thread, while we were sleeping in select().
972 +               */
973 +               return (__repmgr_cleanup_connection(dbenv, conn));
974 +       }
975
976 +       ret = 0;
977 +       fd = (u_int)conn->fd;
978 +       
979 +       if (F_ISSET(conn, CONN_CONNECTING)) {
980 +               if (FD_ISSET(fd, reads) || FD_ISSET(fd, writes))
981 +                       ret = finish_connecting(dbenv, conn);
982 +       } else {
983 +               /*
984 +                * Here, the site is connected, and the FD_SET's are valid.
985 +                */
986 +               if (FD_ISSET(fd, writes))
987 +                       ret = __repmgr_write_some(dbenv, conn);
988 +                               
989 +               if (ret == 0 && !flow_control && FD_ISSET(fd, reads))
990 +                       ret = __repmgr_read_from_site(dbenv, conn);
991 +       }
992
993 +       if (ret == DB_REP_UNAVAIL) {
994 +               if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
995 +                       return (ret);
996 +               ret = __repmgr_cleanup_connection(dbenv, conn);
997 +       }
998 +       return (ret);
999 + }
1000
1001 + static int
1002   finish_connecting(dbenv, conn)
1003         DB_ENV *dbenv;
1004         REPMGR_CONNECTION *conn;
1005 ***************
1006 *** 657,662 ****
1007 --- 734,740 ----
1008                 goto err_rpt;
1009         }
1010   
1011 +       DB_ASSERT(dbenv, F_ISSET(conn, CONN_CONNECTING));
1012         F_CLR(conn, CONN_CONNECTING);
1013         return (__repmgr_send_handshake(dbenv, conn));
1014   
1015 ***************
1016 *** 671,690 ****
1017             "connecting to %s", __repmgr_format_site_loc(site, buffer));
1018   
1019         /* If we've exhausted the list of possible addresses, give up. */
1020 !       if (ADDR_LIST_NEXT(&site->net_addr) == NULL)
1021                 return (DB_REP_UNAVAIL);
1022   
1023         /*
1024          * This is just like a little mini-"bust_connection", except that we
1025          * don't reschedule for later, 'cuz we're just about to try again right
1026 !        * now.
1027          *
1028          * !!!
1029          * Which means this must only be called on the select() thread, since
1030          * only there are we allowed to actually close a connection.
1031          */
1032         DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
1033 !       __repmgr_cleanup_connection(dbenv, conn);
1034         ret = __repmgr_connect_site(dbenv, eid);
1035         DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
1036         return (ret);
1037 --- 749,773 ----
1038             "connecting to %s", __repmgr_format_site_loc(site, buffer));
1039   
1040         /* If we've exhausted the list of possible addresses, give up. */
1041 !       if (ADDR_LIST_NEXT(&site->net_addr) == NULL) {
1042 !               STAT(db_rep->region->mstat.st_connect_fail++);
1043                 return (DB_REP_UNAVAIL);
1044 +       }
1045   
1046         /*
1047          * This is just like a little mini-"bust_connection", except that we
1048          * don't reschedule for later, 'cuz we're just about to try again right
1049 !        * now.  (Note that we don't have to worry about message threads
1050 !        * blocking on a full output queue: that can't happen when we're only
1051 !        * just connecting.)
1052          *
1053          * !!!
1054          * Which means this must only be called on the select() thread, since
1055          * only there are we allowed to actually close a connection.
1056          */
1057         DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
1058 !       if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0)
1059 !               return (ret);
1060         ret = __repmgr_connect_site(dbenv, eid);
1061         DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
1062         return (ret);
1063 *** repmgr/repmgr_sel.c 2007-10-31 10:23:52.000000000 -0700
1064 --- repmgr/repmgr_sel.c 2007-10-31 10:23:53.000000000 -0700
1065 ***************
1066 *** 36,45 ****
1067   
1068   /*
1069    * PUBLIC: int __repmgr_accept __P((DB_ENV *));
1070 -  *
1071 -  * !!!
1072 -  * Only ever called in the select() thread, since we may call
1073 -  * __repmgr_bust_connection(..., TRUE).
1074    */
1075   int
1076   __repmgr_accept(dbenv)
1077 --- 36,41 ----
1078 ***************
1079 *** 133,139 ****
1080         case 0:
1081                 return (0);
1082         case DB_REP_UNAVAIL:
1083 !               return (__repmgr_bust_connection(dbenv, conn, TRUE));
1084         default:
1085                 return (ret);
1086         }
1087 --- 129,135 ----
1088         case 0:
1089                 return (0);
1090         case DB_REP_UNAVAIL:
1091 !               return (__repmgr_bust_connection(dbenv, conn));
1092         default:
1093                 return (ret);
1094         }
1095 ***************
1096 *** 254,263 ****
1097    * starting with the "current" element of its address list and trying as many
1098    * addresses as necessary until the list is exhausted.
1099    *
1100 -  * !!!
1101 -  * Only ever called in the select() thread, since we may call
1102 -  * __repmgr_bust_connection(..., TRUE).
1103 -  *
1104    * PUBLIC: int __repmgr_connect_site __P((DB_ENV *, u_int eid));
1105    */
1106   int
1107 --- 250,255 ----
1108 ***************
1109 *** 332,338 ****
1110                 case 0:
1111                         break;
1112                 case DB_REP_UNAVAIL:
1113 !                       return (__repmgr_bust_connection(dbenv, con, TRUE));
1114                 default:
1115                         return (ret);
1116                 }
1117 --- 324,330 ----
1118                 case 0:
1119                         break;
1120                 case DB_REP_UNAVAIL:
1121 !                       return (__repmgr_bust_connection(dbenv, con));
1122                 default:
1123                         return (ret);
1124                 }
1125 ***************
1126 *** 437,443 ****
1127   
1128         DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1);
1129   
1130 !       return (__repmgr_send_one(dbenv, conn, REPMGR_HANDSHAKE, &cntrl, &rec));
1131   }
1132   
1133   /*
1134 --- 429,443 ----
1135   
1136         DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1);
1137   
1138 !       /*
1139 !        * It would of course be disastrous to block the select() thread, so
1140 !        * pass the "blockable" argument as FALSE.  Fortunately blocking should
1141 !        * never be necessary here, because the hand-shake is always the first
1142 !        * thing we send.  Which is a good thing, because it would be almost as
1143 !        * disastrous if we allowed ourselves to drop a handshake.
1144 !        */
1145 !       return (__repmgr_send_one(dbenv,
1146 !           conn, REPMGR_HANDSHAKE, &cntrl, &rec, FALSE));
1147   }
1148   
1149   /*
1150 ***************
1151 *** 854,859 ****
1152 --- 854,872 ----
1153                         conn->out_queue_length--;
1154                         if (--msg->ref_count <= 0)
1155                                 __os_free(dbenv, msg);
1156
1157 +                       /*
1158 +                        * We've achieved enough movement to free up at least
1159 +                        * one space in the outgoing queue.  Wake any message
1160 +                        * threads that may be waiting for space.  Clear the
1161 +                        * CONGESTED status so that when the queue reaches the
1162 +                        * high-water mark again, the filling thread will be
1163 +                        * allowed to try waiting again.
1164 +                        */
1165 +                       F_CLR(conn, CONN_CONGESTED);
1166 +                       if (conn->blockers > 0 &&
1167 +                           (ret = __repmgr_signal(&conn->drained)) != 0)
1168 +                               return (ret);
1169                 }
1170         }
1171   
1172 *** repmgr/repmgr_util.c        2007-10-31 10:23:52.000000000 -0700
1173 --- repmgr/repmgr_util.c        2007-10-31 10:23:53.000000000 -0700
1174 ***************
1175 *** 103,108 ****
1176 --- 103,113 ----
1177         db_rep = dbenv->rep_handle;
1178         if ((ret = __os_malloc(dbenv, sizeof(REPMGR_CONNECTION), &c)) != 0)
1179                 return (ret);
1180 +       if ((ret = __repmgr_alloc_cond(&c->drained)) != 0) {
1181 +               __os_free(dbenv, c);
1182 +               return (ret);
1183 +       }
1184 +       c->blockers = 0;
1185   
1186         c->fd = s;
1187         c->flags = flags;
1188 *** repmgr/repmgr_windows.c     2007-10-31 10:23:52.000000000 -0700
1189 --- repmgr/repmgr_windows.c     2007-10-31 10:23:53.000000000 -0700
1190 ***************
1191 *** 11,16 ****
1192 --- 11,19 ----
1193   #define       __INCLUDE_NETWORKING    1
1194   #include "db_int.h"
1195   
1196 + /* Convert time-out from microseconds to milliseconds, rounding up. */
1197 + #define       DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t) (((t) + (US_PER_MS - 1)) / US_PER_MS)
1198
1199   typedef struct __ack_waiter {
1200         HANDLE event;
1201         const DB_LSN *lsnp;
1202 ***************
1203 *** 120,136 ****
1204   {
1205         DB_REP *db_rep;
1206         ACK_WAITER *me;
1207 !       DWORD ret;
1208 !       DWORD timeout;
1209   
1210         db_rep = dbenv->rep_handle;
1211   
1212         if ((ret = allocate_wait_slot(dbenv, &me)) != 0)
1213                 goto err;
1214   
1215 -       /* convert time-out from microseconds to milliseconds, rounding up */
1216         timeout = db_rep->ack_timeout > 0 ?
1217 !           ((db_rep->ack_timeout + (US_PER_MS - 1)) / US_PER_MS) : INFINITE;
1218         me->lsnp = lsnp;
1219         if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout,
1220             FALSE)) == WAIT_FAILED) {
1221 --- 123,137 ----
1222   {
1223         DB_REP *db_rep;
1224         ACK_WAITER *me;
1225 !       DWORD ret, timeout;
1226   
1227         db_rep = dbenv->rep_handle;
1228   
1229         if ((ret = allocate_wait_slot(dbenv, &me)) != 0)
1230                 goto err;
1231   
1232         timeout = db_rep->ack_timeout > 0 ?
1233 !           DB_TIMEOUT_TO_WINDOWS_TIMEOUT(db_rep->ack_timeout) : INFINITE;
1234         me->lsnp = lsnp;
1235         if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout,
1236             FALSE)) == WAIT_FAILED) {
1237 ***************
1238 *** 211,216 ****
1239 --- 212,296 ----
1240         db_rep->waiters->first_free = slot;
1241   }
1242   
1243 + /* (See requirements described in repmgr_posix.c.) */
1244 + int
1245 + __repmgr_await_drain(dbenv, conn, timeout)
1246 +       DB_ENV *dbenv;
1247 +       REPMGR_CONNECTION *conn;
1248 +       db_timeout_t timeout;
1249 + {
1250 +       DB_REP *db_rep;
1251 +       db_timespec deadline, delta, now;
1252 +       db_timeout_t t;
1253 +       DWORD duration, ret;
1254 +       int round_up;
1255 +       
1256 +       db_rep = dbenv->rep_handle;
1257 +       
1258 +       __os_gettime(dbenv, &deadline);
1259 +       DB_TIMEOUT_TO_TIMESPEC(timeout, &delta);
1260 +       timespecadd(&deadline, &delta);
1261
1262 +       while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
1263 +               if (!ResetEvent(conn->drained))
1264 +                       return (GetLastError());
1265
1266 +               /* How long until the deadline? */
1267 +               __os_gettime(dbenv, &now);
1268 +               if (timespeccmp(&now, &deadline, >=)) {
1269 +                       F_SET(conn, CONN_CONGESTED);
1270 +                       return (0);
1271 +               }
1272 +               delta = deadline;
1273 +               timespecsub(&delta, &now);
1274 +               round_up = TRUE;
1275 +               DB_TIMESPEC_TO_TIMEOUT(t, &delta, round_up);
1276 +               duration = DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t);
1277
1278 +               ret = SignalObjectAndWait(db_rep->mutex,
1279 +                   conn->drained, duration, FALSE);
1280 +               LOCK_MUTEX(db_rep->mutex);
1281 +               if (ret == WAIT_FAILED)
1282 +                       return (GetLastError());
1283 +               else if (ret == WAIT_TIMEOUT) {
1284 +                       F_SET(conn, CONN_CONGESTED);
1285 +                       return (0);
1286 +               } else
1287 +                       DB_ASSERT(dbenv, ret == WAIT_OBJECT_0);
1288 +               
1289 +               if (db_rep->finished)
1290 +                       return (0);
1291 +               if (F_ISSET(conn, CONN_DEFUNCT))
1292 +                       return (DB_REP_UNAVAIL);
1293 +       }
1294 +       return (0);
1295 + }
1296
1297 + /*
1298 +  * Creates a manual reset event, which is usually our best choice when we may
1299 +  * have multiple threads waiting on a single event.
1300 +  */
1301 + int
1302 + __repmgr_alloc_cond(c)
1303 +       cond_var_t *c;
1304 + {
1305 +       HANDLE event;
1306
1307 +       if ((event = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL)
1308 +               return (GetLastError());
1309 +       *c = event;
1310 +       return (0);
1311 + }
1312
1313 + int
1314 + __repmgr_free_cond(c)
1315 +       cond_var_t *c;
1316 + {
1317 +       if (CloseHandle(*c))
1318 +               return (0);
1319 +       return (GetLastError());
1320 + }
1321
1322   /*
1323    * Make resource allocation an all-or-nothing affair, outside of this and the
1324    * close_sync function.  db_rep->waiters should be non-NULL iff all of these
1325 ***************
1326 *** 488,493 ****
1327 --- 568,576 ----
1328                  * don't hurt anything flow-control-wise.
1329                  */
1330                 TAILQ_FOREACH(conn, &db_rep->connections, entries) {
1331 +                       if (F_ISSET(conn, CONN_DEFUNCT))
1332 +                               continue;
1333
1334                         if (F_ISSET(conn, CONN_CONNECTING) ||
1335                             !STAILQ_EMPTY(&conn->outbound_queue) ||
1336                             (!flow_control || !IS_VALID_EID(conn->eid))) {
1337 ***************
1338 *** 534,541 ****
1339                      conn != NULL;
1340                      conn = next) {
1341                         next = TAILQ_NEXT(conn, entries);
1342 !                       if (F_ISSET(conn, CONN_DEFUNCT))
1343 !                               __repmgr_cleanup_connection(dbenv, conn);
1344                 }
1345   
1346                 /*
1347 --- 617,626 ----
1348                      conn != NULL;
1349                      conn = next) {
1350                         next = TAILQ_NEXT(conn, entries);
1351 !                       if (F_ISSET(conn, CONN_DEFUNCT) &&
1352 !                           (ret = __repmgr_cleanup_connection(dbenv,
1353 !                           conn)) != 0)
1354 !                               goto unlock;
1355                 }
1356   
1357                 /*
1358 ***************
1359 *** 587,597 ****
1360         return (ret);
1361   }
1362   
1363 - /*
1364 -  * !!!
1365 -  * Only ever called on the select() thread, since we may call
1366 -  * __repmgr_bust_connection(..., TRUE).
1367 -  */
1368   static int
1369   handle_completion(dbenv, conn)
1370         DB_ENV *dbenv;
1371 --- 672,677 ----
1372 ***************
1373 *** 651,660 ****
1374                 }
1375         }
1376   
1377 !       return (0);
1378
1379 ! err:  if (ret == DB_REP_UNAVAIL)
1380 !               return (__repmgr_bust_connection(dbenv, conn, TRUE));
1381         return (ret);
1382   }
1383   
1384 --- 731,742 ----
1385                 }
1386         }
1387   
1388 ! err:
1389 !       if (ret == DB_REP_UNAVAIL) {
1390 !               if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0)
1391 !                       return (ret);
1392 !               ret = __repmgr_cleanup_connection(dbenv, conn);
1393 !       }
1394         return (ret);
1395   }
1396   
1397 ***************
1398 *** 708,714 ****
1399         }
1400   
1401         DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
1402 !       __repmgr_cleanup_connection(dbenv, conn);
1403         ret = __repmgr_connect_site(dbenv, eid);
1404         DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
1405         return (ret);
1406 --- 790,797 ----
1407         }
1408   
1409         DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections));
1410 !       if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0)
1411 !               return (ret);
1412         ret = __repmgr_connect_site(dbenv, eid);
1413         DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL);
1414         return (ret);
This page took 0.309904 seconds and 3 git commands to generate.