@@ -586,23 +586,17 @@ static srv_sys_t srv_sys;
 */
 struct purge_coordinator_state
 {
-	/* Snapshot of the last history length before the purge call.*/
-	uint32 m_history_length;
-	Atomic_counter<int> m_running;
-	purge_coordinator_state() :
-		m_history_length(), m_running(0)
-	{}
+  /** Snapshot of the last history length before the purge call.*/
+  uint32 m_history_length;
+  Atomic_counter<int> m_running;
+  purge_coordinator_state() : m_history_length(), m_running(0) {}
 };
 
 static purge_coordinator_state purge_state;
 extern tpool::waitable_task purge_coordinator_task;
 
 /** @return whether the purge coordinator thread is active */
-bool purge_sys_t::running()
-{
-	return purge_coordinator_task.is_running();
-}
-
+bool purge_sys_t::running() { return purge_coordinator_task.is_running(); }
 
 /** threadpool timer for srv_error_monitor_task(). */
 std::unique_ptr<tpool::timer> srv_error_monitor_timer;
@@ -726,30 +720,29 @@ static void thread_pool_thread_end()
 
 void srv_thread_pool_init()
 {
-	DBUG_ASSERT(!srv_thread_pool);
+  DBUG_ASSERT(!srv_thread_pool);
 
 #if defined (_WIN32)
-	srv_thread_pool = tpool::create_thread_pool_win();
+  srv_thread_pool= tpool::create_thread_pool_win();
 #else
-	srv_thread_pool = tpool::create_thread_pool_generic();
+  srv_thread_pool= tpool::create_thread_pool_generic();
 #endif
-	srv_thread_pool->set_thread_callbacks(thread_pool_thread_init, thread_pool_thread_end);
+  srv_thread_pool->set_thread_callbacks(thread_pool_thread_init,
+                                        thread_pool_thread_end);
 }
 
 
 void srv_thread_pool_end()
 {
-	ut_a(!srv_master_timer);
-	delete srv_thread_pool;
-	srv_thread_pool = nullptr;
+  ut_ad(!srv_master_timer);
+  delete srv_thread_pool;
+  srv_thread_pool= nullptr;
 }
 
 static bool need_srv_free;
 
 /** Initialize the server. */
-static
-void
-srv_init()
+static void srv_init()
 {
 	mutex_create(LATCH_ID_SRV_INNODB_MONITOR, &srv_innodb_monitor_mutex);
 	srv_thread_pool_init();
@@ -1407,16 +1400,14 @@ srv_export_innodb_status(void)
 
 struct srv_monitor_state_t
 {
-	time_t last_monitor_time;
-	ulint mutex_skipped;
-	bool last_srv_print_monitor;
-	srv_monitor_state_t()
-	{
-		srv_last_monitor_time = time(NULL);
-		last_monitor_time = srv_last_monitor_time;
-		mutex_skipped = 0;
-		last_srv_print_monitor = false;
-	}
+  time_t last_monitor_time;
+  ulint mutex_skipped;
+  bool last_srv_print_monitor;
+  srv_monitor_state_t() : mutex_skipped(0), last_srv_print_monitor(false)
+  {
+    srv_last_monitor_time = time(NULL);
+    last_monitor_time= srv_last_monitor_time;
+  }
 };
 
 static srv_monitor_state_t monitor_state;
@@ -1558,18 +1549,18 @@ srv_inc_activity_count(void)
 	srv_sys.activity_count.inc();
 }
 
-/**
-Check whether purge or master are still active.
-@return true if something is active, false if not.
-*/
+#ifdef UNIV_DEBUG
+/** @return whether purge or master task is active */
 bool srv_any_background_activity()
 {
-	if (purge_sys.enabled() || srv_master_timer.get()) {
-		ut_ad(!srv_read_only_mode);
-		return true;
-	}
-	return false;
+  if (purge_sys.enabled() || srv_master_timer.get())
+  {
+    ut_ad(!srv_read_only_mode);
+    return true;
+  }
+  return false;
 }
+#endif /* UNIV_DEBUG */
 
 /** Wake up the InnoDB master thread if it was suspended (not sleeping). */
 void
@@ -1582,17 +1573,19 @@ srv_active_wake_master_thread_low()
 }
 
 
-void purge_worker_callback(void*);
-void purge_coordinator_callback(void*);
-void purge_coordinator_timer_callback(void*);
+static void purge_worker_callback(void*);
+static void purge_coordinator_callback(void*);
+static void purge_coordinator_timer_callback(void*);
 
