Browse Source

Move THD list handling to THD_list

Implemented and integrated THD_list as a replacement for the global
thread list. It uses own mutex instead of LOCK_thread_count for THD
list protection.

Removed unused first_global_thread() and next_global_thread().

delayed_insert_threads is now protected by LOCK_delayed_insert. Although
this patch doesn't fix very wrong synchronization of this variable.

After this patch there are only 2 legitimate uses of LOCK_thread_count
left, both in mysqld.cc: thread_count and ready_to_exit.

Aim is to reduce usage of LOCK_thread_count and COND_thread_count.
Part of MDEV-15135.
pull/1152/head
Sergey Vojtovich 7 years ago
parent
commit
3503fbbebf
  1. 3
      include/thread_pool_priv.h
  2. 9
      libmysqld/lib_sql.cc
  3. 9
      plugin/feedback/sender_thread.cc
  4. 6
      plugin/handler_socket/handlersocket/database.cpp
  5. 24
      sql/event_scheduler.cc
  6. 264
      sql/mysqld.cc
  7. 4
      sql/mysqld.h
  8. 4
      sql/rpl_parallel.cc
  9. 8
      sql/slave.cc
  10. 8
      sql/sql_class.cc
  11. 101
      sql/sql_class.h
  12. 2
      sql/sql_connect.cc
  13. 16
      sql/sql_insert.cc
  14. 113
      sql/sql_parse.cc
  15. 30
      sql/sql_plugin.cc
  16. 110
      sql/sql_repl.cc
  17. 495
      sql/sql_show.cc
  18. 2
      sql/threadpool_common.cc
  19. 50
      sql/threadpool_generic.cc
  20. 165
      sql/wsrep_mysqld.cc

3
include/thread_pool_priv.h

@ -61,9 +61,6 @@ void thd_set_mysys_var(THD *thd, st_my_thread_var *mysys_var);
my_socket thd_get_fd(THD *thd);
int thd_store_globals(THD* thd);
THD *first_global_thread();
THD *next_global_thread(THD *thd);
/* Print to the MySQL error log */
void sql_print_error(const char *format, ...);

9
libmysqld/lib_sql.cc

@ -432,11 +432,9 @@ int emb_unbuffered_fetch(MYSQL *mysql, char **row)
static void emb_free_embedded_thd(MYSQL *mysql)
{
THD *thd= (THD*)mysql->thd;
mysql_mutex_lock(&LOCK_thread_count);
server_threads.erase(thd);
thd->clear_data_list();
thd->store_globals();
thd->unlink();
mysql_mutex_unlock(&LOCK_thread_count);
delete thd;
my_pthread_setspecific_ptr(THR_THD, 0);
mysql->thd=0;
@ -711,10 +709,7 @@ void *create_embedded_thd(int client_flag)
thd->first_data= 0;
thd->data_tail= &thd->first_data;
bzero((char*) &thd->net, sizeof(thd->net));
mysql_mutex_lock(&LOCK_thread_count);
threads.append(thd);
mysql_mutex_unlock(&LOCK_thread_count);
server_threads.insert(thd);
thd->mysys_var= 0;
thd->reset_globals();
return thd;

9
plugin/feedback/sender_thread.cc

@ -90,9 +90,7 @@ static int prepare_for_fill(TABLE_LIST *tables)
in SHOW STATUS and we want to avoid skewing the statistics)
*/
thd->variables.pseudo_thread_id= thd->thread_id;
mysql_mutex_lock(&LOCK_thread_count);
threads.append(thd);
mysql_mutex_unlock(&LOCK_thread_count);
server_threads.insert(thd);
thd->thread_stack= (char*) &tables;
if (thd->store_globals())
return 1;
@ -258,12 +256,9 @@ ret:
reset all thread local status variables to minimize
the effect of the background thread on SHOW STATUS.
*/
mysql_mutex_lock(&LOCK_thread_count);
server_threads.erase(thd);
thd->set_status_var_init();
thd->killed= KILL_CONNECTION;
thd->unlink();
mysql_cond_broadcast(&COND_thread_count);
mysql_mutex_unlock(&LOCK_thread_count);
delete thd;
thd= 0;
}

6
plugin/handler_socket/handlersocket/database.cpp

@ -280,7 +280,7 @@ dbcontext::init_thread(const void *stack_bottom, volatile int& shutdown_flag)
DBG_THR(fprintf(stderr,
"thread_stack = %p sizeof(THD)=%zu sizeof(mtx)=%zu "
"O: %zu %zu %zu %zu %zu %zu %zu\n",
thd->thread_stack, sizeof(THD), sizeof(LOCK_thread_count),
thd->thread_stack, sizeof(THD), sizeof(mysql_mutex_t),
DENA_THR_OFFSETOF(mdl_context),
DENA_THR_OFFSETOF(net),
DENA_THR_OFFSETOF(LOCK_thd_data),
@ -307,7 +307,7 @@ dbcontext::init_thread(const void *stack_bottom, volatile int& shutdown_flag)
}
{
thd->thread_id = next_thread_id();
add_to_active_threads(thd);
server_threads.insert(thd);
}
DBG_THR(fprintf(stderr, "HNDSOCK init thread wsts\n"));
@ -341,10 +341,8 @@ dbcontext::term_thread()
close_tables_if();
my_pthread_setspecific_ptr(THR_THD, 0);
{
pthread_mutex_lock(&LOCK_thread_count);
delete thd;
thd = 0;
pthread_mutex_unlock(&LOCK_thread_count);
my_thread_end();
}
}

24
sql/event_scheduler.cc

@ -150,7 +150,7 @@ deinit_event_thread(THD *thd)
{
thd->proc_info= "Clearing";
DBUG_PRINT("exit", ("Event thread finishing"));
unlink_not_visible_thd(thd);
server_threads.erase(thd);
delete thd;
}
@ -185,7 +185,7 @@ pre_init_event_thread(THD* thd)
thd->net.read_timeout= slave_net_timeout;
thd->variables.option_bits|= OPTION_AUTO_IS_NULL;
thd->client_capabilities|= CLIENT_MULTI_RESULTS;
add_to_active_threads(thd);
server_threads.insert(thd);
/*
Guarantees that we will see the thread in SHOW PROCESSLIST though its
@ -679,20 +679,20 @@ end:
Event_scheduler::workers_count()
*/
static my_bool workers_count_callback(THD *thd, uint32_t *count)
{
if (thd->system_thread == SYSTEM_THREAD_EVENT_WORKER)
++*count;
return 0;
}
uint
Event_scheduler::workers_count()
{
THD *tmp;
uint count= 0;
uint32_t count= 0;
DBUG_ENTER("Event_scheduler::workers_count");
mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
I_List_iterator<THD> it(threads);
while ((tmp=it++))
if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER)
++count;
mysql_mutex_unlock(&LOCK_thread_count);
DBUG_PRINT("exit", ("%d", count));
server_threads.iterate(workers_count_callback, &count);
DBUG_RETURN(count);
}

264
sql/mysqld.cc

