Browse Source

Split Apc_target::make_apc_call into two functions.

In pfs_variable.cc we will have to make an additional step between
enqueue_request and wait_for_completion. These two functions will be called
directly and therefore both should have a public interface
10.10-MDEV-16440
Nikita Malyavin 4 years ago
parent
commit
fb922c2c80
  1. 149
      sql/my_apc.cc
  2. 19
      sql/my_apc.h
  3. 4
      storage/perfschema/pfs_variable.cc

149
sql/my_apc.cc

@ -100,13 +100,75 @@ void init_show_explain_psi_keys(void)
if (PSI_server == NULL)
return;
PSI_server->register_cond("sql", show_explain_psi_conds,
PSI_server->register_cond("sql", show_explain_psi_conds,
array_elements(show_explain_psi_conds));
}
#endif
/**
Wait gracefully until the request is completed.
@retval 0 -- Success
@retval 1 -- Timeout
*/
int Apc_target::wait_for_completion(THD *caller_thd, Call_request *apc_request,
int timeout_sec)
{
struct timespec abstime;
const int timeout= timeout_sec;
set_timespec(abstime, timeout);
int res = 1;
int wait_res= 0;
PSI_stage_info old_stage;
caller_thd->ENTER_COND(&apc_request->COND_request, LOCK_thd_kill_ptr,
&stage_show_explain, &old_stage);
/* todo: how about processing other errors here? */
while (!apc_request->processed && (wait_res != ETIMEDOUT))
{
/* We own LOCK_thd_kill_ptr */
wait_res= mysql_cond_timedwait(&apc_request->COND_request,
LOCK_thd_kill_ptr, &abstime);
if (caller_thd->killed)
break;
}
/*
if (!apc_request->processed)
{
/*
The wait has timed out, or this thread was KILLed.
Remove the request from the queue (ok to do because we own
LOCK_thd_kill_ptr)
*/
apc_request->processed= TRUE;
dequeue_request(apc_request);
res= 1;
}
else
{
/* Request was successfully executed and dequeued by the target thread */
res= 0;
}
/*
exit_cond() will call mysql_mutex_unlock(LOCK_thd_kill_ptr) for us:
*/
caller_thd->EXIT_COND(&old_stage);
/* Destroy all APC request data */
mysql_cond_destroy(&apc_request->COND_request);
return res;
}
/** Create and post the request */
void Apc_target::enqueue_request(Call_request *request_buff, Apc_call *call)
{
request_buff->call= call;
request_buff->processed= FALSE;
mysql_cond_init(key_show_explain_request_COND, &request_buff->COND_request,
NULL);
enqueue_request(request_buff);
request_buff->what="enqueued by make_apc_call";
}
/**
Make an APC (Async Procedure Call) to another thread.
@detail
@ -123,7 +185,7 @@ void init_show_explain_psi_keys(void)
timeout occurred)
*/
bool Apc_target::make_apc_call(THD *caller_thd, Apc_call *call,
bool Apc_target::make_apc_call(THD *caller_thd, Apc_call *call,
int timeout_sec, bool *timed_out)
{
bool res= TRUE;
@ -131,58 +193,10 @@ bool Apc_target::make_apc_call(THD *caller_thd, Apc_call *call,
if (enabled)
{
/* Create and post the request */
Call_request apc_request;
apc_request.call= call;
apc_request.processed= FALSE;
mysql_cond_init(key_show_explain_request_COND, &apc_request.COND_request,
NULL);
enqueue_request(&apc_request);
apc_request.what="enqueued by make_apc_call";
struct timespec abstime;
const int timeout= timeout_sec;
set_timespec(abstime, timeout);
int wait_res= 0;
PSI_stage_info old_stage;
caller_thd->ENTER_COND(&apc_request.COND_request, LOCK_thd_kill_ptr,
&stage_show_explain, &old_stage);
/* todo: how about processing other errors here? */
while (!apc_request.processed && (wait_res != ETIMEDOUT))
{
/* We own LOCK_thd_kill_ptr */
wait_res= mysql_cond_timedwait(&apc_request.COND_request,
LOCK_thd_kill_ptr, &abstime);
// &apc_request.LOCK_request, &abstime);
if (caller_thd->killed)
break;
}
if (!apc_request.processed)
{
/*
The wait has timed out, or this thread was KILLed.
Remove the request from the queue (ok to do because we own
LOCK_thd_kill_ptr)
*/
apc_request.processed= TRUE;
dequeue_request(&apc_request);
*timed_out= TRUE;
res= TRUE;
}
else
{
/* Request was successfully executed and dequeued by the target thread */
res= FALSE;
}
/*
exit_cond() will call mysql_mutex_unlock(LOCK_thd_kill_ptr) for us:
*/
caller_thd->EXIT_COND(&old_stage);
/* Destroy all APC request data */
mysql_cond_destroy(&apc_request.COND_request);
enqueue_request(&apc_request, call);
res= wait_for_completion(caller_thd, &apc_request, timeout_sec);
*timed_out= res;
}
else
{
@ -201,22 +215,15 @@ bool Apc_target::make_apc_call(THD *caller_thd, Apc_call *call,
This should be called periodically by the APC target thread.
*/
void Apc_target::process_apc_requests()
void Apc_target::process_apc_requests(bool lock)
{
while (1)
{
Call_request *request;
if (lock)
mysql_mutex_lock(LOCK_thd_kill_ptr);
if (!(request= get_first_in_queue()))
{
/* No requests in the queue */
mysql_mutex_unlock(LOCK_thd_kill_ptr);
break;
}
/*
Remove the request from the queue (we're holding queue lock so we can be
while (Call_request *request= get_first_in_queue())
{
/*
Remove the request from the queue (we're holding queue lock so we can be
sure that request owner won't try to remove it)
*/
request->what="dequeued by process_apc_requests";
@ -224,13 +231,17 @@ void Apc_target::process_apc_requests()
request->processed= TRUE;
request->call->call_in_target_thread();
mysql_cond_signal(&request->COND_request);
mysql_mutex_unlock(LOCK_thd_kill_ptr);
mysql_mutex_lock(LOCK_thd_kill_ptr);
request->what="func called by process_apc_requests";
#ifndef DBUG_OFF
n_calls_processed++;
#endif
mysql_cond_signal(&request->COND_request);
mysql_mutex_unlock(LOCK_thd_kill_ptr);
}
/* No requests in the queue */
if (lock)
mysql_mutex_unlock(LOCK_thd_kill_ptr);
}

19
sql/my_apc.h

@ -73,7 +73,8 @@ public:
process_apc_requests();
}
void process_apc_requests();
void process_apc_requests(bool lock=true);
void process_apc_requests_lock_owned();
/*
A lightweight function, intended to be used in frequent checks like this:
@ -94,15 +95,20 @@ public:
virtual void call_in_target_thread()= 0;
virtual ~Apc_call() {}
};
class Call_request;
/* Make a call in the target thread (see function definition for details) */
bool make_apc_call(THD *caller_thd, Apc_call *call, int timeout_sec, bool *timed_out);
bool make_apc_call(THD *caller_thd, Apc_call *call,
int timeout_sec, bool *timed_out);
void enqueue_request(Call_request *request_buff, Apc_call *call);
int wait_for_completion(THD *caller_thd, Call_request *request,
int timeout_sec);
#ifndef DBUG_OFF
int n_calls_processed; /* Number of calls served by this target */
#endif
private:
class Call_request;
/*
Non-zero value means we're enabled. It's an int, not bool, because one can
@ -122,7 +128,8 @@ private:
operation.
*/
Call_request *apc_calls;
public:
class Call_request
{
public:
@ -140,7 +147,7 @@ private:
const char *what; /* (debug) state of the request */
};
private:
void enqueue_request(Call_request *qe);
void dequeue_request(Call_request *qe);

4
storage/perfschema/pfs_variable.cc

@ -392,9 +392,7 @@ int PFS_system_variable_cache::make_call(Request_func func, uint param)
else
{
PFS_system_variable_cache_apc apc_call(this, func, param);
bool timed_out;
ret= m_safe_thd->apc_target.make_apc_call(requestor_thd, &apc_call, 30,
&timed_out);
ret= m_safe_thd->apc_target.make_apc_call(requestor_thd, &apc_call, 30);
}
return ret;
}

Loading…
Cancel
Save