You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

966 lines
24 KiB

/* Copyright (C) 2019, 2022, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
  12. #include "tpool_structs.h"
  13. #include <limits.h>
  14. #include <algorithm>
  15. #include <assert.h>
  16. #include <atomic>
  17. #include <chrono>
  18. #include <condition_variable>
  19. #include <iostream>
  20. #include <limits.h>
  21. #include <mutex>
  22. #include <queue>
  23. #include <stack>
  24. #include <thread>
  25. #include <vector>
  26. #include "tpool.h"
  27. #include <assert.h>
  28. #include <my_global.h>
  29. #include <my_dbug.h>
  30. #include <thr_timer.h>
  31. #include <stdlib.h>
  32. #include "aligned.h"
  33. namespace tpool
  34. {
  35. #ifdef __linux__
  36. #if defined(HAVE_URING) || defined(LINUX_NATIVE_AIO)
  37. extern aio* create_linux_aio(thread_pool* tp, int max_io);
  38. #else
  39. aio *create_linux_aio(thread_pool *, int) { return nullptr; };
  40. #endif
  41. #endif
  42. #ifdef _WIN32
  43. extern aio* create_win_aio(thread_pool* tp, int max_io);
  44. #endif
  /* A task executing for longer than this is flagged as "long running" by
     the maintenance job. */
  static constexpr std::chrono::milliseconds LONG_TASK_DURATION{500};

  /* Multiplier applied to the desired concurrency when checking for too many
     active threads. */
  static constexpr int OVERSUBSCRIBE_FACTOR{2};
  47. /**
  48. Process the cb synchronously
  49. */
  50. void aio::synchronous(aiocb *cb)
  51. {
  52. #ifdef _WIN32
  53. size_t ret_len;
  54. #else
  55. ssize_t ret_len;
  56. #endif
  57. int err= 0;
  58. switch (cb->m_opcode)
  59. {
  60. case aio_opcode::AIO_PREAD:
  61. ret_len= pread(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
  62. break;
  63. case aio_opcode::AIO_PWRITE:
  64. ret_len= pwrite(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
  65. break;
  66. default:
  67. abort();
  68. }
  69. #ifdef _WIN32
  70. if (static_cast<int>(ret_len) < 0)
  71. err= GetLastError();
  72. #else
  73. if (ret_len < 0)
  74. {
  75. err= errno;
  76. ret_len= 0;
  77. }
  78. #endif
  79. cb->m_ret_len = ret_len;
  80. cb->m_err = err;
  81. if (ret_len)
  82. finish_synchronous(cb);
  83. }
  /**
    Implementation of the generic threadpool.

    The threadpool consists of the following components:
    - The task queue, which is populated by submit().
    - Workers that execute the work items.
    - A timer thread that takes care of pool health.

    On submit(), a worker thread can be woken, or created, to execute tasks.

    The timer thread watches whether work items are being dequeued; if they
    are not, this can indicate a potential deadlock. Thus the timer thread
    can also wake or create a thread, to ensure some progress.

    Optimizations:
    - Worker threads that are idle for a long time will shut down.
    - Worker threads are woken in LIFO order, which minimizes context
      switching and also ensures that the idle timeout works well. LIFO
      wakeup order ensures that active threads stay active, and idle ones
      stay idle.
  */
  /**
    Reason recorded in a parked worker's data when another thread wakes it.
  */
  enum worker_wake_reason
  {
    /* Not woken by anyone; set before the worker starts waiting. */
    WAKE_REASON_NONE= 0,
    /* Woken because there is work in the task queue. */
    WAKE_REASON_TASK= 1,
    /* Woken because the pool is being destroyed. */
    WAKE_REASON_SHUTDOWN= 2
  };
  111. /* A per-worker thread structure.*/
  112. struct worker_data
  113. {
  114. /** Condition variable to wakeup this worker.*/
  115. std::condition_variable m_cv;
  116. /** Reason why worker was woken. */
  117. worker_wake_reason m_wake_reason;
  118. /**
  119. If worker wakes up with WAKE_REASON_TASK, this the task it needs to execute.
  120. */
  121. task* m_task;
  122. /** Struct is member of intrusive doubly linked list */
  123. worker_data* m_prev;
  124. worker_data* m_next;
  125. /* Current state of the worker.*/
  126. enum state
  127. {
  128. NONE = 0,
  129. EXECUTING_TASK = 1,
  130. LONG_TASK = 2,
  131. WAITING = 4
  132. };
  133. int m_state;
  134. /* Padding to avoid false sharing */
  135. char m_pad[CPU_LEVEL1_DCACHE_LINESIZE];
  136. bool is_executing_task()
  137. {
  138. return m_state & EXECUTING_TASK;
  139. }
  140. bool is_long_task()
  141. {
  142. return m_state & LONG_TASK;
  143. }
  144. bool is_waiting()
  145. {
  146. return m_state & WAITING;
  147. }
  148. std::chrono::system_clock::time_point m_task_start_time;
  149. worker_data() :
  150. m_cv(),
  151. m_wake_reason(WAKE_REASON_NONE),
  152. m_task(),
  153. m_prev(),
  154. m_next(),
  155. m_state(NONE),
  156. m_task_start_time()
  157. {}
  158. };
  159. static thread_local worker_data* tls_worker_data;
  160. class thread_pool_generic : public thread_pool
  161. {
  162. /** Cache for per-worker structures */
  163. cache<worker_data> m_thread_data_cache;
  164. /** The task queue */
  165. circular_queue<task*> m_task_queue;
  166. /** List of standby (idle) workers */
  167. doubly_linked_list<worker_data> m_standby_threads;
  168. /** List of threads that are executing tasks */
  169. doubly_linked_list<worker_data> m_active_threads;
  170. /* Mutex that protects the whole struct, most importantly
  171. the standby threads list, and task queue */
  172. std::mutex m_mtx;
  173. /** Timeout after which idle worker shuts down */
  174. std::chrono::milliseconds m_thread_timeout;
  175. /** How often should timer wakeup.*/
  176. std::chrono::milliseconds m_timer_interval;
  177. /** Another condition variable, used in pool shutdown */
  178. std::condition_variable m_cv_no_threads;
  179. /** Condition variable for the timer thread. Signaled on shutdown. */
  180. std::condition_variable m_cv_timer;
  181. /** Overall number of enqueues*/
  182. unsigned long long m_tasks_enqueued;
  183. /** Overall number of dequeued tasks. */
  184. unsigned long long m_tasks_dequeued;
  185. /** Statistic related, number of worker thread wakeups */
  186. int m_wakeups;
  187. /**
  188. Statistic related, number of spurious thread wakeups
  189. (i.e thread woke up, and the task queue is empty)
  190. */
  191. int m_spurious_wakeups;
  192. /** The desired concurrency. This number of workers should be
  193. actively executing. */
  194. unsigned int m_concurrency;
  195. /** True, if threadpool is being shutdown, false otherwise */
  196. bool m_in_shutdown= false;
  197. /** Maintenance timer state : true = active(ON),false = inactive(OFF)*/
  198. enum class timer_state_t
  199. {
  200. OFF, ON
  201. };
  202. timer_state_t m_timer_state= timer_state_t::OFF;
  203. void switch_timer(timer_state_t state);
  204. /* Updates idle_since, and maybe switches the timer off */
  205. void check_idle(std::chrono::system_clock::time_point now);
  206. /** time point when timer last ran, used as a coarse clock. */
  207. std::chrono::system_clock::time_point m_timestamp;
  208. /** Number of long running tasks. The long running tasks are excluded when
  209. adjusting concurrency */
  210. unsigned int m_long_tasks_count;
  211. unsigned int m_waiting_task_count;
  212. /** Last time thread was created*/
  213. std::chrono::system_clock::time_point m_last_thread_creation;
  214. /** Minimumum number of threads in this pool.*/
  215. unsigned int m_min_threads;
  216. /** Maximimum number of threads in this pool. */
  217. unsigned int m_max_threads;
  218. /* maintenance related statistics (see maintenance()) */
  219. size_t m_last_thread_count;
  220. unsigned long long m_last_activity;
  221. std::atomic_flag m_thread_creation_pending= ATOMIC_FLAG_INIT;
  222. void worker_main(worker_data *thread_data);
  223. void worker_end(worker_data* thread_data);
  224. /* Checks threadpool responsiveness, adjusts thread_counts */
  225. void maintenance();
  226. static void maintenance_func(void* arg)
  227. {
  228. ((thread_pool_generic *)arg)->maintenance();
  229. }
  230. bool add_thread();
  231. bool wake(worker_wake_reason reason, task *t = nullptr);
  232. void maybe_wake_or_create_thread();
  233. bool too_many_active_threads();
  234. bool get_task(worker_data *thread_var, task **t);
  235. bool wait_for_tasks(std::unique_lock<std::mutex> &lk,
  236. worker_data *thread_var);
  237. void cancel_pending(task* t);
  238. size_t thread_count()
  239. {
  240. return m_active_threads.size() + m_standby_threads.size();
  241. }
  242. public:
  243. thread_pool_generic(int min_threads, int max_threads);
  244. ~thread_pool_generic() override;
  245. void wait_begin() override;
  246. void wait_end() override;
  247. void submit_task(task *task) override;
  248. aio *create_native_aio(int max_io) override
  249. {
  250. #ifdef _WIN32
  251. return create_win_aio(this, max_io);
  252. #elif defined(__linux__)
  253. return create_linux_aio(this,max_io);
  254. #else
  255. return nullptr;
  256. #endif
  257. }
  258. class timer_generic : public thr_timer_t, public timer
  259. {
  260. thread_pool_generic* m_pool;
  261. waitable_task m_task;
  262. callback_func m_callback;
  263. void* m_data;
  264. int m_period;
  265. std::mutex m_mtx;
  266. bool m_on;
  267. std::atomic<int> m_running;
  268. void run()
  269. {
  270. /*
  271. In rare cases, multiple callbacks can be scheduled,
  272. at the same time,. e.g with set_time(0,0) in a loop.
  273. We do not allow parallel execution, since it is against the expectations.
  274. */
  275. if (m_running.fetch_add(1, std::memory_order_acquire) > 0)
  276. return;
  277. do
  278. {
  279. m_callback(m_data);
  280. }
  281. while (m_running.fetch_sub(1, std::memory_order_release) != 1);
  282. if (m_pool && m_period)
  283. {
  284. std::unique_lock<std::mutex> lk(m_mtx);
  285. if (m_on)
  286. {
  287. DBUG_PUSH_EMPTY;
  288. thr_timer_end(this);
  289. thr_timer_settime(this, 1000ULL * m_period);
  290. DBUG_POP_EMPTY;
  291. }
  292. }
  293. }
  294. static void execute(void* arg)
  295. {
  296. auto timer = (timer_generic*)arg;
  297. timer->run();
  298. }
  299. static void submit_task(void* arg)
  300. {
  301. timer_generic* timer = (timer_generic*)arg;
  302. timer->m_pool->submit_task(&timer->m_task);
  303. }
  304. public:
  305. timer_generic(callback_func func, void* data, thread_pool_generic * pool):
  306. m_pool(pool),
  307. m_task(timer_generic::execute,this),
  308. m_callback(func),m_data(data),m_period(0),m_mtx(),
  309. m_on(true),m_running()
  310. {
  311. if (pool)
  312. {
  313. /* EXecute callback in threadpool*/
  314. thr_timer_init(this, submit_task, this);
  315. }
  316. else
  317. {
  318. /* run in "timer" thread */
  319. thr_timer_init(this, m_task.get_func(), m_task.get_arg());
  320. }
  321. }
  322. void set_time(int initial_delay_ms, int period_ms) override
  323. {
  324. std::unique_lock<std::mutex> lk(m_mtx);
  325. if (!m_on)
  326. return;
  327. thr_timer_end(this);
  328. if (!m_pool)
  329. thr_timer_set_period(this, 1000ULL * period_ms);
  330. else
  331. m_period = period_ms;
  332. thr_timer_settime(this, 1000ULL * initial_delay_ms);
  333. }
  334. /*
  335. Change only period of a periodic timer
  336. (after the next execution). Workarounds
  337. mysys timer deadlocks
  338. */
  339. void set_period(int period_ms)
  340. {
  341. std::unique_lock<std::mutex> lk(m_mtx);
  342. if (!m_on)
  343. return;
  344. if (!m_pool)
  345. thr_timer_set_period(this, 1000ULL * period_ms);
  346. else
  347. m_period = period_ms;
  348. }
  349. void disarm() override
  350. {
  351. std::unique_lock<std::mutex> lk(m_mtx);
  352. m_on = false;
  353. thr_timer_end(this);
  354. lk.unlock();
  355. if (m_task.m_group)
  356. {
  357. m_task.m_group->cancel_pending(&m_task);
  358. }
  359. if (m_pool)
  360. {
  361. m_pool->cancel_pending(&m_task);
  362. }
  363. m_task.wait();
  364. }
  365. ~timer_generic() override
  366. {
  367. disarm();
  368. }
  369. };
  370. timer_generic m_maintenance_timer;
  371. timer* create_timer(callback_func func, void *data) override
  372. {
  373. return new timer_generic(func, data, this);
  374. }
  375. void set_concurrency(unsigned int concurrency=0) override;
  376. };
  377. void thread_pool_generic::cancel_pending(task* t)
  378. {
  379. std::unique_lock <std::mutex> lk(m_mtx);
  380. for (auto it = m_task_queue.begin(); it != m_task_queue.end(); it++)
  381. {
  382. if (*it == t)
  383. {
  384. t->release();
  385. *it = nullptr;
  386. }
  387. }
  388. }
  389. /**
  390. Register worker in standby list, and wait to be woken.
  391. @retval true if thread was woken
  392. @retval false idle wait timeout exceeded (the current thread must shutdown)
  393. */
  394. bool thread_pool_generic::wait_for_tasks(std::unique_lock<std::mutex> &lk,
  395. worker_data *thread_data)
  396. {
  397. assert(m_task_queue.empty());
  398. assert(!m_in_shutdown);
  399. thread_data->m_wake_reason= WAKE_REASON_NONE;
  400. m_active_threads.erase(thread_data);
  401. m_standby_threads.push_back(thread_data);
  402. for (;;)
  403. {
  404. thread_data->m_cv.wait_for(lk, m_thread_timeout);
  405. if (thread_data->m_wake_reason != WAKE_REASON_NONE)
  406. {
  407. /* Woke up not due to timeout.*/
  408. return true;
  409. }
  410. if (thread_count() <= m_min_threads)
  411. {
  412. /* Do not shutdown thread, maintain required minimum of worker
  413. threads.*/
  414. continue;
  415. }
  416. /*
  417. Woke up due to timeout, remove this thread's from the standby list. In
  418. all other cases where it is signaled it is removed by the signaling
  419. thread.
  420. */
  421. m_standby_threads.erase(thread_data);
  422. m_active_threads.push_back(thread_data);
  423. return false;
  424. }
  425. }
  426. /**
  427. Workers "get next task" routine.
  428. A task can be handed over to the current thread directly during submit().
  429. if thread_var->m_wake_reason == WAKE_REASON_TASK.
  430. Or a task can be taken from the task queue.
  431. In case task queue is empty, the worker thread will park (wait for wakeup).
  432. */
  433. bool thread_pool_generic::get_task(worker_data *thread_var, task **t)
  434. {
  435. std::unique_lock<std::mutex> lk(m_mtx);
  436. if (thread_var->is_long_task())
  437. {
  438. DBUG_ASSERT(m_long_tasks_count);
  439. m_long_tasks_count--;
  440. }
  441. DBUG_ASSERT(!thread_var->is_waiting());
  442. thread_var->m_state = worker_data::NONE;
  443. while (m_task_queue.empty())
  444. {
  445. if (m_in_shutdown)
  446. return false;
  447. if (!wait_for_tasks(lk, thread_var))
  448. return false;
  449. if (m_task_queue.empty())
  450. {
  451. m_spurious_wakeups++;
  452. continue;
  453. }
  454. }
  455. /* Dequeue from the task queue.*/
  456. *t= m_task_queue.front();
  457. m_task_queue.pop();
  458. m_tasks_dequeued++;
  459. thread_var->m_state |= worker_data::EXECUTING_TASK;
  460. thread_var->m_task_start_time = m_timestamp;
  461. return true;
  462. }
  463. /** Worker thread shutdown routine. */
  464. void thread_pool_generic::worker_end(worker_data* thread_data)
  465. {
  466. std::lock_guard<std::mutex> lk(m_mtx);
  467. DBUG_ASSERT(!thread_data->is_long_task());
  468. m_active_threads.erase(thread_data);
  469. m_thread_data_cache.put(thread_data);
  470. if (!thread_count() && m_in_shutdown)
  471. {
  472. /* Signal the destructor that no more threads are left. */
  473. m_cv_no_threads.notify_all();
  474. }
  475. }
  476. extern "C" void set_tls_pool(tpool::thread_pool* pool);
  477. /* The worker get/execute task loop.*/
  478. void thread_pool_generic::worker_main(worker_data *thread_var)
  479. {
  480. task* task;
  481. set_tls_pool(this);
  482. m_worker_init_callback();
  483. tls_worker_data = thread_var;
  484. m_thread_creation_pending.clear();
  485. while (get_task(thread_var, &task) && task)
  486. {
  487. task->execute();
  488. }
  489. m_worker_destroy_callback();
  490. worker_end(thread_var);
  491. }
  492. /*
  493. Check if threadpool had been idle for a while
  494. Switch off maintenance timer if it is in idle state
  495. for too long.
  496. Helper function, to be used inside maintenance callback,
  497. before m_last_activity is updated
  498. */
  499. static const auto invalid_timestamp= std::chrono::system_clock::time_point::max();
  500. constexpr auto max_idle_time= std::chrono::minutes(1);
  501. /* Time since maintenance timer had nothing to do */
  502. static std::chrono::system_clock::time_point idle_since= invalid_timestamp;
  503. void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now)
  504. {
  505. DBUG_ASSERT(m_task_queue.empty());
  506. /*
  507. We think that there is no activity, if there were at most 2 tasks
  508. since last time, and there is a spare thread.
  509. The 2 tasks (and not 0) is to account for some periodic timers.
  510. */
  511. bool idle= m_standby_threads.m_count > 0;
  512. if (!idle)
  513. {
  514. idle_since= invalid_timestamp;
  515. return;
  516. }
  517. if (idle_since == invalid_timestamp)
  518. {
  519. idle_since= now;
  520. return;
  521. }
  522. /* Switch timer off after 1 minute of idle time */
  523. if (now - idle_since > max_idle_time && m_active_threads.empty())
  524. {
  525. idle_since= invalid_timestamp;
  526. switch_timer(timer_state_t::OFF);
  527. }
  528. }
  529. /*
  530. Periodic job to fix thread count and concurrency,
  531. in case of long tasks, etc
  532. */
  533. void thread_pool_generic::maintenance()
  534. {
  535. /*
  536. If pool is busy (i.e the its mutex is currently locked), we can
  537. skip the maintenance task, some times, to lower mutex contention
  538. */
  539. static int skip_counter;
  540. const int MAX_SKIPS = 10;
  541. std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
  542. if (skip_counter == MAX_SKIPS)
  543. {
  544. lk.lock();
  545. }
  546. else if (!lk.try_lock())
  547. {
  548. skip_counter++;
  549. return;
  550. }
  551. skip_counter = 0;
  552. m_timestamp = std::chrono::system_clock::now();
  553. if (m_task_queue.empty())
  554. {
  555. check_idle(m_timestamp);
  556. m_last_activity = m_tasks_dequeued + m_wakeups;
  557. return;
  558. }
  559. m_long_tasks_count = 0;
  560. for (auto thread_data = m_active_threads.front();
  561. thread_data;
  562. thread_data = thread_data->m_next)
  563. {
  564. if (thread_data->is_executing_task() &&
  565. !thread_data->is_waiting() &&
  566. (thread_data->is_long_task()
  567. || (m_timestamp - thread_data->m_task_start_time > LONG_TASK_DURATION)))
  568. {
  569. thread_data->m_state |= worker_data::LONG_TASK;
  570. m_long_tasks_count++;
  571. }
  572. }
  573. maybe_wake_or_create_thread();
  574. size_t thread_cnt = (int)thread_count();
  575. if (m_last_activity == m_tasks_dequeued + m_wakeups &&
  576. m_last_thread_count <= thread_cnt && m_active_threads.size() == thread_cnt)
  577. {
  578. // no progress made since last iteration. create new
  579. // thread
  580. add_thread();
  581. }
  582. m_last_activity = m_tasks_dequeued + m_wakeups;
  583. m_last_thread_count= thread_cnt;
  584. }
  /*
    Heuristic used for thread creation throttling.

    @param n_threads    number of threads already in the pool
    @param concurrency  desired concurrency level
    @return minimal interval, in milliseconds, between thread creations:
            0 below 4x concurrency, 50 below 8x, 100 below 16x, else 200
  */
  static int throttling_interval_ms(size_t n_threads,size_t concurrency)
  {
    struct step
    {
      size_t factor;
      int interval_ms;
    };
    static const step steps[]= {{4, 0}, {8, 50}, {16, 100}};

    for (const step &s : steps)
    {
      if (n_threads < concurrency * s.factor)
        return s.interval_ms;
    }
    return 200;
  }
  601. /* Create a new worker.*/
  602. bool thread_pool_generic::add_thread()
  603. {
  604. size_t n_threads = thread_count();
  605. if (n_threads >= m_max_threads)
  606. return false;
  607. /*
  608. Deadlock danger exists, so monitor pool health
  609. with maintenance timer.
  610. */
  611. switch_timer(timer_state_t::ON);
  612. if (n_threads >= m_min_threads)
  613. {
  614. auto now = std::chrono::system_clock::now();
  615. if (now - m_last_thread_creation <
  616. std::chrono::milliseconds(throttling_interval_ms(n_threads, m_concurrency)))
  617. {
  618. /*
  619. Throttle thread creation and wakeup deadlock detection timer,
  620. if is it off.
  621. */
  622. return false;
  623. }
  624. }
  625. /* Check and set "thread creation pending" flag before creating the thread. We
  626. reset the flag in thread_pool_generic::worker_main in new thread created. The
  627. flag must be reset back in case we fail to create the thread. If this flag is
  628. not reset all future attempt to create thread for this pool would not work as
  629. we would return from here. */
  630. if (m_thread_creation_pending.test_and_set())
  631. return false;
  632. worker_data *thread_data = m_thread_data_cache.get();
  633. m_active_threads.push_back(thread_data);
  634. try
  635. {
  636. std::thread thread(&thread_pool_generic::worker_main, this, thread_data);
  637. m_last_thread_creation = std::chrono::system_clock::now();
  638. thread.detach();
  639. }
  640. catch (std::system_error& e)
  641. {
  642. m_active_threads.erase(thread_data);
  643. m_thread_data_cache.put(thread_data);
  644. static bool warning_written;
  645. if (!warning_written)
  646. {
  647. fprintf(stderr, "Warning : threadpool thread could not be created :%s,"
  648. "current number of threads in pool %zu\n", e.what(), thread_count());
  649. warning_written = true;
  650. }
  651. m_thread_creation_pending.clear();
  652. return false;
  653. }
  654. return true;
  655. }
  656. /** Wake a standby thread, and hand the given task over to this thread. */
  657. bool thread_pool_generic::wake(worker_wake_reason reason, task *)
  658. {
  659. assert(reason != WAKE_REASON_NONE);
  660. if (m_standby_threads.empty())
  661. return false;
  662. auto var= m_standby_threads.back();
  663. m_standby_threads.pop_back();
  664. m_active_threads.push_back(var);
  665. assert(var->m_wake_reason == WAKE_REASON_NONE);
  666. var->m_wake_reason= reason;
  667. var->m_cv.notify_one();
  668. m_wakeups++;
  669. return true;
  670. }
  671. thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
  672. m_thread_data_cache(max_threads),
  673. m_task_queue(10000),
  674. m_standby_threads(),
  675. m_active_threads(),
  676. m_mtx(),
  677. m_thread_timeout(std::chrono::milliseconds(60000)),
  678. m_timer_interval(std::chrono::milliseconds(400)),
  679. m_cv_no_threads(),
  680. m_cv_timer(),
  681. m_tasks_enqueued(),
  682. m_tasks_dequeued(),
  683. m_wakeups(),
  684. m_spurious_wakeups(),
  685. m_timer_state(timer_state_t::ON),
  686. m_timestamp(),
  687. m_long_tasks_count(),
  688. m_waiting_task_count(),
  689. m_last_thread_creation(),
  690. m_min_threads(min_threads),
  691. m_max_threads(max_threads),
  692. m_last_thread_count(),
  693. m_last_activity(),
  694. m_maintenance_timer(thread_pool_generic::maintenance_func, this, nullptr)
  695. {
  696. set_concurrency();
  697. // start the timer
  698. m_maintenance_timer.set_time(0, (int)m_timer_interval.count());
  699. }
  700. void thread_pool_generic::maybe_wake_or_create_thread()
  701. {
  702. if (m_task_queue.empty())
  703. return;
  704. DBUG_ASSERT(m_active_threads.size() >= static_cast<size_t>(m_long_tasks_count + m_waiting_task_count));
  705. if (m_active_threads.size() - m_long_tasks_count - m_waiting_task_count > m_concurrency)
  706. return;
  707. if (!m_standby_threads.empty())
  708. {
  709. wake(WAKE_REASON_TASK);
  710. }
  711. else
  712. {
  713. add_thread();
  714. }
  715. }
  716. bool thread_pool_generic::too_many_active_threads()
  717. {
  718. return m_active_threads.size() - m_long_tasks_count - m_waiting_task_count >
  719. m_concurrency* OVERSUBSCRIBE_FACTOR;
  720. }
  721. void thread_pool_generic::set_concurrency(unsigned int concurrency)
  722. {
  723. std::unique_lock<std::mutex> lk(m_mtx);
  724. if (concurrency == 0)
  725. concurrency= 2 * std::thread::hardware_concurrency();
  726. m_concurrency = concurrency;
  727. if (m_concurrency > m_max_threads)
  728. m_concurrency = m_max_threads;
  729. if (m_concurrency < m_min_threads)
  730. m_concurrency = m_min_threads;
  731. if (m_concurrency < 1)
  732. m_concurrency = 1;
  733. }
  734. /** Submit a new task*/
  735. void thread_pool_generic::submit_task(task* task)
  736. {
  737. std::unique_lock<std::mutex> lk(m_mtx);
  738. if (m_in_shutdown)
  739. return;
  740. task->add_ref();
  741. m_tasks_enqueued++;
  742. m_task_queue.push(task);
  743. maybe_wake_or_create_thread();
  744. }
  745. /* Notify thread pool that current thread is going to wait */
  746. void thread_pool_generic::wait_begin()
  747. {
  748. if (!tls_worker_data || tls_worker_data->is_long_task())
  749. return;
  750. std::unique_lock<std::mutex> lk(m_mtx);
  751. if(tls_worker_data->is_long_task())
  752. {
  753. /*
  754. Current task flag could have become "long-running"
  755. while waiting for the lock, thus recheck.
  756. */
  757. return;
  758. }
  759. DBUG_ASSERT(!tls_worker_data->is_waiting());
  760. tls_worker_data->m_state |= worker_data::WAITING;
  761. m_waiting_task_count++;
  762. /* Maintain concurrency */
  763. maybe_wake_or_create_thread();
  764. }
  765. void thread_pool_generic::wait_end()
  766. {
  767. if (tls_worker_data && tls_worker_data->is_waiting())
  768. {
  769. std::unique_lock<std::mutex> lk(m_mtx);
  770. tls_worker_data->m_state &= ~worker_data::WAITING;
  771. m_waiting_task_count--;
  772. }
  773. }
  774. void thread_pool_generic::switch_timer(timer_state_t state)
  775. {
  776. if (m_timer_state == state)
  777. return;
  778. /*
  779. We can't use timer::set_time, because mysys timers are deadlock
  780. prone.
  781. Instead, to switch off we increase the timer period
  782. and decrease period to switch on.
  783. This might introduce delays in thread creation when needed,
  784. as period will only be changed when timer fires next time.
  785. For this reason, we can't use very long periods for the "off" state.
  786. */
  787. m_timer_state= state;
  788. long long period= (state == timer_state_t::OFF) ?
  789. m_timer_interval.count()*10: m_timer_interval.count();
  790. m_maintenance_timer.set_period((int)period);
  791. }
  792. /**
  793. Wake up all workers, and wait until they are gone
  794. Stop the timer.
  795. */
  796. thread_pool_generic::~thread_pool_generic()
  797. {
  798. /*
  799. Stop AIO early.
  800. This is needed to prevent AIO completion thread
  801. from calling submit_task() on an object that is being destroyed.
  802. */
  803. m_aio.reset();
  804. /* Also stop the maintanence task early. */
  805. m_maintenance_timer.disarm();
  806. std::unique_lock<std::mutex> lk(m_mtx);
  807. m_in_shutdown= true;
  808. /* Wake up idle threads. */
  809. while (wake(WAKE_REASON_SHUTDOWN))
  810. {
  811. }
  812. while (thread_count())
  813. {
  814. m_cv_no_threads.wait(lk);
  815. }
  816. lk.unlock();
  817. }
  818. thread_pool *create_thread_pool_generic(int min_threads, int max_threads)
  819. {
  820. return new thread_pool_generic(min_threads, max_threads);
  821. }
  822. } // namespace tpool