@ -651,26 +651,11 @@ Le_creator le_creator;
int bootstrap_error;
I_List<THD> threads;
THD_list server_threads;
Rpl_filter* cur_rpl_filter;
Rpl_filter* global_rpl_filter;
Rpl_filter* binlog_filter;
THD *first_global_thread()
{
if (threads.is_empty())
return NULL;
return threads.head();
}
THD *next_global_thread(THD *thd)
{
if (threads.is_last(thd))
return NULL;
struct ilink *next= thd->next;
return static_cast<THD*>(next);
}
struct system_variables global_system_variables;
/**
Following is just for options parsing, used with a difference against
@ -707,12 +692,7 @@ pthread_key(THD*, THR_THD);
/*
LOCK_thread_count protects the following variables:
thread_count Number of threads with THD that servers queries.
threads Linked list of active THD's.
The effect of this is that one can't unlink and
delete a THD as long as one has locked
LOCK_thread_count.
ready_to_exit
delayed_insert_threads
ready_to_exit
*/
mysql_mutex_t LOCK_thread_count;
@ -910,7 +890,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
key_LOCK_error_messages,
key_LOCK_start_thread,
key_LOCK_thread_count, key_LOCK_thread_cache,
key_LOCK_thread_count, key_Thread_map_mutex, key_LOCK_thread_cache,
key_PARTITION_LOCK_auto_inc;
PSI_mutex_key key_RELAYLOG_LOCK_index;
PSI_mutex_key key_LOCK_relaylog_end_pos;
@ -1004,6 +984,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_commit_ordered, "LOCK_commit_ordered", PSI_FLAG_GLOBAL},
{ &key_LOCK_slave_background, "LOCK_slave_background", PSI_FLAG_GLOBAL},
{ &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL},
{ &key_Thread_map_mutex, "Thread_map::mutex", PSI_FLAG_GLOBAL },
{ &key_LOCK_thread_cache, "LOCK_thread_cache", PSI_FLAG_GLOBAL},
{ &key_PARTITION_LOCK_auto_inc, "HA_DATA_PARTITION::LOCK_auto_inc", 0},
{ &key_LOCK_slave_state, "LOCK_slave_state", 0},
@ -1549,6 +1530,106 @@ static void end_ssl();
** Code to end mysqld
****************************************************************************/
static my_bool kill_all_threads(THD *thd, void *)
{
DBUG_PRINT("quit", ("Informing thread %ld that it's time to die",
(ulong) thd->thread_id));
/* We skip slave threads on this first loop through. */
if (thd->slave_thread)
return 0;
if (DBUG_EVALUATE_IF("only_kill_system_threads", !thd->system_thread, 0))
return 0;
#ifdef WITH_WSREP
/* skip wsrep system threads as well */
if (WSREP(thd) && (wsrep_thd_is_applying(thd) || thd->wsrep_applier))
return 0;
#endif
thd->set_killed(KILL_SERVER_HARD);
MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd));
if (WSREP(thd)) mysql_mutex_lock(&thd->LOCK_thd_data);
mysql_mutex_lock(&thd->LOCK_thd_kill);
if (thd->mysys_var)
{
thd->mysys_var->abort= 1;
mysql_mutex_lock(&thd->mysys_var->mutex);
if (thd->mysys_var->current_cond)
{
for (uint i= 0; i < 2; i++)
{
int ret= mysql_mutex_trylock(thd->mysys_var->current_mutex);
mysql_cond_broadcast(thd->mysys_var->current_cond);
if (!ret)
{
/* Thread has surely got the signal, unlock and abort */
mysql_mutex_unlock(thd->mysys_var->current_mutex);
break;
}
sleep(1);
}
}
mysql_mutex_unlock(&thd->mysys_var->mutex);
}
mysql_mutex_unlock(&thd->LOCK_thd_kill);
if (WSREP(thd)) mysql_mutex_unlock(&thd->LOCK_thd_data);
return 0;
}
static my_bool kill_all_threads_once_again(THD *thd, void *)
{
#ifndef __bsdi__ // Bug in BSDI kernel
if (thd->vio_ok())
{
if (global_system_variables.log_warnings)
sql_print_warning(ER_DEFAULT(ER_FORCING_CLOSE), my_progname,
(ulong) thd->thread_id,
(thd->main_security_ctx.user ?
thd->main_security_ctx.user : ""));
/*
close_connection() might need a valid current_thd
for memory allocation tracking.
*/
THD *save_thd= current_thd;
set_current_thd(thd);
close_connection(thd, ER_SERVER_SHUTDOWN);
set_current_thd(save_thd);
}
#endif
#ifdef WITH_WSREP
/*
* WSREP_TODO:
* this code block may turn out redundant. wsrep->disconnect()
* should terminate slave threads gracefully, and we don't need
* to signal them here.
* The code here makes sure mysqld will not hang during shutdown
* even if wsrep provider has problems in shutting down.
*/
if (WSREP(thd) && wsrep_thd_is_applying(thd))
{
sql_print_information("closing wsrep system thread");
thd->set_killed(KILL_CONNECTION);
MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd));
if (thd->mysys_var)
{
thd->mysys_var->abort=1;
mysql_mutex_lock(&thd->mysys_var->mutex);
if (thd->mysys_var->current_cond)
{
mysql_mutex_lock(thd->mysys_var->current_mutex);
mysql_cond_broadcast(thd->mysys_var->current_cond);
mysql_mutex_unlock(thd->mysys_var->current_mutex);
}
mysql_mutex_unlock(&thd->mysys_var->mutex);
}
}
#endif
return 0;
}
static void close_connections(void)
{
#ifdef EXTRA_DEBUG
@ -1626,58 +1707,7 @@ static void close_connections(void)
This will give the threads some time to gracefully abort their
statements and inform their clients that the server is about to die.
*/
THD *tmp;
mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
I_List_iterator<THD> it(threads);
while ((tmp=it++))
{
DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
(ulong) tmp->thread_id));
/* We skip slave threads on this first loop through. */
if (tmp->slave_thread)
continue;
/* cannot use 'continue' inside DBUG_EXECUTE_IF()... */
if (DBUG_EVALUATE_IF("only_kill_system_threads", !tmp->system_thread, 0))
continue;
#ifdef WITH_WSREP
/* skip wsrep system threads as well */
if (WSREP(tmp) && (wsrep_thd_is_applying(tmp) || tmp->wsrep_applier))
continue;
#endif
tmp->set_killed(KILL_SERVER_HARD);
MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (tmp));
if (WSREP(tmp)) mysql_mutex_lock(&tmp->LOCK_thd_data);
mysql_mutex_lock(&tmp->LOCK_thd_kill);
if (tmp->mysys_var)
{
tmp->mysys_var->abort=1;
mysql_mutex_lock(&tmp->mysys_var->mutex);
if (tmp->mysys_var->current_cond)
{
uint i;
for (i=0; i < 2; i++)
{
int ret= mysql_mutex_trylock(tmp->mysys_var->current_mutex);
mysql_cond_broadcast(tmp->mysys_var->current_cond);
if (!ret)
{
/* Thread has surely got the signal, unlock and abort */
mysql_mutex_unlock(tmp->mysys_var->current_mutex);
break;
}
sleep(1);
}
}
mysql_mutex_unlock(&tmp->mysys_var->mutex);
}
mysql_mutex_unlock(&tmp->LOCK_thd_kill);
if (WSREP(tmp)) mysql_mutex_unlock(&tmp->LOCK_thd_data);
}
mysql_mutex_unlock(&LOCK_thread_count); // For unlink from list
server_threads.iterate(kill_all_threads);
Events::deinit();
slave_prepare_for_shutdown();
@ -1708,65 +1738,8 @@ static void close_connections(void)
This will ensure that threads that are waiting for a command from the
client on a blocking read call are aborted.
*/
server_threads.iterate(kill_all_threads_once_again);
for (;;)
{
mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
if (!(tmp=threads.get()))
{
mysql_mutex_unlock(&LOCK_thread_count);
break;
}
#ifndef __bsdi__ // Bug in BSDI kernel
if (tmp->vio_ok())
{
if (global_system_variables.log_warnings)
sql_print_warning(ER_DEFAULT(ER_FORCING_CLOSE),my_progname,
(ulong) tmp->thread_id,
(tmp->main_security_ctx.user ?
tmp->main_security_ctx.user : ""));
/*
close_connection() might need a valid current_thd
for memory allocation tracking.
*/
THD* save_thd= current_thd;
set_current_thd(tmp);
close_connection(tmp,ER_SERVER_SHUTDOWN);
set_current_thd(save_thd);
}
#endif
#ifdef WITH_WSREP
/*
* WSREP_TODO:
* this code block may turn out redundant. wsrep->disconnect()
* should terminate slave threads gracefully, and we don't need
* to signal them here.
* The code here makes sure mysqld will not hang during shutdown
* even if wsrep provider has problems in shutting down.
*/
if (WSREP(tmp) && wsrep_thd_is_applying(tmp))
{
sql_print_information("closing wsrep system thread");
tmp->set_killed(KILL_CONNECTION);
MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (tmp));
if (tmp->mysys_var)
{
tmp->mysys_var->abort=1;
mysql_mutex_lock(&tmp->mysys_var->mutex);
if (tmp->mysys_var->current_cond)
{
mysql_mutex_lock(tmp->mysys_var->current_mutex);
mysql_cond_broadcast(tmp->mysys_var->current_cond);
mysql_mutex_unlock(tmp->mysys_var->current_mutex);
}
mysql_mutex_unlock(&tmp->mysys_var->mutex);
}
}
#endif
DBUG_PRINT("quit",("Unlocking LOCK_thread_count"));
mysql_mutex_unlock(&LOCK_thread_count);
}
end_slave();
#ifdef WITH_WSREP
if (wsrep_inited == 1)
@ -2249,6 +2222,7 @@ static void wait_for_signal_thread_to_end()
static void clean_up_mutexes()
{
DBUG_ENTER("clean_up_mutexes");
server_threads.destroy();
mysql_rwlock_destroy(&LOCK_grant);
mysql_mutex_destroy(&LOCK_thread_count);
mysql_mutex_destroy(&LOCK_thread_cache);
@ -2798,7 +2772,7 @@ void unlink_thd(THD *thd)
thd->cleanup();
thd->add_status_to_global();
unlink_not_visible_thd(thd);
server_threads.erase(thd);
#ifdef WITH_WSREP
/*
@ -2910,7 +2884,7 @@ static bool cache_thread(THD *thd)
thd->thr_create_utime= microsecond_interval_timer();
thd->start_utime= thd->thr_create_utime;
add_to_active_threads(thd);
server_threads.insert(thd);
DBUG_RETURN(1);
}
}
@ -4627,6 +4601,7 @@ static int init_common_variables()
static int init_thread_environment()
{
DBUG_ENTER("init_thread_environment");
server_threads.init();
mysql_mutex_init(key_LOCK_thread_count, &LOCK_thread_count, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_thread_cache, &LOCK_thread_cache, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_start_thread, &LOCK_start_thread, MY_MUTEX_INIT_FAST);
@ -8265,7 +8240,6 @@ static int mysql_init_variables(void)
global_query_id= 1;
global_thread_id= 0;
strnmov(server_version, MYSQL_SERVER_VERSION, sizeof(server_version)-1);
threads.empty();
thread_cache.empty();
key_caches.empty();
if (!(dflt_key_cache= get_or_create_key_cache(default_key_cache_base.str,
@ -9973,6 +9947,14 @@ static my_thread_id thread_id_max= UINT_MAX32;
@param[out] low - lower bound for the range
@param[out] high - upper bound for the range
*/
static my_bool recalculate_callback(THD *thd, std::vector<my_thread_id> *ids)
{
ids->push_back(thd->thread_id);
return 0;
}
static void recalculate_thread_id_range(my_thread_id *low, my_thread_id *high)
{
std::vector<my_thread_id> ids;
@ -9980,15 +9962,7 @@ static void recalculate_thread_id_range(my_thread_id *low, my_thread_id *high)
// Add sentinels
ids.push_back(0);
ids.push_back(UINT_MAX32);
mysql_mutex_lock(&LOCK_thread_count);
I_List_iterator<THD> it(threads);
THD *thd;
while ((thd=it++))
ids.push_back(thd->thread_id);
mysql_mutex_unlock(&LOCK_thread_count);
server_threads.iterate(recalculate_callback, &ids);
std::sort(ids.begin(), ids.end());
my_thread_id max_gap= 0;

4
sql/mysqld.h

@ -299,7 +299,6 @@ extern pthread_attr_t connection_attrib;
extern my_bool old_mode;
extern LEX_STRING opt_init_connect, opt_init_slave;
extern int bootstrap_error;
extern I_List<THD> threads;
extern char err_shared_dir[];
extern ulong connection_errors_select;
extern ulong connection_errors_accept;
@ -346,7 +345,8 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_rpl_group_info_sleep_lock,
key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
key_LOCK_start_thread,
key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc;
key_LOCK_error_messages, key_LOCK_thread_count, key_Thread_map_mutex,
key_PARTITION_LOCK_auto_inc;
extern PSI_mutex_key key_RELAYLOG_LOCK_index;
extern PSI_mutex_key key_LOCK_relaylog_end_pos;
extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,

4
sql/rpl_parallel.cc

@ -1023,7 +1023,7 @@ handle_rpl_parallel_thread(void *arg)
my_thread_init();
thd = new THD(next_thread_id());
thd->thread_stack = (char*)&thd;
add_to_active_threads(thd);
server_threads.insert(thd);
set_current_thd(thd);
pthread_detach_this_thread();
thd->init_for_queries();
@ -1432,7 +1432,7 @@ handle_rpl_parallel_thread(void *arg)
thd->temporary_tables= 0;
THD_CHECK_SENTRY(thd);
unlink_not_visible_thd(thd);
server_threads.erase(thd);
delete thd;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);

8
sql/slave.cc

@ -4698,7 +4698,7 @@ pthread_handler_t handle_slave_io(void *arg)
goto err_during_init;
}
thd->system_thread_info.rpl_io_info= &io_info;
add_to_active_threads(thd);
server_threads.insert(thd);
mi->slave_running = MYSQL_SLAVE_RUN_NOT_CONNECT;
mi->abort_slave = 0;
mysql_mutex_unlock(&mi->run_lock);
@ -5080,7 +5080,7 @@ err:
flush_master_info(mi, TRUE, TRUE);
THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit);
thd->add_status_to_global();
unlink_not_visible_thd(thd);
server_threads.erase(thd);
mysql_mutex_lock(&mi->run_lock);
err_during_init:
@ -5368,7 +5368,7 @@ pthread_handler_t handle_slave_sql(void *arg)
/* Ensure that slave can exeute any alter table it gets from master */
thd->variables.alter_algorithm= (ulong) Alter_info::ALTER_TABLE_ALGORITHM_DEFAULT;
add_to_active_threads(thd);
server_threads.insert(thd);
/*
We are going to set slave_running to 1. Assuming slave I/O thread is
alive and connected, this is going to make Seconds_Behind_Master be 0
@ -5714,7 +5714,7 @@ pthread_handler_t handle_slave_sql(void *arg)
}
THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit);
thd->add_status_to_global();
unlink_not_visible_thd(thd);
server_threads.erase(thd);
mysql_mutex_lock(&rli->run_lock);
err_during_init:

8
sql/sql_class.cc

@ -1639,8 +1639,8 @@ THD::~THD()
THD *orig_thd= current_thd;
THD_CHECK_SENTRY(this);
DBUG_ENTER("~THD()");
/* Check that we have already called thd->unlink() */
DBUG_ASSERT(prev == 0 && next == 0);
/* Make sure threads are not available via server_threads. */
assert_not_linked();
/* This takes a long time so we should not do this under LOCK_thread_count */
mysql_mutex_assert_not_owner(&LOCK_thread_count);
@ -4772,14 +4772,14 @@ MYSQL_THD create_thd()
thd->set_command(COM_DAEMON);
thd->system_thread= SYSTEM_THREAD_GENERIC;
thd->security_ctx->host_or_ip="";
add_to_active_threads(thd);
server_threads.insert(thd);
return thd;
}
void destroy_thd(MYSQL_THD thd)
{
thd->add_status_to_global();
unlink_not_visible_thd(thd);
server_threads.erase(thd);
delete thd;
}

