|
|
|
@ -13,9 +13,10 @@ |
|
|
|
#include <valgrind/helgrind.h> |
|
|
|
#include "ft/txn_manager.h" |
|
|
|
|
|
|
|
static int toku_txn_commit(DB_TXN * txn, u_int32_t flags, TXN_PROGRESS_POLL_FUNCTION poll, |
|
|
|
void *poll_extra, bool release_multi_operation_client_lock); |
|
|
|
static int toku_txn_abort(DB_TXN * txn, TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra); |
|
|
|
static int toku_txn_commit(DB_TXN * txn, u_int32_t flags, TXN_PROGRESS_POLL_FUNCTION poll, |
|
|
|
void *poll_extra, bool release_mo_client_lock_if_held, bool *holds_mo_lock); |
|
|
|
static int toku_txn_abort(DB_TXN * txn, TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra, |
|
|
|
bool *holds_mo_lock); |
|
|
|
|
|
|
|
static int |
|
|
|
toku_txn_release_locks(DB_TXN* txn) { |
|
|
|
@ -56,15 +57,23 @@ toku_txn_destroy(DB_TXN *txn) { |
|
|
|
toku_free(txn); |
|
|
|
} |
|
|
|
|
|
|
|
static int |
|
|
|
toku_txn_commit_only(DB_TXN * txn, u_int32_t flags, |
|
|
|
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra, |
|
|
|
bool release_multi_operation_client_lock) { |
|
|
|
static int |
|
|
|
toku_txn_commit(DB_TXN * txn, u_int32_t flags, |
|
|
|
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra, |
|
|
|
bool release_mo_lock_if_held, |
|
|
|
bool *holds_mo_lock) { |
|
|
|
HANDLE_PANICKED_ENV(txn->mgrp); |
|
|
|
// Don't take multi-operation lock until we see a non-readonly txn. |
|
|
|
if (!*holds_mo_lock && |
|
|
|
!toku_txn_is_read_only(db_txn_struct_i(txn)->tokutxn)) { |
|
|
|
toku_multi_operation_client_lock(); |
|
|
|
*holds_mo_lock = true; |
|
|
|
} |
|
|
|
//Recursively kill off children |
|
|
|
if (db_txn_struct_i(txn)->child) { |
|
|
|
//commit of child sets the child pointer to NULL |
|
|
|
int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, flags, NULL, NULL, false); |
|
|
|
int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, |
|
|
|
flags, NULL, NULL, false, holds_mo_lock); |
|
|
|
if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) { |
|
|
|
env_panic(txn->mgrp, r_child, "Recursive child commit failed during parent commit.\n"); |
|
|
|
} |
|
|
|
@ -116,19 +125,15 @@ toku_txn_commit_only(DB_TXN * txn, u_int32_t flags, |
|
|
|
// this lock must be held until the references to the open FTs is released |
|
|
|
// begin checkpoint logs these associations, so we must be protect |
|
|
|
// the changing of these associations with checkpointing |
|
|
|
if (release_multi_operation_client_lock) { |
|
|
|
if (release_mo_lock_if_held && *holds_mo_lock) { |
|
|
|
toku_multi_operation_client_unlock(); |
|
|
|
} |
|
|
|
toku_txn_maybe_fsync_log(logger, do_fsync_lsn, do_fsync); |
|
|
|
if (flags!=0) return EINVAL; |
|
|
|
return r; |
|
|
|
} |
|
|
|
|
|
|
|
static int |
|
|
|
toku_txn_commit(DB_TXN * txn, u_int32_t flags, |
|
|
|
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra, |
|
|
|
bool release_multi_operation_client_lock) { |
|
|
|
int r = toku_txn_commit_only(txn, flags, poll, poll_extra, release_multi_operation_client_lock); |
|
|
|
if (flags!=0) { |
|
|
|
r = EINVAL; |
|
|
|
goto cleanup; |
|
|
|
} |
|
|
|
cleanup: |
|
|
|
toku_txn_destroy(txn); |
|
|
|
return r; |
|
|
|
} |
|
|
|
@ -148,13 +153,21 @@ toku_txn_id64(DB_TXN * txn) { |
|
|
|
} |
|
|
|
|
|
|
|
static int |
|
|
|
toku_txn_abort_only(DB_TXN * txn, |
|
|
|
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) { |
|
|
|
toku_txn_abort(DB_TXN * txn, |
|
|
|
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra, |
|
|
|
bool *holds_mo_lock) { |
|
|
|
HANDLE_PANICKED_ENV(txn->mgrp); |
|
|
|
// Don't take multi-operation lock until we see a non-readonly txn. |
|
|
|
if (!*holds_mo_lock && |
|
|
|
!toku_txn_is_read_only(db_txn_struct_i(txn)->tokutxn)) { |
|
|
|
toku_multi_operation_client_lock(); |
|
|
|
*holds_mo_lock = true; |
|
|
|
} |
|
|
|
//Recursively kill off children (abort or commit are both correct, commit is cheaper) |
|
|
|
if (db_txn_struct_i(txn)->child) { |
|
|
|
//commit of child sets the child pointer to NULL |
|
|
|
int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, DB_TXN_NOSYNC, NULL, NULL, false); |
|
|
|
int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, |
|
|
|
DB_TXN_NOSYNC, NULL, NULL, false, holds_mo_lock); |
|
|
|
if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) { |
|
|
|
env_panic(txn->mgrp, r_child, "Recursive child commit failed during parent abort.\n"); |
|
|
|
} |
|
|
|
@ -176,29 +189,34 @@ toku_txn_abort_only(DB_TXN * txn, |
|
|
|
assert_zero(r); |
|
|
|
toku_txn_complete_txn(db_txn_struct_i(txn)->tokutxn); |
|
|
|
r = toku_txn_release_locks(txn); |
|
|
|
toku_txn_destroy(txn); |
|
|
|
return r; |
|
|
|
} |
|
|
|
|
|
|
|
// requires: must hold the multi operation lock. it is |
|
|
|
// released here before the fsync. |
|
|
|
static int |
|
|
|
toku_txn_xa_prepare (DB_TXN *txn, TOKU_XA_XID *xid) { |
|
|
|
int r = 0; |
|
|
|
if (!txn) { |
|
|
|
toku_multi_operation_client_unlock(); |
|
|
|
r = EINVAL; |
|
|
|
goto exit; |
|
|
|
} |
|
|
|
if (txn->parent) { |
|
|
|
toku_multi_operation_client_unlock(); |
|
|
|
r = 0; // make this a NO-OP, MySQL calls this |
|
|
|
goto exit; |
|
|
|
} |
|
|
|
HANDLE_PANICKED_ENV(txn->mgrp); |
|
|
|
// Take the mo lock as soon as a non-readonly txn is found |
|
|
|
bool holds_mo_lock = false; |
|
|
|
if (!toku_txn_is_read_only(db_txn_struct_i(txn)->tokutxn)) { |
|
|
|
toku_multi_operation_client_lock(); |
|
|
|
holds_mo_lock = true; |
|
|
|
} |
|
|
|
//Recursively commit any children. |
|
|
|
if (db_txn_struct_i(txn)->child) { |
|
|
|
//commit of child sets the child pointer to NULL |
|
|
|
int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, 0, NULL, NULL, false); |
|
|
|
|
|
|
|
// toku_txn_commit will take the mo_lock if not held and a non-readonly txn is found. |
|
|
|
int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, 0, NULL, NULL, false, &holds_mo_lock); |
|
|
|
if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) { |
|
|
|
env_panic(txn->mgrp, r_child, "Recursive child commit failed during parent commit.\n"); |
|
|
|
} |
|
|
|
@ -213,9 +231,11 @@ toku_txn_xa_prepare (DB_TXN *txn, TOKU_XA_XID *xid) { |
|
|
|
bool do_fsync; |
|
|
|
toku_txn_get_fsync_info(ttxn, &do_fsync, &do_fsync_lsn); |
|
|
|
// release the multi operation lock before fsyncing the log |
|
|
|
toku_multi_operation_client_unlock(); |
|
|
|
if (holds_mo_lock) { |
|
|
|
toku_multi_operation_client_unlock(); |
|
|
|
} |
|
|
|
toku_txn_maybe_fsync_log(logger, do_fsync_lsn, do_fsync); |
|
|
|
exit: |
|
|
|
exit: |
|
|
|
return r; |
|
|
|
} |
|
|
|
|
|
|
|
@ -232,14 +252,6 @@ toku_txn_prepare (DB_TXN *txn, u_int8_t gid[DB_GID_SIZE]) { |
|
|
|
return toku_txn_xa_prepare(txn, &xid); |
|
|
|
} |
|
|
|
|
|
|
|
static int |
|
|
|
toku_txn_abort(DB_TXN * txn, |
|
|
|
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) { |
|
|
|
int r = toku_txn_abort_only(txn, poll, poll_extra); |
|
|
|
toku_txn_destroy(txn); |
|
|
|
return r; |
|
|
|
} |
|
|
|
|
|
|
|
static u_int32_t |
|
|
|
locked_txn_id(DB_TXN *txn) { |
|
|
|
u_int32_t r = toku_txn_id(txn); |
|
|
|
@ -265,11 +277,13 @@ locked_txn_commit_with_progress(DB_TXN *txn, u_int32_t flags, |
|
|
|
if (toku_txn_requires_checkpoint(ttxn)) { |
|
|
|
toku_checkpoint(txn->mgrp->i->cachetable, txn->mgrp->i->logger, NULL, NULL, NULL, NULL, TXN_COMMIT_CHECKPOINT); |
|
|
|
} |
|
|
|
// cannot begin a checkpoint. the multi operation lock is taken here, |
|
|
|
// but released in toku_txn_commit_only. this way, we don't hold it |
|
|
|
// while we fsync the log. |
|
|
|
toku_multi_operation_client_lock(); |
|
|
|
int r = toku_txn_commit(txn, flags, poll, poll_extra, true); |
|
|
|
bool holds_mo_lock = false; |
|
|
|
// cannot begin a checkpoint. |
|
|
|
// the multi operation lock is taken the first time we |
|
|
|
// see a non-readonly txn in the recursive commit. |
|
|
|
// But released in the first-level toku_txn_commit (if taken), |
|
|
|
// this way, we don't hold it while we fsync the log. |
|
|
|
int r = toku_txn_commit(txn, flags, poll, poll_extra, true, &holds_mo_lock); |
|
|
|
return r; |
|
|
|
} |
|
|
|
|
|
|
|
@ -277,9 +291,14 @@ static int |
|
|
|
locked_txn_abort_with_progress(DB_TXN *txn, |
|
|
|
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) { |
|
|
|
// cannot begin a checkpoint |
|
|
|
toku_multi_operation_client_lock(); |
|
|
|
int r = toku_txn_abort(txn, poll, poll_extra); |
|
|
|
toku_multi_operation_client_unlock(); |
|
|
|
// the multi operation lock is taken the first time we |
|
|
|
// see a non-readonly txn in the abort (or recursive commit). |
|
|
|
// But released here so we don't have to hold additional state. |
|
|
|
bool holds_mo_lock = false; |
|
|
|
int r = toku_txn_abort(txn, poll, poll_extra, &holds_mo_lock); |
|
|
|
if (holds_mo_lock) { |
|
|
|
toku_multi_operation_client_unlock(); |
|
|
|
} |
|
|
|
return r; |
|
|
|
} |
|
|
|
|
|
|
|
@ -295,22 +314,21 @@ locked_txn_abort(DB_TXN *txn) { |
|
|
|
return r; |
|
|
|
} |
|
|
|
|
|
|
|
static int |
|
|
|
locked_txn_prepare (DB_TXN *txn, u_int8_t gid[DB_GID_SIZE]) { |
|
|
|
// toku_txn_prepare eventually releases the multi operation lock |
|
|
|
// before fsyncing the log |
|
|
|
toku_multi_operation_client_lock(); |
|
|
|
int r = toku_txn_prepare(txn, gid); |
|
|
|
return r; |
|
|
|
} |
|
|
|
|
|
|
|
static int |
|
|
|
locked_txn_xa_prepare (DB_TXN *txn, TOKU_XA_XID *xid) { |
|
|
|
// toku_txn_xa_prepare eventually releases the multi operation lock |
|
|
|
// before fsyncing the log |
|
|
|
toku_multi_operation_client_lock(); |
|
|
|
int r = toku_txn_xa_prepare(txn, xid); |
|
|
|
return r; |
|
|
|
static inline void |
|
|
|
txn_func_init(DB_TXN *txn) { |
|
|
|
#define STXN(name) txn->name = locked_txn_ ## name |
|
|
|
STXN(abort); |
|
|
|
STXN(commit); |
|
|
|
STXN(abort_with_progress); |
|
|
|
STXN(commit_with_progress); |
|
|
|
STXN(id); |
|
|
|
STXN(txn_stat); |
|
|
|
#undef STXN |
|
|
|
#define SUTXN(name) txn->name = toku_txn_ ## name |
|
|
|
SUTXN(prepare); |
|
|
|
SUTXN(xa_prepare); |
|
|
|
#undef SUTXN |
|
|
|
txn->id64 = toku_txn_id64; |
|
|
|
} |
|
|
|
|
|
|
|
int |
|
|
|
@ -398,17 +416,7 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) { |
|
|
|
|
|
|
|
//toku_ydb_notef("parent=%p flags=0x%x\n", stxn, flags); |
|
|
|
result->mgrp = env; |
|
|
|
#define STXN(name) result->name = locked_txn_ ## name |
|
|
|
STXN(abort); |
|
|
|
STXN(commit); |
|
|
|
STXN(abort_with_progress); |
|
|
|
STXN(commit_with_progress); |
|
|
|
STXN(id); |
|
|
|
STXN(prepare); |
|
|
|
STXN(xa_prepare); |
|
|
|
STXN(txn_stat); |
|
|
|
#undef STXN |
|
|
|
result->id64 = toku_txn_id64; |
|
|
|
txn_func_init(result); |
|
|
|
|
|
|
|
result->parent = stxn; |
|
|
|
#if !TOKUDB_NATIVE_H |
|
|
|
@ -477,16 +485,7 @@ void toku_keep_prepared_txn_callback (DB_ENV *env, TOKUTXN tokutxn) { |
|
|
|
memset(eresult, 0, sizeof(*eresult)); |
|
|
|
DB_TXN *result = &eresult->external_part; |
|
|
|
result->mgrp = env; |
|
|
|
#define STXN(name) result->name = locked_txn_ ## name |
|
|
|
STXN(abort); |
|
|
|
STXN(commit); |
|
|
|
STXN(abort_with_progress); |
|
|
|
STXN(commit_with_progress); |
|
|
|
STXN(id); |
|
|
|
STXN(prepare); |
|
|
|
STXN(txn_stat); |
|
|
|
#undef STXN |
|
|
|
result->id64 = toku_txn_id64; |
|
|
|
txn_func_init(result); |
|
|
|
|
|
|
|
result->parent = NULL; |
|
|
|
|
|
|
|
|