Browse Source

MDEV-25691: Simplify handlerton::drop_database for InnoDB

The implementation of handlerton::drop_database in InnoDB is
unnecessarily complex. The minimal implementation should check
that no conflicting locks or references exist on the tables,
delete all table metadata in a single transaction, and finally
delete the tablespaces.

Note: DROP DATABASE will delete each individual table that the
SQL layer knows about, one table per transaction.
The handlerton::drop_database is basically a final cleanup step
for removing any garbage that could have been left behind
in InnoDB due to some bug, or not having atomic DDL in the past.

hash_node_t: Remove. Use the proper data type name in pointers.

dict_drop_index_tree(): Do not take the table as a parameter.
Instead, return the tablespace ID if the tablespace should be dropped
(we are dropping a clustered index tree).

fil_delete_tablespace(), fil_system_t::detach(): Return a single
detached file handle. Multi-file tablespaces cannot be deleted
via this interface.

ha_innobase::delete_table(): Remove a work-around for non-atomic DDL
and do not try to drop tables with similar-looking names.

innodb_drop_database(): Complete rewrite.

innobase_drop_database(), dict_get_first_table_name_in_db(),
row_drop_database_for_mysql(), drop_all_foreign_keys_in_db(): Remove.

row_purge_remove_clust_if_poss_low(), row_undo_ins_remove_clust_rec():
If the tablespace is to be deleted, try to evict the table definition
from the cache. Failing that, set dict_table_t::space to nullptr.

lock_release_on_rollback(): On the rollback of CREATE TABLE, release all
locks that the transaction had on the table, to avoid heap-use-after-free.
bb-10.6-wlad-MDEV-22010
Marko Mäkelä 5 years ago
parent
commit
c366845a0b
  1. 5
      extra/mariabackup/xtrabackup.cc
  2. 23
      storage/innobase/dict/dict0crea.cc
  3. 83
      storage/innobase/dict/dict0load.cc
  4. 42
      storage/innobase/fil/fil0fil.cc
  5. 426
      storage/innobase/handler/ha_innodb.cc
  6. 10
      storage/innobase/include/dict0crea.h
  7. 11
      storage/innobase/include/dict0load.h
  8. 8
      storage/innobase/include/dict0mem.h
  9. 20
      storage/innobase/include/fil0fil.h
  10. 1
      storage/innobase/include/hash0hash.h
  11. 3
      storage/innobase/include/lock0lock.h
  12. 11
      storage/innobase/include/row0mysql.h
  13. 30
      storage/innobase/lock/lock0lock.cc
  14. 249
      storage/innobase/row/row0mysql.cc
  15. 57
      storage/innobase/row/row0purge.cc
  16. 38
      storage/innobase/row/row0uins.cc

5
extra/mariabackup/xtrabackup.cc

