--- a/include/my_sys.h +++ b/include/my_sys.h @@ -524,6 +524,8 @@ #define my_b_tell(info) ((info)->pos_in_file + \ (size_t) (*(info)->current_pos - (info)->request_pos)) +#define my_b_write_tell(info) ((info)->pos_in_file + \ + ((info)->write_pos - (info)->write_buffer)) #define my_b_get_buffer_start(info) (info)->request_pos #define my_b_get_bytes_in_buffer(info) (char*) (info)->read_end - \ --- a/include/mysql/plugin.h +++ b/include/mysql/plugin.h @@ -559,6 +559,8 @@ #define EXTENDED_FOR_USERSTAT +#define EXTENDED_FOR_COMMIT_ORDERED + /** Create a temporary file. --- a/sql/handler.cc +++ b/sql/handler.cc @@ -90,6 +90,8 @@ static TYPELIB known_extensions= {0,"known_exts", NULL, NULL}; uint known_extensions_id= 0; +static int commit_one_phase_low(THD *thd, bool all, THD_TRANS *trans, + bool is_real_trans); static plugin_ref ha_default_plugin(THD *thd) @@ -1119,7 +1121,8 @@ */ bool is_real_trans= all || thd->transaction.all.ha_list == 0; Ha_trx_info *ha_info= trans->ha_list; - my_xid xid= thd->transaction.xid_state.xid.get_my_xid(); + bool need_commit_ordered; + my_xid xid; DBUG_ENTER("ha_commit_trans"); /* @@ -1152,13 +1155,20 @@ DBUG_RETURN(2); } - if (ha_info) + if (!ha_info) + { + /* Free resources and perform other cleanup even for 'empty' transactions. 
*/ + if (is_real_trans) + thd->transaction.cleanup(); + DBUG_RETURN(0); + } + else { uint rw_ha_count; bool rw_trans; MDL_request mdl_request; - DBUG_EXECUTE_IF("crash_commit_before", DBUG_SUICIDE();); + DBUG_EXECUTE_IF("crash_commit_before", abort();); /* Close all cursors that can not survive COMMIT */ if (is_real_trans) /* not a statement commit */ @@ -1197,57 +1207,80 @@ !thd->slave_thread) { my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only"); - ha_rollback_trans(thd, all); - error= 1; - goto end; + goto err; } - if (!trans->no_2pc && (rw_ha_count > 1)) + if (trans->no_2pc || (rw_ha_count <= 1)) { - for (; ha_info && !error; ha_info= ha_info->next()) + error= ha_commit_one_phase(thd, all); + DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT();); + goto end; + } + + need_commit_ordered= FALSE; + xid= thd->transaction.xid_state.xid.get_my_xid(); + + for (Ha_trx_info *hi= ha_info; hi; hi= hi->next()) { int err; - handlerton *ht= ha_info->ht(); + handlerton *ht= hi->ht(); /* Do not call two-phase commit if this particular transaction is read-only. This allows for simpler implementation in engines that are always read-only. */ - if (! ha_info->is_trx_read_write()) + if (! hi->is_trx_read_write()) continue; /* Sic: we know that prepare() is not NULL since otherwise trans->no_2pc would have been set. 
*/ - if ((err= ht->prepare(ht, thd, all))) - { - my_error(ER_ERROR_DURING_COMMIT, MYF(0), err); - error= 1; - } + err= ht->prepare(ht, thd, all); status_var_increment(thd->status_var.ha_prepare_count); + if (err) + my_error(ER_ERROR_DURING_COMMIT, MYF(0), err); + + if (err) + goto err; + + need_commit_ordered|= (ht->commit_ordered != NULL); } - DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE();); - if (error || (is_real_trans && xid && - (error= !(cookie= tc_log->log_xid(thd, xid))))) + DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_ABORT();); + + if (!is_real_trans) { - ha_rollback_trans(thd, all); - error= 1; + error= commit_one_phase_low(thd, all, trans, is_real_trans); + DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT();); goto end; } - DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE();); - } - error=ha_commit_one_phase(thd, all) ? (cookie ? 2 : 1) : 0; - DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_SUICIDE();); - if (cookie) + + cookie= tc_log->log_and_order(thd, xid, all, need_commit_ordered); + if (!cookie) + goto err; + + DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_ABORT();); + + error= commit_one_phase_low(thd, all, trans, is_real_trans) ? 2 : 0; + DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT();); + if (is_real_trans) /* userstat.patch */ + thd->diff_commit_trans++; /* userstat.patch */ + RUN_HOOK(transaction, after_commit, (thd, FALSE)); + + DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_ABORT();); if(tc_log->unlog(cookie, xid)) { error= 2; goto end; } - DBUG_EXECUTE_IF("crash_commit_after", DBUG_SUICIDE();); - if (is_real_trans) - thd->diff_commit_trans++; - RUN_HOOK(transaction, after_commit, (thd, FALSE)); + + DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT();); + goto end; + + /* Come here if error and we need to rollback. 
*/ +err: + error= 1; /* Transaction was rolled back */ + ha_rollback_trans(thd, all); + end: if (rw_trans && mdl_request.ticket) { @@ -1260,9 +1293,6 @@ thd->mdl_context.release_lock(mdl_request.ticket); } } - /* Free resources and perform other cleanup even for 'empty' transactions. */ - else if (is_real_trans) - thd->transaction.cleanup(); DBUG_RETURN(error); } @@ -1279,7 +1309,6 @@ int ha_commit_one_phase(THD *thd, bool all) { - int error=0; THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt; /* "real" is a nick name for a transaction for which a commit will @@ -1295,8 +1324,16 @@ transaction.all.ha_list, see why in trans_register_ha()). */ bool is_real_trans=all || thd->transaction.all.ha_list == 0; - Ha_trx_info *ha_info= trans->ha_list, *ha_info_next; DBUG_ENTER("ha_commit_one_phase"); + DBUG_RETURN(commit_one_phase_low(thd, all, trans, is_real_trans)); +} + +static int +commit_one_phase_low(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans) +{ + int error= 0; + Ha_trx_info *ha_info= trans->ha_list, *ha_info_next; + DBUG_ENTER("commit_one_phase_low"); if (ha_info) { @@ -1894,7 +1931,16 @@ { bool warn= true; + /* + Holding the LOCK_commit_ordered mutex ensures that we get the same + snapshot for all engines (including the binary log). This allows us + among other things to do backups with + START TRANSACTION WITH CONSISTENT SNAPSHOT and + have a consistent binlog position. + */ + mysql_mutex_lock(&LOCK_commit_ordered); plugin_foreach(thd, snapshot_handlerton, MYSQL_STORAGE_ENGINE_PLUGIN, &warn); + mysql_mutex_unlock(&LOCK_commit_ordered); /* Same idea as when one wants to CREATE TABLE in one engine which does not --- a/sql/handler.h +++ b/sql/handler.h @@ -756,6 +756,53 @@ and 'real commit' mean the same event. */ int (*commit)(handlerton *hton, THD *thd, bool all); + /* + The commit_ordered() method is called prior to the commit() method, after + the transaction manager has decided to commit (not rollback) the + transaction. 
Unlike commit(), commit_ordered() is called only when the + full transaction is committed, not for each commit of statement + transaction in a multi-statement transaction. + + Note that, like prepare(), commit_ordered() is only called when 2-phase + commit takes place. I.e. when no binary log and only a single engine + participates in a transaction, one commit() is called, no + commit_ordered(). So engines must be prepared for this. + + The calls to commit_ordered() in multiple parallel transactions are + guaranteed to happen in the same order in every participating + handler. This can be used to ensure the same commit order among multiple + handlers (e.g. in table handler and binlog). So if transaction T1 calls + into commit_ordered() of handler A before T2, then T1 will also call + commit_ordered() of handler B before T2. + + Engines that implement this method should during this call make the + transaction visible to other transactions, thereby making the order of + transaction commits be defined by the order of commit_ordered() calls. + + The intention is that commit_ordered() should do the minimal amount of + work that needs to happen in consistent commit order among handlers. To + preserve ordering, calls need to be serialised on a global mutex, so + doing any time-consuming or blocking operations in commit_ordered() will + limit scalability. + + Handlers can rely on commit_ordered() calls to be serialised (no two + calls can run in parallel, so no extra locking on the handler part is + required to ensure this). + + Note that commit_ordered() can be called from a different thread than the + one handling the transaction! So it can not do anything that depends on + thread local storage, in particular it can not call my_error() and + friends (instead it can store the error code and delay the call of + my_error() to the commit() method). 
+ + Similarly, since commit_ordered() returns void, any return error code + must be saved and returned from the commit() method instead. + + The commit_ordered method is optional, and can be left unset if not + needed in a particular handler (then there will be no ordering guarantees + wrt. other engines and binary log). + */ + void (*commit_ordered)(handlerton *hton, THD *thd, bool all); int (*rollback)(handlerton *hton, THD *thd, bool all); int (*prepare)(handlerton *hton, THD *thd, bool all); int (*recover)(handlerton *hton, XID *xid_list, uint len); --- a/sql/log.cc +++ b/sql/log.cc @@ -71,6 +71,25 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all); static int binlog_prepare(handlerton *hton, THD *thd, bool all); +static LEX_STRING const write_error_msg= + { C_STRING_WITH_LEN("error writing to the binary log") }; + +static my_bool mutexes_inited; +mysql_mutex_t LOCK_group_commit_queue; +mysql_mutex_t LOCK_commit_ordered; + +static ulonglong binlog_status_var_num_commits; +static ulonglong binlog_status_var_num_group_commits; + +static SHOW_VAR binlog_status_vars_detail[]= +{ + {"commits", + (char *)&binlog_status_var_num_commits, SHOW_LONGLONG}, + {"group_commits", + (char *)&binlog_status_var_num_group_commits, SHOW_LONGLONG}, + {NullS, NullS, SHOW_LONG} +}; + /** purge logs, master and slave sides both, related error code convertor. @@ -167,41 +186,6 @@ } /* - Helper class to hold a mutex for the duration of the - block. - - Eliminates the need for explicit unlocking of mutexes on, e.g., - error returns. On passing a null pointer, the sentry will not do - anything. 
- */ -class Mutex_sentry -{ -public: - Mutex_sentry(mysql_mutex_t *mutex) - : m_mutex(mutex) - { - if (m_mutex) - mysql_mutex_lock(mutex); - } - - ~Mutex_sentry() - { - if (m_mutex) - mysql_mutex_unlock(m_mutex); -#ifndef DBUG_OFF - m_mutex= 0; -#endif - } - -private: - mysql_mutex_t *m_mutex; - - // It's not allowed to copy this object in any way - Mutex_sentry(Mutex_sentry const&); - void operator=(Mutex_sentry const&); -}; - -/* Helper classes to store non-transactional and transactional data before copying it to the binary log. */ @@ -211,7 +195,8 @@ binlog_cache_data(): m_pending(0), before_stmt_pos(MY_OFF_T_UNDEF), incident(FALSE), changes_to_non_trans_temp_table_flag(FALSE), saved_max_binlog_cache_size(0), ptr_binlog_cache_use(0), - ptr_binlog_cache_disk_use(0) + ptr_binlog_cache_disk_use(0), commit_bin_log_file_pos(0), + using_xa(FALSE), xa_xid(0) { } ~binlog_cache_data() @@ -270,6 +255,8 @@ variable after truncating the cache. */ cache_log.disk_writes= 0; + using_xa= FALSE; + commit_bin_log_file_pos= 0; DBUG_ASSERT(empty()); } @@ -411,6 +398,20 @@ binlog_cache_data& operator=(const binlog_cache_data& info); binlog_cache_data(const binlog_cache_data& info); + +public: + /* + Binlog position after current commit, available to storage engines during + commit_ordered() and commit(). + */ + ulonglong commit_bin_log_file_pos; + + /* + Flag set true if this transaction is committed with log_xid() as part of + XA, false if not. + */ + bool using_xa; + my_xid xa_xid; }; class binlog_cache_mngr { @@ -1624,7 +1625,7 @@ */ static inline int binlog_flush_cache(THD *thd, binlog_cache_data* cache_data, Log_event *end_evt, - bool is_transactional) + bool is_transactional, bool all) { DBUG_ENTER("binlog_flush_cache"); int error= 0; @@ -1643,8 +1644,8 @@ were, we would have to ensure that we're not ending a statement inside a stored function. 
*/ - error= mysql_bin_log.write(thd, &cache_data->cache_log, end_evt, - cache_data->has_incident()); + error= mysql_bin_log.write_transaction_to_binlog(thd, cache_data, + end_evt, all); } cache_data->reset(); @@ -1663,12 +1664,12 @@ */ static inline int binlog_commit_flush_stmt_cache(THD *thd, - binlog_cache_mngr *cache_mngr) + binlog_cache_mngr *cache_mngr, bool all) { Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"), FALSE, FALSE, TRUE, 0); return (binlog_flush_cache(thd, &cache_mngr->stmt_cache, &end_evt, - FALSE)); + FALSE, all)); } /** @@ -1681,12 +1682,12 @@ nonzero if an error pops up when flushing the cache. */ static inline int -binlog_commit_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr) +binlog_commit_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all) { Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"), TRUE, FALSE, TRUE, 0); return (binlog_flush_cache(thd, &cache_mngr->trx_cache, &end_evt, - TRUE)); + TRUE, all)); } /** @@ -1699,12 +1700,12 @@ nonzero if an error pops up when flushing the cache. */ static inline int -binlog_rollback_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr) +binlog_rollback_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all) { Query_log_event end_evt(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, FALSE, TRUE, 0); return (binlog_flush_cache(thd, &cache_mngr->trx_cache, &end_evt, - TRUE)); + TRUE, all)); } /** @@ -1719,11 +1720,11 @@ */ static inline int binlog_commit_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, - my_xid xid) + my_xid xid, bool all) { Xid_log_event end_evt(thd, xid); return (binlog_flush_cache(thd, &cache_mngr->trx_cache, &end_evt, - TRUE)); + TRUE, all)); } /** @@ -1785,7 +1786,7 @@ do nothing. just pretend we can do 2pc, so that MySQL won't switch to 1pc. 
- real work will be done in MYSQL_BIN_LOG::log_xid() + real work will be done in MYSQL_BIN_LOG::log_and_order() */ return 0; } @@ -1818,7 +1819,7 @@ if (!cache_mngr->stmt_cache.empty()) { - error= binlog_commit_flush_stmt_cache(thd, cache_mngr); + error= binlog_commit_flush_stmt_cache(thd, cache_mngr, all); } if (cache_mngr->trx_cache.empty()) @@ -1837,7 +1838,7 @@ Otherwise, we accumulate the changes. */ if (!error && ending_trans(thd, all)) - error= binlog_commit_flush_trx_cache(thd, cache_mngr); + error= binlog_commit_flush_trx_cache(thd, cache_mngr, all); /* This is part of the stmt rollback. @@ -1881,7 +1882,7 @@ } else if (!cache_mngr->stmt_cache.empty()) { - error= binlog_commit_flush_stmt_cache(thd, cache_mngr); + error= binlog_commit_flush_stmt_cache(thd, cache_mngr, all); } if (cache_mngr->trx_cache.empty()) @@ -1929,7 +1930,7 @@ (trans_has_updated_non_trans_table(thd) && ending_single_stmt_trans(thd,all) && thd->variables.binlog_format == BINLOG_FORMAT_MIXED))) - error= binlog_rollback_flush_trx_cache(thd, cache_mngr); + error= binlog_rollback_flush_trx_cache(thd, cache_mngr, all); /* Truncate the cache if: . aborting a single or multi-statement transaction or; @@ -2904,6 +2905,7 @@ MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period) :bytes_written(0), prepared_xids(0), file_id(1), open_count(1), need_start_event(TRUE), + group_commit_queue(0), num_commits(0), num_group_commits(0), sync_period_ptr(sync_period), is_relay_log(0), signal_cnt(0), description_event_for_exec(0), description_event_for_queue(0) @@ -5361,19 +5363,15 @@ SYNOPSIS write_cache() cache Cache to write to the binary log - lock_log True if the LOCK_log mutex should be aquired, false otherwise - sync_log True if the log should be flushed and synced DESCRIPTION Write the contents of the cache to the binary log. The cache will be reset as a READ_CACHE to be able to read the contents from it. 
*/ -int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, - bool lock_log, bool sync_log) +int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) { - Mutex_sentry sentry(lock_log ? &LOCK_log : NULL); - + mysql_mutex_assert_owner(&LOCK_log); if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) return ER_ERROR_ON_WRITE; uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs; @@ -5484,6 +5482,8 @@ } /* Write data to the binary log file */ + DBUG_EXECUTE_IF("fail_binlog_write_1", + errno= 28; return ER_ERROR_ON_WRITE;); if (my_b_write(&log_file, cache->read_pos, length)) return ER_ERROR_ON_WRITE; thd->binlog_bytes_written+= length; @@ -5492,9 +5492,6 @@ DBUG_ASSERT(carry == 0); - if (sync_log) - return flush_and_sync(0); - return 0; // All OK } @@ -5535,8 +5532,6 @@ if (!is_open()) DBUG_RETURN(error); - LEX_STRING const write_error_msg= - { C_STRING_WITH_LEN("error writing to the binary log") }; Incident incident= INCIDENT_LOST_EVENTS; Incident_log_event ev(thd, incident, write_error_msg); if (lock) @@ -5585,112 +5580,332 @@ 'cache' needs to be reinitialized after this functions returns. */ -bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event, - bool incident) +bool +MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_cache_data *cache_data, + Log_event *end_ev, bool all) +{ + group_commit_entry entry; + bool ret; + DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog"); + + entry.thd= thd; + entry.cache_data= cache_data; + entry.error= 0; + entry.all= all; + + /* + Log "BEGIN" at the beginning of every transaction. Here, a transaction is + either a BEGIN..COMMIT block or a single statement in autocommit mode. + + Create the necessary events here, where we have the correct THD (and + thread context). + + Due to group commit the actual writing to binlog may happen in a different + thread. 
+ */ + Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, FALSE, TRUE, 0); + entry.begin_event= &qinfo; + entry.end_event= end_ev; + if (cache_data->has_incident()) + { + Incident_log_event inc_ev(thd, INCIDENT_LOST_EVENTS, write_error_msg); + entry.incident_event= &inc_ev; + ret = write_transaction_to_binlog_events(&entry); + } + else + { + entry.incident_event= NULL; + ret = write_transaction_to_binlog_events(&entry); + } + if (!ret) /* userstat.patch */ + thd->binlog_bytes_written += qinfo.data_written; /* userstat.patch */ + DBUG_RETURN(ret); +} + +bool +MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) { - DBUG_ENTER("MYSQL_BIN_LOG::write(THD *, IO_CACHE *, Log_event *)"); + /* + To facilitate group commit for the binlog, we first queue up ourselves in + the group commit queue. Then the first thread to enter the queue waits for + the LOCK_log mutex, and commits for everyone in the queue once it gets the + lock. Any other threads in the queue just wait for the first one to finish + the commit and wake them up. + */ + entry->thd->clear_wakeup_ready(); + mysql_mutex_lock(&LOCK_group_commit_queue); + group_commit_entry *orig_queue= group_commit_queue; + entry->next= orig_queue; + group_commit_queue= entry; + DEBUG_SYNC(entry->thd, "commit_group_commit_queue"); + mysql_mutex_unlock(&LOCK_group_commit_queue); + + /* + The first in the queue handle group commit for all; the others just wait + to be signalled when group commit is done. 
+ */ + if (orig_queue != NULL) + entry->thd->wait_for_wakeup_ready(); + else + trx_group_commit_leader(entry); + + if (likely(!entry->error)) + return 0; + + switch (entry->error) + { + case ER_ERROR_ON_WRITE: + my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, entry->commit_errno); + break; + case ER_ERROR_ON_READ: + my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH), + entry->cache_data->cache_log.file_name, entry->commit_errno); + break; + default: + /* + There are not (and should not be) any errors thrown not covered above. + But just in case one is added later without updating the above switch + statement, include a catch-all. + */ + my_printf_error(entry->error, + "Error writing transaction to binary log: %d", + MYF(ME_NOREFRESH), entry->error); + } + /* + Since we return error, this transaction XID will not be committed, so + we need to mark it as not needed for recovery (unlog() is not called + for a transaction if log_xid() fails). + */ + if (entry->cache_data->using_xa && entry->cache_data->xa_xid) + mark_xid_done(); + + return 1; +} + +/* + Do binlog group commit as the lead thread. + + This must be called when this thread/transaction is queued at the start of + the group_commit_queue. It will wait to obtain the LOCK_log mutex, then group + commit all the transactions in the queue (more may have entered while waiting + for LOCK_log). After commit is done, all other threads in the queue will be + signalled. + + */ +void +MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) +{ + DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader"); + uint xid_count= 0; + uint write_count= 0; + bool check_purge= false; + group_commit_entry *current= 0; DBUG_ASSERT(is_open()); if (likely(is_open())) // Should always be true { - bool check_purge; - + /* + Lock the LOCK_log(), and once we get it, collect any additional writes + that queued up while we were waiting. 
+ */ mysql_mutex_lock(&LOCK_log); + + DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log"); + mysql_mutex_lock(&LOCK_group_commit_queue); + current= group_commit_queue; + group_commit_queue= NULL; + mysql_mutex_unlock(&LOCK_group_commit_queue); + + /* As the queue is in reverse order of entering, reverse it. */ + group_commit_entry *queue= NULL; + while (current) + { + group_commit_entry *next= current->next; + current->next= queue; + queue= current; + current= next; + } + DBUG_ASSERT(leader == queue /* the leader should be first in queue */); /* - We only bother to write to the binary log if there is anything - to write. - */ - if (my_b_tell(cache) > 0) + Now we have in queue the list of transactions to be committed in order. + + Commit every transaction in the queue. + + Note that we are doing this in a different thread than the one running + the transaction! So we are limited in the operations we can do. In + particular, we cannot call my_error() on behalf of a transaction, as + that obtains the THD from thread local storage. Instead, we must set + current->error and let the thread do the error reporting itself once + we wake it up. + */ + for (current= queue; current != NULL; current= current->next) { + binlog_cache_data *cache_data= current->cache_data; + IO_CACHE *cache= &cache_data->cache_log; + /* - Log "BEGIN" at the beginning of every transaction. Here, a - transaction is either a BEGIN..COMMIT block or a single - statement in autocommit mode. + We only bother to write to the binary log if there is anything + to write. 
*/ - Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, FALSE, TRUE, 0); - if (qinfo.write(&log_file)) - goto err; - thd->binlog_bytes_written+= qinfo.data_written; - DBUG_EXECUTE_IF("crash_before_writing_xid", - { - if ((write_error= write_cache(thd, cache, false, true))) - DBUG_PRINT("info", ("error writing binlog cache: %d", - write_error)); - DBUG_PRINT("info", ("crashing before writing xid")); - DBUG_SUICIDE(); - }); - - if ((write_error= write_cache(thd, cache, false, false))) - goto err; - - if (commit_event && commit_event->write(&log_file)) - goto err; - if (commit_event) - thd->binlog_bytes_written+= commit_event->data_written; + if (my_b_tell(cache) > 0) + { + if ((current->error= write_transaction(current))) + current->commit_errno= errno; + write_count++; + } - if (incident && write_incident(thd, FALSE)) - goto err; + cache_data->commit_bin_log_file_pos= my_b_write_tell(&log_file); + if (cache_data->using_xa && cache_data->xa_xid) + xid_count++; + } + if (write_count > 0) + { bool synced= 0; if (flush_and_sync(&synced)) - goto err; - DBUG_EXECUTE_IF("half_binlogged_transaction", DBUG_SUICIDE();); - if (cache->error) // Error on read { - sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name, errno); - write_error=1; // Don't give more errors - goto err; + for (current= queue; current != NULL; current= current->next) + { + if (!current->error) + { + current->error= ER_ERROR_ON_WRITE; + current->commit_errno= errno; + } + } + } + else + { + signal_update(); } if (RUN_HOOK(binlog_storage, after_flush, - (thd, log_file_name, log_file.pos_in_file, synced))) + (leader->thd, log_file_name, log_file.pos_in_file, synced))) { sql_print_error("Failed to run 'after_flush' hooks"); - write_error=1; - goto err; + for (current= queue; current != NULL; current= current->next) + { + if (!current->error) + { + current->error= ER_ERROR_ON_WRITE; + current->commit_errno= errno; + } + } } - signal_update(); } /* - if commit_event is Xid_log_event, increase the 
number of - prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated + if any commit_events are Xid_log_event, increase the number of + prepared_xids (it's decreased in ::unlog()). Binlog cannot be rotated if there're prepared xids in it - see the comment in new_file() for an explanation. - If the commit_event is not Xid_log_event (then it's a Query_log_event) - rotate binlog, if necessary. + If no Xid_log_events (then it's all Query_log_event) rotate binlog, + if necessary. */ - if (commit_event && commit_event->get_type_code() == XID_EVENT) + if (xid_count > 0) { - mysql_mutex_lock(&LOCK_prep_xids); - prepared_xids++; - mysql_mutex_unlock(&LOCK_prep_xids); - mysql_mutex_unlock(&LOCK_log); + mark_xids_active(xid_count); } else { if (rotate(false, &check_purge)) - goto err; - mysql_mutex_unlock(&LOCK_log); - if (check_purge) - purge(); + { + for (current= queue; current != NULL; current= current->next) + { + if (!current->error) + { + current->error= ER_ERROR_ON_WRITE; + current->commit_errno= errno; + } + } + } } - } - DBUG_RETURN(0); + DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered"); + mysql_mutex_lock(&LOCK_commit_ordered); + /* + We cannot unlock LOCK_log until we have locked LOCK_commit_ordered; + otherwise scheduling could allow the next group commit to run ahead of us, + messing up the order of commit_ordered() calls. But as soon as + LOCK_commit_ordered is obtained, we can let the next group commit start. + */ -err: - if (!write_error) - { - write_error= 1; - sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno); + mysql_mutex_unlock(&LOCK_log); + + DEBUG_SYNC(leader->thd, "commit_after_release_LOCK_log"); + ++num_group_commits; + + /* + Wakeup each participant waiting for our group commit, first calling the + commit_ordered() methods for any transactions doing 2-phase commit. 
+ */ + current= queue; + while (current != NULL) + { + group_commit_entry *next; + + DEBUG_SYNC(leader->thd, "commit_loop_entry_commit_ordered"); + ++num_commits; + if (current->cache_data->using_xa && !current->error) + run_commit_ordered(current->thd, current->all); + + /* + Careful not to access current->next after waking up the other thread! As + it may change immediately after wakeup. + */ + next= current->next; + if (current != leader) // Don't wake up ourself + current->thd->signal_wakeup_ready(); + current= next; + } + DEBUG_SYNC(leader->thd, "commit_after_group_run_commit_ordered"); + mysql_mutex_unlock(&LOCK_commit_ordered); + /* check_purge is only set by rotate() above (no-XID path); purge once LOCK_log and LOCK_commit_ordered are both released. */ + if (check_purge) + { + purge(); + } } - mysql_mutex_unlock(&LOCK_log); - DBUG_RETURN(1); + + DBUG_VOID_RETURN; } +int +MYSQL_BIN_LOG::write_transaction(group_commit_entry *entry) +{ + binlog_cache_data *cache_data= entry->cache_data; + IO_CACHE *cache= &cache_data->cache_log; + + if (entry->begin_event->write(&log_file)) + return ER_ERROR_ON_WRITE; + + DBUG_EXECUTE_IF("crash_before_writing_xid", + { + if ((write_cache(entry->thd, cache))) + DBUG_PRINT("info", ("error writing binlog cache")); + else + flush_and_sync(0); + + DBUG_PRINT("info", ("crashing before writing xid")); + abort(); + }); + + if (write_cache(entry->thd, cache)) + return ER_ERROR_ON_WRITE; + + if (entry->end_event->write(&log_file)) + return ER_ERROR_ON_WRITE; + + if (entry->incident_event && entry->incident_event->write(&log_file)) + return ER_ERROR_ON_WRITE; + + if (cache->error) // Error on read + return ER_ERROR_ON_READ; + + return 0; +} + /** Wait until we get a signal that the relay log has been updated. 
@@ -6095,6 +6310,68 @@ } +void +TC_init() +{ + mysql_mutex_init(key_LOCK_group_commit_queue, &LOCK_group_commit_queue, MY_MUTEX_INIT_SLOW); + mysql_mutex_init(key_LOCK_commit_ordered, &LOCK_commit_ordered, MY_MUTEX_INIT_SLOW); + mutexes_inited= TRUE; +} + + +void +TC_destroy() +{ + if (mutexes_inited) + { + mysql_mutex_destroy(&LOCK_group_commit_queue); + mysql_mutex_destroy(&LOCK_commit_ordered); + mutexes_inited= FALSE; + } +} + + +void +TC_LOG::run_commit_ordered(THD *thd, bool all) +{ + Ha_trx_info *ha_info= + all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list; + + mysql_mutex_assert_owner(&LOCK_commit_ordered); + for (; ha_info; ha_info= ha_info->next()) + { + handlerton *ht= ha_info->ht(); + if (!ht->commit_ordered) + continue; + ht->commit_ordered(ht, thd, all); + DEBUG_SYNC(thd, "commit_after_run_commit_ordered"); + } +} + +int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, + bool need_commit_ordered) +{ + int cookie; + + cookie= 0; + if (xid) + cookie= log_one_transaction(xid); + + if (need_commit_ordered) + { + /* Only run commit_ordered() if log_xid was successful. */ + if (cookie) + { + mysql_mutex_lock(&LOCK_commit_ordered); + run_commit_ordered(thd, all); + mysql_mutex_unlock(&LOCK_commit_ordered); + } + } + + return cookie; +} + + /********* transaction coordinator log for 2pc - mmap() based solution *******/ /* @@ -6231,6 +6508,7 @@ mysql_mutex_init(key_LOCK_pool, &LOCK_pool, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_active, &COND_active, 0); mysql_cond_init(key_COND_pool, &COND_pool, 0); + mysql_cond_init(key_COND_queue_busy, &COND_queue_busy, 0); inited=6; @@ -6238,6 +6516,8 @@ active=pages; pool=pages+1; pool_last=pages+npages-1; + commit_ordered_queue= NULL; + commit_ordered_queue_busy= false; return 0; @@ -6343,7 +6623,7 @@ to the position in memory where xid was logged to. 
*/ -int TC_LOG_MMAP::log_xid(THD *thd, my_xid xid) +int TC_LOG_MMAP::log_one_transaction(my_xid xid) { int err; PAGE *p; @@ -6482,7 +6762,9 @@ mysql_mutex_destroy(&LOCK_sync); mysql_mutex_destroy(&LOCK_active); mysql_mutex_destroy(&LOCK_pool); + mysql_cond_destroy(&COND_active); mysql_cond_destroy(&COND_pool); + mysql_cond_destroy(&COND_queue_busy); case 5: data[0]='A'; // garble the first (signature) byte, in case mysql_file_delete fails case 4: @@ -6692,42 +6974,87 @@ mysql_cond_destroy(&COND_prep_xids); } -/** - @todo - group commit +/* + Do a binlog log_xid() for a group of transactions, linked through + thd->next_commit_ordered. @retval 0 error @retval 1 success */ -int TC_LOG_BINLOG::log_xid(THD *thd, my_xid xid) +int TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all, + bool need_commit_ordered __attribute__((unused))) { - DBUG_ENTER("TC_LOG_BINLOG::log"); + DBUG_ENTER("TC_LOG_BINLOG::log_and_order"); binlog_cache_mngr *cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + + cache_mngr->trx_cache.using_xa= TRUE; + cache_mngr->trx_cache.xa_xid= xid; /* We always commit the entire transaction when writing an XID. Also note that the return value is inverted. */ - DBUG_RETURN(!binlog_commit_flush_stmt_cache(thd, cache_mngr) && - !binlog_commit_flush_trx_cache(thd, cache_mngr, xid)); + DBUG_RETURN(!binlog_commit_flush_stmt_cache(thd, cache_mngr, all) && + !binlog_commit_flush_trx_cache(thd, cache_mngr, xid, all)); } -int TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) +/* + After an XID is logged, we need to hold on to the current binlog file until + it is fully committed in the storage engine. The reason is that crash + recovery only looks at the latest binlog, so we must make sure there are no + outstanding prepared (but not committed) transactions before rotating the + binlog. + + To handle this, we keep a count of outstanding XIDs. 
This function is used + to increase this count when committing one or more transactions to the + binary log. +*/ +void +TC_LOG_BINLOG::mark_xids_active(uint xid_count) { - DBUG_ENTER("TC_LOG_BINLOG::unlog"); + DBUG_ENTER("TC_LOG_BINLOG::mark_xids_active"); + DBUG_PRINT("info", ("xid_count=%u", xid_count)); + mysql_mutex_lock(&LOCK_prep_xids); + prepared_xids+= xid_count; + mysql_mutex_unlock(&LOCK_prep_xids); + DBUG_VOID_RETURN; +} + +/* + Once an XID is committed, it is safe to rotate the binary log, as it can no + longer be needed during crash recovery. + + This function is called to mark an XID this way. It needs to decrease the + count of pending XIDs, and signal the log rotator thread when it reaches zero. +*/ +void +TC_LOG_BINLOG::mark_xid_done() +{ + my_bool send_signal; + + DBUG_ENTER("TC_LOG_BINLOG::mark_xid_done"); mysql_mutex_lock(&LOCK_prep_xids); // prepared_xids can be 0 if the transaction had ignorable errors. DBUG_ASSERT(prepared_xids >= 0); if (prepared_xids > 0) prepared_xids--; - if (prepared_xids == 0) { + send_signal= (prepared_xids == 0); + mysql_mutex_unlock(&LOCK_prep_xids); + if (send_signal) { DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids)); mysql_cond_signal(&COND_prep_xids); } - mysql_mutex_unlock(&LOCK_prep_xids); - DBUG_RETURN(rotate_and_purge(0)); // as ::write() did not rotate + DBUG_VOID_RETURN; +} + +int TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) +{ + DBUG_ENTER("TC_LOG_BINLOG::unlog"); + if (xid) + mark_xid_done(); + DBUG_RETURN(rotate_and_purge(0)); } int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle) @@ -6796,9 +7123,67 @@ { return (ulonglong) mysql_bin_log.get_log_file()->pos_in_file; } +/* + Get the current position of the MySQL binlog for transaction currently being + committed. + + This is valid to call from within storage engine commit_ordered() and + commit() methods only. + + Since it stores the position inside THD, it is safe to call without any + locking. 
+ + Note that currently the binlog file name is not stored inside THD, but this + is still safe as it can only change when the log is rotated, and we never + rotate the binlog while commits are pending inside storage engines. +*/ +extern "C" +void mysql_bin_log_commit_pos(THD *thd, ulonglong *out_pos, const char **out_file) +{ + binlog_cache_mngr *cache_mngr; + if (binlog_hton->state == SHOW_OPTION_YES + && (cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton))) + { + *out_pos= cache_mngr->trx_cache.commit_bin_log_file_pos; + *out_file= mysql_bin_log.get_log_fname(); + } + else + { + *out_pos= 0ULL; + *out_file= NULL; + } +} #endif /* INNODB_COMPATIBILITY_HOOKS */ +static int show_binlog_vars(THD *thd, SHOW_VAR *var, char *buff) +{ + mysql_bin_log.set_status_variables(); + var->type= SHOW_ARRAY; + var->value= (char *)&binlog_status_vars_detail; + return 0; +} + +static SHOW_VAR binlog_status_vars_top[]= { + {"binlog", (char *) &show_binlog_vars, SHOW_FUNC}, + {NullS, NullS, SHOW_LONG} +}; + +/* + Copy out current values of status variables, for SHOW STATUS or + information_schema.global_status. + + This is called only under LOCK_status, so we can fill in a static array. 
+*/ +void +TC_LOG_BINLOG::set_status_variables() +{ + mysql_mutex_lock(&LOCK_commit_ordered); + binlog_status_var_num_commits= this->num_commits; + binlog_status_var_num_group_commits= this->num_group_commits; + mysql_mutex_unlock(&LOCK_commit_ordered); +} + struct st_mysql_storage_engine binlog_storage_engine= { MYSQL_HANDLERTON_INTERFACE_VERSION }; @@ -6813,7 +7198,7 @@ binlog_init, /* Plugin Init */ NULL, /* Plugin Deinit */ 0x0100 /* 1.0 */, - NULL, /* status variables */ + binlog_status_vars_top, /* status variables */ NULL, /* system variables */ NULL, /* config options */ 0, /* flags */ --- a/sql/log.h +++ b/sql/log.h @@ -44,17 +44,42 @@ virtual int open(const char *opt_name)=0; virtual void close()=0; - virtual int log_xid(THD *thd, my_xid xid)=0; + virtual int log_and_order(THD *thd, my_xid xid, bool all, + bool need_commit_ordered)=0; virtual int unlog(ulong cookie, my_xid xid)=0; + + protected: + void run_commit_ordered(THD *thd, bool all); }; +/* + Locks used to ensure serialised execution of + TC_LOG::run_commit_ordered(), or any other code that calls handler + commit_ordered() methods. +*/ +extern mysql_mutex_t LOCK_group_commit_queue; +extern mysql_mutex_t LOCK_commit_ordered; + +extern void TC_init(); +extern void TC_destroy(); + class TC_LOG_DUMMY: public TC_LOG // use it to disable the logging { public: TC_LOG_DUMMY() {} int open(const char *opt_name) { return 0; } void close() { } - int log_xid(THD *thd, my_xid xid) { return 1; } + /* + TC_LOG_DUMMY is only used when there are <= 1 XA-capable engines, and we + only use internal XA during commit when >= 2 XA-capable engines + participate. 
+ */ + int log_and_order(THD *thd, my_xid xid, bool all, + bool need_commit_ordered) + { + DBUG_ASSERT(0 /* Internal error - TC_LOG_DUMMY::log_and_order() called */); + return 1; + } int unlog(ulong cookie, my_xid xid) { return 0; } }; @@ -80,6 +105,13 @@ mysql_cond_t cond; // to wait for a sync } PAGE; + /* List of THDs for which to invoke commit_ordered(), in order. */ + struct commit_entry + { + struct commit_entry *next; + THD *thd; + }; + char logname[FN_REFLEN]; File fd; my_off_t file_length; @@ -94,16 +126,38 @@ */ mysql_mutex_t LOCK_active, LOCK_pool, LOCK_sync; mysql_cond_t COND_pool, COND_active; + /* + Queue of threads that need to call commit_ordered(). + Access to this queue must be protected by LOCK_group_commit_queue + */ + commit_entry *commit_ordered_queue; + /* + This flag and condition is used to reserve the queue while threads in it + each run the commit_ordered() methods one after the other. Only once the + last commit_ordered() in the queue is done can we start on a new queue + run. + + Since we start this process in the first thread in the queue and finish in + the last (and possibly different) thread, we need a condition variable for + this (we cannot unlock a mutex in a different thread than the one who + locked it). + + The condition is used together with the LOCK_group_commit_queue mutex. 
+ */ + my_bool commit_ordered_queue_busy; + mysql_cond_t COND_queue_busy; public: TC_LOG_MMAP(): inited(0) {} int open(const char *opt_name); void close(); - int log_xid(THD *thd, my_xid xid); + int log_and_order(THD *thd, my_xid xid, bool all, + bool need_commit_ordered); int unlog(ulong cookie, my_xid xid); int recover(); private: + int log_one_transaction(my_xid xid); void get_active_from_pool(); int sync(); int overflow(); @@ -271,9 +325,31 @@ time_t last_time; }; +class binlog_cache_data; class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG { private: + struct group_commit_entry + { + struct group_commit_entry *next; + THD *thd; + binlog_cache_data *cache_data; + /* + Extra events (BEGIN, COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be + written during group commit. The incident_event is only valid if + trx_data->has_incident() is true. + */ + Log_event *begin_event; + Log_event *end_event; + Log_event *incident_event; + /* Set during group commit to record any per-thread error. */ + int error; + int commit_errno; + /* This is the `all' parameter for ha_commit_ordered(). */ + bool all; + /* True if we come in through XA log_and_order(), false otherwise. */ + }; + #ifdef HAVE_PSI_INTERFACE /** The instrumentation key to use for @ LOCK_index. */ PSI_mutex_key m_key_LOCK_index; @@ -325,6 +401,12 @@ In 5.0 it's 0 for relay logs too! */ bool no_auto_events; + /* Queue of transactions queued up to participate in group commit. */ + group_commit_entry *group_commit_queue; + /* Total number of committed transactions. */ + ulonglong num_commits; + /* Number of group commits done. 
*/ + ulonglong num_group_commits; /* pointer to the sync period variable, for binlog this will be sync_binlog_period, for relay log this will be @@ -346,6 +428,11 @@ */ int new_file_without_locking(); int new_file_impl(bool need_lock); + int write_transaction(group_commit_entry *entry); + bool write_transaction_to_binlog_events(group_commit_entry *entry); + void trx_group_commit_leader(group_commit_entry *leader); + void mark_xid_done(); + void mark_xids_active(uint xid_count); public: MYSQL_LOG::generate_name; @@ -387,7 +474,8 @@ int open(const char *opt_name); void close(); - int log_xid(THD *thd, my_xid xid); + int log_and_order(THD *thd, my_xid xid, bool all, + bool need_commit_ordered); int unlog(ulong cookie, my_xid xid); int recover(IO_CACHE *log, Format_description_log_event *fdle); #if !defined(MYSQL_CLIENT) @@ -434,11 +522,11 @@ int new_file(); bool write(Log_event* event_info); // binary log write - bool write(THD *thd, IO_CACHE *cache, Log_event *commit_event, bool incident); + bool write_transaction_to_binlog(THD *thd, binlog_cache_data *cache_data, + Log_event *end_ev, bool all); bool write_incident(THD *thd, bool lock); - int write_cache(THD *thd, IO_CACHE *cache, - bool lock_log, bool flush_and_sync); + int write_cache(THD *thd, IO_CACHE *cache); void set_write_error(THD *thd, bool is_transactional); bool check_write_error(THD *thd); @@ -509,6 +597,7 @@ inline void unlock_index() { mysql_mutex_unlock(&LOCK_index);} inline IO_CACHE *get_index_file() { return &index_file;} inline uint32 get_open_count() { return open_count; } + void set_status_variables(); }; class Log_event_handler --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -1495,6 +1495,7 @@ ha_end(); if (tc_log) tc_log->close(); + TC_destroy(); delegates_destroy(); xid_cache_free(); table_def_free(); @@ -3911,6 +3912,8 @@ query_response_time_init(); #endif // HAVE_RESPONSE_TIME_DISTRIBUTION /* We have to initialize the storage engines before CSV logging */ + TC_init(); + init_global_table_stats(); 
init_global_index_stats(); @@ -7872,6 +7875,7 @@ key_LOCK_error_messages, key_LOG_INFO_lock, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc; PSI_mutex_key key_RELAYLOG_LOCK_index; +PSI_mutex_key key_LOCK_wakeup_ready, key_LOCK_group_commit_queue, key_LOCK_commit_ordered; static PSI_mutex_info all_server_mutexes[]= { @@ -7892,6 +7896,7 @@ { &key_delayed_insert_mutex, "Delayed_insert::mutex", 0}, { &key_hash_filo_lock, "hash_filo::lock", 0}, { &key_LOCK_active_mi, "LOCK_active_mi", PSI_FLAG_GLOBAL}, + { &key_LOCK_commit_ordered, "LOCK_commit_ordered", PSI_FLAG_GLOBAL}, { &key_LOCK_connection_count, "LOCK_connection_count", PSI_FLAG_GLOBAL}, { &key_LOCK_crypt, "LOCK_crypt", PSI_FLAG_GLOBAL}, { &key_LOCK_delayed_create, "LOCK_delayed_create", PSI_FLAG_GLOBAL}, @@ -7907,6 +7912,7 @@ "LOCK_global_index_stats", PSI_FLAG_GLOBAL}, { &key_LOCK_gdl, "LOCK_gdl", PSI_FLAG_GLOBAL}, { &key_LOCK_global_system_variables, "LOCK_global_system_variables", PSI_FLAG_GLOBAL}, + { &key_LOCK_group_commit_queue, "LOCK_group_commit_queue", PSI_FLAG_GLOBAL}, { &key_LOCK_manager, "LOCK_manager", PSI_FLAG_GLOBAL}, { &key_LOCK_prepared_stmt_count, "LOCK_prepared_stmt_count", PSI_FLAG_GLOBAL}, { &key_LOCK_rpl_status, "LOCK_rpl_status", PSI_FLAG_GLOBAL}, @@ -7918,6 +7924,7 @@ { &key_LOCK_temporary_tables, "THD::LOCK_temporary_tables", 0}, { &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL}, { &key_LOCK_uuid_generator, "LOCK_uuid_generator", PSI_FLAG_GLOBAL}, + { &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0}, { &key_LOG_LOCK_log, "LOG::LOCK_log", 0}, { &key_master_info_data_lock, "Master_info::data_lock", 0}, { &key_master_info_run_lock, "Master_info::run_lock", 0}, @@ -7965,6 +7972,7 @@ key_TABLE_SHARE_cond, key_user_level_lock_cond, key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache; PSI_cond_key key_RELAYLOG_update_cond; +PSI_cond_key key_COND_wakeup_ready, key_COND_queue_busy; static PSI_cond_info all_server_conds[]= { @@ -7981,8 +7989,10 @@ { 
&key_RELAYLOG_update_cond, "MYSQL_RELAY_LOG::update_cond", 0}, { &key_COND_cache_status_changed, "Query_cache::COND_cache_status_changed", 0}, { &key_COND_manager, "COND_manager", PSI_FLAG_GLOBAL}, + { &key_COND_queue_busy, "COND_queue_busy", PSI_FLAG_GLOBAL}, { &key_COND_rpl_status, "COND_rpl_status", PSI_FLAG_GLOBAL}, { &key_COND_server_started, "COND_server_started", PSI_FLAG_GLOBAL}, + { &key_COND_wakeup_ready, "THD::COND_wakeup_ready", 0}, { &key_delayed_insert_cond, "Delayed_insert::cond", 0}, { &key_delayed_insert_cond_client, "Delayed_insert::cond_client", 0}, { &key_item_func_sleep_cond, "Item_func_sleep::cond", 0}, --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -274,6 +274,7 @@ key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data, key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc; extern PSI_mutex_key key_RELAYLOG_LOCK_index; +extern PSI_mutex_key key_LOCK_wakeup_ready, key_LOCK_group_commit_queue, key_LOCK_commit_ordered; extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger, key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave, @@ -294,6 +295,7 @@ key_TABLE_SHARE_cond, key_user_level_lock_cond, key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache; extern PSI_cond_key key_RELAYLOG_update_cond; +extern PSI_cond_key key_COND_wakeup_ready, key_COND_queue_busy; extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert, key_thread_handle_manager, key_thread_kill_server, key_thread_main, --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -1005,6 +1005,8 @@ mysql_mutex_init(key_LOCK_thd_data, &LOCK_thd_data, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_temporary_tables, &LOCK_temporary_tables, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wakeup_ready, &LOCK_wakeup_ready, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wakeup_ready, &COND_wakeup_ready, NULL); /* Variables with default values */ proc_info="login"; @@ -1609,6 +1611,8 @@ my_free(db); db= NULL; 
free_root(&transaction.mem_root,MYF(0)); + mysql_cond_destroy(&COND_wakeup_ready); + mysql_mutex_destroy(&LOCK_wakeup_ready); mysql_mutex_destroy(&LOCK_thd_data); mysql_mutex_destroy(&LOCK_temporary_tables); #ifndef DBUG_OFF @@ -5297,6 +5301,24 @@ DBUG_RETURN(0); } +void +THD::wait_for_wakeup_ready() +{ + mysql_mutex_lock(&LOCK_wakeup_ready); + while (!wakeup_ready) + mysql_cond_wait(&COND_wakeup_ready, &LOCK_wakeup_ready); + mysql_mutex_unlock(&LOCK_wakeup_ready); +} + +void +THD::signal_wakeup_ready() +{ + mysql_mutex_lock(&LOCK_wakeup_ready); + wakeup_ready= true; + mysql_mutex_unlock(&LOCK_wakeup_ready); + mysql_cond_signal(&COND_wakeup_ready); +} + bool Discrete_intervals_list::append(ulonglong start, ulonglong val, ulonglong incr) { --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -3078,6 +3078,14 @@ LEX_STRING get_invoker_user() { return invoker_user; } LEX_STRING get_invoker_host() { return invoker_host; } bool has_invoker() { return invoker_user.length > 0; } + void clear_wakeup_ready() { wakeup_ready= false; } + /* + Sleep waiting for others to wake us up with signal_wakeup_ready(). + Must call clear_wakeup_ready() before waiting. + */ + void wait_for_wakeup_ready(); + /* Wake this thread up from wait_for_wakeup_ready(). */ + void signal_wakeup_ready(); private: /** The current internal error handler for this thread, or NULL. */ @@ -3120,6 +3128,16 @@ */ LEX_STRING invoker_user; LEX_STRING invoker_host; + /* + Flag, mutex and condition for a thread to wait for a signal from another + thread. + + Currently used to wait for group commit to complete, can also be used for + other purposes. 
+ */ + bool wakeup_ready; + mysql_mutex_t LOCK_wakeup_ready; + mysql_cond_t COND_wakeup_ready; }; /* Returns string as 'IP' for the client-side of the connection represented by --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -889,6 +889,10 @@ DBUG_ENTER("dispatch_command"); DBUG_PRINT("info",("packet: '%*.s'; command: %d", packet_length, packet, command)); + DBUG_EXECUTE_IF("crash_dispatch_command_before", + { DBUG_PRINT("crash_dispatch_command_before", ("now")); + DBUG_ABORT(); }); + #if defined(ENABLED_PROFILING) thd->profiling.start_new_query(); #endif --- a/mysql-test/suite/perfschema/r/dml_setup_instruments.result +++ b/mysql-test/suite/perfschema/r/dml_setup_instruments.result @@ -11,9 +11,9 @@ wait/synch/mutex/sql/HA_DATA_PARTITION::LOCK_auto_inc YES YES wait/synch/mutex/sql/LOCK_active_mi YES YES wait/synch/mutex/sql/LOCK_audit_mask YES YES +wait/synch/mutex/sql/LOCK_commit_ordered YES YES wait/synch/mutex/sql/LOCK_connection_count YES YES wait/synch/mutex/sql/LOCK_crypt YES YES -wait/synch/mutex/sql/LOCK_delayed_create YES YES select * from performance_schema.setup_instruments where name like 'Wait/Synch/Rwlock/sql/%' and name not in ('wait/synch/rwlock/sql/CRYPTO_dynlock_value::lock') @@ -38,6 +38,7 @@ NAME ENABLED TIMED wait/synch/cond/sql/COND_flush_thread_cache YES YES wait/synch/cond/sql/COND_manager YES YES +wait/synch/cond/sql/COND_queue_busy YES YES wait/synch/cond/sql/COND_queue_state YES YES wait/synch/cond/sql/COND_rpl_status YES YES wait/synch/cond/sql/COND_server_started YES YES @@ -45,7 +46,6 @@ wait/synch/cond/sql/COND_thread_count YES YES wait/synch/cond/sql/Delayed_insert::cond YES YES wait/synch/cond/sql/Delayed_insert::cond_client YES YES -wait/synch/cond/sql/Event_scheduler::COND_state YES YES select * from performance_schema.setup_instruments where name='Wait'; select * from performance_schema.setup_instruments --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -375,6 +375,9 @@ static 
INNOBASE_SHARE *get_share(const char *table_name); static void free_share(INNOBASE_SHARE *share); static int innobase_close_connection(handlerton *hton, THD* thd); +#ifdef EXTENDED_FOR_COMMIT_ORDERED +static void innobase_commit_ordered(handlerton *hton, THD* thd, bool all); +#endif static int innobase_commit(handlerton *hton, THD* thd, bool all); static int innobase_rollback(handlerton *hton, THD* thd, bool all); static int innobase_rollback_to_savepoint(handlerton *hton, THD* thd, @@ -1699,7 +1702,10 @@ trx_t* trx) /*!< in/out: InnoDB transaction handle */ { DBUG_ENTER("innobase_trx_init"); +#ifndef EXTENDED_FOR_COMMIT_ORDERED + /* used by innobase_commit_ordered */ DBUG_ASSERT(EQ_CURRENT_THD(thd)); +#endif DBUG_ASSERT(thd == trx->mysql_thd); trx->check_foreigns = !thd_test_options( @@ -1760,7 +1766,10 @@ { trx_t*& trx = thd_to_trx(thd); +#ifndef EXTENDED_FOR_COMMIT_ORDERED + /* used by innobase_commit_ordered */ ut_ad(EQ_CURRENT_THD(thd)); +#endif if (trx == NULL) { trx = innobase_trx_allocate(thd); @@ -1846,6 +1855,7 @@ { trx->is_registered = 0; trx->owns_prepare_mutex = 0; + trx->called_commit_ordered = 0; } /*********************************************************************//** @@ -1861,6 +1871,29 @@ } /*********************************************************************//** +*/ +static inline +void +trx_called_commit_ordered_set( +/*==========================*/ + trx_t* trx) +{ + ut_a(trx_is_registered_for_2pc(trx)); + trx->called_commit_ordered = 1; +} + +/*********************************************************************//** +*/ +static inline +bool +trx_called_commit_ordered( +/*======================*/ + const trx_t* trx) +{ + return(trx->called_commit_ordered == 1); +} + +/*********************************************************************//** Check if transaction is started. 
@reutrn true if transaction is in state started */ static @@ -2435,6 +2468,9 @@ innobase_hton->savepoint_set=innobase_savepoint; innobase_hton->savepoint_rollback=innobase_rollback_to_savepoint; innobase_hton->savepoint_release=innobase_release_savepoint; +#ifdef EXTENDED_FOR_COMMIT_ORDERED + innobase_hton->commit_ordered=innobase_commit_ordered; +#endif innobase_hton->commit=innobase_commit; innobase_hton->rollback=innobase_rollback; innobase_hton->prepare=innobase_xa_prepare; @@ -3187,6 +3223,126 @@ DBUG_RETURN(0); } +#ifdef EXTENDED_FOR_COMMIT_ORDERED +/* MEMO: + InnoDB is coded with the intention that trx is always accessed by the owner thd. + (not protected by any mutex/lock) + So, the caller of innobase_commit_ordered() should be conscious of + cache coherency between multiple CPUs for the trx, if called from another thd. + + In MariaDB's first implementation, the coherency seems to be protected by + the pthread_mutex LOCK_wakeup_ready. So, no problem for now. + + But we should be aware of the importance of the coherency. + */ +/*****************************************************************//** +Low-level function for innobase_commit_ordered().*/ +static +void +innobase_commit_ordered_low( +/*========================*/ + trx_t* trx, /*!< in: Innodb transaction */ + THD* thd) /*!< in: MySQL thread handle */ +{ + ulonglong tmp_pos; + DBUG_ENTER("innobase_commit_ordered"); + + /* This part was from innobase_commit() */ + + /* We need current binlog position for ibbackup to work. + Note, the position is current because commit_ordered is guaranteed + to be called in same sequence as writing to binlog.
*/ +retry: + if (innobase_commit_concurrency > 0) { + mysql_mutex_lock(&commit_cond_m); + commit_threads++; + + if (commit_threads > innobase_commit_concurrency) { + commit_threads--; + mysql_cond_wait(&commit_cond, + &commit_cond_m); + mysql_mutex_unlock(&commit_cond_m); + goto retry; + } + else { + mysql_mutex_unlock(&commit_cond_m); + } + } + + mysql_bin_log_commit_pos(thd, &tmp_pos, &(trx->mysql_log_file_name)); + trx->mysql_log_offset = (ib_int64_t) tmp_pos; + + /* Don't do write + flush right now. For group commit + to work we want to do the flush in the innobase_commit() + method, which runs without holding any locks. */ + trx->flush_log_later = TRUE; + innobase_commit_low(trx); + trx->flush_log_later = FALSE; + + if (innobase_commit_concurrency > 0) { + mysql_mutex_lock(&commit_cond_m); + commit_threads--; + mysql_cond_signal(&commit_cond); + mysql_mutex_unlock(&commit_cond_m); + } + + DBUG_VOID_RETURN; +} + +/*****************************************************************//** +Perform the first, fast part of InnoDB commit. + +Doing it in this call ensures that we get the same commit order here +as in binlog and any other participating transactional storage engines. + +Note that we want to do as little as really needed here, as we run +under a global mutex. The expensive fsync() is done later, in +innobase_commit(), without a lock so group commit can take place. + +Note also that this method can be called from a different thread than +the one handling the rest of the transaction. 
*/ +static +void +innobase_commit_ordered( +/*====================*/ + handlerton *hton, /*!< in: Innodb handlerton */ + THD* thd, /*!< in: MySQL thread handle of the user for whom + the transaction should be committed */ + bool all) /*!< in: TRUE - commit transaction + FALSE - the current SQL statement ended */ +{ + trx_t* trx; + DBUG_ENTER("innobase_commit_ordered"); + DBUG_ASSERT(hton == innodb_hton_ptr); + + trx = check_trx_exists(thd); + + /* Since we will reserve the kernel mutex, we have to release + the search system latch first to obey the latching order. */ + + if (trx->has_search_latch) { + trx_search_latch_release_if_reserved(trx); + } + + if (!trx_is_registered_for_2pc(trx) && trx_is_started(trx)) { + /* We cannot throw error here; instead we will catch this error + again in innobase_commit() and report it from there. */ + DBUG_VOID_RETURN; + } + + /* commit_ordered is only called when committing the whole transaction + (or an SQL statement when autocommit is on). */ + DBUG_ASSERT(all || + (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))); + + innobase_commit_ordered_low(trx, thd); + + trx_called_commit_ordered_set(trx); + + DBUG_VOID_RETURN; +} +#endif /* EXTENDED_FOR_COMMIT_ORDERED */ + /*****************************************************************//** Commits a transaction in an InnoDB database or marks an SQL statement ended. @@ -3238,6 +3394,16 @@ /* We were instructed to commit the whole transaction, or this is an SQL statement end and autocommit is on */ +#ifdef EXTENDED_FOR_COMMIT_ORDERED + ut_ad(!trx_has_prepare_commit_mutex(trx)); + + /* Run the fast part of commit if we did not already. */ + if (!trx_called_commit_ordered(trx)) { + innobase_commit_ordered_low(trx, thd); + } +#else + ut_ad(!trx_called_commit_ordered(trx)); + /* We need current binlog position for ibbackup to work. 
Note, the position is current because of prepare_commit_mutex */ @@ -3292,6 +3458,7 @@ mysql_mutex_unlock(&prepare_commit_mutex); } +#endif /* EXTENDED_FOR_COMMIT_ORDERED */ trx_deregister_from_2pc(trx); @@ -10981,6 +11148,7 @@ srv_active_wake_master_thread(); +#ifndef EXTENDED_FOR_COMMIT_ORDERED if (thd_sql_command(thd) != SQLCOM_XA_PREPARE && (all || !thd_test_options( @@ -11007,6 +11175,7 @@ mysql_mutex_lock(&prepare_commit_mutex); trx_owns_prepare_commit_mutex_set(trx); } +#endif /* ifndef EXTENDED_FOR_COMMIT_ORDERED */ return(error); } --- a/storage/innobase/handler/ha_innodb.h +++ b/storage/innobase/handler/ha_innodb.h @@ -240,6 +240,12 @@ struct charset_info_st *thd_charset(MYSQL_THD thd); LEX_STRING *thd_query_string(MYSQL_THD thd); +#ifdef EXTENDED_FOR_COMMIT_ORDERED +/** Get the file name and position of the MySQL binlog corresponding to the + * current commit. + */ +void mysql_bin_log_commit_pos(THD *thd, ulonglong *out_pos, const char **out_file); +#else /** Get the file name of the MySQL binlog. * @return the name of the binlog file */ @@ -249,6 +255,7 @@ * @return byte offset from the beginning of the binlog */ ulonglong mysql_bin_log_file_pos(void); +#endif /** Check if a user thread is a replication slave thread --- a/storage/innobase/include/trx0trx.h +++ b/storage/innobase/include/trx0trx.h @@ -494,6 +494,7 @@ this is set to 1 then registered should also be set to 1. This is used in the XA code */ + unsigned called_commit_ordered:1;/* 1 if innobase_commit_ordered has run. */ /*------------------------------*/ ulint isolation_level;/* TRX_ISO_REPEATABLE_READ, ... 
*/ ulint check_foreigns; /* normally TRUE, but if the user --- a/storage/innobase/trx/trx0trx.c +++ b/storage/innobase/trx/trx0trx.c @@ -111,6 +111,7 @@ trx->is_registered = 0; trx->owns_prepare_mutex = 0; + trx->called_commit_ordered = 0; trx->start_time = ut_time(); --- /dev/null +++ b/mysql-test/r/group_commit.result @@ -0,0 +1,63 @@ +CREATE TABLE t1 (a VARCHAR(10) PRIMARY KEY) ENGINE=innodb; +SELECT variable_value INTO @commits FROM information_schema.global_status +WHERE variable_name = 'binlog_commits'; +SELECT variable_value INTO @group_commits FROM information_schema.global_status +WHERE variable_name = 'binlog_group_commits'; +SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group1_running WAIT_FOR group2_queued"; +INSERT INTO t1 VALUES ("con1"); +set DEBUG_SYNC= "now WAIT_FOR group1_running"; +SET DEBUG_SYNC= "commit_group_commit_queue SIGNAL group2_con2"; +SET DEBUG_SYNC= "commit_after_release_LOCK_log WAIT_FOR group3_committed"; +SET DEBUG_SYNC= "commit_after_group_run_commit_ordered SIGNAL group2_visible WAIT_FOR group2_checked"; +INSERT INTO t1 VALUES ("con2"); +SET DEBUG_SYNC= "now WAIT_FOR group2_con2"; +SET DEBUG_SYNC= "commit_group_commit_queue SIGNAL group2_con3"; +INSERT INTO t1 VALUES ("con3"); +SET DEBUG_SYNC= "now WAIT_FOR group2_con3"; +SET DEBUG_SYNC= "commit_group_commit_queue SIGNAL group2_con4"; +INSERT INTO t1 VALUES ("con4"); +SET DEBUG_SYNC= "now WAIT_FOR group2_con4"; +SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED; +SELECT * FROM t1 ORDER BY a; +a +SET DEBUG_SYNC= "now SIGNAL group2_queued"; +SELECT * FROM t1 ORDER BY a; +a +con1 +SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group3_con5"; +SET DEBUG_SYNC= "commit_after_get_LOCK_log SIGNAL con5_leader WAIT_FOR con6_queued"; +INSERT INTO t1 VALUES ("con5"); +SET DEBUG_SYNC= "now WAIT_FOR con5_leader"; +SET DEBUG_SYNC= "commit_group_commit_queue SIGNAL con6_queued"; +INSERT INTO t1 VALUES ("con6"); +SET DEBUG_SYNC= "now WAIT_FOR group3_con5"; 
+SELECT * FROM t1 ORDER BY a; +a +con1 +SET DEBUG_SYNC= "now SIGNAL group3_committed"; +SET DEBUG_SYNC= "now WAIT_FOR group2_visible"; +SELECT * FROM t1 ORDER BY a; +a +con1 +con2 +con3 +con4 +SET DEBUG_SYNC= "now SIGNAL group2_checked"; +SELECT * FROM t1 ORDER BY a; +a +con1 +con2 +con3 +con4 +con5 +con6 +SELECT variable_value - @commits FROM information_schema.global_status +WHERE variable_name = 'binlog_commits'; +variable_value - @commits +6 +SELECT variable_value - @group_commits FROM information_schema.global_status +WHERE variable_name = 'binlog_group_commits'; +variable_value - @group_commits +3 +SET DEBUG_SYNC= 'RESET'; +DROP TABLE t1; --- /dev/null +++ b/mysql-test/r/group_commit_binlog_pos.result @@ -0,0 +1,35 @@ +CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=innodb; +INSERT INTO t1 VALUES (0); +SET DEBUG_SYNC= "commit_after_get_LOCK_log SIGNAL con1_waiting WAIT_FOR con3_queued"; +SET DEBUG_SYNC= "commit_loop_entry_commit_ordered SIGNAL con1_loop WAIT_FOR con1_loop_cont EXECUTE 3"; +INSERT INTO t1 VALUES (1); +SET DEBUG_SYNC= "now WAIT_FOR con1_waiting"; +SET DEBUG_SYNC= "commit_group_commit_queue SIGNAL con2_queued"; +INSERT INTO t1 VALUES (2); +SET DEBUG_SYNC= "now WAIT_FOR con2_queued"; +SET DEBUG_SYNC= "commit_group_commit_queue SIGNAL con3_queued"; +INSERT INTO t1 VALUES (3); +SET DEBUG_SYNC= "now WAIT_FOR con1_loop"; +SET DEBUG_SYNC= "now SIGNAL con1_loop_cont"; +SET DEBUG_SYNC= "now WAIT_FOR con1_loop"; +SET DEBUG_SYNC= "now SIGNAL con1_loop_cont"; +SET DEBUG_SYNC= "now WAIT_FOR con1_loop"; +SELECT * FROM t1 ORDER BY a; +a +0 +1 +2 +SET SESSION debug="+d,crash_dispatch_command_before"; +SELECT 1; +Got one of the listed errors +Got one of the listed errors +Got one of the listed errors +SELECT * FROM t1 ORDER BY a; +a +0 +1 +2 +3 +InnoDB: Last MySQL binlog file position 0 768, file name ./master-bin.000001 +SET DEBUG_SYNC= 'RESET'; +DROP TABLE t1; --- /dev/null +++ b/mysql-test/r/group_commit_crash.result @@ -0,0 +1,120 @@ +CREATE TABLE t1(a 
CHAR(255), +b CHAR(255), +c CHAR(255), +d CHAR(255), +id INT AUTO_INCREMENT, +PRIMARY KEY(id)) ENGINE=InnoDB; +create table t2 like t1; +create procedure setcrash(IN i INT) +begin +CASE i +WHEN 1 THEN SET SESSION debug="d,crash_commit_after_prepare"; +WHEN 2 THEN SET SESSION debug="d,crash_commit_after_log"; +WHEN 3 THEN SET SESSION debug="d,crash_commit_before_unlog"; +WHEN 4 THEN SET SESSION debug="d,crash_commit_after"; +WHEN 5 THEN SET SESSION debug="d,crash_commit_before"; +ELSE BEGIN END; +END CASE; +end // +FLUSH TABLES; +INSERT INTO t2(a, b, c, d) VALUES ('a', 'b', 'c', 'd'); +INSERT INTO t2(a, b, c, d) VALUES ('a', 'b', 'c', 'd'); +INSERT INTO t2(a, b, c, d) VALUES ('a', 'b', 'c', 'd'); +INSERT INTO t2(a, b, c, d) VALUES ('a', 'b', 'c', 'd'); +INSERT INTO t2(a, b, c, d) VALUES ('a', 'b', 'c', 'd'); +INSERT INTO t2(a, b, c, d) VALUES ('a', 'b', 'c', 'd'); +INSERT INTO t2(a, b, c, d) VALUES ('a', 'b', 'c', 'd'); +INSERT INTO t2(a, b, c, d) VALUES ('a', 'b', 'c', 'd'); +INSERT INTO t2(a, b, c, d) VALUES ('a', 'b', 'c', 'd'); +INSERT INTO t2(a, b, c, d) VALUES ('a', 'b', 'c', 'd'); +RESET MASTER; +START TRANSACTION; +insert into t1 select * from t2; +call setcrash(5); +COMMIT; +Got one of the listed errors +SELECT * FROM t1 ORDER BY id; +a b c d id +SHOW BINLOG EVENTS LIMIT 2,1; +Log_name Pos Event_type Server_id End_log_pos Info +delete from t1; +RESET MASTER; +START TRANSACTION; +insert into t1 select * from t2; +call setcrash(4); +COMMIT; +Got one of the listed errors +SELECT * FROM t1 ORDER BY id; +a b c d id +a b c d 1 +a b c d 2 +a b c d 3 +a b c d 4 +a b c d 5 +a b c d 6 +a b c d 7 +a b c d 8 +a b c d 9 +a b c d 10 +SHOW BINLOG EVENTS LIMIT 2,1; +Log_name Pos Event_type Server_id End_log_pos Info +master-bin.000001 175 Query 1 269 use `test`; insert into t1 select * from t2 +delete from t1; +RESET MASTER; +START TRANSACTION; +insert into t1 select * from t2; +call setcrash(3); +COMMIT; +Got one of the listed errors +SELECT * FROM t1 ORDER BY id; +a b c 
d id +a b c d 1 +a b c d 2 +a b c d 3 +a b c d 4 +a b c d 5 +a b c d 6 +a b c d 7 +a b c d 8 +a b c d 9 +a b c d 10 +SHOW BINLOG EVENTS LIMIT 2,1; +Log_name Pos Event_type Server_id End_log_pos Info +master-bin.000001 175 Query 1 269 use `test`; insert into t1 select * from t2 +delete from t1; +RESET MASTER; +START TRANSACTION; +insert into t1 select * from t2; +call setcrash(2); +COMMIT; +Got one of the listed errors +SELECT * FROM t1 ORDER BY id; +a b c d id +a b c d 1 +a b c d 2 +a b c d 3 +a b c d 4 +a b c d 5 +a b c d 6 +a b c d 7 +a b c d 8 +a b c d 9 +a b c d 10 +SHOW BINLOG EVENTS LIMIT 2,1; +Log_name Pos Event_type Server_id End_log_pos Info +master-bin.000001 175 Query 1 269 use `test`; insert into t1 select * from t2 +delete from t1; +RESET MASTER; +START TRANSACTION; +insert into t1 select * from t2; +call setcrash(1); +COMMIT; +Got one of the listed errors +SELECT * FROM t1 ORDER BY id; +a b c d id +SHOW BINLOG EVENTS LIMIT 2,1; +Log_name Pos Event_type Server_id End_log_pos Info +delete from t1; +DROP TABLE t1; +DROP TABLE t2; +DROP PROCEDURE setcrash; --- /dev/null +++ b/mysql-test/r/xa_binlog.result @@ -0,0 +1,32 @@ +CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB; +SET binlog_format= mixed; +RESET MASTER; +XA START 'xatest'; +INSERT INTO t1 VALUES (1); +XA END 'xatest'; +XA PREPARE 'xatest'; +XA COMMIT 'xatest'; +XA START 'xatest'; +INSERT INTO t1 VALUES (2); +XA END 'xatest'; +XA COMMIT 'xatest' ONE PHASE; +BEGIN; +INSERT INTO t1 VALUES (3); +COMMIT; +SELECT * FROM t1 ORDER BY a; +a +1 +2 +3 +SHOW BINLOG EVENTS LIMIT 1,9; +Log_name Pos Event_type Server_id End_log_pos Info +master-bin.000001 # Query 1 # BEGIN +master-bin.000001 # Query 1 # use `test`; INSERT INTO t1 VALUES (1) +master-bin.000001 # Query 1 # COMMIT +master-bin.000001 # Query 1 # BEGIN +master-bin.000001 # Query 1 # use `test`; INSERT INTO t1 VALUES (2) +master-bin.000001 # Xid 1 # COMMIT /* xid=XX */ +master-bin.000001 # Query 1 # BEGIN +master-bin.000001 # Query 1 # use `test`; 
INSERT INTO t1 VALUES (3) +master-bin.000001 # Xid 1 # COMMIT /* xid=XX */ +DROP TABLE t1; --- /dev/null +++ b/mysql-test/suite/binlog/r/binlog_ioerr.result @@ -0,0 +1,28 @@ +CALL mtr.add_suppression("Error writing file 'master-bin'"); +RESET MASTER; +CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=innodb; +INSERT INTO t1 VALUES(0); +SET SESSION debug='+d,fail_binlog_write_1'; +INSERT INTO t1 VALUES(1); +ERROR HY000: Error writing file 'master-bin' (errno: 28) +INSERT INTO t1 VALUES(2); +ERROR HY000: Error writing file 'master-bin' (errno: 28) +SET SESSION debug=''; +INSERT INTO t1 VALUES(3); +SELECT * FROM t1; +a +0 +3 +SHOW BINLOG EVENTS; +Log_name Pos Event_type Server_id End_log_pos Info +BINLOG POS Format_desc 1 ENDPOS Server ver: #, Binlog ver: # +BINLOG POS Query 1 ENDPOS use `test`; CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=innodb +BINLOG POS Query 1 ENDPOS BEGIN +BINLOG POS Query 1 ENDPOS use `test`; INSERT INTO t1 VALUES(0) +BINLOG POS Xid 1 ENDPOS COMMIT /* XID */ +BINLOG POS Query 1 ENDPOS BEGIN +BINLOG POS Query 1 ENDPOS BEGIN +BINLOG POS Query 1 ENDPOS BEGIN +BINLOG POS Query 1 ENDPOS use `test`; INSERT INTO t1 VALUES(3) +BINLOG POS Xid 1 ENDPOS COMMIT /* XID */ +DROP TABLE t1; --- /dev/null +++ b/mysql-test/suite/binlog/t/binlog_ioerr.test @@ -0,0 +1,30 @@ +source include/have_debug.inc; +source include/have_innodb.inc; +source include/have_log_bin.inc; +source include/have_binlog_format_mixed_or_statement.inc; + +CALL mtr.add_suppression("Error writing file 'master-bin'"); + +RESET MASTER; + +CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=innodb; +INSERT INTO t1 VALUES(0); +SET SESSION debug='+d,fail_binlog_write_1'; +--error ER_ERROR_ON_WRITE +INSERT INTO t1 VALUES(1); +--error ER_ERROR_ON_WRITE +INSERT INTO t1 VALUES(2); +SET SESSION debug=''; +INSERT INTO t1 VALUES(3); +SELECT * FROM t1; + +# Actually the output from this currently shows a bug. 
+# The injected IO error leaves partially written transactions in the binlog in +# the form of stray "BEGIN" events. +# These should disappear from the output if binlog error handling is improved +# (see MySQL Bug#37148 and WL#1790). +--replace_regex /\/\* xid=.* \*\//\/* XID *\// /Server ver: .*, Binlog ver: .*/Server ver: #, Binlog ver: #/ /table_id: [0-9]+/table_id: #/ +--replace_column 1 BINLOG 2 POS 5 ENDPOS +SHOW BINLOG EVENTS; + +DROP TABLE t1; --- /dev/null +++ b/mysql-test/t/group_commit.test @@ -0,0 +1,115 @@ +--source include/have_debug_sync.inc +--source include/have_innodb.inc +--source include/have_log_bin.inc + +# Test some group commit code paths by using debug_sync to do controlled +# commits of 6 transactions: first 1 alone, then 3 as a group, then 2 as a +# group. +# +# Group 3 is allowed to race as far as possible ahead before group 2 finishes +# to check some edge case for concurrency control. + +CREATE TABLE t1 (a VARCHAR(10) PRIMARY KEY) ENGINE=innodb; + +SELECT variable_value INTO @commits FROM information_schema.global_status + WHERE variable_name = 'binlog_commits'; +SELECT variable_value INTO @group_commits FROM information_schema.global_status + WHERE variable_name = 'binlog_group_commits'; + +connect(con1,localhost,root,,); +connect(con2,localhost,root,,); +connect(con3,localhost,root,,); +connect(con4,localhost,root,,); +connect(con5,localhost,root,,); +connect(con6,localhost,root,,); + +# Start group1 (with one thread) doing commit, waiting for +# group2 to queue up before finishing. + +connection con1; +SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group1_running WAIT_FOR group2_queued"; +send INSERT INTO t1 VALUES ("con1"); + +# Make group2 (with three threads) queue up. +# Make sure con2 is the group commit leader for group2. +# Make group2 wait with running commit_ordered() until group3 has committed. 
+ +connection con2; +set DEBUG_SYNC= "now WAIT_FOR group1_running"; +SET DEBUG_SYNC= "commit_group_commit_queue SIGNAL group2_con2"; +SET DEBUG_SYNC= "commit_after_release_LOCK_log WAIT_FOR group3_committed"; +SET DEBUG_SYNC= "commit_after_group_run_commit_ordered SIGNAL group2_visible WAIT_FOR group2_checked"; +send INSERT INTO t1 VALUES ("con2"); +connection con3; +SET DEBUG_SYNC= "now WAIT_FOR group2_con2"; +SET DEBUG_SYNC= "commit_group_commit_queue SIGNAL group2_con3"; +send INSERT INTO t1 VALUES ("con3"); +connection con4; +SET DEBUG_SYNC= "now WAIT_FOR group2_con3"; +SET DEBUG_SYNC= "commit_group_commit_queue SIGNAL group2_con4"; +send INSERT INTO t1 VALUES ("con4"); + +# When group2 is queued, let group1 continue and queue group3. + +connection default; +SET DEBUG_SYNC= "now WAIT_FOR group2_con4"; + +# At this point, transaction 1 is still not visible as commit_ordered() has not +# been called yet. +SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED; +SELECT * FROM t1 ORDER BY a; + +SET DEBUG_SYNC= "now SIGNAL group2_queued"; +connection con1; +reap; + +# Now transaction 1 is visible. +connection default; +SELECT * FROM t1 ORDER BY a; + +connection con5; +SET DEBUG_SYNC= "commit_before_get_LOCK_commit_ordered SIGNAL group3_con5"; +SET DEBUG_SYNC= "commit_after_get_LOCK_log SIGNAL con5_leader WAIT_FOR con6_queued"; +send INSERT INTO t1 VALUES ("con5"); + +connection con6; +SET DEBUG_SYNC= "now WAIT_FOR con5_leader"; +SET DEBUG_SYNC= "commit_group_commit_queue SIGNAL con6_queued"; +send INSERT INTO t1 VALUES ("con6"); + +connection default; +SET DEBUG_SYNC= "now WAIT_FOR group3_con5"; +# Still only transaction 1 visible, as group2 has not yet run commit_ordered(). +SELECT * FROM t1 ORDER BY a; +SET DEBUG_SYNC= "now SIGNAL group3_committed"; +SET DEBUG_SYNC= "now WAIT_FOR group2_visible"; +# Now transactions 1-4 visible.
+SELECT * FROM t1 ORDER BY a; +SET DEBUG_SYNC= "now SIGNAL group2_checked"; + +connection con2; +reap; + +connection con3; +reap; + +connection con4; +reap; + +connection con5; +reap; + +connection con6; +reap; + +connection default; +# Check all transactions finally visible. +SELECT * FROM t1 ORDER BY a; + +SELECT variable_value - @commits FROM information_schema.global_status + WHERE variable_name = 'binlog_commits'; +SELECT variable_value - @group_commits FROM information_schema.global_status + WHERE variable_name = 'binlog_group_commits'; + +SET DEBUG_SYNC= 'RESET'; +DROP TABLE t1; --- /dev/null +++ b/mysql-test/t/group_commit_binlog_pos-master.opt @@ -0,0 +1 @@ +--skip-stack-trace --skip-core-file --- /dev/null +++ b/mysql-test/t/group_commit_binlog_pos.test @@ -0,0 +1,89 @@ +--source include/have_debug_sync.inc +--source include/have_innodb.inc +--source include/have_log_bin.inc +--source include/have_binlog_format_mixed_or_statement.inc + +# Need DBUG to crash the server intentionally +--source include/have_debug.inc +# Don't test this under valgrind, memory leaks will occur as we crash +--source include/not_valgrind.inc + +# The test case currently uses grep and tail, which may be unavailable on +# some Windows systems. But see MWL#191 for how to remove the need for grep. +--source include/not_windows.inc + +# XtraDB stores the binlog position corresponding to the last commit, and +# prints it during crash recovery. +# Test that we get the correct position when we group commit several +# transactions together. + +CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=innodb; +INSERT INTO t1 VALUES (0); + +connect(con1,localhost,root,,); +connect(con2,localhost,root,,); +connect(con3,localhost,root,,); + +# Queue up three commits for group commit.
+ +connection con1; +SET DEBUG_SYNC= "commit_after_get_LOCK_log SIGNAL con1_waiting WAIT_FOR con3_queued"; +SET DEBUG_SYNC= "commit_loop_entry_commit_ordered SIGNAL con1_loop WAIT_FOR con1_loop_cont EXECUTE 3"; +send INSERT INTO t1 VALUES (1); + +connection con2; +SET DEBUG_SYNC= "now WAIT_FOR con1_waiting"; +SET DEBUG_SYNC= "commit_group_commit_queue SIGNAL con2_queued"; +send INSERT INTO t1 VALUES (2); + +connection con3; +SET DEBUG_SYNC= "now WAIT_FOR con2_queued"; +SET DEBUG_SYNC= "commit_group_commit_queue SIGNAL con3_queued"; +send INSERT INTO t1 VALUES (3); + +connection default; +SET DEBUG_SYNC= "now WAIT_FOR con1_loop"; +# At this point, no transactions are committed. +SET DEBUG_SYNC= "now SIGNAL con1_loop_cont"; +SET DEBUG_SYNC= "now WAIT_FOR con1_loop"; +# At this point, 1 transaction is committed. +SET DEBUG_SYNC= "now SIGNAL con1_loop_cont"; +SET DEBUG_SYNC= "now WAIT_FOR con1_loop"; + +# At this point, 2 transactions are committed. +SELECT * FROM t1 ORDER BY a; + +connection con2; +reap; + +# Now crash the server with 1+2 in-memory committed, 3 only prepared. +connection default; +system echo wait-group_commit_binlog_pos.test >> $MYSQLTEST_VARDIR/tmp/mysqld.1.expect; +SET SESSION debug="+d,crash_dispatch_command_before"; +--error 2006,2013 +SELECT 1; + +connection con1; +--error 2006,2013 +reap; +connection con3; +--error 2006,2013 +reap; + +system echo restart-group_commit_binlog_pos.test >> $MYSQLTEST_VARDIR/tmp/mysqld.1.expect; + +connection default; +--enable_reconnect +--source include/wait_until_connected_again.inc + +# Crash recovery should recover all three transactions. +SELECT * FROM t1 ORDER BY a; + +# Check that the binlog position reported by InnoDB is the correct one +# for the end of the second transaction (as can be checked with +# mysqlbinlog). 
+let $MYSQLD_DATADIR= `SELECT @@datadir`; +--exec grep 'InnoDB: Last MySQL binlog file position' $MYSQLD_DATADIR/../../log/mysqld.1.err | tail -1 + +SET DEBUG_SYNC= 'RESET'; +DROP TABLE t1; --- /dev/null +++ b/mysql-test/t/group_commit_crash-master.opt @@ -0,0 +1 @@ +--skip-stack-trace --skip-core-file --- /dev/null +++ b/mysql-test/t/group_commit_crash.test @@ -0,0 +1,80 @@ +# Testing group commit by crashing a few times. +# Test adapted from the Facebook patch: lp:mysqlatfacebook +--source include/not_embedded.inc +# Don't test this under valgrind, memory leaks will occur +--source include/not_valgrind.inc + +# Binary must be compiled with debug for crash to occur +--source include/have_debug.inc +--source include/have_innodb.inc +--source include/have_log_bin.inc + +let $innodb_file_format_max_orig=`select @@innodb_file_format_max`; +CREATE TABLE t1(a CHAR(255), + b CHAR(255), + c CHAR(255), + d CHAR(255), + id INT AUTO_INCREMENT, + PRIMARY KEY(id)) ENGINE=InnoDB; +create table t2 like t1; +delimiter //; +create procedure setcrash(IN i INT) +begin + CASE i + WHEN 1 THEN SET SESSION debug="d,crash_commit_after_prepare"; + WHEN 2 THEN SET SESSION debug="d,crash_commit_after_log"; + WHEN 3 THEN SET SESSION debug="d,crash_commit_before_unlog"; + WHEN 4 THEN SET SESSION debug="d,crash_commit_after"; + WHEN 5 THEN SET SESSION debug="d,crash_commit_before"; + ELSE BEGIN END; + END CASE; +end // +delimiter ;// +# Avoid getting a crashed mysql.proc table. 
+FLUSH TABLES; + +let $numtests = 5; + +let $numinserts = 10; +while ($numinserts) +{ + dec $numinserts; + INSERT INTO t2(a, b, c, d) VALUES ('a', 'b', 'c', 'd'); +} + +--enable_reconnect + +while ($numtests) +{ + RESET MASTER; + + START TRANSACTION; + insert into t1 select * from t2; + # Write file to make mysql-test-run.pl expect crash + --exec echo "restart" > $MYSQLTEST_VARDIR/tmp/mysqld.1.expect + + eval call setcrash($numtests); + + # Run the crashing query + --error 2006,2013 + COMMIT; + + # Poll the server waiting for it to be back online again. + --source include/wait_until_connected_again.inc + + # table and binlog should be in sync. + SELECT * FROM t1 ORDER BY id; + SHOW BINLOG EVENTS LIMIT 2,1; + + delete from t1; + + dec $numtests; +} + +# final cleanup +DROP TABLE t1; +DROP TABLE t2; +DROP PROCEDURE setcrash; +--disable_query_log +eval SET GLOBAL innodb_file_format_max=$innodb_file_format_max_orig; +--enable_query_log --- /dev/null +++ b/mysql-test/t/xa_binlog.test @@ -0,0 +1,32 @@ +--source include/have_innodb.inc +--source include/have_log_bin.inc + +CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB; + +# Fix binlog format (otherwise SHOW BINLOG EVENTS will fluctuate). +SET binlog_format= mixed; + +RESET MASTER; + +XA START 'xatest'; +INSERT INTO t1 VALUES (1); +XA END 'xatest'; +XA PREPARE 'xatest'; +XA COMMIT 'xatest'; + +XA START 'xatest'; +INSERT INTO t1 VALUES (2); +XA END 'xatest'; +XA COMMIT 'xatest' ONE PHASE; + +BEGIN; +INSERT INTO t1 VALUES (3); +COMMIT; + +SELECT * FROM t1 ORDER BY a; + +--replace_column 2 # 5 # +--replace_regex /xid=[0-9]+/xid=XX/ +SHOW BINLOG EVENTS LIMIT 1,9; + +DROP TABLE t1;