# name : innodb_pass_corrupt_table.patch # introduced : 11 or before # maintainer : Yasufumi # #!!! notice !!! # Any small change to this file in the main branch # should be done or reviewed by the maintainer! --- a/storage/innobase/btr/btr0btr.c +++ b/storage/innobase/btr/btr0btr.c @@ -691,6 +691,12 @@ root_page_no = dict_index_get_page(index); block = btr_block_get(space, zip_size, root_page_no, RW_X_LATCH, mtr); + + if (srv_pass_corrupt_table && !block) { + return(0); + } + ut_a(block); + ut_a((ibool)!!page_is_comp(buf_block_get_frame(block)) == dict_table_is_comp(index->table)); #ifdef UNIV_BTR_DEBUG @@ -977,6 +983,12 @@ root = btr_root_get(index, &mtr); + if (srv_pass_corrupt_table && !root) { + mtr_commit(&mtr); + return(0); + } + ut_a(root); + if (flag == BTR_N_LEAF_PAGES) { seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF; @@ -1433,6 +1445,13 @@ mtr_start(&mtr); root = btr_page_get(space, zip_size, root_page_no, RW_X_LATCH, &mtr); + + if (srv_pass_corrupt_table && !root) { + mtr_commit(&mtr); + return; + } + ut_a(root); + #ifdef UNIV_BTR_DEBUG ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF + root, space)); @@ -1455,6 +1474,12 @@ mtr_start(&mtr); root = btr_page_get(space, zip_size, root_page_no, RW_X_LATCH, &mtr); + + if (srv_pass_corrupt_table && !root) { + mtr_commit(&mtr); + return; + } + ut_a(root); #ifdef UNIV_BTR_DEBUG ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP + root, space)); @@ -1488,6 +1513,11 @@ block = btr_block_get(space, zip_size, root_page_no, RW_X_LATCH, mtr); + if (srv_pass_corrupt_table && !block) { + return; + } + ut_a(block); + btr_search_drop_page_hash_index(block); header = buf_block_get_frame(block) + PAGE_HEADER + PAGE_BTR_SEG_TOP; --- a/storage/innobase/btr/btr0cur.c +++ b/storage/innobase/btr/btr0cur.c @@ -250,6 +250,11 @@ case BTR_MODIFY_LEAF: mode = latch_mode == BTR_SEARCH_LEAF ? RW_S_LATCH : RW_X_LATCH; get_block = btr_block_get(space, zip_size, page_no, mode, mtr); + + if (srv_pass_corrupt_table && !get_block) { + return; + } + ut_a(get_block); #ifdef UNIV_BTR_DEBUG ut_a(page_is_comp(get_block->frame) == page_is_comp(page)); #endif /* UNIV_BTR_DEBUG */ @@ -263,6 +268,11 @@ get_block = btr_block_get(space, zip_size, left_page_no, RW_X_LATCH, mtr); + + if (srv_pass_corrupt_table && !get_block) { + return; + } + ut_a(get_block); #ifdef UNIV_BTR_DEBUG ut_a(page_is_comp(get_block->frame) == page_is_comp(page)); @@ -274,6 +284,11 @@ get_block = btr_block_get(space, zip_size, page_no, RW_X_LATCH, mtr); + + if (srv_pass_corrupt_table && !get_block) { + return; + } + ut_a(get_block); #ifdef UNIV_BTR_DEBUG ut_a(page_is_comp(get_block->frame) == page_is_comp(page)); #endif /* UNIV_BTR_DEBUG */ @@ -285,6 +300,11 @@ get_block = btr_block_get(space, zip_size, right_page_no, RW_X_LATCH, mtr); + + if (srv_pass_corrupt_table && !get_block) { + return; + } + ut_a(get_block); #ifdef UNIV_BTR_DEBUG ut_a(page_is_comp(get_block->frame) == page_is_comp(page)); @@ -306,6 +326,11 @@ get_block = btr_block_get(space, zip_size, left_page_no, mode, mtr); cursor->left_block = get_block; + + if (srv_pass_corrupt_table && !get_block) { + return; + } + ut_a(get_block); #ifdef UNIV_BTR_DEBUG ut_a(page_is_comp(get_block->frame) == page_is_comp(page)); @@ -316,6 +341,11 @@ } get_block = btr_block_get(space, zip_size, page_no, mode, mtr); + + if (srv_pass_corrupt_table && !get_block) { + return; + } + ut_a(get_block); #ifdef UNIV_BTR_DEBUG ut_a(page_is_comp(get_block->frame) == page_is_comp(page)); #endif /* UNIV_BTR_DEBUG */ @@ -588,6 +618,19 @@ file, line, mtr); if (block == NULL) { + if (srv_pass_corrupt_table + && buf_mode != BUF_GET_IF_IN_POOL + && buf_mode != BUF_GET_IF_IN_POOL_OR_WATCH) { + page_cursor->block = 0; + page_cursor->rec = 0; + if (estimate) { + cursor->path_arr->nth_rec = ULINT_UNDEFINED; + } + goto func_exit; + } + ut_a(buf_mode == BUF_GET_IF_IN_POOL + || buf_mode == BUF_GET_IF_IN_POOL_OR_WATCH); + /* This must be a search to perform an insert/delete mark/ delete; try using the insert/delete buffer */ @@ -662,6 +705,16 @@ block->check_index_page_at_flush = TRUE; page = buf_block_get_frame(block); + if (srv_pass_corrupt_table && !page) { + page_cursor->block = 0; + page_cursor->rec = 0; + if (estimate) { + cursor->path_arr->nth_rec = ULINT_UNDEFINED; + } + goto func_exit; + } + ut_a(page); + if (rw_latch != RW_NO_LATCH) { #ifdef UNIV_ZIP_DEBUG const page_zip_des_t* page_zip @@ -866,6 +919,17 @@ RW_NO_LATCH, NULL, BUF_GET, file, line, mtr); page = buf_block_get_frame(block); + + if (srv_pass_corrupt_table && !page) { + page_cursor->block = 0; + page_cursor->rec = 0; + if (estimate) { + cursor->path_arr->nth_rec = ULINT_UNDEFINED; + } + break; + } + ut_a(page); + ut_ad(index->id == btr_page_get_index_id(page)); block->check_index_page_at_flush = TRUE; @@ -986,6 +1050,14 @@ RW_NO_LATCH, NULL, BUF_GET, file, line, mtr); page = buf_block_get_frame(block); + + if (srv_pass_corrupt_table && !page) { + page_cursor->block = 0; + page_cursor->rec = 0; + break; + } + ut_a(page); + ut_ad(index->id == btr_page_get_index_id(page)); if (height == ULINT_UNDEFINED) { @@ -1199,6 +1271,12 @@ *big_rec = NULL; block = btr_cur_get_block(cursor); + + if (srv_pass_corrupt_table && !block) { + return(DB_CORRUPTION); + } + ut_a(block); + page = buf_block_get_frame(block); index = cursor->index; zip_size = buf_block_get_zip_size(block); @@ -2988,6 +3066,11 @@ block = btr_cur_get_block(cursor); + if (srv_pass_corrupt_table && !block) { + return(DB_CORRUPTION); + } + ut_a(block); + ut_ad(page_is_leaf(buf_block_get_frame(block))); rec = btr_cur_get_rec(cursor); @@ -3701,6 +3784,11 @@ page = btr_cur_get_page(&cursor); + if (srv_pass_corrupt_table && !page) { + break; + } + ut_a(page); + rec = page_rec_get_next(page_get_infimum_rec(page)); if (!page_rec_is_supremum(rec)) { --- a/storage/innobase/btr/btr0pcur.c +++ b/storage/innobase/btr/btr0pcur.c @@ -32,7 +32,7 @@ #include "ut0byte.h" #include "rem0cmp.h" #include "trx0trx.h" - +#include "srv0srv.h" /**************************************************************//** Allocates memory for a persistent cursor object and initializes the cursor. @return own: persistent cursor */ @@ -102,6 +102,12 @@ ut_ad(cursor->latch_mode != BTR_NO_LATCHES); block = btr_pcur_get_block(cursor); + + if (srv_pass_corrupt_table && !block) { + return; + } + ut_a(block); + index = btr_cur_get_index(btr_pcur_get_btr_cur(cursor)); page_cursor = btr_pcur_get_page_cur(cursor); @@ -419,6 +425,15 @@ next_block = btr_block_get(space, zip_size, next_page_no, cursor->latch_mode, mtr); next_page = buf_block_get_frame(next_block); + + if (srv_pass_corrupt_table && !next_page) { + btr_leaf_page_release(btr_pcur_get_block(cursor), + cursor->latch_mode, mtr); + btr_pcur_get_page_cur(cursor)->block = 0; + btr_pcur_get_page_cur(cursor)->rec = 0; + return; + } + ut_a(next_page); #ifdef UNIV_BTR_DEBUG ut_a(page_is_comp(next_page) == page_is_comp(page)); ut_a(btr_page_get_prev(next_page, mtr) --- a/storage/innobase/btr/btr0sea.c +++ b/storage/innobase/btr/btr0sea.c @@ -42,7 +42,7 @@ #include "btr0pcur.h" #include "btr0btr.h" #include "ha0ha.h" - +#include "srv0srv.h" /** Flag: has the search system been enabled? Protected by btr_search_latch and btr_search_enabled_mutex. */ UNIV_INTERN char btr_search_enabled = TRUE; @@ -607,6 +607,11 @@ block = btr_cur_get_block(cursor); + if (srv_pass_corrupt_table && !block) { + return; + } + ut_a(block); + /* NOTE that the following two function calls do NOT protect info or block->n_fields etc. with any semaphore, to save CPU time! We cannot assume the fields are consistent when we return from --- a/storage/innobase/buf/buf0buf.c +++ b/storage/innobase/buf/buf0buf.c @@ -52,6 +52,7 @@ #include "log0recv.h" #include "page0zip.h" #include "trx0trx.h" +#include "srv0start.h" /* prototypes for new functions added to ha_innodb.cc */ trx_t* innobase_get_trx(); @@ -1135,6 +1136,11 @@ ready = buf_flush_ready_for_replace(&block->page); mutex_exit(&block->mutex); + if (block->page.is_corrupt) { + /* corrupt page may remain, it can be skipped */ + break; + } + if (!ready) { return(block); @@ -2006,6 +2012,13 @@ return(NULL); } + if (srv_pass_corrupt_table <= 1) { + if (bpage->is_corrupt) { + rw_lock_s_unlock(&buf_pool->page_hash_latch); + return(NULL); + } + } + block_mutex = buf_page_get_mutex_enter(bpage); rw_lock_s_unlock(&buf_pool->page_hash_latch); @@ -2583,6 +2596,13 @@ return(NULL); } + if (srv_pass_corrupt_table <= 1) { + if (block->page.is_corrupt) { + mutex_exit(block_mutex); + return(NULL); + } + } + switch (buf_block_get_state(block)) { buf_page_t* bpage; ibool success; @@ -3257,6 +3277,7 @@ bpage->newest_modification = 0; bpage->oldest_modification = 0; HASH_INVALIDATE(bpage, hash); + bpage->is_corrupt = FALSE; #if defined UNIV_DEBUG_FILE_ACCESSES || defined UNIV_DEBUG bpage->file_page_was_freed = FALSE; #endif /* UNIV_DEBUG_FILE_ACCESSES || UNIV_DEBUG */ @@ -3841,6 +3862,7 @@ (ulong) bpage->offset); } + if (!srv_pass_corrupt_table || !bpage->is_corrupt) { /* From version 3.23.38 up we store the page checksum to the 4 first bytes of the page end lsn field */ @@ -3882,6 +3904,23 @@ REFMAN "forcing-innodb-recovery.html\n" "InnoDB: about forcing recovery.\n", stderr); + if (srv_pass_corrupt_table && !trx_sys_sys_space(bpage->space) + && bpage->space < SRV_LOG_SPACE_FIRST_ID) { + trx_t* trx; + + fprintf(stderr, + "InnoDB: space %u will be treated as corrupt.\n", + bpage->space); + fil_space_set_corrupt(bpage->space); + + trx = innobase_get_trx(); + if (trx && trx->dict_operation_lock_mode == RW_X_LATCH) { + dict_table_set_corrupt_by_space(bpage->space, FALSE); + } else { + dict_table_set_corrupt_by_space(bpage->space, TRUE); + } + bpage->is_corrupt = TRUE; + } else if (srv_force_recovery < SRV_FORCE_IGNORE_CORRUPT) { fputs("InnoDB: Ending processing because of" " a corrupt database page.\n", @@ -3889,6 +3928,7 @@ exit(1); } } + } /**/ if (recv_recovery_is_on()) { /* Pages must be uncompressed for crash recovery. */ @@ -3898,8 +3938,11 @@ if (uncompressed && !recv_no_ibuf_operations) { ibuf_merge_or_delete_for_page( + /* Delete possible entries, if bpage is_corrupt */ + (srv_pass_corrupt_table && bpage->is_corrupt) ? NULL : (buf_block_t*) bpage, bpage->space, bpage->offset, buf_page_get_zip_size(bpage), + (srv_pass_corrupt_table && bpage->is_corrupt) ? FALSE : TRUE); } } --- a/storage/innobase/buf/buf0rea.c +++ b/storage/innobase/buf/buf0rea.c @@ -193,7 +193,14 @@ ((buf_block_t*) bpage)->frame, bpage, trx); } thd_wait_end(NULL); + + if (srv_pass_corrupt_table) { + if (*err != DB_SUCCESS) { + bpage->is_corrupt = TRUE; + } + } else { ut_a(*err == DB_SUCCESS); + } if (sync) { /* The i/o is already completed when we arrive from --- a/storage/innobase/dict/dict0dict.c +++ b/storage/innobase/dict/dict0dict.c @@ -54,6 +54,7 @@ #include "row0merge.h" #include "m_ctype.h" /* my_isspace() */ #include "ha_prototypes.h" /* innobase_strcasecmp(), innobase_casedn_str()*/ +#include "srv0start.h" /* SRV_LOG_SPACE_FIRST_ID */ #include @@ -750,7 +751,7 @@ mutex_exit(&(dict_sys->mutex)); - if (table != NULL) { + if (table != NULL && !table->is_corrupt) { /* If table->ibd_file_missing == TRUE, this will print an error message and return without doing anything. */ @@ -1293,7 +1294,7 @@ + dict_sys->size) > srv_dict_size_limit ) { prev_table = UT_LIST_GET_PREV(table_LRU, table); - if (table == self || table->n_mysql_handles_opened) + if (table == self || table->n_mysql_handles_opened || table->is_corrupt) goto next_loop; cached_foreign_tables = 0; @@ -4366,6 +4367,12 @@ heap = mem_heap_create(1000); while (index) { + if (table->is_corrupt) { + ut_a(srv_pass_corrupt_table); + mem_heap_free(heap); + return(FALSE); + } + size = btr_get_size(index, BTR_TOTAL_SIZE); index->stat_index_size = size; @@ -4513,6 +4520,12 @@ heap = mem_heap_create(1000); while (index) { + if (table->is_corrupt) { + ut_a(srv_pass_corrupt_table); + mem_heap_free(heap); + return; + } + /*===========================================*/ { dict_table_t* sys_stats; @@ -4705,6 +4718,13 @@ || (srv_force_recovery < SRV_FORCE_NO_LOG_REDO && dict_index_is_clust(index)))) { ulint size; + + if (table->is_corrupt) { + ut_a(srv_pass_corrupt_table); + dict_table_stats_unlock(table, RW_X_LATCH); + return; + } + size = btr_get_size(index, BTR_TOTAL_SIZE); index->stat_index_size = size; @@ -5501,4 +5521,42 @@ rw_lock_free(&dict_table_stats_latches[i]); } } + +/************************************************************************* +set is_corrupt flag by space_id*/ + +void +dict_table_set_corrupt_by_space( +/*============================*/ + ulint space_id, + ibool need_mutex) +{ + dict_table_t* table; + ibool found = FALSE; + + ut_a(!trx_sys_sys_space(space_id) && space_id < SRV_LOG_SPACE_FIRST_ID); + + if (need_mutex) + mutex_enter(&(dict_sys->mutex)); + + table = UT_LIST_GET_FIRST(dict_sys->table_LRU); + + while (table) { + if (table->space == space_id) { + table->is_corrupt = TRUE; + found = TRUE; + } + + table = UT_LIST_GET_NEXT(table_LRU, table); + } + + if (need_mutex) + mutex_exit(&(dict_sys->mutex)); + + if (!found) { + fprintf(stderr, "InnoDB: space to be marked as " + "crashed was not found for id %lu.\n", + (ulong) space_id); + } +} #endif /* !UNIV_HOTBACKUP */ --- a/storage/innobase/dict/dict0mem.c +++ b/storage/innobase/dict/dict0mem.c @@ -96,6 +96,8 @@ /* The number of transactions that are either waiting on the AUTOINC lock or have been granted the lock. */ table->n_waiting_or_granted_auto_inc_locks = 0; + + table->is_corrupt = FALSE; #endif /* !UNIV_HOTBACKUP */ ut_d(table->magic_n = DICT_TABLE_MAGIC_N); --- a/storage/innobase/fil/fil0fil.c +++ b/storage/innobase/fil/fil0fil.c @@ -235,6 +235,7 @@ file we have written to */ ibool is_in_unflushed_spaces; /*!< TRUE if this space is currently in unflushed_spaces */ + ibool is_corrupt; UT_LIST_NODE_T(fil_space_t) space_list; /*!< list of all spaces */ ulint magic_n;/*!< FIL_SPACE_MAGIC_N */ @@ -1294,6 +1295,8 @@ ut_fold_string(name), space); space->is_in_unflushed_spaces = FALSE; + space->is_corrupt = FALSE; + UT_LIST_ADD_LAST(space_list, fil_system->space_list, space); mutex_exit(&fil_system->mutex); @@ -5268,6 +5271,34 @@ ut_a(byte_offset % OS_FILE_LOG_BLOCK_SIZE == 0); ut_a((len % OS_FILE_LOG_BLOCK_SIZE) == 0); + if (srv_pass_corrupt_table == 1 && space->is_corrupt) { + /* should ignore i/o for the crashed space */ + mutex_enter(&fil_system->mutex); + fil_node_complete_io(node, fil_system, type); + mutex_exit(&fil_system->mutex); + if (mode == OS_AIO_NORMAL) { + ut_a(space->purpose == FIL_TABLESPACE); + buf_page_io_complete(message); + } + if (type == OS_FILE_READ) { + return(DB_TABLESPACE_DELETED); + } else { + return(DB_SUCCESS); + } + } else { + if (srv_pass_corrupt_table > 1 && space->is_corrupt) { + /* should ignore write i/o for the crashed space */ + if (type == OS_FILE_WRITE) { + mutex_enter(&fil_system->mutex); + fil_node_complete_io(node, fil_system, type); + mutex_exit(&fil_system->mutex); + if (mode == OS_AIO_NORMAL) { + ut_a(space->purpose == FIL_TABLESPACE); + buf_page_io_complete(message); + } + return(DB_SUCCESS); + } + } #ifdef UNIV_HOTBACKUP /* In ibbackup do normal i/o, not aio */ if (type == OS_FILE_READ) { @@ -5282,6 +5313,8 @@ ret = os_aio(type, mode | wake_later, node->name, node->handle, buf, offset_low, offset_high, len, node, message, trx); #endif + } /**/ + ut_a(ret); if (mode == OS_AIO_SYNC) { @@ -5782,3 +5815,46 @@ return 0; } } + +/************************************************************************* +functions to access is_corrupt flag of fil_space_t*/ + +ibool +fil_space_is_corrupt( +/*=================*/ + ulint space_id) +{ + fil_space_t* space; + ibool ret = FALSE; + + mutex_enter(&fil_system->mutex); + + space = fil_space_get_by_id(space_id); + + if (space && space->is_corrupt) { + ret = TRUE; + } + + mutex_exit(&fil_system->mutex); + + return(ret); +} + +void +fil_space_set_corrupt( +/*==================*/ + ulint space_id) +{ + fil_space_t* space; + + mutex_enter(&fil_system->mutex); + + space = fil_space_get_by_id(space_id); + + if (space) { + space->is_corrupt = TRUE; + } + + mutex_exit(&fil_system->mutex); +} + --- a/storage/innobase/fsp/fsp0fsp.c +++ b/storage/innobase/fsp/fsp0fsp.c @@ -369,6 +369,12 @@ ut_ad(id || !zip_size); block = buf_page_get(id, zip_size, 0, RW_X_LATCH, mtr); + + if (srv_pass_corrupt_table && !block) { + return(0); + } + ut_a(block); + header = FSP_HEADER_OFFSET + buf_block_get_frame(block); buf_block_dbg_add_level(block, SYNC_FSP_PAGE); @@ -787,6 +793,12 @@ fsp_header_t* sp_header; block = buf_page_get(space, zip_size, 0, RW_X_LATCH, mtr); + + if (srv_pass_corrupt_table && !block) { + return(0); + } + ut_a(block); + buf_block_dbg_add_level(block, SYNC_FSP_PAGE); sp_header = FSP_HEADER_OFFSET + buf_block_get_frame(block); @@ -1866,6 +1878,11 @@ { fseg_inode_t* inode; + if (srv_pass_corrupt_table && !page) { + return(ULINT_UNDEFINED); + } + ut_a(page); + for (; i < FSP_SEG_INODES_PER_PAGE(zip_size); i++) { inode = fsp_seg_inode_page_get_nth_inode( @@ -1979,6 +1996,11 @@ page = buf_block_get_frame(block); + if (srv_pass_corrupt_table && !page) { + return(0); + } + ut_a(page); + n = fsp_seg_inode_page_find_free(page, 0, zip_size, mtr); ut_a(n != ULINT_UNDEFINED); @@ -2072,6 +2094,11 @@ inode = fut_get_ptr(space, zip_size, inode_addr, RW_X_LATCH, mtr); + if (srv_pass_corrupt_table && !inode) { + return(0); + } + ut_a(inode); + if (UNIV_UNLIKELY(!mach_read_from_8(inode + FSEG_ID))) { inode = NULL; @@ -2098,7 +2125,7 @@ { fseg_inode_t* inode = fseg_inode_try_get(header, space, zip_size, mtr); - ut_a(inode); + ut_a(srv_pass_corrupt_table || inode); return(inode); } @@ -3304,6 +3331,11 @@ descr = xdes_get_descriptor(space, zip_size, page, mtr); + if (srv_pass_corrupt_table && !descr) { + /* The page may be corrupt. pass it. */ + return; + } + ut_a(descr); if (xdes_get_bit(descr, XDES_FREE_BIT, page % FSP_EXTENT_SIZE, mtr)) { fputs("InnoDB: Dump of the tablespace extent descriptor: ", @@ -3551,6 +3583,11 @@ descr = xdes_get_descriptor(space, zip_size, header_page, mtr); + if (srv_pass_corrupt_table && !descr) { + /* The page may be corrupt. pass it. */ + return(TRUE); + } + /* Check that the header resides on a page which has not been freed yet */ @@ -3635,6 +3672,12 @@ inode = fseg_inode_get(header, space, zip_size, mtr); + if (srv_pass_corrupt_table && !inode) { + /* ignore the corruption */ + return(TRUE); + } + ut_a(inode); + descr = fseg_get_first_extent(inode, space, zip_size, mtr); if (descr != NULL) { --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -3979,6 +3979,12 @@ DBUG_RETURN(1); } + if (srv_pass_corrupt_table <= 1 && share->ib_table && share->ib_table->is_corrupt) { + free_share(share); + + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + } + /* Create buffers for packing the fields of a record. Why table->reclength did not work here? Obviously, because char fields when packed actually became 1 byte longer, when we also @@ -4006,6 +4012,19 @@ /* Get pointer to a table object in InnoDB dictionary cache */ ib_table = dict_table_get(norm_name, TRUE); + if (srv_pass_corrupt_table <= 1 && ib_table && ib_table->is_corrupt) { + free_share(share); + my_free(upd_buff); + + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + } + + if (share->ib_table) { + ut_a(share->ib_table == ib_table); + } else { + share->ib_table = ib_table; + } + if (NULL == ib_table) { if (is_part && retries < 10) { ++retries; @@ -5174,6 +5193,10 @@ ha_statistic_increment(&SSV::ha_write_count); + if (share->ib_table->is_corrupt) { + DBUG_RETURN(HA_ERR_CRASHED); + } + if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT) table->timestamp_field->set_time(); @@ -5391,6 +5414,10 @@ func_exit: innobase_active_small(); + if (share->ib_table->is_corrupt) { + DBUG_RETURN(HA_ERR_CRASHED); + } + DBUG_RETURN(error_result); } @@ -5567,6 +5594,10 @@ ha_statistic_increment(&SSV::ha_update_count); + if (share->ib_table->is_corrupt) { + DBUG_RETURN(HA_ERR_CRASHED); + } + if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE) table->timestamp_field->set_time(); @@ -5656,6 +5687,10 @@ innobase_active_small(); + if (share->ib_table->is_corrupt) { + DBUG_RETURN(HA_ERR_CRASHED); + } + DBUG_RETURN(error); } @@ -5677,6 +5712,10 @@ ha_statistic_increment(&SSV::ha_delete_count); + if (share->ib_table->is_corrupt) { + DBUG_RETURN(HA_ERR_CRASHED); + } + if (!prebuilt->upd_node) { row_get_prebuilt_update_vector(prebuilt); } @@ -5703,6 +5742,10 @@ innobase_active_small(); + if (share->ib_table->is_corrupt) { + DBUG_RETURN(HA_ERR_CRASHED); + } + DBUG_RETURN(error); } @@ -5942,6 +5985,10 @@ ha_statistic_increment(&SSV::ha_read_key_count); + if (srv_pass_corrupt_table <= 1 && share->ib_table->is_corrupt) { + DBUG_RETURN(HA_ERR_CRASHED); + } + index = prebuilt->index; if (UNIV_UNLIKELY(index == NULL)) { @@ -6007,6 +6054,10 @@ ret = DB_UNSUPPORTED; } + if (srv_pass_corrupt_table <= 1 && share->ib_table->is_corrupt) { + DBUG_RETURN(HA_ERR_CRASHED); + } + switch (ret) { case DB_SUCCESS: error = 0; @@ -6122,6 +6173,10 @@ { DBUG_ENTER("change_active_index"); + if (srv_pass_corrupt_table <= 1 && share->ib_table->is_corrupt) { + DBUG_RETURN(HA_ERR_CRASHED); + } + ut_ad(user_thd == ha_thd()); ut_a(prebuilt->trx == thd_to_trx(user_thd)); @@ -6212,6 +6267,10 @@ DBUG_ENTER("general_fetch"); + if (srv_pass_corrupt_table <= 1 && share->ib_table->is_corrupt) { + DBUG_RETURN(HA_ERR_CRASHED); + } + ut_a(prebuilt->trx == thd_to_trx(user_thd)); innodb_srv_conc_enter_innodb(prebuilt->trx); @@ -6221,6 +6280,10 @@ innodb_srv_conc_exit_innodb(prebuilt->trx); + if (srv_pass_corrupt_table <= 1 && share->ib_table->is_corrupt) { + DBUG_RETURN(HA_ERR_CRASHED); + } + switch (ret) { case DB_SUCCESS: error = 0; @@ -7487,10 +7550,18 @@ update_thd(ha_thd()); + if (share->ib_table->is_corrupt) { + DBUG_RETURN(HA_ERR_CRASHED); + } + /* Truncate the table in InnoDB */ error = row_truncate_table_for_mysql(prebuilt->table, prebuilt->trx); + if (share->ib_table->is_corrupt) { + DBUG_RETURN(HA_ERR_CRASHED); + } + error = convert_error_code_to_mysql(error, prebuilt->table->flags, NULL); @@ -7991,6 +8062,16 @@ return(ranges + (double) rows / (double) total_rows * time_for_scan); } +UNIV_INTERN +bool +ha_innobase::is_corrupt() const +{ + if (share->ib_table) + return ((bool)share->ib_table->is_corrupt); + else + return (FALSE); +} + /*********************************************************************//** Calculates the key number used inside MySQL for an Innobase index. We will first check the "index translation table" for a match of the index to get @@ -8168,7 +8249,7 @@ ib_table = prebuilt->table; if (flag & HA_STATUS_TIME) { - if (called_from_analyze || innobase_stats_on_metadata) { + if ((called_from_analyze || innobase_stats_on_metadata) && !share->ib_table->is_corrupt) { /* In sql_show we call with this flag: update then statistics so that they are up-to-date */ @@ -8461,10 +8542,18 @@ THD* thd, /*!< in: connection thread handle */ HA_CHECK_OPT* check_opt) /*!< in: currently ignored */ { + if (share->ib_table->is_corrupt) { + return(HA_ADMIN_CORRUPT); + } + /* Simply call ::info() with all the flags */ info_low(HA_STATUS_TIME | HA_STATUS_CONST | HA_STATUS_VARIABLE, true /* called from analyze */); + if (share->ib_table->is_corrupt) { + return(HA_ADMIN_CORRUPT); + } + return(0); } @@ -8646,6 +8735,10 @@ my_error(ER_QUERY_INTERRUPTED, MYF(0)); } + if (share->ib_table->is_corrupt) { + return(HA_ADMIN_CORRUPT); + } + DBUG_RETURN(is_ok ? HA_ADMIN_OK : HA_ADMIN_CORRUPT); } @@ -9416,6 +9509,10 @@ update_thd(thd); + if (share->ib_table->is_corrupt) { + DBUG_RETURN(HA_ERR_CRASHED); + } + if (prebuilt->table->ibd_file_missing && !thd_tablespace_op(thd)) { ut_print_timestamp(stderr); fprintf(stderr, @@ -11823,6 +11920,26 @@ "0 (the default) disables automatic dumps.", NULL, NULL, 0, 0, UINT_MAX32, 0); +const char *corrupt_table_action_names[]= +{ + "assert", /* 0 */ + "warn", /* 1 */ + "salvage", /* 2 */ + NullS +}; +TYPELIB corrupt_table_action_typelib= +{ + array_elements(corrupt_table_action_names) - 1, "corrupt_table_action_typelib", + corrupt_table_action_names, NULL +}; +static MYSQL_SYSVAR_ENUM(corrupt_table_action, srv_pass_corrupt_table, + PLUGIN_VAR_RQCMDARG, + "Warn corruptions of user tables as 'corrupt table' instead of not crashing itself, " + "when used with file_per_table. " + "All file io for the datafile after detected as corrupt are disabled, " + "except for the deletion.", + NULL, NULL, 0, &corrupt_table_action_typelib); + static struct st_mysql_sys_var* innobase_system_variables[]= { MYSQL_SYSVAR(additional_mem_pool_size), MYSQL_SYSVAR(autoextend_increment), @@ -11910,6 +12027,7 @@ MYSQL_SYSVAR(purge_threads), MYSQL_SYSVAR(purge_batch_size), MYSQL_SYSVAR(rollback_segments), + MYSQL_SYSVAR(corrupt_table_action), NULL }; --- a/storage/innobase/handler/ha_innodb.h +++ b/storage/innobase/handler/ha_innodb.h @@ -52,6 +52,7 @@ innodb_idx_translate_t idx_trans_tbl; /*!< index translation table between MySQL and Innodb */ + dict_table_t* ib_table; } INNOBASE_SHARE; @@ -135,6 +136,7 @@ int close(void); double scan_time(); double read_time(uint index, uint ranges, ha_rows rows); + bool is_corrupt() const; int write_row(uchar * buf); int update_row(const uchar * old_data, uchar * new_data); --- a/storage/innobase/include/btr0btr.ic +++ b/storage/innobase/include/btr0btr.ic @@ -28,7 +28,7 @@ #include "mtr0mtr.h" #include "mtr0log.h" #include "page0zip.h" - +#include "srv0srv.h" #define BTR_MAX_NODE_LEVEL 50 /*!< Maximum B-tree page level (not really a hard limit). Used in debug assertions @@ -55,7 +55,9 @@ block = buf_page_get_gen(space, zip_size, page_no, mode, NULL, BUF_GET, file, line, mtr); - if (mode != RW_NO_LATCH) { + ut_a(srv_pass_corrupt_table || block); + + if (block && mode != RW_NO_LATCH) { buf_block_dbg_add_level(block, SYNC_TREE_NODE); } --- a/storage/innobase/include/buf0buf.h +++ b/storage/innobase/include/buf0buf.h @@ -1025,7 +1025,7 @@ const buf_block_t* block) /*!< in: pointer to the control block */ __attribute__((pure)); #else /* UNIV_DEBUG */ -# define buf_block_get_frame(block) (block)->frame +# define buf_block_get_frame(block) (block ? (block)->frame : 0) #endif /* UNIV_DEBUG */ /*********************************************************************//** Gets the space id of a block. @@ -1472,6 +1472,7 @@ 0 if the block was never accessed in the buffer pool */ /* @} */ + ibool is_corrupt; # if defined UNIV_DEBUG_FILE_ACCESSES || defined UNIV_DEBUG ibool file_page_was_freed; /*!< this is set to TRUE when fsp --- a/storage/innobase/include/buf0buf.ic +++ b/storage/innobase/include/buf0buf.ic @@ -34,7 +34,7 @@ #include "buf0flu.h" #include "buf0lru.h" #include "buf0rea.h" - +#include "srv0srv.h" /*********************************************************************//** Gets the current size of buffer buf_pool in bytes. @return size in bytes */ @@ -619,6 +619,12 @@ /*================*/ const buf_block_t* block) /*!< in: pointer to the control block */ { + ut_a(srv_pass_corrupt_table || block); + + if (srv_pass_corrupt_table && !block) { + return(0); + } + ut_ad(block); switch (buf_block_get_state(block)) { --- a/storage/innobase/include/dict0dict.h +++ b/storage/innobase/include/dict0dict.h @@ -1258,6 +1258,15 @@ dict_close(void); /*============*/ +/************************************************************************* +set is_corrupt flag by space_id*/ + +void +dict_table_set_corrupt_by_space( +/*============================*/ + ulint space_id, + ibool need_mutex); + #ifndef UNIV_NONINL #include "dict0dict.ic" #endif --- a/storage/innobase/include/dict0mem.h +++ b/storage/innobase/include/dict0mem.h @@ -666,6 +666,7 @@ the AUTOINC lock on this table. */ /* @} */ /*----------------------*/ + ibool is_corrupt; #endif /* !UNIV_HOTBACKUP */ #ifdef UNIV_DEBUG --- a/storage/innobase/include/fil0fil.h +++ b/storage/innobase/include/fil0fil.h @@ -750,6 +750,19 @@ fil_system_hash_nodes(void); /*========================*/ +/************************************************************************* +functions to access is_corrupt flag of fil_space_t*/ + +ibool +fil_space_is_corrupt( +/*=================*/ + ulint space_id); + +void +fil_space_set_corrupt( +/*==================*/ + ulint space_id); + typedef struct fil_space_struct fil_space_t; #endif --- a/storage/innobase/include/fut0fut.ic +++ b/storage/innobase/include/fut0fut.ic @@ -23,6 +23,7 @@ Created 12/13/1995 Heikki Tuuri ***********************************************************************/ +#include "srv0srv.h" #include "sync0rw.h" #include "buf0buf.h" @@ -48,6 +49,12 @@ ut_ad((rw_latch == RW_S_LATCH) || (rw_latch == RW_X_LATCH)); block = buf_page_get(space, zip_size, addr.page, rw_latch, mtr); + + if (srv_pass_corrupt_table && !block) { + return(0); + } + ut_a(block); + ptr = buf_block_get_frame(block) + addr.boffset; buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK); --- a/storage/innobase/include/page0page.h +++ b/storage/innobase/include/page0page.h @@ -527,7 +527,7 @@ page_is_leaf( /*=========*/ const page_t* page) /*!< in: page */ - __attribute__((nonnull, pure)); + __attribute__((pure)); /************************************************************//** Gets the pointer to the next record on the page. @return pointer to next record */ --- a/storage/innobase/include/page0page.ic +++ b/storage/innobase/include/page0page.ic @@ -274,6 +274,9 @@ /*=========*/ const page_t* page) /*!< in: page */ { + if (!page) { + return(FALSE); + } return(!*(const uint16*) (page + (PAGE_HEADER + PAGE_LEVEL))); } --- a/storage/innobase/include/page0zip.h +++ b/storage/innobase/include/page0zip.h @@ -114,7 +114,7 @@ const page_t* page, /*!< in: uncompressed page */ dict_index_t* index, /*!< in: index of the B-tree node */ mtr_t* mtr) /*!< in: mini-transaction, or NULL */ - __attribute__((nonnull(1,2,3))); + __attribute__((nonnull(1,3))); /**********************************************************************//** Decompress a page. This function should tolerate errors on the compressed --- a/storage/innobase/include/srv0srv.h +++ b/storage/innobase/include/srv0srv.h @@ -240,6 +240,7 @@ extern ulint srv_adaptive_flushing_method; extern ulint srv_expand_import; +extern ulint srv_pass_corrupt_table; extern ulint srv_dict_size_limit; /*-------------------------------------------*/ --- a/storage/innobase/page/page0zip.c +++ b/storage/innobase/page/page0zip.c @@ -1195,6 +1195,10 @@ FILE* logfile = NULL; #endif + if (!page) { + return(FALSE); + } + ut_a(page_is_comp(page)); ut_a(fil_page_get_type(page) == FIL_PAGE_INDEX); ut_ad(page_simple_validate_new((page_t*) page)); --- a/storage/innobase/row/row0ins.c +++ b/storage/innobase/row/row0ins.c @@ -1335,6 +1335,12 @@ const rec_t* rec = btr_pcur_get_rec(&pcur); const buf_block_t* block = btr_pcur_get_block(&pcur); + if (srv_pass_corrupt_table && !block) { + err = DB_CORRUPTION; + break; + } + ut_a(block); + if (page_rec_is_infimum(rec)) { continue; --- a/storage/innobase/row/row0merge.c +++ b/storage/innobase/row/row0merge.c @@ -1245,6 +1245,13 @@ if (UNIV_LIKELY(has_next)) { rec = btr_pcur_get_rec(&pcur); + + if (srv_pass_corrupt_table && !rec) { + err = DB_CORRUPTION; + goto err_exit; + } + ut_a(rec); + offsets = rec_get_offsets(rec, clust_index, NULL, ULINT_UNDEFINED, &row_heap); --- a/storage/innobase/row/row0sel.c +++ b/storage/innobase/row/row0sel.c @@ -3854,6 +3854,13 @@ /* PHASE 4: Look for matching records in a loop */ rec = btr_pcur_get_rec(pcur); + + if (srv_pass_corrupt_table && !rec) { + err = DB_CORRUPTION; + goto lock_wait_or_error; + } + ut_a(rec); + ut_ad(!!page_rec_is_comp(rec) == comp); #ifdef UNIV_SEARCH_DEBUG /* @@ -3931,7 +3938,13 @@ if (UNIV_UNLIKELY(next_offs >= UNIV_PAGE_SIZE - PAGE_DIR)) { wrong_offs: - if (srv_force_recovery == 0 || moves_up == FALSE) { + if (srv_pass_corrupt_table && !trx_sys_sys_space(index->table->space)) { + index->table->is_corrupt = TRUE; + fil_space_set_corrupt(index->table->space); + } + + if ((srv_force_recovery == 0 || moves_up == FALSE) + && srv_pass_corrupt_table <= 1) { ut_print_timestamp(stderr); buf_page_print(page_align(rec), 0); fprintf(stderr, @@ -3982,7 +3995,8 @@ offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap); - if (UNIV_UNLIKELY(srv_force_recovery > 0)) { + if (UNIV_UNLIKELY(srv_force_recovery > 0) + || (srv_pass_corrupt_table == 2 && index->table->is_corrupt)) { if (!rec_validate(rec, offsets) || !btr_index_rec_validate(rec, index, FALSE)) { fprintf(stderr, --- a/storage/innobase/srv/srv0srv.c +++ b/storage/innobase/srv/srv0srv.c @@ -430,6 +430,7 @@ UNIV_INTERN ulint srv_adaptive_flushing_method = 0; /* 0: native 1: estimate 2: keep_average */ UNIV_INTERN ulint srv_expand_import = 0; /* 0:disable 1:enable */ +UNIV_INTERN ulint srv_pass_corrupt_table = 0; /* 0:disable 1:enable */ UNIV_INTERN ulint srv_dict_size_limit = 0; /*-------------------------------------------*/ --- a/storage/innobase/srv/srv0start.c +++ b/storage/innobase/srv/srv0start.c @@ -2149,6 +2149,13 @@ os_fast_mutex_free(&srv_os_test_mutex); + if (!srv_file_per_table_original_value + && srv_pass_corrupt_table) { + fprintf(stderr, "InnoDB: Warning:" + " The option innodb_file_per_table is disabled," + " so using the option innodb_pass_corrupt_table doesn't make sense.\n"); + } + if (srv_print_verbose_log) { ut_print_timestamp(stderr); fprintf(stderr,