From fb922c2c80cddcff4d28d0fb4b8f70eedeaa9db2 Mon Sep 17 00:00:00 2001 From: Nikita Malyavin Date: Thu, 17 Feb 2022 23:28:28 +0300 Subject: [PATCH] Split Apc_target::make_apc_call into two functions. In pfs_variable.cc we will have to make an additional step between enqueue_request and wait_for_completion. These two functions will be called directly and therefore both should have a public interface --- sql/my_apc.cc | 149 ++++++++++++++++------------- sql/my_apc.h | 19 ++-- storage/perfschema/pfs_variable.cc | 4 +- 3 files changed, 94 insertions(+), 78 deletions(-) diff --git a/sql/my_apc.cc b/sql/my_apc.cc index 9777deb399a..eb7e47e294c 100644 --- a/sql/my_apc.cc +++ b/sql/my_apc.cc @@ -100,13 +100,75 @@ void init_show_explain_psi_keys(void) if (PSI_server == NULL) return; - PSI_server->register_cond("sql", show_explain_psi_conds, + PSI_server->register_cond("sql", show_explain_psi_conds, array_elements(show_explain_psi_conds)); } #endif +/** + Wait gracefully until the request is completed. + @retval 0 -- Success + @retval 1 -- Timeout + */ +int Apc_target::wait_for_completion(THD *caller_thd, Call_request *apc_request, + int timeout_sec) +{ + struct timespec abstime; + const int timeout= timeout_sec; + set_timespec(abstime, timeout); + int res = 1; + int wait_res= 0; + PSI_stage_info old_stage; + caller_thd->ENTER_COND(&apc_request->COND_request, LOCK_thd_kill_ptr, + &stage_show_explain, &old_stage); + /* todo: how about processing other errors here? */ + while (!apc_request->processed && (wait_res != ETIMEDOUT)) + { + /* We own LOCK_thd_kill_ptr */ + wait_res= mysql_cond_timedwait(&apc_request->COND_request, + LOCK_thd_kill_ptr, &abstime); + if (caller_thd->killed) + break; + } -/* + if (!apc_request->processed) + { + /* + The wait has timed out, or this thread was KILLed. + Remove the request from the queue (ok to do because we own + LOCK_thd_kill_ptr) + */ + apc_request->processed= TRUE; + dequeue_request(apc_request); + res= 1; + } + else + { + /* Request was successfully executed and dequeued by the target thread */ + res= 0; + } + /* + exit_cond() will call mysql_mutex_unlock(LOCK_thd_kill_ptr) for us: + */ + caller_thd->EXIT_COND(&old_stage); + + /* Destroy all APC request data */ + mysql_cond_destroy(&apc_request->COND_request); + return res; +} + +/** Create and post the request */ +void Apc_target::enqueue_request(Call_request *request_buff, Apc_call *call) +{ + request_buff->call= call; + request_buff->processed= FALSE; + mysql_cond_init(key_show_explain_request_COND, &request_buff->COND_request, + NULL); + enqueue_request(request_buff); + request_buff->what="enqueued by make_apc_call"; +} + +/** Make an APC (Async Procedure Call) to another thread. @detail @@ -123,7 +185,7 @@ void init_show_explain_psi_keys(void) timeout occurred) */ -bool Apc_target::make_apc_call(THD *caller_thd, Apc_call *call, +bool Apc_target::make_apc_call(THD *caller_thd, Apc_call *call, int timeout_sec, bool *timed_out) { bool res= TRUE; @@ -131,58 +193,10 @@ bool Apc_target::make_apc_call(THD *caller_thd, Apc_call *call, if (enabled) { - /* Create and post the request */ Call_request apc_request; - apc_request.call= call; - apc_request.processed= FALSE; - mysql_cond_init(key_show_explain_request_COND, &apc_request.COND_request, - NULL); - enqueue_request(&apc_request); - apc_request.what="enqueued by make_apc_call"; - - struct timespec abstime; - const int timeout= timeout_sec; - set_timespec(abstime, timeout); - - int wait_res= 0; - PSI_stage_info old_stage; - caller_thd->ENTER_COND(&apc_request.COND_request, LOCK_thd_kill_ptr, - &stage_show_explain, &old_stage); - /* todo: how about processing other errors here? */ - while (!apc_request.processed && (wait_res != ETIMEDOUT)) - { - /* We own LOCK_thd_kill_ptr */ - wait_res= mysql_cond_timedwait(&apc_request.COND_request, - LOCK_thd_kill_ptr, &abstime); - // &apc_request.LOCK_request, &abstime); - if (caller_thd->killed) - break; - } - - if (!apc_request.processed) - { - /* - The wait has timed out, or this thread was KILLed. - Remove the request from the queue (ok to do because we own - LOCK_thd_kill_ptr) - */ - apc_request.processed= TRUE; - dequeue_request(&apc_request); - *timed_out= TRUE; - res= TRUE; - } - else - { - /* Request was successfully executed and dequeued by the target thread */ - res= FALSE; - } - /* - exit_cond() will call mysql_mutex_unlock(LOCK_thd_kill_ptr) for us: - */ - caller_thd->EXIT_COND(&old_stage); - - /* Destroy all APC request data */ - mysql_cond_destroy(&apc_request.COND_request); + enqueue_request(&apc_request, call); + res= wait_for_completion(caller_thd, &apc_request, timeout_sec); + *timed_out= res; } else { @@ -201,22 +215,15 @@ bool Apc_target::make_apc_call(THD *caller_thd, Apc_call *call, This should be called periodically by the APC target thread. */ -void Apc_target::process_apc_requests() +void Apc_target::process_apc_requests(bool lock) { - while (1) - { - Call_request *request; - + if (lock) mysql_mutex_lock(LOCK_thd_kill_ptr); - if (!(request= get_first_in_queue())) - { - /* No requests in the queue */ - mysql_mutex_unlock(LOCK_thd_kill_ptr); - break; - } - /* - Remove the request from the queue (we're holding queue lock so we can be + while (Call_request *request= get_first_in_queue()) + { + /* + Remove the request from the queue (we're holding queue lock so we can be sure that request owner won't try to remove it) */ request->what="dequeued by process_apc_requests"; @@ -224,13 +231,17 @@ void Apc_target::process_apc_requests() request->processed= TRUE; request->call->call_in_target_thread(); + mysql_cond_signal(&request->COND_request); + mysql_mutex_unlock(LOCK_thd_kill_ptr); + mysql_mutex_lock(LOCK_thd_kill_ptr); request->what="func called by process_apc_requests"; #ifndef DBUG_OFF n_calls_processed++; #endif - mysql_cond_signal(&request->COND_request); - mysql_mutex_unlock(LOCK_thd_kill_ptr); } + /* No requests in the queue */ + if (lock) + mysql_mutex_unlock(LOCK_thd_kill_ptr); } diff --git a/sql/my_apc.h b/sql/my_apc.h index cc98e36bbe4..faca081f80a 100644 --- a/sql/my_apc.h +++ b/sql/my_apc.h @@ -73,7 +73,8 @@ public: process_apc_requests(); } - void process_apc_requests(); + void process_apc_requests(bool lock=true); + void process_apc_requests_lock_owned(); /* A lightweight function, intended to be used in frequent checks like this: @@ -94,15 +95,20 @@ public: virtual void call_in_target_thread()= 0; virtual ~Apc_call() {} }; - + + class Call_request; /* Make a call in the target thread (see function definition for details) */ - bool make_apc_call(THD *caller_thd, Apc_call *call, int timeout_sec, bool *timed_out); + bool make_apc_call(THD *caller_thd, Apc_call *call, + int timeout_sec, bool *timed_out); + + void enqueue_request(Call_request *request_buff, Apc_call *call); + int wait_for_completion(THD *caller_thd, Call_request *request, + int timeout_sec); #ifndef DBUG_OFF int n_calls_processed; /* Number of calls served by this target */ #endif private: - class Call_request; /* Non-zero value means we're enabled. It's an int, not bool, because one can @@ -122,7 +128,8 @@ private: operation. */ Call_request *apc_calls; - + +public: class Call_request { public: @@ -140,7 +147,7 @@ private: const char *what; /* (debug) state of the request */ }; - +private: void enqueue_request(Call_request *qe); void dequeue_request(Call_request *qe); diff --git a/storage/perfschema/pfs_variable.cc b/storage/perfschema/pfs_variable.cc index 6299701813b..bb856eb305d 100644 --- a/storage/perfschema/pfs_variable.cc +++ b/storage/perfschema/pfs_variable.cc @@ -392,9 +392,7 @@ int PFS_system_variable_cache::make_call(Request_func func, uint param) else { PFS_system_variable_cache_apc apc_call(this, func, param); - bool timed_out; - ret= m_safe_thd->apc_target.make_apc_call(requestor_thd, &apc_call, 30, - &timed_out); + ret= m_safe_thd->apc_target.make_apc_call(requestor_thd, &apc_call, 30); } return ret; }