diff --git a/sql/my_apc.cc b/sql/my_apc.cc
index 932408b0215..06e4fdac9a3 100644
--- a/sql/my_apc.cc
+++ b/sql/my_apc.cc
@@ -235,10 +235,10 @@ bool Apc_target::make_apc_call(THD *caller_thd, Apc_call *call,
   This should be called periodically by the APC target thread.
 */
 
-void Apc_target::process_apc_requests(bool lock)
+void Apc_target::process_apc_requests()
 {
-  if (lock)
-    mysql_mutex_lock(LOCK_thd_kill_ptr);
+  process_epoch++;
+  mysql_mutex_lock(LOCK_thd_kill_ptr);
 
   while (Call_request *request= get_first_in_queue())
   {
@@ -274,8 +274,8 @@ void Apc_target::process_apc_requests(bool lock)
 #endif
   }
   /* No requests in the queue */
-  if (lock)
-    mysql_mutex_unlock(LOCK_thd_kill_ptr);
+  process_epoch++;
+  mysql_mutex_unlock(LOCK_thd_kill_ptr);
 }
 
 Apc_target::Call_request::Call_request()
diff --git a/sql/my_apc.h b/sql/my_apc.h
index 7135840e87b..673c42105e8 100644
--- a/sql/my_apc.h
+++ b/sql/my_apc.h
@@ -35,6 +35,8 @@
   requestor.
 */
 
+#include <atomic>
+
 class THD;
 
 /*
@@ -73,7 +75,7 @@ public:
       process_apc_requests();
   }
 
-  void process_apc_requests(bool lock=true);
+  void process_apc_requests();
 
   /*
     A lightweight function, intended to be used in frequent checks like this:
@@ -108,6 +110,9 @@ public:
 #ifndef DBUG_OFF
   int n_calls_processed; /* Number of calls served by this target */
 #endif
+  // Epoch counter that is incremented before and after each command is processed
+  std::atomic<longlong> epoch {0};
+  std::atomic<longlong> process_epoch {0};
 
 private:
   /*
diff --git a/sql/threadpool.h b/sql/threadpool.h
index 7737d056b4a..bc64b8ba278 100644
--- a/sql/threadpool.h
+++ b/sql/threadpool.h
@@ -112,6 +112,15 @@ struct TP_connection
 
   /* Read for the next client command (async) with specified timeout */
   virtual int start_io() = 0;
+  /**
+    Cancels async waiting for the next command.
+    On Windows, enqueues the connection task immediately.
+    On other platforms, removes the connection from the poll set.
+    @return  0, if waiting was cancelled
+             1, if waiting was not cancelled
+            -1, if an error occurred
+  */
+  virtual int cancel_io() = 0;
 
   virtual void wait_begin(int type)= 0;
   virtual void wait_end() = 0;
@@ -134,6 +143,7 @@ struct TP_pool
   virtual int get_thread_count() { return tp_stats.num_worker_threads; }
   virtual int get_idle_thread_count(){ return 0; }
   virtual void resume(TP_connection* c)=0;
+  virtual int wake(TP_connection *)=0;
 };
 
 #ifdef _WIN32
@@ -148,6 +158,7 @@ struct TP_pool_win:TP_pool
   virtual int set_max_threads(uint);
   virtual int set_min_threads(uint);
   void resume(TP_connection *c);
+  int wake(TP_connection *) final;
 };
 
 #endif
@@ -162,6 +173,7 @@ struct TP_pool_generic :TP_pool
   virtual int set_stall_limit(uint);
   virtual int get_idle_thread_count();
   void resume(TP_connection* c);
+  int wake(TP_connection *) final;
 };
 
 #endif /* HAVE_POOL_OF_THREADS */
diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc
index f72f46b3a6b..19c3b512623 100644
--- a/sql/threadpool_common.cc
+++ b/sql/threadpool_common.cc
@@ -25,6 +25,8 @@
 #include
 #include
 #include
+#include "threadpool_generic.h"
+#include <atomic>
 
 #ifdef WITH_WSREP
 #include "wsrep_trans_observer.h"