101
sql/sql_class.h

@ -5004,27 +5004,6 @@ public:
}
};
inline void add_to_active_threads(THD *thd)
{
mysql_mutex_lock(&LOCK_thread_count);
threads.append(thd);
mysql_mutex_unlock(&LOCK_thread_count);
}
/*
This should be called when you want to delete a thd that was not
running any queries.
This function will assert that the THD is linked.
*/
inline void unlink_not_visible_thd(THD *thd)
{
thd->assert_linked();
mysql_mutex_lock(&LOCK_thread_count);
thd->unlink();
mysql_mutex_unlock(&LOCK_thread_count);
}
/** A short cut for thd->get_stmt_da()->set_ok_status(). */
inline void
@ -6935,5 +6914,85 @@ private:
THD *thd;
};
/** THD registry */
class THD_list
{
I_List<THD> threads;
mutable mysql_mutex_t mutex;
public:
/**
Constructor replacement.
Unfortunately we can't use fair constructor to initialize mutex
for two reasons: PFS and embedded. The former can probably be fixed,
the latter can probably be dropped.
*/
void init()
{
mysql_mutex_init(key_Thread_map_mutex, &mutex, MY_MUTEX_INIT_FAST);
}
/** Destructor replacement. */
void destroy()
{
mysql_mutex_destroy(&mutex);
}
/**
Inserts thread to registry.
@param thd thread
Thread becomes accessible via server_threads.
*/
void insert(THD *thd)
{
mysql_mutex_lock(&mutex);
threads.append(thd);
mysql_mutex_unlock(&mutex);
}
/**
Removes thread from registry.
@param thd thread
Thread becomes not accessible via server_threads.
*/
void erase(THD *thd)
{
thd->assert_linked();
mysql_mutex_lock(&mutex);
thd->unlink();
mysql_mutex_unlock(&mutex);
}
/**
Iterates registered threads.
@param action called for every element
@param argument opque argument passed to action
@return
@retval 0 iteration completed successfully
@retval 1 iteration was interrupted (action returned 1)
*/
template <typename T> int iterate(my_bool (*action)(THD *thd, T *arg), T *arg= 0)
{
int res= 0;
mysql_mutex_lock(&mutex);
I_List_iterator<THD> it(threads);
while (auto tmp= it++)
if ((res= action(tmp, arg)))
break;
mysql_mutex_unlock(&mutex);
return res;
}
};
extern THD_list server_threads;
#endif /* MYSQL_SERVER */
#endif /* SQL_CLASS_INCLUDED */