@ -181,12 +181,11 @@ static hash_table_t databases_exclude_hash;
static hash_table_t inc_dir_tables_hash;
struct xb_filter_entry_struct{
struct xb_filter_entry_t{
char* name;
ibool has_tables;
hash_node_t name_hash;
xb_filter_entry_t *name_hash;
};
typedef struct xb_filter_entry_struct xb_filter_entry_t;
lsn_t checkpoint_lsn_start;
lsn_t checkpoint_no_start;

23
storage/innobase/dict/dict0crea.cc

@ -826,10 +826,10 @@ dict_create_index_tree_in_mem(
/** Drop the index tree associated with a row in SYS_INDEXES table.
@param[in,out] pcur persistent cursor on rec
@param[in,out] trx dictionary transaction
@param[in,out] table table that the record belongs to
@param[in,out] mtr mini-transaction */
void dict_drop_index_tree(btr_pcur_t *pcur, trx_t *trx, dict_table_t *table,
mtr_t *mtr)
@param[in,out] mtr mini-transaction
@return tablespace ID to drop (if this is the clustered index)
@retval 0 if no tablespace is to be dropped */
uint32_t dict_drop_index_tree(btr_pcur_t *pcur, trx_t *trx, mtr_t *mtr)
{
rec_t *rec= btr_pcur_get_rec(pcur);
@ -846,7 +846,7 @@ void dict_drop_index_tree(btr_pcur_t *pcur, trx_t *trx, dict_table_t *table,
{
rec_corrupted:
ib::error() << "Corrupted SYS_INDEXES record";
return;
return 0;
}
if (rec_get_1byte_offs_flag(rec))
@ -875,14 +875,9 @@ rec_corrupted:
ut_ad(root_page_no == FIL_NULL || space_id <= SRV_SPACE_ID_UPPER_BOUND);
if (space_id && (type & DICT_CLUSTERED))
{
if (table && table->space_id == space_id)
table->space= nullptr;
else
ut_ad(!table);
fil_delete_tablespace(space_id, true);
}
else if (root_page_no == FIL_NULL)
return space_id;
if (root_page_no == FIL_NULL)
/* The tree has already been freed */;
else if (fil_space_t*s= fil_space_t::get(space_id))
{
@ -898,6 +893,8 @@ rec_corrupted:
}
s->release();
}
return 0;
}
/*********************************************************************//**

83
storage/innobase/dict/dict0load.cc

@ -198,89 +198,6 @@ name_of_col_is(
}
#endif /* UNIV_DEBUG */
/********************************************************************//**
Looks up the first non-delete-marked SYS_TABLES record whose NAME starts
with the given database prefix.
@return own: copy of the table name, or NULL if there is no such table;
the caller is responsible for freeing the returned string */
char*
dict_get_first_table_name_in_db(
/*============================*/
	const char*	name)	/*!< in: database name which ends in '/' */
{
	btr_pcur_t	pcur;
	mtr_t		mtr;
	ulint		len;
	char*		table_name = NULL;

	dict_sys.assert_locked();

	mem_heap_t*	heap = mem_heap_create(1000);

	mtr_start(&mtr);

	dict_table_t*	sys_tables = dict_table_get_low("SYS_TABLES");
	dict_index_t*	sys_index = UT_LIST_GET_FIRST(sys_tables->indexes);
	ut_ad(!dict_table_is_comp(sys_tables));

	/* Build a one-field search tuple that holds the database prefix. */
	dtuple_t*	tuple = dtuple_create(heap, 1);
	dfield_set_data(dtuple_get_nth_field(tuple, 0), name, strlen(name));
	dict_index_copy_types(tuple, sys_index, 1);

	btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
				  BTR_SEARCH_LEAF, &pcur, &mtr);

	/* Walk forward from the first record >= prefix. */
	while (btr_pcur_is_on_user_rec(&pcur)) {
		const rec_t*	rec = btr_pcur_get_rec(&pcur);
		const byte*	field = rec_get_nth_field_old(
			rec, DICT_FLD__SYS_TABLES__NAME, &len);

		if (len < strlen(name)
		    || memcmp(name, field, strlen(name))) {
			/* The name no longer carries the prefix: not found */
			break;
		}

		if (!rec_get_deleted_flag(rec, 0)) {
			/* We found one */
			table_name = mem_strdupl((char*) field, len);
			break;
		}

		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
	}

	/* Common cleanup for both the "found" and "not found" outcomes. */
	btr_pcur_close(&pcur);
	mtr_commit(&mtr);
	mem_heap_free(heap);

	return(table_name);
}
/********************************************************************//**
This function gets the next system table record as it scans the table.
@return the next record if found, NULL if end of scan */

42
storage/innobase/fil/fil0fil.cc

@ -756,9 +756,12 @@ inline pfs_os_file_t fil_node_t::close_to_free(bool detach_handle)
return OS_FILE_CLOSED;
}
/** Detach a tablespace from the cache and close the files. */
std::vector<pfs_os_file_t> fil_system_t::detach(fil_space_t *space,
bool detach_handle)
/** Detach a tablespace from the cache and close the files.
@param space tablespace
@param detach_handle whether to detach the handle, instead of closing
@return detached handle
@retval OS_FILE_CLOSED if no handle was detached */
pfs_os_file_t fil_system_t::detach(fil_space_t *space, bool detach_handle)
{
mysql_mutex_assert_owner(&fil_system.mutex);
HASH_DELETE(fil_space_t, hash, &spaces, space->id, space);
@ -791,19 +794,17 @@ std::vector<pfs_os_file_t> fil_system_t::detach(fil_space_t *space,
n_open--;
}
std::vector<pfs_os_file_t> handles;
handles.reserve(UT_LIST_GET_LEN(space->chain));
ut_ad(!detach_handle || space->id);
ut_ad(!detach_handle || UT_LIST_GET_LEN(space->chain) <= 1);
pfs_os_file_t handle= OS_FILE_CLOSED;
for (fil_node_t* node= UT_LIST_GET_FIRST(space->chain); node;
node= UT_LIST_GET_NEXT(chain, node))
{
auto handle= node->close_to_free(detach_handle);
if (handle != OS_FILE_CLOSED)
handles.push_back(handle);
}
handle= node->close_to_free(detach_handle);
ut_ad(!space->referenced());
return handles;
return handle;
}
/** Free a tablespace object on which fil_system_t::detach() was invoked.
@ -1567,11 +1568,12 @@ fil_space_t *fil_space_t::check_pending_operations(ulint id)
mysql_mutex_lock(&fil_system.mutex);
fil_space_t *space= fil_space_get_by_id(id);
if (space)
if (!space);
else if (space->pending() & STOPPING)
space= nullptr;
else
{
space->reacquire();
ut_ad(!(space->pending() & STOPPING));
if (space->crypt_data)
{
mysql_mutex_unlock(&fil_system.mutex);
@ -1644,13 +1646,13 @@ void fil_close_tablespace(ulint id)
/** Delete a tablespace and associated .ibd file.
@param[in] id tablespace identifier
@param[in] if_exists whether to ignore missing tablespace
@param[in,out] detached_handles return detached handles if not nullptr
@param[out] detached detached file handle (if closing is not wanted)
@return DB_SUCCESS or error */
dberr_t fil_delete_tablespace(ulint id, bool if_exists,
std::vector<pfs_os_file_t>* detached_handles)
pfs_os_file_t *detached)
{
ut_ad(!is_system_tablespace(id));
ut_ad(!detached_handles || detached_handles->empty());
ut_ad(!detached || *detached == OS_FILE_CLOSED);
dberr_t err;
fil_space_t *space = fil_space_t::check_pending_operations(id);
@ -1728,9 +1730,9 @@ func_exit:
ut_a(space == fil_space_get_by_id(id));
ut_a(!space->referenced());
ut_a(UT_LIST_GET_LEN(space->chain) == 1);
auto handles = fil_system.detach(space, detached_handles != nullptr);
if (detached_handles) {
*detached_handles = std::move(handles);
pfs_os_file_t handle = fil_system.detach(space, detached != nullptr);
if (detached) {
*detached = handle;
}
mysql_mutex_unlock(&fil_system.mutex);

426
storage/innobase/handler/ha_innodb.cc

@ -1272,16 +1272,291 @@ innobase_commit_by_xid(
handlerton* hton, /*!< in: InnoDB handlerton */
XID* xid); /*!< in: X/Open XA transaction
identification */
/** Callback that ignores FOREIGN KEY constraints that would be violated
by DROP DATABASE. Both arguments are unused.
@return always false */
static ibool innodb_drop_database_ignore_fk(void*, void*)
{
  return false;
}
/** FOREIGN KEY error reporting context for DROP DATABASE.
An instance is handed to innodb_drop_database_fk() through its
untyped second argument. */
struct innodb_drop_database_fk_report
{
/** database name, with trailing '/'; the span covers the name and the
'/' only, so consumers must use name.size() as the length */
const st_::span<char> name;
/** whether errors were found; initialized to false by
innodb_drop_database() and tested there after the dictionary SQL ran */
bool violated;
};
/** Report FOREIGN KEY constraints that would be violated by DROP DATABASE.
Invoked once per fetched row of a cursor whose select list is
(SYS_FOREIGN.REF_NAME, SYS_FOREIGN.ID).
@param node    the sel_node_t of the cursor
@param report  the innodb_drop_database_fk_report of the operation
@return whether processing should continue
@retval false  once REF_NAME no longer starts with the database prefix */
static ibool innodb_drop_database_fk(void *node, void *report)
{
  auto s= static_cast<sel_node_t*>(node);
  auto r= static_cast<innodb_drop_database_fk_report*>(report);
  const dfield_t *name= que_node_get_val(s->select_list);
  ut_ad(name->type.mtype == DATA_VARCHAR);

  /* Compare only the "db/" prefix. The span r->name is shorter than
  name->len here, so comparing name->len bytes would read past the end
  of the prefix and could never report a match. */
  if (name->len == UNIV_SQL_NULL || name->len <= r->name.size() ||
      memcmp(static_cast<const char*>(name->data), r->name.data(),
             r->name.size()))
    return false; /* End of matches */

  node= que_node_get_next(s->select_list);
  const dfield_t *id= que_node_get_val(node);
  ut_ad(id->type.mtype == DATA_BINARY);
  ut_ad(!que_node_get_next(node));

  if (id->len != UNIV_SQL_NULL)
  {
    sql_print_error("DROP DATABASE: table %.*s is referenced"
                    " by FOREIGN KEY %.*s",
                    static_cast<int>(name->len),
                    static_cast<const char*>(name->data),
                    static_cast<int>(id->len),
                    static_cast<const char*>(id->data));
    /* Record the violation; innodb_drop_database() tests this flag
    after the dictionary SQL has been evaluated. */
    r->violated= true;
  }
  else
    ut_ad("corrupted SYS_FOREIGN record" == 0);

  return true;
}
/** Remove all tables in the named database inside InnoDB.
@param[in] hton handlerton from InnoDB
@param[in] path Database path; Inside InnoDB the name of the last
directory in the path is used as the database name.
For example, in 'mysql/data/test' the database name is 'test'. */
static
void
innobase_drop_database(
handlerton* hton,
char* path);
@param path database path */
static void innodb_drop_database(handlerton*, char *path)
{
  /* Overview:
  1. derive "dbname/" from the last directory component of path;
  2. stop background statistics work on the cached tables of the database;
  3. evict the cached tables, refusing to continue if any is in use;
  4. delete the dictionary records of the database in one transaction;
  5. on success, delete the tablespaces of the records we delete-marked. */
  if (high_level_read_only)
    return;

  ulint len= 0;
  char *ptr;

  /* path ends in a directory separator; scan backwards from the
  character before it to the previous separator (or the start). */
  for (ptr= strend(path) - 2; ptr >= path &&
#ifdef _WIN32
       *ptr != '\\' &&
#endif
       *ptr != '/'; ptr--)
    len++;

  ptr++;
  char *namebuf= static_cast<char*>
    (my_malloc(PSI_INSTRUMENT_ME, len + 2, MYF(0)));
  if (!namebuf)
    return;
  memcpy(namebuf, ptr, len);
  namebuf[len] = '/';
  namebuf[len + 1] = '\0';

#ifdef _WIN32
  innobase_casedn_str(namebuf);
#endif /* _WIN32 */

  trx_t *trx= innobase_trx_allocate(current_thd);
retry:
  row_mysql_lock_data_dictionary(trx);

  for (auto i= dict_sys.table_id_hash.n_cells; i--; )
  {
    for (dict_table_t *table= static_cast<dict_table_t*>
         (dict_sys.table_id_hash.array[i].node); table; table= table->id_hash)
    {
      ut_ad(table->cached);
      /* Compare len + 1 bytes so that the trailing '/' is included and
      only tables of exactly this database match, as in the loop below. */
      if (!strncmp(table->name.m_name, namebuf, len + 1) &&
          !dict_stats_stop_bg(table))
      {
        /* Release the dictionary latch, wait a little and start over. */
        row_mysql_unlock_data_dictionary(trx);
        std::this_thread::sleep_for(std::chrono::milliseconds(250));
        goto retry;
      }
    }
  }

  dberr_t err= DB_SUCCESS;

  for (auto i= dict_sys.table_id_hash.n_cells; i--; )
  {
    /* next is fetched before dict_sys.remove() may unlink table. */
    for (dict_table_t *next, *table= static_cast<dict_table_t*>
         (dict_sys.table_id_hash.array[i].node); table; table= next)
    {
      ut_ad(table->cached);
      next= table->id_hash;
      if (strncmp(table->name.m_name, namebuf, len + 1))
        continue;
      const auto n_handles= table->get_ref_count();
      const bool locks= !n_handles && lock_table_has_locks(table);
      const auto n_fk_checks= table->n_foreign_key_checks_running;
      if (n_fk_checks || n_handles || locks)
      {
        err= DB_ERROR;
        ib::error errmsg;
        errmsg << "DROP DATABASE: cannot DROP TABLE " << table->name;
        if (n_fk_checks)
          errmsg << " due to " << n_fk_checks << " FOREIGN KEY checks";
        else if (n_handles)
          errmsg << " due to " << n_handles << " open handles";
        else
          errmsg << " due to locks";
        continue;
      }
      dict_sys.remove(table);
    }
  }

  /* The fkf loop deletes the SYS_FOREIGN row by ID: fk holds the fetched
  SYS_FOREIGN.ID, the same key that is used for SYS_FOREIGN_COLS. */
  static const char drop_database[] =
    "PROCEDURE DROP_DATABASE_PROC () IS\n"
    "fk CHAR;\n"
    "name CHAR;\n"
    "tid CHAR;\n"
    "iid CHAR;\n"
    "DECLARE FUNCTION fk_report;\n"
    "DECLARE CURSOR fkf IS\n"
    "SELECT ID FROM SYS_FOREIGN WHERE FOR_NAME >= :db FOR UPDATE\n"
    "ORDER BY FOR_NAME;\n"
    "DECLARE CURSOR fkr IS\n"
    "SELECT REF_NAME,ID FROM SYS_FOREIGN WHERE REF_NAME >= :db FOR UPDATE\n"
    "ORDER BY REF_NAME;\n"
    "DECLARE CURSOR tab IS\n"
    "SELECT ID,NAME FROM SYS_TABLES WHERE NAME >= :db FOR UPDATE;\n"
    "DECLARE CURSOR idx IS\n"
    "SELECT ID FROM SYS_INDEXES WHERE TABLE_ID = tid FOR UPDATE;\n"
    "BEGIN\n"
    "OPEN fkf;\n"
    "WHILE 1 = 1 LOOP\n"
    " FETCH fkf INTO fk;\n"
    " IF (SQL % NOTFOUND) THEN EXIT; END IF;\n"
    " IF SUBSTR(fk, 0, LENGTH(:db)) <> :db THEN EXIT; END IF;\n"
    " DELETE FROM SYS_FOREIGN_COLS WHERE ID=fk;\n"
    " DELETE FROM SYS_FOREIGN WHERE ID=fk;\n"
    "END LOOP;\n"
    "CLOSE fkf;\n"
    "OPEN fkr;\n"
    "FETCH fkr INTO fk_report();\n"
    "CLOSE fkr;\n"
    "OPEN tab;\n"
    "WHILE 1 = 1 LOOP\n"
    " FETCH tab INTO tid,name;\n"
    " IF (SQL % NOTFOUND) THEN EXIT; END IF;\n"
    " IF SUBSTR(name, 0, LENGTH(:db)) <> :db THEN EXIT; END IF;\n"
    " DELETE FROM SYS_COLUMNS WHERE TABLE_ID=tid;\n"
    " DELETE FROM SYS_TABLES WHERE ID=tid;\n"
    " OPEN idx;\n"
    " WHILE 1 = 1 LOOP\n"
    " FETCH idx INTO iid;\n"
    " IF (SQL % NOTFOUND) THEN EXIT; END IF;\n"
    " DELETE FROM SYS_FIELDS WHERE INDEX_ID=iid;\n"
    " DELETE FROM SYS_INDEXES WHERE CURRENT OF idx;\n"
    " END LOOP;\n"
    " CLOSE idx;\n"
    "END LOOP;\n"
    "CLOSE tab;\n"
    "END;\n";

  innodb_drop_database_fk_report report{{namebuf, len + 1}, false};
  trx_start_for_ddl(trx);

  if (err == DB_SUCCESS)
  {
    pars_info_t* pinfo = pars_info_create();
    pars_info_bind_function(pinfo, "fk_report", trx->check_foreigns
                            ? innodb_drop_database_fk
                            : innodb_drop_database_ignore_fk, &report);
    pars_info_add_str_literal(pinfo, "db", namebuf);
    err= que_eval_sql(pinfo, drop_database, false, trx);
    if (err == DB_SUCCESS && report.violated)
      err= DB_CANNOT_DROP_CONSTRAINT;
  }

  /* Remember the transaction ID before the transaction is ended; it is
  matched against SYS_TABLES.DB_TRX_ID below. */
  const trx_id_t trx_id= trx->id;

  if (err != DB_SUCCESS)
  {
    trx->rollback();
    /* Strip the trailing '/' for the message. */
    namebuf[len] = '\0';
    ib::error() << "DROP DATABASE " << namebuf << ": " << err;
  }
  else
    trx_commit_for_mysql(trx);

  row_mysql_unlock_data_dictionary(trx);

  trx->free();

  if (err == DB_SUCCESS)
  {
    /* Eventually after the DELETE FROM SYS_INDEXES was committed,
    purge would invoke dict_drop_index_tree() to delete the associated
    tablespaces. Because the SQL layer expects the directory to be empty,
    we will "manually" purge the tablespaces that belong to the
    records that we delete-marked. */

    mem_heap_t *heap= mem_heap_create(100);
    dtuple_t *tuple= dtuple_create(heap, 1);
    dfield_t *dfield= dtuple_get_nth_field(tuple, 0);
    dict_index_t* sys_index= UT_LIST_GET_FIRST(dict_sys.sys_tables->indexes);
    btr_pcur_t pcur;
    /* From here on, len covers the trailing '/' as well. */
    namebuf[len++]= '/';
    dfield_set_data(dfield, namebuf, len);
    dict_index_copy_types(tuple, sys_index, 1);
    /* Detached file handles are closed only after the mini-transaction
    has been committed. */
    std::vector<pfs_os_file_t> to_close;

    mtr_t mtr;
    mtr.start();

    for (btr_pcur_open_on_user_rec(sys_index, tuple, PAGE_CUR_GE,
                                   BTR_SEARCH_LEAF, &pcur, &mtr);
         btr_pcur_is_on_user_rec(&pcur);
         btr_pcur_move_to_next_user_rec(&pcur, &mtr))
    {
      const rec_t *rec= btr_pcur_get_rec(&pcur);
      if (rec_get_n_fields_old(rec) != DICT_NUM_FIELDS__SYS_TABLES)
      {
        ut_ad("corrupted SYS_TABLES record" == 0);
        break;
      }
      if (!rec_get_deleted_flag(rec, false))
        continue;
      ulint flen;
      static_assert(DICT_FLD__SYS_TABLES__NAME == 0, "compatibility");
      rec_get_nth_field_offs_old(rec, 0, &flen);
      if (flen == UNIV_SQL_NULL || flen <= len || memcmp(rec, namebuf, len))
        /* We ran out of tables that had existed in the database. */
        break;
      const byte *db_trx_id=
        rec_get_nth_field_old(rec, DICT_FLD__SYS_TABLES__DB_TRX_ID, &flen);
      if (flen != 6)
      {
        ut_ad("corrupted SYS_TABLES.DB_TRX_ID" == 0);
        break;
      }
      if (mach_read_from_6(db_trx_id) != trx_id)
        /* This entry was modified by some other transaction than us.
        Unfortunately, because SYS_TABLES.NAME is the PRIMARY KEY,
        we cannot distinguish RENAME and DROP here. It is possible
        that the table had been renamed to some other database. */
        continue;
      const byte *s=
        rec_get_nth_field_old(rec, DICT_FLD__SYS_TABLES__SPACE, &flen);
      if (flen != 4)
        ut_ad("corrupted SYS_TABLES.SPACE" == 0);
      else if (uint32_t space_id= mach_read_from_4(s))
      {
        pfs_os_file_t detached= OS_FILE_CLOSED;
        fil_delete_tablespace(space_id, true, &detached);
        if (detached != OS_FILE_CLOSED)
          to_close.emplace_back(detached);
      }
    }
    mtr.commit();
    mem_heap_free(heap);
    for (pfs_os_file_t detached : to_close)
      os_file_close(detached);
  }

  my_free(namebuf);
}
/** Shut down the InnoDB storage engine.
@return 0 */
@ -3661,7 +3936,7 @@ static int innodb_init(void* p)
innobase_hton->commit_checkpoint_request = innodb_log_flush_request;
innobase_hton->create = innobase_create_handler;
innobase_hton->drop_database = innobase_drop_database;
innobase_hton->drop_database = innodb_drop_database;
innobase_hton->panic = innobase_end;
innobase_hton->pre_shutdown = innodb_preshutdown;
@ -12956,18 +13231,6 @@ ha_innobase::discard_or_import_tablespace(
DBUG_RETURN(0);
}
/**
Check whether the file formed by appending reg_ext to a path exists.
@param path  path of the table, without file name extension
@return whether access() reports that the file exists
*/
static bool frm_file_exists(const char *path)
{
  char buff[FN_REFLEN];
  /* strxnmov() copies at most the given number of characters and then
  appends a terminating NUL, so one byte must be reserved for it. */
  strxnmov(buff, sizeof buff - 1, path, reg_ext, NullS);
  return !access(buff, F_OK);
}
/**
Drops a table from an InnoDB database. Before calling this function,
@ -13061,67 +13324,6 @@ inline int ha_innobase::delete_table(const char* name, enum_sql_command sqlcom)
}
}
if (err == DB_TABLE_NOT_FOUND &&
frm_file_exists(name))
{
/* Test to drop all tables which matches db/tablename + '#'.
Only partitions can have '#' as non-first character in
the table name!
Temporary table names always start with '#', partitions are
the only 'tables' that can have '#' after the first character
and table name must have length > 0. User tables cannot have
'#' since it would be translated to @0023. Therefor this should
only match partitions. */
uint len = (uint) strlen(norm_name);
ulint num_partitions;
ut_a(len < FN_REFLEN);
norm_name[len] = '#';
norm_name[len + 1] = 0;
err = row_drop_database_for_mysql(norm_name, trx,
&num_partitions);
norm_name[len] = 0;
table_name_t tbl_name(norm_name);
if (num_partitions == 0 && !tbl_name.is_temporary()) {
ib::error() << "Table " << tbl_name <<
" does not exist in the InnoDB"
" internal data dictionary though MariaDB is"
" trying to drop it. Have you copied the .frm"
" file of the table to the MariaDB database"
" directory from another database? "
<< TROUBLESHOOTING_MSG;
}
if (num_partitions == 0) {
err = DB_TABLE_NOT_FOUND;
}
}
if (err == DB_TABLE_NOT_FOUND
&& innobase_get_lower_case_table_names() == 1) {
char* is_part = is_partition(norm_name);
if (is_part != NULL) {
char par_case_name[FN_REFLEN];
#ifndef _WIN32
/* Check for the table using lower
case name, including the partition
separator "P" */
strcpy(par_case_name, norm_name);
innobase_casedn_str(par_case_name);
#else
/* On Windows platfrom, check
whether there exists table name in
system table whose name is
not being normalized to lower case */
create_table_info_t::normalize_table_name_low(
par_case_name, name, FALSE);
#endif /* _WIN32 */
err = row_drop_table_for_mysql(
par_case_name, trx, sqlcom, true);
}
}
ut_ad(!srv_read_only_mode);
innobase_commit_low(trx);
@ -13158,64 +13360,6 @@ int ha_innobase::delete_table(const char* name)
return delete_table(name, sqlcom);
}
/** Remove all tables in the named database inside InnoDB.
@param[in]	hton	handlerton from InnoDB
@param[in]	path	Database path; Inside InnoDB the name of the last
directory in the path is used as the database name.
For example, in 'mysql/data/test' the database name is 'test'. */
static
void
innobase_drop_database(
	handlerton*	hton,
	char*		path)
{
	DBUG_ASSERT(hton == innodb_hton_ptr);

	if (high_level_read_only) {
		return;
	}

	THD*	thd = current_thd;

	/* path ends in a directory separator; walk backwards from the
	character before it to find the start of the last component. */
	ulint	len = 0;
	char*	ptr = strend(path) - 2;

	while (ptr >= path && *ptr != '\\' && *ptr != '/') {
		ptr--;
		len++;
	}

	ptr++;

	/* Room for the name, the trailing '/' and the terminating NUL. */
	char*	namebuf = static_cast<char*>(
		my_malloc(PSI_INSTRUMENT_ME, (uint) len + 2, MYF(0)));

	if (!namebuf) {
		/* Out of memory: nothing has been allocated or started
		yet, so there is nothing to clean up. */
		return;
	}

	memcpy(namebuf, ptr, len);
	namebuf[len] = '/';
	namebuf[len + 1] = '\0';

#ifdef _WIN32
	innobase_casedn_str(namebuf);
#endif /* _WIN32 */

	trx_t*	trx = innobase_trx_allocate(thd);
	trx->will_lock = true;

	/* The number of dropped tables is not needed here. */
	ulint	dummy;

	row_drop_database_for_mysql(namebuf, trx, &dummy);

	my_free(namebuf);

	innobase_commit_low(trx);

	trx->free();
}
/** Rename an InnoDB table.
@param[in,out] trx InnoDB data dictionary transaction
@param[in] from old table name

10
storage/innobase/include/dict0crea.h

@ -101,11 +101,11 @@ dict_create_index_tree(
/** Drop the index tree associated with a row in SYS_INDEXES table.
@param[in,out] pcur persistent cursor on rec
@param[in,out] trx dictionary transaction
@param[in,out] table table that the record belongs to
@param[in,out] mtr mini-transaction */
void dict_drop_index_tree(btr_pcur_t *pcur, trx_t *trx, dict_table_t *table,
mtr_t *mtr)
MY_ATTRIBUTE((nonnull(1,4)));
@param[in,out] mtr mini-transaction
@return tablespace ID to drop (if this is the clustered index)
@retval 0 if no tablespace is to be dropped */
uint32_t dict_drop_index_tree(btr_pcur_t *pcur, trx_t *trx, mtr_t *mtr)
MY_ATTRIBUTE((nonnull(1,3), warn_unused_result));
/***************************************************************//**
Creates an index tree for the index if it is not a member of a cluster.

11
storage/innobase/include/dict0load.h

@ -1,7 +1,7 @@
/*****************************************************************************
Copyright (c) 1996, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2017, 2020, MariaDB Corporation.
Copyright (c) 2017, 2021, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@ -66,15 +66,6 @@ was needed and force_recovery is not set.
We also scan the biggest space id, and store it to fil_system. */
void dict_check_tablespaces_and_store_max_id();
/********************************************************************//**
Finds the first table name in the given database.
@return own: table name, NULL if does not exist; the caller must free
the memory in the string! */
char*
dict_get_first_table_name_in_db(
/*============================*/
const char* name); /*!< in: database name which ends to '/' */
/** Make sure the data_file_name is saved in dict_table_t if needed.
@param[in] table Table object
@param[in] dict_mutex_own true if dict_sys.mutex is owned already */