@@ -56,7 +58,6 @@
 static void threadpool_remove_connection(THD *thd);
 static dispatch_command_return threadpool_process_request(THD *thd);
 static THD* threadpool_add_connection(CONNECT *connect, TP_connection *c);
-extern bool do_command(THD*);
 
 static inline TP_connection *get_TP_connection(THD *thd)
 {
@@ -173,14 +174,9 @@ static TP_PRIORITY get_priority(TP_connection *c)
   return prio;
 }
 
-
-void tp_callback(TP_connection *c)
+static bool tp_callback_run(TP_connection *c)
 {
   DBUG_ASSERT(c);
-
-  Worker_thread_context worker_context;
-  worker_context.save();
-
   THD *thd= c->thd;
 
   c->state = TP_STATE_RUNNING;
@@ -193,12 +189,14 @@ static TP_PRIORITY get_priority(TP_connection *c)
     if (!thd)
     {
       /* Bail out on connect error.*/
-      goto error;
+      return false;
     }
     c->connect= 0;
+    thd->apc_target.epoch.fetch_add(1, std::memory_order_release);
   }
   else
   {
+    thd->apc_target.epoch.fetch_add(1, std::memory_order_release);
 retry:
     switch(threadpool_process_request(thd))
     {
@@ -212,11 +210,15 @@ retry:
         thd->async_state.m_state = thd_async_state::enum_async_state::RESUMED;
         goto retry;
       }
-      worker_context.restore();
-      return;
+      return true;
     case DISPATCH_COMMAND_CLOSE_CONNECTION:
-      /* QUIT or an error occurred. */
-      goto error;
+      /*
+        QUIT or an error occurred.
+
+        We can skip the epoch increase here: process_apc_requests will be
+        called one last time after thd is unlinked.
+      */
+      return false;
     case DISPATCH_COMMAND_SUCCESS:
       break;
     }
@@ -229,19 +231,33 @@ retry:
   /* Read next command from client. */
   c->set_io_timeout(thd->get_net_wait_timeout());
   c->state= TP_STATE_IDLE;
-  if (c->start_io())
-    goto error;
-  worker_context.restore();
-  return;
+  thd->apc_target.epoch.fetch_add(1, std::memory_order_acquire);
+  if (unlikely(thd->apc_target.have_apc_requests()))
+    thd->apc_target.process_apc_requests();
+
+  int error= c->start_io();
+
+  return error == 0;
+}
 
-error:
-  c->thd= 0;
-  if (thd)
+void tp_callback(TP_connection *c)
+{
+  Worker_thread_context worker_context;
+  worker_context.save();
+
+  bool success= tp_callback_run(c);
+
+  if (!success)
   {
-    threadpool_remove_connection(thd);
+    THD *thd= c->thd;
+    c->thd= 0;
+    if (thd)
+    {
+      threadpool_remove_connection(thd);
+    }
+    delete c;
   }
-  delete c;
 
   worker_context.restore();
 }
@@ -320,6 +336,10 @@ static void threadpool_remove_connection(THD *thd)
   end_connection(thd);
   close_connection(thd, 0);
   unlink_thd(thd);
+  // The remaining APC requests should be processed after unlinking.
+  // This guarantees that no new APC requests can be added.
+  // TODO: better notify the requestor with some KILLED state here.
+  thd->apc_target.process_apc_requests();
 
   PSI_CALL_delete_current_thread(); // before THD is destroyed
   delete thd;
@@ -367,7 +387,14 @@ static dispatch_command_return threadpool_process_request(THD *thd)
   thread_attach(thd);
 
   if(thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED)
