/*****************************************************************************
Copyright (c) 2020 MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
*****************************************************************************/
/*
The group commit synchronization used in log_write_up_to()
works as follows.

For simplicity, let's consider only the write operation; synchronization of
the flush operation works the same way.

Rules of the game

A thread enters log_write_up_to() with the lsn of the current transaction.
1. If the last written lsn is greater than the wait lsn (another thread
   already wrote the log buffer), then there is no need to do anything.
2. If no other thread is currently writing, write the log buffer, and
   update the last written lsn.
3. Otherwise, wait, and go to step 1.

Synchronization can be done in different ways, e.g.

a) A simple mutex locking the entire check and write operation.
   Disadvantage: threads that could continue after the last written lsn
   is updated still wait.

b) A spinlock, with periodic checks for the last written lsn.
   Fixes a), but burns CPU unnecessarily.

c) A mutex / condition variable combo.
   The condition variable notifies (broadcasts to) all waiters whenever
   the last written lsn changes.
   Has the disadvantage of many spurious wakeups, stress on the OS
   scheduler, and mutex contention.

d) Something else. Make use of the waiter's lsn parameter, and only wake
   up the "right" waiting threads.

We chose d). Even if the implementation is more complicated than the
alternatives due to the need to maintain a list of waiters, it provides
the best performance.

See the group_commit_lock implementation for details.

Note that if the write operation is very fast, a) or b) can be fine
alternatives.
*/
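
/*
For contrast, a minimal sketch of scheme a) above. The names
naive_write_up_to(), write_log_buffer() and last_written_lsn are
hypothetical and only illustrate the "rules of the game"; this is not
code used anywhere in the server:

  static std::mutex naive_mtx;       // protects last_written_lsn
  static lsn_t last_written_lsn= 0;

  static void naive_write_up_to(lsn_t lsn)
  {
    std::lock_guard<std::mutex> lk(naive_mtx);
    if (lsn <= last_written_lsn)
      return;                        // rule 1: someone already wrote it
    write_log_buffer(lsn);           // rule 2: write the log buffer
    last_written_lsn= lsn;           // ... and update the last written lsn
  }

A thread that would be satisfied by rule 1 still serializes on the mutex
for the full duration of another thread's write, which is exactly the
drawback that the waiter-list design below avoids.
*/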
#ifdef _WIN32
#include <windows.h>
#endif

#ifdef __linux__
#include <linux/futex.h>
#include <sys/syscall.h>
#endif

#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <my_cpu.h>
#include <log0types.h>
#include "log0sync.h"
#include <mysql/service_thd_wait.h>
#include <sql_class.h>
/**
  Helper class, used in the group commit lock.

  A binary semaphore, or (same thing) an auto-reset event.
  It has a state (signalled or not), and provides two operations:
  wait() and wake().

  The implementation uses efficient locking primitives on Linux and Windows,
  or a mutex/condition variable combo elsewhere.
*/
class binary_semaphore
{
public:
  /** Wait until the semaphore becomes signalled, and atomically reset the
  state to non-signalled */
  void wait();
  /** Signals the semaphore */
  void wake();

private:
#if defined(__linux__) || defined(_WIN32)
  std::atomic<int> m_signalled;
  static constexpr std::memory_order mem_order= std::memory_order_acq_rel;
public:
  binary_semaphore() : m_signalled(0) {}
#else
  std::mutex m_mtx{};
  std::condition_variable m_cv{};
  bool m_signalled= false;
#endif
};
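
/*
Usage sketch (hypothetical, for illustration only). Auto-reset semantics
mean a wake() that happens before the wait() is not lost, and multiple
wake() calls before a wait() collapse into a single wakeup:

  binary_semaphore sema;

  // Thread A:
  sema.wait();  // blocks until signalled, then resets to non-signalled

  // Thread B:
  sema.wake();  // releases (at most) one waiter
*/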
#if defined(__linux__) || defined(_WIN32)
void binary_semaphore::wait()
{
  for (;;)
  {
    if (m_signalled.exchange(0, mem_order) == 1)
    {
      break;
    }
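    /* Not signalled yet; block until m_signalled leaves 0. Both
    WaitOnAddress() and FUTEX_WAIT can return spuriously, hence the
    exchange() re-check on every loop iteration. */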
#ifdef _WIN32
    int zero= 0;
    WaitOnAddress(&m_signalled, &zero, sizeof(m_signalled), INFINITE);
#else
    syscall(SYS_futex, &m_signalled, FUTEX_WAIT_PRIVATE, 0, NULL, NULL, 0);
#endif
  }
}

void binary_semaphore::wake()
{
  if (m_signalled.exchange(1, mem_order) == 0)
  {
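    /* The state transitioned from non-signalled to signalled, so a waiter
    may be blocked; notify it. If it was already signalled, the earlier
    wakeup has not been consumed yet and no syscall is needed. */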
#ifdef _WIN32
    WakeByAddressSingle(&m_signalled);
#else
    syscall(SYS_futex, &m_signalled, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
#endif
  }
}
#else
void binary_semaphore::wait()
{
  std::unique_lock<std::mutex> lk(m_mtx);
  while (!m_signalled)
    m_cv.wait(lk);
  m_signalled= false;
}

void binary_semaphore::wake()
{
  std::unique_lock<std::mutex> lk(m_mtx);
  m_signalled= true;
  m_cv.notify_one();
}
#endif

/* A thread helper structure, used in the group commit lock below. */
struct group_commit_waiter_t
{
  lsn_t m_value= 0;
  binary_semaphore m_sema{};
  group_commit_waiter_t *m_next= nullptr;
  bool m_group_commit_leader= false;
};

group_commit_lock::group_commit_lock() :
  m_mtx(), m_value(0), m_pending_value(0), m_lock(false), m_waiters_list()
{
}

group_commit_lock::value_type group_commit_lock::value() const
{
  return m_value.load(std::memory_order::memory_order_relaxed);
}

group_commit_lock::value_type group_commit_lock::pending() const
{
  return m_pending_value.load(std::memory_order::memory_order_relaxed);
}

void group_commit_lock::set_pending(group_commit_lock::value_type num)
{
  ut_a(num >= value());
  m_pending_value.store(num, std::memory_order::memory_order_relaxed);
}

const unsigned int MAX_SPINS= 1; /** max spins in acquire */
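
/*
  A thread waits for at most one lsn at a time, so a single thread-local
  waiter slot is enough; it is reused across acquire() calls without any
  heap allocation.
*/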
static thread_local group_commit_waiter_t thread_local_waiter;

static inline void do_completion_callback(const completion_callback *cb)
{
  if (cb)
    cb->m_callback(cb->m_param);
}

group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num, const completion_callback *callback)
{
  unsigned int spins= MAX_SPINS;
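
  /* Fast path: spin briefly, hoping the current lock owner's write covers
  our lsn before we have to take the mutex and possibly sleep. */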
  for (;;)
  {
    if (num <= value())
    {
      /* No need to wait. */
      do_completion_callback(callback);
      return lock_return_code::EXPIRED;
    }

    if (spins-- == 0)
      break;
    if (num > pending())
    {
      /* Longer wait expected (longer than the currently running operation),
      don't spin. */
      break;
    }
    ut_delay(1);
  }
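
  /* Slow path: under m_mtx, either become the group commit leader, queue
  the completion callback, or register in the waiters list and sleep. */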
  thread_local_waiter.m_value= num;
  thread_local_waiter.m_group_commit_leader= false;
  std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
  while (num > value() || thread_local_waiter.m_group_commit_leader)
  {
    lk.lock();

    /* Re-read the current value after acquiring the lock. */
    if (num <= value() &&
        (!thread_local_waiter.m_group_commit_leader || m_lock))
    {
      lk.unlock();
      do_completion_callback(callback);
      return lock_return_code::EXPIRED;
    }

    if (!m_lock)
    {
      /* Take the lock, become the group commit leader. */
      m_lock= true;
#ifndef DBUG_OFF
      m_owner_id= std::this_thread::get_id();
#endif
      if (callback)
        m_pending_callbacks.push_back({num, *callback});
      return lock_return_code::ACQUIRED;
    }

    if (callback && (m_waiters_list || num <= pending()))
    {
      /*
        If num > pending(), we have a good candidate for the next group
        commit lead, that will be taking over the lock after the current
        owner releases it. We put the current thread into the waiters list
        so it sleeps and can be signaled and marked as group commit lead
        during lock release.

        For this to work well, pending() must deliver a good approximation
        for N in the next call to group_commit_lock::release(N).
      */
      m_pending_callbacks.push_back({num, *callback});
      return lock_return_code::CALLBACK_QUEUED;
    }

    /* Add yourself to the waiters list. */
    thread_local_waiter.m_group_commit_leader= false;
    thread_local_waiter.m_next= m_waiters_list;
    m_waiters_list= &thread_local_waiter;
    lk.unlock();

    /* Sleep until woken in release(). */
    thd_wait_begin(0, THD_WAIT_GROUP_COMMIT);
    thread_local_waiter.m_sema.wait();
    thd_wait_end(0);
  }
  do_completion_callback(callback);
  return lock_return_code::EXPIRED;
}

group_commit_lock::value_type group_commit_lock::release(value_type num)
{
  completion_callback callbacks[950]; // an array of 1000 exceeds the 16384-byte frame size limit
  size_t callback_count= 0;
  value_type ret= 0;
  std::unique_lock<std::mutex> lk(m_mtx);
  m_lock= false;

  /* Update the current value. */
  ut_a(num >= value());
  m_value.store(num, std::memory_order_relaxed);

  /*
    Wake waiters with value <= current value.
    Wake one more waiter, who will become the group commit lead.
  */
  group_commit_waiter_t *cur, *prev, *next;
  group_commit_waiter_t *wakeup_list= nullptr;
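  /* Waiters whose lsn is covered by num are first detached onto
  wakeup_list while holding m_mtx; their semaphores are signalled only
  after the mutex is released, reducing contention on m_mtx. */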
  for (auto &c : m_pending_callbacks)
  {
    if (c.first <= num)
    {
      if (callback_count < array_elements(callbacks))
        callbacks[callback_count++]= c.second;
      else
        c.second.m_callback(c.second.m_param);
    }
  }
  for (prev= nullptr, cur= m_waiters_list; cur; cur= next)
  {
    next= cur->m_next;
    if (cur->m_value <= num)
    {
      /* Move the current waiter to wakeup_list. */
      if (!prev)
      {
        /* Remove from the start of the list. */
        m_waiters_list= next;
      }
      else
      {
        /* Remove from the middle of the list. */
        prev->m_next= cur->m_next;
      }
      /* Prepend the entry to the wakeup list. */
      cur->m_next= wakeup_list;
      wakeup_list= cur;
    }
    else
    {
      prev= cur;
    }
  }
  auto it= std::remove_if(
      m_pending_callbacks.begin(), m_pending_callbacks.end(),
      [num](const pending_cb &c) { return c.first <= num; });
  m_pending_callbacks.erase(it, m_pending_callbacks.end());

  if (m_pending_callbacks.size() || m_waiters_list)
  {
    /*
      Ensure that after this thread releases the lock, there is a new
      group commit leader. We take it from the waiters list or the wakeup
      list. It might look like a spurious wake, but in fact we just ensure
      the waiters do not wait for eternity.
    */
    if (m_waiters_list)
    {
      /* Move one waiter to the wakeup list. */
      auto e= m_waiters_list;
      m_waiters_list= m_waiters_list->m_next;
      e->m_next= wakeup_list;
      e->m_group_commit_leader= true;
      wakeup_list= e;
    }
    else if (wakeup_list)
    {
      wakeup_list->m_group_commit_leader= true;
    }
    else
    {
      /* Tell the caller that some pending callbacks are left, and it
      should do something to prevent stalls. This should be a rare
      situation. */
      ret= m_pending_callbacks[0].first;
    }
  }
  lk.unlock();

  /*
    Release the designated next group commit lead first,
    to minimize spurious wakeups.
  */
  if (wakeup_list && wakeup_list->m_group_commit_leader)
  {
    next= wakeup_list->m_next;
    wakeup_list->m_sema.wake();
    wakeup_list= next;
  }
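
  /* Run the completion callbacks collected above (their lsn is covered
  by num), then wake the remaining waiters. */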
  for (size_t i= 0; i < callback_count; i++)
    callbacks[i].m_callback(callbacks[i].m_param);

  for (cur= wakeup_list; cur; cur= next)
  {
    next= cur->m_next;
    cur->m_sema.wake();
  }
  return ret;
}
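
/*
Illustrative caller pattern (hypothetical; the real callers live in the
log write/flush code, not in this file). write_log_buffer_somehow() is a
made-up name standing for whatever writes the log up to some lsn >= lsn:

  group_commit_lock lock;

  void write_up_to(lsn_t lsn, const completion_callback *cb)
  {
    if (lock.acquire(lsn, cb) ==
        group_commit_lock::lock_return_code::ACQUIRED)
    {
      lsn_t end_lsn= write_log_buffer_somehow();  // covers at least lsn
      lock.release(end_lsn); // wakes covered waiters, appoints next lead
    }
  }
*/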

#ifndef DBUG_OFF
bool group_commit_lock::is_owner()
{
  return m_lock && std::this_thread::get_id() == m_owner_id;
}
#endif