
APC tpool implementation

10.10-MDEV-16440
Nikita Malyavin, 4 years ago
commit 3071f47f85
Changed files:
  1. sql/my_apc.cc (10 lines changed)
  2. sql/my_apc.h (7 lines changed)
  3. sql/threadpool.h (12 lines changed)
  4. sql/threadpool_common.cc (133 lines changed)
  5. sql/threadpool_generic.cc (39 lines changed)
  6. sql/threadpool_generic.h (5 lines changed)
  7. sql/threadpool_win.cc (28 lines changed)

sql/my_apc.cc (10 lines changed)

@@ -235,10 +235,10 @@ bool Apc_target::make_apc_call(THD *caller_thd, Apc_call *call,
This should be called periodically by the APC target thread.
*/
void Apc_target::process_apc_requests(bool lock)
void Apc_target::process_apc_requests()
{
if (lock)
mysql_mutex_lock(LOCK_thd_kill_ptr);
process_epoch++;
mysql_mutex_lock(LOCK_thd_kill_ptr);
while (Call_request *request= get_first_in_queue())
{
@@ -274,8 +274,8 @@ void Apc_target::process_apc_requests(bool lock)
#endif
}
/* No requests in the queue */
if (lock)
mysql_mutex_unlock(LOCK_thd_kill_ptr);
process_epoch++;
mysql_mutex_unlock(LOCK_thd_kill_ptr);
}
Apc_target::Call_request::Call_request()

sql/my_apc.h (7 lines changed)

@@ -35,6 +35,8 @@
requestor.
*/
#include <atomic>
class THD;
/*
@@ -73,7 +75,7 @@ public:
process_apc_requests();
}
void process_apc_requests(bool lock=true);
void process_apc_requests();
/*
A lightweight function, intended to be used in frequent checks like this:
@@ -108,6 +110,9 @@ public:
#ifndef DBUG_OFF
int n_calls_processed; /* Number of calls served by this target */
#endif
// Epoch counter, incremented when a worker starts processing a command on
// this connection and again when it finishes; an odd value means the worker
// is active and will notice pending APC requests on its own.
std::atomic<longlong> epoch {0};
// Same even/odd convention for process_apc_requests(): an odd value means it
// is running (or about to block on LOCK_thd_kill).
std::atomic<longlong> process_epoch {0};
private:
/*

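Both counters follow an even/odd convention: each one is incremented on entry to and on exit from the activity it tracks, so an odd value means "currently inside". Below is a minimal standalone sketch of that convention (illustrative only, simplified names; this is not the Apc_target interface):

#include <atomic>
#include <cstdint>

// Minimal model of the even/odd epoch convention: the owner bumps the counter
// when it enters the guarded activity and again when it leaves, so an odd
// value means "inside right now".
struct epoch_counter
{
  std::atomic<int64_t> value{0};

  void enter() { value.fetch_add(1, std::memory_order_release); } // becomes odd
  void leave() { value.fetch_add(1, std::memory_order_release); } // becomes even

  // Observer side: true if the owner is currently inside, or has been inside
  // since `seen` was sampled (the value has moved on).
  bool busy_or_advanced(int64_t seen) const
  {
    int64_t v= value.load(std::memory_order_acquire);
    return (v & 1) || v != seen;
  }
};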
sql/threadpool.h (12 lines changed)

@@ -112,6 +112,15 @@ struct TP_connection
/* Read for the next client command (async) with specified timeout */
virtual int start_io() = 0;
/**
Cancels async waiting for the next command.
On Windows, enqueues the connection task immediately.
On other platforms, removes the connection from the poll.
@return 0, if waiting was cancelled
1, if waiting was not cancelled
-1, if error
*/
virtual int cancel_io() = 0;
virtual void wait_begin(int type)= 0;
virtual void wait_end() = 0;
@@ -134,6 +143,7 @@ struct TP_pool
virtual int get_thread_count() { return tp_stats.num_worker_threads; }
virtual int get_idle_thread_count(){ return 0; }
virtual void resume(TP_connection* c)=0;
virtual int wake(TP_connection *)=0;
};
#ifdef _WIN32
@@ -148,6 +158,7 @@ struct TP_pool_win:TP_pool
virtual int set_max_threads(uint);
virtual int set_min_threads(uint);
void resume(TP_connection *c);
int wake(TP_connection *) final;
};
#endif
@@ -162,6 +173,7 @@ struct TP_pool_generic :TP_pool
virtual int set_stall_limit(uint);
virtual int get_idle_thread_count();
void resume(TP_connection* c);
int wake(TP_connection *) final;
};
#endif /* HAVE_POOL_OF_THREADS */
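As an aside, here is a hedged sketch of how a caller could act on the wake()/cancel_io() return codes documented above; the helper is hypothetical and not part of the patch:

// Hypothetical helper, built only on the interfaces declared above, showing
// the three-way contract of wake()/cancel_io().
static bool wake_connection_for_apc(TP_pool *pool, TP_connection *c)
{
  switch (pool->wake(c))
  {
  case 0:
    return true;    // waiting was cancelled; the task is (re)queued to a worker
  case 1:
    return false;   // nothing was parked in the poll; the caller cannot tell
                    // whether the worker will notice the request by itself
                    // (tp_notify_apc retries with a short sleep in this case)
  default:
    return false;   // error
  }
}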

sql/threadpool_common.cc (133 lines changed)

@@ -25,6 +25,8 @@
#include <threadpool.h>
#include <sql_class.h>
#include <sql_parse.h>
#include "threadpool_generic.h"
#include <my_global.h>
#ifdef WITH_WSREP
#include "wsrep_trans_observer.h"
@@ -56,7 +58,6 @@ static void threadpool_remove_connection(THD *thd);
static dispatch_command_return threadpool_process_request(THD *thd);
static THD* threadpool_add_connection(CONNECT *connect, TP_connection *c);
extern bool do_command(THD*);
static inline TP_connection *get_TP_connection(THD *thd)
{
@@ -173,14 +174,9 @@ static TP_PRIORITY get_priority(TP_connection *c)
return prio;
}
void tp_callback(TP_connection *c)
static bool tp_callback_run(TP_connection *c)
{
DBUG_ASSERT(c);
Worker_thread_context worker_context;
worker_context.save();
THD *thd= c->thd;
c->state = TP_STATE_RUNNING;
@@ -193,12 +189,14 @@ void tp_callback(TP_connection *c)
if (!thd)
{
/* Bail out on connect error.*/
goto error;
return false;
}
c->connect= 0;
thd->apc_target.epoch.fetch_add(1, std::memory_order_release);
}
else
{
thd->apc_target.epoch.fetch_add(1, std::memory_order_release);
retry:
switch(threadpool_process_request(thd))
{
@@ -212,11 +210,15 @@ retry:
thd->async_state.m_state = thd_async_state::enum_async_state::RESUMED;
goto retry;
}
worker_context.restore();
return;
return true;
case DISPATCH_COMMAND_CLOSE_CONNECTION:
/* QUIT or an error occurred. */
goto error;
/*
QUIT or an error occurred.
We can skip the epoch increment here: process_apc_requests() will be
called one last time after thd is unlinked.
*/
return false;
case DISPATCH_COMMAND_SUCCESS:
break;
}
@@ -229,19 +231,33 @@ retry:
/* Read next command from client. */
c->set_io_timeout(thd->get_net_wait_timeout());
c->state= TP_STATE_IDLE;
if (c->start_io())
goto error;
worker_context.restore();
return;
thd->apc_target.epoch.fetch_add(1, std::memory_order_acquire);
if (unlikely(thd->apc_target.have_apc_requests()))
thd->apc_target.process_apc_requests();
int error= c->start_io();
return error == 0;
}
error:
c->thd= 0;
if (thd)
void tp_callback(TP_connection *c)
{
Worker_thread_context worker_context;
worker_context.save();
bool success= tp_callback_run(c);
if (!success)
{
threadpool_remove_connection(thd);
THD *thd= c->thd;
c->thd= 0;
if (thd)
{
threadpool_remove_connection(thd);
}
delete c;
}
delete c;
worker_context.restore();
}
@@ -320,6 +336,10 @@ static void threadpool_remove_connection(THD *thd)
end_connection(thd);
close_connection(thd, 0);
unlink_thd(thd);
// The remaining APC requests should be processed after unlinking.
// Unlinking guarantees that no new APC requests can be added.
// TODO: it would be better to notify the requestor with some KILLED state here.
thd->apc_target.process_apc_requests();
PSI_CALL_delete_current_thread(); // before THD is destroyed
delete thd;
@@ -367,7 +387,14 @@ static dispatch_command_return threadpool_process_request(THD *thd)
thread_attach(thd);
if(thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED)
{
if (unlikely(thd->async_state.m_command == COM_SLEEP))
return DISPATCH_COMMAND_SUCCESS; // Special case for thread pool APC
goto resume;
}
thd->apc_target.process_apc_requests();
if (thd->killed >= KILL_CONNECTION)
{
@@ -573,10 +600,71 @@ static void tp_resume(THD* thd)
{
DBUG_ASSERT(thd->async_state.m_state == thd_async_state::enum_async_state::SUSPENDED);
thd->async_state.m_state = thd_async_state::enum_async_state::RESUMED;
thd->apc_target.epoch.fetch_add(1, std::memory_order_acquire);
TP_connection* c = get_TP_connection(thd);
pool->resume(c);
}
static bool tp_notify_apc(THD *thd)
{
mysql_mutex_assert_owner(&thd->LOCK_thd_kill);
TP_connection* c= get_TP_connection(thd);
longlong process_epoch= thd->apc_target.process_epoch;
longlong first_epoch= thd->apc_target.epoch;
while (1)
{
longlong epoch= thd->apc_target.epoch;
if (epoch & 1 || epoch != first_epoch)
{
// We are in the safe zone, where processing is guaranteed.
break;
}
else
{
/*
Continue with the retry loop.
We are either before the APC request check, after the check but before
the run, or the run was skipped.
*/
process_epoch= thd->apc_target.process_epoch;
if (process_epoch & 1)
{
// We are blocked on LOCK_thd_kill in process_apc_requests(), or about to be.
break;
}
else
{
// We are either before the APC requests have been checked or processed,
// or after they have been processed. We can't distinguish these states,
// so just wake the connection through the pool and retry if that fails.
int status= pool->wake(c);
if (likely(status == 0))
{
break;
}
else if (unlikely(status < 0))
{
return false;
}
else
{
/*
If the run was skipped and somebody else has already taken the
connection out of the poll, we will wait until the epoch changes,
i.e. until the task reaches a worker through the queue, which can
take a while. Hence the longer sleep here (1 ms).
*/
my_sleep(1000);
}
}
}
}
return true;
}
static scheduler_functions tp_scheduler_functions=
{
0, // max_threads
@@ -588,7 +676,8 @@ static scheduler_functions tp_scheduler_functions=
tp_wait_end, // thd_wait_end
tp_post_kill_notification, // post kill notification
tp_end, // end
tp_resume
tp_resume,
tp_notify_apc,
};
void pool_of_threads_scheduler(struct scheduler_functions *func,

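Taken together, the hunks above give the worker this per-command cycle; the following is a condensed, hedged model of tp_callback_run() with the suspend/resume and error paths stripped out (it is not the literal code):

// Condensed model of the worker-side cycle introduced by this patch
// (simplified; WOULDBLOCK/suspend handling and error paths omitted).
static bool run_one_command_cycle(THD *thd, TP_connection *c)
{
  // epoch becomes odd: the worker owns the connection; any APC request that
  // arrives from now on will be served on this pass.
  thd->apc_target.epoch.fetch_add(1, std::memory_order_release);

  // threadpool_process_request() serves pending APC requests before reading
  // the command; when the connection was woken only for an APC (async_state
  // RESUMED + COM_SLEEP, set by wake()/cancel_io()), it returns at once and
  // the APC is served below instead.
  if (threadpool_process_request(thd) == DISPATCH_COMMAND_CLOSE_CONNECTION)
    return false;  // epoch stays odd; APC is served once more after unlink_thd()

  // epoch becomes even: the worker is about to park the connection again.
  thd->apc_target.epoch.fetch_add(1, std::memory_order_acquire);

  // Serve requests that slipped in while the epoch was odd, so the requestor
  // does not have to wait for the next client command.
  if (unlikely(thd->apc_target.have_apc_requests()))
    thd->apc_target.process_apc_requests();

  return c->start_io() == 0;  // park in the poll until data or wake() arrives
}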
sql/threadpool_generic.cc (39 lines changed)

@@ -1337,6 +1337,24 @@ void TP_pool_generic::resume(TP_connection* c)
add(c);
}
int TP_pool_generic::wake(TP_connection *c)
{
int status= c->cancel_io();
if (status == 0)
{
THD *thd= c->thd;
/* Set a custom async_state to be handled later in threadpool_process_request().
This avoids possible side effects of dry-running do_command(). */
DBUG_ASSERT(thd->async_state.m_state == thd_async_state::enum_async_state::NONE);
DBUG_ASSERT(thd->async_state.m_command == COM_SLEEP);
thd->async_state.m_state= thd_async_state::enum_async_state::RESUMED;
/* Add c to the task queue */
resume(c);
}
return status;
}
/**
MySQL scheduler callback: wait begin
*/
@@ -1497,7 +1515,7 @@ int TP_connection_generic::start_io()
So we recalculate in which group the connection should be, based
on thread_id and current group count, and migrate if necessary.
*/
if (fix_group)
if (unlikely(fix_group))
{
fix_group = false;
thread_group_t *new_group= &all_groups[get_group_id(thd->thread_id)];
@@ -1512,7 +1530,7 @@ int TP_connection_generic::start_io()
/*
Bind to poll descriptor if not yet done.
*/
if (!bound_to_poll_descriptor)
if (unlikely(!bound_to_poll_descriptor))
{
bound_to_poll_descriptor= true;
return io_poll_associate_fd(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM);
@@ -1522,6 +1540,23 @@
}
int TP_connection_generic::cancel_io()
{
int ret = io_poll_disassociate_fd(thread_group->pollfd,fd);
if (ret == 0)
{
// We have successfully disassociated the fd, so bound_to_poll_descriptor will
// not be accessed from other threads: it changes only together with the
// io_poll_associate_fd()/io_poll_disassociate_fd() pair.
DBUG_ASSERT(bound_to_poll_descriptor);
bound_to_poll_descriptor= false;
return 0;
}
// Hopefully, all POSIX implementations return ENOENT for the case in question
return errno == ENOENT ? 1 : -1;
}
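On Linux the generic pool's io_poll_disassociate_fd() boils down to an EPOLL_CTL_DEL, where the errno mapping hoped for above does hold; a minimal, Linux-only illustration (standalone sketch, not the server code):

#include <sys/epoll.h>
#include <cerrno>

// Removing an fd from an epoll instance. EPOLL_CTL_DEL fails with ENOENT when
// the fd is not registered, which is exactly the "waiting was not cancelled"
// case above; any other failure is treated as an error.
static int poll_disassociate(int epoll_fd, int fd)
{
  // The event argument may be NULL for EPOLL_CTL_DEL (Linux >= 2.6.9).
  if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr) == 0)
    return 0;                       // fd removed: waiting cancelled
  return errno == ENOENT ? 1 : -1;  // 1: was not registered, -1: error
}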
/**
Worker thread's main

sql/threadpool_generic.h (5 lines changed)

@@ -79,6 +79,7 @@ struct TP_connection_generic :public TP_connection
int init() override { return 0; }
void set_io_timeout(int sec) override;
int start_io() override;
int cancel_io() final;
void wait_begin(int type) override;
void wait_end() override;
@@ -88,6 +89,10 @@ struct TP_connection_generic :public TP_connection
ulonglong abs_wait_timeout;
ulonglong enqueue_time;
TP_file_handle fd;
/**
Whether fd is currently associated with the poll descriptor denoted by
thread_group->pollfd. See also change_group.
*/
bool bound_to_poll_descriptor;
int waiting;
bool fix_group;

sql/threadpool_win.cc (28 lines changed)

@@ -90,6 +90,7 @@ public:
void set_io_timeout(int sec) override;
void wait_begin(int type) override;
void wait_end() override;
int cancel_io() final;
ulonglong timeout=ULLONG_MAX;
OVERLAPPED overlapped{};
@@ -131,6 +132,11 @@ void TP_pool_win::resume(TP_connection* c)
SubmitThreadpoolWork(((TP_connection_win*)c)->work);
}
int TP_pool_win::wake(TP_connection *c)
{
return c->cancel_io();
}
#define CHECK_ALLOC_ERROR(op) \
do \
{ \
@@ -177,6 +183,13 @@ int TP_connection_win::start_io()
return 0;
}
int TP_connection_win::cancel_io()
{
if (CancelIoEx(sock.m_handle, &sock.m_overlapped))
return 0;
return GetLastError() == ERROR_NOT_FOUND ? 1 : -1;
}
/*
Recalculate wait timeout, maybe reset timer.
*/
@@ -286,10 +299,25 @@ static VOID CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io)
{
TP_connection_win *c= (TP_connection_win *)context;
THD *thd= c->thd;
/* How many bytes were preread into read buffer */
c->sock.end_read((ULONG)nbytes, io_result);
if (io_result == ERROR_OPERATION_ABORTED)
{
mysql_mutex_lock(&thd->LOCK_thd_kill);
if (!thd->is_killed())
{
/* Set a custom async_state to be handled later in threadpool_process_request().
This avoids possible side effects of dry-running do_command(). */
DBUG_ASSERT(thd->async_state.m_state == thd_async_state::enum_async_state::NONE);
DBUG_ASSERT(thd->async_state.m_command == COM_SLEEP);
thd->async_state.m_state= thd_async_state::enum_async_state::RESUMED;
}
mysql_mutex_unlock(&thd->LOCK_thd_kill);
}
/*
Execute high priority connections immediately.
'Yield' in case of low priority connections, i.e SubmitThreadpoolWork (with the same callback)
