From 66de4fef763c7072f2dab3d1a54630f03b01197e Mon Sep 17 00:00:00 2001
From: Vladislav Vaintroub
Date: Fri, 29 Nov 2019 22:26:04 +0000
Subject: [PATCH] MDEV-16264 - some improvements

- wait notification, tpool_wait_begin/tpool_wait_end - to notify the
  threadpool that current thread is going to wait

Use it to wait for IOs to complete and also when purge waits for workers.
---
 storage/innobase/os/os0file.cc    |  16 ++++-
 storage/innobase/trx/trx0purge.cc |   8 ++-
 tpool/CMakeLists.txt              |   1 +
 tpool/tpool.h                     |   4 ++
 tpool/tpool_generic.cc            | 100 ++++++++++++++++++++----------
 tpool/tpool_structs.h             |   5 ++
 tpool/wait_notification.cc        |  21 +++++++
 7 files changed, 118 insertions(+), 37 deletions(-)
 create mode 100644 tpool/wait_notification.cc

diff --git a/storage/innobase/os/os0file.cc b/storage/innobase/os/os0file.cc
index 2f5dd12b407..542b1d4f2e8 100644
--- a/storage/innobase/os/os0file.cc
+++ b/storage/innobase/os/os0file.cc
@@ -84,10 +84,12 @@ class io_slots
 private:
   tpool::cache<tpool::aiocb> m_cache;
   tpool::task_group m_group;
+  int m_max_aio;
 public:
   io_slots(int max_submitted_io, int max_callback_concurrency) :
     m_cache(max_submitted_io),
-    m_group(max_callback_concurrency)
+    m_group(max_callback_concurrency),
+    m_max_aio(max_submitted_io)
   {
   }
   /* Get cached AIO control block */
@@ -112,6 +114,11 @@ public:
     m_cache.wait();
   }
+  size_t pending_io_count()
+  {
+    return (size_t)m_max_aio - m_cache.size();
+  }
+
   tpool::task_group* get_task_group()
   {
     return &m_group;
   }
@@ -4058,7 +4065,12 @@ void os_aio_free()
 be other, synchronous, pending writes. */
 void os_aio_wait_until_no_pending_writes()
 {
-  write_slots->wait();
+  if (write_slots->pending_io_count())
+  {
+    tpool::tpool_wait_begin();
+    write_slots->wait();
+    tpool::tpool_wait_end();
+  }
 }
 
 
diff --git a/storage/innobase/trx/trx0purge.cc b/storage/innobase/trx/trx0purge.cc
index 593a4556a70..a8bd4a5b93c 100644
--- a/storage/innobase/trx/trx0purge.cc
+++ b/storage/innobase/trx/trx0purge.cc
@@ -1239,7 +1239,13 @@ extern tpool::waitable_task purge_worker_task;
 /** Wait for pending purge jobs to complete. */
 static void trx_purge_wait_for_workers_to_complete()
 {
-  purge_worker_task.wait();
+  if (purge_worker_task.get_ref_count())
+  {
+    tpool::tpool_wait_begin();
+    purge_worker_task.wait();
+    tpool::tpool_wait_end();
+  }
+
   /* There should be no outstanding tasks as long as
   the worker threads are active. */
   ut_ad(srv_get_task_queue_length() == 0);

diff --git a/tpool/CMakeLists.txt b/tpool/CMakeLists.txt
index fc33f9b6932..3e3f8e0b42a 100644
--- a/tpool/CMakeLists.txt
+++ b/tpool/CMakeLists.txt
@@ -22,6 +22,7 @@ ADD_LIBRARY(tpool STATIC
   tpool_generic.cc
   task_group.cc
   task.cc
+  wait_notification.cc
   ${EXTRA_SOURCES}
 )
 
diff --git a/tpool/tpool.h b/tpool/tpool.h
index 8659e8adc74..472e59d5d9e 100644
--- a/tpool/tpool.h
+++ b/tpool/tpool.h
@@ -214,6 +214,8 @@ public:
   int bind(native_file_handle &fd) { return m_aio->bind(fd); }
   void unbind(const native_file_handle &fd) { m_aio->unbind(fd); }
   int submit_io(aiocb *cb) { return m_aio->submit_io(cb); }
+  virtual void wait_begin() {};
+  virtual void wait_end() {};
   virtual ~thread_pool() {}
 };
 const int DEFAULT_MIN_POOL_THREADS= 1;
@@ -221,6 +223,8 @@ const int DEFAULT_MAX_POOL_THREADS= 500;
 extern thread_pool *
 create_thread_pool_generic(int min_threads= DEFAULT_MIN_POOL_THREADS,
     int max_threads= DEFAULT_MAX_POOL_THREADS);
+extern "C" void tpool_wait_begin();
+extern "C" void tpool_wait_end();
 #ifdef _WIN32
 extern thread_pool *
 create_thread_pool_win(int min_threads= DEFAULT_MIN_POOL_THREADS,
diff --git a/tpool/tpool_generic.cc b/tpool/tpool_generic.cc
index 4416dcadc7e..fd5cba67e80 100644
--- a/tpool/tpool_generic.cc
+++ b/tpool/tpool_generic.cc
@@ -70,8 +70,6 @@ namespace tpool
   and also ensures that idle timeout works well. LIFO wakeup order ensures
   that active threads stay active, and idle ones stay idle.
 
-  - to minimize spurious wakeups, some items are not put into the queue. Instead
-  submit() will pass the data directly to the thread it woke up.
 */
 
 /**
@@ -109,7 +107,8 @@ struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) worker_data
   {
     NONE = 0,
     EXECUTING_TASK = 1,
-    LONG_TASK = 2
+    LONG_TASK = 2,
+    WAITING = 4
   };
 
   int m_state;
@@ -154,6 +153,9 @@ struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) worker_data
   }
 };
 
+
+static thread_local worker_data* tls_worker_data;
+
 class thread_pool_generic : public thread_pool
 {
   /** Cache for per-worker structures */
@@ -186,6 +188,7 @@ class thread_pool_generic : public thread_pool
 
   /** Overall number of enqueues*/
   unsigned long long m_tasks_enqueued;
+  unsigned long long m_group_enqueued;
   /** Overall number of dequeued tasks. */
   unsigned long long m_tasks_dequeued;
 
@@ -212,6 +215,8 @@ class thread_pool_generic : public thread_pool
   adjusting concurrency */
   int m_long_tasks_count;
 
+  int m_waiting_task_count;
+
   /** Last time thread was created*/
   std::chrono::system_clock::time_point m_last_thread_creation;
 
@@ -237,7 +242,8 @@ class thread_pool_generic : public thread_pool
   }
   bool add_thread();
   bool wake(worker_wake_reason reason, task *t = nullptr);
-  void wake_or_create_thread();
+  void maybe_wake_or_create_thread();
+  bool too_many_active_threads();
   bool get_task(worker_data *thread_var, task **t);
   bool wait_for_tasks(std::unique_lock<std::mutex> &lk, worker_data *thread_var);
 
@@ -250,6 +256,8 @@ class thread_pool_generic : public thread_pool
 public:
   thread_pool_generic(int min_threads, int max_threads);
   ~thread_pool_generic();
+  void wait_begin() override;
+  void wait_end() override;
   void submit_task(task *task) override;
   virtual aio *create_native_aio(int max_io) override
   {
@@ -447,31 +455,24 @@ bool thread_pool_generic::get_task(worker_data *thread_var, task **t)
 
   thread_var->m_state = worker_data::NONE;
 
-  if (m_task_queue.empty())
+  while (m_task_queue.empty())
   {
     if (m_in_shutdown)
       return false;
 
     if (!wait_for_tasks(lk, thread_var))
       return false;
-
-    /* Task was handed over directly by signaling thread.*/
-    if (thread_var->m_wake_reason == WAKE_REASON_TASK)
+    if (m_task_queue.empty())
     {
-      *t= thread_var->m_task;
-      goto end;
+      m_spurious_wakeups++;
+      continue;
     }
-
-    if (m_task_queue.empty())
-      return false;
   }
 
   /* Dequeue from the task queue.*/
   *t= m_task_queue.front();
   m_task_queue.pop();
   m_tasks_dequeued++;
-
-end:
   thread_var->m_state |= worker_data::EXECUTING_TASK;
   thread_var->m_task_start_time = m_timestamp;
   return true;
@@ -491,14 +492,18 @@ void thread_pool_generic::worker_end(worker_data* thread_data)
   }
 }
 
+extern "C" void set_tls_pool(tpool::thread_pool* pool);
+
 /* The worker get/execute task loop.*/
 void thread_pool_generic::worker_main(worker_data *thread_var)
 {
   task* task;
-
+  set_tls_pool(this);
   if(m_worker_init_callback)
     m_worker_init_callback();
 
+  tls_worker_data = thread_var;
+
   while (get_task(thread_var, &task) && task)
   {
     task->execute();
@@ -557,12 +562,10 @@ void thread_pool_generic::maintainence()
       m_long_tasks_count++;
     }
   }
+
+  maybe_wake_or_create_thread();
+
   size_t thread_cnt = (int)thread_count();
-  if (m_active_threads.size() - m_long_tasks_count < m_concurrency*OVERSUBSCRIBE_FACTOR)
-  {
-    wake_or_create_thread();
-    return;
-  }
   if (m_last_activity == m_tasks_dequeued + m_wakeups &&
     m_last_thread_count <= thread_cnt && m_active_threads.size() == thread_cnt)
   {
@@ -638,7 +641,7 @@ bool thread_pool_generic::add_thread()
 }
 
 /** Wake a standby thread, and hand the given task over to this thread.
 */
-bool thread_pool_generic::wake(worker_wake_reason reason, task *t)
+bool thread_pool_generic::wake(worker_wake_reason reason, task *)
 {
   assert(reason != WAKE_REASON_NONE);
@@ -650,10 +653,6 @@ bool thread_pool_generic::wake(worker_wake_reason reason, task *t)
   assert(var->m_wake_reason == WAKE_REASON_NONE);
   var->m_wake_reason= reason;
   var->m_cv.notify_one();
-  if (t)
-  {
-    var->m_task= t;
-  }
   m_wakeups++;
   return true;
 }
@@ -673,10 +672,11 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
   m_tasks_dequeued(),
   m_wakeups(),
   m_spurious_wakeups(),
-  m_concurrency(std::thread::hardware_concurrency()),
+  m_concurrency(std::thread::hardware_concurrency()*2),
   m_in_shutdown(),
   m_timestamp(),
   m_long_tasks_count(),
+  m_waiting_task_count(),
   m_last_thread_creation(),
   m_min_threads(min_threads),
   m_max_threads(max_threads),
@@ -700,14 +700,15 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
 
 }
 
-void thread_pool_generic::wake_or_create_thread()
+void thread_pool_generic::maybe_wake_or_create_thread()
 {
-  assert(!m_task_queue.empty());
+  if (m_task_queue.empty())
+    return;
+  if (m_active_threads.size() - m_long_tasks_count - m_waiting_task_count > m_concurrency)
+    return;
   if (!m_standby_threads.empty())
   {
-    auto t= m_task_queue.front();
-    m_task_queue.pop();
-    wake(WAKE_REASON_TASK, t);
+    wake(WAKE_REASON_TASK);
   }
   else
   {
@@ -715,6 +716,11 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
   }
 }
 
+bool thread_pool_generic::too_many_active_threads()
+{
+  return m_active_threads.size() - m_long_tasks_count - m_waiting_task_count >
+    m_concurrency* OVERSUBSCRIBE_FACTOR;
+}
 
 /** Submit a new task*/
 void thread_pool_generic::submit_task(task* task)
@@ -725,9 +731,35 @@ void thread_pool_generic::submit_task(task* task)
   task->add_ref();
   m_tasks_enqueued++;
   m_task_queue.push(task);
+  maybe_wake_or_create_thread();
+}
+
+
+/* Notify thread pool that current thread is going to wait */
+void thread_pool_generic::wait_begin()
+{
+  if (!tls_worker_data || tls_worker_data->is_long_task())
+    return;
+  tls_worker_data->m_state |= worker_data::WAITING;
+  std::unique_lock<std::mutex> lk(m_mtx);
+  m_waiting_task_count++;
+
+  /* Maintain concurrency */
+  if (m_task_queue.empty())
+    return;
+  if (m_active_threads.size() - m_long_tasks_count - m_waiting_task_count < m_concurrency)
+    maybe_wake_or_create_thread();
+}
+
 
-  if (m_active_threads.size() - m_long_tasks_count < m_concurrency *OVERSUBSCRIBE_FACTOR)
-    wake_or_create_thread();
+void thread_pool_generic::wait_end()
+{
+  if (tls_worker_data && (tls_worker_data->m_state & worker_data::WAITING))
+  {
+    tls_worker_data->m_state &= ~worker_data::WAITING;
+    std::unique_lock<std::mutex> lk(m_mtx);
+    m_waiting_task_count--;
+  }
 }
 
 /**
diff --git a/tpool/tpool_structs.h b/tpool/tpool_structs.h
index 9d55741036c..d1c078d11f1 100644
--- a/tpool/tpool_structs.h
+++ b/tpool/tpool_structs.h
@@ -105,6 +105,11 @@ public:
     m_cv.wait(lk);
     m_waiters--;
   }
+
+  size_t size()
+  {
+    return m_cache.size();
+  }
 };
 
 
diff --git a/tpool/wait_notification.cc b/tpool/wait_notification.cc
new file mode 100644
index 00000000000..d158d29c690
--- /dev/null
+++ b/tpool/wait_notification.cc
@@ -0,0 +1,21 @@
+#include <tpool.h>
+
+static thread_local tpool::thread_pool* tls_thread_pool;
+
+extern "C" void set_tls_pool(tpool::thread_pool* pool)
+{
+  tls_thread_pool = pool;
+}
+
+extern "C" void tpool_wait_begin()
+{
+  if (tls_thread_pool)
+    tls_thread_pool->wait_begin();
+}
+
+
+extern "C" void tpool_wait_end()
+{
+  if (tls_thread_pool)
+    tls_thread_pool->wait_end();
+}
\ No newline at end of file
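
Usage sketch (illustration only, not part of the patch): the two InnoDB call
sites above follow one pattern - bracket a potentially blocking wait with
tpool_wait_begin()/tpool_wait_end(), so thread_pool_generic can count the
blocked worker in m_waiting_task_count and wake or create another thread.
In the sketch below, wait_for_something() and blocking_section() are
hypothetical names standing in for a real wait such as write_slots->wait()
or purge_worker_task.wait(); only the tpool_wait_begin/tpool_wait_end calls
come from the patch.

    #include <tpool.h>

    /* hypothetical stand-in for a real blocking wait,
       e.g. an I/O completion or purge-worker wait */
    static void wait_for_something() {}

    static void blocking_section()
    {
      tpool::tpool_wait_begin();   /* worker announces it is about to block */
      wait_for_something();        /* the actual wait */
      tpool::tpool_wait_end();     /* worker is runnable again */
    }

Outside a pool worker both calls are harmless no-ops, because the
thread-local pool pointer set by set_tls_pool() in worker_main() stays
nullptr; inside a worker they reach thread_pool_generic::wait_begin()/
wait_end(), which is what lets the pool keep its target concurrency while
the thread is parked.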