-tpool::task_group purge_task_group;
-tpool::waitable_task purge_worker_task(purge_worker_callback, nullptr, &purge_task_group);
+static tpool::task_group purge_task_group;
+tpool::waitable_task purge_worker_task(purge_worker_callback, nullptr,
+                                       &purge_task_group);
+static tpool::task_group purge_coordinator_task_group(1);
+tpool::waitable_task purge_coordinator_task(purge_coordinator_callback,
+                                            nullptr,
+                                            &purge_coordinator_task_group);
 
-tpool::task_group purge_coordinator_task_group(1);
-tpool::waitable_task purge_coordinator_task(purge_coordinator_callback, nullptr, &purge_coordinator_task_group);
-
-tpool::timer* purge_coordinator_timer;
+static tpool::timer *purge_coordinator_timer;
 
 /** Wake up the purge threads if there is work to do. */
 void
@@ -1965,10 +1958,7 @@ srv_shutdown(bool ibuf_merge)
 	} while (n_bytes_merged || n_tables_to_drop);
 }
 
-/*********************************************************************//**
-The periodic master controlling the server.
-@return a dummy parameter */
-
+/** The periodic master task controlling the server. */
 void srv_master_callback(void*)
 {
 	static ulint old_activity_count;
@@ -2117,17 +2107,16 @@ static uint32_t srv_do_purge(ulint* n_total_purged)
 }
 
 
-std::queue<THD*> purge_thds;
-std::mutex purge_thd_mutex;
+static std::queue<THD*> purge_thds;
+static std::mutex purge_thd_mutex;
 
-void purge_create_background_thds(int n)
+static void purge_create_background_thds(int n)
 {
-	THD* thd = current_thd;
-	std::unique_lock<std::mutex> lk(purge_thd_mutex);
-	for (int i = 0; i < n; i++) {
-		purge_thds.push(innobase_create_background_thd("InnoDB purge worker"));
-	}
-	set_current_thd(thd);
+  THD *thd= current_thd;
+  std::unique_lock<std::mutex> lk(purge_thd_mutex);
+  while (n--)
+    purge_thds.push(innobase_create_background_thd("InnoDB purge worker"));
+  set_current_thd(thd);
 }
 
 extern void* thd_attach_thd(THD*);
@@ -2162,103 +2151,98 @@ void release_thd(THD *thd, void *ctx)
 Called by timer when purge coordinator decides
 to delay processing of purge records.
 */
-void purge_coordinator_timer_callback(void *)
+static void purge_coordinator_timer_callback(void *)
 {
-	if (!purge_sys.enabled() || purge_sys.paused() ||
-	    purge_state.m_running || !trx_sys.rseg_history_len) {
-		return;
-	}
-
-	if (purge_state.m_history_length < 5000 &&
-	    purge_state.m_history_length == trx_sys.rseg_history_len) {
-
-		/* No new records were added since wait started.
-		Simply wait for new records.The magic number 5000 is an
-		approximation for the case where we have cached UNDO
-		log records which prevent truncate of the UNDO segments.*/
-
-		return;
-	}
-
-	srv_wake_purge_thread_if_not_active();
+  if (!purge_sys.enabled() || purge_sys.paused() ||
+      purge_state.m_running || !trx_sys.rseg_history_len)
+    return;
+
+  if (purge_state.m_history_length < 5000 &&
+      purge_state.m_history_length == trx_sys.rseg_history_len)
+    /* No new records were added since wait started.
+    Simply wait for new records. The magic number 5000 is an
+    approximation for the case where we have cached UNDO
+    log records which prevent truncate of the UNDO segments.*/
+    return;
+  srv_wake_purge_thread_if_not_active();
 }
 
-void purge_worker_callback(void*)
+static void purge_worker_callback(void*)
 {
-	ut_ad(!current_thd);
-	ut_ad(!srv_read_only_mode);
-	ut_ad(srv_force_recovery < SRV_FORCE_NO_BACKGROUND);
-	void* ctx;
-	THD* thd = acquire_thd(&ctx);
-	while (srv_task_execute()){}
-	release_thd(thd,ctx);
+  ut_ad(!current_thd);
+  ut_ad(!srv_read_only_mode);
+  ut_ad(srv_force_recovery < SRV_FORCE_NO_BACKGROUND);
+  void *ctx;
+  THD *thd= acquire_thd(&ctx);
+  while (srv_task_execute()) {}
+  release_thd(thd,ctx);
 }
 