8
storage/innobase/include/dict0mem.h

@ -2058,12 +2058,12 @@ private:
public:
/** Id of the table. */
table_id_t id;
/** Hash chain node. */
hash_node_t id_hash;
/** dict_sys.id_hash chain node */
dict_table_t* id_hash;
/** Table name. */
table_name_t name;
/** Hash chain node. */
hash_node_t name_hash;
/** dict_sys.name_hash chain node */
dict_table_t* name_hash;
/** Memory heap */
mem_heap_t* heap;

20
storage/innobase/include/fil0fil.h

@ -352,8 +352,11 @@ struct fil_space_t final
ut_ad(!latch_count);
latch.destroy();
}
ulint id; /*!< space id */
hash_node_t hash; /*!< hash chain node */
/** fil_system.spaces chain node */
fil_space_t *hash;
lsn_t max_lsn;
/*!< LSN of the most recent
fil_names_write_if_was_clean().
@ -1434,10 +1437,10 @@ public:
public:
/** Detach a tablespace from the cache and close the files.
@param space tablespace
@param detach_handle whether to detach or close handles
@return detached handles or empty vector */
std::vector<pfs_os_file_t> detach(fil_space_t *space,
bool detach_handle= false);
@param detach_handle whether to detach the handle, instead of closing
@return detached handle
@retval OS_FILE_CLOSED if no handle was detached */
pfs_os_file_t detach(fil_space_t *space, bool detach_handle= false);
/** the mutex protecting most data fields, and some fields of fil_space_t */
mysql_mutex_t mutex;
@ -1596,11 +1599,10 @@ MY_ATTRIBUTE((warn_unused_result));
/** Delete a tablespace and associated .ibd file.
@param[in] id tablespace identifier
@param[in] if_exists whether to ignore missing tablespace
@param[out] leaked_handles return detached handles here
@param[out] detached detached file handle (if closing is not wanted)
@return DB_SUCCESS or error */
dberr_t
fil_delete_tablespace(ulint id, bool if_exists= false,
std::vector<pfs_os_file_t> *detached_handles= nullptr);
dberr_t fil_delete_tablespace(ulint id, bool if_exists= false,
pfs_os_file_t *detached= nullptr);
/** Close a single-table tablespace on failed IMPORT TABLESPACE.
The tablespace must be cached in the memory cache.

1
storage/innobase/include/hash0hash.h

@ -32,7 +32,6 @@ struct hash_table_t;
struct hash_cell_t{
void* node; /*!< hash chain node, NULL if none */
};
typedef void* hash_node_t;
/*******************************************************************//**
Inserts a struct to a hash table. */