2
sql/sql_connect.cc

@ -1360,7 +1360,7 @@ void do_handle_one_connection(CONNECT *connect)
delete connect;
/* Make THD visible in show processlist */
add_to_active_threads(thd);
server_threads.insert(thd);
thd->thr_create_utime= thr_create_utime;
/* We need to set this because of time_out_user_resource_limits */

16
sql/sql_insert.cc

@ -2188,11 +2188,11 @@ public:
mysql_mutex_init(key_delayed_insert_mutex, &mutex, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_delayed_insert_cond, &cond, NULL);
mysql_cond_init(key_delayed_insert_cond_client, &cond_client, NULL);
mysql_mutex_lock(&LOCK_thread_count);
mysql_mutex_lock(&LOCK_delayed_insert);
delayed_insert_threads++;
mysql_mutex_unlock(&LOCK_delayed_insert);
delayed_lock= global_system_variables.low_priority_updates ?
TL_WRITE_LOW_PRIORITY : TL_WRITE;
mysql_mutex_unlock(&LOCK_thread_count);
DBUG_VOID_RETURN;
}
~Delayed_insert()
@ -2210,15 +2210,9 @@ public:
mysql_cond_destroy(&cond);
mysql_cond_destroy(&cond_client);
/*
We could use unlink_not_visible_threads() here, but as
delayed_insert_threads also needs to be protected by
the LOCK_thread_count mutex, we open code this.
*/
mysql_mutex_lock(&LOCK_thread_count);
thd.unlink(); // Must be unlinked under lock
server_threads.erase(&thd);
mysql_mutex_assert_owner(&LOCK_delayed_insert);
delayed_insert_threads--;
mysql_mutex_unlock(&LOCK_thread_count);
my_free(thd.query());
thd.security_ctx->user= 0;
@ -2940,7 +2934,7 @@ pthread_handler_t handle_delayed_insert(void *arg)
pthread_detach_this_thread();
/* Add thread to THD list so that's it's visible in 'show processlist' */
thd->set_start_time();
add_to_active_threads(thd);
server_threads.insert(thd);
if (abort_loop)
thd->set_killed(KILL_CONNECTION);
else

113
sql/sql_parse.cc

