]>
Commit | Line | Data |
---|---|---|
731bfa0a ER |
1 | *** dbinc/repmgr.h 2007-10-31 10:23:52.000000000 -0700 |
2 | --- dbinc/repmgr.h 2007-10-31 10:23:53.000000000 -0700 | |
3 | *************** | |
4 | *** 36,41 **** | |
5 | --- 36,55 ---- | |
6 | #endif | |
7 | ||
8 | /* | |
9 | + * The (arbitrary) maximum number of outgoing messages we're willing to hold, on | |
10 | + * a queue per connection, waiting for TCP buffer space to become available in | |
11 | + * the kernel. Rather than exceeding this limit, we simply discard additional | |
12 | + * messages (since this is always allowed by the replication protocol). | |
13 | + * As a special dispensation, if a message is destined for a specific remote | |
14 | + * site (i.e., it's not a broadcast), then we first try blocking the sending | |
15 | + * thread, waiting for space to become available (though we only wait a limited | |
16 | + * time). This is so as to be able to handle the immediate flood of (a | |
17 | + * potentially large number of) outgoing messages that replication generates, in | |
18 | + * a tight loop, when handling PAGE_REQ, LOG_REQ and ALL_REQ requests. | |
19 | + */ | |
20 | + #define OUT_QUEUE_LIMIT 10 | |
21 | + | |
22 | + /* | |
23 | * The system value is available from sysconf(_SC_HOST_NAME_MAX). | |
24 | * Historically, the maximum host name was 256. | |
25 | */ | |
26 | *************** | |
27 | *** 47,52 **** | |
28 | --- 61,71 ---- | |
29 | #define MAX_SITE_LOC_STRING (MAXHOSTNAMELEN+20) | |
30 | typedef char SITE_STRING_BUFFER[MAX_SITE_LOC_STRING+1]; | |
31 | ||
32 | + /* Default timeout values, in seconds. */ | |
33 | + #define DB_REPMGR_DEFAULT_ACK_TIMEOUT (1 * US_PER_SEC) | |
34 | + #define DB_REPMGR_DEFAULT_CONNECTION_RETRY (30 * US_PER_SEC) | |
35 | + #define DB_REPMGR_DEFAULT_ELECTION_RETRY (10 * US_PER_SEC) | |
36 | + | |
37 | struct __repmgr_connection; | |
38 | typedef struct __repmgr_connection REPMGR_CONNECTION; | |
39 | struct __repmgr_queue; typedef struct __repmgr_queue REPMGR_QUEUE; | |
40 | *************** | |
41 | *** 171,178 **** | |
42 | #ifdef DB_WIN32 | |
43 | WSAEVENT event_object; | |
44 | #endif | |
45 | ! #define CONN_CONNECTING 0x01 /* nonblocking connect in progress */ | |
46 | ! #define CONN_DEFUNCT 0x02 /* socket close pending */ | |
47 | u_int32_t flags; | |
48 | ||
49 | /* | |
50 | --- 190,198 ---- | |
51 | #ifdef DB_WIN32 | |
52 | WSAEVENT event_object; | |
53 | #endif | |
54 | ! #define CONN_CONGESTED 0x01 /* msg thread wait has exceeded timeout */ | |
55 | ! #define CONN_CONNECTING 0x02 /* nonblocking connect in progress */ | |
56 | ! #define CONN_DEFUNCT 0x04 /* socket close pending */ | |
57 | u_int32_t flags; | |
58 | ||
59 | /* | |
60 | *************** | |
61 | *** 180,189 **** | |
62 | * send() function's thread. But if TCP doesn't have enough network | |
63 | * buffer space for us when we first try it, we instead allocate some | |
64 | * memory, and copy the message, and then send it as space becomes | |
65 | ! * available in our main select() thread. | |
66 | */ | |
67 | OUT_Q_HEADER outbound_queue; | |
68 | int out_queue_length; | |
69 | ||
70 | /* | |
71 | * Input: while we're reading a message, we keep track of what phase | |
72 | --- 200,215 ---- | |
73 | * send() function's thread. But if TCP doesn't have enough network | |
74 | * buffer space for us when we first try it, we instead allocate some | |
75 | * memory, and copy the message, and then send it as space becomes | |
76 | ! * available in our main select() thread. In some cases, if the queue | |
77 | ! * gets too long we wait until it's drained, and then append to it. | |
78 | ! * This condition variable's associated mutex is the normal per-repmgr | |
79 | ! * db_rep->mutex, because that mutex is always held anyway whenever the | |
80 | ! * output queue is consulted. | |
81 | */ | |
82 | OUT_Q_HEADER outbound_queue; | |
83 | int out_queue_length; | |
84 | + cond_var_t drained; | |
85 | + int blockers; /* ref count of msg threads waiting on us */ | |
86 | ||
87 | /* | |
88 | * Input: while we're reading a message, we keep track of what phase | |
89 | *** dbinc_auto/int_def.in 2007-10-31 10:23:52.000000000 -0700 | |
90 | --- dbinc_auto/int_def.in 2007-10-31 10:23:52.000000000 -0700 | |
91 | *************** | |
92 | *** 1420,1425 **** | |
93 | --- 1420,1428 ---- | |
94 | #define __repmgr_wake_waiting_senders __repmgr_wake_waiting_senders@DB_VERSION_UNIQUE_NAME@ | |
95 | #define __repmgr_await_ack __repmgr_await_ack@DB_VERSION_UNIQUE_NAME@ | |
96 | #define __repmgr_compute_wait_deadline __repmgr_compute_wait_deadline@DB_VERSION_UNIQUE_NAME@ | |
97 | + #define __repmgr_await_drain __repmgr_await_drain@DB_VERSION_UNIQUE_NAME@ | |
98 | + #define __repmgr_alloc_cond __repmgr_alloc_cond@DB_VERSION_UNIQUE_NAME@ | |
99 | + #define __repmgr_free_cond __repmgr_free_cond@DB_VERSION_UNIQUE_NAME@ | |
100 | #define __repmgr_init_sync __repmgr_init_sync@DB_VERSION_UNIQUE_NAME@ | |
101 | #define __repmgr_close_sync __repmgr_close_sync@DB_VERSION_UNIQUE_NAME@ | |
102 | #define __repmgr_net_init __repmgr_net_init@DB_VERSION_UNIQUE_NAME@ | |
103 | *** dbinc_auto/repmgr_ext.h 2007-10-31 10:23:52.000000000 -0700 | |
104 | --- dbinc_auto/repmgr_ext.h 2007-10-31 10:23:52.000000000 -0700 | |
105 | *************** | |
106 | *** 21,30 **** | |
107 | int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *)); | |
108 | void __repmgr_stash_generation __P((DB_ENV *)); | |
109 | int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t)); | |
110 | ! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *)); | |
111 | int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *)); | |
112 | ! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *, int)); | |
113 | ! void __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *)); | |
114 | int __repmgr_find_site __P((DB_ENV *, const char *, u_int)); | |
115 | int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *)); | |
116 | int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **)); | |
117 | --- 21,30 ---- | |
118 | int __repmgr_handle_event __P((DB_ENV *, u_int32_t, void *)); | |
119 | void __repmgr_stash_generation __P((DB_ENV *)); | |
120 | int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *, int, u_int32_t)); | |
121 | ! int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, u_int, const DBT *, const DBT *, int)); | |
122 | int __repmgr_is_permanent __P((DB_ENV *, const DB_LSN *)); | |
123 | ! int __repmgr_bust_connection __P((DB_ENV *, REPMGR_CONNECTION *)); | |
124 | ! int __repmgr_cleanup_connection __P((DB_ENV *, REPMGR_CONNECTION *)); | |
125 | int __repmgr_find_site __P((DB_ENV *, const char *, u_int)); | |
126 | int __repmgr_pack_netaddr __P((DB_ENV *, const char *, u_int, ADDRINFO *, repmgr_netaddr_t *)); | |
127 | int __repmgr_getaddr __P((DB_ENV *, const char *, u_int, int, ADDRINFO **)); | |
128 | *************** | |
129 | *** 39,44 **** | |
130 | --- 39,47 ---- | |
131 | int __repmgr_wake_waiting_senders __P((DB_ENV *)); | |
132 | int __repmgr_await_ack __P((DB_ENV *, const DB_LSN *)); | |
133 | void __repmgr_compute_wait_deadline __P((DB_ENV*, struct timespec *, db_timeout_t)); | |
134 | + int __repmgr_await_drain __P((DB_ENV *, REPMGR_CONNECTION *, db_timeout_t)); | |
135 | + int __repmgr_alloc_cond __P((cond_var_t *)); | |
136 | + int __repmgr_free_cond __P((cond_var_t *)); | |
137 | int __repmgr_init_sync __P((DB_ENV *, DB_REP *)); | |
138 | int __repmgr_close_sync __P((DB_ENV *)); | |
139 | int __repmgr_net_init __P((DB_ENV *, DB_REP *)); | |
140 | *** repmgr/repmgr_method.c 2007-10-31 10:23:52.000000000 -0700 | |
141 | --- repmgr/repmgr_method.c 2007-10-31 10:23:53.000000000 -0700 | |
142 | *************** | |
143 | *** 196,204 **** | |
144 | int ret; | |
145 | ||
146 | /* Set some default values. */ | |
147 | ! db_rep->ack_timeout = 1 * US_PER_SEC; /* 1 second */ | |
148 | ! db_rep->connection_retry_wait = 30 * US_PER_SEC; /* 30 seconds */ | |
149 | ! db_rep->election_retry_wait = 10 * US_PER_SEC; /* 10 seconds */ | |
150 | db_rep->config_nsites = 0; | |
151 | db_rep->peer = DB_EID_INVALID; | |
152 | db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM; | |
153 | --- 196,204 ---- | |
154 | int ret; | |
155 | ||
156 | /* Set some default values. */ | |
157 | ! db_rep->ack_timeout = DB_REPMGR_DEFAULT_ACK_TIMEOUT; | |
158 | ! db_rep->connection_retry_wait = DB_REPMGR_DEFAULT_CONNECTION_RETRY; | |
159 | ! db_rep->election_retry_wait = DB_REPMGR_DEFAULT_ELECTION_RETRY; | |
160 | db_rep->config_nsites = 0; | |
161 | db_rep->peer = DB_EID_INVALID; | |
162 | db_rep->perm_policy = DB_REPMGR_ACKS_QUORUM; | |
163 | *************** | |
164 | *** 238,243 **** | |
165 | --- 238,244 ---- | |
166 | DB_ENV *dbenv; | |
167 | { | |
168 | DB_REP *db_rep; | |
169 | + REPMGR_CONNECTION *conn; | |
170 | int ret; | |
171 | ||
172 | db_rep = dbenv->rep_handle; | |
173 | *************** | |
174 | *** 254,259 **** | |
175 | --- 255,266 ---- | |
176 | ||
177 | if ((ret = __repmgr_signal(&db_rep->queue_nonempty)) != 0) | |
178 | goto unlock; | |
179 | + | |
180 | + TAILQ_FOREACH(conn, &db_rep->connections, entries) { | |
181 | + if (conn->blockers > 0 && | |
182 | + ((ret = __repmgr_signal(&conn->drained)) != 0)) | |
183 | + goto unlock; | |
184 | + } | |
185 | UNLOCK_MUTEX(db_rep->mutex); | |
186 | ||
187 | return (__repmgr_wake_main_thread(dbenv)); | |
188 | *** repmgr/repmgr_msg.c 2007-10-31 10:23:52.000000000 -0700 | |
189 | --- repmgr/repmgr_msg.c 2007-10-31 10:23:53.000000000 -0700 | |
190 | *************** | |
191 | *** 183,192 **** | |
192 | ||
193 | /* | |
194 | * Acknowledges a message. | |
195 | - * | |
196 | - * !!! | |
197 | - * Note that this cannot be called from the select() thread, in case we call | |
198 | - * __repmgr_bust_connection(..., FALSE). | |
199 | */ | |
200 | static int | |
201 | ack_message(dbenv, generation, lsn) | |
202 | --- 183,188 ---- | |
203 | *************** | |
204 | *** 227,235 **** | |
205 | rec2.size = 0; | |
206 | ||
207 | conn = site->ref.conn; | |
208 | if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK, | |
209 | ! &control2, &rec2)) == DB_REP_UNAVAIL) | |
210 | ! ret = __repmgr_bust_connection(dbenv, conn, FALSE); | |
211 | } | |
212 | ||
213 | UNLOCK_MUTEX(db_rep->mutex); | |
214 | --- 223,236 ---- | |
215 | rec2.size = 0; | |
216 | ||
217 | conn = site->ref.conn; | |
218 | + /* | |
219 | + * It's hard to imagine anyone would care about a lost ack if | |
220 | + * the path to the master is so congested as to need blocking; | |
221 | + * so pass "blockable" argument as FALSE. | |
222 | + */ | |
223 | if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_ACK, | |
224 | ! &control2, &rec2, FALSE)) == DB_REP_UNAVAIL) | |
225 | ! ret = __repmgr_bust_connection(dbenv, conn); | |
226 | } | |
227 | ||
228 | UNLOCK_MUTEX(db_rep->mutex); | |
229 | *** repmgr/repmgr_net.c 2007-10-31 10:23:52.000000000 -0700 | |
230 | --- repmgr/repmgr_net.c 2007-10-31 10:23:53.000000000 -0700 | |
231 | *************** | |
232 | *** 63,69 **** | |
233 | static void setup_sending_msg | |
234 | __P((struct sending_msg *, u_int, const DBT *, const DBT *)); | |
235 | static int __repmgr_send_internal | |
236 | ! __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *)); | |
237 | static int enqueue_msg | |
238 | __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t)); | |
239 | static int flatten __P((DB_ENV *, struct sending_msg *)); | |
240 | --- 63,69 ---- | |
241 | static void setup_sending_msg | |
242 | __P((struct sending_msg *, u_int, const DBT *, const DBT *)); | |
243 | static int __repmgr_send_internal | |
244 | ! __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, int)); | |
245 | static int enqueue_msg | |
246 | __P((DB_ENV *, REPMGR_CONNECTION *, struct sending_msg *, size_t)); | |
247 | static int flatten __P((DB_ENV *, struct sending_msg *)); | |
248 | *************** | |
249 | *** 73,85 **** | |
250 | * __repmgr_send -- | |
251 | * The send function for DB_ENV->rep_set_transport. | |
252 | * | |
253 | - * !!! | |
254 | - * This is only ever called as the replication transport call-back, which means | |
255 | - * it's either on one of our message processing threads or an application | |
256 | - * thread. It mustn't be called from the select() thread, because we might call | |
257 | - * __repmgr_bust_connection(..., FALSE) here, and that's not allowed in the | |
258 | - * select() thread. | |
259 | - * | |
260 | * PUBLIC: int __repmgr_send __P((DB_ENV *, const DBT *, const DBT *, | |
261 | * PUBLIC: const DB_LSN *, int, u_int32_t)); | |
262 | */ | |
263 | --- 73,78 ---- | |
264 | *************** | |
265 | *** 126,134 **** | |
266 | } | |
267 | ||
268 | conn = site->ref.conn; | |
269 | if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE, | |
270 | ! control, rec)) == DB_REP_UNAVAIL && | |
271 | ! (t_ret = __repmgr_bust_connection(dbenv, conn, FALSE)) != 0) | |
272 | ret = t_ret; | |
273 | if (ret != 0) | |
274 | goto out; | |
275 | --- 119,128 ---- | |
276 | } | |
277 | ||
278 | conn = site->ref.conn; | |
279 | + /* Pass the "blockable" argument as TRUE. */ | |
280 | if ((ret = __repmgr_send_one(dbenv, conn, REPMGR_REP_MESSAGE, | |
281 | ! control, rec, TRUE)) == DB_REP_UNAVAIL && | |
282 | ! (t_ret = __repmgr_bust_connection(dbenv, conn)) != 0) | |
283 | ret = t_ret; | |
284 | if (ret != 0) | |
285 | goto out; | |
286 | *************** | |
287 | *** 222,228 **** | |
288 | if (site->state != SITE_CONNECTED) | |
289 | return (NULL); | |
290 | ||
291 | ! if (F_ISSET(site->ref.conn, CONN_CONNECTING)) | |
292 | return (NULL); | |
293 | return (site); | |
294 | } | |
295 | --- 216,222 ---- | |
296 | if (site->state != SITE_CONNECTED) | |
297 | return (NULL); | |
298 | ||
299 | ! if (F_ISSET(site->ref.conn, CONN_CONNECTING|CONN_DEFUNCT)) | |
300 | return (NULL); | |
301 | return (site); | |
302 | } | |
303 | *************** | |
304 | *** 235,244 **** | |
305 | * | |
306 | * !!! | |
307 | * Caller must hold dbenv->mutex. | |
308 | - * | |
309 | - * !!! | |
310 | - * Note that this cannot be called from the select() thread, in case we call | |
311 | - * __repmgr_bust_connection(..., FALSE). | |
312 | */ | |
313 | static int | |
314 | __repmgr_send_broadcast(dbenv, control, rec, nsitesp, npeersp) | |
315 | --- 229,234 ---- | |
316 | *************** | |
317 | *** 268,281 **** | |
318 | !IS_VALID_EID(conn->eid)) | |
319 | continue; | |
320 | ||
321 | ! if ((ret = __repmgr_send_internal(dbenv, conn, &msg)) == 0) { | |
322 | site = SITE_FROM_EID(conn->eid); | |
323 | nsites++; | |
324 | if (site->priority > 0) | |
325 | npeers++; | |
326 | } else if (ret == DB_REP_UNAVAIL) { | |
327 | ! if ((ret = __repmgr_bust_connection( | |
328 | ! dbenv, conn, FALSE)) != 0) | |
329 | return (ret); | |
330 | } else | |
331 | return (ret); | |
332 | --- 258,277 ---- | |
333 | !IS_VALID_EID(conn->eid)) | |
334 | continue; | |
335 | ||
336 | ! /* | |
337 | ! * Broadcast messages are either application threads committing | |
338 | ! * transactions, or replication status messages that we can | |
339 | ! * afford to lose. So don't allow blocking for them (pass | |
340 | ! * "blockable" argument as FALSE). | |
341 | ! */ | |
342 | ! if ((ret = __repmgr_send_internal(dbenv, | |
343 | ! conn, &msg, FALSE)) == 0) { | |
344 | site = SITE_FROM_EID(conn->eid); | |
345 | nsites++; | |
346 | if (site->priority > 0) | |
347 | npeers++; | |
348 | } else if (ret == DB_REP_UNAVAIL) { | |
349 | ! if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0) | |
350 | return (ret); | |
351 | } else | |
352 | return (ret); | |
353 | *************** | |
354 | *** 301,339 **** | |
355 | * intersperse writes that are part of two single messages. | |
356 | * | |
357 | * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, | |
358 | ! * PUBLIC: u_int, const DBT *, const DBT *)); | |
359 | */ | |
360 | int | |
361 | ! __repmgr_send_one(dbenv, conn, msg_type, control, rec) | |
362 | DB_ENV *dbenv; | |
363 | REPMGR_CONNECTION *conn; | |
364 | u_int msg_type; | |
365 | const DBT *control, *rec; | |
366 | { | |
367 | struct sending_msg msg; | |
368 | ||
369 | setup_sending_msg(&msg, msg_type, control, rec); | |
370 | ! return (__repmgr_send_internal(dbenv, conn, &msg)); | |
371 | } | |
372 | ||
373 | /* | |
374 | * Attempts a "best effort" to send a message on the given site. If there is an | |
375 | ! * excessive backlog of messages already queued on the connection, we simply drop | |
376 | ! * this message, and still return 0 even in this case. | |
377 | */ | |
378 | static int | |
379 | ! __repmgr_send_internal(dbenv, conn, msg) | |
380 | DB_ENV *dbenv; | |
381 | REPMGR_CONNECTION *conn; | |
382 | struct sending_msg *msg; | |
383 | { | |
384 | ! #define OUT_QUEUE_LIMIT 10 /* arbitrary, for now */ | |
385 | REPMGR_IOVECS iovecs; | |
386 | SITE_STRING_BUFFER buffer; | |
387 | int ret; | |
388 | size_t nw; | |
389 | size_t total_written; | |
390 | ||
391 | DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING)); | |
392 | if (!STAILQ_EMPTY(&conn->outbound_queue)) { | |
393 | /* | |
394 | --- 297,355 ---- | |
395 | * intersperse writes that are part of two single messages. | |
396 | * | |
397 | * PUBLIC: int __repmgr_send_one __P((DB_ENV *, REPMGR_CONNECTION *, | |
398 | ! * PUBLIC: u_int, const DBT *, const DBT *, int)); | |
399 | */ | |
400 | int | |
401 | ! __repmgr_send_one(dbenv, conn, msg_type, control, rec, blockable) | |
402 | DB_ENV *dbenv; | |
403 | REPMGR_CONNECTION *conn; | |
404 | u_int msg_type; | |
405 | const DBT *control, *rec; | |
406 | + int blockable; | |
407 | { | |
408 | struct sending_msg msg; | |
409 | ||
410 | setup_sending_msg(&msg, msg_type, control, rec); | |
411 | ! return (__repmgr_send_internal(dbenv, conn, &msg, blockable)); | |
412 | } | |
413 | ||
414 | /* | |
415 | * Attempts a "best effort" to send a message on the given site. If there is an | |
416 | ! * excessive backlog of messages already queued on the connection, what shall we | |
417 | ! * do? If the caller doesn't mind blocking, we'll wait (a limited amount of | |
418 | ! * time) for the queue to drain. Otherwise we'll simply drop the message. This | |
419 | ! * is always allowed by the replication protocol. But in the case of a | |
420 | ! * multi-message response to a request like PAGE_REQ, LOG_REQ or ALL_REQ we | |
421 | ! * almost always get a flood of messages that instantly fills our queue, so | |
422 | ! * blocking improves performance (by avoiding the need for the client to | |
423 | ! * re-request). | |
424 | ! * | |
425 | ! * How long shall we wait? We could of course create a new timeout | |
426 | ! * configuration type, so that the application could set it directly. But that | |
427 | ! * would start to overwhelm the user with too many choices to think about. We | |
428 | ! * already have an ACK timeout, which is the user's estimate of how long it | |
429 | ! * should take to send a message to the client, have it be processed, and return | |
430 | ! * a message back to us. We multiply that by the queue size, because that's how | |
431 | ! * many messages have to be swallowed up by the client before we're able to | |
432 | ! * start sending again (at least to a rough approximation). | |
433 | */ | |
434 | static int | |
435 | ! __repmgr_send_internal(dbenv, conn, msg, blockable) | |
436 | DB_ENV *dbenv; | |
437 | REPMGR_CONNECTION *conn; | |
438 | struct sending_msg *msg; | |
439 | + int blockable; | |
440 | { | |
441 | ! DB_REP *db_rep; | |
442 | REPMGR_IOVECS iovecs; | |
443 | SITE_STRING_BUFFER buffer; | |
444 | + db_timeout_t drain_to; | |
445 | int ret; | |
446 | size_t nw; | |
447 | size_t total_written; | |
448 | ||
449 | + db_rep = dbenv->rep_handle; | |
450 | + | |
451 | DB_ASSERT(dbenv, !F_ISSET(conn, CONN_CONNECTING)); | |
452 | if (!STAILQ_EMPTY(&conn->outbound_queue)) { | |
453 | /* | |
454 | *************** | |
455 | *** 344,358 **** | |
456 | RPRINT(dbenv, (dbenv, "msg to %s to be queued", | |
457 | __repmgr_format_eid_loc(dbenv->rep_handle, | |
458 | conn->eid, buffer))); | |
459 | if (conn->out_queue_length < OUT_QUEUE_LIMIT) | |
460 | return (enqueue_msg(dbenv, conn, msg, 0)); | |
461 | else { | |
462 | RPRINT(dbenv, (dbenv, "queue limit exceeded")); | |
463 | STAT(dbenv->rep_handle-> | |
464 | region->mstat.st_msgs_dropped++); | |
465 | ! return (0); | |
466 | } | |
467 | } | |
468 | ||
469 | /* | |
470 | * Send as much data to the site as we can, without blocking. Keep | |
471 | --- 360,393 ---- | |
472 | RPRINT(dbenv, (dbenv, "msg to %s to be queued", | |
473 | __repmgr_format_eid_loc(dbenv->rep_handle, | |
474 | conn->eid, buffer))); | |
475 | + if (conn->out_queue_length >= OUT_QUEUE_LIMIT && | |
476 | + blockable && !F_ISSET(conn, CONN_CONGESTED)) { | |
477 | + RPRINT(dbenv, (dbenv, | |
478 | + "block msg thread, await queue space")); | |
479 | + | |
480 | + if ((drain_to = db_rep->ack_timeout) == 0) | |
481 | + drain_to = DB_REPMGR_DEFAULT_ACK_TIMEOUT; | |
482 | + conn->blockers++; | |
483 | + ret = __repmgr_await_drain(dbenv, | |
484 | + conn, drain_to * OUT_QUEUE_LIMIT); | |
485 | + conn->blockers--; | |
486 | + if (db_rep->finished) | |
487 | + return (DB_TIMEOUT); | |
488 | + if (ret != 0) | |
489 | + return (ret); | |
490 | + if (STAILQ_EMPTY(&conn->outbound_queue)) | |
491 | + goto empty; | |
492 | + } | |
493 | if (conn->out_queue_length < OUT_QUEUE_LIMIT) | |
494 | return (enqueue_msg(dbenv, conn, msg, 0)); | |
495 | else { | |
496 | RPRINT(dbenv, (dbenv, "queue limit exceeded")); | |
497 | STAT(dbenv->rep_handle-> | |
498 | region->mstat.st_msgs_dropped++); | |
499 | ! return (blockable ? DB_TIMEOUT : 0); | |
500 | } | |
501 | } | |
502 | + empty: | |
503 | ||
504 | /* | |
505 | * Send as much data to the site as we can, without blocking. Keep | |
506 | *************** | |
507 | *** 498,521 **** | |
508 | ||
509 | /* | |
510 | * Abandons a connection, to recover from an error. Upon entry the conn struct | |
511 | ! * must be on the connections list. | |
512 | ! * | |
513 | ! * If the 'do_close' flag is true, we do the whole job; the clean-up includes | |
514 | ! * removing the struct from the list and freeing all its memory, so upon return | |
515 | ! * the caller must not refer to it any further. Otherwise, we merely mark the | |
516 | ! * connection for clean-up later by the main thread. | |
517 | * | |
518 | * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *, | |
519 | ! * PUBLIC: REPMGR_CONNECTION *, int)); | |
520 | * | |
521 | * !!! | |
522 | * Caller holds mutex. | |
523 | */ | |
524 | int | |
525 | ! __repmgr_bust_connection(dbenv, conn, do_close) | |
526 | DB_ENV *dbenv; | |
527 | REPMGR_CONNECTION *conn; | |
528 | - int do_close; | |
529 | { | |
530 | DB_REP *db_rep; | |
531 | int connecting, ret, eid; | |
532 | --- 533,553 ---- | |
533 | ||
534 | /* | |
535 | * Abandons a connection, to recover from an error. Upon entry the conn struct | |
536 | ! * must be on the connections list. For now, just mark it as unusable; it will | |
537 | ! * be fully cleaned up in the top-level select thread, as soon as possible. | |
538 | * | |
539 | * PUBLIC: int __repmgr_bust_connection __P((DB_ENV *, | |
540 | ! * PUBLIC: REPMGR_CONNECTION *)); | |
541 | * | |
542 | * !!! | |
543 | * Caller holds mutex. | |
544 | + * | |
545 | + * Must be idempotent. | |
546 | */ | |
547 | int | |
548 | ! __repmgr_bust_connection(dbenv, conn) | |
549 | DB_ENV *dbenv; | |
550 | REPMGR_CONNECTION *conn; | |
551 | { | |
552 | DB_REP *db_rep; | |
553 | int connecting, ret, eid; | |
554 | *************** | |
555 | *** 526,537 **** | |
556 | DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); | |
557 | eid = conn->eid; | |
558 | connecting = F_ISSET(conn, CONN_CONNECTING); | |
559 | ! if (do_close) | |
560 | ! __repmgr_cleanup_connection(dbenv, conn); | |
561 | ! else { | |
562 | ! F_SET(conn, CONN_DEFUNCT); | |
563 | ! conn->eid = -1; | |
564 | ! } | |
565 | ||
566 | /* | |
567 | * When we first accepted the incoming connection, we set conn->eid to | |
568 | --- 558,566 ---- | |
569 | DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); | |
570 | eid = conn->eid; | |
571 | connecting = F_ISSET(conn, CONN_CONNECTING); | |
572 | ! | |
573 | ! F_SET(conn, CONN_DEFUNCT); | |
574 | ! conn->eid = -1; | |
575 | ||
576 | /* | |
577 | * When we first accepted the incoming connection, we set conn->eid to | |
578 | *************** | |
579 | *** 557,563 **** | |
580 | dbenv, ELECT_FAILURE_ELECTION)) != 0) | |
581 | return (ret); | |
582 | } | |
583 | ! } else if (!do_close) { | |
584 | /* | |
585 | * One way or another, make sure the main thread is poked, so | |
586 | * that we do the deferred clean-up. | |
587 | --- 586,592 ---- | |
588 | dbenv, ELECT_FAILURE_ELECTION)) != 0) | |
589 | return (ret); | |
590 | } | |
591 | ! } else { | |
592 | /* | |
593 | * One way or another, make sure the main thread is poked, so | |
594 | * that we do the deferred clean-up. | |
595 | *************** | |
596 | *** 568,577 **** | |
597 | } | |
598 | ||
599 | /* | |
600 | ! * PUBLIC: void __repmgr_cleanup_connection | |
601 | * PUBLIC: __P((DB_ENV *, REPMGR_CONNECTION *)); | |
602 | */ | |
603 | ! void | |
604 | __repmgr_cleanup_connection(dbenv, conn) | |
605 | DB_ENV *dbenv; | |
606 | REPMGR_CONNECTION *conn; | |
607 | --- 597,610 ---- | |
608 | } | |
609 | ||
610 | /* | |
611 | ! * PUBLIC: int __repmgr_cleanup_connection | |
612 | * PUBLIC: __P((DB_ENV *, REPMGR_CONNECTION *)); | |
613 | + * | |
614 | + * !!! | |
615 | + * Idempotent. This can be called repeatedly as blocking message threads (of | |
616 | + * which there could be multiples) wake up in case of error on the connection. | |
617 | */ | |
618 | ! int | |
619 | __repmgr_cleanup_connection(dbenv, conn) | |
620 | DB_ENV *dbenv; | |
621 | REPMGR_CONNECTION *conn; | |
622 | *************** | |
623 | *** 580,596 **** | |
624 | QUEUED_OUTPUT *out; | |
625 | REPMGR_FLAT *msg; | |
626 | DBT *dbt; | |
627 | ||
628 | db_rep = dbenv->rep_handle; | |
629 | ||
630 | ! TAILQ_REMOVE(&db_rep->connections, conn, entries); | |
631 | if (conn->fd != INVALID_SOCKET) { | |
632 | ! (void)closesocket(conn->fd); | |
633 | #ifdef DB_WIN32 | |
634 | ! (void)WSACloseEvent(conn->event_object); | |
635 | #endif | |
636 | } | |
637 | ||
638 | /* | |
639 | * Deallocate any input and output buffers we may have. | |
640 | */ | |
641 | --- 613,643 ---- | |
642 | QUEUED_OUTPUT *out; | |
643 | REPMGR_FLAT *msg; | |
644 | DBT *dbt; | |
645 | + int ret; | |
646 | ||
647 | db_rep = dbenv->rep_handle; | |
648 | ||
649 | ! DB_ASSERT(dbenv, F_ISSET(conn, CONN_DEFUNCT) || db_rep->finished); | |
650 | ! | |
651 | if (conn->fd != INVALID_SOCKET) { | |
652 | ! ret = closesocket(conn->fd); | |
653 | ! conn->fd = INVALID_SOCKET; | |
654 | ! if (ret == SOCKET_ERROR) { | |
655 | ! ret = net_errno; | |
656 | ! __db_err(dbenv, ret, "closing socket"); | |
657 | ! } | |
658 | #ifdef DB_WIN32 | |
659 | ! if (!WSACloseEvent(conn->event_object) && ret != 0) | |
660 | ! ret = net_errno; | |
661 | #endif | |
662 | + if (ret != 0) | |
663 | + return (ret); | |
664 | } | |
665 | ||
666 | + if (conn->blockers > 0) | |
667 | + return (__repmgr_signal(&conn->drained)); | |
668 | + | |
669 | + TAILQ_REMOVE(&db_rep->connections, conn, entries); | |
670 | /* | |
671 | * Deallocate any input and output buffers we may have. | |
672 | */ | |
673 | *************** | |
674 | *** 614,620 **** | |
675 | --- 661,669 ---- | |
676 | __os_free(dbenv, out); | |
677 | } | |
678 | ||
679 | + ret = __repmgr_free_cond(&conn->drained); | |
680 | __os_free(dbenv, conn); | |
681 | + return (ret); | |
682 | } | |
683 | ||
684 | static int | |
685 | *************** | |
686 | *** 1063,1069 **** | |
687 | ||
688 | while (!TAILQ_EMPTY(&db_rep->connections)) { | |
689 | conn = TAILQ_FIRST(&db_rep->connections); | |
690 | ! __repmgr_cleanup_connection(dbenv, conn); | |
691 | } | |
692 | ||
693 | for (i = 0; i < db_rep->site_cnt; i++) { | |
694 | --- 1112,1118 ---- | |
695 | ||
696 | while (!TAILQ_EMPTY(&db_rep->connections)) { | |
697 | conn = TAILQ_FIRST(&db_rep->connections); | |
698 | ! (void)__repmgr_cleanup_connection(dbenv, conn); | |
699 | } | |
700 | ||
701 | for (i = 0; i < db_rep->site_cnt; i++) { | |
702 | *** repmgr/repmgr_posix.c 2007-10-31 10:23:52.000000000 -0700 | |
703 | --- repmgr/repmgr_posix.c 2007-10-31 10:23:53.000000000 -0700 | |
704 | *************** | |
705 | *** 21,26 **** | |
706 | --- 21,28 ---- | |
707 | size_t __repmgr_guesstimated_max = (128 * 1024); | |
708 | #endif | |
709 | ||
710 | + static int __repmgr_conn_work __P((DB_ENV *, | |
711 | + REPMGR_CONNECTION *, fd_set *, fd_set *, int)); | |
712 | static int finish_connecting __P((DB_ENV *, REPMGR_CONNECTION *)); | |
713 | ||
714 | /* | |
715 | *************** | |
716 | *** 189,194 **** | |
717 | --- 191,284 ---- | |
718 | } | |
719 | ||
720 | /* | |
721 | + * PUBLIC: int __repmgr_await_drain __P((DB_ENV *, | |
722 | + * PUBLIC: REPMGR_CONNECTION *, db_timeout_t)); | |
723 | + * | |
724 | + * Waits for space to become available on the connection's output queue. | |
725 | + * Various ways we can exit: | |
726 | + * | |
727 | + * 1. queue becomes non-full | |
728 | + * 2. exceed time limit | |
729 | + * 3. connection becomes defunct (due to error in another thread) | |
730 | + * 4. repmgr is shutting down | |
731 | + * 5. any unexpected system resource failure | |
732 | + * | |
733 | + * In cases #3 and #5 we return an error code. Caller is responsible for | |
734 | + * distinguishing the remaining cases if desired. | |
735 | + * | |
736 | + * !!! | |
737 | + * Caller must hold repmgr->mutex. | |
738 | + */ | |
739 | + int | |
740 | + __repmgr_await_drain(dbenv, conn, timeout) | |
741 | + DB_ENV *dbenv; | |
742 | + REPMGR_CONNECTION *conn; | |
743 | + db_timeout_t timeout; | |
744 | + { | |
745 | + DB_REP *db_rep; | |
746 | + struct timespec deadline; | |
747 | + int ret; | |
748 | + | |
749 | + db_rep = dbenv->rep_handle; | |
750 | + | |
751 | + __repmgr_compute_wait_deadline(dbenv, &deadline, timeout); | |
752 | + | |
753 | + ret = 0; | |
754 | + while (conn->out_queue_length >= OUT_QUEUE_LIMIT) { | |
755 | + ret = pthread_cond_timedwait(&conn->drained, | |
756 | + &db_rep->mutex, &deadline); | |
757 | + switch (ret) { | |
758 | + case 0: | |
759 | + if (db_rep->finished) | |
760 | + goto out; /* #4. */ | |
761 | + /* | |
762 | + * Another thread could have stumbled into an error on | |
763 | + * the socket while we were waiting. | |
764 | + */ | |
765 | + if (F_ISSET(conn, CONN_DEFUNCT)) { | |
766 | + ret = DB_REP_UNAVAIL; /* #3. */ | |
767 | + goto out; | |
768 | + } | |
769 | + break; | |
770 | + case ETIMEDOUT: | |
771 | + F_SET(conn, CONN_CONGESTED); | |
772 | + ret = 0; | |
773 | + goto out; /* #2. */ | |
774 | + default: | |
775 | + goto out; /* #5. */ | |
776 | + } | |
777 | + } | |
778 | + /* #1. */ | |
779 | + | |
780 | + out: | |
781 | + return (ret); | |
782 | + } | |
783 | + | |
784 | + /* | |
785 | + * PUBLIC: int __repmgr_alloc_cond __P((cond_var_t *)); | |
786 | + * | |
787 | + * Initialize a condition variable (in allocated space). | |
788 | + */ | |
789 | + int | |
790 | + __repmgr_alloc_cond(c) | |
791 | + cond_var_t *c; | |
792 | + { | |
793 | + return (pthread_cond_init(c, NULL)); | |
794 | + } | |
795 | + | |
796 | + /* | |
797 | + * PUBLIC: int __repmgr_free_cond __P((cond_var_t *)); | |
798 | + * | |
799 | + * Clean up a previously initialized condition variable. | |
800 | + */ | |
801 | + int | |
802 | + __repmgr_free_cond(c) | |
803 | + cond_var_t *c; | |
804 | + { | |
805 | + return (pthread_cond_destroy(c)); | |
806 | + } | |
807 | + | |
808 | + /* | |
809 | * PUBLIC: int __repmgr_init_sync __P((DB_ENV *, DB_REP *)); | |
810 | * | |
811 | * Allocate/initialize all data necessary for thread synchronization. This | |
812 | *************** | |
813 | *** 443,449 **** | |
814 | REPMGR_RETRY *retry; | |
815 | db_timespec timeout; | |
816 | fd_set reads, writes; | |
817 | ! int ret, flow_control, maxfd, nready; | |
818 | u_int8_t buf[10]; /* arbitrary size */ | |
819 | ||
820 | flow_control = FALSE; | |
821 | --- 533,539 ---- | |
822 | REPMGR_RETRY *retry; | |
823 | db_timespec timeout; | |
824 | fd_set reads, writes; | |
825 | ! int ret, flow_control, maxfd; | |
826 | u_int8_t buf[10]; /* arbitrary size */ | |
827 | ||
828 | flow_control = FALSE; | |
829 | *************** | |
830 | *** 477,482 **** | |
831 | --- 567,575 ---- | |
832 | * each one. | |
833 | */ | |
834 | TAILQ_FOREACH(conn, &db_rep->connections, entries) { | |
835 | + if (F_ISSET(conn, CONN_DEFUNCT)) | |
836 | + continue; | |
837 | + | |
838 | if (F_ISSET(conn, CONN_CONNECTING)) { | |
839 | FD_SET((u_int)conn->fd, &reads); | |
840 | FD_SET((u_int)conn->fd, &writes); | |
841 | *************** | |
842 | *** 533,616 **** | |
843 | return (ret); | |
844 | } | |
845 | } | |
846 | - nready = ret; | |
847 | - | |
848 | LOCK_MUTEX(db_rep->mutex); | |
849 | ||
850 | - /* | |
851 | - * The first priority thing we must do is to clean up any | |
852 | - * pending defunct connections. Otherwise, if they have any | |
853 | - * lingering pending input, we get very confused if we try to | |
854 | - * process it. | |
855 | - * | |
856 | - * The TAILQ_FOREACH macro would be suitable here, except that | |
857 | - * it doesn't allow unlinking the current element, which is | |
858 | - * needed for cleanup_connection. | |
859 | - */ | |
860 | - for (conn = TAILQ_FIRST(&db_rep->connections); | |
861 | - conn != NULL; | |
862 | - conn = next) { | |
863 | - next = TAILQ_NEXT(conn, entries); | |
864 | - if (F_ISSET(conn, CONN_DEFUNCT)) | |
865 | - __repmgr_cleanup_connection(dbenv, conn); | |
866 | - } | |
867 | - | |
868 | if ((ret = __repmgr_retry_connections(dbenv)) != 0) | |
869 | goto out; | |
870 | - if (nready == 0) | |
871 | - continue; | |
872 | ||
873 | /* | |
874 | ! * Traverse the linked list. (Again, like TAILQ_FOREACH, except | |
875 | ! * that we need the ability to unlink an element along the way.) | |
876 | */ | |
877 | for (conn = TAILQ_FIRST(&db_rep->connections); | |
878 | conn != NULL; | |
879 | conn = next) { | |
880 | next = TAILQ_NEXT(conn, entries); | |
881 | ! if (F_ISSET(conn, CONN_CONNECTING)) { | |
882 | ! if (FD_ISSET((u_int)conn->fd, &reads) || | |
883 | ! FD_ISSET((u_int)conn->fd, &writes)) { | |
884 | ! if ((ret = finish_connecting(dbenv, | |
885 | ! conn)) == DB_REP_UNAVAIL) { | |
886 | ! if ((ret = | |
887 | ! __repmgr_bust_connection( | |
888 | ! dbenv, conn, TRUE)) != 0) | |
889 | ! goto out; | |
890 | ! } else if (ret != 0) | |
891 | ! goto out; | |
892 | ! } | |
893 | ! continue; | |
894 | ! } | |
895 | ! | |
896 | ! /* | |
897 | ! * Here, the site is connected, and the FD_SET's are | |
898 | ! * valid. | |
899 | ! */ | |
900 | ! if (FD_ISSET((u_int)conn->fd, &writes)) { | |
901 | ! if ((ret = __repmgr_write_some( | |
902 | ! dbenv, conn)) == DB_REP_UNAVAIL) { | |
903 | ! if ((ret = | |
904 | ! __repmgr_bust_connection(dbenv, | |
905 | ! conn, TRUE)) != 0) | |
906 | ! goto out; | |
907 | ! continue; | |
908 | ! } else if (ret != 0) | |
909 | ! goto out; | |
910 | ! } | |
911 | ! | |
912 | ! if (!flow_control && | |
913 | ! FD_ISSET((u_int)conn->fd, &reads)) { | |
914 | ! if ((ret = __repmgr_read_from_site(dbenv, conn)) | |
915 | ! == DB_REP_UNAVAIL) { | |
916 | ! if ((ret = | |
917 | ! __repmgr_bust_connection(dbenv, | |
918 | ! conn, TRUE)) != 0) | |
919 | ! goto out; | |
920 | ! continue; | |
921 | ! } else if (ret != 0) | |
922 | ! goto out; | |
923 | ! } | |
924 | } | |
925 | ||
926 | /* | |
927 | --- 626,650 ---- | |
928 | return (ret); | |
929 | } | |
930 | } | |
931 | LOCK_MUTEX(db_rep->mutex); | |
932 | ||
933 | if ((ret = __repmgr_retry_connections(dbenv)) != 0) | |
934 | goto out; | |
935 | ||
936 | /* | |
937 | ! * Examine each connection, to see what work needs to be done. | |
938 | ! * | |
939 | ! * The TAILQ_FOREACH macro would be suitable here, except that | |
940 | ! * it doesn't allow unlinking the current element, which is | |
941 | ! * needed for cleanup_connection. | |
942 | */ | |
943 | for (conn = TAILQ_FIRST(&db_rep->connections); | |
944 | conn != NULL; | |
945 | conn = next) { | |
946 | next = TAILQ_NEXT(conn, entries); | |
947 | ! if ((ret = __repmgr_conn_work(dbenv, | |
948 | ! conn, &reads, &writes, flow_control)) != 0) | |
949 | ! goto out; | |
950 | } | |
951 | ||
952 | /* | |
953 | *************** | |
954 | *** 637,642 **** | |
955 | --- 671,719 ---- | |
956 | } | |
957 | ||
958 | static int | |
959 | + __repmgr_conn_work(dbenv, conn, reads, writes, flow_control) | |
960 | + DB_ENV *dbenv; | |
961 | + REPMGR_CONNECTION *conn; | |
962 | + fd_set *reads, *writes; | |
963 | + int flow_control; | |
964 | + { | |
965 | + int ret; | |
966 | + u_int fd; | |
967 | + | |
968 | + if (F_ISSET(conn, CONN_DEFUNCT)) { | |
969 | + /* | |
970 | + * Deferred clean-up, from an error that happened in another | |
971 | + * thread, while we were sleeping in select(). | |
972 | + */ | |
973 | + return (__repmgr_cleanup_connection(dbenv, conn)); | |
974 | + } | |
975 | + | |
976 | + ret = 0; | |
977 | + fd = (u_int)conn->fd; | |
978 | + | |
979 | + if (F_ISSET(conn, CONN_CONNECTING)) { | |
980 | + if (FD_ISSET(fd, reads) || FD_ISSET(fd, writes)) | |
981 | + ret = finish_connecting(dbenv, conn); | |
982 | + } else { | |
983 | + /* | |
984 | + * Here, the site is connected, and the FD_SET's are valid. | |
985 | + */ | |
986 | + if (FD_ISSET(fd, writes)) | |
987 | + ret = __repmgr_write_some(dbenv, conn); | |
988 | + | |
989 | + if (ret == 0 && !flow_control && FD_ISSET(fd, reads)) | |
990 | + ret = __repmgr_read_from_site(dbenv, conn); | |
991 | + } | |
992 | + | |
993 | + if (ret == DB_REP_UNAVAIL) { | |
994 | + if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0) | |
995 | + return (ret); | |
996 | + ret = __repmgr_cleanup_connection(dbenv, conn); | |
997 | + } | |
998 | + return (ret); | |
999 | + } | |
1000 | + | |
1001 | + static int | |
1002 | finish_connecting(dbenv, conn) | |
1003 | DB_ENV *dbenv; | |
1004 | REPMGR_CONNECTION *conn; | |
1005 | *************** | |
1006 | *** 657,662 **** | |
1007 | --- 734,740 ---- | |
1008 | goto err_rpt; | |
1009 | } | |
1010 | ||
1011 | + DB_ASSERT(dbenv, F_ISSET(conn, CONN_CONNECTING)); | |
1012 | F_CLR(conn, CONN_CONNECTING); | |
1013 | return (__repmgr_send_handshake(dbenv, conn)); | |
1014 | ||
1015 | *************** | |
1016 | *** 671,690 **** | |
1017 | "connecting to %s", __repmgr_format_site_loc(site, buffer)); | |
1018 | ||
1019 | /* If we've exhausted the list of possible addresses, give up. */ | |
1020 | ! if (ADDR_LIST_NEXT(&site->net_addr) == NULL) | |
1021 | return (DB_REP_UNAVAIL); | |
1022 | ||
1023 | /* | |
1024 | * This is just like a little mini-"bust_connection", except that we | |
1025 | * don't reschedule for later, 'cuz we're just about to try again right | |
1026 | ! * now. | |
1027 | * | |
1028 | * !!! | |
1029 | * Which means this must only be called on the select() thread, since | |
1030 | * only there are we allowed to actually close a connection. | |
1031 | */ | |
1032 | DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); | |
1033 | ! __repmgr_cleanup_connection(dbenv, conn); | |
1034 | ret = __repmgr_connect_site(dbenv, eid); | |
1035 | DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL); | |
1036 | return (ret); | |
1037 | --- 749,773 ---- | |
1038 | "connecting to %s", __repmgr_format_site_loc(site, buffer)); | |
1039 | ||
1040 | /* If we've exhausted the list of possible addresses, give up. */ | |
1041 | ! if (ADDR_LIST_NEXT(&site->net_addr) == NULL) { | |
1042 | ! STAT(db_rep->region->mstat.st_connect_fail++); | |
1043 | return (DB_REP_UNAVAIL); | |
1044 | + } | |
1045 | ||
1046 | /* | |
1047 | * This is just like a little mini-"bust_connection", except that we | |
1048 | * don't reschedule for later, 'cuz we're just about to try again right | |
1049 | ! * now. (Note that we don't have to worry about message threads | |
1050 | ! * blocking on a full output queue: that can't happen when we're only | |
1051 | ! * just connecting.) | |
1052 | * | |
1053 | * !!! | |
1054 | * Which means this must only be called on the select() thread, since | |
1055 | * only there are we allowed to actually close a connection. | |
1056 | */ | |
1057 | DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); | |
1058 | ! if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0) | |
1059 | ! return (ret); | |
1060 | ret = __repmgr_connect_site(dbenv, eid); | |
1061 | DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL); | |
1062 | return (ret); | |
1063 | *** repmgr/repmgr_sel.c 2007-10-31 10:23:52.000000000 -0700 | |
1064 | --- repmgr/repmgr_sel.c 2007-10-31 10:23:53.000000000 -0700 | |
1065 | *************** | |
1066 | *** 36,45 **** | |
1067 | ||
1068 | /* | |
1069 | * PUBLIC: int __repmgr_accept __P((DB_ENV *)); | |
1070 | - * | |
1071 | - * !!! | |
1072 | - * Only ever called in the select() thread, since we may call | |
1073 | - * __repmgr_bust_connection(..., TRUE). | |
1074 | */ | |
1075 | int | |
1076 | __repmgr_accept(dbenv) | |
1077 | --- 36,41 ---- | |
1078 | *************** | |
1079 | *** 133,139 **** | |
1080 | case 0: | |
1081 | return (0); | |
1082 | case DB_REP_UNAVAIL: | |
1083 | ! return (__repmgr_bust_connection(dbenv, conn, TRUE)); | |
1084 | default: | |
1085 | return (ret); | |
1086 | } | |
1087 | --- 129,135 ---- | |
1088 | case 0: | |
1089 | return (0); | |
1090 | case DB_REP_UNAVAIL: | |
1091 | ! return (__repmgr_bust_connection(dbenv, conn)); | |
1092 | default: | |
1093 | return (ret); | |
1094 | } | |
1095 | *************** | |
1096 | *** 254,263 **** | |
1097 | * starting with the "current" element of its address list and trying as many | |
1098 | * addresses as necessary until the list is exhausted. | |
1099 | * | |
1100 | - * !!! | |
1101 | - * Only ever called in the select() thread, since we may call | |
1102 | - * __repmgr_bust_connection(..., TRUE). | |
1103 | - * | |
1104 | * PUBLIC: int __repmgr_connect_site __P((DB_ENV *, u_int eid)); | |
1105 | */ | |
1106 | int | |
1107 | --- 250,255 ---- | |
1108 | *************** | |
1109 | *** 332,338 **** | |
1110 | case 0: | |
1111 | break; | |
1112 | case DB_REP_UNAVAIL: | |
1113 | ! return (__repmgr_bust_connection(dbenv, con, TRUE)); | |
1114 | default: | |
1115 | return (ret); | |
1116 | } | |
1117 | --- 324,330 ---- | |
1118 | case 0: | |
1119 | break; | |
1120 | case DB_REP_UNAVAIL: | |
1121 | ! return (__repmgr_bust_connection(dbenv, con)); | |
1122 | default: | |
1123 | return (ret); | |
1124 | } | |
1125 | *************** | |
1126 | *** 437,443 **** | |
1127 | ||
1128 | DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1); | |
1129 | ||
1130 | ! return (__repmgr_send_one(dbenv, conn, REPMGR_HANDSHAKE, &cntrl, &rec)); | |
1131 | } | |
1132 | ||
1133 | /* | |
1134 | --- 429,443 ---- | |
1135 | ||
1136 | DB_SET_DBT(rec, my_addr->host, strlen(my_addr->host) + 1); | |
1137 | ||
1138 | ! /* | |
1139 | ! * It would of course be disastrous to block the select() thread, so | |
1140 | ! * pass the "blockable" argument as FALSE. Fortunately blocking should | |
1141 | ! * never be necessary here, because the hand-shake is always the first | |
1142 | ! * thing we send. Which is a good thing, because it would be almost as | |
1143 | ! * disastrous if we allowed ourselves to drop a handshake. | |
1144 | ! */ | |
1145 | ! return (__repmgr_send_one(dbenv, | |
1146 | ! conn, REPMGR_HANDSHAKE, &cntrl, &rec, FALSE)); | |
1147 | } | |
1148 | ||
1149 | /* | |
1150 | *************** | |
1151 | *** 854,859 **** | |
1152 | --- 854,872 ---- | |
1153 | conn->out_queue_length--; | |
1154 | if (--msg->ref_count <= 0) | |
1155 | __os_free(dbenv, msg); | |
1156 | + | |
1157 | + /* | |
1158 | + * We've achieved enough movement to free up at least | |
1159 | + * one space in the outgoing queue. Wake any message | |
1160 | + * threads that may be waiting for space. Clear the | |
1161 | + * CONGESTED status so that when the queue reaches the | |
1162 | + * high-water mark again, the filling thread will be | |
1163 | + * allowed to try waiting again. | |
1164 | + */ | |
1165 | + F_CLR(conn, CONN_CONGESTED); | |
1166 | + if (conn->blockers > 0 && | |
1167 | + (ret = __repmgr_signal(&conn->drained)) != 0) | |
1168 | + return (ret); | |
1169 | } | |
1170 | } | |
1171 | ||
1172 | *** repmgr/repmgr_util.c 2007-10-31 10:23:52.000000000 -0700 | |
1173 | --- repmgr/repmgr_util.c 2007-10-31 10:23:53.000000000 -0700 | |
1174 | *************** | |
1175 | *** 103,108 **** | |
1176 | --- 103,113 ---- | |
1177 | db_rep = dbenv->rep_handle; | |
1178 | if ((ret = __os_malloc(dbenv, sizeof(REPMGR_CONNECTION), &c)) != 0) | |
1179 | return (ret); | |
1180 | + if ((ret = __repmgr_alloc_cond(&c->drained)) != 0) { | |
1181 | + __os_free(dbenv, c); | |
1182 | + return (ret); | |
1183 | + } | |
1184 | + c->blockers = 0; | |
1185 | ||
1186 | c->fd = s; | |
1187 | c->flags = flags; | |
1188 | *** repmgr/repmgr_windows.c 2007-10-31 10:23:52.000000000 -0700 | |
1189 | --- repmgr/repmgr_windows.c 2007-10-31 10:23:53.000000000 -0700 | |
1190 | *************** | |
1191 | *** 11,16 **** | |
1192 | --- 11,19 ---- | |
1193 | #define __INCLUDE_NETWORKING 1 | |
1194 | #include "db_int.h" | |
1195 | ||
1196 | + /* Convert time-out from microseconds to milliseconds, rounding up. */ | |
1197 | + #define DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t) (((t) + (US_PER_MS - 1)) / US_PER_MS) | |
1198 | + | |
1199 | typedef struct __ack_waiter { | |
1200 | HANDLE event; | |
1201 | const DB_LSN *lsnp; | |
1202 | *************** | |
1203 | *** 120,136 **** | |
1204 | { | |
1205 | DB_REP *db_rep; | |
1206 | ACK_WAITER *me; | |
1207 | ! DWORD ret; | |
1208 | ! DWORD timeout; | |
1209 | ||
1210 | db_rep = dbenv->rep_handle; | |
1211 | ||
1212 | if ((ret = allocate_wait_slot(dbenv, &me)) != 0) | |
1213 | goto err; | |
1214 | ||
1215 | - /* convert time-out from microseconds to milliseconds, rounding up */ | |
1216 | timeout = db_rep->ack_timeout > 0 ? | |
1217 | ! ((db_rep->ack_timeout + (US_PER_MS - 1)) / US_PER_MS) : INFINITE; | |
1218 | me->lsnp = lsnp; | |
1219 | if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout, | |
1220 | FALSE)) == WAIT_FAILED) { | |
1221 | --- 123,137 ---- | |
1222 | { | |
1223 | DB_REP *db_rep; | |
1224 | ACK_WAITER *me; | |
1225 | ! DWORD ret, timeout; | |
1226 | ||
1227 | db_rep = dbenv->rep_handle; | |
1228 | ||
1229 | if ((ret = allocate_wait_slot(dbenv, &me)) != 0) | |
1230 | goto err; | |
1231 | ||
1232 | timeout = db_rep->ack_timeout > 0 ? | |
1233 | ! DB_TIMEOUT_TO_WINDOWS_TIMEOUT(db_rep->ack_timeout) : INFINITE; | |
1234 | me->lsnp = lsnp; | |
1235 | if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout, | |
1236 | FALSE)) == WAIT_FAILED) { | |
1237 | *************** | |
1238 | *** 211,216 **** | |
1239 | --- 212,296 ---- | |
1240 | db_rep->waiters->first_free = slot; | |
1241 | } | |
1242 | ||
1243 | + /* (See requirements described in repmgr_posix.c.) */ | |
1244 | + int | |
1245 | + __repmgr_await_drain(dbenv, conn, timeout) | |
1246 | + DB_ENV *dbenv; | |
1247 | + REPMGR_CONNECTION *conn; | |
1248 | + db_timeout_t timeout; | |
1249 | + { | |
1250 | + DB_REP *db_rep; | |
1251 | + db_timespec deadline, delta, now; | |
1252 | + db_timeout_t t; | |
1253 | + DWORD duration, ret; | |
1254 | + int round_up; | |
1255 | + | |
1256 | + db_rep = dbenv->rep_handle; | |
1257 | + | |
1258 | + __os_gettime(dbenv, &deadline); | |
1259 | + DB_TIMEOUT_TO_TIMESPEC(timeout, &delta); | |
1260 | + timespecadd(&deadline, &delta); | |
1261 | + | |
1262 | + while (conn->out_queue_length >= OUT_QUEUE_LIMIT) { | |
1263 | + if (!ResetEvent(conn->drained)) | |
1264 | + return (GetLastError()); | |
1265 | + | |
1266 | + /* How long until the deadline? */ | |
1267 | + __os_gettime(dbenv, &now); | |
1268 | + if (timespeccmp(&now, &deadline, >=)) { | |
1269 | + F_SET(conn, CONN_CONGESTED); | |
1270 | + return (0); | |
1271 | + } | |
1272 | + delta = deadline; | |
1273 | + timespecsub(&delta, &now); | |
1274 | + round_up = TRUE; | |
1275 | + DB_TIMESPEC_TO_TIMEOUT(t, &delta, round_up); | |
1276 | + duration = DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t); | |
1277 | + | |
1278 | + ret = SignalObjectAndWait(db_rep->mutex, | |
1279 | + conn->drained, duration, FALSE); | |
1280 | + LOCK_MUTEX(db_rep->mutex); | |
1281 | + if (ret == WAIT_FAILED) | |
1282 | + return (GetLastError()); | |
1283 | + else if (ret == WAIT_TIMEOUT) { | |
1284 | + F_SET(conn, CONN_CONGESTED); | |
1285 | + return (0); | |
1286 | + } else | |
1287 | + DB_ASSERT(dbenv, ret == WAIT_OBJECT_0); | |
1288 | + | |
1289 | + if (db_rep->finished) | |
1290 | + return (0); | |
1291 | + if (F_ISSET(conn, CONN_DEFUNCT)) | |
1292 | + return (DB_REP_UNAVAIL); | |
1293 | + } | |
1294 | + return (0); | |
1295 | + } | |
1296 | + | |
1297 | + /* | |
1298 | + * Creates a manual reset event, which is usually our best choice when we may | |
1299 | + * have multiple threads waiting on a single event. | |
1300 | + */ | |
1301 | + int | |
1302 | + __repmgr_alloc_cond(c) | |
1303 | + cond_var_t *c; | |
1304 | + { | |
1305 | + HANDLE event; | |
1306 | + | |
1307 | + if ((event = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL) | |
1308 | + return (GetLastError()); | |
1309 | + *c = event; | |
1310 | + return (0); | |
1311 | + } | |
1312 | + | |
1313 | + int | |
1314 | + __repmgr_free_cond(c) | |
1315 | + cond_var_t *c; | |
1316 | + { | |
1317 | + if (CloseHandle(*c)) | |
1318 | + return (0); | |
1319 | + return (GetLastError()); | |
1320 | + } | |
1321 | + | |
1322 | /* | |
1323 | * Make resource allocation an all-or-nothing affair, outside of this and the | |
1324 | * close_sync function. db_rep->waiters should be non-NULL iff all of these | |
1325 | *************** | |
1326 | *** 488,493 **** | |
1327 | --- 568,576 ---- | |
1328 | * don't hurt anything flow-control-wise. | |
1329 | */ | |
1330 | TAILQ_FOREACH(conn, &db_rep->connections, entries) { | |
1331 | + if (F_ISSET(conn, CONN_DEFUNCT)) | |
1332 | + continue; | |
1333 | + | |
1334 | if (F_ISSET(conn, CONN_CONNECTING) || | |
1335 | !STAILQ_EMPTY(&conn->outbound_queue) || | |
1336 | (!flow_control || !IS_VALID_EID(conn->eid))) { | |
1337 | *************** | |
1338 | *** 534,541 **** | |
1339 | conn != NULL; | |
1340 | conn = next) { | |
1341 | next = TAILQ_NEXT(conn, entries); | |
1342 | ! if (F_ISSET(conn, CONN_DEFUNCT)) | |
1343 | ! __repmgr_cleanup_connection(dbenv, conn); | |
1344 | } | |
1345 | ||
1346 | /* | |
1347 | --- 617,626 ---- | |
1348 | conn != NULL; | |
1349 | conn = next) { | |
1350 | next = TAILQ_NEXT(conn, entries); | |
1351 | ! if (F_ISSET(conn, CONN_DEFUNCT) && | |
1352 | ! (ret = __repmgr_cleanup_connection(dbenv, | |
1353 | ! conn)) != 0) | |
1354 | ! goto unlock; | |
1355 | } | |
1356 | ||
1357 | /* | |
1358 | *************** | |
1359 | *** 587,597 **** | |
1360 | return (ret); | |
1361 | } | |
1362 | ||
1363 | - /* | |
1364 | - * !!! | |
1365 | - * Only ever called on the select() thread, since we may call | |
1366 | - * __repmgr_bust_connection(..., TRUE). | |
1367 | - */ | |
1368 | static int | |
1369 | handle_completion(dbenv, conn) | |
1370 | DB_ENV *dbenv; | |
1371 | --- 672,677 ---- | |
1372 | *************** | |
1373 | *** 651,660 **** | |
1374 | } | |
1375 | } | |
1376 | ||
1377 | ! return (0); | |
1378 | ! | |
1379 | ! err: if (ret == DB_REP_UNAVAIL) | |
1380 | ! return (__repmgr_bust_connection(dbenv, conn, TRUE)); | |
1381 | return (ret); | |
1382 | } | |
1383 | ||
1384 | --- 731,742 ---- | |
1385 | } | |
1386 | } | |
1387 | ||
1388 | ! err: | |
1389 | ! if (ret == DB_REP_UNAVAIL) { | |
1390 | ! if ((ret = __repmgr_bust_connection(dbenv, conn)) != 0) | |
1391 | ! return (ret); | |
1392 | ! ret = __repmgr_cleanup_connection(dbenv, conn); | |
1393 | ! } | |
1394 | return (ret); | |
1395 | } | |
1396 | ||
1397 | *************** | |
1398 | *** 708,714 **** | |
1399 | } | |
1400 | ||
1401 | DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); | |
1402 | ! __repmgr_cleanup_connection(dbenv, conn); | |
1403 | ret = __repmgr_connect_site(dbenv, eid); | |
1404 | DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL); | |
1405 | return (ret); | |
1406 | --- 790,797 ---- | |
1407 | } | |
1408 | ||
1409 | DB_ASSERT(dbenv, !TAILQ_EMPTY(&db_rep->connections)); | |
1410 | ! if ((ret = __repmgr_cleanup_connection(dbenv, conn)) != 0) | |
1411 | ! return (ret); | |
1412 | ret = __repmgr_connect_site(dbenv, eid); | |
1413 | DB_ASSERT(dbenv, ret != DB_REP_UNAVAIL); | |
1414 | return (ret); |