3
storage/innobase/include/lock0lock.h

@ -412,6 +412,9 @@ lock_rec_unlock(
and release possible other transactions waiting because of these locks. */
void lock_release(trx_t* trx);
/** Release locks on a table whose creation is being rolled back */
ATTRIBUTE_COLD void lock_release_on_rollback(trx_t *trx, dict_table_t *table);
/**********************************************************************//**
Looks for a set bit in a record lock bitmap. Returns ULINT_UNDEFINED,
if none found.

11
storage/innobase/include/row0mysql.h

@ -451,17 +451,6 @@ row_import_tablespace_for_mysql(
row_prebuilt_t* prebuilt) /*!< in: prebuilt struct in MySQL */
MY_ATTRIBUTE((nonnull, warn_unused_result));
/** Drop a database for MySQL.
@param[in] name database name which ends at '/'
@param[in] trx transaction handle
@param[out] found number of dropped tables/partitions
@return error code or DB_SUCCESS */
dberr_t
row_drop_database_for_mysql(
const char* name,
trx_t* trx,
ulint* found);
/*********************************************************************//**
Renames a table for MySQL.
@return error code or DB_SUCCESS */

30
storage/innobase/lock/lock0lock.cc

@ -3865,6 +3865,36 @@ released:
#endif
}
/** Release locks on a table whose creation is being rolled back.
Removes every table lock that is attached to the table, and dequeues
every record lock of the transaction that refers to an index of the table.
@param trx    the transaction; every lock found on the table is asserted
              (ut_ad) to belong to it
@param table  the table whose locks are to be released */
ATTRIBUTE_COLD void lock_release_on_rollback(trx_t *trx, dict_table_t *table)
{
/* Drop the table from the transaction's collection of modified tables. */
trx->mod_tables.erase(table);
/* Both lock_sys and the transaction mutex are held while the lists
are being modified. */
lock_sys.wr_lock(SRW_LOCK_CALL);
trx->mutex_lock();
/* First pass: unlink all table locks of the table, both from the
transaction's lock list and from the table's own list. The successor
is fetched before the current element is unlinked. */
for (lock_t *next, *lock= UT_LIST_GET_FIRST(table->locks); lock; lock= next)
{
next= UT_LIST_GET_NEXT(un_member.tab_lock.locks, lock);
ut_ad(lock->trx == trx);
UT_LIST_REMOVE(trx->lock.trx_locks, lock);
ut_list_remove(table->locks, lock, TableLockGetNode());
}
/* Second pass: walk the transaction's remaining locks from last to
first. The predecessor is fetched before the current lock may be
dequeued. */
for (lock_t *p, *lock= UT_LIST_GET_LAST(trx->lock.trx_locks); lock; lock= p)
{
p= UT_LIST_GET_PREV(trx_locks, lock);
ut_ad(lock->trx == trx);
if (lock->is_table())
/* All table locks on this table were removed by the first pass. */
ut_ad(lock->un_member.tab_lock.table != table);
else if (lock->index->table == table)
/* A record lock on one of the table's indexes */
lock_rec_dequeue_from_page(lock, false);
}
lock_sys.wr_unlock();
trx->mutex_unlock();
}
/*********************************************************************//**
Removes table locks of the transaction on a table to be dropped. */
static

