1 *** lock/lock_deadlock.c 2008-03-11 00:31:33.000000000 +1100
2 --- lock/lock_deadlock.c 2008-12-16 21:54:18.000000000 +1100
8 ! u_int32_t *bitmap, *copymap, **deadp, **free_me, *tmpmap;
9 u_int32_t i, cid, keeper, killid, limit, nalloc, nlockers;
10 u_int32_t lock_max, txn_max;
16 ! u_int32_t *bitmap, *copymap, **deadp, **deadlist, *tmpmap;
17 u_int32_t i, cid, keeper, killid, limit, nalloc, nlockers;
18 u_int32_t lock_max, txn_max;
22 if (IS_REP_CLIENT(env))
23 atype = DB_LOCK_MINWRITE;
30 if (IS_REP_CLIENT(env))
31 atype = DB_LOCK_MINWRITE;
33 ! copymap = tmpmap = NULL;
40 memcpy(copymap, bitmap, nlockers * sizeof(u_int32_t) * nalloc);
42 if ((ret = __os_calloc(env, sizeof(u_int32_t), nalloc, &tmpmap)) != 0)
45 /* Find a deadlock. */
47 ! __dd_find(env, bitmap, idmap, nlockers, nalloc, &deadp)) != 0)
52 memcpy(copymap, bitmap, nlockers * sizeof(u_int32_t) * nalloc);
54 if ((ret = __os_calloc(env, sizeof(u_int32_t), nalloc, &tmpmap)) != 0)
57 /* Find a deadlock. */
59 ! __dd_find(env, bitmap, idmap, nlockers, nalloc, &deadlist)) != 0)
65 txn_max = TXN_MAXIMUM;
69 ! for (; *deadp != NULL; deadp++) {
72 killid = (u_int32_t)(*deadp - bitmap) / nalloc;
74 txn_max = TXN_MAXIMUM;
77 ! for (deadp = deadlist; *deadp != NULL; deadp++) {
80 killid = (u_int32_t)(*deadp - bitmap) / nalloc;
84 "Aborting locker %lx", (u_long)idmap[killid].id);
86 ! __os_free(env, tmpmap);
87 ! err1: __os_free(env, copymap);
89 ! err: if (free_me != NULL)
90 ! __os_free(env, free_me);
91 __os_free(env, bitmap);
92 __os_free(env, idmap);
96 "Aborting locker %lx", (u_long)idmap[killid].id);
98 ! err: if (copymap != NULL)
99 ! __os_free(env, copymap);
100 ! if (deadlist != NULL)
101 ! __os_free(env, deadlist);
103 ! __os_free(env, tmpmap);
104 __os_free(env, bitmap);
105 __os_free(env, idmap);
111 #define DD_INVALID_ID ((u_int32_t) -1)
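The restructured error path above is the standard C single-exit cleanup idiom: every pointer is initialized to NULL up front (copymap = tmpmap = NULL), all failures funnel through one err: label, and each buffer is freed only if it was allocated. A minimal standalone sketch of the same shape, using plain calloc/free instead of the __os_* wrappers:

#include <stdlib.h>

static int
build_maps(size_t n)
{
	unsigned *bitmap = NULL, *copymap = NULL, *tmpmap = NULL;
	int ret = 0;

	if ((bitmap = calloc(n, sizeof(*bitmap))) == NULL ||
	    (copymap = calloc(n, sizeof(*copymap))) == NULL ||
	    (tmpmap = calloc(n, sizeof(*tmpmap))) == NULL) {
		ret = -1;
		goto err;
	}
	/* ... use the maps ... */
err:	/* free(NULL) is a no-op, so no NULL tests are needed here;
	 * the patch tests for NULL before __os_free instead. */
	free(tmpmap);
	free(copymap);
	free(bitmap);
	return (ret);
}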
115 + * Build the lock dependency bit maps.
116 + * Notes on synchronization:
117 + * LOCK_SYSTEM_LOCK is used to hold objects locked when we have
118 + * a single partition.
119 + * LOCK_LOCKERS is held while we are walking the lockers list and
120 + * to single-thread the use of lockerp->dd_id.
121 + * LOCK_DD protects the DD list of objects.
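A minimal sketch of the acquisition order those notes describe, with pthread mutexes standing in for the region lock macros (an assumption for illustration; the real macros operate on the shared lock region and can compile away in some configurations). Taking the three locks in one fixed order keeps the detector from deadlocking against itself:

#include <pthread.h>

/* Stand-ins for LOCK_SYSTEM_LOCK, LOCK_LOCKERS and LOCK_DD. */
static pthread_mutex_t system_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t lockers_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t dd_lock = PTHREAD_MUTEX_INITIALIZER;

static void
dd_build_sketch(void)
{
	pthread_mutex_lock(&system_lock);	/* hold objects locked */
	pthread_mutex_lock(&lockers_lock);	/* walk the lockers list */
	/* ... assign dd_ids, size the arrays ... */
	pthread_mutex_lock(&dd_lock);		/* walk the DD object list */
	/* ... fill in the bit maps ... */
	pthread_mutex_unlock(&dd_lock);
	pthread_mutex_unlock(&lockers_lock);
	pthread_mutex_unlock(&system_lock);
}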
125 __dd_build(env, atype, bmp, nlockers, allocp, idmap, rejectp)
130 * In particular, we do not build the conflict array, and our caller
131 * needs to expect this.
133 + LOCK_SYSTEM_LOCK(lt, region);
134 if (atype == DB_LOCK_EXPIRE) {
135 skip: LOCK_DD(env, region);
136 op = SH_TAILQ_FIRST(&region->dd_objs, __db_lockobj);
139 OBJECT_UNLOCK(lt, region, indx);
141 UNLOCK_DD(env, region);
146 ! * We'll check how many lockers there are, add a few more for
147 ! * good measure, and then allocate all the structures.  Then we'll
148 ! * verify that we have enough room when we go back in and get the
149 ! * mutex the second time.
151 ! retry: count = region->stat.st_nlockers;
157 OBJECT_UNLOCK(lt, region, indx);
159 UNLOCK_DD(env, region);
160 + LOCK_SYSTEM_UNLOCK(lt, region);
165 ! * Allocate after locking the region to make sure the structures
166 ! * are large enough.
168 ! LOCK_LOCKERS(env, region);
169 ! count = region->stat.st_nlockers;
171 + UNLOCK_LOCKERS(env, region);
177 if (FLD_ISSET(env->dbenv->verbose, DB_VERB_DEADLOCK))
178 __db_msg(env, "%lu lockers", (u_long)count);
181 nentries = (u_int32_t)DB_ALIGN(count, 32) / 32;
184 ! * Allocate enough space for a count-by-count bitmap matrix.
187 ! * We could probably save the mallocs between iterations by
188 ! * reallocing only when count has grown too much.
190 if ((ret = __os_calloc(env, (size_t)count,
191 ! sizeof(u_int32_t) * nentries, &bitmap)) != 0)
194 if ((ret = __os_calloc(env,
195 sizeof(u_int32_t), nentries, &tmpmap)) != 0) {
196 __os_free(env, bitmap);
200 if ((ret = __os_calloc(env,
201 (size_t)count, sizeof(locker_info), &id_array)) != 0) {
202 __os_free(env, bitmap);
203 __os_free(env, tmpmap);
208 - * Now go back in and actually fill in the matrix.
210 - if (region->stat.st_nlockers > count) {
211 - __os_free(env, bitmap);
212 - __os_free(env, tmpmap);
213 - __os_free(env, id_array);
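For reference, the "count-by-count bitmap matrix" is laid out row-major with nentries 32-bit words per locker, nentries being DB_ALIGN(count, 32) / 32. A sketch of setting and testing a waits-for edge in that layout (hypothetical helper names):

#include <stdint.h>

/* One row is nentries words; locker i waits on locker j when bit j
 * of row i is set. */
static void
set_waits_on(uint32_t *bitmap, uint32_t nentries, uint32_t i, uint32_t j)
{
	bitmap[i * nentries + j / 32] |= (uint32_t)1 << (j % 32);
}

static int
waits_on(const uint32_t *bitmap, uint32_t nentries, uint32_t i, uint32_t j)
{
	return (int)((bitmap[i * nentries + j / 32] >> (j % 32)) & 1);
}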
218 * First we go through and assign each locker a deadlock detector id.
221 - LOCK_LOCKERS(env, region);
222 SH_TAILQ_FOREACH(lip, &region->lockers, ulinks, __db_locker) {
223 if (lip->master_locker == INVALID_ROFF) {
225 id_array[lip->dd_id].id = lip->id;
228 if (FLD_ISSET(env->dbenv->verbose, DB_VERB_DEADLOCK))
229 __db_msg(env, "%lu lockers", (u_long)count);
231 nentries = (u_int32_t)DB_ALIGN(count, 32) / 32;
233 ! /* Allocate enough space for a count-by-count bitmap matrix. */
234 if ((ret = __os_calloc(env, (size_t)count,
235 ! sizeof(u_int32_t) * nentries, &bitmap)) != 0) {
236 ! UNLOCK_LOCKERS(env, region);
240 if ((ret = __os_calloc(env,
241 sizeof(u_int32_t), nentries, &tmpmap)) != 0) {
242 + UNLOCK_LOCKERS(env, region);
243 __os_free(env, bitmap);
247 if ((ret = __os_calloc(env,
248 (size_t)count, sizeof(locker_info), &id_array)) != 0) {
249 + UNLOCK_LOCKERS(env, region);
250 __os_free(env, bitmap);
251 __os_free(env, tmpmap);
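Note that the new code holds LOCK_LOCKERS from the point where count is read through all three allocations, which is why each error path above must call UNLOCK_LOCKERS before freeing, and why the old recheck-and-retry block could be deleted: the locker count can no longer grow underneath the arrays. A pthread sketch of the pattern (names are illustrative):

#include <pthread.h>
#include <stdlib.h>

static pthread_mutex_t lockers_lock = PTHREAD_MUTEX_INITIALIZER;
static size_t n_lockers;	/* grows only while lockers_lock is held */

static int *
snapshot_ids(size_t *countp)
{
	size_t count;
	int *ids;

	pthread_mutex_lock(&lockers_lock);
	count = n_lockers;	/* read the count under the lock... */
	ids = calloc(count, sizeof(*ids));	/* ...and size under it too */
	if (ids == NULL) {
		pthread_mutex_unlock(&lockers_lock);
		return (NULL);
	}
	/* ... fill ids from the lockers list ... */
	pthread_mutex_unlock(&lockers_lock);
	*countp = count;
	return (ids);
}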
256 * First we go through and assign each locker a deadlock detector id.
259 SH_TAILQ_FOREACH(lip, &region->lockers, ulinks, __db_locker) {
260 if (lip->master_locker == INVALID_ROFF) {
261 + DB_ASSERT(env, id < count);
263 id_array[lip->dd_id].id = lip->id;
267 lip->dd_id = DD_INVALID_ID;
270 - UNLOCK_LOCKERS(env, region);
273 * We need only consider objects that have waiters, so we use
277 * status after building the bit maps so that we will not detect
278 * a blocked transaction without noting that it is already aborting.
280 - LOCK_LOCKERS(env, region);
281 for (id = 0; id < count; id++) {
282 if (!id_array[id].valid)
288 id_array[id].in_abort = 1;
290 UNLOCK_LOCKERS(env, region);
291 + LOCK_SYSTEM_UNLOCK(lt, region);
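Once the bit maps are built and the system lock is dropped, finding a deadlock reduces to finding a locker that transitively waits on itself. A sketch of that check over the packed rows (illustrative only; the real __dd_find differs in detail and returns a list of matching rows via deadlist):

#include <stdint.h>
#include <string.h>

/* Return the index of a locker that transitively waits on itself,
 * or -1 if the waits-for matrix has no cycle. */
static int
find_cycle(const uint32_t *bitmap, uint32_t *tmpmap,
    uint32_t count, uint32_t nentries)
{
	uint32_t changed, i, j, merged, w;

	for (i = 0; i < count; i++) {
		/* Start from locker i's direct waits-for row. */
		memcpy(tmpmap, &bitmap[i * nentries],
		    nentries * sizeof(uint32_t));
		do {	/* Expand to the transitive closure. */
			changed = 0;
			for (j = 0; j < count; j++) {
				if (!(tmpmap[j / 32] &
				    (uint32_t)1 << (j % 32)))
					continue;
				for (w = 0; w < nentries; w++) {
					merged = tmpmap[w] |
					    bitmap[j * nentries + w];
					if (merged != tmpmap[w]) {
						tmpmap[w] = merged;
						changed = 1;
					}
				}
			}
		} while (changed);
		if (tmpmap[i / 32] & (uint32_t)1 << (i % 32))
			return ((int)i);	/* locker i is in a cycle */
	}
	return (-1);
}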
294 * Now we can release everything except the bitmap matrix that we
300 /* We must lock so this locker cannot go away while we abort it. */
301 + LOCK_SYSTEM_LOCK(lt, region);
302 LOCK_LOCKERS(env, region);
308 done: OBJECT_UNLOCK(lt, region, info->last_ndx);
310 out: UNLOCK_LOCKERS(env, region);
311 + LOCK_SYSTEM_UNLOCK(lt, region);