-
-void purge_coordinator_callback_low()
+static void purge_coordinator_callback_low()
 {
-	ulint n_total_purged = ULINT_UNDEFINED;
-	purge_state.m_history_length = 0;
-
-	if (!purge_sys.enabled() || purge_sys.paused()) {
-		return;
-	}
-	do {
-		n_total_purged = 0;
-
-		int sigcount = purge_state.m_running;
-
-		purge_state.m_history_length = srv_do_purge(&n_total_purged);
-
-		/* Check if purge was woken by srv_wake_purge_thread_if_not_active() */
-
-		bool woken_during_purge = purge_state.m_running > sigcount;
-
-		/*If last purge batch processed less that 1 page and there is still work to do,
-		delay the next batch by 10ms. Unless someone added work and woke us up. */
-		if (n_total_purged == 0){
-
-			if(trx_sys.rseg_history_len == 0) {
-				return;
-			}
-
-			if (!woken_during_purge) {
-				/* Delay next purge round*/
-				purge_coordinator_timer->set_time(10, 0);
-				return;
-			}
-		}
-	} while((purge_sys.enabled() && !purge_sys.paused()) || !srv_purge_should_exit());
+  ulint n_total_purged= ULINT_UNDEFINED;
+  purge_state.m_history_length= 0;
+
+  if (!purge_sys.enabled() || purge_sys.paused())
+    return;
+  do
+  {
+    n_total_purged = 0;
+    int sigcount= purge_state.m_running;
+
+    purge_state.m_history_length= srv_do_purge(&n_total_purged);
+
+    /* Check if purge was woken by srv_wake_purge_thread_if_not_active() */
+
+    bool woken_during_purge= purge_state.m_running > sigcount;
+
+    /* If last purge batch processed less than 1 page and there is
+    still work to do, delay the next batch by 10ms. Unless
+    someone added work and woke us up. */
+    if (n_total_purged == 0)
+    {
+      if (trx_sys.rseg_history_len == 0)
+        return;
+      if (!woken_during_purge)
+      {
+        /* Delay next purge round*/
+        purge_coordinator_timer->set_time(10, 0);
+        return;
+      }
+    }
+  }
+  while ((purge_sys.enabled() && !purge_sys.paused()) ||
+         !srv_purge_should_exit());
 }
 
-void purge_coordinator_callback(void*)
+static void purge_coordinator_callback(void*)
 {
-	void* ctx;
-	THD* thd = acquire_thd(&ctx);
-	purge_coordinator_callback_low();
-	release_thd(thd,ctx);
-	purge_state.m_running = 0;
+  void *ctx;
+  THD *thd= acquire_thd(&ctx);
+  purge_coordinator_callback_low();
+  release_thd(thd,ctx);
+  purge_state.m_running= 0;
 }
 
 void srv_init_purge_tasks(uint n_tasks)
 {
-	purge_task_group.set_max_tasks(n_tasks-1);
-	purge_create_background_thds(n_tasks);
-	purge_coordinator_timer =
-		srv_thread_pool->create_timer(purge_coordinator_timer_callback,
-					      nullptr);
+  purge_task_group.set_max_tasks(n_tasks - 1);
+  purge_create_background_thds(n_tasks);
+  purge_coordinator_timer= srv_thread_pool->create_timer
+    (purge_coordinator_timer_callback, nullptr);
 }
 
-void srv_shutdown_purge_tasks()
+static void srv_shutdown_purge_tasks()
 {
-	purge_coordinator_task.wait();
-	delete purge_coordinator_timer;
-	purge_coordinator_timer = nullptr;
-	purge_worker_task.wait();
-	while (!purge_thds.empty()) {
-		innobase_destroy_background_thd(purge_thds.front());
-		purge_thds.pop();
-	}
+  purge_coordinator_task.wait();
+  delete purge_coordinator_timer;
+  purge_coordinator_timer= nullptr;
+  purge_worker_task.wait();
+  while (!purge_thds.empty())
+  {
+    innobase_destroy_background_thd(purge_thds.front());
+    purge_thds.pop();
+  }
 }
 
 /**********************************************************************//**
@@ -2277,12 +2261,9 @@ srv_que_task_enqueue_low(
 	mutex_exit(&srv_sys.tasks_mutex);
 }
 
-/**********************************************************************//**
-Get count of tasks in the queue.
-@return number of tasks in queue */
-ulint
-srv_get_task_queue_length(void)
-/*===========================*/
+#ifdef UNIV_DEBUG
+/** @return number of tasks in queue */
+ulint srv_get_task_queue_length()
 {
 	ulint n_tasks;
 
@@ -2296,6 +2277,7 @@ srv_get_task_queue_length(void)
 
 	return(n_tasks);
 }
+#endif
 
 /** Wake up the purge coordinator. */
 void
@@ -2324,4 +2306,4 @@ void srv_purge_shutdown()
 		purge_sys.coordinator_shutdown();
 		srv_shutdown_purge_tasks();
-	}
+  }
 }