249
storage/innobase/row/row0mysql.cc

@ -3266,7 +3266,7 @@ row_drop_table_for_mysql(
DBUG_RETURN(DB_TABLE_NOT_FOUND);
}
std::vector<pfs_os_file_t> detached_handles;
pfs_os_file_t detached_handle = OS_FILE_CLOSED;
const bool is_temp_name = strstr(table->name.m_name,
"/" TEMP_FILE_PREFIX);
@ -3620,7 +3620,7 @@ do_drop:
if (space->id != TRX_SYS_SPACE) {
err = fil_delete_tablespace(space->id, false,
&detached_handles);
&detached_handle);
}
break;
@ -3700,9 +3700,8 @@ funct_exit_all_freed:
row_mysql_unlock_data_dictionary(trx);
}
for (const auto& handle : detached_handles) {
ut_ad(handle != OS_FILE_CLOSED);
os_file_close(handle);
if (detached_handle != OS_FILE_CLOSED) {
os_file_close(detached_handle);
}
trx->op_info = "";
@ -3710,246 +3709,6 @@ funct_exit_all_freed:
DBUG_RETURN(err);
}
/*******************************************************************//**
Deletes the SYS_FOREIGN and SYS_FOREIGN_COLS records of all foreign keys
whose FOR_NAME is prefixed with the given database name, see Bug#18942.
Called at the end of row_drop_database_for_mysql().
@return error code or DB_SUCCESS */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
drop_all_foreign_keys_in_db(
/*========================*/
	const char*	name,	/*!< in: database name which ends in '/' */
	trx_t*		trx)	/*!< in: transaction handle */
{
	ut_a(name[strlen(name) - 1] == '/');

	/* Bind the database prefix as the :dbname literal of the procedure. */
	pars_info_t* const	info = pars_info_create();
	pars_info_add_str_literal(info, "dbname", name);

/** true if for_name is not prefixed with dbname */
#define TABLE_NOT_IN_THIS_DB \
"SUBSTR(for_name, 0, LENGTH(:dbname)) <> :dbname"

	static const char	drop_fk_proc[] =
		"PROCEDURE DROP_ALL_FOREIGN_KEYS_PROC () IS\n"
		"foreign_id CHAR;\n"
		"for_name CHAR;\n"
		"found INT;\n"
		"DECLARE CURSOR cur IS\n"
		"SELECT ID, FOR_NAME FROM SYS_FOREIGN\n"
		"WHERE FOR_NAME >= :dbname\n"
		"LOCK IN SHARE MODE\n"
		"ORDER BY FOR_NAME;\n"
		"BEGIN\n"
		"found := 1;\n"
		"OPEN cur;\n"
		"WHILE found = 1 LOOP\n"
		" FETCH cur INTO foreign_id, for_name;\n"
		" IF (SQL % NOTFOUND) THEN\n"
		" found := 0;\n"
		" ELSIF (" TABLE_NOT_IN_THIS_DB ") THEN\n"
		" found := 0;\n"
		" ELSIF (1=1) THEN\n"
		" DELETE FROM SYS_FOREIGN_COLS\n"
		" WHERE ID = foreign_id;\n"
		" DELETE FROM SYS_FOREIGN\n"
		" WHERE ID = foreign_id;\n"
		" END IF;\n"
		"END LOOP;\n"
		"CLOSE cur;\n"
		"COMMIT WORK;\n"
		"END;\n";

	/* FALSE: do not reserve dict mutex, we are already holding it */
	return(que_eval_sql(info, drop_fk_proc, FALSE, trx));
}
/** Drop a database for MySQL.
Every table whose name starts with the given prefix is dropped, and the
transaction is committed after each table. Unless the prefix denotes
partitions, drop_all_foreign_keys_in_db() is invoked afterwards to remove
leftover foreign key records.
@param[in]	name	database name which ends at '/', or a partition
			name prefix which ends at '#'; must be a non-empty,
			non-NULL string
@param[in]	trx	transaction handle
@param[out]	found	number of dropped tables/partitions
@return error code or DB_SUCCESS */
dberr_t
row_drop_database_for_mysql(
	const char*	name,
	trx_t*		trx,
	ulint*		found)
{
	dict_table_t*	table;
	char*		table_name;
	dberr_t		err	= DB_SUCCESS;
	ulint		namelen;
	bool		is_partition = false;

	ut_ad(found != NULL);

	DBUG_ENTER("row_drop_database_for_mysql");

	/* Validate the name before it is passed to strlen() or formatted
	with %s; checking it afterwards could never catch a NULL pointer. */
	ut_a(name != NULL);
	namelen = strlen(name);
	/* The checks below index name[namelen - 1]. */
	ut_a(namelen > 0);

	DBUG_PRINT("row_drop_database_for_mysql", ("db: '%s'", name));

	/* Assert DB name or partition name. */
	if (name[namelen - 1] == '#') {
		ut_ad(namelen > 1 && name[namelen - 2] != '/');
		is_partition = true;
		trx->op_info = "dropping partitions";
	} else {
		ut_a(name[namelen - 1] == '/');
		trx->op_info = "dropping database";
	}

	*found = 0;

	trx->dict_operation = true;

	trx_start_if_not_started_xa(trx, true);

loop:
	/* Each (re)try starts by acquiring the data dictionary lock; the
	retry paths below release it before sleeping. */
	row_mysql_lock_data_dictionary(trx);

	while ((table_name = dict_get_first_table_name_in_db(name))) {
		/* Drop parent table if it is a fts aux table, to
		avoid accessing dropped fts aux tables in information
		schema when parent table still exists.
		Note: Drop parent table will drop fts aux tables. */
		char*		parent_table_name = NULL;
		table_id_t	table_id;
		index_id_t	index_id;

		if (fts_check_aux_table(
			    table_name, &table_id, &index_id)) {
			dict_table_t*	parent_table = dict_table_open_on_id(
				table_id, TRUE, DICT_TABLE_OP_NORMAL);
			if (parent_table != NULL) {
				parent_table_name = mem_strdupl(
					parent_table->name.m_name,
					strlen(parent_table->name.m_name));
				dict_table_close(parent_table, TRUE, FALSE);
			}
		}

		if (parent_table_name != NULL) {
			/* Continue with the parent table instead of the
			auxiliary table. */
			ut_free(table_name);
			table_name = parent_table_name;
		}

		ut_a(memcmp(table_name, name, namelen) == 0);

		table = dict_table_open_on_name(
			table_name, TRUE, FALSE, static_cast<dict_err_ignore_t>(
				DICT_ERR_IGNORE_INDEX_ROOT
				| DICT_ERR_IGNORE_CORRUPT));

		if (!table) {
			ib::error() << "Cannot load table " << table_name
				<< " from InnoDB internal data dictionary"
				" during drop database";
			ut_free(table_name);
			err = DB_TABLE_NOT_FOUND;
			break;
		}

		if (!table->name.is_temporary()) {
			/* There could be orphan temp tables left from
			interrupted alter table. Leave them, and handle
			the rest.*/
			if (table->can_be_evicted
			    && (name[namelen - 1] != '#')) {
				ib::warn() << "Orphan table encountered during"
					" DROP DATABASE. This is possible if '"
					<< table->name << ".frm' was lost.";
			}

			if (!table->is_readable() && !table->space) {
				ib::warn() << "Missing .ibd file for table "
					<< table->name << ".";
			}
		}

		dict_table_close(table, TRUE, FALSE);

		/* The dict_table_t object must not be accessed before
		dict_table_open() or after dict_table_close() while
		not holding dict_sys.mutex. */
		dict_sys.assert_locked();

		/* Disable statistics on the found table. */
		if (!dict_stats_stop_bg(table)) {
			row_mysql_unlock_data_dictionary(trx);

			std::this_thread::sleep_for(
				std::chrono::milliseconds(250));

			ut_free(table_name);

			goto loop;
		}

		/* Wait until MySQL does not have any queries running on
		the table */

		if (table->get_ref_count() > 0) {
			row_mysql_unlock_data_dictionary(trx);

			ib::warn() << "MySQL is trying to drop database "
				<< ut_get_name(trx, name) << " though"
				" there are still open handles to table "
				<< table->name << ".";

			std::this_thread::sleep_for(std::chrono::seconds(1));

			ut_free(table_name);

			goto loop;
		}

		err = row_drop_table_for_mysql(
			table_name, trx, SQLCOM_DROP_DB);
		/* Commit after every table, whether or not the drop
		succeeded. */
		trx_commit_for_mysql(trx);

		if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
			ib::error() << "DROP DATABASE "
				<< ut_get_name(trx, name) << " failed"
				" with error (" << err << ") for"
				" table " << ut_get_name(trx, table_name);
			ut_free(table_name);
			break;
		}

		ut_free(table_name);
		(*found)++;
	}

	/* Partitioning does not yet support foreign keys. */
	if (err == DB_SUCCESS && !is_partition) {
		/* after dropping all tables try to drop all leftover
		foreign keys in case orphaned ones exist */
		err = drop_all_foreign_keys_in_db(name, trx);

		if (err != DB_SUCCESS) {
			const std::string&	db = ut_get_name(trx, name);
			ib::error() << "DROP DATABASE " << db << " failed with"
				" error " << err << " while dropping all"
				" foreign keys";
		}
	}

	trx_commit_for_mysql(trx);

	row_mysql_unlock_data_dictionary(trx);

	trx->op_info = "";

	DBUG_RETURN(err);
}
/****************************************************************//**
Delete a single constraint.
@return error code or DB_SUCCESS */