@ -8955,24 +8955,35 @@ void add_join_natural(TABLE_LIST *a, TABLE_LIST *b, List<String> *using_fields,
pointer - thread found, and its LOCK_thd_kill is locked.
*/
THD *find_thread_by_id(longlong id, bool query_id)
struct find_thread_callback_arg
{
THD *tmp;
mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
I_List_iterator<THD> it(threads);
while ((tmp=it++))
find_thread_callback_arg(longlong id_arg, bool query_id_arg):
thd(0), id(id_arg), query_id(query_id_arg) {}
THD *thd;
longlong id;
bool query_id;
};
my_bool find_thread_callback(THD *thd, find_thread_callback_arg *arg)
{
if (thd->get_command() != COM_DAEMON &&
arg->id == (arg->query_id ? thd->query_id : (longlong) thd->thread_id))
{
if (tmp->get_command() == COM_DAEMON)
continue;
if (id == (query_id ? tmp->query_id : (longlong) tmp->thread_id))
{
if (WSREP(tmp)) mysql_mutex_lock(&tmp->LOCK_thd_data);
mysql_mutex_lock(&tmp->LOCK_thd_kill); // Lock from delete
break;
}
if (WSREP(thd)) mysql_mutex_lock(&thd->LOCK_thd_data);
mysql_mutex_lock(&thd->LOCK_thd_kill); // Lock from delete
arg->thd= thd;
return 1;
}
mysql_mutex_unlock(&LOCK_thread_count);
return tmp;
return 0;
}
THD *find_thread_by_id(longlong id, bool query_id)
{
find_thread_callback_arg arg(id, query_id);
server_threads.iterate(find_thread_callback, &arg);
return arg.thd;
}
@ -9056,53 +9067,63 @@ kill_one_thread(THD *thd, longlong id, killed_state kill_signal, killed_type typ
are killed.
*/
static uint kill_threads_for_user(THD *thd, LEX_USER *user,
killed_state kill_signal, ha_rows *rows)
struct kill_threads_callback_arg
{
THD *tmp;
kill_threads_callback_arg(THD *thd_arg, LEX_USER *user_arg):
thd(thd_arg), user(user_arg) {}
THD *thd;
LEX_USER *user;
List<THD> threads_to_kill;
DBUG_ENTER("kill_threads_for_user");
*rows= 0;
if (unlikely(thd->is_fatal_error)) // If we run out of memory
DBUG_RETURN(ER_OUT_OF_RESOURCES);
};
DBUG_PRINT("enter", ("user: %s signal: %u", user->user.str,
(uint) kill_signal));
mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
I_List_iterator<THD> it(threads);
while ((tmp=it++))
static my_bool kill_threads_callback(THD *thd, kill_threads_callback_arg *arg)
{
if (thd->security_ctx->user)
{
if (!tmp->security_ctx->user)
continue;
/*
Check that hostname (if given) and user name matches.
host.str[0] == '%' means that host name was not given. See sql_yacc.yy
*/
if (((user->host.str[0] == '%' && !user->host.str[1]) ||
!strcmp(tmp->security_ctx->host_or_ip, user->host.str)) &&
!strcmp(tmp->security_ctx->user, user->user.str))
if (((arg->user->host.str[0] == '%' && !arg->user->host.str[1]) ||
!strcmp(thd->security_ctx->host_or_ip, arg->user->host.str)) &&
!strcmp(thd->security_ctx->user, arg->user->user.str))
{
if (!(thd->security_ctx->master_access & SUPER_ACL) &&
!thd->security_ctx->user_matches(tmp->security_ctx))
{
mysql_mutex_unlock(&LOCK_thread_count);
DBUG_RETURN(ER_KILL_DENIED_ERROR);
}
if (!threads_to_kill.push_back(tmp, thd->mem_root))
if (!(arg->thd->security_ctx->master_access & SUPER_ACL) &&
!arg->thd->security_ctx->user_matches(thd->security_ctx))
return 1;
if (!arg->threads_to_kill.push_back(thd, arg->thd->mem_root))
{
if (WSREP(tmp)) mysql_mutex_lock(&tmp->LOCK_thd_data);
mysql_mutex_lock(&tmp->LOCK_thd_kill); // Lock from delete
if (WSREP(thd)) mysql_mutex_lock(&thd->LOCK_thd_data);
mysql_mutex_lock(&thd->LOCK_thd_kill); // Lock from delete
}
}
}
mysql_mutex_unlock(&LOCK_thread_count);
if (!threads_to_kill.is_empty())
return 0;
}
static uint kill_threads_for_user(THD *thd, LEX_USER *user,
killed_state kill_signal, ha_rows *rows)
{
kill_threads_callback_arg arg(thd, user);
DBUG_ENTER("kill_threads_for_user");
*rows= 0;
if (unlikely(thd->is_fatal_error)) // If we run out of memory
DBUG_RETURN(ER_OUT_OF_RESOURCES);
DBUG_PRINT("enter", ("user: %s signal: %u", user->user.str,
(uint) kill_signal));
if (server_threads.iterate(kill_threads_callback, &arg))
DBUG_RETURN(ER_KILL_DENIED_ERROR);
if (!arg.threads_to_kill.is_empty())
{
List_iterator_fast<THD> it2(threads_to_kill);
List_iterator_fast<THD> it2(arg.threads_to_kill);
THD *next_ptr;
THD *ptr= it2++;
do

30
sql/sql_plugin.cc

@ -4354,25 +4354,23 @@ void wsrep_plugins_pre_init()
members of wsrep startup threads with correct values, as these value
were not available at the time these threads were created.
*/
void wsrep_plugins_post_init()
{
THD *thd;
I_List_iterator<THD> it(threads);
while ((thd= it++))
my_bool post_init_callback(THD *thd, void *)
{
if (thd->wsrep_applier)
{
if (IF_WSREP(thd->wsrep_applier,1))
{
// Save options_bits as it will get overwritten in plugin_thdvar_init()
ulonglong option_bits_saved= thd->variables.option_bits;
plugin_thdvar_init(thd);
// Restore option_bits
thd->variables.option_bits= option_bits_saved;
}
// Save options_bits as it will get overwritten in plugin_thdvar_init()
ulonglong option_bits_saved= thd->variables.option_bits;
plugin_thdvar_init(thd);
// Restore option_bits
thd->variables.option_bits= option_bits_saved;
}
return 0;
}
return;
void wsrep_plugins_post_init()
{
server_threads.iterate(post_init_callback);
}
#endif /* WITH_WSREP */

110
sql/sql_repl.cc

@ -527,59 +527,48 @@ static enum enum_binlog_checksum_alg get_binlog_checksum_value_at_connect(THD *
Now they sync is done for next read.
*/
void adjust_linfo_offsets(my_off_t purge_offset)
static my_bool adjust_callback(THD *thd, my_off_t *purge_offset)
{
THD *tmp;
mysql_mutex_lock(&LOCK_thread_count);
I_List_iterator<THD> it(threads);
while ((tmp=it++))
mysql_mutex_lock(&thd->LOCK_thd_data);
if (auto linfo= thd->current_linfo)
{
LOG_INFO* linfo;
mysql_mutex_lock(&tmp->LOCK_thd_data);
if ((linfo = tmp->current_linfo))
{
/*
Index file offset can be less that purge offset only if
we just started reading the index file. In that case
we have nothing to adjust
*/
if (linfo->index_file_offset < purge_offset)
linfo->fatal = (linfo->index_file_offset != 0);
else
linfo->index_file_offset -= purge_offset;
}
mysql_mutex_unlock(&tmp->LOCK_thd_data);
/*
Index file offset can be less that purge offset only if
we just started reading the index file. In that case
we have nothing to adjust
*/
if (linfo->index_file_offset < *purge_offset)
linfo->fatal= (linfo->index_file_offset != 0);
else
linfo->index_file_offset-= *purge_offset;
}
mysql_mutex_unlock(&LOCK_thread_count);
mysql_mutex_unlock(&thd->LOCK_thd_data);
return 0;
}
bool log_in_use(const char* log_name)
void adjust_linfo_offsets(my_off_t purge_offset)
{
size_t log_name_len = strlen(log_name) + 1;
THD *tmp;
bool result = 0;
mysql_mutex_lock(&LOCK_thread_count);
I_List_iterator<THD> it(threads);
server_threads.iterate(adjust_callback, &purge_offset);
}
while ((tmp=it++))
{
LOG_INFO* linfo;
mysql_mutex_lock(&tmp->LOCK_thd_data);
if ((linfo = tmp->current_linfo))
result = !memcmp(log_name, linfo->log_file_name, log_name_len);
mysql_mutex_unlock(&tmp->LOCK_thd_data);
if (result)
break;
}
mysql_mutex_unlock(&LOCK_thread_count);
static my_bool log_in_use_callback(THD *thd, const char *log_name)
{
my_bool result= 0;
mysql_mutex_lock(&thd->LOCK_thd_data);
if (auto linfo= thd->current_linfo)
result= !memcmp(log_name, linfo->log_file_name, strlen(log_name) + 1);
mysql_mutex_unlock(&thd->LOCK_thd_data);
return result;
}
bool log_in_use(const char* log_name)
{
return server_threads.iterate(log_in_use_callback, log_name);
}
bool purge_error_message(THD* thd, int res)
{
uint errcode;
@ -3367,31 +3356,40 @@ err:
slave_server_id the slave's server id
*/
void kill_zombie_dump_threads(uint32 slave_server_id)
struct kill_callback_arg
{
mysql_mutex_lock(&LOCK_thread_count);
I_List_iterator<THD> it(threads);
THD *tmp;
kill_callback_arg(uint32 id): slave_server_id(id), thd(0) {}
uint32 slave_server_id;
THD *thd;
};
while ((tmp=it++))
static my_bool kill_callback(THD *thd, kill_callback_arg *arg)
{
if (thd->get_command() == COM_BINLOG_DUMP &&
thd->variables.server_id == arg->slave_server_id)
{
if (tmp->get_command() == COM_BINLOG_DUMP &&
tmp->variables.server_id == slave_server_id)
{
mysql_mutex_lock(&tmp->LOCK_thd_kill); // Lock from delete
break;
}
arg->thd= thd;
mysql_mutex_lock(&thd->LOCK_thd_kill); // Lock from delete
return 1;
}
mysql_mutex_unlock(&LOCK_thread_count);
if (tmp)
return 0;
}
void kill_zombie_dump_threads(uint32 slave_server_id)
{
kill_callback_arg arg(slave_server_id);
server_threads.iterate(kill_callback, &arg);
if (arg.thd)
{
/*
Here we do not call kill_one_thread() as
it will be slow because it will iterate through the list
again. We just to do kill the thread ourselves.
*/
tmp->awake_no_mutex(KILL_SLAVE_SAME_ID);
mysql_mutex_unlock(&tmp->LOCK_thd_kill);
arg.thd->awake_no_mutex(KILL_SLAVE_SAME_ID);
mysql_mutex_unlock(&arg.thd->LOCK_thd_kill);
}
}

495
sql/sql_show.cc

@ -2734,13 +2734,111 @@ static const char *thread_state_info(THD *tmp)
}
struct list_callback_arg
{
list_callback_arg(const char *u, THD *t, ulong m):
user(u), thd(t), max_query_length(m) {}
I_List<thread_info> thread_infos;
const char *user;
THD *thd;
ulong max_query_length;
};
static my_bool list_callback(THD *tmp, list_callback_arg *arg)
{
Security_context *tmp_sctx= tmp->security_ctx;
bool got_thd_data;
if ((tmp->vio_ok() || tmp->system_thread) &&
(!arg->user || (!tmp->system_thread &&
tmp_sctx->user && !strcmp(tmp_sctx->user, arg->user))))
{
thread_info *thd_info= new (arg->thd->mem_root) thread_info;
thd_info->thread_id=tmp->thread_id;
thd_info->os_thread_id=tmp->os_thread_id;
thd_info->user= arg->thd->strdup(tmp_sctx->user ? tmp_sctx->user :
(tmp->system_thread ?
"system user" : "unauthenticated user"));
if (tmp->peer_port && (tmp_sctx->host || tmp_sctx->ip) &&
arg->thd->security_ctx->host_or_ip[0])
{
if ((thd_info->host= (char*) arg->thd->alloc(LIST_PROCESS_HOST_LEN+1)))
my_snprintf((char *) thd_info->host, LIST_PROCESS_HOST_LEN,
"%s:%u", tmp_sctx->host_or_ip, tmp->peer_port);
}
else
thd_info->host= arg->thd->strdup(tmp_sctx->host_or_ip[0] ?
tmp_sctx->host_or_ip :
tmp_sctx->host ? tmp_sctx->host : "");
thd_info->command=(int) tmp->get_command();
if ((got_thd_data= !trylock_short(&tmp->LOCK_thd_data)))
{
/* This is an approximation */
thd_info->proc_info= (char*) (tmp->killed >= KILL_QUERY ?
"Killed" : 0);
/* The following variables are only safe to access under a lock */
thd_info->db= 0;
if (tmp->db.str)
thd_info->db= arg->thd->strmake(tmp->db.str, tmp->db.length);
if (tmp->query())
{
uint length= MY_MIN(arg->max_query_length, tmp->query_length());
char *q= arg->thd->strmake(tmp->query(),length);
/* Safety: in case strmake failed, we set length to 0. */
thd_info->query_string=
CSET_STRING(q, q ? length : 0, tmp->query_charset());
}
/*
Progress report. We need to do this under a lock to ensure that all
is from the same stage.
*/
if (tmp->progress.max_counter)
{
uint max_stage= MY_MAX(tmp->progress.max_stage, 1);
thd_info->progress= (((tmp->progress.stage / (double) max_stage) +
((tmp->progress.counter /
(double) tmp->progress.max_counter) /
(double) max_stage)) *
100.0);
set_if_smaller(thd_info->progress, 100);
}
else
thd_info->progress= 0.0;
}
else
{
thd_info->proc_info= "Busy";
thd_info->progress= 0.0;
thd_info->db= "";
}
thd_info->state_info= thread_state_info(tmp);
thd_info->start_time= tmp->start_utime;
ulonglong utime_after_query_snapshot= tmp->utime_after_query;
if (thd_info->start_time < utime_after_query_snapshot)
thd_info->start_time= utime_after_query_snapshot; // COM_SLEEP
if (got_thd_data)
mysql_mutex_unlock(&tmp->LOCK_thd_data);
arg->thread_infos.append(thd_info);
}
return 0;
}
void mysqld_list_processes(THD *thd,const char *user, bool verbose)
{
Item *field;
List<Item> field_list;
I_List<thread_info> thread_infos;
ulong max_query_length= (verbose ? thd->variables.max_allowed_packet :
PROCESS_LIST_WIDTH);
list_callback_arg arg(user, thd,
verbose ? thd->variables.max_allowed_packet :
PROCESS_LIST_WIDTH);
Protocol *protocol= thd->protocol;
MEM_ROOT *mem_root= thd->mem_root;
DBUG_ENTER("mysqld_list_processes");
@ -2771,7 +2869,7 @@ void mysqld_list_processes(THD *thd,const char *user, bool verbose)
mem_root);
field->maybe_null=1;
field_list.push_back(field=new (mem_root)
Item_empty_string(thd, "Info", max_query_length),
Item_empty_string(thd, "Info", arg.max_query_length),
mem_root);
field->maybe_null=1;
if (!thd->variables.old_mode &&
@ -2790,102 +2888,13 @@ void mysqld_list_processes(THD *thd,const char *user, bool verbose)
if (thd->killed)
DBUG_VOID_RETURN;
mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
I_List_iterator<THD> it(threads);
THD *tmp;
while ((tmp=it++))
{
Security_context *tmp_sctx= tmp->security_ctx;
bool got_thd_data;
if ((tmp->vio_ok() || tmp->system_thread) &&
(!user || (!tmp->system_thread &&
tmp_sctx->user && !strcmp(tmp_sctx->user, user))))
{
thread_info *thd_info= new (thd->mem_root) thread_info;
thd_info->thread_id=tmp->thread_id;
thd_info->os_thread_id=tmp->os_thread_id;
thd_info->user= thd->strdup(tmp_sctx->user ? tmp_sctx->user :
(tmp->system_thread ?
"system user" : "unauthenticated user"));
if (tmp->peer_port && (tmp_sctx->host || tmp_sctx->ip) &&
thd->security_ctx->host_or_ip[0])
{
if ((thd_info->host= (char*) thd->alloc(LIST_PROCESS_HOST_LEN+1)))
my_snprintf((char *) thd_info->host, LIST_PROCESS_HOST_LEN,
"%s:%u", tmp_sctx->host_or_ip, tmp->peer_port);
}
else
thd_info->host= thd->strdup(tmp_sctx->host_or_ip[0] ?
tmp_sctx->host_or_ip :
tmp_sctx->host ? tmp_sctx->host : "");
thd_info->command=(int) tmp->get_command();
if ((got_thd_data= !trylock_short(&tmp->LOCK_thd_data)))
{
/* This is an approximation */
thd_info->proc_info= (char*) (tmp->killed >= KILL_QUERY ?
"Killed" : 0);
/*
The following variables are only safe to access under a lock
*/
thd_info->db= 0;
if (tmp->db.str)
thd_info->db= thd->strmake(tmp->db.str, tmp->db.length);
if (tmp->query())
{
uint length= MY_MIN(max_query_length, tmp->query_length());
char *q= thd->strmake(tmp->query(),length);
/* Safety: in case strmake failed, we set length to 0. */
thd_info->query_string=
CSET_STRING(q, q ? length : 0, tmp->query_charset());
}
server_threads.iterate(list_callback, &arg);
/*
Progress report. We need to do this under a lock to ensure that all
is from the same stage.
*/
if (tmp->progress.max_counter)
{
uint max_stage= MY_MAX(tmp->progress.max_stage, 1);
thd_info->progress= (((tmp->progress.stage / (double) max_stage) +
((tmp->progress.counter /
(double) tmp->progress.max_counter) /
(double) max_stage)) *
100.0);
set_if_smaller(thd_info->progress, 100);
}
else
thd_info->progress= 0.0;
}
else
{
thd_info->proc_info= "Busy";
thd_info->progress= 0.0;
thd_info->db= "";
}
thd_info->state_info= thread_state_info(tmp);
thd_info->start_time= tmp->start_utime;
ulonglong utime_after_query_snapshot= tmp->utime_after_query;
if (thd_info->start_time < utime_after_query_snapshot)
thd_info->start_time= utime_after_query_snapshot; // COM_SLEEP
if (got_thd_data)
mysql_mutex_unlock(&tmp->LOCK_thd_data);
thread_infos.append(thd_info);
}
}
mysql_mutex_unlock(&LOCK_thread_count);
thread_info *thd_info;
ulonglong now= microsecond_interval_timer();
char buff[20]; // For progress
String store_buffer(buff, sizeof(buff), system_charset_info);
while ((thd_info=thread_infos.get()))
while (auto thd_info= arg.thread_infos.get())
{
protocol->prepare_for_resend();
protocol->store(thd_info->thread_id);
@ -3169,152 +3178,150 @@ int fill_show_explain(THD *thd, TABLE_LIST *table, COND *cond)
}
int fill_schema_processlist(THD* thd, TABLE_LIST* tables, COND* cond)
struct processlist_callback_arg
{
TABLE *table= tables->table;
CHARSET_INFO *cs= system_charset_info;
char *user;
ulonglong unow= microsecond_interval_timer();
DBUG_ENTER("fill_schema_processlist");
processlist_callback_arg(THD *thd_arg, TABLE *table_arg):
thd(thd_arg), table(table_arg), unow(microsecond_interval_timer()) {}
THD *thd;
TABLE *table;
ulonglong unow;
};
DEBUG_SYNC(thd,"fill_schema_processlist_after_unow");
user= thd->security_ctx->master_access & PROCESS_ACL ?
NullS : thd->security_ctx->priv_user;
static my_bool processlist_callback(THD *tmp, processlist_callback_arg *arg)
{
Security_context *tmp_sctx= tmp->security_ctx;
CHARSET_INFO *cs= system_charset_info;
const char *val;
ulonglong max_counter;
bool got_thd_data;
char *user= arg->thd->security_ctx->master_access & PROCESS_ACL ?
NullS : arg->thd->security_ctx->priv_user;
if ((!tmp->vio_ok() && !tmp->system_thread) ||
(user && (tmp->system_thread || !tmp_sctx->user ||
strcmp(tmp_sctx->user, user))))
return 0;
mysql_mutex_lock(&LOCK_thread_count);
restore_record(arg->table, s->default_values);
/* ID */
arg->table->field[0]->store((longlong) tmp->thread_id, TRUE);
/* USER */
val= tmp_sctx->user ? tmp_sctx->user :
(tmp->system_thread ? "system user" : "unauthenticated user");
arg->table->field[1]->store(val, strlen(val), cs);
/* HOST */
if (tmp->peer_port && (tmp_sctx->host || tmp_sctx->ip) &&
arg->thd->security_ctx->host_or_ip[0])
{
char host[LIST_PROCESS_HOST_LEN + 1];
my_snprintf(host, LIST_PROCESS_HOST_LEN, "%s:%u",
tmp_sctx->host_or_ip, tmp->peer_port);
arg->table->field[2]->store(host, strlen(host), cs);
}
else
arg->table->field[2]->store(tmp_sctx->host_or_ip,
strlen(tmp_sctx->host_or_ip), cs);
if (!thd->killed)
if ((got_thd_data= !trylock_short(&tmp->LOCK_thd_data)))
{
I_List_iterator<THD> it(threads);
THD* tmp;
while ((tmp= it++))
/* DB */
if (tmp->db.str)
{
Security_context *tmp_sctx= tmp->security_ctx;
const char *val;
ulonglong max_counter;
bool got_thd_data;
if ((!tmp->vio_ok() && !tmp->system_thread) ||
(user && (tmp->system_thread || !tmp_sctx->user ||
strcmp(tmp_sctx->user, user))))
continue;
restore_record(table, s->default_values);
/* ID */
table->field[0]->store((longlong) tmp->thread_id, TRUE);
/* USER */
val= tmp_sctx->user ? tmp_sctx->user :
(tmp->system_thread ? "system user" : "unauthenticated user");
table->field[1]->store(val, strlen(val), cs);
/* HOST */
if (tmp->peer_port && (tmp_sctx->host || tmp_sctx->ip) &&
thd->security_ctx->host_or_ip[0])
{
char host[LIST_PROCESS_HOST_LEN + 1];
my_snprintf(host, LIST_PROCESS_HOST_LEN, "%s:%u",
tmp_sctx->host_or_ip, tmp->peer_port);
table->field[2]->store(host, strlen(host), cs);
}
else
table->field[2]->store(tmp_sctx->host_or_ip,
strlen(tmp_sctx->host_or_ip), cs);
arg->table->field[3]->store(tmp->db.str, tmp->db.length, cs);
arg->table->field[3]->set_notnull();
}
}
if ((got_thd_data= !trylock_short(&tmp->LOCK_thd_data)))
{
/* DB */
if (tmp->db.str)
{
table->field[3]->store(tmp->db.str, tmp->db.length, cs);
table->field[3]->set_notnull();
}
}
/* COMMAND */
if ((val= (char *) (!got_thd_data ? "Busy" :
(tmp->killed >= KILL_QUERY ?
"Killed" : 0))))
arg->table->field[4]->store(val, strlen(val), cs);
else
arg->table->field[4]->store(command_name[tmp->get_command()].str,
command_name[tmp->get_command()].length, cs);
/* COMMAND */
if ((val= (char *) (!got_thd_data ? "Busy" :
(tmp->killed >= KILL_QUERY ?
"Killed" : 0))))
table->field[4]->store(val, strlen(val), cs);
else
table->field[4]->store(command_name[tmp->get_command()].str,
command_name[tmp->get_command()].length, cs);
/* MYSQL_TIME */
ulonglong utime= tmp->start_utime;
ulonglong utime_after_query_snapshot= tmp->utime_after_query;
if (utime < utime_after_query_snapshot)
utime= utime_after_query_snapshot; // COM_SLEEP
utime= utime && utime < arg->unow ? arg->unow - utime : 0;
/* MYSQL_TIME */
ulonglong utime= tmp->start_utime;
ulonglong utime_after_query_snapshot= tmp->utime_after_query;
if (utime < utime_after_query_snapshot)
utime= utime_after_query_snapshot; // COM_SLEEP
utime= utime && utime < unow ? unow - utime : 0;
arg->table->field[5]->store(utime / HRTIME_RESOLUTION, TRUE);
table->field[5]->store(utime / HRTIME_RESOLUTION, TRUE);
if (got_thd_data)
{
if (tmp->query())
{
arg->table->field[7]->store(tmp->query(),
MY_MIN(PROCESS_LIST_INFO_WIDTH,
tmp->query_length()), cs);
arg->table->field[7]->set_notnull();
if (got_thd_data)
{
if (tmp->query())
{
table->field[7]->store(tmp->query(),
MY_MIN(PROCESS_LIST_INFO_WIDTH,
tmp->query_length()), cs);
table->field[7]->set_notnull();
/* INFO_BINARY */
arg->table->field[16]->store(tmp->query(),
MY_MIN(PROCESS_LIST_INFO_WIDTH,
tmp->query_length()),
&my_charset_bin);
arg->table->field[16]->set_notnull();
}
/* INFO_BINARY */
table->field[16]->store(tmp->query(),
MY_MIN(PROCESS_LIST_INFO_WIDTH,
tmp->query_length()),
&my_charset_bin);
table->field[16]->set_notnull();
}
/*
Progress report. We need to do this under a lock to ensure that all
is from the same stage.
*/
if ((max_counter= tmp->progress.max_counter))
{
arg->table->field[9]->store((longlong) tmp->progress.stage + 1, 1);
arg->table->field[10]->store((longlong) tmp->progress.max_stage, 1);
arg->table->field[11]->store((double) tmp->progress.counter /
(double) max_counter*100.0);
}
mysql_mutex_unlock(&tmp->LOCK_thd_data);
}
/*
Progress report. We need to do this under a lock to ensure that all
is from the same stage.
*/
if ((max_counter= tmp->progress.max_counter))
{
table->field[9]->store((longlong) tmp->progress.stage + 1, 1);
table->field[10]->store((longlong) tmp->progress.max_stage, 1);
table->field[11]->store((double) tmp->progress.counter /
(double) max_counter*100.0);
}
mysql_mutex_unlock(&tmp->LOCK_thd_data);
}
/* STATE */
if ((val= thread_state_info(tmp)))
{
arg->table->field[6]->store(val, strlen(val), cs);
arg->table->field[6]->set_notnull();
}
/* STATE */
if ((val= thread_state_info(tmp)))
{
table->field[6]->store(val, strlen(val), cs);
table->field[6]->set_notnull();
}
/* TIME_MS */
arg->table->field[8]->store((double)(utime / (HRTIME_RESOLUTION / 1000.0)));
/* TIME_MS */
table->field[8]->store((double)(utime / (HRTIME_RESOLUTION / 1000.0)));
/*
This may become negative if we free a memory allocated by another
thread in this thread. However it's better that we notice it eventually
than hide it.
*/
arg->table->field[12]->store((longlong) tmp->status_var.local_memory_used,
FALSE);
arg->table->field[13]->store((longlong) tmp->status_var.max_local_memory_used,
FALSE);
arg->table->field[14]->store((longlong) tmp->get_examined_row_count(), TRUE);
/*
This may become negative if we free a memory allocated by another
thread in this thread. However it's better that we notice it eventually
than hide it.
*/
table->field[12]->store((longlong) tmp->status_var.local_memory_used,
FALSE);
table->field[13]->store((longlong) tmp->status_var.max_local_memory_used,
FALSE);
table->field[14]->store((longlong) tmp->get_examined_row_count(), TRUE);
/* QUERY_ID */
arg->table->field[15]->store(tmp->query_id, TRUE);
/* QUERY_ID */
table->field[15]->store(tmp->query_id, TRUE);
arg->table->field[17]->store(tmp->os_thread_id);
table->field[17]->store(tmp->os_thread_id);
if (schema_table_store_record(arg->thd, arg->table))
return 1;
return 0;
}
if (schema_table_store_record(thd, table))
{
mysql_mutex_unlock(&LOCK_thread_count);
DBUG_RETURN(1);
}
}
}
mysql_mutex_unlock(&LOCK_thread_count);
int fill_schema_processlist(THD* thd, TABLE_LIST* tables, COND* cond)
{
processlist_callback_arg arg(thd, tables->table);
DBUG_ENTER("fill_schema_processlist");
DEBUG_SYNC(thd,"fill_schema_processlist_after_unow");
if (!thd->killed &&
server_threads.iterate(processlist_callback, &arg))
DBUG_RETURN(1);
DBUG_RETURN(0);
}
@ -3770,36 +3777,38 @@ end:
Return number of threads used
*/
uint calc_sum_of_all_status(STATUS_VAR *to)
struct calc_sum_callback_arg
{
uint count= 0;
DBUG_ENTER("calc_sum_of_all_status");
calc_sum_callback_arg(STATUS_VAR *to_arg): to(to_arg), count(0) {}
STATUS_VAR *to;
uint count;
};
/* Ensure that thread id not killed during loop */
mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
I_List_iterator<THD> it(threads);
THD *tmp;
static my_bool calc_sum_callback(THD *thd, calc_sum_callback_arg *arg)
{
arg->count++;
if (!thd->status_in_global)
{
add_to_status(arg->to, &thd->status_var);
arg->to->local_memory_used+= thd->status_var.local_memory_used;
}
if (thd->get_command() != COM_SLEEP)
arg->to->threads_running++;
return 0;
}
uint calc_sum_of_all_status(STATUS_VAR *to)
{
calc_sum_callback_arg arg(to);
DBUG_ENTER("calc_sum_of_all_status");
/* Get global values as base */
*to= global_status_var;
to->local_memory_used= 0;
/* Add to this status from existing threads */
while ((tmp= it++))
{
count++;
if (!tmp->status_in_global)
{
add_to_status(to, &tmp->status_var);
to->local_memory_used+= tmp->status_var.local_memory_used;
}
if (tmp->get_command() != COM_SLEEP)
to->threads_running++;
}
mysql_mutex_unlock(&LOCK_thread_count);
DBUG_RETURN(count);
server_threads.iterate(calc_sum_callback, &arg);
DBUG_RETURN(arg.count);
}

2
sql/threadpool_common.cc

@ -243,7 +243,7 @@ static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
return NULL;
}
delete connect;
add_to_active_threads(thd);
server_threads.insert(thd);
thd->set_mysys_var(mysys_var);
thd->event_scheduler.data= scheduler_data;

50
sql/threadpool_generic.cc

@ -578,43 +578,24 @@ static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt)
Also, recalculate time when next timeout check should run.
*/
static void timeout_check(pool_timer_t *timer)
static my_bool timeout_check(THD *thd, pool_timer_t *timer)
{
DBUG_ENTER("timeout_check");
mysql_mutex_lock(&LOCK_thread_count);
I_List_iterator<THD> it(threads);
/* Reset next timeout check, it will be recalculated in the loop below */
my_atomic_fas64((volatile int64*)&timer->next_timeout_check, ULONGLONG_MAX);
THD *thd;
while ((thd=it++))
if (thd->net.reading_or_writing == 1)
{
if (thd->net.reading_or_writing != 1)
continue;
TP_connection_generic *connection= (TP_connection_generic *)thd->event_scheduler.data;
if (!connection)
{
/*
Connection does not have scheduler data. This happens for example
if THD belongs to a different scheduler, that is listening to extra_port.
*/
continue;
}
if(connection->abs_wait_timeout < timer->current_microtime)
{
tp_timeout_handler(connection);
}
else
/*
Check if connection does not have scheduler data. This happens for example
if THD belongs to a different scheduler, that is listening to extra_port.
*/
if (auto connection= (TP_connection_generic *) thd->event_scheduler.data)
{
set_next_timeout_check(connection->abs_wait_timeout);
if (connection->abs_wait_timeout < timer->current_microtime)
tp_timeout_handler(connection);
else
set_next_timeout_check(connection->abs_wait_timeout);
}
}
mysql_mutex_unlock(&LOCK_thread_count);
DBUG_VOID_RETURN;
DBUG_RETURN(0);
}
@ -671,7 +652,12 @@ static void* timer_thread(void *param)
/* Check if any client exceeded wait_timeout */
if (timer->next_timeout_check <= timer->current_microtime)
timeout_check(timer);
{
/* Reset next timeout check, it will be recalculated below */
my_atomic_fas64((volatile int64*) &timer->next_timeout_check,
ULONGLONG_MAX);
server_threads.iterate(timeout_check, timer);
}
}
mysql_mutex_unlock(&timer->mutex);
}

165
sql/wsrep_mysqld.cc

@ -2204,22 +2204,16 @@ static inline bool is_committing_connection(THD *thd)
return ret;
}
static bool have_client_connections()
static my_bool have_client_connections(THD *thd, void*)
{
THD *tmp;
I_List_iterator<THD> it(threads);
while ((tmp=it++))
DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
(longlong) thd->thread_id));
if (is_client_connection(thd) && thd->killed == KILL_CONNECTION)
{
DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
(longlong) tmp->thread_id));
if (is_client_connection(tmp) && tmp->killed == KILL_CONNECTION)
{
(void)abort_replicated(tmp);
return true;
}
(void)abort_replicated(thd);
return 1;
}
return false;
return 0;
}
static void wsrep_close_thread(THD *thd)
@ -2240,89 +2234,72 @@ static void wsrep_close_thread(THD *thd)
}
}
static my_bool have_committing_connections()
static my_bool have_committing_connections(THD *thd, void *)
{
THD *tmp;
mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
I_List_iterator<THD> it(threads);
while ((tmp=it++))
{
if (!is_client_connection(tmp))
continue;
if (is_committing_connection(tmp))
{
mysql_mutex_unlock(&LOCK_thread_count);
return TRUE;
}
}
mysql_mutex_unlock(&LOCK_thread_count);
return FALSE;
return is_client_connection(thd) && is_committing_connection(thd) ? 1 : 0;
}
int wsrep_wait_committing_connections_close(int wait_time)
{
int sleep_time= 100;
while (have_committing_connections() && wait_time > 0)
while (server_threads.iterate(have_committing_connections) && wait_time > 0)
{
WSREP_DEBUG("wait for committing transaction to close: %d", wait_time);
my_sleep(sleep_time);
wait_time -= sleep_time;
}
if (have_committing_connections())
return server_threads.iterate(have_committing_connections);
}
static my_bool kill_all_threads(THD *thd, THD *caller_thd)
{
DBUG_PRINT("quit", ("Informing thread %lld that it's time to die",
(longlong) thd->thread_id));
/* We skip slave threads & scheduler on this first loop through. */
if (is_client_connection(thd) && thd != caller_thd)
{
return 1;
if (is_replaying_connection(thd))
thd->set_killed(KILL_CONNECTION);
else if (!abort_replicated(thd))
{
/* replicated transactions must be skipped */
WSREP_DEBUG("closing connection %lld", (longlong) thd->thread_id);
/* instead of wsrep_close_thread() we do now soft kill by THD::awake */
thd->awake(KILL_CONNECTION);
}
}
return 0;
}
static my_bool kill_remaining_threads(THD *thd, THD *caller_thd)
{
#ifndef __bsdi__ // Bug in BSDI kernel
if (is_client_connection(thd) &&
!abort_replicated(thd) &&
!is_replaying_connection(thd) &&
thd != caller_thd)
{
WSREP_INFO("killing local connection: %lld", (longlong) thd->thread_id);
close_connection(thd, 0);
}
#endif
return 0;
}
void wsrep_close_client_connections(my_bool wait_to_end, THD* except_caller_thd)
{
/*
First signal all threads that it's time to die
*/
THD *tmp;
mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
bool kill_cached_threads_saved= kill_cached_threads;
kill_cached_threads= true; // prevent future threads caching
mysql_cond_broadcast(&COND_thread_cache); // tell cached threads to die
I_List_iterator<THD> it(threads);
while ((tmp=it++))
{
DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
(longlong) tmp->thread_id));
/* We skip slave threads & scheduler on this first loop through. */
if (!is_client_connection(tmp))
continue;
if (tmp == except_caller_thd)
{
DBUG_ASSERT(is_client_connection(tmp));
continue;
}
if (is_replaying_connection(tmp))
{
tmp->set_killed(KILL_CONNECTION);
continue;
}
/* replicated transactions must be skipped */
if (abort_replicated(tmp))
continue;
WSREP_DEBUG("closing connection %lld", (longlong) tmp->thread_id);
/*
instead of wsrep_close_thread() we do now soft kill by THD::awake
*/
tmp->awake(KILL_CONNECTION);
}
server_threads.iterate(kill_all_threads, except_caller_thd);
mysql_mutex_unlock(&LOCK_thread_count);
if (thread_count)
@ -2332,26 +2309,12 @@ void wsrep_close_client_connections(my_bool wait_to_end, THD* except_caller_thd)
/*
Force remaining threads to die by closing the connection to the client
*/
I_List_iterator<THD> it2(threads);
while ((tmp=it2++))
{
#ifndef __bsdi__ // Bug in BSDI kernel
if (is_client_connection(tmp) &&
!abort_replicated(tmp) &&
!is_replaying_connection(tmp) &&
tmp != except_caller_thd)
{
WSREP_INFO("killing local connection: %lld", (longlong) tmp->thread_id);
close_connection(tmp,0);
}
#endif
}
server_threads.iterate(kill_remaining_threads, except_caller_thd);
DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count));
WSREP_DEBUG("waiting for client connections to close: %u", thread_count);
while (wait_to_end && have_client_connections())
while (wait_to_end && server_threads.iterate(have_client_connections))
{
mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
DBUG_PRINT("quit",("One thread died (count=%u)", thread_count));
@ -2371,25 +2334,22 @@ void wsrep_close_applier(THD *thd)
wsrep_close_thread(thd);
}
void wsrep_close_threads(THD *thd)
static my_bool wsrep_close_threads_callback(THD *thd, THD *caller_thd)
{
THD *tmp;
mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
I_List_iterator<THD> it(threads);
while ((tmp=it++))
DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
(longlong) thd->thread_id));
/* We skip slave threads & scheduler on this first loop through. */
if (thd->wsrep_applier && thd != caller_thd)
{
DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
(longlong) tmp->thread_id));
/* We skip slave threads & scheduler on this first loop through. */
if (tmp->wsrep_applier && tmp != thd)
{
WSREP_DEBUG("closing wsrep thread %lld", (longlong) tmp->thread_id);
wsrep_close_thread (tmp);
}
WSREP_DEBUG("closing wsrep thread %lld", (longlong) thd->thread_id);
wsrep_close_thread(thd);
}
return 0;
}
mysql_mutex_unlock(&LOCK_thread_count);
void wsrep_close_threads(THD *thd)
{
server_threads.iterate(wsrep_close_threads_callback, thd);
}
void wsrep_wait_appliers_close(THD *thd)
@ -2730,7 +2690,7 @@ void* start_wsrep_THD(void *arg)
goto error;
}
mysql_mutex_lock(&LOCK_thread_count);
statistic_increment(thread_created, &LOCK_status);
if (wsrep_gtid_mode)
{
@ -2739,14 +2699,13 @@ void* start_wsrep_THD(void *arg)
}
thd->real_id=pthread_self(); // Keep purify happy
thread_created++;
threads.append(thd);
my_net_init(&thd->net,(st_vio*) 0, thd, MYF(0));
DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id));
thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer();
(void) mysql_mutex_unlock(&LOCK_thread_count);
server_threads.insert(thd);
/* from bootstrap()... */
thd->bootstrap=1;
@ -2842,7 +2801,7 @@ void* start_wsrep_THD(void *arg)
*/
}
unlink_not_visible_thd(thd);
server_threads.erase(thd);
delete thd;
my_thread_end();
return(NULL);

Loading…
Cancel
Save