diff --git a/storage/tokudb/PerconaFT/buildheader/make_tdb.cc b/storage/tokudb/PerconaFT/buildheader/make_tdb.cc
index 7ede78b3c0d..cadaa48ccea 100644
--- a/storage/tokudb/PerconaFT/buildheader/make_tdb.cc
+++ b/storage/tokudb/PerconaFT/buildheader/make_tdb.cc
@@ -425,6 +425,7 @@ static void print_db_env_struct (void) {
     "bool (*set_dir_per_db)(DB_ENV *, bool new_val)",
     "bool (*get_dir_per_db)(DB_ENV *)",
     "const char *(*get_data_dir)(DB_ENV *env)",
+    "void (*kill_waiter)(DB_ENV *, void *extra)",
     NULL};

     sort_and_dump_fields("db_env", true, extra);
@@ -545,8 +546,8 @@ static void print_db_txn_struct (void) {
     "int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*)",
     "int (*xa_prepare) (DB_TXN*, TOKU_XA_XID *, uint32_t flags)",
     "uint64_t (*id64) (DB_TXN*)",
-    "void (*set_client_id)(DB_TXN *, uint64_t client_id)",
-    "uint64_t (*get_client_id)(DB_TXN *)",
+    "void (*set_client_id)(DB_TXN *, uint64_t client_id, void *client_extra)",
+    "void (*get_client_id)(DB_TXN *, uint64_t *client_id, void **client_extra)",
     "bool (*is_prepared)(DB_TXN *)",
    "DB_TXN *(*get_child)(DB_TXN *)",
     "uint64_t (*get_start_time)(DB_TXN *)",
diff --git a/storage/tokudb/PerconaFT/ft/txn/txn.cc b/storage/tokudb/PerconaFT/ft/txn/txn.cc
index dd03073a3ec..9e48d0d05dd 100644
--- a/storage/tokudb/PerconaFT/ft/txn/txn.cc
+++ b/storage/tokudb/PerconaFT/ft/txn/txn.cc
@@ -269,6 +269,7 @@ static txn_child_manager tcm;
         .state = TOKUTXN_LIVE,
         .num_pin = 0,
         .client_id = 0,
+        .client_extra = nullptr,
         .start_time = time(NULL),
     };
@@ -705,12 +706,14 @@ bool toku_txn_has_spilled_rollback(TOKUTXN txn) {
     return txn_has_spilled_rollback_logs(txn);
 }

-uint64_t toku_txn_get_client_id(TOKUTXN txn) {
-    return txn->client_id;
+void toku_txn_get_client_id(TOKUTXN txn, uint64_t *client_id, void **client_extra) {
+    *client_id = txn->client_id;
+    *client_extra = txn->client_extra;
 }

-void toku_txn_set_client_id(TOKUTXN txn, uint64_t client_id) {
+void toku_txn_set_client_id(TOKUTXN txn, uint64_t client_id, void *client_extra) {
     txn->client_id = client_id;
+    txn->client_extra = client_extra;
 }

 time_t toku_txn_get_start_time(struct tokutxn *txn) {
diff --git a/storage/tokudb/PerconaFT/ft/txn/txn.h b/storage/tokudb/PerconaFT/ft/txn/txn.h
index 51a46022150..34a76aa9cad 100644
--- a/storage/tokudb/PerconaFT/ft/txn/txn.h
+++ b/storage/tokudb/PerconaFT/ft/txn/txn.h
@@ -193,6 +193,7 @@ struct tokutxn {
     uint32_t num_pin; // number of threads (all hot indexes) that want this
                       // txn to not transition to commit or abort
     uint64_t client_id;
+    void *client_extra;
     time_t start_time;
 };
 typedef struct tokutxn *TOKUTXN;
@@ -293,8 +294,8 @@ void toku_txn_unpin_live_txn(struct tokutxn *txn);

 bool toku_txn_has_spilled_rollback(struct tokutxn *txn);

-uint64_t toku_txn_get_client_id(struct tokutxn *txn);
-void toku_txn_set_client_id(struct tokutxn *txn, uint64_t client_id);
+void toku_txn_get_client_id(struct tokutxn *txn, uint64_t *client_id, void **client_extra);
+void toku_txn_set_client_id(struct tokutxn *txn, uint64_t client_id, void *client_extra);

 time_t toku_txn_get_start_time(struct tokutxn *txn);
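The three files above change the client-id plumbing: a transaction now carries an opaque client_extra pointer next to its numeric client_id, set and read through the revised DB_TXN methods. A minimal caller-side sketch of the new API shape (the helper names below are illustrative, not part of the patch):

    #include <db.h>  // the generated PerconaFT header declaring DB_TXN

    // Sketch: tag a transaction with an id plus an opaque owner pointer,
    // then read both back through the out-parameters.
    static void tag_txn(DB_TXN *txn, uint64_t id, void *owner) {
        txn->set_client_id(txn, id, owner);  // new 3-argument form
    }

    static void inspect_txn(DB_TXN *txn) {
        uint64_t client_id;
        void *client_extra;
        txn->get_client_id(txn, &client_id, &client_extra);  // both via out-params
        // client_extra is whatever the caller stored, e.g. a THD* in tokudb
    }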
diff --git a/storage/tokudb/PerconaFT/locktree/lock_request.cc b/storage/tokudb/PerconaFT/locktree/lock_request.cc
index 22b6da9afc4..1bc613533db 100644
--- a/storage/tokudb/PerconaFT/locktree/lock_request.cc
+++ b/storage/tokudb/PerconaFT/locktree/lock_request.cc
@@ -65,6 +65,7 @@ void lock_request::create(void) {

     toku_cond_init(&m_wait_cond, nullptr);

     m_start_test_callback = nullptr;
+    m_start_before_pending_test_callback = nullptr;
     m_retry_test_callback = nullptr;
 }
@@ -79,7 +80,7 @@ void lock_request::destroy(void) {
 }

 // set the lock request parameters. this API allows a lock request to be reused.
-void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, lock_request::type lock_type, bool big_txn) {
+void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, lock_request::type lock_type, bool big_txn, void *extra) {
     invariant(m_state != state::PENDING);
     m_lt = lt;
     m_txnid = txnid;
@@ -91,6 +92,7 @@ void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT
     m_state = state::INITIALIZED;
     m_info = lt ? lt->get_lock_request_info() : nullptr;
     m_big_txn = big_txn;
+    m_extra = extra;
 }

 // get rid of any stored left and right key copies and
@@ -173,6 +175,7 @@ int lock_request::start(void) {
         m_state = state::PENDING;
         m_start_time = toku_current_time_microsec() / 1000;
         m_conflicting_txnid = conflicts.get(0);
+        if (m_start_before_pending_test_callback) m_start_before_pending_test_callback();
         toku_mutex_lock(&m_info->mutex);
         insert_into_lock_requests();
         if (deadlock_exists(conflicts)) {
@@ -203,7 +206,18 @@ int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*kil

     toku_mutex_lock(&m_info->mutex);

+    // check again, this time locking out other retry calls
+    if (m_state == state::PENDING) {
+        retry();
+    }
+
     while (m_state == state::PENDING) {
+        // check if this thread is killed
+        if (killed_callback && killed_callback()) {
+            remove_from_lock_requests();
+            complete(DB_LOCK_NOTGRANTED);
+            continue;
+        }

         // compute next wait time
         uint64_t t_wait;
@@ -221,7 +235,7 @@ int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*kil
         invariant(r == 0 || r == ETIMEDOUT);

         t_now = toku_current_time_microsec();
-        if (m_state == state::PENDING && (t_now >= t_end || (killed_callback && killed_callback()))) {
+        if (m_state == state::PENDING && t_now >= t_end) {
             m_info->counters.timeout_count += 1;

             // if we're still pending and we timed out, then remove our
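Two behavioral changes in wait(): it re-runs retry() under the lock-request mutex before first sleeping, which closes the window where a conflicting lock was released before this request entered the pending set, and it polls killed_callback at the top of every loop iteration instead of only after a timed-out condition wait. The callback contract is simply "return nonzero to give up"; a minimal sketch, modeling the server-side kill flag as an atomic (illustrative only, not the patch's code):

    #include <atomic>

    static std::atomic<bool> session_killed{false};

    // Returns nonzero when the waiter should abandon the lock wait; wait()
    // then completes the request with DB_LOCK_NOTGRANTED.
    static int my_killed_callback(void) {
        return session_killed.load() ? 1 : 0;
    }

    // usage: r = request.wait(wait_time_ms, killed_time_ms, my_killed_callback);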
@@ -274,13 +288,15 @@ TXNID lock_request::get_conflicting_txnid(void) const {
 }

 int lock_request::retry(void) {
+    invariant(m_state == state::PENDING);
     int r;
-    invariant(m_state == state::PENDING);
+
+    txnid_set conflicts;
+    conflicts.create();

     if (m_type == type::WRITE) {
-        r = m_lt->acquire_write_lock(m_txnid, m_left_key, m_right_key, nullptr, m_big_txn);
+        r = m_lt->acquire_write_lock(m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
     } else {
-        r = m_lt->acquire_read_lock(m_txnid, m_left_key, m_right_key, nullptr, m_big_txn);
+        r = m_lt->acquire_read_lock(m_txnid, m_left_key, m_right_key, &conflicts, m_big_txn);
     }

     // if the acquisition succeeded then remove ourselves from the
@@ -290,59 +306,77 @@ int lock_request::retry(void) {
         complete(r);
         if (m_retry_test_callback) m_retry_test_callback();  // test callback
         toku_cond_broadcast(&m_wait_cond);
+    } else {
+        m_conflicting_txnid = conflicts.get(0);
     }
+    conflicts.destroy();

     return r;
 }

-void lock_request::retry_all_lock_requests(locktree *lt) {
+void lock_request::retry_all_lock_requests(locktree *lt, void (*after_retry_all_test_callback)(void)) {
     lt_lock_request_info *info = lt->get_lock_request_info();

-    // if a thread reads this bit to be true, then it should go ahead and
-    // take the locktree mutex and retry lock requests. we use this bit
-    // to prevent every single thread from waiting on the locktree mutex
-    // in order to retry requests, especially when no requests actually exist.
-    //
-    // it is important to note that this bit only provides an optimization.
-    // it is not problematic for it to be true when it should be false,
-    // but it can be problematic for it to be false when it should be true.
-    // therefore, the lock request code must ensures that when lock requests
-    // are added to this locktree, the bit is set.
-    // see lock_request::insert_into_lock_requests()
-    if (!info->should_retry_lock_requests) {
+    info->retry_want++;
+
+    // if there are no pending lock requests then there is nothing to do.
+    // the unlocked data race on pending_is_empty is OK since lock requests
+    // are retried after being added to the pending set.
+    if (info->pending_is_empty)
         return;
-    }

     toku_mutex_lock(&info->mutex);

-    // let other threads know that they need not retry lock requests at this time.
-    //
-    // the motivation here is that if a bunch of threads have already released
-    // their locks in the rangetree, then its probably okay for only one thread
-    // to iterate over the list of requests and retry them. otherwise, at high
-    // thread counts and a large number of pending lock requests, you could
-    // end up wasting a lot of cycles.
-    info->should_retry_lock_requests = false;
-
-    size_t i = 0;
-    while (i < info->pending_lock_requests.size()) {
-        lock_request *request;
-        int r = info->pending_lock_requests.fetch(i, &request);
-        invariant_zero(r);
-
-        // retry the lock request. if it didn't succeed,
-        // move on to the next lock request. otherwise
-        // the request is gone from the list so we may
-        // read the i'th entry for the next one.
-        r = request->retry();
-        if (r != 0) {
-            i++;
+    // here is the group retry algorithm.
+    // get the latest retry_want count and use it as the generation number of this retry operation.
+    // if this retry generation is > the last retry generation, then do the lock retries. otherwise,
+    // no lock retries are needed.
+    unsigned long long retry_gen = info->retry_want.load();
+    if (retry_gen > info->retry_done) {
+
+        // retry all of the pending lock requests.
+        for (size_t i = 0; i < info->pending_lock_requests.size(); ) {
+            lock_request *request;
+            int r = info->pending_lock_requests.fetch(i, &request);
+            invariant_zero(r);
+
+            // retry this lock request. if it didn't succeed,
+            // move on to the next lock request. otherwise
+            // the request is gone from the list so we may
+            // read the i'th entry for the next one.
+            r = request->retry();
+            if (r != 0) {
+                i++;
+            }
         }
+        if (after_retry_all_test_callback) after_retry_all_test_callback();
+        info->retry_done = retry_gen;
     }

-    // future threads should only retry lock requests if some still exist
-    info->should_retry_lock_requests = info->pending_lock_requests.size() > 0;
+    toku_mutex_unlock(&info->mutex);
+}
+void *lock_request::get_extra(void) const {
+    return m_extra;
+}
+
+void lock_request::kill_waiter(void) {
+    remove_from_lock_requests();
+    complete(DB_LOCK_NOTGRANTED);
+    toku_cond_broadcast(&m_wait_cond);
+}
+
+void lock_request::kill_waiter(locktree *lt, void *extra) {
+    lt_lock_request_info *info = lt->get_lock_request_info();
+    toku_mutex_lock(&info->mutex);
+    for (size_t i = 0; i < info->pending_lock_requests.size(); i++) {
+        lock_request *request;
+        int r = info->pending_lock_requests.fetch(i, &request);
+        if (r == 0 && request->get_extra() == extra) {
+            request->kill_waiter();
+            break;
+        }
+    }
     toku_mutex_unlock(&info->mutex);
 }
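retry_want and retry_done form a generation counter that replaces the old should_retry_lock_requests flag: every caller bumps retry_want, but only a caller that (under the mutex) observes retry_want ahead of retry_done walks the pending list, and its single pass covers every bump issued before it started. Unlike the old flag, a concurrent bump can never be lost: it either lands before the load() and is covered by this pass, or it forces a later pass. A standalone sketch of the idiom with the locktree specifics stripped away (illustrative, not the patch's code):

    #include <atomic>
    #include <mutex>

    static std::atomic<unsigned long long> gen_want{0};
    static unsigned long long gen_done = 0;  // protected by mu
    static std::mutex mu;

    static void request_retry(void (*do_retries)(void)) {
        gen_want++;                            // publish that a retry is wanted
        std::lock_guard<std::mutex> lock(mu);
        unsigned long long gen = gen_want.load();
        if (gen > gen_done) {                  // not yet covered by a prior pass
            do_retries();                      // one pass serves all earlier bumps
            gen_done = gen;
        }
    }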
@@ -364,9 +398,7 @@ void lock_request::insert_into_lock_requests(void) {
     invariant(r == DB_NOTFOUND);
     r = m_info->pending_lock_requests.insert_at(this, idx);
     invariant_zero(r);
-
-    // ensure that this bit is true, now that at least one lock request is in the set
-    m_info->should_retry_lock_requests = true;
+    m_info->pending_is_empty = false;
 }

 // remove this lock request from the locktree's set. must hold the mutex.
@@ -378,6 +410,8 @@ void lock_request::remove_from_lock_requests(void) {
     invariant(request == this);
     r = m_info->pending_lock_requests.delete_at(idx);
     invariant_zero(r);
+    if (m_info->pending_lock_requests.size() == 0)
+        m_info->pending_is_empty = true;
 }

 int lock_request::find_by_txnid(lock_request * const &request, const TXNID &txnid) {
@@ -395,6 +429,10 @@ void lock_request::set_start_test_callback(void (*f)(void)) {
     m_start_test_callback = f;
 }

+void lock_request::set_start_before_pending_test_callback(void (*f)(void)) {
+    m_start_before_pending_test_callback = f;
+}
+
 void lock_request::set_retry_test_callback(void (*f)(void)) {
     m_retry_test_callback = f;
 }
diff --git a/storage/tokudb/PerconaFT/locktree/lock_request.h b/storage/tokudb/PerconaFT/locktree/lock_request.h
index 48d1279cde2..ab69253bcec 100644
--- a/storage/tokudb/PerconaFT/locktree/lock_request.h
+++ b/storage/tokudb/PerconaFT/locktree/lock_request.h
@@ -78,7 +78,7 @@ public:

     // effect: Resets the lock request parameters, allowing it to be reused.
     // requires: Lock request was already created at some point
-    void set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, type lock_type, bool big_txn);
+    void set(locktree *lt, TXNID txnid, const DBT *left_key, const DBT *right_key, type lock_type, bool big_txn, void *extra = nullptr);

     // effect: Tries to acquire a lock described by this lock request.
     // returns: The return code of locktree::acquire_[write,read]_lock()
@@ -109,12 +109,18 @@ public:
     // effect: Retries all of the lock requests for the given locktree.
     //         Any lock requests successfully restarted are completed and woken up.
     //         The rest remain pending.
-    static void retry_all_lock_requests(locktree *lt);
+    static void retry_all_lock_requests(locktree *lt, void (*after_retry_all_test_callback)(void) = nullptr);

     void set_start_test_callback(void (*f)(void));
+    void set_start_before_pending_test_callback(void (*f)(void));
     void set_retry_test_callback(void (*f)(void));

-private:
+    void *get_extra(void) const;
+
+    void kill_waiter(void);
+    static void kill_waiter(locktree *lt, void *extra);
+
+private:
     enum state {
         UNINITIALIZED,
         INITIALIZED,
@@ -152,6 +158,8 @@ private:
     // locktree that this lock request is for.
     struct lt_lock_request_info *m_info;

+    void *m_extra;
+
     // effect: tries again to acquire the lock described by this lock request
     // returns: 0 if retrying the request succeeded and is now complete
     int retry(void);
@@ -187,6 +195,7 @@ private:
     static int find_by_txnid(lock_request * const &request, const TXNID &txnid);

     void (*m_start_test_callback)(void);
+    void (*m_start_before_pending_test_callback)(void);
     void (*m_retry_test_callback)(void);

     friend class lock_request_unit_test;
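set() now records an opaque extra pointer with each request, and the new static kill_waiter(lt, extra) wakes the first pending request whose extra matches, completing it with DB_LOCK_NOTGRANTED. A sketch of the intended pairing (the connection type is a stand-in for the server's session object; lt, key, and thread setup are assumed to exist elsewhere):

    #include "locktree.h"
    #include "lock_request.h"

    struct connection { int id; };  // stand-in for e.g. a THD*

    static void waiter(toku::locktree *lt, toku::lock_request *req,
                       const DBT *key, connection *conn) {
        req->set(lt, /*txnid*/ 7, key, key, toku::lock_request::type::WRITE,
                 /*big_txn*/ false, /*extra*/ conn);  // tag with the owner
        if (req->start() == DB_LOCK_NOTGRANTED)
            req->wait(1000 * 1000);  // returns DB_LOCK_NOTGRANTED if killed
    }

    static void killer(toku::locktree *lt, connection *conn) {
        toku::lock_request::kill_waiter(lt, conn);  // match by pointer identity
    }

The cookie is only compared, never dereferenced, so any stable address works.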
diff --git a/storage/tokudb/PerconaFT/locktree/locktree.cc b/storage/tokudb/PerconaFT/locktree/locktree.cc
index d3596d47eeb..11f8a4e5ff7 100644
--- a/storage/tokudb/PerconaFT/locktree/locktree.cc
+++ b/storage/tokudb/PerconaFT/locktree/locktree.cc
@@ -81,20 +81,14 @@ void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id, const compar
     m_sto_end_early_time = 0;

     m_lock_request_info.pending_lock_requests.create();
+    m_lock_request_info.pending_is_empty = true;
     ZERO_STRUCT(m_lock_request_info.mutex);
     toku_mutex_init(&m_lock_request_info.mutex, nullptr);
-    m_lock_request_info.should_retry_lock_requests = false;
+    m_lock_request_info.retry_want = m_lock_request_info.retry_done = 0;
     ZERO_STRUCT(m_lock_request_info.counters);

-    // Threads read the should retry bit without a lock
-    // for performance. It's ok to read the wrong value.
-    // - If you think you should but you shouldn't, you waste a little time.
-    // - If you think you shouldn't but you should, then some other thread
-    //   will come around to do the work of retrying requests instead of you.
-    TOKU_VALGRIND_HG_DISABLE_CHECKING(
-        &m_lock_request_info.should_retry_lock_requests,
-        sizeof(m_lock_request_info.should_retry_lock_requests));
-    TOKU_DRD_IGNORE_VAR(m_lock_request_info.should_retry_lock_requests);
+    TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_lock_request_info.pending_is_empty, sizeof(m_lock_request_info.pending_is_empty));
+    TOKU_DRD_IGNORE_VAR(m_lock_request_info.pending_is_empty);
 }

 void locktree::destroy(void) {
diff --git a/storage/tokudb/PerconaFT/locktree/locktree.h b/storage/tokudb/PerconaFT/locktree/locktree.h
index 710f9e7db06..64171c51b23 100644
--- a/storage/tokudb/PerconaFT/locktree/locktree.h
+++ b/storage/tokudb/PerconaFT/locktree/locktree.h
@@ -38,6 +38,8 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

 #pragma once

+#include <atomic>
+
 #include <db.h>
 #include <toku_time.h>
 #include <toku_mutex.h>
@@ -80,9 +82,11 @@ namespace toku {

     // Lock request state for some locktree
     struct lt_lock_request_info {
         omt<lock_request *> pending_lock_requests;
+        std::atomic_bool pending_is_empty;
         toku_mutex_t mutex;
-        bool should_retry_lock_requests;
         lt_counters counters;
+        std::atomic_ullong retry_want;
+        unsigned long long retry_done;
     };

     // The locktree manager manages a set of locktrees, one for each open dictionary.
@@ -159,6 +163,8 @@ namespace toku {
         // Add time t to the escalator's wait time statistics
         void add_escalator_wait_time(uint64_t t);

+        void kill_waiter(void *extra);
+
     private:
         static const uint64_t DEFAULT_MAX_LOCK_MEMORY = 64L * 1024 * 1024;
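pending_is_empty keeps the old lock-free fast path but ties it to a guarantee the patch introduces: a request is always retried (by its own wait(), under the mutex) after it is inserted into the pending set, so a stale "empty" read can only skip work that some waiter will redo for itself. A self-contained sketch of this "hint flag plus authoritative recheck" idiom (illustrative, not the patch's code):

    #include <atomic>
    #include <mutex>
    #include <vector>

    static std::mutex mu;
    static std::vector<int> pending;                  // guarded by mu
    static std::atomic<bool> pending_is_empty{true};  // hint, read without mu

    static void add_request(int id) {
        std::lock_guard<std::mutex> lock(mu);
        pending.push_back(id);
        pending_is_empty = false;    // maintained while holding the lock
    }

    static void remove_request() {
        std::lock_guard<std::mutex> lock(mu);
        if (!pending.empty()) pending.pop_back();
        if (pending.empty()) pending_is_empty = true;
    }

    static bool maybe_retry() {
        if (pending_is_empty)        // benign race: a stale read only skips work
            return false;
        std::lock_guard<std::mutex> lock(mu);
        /* walk `pending` and retry each request here */
        return true;
    }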
diff --git a/storage/tokudb/PerconaFT/locktree/manager.cc b/storage/tokudb/PerconaFT/locktree/manager.cc
index 4708cdf4a5a..91ff7c5a007 100644
--- a/storage/tokudb/PerconaFT/locktree/manager.cc
+++ b/storage/tokudb/PerconaFT/locktree/manager.cc
@@ -483,4 +483,17 @@ void locktree_manager::get_status(LTM_STATUS statp) {
     *statp = ltm_status;
 }

+void locktree_manager::kill_waiter(void *extra) {
+    mutex_lock();
+    int r = 0;
+    size_t num_locktrees = m_locktree_map.size();
+    for (size_t i = 0; i < num_locktrees; i++) {
+        locktree *lt;
+        r = m_locktree_map.fetch(i, &lt);
+        invariant_zero(r);
+        lock_request::kill_waiter(lt, extra);
+    }
+    mutex_unlock();
+}
+
 } /* namespace toku */
diff --git a/storage/tokudb/PerconaFT/locktree/tests/kill_waiter.cc b/storage/tokudb/PerconaFT/locktree/tests/kill_waiter.cc
new file mode 100644
index 00000000000..8d93c0bbbab
--- /dev/null
+++ b/storage/tokudb/PerconaFT/locktree/tests/kill_waiter.cc
@@ -0,0 +1,100 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+
+// test the lock manager kill waiter function
+
+#include "locktree.h"
+#include "lock_request.h"
+#include "test.h"
+#include "locktree_unit_test.h"
+#include <thread>
+#include <atomic>
+
+namespace toku {
+
+const uint64_t my_lock_wait_time = 1000 * 1000;
+const uint64_t my_killed_time = 500 * 1000;
+const int n_locks = 4;
+
+static int my_killed_callback(void) {
+    if (1) fprintf(stderr, "%s:%u %s\n", __FILE__, __LINE__, __FUNCTION__);
+    return 0;
+}
+
+static void locktree_release_lock(locktree *lt, TXNID txn_id, const DBT *left, const DBT *right) {
+    range_buffer buffer;
+    buffer.create();
+    buffer.append(left, right);
+    lt->release_locks(txn_id, &buffer);
+    buffer.destroy();
+}
+
+static void wait_lock(lock_request *lr, std::atomic_int *done) {
+    int r = lr->wait(my_lock_wait_time, my_killed_time, my_killed_callback);
+    assert(r == DB_LOCK_NOTGRANTED);
+    *done = 1;
+}
+
+static void test_kill_waiter(void) {
+    int r;
+
+    locktree_manager mgr;
+    mgr.create(nullptr, nullptr, nullptr, nullptr);
+
+    DICTIONARY_ID dict_id = { 1 };
+    locktree *lt = mgr.get_lt(dict_id, dbt_comparator, nullptr);
+
+    const DBT *one = get_dbt(1);
+
+    lock_request locks[n_locks];
+    std::thread waiters[n_locks-1];
+    for (int i = 0; i < n_locks; i++) {
+        locks[i].create();
+        locks[i].set(lt, i+1, one, one, lock_request::type::WRITE, false, &waiters[i]);
+    }
+
+    // txn 'n_locks' grabs the lock
+    r = locks[n_locks-1].start();
+    assert_zero(r);
+
+    for (int i = 0; i < n_locks-1; i++) {
+        r = locks[i].start();
+        assert(r == DB_LOCK_NOTGRANTED);
+    }
+
+    std::atomic_int done[n_locks-1];
+    for (int i = 0; i < n_locks-1; i++) {
+        done[i] = 0;
+        waiters[i] = std::thread(wait_lock, &locks[i], &done[i]);
+    }
+
+    for (int i = 0; i < n_locks-1; i++) {
+        assert(!done[i]);
+    }
+
+    sleep(1);
+    for (int i = 0; i < n_locks-1; i++) {
+        mgr.kill_waiter(&waiters[i]);
+        while (!done[i]) sleep(1);
+        waiters[i].join();
+        for (int j = i+1; j < n_locks-1; j++)
+            assert(!done[j]);
+    }
+
+    locktree_release_lock(lt, n_locks, one, one);
+
+    for (int i = 0; i < n_locks; i++) {
+        locks[i].destroy();
+    }
+
+    mgr.release_lt(lt);
+    mgr.destroy();
+}
+
+} /* namespace toku */
+
+int main(void) {
+    toku::test_kill_waiter();
+    return 0;
+}
+
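At the manager level, kill_waiter simply fans out over every open locktree, so a caller needs nothing but the opaque pointer it tagged requests with. A usage sketch (illustrative; assumes requests were tagged via lock_request::set(..., extra)):

    #include "locktree.h"

    // Sketch: cancel a session's lock wait across all dictionaries.
    static void cancel_lock_waits(toku::locktree_manager *mgr, void *session) {
        mgr->kill_waiter(session);  // walks each locktree under the manager mutex
    }

Note that the unit test above uses the address of each waiter's std::thread object as the cookie, underscoring that the pointer is compared, never dereferenced.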
diff --git a/storage/tokudb/PerconaFT/locktree/tests/lock_request_killed.cc b/storage/tokudb/PerconaFT/locktree/tests/lock_request_killed.cc
index efd4092906b..ec464444271 100644
--- a/storage/tokudb/PerconaFT/locktree/tests/lock_request_killed.cc
+++ b/storage/tokudb/PerconaFT/locktree/tests/lock_request_killed.cc
@@ -51,8 +51,9 @@ static uint64_t t_do_kill;

 static int my_killed_callback(void) {
     uint64_t t_now = toku_current_time_microsec();
+    if (t_now == t_last_kill)
+        return 0;
     assert(t_now >= t_last_kill);
-    assert(t_now - t_last_kill >= my_killed_time * 1000 / 2); // div by 2 for valgrind which is not very accurate
     t_last_kill = t_now;
     killed_calls++;
     if (t_now >= t_do_kill)
diff --git a/storage/tokudb/PerconaFT/locktree/tests/lock_request_not_killed.cc b/storage/tokudb/PerconaFT/locktree/tests/lock_request_not_killed.cc
index 702e2e2626c..647b4d3c418 100644
--- a/storage/tokudb/PerconaFT/locktree/tests/lock_request_not_killed.cc
+++ b/storage/tokudb/PerconaFT/locktree/tests/lock_request_not_killed.cc
@@ -52,7 +52,6 @@ static uint64_t t_last_kill;
 static int my_killed_callback(void) {
     uint64_t t_now = toku_current_time_microsec();
     assert(t_now >= t_last_kill);
-    assert(t_now - t_last_kill >= my_killed_time * 1000 / 2); // div by 2 for valgrind which is not very accurate
     t_last_kill = t_now;
     killed_calls++;
     return 0;
diff --git a/storage/tokudb/PerconaFT/locktree/tests/lock_request_start_release_wait.cc b/storage/tokudb/PerconaFT/locktree/tests/lock_request_start_release_wait.cc
new file mode 100644
index 00000000000..eb19ceb70e5
--- /dev/null
+++ b/storage/tokudb/PerconaFT/locktree/tests/lock_request_start_release_wait.cc
@@ -0,0 +1,89 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+
+// test the race between start, release, and wait.  since start does not put its
+// lock request into the pending set, the blocking txn could release its lock before
+// the first txn waits.  this will block the first txn because its lock request is
+// not known when the lock is released.  the bug fix is to try the lock again in wait
+// while other lock retries are locked out.
+
+#include "locktree.h"
+#include "lock_request.h"
+#include "test.h"
+#include "locktree_unit_test.h"
+#include <thread>
+#include <atomic>
+
+namespace toku {
+
+const uint64_t my_lock_wait_time = 1000 * 1000; // ms
+const uint64_t my_killed_time = 1 * 1000;       // ms
+
+static uint64_t t_wait;
+
+static int my_killed_callback(void) {
+    uint64_t t_now = toku_current_time_microsec();
+    assert(t_now >= t_wait);
+    if (t_now - t_wait >= my_killed_time*1000)
+        abort();
+    return 0;
+}
+
+static void locktree_release_lock(locktree *lt, TXNID txn_id, const DBT *left, const DBT *right) {
+    range_buffer buffer;
+    buffer.create();
+    buffer.append(left, right);
+    lt->release_locks(txn_id, &buffer);
+    buffer.destroy();
+}
+
+static void test_start_release_wait(void) {
+    int r;
+
+    locktree_manager mgr;
+    mgr.create(nullptr, nullptr, nullptr, nullptr);
+
+    DICTIONARY_ID dict_id = { 1 };
+    locktree *lt = mgr.get_lt(dict_id, dbt_comparator, nullptr);
+
+    const DBT *one = get_dbt(1);
+
+    // a locks one
+    lock_request a;
+    a.create();
+    a.set(lt, 1, one, one, lock_request::type::WRITE, false);
+    r = a.start();
+    assert(r == 0);
+
+    // b tries to lock one, fails
+    lock_request b;
+    b.create();
+    b.set(lt, 2, one, one, lock_request::type::WRITE, false);
+    r = b.start();
+    assert(r == DB_LOCK_NOTGRANTED);
+
+    // a releases its lock
+    locktree_release_lock(lt, 1, one, one);
+
+    // b waits for one, gets the lock immediately
+    t_wait = toku_current_time_microsec();
+    r = b.wait(my_lock_wait_time, my_killed_time, my_killed_callback);
+    assert(r == 0);
+
+    // b releases its lock so we can exit cleanly
+    locktree_release_lock(lt, 2, one, one);
+
+    a.destroy();
+    b.destroy();
+
+    mgr.release_lt(lt);
+    mgr.destroy();
+}
+
+} /* namespace toku */
+
+int main(void) {
+    toku::test_start_release_wait();
+    return 0;
+}
+
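The interleaving this test pins down, written out as a timeline (a sketch; the thread labels are illustrative):

    // Timeline of the start/release/wait race (sketch):
    //
    //   thread A                        thread B
    //   --------                        --------
    //   a.start() -> lock granted
    //                                   b.start(): conflict found, but b has
    //                                     not yet been seen by a retry pass
    //   release a's lock
    //   retry_all_lock_requests():
    //     nothing to wake, no-op
    //                                   b.wait() sleeps
    //                                   -> hangs until timeout (the bug)
    //
    // With the fix, b.wait() re-runs retry() under the lock-request mutex
    // before sleeping, so the already-freed lock is granted immediately.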
diff --git a/storage/tokudb/PerconaFT/locktree/tests/lock_request_start_retry_race.cc b/storage/tokudb/PerconaFT/locktree/tests/lock_request_start_retry_race.cc
index 3b653e9c6ef..88493ec9ce0 100644
--- a/storage/tokudb/PerconaFT/locktree/tests/lock_request_start_retry_race.cc
+++ b/storage/tokudb/PerconaFT/locktree/tests/lock_request_start_retry_race.cc
@@ -37,6 +37,7 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

 #include <iostream>
+#include <thread>
 #include "test.h"
 #include "locktree.h"
 #include "lock_request.h"
@@ -47,15 +48,6 @@ Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

 namespace toku {

-struct locker_arg {
-    locktree *_lt;
-    TXNID _id;
-    const DBT *_key;
-
-    locker_arg(locktree *lt, TXNID id, const DBT *key) : _lt(lt), _id(id), _key(key) {
-    }
-};
-
 static void locker_callback(void) {
     usleep(10000);
 }
@@ -97,20 +89,13 @@ static void run_locker(locktree *lt, TXNID txnid, const DBT *key) {

         toku_pthread_yield();
         if ((i % 10) == 0)
-            std::cout << toku_pthread_self() << " " << i << std::endl;
+            std::cout << std::this_thread::get_id() << " " << i << std::endl;
     }
 }

-static void *locker(void *v_arg) {
-    locker_arg *arg = static_cast<locker_arg *>(v_arg);
-    run_locker(arg->_lt, arg->_id, arg->_key);
-    return arg;
-}
-
 } /* namespace toku */

 int main(void) {
-    int r;
     toku::locktree lt;

     DICTIONARY_ID dict_id = { 1 };
@@ -119,18 +104,12 @@ int main(void) {
     const DBT *one = toku::get_dbt(1);

     const int n_workers = 2;
-    toku_pthread_t ids[n_workers];
+    std::thread worker[n_workers];
     for (int i = 0; i < n_workers; i++) {
-        toku::locker_arg *arg = new toku::locker_arg(&lt, i, one);
-        r = toku_pthread_create(&ids[i], nullptr, toku::locker, arg);
-        assert_zero(r);
+        worker[i] = std::thread(toku::run_locker, &lt, i, one);
     }
     for (int i = 0; i < n_workers; i++) {
-        void *ret;
-        r = toku_pthread_join(ids[i], &ret);
-        assert_zero(r);
-        toku::locker_arg *arg = static_cast<toku::locker_arg *>(ret);
-        delete arg;
+        worker[i].join();
     }

     lt.release_reference();
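This hunk is a pure modernization: std::thread's constructor copies its arguments and forwards them to the function, which removes the heap-allocated locker_arg and the void* trampoline entirely. The general shape of the conversion (generic C++, not specific to this patch):

    #include <thread>

    static void worker(int id, const char *name) { (void)id; (void)name; }

    int main() {
        // pthread style would need an arg struct, a void* trampoline, and a delete.
        // std::thread forwards the arguments directly:
        std::thread t(worker, 42, "example");
        t.join();  // replaces pthread_join; no return-value plumbing needed
        return 0;
    }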
diff --git a/storage/tokudb/PerconaFT/locktree/tests/lock_request_start_retry_race_3.cc b/storage/tokudb/PerconaFT/locktree/tests/lock_request_start_retry_race_3.cc
new file mode 100644
index 00000000000..92122861819
--- /dev/null
+++ b/storage/tokudb/PerconaFT/locktree/tests/lock_request_start_retry_race_3.cc
@@ -0,0 +1,127 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+    PerconaFT is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License, version 2,
+    as published by the Free Software Foundation.
+
+    PerconaFT is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+    PerconaFT is free software: you can redistribute it and/or modify
+    it under the terms of the GNU Affero General Public License, version 3,
+    as published by the Free Software Foundation.
+
+    PerconaFT is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Affero General Public License for more details.
+
+    You should have received a copy of the GNU Affero General Public License
+    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#include <iostream>
+#include <thread>
+#include <pthread.h>
+#include "test.h"
+#include "locktree.h"
+#include "lock_request.h"
+
+// Suppose that 3 threads are running a lock acquire, release, retry sequence.  There is
+// a race in the retry algorithm with 2 threads running lock retry simultaneously.  The
+// first thread to run retry sets a flag that will cause the second thread to skip the
+// lock retries.  If the first thread progressed past the contended lock, then the second
+// thread will HANG until its lock timer pops, even when the contended lock is no longer held.
+
+// This test exposes this problem as a test hang.  The group retry algorithm fixes the race
+// in the lock request retry algorithm and this test should no longer hang.
+
+namespace toku {
+
+// use 1000 when after_retry_all is implemented, otherwise use 100000
+static const int n_tests = 1000; // 100000;
+
+static void after_retry_all(void) {
+    usleep(10000);
+}
+
+static void run_locker(locktree *lt, TXNID txnid, const DBT *key, pthread_barrier_t *b) {
+    for (int i = 0; i < n_tests; i++) {
+        int r;
+        r = pthread_barrier_wait(b); assert(r == 0 || r == PTHREAD_BARRIER_SERIAL_THREAD);
+
+        lock_request request;
+        request.create();
+
+        request.set(lt, txnid, key, key, lock_request::type::WRITE, false);
+
+        // try to acquire the lock
+        r = request.start();
+        if (r == DB_LOCK_NOTGRANTED) {
+            // wait for the lock to be granted
+            r = request.wait(1000 * 1000);
+        }
+
+        if (r == 0) {
+            // release the lock
+            range_buffer buffer;
+            buffer.create();
+            buffer.append(key, key);
+            lt->release_locks(txnid, &buffer);
+            buffer.destroy();
+
+            // retry pending lock requests
+            lock_request::retry_all_lock_requests(lt, after_retry_all);
+        }
+
+        request.destroy();
+        memset(&request, 0xab, sizeof request);
+
+        toku_pthread_yield();
+        if ((i % 10) == 0)
+            std::cout << std::this_thread::get_id() << " " << i << std::endl;
+    }
+}
+
+} /* namespace toku */
+
+int main(void) {
+
+    toku::locktree lt;
+    DICTIONARY_ID dict_id = { 1 };
+    lt.create(nullptr, dict_id, toku::dbt_comparator);
+
+    const DBT *one = toku::get_dbt(1);
+
+    const int n_workers = 3;
+    std::thread worker[n_workers];
+    pthread_barrier_t b;
+    int r = pthread_barrier_init(&b, nullptr, n_workers); assert(r == 0);
+    for (int i = 0; i < n_workers; i++) {
+        worker[i] = std::thread(toku::run_locker, &lt, i, one, &b);
+    }
+    for (int i = 0; i < n_workers; i++) {
+        worker[i].join();
+    }
+    r = pthread_barrier_destroy(&b); assert(r == 0);
+    lt.release_reference();
+    lt.destroy();
+    return 0;
+}
+
diff --git a/storage/tokudb/PerconaFT/locktree/tests/lock_request_start_retry_wait_race_2.cc b/storage/tokudb/PerconaFT/locktree/tests/lock_request_start_retry_wait_race_2.cc
new file mode 100644
index 00000000000..a2ceff99edb
--- /dev/null
+++ b/storage/tokudb/PerconaFT/locktree/tests/lock_request_start_retry_wait_race_2.cc
@@ -0,0 +1,128 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+    PerconaFT is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License, version 2,
+    as published by the Free Software Foundation.
+
+    PerconaFT is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+    PerconaFT is free software: you can redistribute it and/or modify
+    it under the terms of the GNU Affero General Public License, version 3,
+    as published by the Free Software Foundation.
+
+    PerconaFT is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Affero General Public License for more details.
+
+    You should have received a copy of the GNU Affero General Public License
+    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+#include <iostream>
+#include <thread>
+#include <pthread.h>
+#include "test.h"
+#include "locktree.h"
+#include "lock_request.h"
+
+// Suppose that 2 threads are running a lock acquire, release, retry sequence.  There is a
+// race between the acquire and the release with 2 threads.  If thread 1 acquires a lock,
+// and thread 2 tries to acquire the same lock and fails, thread 1 may release its lock and retry
+// pending lock requests BEFORE thread 2 adds itself to the pending lock requests.  If this
+// happens, then thread 2 will HANG until its lock timer expires even when the lock it is
+// waiting for is FREE.
+
+// This test exposes this problem as a test hang.  If the race is fixed, then the test runs to
+// completion.
+
+namespace toku {
+
+static void start_before_pending(void) {
+    usleep(10000);
+}
+
+static void run_locker(locktree *lt, TXNID txnid, const DBT *key, pthread_barrier_t *b) {
+    for (int i = 0; i < 100000; i++) {
+        int r;
+        r = pthread_barrier_wait(b); assert(r == 0 || r == PTHREAD_BARRIER_SERIAL_THREAD);
+
+        lock_request request;
+        request.create();
+        request.set(lt, txnid, key, key, lock_request::type::WRITE, false);
+
+        // if the callback is included, then the race is easy to reproduce.  Otherwise, several
+        // test runs may be required before the race happens.
+        if (1) request.set_start_before_pending_test_callback(start_before_pending);
+
+        // try to acquire the lock
+        r = request.start();
+        if (r == DB_LOCK_NOTGRANTED) {
+            // wait for the lock to be granted
+            r = request.wait(1000 * 1000);
+        }
+
+        if (r == 0) {
+            // release the lock
+            range_buffer buffer;
+            buffer.create();
+            buffer.append(key, key);
+            lt->release_locks(txnid, &buffer);
+            buffer.destroy();
+
+            // retry pending lock requests
+            lock_request::retry_all_lock_requests(lt);
+        }
+
+        request.destroy();
+        memset(&request, 0xab, sizeof request);
+
+        toku_pthread_yield();
+        if ((i % 10) == 0)
+            std::cout << std::this_thread::get_id() << " " << i << std::endl;
+    }
+}
+
+} /* namespace toku */
+
+int main(void) {
+
+    toku::locktree lt;
+    DICTIONARY_ID dict_id = { 1 };
+    lt.create(nullptr, dict_id, toku::dbt_comparator);
+
+    const DBT *one = toku::get_dbt(1);
+
+    const int n_workers = 2;
+    std::thread worker[n_workers];
+    pthread_barrier_t b;
+    int r = pthread_barrier_init(&b, nullptr, n_workers); assert(r == 0);
+    for (int i = 0; i < n_workers; i++) {
+        worker[i] = std::thread(toku::run_locker, &lt, i, one, &b);
+    }
+    for (int i = 0; i < n_workers; i++) {
+        worker[i].join();
+    }
+    r = pthread_barrier_destroy(&b); assert(r == 0);
+    lt.release_reference();
+    lt.destroy();
+    return 0;
+}
+
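This test leans on set_start_before_pending_test_callback to stretch the window between conflict detection and pending-set insertion from nanoseconds to 10 ms, making an otherwise rare interleaving reproduce on every run. The pattern generalizes; a sketch of a race-widening test hook (illustrative, not the patch's code):

    #include <unistd.h>

    // Production code exposes an optional callback at the racy point;
    // tests install a sleep there. nullptr in production means zero cost.
    static void (*g_race_point_hook)(void) = nullptr;

    static void racy_operation(void) {
        /* step 1: decide, based on shared state ... */
        if (g_race_point_hook) g_race_point_hook();  // test-only widening
        /* step 2: publish the decision under a lock ... */
    }

    static void widen(void) { usleep(10000); }
    // in a test: g_race_point_hook = widen;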
diff --git a/storage/tokudb/PerconaFT/src/tests/test_iterate_live_transactions.cc b/storage/tokudb/PerconaFT/src/tests/test_iterate_live_transactions.cc
index c5561cdf90f..23c79620cd8 100644
--- a/storage/tokudb/PerconaFT/src/tests/test_iterate_live_transactions.cc
+++ b/storage/tokudb/PerconaFT/src/tests/test_iterate_live_transactions.cc
@@ -55,7 +55,8 @@ static int iterate_callback(DB_TXN *txn,
                             iterate_row_locks_callback iterate_locks,
                             void *locks_extra, void *extra) {
     uint64_t txnid = txn->id64(txn);
-    uint64_t client_id = txn->get_client_id(txn);
+    uint64_t client_id; void *client_extra;
+    txn->get_client_id(txn, &client_id, &client_extra);
     iterate_extra *info = reinterpret_cast<iterate_extra *>(extra);
     DB *db;
     DBT left_key, right_key;
@@ -93,13 +94,13 @@ int test_main(int UU(argc), char *const UU(argv[])) {
     r = env->open(env, TOKU_TEST_FILENAME, env_flags, 0755); CKERR(r);

     r = env->txn_begin(env, NULL, &txn1, 0); CKERR(r);
-    txn1->set_client_id(txn1, 0);
+    txn1->set_client_id(txn1, 0, NULL);
     txnid1 = txn1->id64(txn1);
     r = env->txn_begin(env, NULL, &txn2, 0); CKERR(r);
-    txn2->set_client_id(txn2, 1);
+    txn2->set_client_id(txn2, 1, NULL);
     txnid2 = txn2->id64(txn2);
     r = env->txn_begin(env, NULL, &txn3, 0); CKERR(r);
-    txn3->set_client_id(txn3, 2);
+    txn3->set_client_id(txn3, 2, NULL);
     txnid3 = txn3->id64(txn3);

     {
diff --git a/storage/tokudb/PerconaFT/src/tests/test_stress0.cc b/storage/tokudb/PerconaFT/src/tests/test_stress0.cc
index 88140dd1731..037ffdd312d 100644
--- a/storage/tokudb/PerconaFT/src/tests/test_stress0.cc
+++ b/storage/tokudb/PerconaFT/src/tests/test_stress0.cc
@@ -93,7 +93,8 @@ static int iterate_txns(DB_TXN *txn,
                         iterate_row_locks_callback iterate_locks,
                         void *locks_extra, void *extra) {
     uint64_t txnid = txn->id64(txn);
-    uint64_t client_id = txn->get_client_id(txn);
+    uint64_t client_id; void *client_extra;
+    txn->get_client_id(txn, &client_id, &client_extra);
     invariant_null(extra);
     invariant(txnid > 0);
     invariant(client_id == 0);
diff --git a/storage/tokudb/PerconaFT/src/ydb.cc b/storage/tokudb/PerconaFT/src/ydb.cc
index 3341f6d76c6..d51ee81700f 100644
--- a/storage/tokudb/PerconaFT/src/ydb.cc
+++ b/storage/tokudb/PerconaFT/src/ydb.cc
@@ -2620,6 +2620,10 @@ static void env_set_killed_callback(DB_ENV *env, uint64_t default_killed_time_ms
     env->i->killed_callback = killed_callback;
 }

+static void env_kill_waiter(DB_ENV *env, void *extra) {
+    env->i->ltm.kill_waiter(extra);
+}
+
 static void env_do_backtrace(DB_ENV *env) {
     if (env->i->errcall) {
         db_env_do_backtrace_errfunc((toku_env_err_func) toku_env_err, (const void *) env);
@@ -2719,6 +2723,7 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) {
     USENV(set_dir_per_db);
     USENV(get_dir_per_db);
     USENV(get_data_dir);
+    USENV(kill_waiter);
 #undef USENV

     // unlocked methods
diff --git a/storage/tokudb/PerconaFT/src/ydb_txn.cc b/storage/tokudb/PerconaFT/src/ydb_txn.cc
index ae1f93011d1..40b479055f2 100644
--- a/storage/tokudb/PerconaFT/src/ydb_txn.cc
+++ b/storage/tokudb/PerconaFT/src/ydb_txn.cc
@@ -323,12 +323,12 @@ int locked_txn_abort(DB_TXN *txn) {
     return r;
 }

-static void locked_txn_set_client_id(DB_TXN *txn, uint64_t client_id) {
-    toku_txn_set_client_id(db_txn_struct_i(txn)->tokutxn, client_id);
+static void locked_txn_set_client_id(DB_TXN *txn, uint64_t client_id, void *client_extra) {
+    toku_txn_set_client_id(db_txn_struct_i(txn)->tokutxn, client_id, client_extra);
 }

-static uint64_t locked_txn_get_client_id(DB_TXN *txn) {
-    return toku_txn_get_client_id(db_txn_struct_i(txn)->tokutxn);
+static void locked_txn_get_client_id(DB_TXN *txn, uint64_t *client_id, void **client_extra) {
+    toku_txn_get_client_id(db_txn_struct_i(txn)->tokutxn, client_id, client_extra);
 }

 static int toku_txn_discard(DB_TXN *txn, uint32_t flags) {
diff --git a/storage/tokudb/tokudb_information_schema.cc b/storage/tokudb/tokudb_information_schema.cc
index b3d77eef2d9..86af3f14f91 100644
--- a/storage/tokudb/tokudb_information_schema.cc
+++ b/storage/tokudb/tokudb_information_schema.cc
@@ -75,7 +75,9 @@ int trx_callback(
     void *extra) {

     uint64_t txn_id = txn->id64(txn);
-    uint64_t client_id = txn->get_client_id(txn);
+    uint64_t client_id;
+    void *client_extra;
+    txn->get_client_id(txn, &client_id, &client_extra);
     uint64_t start_time = txn->get_start_time(txn);
     trx_extra_t* e = reinterpret_cast<trx_extra_t*>(extra);
     THD* thd = e->thd;
@@ -314,7 +316,9 @@ int locks_callback(
     void* extra) {

     uint64_t txn_id = txn->id64(txn);
-    uint64_t client_id = txn->get_client_id(txn);
+    uint64_t client_id;
+    void *client_extra;
+    txn->get_client_id(txn, &client_id, &client_extra);
     locks_extra_t* e = reinterpret_cast<locks_extra_t*>(extra);
     THD* thd = e->thd;
     TABLE* table = e->table;
diff --git a/storage/tokudb/tokudb_txn.h b/storage/tokudb/tokudb_txn.h
index 67bf591d088..d0255415403 100644
--- a/storage/tokudb/tokudb_txn.h
+++ b/storage/tokudb/tokudb_txn.h
@@ -116,7 +116,7 @@ inline int txn_begin(
     int r = env->txn_begin(env, parent, txn, flags);
     if (r == 0 && thd) {
         DB_TXN* this_txn = *txn;
-        this_txn->set_client_id(this_txn, thd_get_thread_id(thd));
+        this_txn->set_client_id(this_txn, thd_get_thread_id(thd), thd);
     }
     TOKUDB_TRACE_FOR_FLAGS(
         TOKUDB_DEBUG_TXN,
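Taken together, the handlerton now has the two primitives needed to cancel a lock wait when a connection is killed: txn_begin tags each transaction with its THD via set_client_id, and env->kill_waiter accepts the same pointer. The patch itself stops there; a hedged sketch of how a server-side kill path might wire them up (the hook name and plumbing are illustrative, not part of this diff):

    // Hypothetical KILL-handling hook (illustrative). The waiter's wait()
    // returns DB_LOCK_NOTGRANTED, which surfaces as a lock-wait error to
    // the killed connection instead of a full lock-timeout stall.
    static void tokudb_kill_waiting_query(DB_ENV *env, void *thd /* THD* */) {
        env->kill_waiter(env, thd);  // matches the pointer stored by txn_begin
    }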