+  {
+    if (unlikely(thd->async_state.m_command == COM_SLEEP))
+      return DISPATCH_COMMAND_SUCCESS; // Special case for thread pool APC
+
     goto resume;
+  }
+
+  thd->apc_target.process_apc_requests();
 
   if (thd->killed >= KILL_CONNECTION)
   {
@@ -573,10 +600,71 @@ static void tp_resume(THD* thd)
 {
   DBUG_ASSERT(thd->async_state.m_state == thd_async_state::enum_async_state::SUSPENDED);
   thd->async_state.m_state = thd_async_state::enum_async_state::RESUMED;
+  thd->apc_target.epoch.fetch_add(1, std::memory_order_acquire);
   TP_connection* c = get_TP_connection(thd);
   pool->resume(c);
 }
 
+static bool tp_notify_apc(THD *thd)
+{
+  mysql_mutex_assert_owner(&thd->LOCK_thd_kill);
+  TP_connection* c= get_TP_connection(thd);
+  longlong process_epoch= thd->apc_target.process_epoch;
+  longlong first_epoch= thd->apc_target.epoch;
+  while (1)
+  {
+    longlong epoch= thd->apc_target.epoch;
+    if (epoch & 1 || epoch != first_epoch)
+    {
+      // We are in the safe zone, where processing is guaranteed.
+      break;
+    }
+    else
+    {
+
+      /*
+        Continue with the retry loop.
+        We are either before the APC request check, after the check but
+        before the run, or the run was skipped.
+      */
+
+      process_epoch= thd->apc_target.process_epoch;
+      if (process_epoch & 1)
+      {
+        // We are blocked on LOCK_thd_kill in process_apc_requests, or about to be.
+        break;
+      }
+      else
+      {
+        // We are either before the APC requests are checked or processed, or
+        // after they are processed. We can't distinguish these states, so just
+        // wake the connection through the pool and retry if that fails.
+        int status= pool->wake(c);
+        if (likely(status == 0))
+        {
+          break;
+        }
+        else if (unlikely(status < 0))
+        {
+          return false;
+        }
+        else
+        {
+          /*
+            If the run was skipped and somebody else took the connection out
+            of the poll, we will wait until the epoch changes, i.e. until the
+            task reaches a worker through the queue, which can take a long
+            time.
+            So sleep longer here (1ms).
+          */
+          my_sleep(1000);
+        }
+      }
+    }
+  }
+  return true;
+}
+
 static scheduler_functions tp_scheduler_functions=
 {
   0,                                  // max_threads
@@ -588,7 +676,8 @@ static scheduler_functions tp_scheduler_functions=
   tp_wait_end,                        // thd_wait_end
   tp_post_kill_notification,          // post kill notification
   tp_end,                             // end
-  tp_resume
+  tp_resume,
+  tp_notify_apc,
 };
 
 void pool_of_threads_scheduler(struct scheduler_functions *func,
diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc
index eb08441a4d5..17777e89ade 100644
--- a/sql/threadpool_generic.cc
+++ b/sql/threadpool_generic.cc
@@ -1337,6 +1337,24 @@ void TP_pool_generic::resume(TP_connection* c)
   add(c);
 }
 
+int TP_pool_generic::wake(TP_connection *c)
+{
+  int status= c->cancel_io();
+  if (status == 0)
+  {
+    THD *thd= c->thd;
+    /* Set custom async_state to handle later in threadpool_process_request().
+       This will avoid possible side effects of dry-running do_command(). */
+    DBUG_ASSERT(thd->async_state.m_state == thd_async_state::enum_async_state::NONE);
+    DBUG_ASSERT(thd->async_state.m_command == COM_SLEEP);
+    thd->async_state.m_state= thd_async_state::enum_async_state::RESUMED;
+
+    /* Add c to the task queue */
+    resume(c);
+  }
+  return status;
+}
+
 /**
   MySQL scheduler callback: wait begin
 */
@@ -1497,7 +1515,7 @@ int TP_connection_generic::start_io()
     So we recalculate in which group the connection should be, based on
     thread_id and current group count, and migrate if necessary.
   */
-  if (fix_group)
+  if (unlikely(fix_group))
   {
     fix_group = false;
     thread_group_t *new_group= &all_groups[get_group_id(thd->thread_id)];
@@ -1512,7 +1530,7 @@ int TP_connection_generic::start_io()
   /*
     Bind to poll descriptor if not yet done.
   */
-  if (!bound_to_poll_descriptor)
+  if (unlikely(!bound_to_poll_descriptor))
   {
     bound_to_poll_descriptor= true;
     return io_poll_associate_fd(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM);
@@ -1522,6 +1540,23 @@ int TP_connection_generic::start_io()
 }
 
 
+int TP_connection_generic::cancel_io()
+{
+  int ret= io_poll_disassociate_fd(thread_group->pollfd, fd);
+  if (ret == 0)
+  {
+    // We have successfully disassociated fd. bound_to_poll_descriptor will now
+    // not be accessed from other threads: it changes only together with the
+    // io_poll_associate_fd/io_poll_disassociate_fd pair.
+    DBUG_ASSERT(bound_to_poll_descriptor);
+    bound_to_poll_descriptor= false;
+    return 0;
+  }
+
+  // Hopefully, all POSIX implementations return ENOENT for the case in question
+  return errno == ENOENT ? 1 : -1;
+}
+
 /**
   Worker thread's main
diff --git a/sql/threadpool_generic.h b/sql/threadpool_generic.h
index b7a35b7cbf0..833177642f2 100644
--- a/sql/threadpool_generic.h
+++ b/sql/threadpool_generic.h
@@ -79,6 +79,7 @@ struct TP_connection_generic :public TP_connection
   int init() override { return 0; }
   void set_io_timeout(int sec) override;
   int start_io() override;
+  int cancel_io() final;
   void wait_begin(int type) override;
   void wait_end() override;
 
@@ -88,6 +89,10 @@ struct TP_connection_generic :public TP_connection
   ulonglong abs_wait_timeout;
   ulonglong enqueue_time;
   TP_file_handle fd;
+  /**
+    Designates whether fd is currently attached to the poll set denoted by
+    thread_group->pollfd. See also change_group.
+  */
   bool bound_to_poll_descriptor;
   int waiting;
   bool fix_group;
diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc
index ed68e31c755..eeed9b59281 100644
--- a/sql/threadpool_win.cc
+++ b/sql/threadpool_win.cc
@@ -90,6 +90,7 @@ public:
   void set_io_timeout(int sec) override;
   void wait_begin(int type) override;
   void wait_end() override;
+  int cancel_io() final;
 
   ulonglong timeout=ULLONG_MAX;
   OVERLAPPED overlapped{};
@@ -131,6 +132,11 @@ void TP_pool_win::resume(TP_connection* c)
   SubmitThreadpoolWork(((TP_connection_win*)c)->work);
 }
 
+int TP_pool_win::wake(TP_connection *c)
+{
+  return c->cancel_io();
+}
+
 #define CHECK_ALLOC_ERROR(op)                                           \
   do                                                                    \
   {                                                                     \
@@ -177,6 +183,13 @@ int TP_connection_win::start_io()
   return 0;
 }
 
+int TP_connection_win::cancel_io()
+{
+  if (CancelIoEx(sock.m_handle, &sock.m_overlapped))
+    return 0;
+  return GetLastError() == ERROR_NOT_FOUND ? 1 : -1;
+}
+
 /*
   Recalculate wait timeout, maybe reset timer.
 */
@@ -286,10 +299,25 @@ static VOID CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
   PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io)
 {
   TP_connection_win *c= (TP_connection_win *)context;
+  THD *thd= c->thd;
 
   /* How many bytes were preread into read buffer */
   c->sock.end_read((ULONG)nbytes, io_result);
 
+  if (io_result == ERROR_OPERATION_ABORTED)
+  {
+    mysql_mutex_lock(&thd->LOCK_thd_kill);
+    if (!thd->is_killed())
+    {
+      /* Set custom async_state to handle later in threadpool_process_request().
+         This will avoid possible side effects of dry-running do_command(). */
+      DBUG_ASSERT(thd->async_state.m_state == thd_async_state::enum_async_state::NONE);
+      DBUG_ASSERT(thd->async_state.m_command == COM_SLEEP);
+      thd->async_state.m_state= thd_async_state::enum_async_state::RESUMED;
+    }
+    mysql_mutex_unlock(&thd->LOCK_thd_kill);
+  }
+
   /*
     Execute high priority connections immediately.
     'Yield' in case of low priority connections, i.e SubmitThreadpoolWork (with the same callback)