57
storage/innobase/row/row0purge.cc

@ -48,6 +48,7 @@ Created 3/14/1997 Heikki Tuuri
#include "handler.h"
#include "ha_innodb.h"
#include "fil0fil.h"
#include <mysql/service_thd_mdl.h>
/*************************************************************************
IMPORTANT NOTE: Any operation that generates redo MUST check that there
@ -109,12 +110,16 @@ row_purge_remove_clust_if_poss_low(
index_id_t index_id = 0;
MDL_ticket* mdl_ticket = nullptr;
dict_table_t *table = nullptr;
pfs_os_file_t f = OS_FILE_CLOSED;
retry:
if (table_id) {
dict_sys.mutex_lock();
table = dict_table_open_on_id(
table_id, false, DICT_TABLE_OP_OPEN_ONLY_IF_CACHED,
table_id, true, DICT_TABLE_OP_OPEN_ONLY_IF_CACHED,
node->purge_thd, &mdl_ticket);
if (table && table->n_rec_locks) {
if (!table) {
dict_sys.mutex_unlock();
} else if (table->n_rec_locks) {
for (dict_index_t* ind = UT_LIST_GET_FIRST(
table->indexes); ind;
ind = UT_LIST_GET_NEXT(indexes, ind)) {
@ -128,16 +133,19 @@ retry:
mtr.start();
index->set_modified(mtr);
log_free_check();
bool success = true;
if (!row_purge_reposition_pcur(mode, node, &mtr)) {
/* The record was already removed. */
removed:
mtr.commit();
close_and_exit:
if (table) {
dict_table_close(table, false, false,
dict_table_close(table, true, false,
node->purge_thd, mdl_ticket);
dict_sys.mutex_unlock();
}
return true;
return success;
}
if (node->table->id == DICT_INDEXES_ID) {
@ -155,8 +163,39 @@ removed:
}
ut_ad("corrupted SYS_INDEXES record" == 0);
}
dict_drop_index_tree(&node->pcur, nullptr, table, &mtr);
if (const uint32_t space_id = dict_drop_index_tree(
&node->pcur, nullptr, &mtr)) {
if (table) {
if (table->release()) {
dict_sys.remove(table);
} else if (table->space_id == space_id) {
table->space = nullptr;
table->file_unreadable = true;
}
table = nullptr;
dict_sys.mutex_unlock();
if (!mdl_ticket);
else if (MDL_context* mdl_context =
static_cast<MDL_context*>(
thd_mdl_context(node->
purge_thd))) {
mdl_context->release_lock(mdl_ticket);
mdl_ticket = nullptr;
}
}
fil_delete_tablespace(space_id, true, &f);
}
mtr.commit();
if (table) {
dict_table_close(table, true, false,
node->purge_thd, mdl_ticket);
dict_sys.mutex_unlock();
table = nullptr;
}
mtr.start();
index->set_modified(mtr);
@ -172,7 +211,6 @@ removed:
rec_offs* offsets = rec_get_offsets(rec, index, offsets_,
index->n_core_fields,
ULINT_UNDEFINED, &heap);
bool success = true;
if (node->roll_ptr != row_get_rec_roll_ptr(rec, index, offsets)) {
/* Someone else has modified the record later: do not remove */
@ -217,12 +255,7 @@ func_exit:
mtr_commit(&mtr);
}
if (UNIV_LIKELY_NULL(table)) {
dict_table_close(table, false, false, node->purge_thd,
mdl_ticket);
}
return(success);
goto close_and_exit;
}
/***********************************************************//**

38
storage/innobase/row/row0uins.cc

@ -44,6 +44,7 @@ Created 2/25/1997 Heikki Tuuri
#include "ibuf0ibuf.h"
#include "log0log.h"
#include "fil0fil.h"
#include <mysql/service_thd_mdl.h>
/*************************************************************************
IMPORTANT NOTE: Any operation that generates redo MUST check that there
@ -152,8 +153,41 @@ restart:
}
ut_ad("corrupted SYS_INDEXES record" == 0);
}
dict_drop_index_tree(&node->pcur, node->trx,
table, &mtr);
if (const uint32_t space_id = dict_drop_index_tree(
&node->pcur, node->trx, &mtr)) {
if (table) {
lock_release_on_rollback(node->trx,
table);
if (!dict_locked) {
dict_sys.mutex_lock();
}
if (table->release()) {
dict_sys.remove(table);
} else if (table->space_id
== space_id) {
table->space = nullptr;
table->file_unreadable = true;
}
if (!dict_locked) {
dict_sys.mutex_unlock();
}
table = nullptr;
if (!mdl_ticket);
else if (MDL_context* mdl_context =
static_cast<MDL_context*>(
thd_mdl_context(
node->trx->
mysql_thd))) {
mdl_context->release_lock(
mdl_ticket);
mdl_ticket = nullptr;
}
}
fil_delete_tablespace(space_id, true);
}
mtr.commit();
mtr.start();

Loading…
Cancel
Save