|
|
|
@ -44,6 +44,17 @@ static inline void thd_data_set(THD *thd, int slot, void *data) { |
|
|
|
#include "hatoku_hton.h"
|
|
|
|
#include <mysql/plugin.h>
|
|
|
|
|
|
|
|
#define declare_lockretry \
|
|
|
|
int lockretrycount; |
|
|
|
|
|
|
|
#define lockretryN(N) \
|
|
|
|
for (lockretrycount=0; lockretrycount<(N); lockretrycount++) |
|
|
|
|
|
|
|
#define lockretry lockretryN(100)
|
|
|
|
|
|
|
|
#define lockretry_wait \
|
|
|
|
TOKUDB_TRACE("%s count=%d\n", __FUNCTION__, lockretrycount); \ |
|
|
|
usleep((lockretrycount<4 ? (1<<lockretrycount) : (1<<3)) * 1024) |
|
|
|
|
|
|
|
/** @brief
|
|
|
|
Simple lock controls. The "share" it creates is a structure we will |
|
|
|
@ -1698,7 +1709,7 @@ bool ha_tokudb::check_if_incompatible_data(HA_CREATE_INFO * info, uint table_cha |
|
|
|
return COMPATIBLE_DATA_NO; |
|
|
|
return COMPATIBLE_DATA_YES; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
//
|
|
|
|
// Stores a row in the table, called when handling an INSERT query
|
|
|
|
// Parameters:
|
|
|
|
@ -1716,6 +1727,8 @@ int ha_tokudb::write_row(uchar * record) { |
|
|
|
bool has_null; |
|
|
|
DB_TXN* sub_trans = NULL; |
|
|
|
DB_TXN* txn = NULL; |
|
|
|
tokudb_trx_data *trx = NULL; |
|
|
|
declare_lockretry; |
|
|
|
|
|
|
|
//
|
|
|
|
// some crap that needs to be done because MySQL does not properly abstract
|
|
|
|
@ -1775,13 +1788,29 @@ int ha_tokudb::write_row(uchar * record) { |
|
|
|
if (thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) { |
|
|
|
put_flags = DB_YESOVERWRITE; |
|
|
|
} |
|
|
|
error = share->file->put( |
|
|
|
share->file, |
|
|
|
txn, |
|
|
|
create_dbt_key_from_table(&prim_key, primary_key, key_buff, record, &has_null), |
|
|
|
&row, |
|
|
|
put_flags |
|
|
|
); |
|
|
|
|
|
|
|
trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot); |
|
|
|
if (trx->table_lock_wanted) { |
|
|
|
error = acquire_table_lock(trx->all ? trx->all : trx->stmt, |
|
|
|
(TABLE_LOCK_TYPE)trx->table_lock_type); |
|
|
|
if (error) |
|
|
|
TOKUDB_TRACE(" acquire_table_lock trx=%p type=%d error=%d\n", trx, trx->table_lock_type, error); |
|
|
|
trx->table_lock_wanted = false; |
|
|
|
} |
|
|
|
|
|
|
|
lockretry { |
|
|
|
error = share->file->put( |
|
|
|
share->file, |
|
|
|
txn, |
|
|
|
create_dbt_key_from_table(&prim_key, primary_key, key_buff, record, &has_null), |
|
|
|
&row, |
|
|
|
put_flags |
|
|
|
); |
|
|
|
if (error != DB_LOCK_NOTGRANTED) |
|
|
|
break; |
|
|
|
lockretry_wait; |
|
|
|
} |
|
|
|
|
|
|
|
if (error) { |
|
|
|
last_dup_key = primary_key; |
|
|
|
goto cleanup; |
|
|
|
@ -1808,22 +1837,32 @@ int ha_tokudb::write_row(uchar * record) { |
|
|
|
} |
|
|
|
cluster_row_created = true; |
|
|
|
} |
|
|
|
error = share->key_file[keynr]->put( |
|
|
|
share->key_file[keynr], |
|
|
|
txn, |
|
|
|
&key, |
|
|
|
&row, |
|
|
|
put_flags |
|
|
|
); |
|
|
|
lockretry { |
|
|
|
error = share->key_file[keynr]->put( |
|
|
|
share->key_file[keynr], |
|
|
|
txn, |
|
|
|
&key, |
|
|
|
&row, |
|
|
|
put_flags |
|
|
|
); |
|
|
|
if (error != DB_LOCK_NOTGRANTED) |
|
|
|
break; |
|
|
|
lockretry_wait; |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
error = share->key_file[keynr]->put( |
|
|
|
share->key_file[keynr], |
|
|
|
txn, |
|
|
|
&key, |
|
|
|
&prim_key, |
|
|
|
put_flags |
|
|
|
); |
|
|
|
lockretry { |
|
|
|
error = share->key_file[keynr]->put( |
|
|
|
share->key_file[keynr], |
|
|
|
txn, |
|
|
|
&key, |
|
|
|
&prim_key, |
|
|
|
put_flags |
|
|
|
); |
|
|
|
if (error != DB_LOCK_NOTGRANTED) |
|
|
|
break; |
|
|
|
lockretry_wait; |
|
|
|
} |
|
|
|
} |
|
|
|
//
|
|
|
|
// We break if we hit an error, unless it is a dup key error
|
|
|
|
@ -3260,15 +3299,23 @@ int ha_tokudb::acquire_table_lock (DB_TXN* trans, TABLE_LOCK_TYPE lt) { |
|
|
|
db->dbt_neg_infty(), db->dbt_neg_infty(), |
|
|
|
db->dbt_pos_infty(), db->dbt_pos_infty() |
|
|
|
); |
|
|
|
if (error) { goto cleanup; } |
|
|
|
if (error) break; |
|
|
|
} |
|
|
|
if (error) goto cleanup; |
|
|
|
} |
|
|
|
else if (lt == lock_write) { |
|
|
|
if (tokudb_debug & TOKUDB_DEBUG_LOCK) |
|
|
|
TOKUDB_TRACE("%s\n", __FUNCTION__); |
|
|
|
for (uint i = 0; i < curr_num_DBs; i++) { |
|
|
|
DB* db = share->key_file[i]; |
|
|
|
error = db->pre_acquire_table_lock(db, trans); |
|
|
|
if (error) { goto cleanup; } |
|
|
|
if (error == EINVAL) |
|
|
|
TOKUDB_TRACE("%s %d db=%p trans=%p\n", __FUNCTION__, i, db, trans); |
|
|
|
if (error) break; |
|
|
|
} |
|
|
|
if (tokudb_debug & TOKUDB_DEBUG_LOCK) |
|
|
|
TOKUDB_TRACE("%s error=%d\n", __FUNCTION__, error); |
|
|
|
if (error) goto cleanup; |
|
|
|
} |
|
|
|
else { |
|
|
|
error = ENOSYS; |
|
|
|
@ -3299,12 +3346,17 @@ cleanup: |
|
|
|
// error otherwise
|
|
|
|
//
|
|
|
|
int ha_tokudb::external_lock(THD * thd, int lock_type) { |
|
|
|
TOKUDB_DBUG_ENTER("ha_tokudb::external_lock %d", thd_sql_command(thd)); |
|
|
|
TOKUDB_DBUG_ENTER("ha_tokudb::external_lock cmd=%d %d", thd_sql_command(thd), lock_type); |
|
|
|
if (tokudb_debug & TOKUDB_DEBUG_LOCK) |
|
|
|
TOKUDB_TRACE("%s cmd=%d %d\n", __FUNCTION__, thd_sql_command(thd), lock_type); |
|
|
|
// QQQ this is here to allow experiments without transactions
|
|
|
|
int error = 0; |
|
|
|
ulong tx_isolation = thd_tx_isolation(thd); |
|
|
|
HA_TOKU_ISO_LEVEL toku_iso_level = tx_to_toku_iso(tx_isolation); |
|
|
|
tokudb_trx_data *trx = NULL; |
|
|
|
#if 0
|
|
|
|
declare_lockretry; |
|
|
|
#endif
|
|
|
|
|
|
|
|
//
|
|
|
|
// reset per-stmt variables
|
|
|
|
@ -3352,21 +3404,8 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) { |
|
|
|
trx->sp_level = trx->all; |
|
|
|
trans_register_ha(thd, TRUE, tokudb_hton); |
|
|
|
if (thd->in_lock_tables && thd_sql_command(thd) == SQLCOM_LOCK_TABLES) { |
|
|
|
//
|
|
|
|
// grab table locks
|
|
|
|
// For the command "Lock tables foo read, bar read"
|
|
|
|
// This statement is grabbing the locks for the table
|
|
|
|
// foo. The locks for bar will be grabbed when
|
|
|
|
// trx->tokudb_lock_count has been initialized
|
|
|
|
//
|
|
|
|
if (lock.type <= TL_READ_NO_INSERT) { |
|
|
|
error = acquire_table_lock(trx->all,lock_read); |
|
|
|
} |
|
|
|
else { |
|
|
|
error = acquire_table_lock(trx->all,lock_write); |
|
|
|
} |
|
|
|
// Don't create stmt trans
|
|
|
|
if (error) {trx->tokudb_lock_count--;} |
|
|
|
trx->table_lock_wanted = true; |
|
|
|
trx->table_lock_type = lock.type < TL_READ_NO_INSERT ? lock_read : lock_write; |
|
|
|
goto cleanup; |
|
|
|
} |
|
|
|
} |
|
|
|
@ -3396,20 +3435,8 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) { |
|
|
|
else { |
|
|
|
if (thd->in_lock_tables && thd_sql_command(thd) == SQLCOM_LOCK_TABLES) { |
|
|
|
assert(trx->all != NULL); |
|
|
|
//
|
|
|
|
// For the command "Lock tables foo read, bar read"
|
|
|
|
// This statement is grabbing the locks for the table
|
|
|
|
// bar. The locks for foo will be grabbed when
|
|
|
|
// trx->tokudb_lock_count is 0 and we are initializing
|
|
|
|
// trx->all above
|
|
|
|
//
|
|
|
|
if (lock.type <= TL_READ_NO_INSERT) { |
|
|
|
error = acquire_table_lock(trx->all,lock_read); |
|
|
|
} |
|
|
|
else { |
|
|
|
error = acquire_table_lock(trx->all,lock_write); |
|
|
|
} |
|
|
|
if (error) {trx->tokudb_lock_count--; goto cleanup;} |
|
|
|
trx->table_lock_wanted = true; |
|
|
|
trx->table_lock_type = lock.type < TL_READ_NO_INSERT ? lock_read : lock_write; |
|
|
|
} |
|
|
|
} |
|
|
|
transaction = trx->stmt; |
|
|
|
@ -3446,6 +3473,8 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) { |
|
|
|
transaction = NULL; |
|
|
|
} |
|
|
|
cleanup: |
|
|
|
if (tokudb_debug & TOKUDB_DEBUG_LOCK) |
|
|
|
TOKUDB_TRACE("%s error=%d\n", __FUNCTION__, error); |
|
|
|
TOKUDB_DBUG_RETURN(error); |
|
|
|
} |
|
|
|
|
|
|
|
@ -3516,6 +3545,8 @@ int ha_tokudb::start_stmt(THD * thd, thr_lock_type lock_type) { |
|
|
|
|
|
|
|
THR_LOCK_DATA **ha_tokudb::store_lock(THD * thd, THR_LOCK_DATA ** to, enum thr_lock_type lock_type) { |
|
|
|
TOKUDB_DBUG_ENTER("ha_tokudb::store_lock, lock_type=%d cmd=%d", lock_type, thd_sql_command(thd)); |
|
|
|
if (tokudb_debug & TOKUDB_DEBUG_LOCK) |
|
|
|
TOKUDB_TRACE("%s lock_type=%d cmd=%d\n", __FUNCTION__, lock_type, thd_sql_command(thd)); |
|
|
|
if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) { |
|
|
|
/* If we are not doing a LOCK TABLE, then allow multiple writers */ |
|
|
|
if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && lock_type <= TL_WRITE) && |
|
|
|
@ -3525,6 +3556,8 @@ THR_LOCK_DATA **ha_tokudb::store_lock(THD * thd, THR_LOCK_DATA ** to, enum thr_l |
|
|
|
lock.type = lock_type; |
|
|
|
} |
|
|
|
*to++ = &lock; |
|
|
|
if (tokudb_debug & TOKUDB_DEBUG_LOCK) |
|
|
|
TOKUDB_TRACE("%s lock_type=%d\n", __FUNCTION__, lock_type); |
|
|
|
DBUG_RETURN(to); |
|
|
|
} |
|
|
|
|
|
|
|
|